Use the new dry-run mode in cmdlib
[ganeti-local] / lib / cmdlib.py
index 08b6bf0..7ba8630 100644 (file)
@@ -129,17 +129,16 @@ class LogicalUnit(object):
     self.proc = processor
     self.op = op
     self.cfg = context.cfg
+    self.glm = context.glm
     self.context = context
     self.rpc = rpc
     # Dicts used to declare locking needs to mcpu
     self.needed_locks = None
-    self.acquired_locks = {}
     self.share_locks = dict.fromkeys(locking.LEVELS, 0)
     self.add_locks = {}
     self.remove_locks = {}
     # Used to force good behavior when calling helper functions
     self.recalculate_locks = {}
-    self.__ssh = None
     # logging
     self.Log = processor.Log # pylint: disable-msg=C0103
     self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
@@ -160,16 +159,6 @@ class LogicalUnit(object):
 
     self.CheckArguments()
 
-  def __GetSSH(self):
-    """Returns the SshRunner object
-
-    """
-    if not self.__ssh:
-      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
-    return self.__ssh
-
-  ssh = property(fget=__GetSSH)
-
   def CheckArguments(self):
     """Check syntactic validity for the opcode arguments.
 
@@ -396,7 +385,7 @@ class LogicalUnit(object):
     # future we might want to have different behaviors depending on the value
     # of self.recalculate_locks[locking.LEVEL_NODE]
     wanted_nodes = []
-    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+    for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
       instance = self.context.cfg.GetInstanceInfo(instance_name)
       wanted_nodes.append(instance.primary_node)
       if not primary_only:
@@ -510,7 +499,7 @@ class _QueryBase:
 
     """
     if self.do_locking:
-      names = lu.acquired_locks[lock_level]
+      names = lu.glm.list_owned(lock_level)
     else:
       names = all_names
 
@@ -521,7 +510,7 @@ class _QueryBase:
 
     # caller specified names and we must keep the same order
     assert self.names
-    assert not self.do_locking or lu.acquired_locks[lock_level]
+    assert not self.do_locking or lu.glm.is_owned(lock_level)
 
     missing = set(self.wanted).difference(names)
     if missing:
@@ -641,6 +630,51 @@ def _GetUpdatedParams(old_params, update_dict,
   return params_copy
 
 
+def _ReleaseLocks(lu, level, names=None, keep=None):
+  """Releases locks owned by an LU.
+
+  @type lu: L{LogicalUnit}
+  @param level: Lock level
+  @type names: list or None
+  @param names: Names of locks to release
+  @type keep: list or None
+  @param keep: Names of locks to retain
+
+  """
+  assert not (keep is not None and names is not None), \
+         "Only one of the 'names' and the 'keep' parameters can be given"
+
+  if names is not None:
+    should_release = names.__contains__
+  elif keep:
+    should_release = lambda name: name not in keep
+  else:
+    should_release = None
+
+  if should_release:
+    retain = []
+    release = []
+
+    # Determine which locks to release
+    for name in lu.glm.list_owned(level):
+      if should_release(name):
+        release.append(name)
+      else:
+        retain.append(name)
+
+    assert len(lu.glm.list_owned(level)) == (len(retain) + len(release))
+
+    # Release just some locks
+    lu.glm.release(level, names=release)
+
+    assert frozenset(lu.glm.list_owned(level)) == frozenset(retain)
+  else:
+    # Release everything
+    lu.glm.release(level)
+
+    assert not lu.glm.is_owned(level), "No locks should be owned"
+
+
 def _RunPostHook(lu, node_name):
   """Runs the post-hook for an opcode on a single node.
 
@@ -1458,7 +1492,7 @@ class LUClusterVerify(LogicalUnit):
                  hv_name, item, hv_result)
 
     test = nresult.get(constants.NV_NODESETUP,
-                           ["Missing NODESETUP results"])
+                       ["Missing NODESETUP results"])
     _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
              "; ".join(test))
 
@@ -1902,6 +1936,7 @@ class LUClusterVerify(LogicalUnit):
 
     assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
 
+    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
     for os_name, os_data in nimg.oslist.items():
       assert os_data, "Empty OS status for OS %s?!" % os_name
       f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
@@ -1929,11 +1964,12 @@ class LUClusterVerify(LogicalUnit):
         continue
       for kind, a, b in [("API version", f_api, b_api),
                          ("variants list", f_var, b_var),
-                         ("parameters", f_param, b_param)]:
+                         ("parameters", beautify_params(f_param),
+                          beautify_params(b_param))]:
         _ErrorIf(a != b, self.ENODEOS, node,
-                 "OS %s %s differs from reference node %s: %s vs. %s",
+                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
                  kind, os_name, base.name,
-                 utils.CommaJoin(a), utils.CommaJoin(b))
+                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
 
     # check any missing OSes
     missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
@@ -2613,10 +2649,7 @@ class LUClusterRepairDiskSizes(NoHooksLU):
 
   def ExpandNames(self):
     if self.op.instances:
-      self.wanted_names = []
-      for name in self.op.instances:
-        full_name = _ExpandInstanceName(self.cfg, name)
-        self.wanted_names.append(full_name)
+      self.wanted_names = _GetWantedInstances(self, self.op.instances)
       self.needed_locks = {
         locking.LEVEL_NODE: [],
         locking.LEVEL_INSTANCE: self.wanted_names,
@@ -2628,7 +2661,7 @@ class LUClusterRepairDiskSizes(NoHooksLU):
         locking.LEVEL_NODE: locking.ALL_SET,
         locking.LEVEL_INSTANCE: locking.ALL_SET,
         }
-    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
+    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
 
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE and self.wanted_names is not None:
@@ -2641,7 +2674,7 @@ class LUClusterRepairDiskSizes(NoHooksLU):
 
     """
     if self.wanted_names is None:
-      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+      self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)
 
     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                              in self.wanted_names]
@@ -2866,7 +2899,7 @@ class LUClusterSetParams(LogicalUnit):
                                    " drbd-based instances exist",
                                    errors.ECODE_INVAL)
 
-    node_list = self.acquired_locks[locking.LEVEL_NODE]
+    node_list = self.glm.list_owned(locking.LEVEL_NODE)
 
     # if vg_name not None, checks given volume group on all nodes
     if self.op.vg_name:
@@ -3393,6 +3426,20 @@ class LUOobCommand(NoHooksLU):
   REG_BGL = False
   _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
 
+  def ExpandNames(self):
+    """Gather locks we need.
+
+    """
+    if self.op.node_names:
+      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
+      lock_names = self.op.node_names
+    else:
+      lock_names = locking.ALL_SET
+
+    self.needed_locks = {
+      locking.LEVEL_NODE: lock_names,
+      }
+
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -3409,23 +3456,23 @@ class LUOobCommand(NoHooksLU):
     assert self.op.power_delay >= 0.0
 
     if self.op.node_names:
-      if self.op.command in self._SKIP_MASTER:
-        if self.master_node in self.op.node_names:
-          master_node_obj = self.cfg.GetNodeInfo(self.master_node)
-          master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
-
-          if master_oob_handler:
-            additional_text = ("Run '%s %s %s' if you want to operate on the"
-                               " master regardless") % (master_oob_handler,
-                                                        self.op.command,
-                                                        self.master_node)
-          else:
-            additional_text = "The master node does not support out-of-band"
+      if (self.op.command in self._SKIP_MASTER and
+          self.master_node in self.op.node_names):
+        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
+        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
+
+        if master_oob_handler:
+          additional_text = ("run '%s %s %s' if you want to operate on the"
+                             " master regardless") % (master_oob_handler,
+                                                      self.op.command,
+                                                      self.master_node)
+        else:
+          additional_text = "it does not support out-of-band operations"
 
-          raise errors.OpPrereqError(("Operating on the master node %s is not"
-                                      " allowed for %s\n%s") %
-                                     (self.master_node, self.op.command,
-                                      additional_text), errors.ECODE_INVAL)
+        raise errors.OpPrereqError(("Operating on the master node %s is not"
+                                    " allowed for %s; %s") %
+                                   (self.master_node, self.op.command,
+                                    additional_text), errors.ECODE_INVAL)
     else:
       self.op.node_names = self.cfg.GetNodeList()
       if self.op.command in self._SKIP_MASTER:
@@ -3449,21 +3496,6 @@ class LUOobCommand(NoHooksLU):
                                     " not marked offline") % node_name,
                                    errors.ECODE_STATE)
 
-  def ExpandNames(self):
-    """Gather locks we need.
-
-    """
-    if self.op.node_names:
-      self.op.node_names = [_ExpandNodeName(self.cfg, name)
-                            for name in self.op.node_names]
-      lock_names = self.op.node_names
-    else:
-      lock_names = locking.ALL_SET
-
-    self.needed_locks = {
-      locking.LEVEL_NODE: lock_names,
-      }
-
   def Exec(self, feedback_fn):
     """Execute OOB and return result if we expect any.
 
@@ -3471,7 +3503,8 @@ class LUOobCommand(NoHooksLU):
     master_node = self.master_node
     ret = []
 
-    for idx, node in enumerate(self.nodes):
+    for idx, node in enumerate(utils.NiceSort(self.nodes,
+                                              key=lambda node: node.name)):
       node_entry = [(constants.RS_NORMAL, node.name)]
       ret.append(node_entry)
 
@@ -3488,14 +3521,14 @@ class LUOobCommand(NoHooksLU):
                                      self.op.timeout)
 
       if result.fail_msg:
-        self.LogWarning("On node '%s' out-of-band RPC failed with: %s",
+        self.LogWarning("Out-of-band RPC failed on node '%s': %s",
                         node.name, result.fail_msg)
         node_entry.append((constants.RS_NODATA, None))
       else:
         try:
           self._CheckPayload(result)
         except errors.OpExecError, err:
-          self.LogWarning("The payload returned by '%s' is not valid: %s",
+          self.LogWarning("Payload returned by node '%s' is not valid: %s",
                           node.name, err)
           node_entry.append((constants.RS_NODATA, None))
         else:
@@ -3504,8 +3537,8 @@ class LUOobCommand(NoHooksLU):
             for item, status in result.payload:
               if status in [constants.OOB_STATUS_WARNING,
                             constants.OOB_STATUS_CRITICAL]:
-                self.LogWarning("On node '%s' item '%s' has status '%s'",
-                                node.name, item, status)
+                self.LogWarning("Item '%s' on node '%s' has status '%s'",
+                                item, node.name, status)
 
           if self.op.command == constants.OOB_POWER_ON:
             node.powered = True
@@ -3634,7 +3667,10 @@ class _OsQuery(_QueryBase):
 
     """
     # Locking is not used
-    assert not (lu.acquired_locks or self.do_locking or self.use_locking)
+    assert not (compat.any(lu.glm.is_owned(level)
+                           for level in locking.LEVELS
+                           if level != locking.LEVEL_CLUSTER) or
+                self.do_locking or self.use_locking)
 
     valid_nodes = [node.name
                    for node in lu.cfg.GetAllNodesInfo().values()
@@ -3775,15 +3811,14 @@ class LUNodeRemove(LogicalUnit):
 
     masternode = self.cfg.GetMasterNode()
     if node.name == masternode:
-      raise errors.OpPrereqError("Node is the master node,"
-                                 " you need to failover first.",
-                                 errors.ECODE_INVAL)
+      raise errors.OpPrereqError("Node is the master node, failover to another"
+                                 " node is required", errors.ECODE_INVAL)
 
     for instance_name in instance_list:
       instance = self.cfg.GetInstanceInfo(instance_name)
       if node.name in instance.all_nodes:
         raise errors.OpPrereqError("Instance %s is still running on the node,"
-                                   " please remove first." % instance_name,
+                                   " please remove first" % instance_name,
                                    errors.ECODE_INVAL)
     self.op.node_name = node.name
     self.node = node
@@ -3941,7 +3976,7 @@ class LUNodeQueryvols(NoHooksLU):
     """Computes the list of nodes and their attributes.
 
     """
-    nodenames = self.acquired_locks[locking.LEVEL_NODE]
+    nodenames = self.glm.list_owned(locking.LEVEL_NODE)
     volumes = self.rpc.call_node_volumes(nodenames)
 
     ilist = [self.cfg.GetInstanceInfo(iname) for iname
@@ -4019,7 +4054,7 @@ class LUNodeQueryStorage(NoHooksLU):
     """Computes the list of nodes and their attributes.
 
     """
-    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
+    self.nodes = self.glm.list_owned(locking.LEVEL_NODE)
 
     # Always get name to sort by
     if constants.SF_NAME in self.op.output_fields:
@@ -4266,6 +4301,11 @@ class LUNodeAdd(LogicalUnit):
     self.hostname = netutils.GetHostname(name=self.op.node_name,
                                          family=self.primary_ip_family)
     self.op.node_name = self.hostname.name
+
+    if self.op.readd and self.op.node_name == self.cfg.GetMasterNode():
+      raise errors.OpPrereqError("Cannot readd the master node",
+                                 errors.ECODE_STATE)
+
     if self.op.readd and self.op.group:
       raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                  " being readded", errors.ECODE_INVAL)
@@ -4501,7 +4541,7 @@ class LUNodeAdd(LogicalUnit):
           feedback_fn("ssh/hostname verification failed"
                       " (checking from %s): %s" %
                       (verifier, nl_payload[failed]))
-        raise errors.OpExecError("ssh/hostname verification failed.")
+        raise errors.OpExecError("ssh/hostname verification failed")
 
     if self.op.readd:
       _RedistributeAncillaryFiles(self)
@@ -4584,21 +4624,22 @@ class LUNodeSetParams(LogicalUnit):
     # If we have locked all instances, before waiting to lock nodes, release
     # all the ones living on nodes unrelated to the current operation.
     if level == locking.LEVEL_NODE and self.lock_instances:
-      instances_release = []
-      instances_keep = []
       self.affected_instances = []
       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
-        for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+        instances_keep = []
+
+        # Build list of instances to release
+        for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
           instance = self.context.cfg.GetInstanceInfo(instance_name)
-          i_mirrored = instance.disk_template in constants.DTS_INT_MIRROR
-          if i_mirrored and self.op.node_name in instance.all_nodes:
+          if (instance.disk_template in constants.DTS_INT_MIRROR and
+              self.op.node_name in instance.all_nodes):
             instances_keep.append(instance_name)
             self.affected_instances.append(instance)
-          else:
-            instances_release.append(instance_name)
-        if instances_release:
-          self.context.glm.release(locking.LEVEL_INSTANCE, instances_release)
-          self.acquired_locks[locking.LEVEL_INSTANCE] = instances_keep
+
+        _ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep)
+
+        assert (set(self.glm.list_owned(locking.LEVEL_INSTANCE)) ==
+                set(instances_keep))
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -4664,7 +4705,7 @@ class LUNodeSetParams(LogicalUnit):
 
     self.old_flags = old_flags = (node.master_candidate,
                                   node.drained, node.offline)
-    assert old_flags in self._F2R, "Un-handled old flags  %s" % str(old_flags)
+    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
     self.old_role = old_role = self._F2R[old_flags]
 
     # Check for ineffective changes
@@ -4680,12 +4721,12 @@ class LUNodeSetParams(LogicalUnit):
     if _SupportsOob(self.cfg, node):
       if self.op.offline is False and not (node.powered or
                                            self.op.powered == True):
-        raise errors.OpPrereqError(("Please power on node %s first before you"
-                                    " can reset offline state") %
+        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
+                                    " offline status can be reset") %
                                    self.op.node_name)
     elif self.op.powered is not None:
       raise errors.OpPrereqError(("Unable to change powered state for node %s"
-                                  " which does not support out-of-band"
+                                  " as it does not support out-of-band"
                                   " handling") % self.op.node_name)
 
     # If we're being deofflined/drained, we'll MC ourself if needed
@@ -5696,7 +5737,7 @@ class LUInstanceRecreateDisks(LogicalUnit):
     else:
       for idx in self.op.disks:
         if idx >= len(instance.disks):
-          raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx,
+          raise errors.OpPrereqError("Invalid disk index '%s'" % idx,
                                      errors.ECODE_INVAL)
 
     self.instance = instance
@@ -5727,7 +5768,7 @@ class LUInstanceRename(LogicalUnit):
     """
     if self.op.ip_check and not self.op.name_check:
       # TODO: make the ip check more flexible and not depend on the name check
-      raise errors.OpPrereqError("Cannot do ip check without a name check",
+      raise errors.OpPrereqError("IP address check requires a name check",
                                  errors.ECODE_INVAL)
 
   def BuildHooksEnv(self):
@@ -5800,8 +5841,8 @@ class LUInstanceRename(LogicalUnit):
     # Change the instance lock. This is definitely safe while we hold the BGL.
     # Otherwise the new lock would have to be added in acquired mode.
     assert self.REQ_BGL
-    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
-    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
+    self.glm.remove(locking.LEVEL_INSTANCE, old_name)
+    self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
 
     # re-read the instance from the configuration after rename
     inst = self.cfg.GetInstanceInfo(self.op.new_name)
@@ -5970,8 +6011,6 @@ class LUInstanceFailover(LogicalUnit):
     shutdown_timeout = self.op.shutdown_timeout
     self._migrater = TLMigrateInstance(self, self.op.instance_name,
                                        cleanup=False,
-                                       iallocator=self.op.iallocator,
-                                       target_node=self.op.target_node,
                                        failover=True,
                                        ignore_consistency=ignore_consistency,
                                        shutdown_timeout=shutdown_timeout)
@@ -5998,7 +6037,7 @@ class LUInstanceFailover(LogicalUnit):
     """
     instance = self._migrater.instance
     source_node = instance.primary_node
-    target_node = self._migrater.target_node
+    target_node = self.op.target_node
     env = {
       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
       "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
@@ -6047,8 +6086,6 @@ class LUInstanceMigrate(LogicalUnit):
 
     self._migrater = TLMigrateInstance(self, self.op.instance_name,
                                        cleanup=self.op.cleanup,
-                                       iallocator=self.op.iallocator,
-                                       target_node=self.op.target_node,
                                        failover=False,
                                        fallback=self.op.allow_failover)
     self.tasklets = [self._migrater]
@@ -6074,7 +6111,7 @@ class LUInstanceMigrate(LogicalUnit):
     """
     instance = self._migrater.instance
     source_node = instance.primary_node
-    target_node = self._migrater.target_node
+    target_node = self.op.target_node
     env = _BuildInstanceHookEnvByObject(self, instance)
     env.update({
       "MIGRATE_LIVE": self._migrater.live,
@@ -6310,9 +6347,7 @@ class LUNodeMigrate(LogicalUnit):
       logging.debug("Migrating instance %s", inst.name)
       names.append(inst.name)
 
-      tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False,
-                                        iallocator=self.op.iallocator,
-                                        taget_node=None))
+      tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False))
 
       if inst.disk_template in constants.DTS_EXT_MIRROR:
         # We need to lock all nodes, as the iallocator will choose the
@@ -6377,8 +6412,8 @@ class TLMigrateInstance(Tasklet):
   @ivar shutdown_timeout: In case of failover timeout of the shutdown
 
   """
-  def __init__(self, lu, instance_name, cleanup=False, iallocator=None,
-               target_node=None, failover=False, fallback=False,
+  def __init__(self, lu, instance_name, cleanup=False,
+               failover=False, fallback=False,
                ignore_consistency=False,
                shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT):
     """Initializes this class.
@@ -6390,8 +6425,6 @@ class TLMigrateInstance(Tasklet):
     self.instance_name = instance_name
     self.cleanup = cleanup
     self.live = False # will be overridden later
-    self.iallocator = iallocator
-    self.target_node = target_node
     self.failover = failover
     self.fallback = fallback
     self.ignore_consistency = ignore_consistency
@@ -6426,20 +6459,22 @@ class TLMigrateInstance(Tasklet):
     if instance.disk_template in constants.DTS_EXT_MIRROR:
       _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
 
-      if self.iallocator:
+      if self.lu.op.iallocator:
         self._RunAllocator()
+      else:
+        # We set set self.target_node as it is required by
+        # BuildHooksEnv
+        self.target_node = self.lu.op.target_node
 
       # self.target_node is already populated, either directly or by the
       # iallocator run
       target_node = self.target_node
 
       if len(self.lu.tasklets) == 1:
-        # It is safe to remove locks only when we're the only tasklet in the LU
-        nodes_keep = [instance.primary_node, self.target_node]
-        nodes_rel = [node for node in self.lu.acquired_locks[locking.LEVEL_NODE]
-                     if node not in nodes_keep]
-        self.lu.context.glm.release(locking.LEVEL_NODE, nodes_rel)
-        self.lu.acquired_locks[locking.LEVEL_NODE] = nodes_keep
+        # It is safe to release locks only when we're the only tasklet
+        # in the LU
+        _ReleaseLocks(self.lu, locking.LEVEL_NODE,
+                      keep=[instance.primary_node, self.target_node])
 
     else:
       secondary_nodes = instance.secondary_nodes
@@ -6448,17 +6483,17 @@ class TLMigrateInstance(Tasklet):
                                         " %s disk template" %
                                         instance.disk_template)
       target_node = secondary_nodes[0]
-      if self.iallocator or (self.target_node and
-                             self.target_node != target_node):
+      if self.lu.op.iallocator or (self.lu.op.target_node and
+                                   self.lu.op.target_node != target_node):
         if self.failover:
           text = "failed over"
         else:
           text = "migrated"
         raise errors.OpPrereqError("Instances with disk template %s cannot"
-                                   " be %s over to arbitrary nodes"
+                                   " be %s to arbitrary nodes"
                                    " (neither an iallocator nor a target"
                                    " node can be passed)" %
-                                   (text, instance.disk_template),
+                                   (instance.disk_template, text),
                                    errors.ECODE_INVAL)
 
     i_be = self.cfg.GetClusterInfo().FillBE(instance)
@@ -6490,6 +6525,30 @@ class TLMigrateInstance(Tasklet):
 
     assert not (self.failover and self.cleanup)
 
+    if not self.failover:
+      if self.lu.op.live is not None and self.lu.op.mode is not None:
+        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
+                                   " parameters are accepted",
+                                   errors.ECODE_INVAL)
+      if self.lu.op.live is not None:
+        if self.lu.op.live:
+          self.lu.op.mode = constants.HT_MIGRATION_LIVE
+        else:
+          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
+        # reset the 'live' parameter to None so that repeated
+        # invocations of CheckPrereq do not raise an exception
+        self.lu.op.live = None
+      elif self.lu.op.mode is None:
+        # read the default value from the hypervisor
+        i_hv = self.cfg.GetClusterInfo().FillHV(self.instance,
+                                                skip_globals=False)
+        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
+
+      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
+    else:
+      # Failover is never live
+      self.live = False
+
   def _RunAllocator(self):
     """Run the allocator based on input opcode.
 
@@ -6502,47 +6561,23 @@ class TLMigrateInstance(Tasklet):
                                     self.instance.primary_node],
                      )
 
-    ial.Run(self.iallocator)
+    ial.Run(self.lu.op.iallocator)
 
     if not ial.success:
       raise errors.OpPrereqError("Can't compute nodes using"
                                  " iallocator '%s': %s" %
-                                 (self.iallocator, ial.info),
+                                 (self.lu.op.iallocator, ial.info),
                                  errors.ECODE_NORES)
     if len(ial.result) != ial.required_nodes:
       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                  " of nodes (%s), required %s" %
-                                 (self.iallocator, len(ial.result),
+                                 (self.lu.op.iallocator, len(ial.result),
                                   ial.required_nodes), errors.ECODE_FAULT)
     self.target_node = ial.result[0]
     self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
-                 self.instance_name, self.iallocator,
+                 self.instance_name, self.lu.op.iallocator,
                  utils.CommaJoin(ial.result))
 
-    if not self.failover:
-      if self.lu.op.live is not None and self.lu.op.mode is not None:
-        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
-                                   " parameters are accepted",
-                                   errors.ECODE_INVAL)
-      if self.lu.op.live is not None:
-        if self.lu.op.live:
-          self.lu.op.mode = constants.HT_MIGRATION_LIVE
-        else:
-          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
-        # reset the 'live' parameter to None so that repeated
-        # invocations of CheckPrereq do not raise an exception
-        self.lu.op.live = None
-      elif self.lu.op.mode is None:
-        # read the default value from the hypervisor
-        i_hv = self.cfg.GetClusterInfo().FillHV(self.instance,
-                                                skip_globals=False)
-        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
-
-      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
-    else:
-      # Failover is never live
-      self.live = False
-
   def _WaitUntilSync(self):
     """Poll with custom rpc for disk sync.
 
@@ -6636,15 +6671,15 @@ class TLMigrateInstance(Tasklet):
 
     if runningon_source and runningon_target:
       raise errors.OpExecError("Instance seems to be running on two nodes,"
-                               " or the hypervisor is confused. You will have"
+                               " or the hypervisor is confused; you will have"
                                " to ensure manually that it runs only on one"
-                               " and restart this operation.")
+                               " and restart this operation")
 
     if not (runningon_source or runningon_target):
-      raise errors.OpExecError("Instance does not seem to be running at all."
-                               " In this case, it's safer to repair by"
+      raise errors.OpExecError("Instance does not seem to be running at all;"
+                               " in this case it's safer to repair by"
                                " running 'gnt-instance stop' to ensure disk"
-                               " shutdown, and then restarting it.")
+                               " shutdown, and then restarting it")
 
     if runningon_target:
       # the migration has actually succeeded, we need to update the config
@@ -6686,10 +6721,9 @@ class TLMigrateInstance(Tasklet):
       self._GoReconnect(False)
       self._WaitUntilSync()
     except errors.OpExecError, err:
-      self.lu.LogWarning("Migration failed and I can't reconnect the"
-                         " drives: error '%s'\n"
-                         "Please look and recover the instance status" %
-                         str(err))
+      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
+                         " please try to recover the instance manually;"
+                         " error '%s'" % str(err))
 
   def _AbortMigration(self):
     """Call the hypervisor code to abort a started migration.
@@ -6731,7 +6765,7 @@ class TLMigrateInstance(Tasklet):
       if not _CheckDiskConsistency(self.lu, dev, target_node, False):
         raise errors.OpExecError("Disk %s is degraded or not fully"
                                  " synchronized on target node,"
-                                 " aborting migrate." % dev.iv_name)
+                                 " aborting migration" % dev.iv_name)
 
     # First get the migration information from the remote node
     result = self.rpc.call_migration_info(source_node, instance)
@@ -6825,7 +6859,7 @@ class TLMigrateInstance(Tasklet):
         if not _CheckDiskConsistency(self, dev, target_node, False):
           if not self.ignore_consistency:
             raise errors.OpExecError("Disk %s is degraded on target node,"
-                                     " aborting failover." % dev.iv_name)
+                                     " aborting failover" % dev.iv_name)
     else:
       self.feedback_fn("* not checking disk consistency as instance is not"
                        " running")
@@ -6839,9 +6873,9 @@ class TLMigrateInstance(Tasklet):
     msg = result.fail_msg
     if msg:
       if self.ignore_consistency or primary_node.offline:
-        self.lu.LogWarning("Could not shutdown instance %s on node %s."
-                           " Proceeding anyway. Please make sure node"
-                           " %s is down. Error details: %s",
+        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
+                           " proceeding anyway; please make sure node"
+                           " %s is down; error details: %s",
                            instance.name, source_node, source_node, msg)
       else:
         raise errors.OpExecError("Could not shutdown instance %s on"
@@ -6992,17 +7026,18 @@ def _GenerateUniqueNames(lu, exts):
   return results
 
 
-def _GenerateDRBD8Branch(lu, primary, secondary, size, vgname, names, iv_name,
-                         p_minor, s_minor):
+def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
+                         iv_name, p_minor, s_minor):
   """Generate a drbd8 device complete with its children.
 
   """
+  assert len(vgnames) == len(names) == 2
   port = lu.cfg.AllocatePort()
   shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
-                          logical_id=(vgname, names[0]))
+                          logical_id=(vgnames[0], names[0]))
   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
-                          logical_id=(vgname, names[1]))
+                          logical_id=(vgnames[1], names[1]))
   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                           logical_id=(primary, secondary, port,
                                       p_minor, s_minor,
@@ -7057,9 +7092,11 @@ def _GenerateDiskTemplate(lu, template_name,
       names.append(lv_prefix + "_meta")
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
-      vg = disk.get(constants.IDISK_VG, vgname)
+      data_vg = disk.get(constants.IDISK_VG, vgname)
+      meta_vg = disk.get(constants.IDISK_METAVG, data_vg)
       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
-                                      disk[constants.IDISK_SIZE], vg,
+                                      disk[constants.IDISK_SIZE],
+                                      [data_vg, meta_vg],
                                       names[idx * 2:idx * 2 + 2],
                                       "disk/%d" % disk_index,
                                       minors[idx * 2], minors[idx * 2 + 1])
@@ -7161,14 +7198,17 @@ def _WipeDisks(lu, instance):
 
   try:
     for idx, device in enumerate(instance.disks):
-      lu.LogInfo("* Wiping disk %d", idx)
-      logging.info("Wiping disk %d for instance %s, node %s",
-                   idx, instance.name, node)
-
       # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
       # MAX_WIPE_CHUNK at max
       wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 *
                             constants.MIN_WIPE_CHUNK_PERCENT)
+      # we _must_ make this an int, otherwise rounding errors will
+      # occur
+      wipe_chunk_size = int(wipe_chunk_size)
+
+      lu.LogInfo("* Wiping disk %d", idx)
+      logging.info("Wiping disk %d for instance %s, node %s using"
+                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)
 
       offset = 0
       size = device.size
@@ -7177,6 +7217,8 @@ def _WipeDisks(lu, instance):
 
       while offset < size:
         wipe_size = min(wipe_chunk_size, size - offset)
+        logging.debug("Wiping disk %d, offset %s, chunk %s",
+                      idx, offset, wipe_size)
         result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size)
         result.Raise("Could not wipe disk %d at offset %d for size %d" %
                      (idx, offset, wipe_size))
@@ -7194,8 +7236,8 @@ def _WipeDisks(lu, instance):
 
     for idx, success in enumerate(result.payload):
       if not success:
-        lu.LogWarning("Warning: Resume sync of disk %d failed. Please have a"
-                      " look at the status and troubleshoot the issue.", idx)
+        lu.LogWarning("Resume sync of disk %d failed, please have a"
+                      " look at the status and troubleshoot the issue", idx)
         logging.warn("resume-sync of instance %s for disks %d failed",
                      instance.name, idx)
 
@@ -7444,8 +7486,8 @@ class LUInstanceCreate(LogicalUnit):
 
     if self.op.ip_check and not self.op.name_check:
       # TODO: make the ip check more flexible and not depend on the name check
-      raise errors.OpPrereqError("Cannot do ip check without a name check",
-                                 errors.ECODE_INVAL)
+      raise errors.OpPrereqError("Cannot do IP address check without a name"
+                                 " check", errors.ECODE_INVAL)
 
     # check nics' parameter names
     for nic in self.op.nics:
@@ -7623,7 +7665,7 @@ class LUInstanceCreate(LogicalUnit):
         self.op.src_node = None
         if os.path.isabs(src_path):
           raise errors.OpPrereqError("Importing an instance from an absolute"
-                                     " path requires a source node option.",
+                                     " path requires a source node option",
                                      errors.ECODE_INVAL)
       else:
         self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node)
@@ -7725,7 +7767,7 @@ class LUInstanceCreate(LogicalUnit):
     src_path = self.op.src_path
 
     if src_node is None:
-      locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+      locked_nodes = self.glm.list_owned(locking.LEVEL_NODE)
       exp_list = self.rpc.call_export_list(locked_nodes)
       found = False
       for node in exp_list:
@@ -7975,10 +8017,13 @@ class LUInstanceCreate(LogicalUnit):
       except (TypeError, ValueError):
         raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                    errors.ECODE_INVAL)
+
+      data_vg = disk.get(constants.IDISK_VG, default_vg)
       new_disk = {
         constants.IDISK_SIZE: size,
         constants.IDISK_MODE: mode,
-        constants.IDISK_VG: disk.get(constants.IDISK_VG, default_vg),
+        constants.IDISK_VG: data_vg,
+        constants.IDISK_METAVG: disk.get(constants.IDISK_METAVG, data_vg),
         }
       if constants.IDISK_ADOPT in disk:
         new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
@@ -8069,7 +8114,7 @@ class LUInstanceCreate(LogicalUnit):
     if self.op.disk_template in constants.DTS_INT_MIRROR:
       if self.op.snode == pnode.name:
         raise errors.OpPrereqError("The secondary node cannot be the"
-                                   " primary node.", errors.ECODE_INVAL)
+                                   " primary node", errors.ECODE_INVAL)
       _CheckNodeOnline(self, self.op.snode)
       _CheckNodeNotDrained(self, self.op.snode)
       _CheckNodeVmCapable(self, self.op.snode)
@@ -8247,18 +8292,6 @@ class LUInstanceCreate(LogicalUnit):
           self.cfg.ReleaseDRBDMinors(instance)
           raise
 
-      if self.cfg.GetClusterInfo().prealloc_wipe_disks:
-        feedback_fn("* wiping instance disks...")
-        try:
-          _WipeDisks(self, iobj)
-        except errors.OpExecError:
-          self.LogWarning("Device wiping failed, reverting...")
-          try:
-            _RemoveDisks(self, iobj)
-          finally:
-            self.cfg.ReleaseDRBDMinors(instance)
-            raise
-
     feedback_fn("adding instance %s to cluster config" % instance)
 
     self.cfg.AddInstance(iobj, self.proc.GetECId())
@@ -8266,18 +8299,28 @@ class LUInstanceCreate(LogicalUnit):
     # Declare that we don't want to remove the instance lock anymore, as we've
     # added the instance to the config
     del self.remove_locks[locking.LEVEL_INSTANCE]
-    # Unlock all the nodes
+
     if self.op.mode == constants.INSTANCE_IMPORT:
-      nodes_keep = [self.op.src_node]
-      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
-                       if node != self.op.src_node]
-      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
-      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
+      # Release unused nodes
+      _ReleaseLocks(self, locking.LEVEL_NODE, keep=[self.op.src_node])
     else:
-      self.context.glm.release(locking.LEVEL_NODE)
-      del self.acquired_locks[locking.LEVEL_NODE]
+      # Release all nodes
+      _ReleaseLocks(self, locking.LEVEL_NODE)
 
-    if self.op.wait_for_sync:
+    disk_abort = False
+    if not self.adopt_disks and self.cfg.GetClusterInfo().prealloc_wipe_disks:
+      feedback_fn("* wiping instance disks...")
+      try:
+        _WipeDisks(self, iobj)
+      except errors.OpExecError, err:
+        logging.exception("Wiping disks failed")
+        self.LogWarning("Wiping instance disks failed (%s)", err)
+        disk_abort = True
+
+    if disk_abort:
+      # Something is already wrong with the disks, don't do anything else
+      pass
+    elif self.op.wait_for_sync:
       disk_abort = not _WaitForSync(self, iobj)
     elif iobj.disk_template in constants.DTS_INT_MIRROR:
       # make sure the disks are not degraded (still sync-ing is ok)
@@ -8461,24 +8504,29 @@ class LUInstanceReplaceDisks(LogicalUnit):
   def ExpandNames(self):
     self._ExpandAndLockInstance()
 
-    if self.op.iallocator is not None:
-      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+    assert locking.LEVEL_NODE not in self.needed_locks
+    assert locking.LEVEL_NODEGROUP not in self.needed_locks
+
+    assert self.op.iallocator is None or self.op.remote_node is None, \
+      "Conflicting options"
 
-    elif self.op.remote_node is not None:
-      remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
-      self.op.remote_node = remote_node
+    if self.op.remote_node is not None:
+      self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
 
       # Warning: do not remove the locking of the new secondary here
       # unless DRBD8.AddChildren is changed to work in parallel;
       # currently it doesn't since parallel invocations of
       # FindUnusedMinor will conflict
-      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
+      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
-
     else:
       self.needed_locks[locking.LEVEL_NODE] = []
       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
+      if self.op.iallocator is not None:
+        # iallocator will select a new node in the same group
+        self.needed_locks[locking.LEVEL_NODEGROUP] = []
+
     self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                    self.op.iallocator, self.op.remote_node,
                                    self.op.disks, False, self.op.early_release)
@@ -8486,11 +8534,26 @@ class LUInstanceReplaceDisks(LogicalUnit):
     self.tasklets = [self.replacer]
 
   def DeclareLocks(self, level):
-    # If we're not already locking all nodes in the set we have to declare the
-    # instance's primary/secondary nodes.
-    if (level == locking.LEVEL_NODE and
-        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
-      self._LockInstancesNodes()
+    if level == locking.LEVEL_NODEGROUP:
+      assert self.op.remote_node is None
+      assert self.op.iallocator is not None
+      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+
+      self.share_locks[locking.LEVEL_NODEGROUP] = 1
+      self.needed_locks[locking.LEVEL_NODEGROUP] = \
+        self.cfg.GetInstanceNodeGroups(self.op.instance_name)
+
+    elif level == locking.LEVEL_NODE:
+      if self.op.iallocator is not None:
+        assert self.op.remote_node is None
+        assert not self.needed_locks[locking.LEVEL_NODE]
+
+        # Lock member nodes of all locked groups
+        self.needed_locks[locking.LEVEL_NODE] = [node_name
+          for group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
+          for node_name in self.cfg.GetNodeGroup(group_uuid).members]
+      else:
+        self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -8520,6 +8583,26 @@ class LUInstanceReplaceDisks(LogicalUnit):
       nl.append(self.op.remote_node)
     return nl, nl
 
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
+            self.op.iallocator is None)
+
+    owned_groups = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+    if owned_groups:
+      groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name)
+      if owned_groups != groups:
+        raise errors.OpExecError("Node groups used by instance '%s' changed"
+                                 " since lock was acquired, current list is %r,"
+                                 " used to be '%s'" %
+                                 (self.op.instance_name,
+                                  utils.CommaJoin(groups),
+                                  utils.CommaJoin(owned_groups)))
+
+    return LogicalUnit.CheckPrereq(self)
+
 
 class TLReplaceDisks(Tasklet):
   """Replaces disks for an instance.
@@ -8631,7 +8714,6 @@ class TLReplaceDisks(Tasklet):
 
     return True
 
-
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -8673,20 +8755,23 @@ class TLReplaceDisks(Tasklet):
       remote_node = self._RunAllocator(self.lu, self.iallocator_name,
                                        instance.name, instance.secondary_nodes)
 
-    if remote_node is not None:
+    if remote_node is None:
+      self.remote_node_info = None
+    else:
+      assert remote_node in self.lu.glm.list_owned(locking.LEVEL_NODE), \
+             "Remote node '%s' is not locked" % remote_node
+
       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
       assert self.remote_node_info is not None, \
         "Cannot retrieve locked node %s" % remote_node
-    else:
-      self.remote_node_info = None
 
     if remote_node == self.instance.primary_node:
       raise errors.OpPrereqError("The specified node is the primary node of"
-                                 " the instance.", errors.ECODE_INVAL)
+                                 " the instance", errors.ECODE_INVAL)
 
     if remote_node == secondary_node:
       raise errors.OpPrereqError("The specified node is already the"
-                                 " secondary node of the instance.",
+                                 " secondary node of the instance",
                                  errors.ECODE_INVAL)
 
     if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
@@ -8762,18 +8847,26 @@ class TLReplaceDisks(Tasklet):
     for node in check_nodes:
       _CheckNodeOnline(self.lu, node)
 
+    touched_nodes = frozenset(node_name for node_name in [self.new_node,
+                                                          self.other_node,
+                                                          self.target_node]
+                              if node_name is not None)
+
+    # Release unneeded node locks
+    _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
+
+    # Release any owned node group
+    if self.lu.glm.is_owned(locking.LEVEL_NODEGROUP):
+      _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
+
     # Check whether disks are valid
     for disk_idx in self.disks:
       instance.FindDisk(disk_idx)
 
     # Get secondary node IP addresses
-    node_2nd_ip = {}
-
-    for node_name in [self.target_node, self.other_node, self.new_node]:
-      if node_name is not None:
-        node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
-
-    self.node_secondary_ip = node_2nd_ip
+    self.node_secondary_ip = \
+      dict((node_name, self.cfg.GetNodeInfo(node_name).secondary_ip)
+           for node_name in touched_nodes)
 
   def Exec(self, feedback_fn):
     """Execute disk replacement.
@@ -8784,6 +8877,20 @@ class TLReplaceDisks(Tasklet):
     if self.delay_iallocator:
       self._CheckPrereq2()
 
+    if __debug__:
+      # Verify owned locks before starting operation
+      owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE)
+      assert set(owned_locks) == set(self.node_secondary_ip), \
+          ("Incorrect node locks, owning %s, expected %s" %
+           (owned_locks, self.node_secondary_ip.keys()))
+
+      owned_locks = self.lu.glm.list_owned(locking.LEVEL_INSTANCE)
+      assert list(owned_locks) == [self.instance_name], \
+          "Instance '%s' not locked" % self.instance_name
+
+      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
+          "Should not own any node group lock at this point"
+
     if not self.disks:
       feedback_fn("No disks need replacement")
       return
@@ -8804,14 +8911,24 @@ class TLReplaceDisks(Tasklet):
       else:
         fn = self._ExecDrbd8DiskOnly
 
-      return fn(feedback_fn)
-
+      result = fn(feedback_fn)
     finally:
       # Deactivate the instance disks if we're replacing them on a
       # down instance
       if activate_disks:
         _SafeShutdownInstanceDisks(self.lu, self.instance)
 
+    if __debug__:
+      # Verify owned locks
+      owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE)
+      nodes = frozenset(self.node_secondary_ip)
+      assert ((self.early_release and not owned_locks) or
+              (not self.early_release and not (set(owned_locks) - nodes))), \
+        ("Not owning the correct locks, early_release=%s, owned=%r,"
+         " nodes=%r" % (self.early_release, owned_locks, nodes))
+
+    return result
+
   def _CheckVolumeGroup(self, nodes):
     self.lu.LogInfo("Checking volume groups")
 
@@ -8863,7 +8980,6 @@ class TLReplaceDisks(Tasklet):
                                  (node_name, self.instance.name))
 
   def _CreateNewStorage(self, node_name):
-    vgname = self.cfg.GetVGName()
     iv_names = {}
 
     for idx, dev in enumerate(self.instance.disks):
@@ -8877,10 +8993,12 @@ class TLReplaceDisks(Tasklet):
       lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
       names = _GenerateUniqueNames(self.lu, lv_names)
 
+      vg_data = dev.children[0].logical_id[0]
       lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
-                             logical_id=(vgname, names[0]))
+                             logical_id=(vg_data, names[0]))
+      vg_meta = dev.children[1].logical_id[0]
       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
-                             logical_id=(vgname, names[1]))
+                             logical_id=(vg_meta, names[1]))
 
       new_lvs = [lv_data, lv_meta]
       old_lvs = dev.children
@@ -8921,10 +9039,6 @@ class TLReplaceDisks(Tasklet):
           self.lu.LogWarning("Can't remove old LV: %s" % msg,
                              hint="remove unused LVs manually")
 
-  def _ReleaseNodeLock(self, node_name):
-    """Releases the lock for a given node."""
-    self.lu.context.glm.release(locking.LEVEL_NODE, node_name)
-
   def _ExecDrbd8DiskOnly(self, feedback_fn):
     """Replace a disk on the primary or secondary for DRBD 8.
 
@@ -9042,7 +9156,8 @@ class TLReplaceDisks(Tasklet):
       self._RemoveOldStorage(self.target_node, iv_names)
       # WARNING: we release both node locks here, do not do other RPCs
       # than WaitForSync to the primary node
-      self._ReleaseNodeLock([self.target_node, self.other_node])
+      _ReleaseLocks(self.lu, locking.LEVEL_NODE,
+                    names=[self.target_node, self.other_node])
 
     # Wait for sync
     # This can fail as the old devices are degraded and _WaitForSync
@@ -9199,9 +9314,10 @@ class TLReplaceDisks(Tasklet):
       self._RemoveOldStorage(self.target_node, iv_names)
       # WARNING: we release all node locks here, do not do other RPCs
       # than WaitForSync to the primary node
-      self._ReleaseNodeLock([self.instance.primary_node,
-                             self.target_node,
-                             self.new_node])
+      _ReleaseLocks(self.lu, locking.LEVEL_NODE,
+                    names=[self.instance.primary_node,
+                           self.target_node,
+                           self.new_node])
 
     # Wait for sync
     # This can fail as the old devices are degraded and _WaitForSync
@@ -9379,7 +9495,7 @@ class LUInstanceGrowDisk(LogicalUnit):
 
     if instance.disk_template not in constants.DTS_GROWABLE:
       raise errors.OpPrereqError("Instance's disk layout does not support"
-                                 " growing.", errors.ECODE_INVAL)
+                                 " growing", errors.ECODE_INVAL)
 
     self.disk = instance.FindDisk(self.op.disk)
 
@@ -9401,9 +9517,17 @@ class LUInstanceGrowDisk(LogicalUnit):
     if not disks_ok:
       raise errors.OpExecError("Cannot activate block device to grow")
 
+    # First run all grow ops in dry-run mode
     for node in instance.all_nodes:
       self.cfg.SetDiskID(disk, node)
-      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
+      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True)
+      result.Raise("Grow request failed to node %s" % node)
+
+    # We know that (as far as we can test) operations across different
+    # nodes will succeed, time to run it for real
+    for node in instance.all_nodes:
+      self.cfg.SetDiskID(disk, node)
+      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False)
       result.Raise("Grow request failed to node %s" % node)
 
       # TODO: Rewrite code to work properly
@@ -9418,14 +9542,14 @@ class LUInstanceGrowDisk(LogicalUnit):
     if self.op.wait_for_sync:
       disk_abort = not _WaitForSync(self, instance, disks=[disk])
       if disk_abort:
-        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
-                             " status.\nPlease check the instance.")
+        self.proc.LogWarning("Disk sync-ing has not returned a good"
+                             " status; please check the instance")
       if not instance.admin_up:
         _SafeShutdownInstanceDisks(self, instance, disks=[disk])
     elif not instance.admin_up:
       self.proc.LogWarning("Not shutting down the disk even if the instance is"
                            " not supposed to be running because no wait for"
-                           " sync mode was requested.")
+                           " sync mode was requested")
 
 
 class LUInstanceQueryData(NoHooksLU):
@@ -9473,7 +9597,7 @@ class LUInstanceQueryData(NoHooksLU):
     """
     if self.wanted_names is None:
       assert self.op.use_locking, "Locking was not used"
-      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+      self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)
 
     self.wanted_instances = [self.cfg.GetInstanceInfo(name)
                              for name in self.wanted_names]
@@ -10036,7 +10160,8 @@ class LUInstanceSetParams(LogicalUnit):
     snode = self.op.remote_node
 
     # create a fake disk info for _GenerateDiskTemplate
-    disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode}
+    disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode,
+                  constants.IDISK_VG: d.logical_id[0]}
                  for d in instance.disks]
     new_disks = _GenerateDiskTemplate(self, self.op.disk_template,
                                       instance.name, pnode, [snode],
@@ -10280,7 +10405,7 @@ class LUBackupQuery(NoHooksLU):
         that node.
 
     """
-    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
+    self.nodes = self.glm.list_owned(locking.LEVEL_NODE)
     rpcresult = self.rpc.call_export_list(self.nodes)
     result = {}
     for node in rpcresult:
@@ -10662,7 +10787,7 @@ class LUBackupRemove(NoHooksLU):
       fqdn_warn = True
       instance_name = self.op.instance_name
 
-    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+    locked_nodes = self.glm.list_owned(locking.LEVEL_NODE)
     exportlist = self.rpc.call_export_list(locked_nodes)
     found = False
     for node in exportlist:
@@ -10800,7 +10925,7 @@ class LUGroupAssignNodes(NoHooksLU):
 
         if previous_splits:
           self.LogWarning("In addition, these already-split instances continue"
-                          " to be spit across groups: %s",
+                          " to be split across groups: %s",
                           utils.CommaJoin(utils.NiceSort(previous_splits)))
 
   def Exec(self, feedback_fn):
@@ -10889,7 +11014,8 @@ class _GroupQuery(_QueryBase):
           missing.append(name)
 
       if missing:
-        raise errors.OpPrereqError("Some groups do not exist: %s" % missing,
+        raise errors.OpPrereqError("Some groups do not exist: %s" %
+                                   utils.CommaJoin(missing),
                                    errors.ECODE_NOENT)
 
   def DeclareLocks(self, lu, level):