Fix group verification of offline nodes
diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 0c24ecd..41ba0e4 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -40,6 +40,7 @@ import socket
 import tempfile
 import shutil
 import itertools
+import operator
 
 from ganeti import ssh
 from ganeti import utils
@@ -57,23 +58,11 @@ from ganeti import netutils
 from ganeti import query
 from ganeti import qlang
 from ganeti import opcodes
+from ganeti import ht
 
 import ganeti.masterd.instance # pylint: disable-msg=W0611
 
 
-def _SupportsOob(cfg, node):
-  """Tells if node supports OOB.
-
-  @type cfg: L{config.ConfigWriter}
-  @param cfg: The cluster configuration
-  @type node: L{objects.Node}
-  @param node: The node
-  @return: The OOB script if supported or an empty string otherwise
-
-  """
-  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
-
-
 class ResultWithJobs:
   """Data container for LU results with jobs.
 
@@ -559,6 +548,26 @@ class _QueryBase:
                                     sort_by_name=self.sort_by_name)
 
 
+def _ShareAll():
+  """Returns a dict declaring all lock levels shared.
+
+  """
+  return dict.fromkeys(locking.LEVELS, 1)
+
+
+def _SupportsOob(cfg, node):
+  """Tells if node supports OOB.
+
+  @type cfg: L{config.ConfigWriter}
+  @param cfg: The cluster configuration
+  @type node: L{objects.Node}
+  @param node: The node
+  @return: The OOB script if supported or an empty string otherwise
+
+  """
+  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
+
+
 def _GetWantedNodes(lu, nodes):
   """Returns list of checked and expanded node names.
 
@@ -675,6 +684,19 @@ def _ReleaseLocks(lu, level, names=None, keep=None):
     assert not lu.glm.is_owned(level), "No locks should be owned"
 
 
+def _MapInstanceDisksToNodes(instances):
+  """Creates a map from (node, volume) to instance name.
+
+  @type instances: list of L{objects.Instance}
+  @rtype: dict; tuple of (node name, volume name) as key, instance name as value
+
+  """
+  return dict(((node, vol), inst.name)
+              for inst in instances
+              for (node, vols) in inst.MapLVsByNode().items()
+              for vol in vols)
+
+
 def _RunPostHook(lu, node_name):
   """Runs the post-hook for an opcode on a single node.
 
@@ -860,7 +882,7 @@ def _ExpandInstanceName(cfg, name):
 
 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                           memory, vcpus, nics, disk_template, disks,
-                          bep, hvp, hypervisor_name):
+                          bep, hvp, hypervisor_name, tags):
   """Builds instance related env variables for hooks
 
   This builds the hook environment from individual variables.
@@ -892,6 +914,8 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
   @param hvp: the hypervisor parameters for the instance
   @type hypervisor_name: string
   @param hypervisor_name: the hypervisor for the instance
+  @type tags: list
+  @param tags: list of instance tags as strings
   @rtype: dict
   @return: the hook environment for this instance
 
@@ -939,6 +963,11 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
 
   env["INSTANCE_DISK_COUNT"] = disk_count
 
+  if not tags:
+    tags = []
+
+  env["INSTANCE_TAGS"] = " ".join(tags)
+
   for source, kind in [(bep, "BE"), (hvp, "HV")]:
     for key, value in source.items():
       env["INSTANCE_%s_%s" % (kind, key)] = value
@@ -989,19 +1018,20 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None):
   bep = cluster.FillBE(instance)
   hvp = cluster.FillHV(instance)
   args = {
-    'name': instance.name,
-    'primary_node': instance.primary_node,
-    'secondary_nodes': instance.secondary_nodes,
-    'os_type': instance.os,
-    'status': instance.admin_up,
-    'memory': bep[constants.BE_MEMORY],
-    'vcpus': bep[constants.BE_VCPUS],
-    'nics': _NICListToTuple(lu, instance.nics),
-    'disk_template': instance.disk_template,
-    'disks': [(disk.size, disk.mode) for disk in instance.disks],
-    'bep': bep,
-    'hvp': hvp,
-    'hypervisor_name': instance.hypervisor,
+    "name": instance.name,
+    "primary_node": instance.primary_node,
+    "secondary_nodes": instance.secondary_nodes,
+    "os_type": instance.os,
+    "status": instance.admin_up,
+    "memory": bep[constants.BE_MEMORY],
+    "vcpus": bep[constants.BE_VCPUS],
+    "nics": _NICListToTuple(lu, instance.nics),
+    "disk_template": instance.disk_template,
+    "disks": [(disk.size, disk.mode) for disk in instance.disks],
+    "bep": bep,
+    "hvp": hvp,
+    "hypervisor_name": instance.hypervisor,
+    "tags": instance.tags,
   }
   if override:
     args.update(override)
@@ -1067,9 +1097,13 @@ def _CheckOSVariant(os_obj, name):
   @param name: OS name passed by the user, to check for validity
 
   """
+  variant = objects.OS.GetVariant(name)
   if not os_obj.supported_variants:
+    if variant:
+      raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
+                                 " passed)" % (os_obj.name, variant),
+                                 errors.ECODE_INVAL)
     return
-  variant = objects.OS.GetVariant(name)
   if not variant:
     raise errors.OpPrereqError("OS name must include a variant",
                                errors.ECODE_INVAL)
@@ -1153,7 +1187,7 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
   iallocator = getattr(lu.op, iallocator_slot, None)
 
   if node is not None and iallocator is not None:
-    raise errors.OpPrereqError("Do not specify both, iallocator and node.",
+    raise errors.OpPrereqError("Do not specify both, iallocator and node",
                                errors.ECODE_INVAL)
   elif node is None and iallocator is None:
     default_iallocator = lu.cfg.GetDefaultIAllocator()
@@ -1161,10 +1195,10 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
       setattr(lu.op, iallocator_slot, default_iallocator)
     else:
       raise errors.OpPrereqError("No iallocator or node given and no"
-                                 " cluster-wide default iallocator found."
-                                 " Please specify either an iallocator or a"
+                                 " cluster-wide default iallocator found;"
+                                 " please specify either an iallocator or a"
                                  " node, or set a cluster-wide default"
-                                 " iallocator.")
+                                 " iallocator")
 
 
 class LUClusterPostInit(LogicalUnit):
@@ -1253,7 +1287,7 @@ class LUClusterDestroy(LogicalUnit):
 
 
 def _VerifyCertificate(filename):
-  """Verifies a certificate for LUClusterVerify.
+  """Verifies a certificate for L{LUClusterVerifyConfig}.
 
   @type filename: string
   @param filename: Path to PEM file
@@ -1263,7 +1297,7 @@ def _VerifyCertificate(filename):
     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                            utils.ReadFile(filename))
   except Exception, err: # pylint: disable-msg=W0703
-    return (LUClusterVerify.ETYPE_ERROR,
+    return (LUClusterVerifyConfig.ETYPE_ERROR,
             "Failed to load X509 certificate %s: %s" % (filename, err))
 
   (errcode, msg) = \
@@ -1278,21 +1312,52 @@ def _VerifyCertificate(filename):
   if errcode is None:
     return (None, fnamemsg)
   elif errcode == utils.CERT_WARNING:
-    return (LUClusterVerify.ETYPE_WARNING, fnamemsg)
+    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
   elif errcode == utils.CERT_ERROR:
-    return (LUClusterVerify.ETYPE_ERROR, fnamemsg)
+    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
 
   raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
 
 
-class LUClusterVerify(LogicalUnit):
-  """Verifies the cluster status.
+def _GetAllHypervisorParameters(cluster, instances):
+  """Compute the set of all hypervisor parameters.
+
+  @type cluster: L{objects.Cluster}
+  @param cluster: the cluster object
+  @type instances: list of L{objects.Instance}
+  @param instances: additional instances from which to obtain parameters
+  @rtype: list of (origin, hypervisor, parameters)
+  @return: a list with all parameters found, indicating the hypervisor they
+       apply to, and the origin (can be "cluster", "os X", or "instance Y")
 
   """
-  HPATH = "cluster-verify"
-  HTYPE = constants.HTYPE_CLUSTER
-  REQ_BGL = False
+  hvp_data = []
+
+  for hv_name in cluster.enabled_hypervisors:
+    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
+
+  for os_name, os_hvp in cluster.os_hvp.items():
+    for hv_name, hv_params in os_hvp.items():
+      if hv_params:
+        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
+        hvp_data.append(("os %s" % os_name, hv_name, full_params))
 
+  # TODO: collapse identical parameter values into a single entry
+  for instance in instances:
+    if instance.hvparams:
+      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
+                       cluster.FillHV(instance)))
+
+  return hvp_data
+
+
+class _VerifyErrors(object):
+  """Mix-in for cluster/group verify LUs.
+
+  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
+  self.op and self._feedback_fn to be available.)
+
+  """
   TCLUSTER = "cluster"
   TNODE = "node"
   TINSTANCE = "instance"
@@ -1300,6 +1365,8 @@ class LUClusterVerify(LogicalUnit):
   ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
   ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
   ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK")
+  ECLUSTERDANGLINGNODES = (TNODE, "ECLUSTERDANGLINGNODES")
+  ECLUSTERDANGLINGINST = (TNODE, "ECLUSTERDANGLINGINST")
   EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
   EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
   EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
@@ -1329,6 +1396,140 @@ class LUClusterVerify(LogicalUnit):
   ETYPE_ERROR = "ERROR"
   ETYPE_WARNING = "WARNING"
 
+  def _Error(self, ecode, item, msg, *args, **kwargs):
+    """Format an error message.
+
+    Based on the opcode's error_codes parameter, either format a
+    parseable error code, or a simpler error string.
+
+    This must be called only from Exec and functions called from Exec.
+
+    """
+    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
+    itype, etxt = ecode
+    # first complete the msg
+    if args:
+      msg = msg % args
+    # then format the whole message
+    if self.op.error_codes: # This is a mix-in. pylint: disable-msg=E1101
+      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
+    else:
+      if item:
+        item = " " + item
+      else:
+        item = ""
+      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
+    # and finally report it via the feedback_fn
+    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable-msg=E1101
+
+  def _ErrorIf(self, cond, *args, **kwargs):
+    """Log an error message if the passed condition is True.
+
+    """
+    cond = (bool(cond)
+            or self.op.debug_simulate_errors) # pylint: disable-msg=E1101
+    if cond:
+      self._Error(*args, **kwargs)
+    # do not mark the operation as failed for WARN cases only
+    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
+      self.bad = self.bad or cond
+
+
+class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
+  """Verifies the cluster config.
+
+  """
+  REQ_BGL = True
+
+  def _VerifyHVP(self, hvp_data):
+    """Verifies locally the syntax of the hypervisor parameters.
+
+    """
+    for item, hv_name, hv_params in hvp_data:
+      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
+             (item, hv_name))
+      try:
+        hv_class = hypervisor.GetHypervisor(hv_name)
+        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
+        hv_class.CheckParameterSyntax(hv_params)
+      except errors.GenericError, err:
+        self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))
+
+  def ExpandNames(self):
+    # Information can be safely retrieved as the BGL is acquired in exclusive
+    # mode
+    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
+    self.all_node_info = self.cfg.GetAllNodesInfo()
+    self.all_inst_info = self.cfg.GetAllInstancesInfo()
+    self.needed_locks = {}
+
+  def Exec(self, feedback_fn):
+    """Verify integrity of cluster, performing various test on nodes.
+
+    """
+    self.bad = False
+    self._feedback_fn = feedback_fn
+
+    feedback_fn("* Verifying cluster config")
+
+    for msg in self.cfg.VerifyConfig():
+      self._ErrorIf(True, self.ECLUSTERCFG, None, msg)
+
+    feedback_fn("* Verifying cluster certificate files")
+
+    for cert_filename in constants.ALL_CERT_FILES:
+      (errcode, msg) = _VerifyCertificate(cert_filename)
+      self._ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
+
+    feedback_fn("* Verifying hypervisor parameters")
+
+    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
+                                                self.all_inst_info.values()))
+
+    feedback_fn("* Verifying all nodes belong to an existing group")
+
+    # We do this verification here because, should this bogus circumstance
+    # occur, it would never be caught by VerifyGroup, which only acts on
+    # nodes/instances reachable from existing node groups.
+
+    dangling_nodes = set(node.name for node in self.all_node_info.values()
+                         if node.group not in self.all_group_info)
+
+    dangling_instances = {}
+    no_node_instances = []
+
+    for inst in self.all_inst_info.values():
+      if inst.primary_node in dangling_nodes:
+        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
+      elif inst.primary_node not in self.all_node_info:
+        no_node_instances.append(inst.name)
+
+    pretty_dangling = [
+        "%s (%s)" %
+        (node.name,
+         utils.CommaJoin(dangling_instances.get(node.name,
+                                                ["no instances"])))
+        for node in dangling_nodes]
+
+    self._ErrorIf(bool(dangling_nodes), self.ECLUSTERDANGLINGNODES, None,
+                  "the following nodes (and their instances) belong to a non"
+                  " existing group: %s", utils.CommaJoin(pretty_dangling))
+
+    self._ErrorIf(bool(no_node_instances), self.ECLUSTERDANGLINGINST, None,
+                  "the following instances have a non-existing primary-node:"
+                  " %s", utils.CommaJoin(no_node_instances))
+
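+    # Return overall success plus the list of node group names; the caller
+    # is expected to follow up with per-group verification.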
+    return (not self.bad, [g.name for g in self.all_group_info.values()])
+
+
+class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
+  """Verifies the status of a node group.
+
+  """
+  HPATH = "cluster-verify"
+  HTYPE = constants.HTYPE_CLUSTER
+  REQ_BGL = False
+
   _HOOKS_INDENT_RE = re.compile("^", re.M)
 
   class NodeImage(object):
@@ -1382,48 +1583,87 @@ class LUClusterVerify(LogicalUnit):
       self.oslist = {}
 
   def ExpandNames(self):
+    # This raises errors.OpPrereqError on its own:
+    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+
+    # Get instances in node group; this is unsafe and needs verification later
+    inst_names = self.cfg.GetNodeGroupInstances(self.group_uuid)
+
     self.needed_locks = {
-      locking.LEVEL_NODE: locking.ALL_SET,
-      locking.LEVEL_INSTANCE: locking.ALL_SET,
-    }
-    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+      locking.LEVEL_INSTANCE: inst_names,
+      locking.LEVEL_NODEGROUP: [self.group_uuid],
+      locking.LEVEL_NODE: [],
+      }
 
-  def _Error(self, ecode, item, msg, *args, **kwargs):
-    """Format an error message.
+    self.share_locks = _ShareAll()
 
-    Based on the opcode's error_codes parameter, either format a
-    parseable error code, or a simpler error string.
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      # Get members of node group; this is unsafe and needs verification later
+      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
 
-    This must be called only from Exec and functions called from Exec.
+      all_inst_info = self.cfg.GetAllInstancesInfo()
 
-    """
-    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
-    itype, etxt = ecode
-    # first complete the msg
-    if args:
-      msg = msg % args
-    # then format the whole message
-    if self.op.error_codes:
-      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
-    else:
-      if item:
-        item = " " + item
-      else:
-        item = ""
-      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
-    # and finally report it via the feedback_fn
-    self._feedback_fn("  - %s" % msg)
+      # In Exec(), we warn about mirrored instances that have primary and
+      # secondary living in separate node groups. To fully verify that
+      # volumes for these instances are healthy, we will need to do an
+      # extra call to their secondaries. We ensure here those nodes will
+      # be locked.
+      for inst in self.glm.list_owned(locking.LEVEL_INSTANCE):
+        # Important: access only the instances whose lock is owned
+        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
+          nodes.update(all_inst_info[inst].secondary_nodes)
 
-  def _ErrorIf(self, cond, *args, **kwargs):
-    """Log an error message if the passed condition is True.
+      self.needed_locks[locking.LEVEL_NODE] = nodes
 
-    """
-    cond = bool(cond) or self.op.debug_simulate_errors
-    if cond:
-      self._Error(*args, **kwargs)
-    # do not mark the operation as failed for WARN cases only
-    if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
-      self.bad = self.bad or cond
+  def CheckPrereq(self):
+    group_nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
+    group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
+
+    unlocked_nodes = \
+        group_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE))
+
+    unlocked_instances = \
+        group_instances.difference(self.glm.list_owned(locking.LEVEL_INSTANCE))
+
+    if unlocked_nodes:
+      raise errors.OpPrereqError("Missing lock for nodes: %s" %
+                                 utils.CommaJoin(unlocked_nodes))
+
+    if unlocked_instances:
+      raise errors.OpPrereqError("Missing lock for instances: %s" %
+                                 utils.CommaJoin(unlocked_instances))
+
+    self.all_node_info = self.cfg.GetAllNodesInfo()
+    self.all_inst_info = self.cfg.GetAllInstancesInfo()
+
+    self.my_node_names = utils.NiceSort(group_nodes)
+    self.my_inst_names = utils.NiceSort(group_instances)
+
+    self.my_node_info = dict((name, self.all_node_info[name])
+                             for name in self.my_node_names)
+
+    self.my_inst_info = dict((name, self.all_inst_info[name])
+                             for name in self.my_inst_names)
+
+    # We detect here the nodes that will need the extra RPC calls for verifying
+    # split LV volumes; they should be locked.
+    extra_lv_nodes = set()
+
+    for inst in self.my_inst_info.values():
+      if inst.disk_template in constants.DTS_INT_MIRROR:
+        group = self.my_node_info[inst.primary_node].group
+        for nname in inst.secondary_nodes:
+          if self.all_node_info[nname].group != group:
+            extra_lv_nodes.add(nname)
+
+    unlocked_lv_nodes = \
+        extra_lv_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE))
+
+    if unlocked_lv_nodes:
+      raise errors.OpPrereqError("these nodes could be locked: %s" %
+                                 utils.CommaJoin(unlocked_lv_nodes))
+    self.extra_lv_nodes = list(extra_lv_nodes)
 
   def _VerifyNode(self, ninfo, nresult):
     """Perform some basic validation on data returned from a node.
@@ -1531,7 +1771,7 @@ class LUClusterVerify(LogicalUnit):
              ntime_diff)
 
   def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
-    """Check the node time.
+    """Check the node LVM results.
 
     @type ninfo: L{objects.Node}
     @param ninfo: the node to check
@@ -1567,8 +1807,31 @@ class LUClusterVerify(LogicalUnit):
         _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
                  " '%s' of VG '%s'", pvname, owner_vg)
 
+  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
+    """Check the node bridges.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param bridges: the expected list of bridges
+
+    """
+    if not bridges:
+      return
+
+    node = ninfo.name
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    missing = nresult.get(constants.NV_BRIDGES, None)
+    test = not isinstance(missing, list)
+    _ErrorIf(test, self.ENODENET, node,
+             "did not return valid bridge information")
+    if not test:
+      _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" %
+               utils.CommaJoin(sorted(missing)))
+
   def _VerifyNodeNetwork(self, ninfo, nresult):
-    """Check the node time.
+    """Check the node network connectivity results.
 
     @type ninfo: L{objects.Node}
     @param ninfo: the node to check
@@ -1640,12 +1903,6 @@ class LUClusterVerify(LogicalUnit):
                "instance not running on its primary node %s",
                node_current)
 
-    for node, n_img in node_image.items():
-      if node != node_current:
-        test = instance in n_img.instances
-        _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
-                 "instance should not run on node %s", node)
-
     diskdata = [(nname, success, status, idx)
                 for (nname, disks) in diskstatus.items()
                 for idx, (success, status) in enumerate(disks)]
@@ -1685,18 +1942,6 @@ class LUClusterVerify(LogicalUnit):
         self._ErrorIf(test, self.ENODEORPHANLV, node,
                       "volume %s is unknown", volume)
 
-  def _VerifyOrphanInstances(self, instancelist, node_image):
-    """Verify the list of running instances.
-
-    This checks what instances are running but unknown to the cluster.
-
-    """
-    for node, n_img in node_image.items():
-      for o_inst in n_img.instances:
-        test = o_inst not in instancelist
-        self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
-                      "instance %s on node %s should not exist", o_inst, node)
-
   def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
     """Verify N+1 Memory Resilience.
 
@@ -1743,7 +1988,7 @@ class LUClusterVerify(LogicalUnit):
     @param all_nvinfo: RPC results
 
     """
-    node_names = frozenset(node.name for node in nodeinfo)
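+    # Offline nodes will not return any file information, so leave them out
+    # of the set of nodes expected to hold each file.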
+    node_names = frozenset(node.name for node in nodeinfo if not node.offline)
 
     assert master_node in node_names
     assert (len(files_all | files_all_opt | files_mc | files_vm) ==
@@ -1762,6 +2007,9 @@ class LUClusterVerify(LogicalUnit):
     fileinfo = dict((filename, {}) for filename in file2nodefn.keys())
 
     for node in nodeinfo:
+      if node.offline:
+        continue
+
       nresult = all_nvinfo[node.name]
 
       if nresult.fail_msg or not nresult.payload:
@@ -1796,8 +2044,8 @@ class LUClusterVerify(LogicalUnit):
         # All or no nodes
         errorif(missing_file and missing_file != node_names,
                 cls.ECLUSTERFILECHECK, None,
-                "File %s is optional, but it must exist on all or no nodes (not"
-                " found on %s)",
+                "File %s is optional, but it must exist on all or no"
+                " nodes (not found on %s)",
                 filename, utils.CommaJoin(utils.NiceSort(missing_file)))
       else:
         errorif(missing_file, cls.ECLUSTERFILECHECK, None,
@@ -2179,20 +2427,6 @@ class LUClusterVerify(LogicalUnit):
 
     return instdisk
 
-  def _VerifyHVP(self, hvp_data):
-    """Verifies locally the syntax of the hypervisor parameters.
-
-    """
-    for item, hv_name, hv_params in hvp_data:
-      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
-             (item, hv_name))
-      try:
-        hv_class = hypervisor.GetHypervisor(hv_name)
-        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
-        hv_class.CheckParameterSyntax(hv_params)
-      except errors.GenericError, err:
-        self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))
-
   def BuildHooksEnv(self):
     """Build hooks env.
 
@@ -2200,14 +2434,12 @@ class LUClusterVerify(LogicalUnit):
     the output be logged in the verify output and the verification to fail.
 
     """
-    cfg = self.cfg
-
     env = {
-      "CLUSTER_TAGS": " ".join(cfg.GetClusterInfo().GetTags())
+      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
       }
 
     env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
-               for node in cfg.GetAllNodesInfo().values())
+               for node in self.my_node_info.values())
 
     return env
 
@@ -2215,37 +2447,31 @@ class LUClusterVerify(LogicalUnit):
     """Build hooks nodes.
 
     """
-    return ([], self.cfg.GetNodeList())
+    return ([], self.my_node_names)
 
   def Exec(self, feedback_fn):
-    """Verify integrity of cluster, performing various test on nodes.
+    """Verify integrity of the node group, performing various test on nodes.
 
     """
     # This method has too many local variables. pylint: disable-msg=R0914
+
+    if not self.my_node_names:
+      # empty node group
+      feedback_fn("* Empty node group, skipping verification")
+      return True
+
     self.bad = False
     _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
     verbose = self.op.verbose
     self._feedback_fn = feedback_fn
-    feedback_fn("* Verifying global settings")
-    for msg in self.cfg.VerifyConfig():
-      _ErrorIf(True, self.ECLUSTERCFG, None, msg)
-
-    # Check the cluster certificates
-    for cert_filename in constants.ALL_CERT_FILES:
-      (errcode, msg) = _VerifyCertificate(cert_filename)
-      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
 
     vg_name = self.cfg.GetVGName()
     drbd_helper = self.cfg.GetDRBDHelper()
-    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
     cluster = self.cfg.GetClusterInfo()
-    nodelist = utils.NiceSort(self.cfg.GetNodeList())
-    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
-    nodeinfo_byname = dict(zip(nodelist, nodeinfo))
-    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
-    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
-                        for iname in instancelist)
     groupinfo = self.cfg.GetAllNodeGroupsInfo()
+    hypervisors = cluster.enabled_hypervisors
+    node_data_list = [self.my_node_info[name] for name in self.my_node_names]
+
     i_non_redundant = [] # Non redundant instances
     i_non_a_balanced = [] # Non auto-balanced instances
     n_offline = 0 # Count of offline nodes
@@ -2261,37 +2487,32 @@ class LUClusterVerify(LogicalUnit):
     master_node = self.master_node = self.cfg.GetMasterNode()
     master_ip = self.cfg.GetMasterIP()
 
-    # Compute the set of hypervisor parameters
-    hvp_data = []
-    for hv_name in hypervisors:
-      hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
-    for os_name, os_hvp in cluster.os_hvp.items():
-      for hv_name, hv_params in os_hvp.items():
-        if not hv_params:
-          continue
-        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
-        hvp_data.append(("os %s" % os_name, hv_name, full_params))
-    # TODO: collapse identical parameter values in a single one
-    for instance in instanceinfo.values():
-      if not instance.hvparams:
-        continue
-      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
-                       cluster.FillHV(instance)))
-    # and verify them locally
-    self._VerifyHVP(hvp_data)
+    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
+
+    # We will make nodes contact all nodes in their group, and one node from
+    # every other group.
+    # TODO: should it be a *random* node, different every time?
+    online_nodes = [node.name for node in node_data_list if not node.offline]
+    other_group_nodes = {}
+
+    for name in sorted(self.all_node_info):
+      node = self.all_node_info[name]
+      if (node.group not in other_group_nodes
+          and node.group != self.group_uuid
+          and not node.offline):
+        other_group_nodes[node.group] = node.name
 
-    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
     node_verify_param = {
       constants.NV_FILELIST:
         utils.UniqueSequence(filename
                              for files in filemap
                              for filename in files),
-      constants.NV_NODELIST: [node.name for node in nodeinfo
-                              if not node.offline],
+      constants.NV_NODELIST: online_nodes + other_group_nodes.values(),
       constants.NV_HYPERVISOR: hypervisors,
-      constants.NV_HVPARAMS: hvp_data,
-      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
-                                  node.secondary_ip) for node in nodeinfo
+      constants.NV_HVPARAMS:
+        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
+      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
+                                 for node in node_data_list
                                  if not node.offline],
       constants.NV_INSTANCELIST: hypervisors,
       constants.NV_VERSION: None,
@@ -2312,15 +2533,30 @@ class LUClusterVerify(LogicalUnit):
     if drbd_helper:
       node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
 
+    # bridge checks
+    # FIXME: this needs to be changed per node-group, not cluster-wide
+    bridges = set()
+    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
+    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
+      bridges.add(default_nicpp[constants.NIC_LINK])
+    for instance in self.my_inst_info.values():
+      for nic in instance.nics:
+        full_nic = cluster.SimpleFillNIC(nic.nicparams)
+        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
+          bridges.add(full_nic[constants.NIC_LINK])
+
+    if bridges:
+      node_verify_param[constants.NV_BRIDGES] = list(bridges)
+
     # Build our expected cluster state
     node_image = dict((node.name, self.NodeImage(offline=node.offline,
                                                  name=node.name,
                                                  vm_capable=node.vm_capable))
-                      for node in nodeinfo)
+                      for node in node_data_list)
 
     # Gather OOB paths
     oob_paths = []
-    for node in nodeinfo:
+    for node in self.all_node_info.values():
       path = _SupportsOob(self.cfg, node)
       if path and path not in oob_paths:
         oob_paths.append(path)
@@ -2328,14 +2564,13 @@ class LUClusterVerify(LogicalUnit):
     if oob_paths:
       node_verify_param[constants.NV_OOB_PATHS] = oob_paths
 
-    for instance in instancelist:
-      inst_config = instanceinfo[instance]
+    for instance in self.my_inst_names:
+      inst_config = self.my_inst_info[instance]
 
       for nname in inst_config.all_nodes:
         if nname not in node_image:
-          # ghost node
           gnode = self.NodeImage(name=nname)
-          gnode.ghost = True
+          gnode.ghost = (nname not in self.all_node_info)
           node_image[nname] = gnode
 
       inst_config.MapLVsByNode(node_vol_should)
@@ -2358,23 +2593,60 @@ class LUClusterVerify(LogicalUnit):
     # time before and after executing the request, we can at least have a time
     # window.
     nvinfo_starttime = time.time()
-    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
+    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
+                                           node_verify_param,
                                            self.cfg.GetClusterName())
     nvinfo_endtime = time.time()
 
+    if self.extra_lv_nodes and vg_name is not None:
+      extra_lv_nvinfo = \
+          self.rpc.call_node_verify(self.extra_lv_nodes,
+                                    {constants.NV_LVLIST: vg_name},
+                                    self.cfg.GetClusterName())
+    else:
+      extra_lv_nvinfo = {}
+
     all_drbd_map = self.cfg.ComputeDRBDMap()
 
-    feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist))
-    instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo)
+    feedback_fn("* Gathering disk information (%s nodes)" %
+                len(self.my_node_names))
+    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
+                                     self.my_inst_info)
 
     feedback_fn("* Verifying configuration file consistency")
-    self._VerifyFiles(_ErrorIf, nodeinfo, master_node, all_nvinfo, filemap)
+
+    # If not all nodes are being checked, we need to make sure the master node
+    # and a non-checked vm_capable node are in the list.
+    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
+    if absent_nodes:
+      vf_nvinfo = all_nvinfo.copy()
+      vf_node_info = list(self.my_node_info.values())
+      additional_nodes = []
+      if master_node not in self.my_node_info:
+        additional_nodes.append(master_node)
+        vf_node_info.append(self.all_node_info[master_node])
+      # Add the first vm_capable node we find which is not included
+      for node in absent_nodes:
+        nodeinfo = self.all_node_info[node]
+        if nodeinfo.vm_capable and not nodeinfo.offline:
+          additional_nodes.append(node)
+          vf_node_info.append(self.all_node_info[node])
+          break
+      key = constants.NV_FILELIST
+      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
+                                                 {key: node_verify_param[key]},
+                                                 self.cfg.GetClusterName()))
+    else:
+      vf_nvinfo = all_nvinfo
+      vf_node_info = self.my_node_info.values()
+
+    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
 
     feedback_fn("* Verifying node status")
 
     refos_img = None
 
-    for node_i in nodeinfo:
+    for node_i in node_data_list:
       node = node_i.name
       nimg = node_image[node]
 
@@ -2411,23 +2683,41 @@ class LUClusterVerify(LogicalUnit):
 
       if nimg.vm_capable:
         self._VerifyNodeLVM(node_i, nresult, vg_name)
-        self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper,
+        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                              all_drbd_map)
 
         self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
         self._UpdateNodeInstances(node_i, nresult, nimg)
         self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
         self._UpdateNodeOS(node_i, nresult, nimg)
+
         if not nimg.os_fail:
           if refos_img is None:
             refos_img = nimg
           self._VerifyNodeOS(node_i, nimg, refos_img)
+        self._VerifyNodeBridges(node_i, nresult, bridges)
+
+        # Check whether all instances running on the node have it as their
+        # primary node. (This can no longer be done from _VerifyInstance
+        # below, since some of the wrong instances could be from other node
+        # groups.)
+        non_primary_inst = set(nimg.instances).difference(nimg.pinst)
+
+        for inst in non_primary_inst:
+          test = inst in self.all_inst_info
+          _ErrorIf(test, self.EINSTANCEWRONGNODE, inst,
+                   "instance should not run on node %s", node_i.name)
+          _ErrorIf(not test, self.ENODEORPHANINSTANCE, node_i.name,
+                   "node is running unknown instance %s", inst)
+
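+    # Merge the LV data gathered from out-of-group secondaries (see
+    # DeclareLocks) into the node images before the volume checks below.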
+    for node, result in extra_lv_nvinfo.items():
+      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
+                              node_image[node], vg_name)
 
     feedback_fn("* Verifying instance status")
-    for instance in instancelist:
+    for instance in self.my_inst_names:
       if verbose:
         feedback_fn("* Verifying instance %s" % instance)
-      inst_config = instanceinfo[instance]
+      inst_config = self.my_inst_info[instance]
       self._VerifyInstance(instance, inst_config, node_image,
                            instdisk[instance])
       inst_nodes_offline = []
@@ -2462,7 +2752,7 @@ class LUClusterVerify(LogicalUnit):
         instance_groups = {}
 
         for node in instance_nodes:
-          instance_groups.setdefault(nodeinfo_byname[node].group,
+          instance_groups.setdefault(self.all_node_info[node].group,
                                      []).append(node)
 
         pretty_list = [
@@ -2501,14 +2791,22 @@ class LUClusterVerify(LogicalUnit):
 
     feedback_fn("* Verifying orphan volumes")
     reserved = utils.FieldSet(*cluster.reserved_lvs)
-    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
 
-    feedback_fn("* Verifying orphan instances")
-    self._VerifyOrphanInstances(instancelist, node_image)
+    # We will get spurious "unknown volume" warnings if any node of this group
+    # is secondary for an instance whose primary is in another group. To avoid
+    # them, we find these instances and add their volumes to node_vol_should.
+    for inst in self.all_inst_info.values():
+      for secondary in inst.secondary_nodes:
+        if (secondary in self.my_node_info
+            and inst.name not in self.my_inst_info):
+          inst.MapLVsByNode(node_vol_should)
+          break
+
+    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
 
     if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
       feedback_fn("* Verifying N+1 Memory redundancy")
-      self._VerifyNPlusOneMemory(node_image, instanceinfo)
+      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
 
     feedback_fn("* Other Notes")
     if i_non_redundant:
@@ -2542,9 +2840,12 @@ class LUClusterVerify(LogicalUnit):
         and hook results
 
     """
-    # We only really run POST phase hooks, and are only interested in
-    # their results
-    if phase == constants.HOOKS_PHASE_POST:
+    # We only really run POST phase hooks, only for non-empty groups,
+    # and are only interested in their results
+    if not self.my_node_names:
+      # empty node group
+      pass
+    elif phase == constants.HOOKS_PHASE_POST:
       # Used to change hooks' output to proper indentation
       feedback_fn("* Hooks Results")
       assert hooks_results, "invalid result from hooks"
@@ -2566,11 +2867,11 @@ class LUClusterVerify(LogicalUnit):
           self._ErrorIf(test, self.ENODEHOOKS, node_name,
                         "Script %s failed, output:", script)
           if test:
-            output = self._HOOKS_INDENT_RE.sub('      ', output)
+            output = self._HOOKS_INDENT_RE.sub("      ", output)
             feedback_fn("%s" % output)
             lu_result = 0
 
-      return lu_result
+    return lu_result
 
 
 class LUClusterVerifyDisks(NoHooksLU):
@@ -2580,11 +2881,109 @@ class LUClusterVerifyDisks(NoHooksLU):
   REQ_BGL = False
 
   def ExpandNames(self):
+    self.share_locks = _ShareAll()
     self.needed_locks = {
-      locking.LEVEL_NODE: locking.ALL_SET,
-      locking.LEVEL_INSTANCE: locking.ALL_SET,
-    }
-    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+      locking.LEVEL_NODEGROUP: locking.ALL_SET,
+      }
+
+  def Exec(self, feedback_fn):
+    group_names = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+
+    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
+    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
+                           for group in group_names])
+
+
+class LUGroupVerifyDisks(NoHooksLU):
+  """Verifies the status of all disks in a node group.
+
+  """
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    # Raises errors.OpPrereqError on its own if group can't be found
+    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+
+    self.share_locks = _ShareAll()
+    self.needed_locks = {
+      locking.LEVEL_INSTANCE: [],
+      locking.LEVEL_NODEGROUP: [],
+      locking.LEVEL_NODE: [],
+      }
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_INSTANCE:
+      assert not self.needed_locks[locking.LEVEL_INSTANCE]
+
+      # Lock instances optimistically, needs verification once node and group
+      # locks have been acquired
+      self.needed_locks[locking.LEVEL_INSTANCE] = \
+        self.cfg.GetNodeGroupInstances(self.group_uuid)
+
+    elif level == locking.LEVEL_NODEGROUP:
+      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+
+      self.needed_locks[locking.LEVEL_NODEGROUP] = \
+        set([self.group_uuid] +
+            # Lock all groups used by instances optimistically; this requires
+            # going via the node before it's locked, requiring verification
+            # later on
+            [group_uuid
+             for instance_name in
+               self.glm.list_owned(locking.LEVEL_INSTANCE)
+             for group_uuid in
+               self.cfg.GetInstanceNodeGroups(instance_name)])
+
+    elif level == locking.LEVEL_NODE:
+      # This will only lock the nodes in the group to be verified which contain
+      # actual instances
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+      self._LockInstancesNodes()
+
+      # Lock all nodes in group to be verified
+      assert self.group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
+      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
+      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
+
+  def CheckPrereq(self):
+    owned_instances = frozenset(self.glm.list_owned(locking.LEVEL_INSTANCE))
+    owned_groups = frozenset(self.glm.list_owned(locking.LEVEL_NODEGROUP))
+    owned_nodes = frozenset(self.glm.list_owned(locking.LEVEL_NODE))
+
+    assert self.group_uuid in owned_groups
+
+    # Check if locked instances are still correct
+    wanted_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
+    if owned_instances != wanted_instances:
+      raise errors.OpPrereqError("Instances in node group %s changed since"
+                                 " locks were acquired, wanted %s, have %s;"
+                                 " retry the operation" %
+                                 (self.op.group_name,
+                                  utils.CommaJoin(wanted_instances),
+                                  utils.CommaJoin(owned_instances)),
+                                 errors.ECODE_STATE)
+
+    # Get instance information
+    self.instances = dict((name, self.cfg.GetInstanceInfo(name))
+                          for name in owned_instances)
+
+    # Check if node groups for locked instances are still correct
+    for (instance_name, inst) in self.instances.items():
+      assert self.group_uuid in self.cfg.GetInstanceNodeGroups(instance_name), \
+        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
+      assert owned_nodes.issuperset(inst.all_nodes), \
+        "Instance %s's nodes changed while we kept the lock" % instance_name
+
+      inst_groups = self.cfg.GetInstanceNodeGroups(instance_name)
+      if not owned_groups.issuperset(inst_groups):
+        raise errors.OpPrereqError("Instance %s's node groups changed since"
+                                   " locks were acquired, current groups are"
+                                   " are '%s', owning groups '%s'; retry the"
+                                   " operation" %
+                                   (instance_name,
+                                    utils.CommaJoin(inst_groups),
+                                    utils.CommaJoin(owned_groups)),
+                                   errors.ECODE_STATE)
 
   def Exec(self, feedback_fn):
     """Verify integrity of cluster disks.
@@ -2595,50 +2994,41 @@ class LUClusterVerifyDisks(NoHooksLU):
         missing volumes
 
     """
-    result = res_nodes, res_instances, res_missing = {}, [], {}
+    res_nodes = {}
+    res_instances = set()
+    res_missing = {}
 
-    nodes = utils.NiceSort(self.cfg.GetVmCapableNodeList())
-    instances = self.cfg.GetAllInstancesInfo().values()
+    nv_dict = _MapInstanceDisksToNodes([inst
+                                        for inst in self.instances.values()
+                                        if inst.admin_up])
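+    # nv_dict maps (node name, volume name) to the owning instance's name;
+    # entries left in it after the LV listing below are missing volumes.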
 
-    nv_dict = {}
-    for inst in instances:
-      inst_lvs = {}
-      if not inst.admin_up:
-        continue
-      inst.MapLVsByNode(inst_lvs)
-      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
-      for node, vol_list in inst_lvs.iteritems():
-        for vol in vol_list:
-          nv_dict[(node, vol)] = inst
-
-    if not nv_dict:
-      return result
-
-    node_lvs = self.rpc.call_lv_list(nodes, [])
-    for node, node_res in node_lvs.items():
-      if node_res.offline:
-        continue
-      msg = node_res.fail_msg
-      if msg:
-        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
-        res_nodes[node] = msg
-        continue
+    if nv_dict:
+      nodes = utils.NiceSort(set(self.glm.list_owned(locking.LEVEL_NODE)) &
+                             set(self.cfg.GetVmCapableNodeList()))
 
-      lvs = node_res.payload
-      for lv_name, (_, _, lv_online) in lvs.items():
-        inst = nv_dict.pop((node, lv_name), None)
-        if (not lv_online and inst is not None
-            and inst.name not in res_instances):
-          res_instances.append(inst.name)
+      node_lvs = self.rpc.call_lv_list(nodes, [])
 
-    # any leftover items in nv_dict are missing LVs, let's arrange the
-    # data better
-    for key, inst in nv_dict.iteritems():
-      if inst.name not in res_missing:
-        res_missing[inst.name] = []
-      res_missing[inst.name].append(key)
+      for (node, node_res) in node_lvs.items():
+        if node_res.offline:
+          continue
 
-    return result
+        msg = node_res.fail_msg
+        if msg:
+          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
+          res_nodes[node] = msg
+          continue
+
+        for lv_name, (_, _, lv_online) in node_res.payload.items():
+          inst = nv_dict.pop((node, lv_name), None)
+          if not (lv_online or inst is None):
+            res_instances.add(inst)
+
+      # any leftover items in nv_dict are missing LVs, let's arrange the data
+      # better
+      for key, inst in nv_dict.iteritems():
+        res_missing.setdefault(inst, []).append(key)
+
+    return (res_nodes, list(res_instances), res_missing)
 
 
 class LUClusterRepairDiskSizes(NoHooksLU):
@@ -2649,10 +3039,7 @@ class LUClusterRepairDiskSizes(NoHooksLU):
 
   def ExpandNames(self):
     if self.op.instances:
-      self.wanted_names = []
-      for name in self.op.instances:
-        full_name = _ExpandInstanceName(self.cfg, name)
-        self.wanted_names.append(full_name)
+      self.wanted_names = _GetWantedInstances(self, self.op.instances)
       self.needed_locks = {
         locking.LEVEL_NODE: [],
         locking.LEVEL_INSTANCE: self.wanted_names,
@@ -2664,7 +3051,7 @@ class LUClusterRepairDiskSizes(NoHooksLU):
         locking.LEVEL_NODE: locking.ALL_SET,
         locking.LEVEL_INSTANCE: locking.ALL_SET,
         }
-    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
+    self.share_locks = _ShareAll()
 
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE and self.wanted_names is not None:
@@ -2977,8 +3364,8 @@ class LUClusterSetParams(LogicalUnit):
           # if we're moving instances to routed, check that they have an ip
           target_mode = params_filled[constants.NIC_MODE]
           if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
-            nic_errors.append("Instance %s, nic/%d: routed nick with no ip" %
-                              (instance.name, nic_idx))
+            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
+                              " address" % (instance.name, nic_idx))
       if nic_errors:
         raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                    "\n".join(nic_errors))
@@ -3429,6 +3816,20 @@ class LUOobCommand(NoHooksLU):
   REG_BGL = False
   _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
 
+  def ExpandNames(self):
+    """Gather locks we need.
+
+    """
+    if self.op.node_names:
+      self.op.node_names = _GetWantedNodes(self, self.op.node_names)
+      lock_names = self.op.node_names
+    else:
+      lock_names = locking.ALL_SET
+
+    self.needed_locks = {
+      locking.LEVEL_NODE: lock_names,
+      }
+
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -3485,21 +3886,6 @@ class LUOobCommand(NoHooksLU):
                                     " not marked offline") % node_name,
                                    errors.ECODE_STATE)
 
-  def ExpandNames(self):
-    """Gather locks we need.
-
-    """
-    if self.op.node_names:
-      self.op.node_names = [_ExpandNodeName(self.cfg, name)
-                            for name in self.op.node_names]
-      lock_names = self.op.node_names
-    else:
-      lock_names = locking.ALL_SET
-
-    self.needed_locks = {
-      locking.LEVEL_NODE: lock_names,
-      }
-
   def Exec(self, feedback_fn):
     """Execute OOB and return result if we expect any.
 
@@ -3507,7 +3893,8 @@ class LUOobCommand(NoHooksLU):
     master_node = self.master_node
     ret = []
 
-    for idx, node in enumerate(self.nodes):
+    for idx, node in enumerate(utils.NiceSort(self.nodes,
+                                              key=lambda node: node.name)):
       node_entry = [(constants.RS_NORMAL, node.name)]
       ret.append(node_entry)
 
@@ -3671,7 +4058,8 @@ class _OsQuery(_QueryBase):
     """
     # Locking is not used
     assert not (compat.any(lu.glm.is_owned(level)
-                           for level in locking.LEVELS) or
+                           for level in locking.LEVELS
+                           if level != locking.LEVEL_CLUSTER) or
                 self.do_locking or self.use_locking)
 
     valid_nodes = [node.name
@@ -3981,10 +4369,8 @@ class LUNodeQueryvols(NoHooksLU):
     nodenames = self.glm.list_owned(locking.LEVEL_NODE)
     volumes = self.rpc.call_node_volumes(nodenames)
 
-    ilist = [self.cfg.GetInstanceInfo(iname) for iname
-             in self.cfg.GetInstanceList()]
-
-    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
+    ilist = self.cfg.GetAllInstancesInfo()
+    vol2inst = _MapInstanceDisksToNodes(ilist.values())
 
     output = []
     for node in nodenames:
@@ -3996,8 +4382,8 @@ class LUNodeQueryvols(NoHooksLU):
         self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
         continue
 
-      node_vols = nresult.payload[:]
-      node_vols.sort(key=lambda vol: vol['dev'])
+      node_vols = sorted(nresult.payload,
+                         key=operator.itemgetter("dev"))
 
       for vol in node_vols:
         node_output = []
@@ -4005,22 +4391,15 @@ class LUNodeQueryvols(NoHooksLU):
           if field == "node":
             val = node
           elif field == "phys":
-            val = vol['dev']
+            val = vol["dev"]
           elif field == "vg":
-            val = vol['vg']
+            val = vol["vg"]
           elif field == "name":
-            val = vol['name']
+            val = vol["name"]
           elif field == "size":
-            val = int(float(vol['size']))
+            val = int(float(vol["size"]))
           elif field == "instance":
-            for inst in ilist:
-              if node not in lv_by_node[inst]:
-                continue
-              if vol['name'] in lv_by_node[inst][node]:
-                val = inst.name
-                break
-            else:
-              val = '-'
+            val = vol2inst.get((node, vol["vg"] + "/" + vol["name"]), "-")
           else:
             raise errors.ParameterError(field)
           node_output.append(str(val))
@@ -5259,7 +5638,7 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
   nodeinfo = lu.rpc.call_node_info([node], None, hypervisor_name)
   nodeinfo[node].Raise("Can't get data from node %s" % node,
                        prereq=True, ecode=errors.ECODE_ENVIRON)
-  free_mem = nodeinfo[node].payload.get('memory_free', None)
+  free_mem = nodeinfo[node].payload.get("memory_free", None)
   if not isinstance(free_mem, int):
     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                                " was '%s'" % (node, free_mem),
@@ -5422,7 +5801,8 @@ class LUInstanceStartup(LogicalUnit):
     instance = self.instance
     force = self.op.force
 
-    self.cfg.MarkInstanceUp(instance.name)
+    if not self.op.no_remember:
+      self.cfg.MarkInstanceUp(instance.name)
 
     if self.primary_offline:
       assert self.op.ignore_offline_nodes
@@ -5433,7 +5813,8 @@ class LUInstanceStartup(LogicalUnit):
       _StartInstanceDisks(self, instance, force)
 
       result = self.rpc.call_instance_start(node_current, instance,
-                                            self.op.hvparams, self.op.beparams)
+                                            self.op.hvparams, self.op.beparams,
+                                            self.op.startup_paused)
       msg = result.fail_msg
       if msg:
         _ShutdownInstanceDisks(self, instance)
@@ -5523,7 +5904,8 @@ class LUInstanceReboot(LogicalUnit):
         self.LogInfo("Instance %s was already stopped, starting now",
                      instance.name)
       _StartInstanceDisks(self, instance, ignore_secondaries)
-      result = self.rpc.call_instance_start(node_current, instance, None, None)
+      result = self.rpc.call_instance_start(node_current, instance,
+                                            None, None, False)
       msg = result.fail_msg
       if msg:
         _ShutdownInstanceDisks(self, instance)
@@ -5587,7 +5969,8 @@ class LUInstanceShutdown(LogicalUnit):
     node_current = instance.primary_node
     timeout = self.op.timeout
 
-    self.cfg.MarkInstanceDown(instance.name)
+    if not self.op.no_remember:
+      self.cfg.MarkInstanceDown(instance.name)
 
     if self.primary_offline:
       assert self.op.ignore_offline_nodes
@@ -5700,8 +6083,25 @@ class LUInstanceRecreateDisks(LogicalUnit):
   HTYPE = constants.HTYPE_INSTANCE
   REQ_BGL = False
 
+  def CheckArguments(self):
+    # normalise the disk list
+    self.op.disks = sorted(frozenset(self.op.disks))
+
   def ExpandNames(self):
     self._ExpandAndLockInstance()
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+    if self.op.nodes:
+      self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
+      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
+    else:
+      self.needed_locks[locking.LEVEL_NODE] = []
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      # if we replace the nodes, we only need to lock the old primary,
+      # otherwise we need to lock all nodes for disk re-creation
+      primary_only = bool(self.op.nodes)
+      self._LockInstancesNodes(primary_only=primary_only)
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -5727,12 +6127,31 @@ class LUInstanceRecreateDisks(LogicalUnit):
     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
-    _CheckNodeOnline(self, instance.primary_node)
+    if self.op.nodes:
+      if len(self.op.nodes) != len(instance.all_nodes):
+        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
+                                   " %d replacement nodes were specified" %
+                                   (instance.name, len(instance.all_nodes),
+                                    len(self.op.nodes)),
+                                   errors.ECODE_INVAL)
+      assert instance.disk_template != constants.DT_DRBD8 or \
+          len(self.op.nodes) == 2
+      assert instance.disk_template != constants.DT_PLAIN or \
+          len(self.op.nodes) == 1
+      primary_node = self.op.nodes[0]
+    else:
+      primary_node = instance.primary_node
+    _CheckNodeOnline(self, primary_node)
 
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
                                  self.op.instance_name, errors.ECODE_INVAL)
-    _CheckInstanceDown(self, instance, "cannot recreate disks")
+    # if we replace nodes *and* the old primary is offline, we don't
+    # check that the instance is stopped
+    assert instance.primary_node in self.needed_locks[locking.LEVEL_NODE]
+    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
+    if not (self.op.nodes and old_pnode.offline):
+      _CheckInstanceDown(self, instance, "cannot recreate disks")
 
     if not self.op.disks:
       self.op.disks = range(len(instance.disks))
@@ -5741,20 +6160,54 @@ class LUInstanceRecreateDisks(LogicalUnit):
         if idx >= len(instance.disks):
           raise errors.OpPrereqError("Invalid disk index '%s'" % idx,
                                      errors.ECODE_INVAL)
-
+    if self.op.disks != range(len(instance.disks)) and self.op.nodes:
+      raise errors.OpPrereqError("Can't recreate disks partially and"
+                                 " change the nodes at the same time",
+                                 errors.ECODE_INVAL)
     self.instance = instance
 
   def Exec(self, feedback_fn):
     """Recreate the disks.
 
     """
+    instance = self.instance
+
     to_skip = []
-    for idx, _ in enumerate(self.instance.disks):
+    mods = [] # keeps track of needed logical_id changes
+
+    for idx, disk in enumerate(instance.disks):
       if idx not in self.op.disks: # disk idx has not been passed in
         to_skip.append(idx)
         continue
+      # update secondaries for disks, if needed
+      if self.op.nodes:
+        if disk.dev_type == constants.LD_DRBD8:
+          # need to update the nodes and minors
+          assert len(self.op.nodes) == 2
+          assert len(disk.logical_id) == 6 # otherwise disk internals
+                                           # have changed
+          (_, _, old_port, _, _, old_secret) = disk.logical_id
+          new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
+          new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
+                    new_minors[0], new_minors[1], old_secret)
+          assert len(disk.logical_id) == len(new_id)
+          mods.append((idx, new_id))
+
+    # now that we have passed all asserts above, we can apply the mods
+    # in a single run (to avoid partial changes)
+    for idx, new_id in mods:
+      instance.disks[idx].logical_id = new_id
+
+    # change primary node and save the configuration, if needed
+    if self.op.nodes:
+      instance.primary_node = self.op.nodes[0]
+      self.LogWarning("Changing the instance's nodes, you will have to"
+                      " remove any disks left on the old nodes manually")
+      self.cfg.Update(instance, feedback_fn)
 
-    _CreateDisks(self, self.instance, to_skip=to_skip)
+    _CreateDisks(self, instance, to_skip=to_skip)
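
For reference, a minimal standalone sketch of the logical_id rewrite performed above when a DRBD8 instance is recreated on a new node pair. It only illustrates the six-tuple layout; allocate_minors is a hypothetical stand-in for the configuration's AllocateDRBDMinor bookkeeping, and all names are made up.

    def allocate_minors(nodes, instance_name):
      # Hypothetical stand-in: the real allocator hands out per-node DRBD
      # minors tracked in the cluster configuration.
      return [0 for _ in nodes]

    def remap_drbd8_logical_id(logical_id, new_nodes, instance_name):
      # DRBD8 logical_id layout: (node_a, node_b, port, minor_a, minor_b, secret)
      assert len(logical_id) == 6 and len(new_nodes) == 2
      (_, _, old_port, _, _, old_secret) = logical_id
      new_minors = allocate_minors(new_nodes, instance_name)
      return (new_nodes[0], new_nodes[1], old_port,
              new_minors[0], new_minors[1], old_secret)

    print(remap_drbd8_logical_id(("node1", "node2", 11000, 0, 1, "secret"),
                                 ["node3", "node4"], "inst1.example.com"))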
 
 
 class LUInstanceRename(LogicalUnit):
@@ -5807,8 +6260,9 @@ class LUInstanceRename(LogicalUnit):
     new_name = self.op.new_name
     if self.op.name_check:
       hostname = netutils.GetHostname(name=new_name)
-      self.LogInfo("Resolved given name '%s' to '%s'", new_name,
-                   hostname.name)
+      if hostname.name != new_name:
+        self.LogInfo("Resolved given name '%s' to '%s'", new_name,
+                     hostname.name)
       if not utils.MatchNameComponent(self.op.new_name, [hostname.name]):
         raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
                                     " same as given hostname '%s'") %
@@ -5834,7 +6288,7 @@ class LUInstanceRename(LogicalUnit):
     old_name = inst.name
 
     rename_file_storage = False
-    if (inst.disk_template in (constants.DT_FILE, constants.DT_SHARED_FILE) and
+    if (inst.disk_template in constants.DTS_FILEBASED and
         self.op.new_name != inst.name):
       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
       rename_file_storage = True
@@ -6315,7 +6769,8 @@ class LUInstanceMove(LogicalUnit):
         _ShutdownInstanceDisks(self, instance)
         raise errors.OpExecError("Can't activate the instance's disks")
 
-      result = self.rpc.call_instance_start(target_node, instance, None, None)
+      result = self.rpc.call_instance_start(target_node, instance,
+                                            None, None, False)
       msg = result.fail_msg
       if msg:
         _ShutdownInstanceDisks(self, instance)
@@ -6332,45 +6787,15 @@ class LUNodeMigrate(LogicalUnit):
   REQ_BGL = False
 
   def CheckArguments(self):
-    _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
-
-  def ExpandNames(self):
-    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
-
-    self.needed_locks = {}
-
-    # Create tasklets for migrating instances for all instances on this node
-    names = []
-    tasklets = []
-
-    self.lock_all_nodes = False
-
-    for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
-      logging.debug("Migrating instance %s", inst.name)
-      names.append(inst.name)
-
-      tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False))
-
-      if inst.disk_template in constants.DTS_EXT_MIRROR:
-        # We need to lock all nodes, as the iallocator will choose the
-        # destination nodes afterwards
-        self.lock_all_nodes = True
-
-    self.tasklets = tasklets
-
-    # Declare node locks
-    if self.lock_all_nodes:
-      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
-    else:
-      self.needed_locks[locking.LEVEL_NODE] = [self.op.node_name]
-      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+    pass
 
-    # Declare instance locks
-    self.needed_locks[locking.LEVEL_INSTANCE] = names
+  def ExpandNames(self):
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
 
-  def DeclareLocks(self, level):
-    if level == locking.LEVEL_NODE and not self.lock_all_nodes:
-      self._LockInstancesNodes()
+    self.share_locks = _ShareAll()
+    self.needed_locks = {
+      locking.LEVEL_NODE: [self.op.node_name],
+      }
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -6389,6 +6814,30 @@ class LUNodeMigrate(LogicalUnit):
     nl = [self.cfg.GetMasterNode()]
     return (nl, nl)
 
+  def CheckPrereq(self):
+    pass
+
+  def Exec(self, feedback_fn):
+    # Prepare one migration job per primary instance on this node
+    jobs = [
+      [opcodes.OpInstanceMigrate(instance_name=inst.name,
+                                 mode=self.op.mode,
+                                 live=self.op.live,
+                                 iallocator=self.op.iallocator,
+                                 target_node=self.op.target_node)]
+      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name)
+      ]
+
+    # TODO: Run iallocator in this opcode and pass correct placement options to
+    # OpInstanceMigrate. Since other jobs can modify the cluster between
+    # running the iallocator and the actual migration, a good consistency model
+    # will have to be found.
+
+    assert (frozenset(self.glm.list_owned(locking.LEVEL_NODE)) ==
+            frozenset([self.op.node_name]))
+
+    return ResultWithJobs(jobs)
+
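
To make the shape of the return value explicit: ResultWithJobs receives a list of jobs, and each job is itself a list of opcodes, so every primary instance above is migrated by its own single-opcode job rather than one monolithic operation. A tiny illustration, with made-up instance names and plain dicts standing in for the opcode objects:

    instance_names = ["inst1.example.com", "inst2.example.com"]
    # Each inner list becomes a separate job with a single opcode in it.
    jobs = [[{"OP_ID": "OP_INSTANCE_MIGRATE", "instance_name": name}]
            for name in instance_names]
    print(len(jobs))  # 2 independent jobs, one per primary instance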
 
 class TLMigrateInstance(Tasklet):
   """Tasklet class for instance migration.
@@ -6471,6 +6920,10 @@ class TLMigrateInstance(Tasklet):
       # self.target_node is already populated, either directly or by the
       # iallocator run
       target_node = self.target_node
+      if self.target_node == instance.primary_node:
+        raise errors.OpPrereqError("Cannot migrate instance %s"
+                                   " to its primary (%s)" %
+                                   (instance.name, instance.primary_node),
+                                   errors.ECODE_INVAL)
 
       if len(self.lu.tasklets) == 1:
         # It is safe to release locks only when we're the only tasklet
@@ -6858,8 +7311,12 @@ class TLMigrateInstance(Tasklet):
       self.feedback_fn("* checking disk consistency between source and target")
       for dev in instance.disks:
         # for drbd, these are drbd over lvm
-        if not _CheckDiskConsistency(self, dev, target_node, False):
-          if not self.ignore_consistency:
+        if not _CheckDiskConsistency(self.lu, dev, target_node, False):
+          if primary_node.offline:
+            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
+                             " target node %s" %
+                             (primary_node.name, dev.iv_name, target_node))
+          elif not self.ignore_consistency:
             raise errors.OpExecError("Disk %s is degraded on target node,"
                                      " aborting failover" % dev.iv_name)
     else:
@@ -6885,8 +7342,8 @@ class TLMigrateInstance(Tasklet):
                                  (instance.name, source_node, msg))
 
     self.feedback_fn("* deactivating the instance's disks on source node")
-    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
-      raise errors.OpExecError("Can't shut down the instance's disks.")
+    if not _ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
+      raise errors.OpExecError("Can't shut down the instance's disks")
 
     instance.primary_node = target_node
     # distribute new instance config to the other nodes
@@ -6894,21 +7351,24 @@ class TLMigrateInstance(Tasklet):
 
     # Only start the instance if it's marked as up
     if instance.admin_up:
-      self.feedback_fn("* activating the instance's disks on target node")
+      self.feedback_fn("* activating the instance's disks on target node %s" %
+                       target_node)
       logging.info("Starting instance %s on node %s",
                    instance.name, target_node)
 
-      disks_ok, _ = _AssembleInstanceDisks(self, instance,
+      disks_ok, _ = _AssembleInstanceDisks(self.lu, instance,
                                            ignore_secondaries=True)
       if not disks_ok:
-        _ShutdownInstanceDisks(self, instance)
+        _ShutdownInstanceDisks(self.lu, instance)
         raise errors.OpExecError("Can't activate the instance's disks")
 
-      self.feedback_fn("* starting the instance on the target node")
-      result = self.rpc.call_instance_start(target_node, instance, None, None)
+      self.feedback_fn("* starting the instance on the target node %s" %
+                       target_node)
+      result = self.rpc.call_instance_start(target_node, instance, None, None,
+                                            False)
       msg = result.fail_msg
       if msg:
-        _ShutdownInstanceDisks(self, instance)
+        _ShutdownInstanceDisks(self.lu, instance)
         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                  (instance.name, target_node, msg))
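
Several hunks in this patch extend rpc.call_instance_start with a fifth argument for paused startup; call sites that previously passed only the hvparams/beparams overrides now pass False explicitly. A hedged sketch of the calling convention, with a dummy runner and made-up names standing in for the real RPC layer:

    class DummyRpcRunner(object):
      # Stand-in mirroring only the call signature used in this patch.
      def call_instance_start(self, node_name, instance_name, hvparams,
                              beparams, startup_paused):
        print("starting %s on %s (paused=%s)" %
              (instance_name, node_name, startup_paused))

    rpc = DummyRpcRunner()
    # No parameter overrides and a normal (unpaused) start, as in the
    # failover path above:
    rpc.call_instance_start("node1.example.com", "inst1.example.com",
                            None, None, False)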
 
@@ -7269,7 +7729,7 @@ def _CreateDisks(lu, instance, to_skip=None, target_node=None):
     pnode = target_node
     all_nodes = [pnode]
 
-  if instance.disk_template in (constants.DT_FILE, constants.DT_SHARED_FILE):
+  if instance.disk_template in constants.DTS_FILEBASED:
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
 
@@ -7542,9 +8002,10 @@ class LUInstanceCreate(LogicalUnit):
       raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                  self.op.file_driver, errors.ECODE_INVAL)
 
-    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
-      raise errors.OpPrereqError("File storage directory path not absolute",
-                                 errors.ECODE_INVAL)
+    if self.op.disk_template == constants.DT_FILE:
+      opcodes.RequireFileStorage()
+    elif self.op.disk_template == constants.DT_SHARED_FILE:
+      opcodes.RequireSharedFileStorage()
 
     ### Node/iallocator related checks
     _CheckIAllocatorOrNode(self, "iallocator", "pnode")
@@ -7686,10 +8147,10 @@ class LUInstanceCreate(LogicalUnit):
                      mode=constants.IALLOCATOR_MODE_ALLOC,
                      name=self.op.instance_name,
                      disk_template=self.op.disk_template,
-                     tags=[],
+                     tags=self.op.tags,
                      os=self.op.os_type,
                      vcpus=self.be_full[constants.BE_VCPUS],
-                     mem_size=self.be_full[constants.BE_MEMORY],
+                     memory=self.be_full[constants.BE_MEMORY],
                      disks=self.disks,
                      nics=nics,
                      hypervisor=self.op.hypervisor,
@@ -7743,6 +8204,7 @@ class LUInstanceCreate(LogicalUnit):
       bep=self.be_full,
       hvp=self.hv_full,
       hypervisor_name=self.op.hypervisor,
+      tags=self.op.tags,
     ))
 
     return env
@@ -7844,9 +8306,13 @@ class LUInstanceCreate(LogicalUnit):
         nics.append(ndict)
       self.op.nics = nics
 
+    if not self.op.tags and einfo.has_option(constants.INISECT_INS, "tags"):
+      self.op.tags = einfo.get(constants.INISECT_INS, "tags").split()
+
     if (self.op.hypervisor is None and
         einfo.has_option(constants.INISECT_INS, "hypervisor")):
       self.op.hypervisor = einfo.get(constants.INISECT_INS, "hypervisor")
+
     if einfo.has_section(constants.INISECT_HYP):
       # use the export parameters but do not override the ones
       # specified by the user
@@ -7898,10 +8364,40 @@ class LUInstanceCreate(LogicalUnit):
       if name in os_defs and os_defs[name] == self.op.osparams[name]:
         del self.op.osparams[name]
 
+  def _CalculateFileStorageDir(self):
+    """Calculate final instance file storage dir.
+
+    """
+    # file storage dir calculation/check
+    self.instance_file_storage_dir = None
+    if self.op.disk_template in constants.DTS_FILEBASED:
+      # build the full file storage dir path
+      joinargs = []
+
+      if self.op.disk_template == constants.DT_SHARED_FILE:
+        get_fsd_fn = self.cfg.GetSharedFileStorageDir
+      else:
+        get_fsd_fn = self.cfg.GetFileStorageDir
+
+      cfg_storagedir = get_fsd_fn()
+      if not cfg_storagedir:
+        raise errors.OpPrereqError("Cluster file storage dir not defined")
+      joinargs.append(cfg_storagedir)
+
+      if self.op.file_storage_dir is not None:
+        joinargs.append(self.op.file_storage_dir)
+
+      joinargs.append(self.op.instance_name)
+
+      # pylint: disable-msg=W0142
+      self.instance_file_storage_dir = utils.PathJoin(*joinargs)
+
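
The helper above composes the final path from the cluster-wide storage directory, the optional user-supplied subdirectory and the instance name. A standalone sketch of the same composition with made-up paths; note that utils.PathJoin performs extra sanity checks that plain os.path.join does not:

    import os.path

    def calculate_file_storage_dir(cfg_storagedir, op_file_storage_dir,
                                   instance_name):
      # Mirrors the join order used above: cluster dir, optional subdir,
      # instance name.
      joinargs = [cfg_storagedir]
      if op_file_storage_dir is not None:
        joinargs.append(op_file_storage_dir)
      joinargs.append(instance_name)
      return os.path.join(*joinargs)

    print(calculate_file_storage_dir("/srv/ganeti/file-storage", "web",
                                     "inst1.example.com"))
    # /srv/ganeti/file-storage/web/inst1.example.com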
   def CheckPrereq(self):
     """Check prerequisites.
 
     """
+    self._CalculateFileStorageDir()
+
     if self.op.mode == constants.INSTANCE_IMPORT:
       export_info = self._ReadExportInfo()
       self._ReadExportParams(export_info)
@@ -7922,6 +8418,10 @@ class LUInstanceCreate(LogicalUnit):
                                   ",".join(enabled_hvs)),
                                  errors.ECODE_STATE)
 
+    # Check tag validity
+    for tag in self.op.tags:
+      objects.TaggableObject.ValidateTag(tag)
+
     # check hypervisor parameter syntax (locally)
     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
     filled_hvp = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type,
@@ -8044,7 +8544,7 @@ class LUInstanceCreate(LogicalUnit):
 
       disk_images = []
       for idx in range(export_disks):
-        option = 'disk%d_dump' % idx
+        option = "disk%d_dump" % idx
         if export_info.has_option(constants.INISECT_INS, option):
           # FIXME: are the old os-es, disk sizes, etc. useful?
           export_name = export_info.get(constants.INISECT_INS, option)
@@ -8055,9 +8555,9 @@ class LUInstanceCreate(LogicalUnit):
 
       self.src_images = disk_images
 
-      old_name = export_info.get(constants.INISECT_INS, 'name')
+      old_name = export_info.get(constants.INISECT_INS, "name")
       try:
-        exp_nic_count = export_info.getint(constants.INISECT_INS, 'nic_count')
+        exp_nic_count = export_info.getint(constants.INISECT_INS, "nic_count")
       except (TypeError, ValueError), err:
         raise errors.OpPrereqError("Invalid export file, nic_count is not"
                                    " an integer: %s" % str(err),
@@ -8065,7 +8565,7 @@ class LUInstanceCreate(LogicalUnit):
       if self.op.instance_name == old_name:
         for idx, nic in enumerate(self.nics):
           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
-            nic_mac_ini = 'nic%d_mac' % idx
+            nic_mac_ini = "nic%d_mac" % idx
             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
 
     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
@@ -8229,30 +8729,12 @@ class LUInstanceCreate(LogicalUnit):
     else:
       network_port = None
 
-    if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
-      # this is needed because os.path.join does not accept None arguments
-      if self.op.file_storage_dir is None:
-        string_file_storage_dir = ""
-      else:
-        string_file_storage_dir = self.op.file_storage_dir
-
-      # build the full file storage dir path
-      if self.op.disk_template == constants.DT_SHARED_FILE:
-        get_fsd_fn = self.cfg.GetSharedFileStorageDir
-      else:
-        get_fsd_fn = self.cfg.GetFileStorageDir
-
-      file_storage_dir = utils.PathJoin(get_fsd_fn(),
-                                        string_file_storage_dir, instance)
-    else:
-      file_storage_dir = ""
-
     disks = _GenerateDiskTemplate(self,
                                   self.op.disk_template,
                                   instance, pnode_name,
                                   self.secondaries,
                                   self.disks,
-                                  file_storage_dir,
+                                  self.instance_file_storage_dir,
                                   self.op.file_driver,
                                   0,
                                   feedback_fn)
@@ -8269,6 +8751,10 @@ class LUInstanceCreate(LogicalUnit):
                             osparams=self.op.osparams,
                             )
 
+    if self.op.tags:
+      for tag in self.op.tags:
+        iobj.AddTag(tag)
+
     if self.adopt_disks:
       if self.op.disk_template == constants.DT_PLAIN:
         # rename LVs to the newly-generated names; we need to construct
@@ -8416,7 +8902,8 @@ class LUInstanceCreate(LogicalUnit):
       self.cfg.Update(iobj, feedback_fn)
       logging.info("Starting instance %s on node %s", instance, pnode_name)
       feedback_fn("* starting instance...")
-      result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
+      result = self.rpc.call_instance_start(pnode_name, iobj,
+                                            None, None, False)
       result.Raise("Could not start instance")
 
     return list(iobj.all_nodes)
@@ -8982,6 +9469,12 @@ class TLReplaceDisks(Tasklet):
                                  (node_name, self.instance.name))
 
   def _CreateNewStorage(self, node_name):
+    """Create new storage on the primary or secondary node.
+
+    This is only used for same-node replaces, not for changing the
+    secondary node, hence we don't want to modify the existing disk.
+
+    """
     iv_names = {}
 
     for idx, dev in enumerate(self.instance.disks):
@@ -9003,7 +9496,7 @@ class TLReplaceDisks(Tasklet):
                              logical_id=(vg_meta, names[1]))
 
       new_lvs = [lv_data, lv_meta]
-      old_lvs = dev.children
+      old_lvs = [child.Copy() for child in dev.children]
       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
 
       # we pass force_create=True to force the LVM creation
@@ -9041,7 +9534,7 @@ class TLReplaceDisks(Tasklet):
           self.lu.LogWarning("Can't remove old LV: %s" % msg,
                              hint="remove unused LVs manually")
 
-  def _ExecDrbd8DiskOnly(self, feedback_fn):
+  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable-msg=W0613
     """Replace a disk on the primary or secondary for DRBD 8.
 
     The algorithm for replace is quite complicated:
@@ -9124,10 +9617,14 @@ class TLReplaceDisks(Tasklet):
                                              rename_new_to_old)
       result.Raise("Can't rename new LVs on node %s" % self.target_node)
 
+      # Intermediate steps of the in-memory modifications
       for old, new in zip(old_lvs, new_lvs):
         new.logical_id = old.logical_id
         self.cfg.SetDiskID(new, self.target_node)
 
+      # We need to modify old_lvs so that removal later removes the
+      # right LVs, not the newly added ones; note that old_lvs is a
+      # copy here
       for disk in old_lvs:
         disk.logical_id = ren_fn(disk, temp_suffix)
         self.cfg.SetDiskID(disk, self.target_node)
@@ -9147,10 +9644,6 @@ class TLReplaceDisks(Tasklet):
                                      "volumes"))
         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
 
-      dev.children = new_lvs
-
-      self.cfg.Update(self.instance, feedback_fn)
-
     cstep = 5
     if self.early_release:
       self.lu.LogStep(cstep, steps_total, "Removing old storage")
@@ -9399,8 +9892,8 @@ class LURepairNodeStorage(NoHooksLU):
                  (self.op.name, self.op.node_name))
 
 
-class LUNodeEvacStrategy(NoHooksLU):
-  """Computes the node evacuation strategy.
+class LUNodeEvacuate(NoHooksLU):
+  """Evacuates instances off a list of nodes.
 
   """
   REQ_BGL = False
@@ -9409,38 +9902,213 @@ class LUNodeEvacStrategy(NoHooksLU):
     _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
 
   def ExpandNames(self):
-    self.op.nodes = _GetWantedNodes(self, self.op.nodes)
-    self.needed_locks = locks = {}
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+
+    if self.op.remote_node is not None:
+      self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
+      assert self.op.remote_node
+
+      if self.op.remote_node == self.op.node_name:
+        raise errors.OpPrereqError("Can not use evacuated node as a new"
+                                   " secondary node", errors.ECODE_INVAL)
+
+      if self.op.mode != constants.IALLOCATOR_NEVAC_SEC:
+        raise errors.OpPrereqError("Without the use of an iallocator only"
+                                   " secondary instances can be evacuated",
+                                   errors.ECODE_INVAL)
+
+    # Declare locks
+    self.share_locks = _ShareAll()
+    self.needed_locks = {
+      locking.LEVEL_INSTANCE: [],
+      locking.LEVEL_NODEGROUP: [],
+      locking.LEVEL_NODE: [],
+      }
+
     if self.op.remote_node is None:
-      locks[locking.LEVEL_NODE] = locking.ALL_SET
+      # Iallocator will choose any node(s) in the same group
+      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_name])
     else:
-      self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
-      locks[locking.LEVEL_NODE] = self.op.nodes + [self.op.remote_node]
+      group_nodes = frozenset([self.op.remote_node])
+
+    # Determine nodes to be locked
+    self.lock_nodes = set([self.op.node_name]) | group_nodes
+
+  def _DetermineInstances(self):
+    """Builds list of instances to operate on.
+
+    """
+    assert self.op.mode in constants.IALLOCATOR_NEVAC_MODES
+
+    if self.op.mode == constants.IALLOCATOR_NEVAC_PRI:
+      # Primary instances only
+      inst_fn = _GetNodePrimaryInstances
+      assert self.op.remote_node is None, \
+        "Evacuating primary instances requires iallocator"
+    elif self.op.mode == constants.IALLOCATOR_NEVAC_SEC:
+      # Secondary instances only
+      inst_fn = _GetNodeSecondaryInstances
+    else:
+      # All instances
+      assert self.op.mode == constants.IALLOCATOR_NEVAC_ALL
+      inst_fn = _GetNodeInstances
+
+    return inst_fn(self.cfg, self.op.node_name)
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_INSTANCE:
+      # Lock instances optimistically, needs verification once node and group
+      # locks have been acquired
+      self.needed_locks[locking.LEVEL_INSTANCE] = \
+        set(i.name for i in self._DetermineInstances())
+
+    elif level == locking.LEVEL_NODEGROUP:
+      # Lock node groups optimistically, needs verification once nodes have
+      # been acquired
+      self.needed_locks[locking.LEVEL_NODEGROUP] = \
+        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
+
+    elif level == locking.LEVEL_NODE:
+      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
+
+  def CheckPrereq(self):
+    # Verify locks
+    owned_instances = self.glm.list_owned(locking.LEVEL_INSTANCE)
+    owned_nodes = self.glm.list_owned(locking.LEVEL_NODE)
+    owned_groups = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+
+    assert owned_nodes == self.lock_nodes
+
+    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
+    if owned_groups != wanted_groups:
+      raise errors.OpExecError("Node groups changed since locks were acquired,"
+                               " current groups are '%s', used to be '%s'" %
+                               (utils.CommaJoin(wanted_groups),
+                                utils.CommaJoin(owned_groups)))
+
+    # Determine affected instances
+    self.instances = self._DetermineInstances()
+    self.instance_names = [i.name for i in self.instances]
+
+    if set(self.instance_names) != owned_instances:
+      raise errors.OpExecError("Instances on node '%s' changed since locks"
+                               " were acquired, current instances are '%s',"
+                               " used to be '%s'" %
+                               (self.op.node_name,
+                                utils.CommaJoin(self.instance_names),
+                                utils.CommaJoin(owned_instances)))
+
+    if self.instance_names:
+      self.LogInfo("Evacuating instances from node '%s': %s",
+                   self.op.node_name,
+                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
+    else:
+      self.LogInfo("No instances to evacuate from node '%s'",
+                   self.op.node_name)
 
-  def Exec(self, feedback_fn):
     if self.op.remote_node is not None:
-      instances = []
-      for node in self.op.nodes:
-        instances.extend(_GetNodeSecondaryInstances(self.cfg, node))
-      result = []
-      for i in instances:
+      for i in self.instances:
         if i.primary_node == self.op.remote_node:
           raise errors.OpPrereqError("Node %s is the primary node of"
                                      " instance %s, cannot use it as"
                                      " secondary" %
                                      (self.op.remote_node, i.name),
                                      errors.ECODE_INVAL)
-        result.append([i.name, self.op.remote_node])
-    else:
-      ial = IAllocator(self.cfg, self.rpc,
-                       mode=constants.IALLOCATOR_MODE_MEVAC,
-                       evac_nodes=self.op.nodes)
-      ial.Run(self.op.iallocator, validate=True)
+
+  def Exec(self, feedback_fn):
+    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
+
+    if not self.instance_names:
+      # No instances to evacuate
+      jobs = []
+
+    elif self.op.iallocator is not None:
+      # TODO: Implement relocation to other group
+      ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_NODE_EVAC,
+                       evac_mode=self.op.mode,
+                       instances=list(self.instance_names))
+
+      ial.Run(self.op.iallocator)
+
       if not ial.success:
-        raise errors.OpExecError("No valid evacuation solution: %s" % ial.info,
-                                 errors.ECODE_NORES)
-      result = ial.result
-    return result
+        raise errors.OpPrereqError("Can't compute node evacuation using"
+                                   " iallocator '%s': %s" %
+                                   (self.op.iallocator, ial.info),
+                                   errors.ECODE_NORES)
+
+      jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
+
+    elif self.op.remote_node is not None:
+      assert self.op.mode == constants.IALLOCATOR_NEVAC_SEC
+      jobs = [
+        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
+                                        remote_node=self.op.remote_node,
+                                        disks=[],
+                                        mode=constants.REPLACE_DISK_CHG,
+                                        early_release=self.op.early_release)]
+        for instance_name in self.instance_names
+        ]
+
+    else:
+      raise errors.ProgrammerError("No iallocator or remote node")
+
+    return ResultWithJobs(jobs)
+
+
+def _SetOpEarlyRelease(early_release, op):
+  """Sets C{early_release} flag on opcodes if available.
+
+  """
+  try:
+    op.early_release = early_release
+  except AttributeError:
+    assert not isinstance(op, opcodes.OpInstanceReplaceDisks)
+
+  return op
+
+
+def _NodeEvacDest(use_nodes, group, nodes):
+  """Returns group or nodes depending on caller's choice.
+
+  """
+  if use_nodes:
+    return utils.CommaJoin(nodes)
+  else:
+    return group
+
+
+def _LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
+  """Unpacks the result of change-group and node-evacuate iallocator requests.
+
+  Used with the iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
+  L{constants.IALLOCATOR_MODE_CHG_GROUP}.
+
+  @type lu: L{LogicalUnit}
+  @param lu: Logical unit instance
+  @type alloc_result: tuple/list
+  @param alloc_result: Result from iallocator
+  @type early_release: bool
+  @param early_release: Whether to release locks early if possible
+  @type use_nodes: bool
+  @param use_nodes: Whether to display node names instead of groups
+
+  """
+  (moved, failed, jobs) = alloc_result
+
+  if failed:
+    lu.LogWarning("Unable to evacuate instances %s",
+                  utils.CommaJoin("%s (%s)" % (name, reason)
+                                  for (name, reason) in failed))
+
+  if moved:
+    lu.LogInfo("Instances to be moved: %s",
+               utils.CommaJoin("%s (to %s)" %
+                               (name, _NodeEvacDest(use_nodes, group, nodes))
+                               for (name, group, nodes) in moved))
+
+  return [map(compat.partial(_SetOpEarlyRelease, early_release),
+              map(opcodes.OpCode.LoadOpCode, ops))
+          for ops in jobs]
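
The helpers above turn an iallocator evacuation answer into submittable jobs: alloc_result is a (moved, failed, jobs) triple, where jobs is a list of job definitions and each job is a list of serialized opcodes. A rough sketch of the unpacking with a made-up result, using plain dicts instead of opcodes.OpCode.LoadOpCode:

    # Hypothetical iallocator output in the (moved, failed, jobs) layout
    # consumed by _LoadNodeEvacResult above.
    alloc_result = (
      [("inst1.example.com", "group2", ["node3", "node4"])],   # moved
      [("inst2.example.com", "disk template not supported")],  # failed
      [[{"OP_ID": "OP_INSTANCE_REPLACE_DISKS",                 # one job with
         "instance_name": "inst1.example.com"}]],              # one opcode
      )

    (moved, failed, jobs) = alloc_result
    for (name, reason) in failed:
      print("unable to evacuate %s: %s" % (name, reason))
    for (name, group, nodes) in moved:
      print("%s moves to %s (%s)" % (name, group, ", ".join(nodes)))
    print("%d job(s) to submit" % len(jobs))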
 
 
 class LUInstanceGrowDisk(LogicalUnit):
@@ -9519,9 +10187,17 @@ class LUInstanceGrowDisk(LogicalUnit):
     if not disks_ok:
       raise errors.OpExecError("Cannot activate block device to grow")
 
+    # First run all grow ops in dry-run mode
+    for node in instance.all_nodes:
+      self.cfg.SetDiskID(disk, node)
+      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True)
+      result.Raise("Grow request failed to node %s" % node)
+
+    # We know that (as far as we can test) operations across different
+    # nodes will succeed, so now run the resize for real
     for node in instance.all_nodes:
       self.cfg.SetDiskID(disk, node)
-      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
+      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False)
       result.Raise("Grow request failed to node %s" % node)
 
       # TODO: Rewrite code to work properly
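
The grow operation above is deliberately split into two passes: a dry-run call to every node first, and the real resize only after all nodes have acknowledged, so a node-local failure cannot leave the disk grown on some nodes and not on others. The same two-phase idea in isolation, with placeholder functions that are not Ganeti APIs:

    def two_phase_apply(nodes, check_fn, apply_fn):
      # Phase 1: validate on every node; abort before anything is changed.
      for node in nodes:
        check_fn(node)
      # Phase 2: all checks passed, apply the change everywhere.
      for node in nodes:
        apply_fn(node)

    def check(node):
      print("dry-run grow on %s" % node)

    def apply_change(node):
      print("grow on %s" % node)

    two_phase_apply(["node1", "node2"], check, apply_change)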
@@ -9568,7 +10244,7 @@ class LUInstanceQueryData(NoHooksLU):
       self.wanted_names = None
 
     if self.op.use_locking:
-      self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+      self.share_locks = _ShareAll()
 
       if self.wanted_names is None:
         self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
@@ -9576,7 +10252,6 @@ class LUInstanceQueryData(NoHooksLU):
         self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
 
       self.needed_locks[locking.LEVEL_NODE] = []
-      self.share_locks = dict.fromkeys(locking.LEVELS, 1)
       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
   def DeclareLocks(self, level):
@@ -9635,8 +10310,9 @@ class LUInstanceQueryData(NoHooksLU):
     dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
 
     if dev.children:
-      dev_children = [self._ComputeDiskStatus(instance, snode, child)
-                      for child in dev.children]
+      dev_children = map(compat.partial(self._ComputeDiskStatus,
+                                        instance, snode),
+                         dev.children)
     else:
       dev_children = []
 
@@ -9659,7 +10335,15 @@ class LUInstanceQueryData(NoHooksLU):
     cluster = self.cfg.GetClusterInfo()
 
     for instance in self.wanted_instances:
-      if not self.op.static:
+      pnode = self.cfg.GetNodeInfo(instance.primary_node)
+
+      if self.op.static or pnode.offline:
+        remote_state = None
+        if pnode.offline:
+          self.LogWarning("Primary node %s is marked offline, returning static"
+                          " information only for instance %s" %
+                          (pnode.name, instance.name))
+      else:
         remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                   instance.name,
                                                   instance.hypervisor)
@@ -9669,15 +10353,14 @@ class LUInstanceQueryData(NoHooksLU):
           remote_state = "up"
         else:
           remote_state = "down"
-      else:
-        remote_state = None
+
       if instance.admin_up:
         config_state = "up"
       else:
         config_state = "down"
 
-      disks = [self._ComputeDiskStatus(instance, None, device)
-               for device in instance.disks]
+      disks = map(compat.partial(self._ComputeDiskStatus, instance, None),
+                  instance.disks)
 
       result[instance.name] = {
         "name": instance.name,
@@ -9802,13 +10485,13 @@ class LUInstanceSetParams(LogicalUnit):
             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
                                        errors.ECODE_INVAL)
 
-      nic_bridge = nic_dict.get('bridge', None)
+      nic_bridge = nic_dict.get("bridge", None)
       nic_link = nic_dict.get(constants.INIC_LINK, None)
       if nic_bridge and nic_link:
         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
                                    " at the same time", errors.ECODE_INVAL)
       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
-        nic_dict['bridge'] = None
+        nic_dict["bridge"] = None
       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
         nic_dict[constants.INIC_LINK] = None
 
@@ -9851,13 +10534,13 @@ class LUInstanceSetParams(LogicalUnit):
     """
     args = dict()
     if constants.BE_MEMORY in self.be_new:
-      args['memory'] = self.be_new[constants.BE_MEMORY]
+      args["memory"] = self.be_new[constants.BE_MEMORY]
     if constants.BE_VCPUS in self.be_new:
-      args['vcpus'] = self.be_new[constants.BE_VCPUS]
+      args["vcpus"] = self.be_new[constants.BE_VCPUS]
     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
     # information at all.
     if self.op.nics:
-      args['nics'] = []
+      args["nics"] = []
       nic_override = dict(self.op.nics)
       for idx, nic in enumerate(self.instance.nics):
         if idx in nic_override:
@@ -9878,16 +10561,16 @@ class LUInstanceSetParams(LogicalUnit):
           nicparams = self.cluster.SimpleFillNIC(nic.nicparams)
         mode = nicparams[constants.NIC_MODE]
         link = nicparams[constants.NIC_LINK]
-        args['nics'].append((ip, mac, mode, link))
+        args["nics"].append((ip, mac, mode, link))
       if constants.DDM_ADD in nic_override:
         ip = nic_override[constants.DDM_ADD].get(constants.INIC_IP, None)
         mac = nic_override[constants.DDM_ADD][constants.INIC_MAC]
         nicparams = self.nic_pnew[constants.DDM_ADD]
         mode = nicparams[constants.NIC_MODE]
         link = nicparams[constants.NIC_LINK]
-        args['nics'].append((ip, mac, mode, link))
+        args["nics"].append((ip, mac, mode, link))
       elif constants.DDM_REMOVE in nic_override:
-        del args['nics'][-1]
+        del args["nics"][-1]
 
     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
     if self.op.disk_template:
@@ -9977,6 +10660,7 @@ class LUInstanceSetParams(LogicalUnit):
       self.be_inst = i_bedict # the new dict (without defaults)
     else:
       self.be_new = self.be_inst = {}
+    be_old = cluster.FillBE(instance)
 
     # osparams processing
     if self.op.osparams:
@@ -9988,7 +10672,8 @@ class LUInstanceSetParams(LogicalUnit):
 
     self.warn = []
 
-    if constants.BE_MEMORY in self.op.beparams and not self.op.force:
+    if (constants.BE_MEMORY in self.op.beparams and not self.op.force and
+        be_new[constants.BE_MEMORY] > be_old[constants.BE_MEMORY]):
       mem_check_list = [pnode]
       if be_new[constants.BE_AUTO_BALANCE]:
         # either we changed auto_balance to yes or it was from before
@@ -10003,7 +10688,7 @@ class LUInstanceSetParams(LogicalUnit):
         # Assume the primary node is unreachable and go ahead
         self.warn.append("Can't get info from primary node %s: %s" %
                          (pnode,  msg))
-      elif not isinstance(pninfo.payload.get('memory_free', None), int):
+      elif not isinstance(pninfo.payload.get("memory_free", None), int):
         self.warn.append("Node data from primary node %s doesn't contain"
                          " free memory information" % pnode)
       elif instance_info.fail_msg:
@@ -10011,14 +10696,14 @@ class LUInstanceSetParams(LogicalUnit):
                         instance_info.fail_msg)
       else:
         if instance_info.payload:
-          current_mem = int(instance_info.payload['memory'])
+          current_mem = int(instance_info.payload["memory"])
         else:
           # Assume instance not running
           # (there is a slight race condition here, but it's not very probable,
           # and we have no other way to check)
           current_mem = 0
         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
-                    pninfo.payload['memory_free'])
+                    pninfo.payload["memory_free"])
         if miss_mem > 0:
           raise errors.OpPrereqError("This change will prevent the instance"
                                      " from starting, due to %d MB of memory"
@@ -10029,16 +10714,17 @@ class LUInstanceSetParams(LogicalUnit):
         for node, nres in nodeinfo.items():
           if node not in instance.secondary_nodes:
             continue
-          msg = nres.fail_msg
-          if msg:
-            self.warn.append("Can't get info from secondary node %s: %s" %
-                             (node, msg))
-          elif not isinstance(nres.payload.get('memory_free', None), int):
-            self.warn.append("Secondary node %s didn't return free"
-                             " memory information" % node)
-          elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
-            self.warn.append("Not enough memory to failover instance to"
-                             " secondary node %s" % node)
+          nres.Raise("Can't get info from secondary node %s" % node,
+                     prereq=True, ecode=errors.ECODE_STATE)
+          if not isinstance(nres.payload.get("memory_free", None), int):
+            raise errors.OpPrereqError("Secondary node %s didn't return free"
+                                       " memory information" % node,
+                                       errors.ECODE_STATE)
+          elif be_new[constants.BE_MEMORY] > nres.payload["memory_free"]:
+            raise errors.OpPrereqError("This change will prevent the instance"
+                                       " from failover to its secondary node"
+                                       " %s, due to not enough memory" % node,
+                                       errors.ECODE_STATE)
 
     # NIC processing
     self.nic_pnew = {}
@@ -10070,8 +10756,8 @@ class LUInstanceSetParams(LogicalUnit):
                                  for key in constants.NICS_PARAMETERS
                                  if key in nic_dict])
 
-      if 'bridge' in nic_dict:
-        update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
+      if "bridge" in nic_dict:
+        update_params_dict[constants.NIC_LINK] = nic_dict["bridge"]
 
       new_nic_params = _GetUpdatedParams(old_nic_params,
                                          update_params_dict)
@@ -10097,12 +10783,12 @@ class LUInstanceSetParams(LogicalUnit):
         else:
           nic_ip = old_nic_ip
         if nic_ip is None:
-          raise errors.OpPrereqError('Cannot set the nic ip to None'
-                                     ' on a routed nic', errors.ECODE_INVAL)
+          raise errors.OpPrereqError("Cannot set the nic ip to None"
+                                     " on a routed nic", errors.ECODE_INVAL)
       if constants.INIC_MAC in nic_dict:
         nic_mac = nic_dict[constants.INIC_MAC]
         if nic_mac is None:
-          raise errors.OpPrereqError('Cannot set the nic mac to None',
+          raise errors.OpPrereqError("Cannot set the nic mac to None",
                                      errors.ECODE_INVAL)
         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
           # otherwise generate the mac
@@ -10190,7 +10876,8 @@ class LUInstanceSetParams(LogicalUnit):
     self.cfg.Update(instance, feedback_fn)
 
     # disks are created, waiting for sync
-    disk_abort = not _WaitForSync(self, instance)
+    disk_abort = not _WaitForSync(self, instance,
+                                  oneshot=not self.op.wait_for_sync)
     if disk_abort:
       raise errors.OpExecError("There are some degraded disks for"
                                " this instance, please cleanup manually")
@@ -10695,7 +11382,8 @@ class LUBackupExport(LogicalUnit):
             not self.op.remove_instance):
           assert not activate_disks
           feedback_fn("Starting instance %s" % instance.name)
-          result = self.rpc.call_instance_start(src_node, instance, None, None)
+          result = self.rpc.call_instance_start(src_node, instance,
+                                                None, None, False)
           msg = result.fail_msg
           if msg:
             feedback_fn("Failed to start instance: %s" % msg)
@@ -10880,20 +11568,40 @@ class LUGroupAssignNodes(NoHooksLU):
 
     # We want to lock all the affected nodes and groups. We have readily
     # available the list of nodes, and the *destination* group. To gather the
-    # list of "source" groups, we need to fetch node information.
-    self.node_data = self.cfg.GetAllNodesInfo()
-    affected_groups = set(self.node_data[node].group for node in self.op.nodes)
-    affected_groups.add(self.group_uuid)
-
+    # list of "source" groups, we need to fetch node information later on.
     self.needed_locks = {
-      locking.LEVEL_NODEGROUP: list(affected_groups),
+      locking.LEVEL_NODEGROUP: set([self.group_uuid]),
       locking.LEVEL_NODE: self.op.nodes,
       }
 
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODEGROUP:
+      assert len(self.needed_locks[locking.LEVEL_NODEGROUP]) == 1
+
+      # Try to get all affected nodes' groups without having the group or node
+      # lock yet. Needs verification later in the code flow.
+      groups = self.cfg.GetNodeGroupsFromNodes(self.op.nodes)
+
+      self.needed_locks[locking.LEVEL_NODEGROUP].update(groups)
+
   def CheckPrereq(self):
     """Check prerequisites.
 
     """
+    assert self.needed_locks[locking.LEVEL_NODEGROUP]
+    assert (frozenset(self.glm.list_owned(locking.LEVEL_NODE)) ==
+            frozenset(self.op.nodes))
+
+    expected_locks = (set([self.group_uuid]) |
+                      self.cfg.GetNodeGroupsFromNodes(self.op.nodes))
+    actual_locks = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+    if actual_locks != expected_locks:
+      raise errors.OpExecError("Nodes changed groups since locks were acquired,"
+                               " current groups are '%s', used to be '%s'" %
+                               (utils.CommaJoin(expected_locks),
+                                utils.CommaJoin(actual_locks)))
+
+    self.node_data = self.cfg.GetAllNodesInfo()
     self.group = self.cfg.GetNodeGroup(self.group_uuid)
     instance_data = self.cfg.GetAllInstancesInfo()
 
@@ -10929,6 +11637,9 @@ class LUGroupAssignNodes(NoHooksLU):
     for node in self.op.nodes:
       self.node_data[node].group = self.group_uuid
 
+    # FIXME: Depends on side-effects of modifying the result of
+    # C{cfg.GetAllNodesInfo}
+
     self.cfg.Update(self.group, feedback_fn) # Saves all modified nodes.
 
   @staticmethod
@@ -11286,6 +11997,181 @@ class LUGroupRename(LogicalUnit):
     return self.op.new_name
 
 
+class LUGroupEvacuate(LogicalUnit):
+  HPATH = "group-evacuate"
+  HTYPE = constants.HTYPE_GROUP
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    # This raises errors.OpPrereqError on its own:
+    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+
+    if self.op.target_groups:
+      self.req_target_uuids = map(self.cfg.LookupNodeGroup,
+                                  self.op.target_groups)
+    else:
+      self.req_target_uuids = []
+
+    if self.group_uuid in self.req_target_uuids:
+      raise errors.OpPrereqError("Group to be evacuated (%s) can not be used"
+                                 " as a target group (targets are %s)" %
+                                 (self.group_uuid,
+                                  utils.CommaJoin(self.req_target_uuids)),
+                                 errors.ECODE_INVAL)
+
+    if not self.op.iallocator:
+      # Use default iallocator
+      self.op.iallocator = self.cfg.GetDefaultIAllocator()
+
+    if not self.op.iallocator:
+      raise errors.OpPrereqError("No iallocator was specified, neither in the"
+                                 " opcode nor as a cluster-wide default",
+                                 errors.ECODE_INVAL)
+
+    self.share_locks = _ShareAll()
+    self.needed_locks = {
+      locking.LEVEL_INSTANCE: [],
+      locking.LEVEL_NODEGROUP: [],
+      locking.LEVEL_NODE: [],
+      }
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_INSTANCE:
+      assert not self.needed_locks[locking.LEVEL_INSTANCE]
+
+      # Lock instances optimistically, needs verification once node and group
+      # locks have been acquired
+      self.needed_locks[locking.LEVEL_INSTANCE] = \
+        self.cfg.GetNodeGroupInstances(self.group_uuid)
+
+    elif level == locking.LEVEL_NODEGROUP:
+      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+
+      if self.req_target_uuids:
+        lock_groups = set([self.group_uuid] + self.req_target_uuids)
+
+        # Lock all groups used by instances optimistically; this requires
+        # looking up the instances' nodes (and their groups) before those
+        # locks are held, so it must be verified later on
+        lock_groups.update(group_uuid
+                           for instance_name in
+                             self.glm.list_owned(locking.LEVEL_INSTANCE)
+                           for group_uuid in
+                             self.cfg.GetInstanceNodeGroups(instance_name))
+      else:
+        # No target groups, need to lock all of them
+        lock_groups = locking.ALL_SET
+
+      self.needed_locks[locking.LEVEL_NODEGROUP] = lock_groups
+
+    elif level == locking.LEVEL_NODE:
+      # This will only lock the nodes in the group to be evacuated which
+      # contain actual instances
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+      self._LockInstancesNodes()
+
+      # Lock all nodes in group to be evacuated
+      assert self.group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
+      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
+      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
+
+  def CheckPrereq(self):
+    owned_instances = frozenset(self.glm.list_owned(locking.LEVEL_INSTANCE))
+    owned_groups = frozenset(self.glm.list_owned(locking.LEVEL_NODEGROUP))
+    owned_nodes = frozenset(self.glm.list_owned(locking.LEVEL_NODE))
+
+    assert owned_groups.issuperset(self.req_target_uuids)
+    assert self.group_uuid in owned_groups
+
+    # Check if locked instances are still correct
+    wanted_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
+    if owned_instances != wanted_instances:
+      raise errors.OpPrereqError("Instances in node group to be evacuated (%s)"
+                                 " changed since locks were acquired, wanted"
+                                 " %s, have %s; retry the operation" %
+                                 (self.group_uuid,
+                                  utils.CommaJoin(wanted_instances),
+                                  utils.CommaJoin(owned_instances)),
+                                 errors.ECODE_STATE)
+
+    # Get instance information
+    self.instances = dict((name, self.cfg.GetInstanceInfo(name))
+                          for name in owned_instances)
+
+    # Check if node groups for locked instances are still correct
+    for instance_name in owned_instances:
+      inst = self.instances[instance_name]
+      assert self.group_uuid in self.cfg.GetInstanceNodeGroups(instance_name), \
+        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
+      assert owned_nodes.issuperset(inst.all_nodes), \
+        "Instance %s's nodes changed while we kept the lock" % instance_name
+
+      inst_groups = self.cfg.GetInstanceNodeGroups(instance_name)
+      if not owned_groups.issuperset(inst_groups):
+        raise errors.OpPrereqError("Instance %s's node groups changed since"
+                                   " locks were acquired, current groups"
+                                   " are '%s', owning groups '%s'; retry the"
+                                   " operation" %
+                                   (instance_name,
+                                    utils.CommaJoin(inst_groups),
+                                    utils.CommaJoin(owned_groups)),
+                                   errors.ECODE_STATE)
+
+    if self.req_target_uuids:
+      # User requested specific target groups
+      self.target_uuids = self.req_target_uuids
+    else:
+      # All groups except the one to be evacuated are potential targets
+      self.target_uuids = [group_uuid for group_uuid in owned_groups
+                           if group_uuid != self.group_uuid]
+
+      if not self.target_uuids:
+        raise errors.OpExecError("There are no possible target groups")
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    """
+    return {
+      "GROUP_NAME": self.op.group_name,
+      "TARGET_GROUPS": " ".join(self.target_uuids),
+      }
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    mn = self.cfg.GetMasterNode()
+
+    assert self.group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
+
+    run_nodes = [mn] + self.cfg.GetNodeGroup(self.group_uuid).members
+
+    return (run_nodes, run_nodes)
+
+  def Exec(self, feedback_fn):
+    instances = list(self.glm.list_owned(locking.LEVEL_INSTANCE))
+
+    assert self.group_uuid not in self.target_uuids
+
+    ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_CHG_GROUP,
+                     instances=instances, target_groups=self.target_uuids)
+
+    ial.Run(self.op.iallocator)
+
+    if not ial.success:
+      raise errors.OpPrereqError("Can't compute group evacuation using"
+                                 " iallocator '%s': %s" %
+                                 (self.op.iallocator, ial.info),
+                                 errors.ECODE_NORES)
+
+    jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, False)
+
+    self.LogInfo("Iallocator returned %s job(s) for evacuating node group %s",
+                 len(jobs), self.op.group_name)
+
+    return ResultWithJobs(jobs)
+
+
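
LUGroupEvacuate follows the same pattern as LUNodeEvacuate and LUGroupAssignNodes above: compute the lock set optimistically in DeclareLocks, then re-derive it in CheckPrereq once the locks are held and abort if the two disagree. A condensed, self-contained sketch of that idea; the configuration lookup is simulated with a plain function and all names are invented:

    class OptimisticLockSketch(object):
      """Illustration only: lock what we think we need, verify after locking."""

      def __init__(self, lookup_fn):
        self.lookup_fn = lookup_fn   # e.g. something like GetNodeGroupInstances
        self.locked = None

      def declare_locks(self, group):
        # Optimistic: read the mapping without holding the locks yet.
        self.locked = set(self.lookup_fn(group))

      def check_prereq(self, group):
        wanted = set(self.lookup_fn(group))
        if wanted != self.locked:
          raise RuntimeError("Instances in %s changed since locks were"
                             " acquired; retry the operation" % group)

    state = {"group1": ["inst1", "inst2"]}
    lu = OptimisticLockSketch(lambda group: state[group])
    lu.declare_locks("group1")
    state["group1"].append("inst3")   # concurrent change between the two steps
    try:
      lu.check_prereq("group1")
    except RuntimeError as err:
      print(err)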
 class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
   """Generic tags LU.
 
@@ -11334,7 +12220,7 @@ class LUTagsGet(TagsLU):
     TagsLU.ExpandNames(self)
 
     # Share locks as this is only a read operation
-    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+    self.share_locks = _ShareAll()
 
   def Exec(self, feedback_fn):
     """Returns the tag list.
@@ -11645,16 +12531,6 @@ class IAllocator(object):
   """
   # pylint: disable-msg=R0902
   # lots of instance attributes
-  _ALLO_KEYS = [
-    "name", "mem_size", "disks", "disk_template",
-    "os", "tags", "nics", "vcpus", "hypervisor",
-    ]
-  _RELO_KEYS = [
-    "name", "relocate_from",
-    ]
-  _EVAC_KEYS = [
-    "evac_nodes",
-    ]
 
   def __init__(self, cfg, rpc, mode, **kwargs):
     self.cfg = cfg
@@ -11663,28 +12539,28 @@ class IAllocator(object):
     self.in_text = self.out_text = self.in_data = self.out_data = None
     # init all input fields so that pylint is happy
     self.mode = mode
-    self.mem_size = self.disks = self.disk_template = None
+    self.memory = self.disks = self.disk_template = None
     self.os = self.tags = self.nics = self.vcpus = None
     self.hypervisor = None
     self.relocate_from = None
     self.name = None
     self.evac_nodes = None
+    self.instances = None
+    self.evac_mode = None
+    self.target_groups = []
     # computed fields
     self.required_nodes = None
     # init result fields
     self.success = self.info = self.result = None
-    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
-      keyset = self._ALLO_KEYS
-      fn = self._AddNewInstance
-    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
-      keyset = self._RELO_KEYS
-      fn = self._AddRelocateInstance
-    elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
-      keyset = self._EVAC_KEYS
-      fn = self._AddEvacuateNodes
-    else:
+
+    try:
+      (fn, keydata, self._result_check) = self._MODE_DATA[self.mode]
+    except KeyError:
       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                    " IAllocator" % self.mode)
+
+    keyset = [n for (n, _) in keydata]
+
     for key in kwargs:
       if key not in keyset:
         raise errors.ProgrammerError("Invalid input parameter '%s' to"
@@ -11695,7 +12571,7 @@ class IAllocator(object):
       if key not in kwargs:
         raise errors.ProgrammerError("Missing input parameter '%s' to"
                                      " IAllocator" % key)
-    self._BuildInputData(fn)
+    self._BuildInputData(compat.partial(fn, self), keydata)
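
The rewritten constructor looks up (fn, keydata, result_check) per mode in a _MODE_DATA table that lies outside this excerpt; from the unpacking above, each entry pairs the input-building method with a list of (parameter name, validator) tuples plus a check for the iallocator's answer. A purely hypothetical, standalone sketch of that shape, with plain callables standing in for the real validators:

    def add_node_evacuate(alloc):
      # Stand-in for the _AddNodeEvacuate method shown further down.
      return {"instances": alloc.instances, "evac_mode": alloc.evac_mode}

    def is_string_list(val):
      return isinstance(val, list) and all(isinstance(x, str) for x in val)

    # mode -> (input_fn, [(parameter name, validator), ...], result_check)
    mode_data_example = {
      "node-evacuate": (add_node_evacuate,
                        [("instances", is_string_list),
                         ("evac_mode", lambda val: True)],   # placeholder
                        lambda result: True),                # placeholder
      }

    (fn, keydata, result_check) = mode_data_example["node-evacuate"]
    keyset = [n for (n, _) in keydata]
    print(keyset)  # ['instances', 'evac_mode']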
 
   def _ComputeClusterData(self):
     """Compute the generic allocator input data.
@@ -11724,7 +12600,7 @@ class IAllocator(object):
       hypervisor_name = self.hypervisor
     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
-    elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
+    else:
       hypervisor_name = cluster_info.enabled_hypervisors[0]
 
     node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
@@ -11750,12 +12626,12 @@ class IAllocator(object):
     """Compute node groups data.
 
     """
-    ng = {}
-    for guuid, gdata in cfg.GetAllNodeGroupsInfo().items():
-      ng[guuid] = {
-        "name": gdata.name,
-        "alloc_policy": gdata.alloc_policy,
-        }
+    ng = dict((guuid, {
+      "name": gdata.name,
+      "alloc_policy": gdata.alloc_policy,
+      })
+      for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())
+
     return ng
 
   @staticmethod
@@ -11766,22 +12642,19 @@ class IAllocator(object):
     @returns: a dict of name: (node dict, node config)
 
     """
-    node_results = {}
-    for ninfo in node_cfg.values():
-      # fill in static (config-based) values
-      pnr = {
-        "tags": list(ninfo.GetTags()),
-        "primary_ip": ninfo.primary_ip,
-        "secondary_ip": ninfo.secondary_ip,
-        "offline": ninfo.offline,
-        "drained": ninfo.drained,
-        "master_candidate": ninfo.master_candidate,
-        "group": ninfo.group,
-        "master_capable": ninfo.master_capable,
-        "vm_capable": ninfo.vm_capable,
-        }
-
-      node_results[ninfo.name] = pnr
+    # fill in static (config-based) values
+    node_results = dict((ninfo.name, {
+      "tags": list(ninfo.GetTags()),
+      "primary_ip": ninfo.primary_ip,
+      "secondary_ip": ninfo.secondary_ip,
+      "offline": ninfo.offline,
+      "drained": ninfo.drained,
+      "master_candidate": ninfo.master_candidate,
+      "group": ninfo.group,
+      "master_capable": ninfo.master_capable,
+      "vm_capable": ninfo.vm_capable,
+      })
+      for ninfo in node_cfg.values())
 
     return node_results
 
@@ -11805,8 +12678,8 @@ class IAllocator(object):
                                 nname)
         remote_info = nresult.payload
 
-        for attr in ['memory_total', 'memory_free', 'memory_dom0',
-                     'vg_size', 'vg_free', 'cpu_total']:
+        for attr in ["memory_total", "memory_free", "memory_dom0",
+                     "vg_size", "vg_free", "cpu_total"]:
           if attr not in remote_info:
             raise errors.OpExecError("Node '%s' didn't return attribute"
                                      " '%s'" % (nname, attr))
@@ -11822,21 +12695,21 @@ class IAllocator(object):
             if iinfo.name not in node_iinfo[nname].payload:
               i_used_mem = 0
             else:
-              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
+              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
-            remote_info['memory_free'] -= max(0, i_mem_diff)
+            remote_info["memory_free"] -= max(0, i_mem_diff)
 
             if iinfo.admin_up:
               i_p_up_mem += beinfo[constants.BE_MEMORY]
 
         # compute memory used by instances
         pnr_dyn = {
-          "total_memory": remote_info['memory_total'],
-          "reserved_memory": remote_info['memory_dom0'],
-          "free_memory": remote_info['memory_free'],
-          "total_disk": remote_info['vg_size'],
-          "free_disk": remote_info['vg_free'],
-          "total_cpus": remote_info['cpu_total'],
+          "total_memory": remote_info["memory_total"],
+          "reserved_memory": remote_info["memory_dom0"],
+          "free_memory": remote_info["memory_free"],
+          "total_disk": remote_info["vg_size"],
+          "free_disk": remote_info["vg_free"],
+          "total_cpus": remote_info["cpu_total"],
           "i_pri_memory": i_p_mem,
           "i_pri_up_memory": i_p_up_mem,
           }
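The free-memory adjustment in the loop above reserves, for each instance, the difference between its configured memory and what the hypervisor reports it actually uses. A short worked example with illustrative numbers:

reported_free = 4096   # node's 'memory_free' in MiB
configured = 1024      # beinfo[constants.BE_MEMORY] for one instance
actually_used = 256    # hypervisor-reported memory for that instance

# Mirrors remote_info["memory_free"] -= max(0, i_mem_diff)
free_for_allocation = reported_free - max(0, configured - actually_used)
assert free_for_allocation == 3328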
@@ -11855,11 +12728,12 @@ class IAllocator(object):
       nic_data = []
       for nic in iinfo.nics:
         filled_params = cluster_info.SimpleFillNIC(nic.nicparams)
-        nic_dict = {"mac": nic.mac,
-                    "ip": nic.ip,
-                    "mode": filled_params[constants.NIC_MODE],
-                    "link": filled_params[constants.NIC_LINK],
-                   }
+        nic_dict = {
+          "mac": nic.mac,
+          "ip": nic.ip,
+          "mode": filled_params[constants.NIC_MODE],
+          "link": filled_params[constants.NIC_LINK],
+          }
         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
         nic_data.append(nic_dict)
@@ -11899,18 +12773,21 @@ class IAllocator(object):
       self.required_nodes = 2
     else:
       self.required_nodes = 1
+
     request = {
       "name": self.name,
       "disk_template": self.disk_template,
       "tags": self.tags,
       "os": self.os,
       "vcpus": self.vcpus,
-      "memory": self.mem_size,
+      "memory": self.memory,
       "disks": self.disks,
       "disk_space_total": disk_space,
       "nics": self.nics,
       "required_nodes": self.required_nodes,
+      "hypervisor": self.hypervisor,
       }
+
     return request
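For reference, a request produced by _AddNewInstance has roughly the following shape; all values below are made up, and the exact disk and NIC dictionaries depend on the opcode input:

request = {
  "name": "inst1.example.com",
  "disk_template": "drbd",
  "tags": [],
  "os": "debootstrap+default",
  "vcpus": 2,
  "memory": 1024,
  "disks": [{"size": 10240, "mode": "rw"}],
  "disk_space_total": 10368,
  "nics": [{"mac": "auto", "ip": None, "mode": "bridged", "link": "xen-br0"}],
  "required_nodes": 2,
  "hypervisor": "xen-pvm",
  }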
 
   def _AddRelocateInstance(self):
@@ -11958,7 +12835,25 @@ class IAllocator(object):
       }
     return request
 
-  def _BuildInputData(self, fn):
+  def _AddNodeEvacuate(self):
+    """Get data for node-evacuate requests.
+
+    """
+    return {
+      "instances": self.instances,
+      "evac_mode": self.evac_mode,
+      }
+
+  def _AddChangeGroup(self):
+    """Get data for node-evacuate requests.
+
+    """
+    return {
+      "instances": self.instances,
+      "target_groups": self.target_groups,
+      }
+
+  def _BuildInputData(self, fn, keydata):
     """Build input data structures.
 
     """
@@ -11966,10 +12861,75 @@ class IAllocator(object):
 
     request = fn()
     request["type"] = self.mode
+    for keyname, keytype in keydata:
+      if keyname not in request:
+        raise errors.ProgrammerError("Request parameter %s is missing" %
+                                     keyname)
+      val = request[keyname]
+      if not keytype(val):
+        raise errors.ProgrammerError("Request parameter %s doesn't pass"
+                                     " validation, value %s, expected"
+                                     " type %s" % (keyname, val, keytype))
     self.in_data["request"] = request
 
     self.in_text = serializer.Dump(self.in_data)
 
+  _STRING_LIST = ht.TListOf(ht.TString)
+  _JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
+     # pylint: disable-msg=E1101
+     # Class '...' has no 'OP_ID' member
+     "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
+                          opcodes.OpInstanceMigrate.OP_ID,
+                          opcodes.OpInstanceReplaceDisks.OP_ID])
+     })))
+
+  _NEVAC_MOVED = \
+    ht.TListOf(ht.TAnd(ht.TIsLength(3),
+                       ht.TItems([ht.TNonEmptyString,
+                                  ht.TNonEmptyString,
+                                  ht.TListOf(ht.TNonEmptyString),
+                                 ])))
+  _NEVAC_FAILED = \
+    ht.TListOf(ht.TAnd(ht.TIsLength(2),
+                       ht.TItems([ht.TNonEmptyString,
+                                  ht.TMaybeString,
+                                 ])))
+  _NEVAC_RESULT = ht.TAnd(ht.TIsLength(3),
+                          ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))
+
+  _MODE_DATA = {
+    constants.IALLOCATOR_MODE_ALLOC:
+      (_AddNewInstance,
+       [
+        ("name", ht.TString),
+        ("memory", ht.TInt),
+        ("disks", ht.TListOf(ht.TDict)),
+        ("disk_template", ht.TString),
+        ("os", ht.TString),
+        ("tags", _STRING_LIST),
+        ("nics", ht.TListOf(ht.TDict)),
+        ("vcpus", ht.TInt),
+        ("hypervisor", ht.TString),
+        ], ht.TList),
+    constants.IALLOCATOR_MODE_RELOC:
+      (_AddRelocateInstance,
+       [("name", ht.TString), ("relocate_from", _STRING_LIST)],
+       ht.TList),
+    constants.IALLOCATOR_MODE_MEVAC:
+      (_AddEvacuateNodes, [("evac_nodes", _STRING_LIST)],
+       ht.TListOf(ht.TAnd(ht.TIsLength(2), _STRING_LIST))),
+    constants.IALLOCATOR_MODE_NODE_EVAC:
+      (_AddNodeEvacuate, [
+        ("instances", _STRING_LIST),
+        ("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)),
+        ], _NEVAC_RESULT),
+    constants.IALLOCATOR_MODE_CHG_GROUP:
+      (_AddChangeGroup, [
+        ("instances", _STRING_LIST),
+        ("target_groups", _STRING_LIST),
+        ], _NEVAC_RESULT),
+    }
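The third element of each _MODE_DATA entry is itself just a callable predicate, so result validation amounts to a single function call. A small sketch using the same ht combinators shown above (assuming the Ganeti tree is on the Python path); the multi-evacuate result is expected to be a list of (instance, new secondary node) pairs:

from ganeti import ht

mevac_check = ht.TListOf(ht.TAnd(ht.TIsLength(2), ht.TListOf(ht.TString)))

good = [["inst1.example.com", "node3.example.com"]]
bad = [["inst1.example.com"]]            # second element missing

assert mevac_check(good)
assert not mevac_check(bad)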
+
   def Run(self, name, validate=True, call_fn=None):
     """Run an instance allocator and return the results.
 
@@ -12010,28 +12970,48 @@ class IAllocator(object):
                                  " missing key '%s'" % key)
       setattr(self, key, rdict[key])
 
-    if not isinstance(rdict["result"], list):
-      raise errors.OpExecError("Can't parse iallocator results: 'result' key"
-                               " is not a list")
-
-    if self.mode == constants.IALLOCATOR_MODE_RELOC:
-      assert self.relocate_from is not None
-      assert self.required_nodes == 1
+    if not self._result_check(self.result):
+      raise errors.OpExecError("Iallocator returned invalid result,"
+                               " expected %s, got %s" %
+                               (self._result_check, self.result),
+                               errors.ECODE_INVAL)
 
+    if self.mode in (constants.IALLOCATOR_MODE_RELOC,
+                     constants.IALLOCATOR_MODE_MEVAC):
       node2group = dict((name, ndata["group"])
                         for (name, ndata) in self.in_data["nodes"].items())
 
       fn = compat.partial(self._NodesToGroups, node2group,
                           self.in_data["nodegroups"])
 
-      request_groups = fn(self.relocate_from)
-      result_groups = fn(rdict["result"])
+      if self.mode == constants.IALLOCATOR_MODE_RELOC:
+        assert self.relocate_from is not None
+        assert self.required_nodes == 1
+
+        request_groups = fn(self.relocate_from)
+        result_groups = fn(rdict["result"])
+
+        if result_groups != request_groups:
+          raise errors.OpExecError("Groups of nodes returned by iallocator (%s)"
+                                   " differ from original groups (%s)" %
+                                   (utils.CommaJoin(result_groups),
+                                    utils.CommaJoin(request_groups)))
+      elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
+        request_groups = fn(self.evac_nodes)
+        for (instance_name, secnode) in self.result:
+          result_groups = fn([secnode])
+          if result_groups != request_groups:
+            raise errors.OpExecError("Iallocator returned new secondary node"
+                                     " '%s' (group '%s') for instance '%s'"
+                                     " which is not in original group '%s'" %
+                                     (secnode, utils.CommaJoin(result_groups),
+                                      instance_name,
+                                      utils.CommaJoin(request_groups)))
+      else:
+        raise errors.ProgrammerError("Unhandled mode '%s'" % self.mode)
 
-      if result_groups != request_groups:
-        raise errors.OpExecError("Groups of nodes returned by iallocator (%s)"
-                                 " differ from original groups (%s)" %
-                                 (utils.CommaJoin(result_groups),
-                                  utils.CommaJoin(request_groups)))
+    elif self.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
+      assert self.evac_mode in constants.IALLOCATOR_NEVAC_MODES
 
     self.out_data = rdict
 
@@ -12082,7 +13062,7 @@ class LUTestAllocator(NoHooksLU):
 
     """
     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
-      for attr in ["mem_size", "disks", "disk_template",
+      for attr in ["memory", "disks", "disk_template",
                    "os", "tags", "nics", "vcpus"]:
         if not hasattr(self.op, attr):
           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
@@ -12099,10 +13079,10 @@ class LUTestAllocator(NoHooksLU):
                                    errors.ECODE_INVAL)
       for row in self.op.disks:
         if (not isinstance(row, dict) or
-            "size" not in row or
-            not isinstance(row["size"], int) or
-            "mode" not in row or
-            row["mode"] not in ['r', 'w']):
+            constants.IDISK_SIZE not in row or
+            not isinstance(row[constants.IDISK_SIZE], int) or
+            constants.IDISK_MODE not in row or
+            row[constants.IDISK_MODE] not in constants.DISK_ACCESS_SET):
           raise errors.OpPrereqError("Invalid contents of the 'disks'"
                                      " parameter", errors.ECODE_INVAL)
       if self.op.hypervisor is None:
@@ -12115,6 +13095,11 @@ class LUTestAllocator(NoHooksLU):
       if not hasattr(self.op, "evac_nodes"):
         raise errors.OpPrereqError("Missing attribute 'evac_nodes' on"
                                    " opcode input", errors.ECODE_INVAL)
+    elif self.op.mode in (constants.IALLOCATOR_MODE_CHG_GROUP,
+                          constants.IALLOCATOR_MODE_NODE_EVAC):
+      if not self.op.instances:
+        raise errors.OpPrereqError("Missing instances", errors.ECODE_INVAL)
+      self.op.instances = _GetWantedInstances(self, self.op.instances)
     else:
       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                  self.op.mode, errors.ECODE_INVAL)
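To illustrate the tightened disk validation above: assuming constants.IDISK_SIZE is "size", constants.IDISK_MODE is "mode", and constants.DISK_ACCESS_SET contains "ro" and "rw" (check constants.py for the authoritative values), an accepted disks parameter would look like:

disks = [
  {"size": 10240, "mode": "rw"},
  {"size": 2048, "mode": "ro"},
  ]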
@@ -12135,7 +13120,7 @@ class LUTestAllocator(NoHooksLU):
       ial = IAllocator(self.cfg, self.rpc,
                        mode=self.op.mode,
                        name=self.op.name,
-                       mem_size=self.op.mem_size,
+                       memory=self.op.memory,
                        disks=self.op.disks,
                        disk_template=self.op.disk_template,
                        os=self.op.os,
@@ -12154,6 +13139,16 @@ class LUTestAllocator(NoHooksLU):
       ial = IAllocator(self.cfg, self.rpc,
                        mode=self.op.mode,
                        evac_nodes=self.op.evac_nodes)
+    elif self.op.mode == constants.IALLOCATOR_MODE_CHG_GROUP:
+      ial = IAllocator(self.cfg, self.rpc,
+                       mode=self.op.mode,
+                       instances=self.op.instances,
+                       target_groups=self.op.target_groups)
+    elif self.op.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
+      ial = IAllocator(self.cfg, self.rpc,
+                       mode=self.op.mode,
+                       instances=self.op.instances,
+                       evac_mode=self.op.evac_mode)
     else:
       raise errors.ProgrammerError("Uncatched mode %s in"
                                    " LUTestAllocator.Exec", self.op.mode)