TLMigrateInstance: do not migrate to self
[ganeti-local] / lib / cmdlib.py
index 2e0bd84..67e4522 100644 (file)
@@ -129,11 +129,11 @@ class LogicalUnit(object):
     self.proc = processor
     self.op = op
     self.cfg = context.cfg
+    self.glm = context.glm
     self.context = context
     self.rpc = rpc
     # Dicts used to declare locking needs to mcpu
     self.needed_locks = None
-    self.acquired_locks = {}
     self.share_locks = dict.fromkeys(locking.LEVELS, 0)
     self.add_locks = {}
     self.remove_locks = {}
@@ -385,7 +385,7 @@ class LogicalUnit(object):
     # future we might want to have different behaviors depending on the value
     # of self.recalculate_locks[locking.LEVEL_NODE]
     wanted_nodes = []
-    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+    for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
       instance = self.context.cfg.GetInstanceInfo(instance_name)
       wanted_nodes.append(instance.primary_node)
       if not primary_only:
@@ -499,7 +499,7 @@ class _QueryBase:
 
     """
     if self.do_locking:
-      names = lu.acquired_locks[lock_level]
+      names = lu.glm.list_owned(lock_level)
     else:
       names = all_names
 
@@ -510,7 +510,7 @@ class _QueryBase:
 
     # caller specified names and we must keep the same order
     assert self.names
-    assert not self.do_locking or lu.acquired_locks[lock_level]
+    assert not self.do_locking or lu.glm.is_owned(lock_level)
 
     missing = set(self.wanted).difference(names)
     if missing:
@@ -656,25 +656,23 @@ def _ReleaseLocks(lu, level, names=None, keep=None):
     release = []
 
     # Determine which locks to release
-    for name in lu.acquired_locks[level]:
+    for name in lu.glm.list_owned(level):
       if should_release(name):
         release.append(name)
       else:
         retain.append(name)
 
-    assert len(lu.acquired_locks[level]) == (len(retain) + len(release))
+    assert len(lu.glm.list_owned(level)) == (len(retain) + len(release))
 
     # Release just some locks
-    lu.context.glm.release(level, names=release)
-    lu.acquired_locks[level] = retain
+    lu.glm.release(level, names=release)
 
-    assert frozenset(lu.context.glm.list_owned(level)) == frozenset(retain)
+    assert frozenset(lu.glm.list_owned(level)) == frozenset(retain)
   else:
     # Release everything
-    lu.context.glm.release(level)
-    del lu.acquired_locks[level]
+    lu.glm.release(level)
 
-    assert not lu.context.glm.list_owned(level), "No locks should be owned"
+    assert not lu.glm.is_owned(level), "No locks should be owned"
 
 
 def _RunPostHook(lu, node_name):
@@ -1533,7 +1531,7 @@ class LUClusterVerify(LogicalUnit):
              ntime_diff)
 
   def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
-    """Check the node time.
+    """Check the node LVM results.
 
     @type ninfo: L{objects.Node}
     @param ninfo: the node to check
@@ -1569,8 +1567,31 @@ class LUClusterVerify(LogicalUnit):
         _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
                  " '%s' of VG '%s'", pvname, owner_vg)
 
+  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
+    """Check the node bridges.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param bridges: the expected list of bridges
+
+    """
+    if not bridges:
+      return
+
+    node = ninfo.name
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    missing = nresult.get(constants.NV_BRIDGES, None)
+    test = not isinstance(missing, list)
+    _ErrorIf(test, self.ENODENET, node,
+             "did not return valid bridge information")
+    if not test:
+      _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" %
+               utils.CommaJoin(sorted(missing)))
+
   def _VerifyNodeNetwork(self, ninfo, nresult):
-    """Check the node time.
+    """Check the node network connectivity results.
 
     @type ninfo: L{objects.Node}
     @param ninfo: the node to check
@@ -2241,12 +2262,11 @@ class LUClusterVerify(LogicalUnit):
     drbd_helper = self.cfg.GetDRBDHelper()
     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
     cluster = self.cfg.GetClusterInfo()
-    nodelist = utils.NiceSort(self.cfg.GetNodeList())
-    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
-    nodeinfo_byname = dict(zip(nodelist, nodeinfo))
-    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
-    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
-                        for iname in instancelist)
+    nodeinfo_byname = self.cfg.GetAllNodesInfo()
+    nodelist = utils.NiceSort(nodeinfo_byname.keys())
+    nodeinfo = [nodeinfo_byname[nname] for nname in nodelist]
+    instanceinfo = self.cfg.GetAllInstancesInfo()
+    instancelist = utils.NiceSort(instanceinfo.keys())
     groupinfo = self.cfg.GetAllNodeGroupsInfo()
     i_non_redundant = [] # Non redundant instances
     i_non_a_balanced = [] # Non auto-balanced instances
@@ -2314,6 +2334,21 @@ class LUClusterVerify(LogicalUnit):
     if drbd_helper:
       node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
 
+    # bridge checks
+    # FIXME: this needs to be changed per node-group, not cluster-wide
+    bridges = set()
+    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
+    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
+      bridges.add(default_nicpp[constants.NIC_LINK])
+    for instance in instanceinfo.values():
+      for nic in instance.nics:
+        full_nic = cluster.SimpleFillNIC(nic.nicparams)
+        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
+          bridges.add(full_nic[constants.NIC_LINK])
+
+    if bridges:
+      node_verify_param[constants.NV_BRIDGES] = list(bridges)
+
     # Build our expected cluster state
     node_image = dict((node.name, self.NodeImage(offline=node.offline,
                                                  name=node.name,
@@ -2424,6 +2459,7 @@ class LUClusterVerify(LogicalUnit):
           if refos_img is None:
             refos_img = nimg
           self._VerifyNodeOS(node_i, nimg, refos_img)
+        self._VerifyNodeBridges(node_i, nresult, bridges)
 
     feedback_fn("* Verifying instance status")
     for instance in instancelist:
@@ -2651,10 +2687,7 @@ class LUClusterRepairDiskSizes(NoHooksLU):
 
   def ExpandNames(self):
     if self.op.instances:
-      self.wanted_names = []
-      for name in self.op.instances:
-        full_name = _ExpandInstanceName(self.cfg, name)
-        self.wanted_names.append(full_name)
+      self.wanted_names = _GetWantedInstances(self, self.op.instances)
       self.needed_locks = {
         locking.LEVEL_NODE: [],
         locking.LEVEL_INSTANCE: self.wanted_names,
@@ -2666,7 +2699,7 @@ class LUClusterRepairDiskSizes(NoHooksLU):
         locking.LEVEL_NODE: locking.ALL_SET,
         locking.LEVEL_INSTANCE: locking.ALL_SET,
         }
-    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
+    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
 
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE and self.wanted_names is not None:
@@ -2679,7 +2712,7 @@ class LUClusterRepairDiskSizes(NoHooksLU):
 
     """
     if self.wanted_names is None:
-      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+      self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)
 
     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                              in self.wanted_names]
@@ -2904,7 +2937,7 @@ class LUClusterSetParams(LogicalUnit):
                                    " drbd-based instances exist",
                                    errors.ECODE_INVAL)
 
-    node_list = self.acquired_locks[locking.LEVEL_NODE]
+    node_list = self.glm.list_owned(locking.LEVEL_NODE)
 
     # if vg_name not None, checks given volume group on all nodes
     if self.op.vg_name:
@@ -2979,8 +3012,8 @@ class LUClusterSetParams(LogicalUnit):
           # if we're moving instances to routed, check that they have an ip
           target_mode = params_filled[constants.NIC_MODE]
           if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
-            nic_errors.append("Instance %s, nic/%d: routed nick with no ip" %
-                              (instance.name, nic_idx))
+            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
+                              " address" % (instance.name, nic_idx))
       if nic_errors:
         raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                    "\n".join(nic_errors))
@@ -3431,6 +3464,20 @@ class LUOobCommand(NoHooksLU):
   REG_BGL = False
   _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
 
+  def ExpandNames(self):
+    """Gather locks we need.
+
+    """
+    if self.op.node_names:
+      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
+      lock_names = self.op.node_names
+    else:
+      lock_names = locking.ALL_SET
+
+    self.needed_locks = {
+      locking.LEVEL_NODE: lock_names,
+      }
+
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -3487,21 +3534,6 @@ class LUOobCommand(NoHooksLU):
                                     " not marked offline") % node_name,
                                    errors.ECODE_STATE)
 
-  def ExpandNames(self):
-    """Gather locks we need.
-
-    """
-    if self.op.node_names:
-      self.op.node_names = [_ExpandNodeName(self.cfg, name)
-                            for name in self.op.node_names]
-      lock_names = self.op.node_names
-    else:
-      lock_names = locking.ALL_SET
-
-    self.needed_locks = {
-      locking.LEVEL_NODE: lock_names,
-      }
-
   def Exec(self, feedback_fn):
     """Execute OOB and return result if we expect any.
 
@@ -3509,7 +3541,8 @@ class LUOobCommand(NoHooksLU):
     master_node = self.master_node
     ret = []
 
-    for idx, node in enumerate(self.nodes):
+    for idx, node in enumerate(utils.NiceSort(self.nodes,
+                                              key=lambda node: node.name)):
       node_entry = [(constants.RS_NORMAL, node.name)]
       ret.append(node_entry)
 
@@ -3672,7 +3705,10 @@ class _OsQuery(_QueryBase):
 
     """
     # Locking is not used
-    assert not (lu.acquired_locks or self.do_locking or self.use_locking)
+    assert not (compat.any(lu.glm.is_owned(level)
+                           for level in locking.LEVELS
+                           if level != locking.LEVEL_CLUSTER) or
+                self.do_locking or self.use_locking)
 
     valid_nodes = [node.name
                    for node in lu.cfg.GetAllNodesInfo().values()
@@ -3978,7 +4014,7 @@ class LUNodeQueryvols(NoHooksLU):
     """Computes the list of nodes and their attributes.
 
     """
-    nodenames = self.acquired_locks[locking.LEVEL_NODE]
+    nodenames = self.glm.list_owned(locking.LEVEL_NODE)
     volumes = self.rpc.call_node_volumes(nodenames)
 
     ilist = [self.cfg.GetInstanceInfo(iname) for iname
@@ -4056,7 +4092,7 @@ class LUNodeQueryStorage(NoHooksLU):
     """Computes the list of nodes and their attributes.
 
     """
-    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
+    self.nodes = self.glm.list_owned(locking.LEVEL_NODE)
 
     # Always get name to sort by
     if constants.SF_NAME in self.op.output_fields:
@@ -4631,7 +4667,7 @@ class LUNodeSetParams(LogicalUnit):
         instances_keep = []
 
         # Build list of instances to release
-        for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+        for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
           instance = self.context.cfg.GetInstanceInfo(instance_name)
           if (instance.disk_template in constants.DTS_INT_MIRROR and
               self.op.node_name in instance.all_nodes):
@@ -4640,7 +4676,7 @@ class LUNodeSetParams(LogicalUnit):
 
         _ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep)
 
-        assert (set(self.acquired_locks.get(locking.LEVEL_INSTANCE, [])) ==
+        assert (set(self.glm.list_owned(locking.LEVEL_INSTANCE)) ==
                 set(instances_keep))
 
   def BuildHooksEnv(self):
@@ -5700,8 +5736,25 @@ class LUInstanceRecreateDisks(LogicalUnit):
   HTYPE = constants.HTYPE_INSTANCE
   REQ_BGL = False
 
+  def CheckArguments(self):
+    # normalise the disk list
+    self.op.disks = sorted(frozenset(self.op.disks))
+
   def ExpandNames(self):
     self._ExpandAndLockInstance()
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+    if self.op.nodes:
+      self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
+      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
+    else:
+      self.needed_locks[locking.LEVEL_NODE] = []
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      # if we replace the nodes, we only need to lock the old primary,
+      # otherwise we need to lock all nodes for disk re-creation
+      primary_only = bool(self.op.nodes)
+      self._LockInstancesNodes(primary_only=primary_only)
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -5727,12 +5780,31 @@ class LUInstanceRecreateDisks(LogicalUnit):
     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
-    _CheckNodeOnline(self, instance.primary_node)
+    if self.op.nodes:
+      if len(self.op.nodes) != len(instance.all_nodes):
+        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
+                                   " %d replacement nodes were specified" %
+                                   (instance.name, len(instance.all_nodes),
+                                    len(self.op.nodes)),
+                                   errors.ECODE_INVAL)
+      assert instance.disk_template != constants.DT_DRBD8 or \
+          len(self.op.nodes) == 2
+      assert instance.disk_template != constants.DT_PLAIN or \
+          len(self.op.nodes) == 1
+      primary_node = self.op.nodes[0]
+    else:
+      primary_node = instance.primary_node
+    _CheckNodeOnline(self, primary_node)
 
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
                                  self.op.instance_name, errors.ECODE_INVAL)
-    _CheckInstanceDown(self, instance, "cannot recreate disks")
+    # if we replace nodes *and* the old primary is offline, we don't
+    # check
+    assert instance.primary_node in self.needed_locks[locking.LEVEL_NODE]
+    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
+    if not (self.op.nodes and old_pnode.offline):
+      _CheckInstanceDown(self, instance, "cannot recreate disks")
 
     if not self.op.disks:
       self.op.disks = range(len(instance.disks))
@@ -5741,18 +5813,39 @@ class LUInstanceRecreateDisks(LogicalUnit):
         if idx >= len(instance.disks):
           raise errors.OpPrereqError("Invalid disk index '%s'" % idx,
                                      errors.ECODE_INVAL)
-
+    if self.op.disks != range(len(instance.disks)) and self.op.nodes:
+      raise errors.OpPrereqError("Can't recreate disks partially and"
+                                 " change the nodes at the same time",
+                                 errors.ECODE_INVAL)
     self.instance = instance
 
   def Exec(self, feedback_fn):
     """Recreate the disks.
 
     """
+    # change primary node, if needed
+    if self.op.nodes:
+      self.instance.primary_node = self.op.nodes[0]
+      self.LogWarning("Changing the instance's nodes, you will have to"
+                      " remove any disks left on the older nodes manually")
+
     to_skip = []
-    for idx, _ in enumerate(self.instance.disks):
+    for idx, disk in enumerate(self.instance.disks):
       if idx not in self.op.disks: # disk idx has not been passed in
         to_skip.append(idx)
         continue
+      # update secondaries for disks, if needed
+      if self.op.nodes:
+        if disk.dev_type == constants.LD_DRBD8:
+          # need to update the nodes
+          assert len(self.op.nodes) == 2
+          logical_id = list(disk.logical_id)
+          logical_id[0] = self.op.nodes[0]
+          logical_id[1] = self.op.nodes[1]
+          disk.logical_id = tuple(logical_id)
+
+    if self.op.nodes:
+      self.cfg.Update(self.instance, feedback_fn)
 
     _CreateDisks(self, self.instance, to_skip=to_skip)
 
@@ -5807,8 +5900,9 @@ class LUInstanceRename(LogicalUnit):
     new_name = self.op.new_name
     if self.op.name_check:
       hostname = netutils.GetHostname(name=new_name)
-      self.LogInfo("Resolved given name '%s' to '%s'", new_name,
-                   hostname.name)
+      if hostname != new_name:
+        self.LogInfo("Resolved given name '%s' to '%s'", new_name,
+                     hostname.name)
       if not utils.MatchNameComponent(self.op.new_name, [hostname.name]):
         raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
                                     " same as given hostname '%s'") %
@@ -5843,8 +5937,8 @@ class LUInstanceRename(LogicalUnit):
     # Change the instance lock. This is definitely safe while we hold the BGL.
     # Otherwise the new lock would have to be added in acquired mode.
     assert self.REQ_BGL
-    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
-    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
+    self.glm.remove(locking.LEVEL_INSTANCE, old_name)
+    self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
 
     # re-read the instance from the configuration after rename
     inst = self.cfg.GetInstanceInfo(self.op.new_name)
@@ -6013,8 +6107,6 @@ class LUInstanceFailover(LogicalUnit):
     shutdown_timeout = self.op.shutdown_timeout
     self._migrater = TLMigrateInstance(self, self.op.instance_name,
                                        cleanup=False,
-                                       iallocator=self.op.iallocator,
-                                       target_node=self.op.target_node,
                                        failover=True,
                                        ignore_consistency=ignore_consistency,
                                        shutdown_timeout=shutdown_timeout)
@@ -6041,7 +6133,7 @@ class LUInstanceFailover(LogicalUnit):
     """
     instance = self._migrater.instance
     source_node = instance.primary_node
-    target_node = self._migrater.target_node
+    target_node = self.op.target_node
     env = {
       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
       "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
@@ -6090,8 +6182,6 @@ class LUInstanceMigrate(LogicalUnit):
 
     self._migrater = TLMigrateInstance(self, self.op.instance_name,
                                        cleanup=self.op.cleanup,
-                                       iallocator=self.op.iallocator,
-                                       target_node=self.op.target_node,
                                        failover=False,
                                        fallback=self.op.allow_failover)
     self.tasklets = [self._migrater]
@@ -6117,7 +6207,7 @@ class LUInstanceMigrate(LogicalUnit):
     """
     instance = self._migrater.instance
     source_node = instance.primary_node
-    target_node = self._migrater.target_node
+    target_node = self.op.target_node
     env = _BuildInstanceHookEnvByObject(self, instance)
     env.update({
       "MIGRATE_LIVE": self._migrater.live,
@@ -6353,9 +6443,7 @@ class LUNodeMigrate(LogicalUnit):
       logging.debug("Migrating instance %s", inst.name)
       names.append(inst.name)
 
-      tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False,
-                                        iallocator=self.op.iallocator,
-                                        taget_node=None))
+      tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False))
 
       if inst.disk_template in constants.DTS_EXT_MIRROR:
         # We need to lock all nodes, as the iallocator will choose the
@@ -6420,8 +6508,8 @@ class TLMigrateInstance(Tasklet):
   @ivar shutdown_timeout: In case of failover timeout of the shutdown
 
   """
-  def __init__(self, lu, instance_name, cleanup=False, iallocator=None,
-               target_node=None, failover=False, fallback=False,
+  def __init__(self, lu, instance_name, cleanup=False,
+               failover=False, fallback=False,
                ignore_consistency=False,
                shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT):
     """Initializes this class.
@@ -6433,8 +6521,6 @@ class TLMigrateInstance(Tasklet):
     self.instance_name = instance_name
     self.cleanup = cleanup
     self.live = False # will be overridden later
-    self.iallocator = iallocator
-    self.target_node = target_node
     self.failover = failover
     self.fallback = fallback
     self.ignore_consistency = ignore_consistency
@@ -6469,16 +6555,25 @@ class TLMigrateInstance(Tasklet):
     if instance.disk_template in constants.DTS_EXT_MIRROR:
       _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
 
-      if self.iallocator:
+      if self.lu.op.iallocator:
         self._RunAllocator()
+      else:
+        # We set set self.target_node as it is required by
+        # BuildHooksEnv
+        self.target_node = self.lu.op.target_node
 
       # self.target_node is already populated, either directly or by the
       # iallocator run
       target_node = self.target_node
+      if self.target_node == instance.primary_node:
+        raise errors.OpPrereqError("Cannot migrate instance %s"
+                                   " to its primary (%s)" %
+                                   (instance.name, instance.primary_node))
 
       if len(self.lu.tasklets) == 1:
-        # It is safe to release locks only when we're the only tasklet in the LU
-        _ReleaseLocks(self, locking.LEVEL_NODE,
+        # It is safe to release locks only when we're the only tasklet
+        # in the LU
+        _ReleaseLocks(self.lu, locking.LEVEL_NODE,
                       keep=[instance.primary_node, self.target_node])
 
     else:
@@ -6488,8 +6583,8 @@ class TLMigrateInstance(Tasklet):
                                         " %s disk template" %
                                         instance.disk_template)
       target_node = secondary_nodes[0]
-      if self.iallocator or (self.target_node and
-                             self.target_node != target_node):
+      if self.lu.op.iallocator or (self.lu.op.target_node and
+                                   self.lu.op.target_node != target_node):
         if self.failover:
           text = "failed over"
         else:
@@ -6566,21 +6661,21 @@ class TLMigrateInstance(Tasklet):
                                     self.instance.primary_node],
                      )
 
-    ial.Run(self.iallocator)
+    ial.Run(self.lu.op.iallocator)
 
     if not ial.success:
       raise errors.OpPrereqError("Can't compute nodes using"
                                  " iallocator '%s': %s" %
-                                 (self.iallocator, ial.info),
+                                 (self.lu.op.iallocator, ial.info),
                                  errors.ECODE_NORES)
     if len(ial.result) != ial.required_nodes:
       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                  " of nodes (%s), required %s" %
-                                 (self.iallocator, len(ial.result),
+                                 (self.lu.op.iallocator, len(ial.result),
                                   ial.required_nodes), errors.ECODE_FAULT)
     self.target_node = ial.result[0]
     self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
-                 self.instance_name, self.iallocator,
+                 self.instance_name, self.lu.op.iallocator,
                  utils.CommaJoin(ial.result))
 
   def _WaitUntilSync(self):
@@ -7772,7 +7867,7 @@ class LUInstanceCreate(LogicalUnit):
     src_path = self.op.src_path
 
     if src_node is None:
-      locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+      locked_nodes = self.glm.list_owned(locking.LEVEL_NODE)
       exp_list = self.rpc.call_export_list(locked_nodes)
       found = False
       for node in exp_list:
@@ -8555,7 +8650,7 @@ class LUInstanceReplaceDisks(LogicalUnit):
 
         # Lock member nodes of all locked groups
         self.needed_locks[locking.LEVEL_NODE] = [node_name
-          for group_uuid in self.acquired_locks[locking.LEVEL_NODEGROUP]
+          for group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
           for node_name in self.cfg.GetNodeGroup(group_uuid).members]
       else:
         self._LockInstancesNodes()
@@ -8592,19 +8687,19 @@ class LUInstanceReplaceDisks(LogicalUnit):
     """Check prerequisites.
 
     """
-    assert (locking.LEVEL_NODEGROUP in self.acquired_locks or
+    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
             self.op.iallocator is None)
 
-    if locking.LEVEL_NODEGROUP in self.acquired_locks:
+    owned_groups = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+    if owned_groups:
       groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name)
-      prevgroups = self.acquired_locks[locking.LEVEL_NODEGROUP]
-      if prevgroups != groups:
+      if owned_groups != groups:
         raise errors.OpExecError("Node groups used by instance '%s' changed"
                                  " since lock was acquired, current list is %r,"
                                  " used to be '%s'" %
                                  (self.op.instance_name,
                                   utils.CommaJoin(groups),
-                                  utils.CommaJoin(prevgroups)))
+                                  utils.CommaJoin(owned_groups)))
 
     return LogicalUnit.CheckPrereq(self)
 
@@ -8763,7 +8858,7 @@ class TLReplaceDisks(Tasklet):
     if remote_node is None:
       self.remote_node_info = None
     else:
-      assert remote_node in self.lu.acquired_locks[locking.LEVEL_NODE], \
+      assert remote_node in self.lu.glm.list_owned(locking.LEVEL_NODE), \
              "Remote node '%s' is not locked" % remote_node
 
       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
@@ -8861,7 +8956,7 @@ class TLReplaceDisks(Tasklet):
     _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
 
     # Release any owned node group
-    if self.lu.context.glm.is_owned(locking.LEVEL_NODEGROUP):
+    if self.lu.glm.is_owned(locking.LEVEL_NODEGROUP):
       _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
 
     # Check whether disks are valid
@@ -8884,16 +8979,16 @@ class TLReplaceDisks(Tasklet):
 
     if __debug__:
       # Verify owned locks before starting operation
-      owned_locks = self.lu.context.glm.list_owned(locking.LEVEL_NODE)
+      owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE)
       assert set(owned_locks) == set(self.node_secondary_ip), \
           ("Incorrect node locks, owning %s, expected %s" %
            (owned_locks, self.node_secondary_ip.keys()))
 
-      owned_locks = self.lu.context.glm.list_owned(locking.LEVEL_INSTANCE)
+      owned_locks = self.lu.glm.list_owned(locking.LEVEL_INSTANCE)
       assert list(owned_locks) == [self.instance_name], \
           "Instance '%s' not locked" % self.instance_name
 
-      assert not self.lu.context.glm.is_owned(locking.LEVEL_NODEGROUP), \
+      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
           "Should not own any node group lock at this point"
 
     if not self.disks:
@@ -8925,7 +9020,7 @@ class TLReplaceDisks(Tasklet):
 
     if __debug__:
       # Verify owned locks
-      owned_locks = self.lu.context.glm.list_owned(locking.LEVEL_NODE)
+      owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE)
       nodes = frozenset(self.node_secondary_ip)
       assert ((self.early_release and not owned_locks) or
               (not self.early_release and not (set(owned_locks) - nodes))), \
@@ -9522,9 +9617,17 @@ class LUInstanceGrowDisk(LogicalUnit):
     if not disks_ok:
       raise errors.OpExecError("Cannot activate block device to grow")
 
+    # First run all grow ops in dry-run mode
+    for node in instance.all_nodes:
+      self.cfg.SetDiskID(disk, node)
+      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True)
+      result.Raise("Grow request failed to node %s" % node)
+
+    # We know that (as far as we can test) operations across different
+    # nodes will succeed, time to run it for real
     for node in instance.all_nodes:
       self.cfg.SetDiskID(disk, node)
-      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
+      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False)
       result.Raise("Grow request failed to node %s" % node)
 
       # TODO: Rewrite code to work properly
@@ -9594,7 +9697,7 @@ class LUInstanceQueryData(NoHooksLU):
     """
     if self.wanted_names is None:
       assert self.op.use_locking, "Locking was not used"
-      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+      self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)
 
     self.wanted_instances = [self.cfg.GetInstanceInfo(name)
                              for name in self.wanted_names]
@@ -10193,7 +10296,8 @@ class LUInstanceSetParams(LogicalUnit):
     self.cfg.Update(instance, feedback_fn)
 
     # disks are created, waiting for sync
-    disk_abort = not _WaitForSync(self, instance)
+    disk_abort = not _WaitForSync(self, instance,
+                                  oneshot=not self.op.wait_for_sync)
     if disk_abort:
       raise errors.OpExecError("There are some degraded disks for"
                                " this instance, please cleanup manually")
@@ -10402,7 +10506,7 @@ class LUBackupQuery(NoHooksLU):
         that node.
 
     """
-    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
+    self.nodes = self.glm.list_owned(locking.LEVEL_NODE)
     rpcresult = self.rpc.call_export_list(self.nodes)
     result = {}
     for node in rpcresult:
@@ -10784,7 +10888,7 @@ class LUBackupRemove(NoHooksLU):
       fqdn_warn = True
       instance_name = self.op.instance_name
 
-    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+    locked_nodes = self.glm.list_owned(locking.LEVEL_NODE)
     exportlist = self.rpc.call_export_list(locked_nodes)
     found = False
     for node in exportlist:
@@ -10883,20 +10987,40 @@ class LUGroupAssignNodes(NoHooksLU):
 
     # We want to lock all the affected nodes and groups. We have readily
     # available the list of nodes, and the *destination* group. To gather the
-    # list of "source" groups, we need to fetch node information.
-    self.node_data = self.cfg.GetAllNodesInfo()
-    affected_groups = set(self.node_data[node].group for node in self.op.nodes)
-    affected_groups.add(self.group_uuid)
-
+    # list of "source" groups, we need to fetch node information later on.
     self.needed_locks = {
-      locking.LEVEL_NODEGROUP: list(affected_groups),
+      locking.LEVEL_NODEGROUP: set([self.group_uuid]),
       locking.LEVEL_NODE: self.op.nodes,
       }
 
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODEGROUP:
+      assert len(self.needed_locks[locking.LEVEL_NODEGROUP]) == 1
+
+      # Try to get all affected nodes' groups without having the group or node
+      # lock yet. Needs verification later in the code flow.
+      groups = self.cfg.GetNodeGroupsFromNodes(self.op.nodes)
+
+      self.needed_locks[locking.LEVEL_NODEGROUP].update(groups)
+
   def CheckPrereq(self):
     """Check prerequisites.
 
     """
+    assert self.needed_locks[locking.LEVEL_NODEGROUP]
+    assert (frozenset(self.glm.list_owned(locking.LEVEL_NODE)) ==
+            frozenset(self.op.nodes))
+
+    expected_locks = (set([self.group_uuid]) |
+                      self.cfg.GetNodeGroupsFromNodes(self.op.nodes))
+    actual_locks = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+    if actual_locks != expected_locks:
+      raise errors.OpExecError("Nodes changed groups since locks were acquired,"
+                               " current groups are '%s', used to be '%s'" %
+                               (utils.CommaJoin(expected_locks),
+                                utils.CommaJoin(actual_locks)))
+
+    self.node_data = self.cfg.GetAllNodesInfo()
     self.group = self.cfg.GetNodeGroup(self.group_uuid)
     instance_data = self.cfg.GetAllInstancesInfo()
 
@@ -10932,6 +11056,9 @@ class LUGroupAssignNodes(NoHooksLU):
     for node in self.op.nodes:
       self.node_data[node].group = self.group_uuid
 
+    # FIXME: Depends on side-effects of modifying the result of
+    # C{cfg.GetAllNodesInfo}
+
     self.cfg.Update(self.group, feedback_fn) # Saves all modified nodes.
 
   @staticmethod
@@ -11648,16 +11775,6 @@ class IAllocator(object):
   """
   # pylint: disable-msg=R0902
   # lots of instance attributes
-  _ALLO_KEYS = [
-    "name", "mem_size", "disks", "disk_template",
-    "os", "tags", "nics", "vcpus", "hypervisor",
-    ]
-  _RELO_KEYS = [
-    "name", "relocate_from",
-    ]
-  _EVAC_KEYS = [
-    "evac_nodes",
-    ]
 
   def __init__(self, cfg, rpc, mode, **kwargs):
     self.cfg = cfg
@@ -11676,18 +11793,13 @@ class IAllocator(object):
     self.required_nodes = None
     # init result fields
     self.success = self.info = self.result = None
-    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
-      keyset = self._ALLO_KEYS
-      fn = self._AddNewInstance
-    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
-      keyset = self._RELO_KEYS
-      fn = self._AddRelocateInstance
-    elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
-      keyset = self._EVAC_KEYS
-      fn = self._AddEvacuateNodes
-    else:
+
+    try:
+      (fn, keyset) = self._MODE_DATA[self.mode]
+    except KeyError:
       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                    " IAllocator" % self.mode)
+
     for key in kwargs:
       if key not in keyset:
         raise errors.ProgrammerError("Invalid input parameter '%s' to"
@@ -11698,7 +11810,7 @@ class IAllocator(object):
       if key not in kwargs:
         raise errors.ProgrammerError("Missing input parameter '%s' to"
                                      " IAllocator" % key)
-    self._BuildInputData(fn)
+    self._BuildInputData(compat.partial(fn, self))
 
   def _ComputeClusterData(self):
     """Compute the generic allocator input data.
@@ -11973,6 +12085,17 @@ class IAllocator(object):
 
     self.in_text = serializer.Dump(self.in_data)
 
+  _MODE_DATA = {
+    constants.IALLOCATOR_MODE_ALLOC:
+      (_AddNewInstance,
+       ["name", "mem_size", "disks", "disk_template", "os", "tags", "nics",
+        "vcpus", "hypervisor"]),
+    constants.IALLOCATOR_MODE_RELOC:
+      (_AddRelocateInstance, ["name", "relocate_from"]),
+    constants.IALLOCATOR_MODE_MEVAC:
+      (_AddEvacuateNodes, ["evac_nodes"]),
+    }
+
   def Run(self, name, validate=True, call_fn=None):
     """Run an instance allocator and return the results.