Use the new dry-run mode in cmdlib
diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 443d28a..7ba8630 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -129,11 +129,11 @@ class LogicalUnit(object):
     self.proc = processor
     self.op = op
     self.cfg = context.cfg
+    self.glm = context.glm
     self.context = context
     self.rpc = rpc
     # Dicts used to declare locking needs to mcpu
     self.needed_locks = None
-    self.acquired_locks = {}
     self.share_locks = dict.fromkeys(locking.LEVELS, 0)
     self.add_locks = {}
     self.remove_locks = {}
@@ -385,7 +385,7 @@ class LogicalUnit(object):
     # future we might want to have different behaviors depending on the value
     # of self.recalculate_locks[locking.LEVEL_NODE]
     wanted_nodes = []
-    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+    for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
       instance = self.context.cfg.GetInstanceInfo(instance_name)
       wanted_nodes.append(instance.primary_node)
       if not primary_only:
@@ -499,7 +499,7 @@ class _QueryBase:
 
     """
     if self.do_locking:
-      names = lu.acquired_locks[lock_level]
+      names = lu.glm.list_owned(lock_level)
     else:
       names = all_names
 
@@ -510,7 +510,7 @@ class _QueryBase:
 
     # caller specified names and we must keep the same order
     assert self.names
-    assert not self.do_locking or lu.acquired_locks[lock_level]
+    assert not self.do_locking or lu.glm.is_owned(lock_level)
 
     missing = set(self.wanted).difference(names)
     if missing:
@@ -656,25 +656,23 @@ def _ReleaseLocks(lu, level, names=None, keep=None):
     release = []
 
     # Determine which locks to release
-    for name in lu.acquired_locks[level]:
+    for name in lu.glm.list_owned(level):
       if should_release(name):
         release.append(name)
       else:
         retain.append(name)
 
-    assert len(lu.acquired_locks[level]) == (len(retain) + len(release))
+    assert len(lu.glm.list_owned(level)) == (len(retain) + len(release))
 
     # Release just some locks
-    lu.context.glm.release(level, names=release)
-    lu.acquired_locks[level] = retain
+    lu.glm.release(level, names=release)
 
-    assert frozenset(lu.context.glm.list_owned(level)) == frozenset(retain)
+    assert frozenset(lu.glm.list_owned(level)) == frozenset(retain)
   else:
     # Release everything
-    lu.context.glm.release(level)
-    del lu.acquired_locks[level]
+    lu.glm.release(level)
 
-    assert not lu.context.glm.list_owned(level), "No locks should be owned"
+    assert not lu.glm.is_owned(level), "No locks should be owned"
 
 
 def _RunPostHook(lu, node_name):
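
The hunks above replace the per-LU acquired_locks bookkeeping with direct queries against the lock manager (glm.list_owned / glm.is_owned). Below is a minimal sketch of that pattern, using a toy lock-manager stand-in rather than Ganeti's real GanetiLockManager, to show why the duplicate dict can be dropped: the lock manager itself is the single source of truth.

# Toy stand-in for the lock manager; only the ownership-query pattern matters.
class ToyLockManager(object):
  def __init__(self):
    self._owned = {}  # level -> set of lock names

  def acquire(self, level, names):
    self._owned.setdefault(level, set()).update(names)

  def release(self, level, names=None):
    if names is None:
      self._owned.pop(level, None)        # release everything at this level
    else:
      self._owned.get(level, set()).difference_update(names)

  def list_owned(self, level):
    return frozenset(self._owned.get(level, ()))

  def is_owned(self, level):
    return bool(self._owned.get(level))


def release_locks(glm, level, keep=None):
  """Release all locks at 'level' except those listed in 'keep'."""
  keep = frozenset(keep or ())
  release = [name for name in glm.list_owned(level) if name not in keep]
  glm.release(level, names=release)
  assert glm.list_owned(level) == keep


glm = ToyLockManager()
glm.acquire("node", ["node1", "node2", "node3"])
release_locks(glm, "node", keep=["node1"])
print(glm.list_owned("node"))  # only "node1" remains owned
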
@@ -2651,10 +2649,7 @@ class LUClusterRepairDiskSizes(NoHooksLU):
 
   def ExpandNames(self):
     if self.op.instances:
-      self.wanted_names = []
-      for name in self.op.instances:
-        full_name = _ExpandInstanceName(self.cfg, name)
-        self.wanted_names.append(full_name)
+      self.wanted_names = _GetWantedInstances(self, self.op.instances)
       self.needed_locks = {
         locking.LEVEL_NODE: [],
         locking.LEVEL_INSTANCE: self.wanted_names,
@@ -2666,7 +2661,7 @@ class LUClusterRepairDiskSizes(NoHooksLU):
         locking.LEVEL_NODE: locking.ALL_SET,
         locking.LEVEL_INSTANCE: locking.ALL_SET,
         }
-    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
+    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
 
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE and self.wanted_names is not None:
@@ -2679,7 +2674,7 @@ class LUClusterRepairDiskSizes(NoHooksLU):
 
     """
     if self.wanted_names is None:
-      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+      self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)
 
     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                              in self.wanted_names]
@@ -2904,7 +2899,7 @@ class LUClusterSetParams(LogicalUnit):
                                    " drbd-based instances exist",
                                    errors.ECODE_INVAL)
 
-    node_list = self.acquired_locks[locking.LEVEL_NODE]
+    node_list = self.glm.list_owned(locking.LEVEL_NODE)
 
     # if vg_name not None, checks given volume group on all nodes
     if self.op.vg_name:
@@ -3431,6 +3426,20 @@ class LUOobCommand(NoHooksLU):
   REG_BGL = False
   _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
 
+  def ExpandNames(self):
+    """Gather locks we need.
+
+    """
+    if self.op.node_names:
+      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
+      lock_names = self.op.node_names
+    else:
+      lock_names = locking.ALL_SET
+
+    self.needed_locks = {
+      locking.LEVEL_NODE: lock_names,
+      }
+
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -3487,21 +3496,6 @@ class LUOobCommand(NoHooksLU):
                                     " not marked offline") % node_name,
                                    errors.ECODE_STATE)
 
-  def ExpandNames(self):
-    """Gather locks we need.
-
-    """
-    if self.op.node_names:
-      self.op.node_names = [_ExpandNodeName(self.cfg, name)
-                            for name in self.op.node_names]
-      lock_names = self.op.node_names
-    else:
-      lock_names = locking.ALL_SET
-
-    self.needed_locks = {
-      locking.LEVEL_NODE: lock_names,
-      }
-
   def Exec(self, feedback_fn):
     """Execute OOB and return result if we expect any.
 
@@ -3509,7 +3503,8 @@ class LUOobCommand(NoHooksLU):
     master_node = self.master_node
     ret = []
 
-    for idx, node in enumerate(self.nodes):
+    for idx, node in enumerate(utils.NiceSort(self.nodes,
+                                              key=lambda node: node.name)):
       node_entry = [(constants.RS_NORMAL, node.name)]
       ret.append(node_entry)
 
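
The loop above now iterates over the nodes in a stable, name-sorted order via utils.NiceSort(..., key=...). The following small, self-contained illustration shows natural sorting by a key function; the helper is a hypothetical stand-in, not Ganeti's utils.NiceSort.

import re

def nice_sort_key(name):
  # Split into digit/non-digit runs so "node2" sorts before "node10".
  return [int(part) if part.isdigit() else part
          for part in re.split(r"(\d+)", name)]

class ToyNode(object):
  def __init__(self, name):
    self.name = name

nodes = [ToyNode("node10"), ToyNode("node2"), ToyNode("node1")]
for node in sorted(nodes, key=lambda node: nice_sort_key(node.name)):
  print(node.name)   # node1, node2, node10
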
@@ -3672,7 +3667,10 @@ class _OsQuery(_QueryBase):
 
     """
     # Locking is not used
-    assert not (lu.acquired_locks or self.do_locking or self.use_locking)
+    assert not (compat.any(lu.glm.is_owned(level)
+                           for level in locking.LEVELS
+                           if level != locking.LEVEL_CLUSTER) or
+                self.do_locking or self.use_locking)
 
     valid_nodes = [node.name
                    for node in lu.cfg.GetAllNodesInfo().values()
@@ -3978,7 +3976,7 @@ class LUNodeQueryvols(NoHooksLU):
     """Computes the list of nodes and their attributes.
 
     """
-    nodenames = self.acquired_locks[locking.LEVEL_NODE]
+    nodenames = self.glm.list_owned(locking.LEVEL_NODE)
     volumes = self.rpc.call_node_volumes(nodenames)
 
     ilist = [self.cfg.GetInstanceInfo(iname) for iname
@@ -4056,7 +4054,7 @@ class LUNodeQueryStorage(NoHooksLU):
     """Computes the list of nodes and their attributes.
 
     """
-    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
+    self.nodes = self.glm.list_owned(locking.LEVEL_NODE)
 
     # Always get name to sort by
     if constants.SF_NAME in self.op.output_fields:
@@ -4631,7 +4629,7 @@ class LUNodeSetParams(LogicalUnit):
         instances_keep = []
 
         # Build list of instances to release
-        for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+        for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
           instance = self.context.cfg.GetInstanceInfo(instance_name)
           if (instance.disk_template in constants.DTS_INT_MIRROR and
               self.op.node_name in instance.all_nodes):
@@ -4640,7 +4638,7 @@ class LUNodeSetParams(LogicalUnit):
 
         _ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep)
 
-        assert (set(self.acquired_locks.get(locking.LEVEL_INSTANCE, [])) ==
+        assert (set(self.glm.list_owned(locking.LEVEL_INSTANCE)) ==
                 set(instances_keep))
 
   def BuildHooksEnv(self):
@@ -5843,8 +5841,8 @@ class LUInstanceRename(LogicalUnit):
     # Change the instance lock. This is definitely safe while we hold the BGL.
     # Otherwise the new lock would have to be added in acquired mode.
     assert self.REQ_BGL
-    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
-    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
+    self.glm.remove(locking.LEVEL_INSTANCE, old_name)
+    self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
 
     # re-read the instance from the configuration after rename
     inst = self.cfg.GetInstanceInfo(self.op.new_name)
@@ -6013,8 +6011,6 @@ class LUInstanceFailover(LogicalUnit):
     shutdown_timeout = self.op.shutdown_timeout
     self._migrater = TLMigrateInstance(self, self.op.instance_name,
                                        cleanup=False,
-                                       iallocator=self.op.iallocator,
-                                       target_node=self.op.target_node,
                                        failover=True,
                                        ignore_consistency=ignore_consistency,
                                        shutdown_timeout=shutdown_timeout)
@@ -6041,7 +6037,7 @@ class LUInstanceFailover(LogicalUnit):
     """
     instance = self._migrater.instance
     source_node = instance.primary_node
-    target_node = self._migrater.target_node
+    target_node = self.op.target_node
     env = {
       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
       "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
@@ -6090,8 +6086,6 @@ class LUInstanceMigrate(LogicalUnit):
 
     self._migrater = TLMigrateInstance(self, self.op.instance_name,
                                        cleanup=self.op.cleanup,
-                                       iallocator=self.op.iallocator,
-                                       target_node=self.op.target_node,
                                        failover=False,
                                        fallback=self.op.allow_failover)
     self.tasklets = [self._migrater]
@@ -6117,7 +6111,7 @@ class LUInstanceMigrate(LogicalUnit):
     """
     instance = self._migrater.instance
     source_node = instance.primary_node
-    target_node = self._migrater.target_node
+    target_node = self.op.target_node
     env = _BuildInstanceHookEnvByObject(self, instance)
     env.update({
       "MIGRATE_LIVE": self._migrater.live,
@@ -6353,9 +6347,7 @@ class LUNodeMigrate(LogicalUnit):
       logging.debug("Migrating instance %s", inst.name)
       names.append(inst.name)
 
-      tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False,
-                                        iallocator=self.op.iallocator,
-                                        taget_node=None))
+      tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False))
 
       if inst.disk_template in constants.DTS_EXT_MIRROR:
         # We need to lock all nodes, as the iallocator will choose the
@@ -6420,8 +6412,8 @@ class TLMigrateInstance(Tasklet):
   @ivar shutdown_timeout: In case of failover timeout of the shutdown
 
   """
-  def __init__(self, lu, instance_name, cleanup=False, iallocator=None,
-               target_node=None, failover=False, fallback=False,
+  def __init__(self, lu, instance_name, cleanup=False,
+               failover=False, fallback=False,
                ignore_consistency=False,
                shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT):
     """Initializes this class.
@@ -6433,8 +6425,6 @@ class TLMigrateInstance(Tasklet):
     self.instance_name = instance_name
     self.cleanup = cleanup
     self.live = False # will be overridden later
-    self.iallocator = iallocator
-    self.target_node = target_node
     self.failover = failover
     self.fallback = fallback
     self.ignore_consistency = ignore_consistency
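
With the constructor change above (and the call-site changes below), TLMigrateInstance no longer keeps its own copies of iallocator and target_node; it reads them from the owning LU's opcode, so there is a single source of truth for those parameters. A toy sketch of that pattern, with hypothetical class names:

class ToyOpCode(object):
  def __init__(self, iallocator=None, target_node=None):
    self.iallocator = iallocator
    self.target_node = target_node

class ToyLU(object):
  def __init__(self, op):
    self.op = op
    self.tasklets = []

class ToyTasklet(object):
  def __init__(self, lu, instance_name):
    self.lu = lu                     # keep a reference to the owning LU
    self.instance_name = instance_name

  def wants_iallocator(self):
    # No duplicated attribute: always consult the LU's opcode.
    return self.lu.op.iallocator is not None

lu = ToyLU(ToyOpCode(iallocator="hail"))
tasklet = ToyTasklet(lu, "inst1")
lu.tasklets.append(tasklet)
print(tasklet.wants_iallocator())  # True
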
@@ -6469,16 +6459,21 @@ class TLMigrateInstance(Tasklet):
     if instance.disk_template in constants.DTS_EXT_MIRROR:
       _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
 
-      if self.iallocator:
+      if self.lu.op.iallocator:
         self._RunAllocator()
+      else:
+        # We set self.target_node as it is required by
+        # BuildHooksEnv
+        self.target_node = self.lu.op.target_node
 
       # self.target_node is already populated, either directly or by the
       # iallocator run
       target_node = self.target_node
 
       if len(self.lu.tasklets) == 1:
-        # It is safe to release locks only when we're the only tasklet in the LU
-        _ReleaseLocks(self, locking.LEVEL_NODE,
+        # It is safe to release locks only when we're the only tasklet
+        # in the LU
+        _ReleaseLocks(self.lu, locking.LEVEL_NODE,
                       keep=[instance.primary_node, self.target_node])
 
     else:
@@ -6488,8 +6483,8 @@ class TLMigrateInstance(Tasklet):
                                         " %s disk template" %
                                         instance.disk_template)
       target_node = secondary_nodes[0]
-      if self.iallocator or (self.target_node and
-                             self.target_node != target_node):
+      if self.lu.op.iallocator or (self.lu.op.target_node and
+                                   self.lu.op.target_node != target_node):
         if self.failover:
           text = "failed over"
         else:
@@ -6530,6 +6525,30 @@ class TLMigrateInstance(Tasklet):
 
     assert not (self.failover and self.cleanup)
 
+    if not self.failover:
+      if self.lu.op.live is not None and self.lu.op.mode is not None:
+        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
+                                   " parameters are accepted",
+                                   errors.ECODE_INVAL)
+      if self.lu.op.live is not None:
+        if self.lu.op.live:
+          self.lu.op.mode = constants.HT_MIGRATION_LIVE
+        else:
+          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
+        # reset the 'live' parameter to None so that repeated
+        # invocations of CheckPrereq do not raise an exception
+        self.lu.op.live = None
+      elif self.lu.op.mode is None:
+        # read the default value from the hypervisor
+        i_hv = self.cfg.GetClusterInfo().FillHV(self.instance,
+                                                skip_globals=False)
+        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
+
+      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
+    else:
+      # Failover is never live
+      self.live = False
+
   def _RunAllocator(self):
     """Run the allocator based on input opcode.
 
@@ -6542,47 +6561,23 @@ class TLMigrateInstance(Tasklet):
                                     self.instance.primary_node],
                      )
 
-    ial.Run(self.iallocator)
+    ial.Run(self.lu.op.iallocator)
 
     if not ial.success:
       raise errors.OpPrereqError("Can't compute nodes using"
                                  " iallocator '%s': %s" %
-                                 (self.iallocator, ial.info),
+                                 (self.lu.op.iallocator, ial.info),
                                  errors.ECODE_NORES)
     if len(ial.result) != ial.required_nodes:
       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                  " of nodes (%s), required %s" %
-                                 (self.iallocator, len(ial.result),
+                                 (self.lu.op.iallocator, len(ial.result),
                                   ial.required_nodes), errors.ECODE_FAULT)
     self.target_node = ial.result[0]
     self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
-                 self.instance_name, self.iallocator,
+                 self.instance_name, self.lu.op.iallocator,
                  utils.CommaJoin(ial.result))
 
-    if not self.failover:
-      if self.lu.op.live is not None and self.lu.op.mode is not None:
-        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
-                                   " parameters are accepted",
-                                   errors.ECODE_INVAL)
-      if self.lu.op.live is not None:
-        if self.lu.op.live:
-          self.lu.op.mode = constants.HT_MIGRATION_LIVE
-        else:
-          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
-        # reset the 'live' parameter to None so that repeated
-        # invocations of CheckPrereq do not raise an exception
-        self.lu.op.live = None
-      elif self.lu.op.mode is None:
-        # read the default value from the hypervisor
-        i_hv = self.cfg.GetClusterInfo().FillHV(self.instance,
-                                                skip_globals=False)
-        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
-
-      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
-    else:
-      # Failover is never live
-      self.live = False
-
   def _WaitUntilSync(self):
     """Poll with custom rpc for disk sync.
 
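
The two hunks above move the live/mode resolution out of _RunAllocator and into CheckPrereq, so it also runs when no iallocator is involved. A simplified, self-contained sketch of that resolution logic follows; the constants and the hypervisor-default argument are stand-ins for the real ones.

HT_MIGRATION_LIVE = "live"
HT_MIGRATION_NONLIVE = "non-live"

def resolve_migration_mode(live, mode, hv_default, failover=False):
  """Return (mode, is_live) following the precedence shown above."""
  if failover:
    return (mode, False)              # failover is never live
  if live is not None and mode is not None:
    raise ValueError("only one of 'live' and 'mode' is accepted")
  if live is not None:
    mode = HT_MIGRATION_LIVE if live else HT_MIGRATION_NONLIVE
  elif mode is None:
    mode = hv_default                 # hypervisor's default migration mode
  return (mode, mode == HT_MIGRATION_LIVE)

print(resolve_migration_mode(True, None, HT_MIGRATION_NONLIVE))
# ('live', True)
print(resolve_migration_mode(None, None, HT_MIGRATION_LIVE))
# ('live', True): falls back to the hypervisor default
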
@@ -7772,7 +7767,7 @@ class LUInstanceCreate(LogicalUnit):
     src_path = self.op.src_path
 
     if src_node is None:
-      locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+      locked_nodes = self.glm.list_owned(locking.LEVEL_NODE)
       exp_list = self.rpc.call_export_list(locked_nodes)
       found = False
       for node in exp_list:
@@ -8509,24 +8504,29 @@ class LUInstanceReplaceDisks(LogicalUnit):
   def ExpandNames(self):
     self._ExpandAndLockInstance()
 
-    if self.op.iallocator is not None:
-      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+    assert locking.LEVEL_NODE not in self.needed_locks
+    assert locking.LEVEL_NODEGROUP not in self.needed_locks
 
-    elif self.op.remote_node is not None:
-      remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
-      self.op.remote_node = remote_node
+    assert self.op.iallocator is None or self.op.remote_node is None, \
+      "Conflicting options"
+
+    if self.op.remote_node is not None:
+      self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
 
       # Warning: do not remove the locking of the new secondary here
       # unless DRBD8.AddChildren is changed to work in parallel;
       # currently it doesn't since parallel invocations of
       # FindUnusedMinor will conflict
-      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
+      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
-
     else:
       self.needed_locks[locking.LEVEL_NODE] = []
       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
+      if self.op.iallocator is not None:
+        # iallocator will select a new node in the same group
+        self.needed_locks[locking.LEVEL_NODEGROUP] = []
+
     self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                    self.op.iallocator, self.op.remote_node,
                                    self.op.disks, False, self.op.early_release)
@@ -8534,11 +8534,26 @@ class LUInstanceReplaceDisks(LogicalUnit):
     self.tasklets = [self.replacer]
 
   def DeclareLocks(self, level):
-    # If we're not already locking all nodes in the set we have to declare the
-    # instance's primary/secondary nodes.
-    if (level == locking.LEVEL_NODE and
-        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
-      self._LockInstancesNodes()
+    if level == locking.LEVEL_NODEGROUP:
+      assert self.op.remote_node is None
+      assert self.op.iallocator is not None
+      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+
+      self.share_locks[locking.LEVEL_NODEGROUP] = 1
+      self.needed_locks[locking.LEVEL_NODEGROUP] = \
+        self.cfg.GetInstanceNodeGroups(self.op.instance_name)
+
+    elif level == locking.LEVEL_NODE:
+      if self.op.iallocator is not None:
+        assert self.op.remote_node is None
+        assert not self.needed_locks[locking.LEVEL_NODE]
+
+        # Lock member nodes of all locked groups
+        self.needed_locks[locking.LEVEL_NODE] = [node_name
+          for group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
+          for node_name in self.cfg.GetNodeGroup(group_uuid).members]
+      else:
+        self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -8568,6 +8583,26 @@ class LUInstanceReplaceDisks(LogicalUnit):
       nl.append(self.op.remote_node)
     return nl, nl
 
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
+            self.op.iallocator is None)
+
+    owned_groups = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+    if owned_groups:
+      groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name)
+      if owned_groups != groups:
+        raise errors.OpExecError("Node groups used by instance '%s' changed"
+                                 " since lock was acquired, current list is %r,"
+                                 " used to be '%s'" %
+                                 (self.op.instance_name,
+                                  utils.CommaJoin(groups),
+                                  utils.CommaJoin(owned_groups)))
+
+    return LogicalUnit.CheckPrereq(self)
+
 
 class TLReplaceDisks(Tasklet):
   """Replaces disks for an instance.
@@ -8720,12 +8755,15 @@ class TLReplaceDisks(Tasklet):
       remote_node = self._RunAllocator(self.lu, self.iallocator_name,
                                        instance.name, instance.secondary_nodes)
 
-    if remote_node is not None:
+    if remote_node is None:
+      self.remote_node_info = None
+    else:
+      assert remote_node in self.lu.glm.list_owned(locking.LEVEL_NODE), \
+             "Remote node '%s' is not locked" % remote_node
+
       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
       assert self.remote_node_info is not None, \
         "Cannot retrieve locked node %s" % remote_node
-    else:
-      self.remote_node_info = None
 
     if remote_node == self.instance.primary_node:
       raise errors.OpPrereqError("The specified node is the primary node of"
@@ -8809,12 +8847,17 @@ class TLReplaceDisks(Tasklet):
     for node in check_nodes:
       _CheckNodeOnline(self.lu, node)
 
-    touched_nodes = frozenset([self.new_node, self.other_node,
-                               self.target_node])
+    touched_nodes = frozenset(node_name for node_name in [self.new_node,
+                                                          self.other_node,
+                                                          self.target_node]
+                              if node_name is not None)
+
+    # Release unneeded node locks
+    _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
 
-    if self.lu.needed_locks[locking.LEVEL_NODE] == locking.ALL_SET:
-      # Release unneeded node locks
-      _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
+    # Release any owned node group
+    if self.lu.glm.is_owned(locking.LEVEL_NODEGROUP):
+      _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
 
     # Check whether disks are valid
     for disk_idx in self.disks:
@@ -8823,8 +8866,7 @@ class TLReplaceDisks(Tasklet):
     # Get secondary node IP addresses
     self.node_secondary_ip = \
       dict((node_name, self.cfg.GetNodeInfo(node_name).secondary_ip)
-           for node_name in touched_nodes
-           if node_name is not None)
+           for node_name in touched_nodes)
 
   def Exec(self, feedback_fn):
     """Execute disk replacement.
@@ -8835,12 +8877,19 @@ class TLReplaceDisks(Tasklet):
     if self.delay_iallocator:
       self._CheckPrereq2()
 
-    if (self.lu.needed_locks[locking.LEVEL_NODE] == locking.ALL_SET and
-        __debug__):
+    if __debug__:
       # Verify owned locks before starting operation
-      owned_locks = self.lu.context.glm.list_owned(locking.LEVEL_NODE)
+      owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE)
       assert set(owned_locks) == set(self.node_secondary_ip), \
-          "Not owning the correct locks: %s" % (owned_locks, )
+          ("Incorrect node locks, owning %s, expected %s" %
+           (owned_locks, self.node_secondary_ip.keys()))
+
+      owned_locks = self.lu.glm.list_owned(locking.LEVEL_INSTANCE)
+      assert list(owned_locks) == [self.instance_name], \
+          "Instance '%s' not locked" % self.instance_name
+
+      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
+          "Should not own any node group lock at this point"
 
     if not self.disks:
       feedback_fn("No disks need replacement")
@@ -8871,7 +8920,7 @@ class TLReplaceDisks(Tasklet):
 
     if __debug__:
       # Verify owned locks
-      owned_locks = self.lu.context.glm.list_owned(locking.LEVEL_NODE)
+      owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE)
       nodes = frozenset(self.node_secondary_ip)
       assert ((self.early_release and not owned_locks) or
               (not self.early_release and not (set(owned_locks) - nodes))), \
@@ -9468,9 +9517,17 @@ class LUInstanceGrowDisk(LogicalUnit):
     if not disks_ok:
       raise errors.OpExecError("Cannot activate block device to grow")
 
+    # First run all grow ops in dry-run mode
+    for node in instance.all_nodes:
+      self.cfg.SetDiskID(disk, node)
+      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True)
+      result.Raise("Grow request failed to node %s" % node)
+
+    # We know that (as far as we can test) operations across different
+    # nodes will succeed; time to run it for real
     for node in instance.all_nodes:
       self.cfg.SetDiskID(disk, node)
-      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
+      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False)
       result.Raise("Grow request failed to node %s" % node)
 
       # TODO: Rewrite code to work properly
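
This hunk is the change the commit subject refers to: LUInstanceGrowDisk now asks every node to validate the grow in dry-run mode first, and only issues the real grow once all nodes have accepted it, which reduces the chance of ending up with differently sized devices across nodes. A minimal sketch of the two-phase pattern; the per-node call below is a toy stub, not Ganeti's rpc layer.

class GrowError(Exception):
  pass

def toy_blockdev_grow(node, disk, amount, dryrun):
  """Toy per-node grow call; raises GrowError if the node refuses."""
  if node in disk.get("bad_nodes", ()):
    raise GrowError("node %s cannot grow the device by %s" % (node, amount))
  if not dryrun:
    print("grew device on %s by %s" % (node, amount))

def grow_disk(nodes, disk, amount):
  # Phase 1: dry-run on every node; abort before touching anything on error.
  for node in nodes:
    toy_blockdev_grow(node, disk, amount, dryrun=True)
  # Phase 2: all nodes accepted the request, perform the real grow.
  for node in nodes:
    toy_blockdev_grow(node, disk, amount, dryrun=False)

grow_disk(["node1", "node2"], {}, 128)            # both phases succeed
try:
  grow_disk(["node1", "node3"], {"bad_nodes": ["node3"]}, 128)
except GrowError as err:
  print("aborted during dry-run, nothing was grown:", err)
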
@@ -9540,7 +9597,7 @@ class LUInstanceQueryData(NoHooksLU):
     """
     if self.wanted_names is None:
       assert self.op.use_locking, "Locking was not used"
-      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+      self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)
 
     self.wanted_instances = [self.cfg.GetInstanceInfo(name)
                              for name in self.wanted_names]
@@ -10348,7 +10405,7 @@ class LUBackupQuery(NoHooksLU):
         that node.
 
     """
-    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
+    self.nodes = self.glm.list_owned(locking.LEVEL_NODE)
     rpcresult = self.rpc.call_export_list(self.nodes)
     result = {}
     for node in rpcresult:
@@ -10730,7 +10787,7 @@ class LUBackupRemove(NoHooksLU):
       fqdn_warn = True
       instance_name = self.op.instance_name
 
-    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+    locked_nodes = self.glm.list_owned(locking.LEVEL_NODE)
     exportlist = self.rpc.call_export_list(locked_nodes)
     found = False
     for node in exportlist: