gnt-debug: rename allocator to iallocator
[ganeti-local] / lib / cmdlib.py
index b967979..98a056c 100644 (file)
@@ -57,6 +57,7 @@ from ganeti import netutils
 from ganeti import query
 from ganeti import qlang
 from ganeti import opcodes
+from ganeti import ht
 
 import ganeti.masterd.instance # pylint: disable-msg=W0611
 
@@ -74,7 +75,28 @@ def _SupportsOob(cfg, node):
   return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
 
 
-# End types
+class ResultWithJobs:
+  """Data container for LU results with jobs.
+
+  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
+  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
+  contained in the C{jobs} attribute and include the job IDs in the opcode
+  result.
+
+  """
+  def __init__(self, jobs, **kwargs):
+    """Initializes this class.
+
+    Additional return values can be specified as keyword arguments.
+
+    @type jobs: list of lists of L{opcode.OpCode}
+    @param jobs: A list of lists of opcode objects
+
+    """
+    self.jobs = jobs
+    self.other = kwargs
+
+
 class LogicalUnit(object):
   """Logical Unit base class.
 
@@ -83,6 +105,7 @@ class LogicalUnit(object):
     - implement CheckPrereq (except when tasklets are used)
     - implement Exec (except when tasklets are used)
     - implement BuildHooksEnv
+    - implement BuildHooksNodes
     - redefine HPATH and HTYPE
     - optionally redefine their run requirements:
         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
@@ -107,17 +130,16 @@ class LogicalUnit(object):
     self.proc = processor
     self.op = op
     self.cfg = context.cfg
+    self.glm = context.glm
     self.context = context
     self.rpc = rpc
     # Dicts used to declare locking needs to mcpu
     self.needed_locks = None
-    self.acquired_locks = {}
     self.share_locks = dict.fromkeys(locking.LEVELS, 0)
     self.add_locks = {}
     self.remove_locks = {}
     # Used to force good behavior when calling helper functions
     self.recalculate_locks = {}
-    self.__ssh = None
     # logging
     self.Log = processor.Log # pylint: disable-msg=C0103
     self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
@@ -138,16 +160,6 @@ class LogicalUnit(object):
 
     self.CheckArguments()
 
-  def __GetSSH(self):
-    """Returns the SshRunner object
-
-    """
-    if not self.__ssh:
-      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
-    return self.__ssh
-
-  ssh = property(fget=__GetSSH)
-
   def CheckArguments(self):
     """Check syntactic validity for the opcode arguments.
 
@@ -273,21 +285,28 @@ class LogicalUnit(object):
   def BuildHooksEnv(self):
     """Build hooks environment for this LU.
 
-    This method should return a three-node tuple consisting of: a dict
-    containing the environment that will be used for running the
-    specific hook for this LU, a list of node names on which the hook
-    should run before the execution, and a list of node names on which
-    the hook should run after the execution.
+    @rtype: dict
+    @return: Dictionary containing the environment that will be used for
+      running the hooks for this LU. The keys of the dict must not be prefixed
+      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
+      will extend the environment with additional variables. If no environment
+      should be defined, an empty dictionary should be returned (not C{None}).
+    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
+      will not be called.
 
-    The keys of the dict must not have 'GANETI_' prefixed as this will
-    be handled in the hooks runner. Also note additional keys will be
-    added by the hooks runner. If the LU doesn't define any
-    environment, an empty dict (and not None) should be returned.
+    """
+    raise NotImplementedError
 
-    No nodes should be returned as an empty list (and not None).
+  def BuildHooksNodes(self):
+    """Build list of nodes to run LU's hooks.
 
-    Note that if the HPATH for a LU class is None, this function will
-    not be called.
+    @rtype: tuple; (list, list)
+    @return: Tuple containing a list of node names on which the hook
+      should run before the execution and a list of node names on which the
+      hook should run after the execution. No nodes should be returned as an
+      empty list (and not None).
+    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
+      will not be called.
 
     """
     raise NotImplementedError
@@ -367,7 +386,7 @@ class LogicalUnit(object):
     # future we might want to have different behaviors depending on the value
     # of self.recalculate_locks[locking.LEVEL_NODE]
     wanted_nodes = []
-    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+    for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
       instance = self.context.cfg.GetInstanceInfo(instance_name)
       wanted_nodes.append(instance.primary_node)
       if not primary_only:
@@ -397,7 +416,13 @@ class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
     This just raises an error.
 
     """
-    assert False, "BuildHooksEnv called for NoHooksLUs"
+    raise AssertionError("BuildHooksEnv called for NoHooksLUs")
+
+  def BuildHooksNodes(self):
+    """Empty BuildHooksNodes for NoHooksLU.
+
+    """
+    raise AssertionError("BuildHooksNodes called for NoHooksLU")
 
 
 class Tasklet:
@@ -475,7 +500,7 @@ class _QueryBase:
 
     """
     if self.do_locking:
-      names = lu.acquired_locks[lock_level]
+      names = lu.glm.list_owned(lock_level)
     else:
       names = all_names
 
@@ -486,7 +511,7 @@ class _QueryBase:
 
     # caller specified names and we must keep the same order
     assert self.names
-    assert not self.do_locking or lu.acquired_locks[lock_level]
+    assert not self.do_locking or lu.glm.is_owned(lock_level)
 
     missing = set(self.wanted).difference(names)
     if missing:
@@ -496,15 +521,6 @@ class _QueryBase:
     # Return expanded names
     return self.wanted
 
-  @classmethod
-  def FieldsQuery(cls, fields):
-    """Returns list of available fields.
-
-    @return: List of L{objects.QueryFieldDefinition}
-
-    """
-    return query.QueryFields(cls.FIELDS, fields)
-
   def ExpandNames(self, lu):
     """Expand names for this query.
 
@@ -615,6 +631,63 @@ def _GetUpdatedParams(old_params, update_dict,
   return params_copy
 
 
+def _ReleaseLocks(lu, level, names=None, keep=None):
+  """Releases locks owned by an LU.
+
+  @type lu: L{LogicalUnit}
+  @param level: Lock level
+  @type names: list or None
+  @param names: Names of locks to release
+  @type keep: list or None
+  @param keep: Names of locks to retain
+
+  """
+  assert not (keep is not None and names is not None), \
+         "Only one of the 'names' and the 'keep' parameters can be given"
+
+  if names is not None:
+    should_release = names.__contains__
+  elif keep:
+    should_release = lambda name: name not in keep
+  else:
+    should_release = None
+
+  if should_release:
+    retain = []
+    release = []
+
+    # Determine which locks to release
+    for name in lu.glm.list_owned(level):
+      if should_release(name):
+        release.append(name)
+      else:
+        retain.append(name)
+
+    assert len(lu.glm.list_owned(level)) == (len(retain) + len(release))
+
+    # Release just some locks
+    lu.glm.release(level, names=release)
+
+    assert frozenset(lu.glm.list_owned(level)) == frozenset(retain)
+  else:
+    # Release everything
+    lu.glm.release(level)
+
+    assert not lu.glm.is_owned(level), "No locks should be owned"
+
+
+def _RunPostHook(lu, node_name):
+  """Runs the post-hook for an opcode on a single node.
+
+  """
+  hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu)
+  try:
+    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
+  except:
+    # pylint: disable-msg=W0702
+    lu.LogWarning("Errors occurred running hooks on %s" % node_name)
+
+
 def _CheckOutputFields(static, dynamic, selected):
   """Checks whether all selected fields are valid.
 
@@ -1081,7 +1154,7 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
   iallocator = getattr(lu.op, iallocator_slot, None)
 
   if node is not None and iallocator is not None:
-    raise errors.OpPrereqError("Do not specify both, iallocator and node.",
+    raise errors.OpPrereqError("Do not specify both, iallocator and node",
                                errors.ECODE_INVAL)
   elif node is None and iallocator is None:
     default_iallocator = lu.cfg.GetDefaultIAllocator()
@@ -1089,10 +1162,10 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
       setattr(lu.op, iallocator_slot, default_iallocator)
     else:
       raise errors.OpPrereqError("No iallocator or node given and no"
-                                 " cluster-wide default iallocator found."
-                                 " Please specify either an iallocator or a"
+                                 " cluster-wide default iallocator found;"
+                                 " please specify either an iallocator or a"
                                  " node, or set a cluster-wide default"
-                                 " iallocator.")
+                                 " iallocator")
 
 
 class LUClusterPostInit(LogicalUnit):
@@ -1106,9 +1179,15 @@ class LUClusterPostInit(LogicalUnit):
     """Build hooks env.
 
     """
-    env = {"OP_TARGET": self.cfg.GetClusterName()}
-    mn = self.cfg.GetMasterNode()
-    return env, [], [mn]
+    return {
+      "OP_TARGET": self.cfg.GetClusterName(),
+      }
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    return ([], [self.cfg.GetMasterNode()])
 
   def Exec(self, feedback_fn):
     """Nothing to do.
@@ -1128,8 +1207,15 @@ class LUClusterDestroy(LogicalUnit):
     """Build hooks env.
 
     """
-    env = {"OP_TARGET": self.cfg.GetClusterName()}
-    return env, [], []
+    return {
+      "OP_TARGET": self.cfg.GetClusterName(),
+      }
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    return ([], [])
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -1159,12 +1245,7 @@ class LUClusterDestroy(LogicalUnit):
     master = self.cfg.GetMasterNode()
 
     # Run post hooks on master node before it's removed
-    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
-    try:
-      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
-    except:
-      # pylint: disable-msg=W0702
-      self.LogWarning("Errors occurred running hooks on %s" % master)
+    _RunPostHook(self, master)
 
     result = self.rpc.call_node_stop_master(master, False)
     result.Raise("Could not disable the master role")
@@ -1173,7 +1254,7 @@ class LUClusterDestroy(LogicalUnit):
 
 
 def _VerifyCertificate(filename):
-  """Verifies a certificate for LUClusterVerify.
+  """Verifies a certificate for LUClusterVerifyConfig.
 
   @type filename: string
   @param filename: Path to PEM file
@@ -1183,7 +1264,7 @@ def _VerifyCertificate(filename):
     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                            utils.ReadFile(filename))
   except Exception, err: # pylint: disable-msg=W0703
-    return (LUClusterVerify.ETYPE_ERROR,
+    return (LUClusterVerifyConfig.ETYPE_ERROR,
             "Failed to load X509 certificate %s: %s" % (filename, err))
 
   (errcode, msg) = \
@@ -1198,27 +1279,61 @@ def _VerifyCertificate(filename):
   if errcode is None:
     return (None, fnamemsg)
   elif errcode == utils.CERT_WARNING:
-    return (LUClusterVerify.ETYPE_WARNING, fnamemsg)
+    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
   elif errcode == utils.CERT_ERROR:
-    return (LUClusterVerify.ETYPE_ERROR, fnamemsg)
+    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
 
   raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
 
 
-class LUClusterVerify(LogicalUnit):
-  """Verifies the cluster status.
+def _GetAllHypervisorParameters(cluster, instances):
+  """Compute the set of all hypervisor parameters.
+
+  @type cluster: L{objects.Cluster}
+  @param cluster: the cluster object
+  @param instances: list of L{objects.Instance}
+  @param instances: additional instances from which to obtain parameters
+  @rtype: list of (origin, hypervisor, parameters)
+  @return: a list with all parameters found, indicating the hypervisor they
+       apply to, and the origin (can be "cluster", "os X", or "instance Y")
 
   """
-  HPATH = "cluster-verify"
-  HTYPE = constants.HTYPE_CLUSTER
-  REQ_BGL = False
+  hvp_data = []
+
+  for hv_name in cluster.enabled_hypervisors:
+    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
+
+  for os_name, os_hvp in cluster.os_hvp.items():
+    for hv_name, hv_params in os_hvp.items():
+      if hv_params:
+        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
+        hvp_data.append(("os %s" % os_name, hv_name, full_params))
+
+  # TODO: collapse identical parameter values in a single one
+  for instance in instances:
+    if instance.hvparams:
+      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
+                       cluster.FillHV(instance)))
+
+  return hvp_data
 
+
+class _VerifyErrors(object):
+  """Mix-in for cluster/group verify LUs.
+
+  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
+  self.op and self._feedback_fn to be available.)
+
+  """
   TCLUSTER = "cluster"
   TNODE = "node"
   TINSTANCE = "instance"
 
   ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
   ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
+  ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK")
+  ECLUSTERDANGLINGNODES = (TNODE, "ECLUSTERDANGLINGNODES")
+  ECLUSTERDANGLINGINST = (TNODE, "ECLUSTERDANGLINGINST")
   EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
   EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
   EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
@@ -1248,6 +1363,138 @@ class LUClusterVerify(LogicalUnit):
   ETYPE_ERROR = "ERROR"
   ETYPE_WARNING = "WARNING"
 
+  def _Error(self, ecode, item, msg, *args, **kwargs):
+    """Format an error message.
+
+    Based on the opcode's error_codes parameter, either format a
+    parseable error code, or a simpler error string.
+
+    This must be called only from Exec and functions called from Exec.
+
+    """
+    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
+    itype, etxt = ecode
+    # first complete the msg
+    if args:
+      msg = msg % args
+    # then format the whole message
+    if self.op.error_codes: # This is a mix-in. pylint: disable-msg=E1101
+      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
+    else:
+      if item:
+        item = " " + item
+      else:
+        item = ""
+      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
+    # and finally report it via the feedback_fn
+    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable-msg=E1101
+
+  def _ErrorIf(self, cond, *args, **kwargs):
+    """Log an error message if the passed condition is True.
+
+    """
+    cond = (bool(cond)
+            or self.op.debug_simulate_errors) # pylint: disable-msg=E1101
+    if cond:
+      self._Error(*args, **kwargs)
+    # do not mark the operation as failed for WARN cases only
+    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
+      self.bad = self.bad or cond
+
+
+class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
+  """Verifies the cluster config.
+
+  """
+  REQ_BGL = False
+
+  def _VerifyHVP(self, hvp_data):
+    """Verifies locally the syntax of the hypervisor parameters.
+
+    """
+    for item, hv_name, hv_params in hvp_data:
+      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
+             (item, hv_name))
+      try:
+        hv_class = hypervisor.GetHypervisor(hv_name)
+        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
+        hv_class.CheckParameterSyntax(hv_params)
+      except errors.GenericError, err:
+        self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))
+
+  def ExpandNames(self):
+    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
+    self.all_node_info = self.cfg.GetAllNodesInfo()
+    self.all_inst_info = self.cfg.GetAllInstancesInfo()
+    self.needed_locks = {}
+
+  def Exec(self, feedback_fn):
+    """Verify integrity of cluster, performing various test on nodes.
+
+    """
+    self.bad = False
+    self._feedback_fn = feedback_fn
+
+    feedback_fn("* Verifying cluster config")
+
+    for msg in self.cfg.VerifyConfig():
+      self._ErrorIf(True, self.ECLUSTERCFG, None, msg)
+
+    feedback_fn("* Verifying cluster certificate files")
+
+    for cert_filename in constants.ALL_CERT_FILES:
+      (errcode, msg) = _VerifyCertificate(cert_filename)
+      self._ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
+
+    feedback_fn("* Verifying hypervisor parameters")
+
+    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
+                                                self.all_inst_info.values()))
+
+    feedback_fn("* Verifying all nodes belong to an existing group")
+
+    # We do this verification here because, should this bogus circumstance
+    # occur, it would never be catched by VerifyGroup, which only acts on
+    # nodes/instances reachable from existing node groups.
+
+    dangling_nodes = set(node.name for node in self.all_node_info.values()
+                         if node.group not in self.all_group_info)
+
+    dangling_instances = {}
+    no_node_instances = []
+
+    for inst in self.all_inst_info.values():
+      if inst.primary_node in dangling_nodes:
+        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
+      elif inst.primary_node not in self.all_node_info:
+        no_node_instances.append(inst.name)
+
+    pretty_dangling = [
+        "%s (%s)" %
+        (node.name,
+         utils.CommaJoin(dangling_instances.get(node.name,
+                                                ["no instances"])))
+        for node in dangling_nodes]
+
+    self._ErrorIf(bool(dangling_nodes), self.ECLUSTERDANGLINGNODES, None,
+                  "the following nodes (and their instances) belong to a non"
+                  " existing group: %s", utils.CommaJoin(pretty_dangling))
+
+    self._ErrorIf(bool(no_node_instances), self.ECLUSTERDANGLINGINST, None,
+                  "the following instances have a non-existing primary-node:"
+                  " %s", utils.CommaJoin(no_node_instances))
+
+    return (not self.bad, [g.name for g in self.all_group_info.values()])
+
+
+class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
+  """Verifies the status of a node group.
+
+  """
+  HPATH = "cluster-verify"
+  HTYPE = constants.HTYPE_CLUSTER
+  REQ_BGL = False
+
   _HOOKS_INDENT_RE = re.compile("^", re.M)
 
   class NodeImage(object):
@@ -1301,48 +1548,90 @@ class LUClusterVerify(LogicalUnit):
       self.oslist = {}
 
   def ExpandNames(self):
+    # This raises errors.OpPrereqError on its own:
+    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+
+    all_node_info = self.cfg.GetAllNodesInfo()
+    all_inst_info = self.cfg.GetAllInstancesInfo()
+
+    node_names = set(node.name
+                     for node in all_node_info.values()
+                     if node.group == self.group_uuid)
+
+    inst_names = [inst.name
+                  for inst in all_inst_info.values()
+                  if inst.primary_node in node_names]
+
+    # In Exec(), we warn about mirrored instances that have primary and
+    # secondary living in separate node groups. To fully verify that
+    # volumes for these instances are healthy, we will need to do an
+    # extra call to their secondaries. We ensure here those nodes will
+    # be locked.
+    for inst in inst_names:
+      if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
+        node_names.update(all_inst_info[inst].secondary_nodes)
+
     self.needed_locks = {
-      locking.LEVEL_NODE: locking.ALL_SET,
-      locking.LEVEL_INSTANCE: locking.ALL_SET,
+      locking.LEVEL_NODEGROUP: [self.group_uuid],
+      locking.LEVEL_NODE: list(node_names),
+      locking.LEVEL_INSTANCE: inst_names,
     }
+
     self.share_locks = dict.fromkeys(locking.LEVELS, 1)
 
-  def _Error(self, ecode, item, msg, *args, **kwargs):
-    """Format an error message.
+  def CheckPrereq(self):
+    self.all_node_info = self.cfg.GetAllNodesInfo()
+    self.all_inst_info = self.cfg.GetAllInstancesInfo()
 
-    Based on the opcode's error_codes parameter, either format a
-    parseable error code, or a simpler error string.
+    group_nodes = set(node.name
+                      for node in self.all_node_info.values()
+                      if node.group == self.group_uuid)
 
-    This must be called only from Exec and functions called from Exec.
+    group_instances = set(inst.name
+                          for inst in self.all_inst_info.values()
+                          if inst.primary_node in group_nodes)
 
-    """
-    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
-    itype, etxt = ecode
-    # first complete the msg
-    if args:
-      msg = msg % args
-    # then format the whole message
-    if self.op.error_codes:
-      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
-    else:
-      if item:
-        item = " " + item
-      else:
-        item = ""
-      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
-    # and finally report it via the feedback_fn
-    self._feedback_fn("  - %s" % msg)
+    unlocked_nodes = \
+        group_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE))
 
-  def _ErrorIf(self, cond, *args, **kwargs):
-    """Log an error message if the passed condition is True.
+    unlocked_instances = \
+        group_instances.difference(self.glm.list_owned(locking.LEVEL_INSTANCE))
 
-    """
-    cond = bool(cond) or self.op.debug_simulate_errors
-    if cond:
-      self._Error(*args, **kwargs)
-    # do not mark the operation as failed for WARN cases only
-    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
-      self.bad = self.bad or cond
+    if unlocked_nodes:
+      raise errors.OpPrereqError("missing lock for nodes: %s" %
+                                 utils.CommaJoin(unlocked_nodes))
+
+    if unlocked_instances:
+      raise errors.OpPrereqError("missing lock for instances: %s" %
+                                 utils.CommaJoin(unlocked_instances))
+
+    self.my_node_names = utils.NiceSort(group_nodes)
+    self.my_inst_names = utils.NiceSort(group_instances)
+
+    self.my_node_info = dict((name, self.all_node_info[name])
+                             for name in self.my_node_names)
+
+    self.my_inst_info = dict((name, self.all_inst_info[name])
+                             for name in self.my_inst_names)
+
+    # We detect here the nodes that will need the extra RPC calls for verifying
+    # split LV volumes; they should be locked.
+    extra_lv_nodes = set()
+
+    for inst in self.my_inst_info.values():
+      if inst.disk_template in constants.DTS_INT_MIRROR:
+        group = self.my_node_info[inst.primary_node].group
+        for nname in inst.secondary_nodes:
+          if self.all_node_info[nname].group != group:
+            extra_lv_nodes.add(nname)
+
+    unlocked_lv_nodes = \
+        extra_lv_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE))
+
+    if unlocked_lv_nodes:
+      raise errors.OpPrereqError("these nodes could be locked: %s" %
+                                 utils.CommaJoin(unlocked_lv_nodes))
+    self.extra_lv_nodes = list(extra_lv_nodes)
 
   def _VerifyNode(self, ninfo, nresult):
     """Perform some basic validation on data returned from a node.
@@ -1411,7 +1700,7 @@ class LUClusterVerify(LogicalUnit):
                  hv_name, item, hv_result)
 
     test = nresult.get(constants.NV_NODESETUP,
-                           ["Missing NODESETUP results"])
+                       ["Missing NODESETUP results"])
     _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
              "; ".join(test))
 
@@ -1450,7 +1739,7 @@ class LUClusterVerify(LogicalUnit):
              ntime_diff)
 
   def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
-    """Check the node time.
+    """Check the node LVM results.
 
     @type ninfo: L{objects.Node}
     @param ninfo: the node to check
@@ -1486,8 +1775,31 @@ class LUClusterVerify(LogicalUnit):
         _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
                  " '%s' of VG '%s'", pvname, owner_vg)
 
+  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
+    """Check the node bridges.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param bridges: the expected list of bridges
+
+    """
+    if not bridges:
+      return
+
+    node = ninfo.name
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    missing = nresult.get(constants.NV_BRIDGES, None)
+    test = not isinstance(missing, list)
+    _ErrorIf(test, self.ENODENET, node,
+             "did not return valid bridge information")
+    if not test:
+      _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" %
+               utils.CommaJoin(sorted(missing)))
+
   def _VerifyNodeNetwork(self, ninfo, nresult):
-    """Check the node time.
+    """Check the node network connectivity results.
 
     @type ninfo: L{objects.Node}
     @param ninfo: the node to check
@@ -1559,12 +1871,6 @@ class LUClusterVerify(LogicalUnit):
                "instance not running on its primary node %s",
                node_current)
 
-    for node, n_img in node_image.items():
-      if node != node_current:
-        test = instance in n_img.instances
-        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
-                 "instance should not run on node %s", node)
-
     diskdata = [(nname, success, status, idx)
                 for (nname, disks) in diskstatus.items()
                 for idx, (success, status) in enumerate(disks)]
@@ -1604,18 +1910,6 @@ class LUClusterVerify(LogicalUnit):
         self._ErrorIf(test, self.ENODEORPHANLV, node,
                       "volume %s is unknown", volume)
 
-  def _VerifyOrphanInstances(self, instancelist, node_image):
-    """Verify the list of running instances.
-
-    This checks what instances are running but unknown to the cluster.
-
-    """
-    for node, n_img in node_image.items():
-      for o_inst in n_img.instances:
-        test = o_inst not in instancelist
-        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
-                      "instance %s on node %s should not exist", o_inst, node)
-
   def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
     """Verify N+1 Memory Resilience.
 
@@ -1648,51 +1942,94 @@ class LUClusterVerify(LogicalUnit):
         test = n_img.mfree < needed_mem
         self._ErrorIf(test, self.ENODEN1, node,
                       "not enough memory to accomodate instance failovers"
-                      " should node %s fail", prinode)
+                      " should node %s fail (%dMiB needed, %dMiB available)",
+                      prinode, needed_mem, n_img.mfree)
 
-  def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
-                       master_files):
-    """Verifies and computes the node required file checksums.
+  @classmethod
+  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
+                   (files_all, files_all_opt, files_mc, files_vm)):
+    """Verifies file checksums collected from all nodes.
 
-    @type ninfo: L{objects.Node}
-    @param ninfo: the node to check
-    @param nresult: the remote results for the node
-    @param file_list: required list of files
-    @param local_cksum: dictionary of local files and their checksums
-    @param master_files: list of files that only masters should have
+    @param errorif: Callback for reporting errors
+    @param nodeinfo: List of L{objects.Node} objects
+    @param master_node: Name of master node
+    @param all_nvinfo: RPC results
 
     """
-    node = ninfo.name
-    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+    node_names = frozenset(node.name for node in nodeinfo)
 
-    remote_cksum = nresult.get(constants.NV_FILELIST, None)
-    test = not isinstance(remote_cksum, dict)
-    _ErrorIf(test, self.ENODEFILECHECK, node,
-             "node hasn't returned file checksum data")
-    if test:
-      return
+    assert master_node in node_names
+    assert (len(files_all | files_all_opt | files_mc | files_vm) ==
+            sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
+           "Found file listed in more than one file list"
+
+    # Define functions determining which nodes to consider for a file
+    file2nodefn = dict([(filename, fn)
+      for (files, fn) in [(files_all, None),
+                          (files_all_opt, None),
+                          (files_mc, lambda node: (node.master_candidate or
+                                                   node.name == master_node)),
+                          (files_vm, lambda node: node.vm_capable)]
+      for filename in files])
 
-    for file_name in file_list:
-      node_is_mc = ninfo.master_candidate
-      must_have = (file_name not in master_files) or node_is_mc
-      # missing
-      test1 = file_name not in remote_cksum
-      # invalid checksum
-      test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
-      # existing and good
-      test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
-      _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
-               "file '%s' missing", file_name)
-      _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
-               "file '%s' has wrong checksum", file_name)
-      # not candidate and this is not a must-have file
-      _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
-               "file '%s' should not exist on non master"
-               " candidates (and the file is outdated)", file_name)
-      # all good, except non-master/non-must have combination
-      _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
-               "file '%s' should not exist"
-               " on non master candidates", file_name)
+    fileinfo = dict((filename, {}) for filename in file2nodefn.keys())
+
+    for node in nodeinfo:
+      nresult = all_nvinfo[node.name]
+
+      if nresult.fail_msg or not nresult.payload:
+        node_files = None
+      else:
+        node_files = nresult.payload.get(constants.NV_FILELIST, None)
+
+      test = not (node_files and isinstance(node_files, dict))
+      errorif(test, cls.ENODEFILECHECK, node.name,
+              "Node did not return file checksum data")
+      if test:
+        continue
+
+      for (filename, checksum) in node_files.items():
+        # Check if the file should be considered for a node
+        fn = file2nodefn[filename]
+        if fn is None or fn(node):
+          fileinfo[filename].setdefault(checksum, set()).add(node.name)
+
+    for (filename, checksums) in fileinfo.items():
+      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
+
+      # Nodes having the file
+      with_file = frozenset(node_name
+                            for nodes in fileinfo[filename].values()
+                            for node_name in nodes)
+
+      # Nodes missing file
+      missing_file = node_names - with_file
+
+      if filename in files_all_opt:
+        # All or no nodes
+        errorif(missing_file and missing_file != node_names,
+                cls.ECLUSTERFILECHECK, None,
+                "File %s is optional, but it must exist on all or no nodes (not"
+                " found on %s)",
+                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
+      else:
+        errorif(missing_file, cls.ECLUSTERFILECHECK, None,
+                "File %s is missing from node(s) %s", filename,
+                utils.CommaJoin(utils.NiceSort(missing_file)))
+
+      # See if there are multiple versions of the file
+      test = len(checksums) > 1
+      if test:
+        variants = ["variant %s on %s" %
+                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
+                    for (idx, (checksum, nodes)) in
+                      enumerate(sorted(checksums.items()))]
+      else:
+        variants = []
+
+      errorif(test, cls.ECLUSTERFILECHECK, None,
+              "File %s found with %s different checksums (%s)",
+              filename, len(checksums), "; ".join(variants))
 
   def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                       drbd_map):
@@ -1812,6 +2149,7 @@ class LUClusterVerify(LogicalUnit):
 
     assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
 
+    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
     for os_name, os_data in nimg.oslist.items():
       assert os_data, "Empty OS status for OS %s?!" % os_name
       f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
@@ -1839,11 +2177,12 @@ class LUClusterVerify(LogicalUnit):
         continue
       for kind, a, b in [("API version", f_api, b_api),
                          ("variants list", f_var, b_var),
-                         ("parameters", f_param, b_param)]:
+                         ("parameters", beautify_params(f_param),
+                          beautify_params(b_param))]:
         _ErrorIf(a != b, self.ENODEOS, node,
-                 "OS %s %s differs from reference node %s: %s vs. %s",
+                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
                  kind, os_name, base.name,
-                 utils.CommaJoin(a), utils.CommaJoin(b))
+                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
 
     # check any missing OSes
     missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
@@ -2053,21 +2392,6 @@ class LUClusterVerify(LogicalUnit):
 
     return instdisk
 
-  def _VerifyHVP(self, hvp_data):
-    """Verifies locally the syntax of the hypervisor parameters.
-
-    """
-    for item, hv_name, hv_params in hvp_data:
-      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
-             (item, hv_name))
-      try:
-        hv_class = hypervisor.GetHypervisor(hv_name)
-        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
-        hv_class.CheckParameterSyntax(hv_params)
-      except errors.GenericError, err:
-        self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))
-
-
   def BuildHooksEnv(self):
     """Build hooks env.
 
@@ -2075,17 +2399,25 @@ class LUClusterVerify(LogicalUnit):
     the output be logged in the verify output and the verification to fail.
 
     """
-    all_nodes = self.cfg.GetNodeList()
     env = {
       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
       }
-    for node in self.cfg.GetAllNodesInfo().values():
-      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
 
-    return env, [], all_nodes
+    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
+               for node in self.my_node_info.values())
+
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    assert self.my_node_names, ("Node list not gathered,"
+      " has CheckPrereq been executed?")
+    return ([], self.my_node_names)
 
   def Exec(self, feedback_fn):
-    """Verify integrity of cluster, performing various test on nodes.
+    """Verify integrity of the node group, performing various test on nodes.
 
     """
     # This method has too many local variables. pylint: disable-msg=R0914
@@ -2093,26 +2425,14 @@ class LUClusterVerify(LogicalUnit):
     _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
     verbose = self.op.verbose
     self._feedback_fn = feedback_fn
-    feedback_fn("* Verifying global settings")
-    for msg in self.cfg.VerifyConfig():
-      _ErrorIf(True, self.ECLUSTERCFG, None, msg)
-
-    # Check the cluster certificates
-    for cert_filename in constants.ALL_CERT_FILES:
-      (errcode, msg) = _VerifyCertificate(cert_filename)
-      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
 
     vg_name = self.cfg.GetVGName()
     drbd_helper = self.cfg.GetDRBDHelper()
-    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
     cluster = self.cfg.GetClusterInfo()
-    nodelist = utils.NiceSort(self.cfg.GetNodeList())
-    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
-    nodeinfo_byname = dict(zip(nodelist, nodeinfo))
-    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
-    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
-                        for iname in instancelist)
     groupinfo = self.cfg.GetAllNodeGroupsInfo()
+    hypervisors = cluster.enabled_hypervisors
+    node_data_list = [self.my_node_info[name] for name in self.my_node_names]
+
     i_non_redundant = [] # Non redundant instances
     i_non_a_balanced = [] # Non auto-balanced instances
     n_offline = 0 # Count of offline nodes
@@ -2120,47 +2440,40 @@ class LUClusterVerify(LogicalUnit):
     node_vol_should = {}
 
     # FIXME: verify OS list
+
+    # File verification
+    filemap = _ComputeAncillaryFiles(cluster, False)
+
     # do local checksums
-    master_files = [constants.CLUSTER_CONF_FILE]
     master_node = self.master_node = self.cfg.GetMasterNode()
     master_ip = self.cfg.GetMasterIP()
 
-    file_names = ssconf.SimpleStore().GetFileList()
-    file_names.extend(constants.ALL_CERT_FILES)
-    file_names.extend(master_files)
-    if cluster.modify_etc_hosts:
-      file_names.append(constants.ETC_HOSTS)
-
-    local_checksums = utils.FingerprintFiles(file_names)
-
-    # Compute the set of hypervisor parameters
-    hvp_data = []
-    for hv_name in hypervisors:
-      hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
-    for os_name, os_hvp in cluster.os_hvp.items():
-      for hv_name, hv_params in os_hvp.items():
-        if not hv_params:
-          continue
-        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
-        hvp_data.append(("os %s" % os_name, hv_name, full_params))
-    # TODO: collapse identical parameter values in a single one
-    for instance in instanceinfo.values():
-      if not instance.hvparams:
-        continue
-      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
-                       cluster.FillHV(instance)))
-    # and verify them locally
-    self._VerifyHVP(hvp_data)
+    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
+
+    # We will make nodes contact all nodes in their group, and one node from
+    # every other group.
+    # TODO: should it be a *random* node, different every time?
+    online_nodes = [node.name for node in node_data_list if not node.offline]
+    other_group_nodes = {}
+
+    for name in sorted(self.all_node_info):
+      node = self.all_node_info[name]
+      if (node.group not in other_group_nodes
+          and node.group != self.group_uuid
+          and not node.offline):
+        other_group_nodes[node.group] = node.name
 
-    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
     node_verify_param = {
-      constants.NV_FILELIST: file_names,
-      constants.NV_NODELIST: [node.name for node in nodeinfo
-                              if not node.offline],
+      constants.NV_FILELIST:
+        utils.UniqueSequence(filename
+                             for files in filemap
+                             for filename in files),
+      constants.NV_NODELIST: online_nodes + other_group_nodes.values(),
       constants.NV_HYPERVISOR: hypervisors,
-      constants.NV_HVPARAMS: hvp_data,
-      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
-                                  node.secondary_ip) for node in nodeinfo
+      constants.NV_HVPARAMS:
+        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
+      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
+                                 for node in node_data_list
                                  if not node.offline],
       constants.NV_INSTANCELIST: hypervisors,
       constants.NV_VERSION: None,
@@ -2181,15 +2494,30 @@ class LUClusterVerify(LogicalUnit):
     if drbd_helper:
       node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
 
+    # bridge checks
+    # FIXME: this needs to be changed per node-group, not cluster-wide
+    bridges = set()
+    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
+    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
+      bridges.add(default_nicpp[constants.NIC_LINK])
+    for instance in self.my_inst_info.values():
+      for nic in instance.nics:
+        full_nic = cluster.SimpleFillNIC(nic.nicparams)
+        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
+          bridges.add(full_nic[constants.NIC_LINK])
+
+    if bridges:
+      node_verify_param[constants.NV_BRIDGES] = list(bridges)
+
     # Build our expected cluster state
     node_image = dict((node.name, self.NodeImage(offline=node.offline,
                                                  name=node.name,
                                                  vm_capable=node.vm_capable))
-                      for node in nodeinfo)
+                      for node in node_data_list)
 
     # Gather OOB paths
     oob_paths = []
-    for node in nodeinfo:
+    for node in self.all_node_info.values():
       path = _SupportsOob(self.cfg, node)
       if path and path not in oob_paths:
         oob_paths.append(path)
@@ -2197,14 +2525,13 @@ class LUClusterVerify(LogicalUnit):
     if oob_paths:
       node_verify_param[constants.NV_OOB_PATHS] = oob_paths
 
-    for instance in instancelist:
-      inst_config = instanceinfo[instance]
+    for instance in self.my_inst_names:
+      inst_config = self.my_inst_info[instance]
 
       for nname in inst_config.all_nodes:
         if nname not in node_image:
-          # ghost node
           gnode = self.NodeImage(name=nname)
-          gnode.ghost = True
+          gnode.ghost = (nname not in self.all_node_info)
           node_image[nname] = gnode
 
       inst_config.MapLVsByNode(node_vol_should)
@@ -2227,20 +2554,59 @@ class LUClusterVerify(LogicalUnit):
     # time before and after executing the request, we can at least have a time
     # window.
     nvinfo_starttime = time.time()
-    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
+    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
+                                           node_verify_param,
                                            self.cfg.GetClusterName())
+    if self.extra_lv_nodes and vg_name is not None:
+      extra_lv_nvinfo = \
+          self.rpc.call_node_verify(self.extra_lv_nodes,
+                                    {constants.NV_LVLIST: vg_name},
+                                    self.cfg.GetClusterName())
+    else:
+      extra_lv_nvinfo = {}
     nvinfo_endtime = time.time()
 
     all_drbd_map = self.cfg.ComputeDRBDMap()
 
-    feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist))
-    instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo)
+    feedback_fn("* Gathering disk information (%s nodes)" %
+                len(self.my_node_names))
+    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
+                                     self.my_inst_info)
+
+    feedback_fn("* Verifying configuration file consistency")
+
+    # If not all nodes are being checked, we need to make sure the master node
+    # and a non-checked vm_capable node are in the list.
+    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
+    if absent_nodes:
+      vf_nvinfo = all_nvinfo.copy()
+      vf_node_info = list(self.my_node_info.values())
+      additional_nodes = []
+      if master_node not in self.my_node_info:
+        additional_nodes.append(master_node)
+        vf_node_info.append(self.all_node_info[master_node])
+      # Add the first vm_capable node we find which is not included
+      for node in absent_nodes:
+        nodeinfo = self.all_node_info[node]
+        if nodeinfo.vm_capable and not nodeinfo.offline:
+          additional_nodes.append(node)
+          vf_node_info.append(self.all_node_info[node])
+          break
+      key = constants.NV_FILELIST
+      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
+                                                 {key: node_verify_param[key]},
+                                                 self.cfg.GetClusterName()))
+    else:
+      vf_nvinfo = all_nvinfo
+      vf_node_info = self.my_node_info.values()
+
+    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
 
     feedback_fn("* Verifying node status")
 
     refos_img = None
 
-    for node_i in nodeinfo:
+    for node_i in node_data_list:
       node = node_i.name
       nimg = node_image[node]
 
@@ -2273,30 +2639,45 @@ class LUClusterVerify(LogicalUnit):
       nimg.call_ok = self._VerifyNode(node_i, nresult)
       self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
       self._VerifyNodeNetwork(node_i, nresult)
-      self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
-                            master_files)
-
       self._VerifyOob(node_i, nresult)
 
       if nimg.vm_capable:
         self._VerifyNodeLVM(node_i, nresult, vg_name)
-        self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper,
+        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                              all_drbd_map)
 
         self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
         self._UpdateNodeInstances(node_i, nresult, nimg)
         self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
         self._UpdateNodeOS(node_i, nresult, nimg)
+
         if not nimg.os_fail:
           if refos_img is None:
             refos_img = nimg
           self._VerifyNodeOS(node_i, nimg, refos_img)
+        self._VerifyNodeBridges(node_i, nresult, bridges)
+
+        # Check whether all running instancies are primary for the node. (This
+        # can no longer be done from _VerifyInstance below, since some of the
+        # wrong instances could be from other node groups.)
+        non_primary_inst = set(nimg.instances).difference(nimg.pinst)
+
+        for inst in non_primary_inst:
+          test = inst in self.all_inst_info
+          _ErrorIf(test, self.EINSTANCEWRONGNODE, inst,
+                   "instance should not run on node %s", node_i.name)
+          _ErrorIf(not test, self.ENODEORPHANINSTANCE, node_i.name,
+                   "node is running unknown instance %s", inst)
+
+    for node, result in extra_lv_nvinfo.items():
+      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
+                              node_image[node], vg_name)
 
     feedback_fn("* Verifying instance status")
-    for instance in instancelist:
+    for instance in self.my_inst_names:
       if verbose:
         feedback_fn("* Verifying instance %s" % instance)
-      inst_config = instanceinfo[instance]
+      inst_config = self.my_inst_info[instance]
       self._VerifyInstance(instance, inst_config, node_image,
                            instdisk[instance])
       inst_nodes_offline = []
@@ -2307,8 +2688,10 @@ class LUClusterVerify(LogicalUnit):
                self.ENODERPC, pnode, "instance %s, connection to"
                " primary node failed", instance)
 
-      _ErrorIf(pnode_img.offline, self.EINSTANCEBADNODE, instance,
-               "instance lives on offline node %s", inst_config.primary_node)
+      _ErrorIf(inst_config.admin_up and pnode_img.offline,
+               self.EINSTANCEBADNODE, instance,
+               "instance is marked as running and lives on offline node %s",
+               inst_config.primary_node)
 
       # If the instance is non-redundant we cannot survive losing its primary
       # node, so we are not N+1 compliant. On the other hand we have no disk
@@ -2329,7 +2712,7 @@ class LUClusterVerify(LogicalUnit):
         instance_groups = {}
 
         for node in instance_nodes:
-          instance_groups.setdefault(nodeinfo_byname[node].group,
+          instance_groups.setdefault(self.all_node_info[node].group,
                                      []).append(node)
 
         pretty_list = [
@@ -2368,14 +2751,22 @@ class LUClusterVerify(LogicalUnit):
 
     feedback_fn("* Verifying orphan volumes")
     reserved = utils.FieldSet(*cluster.reserved_lvs)
-    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
 
-    feedback_fn("* Verifying orphan instances")
-    self._VerifyOrphanInstances(instancelist, node_image)
+    # We will get spurious "unknown volume" warnings if any node of this group
+    # is secondary for an instance whose primary is in another group. To avoid
+    # them, we find these instances and add their volumes to node_vol_should.
+    for inst in self.all_inst_info.values():
+      for secondary in inst.secondary_nodes:
+        if (secondary in self.my_node_info
+            and inst.name not in self.my_inst_info):
+          inst.MapLVsByNode(node_vol_should)
+          break
+
+    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
 
     if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
       feedback_fn("* Verifying N+1 Memory redundancy")
-      self._VerifyNPlusOneMemory(node_image, instanceinfo)
+      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
 
     feedback_fn("* Other Notes")
     if i_non_redundant:
@@ -2516,10 +2907,7 @@ class LUClusterRepairDiskSizes(NoHooksLU):
 
   def ExpandNames(self):
     if self.op.instances:
-      self.wanted_names = []
-      for name in self.op.instances:
-        full_name = _ExpandInstanceName(self.cfg, name)
-        self.wanted_names.append(full_name)
+      self.wanted_names = _GetWantedInstances(self, self.op.instances)
       self.needed_locks = {
         locking.LEVEL_NODE: [],
         locking.LEVEL_INSTANCE: self.wanted_names,
@@ -2531,7 +2919,7 @@ class LUClusterRepairDiskSizes(NoHooksLU):
         locking.LEVEL_NODE: locking.ALL_SET,
         locking.LEVEL_INSTANCE: locking.ALL_SET,
         }
-    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
+    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
 
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE and self.wanted_names is not None:
@@ -2544,7 +2932,7 @@ class LUClusterRepairDiskSizes(NoHooksLU):
 
     """
     if self.wanted_names is None:
-      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+      self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)
 
     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                              in self.wanted_names]
@@ -2636,13 +3024,16 @@ class LUClusterRename(LogicalUnit):
     """Build hooks env.
 
     """
-    env = {
+    return {
       "OP_TARGET": self.cfg.GetClusterName(),
       "NEW_NAME": self.op.name,
       }
-    mn = self.cfg.GetMasterNode()
-    all_nodes = self.cfg.GetNodeList()
-    return env, [mn], all_nodes
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
 
   def CheckPrereq(self):
     """Verify that the passed name is a valid one.
@@ -2736,12 +3127,17 @@ class LUClusterSetParams(LogicalUnit):
     """Build hooks env.
 
     """
-    env = {
+    return {
       "OP_TARGET": self.cfg.GetClusterName(),
       "NEW_VG_NAME": self.op.vg_name,
       }
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     mn = self.cfg.GetMasterNode()
-    return env, [mn], [mn]
+    return ([mn], [mn])
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -2761,7 +3157,7 @@ class LUClusterSetParams(LogicalUnit):
                                    " drbd-based instances exist",
                                    errors.ECODE_INVAL)
 
-    node_list = self.acquired_locks[locking.LEVEL_NODE]
+    node_list = self.glm.list_owned(locking.LEVEL_NODE)
 
     # if vg_name not None, checks given volume group on all nodes
     if self.op.vg_name:
@@ -2808,6 +3204,12 @@ class LUClusterSetParams(LogicalUnit):
       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
       self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
 
+      # TODO: we need a more general way to handle resetting
+      # cluster-level parameters to default values
+      if self.new_ndparams["oob_program"] == "":
+        self.new_ndparams["oob_program"] = \
+            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
+
     if self.op.nicparams:
       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
       self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
@@ -2830,8 +3232,8 @@ class LUClusterSetParams(LogicalUnit):
           # if we're moving instances to routed, check that they have an ip
           target_mode = params_filled[constants.NIC_MODE]
           if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
-            nic_errors.append("Instance %s, nic/%d: routed nick with no ip" %
-                              (instance.name, nic_idx))
+            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
+                              " address" % (instance.name, nic_idx))
       if nic_errors:
         raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                    "\n".join(nic_errors))
@@ -3049,6 +3451,50 @@ def _UploadHelper(lu, nodes, fname):
         lu.proc.LogWarning(msg)
 
 
+def _ComputeAncillaryFiles(cluster, redist):
+  """Compute files external to Ganeti which need to be consistent.
+
+  @type redist: boolean
+  @param redist: Whether to include files which need to be redistributed
+
+  """
+  # Compute files for all nodes
+  files_all = set([
+    constants.SSH_KNOWN_HOSTS_FILE,
+    constants.CONFD_HMAC_KEY,
+    constants.CLUSTER_DOMAIN_SECRET_FILE,
+    ])
+
+  if not redist:
+    files_all.update(constants.ALL_CERT_FILES)
+    files_all.update(ssconf.SimpleStore().GetFileList())
+
+  if cluster.modify_etc_hosts:
+    files_all.add(constants.ETC_HOSTS)
+
+  # Files which must either exist on all nodes or on none
+  files_all_opt = set([
+    constants.RAPI_USERS_FILE,
+    ])
+
+  # Files which should only be on master candidates
+  files_mc = set()
+  if not redist:
+    files_mc.add(constants.CLUSTER_CONF_FILE)
+
+  # Files which should only be on VM-capable nodes
+  files_vm = set(filename
+    for hv_name in cluster.enabled_hypervisors
+    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles())
+
+  # Filenames must be unique
+  assert (len(files_all | files_all_opt | files_mc | files_vm) ==
+          sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
+         "Found file listed in more than one file list"
+
+  return (files_all, files_all_opt, files_mc, files_vm)
+
+
 def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
   """Distribute additional files which are part of the cluster configuration.
 
@@ -3062,40 +3508,42 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
   @param additional_vm: whether the additional nodes are vm-capable or not
 
   """
-  # 1. Gather target nodes
-  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
-  dist_nodes = lu.cfg.GetOnlineNodeList()
-  nvm_nodes = lu.cfg.GetNonVmCapableNodeList()
-  vm_nodes = [name for name in dist_nodes if name not in nvm_nodes]
+  # Gather target nodes
+  cluster = lu.cfg.GetClusterInfo()
+  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
+
+  online_nodes = lu.cfg.GetOnlineNodeList()
+  vm_nodes = lu.cfg.GetVmCapableNodeList()
+
   if additional_nodes is not None:
-    dist_nodes.extend(additional_nodes)
+    online_nodes.extend(additional_nodes)
     if additional_vm:
       vm_nodes.extend(additional_nodes)
-  if myself.name in dist_nodes:
-    dist_nodes.remove(myself.name)
-  if myself.name in vm_nodes:
-    vm_nodes.remove(myself.name)
-
-  # 2. Gather files to distribute
-  dist_files = set([constants.ETC_HOSTS,
-                    constants.SSH_KNOWN_HOSTS_FILE,
-                    constants.RAPI_CERT_FILE,
-                    constants.RAPI_USERS_FILE,
-                    constants.CONFD_HMAC_KEY,
-                    constants.CLUSTER_DOMAIN_SECRET_FILE,
-                   ])
-
-  vm_files = set()
-  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
-  for hv_name in enabled_hypervisors:
-    hv_class = hypervisor.GetHypervisor(hv_name)
-    vm_files.update(hv_class.GetAncillaryFiles())
-
-  # 3. Perform the files upload
-  for fname in dist_files:
-    _UploadHelper(lu, dist_nodes, fname)
-  for fname in vm_files:
-    _UploadHelper(lu, vm_nodes, fname)
+
+  # Never distribute to master node
+  for nodelist in [online_nodes, vm_nodes]:
+    if master_info.name in nodelist:
+      nodelist.remove(master_info.name)
+
+  # Gather file lists
+  (files_all, files_all_opt, files_mc, files_vm) = \
+    _ComputeAncillaryFiles(cluster, True)
+
+  # Never re-distribute configuration file from here
+  assert not (constants.CLUSTER_CONF_FILE in files_all or
+              constants.CLUSTER_CONF_FILE in files_vm)
+  assert not files_mc, "Master candidates not handled in this function"
+
+  filemap = [
+    (online_nodes, files_all),
+    (online_nodes, files_all_opt),
+    (vm_nodes, files_vm),
+    ]
+
+  # Upload the files
+  for (node_list, files) in filemap:
+    for fname in files:
+      _UploadHelper(lu, node_list, fname)
 
 
 class LUClusterRedistConf(NoHooksLU):
@@ -3236,6 +3684,20 @@ class LUOobCommand(NoHooksLU):
   REG_BGL = False
   _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
 
+  def ExpandNames(self):
+    """Gather locks we need.
+
+    """
+    if self.op.node_names:
+      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
+      lock_names = self.op.node_names
+    else:
+      lock_names = locking.ALL_SET
+
+    self.needed_locks = {
+      locking.LEVEL_NODE: lock_names,
+      }
+
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -3252,23 +3714,23 @@ class LUOobCommand(NoHooksLU):
     assert self.op.power_delay >= 0.0
 
     if self.op.node_names:
-      if self.op.command in self._SKIP_MASTER:
-        if self.master_node in self.op.node_names:
-          master_node_obj = self.cfg.GetNodeInfo(self.master_node)
-          master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
-
-          if master_oob_handler:
-            additional_text = ("Run '%s %s %s' if you want to operate on the"
-                               " master regardless") % (master_oob_handler,
-                                                        self.op.command,
-                                                        self.master_node)
-          else:
-            additional_text = "The master node does not support out-of-band"
+      if (self.op.command in self._SKIP_MASTER and
+          self.master_node in self.op.node_names):
+        master_node_obj = self.cfg.GetNodeInfo(self.master_node)
+        master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
+
+        if master_oob_handler:
+          additional_text = ("run '%s %s %s' if you want to operate on the"
+                             " master regardless") % (master_oob_handler,
+                                                      self.op.command,
+                                                      self.master_node)
+        else:
+          additional_text = "it does not support out-of-band operations"
 
-          raise errors.OpPrereqError(("Operating on the master node %s is not"
-                                      " allowed for %s\n%s") %
-                                     (self.master_node, self.op.command,
-                                      additional_text), errors.ECODE_INVAL)
+        raise errors.OpPrereqError(("Operating on the master node %s is not"
+                                    " allowed for %s; %s") %
+                                   (self.master_node, self.op.command,
+                                    additional_text), errors.ECODE_INVAL)
     else:
       self.op.node_names = self.cfg.GetNodeList()
       if self.op.command in self._SKIP_MASTER:
@@ -3283,29 +3745,14 @@ class LUOobCommand(NoHooksLU):
       if node is None:
         raise errors.OpPrereqError("Node %s not found" % node_name,
                                    errors.ECODE_NOENT)
-      else:
-        self.nodes.append(node)
-
-      if (not self.op.ignore_status and
-          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
-        raise errors.OpPrereqError(("Cannot power off node %s because it is"
-                                    " not marked offline") % node_name,
-                                   errors.ECODE_STATE)
-
-  def ExpandNames(self):
-    """Gather locks we need.
-
-    """
-    if self.op.node_names:
-      self.op.node_names = [_ExpandNodeName(self.cfg, name)
-                            for name in self.op.node_names]
-      lock_names = self.op.node_names
-    else:
-      lock_names = locking.ALL_SET
+      else:
+        self.nodes.append(node)
 
-    self.needed_locks = {
-      locking.LEVEL_NODE: lock_names,
-      }
+      if (not self.op.ignore_status and
+          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
+        raise errors.OpPrereqError(("Cannot power off node %s because it is"
+                                    " not marked offline") % node_name,
+                                   errors.ECODE_STATE)
 
   def Exec(self, feedback_fn):
     """Execute OOB and return result if we expect any.
@@ -3314,7 +3761,8 @@ class LUOobCommand(NoHooksLU):
     master_node = self.master_node
     ret = []
 
-    for idx, node in enumerate(self.nodes):
+    for idx, node in enumerate(utils.NiceSort(self.nodes,
+                                              key=lambda node: node.name)):
       node_entry = [(constants.RS_NORMAL, node.name)]
       ret.append(node_entry)
 
@@ -3331,14 +3779,14 @@ class LUOobCommand(NoHooksLU):
                                      self.op.timeout)
 
       if result.fail_msg:
-        self.LogWarning("On node '%s' out-of-band RPC failed with: %s",
+        self.LogWarning("Out-of-band RPC failed on node '%s': %s",
                         node.name, result.fail_msg)
         node_entry.append((constants.RS_NODATA, None))
       else:
         try:
           self._CheckPayload(result)
         except errors.OpExecError, err:
-          self.LogWarning("The payload returned by '%s' is not valid: %s",
+          self.LogWarning("Payload returned by node '%s' is not valid: %s",
                           node.name, err)
           node_entry.append((constants.RS_NODATA, None))
         else:
@@ -3347,8 +3795,8 @@ class LUOobCommand(NoHooksLU):
             for item, status in result.payload:
               if status in [constants.OOB_STATUS_WARNING,
                             constants.OOB_STATUS_CRITICAL]:
-                self.LogWarning("On node '%s' item '%s' has status '%s'",
-                                node.name, item, status)
+                self.LogWarning("Item '%s' on node '%s' has status '%s'",
+                                item, node.name, status)
 
           if self.op.command == constants.OOB_POWER_ON:
             node.powered = True
@@ -3477,12 +3925,10 @@ class _OsQuery(_QueryBase):
 
     """
     # Locking is not used
-    assert not (lu.acquired_locks or self.do_locking or self.use_locking)
-
-    # Used further down
-    assert "valid" in self.FIELDS
-    assert "hidden" in self.FIELDS
-    assert "blacklisted" in self.FIELDS
+    assert not (compat.any(lu.glm.is_owned(level)
+                           for level in locking.LEVELS
+                           if level != locking.LEVEL_CLUSTER) or
+                self.do_locking or self.use_locking)
 
     valid_nodes = [node.name
                    for node in lu.cfg.GetAllNodesInfo().values()
@@ -3490,9 +3936,6 @@ class _OsQuery(_QueryBase):
     pol = self._DiagnoseByOS(lu.rpc.call_os_diagnose(valid_nodes))
     cluster = lu.cfg.GetClusterInfo()
 
-    # Build list of used field names
-    fields = [fdef.name for fdef in self.query.GetFields()]
-
     data = {}
 
     for (os_name, os_data) in pol.items():
@@ -3525,12 +3968,6 @@ class _OsQuery(_QueryBase):
       info.parameters = list(parameters)
       info.api_versions = list(api_versions)
 
-      # TODO: Move this to filters provided by the client
-      if (("hidden" not in fields and info.hidden) or
-          ("blacklisted" not in fields and info.blacklisted) or
-          ("valid" not in fields and not info.valid)):
-        continue
-
       data[os_name] = info
 
     # Prepare data in requested order
@@ -3544,8 +3981,35 @@ class LUOsDiagnose(NoHooksLU):
   """
   REQ_BGL = False
 
+  @staticmethod
+  def _BuildFilter(fields, names):
+    """Builds a filter for querying OSes.
+
+    """
+    name_filter = qlang.MakeSimpleFilter("name", names)
+
+    # Legacy behaviour: Hide hidden, blacklisted or invalid OSes if the
+    # respective field is not requested
+    status_filter = [[qlang.OP_NOT, [qlang.OP_TRUE, fname]]
+                     for fname in ["hidden", "blacklisted"]
+                     if fname not in fields]
+    if "valid" not in fields:
+      status_filter.append([qlang.OP_TRUE, "valid"])
+
+    if status_filter:
+      status_filter.insert(0, qlang.OP_AND)
+    else:
+      status_filter = None
+
+    if name_filter and status_filter:
+      return [qlang.OP_AND, name_filter, status_filter]
+    elif name_filter:
+      return name_filter
+    else:
+      return status_filter
+
   def CheckArguments(self):
-    self.oq = _OsQuery(qlang.MakeSimpleFilter("name", self.op.names),
+    self.oq = _OsQuery(self._BuildFilter(self.op.output_fields, self.op.names),
                        self.op.output_fields, False)
 
   def ExpandNames(self):
@@ -3569,17 +4033,22 @@ class LUNodeRemove(LogicalUnit):
     node would then be impossible to remove.
 
     """
-    env = {
+    return {
       "OP_TARGET": self.op.node_name,
       "NODE_NAME": self.op.node_name,
       }
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     all_nodes = self.cfg.GetNodeList()
     try:
       all_nodes.remove(self.op.node_name)
     except ValueError:
-      logging.warning("Node %s which is about to be removed not found"
-                      " in the all nodes list", self.op.node_name)
-    return env, all_nodes, all_nodes
+      logging.warning("Node '%s', which is about to be removed, was not found"
+                      " in the list of all nodes", self.op.node_name)
+    return (all_nodes, all_nodes)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -3600,15 +4069,14 @@ class LUNodeRemove(LogicalUnit):
 
     masternode = self.cfg.GetMasterNode()
     if node.name == masternode:
-      raise errors.OpPrereqError("Node is the master node,"
-                                 " you need to failover first.",
-                                 errors.ECODE_INVAL)
+      raise errors.OpPrereqError("Node is the master node, failover to another"
+                                 " node is required", errors.ECODE_INVAL)
 
     for instance_name in instance_list:
       instance = self.cfg.GetInstanceInfo(instance_name)
       if node.name in instance.all_nodes:
         raise errors.OpPrereqError("Instance %s is still running on the node,"
-                                   " please remove first." % instance_name,
+                                   " please remove first" % instance_name,
                                    errors.ECODE_INVAL)
     self.op.node_name = node.name
     self.node = node
@@ -3628,12 +4096,7 @@ class LUNodeRemove(LogicalUnit):
     self.context.RemoveNode(node.name)
 
     # Run post hooks on the node before it's removed
-    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
-    try:
-      hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
-    except:
-      # pylint: disable-msg=W0702
-      self.LogWarning("Errors occurred running hooks on %s" % node.name)
+    _RunPostHook(self, node.name)
 
     result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
     msg = result.fail_msg
@@ -3771,7 +4234,7 @@ class LUNodeQueryvols(NoHooksLU):
     """Computes the list of nodes and their attributes.
 
     """
-    nodenames = self.acquired_locks[locking.LEVEL_NODE]
+    nodenames = self.glm.list_owned(locking.LEVEL_NODE)
     volumes = self.rpc.call_node_volumes(nodenames)
 
     ilist = [self.cfg.GetInstanceInfo(iname) for iname
@@ -3849,7 +4312,7 @@ class LUNodeQueryStorage(NoHooksLU):
     """Computes the list of nodes and their attributes.
 
     """
-    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
+    self.nodes = self.glm.list_owned(locking.LEVEL_NODE)
 
     # Always get name to sort by
     if constants.SF_NAME in self.op.output_fields:
@@ -3961,10 +4424,16 @@ class _InstanceQuery(_QueryBase):
           bad_nodes.append(name)
         elif result.payload:
           for inst in result.payload:
-            if all_info[inst].primary_node == name:
-              live_data.update(result.payload)
+            if inst in all_info:
+              if all_info[inst].primary_node == name:
+                live_data.update(result.payload)
+              else:
+                wrongnode_inst.add(inst)
             else:
-              wrongnode_inst.add(inst)
+              # orphan instance; we don't list it here as we don't
+              # handle this case yet in the output of instance listing
+              logging.warning("Orphan instance '%s' found on node %s",
+                              inst, name)
         # else no instance is alive
     else:
       live_data = {}
@@ -3972,7 +4441,7 @@ class _InstanceQuery(_QueryBase):
     if query.IQ_DISKUSAGE in self.requested_data:
       disk_usage = dict((inst.name,
                          _ComputeDiskSize(inst.disk_template,
-                                          [{"size": disk.size}
+                                          [{constants.IDISK_SIZE: disk.size}
                                            for disk in inst.disks]))
                         for inst in instance_list)
     else:
@@ -4031,7 +4500,7 @@ class LUQueryFields(NoHooksLU):
     self.needed_locks = {}
 
   def Exec(self, feedback_fn):
-    return self.qcls.FieldsQuery(self.op.fields)
+    return query.QueryFields(self.qcls.FIELDS, self.op.fields)
 
 
 class LUNodeModifyStorage(NoHooksLU):
@@ -4090,6 +4559,11 @@ class LUNodeAdd(LogicalUnit):
     self.hostname = netutils.GetHostname(name=self.op.node_name,
                                          family=self.primary_ip_family)
     self.op.node_name = self.hostname.name
+
+    if self.op.readd and self.op.node_name == self.cfg.GetMasterNode():
+      raise errors.OpPrereqError("Cannot readd the master node",
+                                 errors.ECODE_STATE)
+
     if self.op.readd and self.op.group:
       raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                  " being readded", errors.ECODE_INVAL)
@@ -4100,7 +4574,7 @@ class LUNodeAdd(LogicalUnit):
     This will run on all nodes before, and on all nodes + the new node after.
 
     """
-    env = {
+    return {
       "OP_TARGET": self.op.node_name,
       "NODE_NAME": self.op.node_name,
       "NODE_PIP": self.op.primary_ip,
@@ -4108,9 +4582,16 @@ class LUNodeAdd(LogicalUnit):
       "MASTER_CAPABLE": str(self.op.master_capable),
       "VM_CAPABLE": str(self.op.vm_capable),
       }
-    nodes_0 = self.cfg.GetNodeList()
-    nodes_1 = nodes_0 + [self.op.node_name, ]
-    return env, nodes_0, nodes_1
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    # Exclude added node
+    pre_nodes = list(set(self.cfg.GetNodeList()) - set([self.op.node_name]))
+    post_nodes = pre_nodes + [self.op.node_name, ]
+
+    return (pre_nodes, post_nodes)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -4318,7 +4799,7 @@ class LUNodeAdd(LogicalUnit):
           feedback_fn("ssh/hostname verification failed"
                       " (checking from %s): %s" %
                       (verifier, nl_payload[failed]))
-        raise errors.OpExecError("ssh/hostname verification failed.")
+        raise errors.OpExecError("ssh/hostname verification failed")
 
     if self.op.readd:
       _RedistributeAncillaryFiles(self)
@@ -4401,21 +4882,22 @@ class LUNodeSetParams(LogicalUnit):
     # If we have locked all instances, before waiting to lock nodes, release
     # all the ones living on nodes unrelated to the current operation.
     if level == locking.LEVEL_NODE and self.lock_instances:
-      instances_release = []
-      instances_keep = []
       self.affected_instances = []
       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
-        for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+        instances_keep = []
+
+        # Build list of instances to release
+        for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
           instance = self.context.cfg.GetInstanceInfo(instance_name)
-          i_mirrored = instance.disk_template in constants.DTS_INT_MIRROR
-          if i_mirrored and self.op.node_name in instance.all_nodes:
+          if (instance.disk_template in constants.DTS_INT_MIRROR and
+              self.op.node_name in instance.all_nodes):
             instances_keep.append(instance_name)
             self.affected_instances.append(instance)
-          else:
-            instances_release.append(instance_name)
-        if instances_release:
-          self.context.glm.release(locking.LEVEL_INSTANCE, instances_release)
-          self.acquired_locks[locking.LEVEL_INSTANCE] = instances_keep
+
+        _ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep)
+
+        assert (set(self.glm.list_owned(locking.LEVEL_INSTANCE)) ==
+                set(instances_keep))
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -4423,7 +4905,7 @@ class LUNodeSetParams(LogicalUnit):
     This runs on the master node.
 
     """
-    env = {
+    return {
       "OP_TARGET": self.op.node_name,
       "MASTER_CANDIDATE": str(self.op.master_candidate),
       "OFFLINE": str(self.op.offline),
@@ -4431,9 +4913,13 @@ class LUNodeSetParams(LogicalUnit):
       "MASTER_CAPABLE": str(self.op.master_capable),
       "VM_CAPABLE": str(self.op.vm_capable),
       }
-    nl = [self.cfg.GetMasterNode(),
-          self.op.node_name]
-    return env, nl, nl
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    nl = [self.cfg.GetMasterNode(), self.op.node_name]
+    return (nl, nl)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -4477,7 +4963,7 @@ class LUNodeSetParams(LogicalUnit):
 
     self.old_flags = old_flags = (node.master_candidate,
                                   node.drained, node.offline)
-    assert old_flags in self._F2R, "Un-handled old flags  %s" % str(old_flags)
+    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
     self.old_role = old_role = self._F2R[old_flags]
 
     # Check for ineffective changes
@@ -4493,12 +4979,12 @@ class LUNodeSetParams(LogicalUnit):
     if _SupportsOob(self.cfg, node):
       if self.op.offline is False and not (node.powered or
                                            self.op.powered == True):
-        raise errors.OpPrereqError(("Please power on node %s first before you"
-                                    " can reset offline state") %
+        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
+                                    " offline status can be reset") %
                                    self.op.node_name)
     elif self.op.powered is not None:
       raise errors.OpPrereqError(("Unable to change powered state for node %s"
-                                  " which does not support out-of-band"
+                                  " as it does not support out-of-band"
                                   " handling") % self.op.node_name)
 
     # If we're being deofflined/drained, we'll MC ourself if needed
@@ -5127,9 +5613,17 @@ class LUInstanceStartup(LogicalUnit):
     env = {
       "FORCE": self.op.force,
       }
+
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
-    return env, nl, nl
+    return (nl, nl)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -5184,7 +5678,8 @@ class LUInstanceStartup(LogicalUnit):
     instance = self.instance
     force = self.op.force
 
-    self.cfg.MarkInstanceUp(instance.name)
+    if not self.op.no_remember:
+      self.cfg.MarkInstanceUp(instance.name)
 
     if self.primary_offline:
       assert self.op.ignore_offline_nodes
@@ -5224,9 +5719,17 @@ class LUInstanceReboot(LogicalUnit):
       "REBOOT_TYPE": self.op.reboot_type,
       "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
       }
+
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
-    return env, nl, nl
+    return (nl, nl)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -5306,8 +5809,14 @@ class LUInstanceShutdown(LogicalUnit):
     """
     env = _BuildInstanceHookEnvByObject(self, self.instance)
     env["TIMEOUT"] = self.op.timeout
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
-    return env, nl, nl
+    return (nl, nl)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -5335,7 +5844,8 @@ class LUInstanceShutdown(LogicalUnit):
     node_current = instance.primary_node
     timeout = self.op.timeout
 
-    self.cfg.MarkInstanceDown(instance.name)
+    if not self.op.no_remember:
+      self.cfg.MarkInstanceDown(instance.name)
 
     if self.primary_offline:
       assert self.op.ignore_offline_nodes
@@ -5366,9 +5876,14 @@ class LUInstanceReinstall(LogicalUnit):
     This runs on master, primary and secondary nodes of the instance.
 
     """
-    env = _BuildInstanceHookEnvByObject(self, self.instance)
+    return _BuildInstanceHookEnvByObject(self, self.instance)
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
-    return env, nl, nl
+    return (nl, nl)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -5443,8 +5958,25 @@ class LUInstanceRecreateDisks(LogicalUnit):
   HTYPE = constants.HTYPE_INSTANCE
   REQ_BGL = False
 
+  def CheckArguments(self):
+    # normalise the disk list
+    self.op.disks = sorted(frozenset(self.op.disks))
+
   def ExpandNames(self):
     self._ExpandAndLockInstance()
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+    if self.op.nodes:
+      self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
+      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
+    else:
+      self.needed_locks[locking.LEVEL_NODE] = []
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      # if we replace the nodes, we only need to lock the old primary,
+      # otherwise we need to lock all nodes for disk re-creation
+      primary_only = bool(self.op.nodes)
+      self._LockInstancesNodes(primary_only=primary_only)
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -5452,9 +5984,14 @@ class LUInstanceRecreateDisks(LogicalUnit):
     This runs on master, primary and secondary nodes of the instance.
 
     """
-    env = _BuildInstanceHookEnvByObject(self, self.instance)
+    return _BuildInstanceHookEnvByObject(self, self.instance)
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
-    return env, nl, nl
+    return (nl, nl)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -5465,32 +6002,72 @@ class LUInstanceRecreateDisks(LogicalUnit):
     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
-    _CheckNodeOnline(self, instance.primary_node)
+    if self.op.nodes:
+      if len(self.op.nodes) != len(instance.all_nodes):
+        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
+                                   " %d replacement nodes were specified" %
+                                   (instance.name, len(instance.all_nodes),
+                                    len(self.op.nodes)),
+                                   errors.ECODE_INVAL)
+      assert instance.disk_template != constants.DT_DRBD8 or \
+          len(self.op.nodes) == 2
+      assert instance.disk_template != constants.DT_PLAIN or \
+          len(self.op.nodes) == 1
+      primary_node = self.op.nodes[0]
+    else:
+      primary_node = instance.primary_node
+    _CheckNodeOnline(self, primary_node)
 
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
                                  self.op.instance_name, errors.ECODE_INVAL)
-    _CheckInstanceDown(self, instance, "cannot recreate disks")
+    # if we replace nodes *and* the old primary is offline, we don't
+    # check
+    assert instance.primary_node in self.needed_locks[locking.LEVEL_NODE]
+    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
+    if not (self.op.nodes and old_pnode.offline):
+      _CheckInstanceDown(self, instance, "cannot recreate disks")
 
     if not self.op.disks:
       self.op.disks = range(len(instance.disks))
     else:
       for idx in self.op.disks:
         if idx >= len(instance.disks):
-          raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx,
+          raise errors.OpPrereqError("Invalid disk index '%s'" % idx,
                                      errors.ECODE_INVAL)
-
+    if self.op.disks != range(len(instance.disks)) and self.op.nodes:
+      raise errors.OpPrereqError("Can't recreate disks partially and"
+                                 " change the nodes at the same time",
+                                 errors.ECODE_INVAL)
     self.instance = instance
 
   def Exec(self, feedback_fn):
     """Recreate the disks.
 
     """
+    # change primary node, if needed
+    if self.op.nodes:
+      self.instance.primary_node = self.op.nodes[0]
+      self.LogWarning("Changing the instance's nodes, you will have to"
+                      " remove any disks left on the older nodes manually")
+
     to_skip = []
-    for idx, _ in enumerate(self.instance.disks):
+    for idx, disk in enumerate(self.instance.disks):
       if idx not in self.op.disks: # disk idx has not been passed in
         to_skip.append(idx)
         continue
+      # update secondaries for disks, if needed
+      if self.op.nodes:
+        if disk.dev_type == constants.LD_DRBD8:
+          # need to update the nodes
+          assert len(self.op.nodes) == 2
+          logical_id = list(disk.logical_id)
+          logical_id[0] = self.op.nodes[0]
+          logical_id[1] = self.op.nodes[1]
+          disk.logical_id = tuple(logical_id)
+
+    if self.op.nodes:
+      self.cfg.Update(self.instance, feedback_fn)
 
     _CreateDisks(self, self.instance, to_skip=to_skip)
 
@@ -5508,7 +6085,7 @@ class LUInstanceRename(LogicalUnit):
     """
     if self.op.ip_check and not self.op.name_check:
       # TODO: make the ip check more flexible and not depend on the name check
-      raise errors.OpPrereqError("Cannot do ip check without a name check",
+      raise errors.OpPrereqError("IP address check requires a name check",
                                  errors.ECODE_INVAL)
 
   def BuildHooksEnv(self):
@@ -5519,8 +6096,14 @@ class LUInstanceRename(LogicalUnit):
     """
     env = _BuildInstanceHookEnvByObject(self, self.instance)
     env["INSTANCE_NEW_NAME"] = self.op.new_name
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
-    return env, nl, nl
+    return (nl, nl)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -5539,8 +6122,9 @@ class LUInstanceRename(LogicalUnit):
     new_name = self.op.new_name
     if self.op.name_check:
       hostname = netutils.GetHostname(name=new_name)
-      self.LogInfo("Resolved given name '%s' to '%s'", new_name,
-                   hostname.name)
+      if hostname != new_name:
+        self.LogInfo("Resolved given name '%s' to '%s'", new_name,
+                     hostname.name)
       if not utils.MatchNameComponent(self.op.new_name, [hostname.name]):
         raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
                                     " same as given hostname '%s'") %
@@ -5572,9 +6156,11 @@ class LUInstanceRename(LogicalUnit):
       rename_file_storage = True
 
     self.cfg.RenameInstance(inst.name, self.op.new_name)
-    # Change the instance lock. This is definitely safe while we hold the BGL
-    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
-    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
+    # Change the instance lock. This is definitely safe while we hold the BGL.
+    # Otherwise the new lock would have to be added in acquired mode.
+    assert self.REQ_BGL
+    self.glm.remove(locking.LEVEL_INSTANCE, old_name)
+    self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
 
     # re-read the instance from the configuration after rename
     inst = self.cfg.GetInstanceInfo(self.op.new_name)
@@ -5630,9 +6216,15 @@ class LUInstanceRemove(LogicalUnit):
     """
     env = _BuildInstanceHookEnvByObject(self, self.instance)
     env["SHUTDOWN_TIMEOUT"] = self.op.shutdown_timeout
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     nl = [self.cfg.GetMasterNode()]
     nl_post = list(self.instance.all_nodes) + nl
-    return env, nl, nl_post
+    return (nl, nl_post)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -5733,6 +6325,15 @@ class LUInstanceFailover(LogicalUnit):
     self.needed_locks[locking.LEVEL_NODE] = []
     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
+    ignore_consistency = self.op.ignore_consistency
+    shutdown_timeout = self.op.shutdown_timeout
+    self._migrater = TLMigrateInstance(self, self.op.instance_name,
+                                       cleanup=False,
+                                       failover=True,
+                                       ignore_consistency=ignore_consistency,
+                                       shutdown_timeout=shutdown_timeout)
+    self.tasklets = [self._migrater]
+
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE:
       instance = self.context.cfg.GetInstanceInfo(self.op.instance_name)
@@ -5752,13 +6353,14 @@ class LUInstanceFailover(LogicalUnit):
     This runs on master, primary and secondary nodes of the instance.
 
     """
-    instance = self.instance
+    instance = self._migrater.instance
     source_node = instance.primary_node
+    target_node = self.op.target_node
     env = {
       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
       "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
       "OLD_PRIMARY": source_node,
-      "NEW_PRIMARY": self.op.target_node,
+      "NEW_PRIMARY": target_node,
       }
 
     if instance.disk_template in constants.DTS_INT_MIRROR:
@@ -5768,173 +6370,16 @@ class LUInstanceFailover(LogicalUnit):
       env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
 
     env.update(_BuildInstanceHookEnvByObject(self, instance))
-    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
-    nl_post = list(nl)
-    nl_post.append(source_node)
-    return env, nl, nl_post
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    This checks that the instance is in the cluster.
-
-    """
-    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
-    assert self.instance is not None, \
-      "Cannot retrieve locked instance %s" % self.op.instance_name
-
-    bep = self.cfg.GetClusterInfo().FillBE(instance)
-    if instance.disk_template not in constants.DTS_MIRRORED:
-      raise errors.OpPrereqError("Instance's disk layout is not"
-                                 " mirrored, cannot failover.",
-                                 errors.ECODE_STATE)
-
-    if instance.disk_template in constants.DTS_EXT_MIRROR:
-      _CheckIAllocatorOrNode(self, "iallocator", "target_node")
-      if self.op.iallocator:
-        self._RunAllocator()
-        # Release all unnecessary node locks
-        nodes_keep = [instance.primary_node, self.op.target_node]
-        nodes_rel = [node for node in self.acquired_locks[locking.LEVEL_NODE]
-                     if node not in nodes_keep]
-        self.context.glm.release(locking.LEVEL_NODE, nodes_rel)
-        self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
-
-      # self.op.target_node is already populated, either directly or by the
-      # iallocator run
-      target_node = self.op.target_node
-
-    else:
-      secondary_nodes = instance.secondary_nodes
-      if not secondary_nodes:
-        raise errors.ConfigurationError("No secondary node but using"
-                                        " %s disk template" %
-                                        instance.disk_template)
-      target_node = secondary_nodes[0]
-
-      if self.op.iallocator or (self.op.target_node and
-                                self.op.target_node != target_node):
-        raise errors.OpPrereqError("Instances with disk template %s cannot"
-                                   " be failed over to arbitrary nodes"
-                                   " (neither an iallocator nor a target"
-                                   " node can be passed)" %
-                                   instance.disk_template, errors.ECODE_INVAL)
-    _CheckNodeOnline(self, target_node)
-    _CheckNodeNotDrained(self, target_node)
-
-    # Save target_node so that we can use it in BuildHooksEnv
-    self.op.target_node = target_node
-
-    if instance.admin_up:
-      # check memory requirements on the secondary node
-      _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
-                           instance.name, bep[constants.BE_MEMORY],
-                           instance.hypervisor)
-    else:
-      self.LogInfo("Not checking memory on the secondary node as"
-                   " instance will not be started")
-
-    # check bridge existance
-    _CheckInstanceBridgesExist(self, instance, node=target_node)
-
-  def Exec(self, feedback_fn):
-    """Failover an instance.
-
-    The failover is done by shutting it down on its present node and
-    starting it on the secondary.
-
-    """
-    instance = self.instance
-    primary_node = self.cfg.GetNodeInfo(instance.primary_node)
-
-    source_node = instance.primary_node
-    target_node = self.op.target_node
-
-    if instance.admin_up:
-      feedback_fn("* checking disk consistency between source and target")
-      for dev in instance.disks:
-        # for drbd, these are drbd over lvm
-        if not _CheckDiskConsistency(self, dev, target_node, False):
-          if not self.op.ignore_consistency:
-            raise errors.OpExecError("Disk %s is degraded on target node,"
-                                     " aborting failover." % dev.iv_name)
-    else:
-      feedback_fn("* not checking disk consistency as instance is not running")
-
-    feedback_fn("* shutting down instance on source node")
-    logging.info("Shutting down instance %s on node %s",
-                 instance.name, source_node)
-
-    result = self.rpc.call_instance_shutdown(source_node, instance,
-                                             self.op.shutdown_timeout)
-    msg = result.fail_msg
-    if msg:
-      if self.op.ignore_consistency or primary_node.offline:
-        self.proc.LogWarning("Could not shutdown instance %s on node %s."
-                             " Proceeding anyway. Please make sure node"
-                             " %s is down. Error details: %s",
-                             instance.name, source_node, source_node, msg)
-      else:
-        raise errors.OpExecError("Could not shutdown instance %s on"
-                                 " node %s: %s" %
-                                 (instance.name, source_node, msg))
-
-    feedback_fn("* deactivating the instance's disks on source node")
-    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
-      raise errors.OpExecError("Can't shut down the instance's disks.")
-
-    instance.primary_node = target_node
-    # distribute new instance config to the other nodes
-    self.cfg.Update(instance, feedback_fn)
-
-    # Only start the instance if it's marked as up
-    if instance.admin_up:
-      feedback_fn("* activating the instance's disks on target node")
-      logging.info("Starting instance %s on node %s",
-                   instance.name, target_node)
-
-      disks_ok, _ = _AssembleInstanceDisks(self, instance,
-                                           ignore_secondaries=True)
-      if not disks_ok:
-        _ShutdownInstanceDisks(self, instance)
-        raise errors.OpExecError("Can't activate the instance's disks")
 
-      feedback_fn("* starting the instance on the target node")
-      result = self.rpc.call_instance_start(target_node, instance, None, None)
-      msg = result.fail_msg
-      if msg:
-        _ShutdownInstanceDisks(self, instance)
-        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
-                                 (instance.name, target_node, msg))
+    return env
 
-  def _RunAllocator(self):
-    """Run the allocator based on input opcode.
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
 
     """
-    ial = IAllocator(self.cfg, self.rpc,
-                     mode=constants.IALLOCATOR_MODE_RELOC,
-                     name=self.instance.name,
-                     # TODO See why hail breaks with a single node below
-                     relocate_from=[self.instance.primary_node,
-                                    self.instance.primary_node],
-                     )
-
-    ial.Run(self.op.iallocator)
-
-    if not ial.success:
-      raise errors.OpPrereqError("Can't compute nodes using"
-                                 " iallocator '%s': %s" %
-                                 (self.op.iallocator, ial.info),
-                                 errors.ECODE_NORES)
-    if len(ial.result) != ial.required_nodes:
-      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
-                                 " of nodes (%s), required %s" %
-                                 (self.op.iallocator, len(ial.result),
-                                  ial.required_nodes), errors.ECODE_FAULT)
-    self.op.target_node = ial.result[0]
-    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
-                 self.instance.name, self.op.iallocator,
-                 utils.CommaJoin(ial.result))
+    instance = self._migrater.instance
+    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
+    return (nl, nl + [instance.primary_node])
 
 
 class LUInstanceMigrate(LogicalUnit):
@@ -5958,8 +6403,9 @@ class LUInstanceMigrate(LogicalUnit):
     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
     self._migrater = TLMigrateInstance(self, self.op.instance_name,
-                                       self.op.cleanup, self.op.iallocator,
-                                       self.op.target_node)
+                                       cleanup=self.op.cleanup,
+                                       failover=False,
+                                       fallback=self.op.allow_failover)
     self.tasklets = [self._migrater]
 
   def DeclareLocks(self, level):
@@ -5983,14 +6429,14 @@ class LUInstanceMigrate(LogicalUnit):
     """
     instance = self._migrater.instance
     source_node = instance.primary_node
-    target_node = self._migrater.target_node
+    target_node = self.op.target_node
     env = _BuildInstanceHookEnvByObject(self, instance)
-    env["MIGRATE_LIVE"] = self._migrater.live
-    env["MIGRATE_CLEANUP"] = self.op.cleanup
     env.update({
-        "OLD_PRIMARY": source_node,
-        "NEW_PRIMARY": target_node,
-        })
+      "MIGRATE_LIVE": self._migrater.live,
+      "MIGRATE_CLEANUP": self.op.cleanup,
+      "OLD_PRIMARY": source_node,
+      "NEW_PRIMARY": target_node,
+      })
 
     if instance.disk_template in constants.DTS_INT_MIRROR:
       env["OLD_SECONDARY"] = target_node
@@ -5998,10 +6444,15 @@ class LUInstanceMigrate(LogicalUnit):
     else:
       env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None
 
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    instance = self._migrater.instance
     nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
-    nl_post = list(nl)
-    nl_post.append(source_node)
-    return env, nl, nl_post
+    return (nl, nl + [instance.primary_node])
 
 
 class LUInstanceMove(LogicalUnit):
@@ -6034,9 +6485,18 @@ class LUInstanceMove(LogicalUnit):
       "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
-    nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
-                                       self.op.target_node]
-    return env, nl, nl
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    nl = [
+      self.cfg.GetMasterNode(),
+      self.instance.primary_node,
+      self.op.target_node,
+      ]
+    return (nl, nl)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -6205,8 +6665,7 @@ class LUNodeMigrate(LogicalUnit):
       logging.debug("Migrating instance %s", inst.name)
       names.append(inst.name)
 
-      tasklets.append(TLMigrateInstance(self, inst.name, False,
-                                        self.op.iallocator, None))
+      tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False))
 
       if inst.disk_template in constants.DTS_EXT_MIRROR:
         # We need to lock all nodes, as the iallocator will choose the
@@ -6235,13 +6694,16 @@ class LUNodeMigrate(LogicalUnit):
     This runs on the master, the primary and all the secondaries.
 
     """
-    env = {
+    return {
       "NODE_NAME": self.op.node_name,
       }
 
-    nl = [self.cfg.GetMasterNode()]
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
 
-    return (env, nl, nl)
+    """
+    nl = [self.cfg.GetMasterNode()]
+    return (nl, nl)
 
 
 class TLMigrateInstance(Tasklet):
@@ -6250,10 +6712,28 @@ class TLMigrateInstance(Tasklet):
   @type live: boolean
   @ivar live: whether the migration will be done live or non-live;
       this variable is initalized only after CheckPrereq has run
+  @type cleanup: boolean
+  @ivar cleanup: Wheater we cleanup from a failed migration
+  @type iallocator: string
+  @ivar iallocator: The iallocator used to determine target_node
+  @type target_node: string
+  @ivar target_node: If given, the target_node to reallocate the instance to
+  @type failover: boolean
+  @ivar failover: Whether operation results in failover or migration
+  @type fallback: boolean
+  @ivar fallback: Whether fallback to failover is allowed if migration not
+                  possible
+  @type ignore_consistency: boolean
+  @ivar ignore_consistency: Wheter we should ignore consistency between source
+                            and target node
+  @type shutdown_timeout: int
+  @ivar shutdown_timeout: In case of failover timeout of the shutdown
 
   """
-  def __init__(self, lu, instance_name, cleanup,
-               iallocator=None, target_node=None):
+  def __init__(self, lu, instance_name, cleanup=False,
+               failover=False, fallback=False,
+               ignore_consistency=False,
+               shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT):
     """Initializes this class.
 
     """
@@ -6263,8 +6743,10 @@ class TLMigrateInstance(Tasklet):
     self.instance_name = instance_name
     self.cleanup = cleanup
     self.live = False # will be overridden later
-    self.iallocator = iallocator
-    self.target_node = target_node
+    self.failover = failover
+    self.fallback = fallback
+    self.ignore_consistency = ignore_consistency
+    self.shutdown_timeout = shutdown_timeout
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -6277,28 +6759,44 @@ class TLMigrateInstance(Tasklet):
     assert instance is not None
     self.instance = instance
 
+    if (not self.cleanup and not instance.admin_up and not self.failover and
+        self.fallback):
+      self.lu.LogInfo("Instance is marked down, fallback allowed, switching"
+                      " to failover")
+      self.failover = True
+
     if instance.disk_template not in constants.DTS_MIRRORED:
+      if self.failover:
+        text = "failovers"
+      else:
+        text = "migrations"
       raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
-                                 " migrations" % instance.disk_template,
+                                 " %s" % (instance.disk_template, text),
                                  errors.ECODE_STATE)
 
     if instance.disk_template in constants.DTS_EXT_MIRROR:
       _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
 
-      if self.iallocator:
+      if self.lu.op.iallocator:
         self._RunAllocator()
+      else:
+        # We set set self.target_node as it is required by
+        # BuildHooksEnv
+        self.target_node = self.lu.op.target_node
 
       # self.target_node is already populated, either directly or by the
       # iallocator run
       target_node = self.target_node
+      if self.target_node == instance.primary_node:
+        raise errors.OpPrereqError("Cannot migrate instance %s"
+                                   " to its primary (%s)" %
+                                   (instance.name, instance.primary_node))
 
       if len(self.lu.tasklets) == 1:
-        # It is safe to remove locks only when we're the only tasklet in the LU
-        nodes_keep = [instance.primary_node, self.target_node]
-        nodes_rel = [node for node in self.lu.acquired_locks[locking.LEVEL_NODE]
-                     if node not in nodes_keep]
-        self.lu.context.glm.release(locking.LEVEL_NODE, nodes_rel)
-        self.lu.acquired_locks[locking.LEVEL_NODE] = nodes_keep
+        # It is safe to release locks only when we're the only tasklet
+        # in the LU
+        _ReleaseLocks(self.lu, locking.LEVEL_NODE,
+                      keep=[instance.primary_node, self.target_node])
 
     else:
       secondary_nodes = instance.secondary_nodes
@@ -6309,29 +6807,69 @@ class TLMigrateInstance(Tasklet):
       target_node = secondary_nodes[0]
       if self.lu.op.iallocator or (self.lu.op.target_node and
                                    self.lu.op.target_node != target_node):
+        if self.failover:
+          text = "failed over"
+        else:
+          text = "migrated"
         raise errors.OpPrereqError("Instances with disk template %s cannot"
-                                   " be migrated over to arbitrary nodes"
+                                   " be %s to arbitrary nodes"
                                    " (neither an iallocator nor a target"
                                    " node can be passed)" %
-                                   instance.disk_template, errors.ECODE_INVAL)
+                                   (instance.disk_template, text),
+                                   errors.ECODE_INVAL)
 
     i_be = self.cfg.GetClusterInfo().FillBE(instance)
 
     # check memory requirements on the secondary node
-    _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" %
-                         instance.name, i_be[constants.BE_MEMORY],
-                         instance.hypervisor)
+    if not self.failover or instance.admin_up:
+      _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" %
+                           instance.name, i_be[constants.BE_MEMORY],
+                           instance.hypervisor)
+    else:
+      self.lu.LogInfo("Not checking memory on the secondary node as"
+                      " instance will not be started")
 
     # check bridge existance
     _CheckInstanceBridgesExist(self.lu, instance, node=target_node)
 
     if not self.cleanup:
       _CheckNodeNotDrained(self.lu, target_node)
-      result = self.rpc.call_instance_migratable(instance.primary_node,
-                                                 instance)
-      result.Raise("Can't migrate, please use failover",
-                   prereq=True, ecode=errors.ECODE_STATE)
+      if not self.failover:
+        result = self.rpc.call_instance_migratable(instance.primary_node,
+                                                   instance)
+        if result.fail_msg and self.fallback:
+          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
+                          " failover")
+          self.failover = True
+        else:
+          result.Raise("Can't migrate, please use failover",
+                       prereq=True, ecode=errors.ECODE_STATE)
 
+    assert not (self.failover and self.cleanup)
+
+    if not self.failover:
+      if self.lu.op.live is not None and self.lu.op.mode is not None:
+        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
+                                   " parameters are accepted",
+                                   errors.ECODE_INVAL)
+      if self.lu.op.live is not None:
+        if self.lu.op.live:
+          self.lu.op.mode = constants.HT_MIGRATION_LIVE
+        else:
+          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
+        # reset the 'live' parameter to None so that repeated
+        # invocations of CheckPrereq do not raise an exception
+        self.lu.op.live = None
+      elif self.lu.op.mode is None:
+        # read the default value from the hypervisor
+        i_hv = self.cfg.GetClusterInfo().FillHV(self.instance,
+                                                skip_globals=False)
+        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
+
+      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
+    else:
+      # Failover is never live
+      self.live = False
 
   def _RunAllocator(self):
     """Run the allocator based on input opcode.
@@ -6345,42 +6883,23 @@ class TLMigrateInstance(Tasklet):
                                     self.instance.primary_node],
                      )
 
-    ial.Run(self.iallocator)
+    ial.Run(self.lu.op.iallocator)
 
     if not ial.success:
       raise errors.OpPrereqError("Can't compute nodes using"
                                  " iallocator '%s': %s" %
-                                 (self.iallocator, ial.info),
+                                 (self.lu.op.iallocator, ial.info),
                                  errors.ECODE_NORES)
     if len(ial.result) != ial.required_nodes:
       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                  " of nodes (%s), required %s" %
-                                 (self.iallocator, len(ial.result),
+                                 (self.lu.op.iallocator, len(ial.result),
                                   ial.required_nodes), errors.ECODE_FAULT)
     self.target_node = ial.result[0]
     self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
-                 self.instance_name, self.iallocator,
+                 self.instance_name, self.lu.op.iallocator,
                  utils.CommaJoin(ial.result))
 
-    if self.lu.op.live is not None and self.lu.op.mode is not None:
-      raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
-                                 " parameters are accepted",
-                                 errors.ECODE_INVAL)
-    if self.lu.op.live is not None:
-      if self.lu.op.live:
-        self.lu.op.mode = constants.HT_MIGRATION_LIVE
-      else:
-        self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
-      # reset the 'live' parameter to None so that repeated
-      # invocations of CheckPrereq do not raise an exception
-      self.lu.op.live = None
-    elif self.lu.op.mode is None:
-      # read the default value from the hypervisor
-      i_hv = self.cfg.GetClusterInfo().FillHV(self.instance, skip_globals=False)
-      self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
-
-    self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
-
   def _WaitUntilSync(self):
     """Poll with custom rpc for disk sync.
 
@@ -6474,15 +6993,15 @@ class TLMigrateInstance(Tasklet):
 
     if runningon_source and runningon_target:
       raise errors.OpExecError("Instance seems to be running on two nodes,"
-                               " or the hypervisor is confused. You will have"
+                               " or the hypervisor is confused; you will have"
                                " to ensure manually that it runs only on one"
-                               " and restart this operation.")
+                               " and restart this operation")
 
     if not (runningon_source or runningon_target):
-      raise errors.OpExecError("Instance does not seem to be running at all."
-                               " In this case, it's safer to repair by"
+      raise errors.OpExecError("Instance does not seem to be running at all;"
+                               " in this case it's safer to repair by"
                                " running 'gnt-instance stop' to ensure disk"
-                               " shutdown, and then restarting it.")
+                               " shutdown, and then restarting it")
 
     if runningon_target:
       # the migration has actually succeeded, we need to update the config
@@ -6524,10 +7043,9 @@ class TLMigrateInstance(Tasklet):
       self._GoReconnect(False)
       self._WaitUntilSync()
     except errors.OpExecError, err:
-      self.lu.LogWarning("Migration failed and I can't reconnect the"
-                         " drives: error '%s'\n"
-                         "Please look and recover the instance status" %
-                         str(err))
+      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
+                         " please try to recover the instance manually;"
+                         " error '%s'" % str(err))
 
   def _AbortMigration(self):
     """Call the hypervisor code to abort a started migration.
@@ -6569,7 +7087,7 @@ class TLMigrateInstance(Tasklet):
       if not _CheckDiskConsistency(self.lu, dev, target_node, False):
         raise errors.OpExecError("Disk %s is degraded or not fully"
                                  " synchronized on target node,"
-                                 " aborting migrate." % dev.iv_name)
+                                 " aborting migration" % dev.iv_name)
 
     # First get the migration information from the remote node
     result = self.rpc.call_migration_info(source_node, instance)
@@ -6606,7 +7124,6 @@ class TLMigrateInstance(Tasklet):
                                (instance.name, msg))
 
     self.feedback_fn("* migrating instance to %s" % target_node)
-    time.sleep(10)
     result = self.rpc.call_instance_migrate(source_node, instance,
                                             self.nodes_ip[target_node],
                                             self.live)
@@ -6619,7 +7136,6 @@ class TLMigrateInstance(Tasklet):
       self._RevertDiskStatus()
       raise errors.OpExecError("Could not migrate instance %s: %s" %
                                (instance.name, msg))
-    time.sleep(10)
 
     instance.primary_node = target_node
     # distribute new instance config to the other nodes
@@ -6645,14 +7161,82 @@ class TLMigrateInstance(Tasklet):
 
     self.feedback_fn("* done")
 
+  def _ExecFailover(self):
+    """Failover an instance.
+
+    The failover is done by shutting it down on its present node and
+    starting it on the secondary.
+
+    """
+    instance = self.instance
+    primary_node = self.cfg.GetNodeInfo(instance.primary_node)
+
+    source_node = instance.primary_node
+    target_node = self.target_node
+
+    if instance.admin_up:
+      self.feedback_fn("* checking disk consistency between source and target")
+      for dev in instance.disks:
+        # for drbd, these are drbd over lvm
+        if not _CheckDiskConsistency(self, dev, target_node, False):
+          if not self.ignore_consistency:
+            raise errors.OpExecError("Disk %s is degraded on target node,"
+                                     " aborting failover" % dev.iv_name)
+    else:
+      self.feedback_fn("* not checking disk consistency as instance is not"
+                       " running")
+
+    self.feedback_fn("* shutting down instance on source node")
+    logging.info("Shutting down instance %s on node %s",
+                 instance.name, source_node)
+
+    result = self.rpc.call_instance_shutdown(source_node, instance,
+                                             self.shutdown_timeout)
+    msg = result.fail_msg
+    if msg:
+      if self.ignore_consistency or primary_node.offline:
+        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
+                           " proceeding anyway; please make sure node"
+                           " %s is down; error details: %s",
+                           instance.name, source_node, source_node, msg)
+      else:
+        raise errors.OpExecError("Could not shutdown instance %s on"
+                                 " node %s: %s" %
+                                 (instance.name, source_node, msg))
+
+    self.feedback_fn("* deactivating the instance's disks on source node")
+    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
+      raise errors.OpExecError("Can't shut down the instance's disks.")
+
+    instance.primary_node = target_node
+    # distribute new instance config to the other nodes
+    self.cfg.Update(instance, self.feedback_fn)
+
+    # Only start the instance if it's marked as up
+    if instance.admin_up:
+      self.feedback_fn("* activating the instance's disks on target node")
+      logging.info("Starting instance %s on node %s",
+                   instance.name, target_node)
+
+      disks_ok, _ = _AssembleInstanceDisks(self, instance,
+                                           ignore_secondaries=True)
+      if not disks_ok:
+        _ShutdownInstanceDisks(self, instance)
+        raise errors.OpExecError("Can't activate the instance's disks")
+
+      self.feedback_fn("* starting the instance on the target node")
+      result = self.rpc.call_instance_start(target_node, instance, None, None)
+      msg = result.fail_msg
+      if msg:
+        _ShutdownInstanceDisks(self, instance)
+        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
+                                 (instance.name, target_node, msg))
+
   def Exec(self, feedback_fn):
     """Perform the migration.
 
     """
-    feedback_fn("Migrating instance %s" % self.instance.name)
-
     self.feedback_fn = feedback_fn
-
     self.source_node = self.instance.primary_node
 
     # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
@@ -6667,10 +7251,16 @@ class TLMigrateInstance(Tasklet):
       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
       }
 
-    if self.cleanup:
-      return self._ExecCleanup()
+    if self.failover:
+      feedback_fn("Failover instance %s" % self.instance.name)
+      self._ExecFailover()
     else:
-      return self._ExecMigration()
+      feedback_fn("Migrating instance %s" % self.instance.name)
+
+      if self.cleanup:
+        return self._ExecCleanup()
+      else:
+        return self._ExecMigration()
 
 
 def _CreateBlockDev(lu, node, instance, device, force_create,
@@ -6758,17 +7348,18 @@ def _GenerateUniqueNames(lu, exts):
   return results
 
 
-def _GenerateDRBD8Branch(lu, primary, secondary, size, vgname, names, iv_name,
-                         p_minor, s_minor):
+def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
+                         iv_name, p_minor, s_minor):
   """Generate a drbd8 device complete with its children.
 
   """
+  assert len(vgnames) == len(names) == 2
   port = lu.cfg.AllocatePort()
   shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
-                          logical_id=(vgname, names[0]))
+                          logical_id=(vgnames[0], names[0]))
   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
-                          logical_id=(vgname, names[1]))
+                          logical_id=(vgnames[1], names[1]))
   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                           logical_id=(primary, secondary, port,
                                       p_minor, s_minor,
@@ -6801,12 +7392,13 @@ def _GenerateDiskTemplate(lu, template_name,
                                       for i in range(disk_count)])
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
-      vg = disk.get("vg", vgname)
+      vg = disk.get(constants.IDISK_VG, vgname)
       feedback_fn("* disk %i, vg %s, name %s" % (idx, vg, names[idx]))
-      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
+      disk_dev = objects.Disk(dev_type=constants.LD_LV,
+                              size=disk[constants.IDISK_SIZE],
                               logical_id=(vg, names[idx]),
                               iv_name="disk/%d" % disk_index,
-                              mode=disk["mode"])
+                              mode=disk[constants.IDISK_MODE])
       disks.append(disk_dev)
   elif template_name == constants.DT_DRBD8:
     if len(secondary_nodes) != 1:
@@ -6822,12 +7414,15 @@ def _GenerateDiskTemplate(lu, template_name,
       names.append(lv_prefix + "_meta")
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
-      vg = disk.get("vg", vgname)
+      data_vg = disk.get(constants.IDISK_VG, vgname)
+      meta_vg = disk.get(constants.IDISK_METAVG, data_vg)
       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
-                                      disk["size"], vg, names[idx*2:idx*2+2],
+                                      disk[constants.IDISK_SIZE],
+                                      [data_vg, meta_vg],
+                                      names[idx * 2:idx * 2 + 2],
                                       "disk/%d" % disk_index,
-                                      minors[idx*2], minors[idx*2+1])
-      disk_dev.mode = disk["mode"]
+                                      minors[idx * 2], minors[idx * 2 + 1])
+      disk_dev.mode = disk[constants.IDISK_MODE]
       disks.append(disk_dev)
   elif template_name == constants.DT_FILE:
     if len(secondary_nodes) != 0:
@@ -6837,12 +7432,13 @@ def _GenerateDiskTemplate(lu, template_name,
 
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
-      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
+      disk_dev = objects.Disk(dev_type=constants.LD_FILE,
+                              size=disk[constants.IDISK_SIZE],
                               iv_name="disk/%d" % disk_index,
                               logical_id=(file_driver,
                                           "%s/disk%d" % (file_storage_dir,
                                                          disk_index)),
-                              mode=disk["mode"])
+                              mode=disk[constants.IDISK_MODE])
       disks.append(disk_dev)
   elif template_name == constants.DT_SHARED_FILE:
     if len(secondary_nodes) != 0:
@@ -6852,12 +7448,13 @@ def _GenerateDiskTemplate(lu, template_name,
 
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
-      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
+      disk_dev = objects.Disk(dev_type=constants.LD_FILE,
+                              size=disk[constants.IDISK_SIZE],
                               iv_name="disk/%d" % disk_index,
                               logical_id=(file_driver,
                                           "%s/disk%d" % (file_storage_dir,
                                                          disk_index)),
-                              mode=disk["mode"])
+                              mode=disk[constants.IDISK_MODE])
       disks.append(disk_dev)
   elif template_name == constants.DT_BLOCK:
     if len(secondary_nodes) != 0:
@@ -6865,11 +7462,12 @@ def _GenerateDiskTemplate(lu, template_name,
 
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
-      disk_dev = objects.Disk(dev_type=constants.LD_BLOCKDEV, size=disk["size"],
+      disk_dev = objects.Disk(dev_type=constants.LD_BLOCKDEV,
+                              size=disk[constants.IDISK_SIZE],
                               logical_id=(constants.BLOCKDEV_DRIVER_MANUAL,
-                                          disk["adopt"]),
+                                          disk[constants.IDISK_ADOPT]),
                               iv_name="disk/%d" % disk_index,
-                              mode=disk["mode"])
+                              mode=disk[constants.IDISK_MODE])
       disks.append(disk_dev)
 
   else:
@@ -6922,14 +7520,17 @@ def _WipeDisks(lu, instance):
 
   try:
     for idx, device in enumerate(instance.disks):
-      lu.LogInfo("* Wiping disk %d", idx)
-      logging.info("Wiping disk %d for instance %s, node %s",
-                   idx, instance.name, node)
-
       # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
       # MAX_WIPE_CHUNK at max
       wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 *
                             constants.MIN_WIPE_CHUNK_PERCENT)
+      # we _must_ make this an int, otherwise rounding errors will
+      # occur
+      wipe_chunk_size = int(wipe_chunk_size)
+
+      lu.LogInfo("* Wiping disk %d", idx)
+      logging.info("Wiping disk %d for instance %s, node %s using"
+                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)
 
       offset = 0
       size = device.size
@@ -6938,6 +7539,8 @@ def _WipeDisks(lu, instance):
 
       while offset < size:
         wipe_size = min(wipe_chunk_size, size - offset)
+        logging.debug("Wiping disk %d, offset %s, chunk %s",
+                      idx, offset, wipe_size)
         result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size)
         result.Raise("Could not wipe disk %d at offset %d for size %d" %
                      (idx, offset, wipe_size))
@@ -6955,8 +7558,8 @@ def _WipeDisks(lu, instance):
 
     for idx, success in enumerate(result.payload):
       if not success:
-        lu.LogWarning("Warning: Resume sync of disk %d failed. Please have a"
-                      " look at the status and troubleshoot the issue.", idx)
+        lu.LogWarning("Resume sync of disk %d failed, please have a"
+                      " look at the status and troubleshoot the issue", idx)
         logging.warn("resume-sync of instance %s for disks %d failed",
                      instance.name, idx)
 
@@ -7060,12 +7663,13 @@ def _ComputeDiskSizePerVG(disk_template, disks):
 
   """
   def _compute(disks, payload):
-    """Universal algorithm
+    """Universal algorithm.
 
     """
     vgs = {}
     for disk in disks:
-      vgs[disk["vg"]] = vgs.get("vg", 0) + disk["size"] + payload
+      vgs[disk[constants.IDISK_VG]] = \
+        vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload
 
     return vgs
 
@@ -7093,9 +7697,9 @@ def _ComputeDiskSize(disk_template, disks):
   # Required free disk space as a function of disk and swap space
   req_size_dict = {
     constants.DT_DISKLESS: None,
-    constants.DT_PLAIN: sum(d["size"] for d in disks),
+    constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
     # 128 MB are added for drbd metadata for each disk
-    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
+    constants.DT_DRBD8: sum(d[constants.IDISK_SIZE] + 128 for d in disks),
     constants.DT_FILE: None,
     constants.DT_SHARED_FILE: 0,
     constants.DT_BLOCK: 0,
@@ -7204,8 +7808,8 @@ class LUInstanceCreate(LogicalUnit):
 
     if self.op.ip_check and not self.op.name_check:
       # TODO: make the ip check more flexible and not depend on the name check
-      raise errors.OpPrereqError("Cannot do ip check without a name check",
-                                 errors.ECODE_INVAL)
+      raise errors.OpPrereqError("Cannot do IP address check without a name"
+                                 " check", errors.ECODE_INVAL)
 
     # check nics' parameter names
     for nic in self.op.nics:
@@ -7215,7 +7819,7 @@ class LUInstanceCreate(LogicalUnit):
     has_adopt = has_no_adopt = False
     for disk in self.op.disks:
       utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES)
-      if "adopt" in disk:
+      if constants.IDISK_ADOPT in disk:
         has_adopt = True
       else:
         has_no_adopt = True
@@ -7383,7 +7987,7 @@ class LUInstanceCreate(LogicalUnit):
         self.op.src_node = None
         if os.path.isabs(src_path):
           raise errors.OpPrereqError("Importing an instance from an absolute"
-                                     " path requires a source node option.",
+                                     " path requires a source node option",
                                      errors.ECODE_INVAL)
       else:
         self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node)
@@ -7454,15 +8058,21 @@ class LUInstanceCreate(LogicalUnit):
       vcpus=self.be_full[constants.BE_VCPUS],
       nics=_NICListToTuple(self, self.nics),
       disk_template=self.op.disk_template,
-      disks=[(d["size"], d["mode"]) for d in self.disks],
+      disks=[(d[constants.IDISK_SIZE], d[constants.IDISK_MODE])
+             for d in self.disks],
       bep=self.be_full,
       hvp=self.hv_full,
       hypervisor_name=self.op.hypervisor,
     ))
 
-    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
-          self.secondaries)
-    return env, nl, nl
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    nl = [self.cfg.GetMasterNode(), self.op.pnode] + self.secondaries
+    return nl, nl
 
   def _ReadExportInfo(self):
     """Reads the export information from disk.
@@ -7479,7 +8089,7 @@ class LUInstanceCreate(LogicalUnit):
     src_path = self.op.src_path
 
     if src_node is None:
-      locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+      locked_nodes = self.glm.list_owned(locking.LEVEL_NODE)
       exp_list = self.rpc.call_export_list(locked_nodes)
       found = False
       for node in exp_list:
@@ -7536,7 +8146,7 @@ class LUInstanceCreate(LogicalUnit):
         # TODO: import the disk iv_name too
         for idx in range(einfo.getint(constants.INISECT_INS, "disk_count")):
           disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx)
-          disks.append({"size": disk_sz})
+          disks.append({constants.IDISK_SIZE: disk_sz})
         self.op.disks = disks
       else:
         raise errors.OpPrereqError("No disk info specified and the export"
@@ -7657,7 +8267,7 @@ class LUInstanceCreate(LogicalUnit):
     # NIC buildup
     self.nics = []
     for idx, nic in enumerate(self.op.nics):
-      nic_mode_req = nic.get("mode", None)
+      nic_mode_req = nic.get(constants.INIC_MODE, None)
       nic_mode = nic_mode_req
       if nic_mode is None:
         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
@@ -7669,7 +8279,7 @@ class LUInstanceCreate(LogicalUnit):
         default_ip_mode = constants.VALUE_NONE
 
       # ip validity checks
-      ip = nic.get("ip", default_ip_mode)
+      ip = nic.get(constants.INIC_IP, default_ip_mode)
       if ip is None or ip.lower() == constants.VALUE_NONE:
         nic_ip = None
       elif ip.lower() == constants.VALUE_AUTO:
@@ -7690,7 +8300,7 @@ class LUInstanceCreate(LogicalUnit):
                                    errors.ECODE_INVAL)
 
       # MAC address verification
-      mac = nic.get("mac", constants.VALUE_AUTO)
+      mac = nic.get(constants.INIC_MAC, constants.VALUE_AUTO)
       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
         mac = utils.NormalizeAndValidateMac(mac)
 
@@ -7714,13 +8324,14 @@ class LUInstanceCreate(LogicalUnit):
       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
 
     # disk checks/pre-build
+    default_vg = self.cfg.GetVGName()
     self.disks = []
     for disk in self.op.disks:
-      mode = disk.get("mode", constants.DISK_RDWR)
+      mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
       if mode not in constants.DISK_ACCESS_SET:
         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                    mode, errors.ECODE_INVAL)
-      size = disk.get("size", None)
+      size = disk.get(constants.IDISK_SIZE, None)
       if size is None:
         raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
       try:
@@ -7728,10 +8339,16 @@ class LUInstanceCreate(LogicalUnit):
       except (TypeError, ValueError):
         raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                    errors.ECODE_INVAL)
-      vg = disk.get("vg", self.cfg.GetVGName())
-      new_disk = {"size": size, "mode": mode, "vg": vg}
-      if "adopt" in disk:
-        new_disk["adopt"] = disk["adopt"]
+
+      data_vg = disk.get(constants.IDISK_VG, default_vg)
+      new_disk = {
+        constants.IDISK_SIZE: size,
+        constants.IDISK_MODE: mode,
+        constants.IDISK_VG: data_vg,
+        constants.IDISK_METAVG: disk.get(constants.IDISK_METAVG, data_vg),
+        }
+      if constants.IDISK_ADOPT in disk:
+        new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
       self.disks.append(new_disk)
 
     if self.op.mode == constants.INSTANCE_IMPORT:
@@ -7819,7 +8436,7 @@ class LUInstanceCreate(LogicalUnit):
     if self.op.disk_template in constants.DTS_INT_MIRROR:
       if self.op.snode == pnode.name:
         raise errors.OpPrereqError("The secondary node cannot be the"
-                                   " primary node.", errors.ECODE_INVAL)
+                                   " primary node", errors.ECODE_INVAL)
       _CheckNodeOnline(self, self.op.snode)
       _CheckNodeNotDrained(self, self.op.snode)
       _CheckNodeVmCapable(self, self.op.snode)
@@ -7833,7 +8450,9 @@ class LUInstanceCreate(LogicalUnit):
       _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)
 
     elif self.op.disk_template == constants.DT_PLAIN: # Check the adoption data
-      all_lvs = set([i["vg"] + "/" + i["adopt"] for i in self.disks])
+      all_lvs = set(["%s/%s" % (disk[constants.IDISK_VG],
+                                disk[constants.IDISK_ADOPT])
+                     for disk in self.disks])
       if len(all_lvs) != len(self.disks):
         raise errors.OpPrereqError("Duplicate volume names given for adoption",
                                    errors.ECODE_INVAL)
@@ -7866,11 +8485,14 @@ class LUInstanceCreate(LogicalUnit):
                                    errors.ECODE_STATE)
       # update the size of disk based on what is found
       for dsk in self.disks:
-        dsk["size"] = int(float(node_lvs[dsk["vg"] + "/" + dsk["adopt"]][0]))
+        dsk[constants.IDISK_SIZE] = \
+          int(float(node_lvs["%s/%s" % (dsk[constants.IDISK_VG],
+                                        dsk[constants.IDISK_ADOPT])][0]))
 
     elif self.op.disk_template == constants.DT_BLOCK:
       # Normalize and de-duplicate device paths
-      all_disks = set([os.path.abspath(i["adopt"]) for i in self.disks])
+      all_disks = set([os.path.abspath(disk[constants.IDISK_ADOPT])
+                       for disk in self.disks])
       if len(all_disks) != len(self.disks):
         raise errors.OpPrereqError("Duplicate disk names given for adoption",
                                    errors.ECODE_INVAL)
@@ -7894,7 +8516,8 @@ class LUInstanceCreate(LogicalUnit):
                                    utils.CommaJoin(delta),
                                    errors.ECODE_INVAL)
       for dsk in self.disks:
-        dsk["size"] = int(float(node_disks[dsk["adopt"]]))
+        dsk[constants.IDISK_SIZE] = \
+          int(float(node_disks[dsk[constants.IDISK_ADOPT]]))
 
     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
 
@@ -7974,7 +8597,7 @@ class LUInstanceCreate(LogicalUnit):
         rename_to = []
         for t_dsk, a_dsk in zip (tmp_disks, self.disks):
           rename_to.append(t_dsk.logical_id)
-          t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk["adopt"])
+          t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk[constants.IDISK_ADOPT])
           self.cfg.SetDiskID(t_dsk, pnode_name)
         result = self.rpc.call_blockdev_rename(pnode_name,
                                                zip(tmp_disks, rename_to))
@@ -7991,18 +8614,6 @@ class LUInstanceCreate(LogicalUnit):
           self.cfg.ReleaseDRBDMinors(instance)
           raise
 
-      if self.cfg.GetClusterInfo().prealloc_wipe_disks:
-        feedback_fn("* wiping instance disks...")
-        try:
-          _WipeDisks(self, iobj)
-        except errors.OpExecError:
-          self.LogWarning("Device wiping failed, reverting...")
-          try:
-            _RemoveDisks(self, iobj)
-          finally:
-            self.cfg.ReleaseDRBDMinors(instance)
-            raise
-
     feedback_fn("adding instance %s to cluster config" % instance)
 
     self.cfg.AddInstance(iobj, self.proc.GetECId())
@@ -8010,18 +8621,28 @@ class LUInstanceCreate(LogicalUnit):
     # Declare that we don't want to remove the instance lock anymore, as we've
     # added the instance to the config
     del self.remove_locks[locking.LEVEL_INSTANCE]
-    # Unlock all the nodes
+
     if self.op.mode == constants.INSTANCE_IMPORT:
-      nodes_keep = [self.op.src_node]
-      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
-                       if node != self.op.src_node]
-      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
-      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
+      # Release unused nodes
+      _ReleaseLocks(self, locking.LEVEL_NODE, keep=[self.op.src_node])
     else:
-      self.context.glm.release(locking.LEVEL_NODE)
-      del self.acquired_locks[locking.LEVEL_NODE]
+      # Release all nodes
+      _ReleaseLocks(self, locking.LEVEL_NODE)
 
-    if self.op.wait_for_sync:
+    disk_abort = False
+    if not self.adopt_disks and self.cfg.GetClusterInfo().prealloc_wipe_disks:
+      feedback_fn("* wiping instance disks...")
+      try:
+        _WipeDisks(self, iobj)
+      except errors.OpExecError, err:
+        logging.exception("Wiping disks failed")
+        self.LogWarning("Wiping instance disks failed (%s)", err)
+        disk_abort = True
+
+    if disk_abort:
+      # Something is already wrong with the disks, don't do anything else
+      pass
+    elif self.op.wait_for_sync:
       disk_abort = not _WaitForSync(self, iobj)
     elif iobj.disk_template in constants.DTS_INT_MIRROR:
       # make sure the disks are not degraded (still sync-ing is ok)
@@ -8205,24 +8826,29 @@ class LUInstanceReplaceDisks(LogicalUnit):
   def ExpandNames(self):
     self._ExpandAndLockInstance()
 
-    if self.op.iallocator is not None:
-      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+    assert locking.LEVEL_NODE not in self.needed_locks
+    assert locking.LEVEL_NODEGROUP not in self.needed_locks
 
-    elif self.op.remote_node is not None:
-      remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
-      self.op.remote_node = remote_node
+    assert self.op.iallocator is None or self.op.remote_node is None, \
+      "Conflicting options"
+
+    if self.op.remote_node is not None:
+      self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
 
       # Warning: do not remove the locking of the new secondary here
       # unless DRBD8.AddChildren is changed to work in parallel;
       # currently it doesn't since parallel invocations of
       # FindUnusedMinor will conflict
-      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
+      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
-
     else:
       self.needed_locks[locking.LEVEL_NODE] = []
       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
+      if self.op.iallocator is not None:
+        # iallocator will select a new node in the same group
+        self.needed_locks[locking.LEVEL_NODEGROUP] = []
+
     self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                    self.op.iallocator, self.op.remote_node,
                                    self.op.disks, False, self.op.early_release)
@@ -8230,11 +8856,26 @@ class LUInstanceReplaceDisks(LogicalUnit):
     self.tasklets = [self.replacer]
 
   def DeclareLocks(self, level):
-    # If we're not already locking all nodes in the set we have to declare the
-    # instance's primary/secondary nodes.
-    if (level == locking.LEVEL_NODE and
-        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
-      self._LockInstancesNodes()
+    if level == locking.LEVEL_NODEGROUP:
+      assert self.op.remote_node is None
+      assert self.op.iallocator is not None
+      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+
+      self.share_locks[locking.LEVEL_NODEGROUP] = 1
+      self.needed_locks[locking.LEVEL_NODEGROUP] = \
+        self.cfg.GetInstanceNodeGroups(self.op.instance_name)
+
+    elif level == locking.LEVEL_NODE:
+      if self.op.iallocator is not None:
+        assert self.op.remote_node is None
+        assert not self.needed_locks[locking.LEVEL_NODE]
+
+        # Lock member nodes of all locked groups
+        self.needed_locks[locking.LEVEL_NODE] = [node_name
+          for group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
+          for node_name in self.cfg.GetNodeGroup(group_uuid).members]
+      else:
+        self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -8249,13 +8890,40 @@ class LUInstanceReplaceDisks(LogicalUnit):
       "OLD_SECONDARY": instance.secondary_nodes[0],
       }
     env.update(_BuildInstanceHookEnvByObject(self, instance))
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    instance = self.replacer.instance
     nl = [
       self.cfg.GetMasterNode(),
       instance.primary_node,
       ]
     if self.op.remote_node is not None:
       nl.append(self.op.remote_node)
-    return env, nl, nl
+    return nl, nl
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
+            self.op.iallocator is None)
+
+    owned_groups = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+    if owned_groups:
+      groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name)
+      if owned_groups != groups:
+        raise errors.OpExecError("Node groups used by instance '%s' changed"
+                                 " since lock was acquired, current list is %r,"
+                                 " used to be '%s'" %
+                                 (self.op.instance_name,
+                                  utils.CommaJoin(groups),
+                                  utils.CommaJoin(owned_groups)))
+
+    return LogicalUnit.CheckPrereq(self)
 
 
 class TLReplaceDisks(Tasklet):
@@ -8345,6 +9013,29 @@ class TLReplaceDisks(Tasklet):
     return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                     node_name, True)
 
+  def _CheckDisksActivated(self, instance):
+    """Checks if the instance disks are activated.
+
+    @param instance: The instance to check disks
+    @return: True if they are activated, False otherwise
+
+    """
+    nodes = instance.all_nodes
+
+    for idx, dev in enumerate(instance.disks):
+      for node in nodes:
+        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
+        self.cfg.SetDiskID(dev, node)
+
+        result = self.rpc.call_blockdev_find(node, dev)
+
+        if result.offline:
+          continue
+        elif result.fail_msg or not result.payload:
+          return False
+
+    return True
+
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -8386,20 +9077,23 @@ class TLReplaceDisks(Tasklet):
       remote_node = self._RunAllocator(self.lu, self.iallocator_name,
                                        instance.name, instance.secondary_nodes)
 
-    if remote_node is not None:
+    if remote_node is None:
+      self.remote_node_info = None
+    else:
+      assert remote_node in self.lu.glm.list_owned(locking.LEVEL_NODE), \
+             "Remote node '%s' is not locked" % remote_node
+
       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
       assert self.remote_node_info is not None, \
         "Cannot retrieve locked node %s" % remote_node
-    else:
-      self.remote_node_info = None
 
     if remote_node == self.instance.primary_node:
       raise errors.OpPrereqError("The specified node is the primary node of"
-                                 " the instance.", errors.ECODE_INVAL)
+                                 " the instance", errors.ECODE_INVAL)
 
     if remote_node == secondary_node:
       raise errors.OpPrereqError("The specified node is already the"
-                                 " secondary node of the instance.",
+                                 " secondary node of the instance",
                                  errors.ECODE_INVAL)
 
     if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
@@ -8408,6 +9102,10 @@ class TLReplaceDisks(Tasklet):
                                  errors.ECODE_INVAL)
 
     if self.mode == constants.REPLACE_DISK_AUTO:
+      if not self._CheckDisksActivated(instance):
+        raise errors.OpPrereqError("Please run activate-disks on instance %s"
+                                   " first" % self.instance_name,
+                                   errors.ECODE_STATE)
       faulty_primary = self._FindFaultyDisks(instance.primary_node)
       faulty_secondary = self._FindFaultyDisks(secondary_node)
 
@@ -8471,18 +9169,26 @@ class TLReplaceDisks(Tasklet):
     for node in check_nodes:
       _CheckNodeOnline(self.lu, node)
 
+    touched_nodes = frozenset(node_name for node_name in [self.new_node,
+                                                          self.other_node,
+                                                          self.target_node]
+                              if node_name is not None)
+
+    # Release unneeded node locks
+    _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
+
+    # Release any owned node group
+    if self.lu.glm.is_owned(locking.LEVEL_NODEGROUP):
+      _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
+
     # Check whether disks are valid
     for disk_idx in self.disks:
       instance.FindDisk(disk_idx)
 
     # Get secondary node IP addresses
-    node_2nd_ip = {}
-
-    for node_name in [self.target_node, self.other_node, self.new_node]:
-      if node_name is not None:
-        node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
-
-    self.node_secondary_ip = node_2nd_ip
+    self.node_secondary_ip = \
+      dict((node_name, self.cfg.GetNodeInfo(node_name).secondary_ip)
+           for node_name in touched_nodes)
 
   def Exec(self, feedback_fn):
     """Execute disk replacement.
@@ -8493,6 +9199,20 @@ class TLReplaceDisks(Tasklet):
     if self.delay_iallocator:
       self._CheckPrereq2()
 
+    if __debug__:
+      # Verify owned locks before starting operation
+      owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE)
+      assert set(owned_locks) == set(self.node_secondary_ip), \
+          ("Incorrect node locks, owning %s, expected %s" %
+           (owned_locks, self.node_secondary_ip.keys()))
+
+      owned_locks = self.lu.glm.list_owned(locking.LEVEL_INSTANCE)
+      assert list(owned_locks) == [self.instance_name], \
+          "Instance '%s' not locked" % self.instance_name
+
+      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
+          "Should not own any node group lock at this point"
+
     if not self.disks:
       feedback_fn("No disks need replacement")
       return
@@ -8513,14 +9233,24 @@ class TLReplaceDisks(Tasklet):
       else:
         fn = self._ExecDrbd8DiskOnly
 
-      return fn(feedback_fn)
-
+      result = fn(feedback_fn)
     finally:
       # Deactivate the instance disks if we're replacing them on a
       # down instance
       if activate_disks:
         _SafeShutdownInstanceDisks(self.lu, self.instance)
 
+    if __debug__:
+      # Verify owned locks
+      owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE)
+      nodes = frozenset(self.node_secondary_ip)
+      assert ((self.early_release and not owned_locks) or
+              (not self.early_release and not (set(owned_locks) - nodes))), \
+        ("Not owning the correct locks, early_release=%s, owned=%r,"
+         " nodes=%r" % (self.early_release, owned_locks, nodes))
+
+    return result
+
   def _CheckVolumeGroup(self, nodes):
     self.lu.LogInfo("Checking volume groups")
 
@@ -8572,7 +9302,6 @@ class TLReplaceDisks(Tasklet):
                                  (node_name, self.instance.name))
 
   def _CreateNewStorage(self, node_name):
-    vgname = self.cfg.GetVGName()
     iv_names = {}
 
     for idx, dev in enumerate(self.instance.disks):
@@ -8586,10 +9315,12 @@ class TLReplaceDisks(Tasklet):
       lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
       names = _GenerateUniqueNames(self.lu, lv_names)
 
+      vg_data = dev.children[0].logical_id[0]
       lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
-                             logical_id=(vgname, names[0]))
+                             logical_id=(vg_data, names[0]))
+      vg_meta = dev.children[1].logical_id[0]
       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
-                             logical_id=(vgname, names[1]))
+                             logical_id=(vg_meta, names[1]))
 
       new_lvs = [lv_data, lv_meta]
       old_lvs = dev.children
@@ -8630,10 +9361,6 @@ class TLReplaceDisks(Tasklet):
           self.lu.LogWarning("Can't remove old LV: %s" % msg,
                              hint="remove unused LVs manually")
 
-  def _ReleaseNodeLock(self, node_name):
-    """Releases the lock for a given node."""
-    self.lu.context.glm.release(locking.LEVEL_NODE, node_name)
-
   def _ExecDrbd8DiskOnly(self, feedback_fn):
     """Replace a disk on the primary or secondary for DRBD 8.
 
@@ -8751,7 +9478,8 @@ class TLReplaceDisks(Tasklet):
       self._RemoveOldStorage(self.target_node, iv_names)
       # WARNING: we release both node locks here, do not do other RPCs
       # than WaitForSync to the primary node
-      self._ReleaseNodeLock([self.target_node, self.other_node])
+      _ReleaseLocks(self.lu, locking.LEVEL_NODE,
+                    names=[self.target_node, self.other_node])
 
     # Wait for sync
     # This can fail as the old devices are degraded and _WaitForSync
@@ -8908,9 +9636,10 @@ class TLReplaceDisks(Tasklet):
       self._RemoveOldStorage(self.target_node, iv_names)
       # WARNING: we release all node locks here, do not do other RPCs
       # than WaitForSync to the primary node
-      self._ReleaseNodeLock([self.instance.primary_node,
-                             self.target_node,
-                             self.new_node])
+      _ReleaseLocks(self.lu, locking.LEVEL_NODE,
+                    names=[self.instance.primary_node,
+                           self.target_node,
+                           self.new_node])
 
     # Wait for sync
     # This can fail as the old devices are degraded and _WaitForSync
@@ -9062,8 +9791,14 @@ class LUInstanceGrowDisk(LogicalUnit):
       "AMOUNT": self.op.amount,
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
-    return env, nl, nl
+    return (nl, nl)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -9082,7 +9817,7 @@ class LUInstanceGrowDisk(LogicalUnit):
 
     if instance.disk_template not in constants.DTS_GROWABLE:
       raise errors.OpPrereqError("Instance's disk layout does not support"
-                                 " growing.", errors.ECODE_INVAL)
+                                 " growing", errors.ECODE_INVAL)
 
     self.disk = instance.FindDisk(self.op.disk)
 
@@ -9104,9 +9839,17 @@ class LUInstanceGrowDisk(LogicalUnit):
     if not disks_ok:
       raise errors.OpExecError("Cannot activate block device to grow")
 
+    # First run all grow ops in dry-run mode
+    for node in instance.all_nodes:
+      self.cfg.SetDiskID(disk, node)
+      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True)
+      result.Raise("Grow request failed to node %s" % node)
+
+    # We know that (as far as we can test) operations across different
+    # nodes will succeed, time to run it for real
     for node in instance.all_nodes:
       self.cfg.SetDiskID(disk, node)
-      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
+      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False)
       result.Raise("Grow request failed to node %s" % node)
 
       # TODO: Rewrite code to work properly
@@ -9121,14 +9864,14 @@ class LUInstanceGrowDisk(LogicalUnit):
     if self.op.wait_for_sync:
       disk_abort = not _WaitForSync(self, instance, disks=[disk])
       if disk_abort:
-        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
-                             " status.\nPlease check the instance.")
+        self.proc.LogWarning("Disk sync-ing has not returned a good"
+                             " status; please check the instance")
       if not instance.admin_up:
         _SafeShutdownInstanceDisks(self, instance, disks=[disk])
     elif not instance.admin_up:
       self.proc.LogWarning("Not shutting down the disk even if the instance is"
                            " not supposed to be running because no wait for"
-                           " sync mode was requested.")
+                           " sync mode was requested")
 
 
 class LUInstanceQueryData(NoHooksLU):
@@ -9139,23 +9882,33 @@ class LUInstanceQueryData(NoHooksLU):
 
   def ExpandNames(self):
     self.needed_locks = {}
-    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
 
-    if self.op.instances:
-      self.wanted_names = []
-      for name in self.op.instances:
-        full_name = _ExpandInstanceName(self.cfg, name)
-        self.wanted_names.append(full_name)
-      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
+    # Use locking if requested or when non-static information is wanted
+    if not (self.op.static or self.op.use_locking):
+      self.LogWarning("Non-static data requested, locks need to be acquired")
+      self.op.use_locking = True
+
+    if self.op.instances or not self.op.use_locking:
+      # Expand instance names right here
+      self.wanted_names = _GetWantedInstances(self, self.op.instances)
     else:
+      # Will use acquired locks
       self.wanted_names = None
-      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
 
-    self.needed_locks[locking.LEVEL_NODE] = []
-    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+    if self.op.use_locking:
+      self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+
+      if self.wanted_names is None:
+        self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
+      else:
+        self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
+
+      self.needed_locks[locking.LEVEL_NODE] = []
+      self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
   def DeclareLocks(self, level):
-    if level == locking.LEVEL_NODE:
+    if self.op.use_locking and level == locking.LEVEL_NODE:
       self._LockInstancesNodes()
 
   def CheckPrereq(self):
@@ -9165,10 +9918,11 @@ class LUInstanceQueryData(NoHooksLU):
 
     """
     if self.wanted_names is None:
-      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+      assert self.op.use_locking, "Locking was not used"
+      self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE)
 
-    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
-                             in self.wanted_names]
+    self.wanted_instances = [self.cfg.GetInstanceInfo(name)
+                             for name in self.wanted_names]
 
   def _ComputeBlockdevStatus(self, node, instance_name, dev):
     """Returns the status of a block device
@@ -9214,7 +9968,7 @@ class LUInstanceQueryData(NoHooksLU):
     else:
       dev_children = []
 
-    data = {
+    return {
       "iv_name": dev.iv_name,
       "dev_type": dev.dev_type,
       "logical_id": dev.logical_id,
@@ -9226,8 +9980,6 @@ class LUInstanceQueryData(NoHooksLU):
       "size": dev.size,
       }
 
-    return data
-
   def Exec(self, feedback_fn):
     """Gather and return data"""
     result = {}
@@ -9255,7 +10007,7 @@ class LUInstanceQueryData(NoHooksLU):
       disks = [self._ComputeDiskStatus(instance, None, device)
                for device in instance.disks]
 
-      idict = {
+      result[instance.name] = {
         "name": instance.name,
         "config_state": config_state,
         "run_state": remote_state,
@@ -9280,8 +10032,6 @@ class LUInstanceQueryData(NoHooksLU):
         "uuid": instance.uuid,
         }
 
-      result[instance.name] = idict
-
     return result
 
 
@@ -9318,11 +10068,11 @@ class LUInstanceSetParams(LogicalUnit):
           raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
 
       if disk_op == constants.DDM_ADD:
-        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
+        mode = disk_dict.setdefault(constants.IDISK_MODE, constants.DISK_RDWR)
         if mode not in constants.DISK_ACCESS_SET:
           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
                                      errors.ECODE_INVAL)
-        size = disk_dict.get('size', None)
+        size = disk_dict.get(constants.IDISK_SIZE, None)
         if size is None:
           raise errors.OpPrereqError("Required disk parameter size missing",
                                      errors.ECODE_INVAL)
@@ -9331,10 +10081,10 @@ class LUInstanceSetParams(LogicalUnit):
         except (TypeError, ValueError), err:
           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                      str(err), errors.ECODE_INVAL)
-        disk_dict['size'] = size
+        disk_dict[constants.IDISK_SIZE] = size
       else:
         # modification of disk
-        if 'size' in disk_dict:
+        if constants.IDISK_SIZE in disk_dict:
           raise errors.OpPrereqError("Disk size change not possible, use"
                                      " grow-disk", errors.ECODE_INVAL)
 
@@ -9371,32 +10121,32 @@ class LUInstanceSetParams(LogicalUnit):
           raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
 
       # nic_dict should be a dict
-      nic_ip = nic_dict.get('ip', None)
+      nic_ip = nic_dict.get(constants.INIC_IP, None)
       if nic_ip is not None:
         if nic_ip.lower() == constants.VALUE_NONE:
-          nic_dict['ip'] = None
+          nic_dict[constants.INIC_IP] = None
         else:
           if not netutils.IPAddress.IsValid(nic_ip):
             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
                                        errors.ECODE_INVAL)
 
       nic_bridge = nic_dict.get('bridge', None)
-      nic_link = nic_dict.get('link', None)
+      nic_link = nic_dict.get(constants.INIC_LINK, None)
       if nic_bridge and nic_link:
         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
                                    " at the same time", errors.ECODE_INVAL)
       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
         nic_dict['bridge'] = None
       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
-        nic_dict['link'] = None
+        nic_dict[constants.INIC_LINK] = None
 
       if nic_op == constants.DDM_ADD:
-        nic_mac = nic_dict.get('mac', None)
+        nic_mac = nic_dict.get(constants.INIC_MAC, None)
         if nic_mac is None:
-          nic_dict['mac'] = constants.VALUE_AUTO
+          nic_dict[constants.INIC_MAC] = constants.VALUE_AUTO
 
-      if 'mac' in nic_dict:
-        nic_mac = nic_dict['mac']
+      if constants.INIC_MAC in nic_dict:
+        nic_mac = nic_dict[constants.INIC_MAC]
         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
           nic_mac = utils.NormalizeAndValidateMac(nic_mac)
 
@@ -9442,12 +10192,12 @@ class LUInstanceSetParams(LogicalUnit):
           this_nic_override = nic_override[idx]
         else:
           this_nic_override = {}
-        if 'ip' in this_nic_override:
-          ip = this_nic_override['ip']
+        if constants.INIC_IP in this_nic_override:
+          ip = this_nic_override[constants.INIC_IP]
         else:
           ip = nic.ip
-        if 'mac' in this_nic_override:
-          mac = this_nic_override['mac']
+        if constants.INIC_MAC in this_nic_override:
+          mac = this_nic_override[constants.INIC_MAC]
         else:
           mac = nic.mac
         if idx in self.nic_pnew:
@@ -9458,8 +10208,8 @@ class LUInstanceSetParams(LogicalUnit):
         link = nicparams[constants.NIC_LINK]
         args['nics'].append((ip, mac, mode, link))
       if constants.DDM_ADD in nic_override:
-        ip = nic_override[constants.DDM_ADD].get('ip', None)
-        mac = nic_override[constants.DDM_ADD]['mac']
+        ip = nic_override[constants.DDM_ADD].get(constants.INIC_IP, None)
+        mac = nic_override[constants.DDM_ADD][constants.INIC_MAC]
         nicparams = self.nic_pnew[constants.DDM_ADD]
         mode = nicparams[constants.NIC_MODE]
         link = nicparams[constants.NIC_LINK]
@@ -9470,8 +10220,15 @@ class LUInstanceSetParams(LogicalUnit):
     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
     if self.op.disk_template:
       env["NEW_DISK_TEMPLATE"] = self.op.disk_template
+
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
-    return env, nl, nl
+    return (nl, nl)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -9517,7 +10274,8 @@ class LUInstanceSetParams(LogicalUnit):
         _CheckNodeNotDrained(self, self.op.remote_node)
         # FIXME: here we assume that the old instance type is DT_PLAIN
         assert instance.disk_template == constants.DT_PLAIN
-        disks = [{"size": d.size, "vg": d.logical_id[0]}
+        disks = [{constants.IDISK_SIZE: d.size,
+                  constants.IDISK_VG: d.logical_id[0]}
                  for d in instance.disks]
         required = _ComputeDiskSizePerVG(self.op.disk_template, disks)
         _CheckNodesFreeDiskPerVG(self, [self.op.remote_node], required)
@@ -9547,6 +10305,7 @@ class LUInstanceSetParams(LogicalUnit):
       self.be_inst = i_bedict # the new dict (without defaults)
     else:
       self.be_new = self.be_inst = {}
+    be_old = cluster.FillBE(instance)
 
     # osparams processing
     if self.op.osparams:
@@ -9558,7 +10317,8 @@ class LUInstanceSetParams(LogicalUnit):
 
     self.warn = []
 
-    if constants.BE_MEMORY in self.op.beparams and not self.op.force:
+    if (constants.BE_MEMORY in self.op.beparams and not self.op.force and
+        be_new[constants.BE_MEMORY] > be_old[constants.BE_MEMORY]):
       mem_check_list = [pnode]
       if be_new[constants.BE_AUTO_BALANCE]:
         # either we changed auto_balance to yes or it was from before
@@ -9599,16 +10359,17 @@ class LUInstanceSetParams(LogicalUnit):
         for node, nres in nodeinfo.items():
           if node not in instance.secondary_nodes:
             continue
-          msg = nres.fail_msg
-          if msg:
-            self.warn.append("Can't get info from secondary node %s: %s" %
-                             (node, msg))
-          elif not isinstance(nres.payload.get('memory_free', None), int):
-            self.warn.append("Secondary node %s didn't return free"
-                             " memory information" % node)
+          nres.Raise("Can't get info from secondary node %s" % node,
+                     prereq=True, ecode=errors.ECODE_STATE)
+          if not isinstance(nres.payload.get('memory_free', None), int):
+            raise errors.OpPrereqError("Secondary node %s didn't return free"
+                                       " memory information" % node,
+                                       errors.ECODE_STATE)
           elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
-            self.warn.append("Not enough memory to failover instance to"
-                             " secondary node %s" % node)
+            raise errors.OpPrereqError("This change will prevent the instance"
+                                       " from failover to its secondary node"
+                                       " %s, due to not enough memory" % node,
+                                       errors.ECODE_STATE)
 
     # NIC processing
     self.nic_pnew = {}
@@ -9662,21 +10423,22 @@ class LUInstanceSetParams(LogicalUnit):
           else:
             raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
       if new_nic_mode == constants.NIC_MODE_ROUTED:
-        if 'ip' in nic_dict:
-          nic_ip = nic_dict['ip']
+        if constants.INIC_IP in nic_dict:
+          nic_ip = nic_dict[constants.INIC_IP]
         else:
           nic_ip = old_nic_ip
         if nic_ip is None:
           raise errors.OpPrereqError('Cannot set the nic ip to None'
                                      ' on a routed nic', errors.ECODE_INVAL)
-      if 'mac' in nic_dict:
-        nic_mac = nic_dict['mac']
+      if constants.INIC_MAC in nic_dict:
+        nic_mac = nic_dict[constants.INIC_MAC]
         if nic_mac is None:
           raise errors.OpPrereqError('Cannot set the nic mac to None',
                                      errors.ECODE_INVAL)
         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
           # otherwise generate the mac
-          nic_dict['mac'] = self.cfg.GenerateMAC(self.proc.GetECId())
+          nic_dict[constants.INIC_MAC] = \
+            self.cfg.GenerateMAC(self.proc.GetECId())
         else:
           # or validate/reserve the current one
           try:
@@ -9723,7 +10485,9 @@ class LUInstanceSetParams(LogicalUnit):
     snode = self.op.remote_node
 
     # create a fake disk info for _GenerateDiskTemplate
-    disk_info = [{"size": d.size, "mode": d.mode} for d in instance.disks]
+    disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode,
+                  constants.IDISK_VG: d.logical_id[0]}
+                 for d in instance.disks]
     new_disks = _GenerateDiskTemplate(self, self.op.disk_template,
                                       instance.name, pnode, [snode],
                                       disk_info, None, None, 0, feedback_fn)
@@ -9757,7 +10521,8 @@ class LUInstanceSetParams(LogicalUnit):
     self.cfg.Update(instance, feedback_fn)
 
     # disks are created, waiting for sync
-    disk_abort = not _WaitForSync(self, instance)
+    disk_abort = not _WaitForSync(self, instance,
+                                  oneshot=not self.op.wait_for_sync)
     if disk_abort:
       raise errors.OpExecError("There are some degraded disks for"
                                " this instance, please cleanup manually")
@@ -9865,8 +10630,9 @@ class LUInstanceSetParams(LogicalUnit):
                        (new_disk.size, new_disk.mode)))
       else:
         # change a given disk
-        instance.disks[disk_op].mode = disk_dict['mode']
-        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
+        instance.disks[disk_op].mode = disk_dict[constants.IDISK_MODE]
+        result.append(("disk.mode/%d" % disk_op,
+                       disk_dict[constants.IDISK_MODE]))
 
     if self.op.disk_template:
       r_shut = _ShutdownInstanceDisks(self, instance)
@@ -9889,8 +10655,8 @@ class LUInstanceSetParams(LogicalUnit):
         result.append(("nic.%d" % len(instance.nics), "remove"))
       elif nic_op == constants.DDM_ADD:
         # mac and bridge should be set, by now
-        mac = nic_dict['mac']
-        ip = nic_dict.get('ip', None)
+        mac = nic_dict[constants.INIC_MAC]
+        ip = nic_dict.get(constants.INIC_IP, None)
         nicparams = self.nic_pinst[constants.DDM_ADD]
         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
         instance.nics.append(new_nic)
@@ -9901,7 +10667,7 @@ class LUInstanceSetParams(LogicalUnit):
                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
                        )))
       else:
-        for key in 'mac', 'ip':
+        for key in (constants.INIC_MAC, constants.INIC_IP):
           if key in nic_dict:
             setattr(instance.nics[nic_op], key, nic_dict[key])
         if nic_op in self.nic_pinst:
@@ -9965,7 +10731,7 @@ class LUBackupQuery(NoHooksLU):
         that node.
 
     """
-    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
+    self.nodes = self.glm.list_owned(locking.LEVEL_NODE)
     rpcresult = self.rpc.call_export_list(self.nodes)
     result = {}
     for node in rpcresult:
@@ -10088,12 +10854,18 @@ class LUBackupExport(LogicalUnit):
 
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
 
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
 
     if self.op.mode == constants.EXPORT_MODE_LOCAL:
       nl.append(self.op.target_node)
 
-    return env, nl, nl
+    return (nl, nl)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -10341,7 +11113,7 @@ class LUBackupRemove(NoHooksLU):
       fqdn_warn = True
       instance_name = self.op.instance_name
 
-    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+    locked_nodes = self.glm.list_owned(locking.LEVEL_NODE)
     exportlist = self.rpc.call_export_list(locked_nodes)
     found = False
     for node in exportlist:
@@ -10403,11 +11175,16 @@ class LUGroupAdd(LogicalUnit):
     """Build hooks env.
 
     """
-    env = {
+    return {
       "GROUP_NAME": self.op.group_name,
       }
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     mn = self.cfg.GetMasterNode()
-    return env, [mn], [mn]
+    return ([mn], [mn])
 
   def Exec(self, feedback_fn):
     """Add the node group to the cluster.
@@ -10435,20 +11212,40 @@ class LUGroupAssignNodes(NoHooksLU):
 
     # We want to lock all the affected nodes and groups. We have readily
     # available the list of nodes, and the *destination* group. To gather the
-    # list of "source" groups, we need to fetch node information.
-    self.node_data = self.cfg.GetAllNodesInfo()
-    affected_groups = set(self.node_data[node].group for node in self.op.nodes)
-    affected_groups.add(self.group_uuid)
-
+    # list of "source" groups, we need to fetch node information later on.
     self.needed_locks = {
-      locking.LEVEL_NODEGROUP: list(affected_groups),
+      locking.LEVEL_NODEGROUP: set([self.group_uuid]),
       locking.LEVEL_NODE: self.op.nodes,
       }
 
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODEGROUP:
+      assert len(self.needed_locks[locking.LEVEL_NODEGROUP]) == 1
+
+      # Try to get all affected nodes' groups without having the group or node
+      # lock yet. Needs verification later in the code flow.
+      groups = self.cfg.GetNodeGroupsFromNodes(self.op.nodes)
+
+      self.needed_locks[locking.LEVEL_NODEGROUP].update(groups)
+
   def CheckPrereq(self):
     """Check prerequisites.
 
     """
+    assert self.needed_locks[locking.LEVEL_NODEGROUP]
+    assert (frozenset(self.glm.list_owned(locking.LEVEL_NODE)) ==
+            frozenset(self.op.nodes))
+
+    expected_locks = (set([self.group_uuid]) |
+                      self.cfg.GetNodeGroupsFromNodes(self.op.nodes))
+    actual_locks = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+    if actual_locks != expected_locks:
+      raise errors.OpExecError("Nodes changed groups since locks were acquired,"
+                               " current groups are '%s', used to be '%s'" %
+                               (utils.CommaJoin(expected_locks),
+                                utils.CommaJoin(actual_locks)))
+
+    self.node_data = self.cfg.GetAllNodesInfo()
     self.group = self.cfg.GetNodeGroup(self.group_uuid)
     instance_data = self.cfg.GetAllInstancesInfo()
 
@@ -10474,7 +11271,7 @@ class LUGroupAssignNodes(NoHooksLU):
 
         if previous_splits:
           self.LogWarning("In addition, these already-split instances continue"
-                          " to be spit across groups: %s",
+                          " to be split across groups: %s",
                           utils.CommaJoin(utils.NiceSort(previous_splits)))
 
   def Exec(self, feedback_fn):
@@ -10484,6 +11281,9 @@ class LUGroupAssignNodes(NoHooksLU):
     for node in self.op.nodes:
       self.node_data[node].group = self.group_uuid
 
+    # FIXME: Depends on side-effects of modifying the result of
+    # C{cfg.GetAllNodesInfo}
+
     self.cfg.Update(self.group, feedback_fn) # Saves all modified nodes.
 
   @staticmethod
@@ -10563,7 +11363,8 @@ class _GroupQuery(_QueryBase):
           missing.append(name)
 
       if missing:
-        raise errors.OpPrereqError("Some groups do not exist: %s" % missing,
+        raise errors.OpPrereqError("Some groups do not exist: %s" %
+                                   utils.CommaJoin(missing),
                                    errors.ECODE_NOENT)
 
   def DeclareLocks(self, lu, level):
@@ -10674,12 +11475,17 @@ class LUGroupSetParams(LogicalUnit):
     """Build hooks env.
 
     """
-    env = {
+    return {
       "GROUP_NAME": self.op.group_name,
       "NEW_ALLOC_POLICY": self.op.alloc_policy,
       }
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     mn = self.cfg.GetMasterNode()
-    return env, [mn], [mn]
+    return ([mn], [mn])
 
   def Exec(self, feedback_fn):
     """Modifies the node group.
@@ -10742,11 +11548,16 @@ class LUGroupRemove(LogicalUnit):
     """Build hooks env.
 
     """
-    env = {
+    return {
       "GROUP_NAME": self.op.group_name,
       }
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     mn = self.cfg.GetMasterNode()
-    return env, [mn], [mn]
+    return ([mn], [mn])
 
   def Exec(self, feedback_fn):
     """Remove the node group.
@@ -10794,21 +11605,25 @@ class LUGroupRename(LogicalUnit):
     """Build hooks env.
 
     """
-    env = {
+    return {
       "OLD_NAME": self.op.group_name,
       "NEW_NAME": self.op.new_name,
       }
 
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     mn = self.cfg.GetMasterNode()
+
     all_nodes = self.cfg.GetAllNodesInfo()
-    run_nodes = [mn]
     all_nodes.pop(mn, None)
 
-    for node in all_nodes.values():
-      if node.group == self.group_uuid:
-        run_nodes.append(node.name)
+    run_nodes = [mn]
+    run_nodes.extend(node.name for node in all_nodes.values()
+                     if node.group == self.group_uuid)
 
-    return env, run_nodes, run_nodes
+    return (run_nodes, run_nodes)
 
   def Exec(self, feedback_fn):
     """Rename the node group.
@@ -10832,8 +11647,8 @@ class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
   This is an abstract class which is the parent of all the other tags LUs.
 
   """
-
   def ExpandNames(self):
+    self.group_uuid = None
     self.needed_locks = {}
     if self.op.kind == constants.TAG_NODE:
       self.op.name = _ExpandNodeName(self.cfg, self.op.name)
@@ -10841,6 +11656,8 @@ class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
     elif self.op.kind == constants.TAG_INSTANCE:
       self.op.name = _ExpandInstanceName(self.cfg, self.op.name)
       self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name
+    elif self.op.kind == constants.TAG_NODEGROUP:
+      self.group_uuid = self.cfg.LookupNodeGroup(self.op.name)
 
     # FIXME: Acquire BGL for cluster tag operations (as of this writing it's
     # not possible to acquire the BGL based on opcode parameters)
@@ -10855,6 +11672,8 @@ class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
       self.target = self.cfg.GetNodeInfo(self.op.name)
     elif self.op.kind == constants.TAG_INSTANCE:
       self.target = self.cfg.GetInstanceInfo(self.op.name)
+    elif self.op.kind == constants.TAG_NODEGROUP:
+      self.target = self.cfg.GetNodeGroup(self.group_uuid)
     else:
       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                  str(self.op.kind), errors.ECODE_INVAL)
@@ -10910,6 +11729,8 @@ class LUTagsSearch(NoHooksLU):
     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
     nlist = cfg.GetAllNodesInfo().values()
     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
+    tgts.extend(("/nodegroup/%s" % n.name, n)
+                for n in cfg.GetAllNodeGroupsInfo().values())
     results = []
     for path, target in tgts:
       for tag in target.GetTags():
@@ -11179,16 +12000,6 @@ class IAllocator(object):
   """
   # pylint: disable-msg=R0902
   # lots of instance attributes
-  _ALLO_KEYS = [
-    "name", "mem_size", "disks", "disk_template",
-    "os", "tags", "nics", "vcpus", "hypervisor",
-    ]
-  _RELO_KEYS = [
-    "name", "relocate_from",
-    ]
-  _EVAC_KEYS = [
-    "evac_nodes",
-    ]
 
   def __init__(self, cfg, rpc, mode, **kwargs):
     self.cfg = cfg
@@ -11203,22 +12014,20 @@ class IAllocator(object):
     self.relocate_from = None
     self.name = None
     self.evac_nodes = None
+    self.instances = None
+    self.reloc_mode = None
+    self.target_groups = None
     # computed fields
     self.required_nodes = None
     # init result fields
     self.success = self.info = self.result = None
-    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
-      keyset = self._ALLO_KEYS
-      fn = self._AddNewInstance
-    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
-      keyset = self._RELO_KEYS
-      fn = self._AddRelocateInstance
-    elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
-      keyset = self._EVAC_KEYS
-      fn = self._AddEvacuateNodes
-    else:
+
+    try:
+      (fn, keyset, self._result_check) = self._MODE_DATA[self.mode]
+    except KeyError:
       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                    " IAllocator" % self.mode)
+
     for key in kwargs:
       if key not in keyset:
         raise errors.ProgrammerError("Invalid input parameter '%s' to"
@@ -11229,7 +12038,7 @@ class IAllocator(object):
       if key not in kwargs:
         raise errors.ProgrammerError("Missing input parameter '%s' to"
                                      " IAllocator" % key)
-    self._BuildInputData(fn)
+    self._BuildInputData(compat.partial(fn, self))
 
   def _ComputeClusterData(self):
     """Compute the generic allocator input data.
@@ -11258,7 +12067,8 @@ class IAllocator(object):
       hypervisor_name = self.hypervisor
     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
-    elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
+    elif self.mode in (constants.IALLOCATOR_MODE_MEVAC,
+                       constants.IALLOCATOR_MODE_MRELOC):
       hypervisor_name = cluster_info.enabled_hypervisors[0]
 
     node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
@@ -11284,12 +12094,12 @@ class IAllocator(object):
     """Compute node groups data.
 
     """
-    ng = {}
-    for guuid, gdata in cfg.GetAllNodeGroupsInfo().items():
-      ng[guuid] = {
-        "name": gdata.name,
-        "alloc_policy": gdata.alloc_policy,
-        }
+    ng = dict((guuid, {
+      "name": gdata.name,
+      "alloc_policy": gdata.alloc_policy,
+      })
+      for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())
+
     return ng
 
   @staticmethod
@@ -11300,22 +12110,19 @@ class IAllocator(object):
     @returns: a dict of name: (node dict, node config)
 
     """
-    node_results = {}
-    for ninfo in node_cfg.values():
-      # fill in static (config-based) values
-      pnr = {
-        "tags": list(ninfo.GetTags()),
-        "primary_ip": ninfo.primary_ip,
-        "secondary_ip": ninfo.secondary_ip,
-        "offline": ninfo.offline,
-        "drained": ninfo.drained,
-        "master_candidate": ninfo.master_candidate,
-        "group": ninfo.group,
-        "master_capable": ninfo.master_capable,
-        "vm_capable": ninfo.vm_capable,
-        }
-
-      node_results[ninfo.name] = pnr
+    # fill in static (config-based) values
+    node_results = dict((ninfo.name, {
+      "tags": list(ninfo.GetTags()),
+      "primary_ip": ninfo.primary_ip,
+      "secondary_ip": ninfo.secondary_ip,
+      "offline": ninfo.offline,
+      "drained": ninfo.drained,
+      "master_candidate": ninfo.master_candidate,
+      "group": ninfo.group,
+      "master_capable": ninfo.master_capable,
+      "vm_capable": ninfo.vm_capable,
+      })
+      for ninfo in node_cfg.values())
 
     return node_results
 
@@ -11389,11 +12196,12 @@ class IAllocator(object):
       nic_data = []
       for nic in iinfo.nics:
         filled_params = cluster_info.SimpleFillNIC(nic.nicparams)
-        nic_dict = {"mac": nic.mac,
-                    "ip": nic.ip,
-                    "mode": filled_params[constants.NIC_MODE],
-                    "link": filled_params[constants.NIC_LINK],
-                   }
+        nic_dict = {
+          "mac": nic.mac,
+          "ip": nic.ip,
+          "mode": filled_params[constants.NIC_MODE],
+          "link": filled_params[constants.NIC_LINK],
+          }
         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
         nic_data.append(nic_dict)
@@ -11405,7 +12213,9 @@ class IAllocator(object):
         "os": iinfo.os,
         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
         "nics": nic_data,
-        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
+        "disks": [{constants.IDISK_SIZE: dsk.size,
+                   constants.IDISK_MODE: dsk.mode}
+                  for dsk in iinfo.disks],
         "disk_template": iinfo.disk_template,
         "hypervisor": iinfo.hypervisor,
         }
@@ -11431,6 +12241,7 @@ class IAllocator(object):
       self.required_nodes = 2
     else:
       self.required_nodes = 1
+
     request = {
       "name": self.name,
       "disk_template": self.disk_template,
@@ -11443,6 +12254,7 @@ class IAllocator(object):
       "nics": self.nics,
       "required_nodes": self.required_nodes,
       }
+
     return request
 
   def _AddRelocateInstance(self):
@@ -11470,7 +12282,7 @@ class IAllocator(object):
                                  errors.ECODE_STATE)
 
     self.required_nodes = 1
-    disk_sizes = [{'size': disk.size} for disk in instance.disks]
+    disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in instance.disks]
     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
 
     request = {
@@ -11490,6 +12302,16 @@ class IAllocator(object):
       }
     return request
 
+  def _AddMultiRelocate(self):
+    """Get data for multi-relocate requests.
+
+    """
+    return {
+      "instances": self.instances,
+      "reloc_mode": self.reloc_mode,
+      "target_groups": self.target_groups,
+      }
+
   def _BuildInputData(self, fn):
     """Build input data structures.
 
@@ -11502,6 +12324,28 @@ class IAllocator(object):
 
     self.in_text = serializer.Dump(self.in_data)
 
+  _MODE_DATA = {
+    constants.IALLOCATOR_MODE_ALLOC:
+      (_AddNewInstance,
+       ["name", "mem_size", "disks", "disk_template", "os", "tags", "nics",
+        "vcpus", "hypervisor"], ht.TList),
+    constants.IALLOCATOR_MODE_RELOC:
+      (_AddRelocateInstance, ["name", "relocate_from"], ht.TList),
+    constants.IALLOCATOR_MODE_MEVAC:
+      (_AddEvacuateNodes, ["evac_nodes"],
+       ht.TListOf(ht.TAnd(ht.TIsLength(2),
+                          ht.TListOf(ht.TString)))),
+    constants.IALLOCATOR_MODE_MRELOC:
+      (_AddMultiRelocate, ["instances", "reloc_mode", "target_groups"],
+       ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
+         # pylint: disable-msg=E1101
+         # Class '...' has no 'OP_ID' member
+         "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
+                              opcodes.OpInstanceMigrate.OP_ID,
+                              opcodes.OpInstanceReplaceDisks.OP_ID])
+         })))),
+    }
+
   def Run(self, name, validate=True, call_fn=None):
     """Run an instance allocator and return the results.
 
@@ -11542,11 +12386,81 @@ class IAllocator(object):
                                  " missing key '%s'" % key)
       setattr(self, key, rdict[key])
 
-    if not isinstance(rdict["result"], list):
-      raise errors.OpExecError("Can't parse iallocator results: 'result' key"
-                               " is not a list")
+    if not self._result_check(self.result):
+      raise errors.OpExecError("Iallocator returned invalid result,"
+                               " expected %s, got %s" %
+                               (self._result_check, self.result),
+                               errors.ECODE_INVAL)
+
+    if self.mode in (constants.IALLOCATOR_MODE_RELOC,
+                     constants.IALLOCATOR_MODE_MEVAC):
+      node2group = dict((name, ndata["group"])
+                        for (name, ndata) in self.in_data["nodes"].items())
+
+      fn = compat.partial(self._NodesToGroups, node2group,
+                          self.in_data["nodegroups"])
+
+      if self.mode == constants.IALLOCATOR_MODE_RELOC:
+        assert self.relocate_from is not None
+        assert self.required_nodes == 1
+
+        request_groups = fn(self.relocate_from)
+        result_groups = fn(rdict["result"])
+
+        if result_groups != request_groups:
+          raise errors.OpExecError("Groups of nodes returned by iallocator (%s)"
+                                   " differ from original groups (%s)" %
+                                   (utils.CommaJoin(result_groups),
+                                    utils.CommaJoin(request_groups)))
+      elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
+        request_groups = fn(self.evac_nodes)
+        for (instance_name, secnode) in self.result:
+          result_groups = fn([secnode])
+          if result_groups != request_groups:
+            raise errors.OpExecError("Iallocator returned new secondary node"
+                                     " '%s' (group '%s') for instance '%s'"
+                                     " which is not in original group '%s'" %
+                                     (secnode, utils.CommaJoin(result_groups),
+                                      instance_name,
+                                      utils.CommaJoin(request_groups)))
+      else:
+        raise errors.ProgrammerError("Unhandled mode '%s'" % self.mode)
+
     self.out_data = rdict
 
+  @staticmethod
+  def _NodesToGroups(node2group, groups, nodes):
+    """Returns a list of unique group names for a list of nodes.
+
+    @type node2group: dict
+    @param node2group: Map from node name to group UUID
+    @type groups: dict
+    @param groups: Group information
+    @type nodes: list
+    @param nodes: Node names
+
+    """
+    result = set()
+
+    for node in nodes:
+      try:
+        group_uuid = node2group[node]
+      except KeyError:
+        # Ignore unknown node
+        pass
+      else:
+        try:
+          group = groups[group_uuid]
+        except KeyError:
+          # Can't find group, let's use UUID
+          group_name = group_uuid
+        else:
+          group_name = group["name"]
+
+        result.add(group_name)
+
+    return sorted(result)
+
 
 class LUTestAllocator(NoHooksLU):
   """Run allocator tests.
@@ -11594,6 +12508,12 @@ class LUTestAllocator(NoHooksLU):
       if not hasattr(self.op, "evac_nodes"):
         raise errors.OpPrereqError("Missing attribute 'evac_nodes' on"
                                    " opcode input", errors.ECODE_INVAL)
+    elif self.op.mode == constants.IALLOCATOR_MODE_MRELOC:
+      if self.op.instances:
+        self.op.instances = _GetWantedInstances(self, self.op.instances)
+      else:
+        raise errors.OpPrereqError("Missing instances to relocate",
+                                   errors.ECODE_INVAL)
     else:
       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                  self.op.mode, errors.ECODE_INVAL)
@@ -11633,6 +12553,12 @@ class LUTestAllocator(NoHooksLU):
       ial = IAllocator(self.cfg, self.rpc,
                        mode=self.op.mode,
                        evac_nodes=self.op.evac_nodes)
+    elif self.op.mode == constants.IALLOCATOR_MODE_MRELOC:
+      ial = IAllocator(self.cfg, self.rpc,
+                       mode=self.op.mode,
+                       instances=self.op.instances,
+                       reloc_mode=self.op.reloc_mode,
+                       target_groups=self.op.target_groups)
     else:
       raise errors.ProgrammerError("Uncatched mode %s in"
                                    " LUTestAllocator.Exec", self.op.mode)
@@ -11653,13 +12579,13 @@ _QUERY_IMPL = {
   constants.QR_OS: _OsQuery,
   }
 
-assert set(_QUERY_IMPL.keys()) == constants.QR_OP_QUERY
+assert set(_QUERY_IMPL.keys()) == constants.QR_VIA_OP
 
 
 def _GetQueryImplementation(name):
   """Returns the implemtnation for a query type.
 
-  @param name: Query type, must be one of L{constants.QR_OP_QUERY}
+  @param name: Query type, must be one of L{constants.QR_VIA_OP}
 
   """
   try: