cmdlib: Adapt the rpc calls
authorRené Nussbaumer <rn@google.com>
Wed, 4 Apr 2012 11:34:45 +0000 (13:34 +0200)
committerRené Nussbaumer <rn@google.com>
Thu, 10 May 2012 11:30:28 +0000 (13:30 +0200)
The following (blockdev) RPC calls are not converted yet (as they are
not straight forward or need more research):

* bdev_sizes
* blockdev_remove
* blockdev_shutdown
* blockdev_removechildren
* blockdev_close
* blockdev_getsize
* drbd_disconnect_net
* blockdev_rename (has already a special encoder, needs further research
  if needed at all)
* blockdev_getmirrorstatus (not sure if we have everywhere a clear link
  to the instance the disk belongs)
* blockdev_getmirrorstatus_multi (same here, further research)

Then special cases where we take care later in the patch series:

* blockdev_create (special cased)
* blockdev_find (special cased, like blockdev_create)

Signed-off-by: René Nussbaumer <rn@google.com>
Reviewed-by: Michael Hanselmann <hansmi@google.com>

lib/cmdlib.py
lib/masterd/instance.py

index 2be7902..6844949 100644 (file)
@@ -4500,7 +4500,7 @@ def _WaitForSync(lu, instance, disks=None, oneshot=False):
   return not cumul_degraded
 
 
-def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
+def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
   """Check that mirrors are not degraded.
 
   The ldisk parameter, if True, will change the test from the
@@ -4529,7 +4529,8 @@ def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
 
   if dev.children:
     for child in dev.children:
-      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
+      result = result and _CheckDiskConsistency(lu, instance, child, node,
+                                                on_primary)
 
   return result
 
@@ -6315,7 +6316,8 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
         node_disk = node_disk.Copy()
         node_disk.UnsetSize()
       lu.cfg.SetDiskID(node_disk, node)
-      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False, idx)
+      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
+                                             False, idx)
       msg = result.fail_msg
       if msg:
         lu.proc.LogWarning("Could not prepare block device %s on node %s"
@@ -6337,7 +6339,8 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
         node_disk = node_disk.Copy()
         node_disk.UnsetSize()
       lu.cfg.SetDiskID(node_disk, node)
-      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True, idx)
+      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
+                                             True, idx)
       msg = result.fail_msg
       if msg:
         lu.proc.LogWarning("Could not prepare block device %s on node %s"
@@ -7732,7 +7735,7 @@ class LUInstanceMove(LogicalUnit):
     # activate, get path, copy the data over
     for idx, disk in enumerate(instance.disks):
       self.LogInfo("Copying data for disk %d", idx)
-      result = self.rpc.call_blockdev_assemble(target_node, disk,
+      result = self.rpc.call_blockdev_assemble(target_node, (disk, instance),
                                                instance.name, True, idx)
       if result.fail_msg:
         self.LogWarning("Can't assemble newly created disk %d: %s",
@@ -7740,7 +7743,7 @@ class LUInstanceMove(LogicalUnit):
         errs.append(result.fail_msg)
         break
       dev_path = result.payload
-      result = self.rpc.call_blockdev_export(source_node, disk,
+      result = self.rpc.call_blockdev_export(source_node, (disk, instance),
                                              target_node, dev_path,
                                              cluster_name)
       if result.fail_msg:
@@ -8103,7 +8106,8 @@ class TLMigrateInstance(Tasklet):
       all_done = True
       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                             self.nodes_ip,
-                                            self.instance.disks)
+                                            (self.instance.disks,
+                                             self.instance))
       min_percent = 100
       for node, nres in result.items():
         nres.Raise("Cannot resync disks on node %s" % node)
@@ -8149,7 +8153,7 @@ class TLMigrateInstance(Tasklet):
       msg = "single-master"
     self.feedback_fn("* changing disks into %s mode" % msg)
     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
-                                           self.instance.disks,
+                                           (self.instance.disks, self.instance),
                                            self.instance.name, multimaster)
     for node, nres in result.items():
       nres.Raise("Cannot change disks config on node %s" % node)
@@ -8301,7 +8305,7 @@ class TLMigrateInstance(Tasklet):
 
     self.feedback_fn("* checking disk consistency between source and target")
     for (idx, dev) in enumerate(instance.disks):
-      if not _CheckDiskConsistency(self.lu, dev, target_node, False):
+      if not _CheckDiskConsistency(self.lu, instance, dev, target_node, False):
         raise errors.OpExecError("Disk %s is degraded or not fully"
                                  " synchronized on target node,"
                                  " aborting migration" % idx)
@@ -8464,7 +8468,8 @@ class TLMigrateInstance(Tasklet):
       self.feedback_fn("* checking disk consistency between source and target")
       for (idx, dev) in enumerate(instance.disks):
         # for drbd, these are drbd over lvm
-        if not _CheckDiskConsistency(self.lu, dev, target_node, False):
+        if not _CheckDiskConsistency(self.lu, instance, dev, target_node,
+                                     False):
           if primary_node.offline:
             self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
                              " target node %s" %
@@ -8810,7 +8815,9 @@ def _WipeDisks(lu, instance):
     lu.cfg.SetDiskID(device, node)
 
   logging.info("Pause sync of instance %s disks", instance.name)
-  result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, True)
+  result = lu.rpc.call_blockdev_pause_resume_sync(node,
+                                                  (instance.disks, instance),
+                                                  True)
 
   for idx, success in enumerate(result.payload):
     if not success:
@@ -8840,7 +8847,8 @@ def _WipeDisks(lu, instance):
         wipe_size = min(wipe_chunk_size, size - offset)
         logging.debug("Wiping disk %d, offset %s, chunk %s",
                       idx, offset, wipe_size)
-        result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size)
+        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
+                                           wipe_size)
         result.Raise("Could not wipe disk %d at offset %d for size %d" %
                      (idx, offset, wipe_size))
         now = time.time()
@@ -8853,7 +8861,9 @@ def _WipeDisks(lu, instance):
   finally:
     logging.info("Resume sync of instance %s disks", instance.name)
 
-    result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, False)
+    result = lu.rpc.call_blockdev_pause_resume_sync(node,
+                                                    (instance.disks, instance),
+                                                    False)
 
     for idx, success in enumerate(result.payload):
       if not success:
@@ -10072,7 +10082,8 @@ class LUInstanceCreate(LogicalUnit):
           if pause_sync:
             feedback_fn("* pausing disk sync to install instance OS")
             result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
-                                                              iobj.disks, True)
+                                                              (iobj.disks,
+                                                               iobj), True)
             for idx, success in enumerate(result.payload):
               if not success:
                 logging.warn("pause-sync of instance %s for disk %d failed",
@@ -10086,7 +10097,8 @@ class LUInstanceCreate(LogicalUnit):
           if pause_sync:
             feedback_fn("* resuming disk sync")
             result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
-                                                              iobj.disks, False)
+                                                              (iobj.disks,
+                                                               iobj), False)
             for idx, success in enumerate(result.payload):
               if not success:
                 logging.warn("resume-sync of instance %s for disk %d failed",
@@ -10766,8 +10778,8 @@ class TLReplaceDisks(Tasklet):
       self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                       (idx, node_name))
 
-      if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
-                                   ldisk=ldisk):
+      if not _CheckDiskConsistency(self.lu, self.instance, dev, node_name,
+                                   on_primary, ldisk=ldisk):
         raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                  " replace disks for instance %s" %
                                  (node_name, self.instance.name))
@@ -10937,8 +10949,9 @@ class TLReplaceDisks(Tasklet):
 
       # Now that the new lvs have the old name, we can add them to the device
       self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
-      result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
-                                                  new_lvs)
+      result = self.rpc.call_blockdev_addchildren(self.target_node,
+                                                  (dev, self.instance),
+                                                  (new_lvs, self.instance))
       msg = result.fail_msg
       if msg:
         for new_lv in new_lvs:
@@ -11109,7 +11122,7 @@ class TLReplaceDisks(Tasklet):
     result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                             self.new_node],
                                            self.node_secondary_ip,
-                                           self.instance.disks,
+                                           (self.instance.disks, self.instance),
                                            self.instance.name,
                                            False)
     for to_node, to_result in result.items():
@@ -11581,14 +11594,16 @@ class LUInstanceGrowDisk(LogicalUnit):
     # First run all grow ops in dry-run mode
     for node in instance.all_nodes:
       self.cfg.SetDiskID(disk, node)
-      result = self.rpc.call_blockdev_grow(node, disk, self.delta, True)
+      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+                                           True)
       result.Raise("Grow request failed to node %s" % node)
 
     # We know that (as far as we can test) operations across different
     # nodes will succeed, time to run it for real
     for node in instance.all_nodes:
       self.cfg.SetDiskID(disk, node)
-      result = self.rpc.call_blockdev_grow(node, disk, self.delta, False)
+      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+                                           False)
       result.Raise("Grow request failed to node %s" % node)
 
       # TODO: Rewrite code to work properly
@@ -11696,7 +11711,7 @@ class LUInstanceQueryData(NoHooksLU):
 
     self.wanted_instances = instances.values()
 
-  def _ComputeBlockdevStatus(self, node, instance_name, dev):
+  def _ComputeBlockdevStatus(self, node, instance, dev):
     """Returns the status of a block device
 
     """
@@ -11709,7 +11724,7 @@ class LUInstanceQueryData(NoHooksLU):
     if result.offline:
       return None
 
-    result.Raise("Can't compute disk status for %s" % instance_name)
+    result.Raise("Can't compute disk status for %s" % instance.name)
 
     status = result.payload
     if status is None:
@@ -11731,8 +11746,8 @@ class LUInstanceQueryData(NoHooksLU):
         snode = dev.logical_id[0]
 
     dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
-                                              instance.name, dev)
-    dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
+                                              instance, dev)
+    dev_sstatus = self._ComputeBlockdevStatus(snode, instance, dev)
 
     if dev.children:
       dev_children = map(compat.partial(self._ComputeDiskStatus,
index 32f6497..e1fc908 100644 (file)
@@ -1164,7 +1164,7 @@ class ExportInstanceHelper:
 
       # result.payload will be a snapshot of an lvm leaf of the one we
       # passed
-      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
+      result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance))
       new_dev = False
       msg = result.fail_msg
       if msg: