iallocator: Relocation nodes must be in same group
diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 1fd50ef..f355a39 100644
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -39,6 +39,7 @@ import OpenSSL
 import socket
 import tempfile
 import shutil
+import itertools
 
 from ganeti import ssh
 from ganeti import utils
@@ -53,41 +54,48 @@ from ganeti import uidpool
 from ganeti import compat
 from ganeti import masterd
 from ganeti import netutils
-from ganeti import ht
+from ganeti import query
+from ganeti import qlang
+from ganeti import opcodes
 
 import ganeti.masterd.instance # pylint: disable-msg=W0611
 
-# Common opcode attributes
 
-#: output fields for a query operation
-_POutputFields = ("output_fields", ht.NoDefault, ht.TListOf(ht.TNonEmptyString))
+def _SupportsOob(cfg, node):
+  """Tells if node supports OOB.
 
+  @type cfg: L{config.ConfigWriter}
+  @param cfg: The cluster configuration
+  @type node: L{objects.Node}
+  @param node: The node
+  @return: The OOB script if supported or an empty string otherwise
+
+  """
+  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
 
-#: the shutdown timeout
-_PShutdownTimeout = ("shutdown_timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT,
-                     ht.TPositiveInt)
 
-#: the force parameter
-_PForce = ("force", False, ht.TBool)
+class ResultWithJobs:
+  """Data container for LU results with jobs.
 
-#: a required instance name (for single-instance LUs)
-_PInstanceName = ("instance_name", ht.NoDefault, ht.TNonEmptyString)
+  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
+  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
+  contained in the C{jobs} attribute and include the job IDs in the opcode
+  result.
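+
+  A hypothetical usage sketch (for illustration only), eg::
+
+    # one job made of a single opcode; "foo" is included in the opcode
+    # result next to the IDs of the submitted jobs
+    return ResultWithJobs([[opcodes.OpTestDelay(duration=0)]], foo="bar")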
 
-#: Whether to ignore offline nodes
-_PIgnoreOfflineNodes = ("ignore_offline_nodes", False, ht.TBool)
+  """
+  def __init__(self, jobs, **kwargs):
+    """Initializes this class.
 
-#: a required node name (for single-node LUs)
-_PNodeName = ("node_name", ht.NoDefault, ht.TNonEmptyString)
+    Additional return values can be specified as keyword arguments.
 
-#: the migration type (live/non-live)
-_PMigrationMode = ("mode", None,
-                   ht.TOr(ht.TNone, ht.TElemOf(constants.HT_MIGRATION_MODES)))
+    @type jobs: list of lists of L{opcodes.OpCode}
+    @param jobs: A list of lists of opcode objects
 
-#: the obsolete 'live' mode (boolean)
-_PMigrationLive = ("live", None, ht.TMaybeBool)
+    """
+    self.jobs = jobs
+    self.other = kwargs
 
 
-# End types
 class LogicalUnit(object):
   """Logical Unit base class.
 
@@ -96,6 +104,7 @@ class LogicalUnit(object):
     - implement CheckPrereq (except when tasklets are used)
     - implement Exec (except when tasklets are used)
     - implement BuildHooksEnv
+    - implement BuildHooksNodes
     - redefine HPATH and HTYPE
     - optionally redefine their run requirements:
         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
@@ -104,13 +113,10 @@ class LogicalUnit(object):
 
   @ivar dry_run_result: the value (if any) that will be returned to the caller
       in dry-run mode (signalled by opcode dry_run parameter)
-  @cvar _OP_PARAMS: a list of opcode attributes, their defaults values
-      they should get if not already defined, and types they must match
 
   """
   HPATH = None
   HTYPE = None
-  _OP_PARAMS = []
   REQ_BGL = True
 
   def __init__(self, processor, op, context, rpc):
@@ -149,32 +155,8 @@ class LogicalUnit(object):
     # Tasklets
     self.tasklets = None
 
-    # The new kind-of-type-system
-    op_id = self.op.OP_ID
-    for attr_name, aval, test in self._OP_PARAMS:
-      if not hasattr(op, attr_name):
-        if aval == ht.NoDefault:
-          raise errors.OpPrereqError("Required parameter '%s.%s' missing" %
-                                     (op_id, attr_name), errors.ECODE_INVAL)
-        else:
-          if callable(aval):
-            dval = aval()
-          else:
-            dval = aval
-          setattr(self.op, attr_name, dval)
-      attr_val = getattr(op, attr_name)
-      if test == ht.NoType:
-        # no tests here
-        continue
-      if not callable(test):
-        raise errors.ProgrammerError("Validation for parameter '%s.%s' failed,"
-                                     " given type is not a proper type (%s)" %
-                                     (op_id, attr_name, test))
-      if not test(attr_val):
-        logging.error("OpCode %s, parameter %s, has invalid type %s/value %s",
-                      self.op.OP_ID, attr_name, type(attr_val), attr_val)
-        raise errors.OpPrereqError("Parameter '%s.%s' fails validation" %
-                                   (op_id, attr_name), errors.ECODE_INVAL)
+    # Validate opcode parameters and set defaults
+    self.op.Validate(True)
 
     self.CheckArguments()
 
@@ -212,7 +194,7 @@ class LogicalUnit(object):
     This method is called before starting to execute the opcode, and it should
     update all the parameters of the opcode to their canonical form (e.g. a
     short node name must be fully expanded after this method has successfully
-    completed). This way locking, hooks, logging, ecc. can work correctly.
+    completed). This way locking, hooks, logging, etc. can work correctly.
 
     LUs which implement this method must also populate the self.needed_locks
     member, as a dict with lock levels as keys, and a list of needed lock names
@@ -313,21 +295,28 @@ class LogicalUnit(object):
   def BuildHooksEnv(self):
     """Build hooks environment for this LU.
 
-    This method should return a three-node tuple consisting of: a dict
-    containing the environment that will be used for running the
-    specific hook for this LU, a list of node names on which the hook
-    should run before the execution, and a list of node names on which
-    the hook should run after the execution.
+    @rtype: dict
+    @return: Dictionary containing the environment that will be used for
+      running the hooks for this LU. The keys of the dict must not be prefixed
+      with "GANETI_"--that'll be added by the hooks runner. The hooks runner
+      will extend the environment with additional variables. If no environment
+      should be defined, an empty dictionary should be returned (not C{None}).
+    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
+      will not be called.
 
-    The keys of the dict must not have 'GANETI_' prefixed as this will
-    be handled in the hooks runner. Also note additional keys will be
-    added by the hooks runner. If the LU doesn't define any
-    environment, an empty dict (and not None) should be returned.
+    """
+    raise NotImplementedError
 
-    No nodes should be returned as an empty list (and not None).
+  def BuildHooksNodes(self):
+    """Build list of nodes to run LU's hooks.
 
-    Note that if the HPATH for a LU class is None, this function will
-    not be called.
+    @rtype: tuple; (list, list)
+    @return: Tuple containing a list of node names on which the hook
+      should run before the execution and a list of node names on which the
+      hook should run after the execution. No nodes should be returned as an
+      empty list (and not None).
+    @note: If the C{HPATH} attribute of the LU class is C{None}, this function
+      will not be called.
 
     """
     raise NotImplementedError
@@ -437,7 +426,13 @@ class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
     This just raises an error.
 
     """
-    assert False, "BuildHooksEnv called for NoHooksLUs"
+    raise AssertionError("BuildHooksEnv called for NoHooksLUs")
+
+  def BuildHooksNodes(self):
+    """Empty BuildHooksNodes for NoHooksLU.
+
+    """
+    raise AssertionError("BuildHooksNodes called for NoHooksLU")
 
 
 class Tasklet:
@@ -486,6 +481,95 @@ class Tasklet:
     raise NotImplementedError
 
 
+class _QueryBase:
+  """Base for query utility classes.
+
+  """
+  #: Attribute holding field definitions
+  FIELDS = None
+
+  def __init__(self, filter_, fields, use_locking):
+    """Initializes this class.
+
+    """
+    self.use_locking = use_locking
+
+    self.query = query.Query(self.FIELDS, fields, filter_=filter_,
+                             namefield="name")
+    self.requested_data = self.query.RequestedData()
+    self.names = self.query.RequestedNames()
+
+    # Sort only if no names were requested
+    self.sort_by_name = not self.names
+
+    self.do_locking = None
+    self.wanted = None
+
+  def _GetNames(self, lu, all_names, lock_level):
+    """Helper function to determine names asked for in the query.
+
+    """
+    if self.do_locking:
+      names = lu.acquired_locks[lock_level]
+    else:
+      names = all_names
+
+    if self.wanted == locking.ALL_SET:
+      assert not self.names
+      # caller didn't specify names, so ordering is not important
+      return utils.NiceSort(names)
+
+    # caller specified names and we must keep the same order
+    assert self.names
+    assert not self.do_locking or lu.acquired_locks[lock_level]
+
+    missing = set(self.wanted).difference(names)
+    if missing:
+      raise errors.OpExecError("Some items were removed before retrieving"
+                               " their data: %s" % missing)
+
+    # Return expanded names
+    return self.wanted
+
+  def ExpandNames(self, lu):
+    """Expand names for this query.
+
+    See L{LogicalUnit.ExpandNames}.
+
+    """
+    raise NotImplementedError()
+
+  def DeclareLocks(self, lu, level):
+    """Declare locks for this query.
+
+    See L{LogicalUnit.DeclareLocks}.
+
+    """
+    raise NotImplementedError()
+
+  def _GetQueryData(self, lu):
+    """Collects all data for this query.
+
+    @return: Query data object
+
+    """
+    raise NotImplementedError()
+
+  def NewStyleQuery(self, lu):
+    """Collect data and execute query.
+
+    """
+    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
+                                  sort_by_name=self.sort_by_name)
+
+  def OldStyleQuery(self, lu):
+    """Collect data and execute query.
+
+    """
+    return self.query.OldStyleQuery(self._GetQueryData(lu),
+                                    sort_by_name=self.sort_by_name)
+
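+# A minimal subclass sketch (hypothetical, for illustration only): concrete
+# query classes set FIELDS to a field definition table from the query module
+# and implement the abstract methods, along these lines:
+#
+#   class _ExampleQuery(_QueryBase):
+#     FIELDS = query.NODE_FIELDS
+#
+#     def ExpandNames(self, lu):
+#       lu.needed_locks = {}
+#       self.wanted = self.names or lu.cfg.GetNodeList()
+#
+#     def DeclareLocks(self, lu, level):
+#       pass  # nothing to do, no locks were requested
+#
+#     def _GetQueryData(self, lu):
+#       return query.NodeQueryData(...)  # collect data for self.wanted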
+
 def _GetWantedNodes(lu, nodes):
   """Returns list of checked and expanded node names.
 
@@ -498,12 +582,10 @@ def _GetWantedNodes(lu, nodes):
   @raise errors.ProgrammerError: if the nodes parameter is wrong type
 
   """
-  if not nodes:
-    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
-      " non-empty list of nodes whose name is to be expanded.")
+  if nodes:
+    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
 
-  wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes]
-  return utils.NiceSort(wanted)
+  return utils.NiceSort(lu.cfg.GetNodeList())
 
 
 def _GetWantedInstances(lu, instances):
@@ -559,6 +641,18 @@ def _GetUpdatedParams(old_params, update_dict,
   return params_copy
 
 
+def _RunPostHook(lu, node_name):
+  """Runs the post-hook for an opcode on a single node.
+
+  """
+  hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu)
+  try:
+    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
+  except:
+    # pylint: disable-msg=W0702
+    lu.LogWarning("Errors occurred running hooks on %s" % node_name)
+
+
 def _CheckOutputFields(static, dynamic, selected):
   """Checks whether all selected fields are valid.
 
@@ -679,42 +773,6 @@ def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
       raise errors.OpExecError(msg)
 
 
-def _RequireFileStorage():
-  """Checks that file storage is enabled.
-
-  @raise errors.OpPrereqError: when file storage is disabled
-
-  """
-  if not constants.ENABLE_FILE_STORAGE:
-    raise errors.OpPrereqError("File storage disabled at configure time",
-                               errors.ECODE_INVAL)
-
-
-def _CheckDiskTemplate(template):
-  """Ensure a given disk template is valid.
-
-  """
-  if template not in constants.DISK_TEMPLATES:
-    msg = ("Invalid disk template name '%s', valid templates are: %s" %
-           (template, utils.CommaJoin(constants.DISK_TEMPLATES)))
-    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
-  if template == constants.DT_FILE:
-    _RequireFileStorage()
-  return True
-
-
-def _CheckStorageType(storage_type):
-  """Ensure a given storage type is valid.
-
-  """
-  if storage_type not in constants.VALID_STORAGE_TYPES:
-    raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
-                               errors.ECODE_INVAL)
-  if storage_type == constants.ST_FILE:
-    _RequireFileStorage()
-  return True
-
-
 def _GetClusterDomainSecret():
   """Reads the cluster domain secret.
 
@@ -858,7 +916,7 @@ def _NICListToTuple(lu, nics):
   """Build a list of nic information tuples.
 
   This list is suitable to be passed to _BuildInstanceHookEnv or as a return
-  value in LUQueryInstanceData.
+  value in LUInstanceQueryData.
 
   @type lu:  L{LogicalUnit}
   @param lu: the logical unit on whose behalf we execute
@@ -1021,7 +1079,7 @@ def _GetStorageTypeArgs(cfg, storage_type):
   # Special case for file storage
   if storage_type == constants.ST_FILE:
     # storage.FileStorage wants a list of storage directories
-    return [[cfg.GetFileStorageDir()]]
+    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
 
   return []
 
@@ -1075,7 +1133,7 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
                                  " iallocator.")
 
 
-class LUPostInitCluster(LogicalUnit):
+class LUClusterPostInit(LogicalUnit):
   """Logical unit for running hooks after cluster initialization.
 
   """
@@ -1086,9 +1144,15 @@ class LUPostInitCluster(LogicalUnit):
     """Build hooks env.
 
     """
-    env = {"OP_TARGET": self.cfg.GetClusterName()}
-    mn = self.cfg.GetMasterNode()
-    return env, [], [mn]
+    return {
+      "OP_TARGET": self.cfg.GetClusterName(),
+      }
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    return ([], [self.cfg.GetMasterNode()])
 
   def Exec(self, feedback_fn):
     """Nothing to do.
@@ -1097,7 +1161,7 @@ class LUPostInitCluster(LogicalUnit):
     return True
 
 
-class LUDestroyCluster(LogicalUnit):
+class LUClusterDestroy(LogicalUnit):
   """Logical unit for destroying the cluster.
 
   """
@@ -1108,8 +1172,15 @@ class LUDestroyCluster(LogicalUnit):
     """Build hooks env.
 
     """
-    env = {"OP_TARGET": self.cfg.GetClusterName()}
-    return env, [], []
+    return {
+      "OP_TARGET": self.cfg.GetClusterName(),
+      }
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    return ([], [])
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -1139,12 +1210,7 @@ class LUDestroyCluster(LogicalUnit):
     master = self.cfg.GetMasterNode()
 
     # Run post hooks on master node before it's removed
-    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
-    try:
-      hm.RunPhase(constants.HOOKS_PHASE_POST, [master])
-    except:
-      # pylint: disable-msg=W0702
-      self.LogWarning("Errors occurred running hooks on %s" % master)
+    _RunPostHook(self, master)
 
     result = self.rpc.call_node_stop_master(master, False)
     result.Raise("Could not disable the master role")
@@ -1153,7 +1219,7 @@ class LUDestroyCluster(LogicalUnit):
 
 
 def _VerifyCertificate(filename):
-  """Verifies a certificate for LUVerifyCluster.
+  """Verifies a certificate for LUClusterVerify.
 
   @type filename: string
   @param filename: Path to PEM file
@@ -1163,7 +1229,7 @@ def _VerifyCertificate(filename):
     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                            utils.ReadFile(filename))
   except Exception, err: # pylint: disable-msg=W0703
-    return (LUVerifyCluster.ETYPE_ERROR,
+    return (LUClusterVerify.ETYPE_ERROR,
             "Failed to load X509 certificate %s: %s" % (filename, err))
 
   (errcode, msg) = \
@@ -1178,26 +1244,19 @@ def _VerifyCertificate(filename):
   if errcode is None:
     return (None, fnamemsg)
   elif errcode == utils.CERT_WARNING:
-    return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
+    return (LUClusterVerify.ETYPE_WARNING, fnamemsg)
   elif errcode == utils.CERT_ERROR:
-    return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)
+    return (LUClusterVerify.ETYPE_ERROR, fnamemsg)
 
   raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
 
 
-class LUVerifyCluster(LogicalUnit):
+class LUClusterVerify(LogicalUnit):
   """Verifies the cluster status.
 
   """
   HPATH = "cluster-verify"
   HTYPE = constants.HTYPE_CLUSTER
-  _OP_PARAMS = [
-    ("skip_checks", ht.EmptyList,
-     ht.TListOf(ht.TElemOf(constants.VERIFY_OPTIONAL_CHECKS))),
-    ("verbose", False, ht.TBool),
-    ("error_codes", False, ht.TBool),
-    ("debug_simulate_errors", False, ht.TBool),
-    ]
   REQ_BGL = False
 
   TCLUSTER = "cluster"
@@ -1206,12 +1265,14 @@ class LUVerifyCluster(LogicalUnit):
 
   ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
   ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
+  ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK")
   EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
   EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
   EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
   EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
   EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
   EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
+  EINSTANCESPLITGROUPS = (TINSTANCE, "EINSTANCESPLITGROUPS")
   ENODEDRBD = (TNODE, "ENODEDRBD")
   ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
   ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
@@ -1228,6 +1289,7 @@ class LUVerifyCluster(LogicalUnit):
   ENODEVERSION = (TNODE, "ENODEVERSION")
   ENODESETUP = (TNODE, "ENODESETUP")
   ENODETIME = (TNODE, "ENODETIME")
+  ENODEOOBPATH = (TNODE, "ENODEOOBPATH")
 
   ETYPE_FIELD = "code"
   ETYPE_ERROR = "ERROR"
@@ -1245,8 +1307,8 @@ class LUVerifyCluster(LogicalUnit):
     @ivar instances: a list of running instances (runtime)
     @ivar pinst: list of configured primary instances (config)
     @ivar sinst: list of configured secondary instances (config)
-    @ivar sbp: diction of {secondary-node: list of instances} of all peers
-        of this node (config)
+    @ivar sbp: dictionary of {primary-node: list of instances} for all
+        instances for which this node is secondary (config)
     @ivar mfree: free memory, as reported by hypervisor (runtime)
     @ivar dfree: free disk, as reported by the node (runtime)
     @ivar offline: the offline status (config)
@@ -1388,6 +1450,13 @@ class LUVerifyCluster(LogicalUnit):
         _ErrorIf(test, self.ENODEHV, node,
                  "hypervisor %s verify failure: '%s'", hv_name, hv_result)
 
+    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
+    if ninfo.vm_capable and isinstance(hvp_result, list):
+      for item, hv_name, hv_result in hvp_result:
+        _ErrorIf(True, self.ENODEHV, node,
+                 "hypervisor %s parameter verify failure (source %s): %s",
+                 hv_name, item, hv_result)
+
     test = nresult.get(constants.NV_NODESETUP,
                            ["Missing NODESETUP results"])
     _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
@@ -1538,7 +1607,7 @@ class LUVerifyCluster(LogicalUnit):
                node_current)
 
     for node, n_img in node_image.items():
-      if (not node == node_current):
+      if node != node_current:
         test = instance in n_img.instances
         _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                  "instance should not run on node %s", node)
@@ -1548,7 +1617,11 @@ class LUVerifyCluster(LogicalUnit):
                 for idx, (success, status) in enumerate(disks)]
 
     for nname, success, bdev_status, idx in diskdata:
-      _ErrorIf(instanceconfig.admin_up and not success,
+      # the 'ghost node' construction in Exec() ensures that we have a
+      # node here
+      snode = node_image[nname]
+      bad_snode = snode.ghost or snode.offline
+      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
                self.EINSTANCEFAULTYDISK, instance,
                "couldn't retrieve status for disk/%s on %s: %s",
                idx, nname, bdev_status)
@@ -1597,6 +1670,7 @@ class LUVerifyCluster(LogicalUnit):
     instances it was primary for.
 
     """
+    cluster_info = self.cfg.GetClusterInfo()
     for node, n_img in node_image.items():
       # This code checks that every node which is now listed as
       # secondary has enough memory to host all instances it is
@@ -1606,60 +1680,109 @@ class LUVerifyCluster(LogicalUnit):
       # WARNING: we currently take into account down instances as well
       # as up ones, considering that even if they're down someone
       # might want to start them even in the event of a node failure.
+      if n_img.offline:
+        # we're skipping offline nodes from the N+1 warning, since
+        # most likely we don't have good memory information from them;
+        # we already list instances living on such nodes, and that's
+        # enough warning
+        continue
       for prinode, instances in n_img.sbp.items():
         needed_mem = 0
         for instance in instances:
-          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
+          bep = cluster_info.FillBE(instance_cfg[instance])
           if bep[constants.BE_AUTO_BALANCE]:
             needed_mem += bep[constants.BE_MEMORY]
         test = n_img.mfree < needed_mem
         self._ErrorIf(test, self.ENODEN1, node,
-                      "not enough memory on to accommodate"
-                      " failovers should peer node %s fail", prinode)
+                      "not enough memory to accomodate instance failovers"
+                      " should node %s fail (%dMiB needed, %dMiB available)",
+                      prinode, needed_mem, n_img.mfree)
 
-  def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
-                       master_files):
-    """Verifies and computes the node required file checksums.
+  @classmethod
+  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
+                   (files_all, files_all_opt, files_mc, files_vm)):
+    """Verifies file checksums collected from all nodes.
 
-    @type ninfo: L{objects.Node}
-    @param ninfo: the node to check
-    @param nresult: the remote results for the node
-    @param file_list: required list of files
-    @param local_cksum: dictionary of local files and their checksums
-    @param master_files: list of files that only masters should have
+    @param errorif: Callback for reporting errors
+    @param nodeinfo: List of L{objects.Node} objects
+    @param master_node: Name of master node
+    @param all_nvinfo: RPC results
 
     """
-    node = ninfo.name
-    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+    node_names = frozenset(node.name for node in nodeinfo)
 
-    remote_cksum = nresult.get(constants.NV_FILELIST, None)
-    test = not isinstance(remote_cksum, dict)
-    _ErrorIf(test, self.ENODEFILECHECK, node,
-             "node hasn't returned file checksum data")
-    if test:
-      return
+    assert master_node in node_names
+    assert (len(files_all | files_all_opt | files_mc | files_vm) ==
+            sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
+           "Found file listed in more than one file list"
+
+    # Define functions determining which nodes to consider for a file
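+    # (a value of None means the file is checked on every node)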
+    file2nodefn = dict([(filename, fn)
+      for (files, fn) in [(files_all, None),
+                          (files_all_opt, None),
+                          (files_mc, lambda node: (node.master_candidate or
+                                                   node.name == master_node)),
+                          (files_vm, lambda node: node.vm_capable)]
+      for filename in files])
+
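+    # fileinfo maps filename -> {checksum: set of nodes reporting that copy}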
+    fileinfo = dict((filename, {}) for filename in file2nodefn.keys())
+
+    for node in nodeinfo:
+      nresult = all_nvinfo[node.name]
+
+      if nresult.fail_msg or not nresult.payload:
+        node_files = None
+      else:
+        node_files = nresult.payload.get(constants.NV_FILELIST, None)
+
+      test = not (node_files and isinstance(node_files, dict))
+      errorif(test, cls.ENODEFILECHECK, node.name,
+              "Node did not return file checksum data")
+      if test:
+        continue
+
+      for (filename, checksum) in node_files.items():
+        # Check if the file should be considered for a node
+        fn = file2nodefn[filename]
+        if fn is None or fn(node):
+          fileinfo[filename].setdefault(checksum, set()).add(node.name)
+
+    for (filename, checksums) in fileinfo.items():
+      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
+
+      # Nodes having the file
+      with_file = frozenset(node_name
+                            for nodes in fileinfo[filename].values()
+                            for node_name in nodes)
+
+      # Nodes missing file
+      missing_file = node_names - with_file
+
+      if filename in files_all_opt:
+        # All or no nodes
+        errorif(missing_file and missing_file != node_names,
+                cls.ECLUSTERFILECHECK, None,
+                "File %s is optional, but it must exist on all or no nodes (not"
+                " found on %s)",
+                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
+      else:
+        errorif(missing_file, cls.ECLUSTERFILECHECK, None,
+                "File %s is missing from node(s) %s", filename,
+                utils.CommaJoin(utils.NiceSort(missing_file)))
 
-    for file_name in file_list:
-      node_is_mc = ninfo.master_candidate
-      must_have = (file_name not in master_files) or node_is_mc
-      # missing
-      test1 = file_name not in remote_cksum
-      # invalid checksum
-      test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
-      # existing and good
-      test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
-      _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
-               "file '%s' missing", file_name)
-      _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
-               "file '%s' has wrong checksum", file_name)
-      # not candidate and this is not a must-have file
-      _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
-               "file '%s' should not exist on non master"
-               " candidates (and the file is outdated)", file_name)
-      # all good, except non-master/non-must have combination
-      _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
-               "file '%s' should not exist"
-               " on non master candidates", file_name)
+      # See if there are multiple versions of the file
+      test = len(checksums) > 1
+      if test:
+        variants = ["variant %s on %s" %
+                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
+                    for (idx, (checksum, nodes)) in
+                      enumerate(sorted(checksums.items()))]
+      else:
+        variants = []
+
+      errorif(test, cls.ECLUSTERFILECHECK, None,
+              "File %s found with %s different checksums (%s)",
+              filename, len(checksums), "; ".join(variants))
 
   def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                       drbd_map):
@@ -1818,6 +1941,22 @@ class LUVerifyCluster(LogicalUnit):
              "OSes present on reference node %s but missing on this node: %s",
              base.name, utils.CommaJoin(missing))
 
+  def _VerifyOob(self, ninfo, nresult):
+    """Verifies out of band functionality of a node.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+
+    """
+    node = ninfo.name
+    # We just have to verify the paths on master and/or master candidates
+    # as the oob helper is invoked on the master
+    if ((ninfo.master_candidate or ninfo.master_capable) and
+        constants.NV_OOB_PATHS in nresult):
+      for path_result in nresult[constants.NV_OOB_PATHS]:
+        self._ErrorIf(path_result, self.ENODEOOBPATH, node, path_result)
+
   def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
     """Verifies and updates the node volume data.
 
@@ -1916,18 +2055,26 @@ class LUVerifyCluster(LogicalUnit):
     @param node_image: Node objects
     @type instanceinfo: dict of (name, L{objects.Instance})
     @param instanceinfo: Instance objects
+    @rtype: {instance: {node: [(success, payload)]}}
+    @return: a dictionary of per-instance dictionaries with nodes as
+        keys and disk information as values; the disk information is a
+        list of tuples (success, payload)
 
     """
     _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
 
     node_disks = {}
     node_disks_devonly = {}
+    diskless_instances = set()
+    diskless = constants.DT_DISKLESS
 
     for nname in nodelist:
+      node_instances = list(itertools.chain(node_image[nname].pinst,
+                                            node_image[nname].sinst))
+      diskless_instances.update(inst for inst in node_instances
+                                if instanceinfo[inst].disk_template == diskless)
       disks = [(inst, disk)
-               for instlist in [node_image[nname].pinst,
-                                node_image[nname].sinst]
-               for inst in instlist
+               for inst in node_instances
                for disk in instanceinfo[inst].disks]
 
       if not disks:
@@ -1956,31 +2103,60 @@ class LUVerifyCluster(LogicalUnit):
     instdisk = {}
 
     for (nname, nres) in result.items():
-      if nres.offline:
-        # Ignore offline node
-        continue
-
       disks = node_disks[nname]
 
-      msg = nres.fail_msg
-      _ErrorIf(msg, self.ENODERPC, nname,
-               "while getting disk information: %s", nres.fail_msg)
-      if msg:
+      if nres.offline:
         # No data from this node
-        data = len(disks) * [None]
+        data = len(disks) * [(False, "node offline")]
       else:
-        data = nres.payload
+        msg = nres.fail_msg
+        _ErrorIf(msg, self.ENODERPC, nname,
+                 "while getting disk information: %s", msg)
+        if msg:
+          # No data from this node
+          data = len(disks) * [(False, msg)]
+        else:
+          data = []
+          for idx, i in enumerate(nres.payload):
+            if isinstance(i, (tuple, list)) and len(i) == 2:
+              data.append(i)
+            else:
+              logging.warning("Invalid result from node %s, entry %d: %s",
+                              nname, idx, i)
+              data.append((False, "Invalid result from the remote node"))
 
       for ((inst, _), status) in zip(disks, data):
         instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
 
+    # Add empty entries for diskless instances.
+    for inst in diskless_instances:
+      assert inst not in instdisk
+      instdisk[inst] = {}
+
     assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
-                      len(nnames) <= len(instanceinfo[inst].all_nodes)
+                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
+                      compat.all(isinstance(s, (tuple, list)) and
+                                 len(s) == 2 for s in statuses)
                       for inst, nnames in instdisk.items()
                       for nname, statuses in nnames.items())
+    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
 
     return instdisk
 
+  def _VerifyHVP(self, hvp_data):
+    """Verifies locally the syntax of the hypervisor parameters.
+
+    """
+    for item, hv_name, hv_params in hvp_data:
+      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
+             (item, hv_name))
+      try:
+        hv_class = hypervisor.GetHypervisor(hv_name)
+        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
+        hv_class.CheckParameterSyntax(hv_params)
+      except errors.GenericError, err:
+        self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))
+
   def BuildHooksEnv(self):
     """Build hooks env.
 
@@ -1988,19 +2164,28 @@ class LUVerifyCluster(LogicalUnit):
     the output be logged in the verify output and the verification to fail.
 
     """
-    all_nodes = self.cfg.GetNodeList()
+    cfg = self.cfg
+
     env = {
-      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
+      "CLUSTER_TAGS": " ".join(cfg.GetClusterInfo().GetTags())
       }
-    for node in self.cfg.GetAllNodesInfo().values():
-      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
 
-    return env, [], all_nodes
+    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
+               for node in cfg.GetAllNodesInfo().values())
+
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    return ([], self.cfg.GetNodeList())
 
   def Exec(self, feedback_fn):
     """Verify integrity of cluster, performing various test on nodes.
 
     """
+    # This method has too many local variables. pylint: disable-msg=R0914
     self.bad = False
     _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
     verbose = self.op.verbose
@@ -2020,9 +2205,11 @@ class LUVerifyCluster(LogicalUnit):
     cluster = self.cfg.GetClusterInfo()
     nodelist = utils.NiceSort(self.cfg.GetNodeList())
     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
+    nodeinfo_byname = dict(zip(nodelist, nodeinfo))
     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                         for iname in instancelist)
+    groupinfo = self.cfg.GetAllNodeGroupsInfo()
     i_non_redundant = [] # Non redundant instances
     i_non_a_balanced = [] # Non auto-balanced instances
     n_offline = 0 # Count of offline nodes
@@ -2030,25 +2217,43 @@ class LUVerifyCluster(LogicalUnit):
     node_vol_should = {}
 
     # FIXME: verify OS list
+
+    # File verification
+    filemap = _ComputeAncillaryFiles(cluster, False)
+
     # do local checksums
-    master_files = [constants.CLUSTER_CONF_FILE]
     master_node = self.master_node = self.cfg.GetMasterNode()
     master_ip = self.cfg.GetMasterIP()
 
-    file_names = ssconf.SimpleStore().GetFileList()
-    file_names.extend(constants.ALL_CERT_FILES)
-    file_names.extend(master_files)
-    if cluster.modify_etc_hosts:
-      file_names.append(constants.ETC_HOSTS)
-
-    local_checksums = utils.FingerprintFiles(file_names)
+    # Compute the set of hypervisor parameters
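+    # as a list of (source description, hypervisor name, parameters) tuples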
+    hvp_data = []
+    for hv_name in hypervisors:
+      hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
+    for os_name, os_hvp in cluster.os_hvp.items():
+      for hv_name, hv_params in os_hvp.items():
+        if not hv_params:
+          continue
+        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
+        hvp_data.append(("os %s" % os_name, hv_name, full_params))
+    # TODO: collapse identical parameter values in a single one
+    for instance in instanceinfo.values():
+      if not instance.hvparams:
+        continue
+      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
+                       cluster.FillHV(instance)))
+    # and verify them locally
+    self._VerifyHVP(hvp_data)
 
     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
     node_verify_param = {
-      constants.NV_FILELIST: file_names,
+      constants.NV_FILELIST:
+        utils.UniqueSequence(filename
+                             for files in filemap
+                             for filename in files),
       constants.NV_NODELIST: [node.name for node in nodeinfo
                               if not node.offline],
       constants.NV_HYPERVISOR: hypervisors,
+      constants.NV_HVPARAMS: hvp_data,
       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                   node.secondary_ip) for node in nodeinfo
                                  if not node.offline],
@@ -2077,6 +2282,16 @@ class LUVerifyCluster(LogicalUnit):
                                                  vm_capable=node.vm_capable))
                       for node in nodeinfo)
 
+    # Gather OOB paths
+    oob_paths = []
+    for node in nodeinfo:
+      path = _SupportsOob(self.cfg, node)
+      if path and path not in oob_paths:
+        oob_paths.append(path)
+
+    if oob_paths:
+      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
+
     for instance in instancelist:
       inst_config = instanceinfo[instance]
 
@@ -2116,6 +2331,9 @@ class LUVerifyCluster(LogicalUnit):
     feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist))
     instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo)
 
+    feedback_fn("* Verifying configuration file consistency")
+    self._VerifyFiles(_ErrorIf, nodeinfo, master_node, all_nvinfo, filemap)
+
     feedback_fn("* Verifying node status")
 
     refos_img = None
@@ -2153,8 +2371,7 @@ class LUVerifyCluster(LogicalUnit):
       nimg.call_ok = self._VerifyNode(node_i, nresult)
       self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
       self._VerifyNodeNetwork(node_i, nresult)
-      self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
-                            master_files)
+      self._VerifyOob(node_i, nresult)
 
       if nimg.vm_capable:
         self._VerifyNodeLVM(node_i, nresult, vg_name)
@@ -2185,8 +2402,10 @@ class LUVerifyCluster(LogicalUnit):
                self.ENODERPC, pnode, "instance %s, connection to"
                " primary node failed", instance)
 
-      if pnode_img.offline:
-        inst_nodes_offline.append(pnode)
+      _ErrorIf(inst_config.admin_up and pnode_img.offline,
+               self.EINSTANCEBADNODE, instance,
+               "instance is marked as running and lives on offline node %s",
+               inst_config.primary_node)
 
       # If the instance is non-redundant we cannot survive losing its primary
       # node, so we are not N+1 compliant. On the other hand we have no disk
@@ -2195,11 +2414,33 @@ class LUVerifyCluster(LogicalUnit):
       # FIXME: does not support file-backed instances
       if not inst_config.secondary_nodes:
         i_non_redundant.append(instance)
+
       _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
                instance, "instance has multiple secondary nodes: %s",
                utils.CommaJoin(inst_config.secondary_nodes),
                code=self.ETYPE_WARNING)
 
+      if inst_config.disk_template in constants.DTS_INT_MIRROR:
+        pnode = inst_config.primary_node
+        instance_nodes = utils.NiceSort(inst_config.all_nodes)
+        instance_groups = {}
+
+        for node in instance_nodes:
+          instance_groups.setdefault(nodeinfo_byname[node].group,
+                                     []).append(node)
+
+        pretty_list = [
+          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
+          # Sort so that we always list the primary node first.
+          for group, nodes in sorted(instance_groups.items(),
+                                     key=lambda (_, nodes): pnode in nodes,
+                                     reverse=True)]
+
+        self._ErrorIf(len(instance_groups) > 1, self.EINSTANCESPLITGROUPS,
+                      instance, "instance has primary and secondary nodes in"
+                      " different groups: %s", utils.CommaJoin(pretty_list),
+                      code=self.ETYPE_WARNING)
+
       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
         i_non_a_balanced.append(instance)
 
@@ -2213,7 +2454,7 @@ class LUVerifyCluster(LogicalUnit):
 
       # warn that the instance lives on offline nodes
       _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
-               "instance lives on offline node(s) %s",
+               "instance has offline secondary node(s) %s",
                utils.CommaJoin(inst_nodes_offline))
       # ... or ghost/non-vm_capable nodes
       for node in inst_config.all_nodes:
@@ -2296,7 +2537,7 @@ class LUVerifyCluster(LogicalUnit):
       return lu_result
 
 
-class LUVerifyDisks(NoHooksLU):
+class LUClusterVerifyDisks(NoHooksLU):
   """Verifies the cluster disks status.
 
   """
@@ -2320,16 +2561,13 @@ class LUVerifyDisks(NoHooksLU):
     """
     result = res_nodes, res_instances, res_missing = {}, [], {}
 
-    vg_name = self.cfg.GetVGName()
-    nodes = utils.NiceSort(self.cfg.GetNodeList())
-    instances = [self.cfg.GetInstanceInfo(name)
-                 for name in self.cfg.GetInstanceList()]
+    nodes = utils.NiceSort(self.cfg.GetVmCapableNodeList())
+    instances = self.cfg.GetAllInstancesInfo().values()
 
     nv_dict = {}
     for inst in instances:
       inst_lvs = {}
-      if (not inst.admin_up or
-          inst.disk_template not in constants.DTS_NET_MIRROR):
+      if not inst.admin_up:
         continue
       inst.MapLVsByNode(inst_lvs)
       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
@@ -2340,11 +2578,8 @@ class LUVerifyDisks(NoHooksLU):
     if not nv_dict:
       return result
 
-    node_lvs = self.rpc.call_lv_list(nodes, vg_name)
-
-    for node in nodes:
-      # node_volume
-      node_res = node_lvs[node]
+    node_lvs = self.rpc.call_lv_list(nodes, [])
+    for node, node_res in node_lvs.items():
       if node_res.offline:
         continue
       msg = node_res.fail_msg
@@ -2370,11 +2605,10 @@ class LUVerifyDisks(NoHooksLU):
     return result
 
 
-class LURepairDiskSizes(NoHooksLU):
+class LUClusterRepairDiskSizes(NoHooksLU):
   """Verifies the cluster disks sizes.
 
   """
-  _OP_PARAMS = [("instances", ht.EmptyList, ht.TListOf(ht.TNonEmptyString))]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -2454,16 +2688,18 @@ class LURepairDiskSizes(NoHooksLU):
       newl = [v[2].Copy() for v in dskl]
       for dsk in newl:
         self.cfg.SetDiskID(dsk, node)
-      result = self.rpc.call_blockdev_getsizes(node, newl)
+      result = self.rpc.call_blockdev_getsize(node, newl)
       if result.fail_msg:
-        self.LogWarning("Failure in blockdev_getsizes call to node"
+        self.LogWarning("Failure in blockdev_getsize call to node"
                         " %s, ignoring", node)
         continue
-      if len(result.data) != len(dskl):
+      if len(result.payload) != len(dskl):
+        logging.warning("Invalid result from node %s: len(dksl)=%d,"
+                        " result.payload=%s", node, len(dskl), result.payload)
         self.LogWarning("Invalid result from node %s, ignoring node results",
                         node)
         continue
-      for ((instance, idx, disk), size) in zip(dskl, result.data):
+      for ((instance, idx, disk), size) in zip(dskl, result.payload):
         if size is None:
           self.LogWarning("Disk %d of instance %s did not return size"
                           " information, ignoring", idx, instance.name)
@@ -2486,25 +2722,27 @@ class LURepairDiskSizes(NoHooksLU):
     return changed
 
 
-class LURenameCluster(LogicalUnit):
+class LUClusterRename(LogicalUnit):
   """Rename the cluster.
 
   """
   HPATH = "cluster-rename"
   HTYPE = constants.HTYPE_CLUSTER
-  _OP_PARAMS = [("name", ht.NoDefault, ht.TNonEmptyString)]
 
   def BuildHooksEnv(self):
     """Build hooks env.
 
     """
-    env = {
+    return {
       "OP_TARGET": self.cfg.GetClusterName(),
       "NEW_NAME": self.op.name,
       }
-    mn = self.cfg.GetMasterNode()
-    all_nodes = self.cfg.GetNodeList()
-    return env, [mn], all_nodes
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
 
   def CheckPrereq(self):
     """Verify that the passed name is a valid one.
@@ -2565,46 +2803,12 @@ class LURenameCluster(LogicalUnit):
     return clustername
 
 
-class LUSetClusterParams(LogicalUnit):
+class LUClusterSetParams(LogicalUnit):
   """Change the parameters of the cluster.
 
   """
   HPATH = "cluster-modify"
   HTYPE = constants.HTYPE_CLUSTER
-  _OP_PARAMS = [
-    ("vg_name", None, ht.TMaybeString),
-    ("enabled_hypervisors", None,
-     ht.TOr(ht.TAnd(ht.TListOf(ht.TElemOf(constants.HYPER_TYPES)), ht.TTrue),
-            ht.TNone)),
-    ("hvparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
-                              ht.TNone)),
-    ("beparams", None, ht.TOr(ht.TDict, ht.TNone)),
-    ("os_hvp", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
-                            ht.TNone)),
-    ("osparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
-                              ht.TNone)),
-    ("candidate_pool_size", None, ht.TOr(ht.TStrictPositiveInt, ht.TNone)),
-    ("uid_pool", None, ht.NoType),
-    ("add_uids", None, ht.NoType),
-    ("remove_uids", None, ht.NoType),
-    ("maintain_node_health", None, ht.TMaybeBool),
-    ("prealloc_wipe_disks", None, ht.TMaybeBool),
-    ("nicparams", None, ht.TOr(ht.TDict, ht.TNone)),
-    ("ndparams", None, ht.TOr(ht.TDict, ht.TNone)),
-    ("drbd_helper", None, ht.TOr(ht.TString, ht.TNone)),
-    ("default_iallocator", None, ht.TOr(ht.TString, ht.TNone)),
-    ("reserved_lvs", None, ht.TOr(ht.TListOf(ht.TNonEmptyString), ht.TNone)),
-    ("hidden_os", None, ht.TOr(ht.TListOf(\
-          ht.TAnd(ht.TList,
-                ht.TIsLength(2),
-                ht.TMap(lambda v: v[0], ht.TElemOf(constants.DDMS_VALUES)))),
-          ht.TNone)),
-    ("blacklisted_os", None, ht.TOr(ht.TListOf(\
-          ht.TAnd(ht.TList,
-                ht.TIsLength(2),
-                ht.TMap(lambda v: v[0], ht.TElemOf(constants.DDMS_VALUES)))),
-          ht.TNone)),
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -2632,12 +2836,17 @@ class LUSetClusterParams(LogicalUnit):
     """Build hooks env.
 
     """
-    env = {
+    return {
       "OP_TARGET": self.cfg.GetClusterName(),
       "NEW_VG_NAME": self.op.vg_name,
       }
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     mn = self.cfg.GetMasterNode()
-    return env, [mn], [mn]
+    return ([mn], [mn])
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -2704,6 +2913,12 @@ class LUSetClusterParams(LogicalUnit):
       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
       self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
 
+      # TODO: we need a more general way to handle resetting
+      # cluster-level parameters to default values
+      if self.new_ndparams["oob_program"] == "":
+        self.new_ndparams["oob_program"] = \
+            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
+
     if self.op.nicparams:
       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
       self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
@@ -2909,8 +3124,27 @@ class LUSetClusterParams(LogicalUnit):
     if self.op.blacklisted_os:
       helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
 
+    if self.op.master_netdev:
+      master = self.cfg.GetMasterNode()
+      feedback_fn("Shutting down master ip on the current netdev (%s)" %
+                  self.cluster.master_netdev)
+      result = self.rpc.call_node_stop_master(master, False)
+      result.Raise("Could not disable the master ip")
+      feedback_fn("Changing master_netdev from %s to %s" %
+                  (self.cluster.master_netdev, self.op.master_netdev))
+      self.cluster.master_netdev = self.op.master_netdev
+
     self.cfg.Update(self.cluster, feedback_fn)
 
+    if self.op.master_netdev:
+      feedback_fn("Starting the master ip on the new master netdev (%s)" %
+                  self.op.master_netdev)
+      result = self.rpc.call_node_start_master(master, False, False)
+      if result.fail_msg:
+        self.LogWarning("Could not re-enable the master ip on"
+                        " the master, please restart manually: %s",
+                        result.fail_msg)
+
 
 def _UploadHelper(lu, nodes, fname):
   """Helper for uploading a file and showing warnings.
@@ -2926,6 +3160,50 @@ def _UploadHelper(lu, nodes, fname):
         lu.proc.LogWarning(msg)
 
 
+def _ComputeAncillaryFiles(cluster, redist):
+  """Compute files external to Ganeti which need to be consistent.
+
+  @type redist: boolean
+  @param redist: Whether to include files which need to be redistributed
+
+  """
+  # Compute files for all nodes
+  files_all = set([
+    constants.SSH_KNOWN_HOSTS_FILE,
+    constants.CONFD_HMAC_KEY,
+    constants.CLUSTER_DOMAIN_SECRET_FILE,
+    ])
+
+  if not redist:
+    files_all.update(constants.ALL_CERT_FILES)
+    files_all.update(ssconf.SimpleStore().GetFileList())
+
+  if cluster.modify_etc_hosts:
+    files_all.add(constants.ETC_HOSTS)
+
+  # Files which must either exist on all nodes or on none
+  files_all_opt = set([
+    constants.RAPI_USERS_FILE,
+    ])
+
+  # Files which should only be on master candidates
+  files_mc = set()
+  if not redist:
+    files_mc.add(constants.CLUSTER_CONF_FILE)
+
+  # Files which should only be on VM-capable nodes
+  files_vm = set(filename
+    for hv_name in cluster.enabled_hypervisors
+    for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles())
+
+  # Filenames must be unique
+  assert (len(files_all | files_all_opt | files_mc | files_vm) ==
+          sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \
+         "Found file listed in more than one file list"
+
+  return (files_all, files_all_opt, files_mc, files_vm)
+
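+# _ComputeAncillaryFiles is used in two places: LUClusterVerify checksums the
+# full set (redist=False), while _RedistributeAncillaryFiles below only pushes
+# the files that may be re-distributed from the master (redist=True)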
+
 def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
   """Distribute additional files which are part of the cluster configuration.
 
@@ -2939,43 +3217,45 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
   @param additional_vm: whether the additional nodes are vm-capable or not
 
   """
-  # 1. Gather target nodes
-  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
-  dist_nodes = lu.cfg.GetOnlineNodeList()
-  nvm_nodes = lu.cfg.GetNonVmCapableNodeList()
-  vm_nodes = [name for name in dist_nodes if name not in nvm_nodes]
+  # Gather target nodes
+  cluster = lu.cfg.GetClusterInfo()
+  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
+
+  online_nodes = lu.cfg.GetOnlineNodeList()
+  vm_nodes = lu.cfg.GetVmCapableNodeList()
+
   if additional_nodes is not None:
-    dist_nodes.extend(additional_nodes)
+    online_nodes.extend(additional_nodes)
     if additional_vm:
       vm_nodes.extend(additional_nodes)
-  if myself.name in dist_nodes:
-    dist_nodes.remove(myself.name)
-  if myself.name in vm_nodes:
-    vm_nodes.remove(myself.name)
-
-  # 2. Gather files to distribute
-  dist_files = set([constants.ETC_HOSTS,
-                    constants.SSH_KNOWN_HOSTS_FILE,
-                    constants.RAPI_CERT_FILE,
-                    constants.RAPI_USERS_FILE,
-                    constants.CONFD_HMAC_KEY,
-                    constants.CLUSTER_DOMAIN_SECRET_FILE,
-                   ])
-
-  vm_files = set()
-  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
-  for hv_name in enabled_hypervisors:
-    hv_class = hypervisor.GetHypervisor(hv_name)
-    vm_files.update(hv_class.GetAncillaryFiles())
-
-  # 3. Perform the files upload
-  for fname in dist_files:
-    _UploadHelper(lu, dist_nodes, fname)
-  for fname in vm_files:
-    _UploadHelper(lu, vm_nodes, fname)
-
-
-class LURedistributeConfig(NoHooksLU):
+
+  # Never distribute to master node
+  for nodelist in [online_nodes, vm_nodes]:
+    if master_info.name in nodelist:
+      nodelist.remove(master_info.name)
+
+  # Gather file lists
+  (files_all, files_all_opt, files_mc, files_vm) = \
+    _ComputeAncillaryFiles(cluster, True)
+
+  # Never re-distribute configuration file from here
+  assert not (constants.CLUSTER_CONF_FILE in files_all or
+              constants.CLUSTER_CONF_FILE in files_vm)
+  assert not files_mc, "Master candidates not handled in this function"
+
+  filemap = [
+    (online_nodes, files_all),
+    (online_nodes, files_all_opt),
+    (vm_nodes, files_vm),
+    ]
+
+  # Upload the files
+  for (node_list, files) in filemap:
+    for fname in files:
+      _UploadHelper(lu, node_list, fname)
+
+
+class LUClusterRedistConf(NoHooksLU):
   """Force the redistribution of cluster configuration.
 
   This is a very simple LU.
@@ -3106,153 +3386,351 @@ def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
   return result
 
 
-class LUDiagnoseOS(NoHooksLU):
-  """Logical unit for OS diagnose/query.
+class LUOobCommand(NoHooksLU):
+  """Logical unit for OOB handling.
 
   """
-  _OP_PARAMS = [
-    _POutputFields,
-    ("names", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
-    ]
-  REQ_BGL = False
-  _HID = "hidden"
-  _BLK = "blacklisted"
-  _VLD = "valid"
-  _FIELDS_STATIC = utils.FieldSet()
-  _FIELDS_DYNAMIC = utils.FieldSet("name", _VLD, "node_status", "variants",
-                                   "parameters", "api_versions", _HID, _BLK)
+  REQ_BGL = False
+  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
 
-  def CheckArguments(self):
-    if self.op.names:
-      raise errors.OpPrereqError("Selective OS query not supported",
-                                 errors.ECODE_INVAL)
+  def CheckPrereq(self):
+    """Check prerequisites.
 
-    _CheckOutputFields(static=self._FIELDS_STATIC,
-                       dynamic=self._FIELDS_DYNAMIC,
-                       selected=self.op.output_fields)
+    This checks:
+     - the node exists in the configuration
+     - OOB is supported
 
-  def ExpandNames(self):
-    # Lock all nodes, in shared mode
-    # Temporary removal of locks, should be reverted later
-    # TODO: reintroduce locks when they are lighter-weight
-    self.needed_locks = {}
-    #self.share_locks[locking.LEVEL_NODE] = 1
-    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+    Any errors are signaled by raising errors.OpPrereqError.
 
-  @staticmethod
-  def _DiagnoseByOS(rlist):
-    """Remaps a per-node return list into an a per-os per-node dictionary
+    """
+    self.nodes = []
+    self.master_node = self.cfg.GetMasterNode()
 
-    @param rlist: a map with node names as keys and OS objects as values
+    assert self.op.power_delay >= 0.0
 
-    @rtype: dict
-    @return: a dictionary with osnames as keys and as value another
-        map, with nodes as keys and tuples of (path, status, diagnose,
-        variants, parameters, api_versions) as values, eg::
+    if self.op.node_names:
+      if self.op.command in self._SKIP_MASTER:
+        if self.master_node in self.op.node_names:
+          master_node_obj = self.cfg.GetNodeInfo(self.master_node)
+          master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
 
-          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
-                                     (/srv/..., False, "invalid api")],
-                           "node2": [(/srv/..., True, "", [], [])]}
-          }
+          if master_oob_handler:
+            additional_text = ("Run '%s %s %s' if you want to operate on the"
+                               " master regardless") % (master_oob_handler,
+                                                        self.op.command,
+                                                        self.master_node)
+          else:
+            additional_text = "The master node does not support out-of-band"
 
-    """
-    all_os = {}
-    # we build here the list of nodes that didn't fail the RPC (at RPC
-    # level), so that nodes with a non-responding node daemon don't
-    # make all OSes invalid
-    good_nodes = [node_name for node_name in rlist
-                  if not rlist[node_name].fail_msg]
-    for node_name, nr in rlist.items():
-      if nr.fail_msg or not nr.payload:
-        continue
-      for (name, path, status, diagnose, variants,
-           params, api_versions) in nr.payload:
-        if name not in all_os:
-          # build a list of nodes for this os containing empty lists
-          # for each node in node_list
-          all_os[name] = {}
-          for nname in good_nodes:
-            all_os[name][nname] = []
-        # convert params from [name, help] to (name, help)
-        params = [tuple(v) for v in params]
-        all_os[name][node_name].append((path, status, diagnose,
-                                        variants, params, api_versions))
-    return all_os
+          raise errors.OpPrereqError(("Operating on the master node %s is not"
+                                      " allowed for %s\n%s") %
+                                     (self.master_node, self.op.command,
+                                      additional_text), errors.ECODE_INVAL)
+    else:
+      self.op.node_names = self.cfg.GetNodeList()
+      if self.op.command in self._SKIP_MASTER:
+        self.op.node_names.remove(self.master_node)
 
-  def Exec(self, feedback_fn):
-    """Compute the list of OSes.
+    if self.op.command in self._SKIP_MASTER:
+      assert self.master_node not in self.op.node_names
 
-    """
-    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
-    node_data = self.rpc.call_os_diagnose(valid_nodes)
-    pol = self._DiagnoseByOS(node_data)
-    output = []
-    cluster = self.cfg.GetClusterInfo()
+    for node_name in self.op.node_names:
+      node = self.cfg.GetNodeInfo(node_name)
+
+      if node is None:
+        raise errors.OpPrereqError("Node %s not found" % node_name,
+                                   errors.ECODE_NOENT)
+      else:
+        self.nodes.append(node)
+
+      if (not self.op.ignore_status and
+          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
+        raise errors.OpPrereqError(("Cannot power off node %s because it is"
+                                    " not marked offline") % node_name,
+                                   errors.ECODE_STATE)
+
+  def ExpandNames(self):
+    """Gather locks we need.
+
+    """
+    if self.op.node_names:
+      self.op.node_names = [_ExpandNodeName(self.cfg, name)
+                            for name in self.op.node_names]
+      lock_names = self.op.node_names
+    else:
+      lock_names = locking.ALL_SET
+
+    self.needed_locks = {
+      locking.LEVEL_NODE: lock_names,
+      }
+
+  def Exec(self, feedback_fn):
+    """Execute OOB and return result if we expect any.
+
+    """
+    master_node = self.master_node
+    ret = []
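+    # Each result entry is a list of (status, data) tuples: the node name
+    # first, then the command outcome, e.g.
+    # [(constants.RS_NORMAL, "node1"), (constants.RS_NORMAL, None)]
+    # for a successful power-off ("node1" being an illustrative name)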
+
+    for idx, node in enumerate(self.nodes):
+      node_entry = [(constants.RS_NORMAL, node.name)]
+      ret.append(node_entry)
+
+      oob_program = _SupportsOob(self.cfg, node)
+
+      if not oob_program:
+        node_entry.append((constants.RS_UNAVAIL, None))
+        continue
+
+      logging.info("Executing out-of-band command '%s' using '%s' on %s",
+                   self.op.command, oob_program, node.name)
+      result = self.rpc.call_run_oob(master_node, oob_program,
+                                     self.op.command, node.name,
+                                     self.op.timeout)
+
+      if result.fail_msg:
+        self.LogWarning("On node '%s' out-of-band RPC failed with: %s",
+                        node.name, result.fail_msg)
+        node_entry.append((constants.RS_NODATA, None))
+      else:
+        try:
+          self._CheckPayload(result)
+        except errors.OpExecError, err:
+          self.LogWarning("The payload returned by '%s' is not valid: %s",
+                          node.name, err)
+          node_entry.append((constants.RS_NODATA, None))
+        else:
+          if self.op.command == constants.OOB_HEALTH:
+            # For health we should log important events
+            for item, status in result.payload:
+              if status in [constants.OOB_STATUS_WARNING,
+                            constants.OOB_STATUS_CRITICAL]:
+                self.LogWarning("On node '%s' item '%s' has status '%s'",
+                                node.name, item, status)
+
+          if self.op.command == constants.OOB_POWER_ON:
+            node.powered = True
+          elif self.op.command == constants.OOB_POWER_OFF:
+            node.powered = False
+          elif self.op.command == constants.OOB_POWER_STATUS:
+            powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
+            if powered != node.powered:
+              logging.warning(("Recorded power state (%s) of node '%s' does not"
+                               " match actual power state (%s)"), node.powered,
+                              node.name, powered)
+
+          # For configuration-changing commands we should update the node
+          if self.op.command in (constants.OOB_POWER_ON,
+                                 constants.OOB_POWER_OFF):
+            self.cfg.Update(node, feedback_fn)
+
+          node_entry.append((constants.RS_NORMAL, result.payload))
+
+          if (self.op.command == constants.OOB_POWER_ON and
+              idx < len(self.nodes) - 1):
+            time.sleep(self.op.power_delay)
+
+    return ret
+
+  def _CheckPayload(self, result):
+    """Checks if the payload is valid.
+
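+    The checks below expect C{health} to return a list of (item, status)
+    pairs, C{power-status} a dict such as
+    C{{constants.OOB_POWER_STATUS_POWERED: True}} (illustrative value), and
+    the power on/off/cycle commands to return no payload at all.
+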
+    @param result: RPC result
+    @raises errors.OpExecError: If payload is not valid
+
+    """
+    errs = []
+    if self.op.command == constants.OOB_HEALTH:
+      if not isinstance(result.payload, list):
+        errs.append("command 'health' is expected to return a list but got %s" %
+                    type(result.payload))
+      else:
+        for item, status in result.payload:
+          if status not in constants.OOB_STATUSES:
+            errs.append("health item '%s' has invalid status '%s'" %
+                        (item, status))
+
+    if self.op.command == constants.OOB_POWER_STATUS:
+      if not isinstance(result.payload, dict):
+        errs.append("power-status is expected to return a dict but got %s" %
+                    type(result.payload))
+
+    if self.op.command in [
+        constants.OOB_POWER_ON,
+        constants.OOB_POWER_OFF,
+        constants.OOB_POWER_CYCLE,
+        ]:
+      if result.payload is not None:
+        errs.append("%s is expected not to return a payload but got '%s'" %
+                    (self.op.command, result.payload))
+
+    if errs:
+      raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
+                               utils.CommaJoin(errs))
+
+
+class _OsQuery(_QueryBase):
+  FIELDS = query.OS_FIELDS
+
+  def ExpandNames(self, lu):
+    # Lock all nodes in shared mode
+    # Temporary removal of locks, should be reverted later
+    # TODO: reintroduce locks when they are lighter-weight
+    lu.needed_locks = {}
+    #lu.share_locks[locking.LEVEL_NODE] = 1
+    #lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
+    # The following variables interact with _QueryBase._GetNames
+    if self.names:
+      self.wanted = self.names
+    else:
+      self.wanted = locking.ALL_SET
+
+    self.do_locking = self.use_locking
+
+  def DeclareLocks(self, lu, level):
+    pass
+
+  @staticmethod
+  def _DiagnoseByOS(rlist):
+    """Remaps a per-node return list into a per-os per-node dictionary
+
+    @param rlist: a map with node names as keys and OS objects as values
+
+    @rtype: dict
+    @return: a dictionary with osnames as keys and as value another
+        map, with nodes as keys and tuples of (path, status, diagnose,
+        variants, parameters, api_versions) as values, eg::
+
+          {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
+                                     (/srv/..., False, "invalid api")],
+                           "node2": [(/srv/..., True, "", [], [])]}
+          }
+
+    """
+    all_os = {}
+    # we build here the list of nodes that didn't fail the RPC (at RPC
+    # level), so that nodes with a non-responding node daemon don't
+    # make all OSes invalid
+    good_nodes = [node_name for node_name in rlist
+                  if not rlist[node_name].fail_msg]
+    for node_name, nr in rlist.items():
+      if nr.fail_msg or not nr.payload:
+        continue
+      for (name, path, status, diagnose, variants,
+           params, api_versions) in nr.payload:
+        if name not in all_os:
+          # build a list of nodes for this os containing empty lists
+          # for each node in node_list
+          all_os[name] = {}
+          for nname in good_nodes:
+            all_os[name][nname] = []
+        # convert params from [name, help] to (name, help)
+        params = [tuple(v) for v in params]
+        all_os[name][node_name].append((path, status, diagnose,
+                                        variants, params, api_versions))
+    return all_os
+
+  def _GetQueryData(self, lu):
+    """Computes the list of nodes and their attributes.
+
+    """
+    # Locking is not used
+    assert not (lu.acquired_locks or self.do_locking or self.use_locking)
+
+    valid_nodes = [node.name
+                   for node in lu.cfg.GetAllNodesInfo().values()
+                   if not node.offline and node.vm_capable]
+    pol = self._DiagnoseByOS(lu.rpc.call_os_diagnose(valid_nodes))
+    cluster = lu.cfg.GetClusterInfo()
+
+    data = {}
+
+    for (os_name, os_data) in pol.items():
+      info = query.OsInfo(name=os_name, valid=True, node_status=os_data,
+                          hidden=(os_name in cluster.hidden_os),
+                          blacklisted=(os_name in cluster.blacklisted_os))
+
+      variants = set()
+      parameters = set()
+      api_versions = set()
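+      # Starting from the first node's values, the loop below keeps only
+      # what every node agrees on (set intersection), so the reported
+      # variants/parameters/API versions are consistent cluster-wide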
 
-    for os_name in utils.NiceSort(pol.keys()):
-      os_data = pol[os_name]
-      row = []
-      valid = True
-      (variants, params, api_versions) = null_state = (set(), set(), set())
       for idx, osl in enumerate(os_data.values()):
-        valid = bool(valid and osl and osl[0][1])
-        if not valid:
-          (variants, params, api_versions) = null_state
+        info.valid = bool(info.valid and osl and osl[0][1])
+        if not info.valid:
           break
-        node_variants, node_params, node_api = osl[0][3:6]
-        if idx == 0: # first entry
-          variants = set(node_variants)
-          params = set(node_params)
-          api_versions = set(node_api)
-        else: # keep consistency
+
+        (node_variants, node_params, node_api) = osl[0][3:6]
+        if idx == 0:
+          # First entry
+          variants.update(node_variants)
+          parameters.update(node_params)
+          api_versions.update(node_api)
+        else:
+          # Filter out inconsistent values
           variants.intersection_update(node_variants)
-          params.intersection_update(node_params)
+          parameters.intersection_update(node_params)
           api_versions.intersection_update(node_api)
 
-      is_hid = os_name in cluster.hidden_os
-      is_blk = os_name in cluster.blacklisted_os
-      if ((self._HID not in self.op.output_fields and is_hid) or
-          (self._BLK not in self.op.output_fields and is_blk) or
-          (self._VLD not in self.op.output_fields and not valid)):
-        continue
+      info.variants = list(variants)
+      info.parameters = list(parameters)
+      info.api_versions = list(api_versions)
 
-      for field in self.op.output_fields:
-        if field == "name":
-          val = os_name
-        elif field == self._VLD:
-          val = valid
-        elif field == "node_status":
-          # this is just a copy of the dict
-          val = {}
-          for node_name, nos_list in os_data.items():
-            val[node_name] = nos_list
-        elif field == "variants":
-          val = utils.NiceSort(list(variants))
-        elif field == "parameters":
-          val = list(params)
-        elif field == "api_versions":
-          val = list(api_versions)
-        elif field == self._HID:
-          val = is_hid
-        elif field == self._BLK:
-          val = is_blk
-        else:
-          raise errors.ParameterError(field)
-        row.append(val)
-      output.append(row)
+      data[os_name] = info
 
-    return output
+    # Prepare data in requested order
+    return [data[name] for name in self._GetNames(lu, pol.keys(), None)
+            if name in data]
+
+
+class LUOsDiagnose(NoHooksLU):
+  """Logical unit for OS diagnose/query.
+
+  """
+  REQ_BGL = False
+
+  @staticmethod
+  def _BuildFilter(fields, names):
+    """Builds a filter for querying OSes.
 
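+    As an example, requesting only the C{name} field with no names given
+    (assuming C{qlang.MakeSimpleFilter} returns C{None} for an empty name
+    list) produces::
+
+      [OP_AND,
+       [OP_NOT, [OP_TRUE, "hidden"]],
+       [OP_NOT, [OP_TRUE, "blacklisted"]],
+       [OP_TRUE, "valid"]]
+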
+    """
+    name_filter = qlang.MakeSimpleFilter("name", names)
+
+    # Legacy behaviour: Hide hidden, blacklisted or invalid OSes if the
+    # respective field is not requested
+    status_filter = [[qlang.OP_NOT, [qlang.OP_TRUE, fname]]
+                     for fname in ["hidden", "blacklisted"]
+                     if fname not in fields]
+    if "valid" not in fields:
+      status_filter.append([qlang.OP_TRUE, "valid"])
+
+    if status_filter:
+      status_filter.insert(0, qlang.OP_AND)
+    else:
+      status_filter = None
+
+    if name_filter and status_filter:
+      return [qlang.OP_AND, name_filter, status_filter]
+    elif name_filter:
+      return name_filter
+    else:
+      return status_filter
+
+  def CheckArguments(self):
+    self.oq = _OsQuery(self._BuildFilter(self.op.output_fields, self.op.names),
+                       self.op.output_fields, False)
 
-class LURemoveNode(LogicalUnit):
+  def ExpandNames(self):
+    self.oq.ExpandNames(self)
+
+  def Exec(self, feedback_fn):
+    return self.oq.OldStyleQuery(self)
+
+
+class LUNodeRemove(LogicalUnit):
   """Logical unit for removing a node.
 
   """
   HPATH = "node-remove"
   HTYPE = constants.HTYPE_NODE
-  _OP_PARAMS = [
-    _PNodeName,
-    ]
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -3261,17 +3739,22 @@ class LURemoveNode(LogicalUnit):
     node would then be impossible to remove.
 
     """
-    env = {
+    return {
       "OP_TARGET": self.op.node_name,
       "NODE_NAME": self.op.node_name,
       }
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     all_nodes = self.cfg.GetNodeList()
     try:
       all_nodes.remove(self.op.node_name)
     except ValueError:
-      logging.warning("Node %s which is about to be removed not found"
-                      " in the all nodes list", self.op.node_name)
-    return env, all_nodes, all_nodes
+      logging.warning("Node '%s', which is about to be removed, was not found"
+                      " in the list of all nodes", self.op.node_name)
+    return (all_nodes, all_nodes)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -3320,12 +3803,7 @@ class LURemoveNode(LogicalUnit):
     self.context.RemoveNode(node.name)
 
     # Run post hooks on the node before it's removed
-    hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
-    try:
-      hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name])
-    except:
-      # pylint: disable-msg=W0702
-      self.LogWarning("Errors occurred running hooks on %s" % node.name)
+    _RunPostHook(self, node.name)
 
     result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
     msg = result.fail_msg
@@ -3343,116 +3821,54 @@ class LURemoveNode(LogicalUnit):
       _RedistributeAncillaryFiles(self)
 
 
-class LUQueryNodes(NoHooksLU):
-  """Logical unit for querying nodes.
-
-  """
-  # pylint: disable-msg=W0142
-  _OP_PARAMS = [
-    _POutputFields,
-    ("names", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
-    ("use_locking", False, ht.TBool),
-    ]
-  REQ_BGL = False
-
-  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
-                    "master_candidate", "offline", "drained",
-                    "master_capable", "vm_capable"]
-
-  _FIELDS_DYNAMIC = utils.FieldSet(
-    "dtotal", "dfree",
-    "mtotal", "mnode", "mfree",
-    "bootid",
-    "ctotal", "cnodes", "csockets",
-    )
-
-  _FIELDS_STATIC = utils.FieldSet(*[
-    "pinst_cnt", "sinst_cnt",
-    "pinst_list", "sinst_list",
-    "pip", "sip", "tags",
-    "master", "role",
-    "group.uuid", "group",
-    ] + _SIMPLE_FIELDS
-    )
-
-  def CheckArguments(self):
-    _CheckOutputFields(static=self._FIELDS_STATIC,
-                       dynamic=self._FIELDS_DYNAMIC,
-                       selected=self.op.output_fields)
+class _NodeQuery(_QueryBase):
+  FIELDS = query.NODE_FIELDS
 
-  def ExpandNames(self):
-    self.needed_locks = {}
-    self.share_locks[locking.LEVEL_NODE] = 1
+  def ExpandNames(self, lu):
+    lu.needed_locks = {}
+    lu.share_locks[locking.LEVEL_NODE] = 1
 
-    if self.op.names:
-      self.wanted = _GetWantedNodes(self, self.op.names)
+    if self.names:
+      self.wanted = _GetWantedNodes(lu, self.names)
     else:
       self.wanted = locking.ALL_SET
 
-    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
-    self.do_locking = self.do_node_query and self.op.use_locking
+    self.do_locking = (self.use_locking and
+                       query.NQ_LIVE in self.requested_data)
+
     if self.do_locking:
       # if we don't request only static fields, we need to lock the nodes
-      self.needed_locks[locking.LEVEL_NODE] = self.wanted
+      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
 
-  def Exec(self, feedback_fn):
+  def DeclareLocks(self, lu, level):
+    pass
+
+  def _GetQueryData(self, lu):
     """Computes the list of nodes and their attributes.
 
     """
-    all_info = self.cfg.GetAllNodesInfo()
-    if self.do_locking:
-      nodenames = self.acquired_locks[locking.LEVEL_NODE]
-    elif self.wanted != locking.ALL_SET:
-      nodenames = self.wanted
-      missing = set(nodenames).difference(all_info.keys())
-      if missing:
-        raise errors.OpExecError(
-          "Some nodes were removed before retrieving their data: %s" % missing)
-    else:
-      nodenames = all_info.keys()
-
-    nodenames = utils.NiceSort(nodenames)
-    nodelist = [all_info[name] for name in nodenames]
+    all_info = lu.cfg.GetAllNodesInfo()
 
-    if "group" in self.op.output_fields:
-      groups = self.cfg.GetAllNodeGroupsInfo()
-    else:
-      groups = {}
+    nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
 
-    # begin data gathering
+    # Gather data as requested
+    if query.NQ_LIVE in self.requested_data:
+      # filter out non-vm_capable nodes
+      toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]
 
-    if self.do_node_query:
-      live_data = {}
-      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
-                                          self.cfg.GetHypervisorType())
-      for name in nodenames:
-        nodeinfo = node_data[name]
-        if not nodeinfo.fail_msg and nodeinfo.payload:
-          nodeinfo = nodeinfo.payload
-          fn = utils.TryConvert
-          live_data[name] = {
-            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
-            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
-            "mfree": fn(int, nodeinfo.get('memory_free', None)),
-            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
-            "dfree": fn(int, nodeinfo.get('vg_free', None)),
-            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
-            "bootid": nodeinfo.get('bootid', None),
-            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
-            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
-            }
-        else:
-          live_data[name] = {}
+      node_data = lu.rpc.call_node_info(toquery_nodes, lu.cfg.GetVGName(),
+                                        lu.cfg.GetHypervisorType())
+      live_data = dict((name, nresult.payload)
+                       for (name, nresult) in node_data.items()
+                       if not nresult.fail_msg and nresult.payload)
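+      # Nodes whose RPC failed or returned no payload are simply absent from
+      # live_data; their live fields can then be reported as unavailable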
     else:
-      live_data = dict.fromkeys(nodenames, {})
+      live_data = None
 
-    node_to_primary = dict([(name, set()) for name in nodenames])
-    node_to_secondary = dict([(name, set()) for name in nodenames])
+    if query.NQ_INST in self.requested_data:
+      node_to_primary = dict([(name, set()) for name in nodenames])
+      node_to_secondary = dict([(name, set()) for name in nodenames])
 
-    inst_fields = frozenset(("pinst_cnt", "pinst_list",
-                             "sinst_cnt", "sinst_list"))
-    if inst_fields & frozenset(self.op.output_fields):
-      inst_data = self.cfg.GetAllInstancesInfo()
+      inst_data = lu.cfg.GetAllInstancesInfo()
 
       for inst in inst_data.values():
         if inst.primary_node in node_to_primary:
@@ -3460,70 +3876,49 @@ class LUQueryNodes(NoHooksLU):
         for secnode in inst.secondary_nodes:
           if secnode in node_to_secondary:
             node_to_secondary[secnode].add(inst.name)
+    else:
+      node_to_primary = None
+      node_to_secondary = None
+
+    if query.NQ_OOB in self.requested_data:
+      oob_support = dict((name, bool(_SupportsOob(lu.cfg, node)))
+                         for name, node in all_info.iteritems())
+    else:
+      oob_support = None
 
-    master_node = self.cfg.GetMasterNode()
+    if query.NQ_GROUP in self.requested_data:
+      groups = lu.cfg.GetAllNodeGroupsInfo()
+    else:
+      groups = {}
 
-    # end data gathering
+    return query.NodeQueryData([all_info[name] for name in nodenames],
+                               live_data, lu.cfg.GetMasterNode(),
+                               node_to_primary, node_to_secondary, groups,
+                               oob_support, lu.cfg.GetClusterInfo())
 
-    output = []
-    for node in nodelist:
-      node_output = []
-      for field in self.op.output_fields:
-        if field in self._SIMPLE_FIELDS:
-          val = getattr(node, field)
-        elif field == "pinst_list":
-          val = list(node_to_primary[node.name])
-        elif field == "sinst_list":
-          val = list(node_to_secondary[node.name])
-        elif field == "pinst_cnt":
-          val = len(node_to_primary[node.name])
-        elif field == "sinst_cnt":
-          val = len(node_to_secondary[node.name])
-        elif field == "pip":
-          val = node.primary_ip
-        elif field == "sip":
-          val = node.secondary_ip
-        elif field == "tags":
-          val = list(node.GetTags())
-        elif field == "master":
-          val = node.name == master_node
-        elif self._FIELDS_DYNAMIC.Matches(field):
-          val = live_data[node.name].get(field, None)
-        elif field == "role":
-          if node.name == master_node:
-            val = "M"
-          elif node.master_candidate:
-            val = "C"
-          elif node.drained:
-            val = "D"
-          elif node.offline:
-            val = "O"
-          else:
-            val = "R"
-        elif field == "group.uuid":
-          val = node.group
-        elif field == "group":
-          ng = groups.get(node.group, None)
-          if ng is None:
-            val = "<unknown>"
-          else:
-            val = ng.name
-        else:
-          raise errors.ParameterError(field)
-        node_output.append(val)
-      output.append(node_output)
 
-    return output
+class LUNodeQuery(NoHooksLU):
+  """Logical unit for querying nodes.
+
+  """
+  # pylint: disable-msg=W0142
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    self.nq = _NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
+                         self.op.output_fields, self.op.use_locking)
+
+  def ExpandNames(self):
+    self.nq.ExpandNames(self)
+
+  def Exec(self, feedback_fn):
+    return self.nq.OldStyleQuery(self)
 
 
-class LUQueryNodeVolumes(NoHooksLU):
+class LUNodeQueryvols(NoHooksLU):
   """Logical unit for getting volumes on node(s).
 
   """
-  _OP_PARAMS = [
-    _POutputFields,
-    ("nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
-    ]
   REQ_BGL = False
   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
   _FIELDS_STATIC = utils.FieldSet("node")
@@ -3598,17 +3993,11 @@ class LUQueryNodeVolumes(NoHooksLU):
     return output
 
 
-class LUQueryNodeStorage(NoHooksLU):
+class LUNodeQueryStorage(NoHooksLU):
   """Logical unit for getting information on storage units on node(s).
 
   """
   _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
-  _OP_PARAMS = [
-    _POutputFields,
-    ("nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
-    ("storage_type", ht.NoDefault, _CheckStorageType),
-    ("name", None, ht.TMaybeString),
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -3687,16 +4076,144 @@ class LUQueryNodeStorage(NoHooksLU):
     return result
 
 
-class LUModifyNodeStorage(NoHooksLU):
+class _InstanceQuery(_QueryBase):
+  FIELDS = query.INSTANCE_FIELDS
+
+  def ExpandNames(self, lu):
+    lu.needed_locks = {}
+    lu.share_locks[locking.LEVEL_INSTANCE] = 1
+    lu.share_locks[locking.LEVEL_NODE] = 1
+
+    if self.names:
+      self.wanted = _GetWantedInstances(lu, self.names)
+    else:
+      self.wanted = locking.ALL_SET
+
+    self.do_locking = (self.use_locking and
+                       query.IQ_LIVE in self.requested_data)
+    if self.do_locking:
+      lu.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
+      lu.needed_locks[locking.LEVEL_NODE] = []
+      lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
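+      # Node locks cannot be determined yet; they depend on the instance
+      # locks acquired above and are filled in by DeclareLocks via
+      # _LockInstancesNodes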
+
+  def DeclareLocks(self, lu, level):
+    if level == locking.LEVEL_NODE and self.do_locking:
+      lu._LockInstancesNodes() # pylint: disable-msg=W0212
+
+  def _GetQueryData(self, lu):
+    """Computes the list of instances and their attributes.
+
+    """
+    cluster = lu.cfg.GetClusterInfo()
+    all_info = lu.cfg.GetAllInstancesInfo()
+
+    instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)
+
+    instance_list = [all_info[name] for name in instance_names]
+    nodes = frozenset(itertools.chain(*(inst.all_nodes
+                                        for inst in instance_list)))
+    hv_list = list(set([inst.hypervisor for inst in instance_list]))
+    bad_nodes = []
+    offline_nodes = []
+    wrongnode_inst = set()
+
+    # Gather data as requested
+    if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]):
+      live_data = {}
+      node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
+      for name in nodes:
+        result = node_data[name]
+        if result.offline:
+          # offline nodes will be in both lists
+          assert result.fail_msg
+          offline_nodes.append(name)
+        if result.fail_msg:
+          bad_nodes.append(name)
+        elif result.payload:
+          for inst in result.payload:
+            if inst in all_info:
+              if all_info[inst].primary_node == name:
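+                # Copy only this instance's data; the same payload may also
+                # list instances whose primary node is elsewhere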
+                live_data[inst] = result.payload[inst]
+              else:
+                wrongnode_inst.add(inst)
+            else:
+              # orphan instance; we don't list it here as we don't
+              # handle this case yet in the output of instance listing
+              logging.warning("Orphan instance '%s' found on node %s",
+                              inst, name)
+        # else no instance is alive
+    else:
+      live_data = {}
+
+    if query.IQ_DISKUSAGE in self.requested_data:
+      disk_usage = dict((inst.name,
+                         _ComputeDiskSize(inst.disk_template,
+                                          [{constants.IDISK_SIZE: disk.size}
+                                           for disk in inst.disks]))
+                        for inst in instance_list)
+    else:
+      disk_usage = None
+
+    if query.IQ_CONSOLE in self.requested_data:
+      consinfo = {}
+      for inst in instance_list:
+        if inst.name in live_data:
+          # Instance is running
+          consinfo[inst.name] = _GetInstanceConsole(cluster, inst)
+        else:
+          consinfo[inst.name] = None
+      assert set(consinfo.keys()) == set(instance_names)
+    else:
+      consinfo = None
+
+    return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(),
+                                   disk_usage, offline_nodes, bad_nodes,
+                                   live_data, wrongnode_inst, consinfo)
+
+
+class LUQuery(NoHooksLU):
+  """Query for resources/items of a certain kind.
+
+  """
+  # pylint: disable-msg=W0142
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    qcls = _GetQueryImplementation(self.op.what)
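+    # _GetQueryImplementation maps the requested resource kind to the
+    # matching _QueryBase subclass (e.g. _NodeQuery, _InstanceQuery)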
+
+    self.impl = qcls(self.op.filter, self.op.fields, False)
+
+  def ExpandNames(self):
+    self.impl.ExpandNames(self)
+
+  def DeclareLocks(self, level):
+    self.impl.DeclareLocks(self, level)
+
+  def Exec(self, feedback_fn):
+    return self.impl.NewStyleQuery(self)
+
+
+class LUQueryFields(NoHooksLU):
+  """Query for resources/items of a certain kind.
+
+  """
+  # pylint: disable-msg=W0142
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    self.qcls = _GetQueryImplementation(self.op.what)
+
+  def ExpandNames(self):
+    self.needed_locks = {}
+
+  def Exec(self, feedback_fn):
+    return query.QueryFields(self.qcls.FIELDS, self.op.fields)
+
+
+class LUNodeModifyStorage(NoHooksLU):
   """Logical unit for modifying a storage volume on a node.
 
   """
-  _OP_PARAMS = [
-    _PNodeName,
-    ("storage_type", ht.NoDefault, _CheckStorageType),
-    ("name", ht.NoDefault, ht.TNonEmptyString),
-    ("changes", ht.NoDefault, ht.TDict),
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -3735,22 +4252,12 @@ class LUModifyNodeStorage(NoHooksLU):
                  (self.op.name, self.op.node_name))
 
 
-class LUAddNode(LogicalUnit):
+class LUNodeAdd(LogicalUnit):
   """Logical unit for adding node to the cluster.
 
   """
   HPATH = "node-add"
   HTYPE = constants.HTYPE_NODE
-  _OP_PARAMS = [
-    _PNodeName,
-    ("primary_ip", None, ht.NoType),
-    ("secondary_ip", None, ht.TMaybeString),
-    ("readd", False, ht.TBool),
-    ("group", None, ht.TMaybeString),
-    ("master_capable", None, ht.TMaybeBool),
-    ("vm_capable", None, ht.TMaybeBool),
-    ("ndparams", None, ht.TOr(ht.TDict, ht.TNone)),
-    ]
   _NFLAGS = ["master_capable", "vm_capable"]
 
   def CheckArguments(self):
@@ -3769,7 +4276,7 @@ class LUAddNode(LogicalUnit):
     This will run on all nodes before, and on all nodes + the new node after.
 
     """
-    env = {
+    return {
       "OP_TARGET": self.op.node_name,
       "NODE_NAME": self.op.node_name,
       "NODE_PIP": self.op.primary_ip,
@@ -3777,9 +4284,16 @@ class LUAddNode(LogicalUnit):
       "MASTER_CAPABLE": str(self.op.master_capable),
       "VM_CAPABLE": str(self.op.vm_capable),
       }
-    nodes_0 = self.cfg.GetNodeList()
-    nodes_1 = nodes_0 + [self.op.node_name, ]
-    return env, nodes_0, nodes_1
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    # Exclude added node
+    pre_nodes = list(set(self.cfg.GetNodeList()) - set([self.op.node_name]))
+    post_nodes = pre_nodes + [self.op.node_name, ]
+
+    return (pre_nodes, post_nodes)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -3919,6 +4433,9 @@ class LUAddNode(LogicalUnit):
     new_node = self.new_node
     node = new_node.name
 
+    # We are adding a new node, so we assume it is powered
+    new_node.powered = True
+
     # for re-adds, reset the offline/drained/master-candidate flags;
     # we need to reset here, otherwise offline would prevent RPC calls
     # later in the procedure; this also means that if the re-add
@@ -3941,6 +4458,8 @@ class LUAddNode(LogicalUnit):
 
     if self.op.ndparams:
       new_node.ndparams = self.op.ndparams
+    else:
+      new_node.ndparams = {}
 
     # check connectivity
     result = self.rpc.call_version([node])[node]
@@ -4002,7 +4521,7 @@ class LUAddNode(LogicalUnit):
       self.context.AddNode(new_node, self.proc.GetECId())
 
 
-class LUSetNodeParams(LogicalUnit):
+class LUNodeSetParams(LogicalUnit):
   """Modifies the parameters of a node.
 
   @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
@@ -4013,18 +4532,6 @@ class LUSetNodeParams(LogicalUnit):
   """
   HPATH = "node-modify"
   HTYPE = constants.HTYPE_NODE
-  _OP_PARAMS = [
-    _PNodeName,
-    ("master_candidate", None, ht.TMaybeBool),
-    ("offline", None, ht.TMaybeBool),
-    ("drained", None, ht.TMaybeBool),
-    ("auto_promote", False, ht.TBool),
-    ("master_capable", None, ht.TMaybeBool),
-    ("vm_capable", None, ht.TMaybeBool),
-    ("secondary_ip", None, ht.TMaybeString),
-    ("ndparams", None, ht.TOr(ht.TDict, ht.TNone)),
-    _PForce,
-    ]
   REQ_BGL = False
   (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
   _F2R = {
@@ -4083,7 +4590,7 @@ class LUSetNodeParams(LogicalUnit):
       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
         for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
           instance = self.context.cfg.GetInstanceInfo(instance_name)
-          i_mirrored = instance.disk_template in constants.DTS_NET_MIRROR
+          i_mirrored = instance.disk_template in constants.DTS_INT_MIRROR
           if i_mirrored and self.op.node_name in instance.all_nodes:
             instances_keep.append(instance_name)
             self.affected_instances.append(instance)
@@ -4099,7 +4606,7 @@ class LUSetNodeParams(LogicalUnit):
     This runs on the master node.
 
     """
-    env = {
+    return {
       "OP_TARGET": self.op.node_name,
       "MASTER_CANDIDATE": str(self.op.master_candidate),
       "OFFLINE": str(self.op.offline),
@@ -4107,12 +4614,16 @@ class LUSetNodeParams(LogicalUnit):
       "MASTER_CAPABLE": str(self.op.master_capable),
       "VM_CAPABLE": str(self.op.vm_capable),
       }
-    nl = [self.cfg.GetMasterNode(),
-          self.op.node_name]
-    return env, nl, nl
 
-  def CheckPrereq(self):
-    """Check prerequisites.
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    nl = [self.cfg.GetMasterNode(), self.op.node_name]
+    return (nl, nl)
+
+  def CheckPrereq(self):
+    """Check prerequisites.
 
     This only checks the instance list against the existing names.
 
@@ -4141,15 +4652,15 @@ class LUSetNodeParams(LogicalUnit):
                                    errors.ECODE_STATE)
 
     if node.master_candidate and self.might_demote and not self.lock_all:
-      assert not self.op.auto_promote, "auto-promote set but lock_all not"
+      assert not self.op.auto_promote, "auto_promote set but lock_all not"
       # check if after removing the current node, we're missing master
       # candidates
       (mc_remaining, mc_should, _) = \
           self.cfg.GetMasterCandidateStats(exceptions=[node.name])
       if mc_remaining < mc_should:
         raise errors.OpPrereqError("Not enough master candidates, please"
-                                   " pass auto_promote to allow promotion",
-                                   errors.ECODE_STATE)
+                                   " pass auto promote option to allow"
+                                   " promotion", errors.ECODE_STATE)
 
     self.old_flags = old_flags = (node.master_candidate,
                                   node.drained, node.offline)
@@ -4165,6 +4676,18 @@ class LUSetNodeParams(LogicalUnit):
     # Past this point, any flag change to False means a transition
     # away from the respective state, as only real changes are kept
 
+    # TODO: We might query the real power state if it supports OOB
+    if _SupportsOob(self.cfg, node):
+      if self.op.offline is False and not (node.powered or
+                                           self.op.powered is True):
+        raise errors.OpPrereqError(("Please power on node %s first before you"
+                                    " can reset offline state") %
+                                   self.op.node_name, errors.ECODE_STATE)
+    elif self.op.powered is not None:
+      raise errors.OpPrereqError(("Unable to change powered state for node %s"
+                                  " which does not support out-of-band"
+                                  " handling") % self.op.node_name,
+                                 errors.ECODE_INVAL)
+
     # If we're being deofflined/drained, we'll MC ourself if needed
     if (self.op.drained == False or self.op.offline == False or
         (self.op.master_capable and not node.master_capable)):
@@ -4254,6 +4777,9 @@ class LUSetNodeParams(LogicalUnit):
     if self.op.ndparams:
       node.ndparams = self.new_ndparams
 
+    if self.op.powered is not None:
+      node.powered = self.op.powered
+
     for attr in ["master_capable", "vm_capable"]:
       val = getattr(self.op, attr)
       if val is not None:
@@ -4292,14 +4818,10 @@ class LUSetNodeParams(LogicalUnit):
     return result
 
 
-class LUPowercycleNode(NoHooksLU):
+class LUNodePowercycle(NoHooksLU):
   """Powercycles a node.
 
   """
-  _OP_PARAMS = [
-    _PNodeName,
-    _PForce,
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -4328,7 +4850,7 @@ class LUPowercycleNode(NoHooksLU):
     return result.payload
 
 
-class LUQueryClusterInfo(NoHooksLU):
+class LUClusterQuery(NoHooksLU):
   """Query cluster configuration.
 
   """
@@ -4373,11 +4895,13 @@ class LUQueryClusterInfo(NoHooksLU):
       "beparams": cluster.beparams,
       "osparams": cluster.osparams,
       "nicparams": cluster.nicparams,
+      "ndparams": cluster.ndparams,
       "candidate_pool_size": cluster.candidate_pool_size,
       "master_netdev": cluster.master_netdev,
       "volume_group_name": cluster.volume_group_name,
       "drbd_usermode_helper": cluster.drbd_usermode_helper,
       "file_storage_dir": cluster.file_storage_dir,
+      "shared_file_storage_dir": cluster.shared_file_storage_dir,
       "maintain_node_health": cluster.maintain_node_health,
       "ctime": cluster.ctime,
       "mtime": cluster.mtime,
@@ -4388,16 +4912,17 @@ class LUQueryClusterInfo(NoHooksLU):
       "reserved_lvs": cluster.reserved_lvs,
       "primary_ip_version": primary_ip_version,
       "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
+      "hidden_os": cluster.hidden_os,
+      "blacklisted_os": cluster.blacklisted_os,
       }
 
     return result
 
 
-class LUQueryConfigValues(NoHooksLU):
+class LUClusterConfigQuery(NoHooksLU):
   """Return configuration values.
 
   """
-  _OP_PARAMS = [_POutputFields]
   REQ_BGL = False
   _FIELDS_DYNAMIC = utils.FieldSet()
   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
@@ -4433,14 +4958,10 @@ class LUQueryConfigValues(NoHooksLU):
     return values
 
 
-class LUActivateInstanceDisks(NoHooksLU):
+class LUInstanceActivateDisks(NoHooksLU):
   """Bring up an instance's disks.
 
   """
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("ignore_size", False, ht.TBool),
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -4515,13 +5036,13 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
   # SyncSource, etc.)
 
   # 1st pass, assemble on all nodes in secondary mode
-  for inst_disk in disks:
+  for idx, inst_disk in enumerate(disks):
     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
       if ignore_size:
         node_disk = node_disk.Copy()
         node_disk.UnsetSize()
       lu.cfg.SetDiskID(node_disk, node)
-      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
+      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False, idx)
       msg = result.fail_msg
       if msg:
         lu.proc.LogWarning("Could not prepare block device %s on node %s"
@@ -4533,7 +5054,7 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
   # FIXME: race condition on drbd migration to primary
 
   # 2nd pass, do only the primary node
-  for inst_disk in disks:
+  for idx, inst_disk in enumerate(disks):
     dev_path = None
 
     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
@@ -4543,7 +5064,7 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
         node_disk = node_disk.Copy()
         node_disk.UnsetSize()
       lu.cfg.SetDiskID(node_disk, node)
-      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
+      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True, idx)
       msg = result.fail_msg
       if msg:
         lu.proc.LogWarning("Could not prepare block device %s on node %s"
@@ -4579,13 +5100,10 @@ def _StartInstanceDisks(lu, instance, force):
     raise errors.OpExecError("Disk consistency error")
 
 
-class LUDeactivateInstanceDisks(NoHooksLU):
+class LUInstanceDeactivateDisks(NoHooksLU):
   """Shutdown an instance's disks.
 
   """
-  _OP_PARAMS = [
-    _PInstanceName,
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -4612,7 +5130,10 @@ class LUDeactivateInstanceDisks(NoHooksLU):
 
     """
     instance = self.instance
-    _SafeShutdownInstanceDisks(self, instance)
+    if self.op.force:
+      _ShutdownInstanceDisks(self, instance)
+    else:
+      _SafeShutdownInstanceDisks(self, instance)
 
 
 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
@@ -4664,7 +5185,8 @@ def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
       if msg:
         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                       disk.iv_name, node, msg)
-        if not ignore_primary or node != instance.primary_node:
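+        # A shutdown failure counts against the overall result unless it
+        # happened on the primary node with ignore_primary set, or on an
+        # offline (non-primary) node, where failures are expected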
+        if ((node == instance.primary_node and not ignore_primary) or
+            (node != instance.primary_node and not result.offline)):
           all_result = False
   return all_result
 
@@ -4725,9 +5247,8 @@ def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
       or we cannot check the node
 
   """
-  if req_sizes is not None:
-    for vg, req_size in req_sizes.iteritems():
-      _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
+  for vg, req_size in req_sizes.items():
+    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
 
 
 def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
@@ -4767,19 +5288,12 @@ def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
                                  errors.ECODE_NORES)
 
 
-class LUStartupInstance(LogicalUnit):
+class LUInstanceStartup(LogicalUnit):
   """Starts an instance.
 
   """
   HPATH = "instance-start"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    _PForce,
-    _PIgnoreOfflineNodes,
-    ("hvparams", ht.EmptyDict, ht.TDict),
-    ("beparams", ht.EmptyDict, ht.TDict),
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -4800,9 +5314,17 @@ class LUStartupInstance(LogicalUnit):
     env = {
       "FORCE": self.op.force,
       }
+
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
-    return env, nl, nl
+    return (nl, nl)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -4875,18 +5397,12 @@ class LUStartupInstance(LogicalUnit):
         raise errors.OpExecError("Could not start instance: %s" % msg)
 
 
-class LURebootInstance(LogicalUnit):
+class LUInstanceReboot(LogicalUnit):
   """Reboot an instance.
 
   """
   HPATH = "instance-reboot"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("ignore_secondaries", False, ht.TBool),
-    ("reboot_type", ht.NoDefault, ht.TElemOf(constants.REBOOT_TYPES)),
-    _PShutdownTimeout,
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -4903,9 +5419,17 @@ class LURebootInstance(LogicalUnit):
       "REBOOT_TYPE": self.op.reboot_type,
       "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
       }
+
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
-    return env, nl, nl
+    return (nl, nl)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -4930,10 +5454,16 @@ class LURebootInstance(LogicalUnit):
     ignore_secondaries = self.op.ignore_secondaries
     reboot_type = self.op.reboot_type
 
+    remote_info = self.rpc.call_instance_info(instance.primary_node,
+                                              instance.name,
+                                              instance.hypervisor)
+    remote_info.Raise("Error checking node %s" % instance.primary_node)
+    instance_running = bool(remote_info.payload)
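+    # A "full" reboot of a stopped instance degrades to a plain start;
+    # checking the live state first avoids shutting down an instance that
+    # is not actually running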
+
     node_current = instance.primary_node
 
-    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
-                       constants.INSTANCE_REBOOT_HARD]:
+    if instance_running and reboot_type in [constants.INSTANCE_REBOOT_SOFT,
+                                            constants.INSTANCE_REBOOT_HARD]:
       for disk in instance.disks:
         self.cfg.SetDiskID(disk, node_current)
       result = self.rpc.call_instance_reboot(node_current, instance,
@@ -4941,10 +5471,14 @@ class LURebootInstance(LogicalUnit):
                                              self.op.shutdown_timeout)
       result.Raise("Could not reboot instance")
     else:
-      result = self.rpc.call_instance_shutdown(node_current, instance,
-                                               self.op.shutdown_timeout)
-      result.Raise("Could not shutdown instance for full reboot")
-      _ShutdownInstanceDisks(self, instance)
+      if instance_running:
+        result = self.rpc.call_instance_shutdown(node_current, instance,
+                                                 self.op.shutdown_timeout)
+        result.Raise("Could not shutdown instance for full reboot")
+        _ShutdownInstanceDisks(self, instance)
+      else:
+        self.LogInfo("Instance %s was already stopped, starting now",
+                     instance.name)
       _StartInstanceDisks(self, instance, ignore_secondaries)
       result = self.rpc.call_instance_start(node_current, instance, None, None)
       msg = result.fail_msg
@@ -4956,17 +5490,12 @@ class LURebootInstance(LogicalUnit):
     self.cfg.MarkInstanceUp(instance.name)
 
 
-class LUShutdownInstance(LogicalUnit):
+class LUInstanceShutdown(LogicalUnit):
   """Shutdown an instance.
 
   """
   HPATH = "instance-stop"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    _PIgnoreOfflineNodes,
-    ("timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT, ht.TPositiveInt),
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -4980,8 +5509,14 @@ class LUShutdownInstance(LogicalUnit):
     """
     env = _BuildInstanceHookEnvByObject(self, self.instance)
     env["TIMEOUT"] = self.op.timeout
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
-    return env, nl, nl
+    return (nl, nl)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -5023,18 +5558,12 @@ class LUShutdownInstance(LogicalUnit):
       _ShutdownInstanceDisks(self, instance)
 
 
-class LUReinstallInstance(LogicalUnit):
+class LUInstanceReinstall(LogicalUnit):
   """Reinstall an instance.
 
   """
   HPATH = "instance-reinstall"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("os_type", None, ht.TMaybeString),
-    ("force_variant", False, ht.TBool),
-    ("osparams", None, ht.TOr(ht.TDict, ht.TNone)),
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -5046,9 +5575,14 @@ class LUReinstallInstance(LogicalUnit):
     This runs on master, primary and secondary nodes of the instance.
 
     """
-    env = _BuildInstanceHookEnvByObject(self, self.instance)
+    return _BuildInstanceHookEnvByObject(self, self.instance)
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
-    return env, nl, nl
+    return (nl, nl)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -5115,16 +5649,12 @@ class LUReinstallInstance(LogicalUnit):
       _ShutdownInstanceDisks(self, inst)
 
 
-class LURecreateInstanceDisks(LogicalUnit):
+class LUInstanceRecreateDisks(LogicalUnit):
   """Recreate an instance's missing disks.
 
   """
   HPATH = "instance-recreate-disks"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("disks", ht.EmptyList, ht.TListOf(ht.TPositiveInt)),
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -5136,9 +5666,14 @@ class LURecreateInstanceDisks(LogicalUnit):
     This runs on master, primary and secondary nodes of the instance.
 
     """
-    env = _BuildInstanceHookEnvByObject(self, self.instance)
+    return _BuildInstanceHookEnvByObject(self, self.instance)
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
-    return env, nl, nl
+    return (nl, nl)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -5179,18 +5714,12 @@ class LURecreateInstanceDisks(LogicalUnit):
     _CreateDisks(self, self.instance, to_skip=to_skip)
 
 
-class LURenameInstance(LogicalUnit):
+class LUInstanceRename(LogicalUnit):
   """Rename an instance.
 
   """
   HPATH = "instance-rename"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("new_name", ht.NoDefault, ht.TNonEmptyString),
-    ("ip_check", False, ht.TBool),
-    ("name_check", True, ht.TBool),
-    ]
 
   def CheckArguments(self):
     """Check arguments.
@@ -5209,8 +5738,14 @@ class LURenameInstance(LogicalUnit):
     """
     env = _BuildInstanceHookEnvByObject(self, self.instance)
     env["INSTANCE_NEW_NAME"] = self.op.new_name
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
-    return env, nl, nl
+    return (nl, nl)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -5229,6 +5764,13 @@ class LURenameInstance(LogicalUnit):
     new_name = self.op.new_name
     if self.op.name_check:
       hostname = netutils.GetHostname(name=new_name)
+      self.LogInfo("Resolved given name '%s' to '%s'", new_name,
+                   hostname.name)
+      if not utils.MatchNameComponent(self.op.new_name, [hostname.name]):
+        raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
+                                    " same as given hostname '%s'") %
+                                   (hostname.name, self.op.new_name),
+                                   errors.ECODE_INVAL)
       new_name = self.op.new_name = hostname.name
       if (self.op.ip_check and
           netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)):
@@ -5237,19 +5779,22 @@ class LURenameInstance(LogicalUnit):
                                    errors.ECODE_NOTUNIQUE)
 
     instance_list = self.cfg.GetInstanceList()
-    if new_name in instance_list:
+    if new_name in instance_list and new_name != instance.name:
       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                  new_name, errors.ECODE_EXISTS)
 
   def Exec(self, feedback_fn):
-    """Reinstall the instance.
+    """Rename the instance.
 
     """
     inst = self.instance
     old_name = inst.name
 
-    if inst.disk_template == constants.DT_FILE:
+    rename_file_storage = False
+    if (inst.disk_template in (constants.DT_FILE, constants.DT_SHARED_FILE) and
+        self.op.new_name != inst.name):
       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
+      rename_file_storage = True
 
     self.cfg.RenameInstance(inst.name, self.op.new_name)
     # Change the instance lock. This is definitely safe while we hold the BGL
@@ -5259,7 +5804,7 @@ class LURenameInstance(LogicalUnit):
     # re-read the instance from the configuration after rename
     inst = self.cfg.GetInstanceInfo(self.op.new_name)
 
-    if inst.disk_template == constants.DT_FILE:
+    if rename_file_storage:
       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                      old_file_storage_dir,
@@ -5285,17 +5830,12 @@ class LURenameInstance(LogicalUnit):
     return inst.name
 
 
-class LURemoveInstance(LogicalUnit):
+class LUInstanceRemove(LogicalUnit):
   """Remove an instance.
 
   """
   HPATH = "instance-remove"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("ignore_failures", False, ht.TBool),
-    _PShutdownTimeout,
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -5315,9 +5855,15 @@ class LURemoveInstance(LogicalUnit):
     """
     env = _BuildInstanceHookEnvByObject(self, self.instance)
     env["SHUTDOWN_TIMEOUT"] = self.op.shutdown_timeout
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     nl = [self.cfg.GetMasterNode()]
     nl_post = list(self.instance.all_nodes) + nl
-    return env, nl, nl_post
+    return (nl, nl_post)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -5373,325 +5919,74 @@ def _RemoveInstance(lu, feedback_fn, instance, ignore_failures):
   lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name
 
 
-class LUQueryInstances(NoHooksLU):
+class LUInstanceQuery(NoHooksLU):
   """Logical unit for querying instances.
 
   """
   # pylint: disable-msg=W0142
-  _OP_PARAMS = [
-    _POutputFields,
-    ("names", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
-    ("use_locking", False, ht.TBool),
-    ]
   REQ_BGL = False
-  _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
-                    "serial_no", "ctime", "mtime", "uuid"]
-  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
-                                    "admin_state",
-                                    "disk_template", "ip", "mac", "bridge",
-                                    "nic_mode", "nic_link",
-                                    "sda_size", "sdb_size", "vcpus", "tags",
-                                    "network_port", "beparams",
-                                    r"(disk)\.(size)/([0-9]+)",
-                                    r"(disk)\.(sizes)", "disk_usage",
-                                    r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
-                                    r"(nic)\.(bridge)/([0-9]+)",
-                                    r"(nic)\.(macs|ips|modes|links|bridges)",
-                                    r"(disk|nic)\.(count)",
-                                    "hvparams", "custom_hvparams",
-                                    "custom_beparams", "custom_nicparams",
-                                    ] + _SIMPLE_FIELDS +
-                                  ["hv/%s" % name
-                                   for name in constants.HVS_PARAMETERS
-                                   if name not in constants.HVC_GLOBALS] +
-                                  ["be/%s" % name
-                                   for name in constants.BES_PARAMETERS])
-  _FIELDS_DYNAMIC = utils.FieldSet("oper_state",
-                                   "oper_ram",
-                                   "oper_vcpus",
-                                   "status")
-
 
   def CheckArguments(self):
-    _CheckOutputFields(static=self._FIELDS_STATIC,
-                       dynamic=self._FIELDS_DYNAMIC,
-                       selected=self.op.output_fields)
+    self.iq = _InstanceQuery(qlang.MakeSimpleFilter("name", self.op.names),
+                             self.op.output_fields, self.op.use_locking)
 
   def ExpandNames(self):
-    self.needed_locks = {}
-    self.share_locks[locking.LEVEL_INSTANCE] = 1
-    self.share_locks[locking.LEVEL_NODE] = 1
-
-    if self.op.names:
-      self.wanted = _GetWantedInstances(self, self.op.names)
-    else:
-      self.wanted = locking.ALL_SET
-
-    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
-    self.do_locking = self.do_node_query and self.op.use_locking
-    if self.do_locking:
-      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
-      self.needed_locks[locking.LEVEL_NODE] = []
-      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+    self.iq.ExpandNames(self)
 
   def DeclareLocks(self, level):
-    if level == locking.LEVEL_NODE and self.do_locking:
-      self._LockInstancesNodes()
+    self.iq.DeclareLocks(self, level)
 
   def Exec(self, feedback_fn):
-    """Computes the list of nodes and their attributes.
-
-    """
-    # pylint: disable-msg=R0912
-    # way too many branches here
-    all_info = self.cfg.GetAllInstancesInfo()
-    if self.wanted == locking.ALL_SET:
-      # caller didn't specify instance names, so ordering is not important
-      if self.do_locking:
-        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
-      else:
-        instance_names = all_info.keys()
-      instance_names = utils.NiceSort(instance_names)
-    else:
-      # caller did specify names, so we must keep the ordering
-      if self.do_locking:
-        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
-      else:
-        tgt_set = all_info.keys()
-      missing = set(self.wanted).difference(tgt_set)
-      if missing:
-        raise errors.OpExecError("Some instances were removed before"
-                                 " retrieving their data: %s" % missing)
-      instance_names = self.wanted
-
-    instance_list = [all_info[iname] for iname in instance_names]
-
-    # begin data gathering
-
-    nodes = frozenset([inst.primary_node for inst in instance_list])
-    hv_list = list(set([inst.hypervisor for inst in instance_list]))
-
-    bad_nodes = []
-    off_nodes = []
-    if self.do_node_query:
-      live_data = {}
-      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
-      for name in nodes:
-        result = node_data[name]
-        if result.offline:
-          # offline nodes will be in both lists
-          off_nodes.append(name)
-        if result.fail_msg:
-          bad_nodes.append(name)
-        else:
-          if result.payload:
-            live_data.update(result.payload)
-          # else no instance is alive
-    else:
-      live_data = dict([(name, {}) for name in instance_names])
-
-    # end data gathering
-
-    HVPREFIX = "hv/"
-    BEPREFIX = "be/"
-    output = []
-    cluster = self.cfg.GetClusterInfo()
-    for instance in instance_list:
-      iout = []
-      i_hv = cluster.FillHV(instance, skip_globals=True)
-      i_be = cluster.FillBE(instance)
-      i_nicp = [cluster.SimpleFillNIC(nic.nicparams) for nic in instance.nics]
-      for field in self.op.output_fields:
-        st_match = self._FIELDS_STATIC.Matches(field)
-        if field in self._SIMPLE_FIELDS:
-          val = getattr(instance, field)
-        elif field == "pnode":
-          val = instance.primary_node
-        elif field == "snodes":
-          val = list(instance.secondary_nodes)
-        elif field == "admin_state":
-          val = instance.admin_up
-        elif field == "oper_state":
-          if instance.primary_node in bad_nodes:
-            val = None
-          else:
-            val = bool(live_data.get(instance.name))
-        elif field == "status":
-          if instance.primary_node in off_nodes:
-            val = "ERROR_nodeoffline"
-          elif instance.primary_node in bad_nodes:
-            val = "ERROR_nodedown"
-          else:
-            running = bool(live_data.get(instance.name))
-            if running:
-              if instance.admin_up:
-                val = "running"
-              else:
-                val = "ERROR_up"
-            else:
-              if instance.admin_up:
-                val = "ERROR_down"
-              else:
-                val = "ADMIN_down"
-        elif field == "oper_ram":
-          if instance.primary_node in bad_nodes:
-            val = None
-          elif instance.name in live_data:
-            val = live_data[instance.name].get("memory", "?")
-          else:
-            val = "-"
-        elif field == "oper_vcpus":
-          if instance.primary_node in bad_nodes:
-            val = None
-          elif instance.name in live_data:
-            val = live_data[instance.name].get("vcpus", "?")
-          else:
-            val = "-"
-        elif field == "vcpus":
-          val = i_be[constants.BE_VCPUS]
-        elif field == "disk_template":
-          val = instance.disk_template
-        elif field == "ip":
-          if instance.nics:
-            val = instance.nics[0].ip
-          else:
-            val = None
-        elif field == "nic_mode":
-          if instance.nics:
-            val = i_nicp[0][constants.NIC_MODE]
-          else:
-            val = None
-        elif field == "nic_link":
-          if instance.nics:
-            val = i_nicp[0][constants.NIC_LINK]
-          else:
-            val = None
-        elif field == "bridge":
-          if (instance.nics and
-              i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
-            val = i_nicp[0][constants.NIC_LINK]
-          else:
-            val = None
-        elif field == "mac":
-          if instance.nics:
-            val = instance.nics[0].mac
-          else:
-            val = None
-        elif field == "custom_nicparams":
-          val = [nic.nicparams for nic in instance.nics]
-        elif field == "sda_size" or field == "sdb_size":
-          idx = ord(field[2]) - ord('a')
-          try:
-            val = instance.FindDisk(idx).size
-          except errors.OpPrereqError:
-            val = None
-        elif field == "disk_usage": # total disk usage per node
-          disk_sizes = [{'size': disk.size} for disk in instance.disks]
-          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
-        elif field == "tags":
-          val = list(instance.GetTags())
-        elif field == "custom_hvparams":
-          val = instance.hvparams # not filled!
-        elif field == "hvparams":
-          val = i_hv
-        elif (field.startswith(HVPREFIX) and
-              field[len(HVPREFIX):] in constants.HVS_PARAMETERS and
-              field[len(HVPREFIX):] not in constants.HVC_GLOBALS):
-          val = i_hv.get(field[len(HVPREFIX):], None)
-        elif field == "custom_beparams":
-          val = instance.beparams
-        elif field == "beparams":
-          val = i_be
-        elif (field.startswith(BEPREFIX) and
-              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
-          val = i_be.get(field[len(BEPREFIX):], None)
-        elif st_match and st_match.groups():
-          # matches a variable list
-          st_groups = st_match.groups()
-          if st_groups and st_groups[0] == "disk":
-            if st_groups[1] == "count":
-              val = len(instance.disks)
-            elif st_groups[1] == "sizes":
-              val = [disk.size for disk in instance.disks]
-            elif st_groups[1] == "size":
-              try:
-                val = instance.FindDisk(st_groups[2]).size
-              except errors.OpPrereqError:
-                val = None
-            else:
-              assert False, "Unhandled disk parameter"
-          elif st_groups[0] == "nic":
-            if st_groups[1] == "count":
-              val = len(instance.nics)
-            elif st_groups[1] == "macs":
-              val = [nic.mac for nic in instance.nics]
-            elif st_groups[1] == "ips":
-              val = [nic.ip for nic in instance.nics]
-            elif st_groups[1] == "modes":
-              val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
-            elif st_groups[1] == "links":
-              val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
-            elif st_groups[1] == "bridges":
-              val = []
-              for nicp in i_nicp:
-                if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
-                  val.append(nicp[constants.NIC_LINK])
-                else:
-                  val.append(None)
-            else:
-              # index-based item
-              nic_idx = int(st_groups[2])
-              if nic_idx >= len(instance.nics):
-                val = None
-              else:
-                if st_groups[1] == "mac":
-                  val = instance.nics[nic_idx].mac
-                elif st_groups[1] == "ip":
-                  val = instance.nics[nic_idx].ip
-                elif st_groups[1] == "mode":
-                  val = i_nicp[nic_idx][constants.NIC_MODE]
-                elif st_groups[1] == "link":
-                  val = i_nicp[nic_idx][constants.NIC_LINK]
-                elif st_groups[1] == "bridge":
-                  nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
-                  if nic_mode == constants.NIC_MODE_BRIDGED:
-                    val = i_nicp[nic_idx][constants.NIC_LINK]
-                  else:
-                    val = None
-                else:
-                  assert False, "Unhandled NIC parameter"
-          else:
-            assert False, ("Declared but unhandled variable parameter '%s'" %
-                           field)
-        else:
-          assert False, "Declared but unhandled parameter '%s'" % field
-        iout.append(val)
-      output.append(iout)
-
-    return output
+    return self.iq.OldStyleQuery(self)
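
The field-by-field Exec logic is gone: the LU now delegates entirely to the
shared query infrastructure. A minimal sketch of the pattern, with
illustrative values (only _InstanceQuery and qlang.MakeSimpleFilter are taken
from this change):

  flt = qlang.MakeSimpleFilter("name", ["inst1.example.com"])
  iq = _InstanceQuery(flt, ["name", "status", "oper_ram"], False)
  # The owning LU then forwards its lifecycle calls:
  #   iq.ExpandNames(lu); iq.DeclareLocks(lu, level); iq.OldStyleQuery(lu)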
 
 
-class LUFailoverInstance(LogicalUnit):
+class LUInstanceFailover(LogicalUnit):
   """Failover an instance.
 
   """
   HPATH = "instance-failover"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("ignore_consistency", False, ht.TBool),
-    _PShutdownTimeout,
-    ]
   REQ_BGL = False
 
+  def CheckArguments(self):
+    """Check the arguments.
+
+    """
+    self.iallocator = getattr(self.op, "iallocator", None)
+    self.target_node = getattr(self.op, "target_node", None)
+
   def ExpandNames(self):
     self._ExpandAndLockInstance()
+
+    if self.op.target_node is not None:
+      self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
+
     self.needed_locks[locking.LEVEL_NODE] = []
     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
+    ignore_consistency = self.op.ignore_consistency
+    shutdown_timeout = self.op.shutdown_timeout
+    self._migrater = TLMigrateInstance(self, self.op.instance_name,
+                                       cleanup=False,
+                                       iallocator=self.op.iallocator,
+                                       target_node=self.op.target_node,
+                                       failover=True,
+                                       ignore_consistency=ignore_consistency,
+                                       shutdown_timeout=shutdown_timeout)
+    self.tasklets = [self._migrater]
+
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE:
-      self._LockInstancesNodes()
+      instance = self.context.cfg.GetInstanceInfo(self.op.instance_name)
+      if instance.disk_template in constants.DTS_EXT_MIRROR:
+        if self.op.target_node is None:
+          self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+        else:
+          self.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
+                                                   self.op.target_node]
+        del self.recalculate_locks[locking.LEVEL_NODE]
+      else:
+        self._LockInstancesNodes()
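
For externally mirrored disk templates the target node is not implied by the
disk layout, so the LU locks either the explicit target or, when an
iallocator will pick the node later, all nodes. A condensed, hypothetical
restatement of the branch above:

  def _NodeLocksForMove(instance, target_node):
    # Hypothetical helper, mirroring the logic in DeclareLocks.
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      if target_node is None:
        return locking.ALL_SET  # the iallocator chooses the node later
      return [instance.primary_node, target_node]
    return None  # internally mirrored: lock the instance's own nodes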
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -5699,131 +5994,36 @@ class LUFailoverInstance(LogicalUnit):
     This runs on master, primary and secondary nodes of the instance.
 
     """
-    instance = self.instance
+    instance = self._migrater.instance
     source_node = instance.primary_node
-    target_node = instance.secondary_nodes[0]
+    target_node = self._migrater.target_node
     env = {
       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
       "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
       "OLD_PRIMARY": source_node,
-      "OLD_SECONDARY": target_node,
       "NEW_PRIMARY": target_node,
-      "NEW_SECONDARY": source_node,
       }
-    env.update(_BuildInstanceHookEnvByObject(self, instance))
-    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
-    nl_post = list(nl)
-    nl_post.append(source_node)
-    return env, nl, nl_post
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    This checks that the instance is in the cluster.
-
-    """
-    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
-    assert self.instance is not None, \
-      "Cannot retrieve locked instance %s" % self.op.instance_name
-
-    bep = self.cfg.GetClusterInfo().FillBE(instance)
-    if instance.disk_template not in constants.DTS_NET_MIRROR:
-      raise errors.OpPrereqError("Instance's disk layout is not"
-                                 " network mirrored, cannot failover.",
-                                 errors.ECODE_STATE)
-
-    secondary_nodes = instance.secondary_nodes
-    if not secondary_nodes:
-      raise errors.ProgrammerError("no secondary node but using "
-                                   "a mirrored disk template")
 
-    target_node = secondary_nodes[0]
-    _CheckNodeOnline(self, target_node)
-    _CheckNodeNotDrained(self, target_node)
-    if instance.admin_up:
-      # check memory requirements on the secondary node
-      _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
-                           instance.name, bep[constants.BE_MEMORY],
-                           instance.hypervisor)
+    if instance.disk_template in constants.DTS_INT_MIRROR:
+      env["OLD_SECONDARY"] = instance.secondary_nodes[0]
+      env["NEW_SECONDARY"] = source_node
     else:
-      self.LogInfo("Not checking memory on the secondary node as"
-                   " instance will not be started")
+      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
 
-    # check bridge existance
-    _CheckInstanceBridgesExist(self, instance, node=target_node)
+    env.update(_BuildInstanceHookEnvByObject(self, instance))
 
-  def Exec(self, feedback_fn):
-    """Failover an instance.
+    return env
 
-    The failover is done by shutting it down on its present node and
-    starting it on the secondary.
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
 
     """
-    instance = self.instance
-    primary_node = self.cfg.GetNodeInfo(instance.primary_node)
-
-    source_node = instance.primary_node
-    target_node = instance.secondary_nodes[0]
-
-    if instance.admin_up:
-      feedback_fn("* checking disk consistency between source and target")
-      for dev in instance.disks:
-        # for drbd, these are drbd over lvm
-        if not _CheckDiskConsistency(self, dev, target_node, False):
-          if not self.op.ignore_consistency:
-            raise errors.OpExecError("Disk %s is degraded on target node,"
-                                     " aborting failover." % dev.iv_name)
-    else:
-      feedback_fn("* not checking disk consistency as instance is not running")
-
-    feedback_fn("* shutting down instance on source node")
-    logging.info("Shutting down instance %s on node %s",
-                 instance.name, source_node)
-
-    result = self.rpc.call_instance_shutdown(source_node, instance,
-                                             self.op.shutdown_timeout)
-    msg = result.fail_msg
-    if msg:
-      if self.op.ignore_consistency or primary_node.offline:
-        self.proc.LogWarning("Could not shutdown instance %s on node %s."
-                             " Proceeding anyway. Please make sure node"
-                             " %s is down. Error details: %s",
-                             instance.name, source_node, source_node, msg)
-      else:
-        raise errors.OpExecError("Could not shutdown instance %s on"
-                                 " node %s: %s" %
-                                 (instance.name, source_node, msg))
-
-    feedback_fn("* deactivating the instance's disks on source node")
-    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
-      raise errors.OpExecError("Can't shut down the instance's disks.")
-
-    instance.primary_node = target_node
-    # distribute new instance config to the other nodes
-    self.cfg.Update(instance, feedback_fn)
-
-    # Only start the instance if it's marked as up
-    if instance.admin_up:
-      feedback_fn("* activating the instance's disks on target node")
-      logging.info("Starting instance %s on node %s",
-                   instance.name, target_node)
-
-      disks_ok, _ = _AssembleInstanceDisks(self, instance,
-                                           ignore_secondaries=True)
-      if not disks_ok:
-        _ShutdownInstanceDisks(self, instance)
-        raise errors.OpExecError("Can't activate the instance's disks")
-
-      feedback_fn("* starting the instance on the target node")
-      result = self.rpc.call_instance_start(target_node, instance, None, None)
-      msg = result.fail_msg
-      if msg:
-        _ShutdownInstanceDisks(self, instance)
-        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
-                                 (instance.name, target_node, msg))
+    instance = self._migrater.instance
+    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
+    return (nl, nl + [instance.primary_node])
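
BuildHooksNodes returns the node lists for the pre- and post-hooks
separately; the old primary is appended to the post list so it is still
notified after the failover:

  # Illustrative: master "m", primary "p", secondary "s".
  #   pre-hook nodes:  ["m", "s"]
  #   post-hook nodes: ["m", "s", "p"]  # old primary still notified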
 
 
-class LUMigrateInstance(LogicalUnit):
+class LUInstanceMigrate(LogicalUnit):
   """Migrate an instance.
 
   This is migration without shutting down, compared to the failover,
@@ -5832,28 +6032,37 @@ class LUMigrateInstance(LogicalUnit):
   """
   HPATH = "instance-migrate"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    _PMigrationMode,
-    _PMigrationLive,
-    ("cleanup", False, ht.TBool),
-    ]
-
   REQ_BGL = False
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
 
+    if self.op.target_node is not None:
+      self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
+
     self.needed_locks[locking.LEVEL_NODE] = []
     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
     self._migrater = TLMigrateInstance(self, self.op.instance_name,
-                                       self.op.cleanup)
+                                       cleanup=self.op.cleanup,
+                                       iallocator=self.op.iallocator,
+                                       target_node=self.op.target_node,
+                                       failover=False,
+                                       fallback=self.op.allow_failover)
     self.tasklets = [self._migrater]
 
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE:
-      self._LockInstancesNodes()
+      instance = self.context.cfg.GetInstanceInfo(self.op.instance_name)
+      if instance.disk_template in constants.DTS_EXT_MIRROR:
+        if self.op.target_node is None:
+          self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+        else:
+          self.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
+                                                   self.op.target_node]
+        del self.recalculate_locks[locking.LEVEL_NODE]
+      else:
+        self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -5863,33 +6072,38 @@ class LUMigrateInstance(LogicalUnit):
     """
     instance = self._migrater.instance
     source_node = instance.primary_node
-    target_node = instance.secondary_nodes[0]
+    target_node = self._migrater.target_node
     env = _BuildInstanceHookEnvByObject(self, instance)
-    env["MIGRATE_LIVE"] = self._migrater.live
-    env["MIGRATE_CLEANUP"] = self.op.cleanup
     env.update({
-        "OLD_PRIMARY": source_node,
-        "OLD_SECONDARY": target_node,
-        "NEW_PRIMARY": target_node,
-        "NEW_SECONDARY": source_node,
-        })
+      "MIGRATE_LIVE": self._migrater.live,
+      "MIGRATE_CLEANUP": self.op.cleanup,
+      "OLD_PRIMARY": source_node,
+      "NEW_PRIMARY": target_node,
+      })
+
+    if instance.disk_template in constants.DTS_INT_MIRROR:
+      env["OLD_SECONDARY"] = target_node
+      env["NEW_SECONDARY"] = source_node
+    else:
+      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None
+
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    instance = self._migrater.instance
     nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
-    nl_post = list(nl)
-    nl_post.append(source_node)
-    return env, nl, nl_post
+    return (nl, nl + [instance.primary_node])
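
Hook scripts receive these keys through their environment; Ganeti
conventionally prefixes them with GANETI_. A hypothetical post-migration hook
(the variable names below are assumptions, not part of this change):

  import os

  old_primary = os.environ.get("GANETI_OLD_PRIMARY")
  new_primary = os.environ.get("GANETI_NEW_PRIMARY")
  if os.environ.get("GANETI_MIGRATE_LIVE") == "True":
    print "live migration %s -> %s" % (old_primary, new_primary)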
 
 
-class LUMoveInstance(LogicalUnit):
+class LUInstanceMove(LogicalUnit):
   """Move an instance by data-copying.
 
   """
   HPATH = "instance-move"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("target_node", ht.NoDefault, ht.TNonEmptyString),
-    _PShutdownTimeout,
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -5914,9 +6128,18 @@ class LUMoveInstance(LogicalUnit):
       "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
-    nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
-                                       self.op.target_node]
-    return env, nl, nl
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    nl = [
+      self.cfg.GetMasterNode(),
+      self.instance.primary_node,
+      self.op.target_node,
+      ]
+    return (nl, nl)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -6009,7 +6232,7 @@ class LUMoveInstance(LogicalUnit):
     for idx, disk in enumerate(instance.disks):
       self.LogInfo("Copying data for disk %d", idx)
       result = self.rpc.call_blockdev_assemble(target_node, disk,
-                                               instance.name, True)
+                                               instance.name, True, idx)
       if result.fail_msg:
         self.LogWarning("Can't assemble newly created disk %d: %s",
                         idx, result.fail_msg)
@@ -6059,45 +6282,55 @@ class LUMoveInstance(LogicalUnit):
                                  (instance.name, target_node, msg))
 
 
-class LUMigrateNode(LogicalUnit):
+class LUNodeMigrate(LogicalUnit):
   """Migrate all instances from a node.
 
   """
   HPATH = "node-migrate"
   HTYPE = constants.HTYPE_NODE
-  _OP_PARAMS = [
-    _PNodeName,
-    _PMigrationMode,
-    _PMigrationLive,
-    ]
   REQ_BGL = False
 
+  def CheckArguments(self):
+    _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
+
   def ExpandNames(self):
     self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
 
-    self.needed_locks = {
-      locking.LEVEL_NODE: [self.op.node_name],
-      }
-
-    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+    self.needed_locks = {}
 
    # Create tasklets for migrating all primary instances on this node
     names = []
     tasklets = []
 
+    self.lock_all_nodes = False
+
     for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
       logging.debug("Migrating instance %s", inst.name)
       names.append(inst.name)
 
-      tasklets.append(TLMigrateInstance(self, inst.name, False))
+      tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False,
+                                        iallocator=self.op.iallocator,
+                                        target_node=None))
+
+      if inst.disk_template in constants.DTS_EXT_MIRROR:
+        # We need to lock all nodes, as the iallocator will choose the
+        # destination nodes afterwards
+        self.lock_all_nodes = True
 
     self.tasklets = tasklets
 
+    # Declare node locks
+    if self.lock_all_nodes:
+      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+    else:
+      self.needed_locks[locking.LEVEL_NODE] = [self.op.node_name]
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+
     # Declare instance locks
     self.needed_locks[locking.LEVEL_INSTANCE] = names
 
   def DeclareLocks(self, level):
-    if level == locking.LEVEL_NODE:
+    if level == locking.LEVEL_NODE and not self.lock_all_nodes:
       self._LockInstancesNodes()
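
With per-instance tasklets, migrating a whole node away is just a matter of
submitting the opcode; the LU expands it into one TLMigrateInstance per
primary instance. A sketch (the opcode name and fields are assumed from the
LU rename, not confirmed by this change):

  op = opcodes.OpNodeMigrate(node_name="node1.example.com",
                             iallocator="hail")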
 
   def BuildHooksEnv(self):
@@ -6106,13 +6339,16 @@ class LUMigrateNode(LogicalUnit):
     This runs on the master, the primary and all the secondaries.
 
     """
-    env = {
+    return {
       "NODE_NAME": self.op.node_name,
       }
 
-    nl = [self.cfg.GetMasterNode()]
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
 
-    return (env, nl, nl)
+    """
+    nl = [self.cfg.GetMasterNode()]
+    return (nl, nl)
 
 
 class TLMigrateInstance(Tasklet):
@@ -6121,9 +6357,28 @@ class TLMigrateInstance(Tasklet):
   @type live: boolean
   @ivar live: whether the migration will be done live or non-live;
      this variable is initialized only after CheckPrereq has run
+  @type cleanup: boolean
+  @ivar cleanup: Whether we are cleaning up after a failed migration
+  @type iallocator: string
+  @ivar iallocator: The iallocator used to determine target_node
+  @type target_node: string
+  @ivar target_node: If given, the target node to relocate the instance to
+  @type failover: boolean
+  @ivar failover: Whether the operation is a failover instead of a migration
+  @type fallback: boolean
+  @ivar fallback: Whether fallback to failover is allowed if migration is not
+                  possible
+  @type ignore_consistency: boolean
+  @ivar ignore_consistency: Whether we should ignore consistency between the
+                            source and target node
+  @type shutdown_timeout: int
+  @ivar shutdown_timeout: Timeout for the instance shutdown in case of failover
 
   """
-  def __init__(self, lu, instance_name, cleanup):
+  def __init__(self, lu, instance_name, cleanup=False, iallocator=None,
+               target_node=None, failover=False, fallback=False,
+               ignore_consistency=False,
+               shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT):
     """Initializes this class.
 
     """
@@ -6133,6 +6388,12 @@ class TLMigrateInstance(Tasklet):
     self.instance_name = instance_name
     self.cleanup = cleanup
     self.live = False # will be overridden later
+    self.iallocator = iallocator
+    self.target_node = target_node
+    self.failover = failover
+    self.fallback = fallback
+    self.ignore_consistency = ignore_consistency
+    self.shutdown_timeout = shutdown_timeout
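
The keyword defaults let a single tasklet serve failover, migration and
cleanup alike. Sketched instantiations, assuming an owning LU `lu` (the call
sites in this file follow these patterns):

  TLMigrateInstance(lu, "inst1", failover=True, ignore_consistency=True)
  TLMigrateInstance(lu, "inst1", cleanup=True)   # migrate --cleanup
  TLMigrateInstance(lu, "inst1", fallback=True)  # migrate, may fall back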
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -6143,54 +6404,142 @@ class TLMigrateInstance(Tasklet):
     instance_name = _ExpandInstanceName(self.lu.cfg, self.instance_name)
     instance = self.cfg.GetInstanceInfo(instance_name)
     assert instance is not None
+    self.instance = instance
 
-    if instance.disk_template != constants.DT_DRBD8:
-      raise errors.OpPrereqError("Instance's disk layout is not"
-                                 " drbd8, cannot migrate.", errors.ECODE_STATE)
+    if (not self.cleanup and not instance.admin_up and not self.failover and
+        self.fallback):
+      self.lu.LogInfo("Instance is marked down, fallback allowed, switching"
+                      " to failover")
+      self.failover = True
+
+    if instance.disk_template not in constants.DTS_MIRRORED:
+      if self.failover:
+        text = "failovers"
+      else:
+        text = "migrations"
+      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
+                                 " %s" % (instance.disk_template, text),
+                                 errors.ECODE_STATE)
 
-    secondary_nodes = instance.secondary_nodes
-    if not secondary_nodes:
-      raise errors.ConfigurationError("No secondary node but using"
-                                      " drbd8 disk template")
+    if instance.disk_template in constants.DTS_EXT_MIRROR:
+      _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
+
+      if self.iallocator:
+        self._RunAllocator()
+
+      # self.target_node is already populated, either directly or by the
+      # iallocator run
+      target_node = self.target_node
+
+      if len(self.lu.tasklets) == 1:
+        # It is safe to remove locks only when we're the only tasklet in the LU
+        nodes_keep = [instance.primary_node, self.target_node]
+        nodes_rel = [node for node in self.lu.acquired_locks[locking.LEVEL_NODE]
+                     if node not in nodes_keep]
+        self.lu.context.glm.release(locking.LEVEL_NODE, nodes_rel)
+        self.lu.acquired_locks[locking.LEVEL_NODE] = nodes_keep
+
+    else:
+      secondary_nodes = instance.secondary_nodes
+      if not secondary_nodes:
+        raise errors.ConfigurationError("No secondary node but using"
+                                        " %s disk template" %
+                                        instance.disk_template)
+      target_node = secondary_nodes[0]
+      if self.iallocator or (self.target_node and
+                             self.target_node != target_node):
+        if self.failover:
+          text = "failed over"
+        else:
+          text = "migrated"
+        raise errors.OpPrereqError("Instances with disk template %s cannot"
+                                   " be %s over to arbitrary nodes"
+                                   " (neither an iallocator nor a target"
+                                   " node can be passed)" %
+                                   (text, instance.disk_template),
+                                   errors.ECODE_INVAL)
 
     i_be = self.cfg.GetClusterInfo().FillBE(instance)
 
-    target_node = secondary_nodes[0]
    # check memory requirements on the target node
-    _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" %
-                         instance.name, i_be[constants.BE_MEMORY],
-                         instance.hypervisor)
+    if not self.failover or instance.admin_up:
+      _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" %
+                           instance.name, i_be[constants.BE_MEMORY],
+                           instance.hypervisor)
+    else:
+      self.lu.LogInfo("Not checking memory on the secondary node as"
+                      " instance will not be started")
 
    # check bridge existence
     _CheckInstanceBridgesExist(self.lu, instance, node=target_node)
 
     if not self.cleanup:
       _CheckNodeNotDrained(self.lu, target_node)
-      result = self.rpc.call_instance_migratable(instance.primary_node,
-                                                 instance)
-      result.Raise("Can't migrate, please use failover",
-                   prereq=True, ecode=errors.ECODE_STATE)
+      if not self.failover:
+        result = self.rpc.call_instance_migratable(instance.primary_node,
+                                                   instance)
+        if result.fail_msg and self.fallback:
+          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
+                          " failover")
+          self.failover = True
+        else:
+          result.Raise("Can't migrate, please use failover",
+                       prereq=True, ecode=errors.ECODE_STATE)
 
-    self.instance = instance
+    assert not (self.failover and self.cleanup)
 
-    if self.lu.op.live is not None and self.lu.op.mode is not None:
-      raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
-                                 " parameters are accepted",
-                                 errors.ECODE_INVAL)
-    if self.lu.op.live is not None:
-      if self.lu.op.live:
-        self.lu.op.mode = constants.HT_MIGRATION_LIVE
-      else:
-        self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
-      # reset the 'live' parameter to None so that repeated
-      # invocations of CheckPrereq do not raise an exception
-      self.lu.op.live = None
-    elif self.lu.op.mode is None:
-      # read the default value from the hypervisor
-      i_hv = self.cfg.GetClusterInfo().FillHV(instance, skip_globals=False)
-      self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
+  def _RunAllocator(self):
+    """Run the allocator based on input opcode.
+
+    """
+    ial = IAllocator(self.cfg, self.rpc,
+                     mode=constants.IALLOCATOR_MODE_RELOC,
+                     name=self.instance_name,
+                     # TODO See why hail breaks with a single node below
+                     relocate_from=[self.instance.primary_node,
+                                    self.instance.primary_node],
+                     )
+
+    ial.Run(self.iallocator)
+
+    if not ial.success:
+      raise errors.OpPrereqError("Can't compute nodes using"
+                                 " iallocator '%s': %s" %
+                                 (self.iallocator, ial.info),
+                                 errors.ECODE_NORES)
+    if len(ial.result) != ial.required_nodes:
+      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
+                                 " of nodes (%s), required %s" %
+                                 (self.iallocator, len(ial.result),
+                                  ial.required_nodes), errors.ECODE_FAULT)
+    self.target_node = ial.result[0]
+    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
+                 self.instance_name, self.iallocator,
+                 utils.CommaJoin(ial.result))
 
-    self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
+    if not self.failover:
+      if self.lu.op.live is not None and self.lu.op.mode is not None:
+        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
+                                   " parameters are accepted",
+                                   errors.ECODE_INVAL)
+      if self.lu.op.live is not None:
+        if self.lu.op.live:
+          self.lu.op.mode = constants.HT_MIGRATION_LIVE
+        else:
+          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
+        # reset the 'live' parameter to None so that repeated
+        # invocations of CheckPrereq do not raise an exception
+        self.lu.op.live = None
+      elif self.lu.op.mode is None:
+        # read the default value from the hypervisor
+        i_hv = self.cfg.GetClusterInfo().FillHV(self.instance,
+                                                skip_globals=False)
+        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
+
+      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
+    else:
+      # Failover is never live
+      self.live = False
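
In short, the live/non-live decision is: failover forces non-live; otherwise
the legacy 'live' flag wins over 'mode', and an unset mode falls back to the
hypervisor default:

  #   failover            -> live = False
  #   op.live is not None -> mode = LIVE if op.live else NONLIVE
  #   op.mode is None     -> mode = hv default (HV_MIGRATION_MODE)
  #   live = (mode == constants.HT_MIGRATION_LIVE)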
 
   def _WaitUntilSync(self):
     """Poll with custom rpc for disk sync.
@@ -6307,16 +6656,17 @@ class TLMigrateInstance(Tasklet):
                        " primary node (%s)" % source_node)
       demoted_node = target_node
 
-    self._EnsureSecondary(demoted_node)
-    try:
+    if instance.disk_template in constants.DTS_INT_MIRROR:
+      self._EnsureSecondary(demoted_node)
+      try:
+        self._WaitUntilSync()
+      except errors.OpExecError:
+        # we ignore here errors, since if the device is standalone, it
+        # won't be able to sync
+        pass
+      self._GoStandalone()
+      self._GoReconnect(False)
       self._WaitUntilSync()
-    except errors.OpExecError:
-      # we ignore here errors, since if the device is standalone, it
-      # won't be able to sync
-      pass
-    self._GoStandalone()
-    self._GoReconnect(False)
-    self._WaitUntilSync()
 
     self.feedback_fn("* done")
 
@@ -6325,6 +6675,9 @@ class TLMigrateInstance(Tasklet):
 
     """
     target_node = self.target_node
+    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
+      return
+
     try:
       self._EnsureSecondary(target_node)
       self._GoStandalone()
@@ -6389,11 +6742,12 @@ class TLMigrateInstance(Tasklet):
 
     self.migration_info = migration_info = result.payload
 
-    # Then switch the disks to master/master mode
-    self._EnsureSecondary(target_node)
-    self._GoStandalone()
-    self._GoReconnect(True)
-    self._WaitUntilSync()
+    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
+      # Then switch the disks to master/master mode
+      self._EnsureSecondary(target_node)
+      self._GoStandalone()
+      self._GoReconnect(True)
+      self._WaitUntilSync()
 
     self.feedback_fn("* preparing %s to accept the instance" % target_node)
     result = self.rpc.call_accept_instance(target_node,
@@ -6412,7 +6766,6 @@ class TLMigrateInstance(Tasklet):
                                (instance.name, msg))
 
     self.feedback_fn("* migrating instance to %s" % target_node)
-    time.sleep(10)
     result = self.rpc.call_instance_migrate(source_node, instance,
                                             self.nodes_ip[target_node],
                                             self.live)
@@ -6425,7 +6778,6 @@ class TLMigrateInstance(Tasklet):
       self._RevertDiskStatus()
       raise errors.OpExecError("Could not migrate instance %s: %s" %
                                (instance.name, msg))
-    time.sleep(10)
 
     instance.primary_node = target_node
     # distribute new instance config to the other nodes
@@ -6442,34 +6794,115 @@ class TLMigrateInstance(Tasklet):
       raise errors.OpExecError("Could not finalize instance migration: %s" %
                                msg)
 
-    self._EnsureSecondary(source_node)
-    self._WaitUntilSync()
-    self._GoStandalone()
-    self._GoReconnect(False)
-    self._WaitUntilSync()
+    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
+      self._EnsureSecondary(source_node)
+      self._WaitUntilSync()
+      self._GoStandalone()
+      self._GoReconnect(False)
+      self._WaitUntilSync()
 
     self.feedback_fn("* done")
 
+  def _ExecFailover(self):
+    """Failover an instance.
+
+    The failover is done by shutting it down on its present node and
+    starting it on the secondary.
+
+    """
+    instance = self.instance
+    primary_node = self.cfg.GetNodeInfo(instance.primary_node)
+
+    source_node = instance.primary_node
+    target_node = self.target_node
+
+    if instance.admin_up:
+      self.feedback_fn("* checking disk consistency between source and target")
+      for dev in instance.disks:
+        # for drbd, these are drbd over lvm
+        if not _CheckDiskConsistency(self.lu, dev, target_node, False):
+          if not self.ignore_consistency:
+            raise errors.OpExecError("Disk %s is degraded on target node,"
+                                     " aborting failover." % dev.iv_name)
+    else:
+      self.feedback_fn("* not checking disk consistency as instance is not"
+                       " running")
+
+    self.feedback_fn("* shutting down instance on source node")
+    logging.info("Shutting down instance %s on node %s",
+                 instance.name, source_node)
+
+    result = self.rpc.call_instance_shutdown(source_node, instance,
+                                             self.shutdown_timeout)
+    msg = result.fail_msg
+    if msg:
+      if self.ignore_consistency or primary_node.offline:
+        self.lu.LogWarning("Could not shutdown instance %s on node %s."
+                           " Proceeding anyway. Please make sure node"
+                           " %s is down. Error details: %s",
+                           instance.name, source_node, source_node, msg)
+      else:
+        raise errors.OpExecError("Could not shutdown instance %s on"
+                                 " node %s: %s" %
+                                 (instance.name, source_node, msg))
+
+    self.feedback_fn("* deactivating the instance's disks on source node")
+    if not _ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
+      raise errors.OpExecError("Can't shut down the instance's disks.")
+
+    instance.primary_node = target_node
+    # distribute new instance config to the other nodes
+    self.cfg.Update(instance, self.feedback_fn)
+
+    # Only start the instance if it's marked as up
+    if instance.admin_up:
+      self.feedback_fn("* activating the instance's disks on target node")
+      logging.info("Starting instance %s on node %s",
+                   instance.name, target_node)
+
+      disks_ok, _ = _AssembleInstanceDisks(self.lu, instance,
+                                           ignore_secondaries=True)
+      if not disks_ok:
+        _ShutdownInstanceDisks(self.lu, instance)
+        raise errors.OpExecError("Can't activate the instance's disks")
+
+      self.feedback_fn("* starting the instance on the target node")
+      result = self.rpc.call_instance_start(target_node, instance, None, None)
+      msg = result.fail_msg
+      if msg:
+        _ShutdownInstanceDisks(self.lu, instance)
+        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
+                                 (instance.name, target_node, msg))
+
   def Exec(self, feedback_fn):
     """Perform the migration.
 
     """
-    feedback_fn("Migrating instance %s" % self.instance.name)
-
     self.feedback_fn = feedback_fn
-
     self.source_node = self.instance.primary_node
-    self.target_node = self.instance.secondary_nodes[0]
+
+    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
+    if self.instance.disk_template in constants.DTS_INT_MIRROR:
+      self.target_node = self.instance.secondary_nodes[0]
+    # Otherwise self.target_node has already been populated,
+    # either directly or by the iallocator run in CheckPrereq.
+
     self.all_nodes = [self.source_node, self.target_node]
     self.nodes_ip = {
       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
       }
 
-    if self.cleanup:
-      return self._ExecCleanup()
+    if self.failover:
+      feedback_fn("Failing over instance %s" % self.instance.name)
+      return self._ExecFailover()
     else:
-      return self._ExecMigration()
+      feedback_fn("Migrating instance %s" % self.instance.name)
+
+      if self.cleanup:
+        return self._ExecCleanup()
+      else:
+        return self._ExecMigration()
 
 
 def _CreateBlockDev(lu, node, instance, device, force_create,
@@ -6600,12 +7033,13 @@ def _GenerateDiskTemplate(lu, template_name,
                                       for i in range(disk_count)])
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
-      vg = disk.get("vg", vgname)
+      vg = disk.get(constants.IDISK_VG, vgname)
       feedback_fn("* disk %i, vg %s, name %s" % (idx, vg, names[idx]))
-      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
+      disk_dev = objects.Disk(dev_type=constants.LD_LV,
+                              size=disk[constants.IDISK_SIZE],
                               logical_id=(vg, names[idx]),
                               iv_name="disk/%d" % disk_index,
-                              mode=disk["mode"])
+                              mode=disk[constants.IDISK_MODE])
       disks.append(disk_dev)
   elif template_name == constants.DT_DRBD8:
     if len(secondary_nodes) != 1:
@@ -6621,28 +7055,60 @@ def _GenerateDiskTemplate(lu, template_name,
       names.append(lv_prefix + "_meta")
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
-      vg = disk.get("vg", vgname)
+      vg = disk.get(constants.IDISK_VG, vgname)
       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
-                                      disk["size"], vg, names[idx*2:idx*2+2],
+                                      disk[constants.IDISK_SIZE], vg,
+                                      names[idx * 2:idx * 2 + 2],
                                       "disk/%d" % disk_index,
-                                      minors[idx*2], minors[idx*2+1])
-      disk_dev.mode = disk["mode"]
+                                      minors[idx * 2], minors[idx * 2 + 1])
+      disk_dev.mode = disk[constants.IDISK_MODE]
       disks.append(disk_dev)
   elif template_name == constants.DT_FILE:
     if len(secondary_nodes) != 0:
       raise errors.ProgrammerError("Wrong template configuration")
 
-    _RequireFileStorage()
+    opcodes.RequireFileStorage()
+
+    for idx, disk in enumerate(disk_info):
+      disk_index = idx + base_index
+      disk_dev = objects.Disk(dev_type=constants.LD_FILE,
+                              size=disk[constants.IDISK_SIZE],
+                              iv_name="disk/%d" % disk_index,
+                              logical_id=(file_driver,
+                                          "%s/disk%d" % (file_storage_dir,
+                                                         disk_index)),
+                              mode=disk[constants.IDISK_MODE])
+      disks.append(disk_dev)
+  elif template_name == constants.DT_SHARED_FILE:
+    if len(secondary_nodes) != 0:
+      raise errors.ProgrammerError("Wrong template configuration")
+
+    opcodes.RequireSharedFileStorage()
 
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
-      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
+      disk_dev = objects.Disk(dev_type=constants.LD_FILE,
+                              size=disk[constants.IDISK_SIZE],
                               iv_name="disk/%d" % disk_index,
                               logical_id=(file_driver,
                                           "%s/disk%d" % (file_storage_dir,
                                                          disk_index)),
-                              mode=disk["mode"])
+                              mode=disk[constants.IDISK_MODE])
+      disks.append(disk_dev)
+  elif template_name == constants.DT_BLOCK:
+    if len(secondary_nodes) != 0:
+      raise errors.ProgrammerError("Wrong template configuration")
+
+    for idx, disk in enumerate(disk_info):
+      disk_index = idx + base_index
+      disk_dev = objects.Disk(dev_type=constants.LD_BLOCKDEV,
+                              size=disk[constants.IDISK_SIZE],
+                              logical_id=(constants.BLOCKDEV_DRIVER_MANUAL,
+                                          disk[constants.IDISK_ADOPT]),
+                              iv_name="disk/%d" % disk_index,
+                              mode=disk[constants.IDISK_MODE])
       disks.append(disk_dev)
+
   else:
     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
   return disks
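
Disk dictionaries are now keyed by the IDISK_* constants throughout; a
plain-LVM request passed in as disk_info might look like this (values
illustrative):

  disk_info = [{
    constants.IDISK_SIZE: 10240,  # MiB
    constants.IDISK_MODE: constants.DISK_RDWR,
    constants.IDISK_VG: "xenvg",
    }]
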
@@ -6679,32 +7145,57 @@ def _WipeDisks(lu, instance):
 
   """
   node = instance.primary_node
-  for idx, device in enumerate(instance.disks):
-    lu.LogInfo("* Wiping disk %d", idx)
-    logging.info("Wiping disk %d for instance %s", idx, instance.name)
-
-    # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
-    # MAX_WIPE_CHUNK at max
-    wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 *
-                          constants.MIN_WIPE_CHUNK_PERCENT)
-
-    offset = 0
-    size = device.size
-    last_output = 0
-    start_time = time.time()
-
-    while offset < size:
-      wipe_size = min(wipe_chunk_size, size - offset)
-      result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size)
-      result.Raise("Could not wipe disk %d at offset %d for size %d" %
-                   (idx, offset, wipe_size))
-      now = time.time()
-      offset += wipe_size
-      if now - last_output >= 60:
-        eta = _CalcEta(now - start_time, offset, size)
-        lu.LogInfo(" - done: %.1f%% ETA: %s" %
-                   (offset / float(size) * 100, utils.FormatSeconds(eta)))
-        last_output = now
+
+  for device in instance.disks:
+    lu.cfg.SetDiskID(device, node)
+
+  logging.info("Pause sync of instance %s disks", instance.name)
+  result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, True)
+
+  for idx, success in enumerate(result.payload):
+    if not success:
+      logging.warn("pause-sync of instance %s for disks %d failed",
+                   instance.name, idx)
+
+  try:
+    for idx, device in enumerate(instance.disks):
+      lu.LogInfo("* Wiping disk %d", idx)
+      logging.info("Wiping disk %d for instance %s, node %s",
+                   idx, instance.name, node)
+
+      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
+      # MAX_WIPE_CHUNK at max
+      wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 *
+                            constants.MIN_WIPE_CHUNK_PERCENT)
+
+      offset = 0
+      size = device.size
+      last_output = 0
+      start_time = time.time()
+
+      while offset < size:
+        wipe_size = min(wipe_chunk_size, size - offset)
+        result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size)
+        result.Raise("Could not wipe disk %d at offset %d for size %d" %
+                     (idx, offset, wipe_size))
+        now = time.time()
+        offset += wipe_size
+        if now - last_output >= 60:
+          eta = _CalcEta(now - start_time, offset, size)
+          lu.LogInfo(" - done: %.1f%% ETA: %s" %
+                     (offset / float(size) * 100, utils.FormatSeconds(eta)))
+          last_output = now
+  finally:
+    logging.info("Resume sync of instance %s disks", instance.name)
+
+    result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, False)
+
+    for idx, success in enumerate(result.payload):
+      if not success:
+        lu.LogWarning("Warning: Resume sync of disk %d failed. Please have a"
+                      " look at the status and troubleshoot the issue.", idx)
+        logging.warn("resume-sync of instance %s for disks %d failed",
+                     instance.name, idx)
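
The wipe chunk size is MIN_WIPE_CHUNK_PERCENT of the disk size, capped at
MAX_WIPE_CHUNK. A worked example, assuming illustrative values of 10% and
1024 MiB for those constants:

  #   20 GiB disk: min(1024, 20480 / 100.0 * 10) -> 1024 MiB chunks
  #    5 GiB disk: min(1024,  5120 / 100.0 * 10) ->  512 MiB chunks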
 
 
 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
@@ -6732,7 +7223,7 @@ def _CreateDisks(lu, instance, to_skip=None, target_node=None):
     pnode = target_node
     all_nodes = [pnode]
 
-  if instance.disk_template == constants.DT_FILE:
+  if instance.disk_template in (constants.DT_FILE, constants.DT_SHARED_FILE):
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
 
@@ -6740,7 +7231,7 @@ def _CreateDisks(lu, instance, to_skip=None, target_node=None):
                  " node %s" % (file_storage_dir, pnode))
 
   # Note: this needs to be kept in sync with adding of disks in
-  # LUSetInstanceParams
+  # LUInstanceSetParams
   for idx, device in enumerate(instance.disks):
     if to_skip and idx in to_skip:
       continue
@@ -6806,22 +7297,24 @@ def _ComputeDiskSizePerVG(disk_template, disks):
 
   """
   def _compute(disks, payload):
-    """Universal algorithm
+    """Universal algorithm.
 
     """
     vgs = {}
     for disk in disks:
-      vgs[disk["vg"]] = vgs.get("vg", 0) + disk["size"] + payload
+      vg_name = disk[constants.IDISK_VG]
+      vgs[vg_name] = (vgs.get(vg_name, 0) +
+                      disk[constants.IDISK_SIZE] + payload)
 
     return vgs
 
   # Required free disk space as a function of disk and swap space
   req_size_dict = {
-    constants.DT_DISKLESS: None,
+    constants.DT_DISKLESS: {},
     constants.DT_PLAIN: _compute(disks, 0),
     # 128 MB are added for drbd metadata for each disk
     constants.DT_DRBD8: _compute(disks, 128),
-    constants.DT_FILE: None,
+    constants.DT_FILE: {},
+    constants.DT_SHARED_FILE: {},
   }
 
   if disk_template not in req_size_dict:
@@ -6830,6 +7323,7 @@ def _ComputeDiskSizePerVG(disk_template, disks):
 
   return req_size_dict[disk_template]
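
The result is a dict mapping each volume group to the space required in it,
e.g. for two DRBD disks in different VGs, each carrying the 128 MB metadata
overhead (sketch, sizes in MiB):

  disks = [{constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 1024},
           {constants.IDISK_VG: "data", constants.IDISK_SIZE: 2048}]
  # _ComputeDiskSizePerVG(constants.DT_DRBD8, disks)
  #   -> {"xenvg": 1152, "data": 2176}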
 
+
 def _ComputeDiskSize(disk_template, disks):
   """Compute disk size requirements in the volume group
 
@@ -6837,10 +7331,12 @@ def _ComputeDiskSize(disk_template, disks):
   # Required free disk space as a function of disk and swap space
   req_size_dict = {
     constants.DT_DISKLESS: None,
-    constants.DT_PLAIN: sum(d["size"] for d in disks),
+    constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
     # 128 MB are added for drbd metadata for each disk
-    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
+    constants.DT_DRBD8: sum(d[constants.IDISK_SIZE] + 128 for d in disks),
     constants.DT_FILE: None,
+    constants.DT_SHARED_FILE: 0,
+    constants.DT_BLOCK: 0,
   }
 
   if disk_template not in req_size_dict:
@@ -6850,6 +7346,21 @@ def _ComputeDiskSize(disk_template, disks):
   return req_size_dict[disk_template]
 
 
+def _FilterVmNodes(lu, nodenames):
+  """Filters out non-vm_capable nodes from a list.
+
+  @type lu: L{LogicalUnit}
+  @param lu: the logical unit for which we check
+  @type nodenames: list
+  @param nodenames: the list of node names to filter
+  @rtype: list
+  @return: the list of vm-capable nodes
+
+  """
+  non_vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
+  return [name for name in nodenames if name not in non_vm_nodes]
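
For example, with node2 marked vm_capable=False, _FilterVmNodes(lu, ["node1",
"node2", "node3"]) returns ["node1", "node3"], so the parameter-validation
RPCs below skip nodes that cannot host instances.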
+
+
 def _CheckHVParams(lu, nodenames, hvname, hvparams):
   """Hypervisor parameter validation.
 
@@ -6867,6 +7378,7 @@ def _CheckHVParams(lu, nodenames, hvname, hvparams):
   @raise errors.OpPrereqError: if the parameters are not valid
 
   """
+  nodenames = _FilterVmNodes(lu, nodenames)
   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                   hvname,
                                                   hvparams)
@@ -6894,6 +7406,7 @@ def _CheckOSParams(lu, required, nodenames, osname, osparams):
   @raise errors.OpPrereqError: if the parameters are not valid
 
   """
+  nodenames = _FilterVmNodes(lu, nodenames)
   result = lu.rpc.call_os_validate(required, nodenames, osname,
                                    [constants.OS_VALIDATE_PARAMETERS],
                                    osparams)
@@ -6906,41 +7419,12 @@ def _CheckOSParams(lu, required, nodenames, osname, osparams):
                  osname, node)
 
 
-class LUCreateInstance(LogicalUnit):
+class LUInstanceCreate(LogicalUnit):
   """Create an instance.
 
   """
   HPATH = "instance-add"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("mode", ht.NoDefault, ht.TElemOf(constants.INSTANCE_CREATE_MODES)),
-    ("start", True, ht.TBool),
-    ("wait_for_sync", True, ht.TBool),
-    ("ip_check", True, ht.TBool),
-    ("name_check", True, ht.TBool),
-    ("disks", ht.NoDefault, ht.TListOf(ht.TDict)),
-    ("nics", ht.NoDefault, ht.TListOf(ht.TDict)),
-    ("hvparams", ht.EmptyDict, ht.TDict),
-    ("beparams", ht.EmptyDict, ht.TDict),
-    ("osparams", ht.EmptyDict, ht.TDict),
-    ("no_install", None, ht.TMaybeBool),
-    ("os_type", None, ht.TMaybeString),
-    ("force_variant", False, ht.TBool),
-    ("source_handshake", None, ht.TOr(ht.TList, ht.TNone)),
-    ("source_x509_ca", None, ht.TMaybeString),
-    ("source_instance_name", None, ht.TMaybeString),
-    ("src_node", None, ht.TMaybeString),
-    ("src_path", None, ht.TMaybeString),
-    ("pnode", None, ht.TMaybeString),
-    ("snode", None, ht.TMaybeString),
-    ("iallocator", None, ht.TMaybeString),
-    ("hypervisor", None, ht.TMaybeString),
-    ("disk_template", ht.NoDefault, _CheckDiskTemplate),
-    ("identify_defaults", False, ht.TBool),
-    ("file_driver", None, ht.TOr(ht.TNone, ht.TElemOf(constants.FILE_DRIVER))),
-    ("file_storage_dir", None, ht.TMaybeString),
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -6969,7 +7453,7 @@ class LUCreateInstance(LogicalUnit):
     has_adopt = has_no_adopt = False
     for disk in self.op.disks:
       utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES)
-      if "adopt" in disk:
+      if constants.IDISK_ADOPT in disk:
         has_adopt = True
       else:
         has_no_adopt = True
@@ -6988,6 +7472,12 @@ class LUCreateInstance(LogicalUnit):
       if self.op.mode == constants.INSTANCE_IMPORT:
         raise errors.OpPrereqError("Disk adoption not allowed for"
                                    " instance import", errors.ECODE_INVAL)
+    else:
+      if self.op.disk_template in constants.DTS_MUST_ADOPT:
+        raise errors.OpPrereqError("Disk template %s requires disk adoption,"
+                                   " but no 'adopt' parameter given" %
+                                   self.op.disk_template,
+                                   errors.ECODE_INVAL)
 
     self.adopt_disks = has_adopt
 
@@ -7014,7 +7504,7 @@ class LUCreateInstance(LogicalUnit):
     _CheckIAllocatorOrNode(self, "iallocator", "pnode")
 
     if self.op.pnode is not None:
-      if self.op.disk_template in constants.DTS_NET_MIRROR:
+      if self.op.disk_template in constants.DTS_INT_MIRROR:
         if self.op.snode is None:
           raise errors.OpPrereqError("The networked disk templates need"
                                      " a mirror node", errors.ECODE_INVAL)
@@ -7202,15 +7692,21 @@ class LUCreateInstance(LogicalUnit):
       vcpus=self.be_full[constants.BE_VCPUS],
       nics=_NICListToTuple(self, self.nics),
       disk_template=self.op.disk_template,
-      disks=[(d["size"], d["mode"]) for d in self.disks],
+      disks=[(d[constants.IDISK_SIZE], d[constants.IDISK_MODE])
+             for d in self.disks],
       bep=self.be_full,
       hvp=self.hv_full,
       hypervisor_name=self.op.hypervisor,
     ))
 
-    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
-          self.secondaries)
-    return env, nl, nl
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    nl = [self.cfg.GetMasterNode(), self.op.pnode] + self.secondaries
+    return (nl, nl)
 
   def _ReadExportInfo(self):
     """Reads the export information from disk.
@@ -7284,7 +7780,7 @@ class LUCreateInstance(LogicalUnit):
         # TODO: import the disk iv_name too
         for idx in range(einfo.getint(constants.INISECT_INS, "disk_count")):
           disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx)
-          disks.append({"size": disk_sz})
+          disks.append({constants.IDISK_SIZE: disk_sz})
         self.op.disks = disks
       else:
         raise errors.OpPrereqError("No disk info specified and the export"
@@ -7364,8 +7860,6 @@ class LUCreateInstance(LogicalUnit):
       export_info = self._ReadExportInfo()
       self._ReadExportParams(export_info)
 
-    _CheckDiskTemplate(self.op.disk_template)
-
     if (not self.cfg.GetVGName() and
         self.op.disk_template not in constants.DTS_NOT_LVM):
       raise errors.OpPrereqError("Cluster does not support lvm-based"
@@ -7407,7 +7901,7 @@ class LUCreateInstance(LogicalUnit):
     # NIC buildup
     self.nics = []
     for idx, nic in enumerate(self.op.nics):
-      nic_mode_req = nic.get("mode", None)
+      nic_mode_req = nic.get(constants.INIC_MODE, None)
       nic_mode = nic_mode_req
       if nic_mode is None:
         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
@@ -7419,7 +7913,7 @@ class LUCreateInstance(LogicalUnit):
         default_ip_mode = constants.VALUE_NONE
 
       # ip validity checks
-      ip = nic.get("ip", default_ip_mode)
+      ip = nic.get(constants.INIC_IP, default_ip_mode)
       if ip is None or ip.lower() == constants.VALUE_NONE:
         nic_ip = None
       elif ip.lower() == constants.VALUE_AUTO:
@@ -7440,7 +7934,7 @@ class LUCreateInstance(LogicalUnit):
                                    errors.ECODE_INVAL)
 
       # MAC address verification
-      mac = nic.get("mac", constants.VALUE_AUTO)
+      mac = nic.get(constants.INIC_MAC, constants.VALUE_AUTO)
       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
         mac = utils.NormalizeAndValidateMac(mac)
 
@@ -7451,18 +7945,8 @@ class LUCreateInstance(LogicalUnit):
                                      " in cluster" % mac,
                                      errors.ECODE_NOTUNIQUE)
 
-      # bridge verification
-      bridge = nic.get("bridge", None)
-      link = nic.get("link", None)
-      if bridge and link:
-        raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
-                                   " at the same time", errors.ECODE_INVAL)
-      elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
-        raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
-                                   errors.ECODE_INVAL)
-      elif bridge:
-        link = bridge
-
+      # Build NIC parameters
+      link = nic.get(constants.INIC_LINK, None)
       nicparams = {}
       if nic_mode_req:
         nicparams[constants.NIC_MODE] = nic_mode_req
@@ -7474,13 +7958,14 @@ class LUCreateInstance(LogicalUnit):
       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
 
     # disk checks/pre-build
+    default_vg = self.cfg.GetVGName()
     self.disks = []
     for disk in self.op.disks:
-      mode = disk.get("mode", constants.DISK_RDWR)
+      mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
       if mode not in constants.DISK_ACCESS_SET:
         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                    mode, errors.ECODE_INVAL)
-      size = disk.get("size", None)
+      size = disk.get(constants.IDISK_SIZE, None)
       if size is None:
         raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
       try:
@@ -7488,10 +7973,13 @@ class LUCreateInstance(LogicalUnit):
       except (TypeError, ValueError):
         raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                    errors.ECODE_INVAL)
-      vg = disk.get("vg", self.cfg.GetVGName())
-      new_disk = {"size": size, "mode": mode, "vg": vg}
-      if "adopt" in disk:
-        new_disk["adopt"] = disk["adopt"]
+      new_disk = {
+        constants.IDISK_SIZE: size,
+        constants.IDISK_MODE: mode,
+        constants.IDISK_VG: disk.get(constants.IDISK_VG, default_vg),
+        }
+      if constants.IDISK_ADOPT in disk:
+        new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
       self.disks.append(new_disk)
 
     if self.op.mode == constants.INSTANCE_IMPORT:
@@ -7576,7 +8064,7 @@ class LUCreateInstance(LogicalUnit):
     self.secondaries = []
 
     # mirror node verification
-    if self.op.disk_template in constants.DTS_NET_MIRROR:
+    if self.op.disk_template in constants.DTS_INT_MIRROR:
       if self.op.snode == pnode.name:
         raise errors.OpPrereqError("The secondary node cannot be the"
                                    " primary node.", errors.ECODE_INVAL)
@@ -7592,24 +8080,30 @@ class LUCreateInstance(LogicalUnit):
       req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks)
       _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)
 
-    else: # instead, we must check the adoption data
-      all_lvs = set([i["adopt"] for i in self.disks])
+    elif self.op.disk_template == constants.DT_PLAIN: # Check the adoption data
+      all_lvs = set(["%s/%s" % (disk[constants.IDISK_VG],
+                                disk[constants.IDISK_ADOPT])
+                     for disk in self.disks])
       if len(all_lvs) != len(self.disks):
         raise errors.OpPrereqError("Duplicate volume names given for adoption",
                                    errors.ECODE_INVAL)
       for lv_name in all_lvs:
         try:
-          # FIXME: VG must be provided here. Else all LVs with the
-          # same name will be locked on all VGs.
+          # FIXME: lv_name here is "vg/lv"; we need to ensure that other
+          # calls to ReserveLV use the same syntax
           self.cfg.ReserveLV(lv_name, self.proc.GetECId())
         except errors.ReservationError:
           raise errors.OpPrereqError("LV named %s used by another instance" %
                                      lv_name, errors.ECODE_NOTUNIQUE)
 
+      vg_names = self.rpc.call_vg_list([pnode.name])[pnode.name]
+      vg_names.Raise("Cannot get VG information from node %s" % pnode.name)
+
       node_lvs = self.rpc.call_lv_list([pnode.name],
-                                       self.cfg.GetVGName())[pnode.name]
+                                       vg_names.payload.keys())[pnode.name]
       node_lvs.Raise("Cannot get LV information from node %s" % pnode.name)
       node_lvs = node_lvs.payload
+
       delta = all_lvs.difference(node_lvs.keys())
       if delta:
         raise errors.OpPrereqError("Missing logical volume(s): %s" %
@@ -7622,7 +8116,39 @@ class LUCreateInstance(LogicalUnit):
                                    errors.ECODE_STATE)
       # update the size of disk based on what is found
       for dsk in self.disks:
-        dsk["size"] = int(float(node_lvs[dsk["adopt"]][0]))
+        dsk[constants.IDISK_SIZE] = \
+          int(float(node_lvs["%s/%s" % (dsk[constants.IDISK_VG],
+                                        dsk[constants.IDISK_ADOPT])][0]))
+
+    elif self.op.disk_template == constants.DT_BLOCK:
+      # Normalize and de-duplicate device paths
+      all_disks = set([os.path.abspath(disk[constants.IDISK_ADOPT])
+                       for disk in self.disks])
+      if len(all_disks) != len(self.disks):
+        raise errors.OpPrereqError("Duplicate disk names given for adoption",
+                                   errors.ECODE_INVAL)
+      baddisks = [d for d in all_disks
+                  if not d.startswith(constants.ADOPTABLE_BLOCKDEV_ROOT)]
+      if baddisks:
+        raise errors.OpPrereqError("Device node(s) %s lie outside %s and"
+                                   " cannot be adopted" %
+                                   (", ".join(baddisks),
+                                    constants.ADOPTABLE_BLOCKDEV_ROOT),
+                                   errors.ECODE_INVAL)
+
+      node_disks = self.rpc.call_bdev_sizes([pnode.name],
+                                            list(all_disks))[pnode.name]
+      node_disks.Raise("Cannot get block device information from node %s" %
+                       pnode.name)
+      node_disks = node_disks.payload
+      delta = all_disks.difference(node_disks.keys())
+      if delta:
+        raise errors.OpPrereqError("Missing block device(s): %s" %
+                                   utils.CommaJoin(delta),
+                                   errors.ECODE_INVAL)
+      for dsk in self.disks:
+        dsk[constants.IDISK_SIZE] = \
+          int(float(node_disks[dsk[constants.IDISK_ADOPT]]))
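
Both adoption branches key their duplicate checks and size lookups on a fully qualified identifier: "vg/lv" for logical volumes and an absolute path under constants.ADOPTABLE_BLOCKDEV_ROOT for block devices. A hedged sketch of that qualification step (only the "vg/lv" convention and the root-prefix check come from the code above; the helper names and the example root are illustrative):

```python
import os

# Sketch: qualified identifiers as used for duplicate detection above.
def qualified_lv_names(disks, default_vg):
    # "vg/lv" keys, the syntax the FIXME above asks ReserveLV callers to share
    return set("%s/%s" % (d.get("vg", default_vg), d["adopt"]) for d in disks)

def qualified_block_paths(disks, root="/dev/disk/"):  # example root, assumed
    paths = set(os.path.abspath(d["adopt"]) for d in disks)
    bad = sorted(p for p in paths if not p.startswith(root))
    if bad:
        raise ValueError("Cannot adopt devices outside %s: %s"
                         % (root, ", ".join(bad)))
    return paths
```
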
 
     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
 
@@ -7654,7 +8180,7 @@ class LUCreateInstance(LogicalUnit):
     else:
       network_port = None
 
-    if constants.ENABLE_FILE_STORAGE:
+    if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
       # this is needed because os.path.join does not accept None arguments
       if self.op.file_storage_dir is None:
         string_file_storage_dir = ""
@@ -7662,7 +8188,12 @@ class LUCreateInstance(LogicalUnit):
         string_file_storage_dir = self.op.file_storage_dir
 
       # build the full file storage dir path
-      file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
+      if self.op.disk_template == constants.DT_SHARED_FILE:
+        get_fsd_fn = self.cfg.GetSharedFileStorageDir
+      else:
+        get_fsd_fn = self.cfg.GetFileStorageDir
+
+      file_storage_dir = utils.PathJoin(get_fsd_fn(),
                                         string_file_storage_dir, instance)
     else:
       file_storage_dir = ""
@@ -7690,17 +8221,18 @@ class LUCreateInstance(LogicalUnit):
                             )
 
     if self.adopt_disks:
-      # rename LVs to the newly-generated names; we need to construct
-      # 'fake' LV disks with the old data, plus the new unique_id
-      tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks]
-      rename_to = []
-      for t_dsk, a_dsk in zip (tmp_disks, self.disks):
-        rename_to.append(t_dsk.logical_id)
-        t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk["adopt"])
-        self.cfg.SetDiskID(t_dsk, pnode_name)
-      result = self.rpc.call_blockdev_rename(pnode_name,
-                                             zip(tmp_disks, rename_to))
-      result.Raise("Failed to rename adoped LVs")
+      if self.op.disk_template == constants.DT_PLAIN:
+        # rename LVs to the newly-generated names; we need to construct
+        # 'fake' LV disks with the old data, plus the new unique_id
+        tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks]
+        rename_to = []
+        for t_dsk, a_dsk in zip(tmp_disks, self.disks):
+          rename_to.append(t_dsk.logical_id)
+          t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk[constants.IDISK_ADOPT])
+          self.cfg.SetDiskID(t_dsk, pnode_name)
+        result = self.rpc.call_blockdev_rename(pnode_name,
+                                               zip(tmp_disks, rename_to))
+        result.Raise("Failed to rename adopted LVs")
     else:
       feedback_fn("* creating instance disks...")
       try:
@@ -7745,7 +8277,7 @@ class LUCreateInstance(LogicalUnit):
 
     if self.op.wait_for_sync:
       disk_abort = not _WaitForSync(self, iobj)
-    elif iobj.disk_template in constants.DTS_NET_MIRROR:
+    elif iobj.disk_template in constants.DTS_INT_MIRROR:
       # make sure the disks are not degraded (still sync-ing is ok)
       time.sleep(15)
       feedback_fn("* checking mirrors status")
@@ -7799,7 +8331,11 @@ class LUCreateInstance(LogicalUnit):
 
       elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
         feedback_fn("* preparing remote import...")
-        connect_timeout = constants.RIE_CONNECT_TIMEOUT
+        # The source cluster will stop the instance before attempting to make a
+        # connection. In some cases stopping an instance can take a long time,
+        # hence the shutdown timeout is added to the connection timeout.
+        connect_timeout = (constants.RIE_CONNECT_TIMEOUT +
+                           self.op.source_shutdown_timeout)
         timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
 
         assert iobj.primary_node == self.pnode.name
@@ -7839,7 +8375,7 @@ class LUCreateInstance(LogicalUnit):
     return list(iobj.all_nodes)
 
 
-class LUConnectConsole(NoHooksLU):
+class LUInstanceConsole(NoHooksLU):
   """Connect to an instance's console.
 
   This is somewhat special in that it returns the command line that
@@ -7847,9 +8383,6 @@ class LUConnectConsole(NoHooksLU):
   console.
 
   """
-  _OP_PARAMS = [
-    _PInstanceName
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -7879,40 +8412,44 @@ class LUConnectConsole(NoHooksLU):
 
     if instance.name not in node_insts.payload:
       if instance.admin_up:
-        state = "ERROR_down"
+        state = constants.INSTST_ERRORDOWN
       else:
-        state = "ADMIN_down"
+        state = constants.INSTST_ADMINDOWN
       raise errors.OpExecError("Instance %s is not running (state %s)" %
                                (instance.name, state))
 
     logging.debug("Connecting to console of %s on %s", instance.name, node)
 
-    hyper = hypervisor.GetHypervisor(instance.hypervisor)
-    cluster = self.cfg.GetClusterInfo()
-    # beparams and hvparams are passed separately, to avoid editing the
-    # instance and then saving the defaults in the instance itself.
-    hvparams = cluster.FillHV(instance)
-    beparams = cluster.FillBE(instance)
-    console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
+    return _GetInstanceConsole(self.cfg.GetClusterInfo(), instance)
+
+
+def _GetInstanceConsole(cluster, instance):
+  """Returns console information for an instance.
+
+  @type cluster: L{objects.Cluster}
+  @type instance: L{objects.Instance}
+  @rtype: dict
 
-    # build ssh cmdline
-    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
+  """
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+  # beparams and hvparams are passed separately, to avoid editing the
+  # instance and then saving the defaults in the instance itself.
+  hvparams = cluster.FillHV(instance)
+  beparams = cluster.FillBE(instance)
+  console = hyper.GetInstanceConsole(instance, hvparams, beparams)
+
+  assert console.instance == instance.name
+  assert console.Validate()
 
+  return console.ToDict()
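
LUInstanceConsole no longer builds an SSH command line on the master; it returns a serialized console description and leaves command construction to the client. A hedged sketch of a consumer (the "kind", "user", "host" and "command" keys are assumptions for illustration, not guaranteed by this diff):

```python
# Hypothetical client-side consumer of the console dict returned above;
# all dict keys used here are assumptions made for the sketch.
def build_console_cmd(console):
    if console.get("kind") == "ssh":
        return (["ssh", "-t", "%s@%s" % (console["user"], console["host"])]
                + list(console.get("command", [])))
    raise NotImplementedError("unhandled console kind %r"
                              % console.get("kind"))
```
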
 
-class LUReplaceDisks(LogicalUnit):
+
+class LUInstanceReplaceDisks(LogicalUnit):
   """Replace the disks of an instance.
 
   """
   HPATH = "mirrors-replace"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("mode", ht.NoDefault, ht.TElemOf(constants.REPLACE_MODES)),
-    ("disks", ht.EmptyList, ht.TListOf(ht.TPositiveInt)),
-    ("remote_node", None, ht.TMaybeString),
-    ("iallocator", None, ht.TMaybeString),
-    ("early_release", False, ht.TBool),
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -7966,13 +8503,20 @@ class LUReplaceDisks(LogicalUnit):
       "OLD_SECONDARY": instance.secondary_nodes[0],
       }
     env.update(_BuildInstanceHookEnvByObject(self, instance))
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    instance = self.replacer.instance
     nl = [
       self.cfg.GetMasterNode(),
       instance.primary_node,
       ]
     if self.op.remote_node is not None:
       nl.append(self.op.remote_node)
-    return env, nl, nl
+    return nl, nl
 
 
 class TLReplaceDisks(Tasklet):
@@ -8062,6 +8606,30 @@ class TLReplaceDisks(Tasklet):
     return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                     node_name, True)
 
+  def _CheckDisksActivated(self, instance):
+    """Checks if the instance disks are activated.
+
+    @param instance: The instance whose disks to check
+    @return: True if they are activated, False otherwise
+
+    """
+    nodes = instance.all_nodes
+
+    for idx, dev in enumerate(instance.disks):
+      for node in nodes:
+        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
+        self.cfg.SetDiskID(dev, node)
+
+        result = self.rpc.call_blockdev_find(node, dev)
+
+        if result.offline:
+          continue
+        elif result.fail_msg or not result.payload:
+          return False
+
+    return True
+
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -8125,6 +8693,10 @@ class TLReplaceDisks(Tasklet):
                                  errors.ECODE_INVAL)
 
     if self.mode == constants.REPLACE_DISK_AUTO:
+      if not self._CheckDisksActivated(instance):
+        raise errors.OpPrereqError("Please run activate-disks on instance %s"
+                                   " first" % self.instance_name,
+                                   errors.ECODE_STATE)
       faulty_primary = self._FindFaultyDisks(instance.primary_node)
       faulty_secondary = self._FindFaultyDisks(secondary_node)
 
@@ -8649,12 +9221,6 @@ class LURepairNodeStorage(NoHooksLU):
   """Repairs the volume group on a node.
 
   """
-  _OP_PARAMS = [
-    _PNodeName,
-    ("storage_type", ht.NoDefault, _CheckStorageType),
-    ("name", ht.NoDefault, ht.TNonEmptyString),
-    ("ignore_consistency", False, ht.TBool),
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -8713,15 +9279,10 @@ class LURepairNodeStorage(NoHooksLU):
                  (self.op.name, self.op.node_name))
 
 
-class LUNodeEvacuationStrategy(NoHooksLU):
+class LUNodeEvacStrategy(NoHooksLU):
   """Computes the node evacuation strategy.
 
   """
-  _OP_PARAMS = [
-    ("nodes", ht.NoDefault, ht.TListOf(ht.TNonEmptyString)),
-    ("remote_node", None, ht.TMaybeString),
-    ("iallocator", None, ht.TMaybeString),
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -8762,18 +9323,12 @@ class LUNodeEvacuationStrategy(NoHooksLU):
     return result
 
 
-class LUGrowDisk(LogicalUnit):
+class LUInstanceGrowDisk(LogicalUnit):
   """Grow a disk of an instance.
 
   """
   HPATH = "disk-grow"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("disk", ht.NoDefault, ht.TInt),
-    ("amount", ht.NoDefault, ht.TInt),
-    ("wait_for_sync", True, ht.TBool),
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -8796,8 +9351,14 @@ class LUGrowDisk(LogicalUnit):
       "AMOUNT": self.op.amount,
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
-    return env, nl, nl
+    return (nl, nl)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -8820,11 +9381,12 @@ class LUGrowDisk(LogicalUnit):
 
     self.disk = instance.FindDisk(self.op.disk)
 
-    if instance.disk_template != constants.DT_FILE:
-      # TODO: check the free disk space for file, when that feature
-      # will be supported
+    if instance.disk_template not in (constants.DT_FILE,
+                                      constants.DT_SHARED_FILE):
+      # TODO: check the free disk space for file, when that feature will be
+      # supported
       _CheckNodesFreeDiskPerVG(self, nodenames,
-                               {self.disk.physical_id[0]: self.op.amount})
+                               self.disk.ComputeGrowth(self.op.amount))
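
The replaced literal {self.disk.physical_id[0]: self.op.amount} suggests that ComputeGrowth returns a per-VG requirement mapping, generalized to cover child devices as well. A sketch of that shape, stated as an assumption rather than the actual objects.Disk implementation:

```python
# Hypothetical per-VG growth requirement; assumes ComputeGrowth returns
# {vg_name: space_needed} as the replaced literal suggests.
def compute_growth(disk_vgs, amount):
    req = {}
    for vg in disk_vgs:
        req[vg] = req.get(vg, 0) + amount
    return req

assert compute_growth(["xenvg", "metavg"], 1024) == \
    {"xenvg": 1024, "metavg": 1024}
```
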
 
   def Exec(self, feedback_fn):
     """Execute disk grow.
@@ -8864,35 +9426,41 @@ class LUGrowDisk(LogicalUnit):
                            " sync mode was requested.")
 
 
-class LUQueryInstanceData(NoHooksLU):
+class LUInstanceQueryData(NoHooksLU):
   """Query runtime instance data.
 
   """
-  _OP_PARAMS = [
-    ("instances", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
-    ("static", False, ht.TBool),
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
     self.needed_locks = {}
-    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
 
-    if self.op.instances:
-      self.wanted_names = []
-      for name in self.op.instances:
-        full_name = _ExpandInstanceName(self.cfg, name)
-        self.wanted_names.append(full_name)
-      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
+    # Use locking if requested or when non-static information is wanted
+    if not (self.op.static or self.op.use_locking):
+      self.LogWarning("Non-static data requested, locks need to be acquired")
+      self.op.use_locking = True
+
+    if self.op.instances or not self.op.use_locking:
+      # Expand instance names right here
+      self.wanted_names = _GetWantedInstances(self, self.op.instances)
     else:
+      # Will use acquired locks
       self.wanted_names = None
-      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
 
-    self.needed_locks[locking.LEVEL_NODE] = []
-    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+    if self.op.use_locking:
+      self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+
+      if self.wanted_names is None:
+        self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
+      else:
+        self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
+
+      self.needed_locks[locking.LEVEL_NODE] = []
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
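
The rewritten ExpandNames encodes a small decision rule: non-static data forces use_locking on, and instance names are expanded eagerly whenever explicit names were given or no locks will be held. A standalone sketch of that rule (the function is a stand-in for illustration):

```python
# Sketch of the locking decision in LUInstanceQueryData.ExpandNames above.
def locking_plan(static, use_locking, instances):
    if not (static or use_locking):
        use_locking = True  # non-static data requires locks
    expand_now = bool(instances) or not use_locking
    return use_locking, expand_now

assert locking_plan(static=False, use_locking=False, instances=[]) == \
    (True, False)
assert locking_plan(static=True, use_locking=False, instances=["inst1"]) == \
    (False, True)
```
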
 
   def DeclareLocks(self, level):
-    if level == locking.LEVEL_NODE:
+    if self.op.use_locking and level == locking.LEVEL_NODE:
       self._LockInstancesNodes()
 
   def CheckPrereq(self):
@@ -8902,10 +9470,11 @@ class LUQueryInstanceData(NoHooksLU):
 
     """
     if self.wanted_names is None:
+      assert self.op.use_locking, "Locking was not used"
       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
 
-    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
-                             in self.wanted_names]
+    self.wanted_instances = [self.cfg.GetInstanceInfo(name)
+                             for name in self.wanted_names]
 
   def _ComputeBlockdevStatus(self, node, instance_name, dev):
     """Returns the status of a block device
@@ -8951,7 +9520,7 @@ class LUQueryInstanceData(NoHooksLU):
     else:
       dev_children = []
 
-    data = {
+    return {
       "iv_name": dev.iv_name,
       "dev_type": dev.dev_type,
       "logical_id": dev.logical_id,
@@ -8963,8 +9532,6 @@ class LUQueryInstanceData(NoHooksLU):
       "size": dev.size,
       }
 
-    return data
-
   def Exec(self, feedback_fn):
     """Gather and return data"""
     result = {}
@@ -8992,7 +9559,7 @@ class LUQueryInstanceData(NoHooksLU):
       disks = [self._ComputeDiskStatus(instance, None, device)
                for device in instance.disks]
 
-      idict = {
+      result[instance.name] = {
         "name": instance.name,
         "config_state": config_state,
         "run_state": remote_state,
@@ -9017,30 +9584,15 @@ class LUQueryInstanceData(NoHooksLU):
         "uuid": instance.uuid,
         }
 
-      result[instance.name] = idict
-
     return result
 
 
-class LUSetInstanceParams(LogicalUnit):
+class LUInstanceSetParams(LogicalUnit):
   """Modifies an instances's parameters.
 
   """
   HPATH = "instance-modify"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("nics", ht.EmptyList, ht.TList),
-    ("disks", ht.EmptyList, ht.TList),
-    ("beparams", ht.EmptyDict, ht.TDict),
-    ("hvparams", ht.EmptyDict, ht.TDict),
-    ("disk_template", None, ht.TMaybeString),
-    ("remote_node", None, ht.TMaybeString),
-    ("os_name", None, ht.TMaybeString),
-    ("force_variant", False, ht.TBool),
-    ("osparams", None, ht.TOr(ht.TDict, ht.TNone)),
-    _PForce,
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -9068,11 +9620,11 @@ class LUSetInstanceParams(LogicalUnit):
           raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
 
       if disk_op == constants.DDM_ADD:
-        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
+        mode = disk_dict.setdefault(constants.IDISK_MODE, constants.DISK_RDWR)
         if mode not in constants.DISK_ACCESS_SET:
           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode,
                                      errors.ECODE_INVAL)
-        size = disk_dict.get('size', None)
+        size = disk_dict.get(constants.IDISK_SIZE, None)
         if size is None:
           raise errors.OpPrereqError("Required disk parameter size missing",
                                      errors.ECODE_INVAL)
@@ -9081,10 +9633,10 @@ class LUSetInstanceParams(LogicalUnit):
         except (TypeError, ValueError), err:
           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                      str(err), errors.ECODE_INVAL)
-        disk_dict['size'] = size
+        disk_dict[constants.IDISK_SIZE] = size
       else:
         # modification of disk
-        if 'size' in disk_dict:
+        if constants.IDISK_SIZE in disk_dict:
           raise errors.OpPrereqError("Disk size change not possible, use"
                                      " grow-disk", errors.ECODE_INVAL)
 
@@ -9097,13 +9649,12 @@ class LUSetInstanceParams(LogicalUnit):
                                  " changes not supported at the same time",
                                  errors.ECODE_INVAL)
 
-    if self.op.disk_template:
-      _CheckDiskTemplate(self.op.disk_template)
-      if (self.op.disk_template in constants.DTS_NET_MIRROR and
-          self.op.remote_node is None):
-        raise errors.OpPrereqError("Changing the disk template to a mirrored"
-                                   " one requires specifying a secondary node",
-                                   errors.ECODE_INVAL)
+    if (self.op.disk_template and
+        self.op.disk_template in constants.DTS_INT_MIRROR and
+        self.op.remote_node is None):
+      raise errors.OpPrereqError("Changing the disk template to a mirrored"
+                                 " one requires specifying a secondary node",
+                                 errors.ECODE_INVAL)
 
     # NIC validation
     nic_addremove = 0
@@ -9122,32 +9673,32 @@ class LUSetInstanceParams(LogicalUnit):
           raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
 
       # nic_dict should be a dict
-      nic_ip = nic_dict.get('ip', None)
+      nic_ip = nic_dict.get(constants.INIC_IP, None)
       if nic_ip is not None:
         if nic_ip.lower() == constants.VALUE_NONE:
-          nic_dict['ip'] = None
+          nic_dict[constants.INIC_IP] = None
         else:
           if not netutils.IPAddress.IsValid(nic_ip):
             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
                                        errors.ECODE_INVAL)
 
       nic_bridge = nic_dict.get('bridge', None)
-      nic_link = nic_dict.get('link', None)
+      nic_link = nic_dict.get(constants.INIC_LINK, None)
       if nic_bridge and nic_link:
         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
                                    " at the same time", errors.ECODE_INVAL)
       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
         nic_dict['bridge'] = None
       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
-        nic_dict['link'] = None
+        nic_dict[constants.INIC_LINK] = None
 
       if nic_op == constants.DDM_ADD:
-        nic_mac = nic_dict.get('mac', None)
+        nic_mac = nic_dict.get(constants.INIC_MAC, None)
         if nic_mac is None:
-          nic_dict['mac'] = constants.VALUE_AUTO
+          nic_dict[constants.INIC_MAC] = constants.VALUE_AUTO
 
-      if 'mac' in nic_dict:
-        nic_mac = nic_dict['mac']
+      if constants.INIC_MAC in nic_dict:
+        nic_mac = nic_dict[constants.INIC_MAC]
         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
           nic_mac = utils.NormalizeAndValidateMac(nic_mac)
 
@@ -9193,12 +9744,12 @@ class LUSetInstanceParams(LogicalUnit):
           this_nic_override = nic_override[idx]
         else:
           this_nic_override = {}
-        if 'ip' in this_nic_override:
-          ip = this_nic_override['ip']
+        if constants.INIC_IP in this_nic_override:
+          ip = this_nic_override[constants.INIC_IP]
         else:
           ip = nic.ip
-        if 'mac' in this_nic_override:
-          mac = this_nic_override['mac']
+        if constants.INIC_MAC in this_nic_override:
+          mac = this_nic_override[constants.INIC_MAC]
         else:
           mac = nic.mac
         if idx in self.nic_pnew:
@@ -9209,8 +9760,8 @@ class LUSetInstanceParams(LogicalUnit):
         link = nicparams[constants.NIC_LINK]
         args['nics'].append((ip, mac, mode, link))
       if constants.DDM_ADD in nic_override:
-        ip = nic_override[constants.DDM_ADD].get('ip', None)
-        mac = nic_override[constants.DDM_ADD]['mac']
+        ip = nic_override[constants.DDM_ADD].get(constants.INIC_IP, None)
+        mac = nic_override[constants.DDM_ADD][constants.INIC_MAC]
         nicparams = self.nic_pnew[constants.DDM_ADD]
         mode = nicparams[constants.NIC_MODE]
         link = nicparams[constants.NIC_LINK]
@@ -9221,8 +9772,15 @@ class LUSetInstanceParams(LogicalUnit):
     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
     if self.op.disk_template:
       env["NEW_DISK_TEMPLATE"] = self.op.disk_template
+
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
-    return env, nl, nl
+    return (nl, nl)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -9259,7 +9817,7 @@ class LUSetInstanceParams(LogicalUnit):
                                                   self.op.disk_template),
                                    errors.ECODE_INVAL)
       _CheckInstanceDown(self, instance, "cannot change disk template")
-      if self.op.disk_template in constants.DTS_NET_MIRROR:
+      if self.op.disk_template in constants.DTS_INT_MIRROR:
         if self.op.remote_node == pnode:
           raise errors.OpPrereqError("Given new secondary node %s is the same"
                                      " as the primary node of the instance" %
@@ -9268,7 +9826,8 @@ class LUSetInstanceParams(LogicalUnit):
         _CheckNodeNotDrained(self, self.op.remote_node)
         # FIXME: here we assume that the old instance type is DT_PLAIN
         assert instance.disk_template == constants.DT_PLAIN
-        disks = [{"size": d.size, "vg": d.logical_id[0]}
+        disks = [{constants.IDISK_SIZE: d.size,
+                  constants.IDISK_VG: d.logical_id[0]}
                  for d in instance.disks]
         required = _ComputeDiskSizePerVG(self.op.disk_template, disks)
         _CheckNodesFreeDiskPerVG(self, [self.op.remote_node], required)
@@ -9413,21 +9972,22 @@ class LUSetInstanceParams(LogicalUnit):
           else:
             raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
       if new_nic_mode == constants.NIC_MODE_ROUTED:
-        if 'ip' in nic_dict:
-          nic_ip = nic_dict['ip']
+        if constants.INIC_IP in nic_dict:
+          nic_ip = nic_dict[constants.INIC_IP]
         else:
           nic_ip = old_nic_ip
         if nic_ip is None:
           raise errors.OpPrereqError('Cannot set the nic ip to None'
                                      ' on a routed nic', errors.ECODE_INVAL)
-      if 'mac' in nic_dict:
-        nic_mac = nic_dict['mac']
+      if constants.INIC_MAC in nic_dict:
+        nic_mac = nic_dict[constants.INIC_MAC]
         if nic_mac is None:
           raise errors.OpPrereqError('Cannot set the nic mac to None',
                                      errors.ECODE_INVAL)
         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
           # otherwise generate the mac
-          nic_dict['mac'] = self.cfg.GenerateMAC(self.proc.GetECId())
+          nic_dict[constants.INIC_MAC] = \
+            self.cfg.GenerateMAC(self.proc.GetECId())
         else:
           # or validate/reserve the current one
           try:
@@ -9450,7 +10010,7 @@ class LUSetInstanceParams(LogicalUnit):
         _CheckInstanceDown(self, instance, "cannot remove disks")
 
       if (disk_op == constants.DDM_ADD and
-          len(instance.nics) >= constants.MAX_DISKS):
+          len(instance.disks) >= constants.MAX_DISKS):
         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                    " add more" % constants.MAX_DISKS,
                                    errors.ECODE_STATE)
@@ -9474,7 +10034,8 @@ class LUSetInstanceParams(LogicalUnit):
     snode = self.op.remote_node
 
     # create a fake disk info for _GenerateDiskTemplate
-    disk_info = [{"size": d.size, "mode": d.mode} for d in instance.disks]
+    disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode}
+                 for d in instance.disks]
     new_disks = _GenerateDiskTemplate(self, self.op.disk_template,
                                       instance.name, pnode, [snode],
                                       disk_info, None, None, 0, feedback_fn)
@@ -9581,7 +10142,8 @@ class LUSetInstanceParams(LogicalUnit):
         result.append(("disk/%d" % device_idx, "remove"))
       elif disk_op == constants.DDM_ADD:
         # add a new disk
-        if instance.disk_template == constants.DT_FILE:
+        if instance.disk_template in (constants.DT_FILE,
+                                      constants.DT_SHARED_FILE):
           file_driver, file_path = instance.disks[0].logical_id
           file_path = os.path.dirname(file_path)
         else:
@@ -9615,13 +10177,14 @@ class LUSetInstanceParams(LogicalUnit):
                        (new_disk.size, new_disk.mode)))
       else:
         # change a given disk
-        instance.disks[disk_op].mode = disk_dict['mode']
-        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
+        instance.disks[disk_op].mode = disk_dict[constants.IDISK_MODE]
+        result.append(("disk.mode/%d" % disk_op,
+                       disk_dict[constants.IDISK_MODE]))
 
     if self.op.disk_template:
       r_shut = _ShutdownInstanceDisks(self, instance)
       if not r_shut:
-        raise errors.OpExecError("Cannot shutdow instance disks, unable to"
+        raise errors.OpExecError("Cannot shutdown instance disks, unable to"
                                  " proceed with disk template conversion")
       mode = (instance.disk_template, self.op.disk_template)
       try:
@@ -9639,8 +10202,8 @@ class LUSetInstanceParams(LogicalUnit):
         result.append(("nic.%d" % len(instance.nics), "remove"))
       elif nic_op == constants.DDM_ADD:
         # mac and bridge should be set, by now
-        mac = nic_dict['mac']
-        ip = nic_dict.get('ip', None)
+        mac = nic_dict[constants.INIC_MAC]
+        ip = nic_dict.get(constants.INIC_IP, None)
         nicparams = self.nic_pinst[constants.DDM_ADD]
         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
         instance.nics.append(new_nic)
@@ -9651,7 +10214,7 @@ class LUSetInstanceParams(LogicalUnit):
                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
                        )))
       else:
-        for key in 'mac', 'ip':
+        for key in (constants.INIC_MAC, constants.INIC_IP):
           if key in nic_dict:
             setattr(instance.nics[nic_op], key, nic_dict[key])
         if nic_op in self.nic_pinst:
@@ -9691,14 +10254,10 @@ class LUSetInstanceParams(LogicalUnit):
     }
 
 
-class LUQueryExports(NoHooksLU):
+class LUBackupQuery(NoHooksLU):
   """Query the exports list
 
   """
-  _OP_PARAMS = [
-    ("nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
-    ("use_locking", False, ht.TBool),
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -9731,14 +10290,10 @@ class LUQueryExports(NoHooksLU):
     return result
 
 
-class LUPrepareExport(NoHooksLU):
+class LUBackupPrepare(NoHooksLU):
   """Prepares an instance for an export and returns useful information.
 
   """
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("mode", ht.NoDefault, ht.TElemOf(constants.EXPORT_MODES)),
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -9786,23 +10341,12 @@ class LUPrepareExport(NoHooksLU):
     return None
 
 
-class LUExportInstance(LogicalUnit):
+class LUBackupExport(LogicalUnit):
   """Export an instance to an image in the cluster.
 
   """
   HPATH = "instance-export"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("target_node", ht.NoDefault, ht.TOr(ht.TNonEmptyString, ht.TList)),
-    ("shutdown", True, ht.TBool),
-    _PShutdownTimeout,
-    ("remove_instance", False, ht.TBool),
-    ("ignore_remove_failures", False, ht.TBool),
-    ("mode", constants.EXPORT_MODE_LOCAL, ht.TElemOf(constants.EXPORT_MODES)),
-    ("x509_key_name", None, ht.TOr(ht.TList, ht.TNone)),
-    ("destination_x509_ca", None, ht.TMaybeString),
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -9857,12 +10401,18 @@ class LUExportInstance(LogicalUnit):
 
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
 
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
     nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
 
     if self.op.mode == constants.EXPORT_MODE_LOCAL:
       nl.append(self.op.target_node)
 
-    return env, nl, nl
+    return (nl, nl)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -9930,209 +10480,688 @@ class LUExportInstance(LogicalUnit):
 
       self.dest_x509_ca = cert
 
-      # Verify target information
-      disk_info = []
-      for idx, disk_data in enumerate(self.op.target_node):
-        try:
-          (host, port, magic) = \
-            masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
-        except errors.GenericError, err:
-          raise errors.OpPrereqError("Target info for disk %s: %s" %
-                                     (idx, err), errors.ECODE_INVAL)
+      # Verify target information
+      disk_info = []
+      for idx, disk_data in enumerate(self.op.target_node):
+        try:
+          (host, port, magic) = \
+            masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
+        except errors.GenericError, err:
+          raise errors.OpPrereqError("Target info for disk %s: %s" %
+                                     (idx, err), errors.ECODE_INVAL)
+
+        disk_info.append((host, port, magic))
+
+      assert len(disk_info) == len(self.op.target_node)
+      self.dest_disk_info = disk_info
+
+    else:
+      raise errors.ProgrammerError("Unhandled export mode %r" %
+                                   self.op.mode)
+
+    # instance disk type verification
+    # TODO: Implement export support for file-based disks
+    for disk in self.instance.disks:
+      if disk.dev_type == constants.LD_FILE:
+        raise errors.OpPrereqError("Export not supported for instances with"
+                                   " file-based disks", errors.ECODE_INVAL)
+
+  def _CleanupExports(self, feedback_fn):
+    """Removes exports of current instance from all other nodes.
+
+    If an instance in a cluster with nodes A..D was exported to node C, its
+    exports will be removed from the nodes A, B and D.
+
+    """
+    assert self.op.mode != constants.EXPORT_MODE_REMOTE
+
+    nodelist = self.cfg.GetNodeList()
+    nodelist.remove(self.dst_node.name)
+
+    # on one-node clusters nodelist will be empty after the removal
+    # if we proceed the backup would be removed because OpBackupQuery
+    # substitutes an empty list with the full cluster node list.
+    iname = self.instance.name
+    if nodelist:
+      feedback_fn("Removing old exports for instance %s" % iname)
+      exportlist = self.rpc.call_export_list(nodelist)
+      for node in exportlist:
+        if exportlist[node].fail_msg:
+          continue
+        if iname in exportlist[node].payload:
+          msg = self.rpc.call_export_remove(node, iname).fail_msg
+          if msg:
+            self.LogWarning("Could not remove older export for instance %s"
+                            " on node %s: %s", iname, node, msg)
+
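
The A..D example in the docstring above reduces to removing only the destination node from the full node list, then skipping the RPC when nothing remains (the one-node-cluster case the comment calls out). A minimal sketch:

```python
# Sketch of the node selection in _CleanupExports above.
def cleanup_targets(all_nodes, dst_node):
    nodelist = list(all_nodes)
    nodelist.remove(dst_node)
    return nodelist  # empty on one-node clusters, so the caller skips the RPC

assert cleanup_targets(["A", "B", "C", "D"], "C") == ["A", "B", "D"]
```
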
+  def Exec(self, feedback_fn):
+    """Export an instance to an image in the cluster.
+
+    """
+    assert self.op.mode in constants.EXPORT_MODES
+
+    instance = self.instance
+    src_node = instance.primary_node
+
+    if self.op.shutdown:
+      # shutdown the instance, but not the disks
+      feedback_fn("Shutting down instance %s" % instance.name)
+      result = self.rpc.call_instance_shutdown(src_node, instance,
+                                               self.op.shutdown_timeout)
+      # TODO: Maybe ignore failures if ignore_remove_failures is set
+      result.Raise("Could not shutdown instance %s on"
+                   " node %s" % (instance.name, src_node))
+
+    # set the disks ID correctly since call_instance_start needs the
+    # correct drbd minor to create the symlinks
+    for disk in instance.disks:
+      self.cfg.SetDiskID(disk, src_node)
+
+    activate_disks = (not instance.admin_up)
+
+    if activate_disks:
+      # Activate the instance disks if we're exporting a stopped instance
+      feedback_fn("Activating disks for %s" % instance.name)
+      _StartInstanceDisks(self, instance, None)
+
+    try:
+      helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
+                                                     instance)
+
+      helper.CreateSnapshots()
+      try:
+        if (self.op.shutdown and instance.admin_up and
+            not self.op.remove_instance):
+          assert not activate_disks
+          feedback_fn("Starting instance %s" % instance.name)
+          result = self.rpc.call_instance_start(src_node, instance, None, None)
+          msg = result.fail_msg
+          if msg:
+            feedback_fn("Failed to start instance: %s" % msg)
+            _ShutdownInstanceDisks(self, instance)
+            raise errors.OpExecError("Could not start instance: %s" % msg)
+
+        if self.op.mode == constants.EXPORT_MODE_LOCAL:
+          (fin_resu, dresults) = helper.LocalExport(self.dst_node)
+        elif self.op.mode == constants.EXPORT_MODE_REMOTE:
+          connect_timeout = constants.RIE_CONNECT_TIMEOUT
+          timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
+
+          (key_name, _, _) = self.x509_key_name
+
+          dest_ca_pem = \
+            OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                            self.dest_x509_ca)
+
+          (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
+                                                     key_name, dest_ca_pem,
+                                                     timeouts)
+      finally:
+        helper.Cleanup()
+
+      # Check for backwards compatibility
+      assert len(dresults) == len(instance.disks)
+      assert compat.all(isinstance(i, bool) for i in dresults), \
+             "Not all results are boolean: %r" % dresults
+
+    finally:
+      if activate_disks:
+        feedback_fn("Deactivating disks for %s" % instance.name)
+        _ShutdownInstanceDisks(self, instance)
+
+    if not (compat.all(dresults) and fin_resu):
+      failures = []
+      if not fin_resu:
+        failures.append("export finalization")
+      if not compat.all(dresults):
+        fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults)
+                               if not dsk)
+        failures.append("disk export: disk(s) %s" % fdsk)
+
+      raise errors.OpExecError("Export failed, errors in %s" %
+                               utils.CommaJoin(failures))
+
+    # At this point, the export was successful, we can cleanup/finish
+
+    # Remove instance if requested
+    if self.op.remove_instance:
+      feedback_fn("Removing instance %s" % instance.name)
+      _RemoveInstance(self, feedback_fn, instance,
+                      self.op.ignore_remove_failures)
+
+    if self.op.mode == constants.EXPORT_MODE_LOCAL:
+      self._CleanupExports(feedback_fn)
+
+    return fin_resu, dresults
+
+
+class LUBackupRemove(NoHooksLU):
+  """Remove exports related to the named instance.
+
+  """
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self.needed_locks = {}
+    # We need all nodes to be locked in order for RemoveExport to work, but
+    # we don't need to lock the instance itself, as nothing will happen to it
+    # (and we can remove exports even for an already-removed instance)
+    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
+  def Exec(self, feedback_fn):
+    """Remove any export.
+
+    """
+    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
+    # If the instance was not found we'll try with the name that was passed in.
+    # This will only work if it was an FQDN, though.
+    fqdn_warn = False
+    if not instance_name:
+      fqdn_warn = True
+      instance_name = self.op.instance_name
+
+    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+    exportlist = self.rpc.call_export_list(locked_nodes)
+    found = False
+    for node in exportlist:
+      msg = exportlist[node].fail_msg
+      if msg:
+        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
+        continue
+      if instance_name in exportlist[node].payload:
+        found = True
+        result = self.rpc.call_export_remove(node, instance_name)
+        msg = result.fail_msg
+        if msg:
+          logging.error("Could not remove export for instance %s"
+                        " on node %s: %s", instance_name, node, msg)
+
+    if fqdn_warn and not found:
+      feedback_fn("Export not found. If trying to remove an export belonging"
+                  " to a deleted instance please use its Fully Qualified"
+                  " Domain Name.")
+
+
+class LUGroupAdd(LogicalUnit):
+  """Logical unit for creating node groups.
+
+  """
+  HPATH = "group-add"
+  HTYPE = constants.HTYPE_GROUP
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    # We need the new group's UUID here so that we can create and acquire the
+    # corresponding lock. Later, in Exec(), we'll indicate to cfg.AddNodeGroup
+    # that it should not check whether the UUID exists in the configuration.
+    self.group_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId())
+    self.needed_locks = {}
+    self.add_locks[locking.LEVEL_NODEGROUP] = self.group_uuid
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks that the given group name is not an existing node group
+    already.
+
+    """
+    try:
+      existing_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+    except errors.OpPrereqError:
+      pass
+    else:
+      raise errors.OpPrereqError("Desired group name '%s' already exists as a"
+                                 " node group (UUID: %s)" %
+                                 (self.op.group_name, existing_uuid),
+                                 errors.ECODE_EXISTS)
+
+    if self.op.ndparams:
+      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    """
+    return {
+      "GROUP_NAME": self.op.group_name,
+      }
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    mn = self.cfg.GetMasterNode()
+    return ([mn], [mn])
+
+  def Exec(self, feedback_fn):
+    """Add the node group to the cluster.
+
+    """
+    group_obj = objects.NodeGroup(name=self.op.group_name, members=[],
+                                  uuid=self.group_uuid,
+                                  alloc_policy=self.op.alloc_policy,
+                                  ndparams=self.op.ndparams)
+
+    self.cfg.AddNodeGroup(group_obj, self.proc.GetECId(), check_uuid=False)
+    del self.remove_locks[locking.LEVEL_NODEGROUP]
+
+
+class LUGroupAssignNodes(NoHooksLU):
+  """Logical unit for assigning nodes to groups.
+
+  """
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    # These raise errors.OpPrereqError on their own:
+    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+    self.op.nodes = _GetWantedNodes(self, self.op.nodes)
+
+    # We want to lock all the affected nodes and groups. We have readily
+    # available the list of nodes, and the *destination* group. To gather the
+    # list of "source" groups, we need to fetch node information.
+    self.node_data = self.cfg.GetAllNodesInfo()
+    affected_groups = set(self.node_data[node].group for node in self.op.nodes)
+    affected_groups.add(self.group_uuid)
+
+    self.needed_locks = {
+      locking.LEVEL_NODEGROUP: list(affected_groups),
+      locking.LEVEL_NODE: self.op.nodes,
+      }
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    self.group = self.cfg.GetNodeGroup(self.group_uuid)
+    instance_data = self.cfg.GetAllInstancesInfo()
+
+    if self.group is None:
+      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
+                               (self.op.group_name, self.group_uuid))
+
+    (new_splits, previous_splits) = \
+      self.CheckAssignmentForSplitInstances([(node, self.group_uuid)
+                                             for node in self.op.nodes],
+                                            self.node_data, instance_data)
+
+    if new_splits:
+      fmt_new_splits = utils.CommaJoin(utils.NiceSort(new_splits))
+
+      if not self.op.force:
+        raise errors.OpExecError("The following instances get split by this"
+                                 " change and --force was not given: %s" %
+                                 fmt_new_splits)
+      else:
+        self.LogWarning("This operation will split the following instances: %s",
+                        fmt_new_splits)
+
+        if previous_splits:
+          self.LogWarning("In addition, these already-split instances continue"
+                          " to be split across groups: %s",
+                          utils.CommaJoin(utils.NiceSort(previous_splits)))
+
+  def Exec(self, feedback_fn):
+    """Assign nodes to a new group.
+
+    """
+    for node in self.op.nodes:
+      self.node_data[node].group = self.group_uuid
+
+    self.cfg.Update(self.group, feedback_fn) # Saves all modified nodes.
+
+  @staticmethod
+  def CheckAssignmentForSplitInstances(changes, node_data, instance_data):
+    """Check for split instances after a node assignment.
+
+    This method considers a series of node assignments as an atomic operation,
+    and returns information about split instances after applying the set of
+    changes.
+
+    In particular, it returns information about newly split instances, and
+    instances that were already split, and remain so after the change.
+
+    Only instances whose disk template is listed in constants.DTS_INT_MIRROR are
+    considered.
+
+    @type changes: list of (node_name, new_group_uuid) pairs.
+    @param changes: list of node assignments to consider.
+    @param node_data: a dict with data for all nodes
+    @param instance_data: a dict with all instances to consider
+    @rtype: a two-tuple
+    @return: a list of instances that were previously okay and will be split
+      as a consequence of this change, and a list of instances that were
+      previously split and that this change does not fix.
+
+    """
+    changed_nodes = dict((node, group) for node, group in changes
+                         if node_data[node].group != group)
+
+    all_split_instances = set()
+    previously_split_instances = set()
+
+    def InstanceNodes(instance):
+      return [instance.primary_node] + list(instance.secondary_nodes)
+
+    for inst in instance_data.values():
+      if inst.disk_template not in constants.DTS_INT_MIRROR:
+        continue
+
+      instance_nodes = InstanceNodes(inst)
+
+      if len(set(node_data[node].group for node in instance_nodes)) > 1:
+        previously_split_instances.add(inst.name)
+
+      if len(set(changed_nodes.get(node, node_data[node].group)
+                 for node in instance_nodes)) > 1:
+        all_split_instances.add(inst.name)
+
+    return (list(all_split_instances - previously_split_instances),
+            list(previously_split_instances & all_split_instances))
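
Since CheckAssignmentForSplitInstances is a pure static method over plain mappings, its contract is easy to exercise in isolation. A hedged usage sketch with stand-in node and instance objects (the real inputs are objects.Node and objects.Instance; it assumes this runs where LUGroupAssignNodes is importable and that "drbd8" is constants.DT_DRBD8, a member of DTS_INT_MIRROR):

```python
# Stand-in objects for exercising CheckAssignmentForSplitInstances; the
# attribute names mirror what the method reads from the real objects.
class FakeNode(object):
    def __init__(self, group):
        self.group = group

class FakeInst(object):
    def __init__(self, name, primary, secondaries, template):
        self.name = name
        self.primary_node = primary
        self.secondary_nodes = secondaries
        self.disk_template = template

node_data = {"n1": FakeNode("g1"), "n2": FakeNode("g1")}
instance_data = {"i1": FakeInst("i1", "n1", ["n2"], "drbd8")}

# Moving only n1 into g2 would newly split i1 between g1 and g2:
new, old = LUGroupAssignNodes.CheckAssignmentForSplitInstances(
    [("n1", "g2")], node_data, instance_data)
assert (new, old) == (["i1"], [])
```
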
+
+
+class _GroupQuery(_QueryBase):
+  FIELDS = query.GROUP_FIELDS
+
+  def ExpandNames(self, lu):
+    lu.needed_locks = {}
+
+    self._all_groups = lu.cfg.GetAllNodeGroupsInfo()
+    name_to_uuid = dict((g.name, g.uuid) for g in self._all_groups.values())
+
+    if not self.names:
+      self.wanted = [name_to_uuid[name]
+                     for name in utils.NiceSort(name_to_uuid.keys())]
+    else:
+      # Accept names to be either names or UUIDs.
+      missing = []
+      self.wanted = []
+      all_uuid = frozenset(self._all_groups.keys())
+
+      for name in self.names:
+        if name in all_uuid:
+          self.wanted.append(name)
+        elif name in name_to_uuid:
+          self.wanted.append(name_to_uuid[name])
+        else:
+          missing.append(name)
+
+      if missing:
+        raise errors.OpPrereqError("Some groups do not exist: %s" % missing,
+                                   errors.ECODE_NOENT)
+
+  def DeclareLocks(self, lu, level):
+    pass
+
+  def _GetQueryData(self, lu):
+    """Computes the list of node groups and their attributes.
+
+    """
+    do_nodes = query.GQ_NODE in self.requested_data
+    do_instances = query.GQ_INST in self.requested_data
+
+    group_to_nodes = None
+    group_to_instances = None
+
+    # For GQ_NODE, we need to map group->[nodes], and group->[instances] for
+    # GQ_INST. The former is attainable with just GetAllNodesInfo(), but for the
+    # latter GetAllInstancesInfo() is not enough, for we have to go through
+    # instance->node. Hence, we will need to process nodes even if we only need
+    # instance information.
+    if do_nodes or do_instances:
+      all_nodes = lu.cfg.GetAllNodesInfo()
+      group_to_nodes = dict((uuid, []) for uuid in self.wanted)
+      node_to_group = {}
+
+      for node in all_nodes.values():
+        if node.group in group_to_nodes:
+          group_to_nodes[node.group].append(node.name)
+          node_to_group[node.name] = node.group
+
+      if do_instances:
+        all_instances = lu.cfg.GetAllInstancesInfo()
+        group_to_instances = dict((uuid, []) for uuid in self.wanted)
+
+        for instance in all_instances.values():
+          node = instance.primary_node
+          if node in node_to_group:
+            group_to_instances[node_to_group[node]].append(instance.name)
+
+        if not do_nodes:
+          # Do not pass on node information if it was not requested.
+          group_to_nodes = None
+
+    return query.GroupQueryData([self._all_groups[uuid]
+                                 for uuid in self.wanted],
+                                group_to_nodes, group_to_instances)
+
+
+class LUGroupQuery(NoHooksLU):
+  """Logical unit for querying node groups.
+
+  """
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    self.gq = _GroupQuery(qlang.MakeSimpleFilter("name", self.op.names),
+                          self.op.output_fields, False)
+
+  def ExpandNames(self):
+    self.gq.ExpandNames(self)
+
+  def Exec(self, feedback_fn):
+    return self.gq.OldStyleQuery(self)
+
+
+class LUGroupSetParams(LogicalUnit):
+  """Modifies the parameters of a node group.
+
+  """
+  HPATH = "group-modify"
+  HTYPE = constants.HTYPE_GROUP
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    all_changes = [
+      self.op.ndparams,
+      self.op.alloc_policy,
+      ]
+
+    if all_changes.count(None) == len(all_changes):
+      raise errors.OpPrereqError("Please pass at least one modification",
+                                 errors.ECODE_INVAL)
+
+  def ExpandNames(self):
+    # This raises errors.OpPrereqError on its own:
+    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+
+    self.needed_locks = {
+      locking.LEVEL_NODEGROUP: [self.group_uuid],
+      }
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    self.group = self.cfg.GetNodeGroup(self.group_uuid)
+
+    if self.group is None:
+      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
+                               (self.op.group_name, self.group_uuid))
+
+    if self.op.ndparams:
+      new_ndparams = _GetUpdatedParams(self.group.ndparams, self.op.ndparams)
+      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
+      self.new_ndparams = new_ndparams
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    """
+    return {
+      "GROUP_NAME": self.op.group_name,
+      "NEW_ALLOC_POLICY": self.op.alloc_policy,
+      }
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    mn = self.cfg.GetMasterNode()
+    return ([mn], [mn])
+
+  def Exec(self, feedback_fn):
+    """Modifies the node group.
 
-        disk_info.append((host, port, magic))
+    """
+    result = []
 
-      assert len(disk_info) == len(self.op.target_node)
-      self.dest_disk_info = disk_info
+    if self.op.ndparams:
+      self.group.ndparams = self.new_ndparams
+      result.append(("ndparams", str(self.group.ndparams)))
 
-    else:
-      raise errors.ProgrammerError("Unhandled export mode %r" %
-                                   self.op.mode)
+    if self.op.alloc_policy:
+      self.group.alloc_policy = self.op.alloc_policy
 
-    # instance disk type verification
-    # TODO: Implement export support for file-based disks
-    for disk in self.instance.disks:
-      if disk.dev_type == constants.LD_FILE:
-        raise errors.OpPrereqError("Export not supported for instances with"
-                                   " file-based disks", errors.ECODE_INVAL)
+    self.cfg.Update(self.group, feedback_fn)
+    return result
 
-  def _CleanupExports(self, feedback_fn):
-    """Removes exports of current instance from all other nodes.
 
-    If an instance in a cluster with nodes A..D was exported to node C, its
-    exports will be removed from the nodes A, B and D.
 
-    """
-    assert self.op.mode != constants.EXPORT_MODE_REMOTE
+class LUGroupRemove(LogicalUnit):
+  HPATH = "group-remove"
+  HTYPE = constants.HTYPE_GROUP
+  REQ_BGL = False
 
-    nodelist = self.cfg.GetNodeList()
-    nodelist.remove(self.dst_node.name)
+  def ExpandNames(self):
+    # This raises errors.OpPrereqError on its own:
+    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+    self.needed_locks = {
+      locking.LEVEL_NODEGROUP: [self.group_uuid],
+      }
 
-    # on one-node clusters nodelist will be empty after the removal
-    # if we proceed the backup would be removed because OpQueryExports
-    # substitutes an empty list with the full cluster node list.
-    iname = self.instance.name
-    if nodelist:
-      feedback_fn("Removing old exports for instance %s" % iname)
-      exportlist = self.rpc.call_export_list(nodelist)
-      for node in exportlist:
-        if exportlist[node].fail_msg:
-          continue
-        if iname in exportlist[node].payload:
-          msg = self.rpc.call_export_remove(node, iname).fail_msg
-          if msg:
-            self.LogWarning("Could not remove older export for instance %s"
-                            " on node %s: %s", iname, node, msg)
+  def CheckPrereq(self):
+    """Check prerequisites.
 
-  def Exec(self, feedback_fn):
-    """Export an instance to an image in the cluster.
+    This checks that the given group name exists as a node group, that it
+    is empty (i.e., contains no nodes), and that it is not the last group
+    of the cluster.
 
     """
-    assert self.op.mode in constants.EXPORT_MODES
+    # Verify that the group is empty.
+    group_nodes = [node.name
+                   for node in self.cfg.GetAllNodesInfo().values()
+                   if node.group == self.group_uuid]
 
-    instance = self.instance
-    src_node = instance.primary_node
+    if group_nodes:
+      raise errors.OpPrereqError("Group '%s' not empty, has the following"
+                                 " nodes: %s" %
+                                 (self.op.group_name,
+                                  utils.CommaJoin(utils.NiceSort(group_nodes))),
+                                 errors.ECODE_STATE)
 
-    if self.op.shutdown:
-      # shutdown the instance, but not the disks
-      feedback_fn("Shutting down instance %s" % instance.name)
-      result = self.rpc.call_instance_shutdown(src_node, instance,
-                                               self.op.shutdown_timeout)
-      # TODO: Maybe ignore failures if ignore_remove_failures is set
-      result.Raise("Could not shutdown instance %s on"
-                   " node %s" % (instance.name, src_node))
+    # Verify the cluster would not be left group-less.
+    if len(self.cfg.GetNodeGroupList()) == 1:
+      raise errors.OpPrereqError("Group '%s' is the only group,"
+                                 " cannot be removed" %
+                                 self.op.group_name,
+                                 errors.ECODE_STATE)
 
-    # set the disks ID correctly since call_instance_start needs the
-    # correct drbd minor to create the symlinks
-    for disk in instance.disks:
-      self.cfg.SetDiskID(disk, src_node)
+  def BuildHooksEnv(self):
+    """Build hooks env.
 
-    activate_disks = (not instance.admin_up)
+    """
+    return {
+      "GROUP_NAME": self.op.group_name,
+      }
 
-    if activate_disks:
-      # Activate the instance disks if we'exporting a stopped instance
-      feedback_fn("Activating disks for %s" % instance.name)
-      _StartInstanceDisks(self, instance, None)
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
 
-    try:
-      helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
-                                                     instance)
+    """
+    mn = self.cfg.GetMasterNode()
+    return ([mn], [mn])
 
-      helper.CreateSnapshots()
-      try:
-        if (self.op.shutdown and instance.admin_up and
-            not self.op.remove_instance):
-          assert not activate_disks
-          feedback_fn("Starting instance %s" % instance.name)
-          result = self.rpc.call_instance_start(src_node, instance, None, None)
-          msg = result.fail_msg
-          if msg:
-            feedback_fn("Failed to start instance: %s" % msg)
-            _ShutdownInstanceDisks(self, instance)
-            raise errors.OpExecError("Could not start instance: %s" % msg)
+  def Exec(self, feedback_fn):
+    """Remove the node group.
 
-        if self.op.mode == constants.EXPORT_MODE_LOCAL:
-          (fin_resu, dresults) = helper.LocalExport(self.dst_node)
-        elif self.op.mode == constants.EXPORT_MODE_REMOTE:
-          connect_timeout = constants.RIE_CONNECT_TIMEOUT
-          timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
+    """
+    try:
+      self.cfg.RemoveNodeGroup(self.group_uuid)
+    except errors.ConfigurationError:
+      raise errors.OpExecError("Group '%s' with UUID %s disappeared" %
+                               (self.op.group_name, self.group_uuid))
 
-          (key_name, _, _) = self.x509_key_name
+    self.remove_locks[locking.LEVEL_NODEGROUP] = self.group_uuid
 
-          dest_ca_pem = \
-            OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
-                                            self.dest_x509_ca)
 
-          (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
-                                                     key_name, dest_ca_pem,
-                                                     timeouts)
-      finally:
-        helper.Cleanup()
+class LUGroupRename(LogicalUnit):
+  HPATH = "group-rename"
+  HTYPE = constants.HTYPE_GROUP
+  REQ_BGL = False
 
-      # Check for backwards compatibility
-      assert len(dresults) == len(instance.disks)
-      assert compat.all(isinstance(i, bool) for i in dresults), \
-             "Not all results are boolean: %r" % dresults
+  def ExpandNames(self):
+    # This raises errors.OpPrereqError on its own:
+    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
 
-    finally:
-      if activate_disks:
-        feedback_fn("Deactivating disks for %s" % instance.name)
-        _ShutdownInstanceDisks(self, instance)
+    self.needed_locks = {
+      locking.LEVEL_NODEGROUP: [self.group_uuid],
+      }
 
-    if not (compat.all(dresults) and fin_resu):
-      failures = []
-      if not fin_resu:
-        failures.append("export finalization")
-      if not compat.all(dresults):
-        fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults)
-                               if not dsk)
-        failures.append("disk export: disk(s) %s" % fdsk)
+  def CheckPrereq(self):
+    """Check prerequisites.
 
-      raise errors.OpExecError("Export failed, errors in %s" %
-                               utils.CommaJoin(failures))
+    Ensures the requested new name is not yet in use.
 
-    # At this point, the export was successful, we can cleanup/finish
+    """
+    try:
+      new_name_uuid = self.cfg.LookupNodeGroup(self.op.new_name)
+    except errors.OpPrereqError:
+      pass
+    else:
+      raise errors.OpPrereqError("Desired new name '%s' clashes with existing"
+                                 " node group (UUID: %s)" %
+                                 (self.op.new_name, new_name_uuid),
+                                 errors.ECODE_EXISTS)
 
-    # Remove instance if requested
-    if self.op.remove_instance:
-      feedback_fn("Removing instance %s" % instance.name)
-      _RemoveInstance(self, feedback_fn, instance,
-                      self.op.ignore_remove_failures)
+  def BuildHooksEnv(self):
+    """Build hooks env.
 
-    if self.op.mode == constants.EXPORT_MODE_LOCAL:
-      self._CleanupExports(feedback_fn)
+    """
+    return {
+      "OLD_NAME": self.op.group_name,
+      "NEW_NAME": self.op.new_name,
+      }
 
-    return fin_resu, dresults
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
 
+    """
+    mn = self.cfg.GetMasterNode()
 
-class LURemoveExport(NoHooksLU):
-  """Remove exports related to the named instance.
+    all_nodes = self.cfg.GetAllNodesInfo()
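+    # the master node is added to run_nodes unconditionally below, so drop
+    # it here to avoid listing it twice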
+    all_nodes.pop(mn, None)
 
-  """
-  _OP_PARAMS = [
-    _PInstanceName,
-    ]
-  REQ_BGL = False
+    run_nodes = [mn]
+    run_nodes.extend(node.name for node in all_nodes.values()
+                     if node.group == self.group_uuid)
 
-  def ExpandNames(self):
-    self.needed_locks = {}
-    # We need all nodes to be locked in order for RemoveExport to work, but we
-    # don't need to lock the instance itself, as nothing will happen to it (and
-    # we can remove exports also for a removed instance)
-    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+    return (run_nodes, run_nodes)
 
   def Exec(self, feedback_fn):
-    """Remove any export.
+    """Rename the node group.
 
     """
-    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
-    # If the instance was not found we'll try with the name that was passed in.
-    # This will only work if it was an FQDN, though.
-    fqdn_warn = False
-    if not instance_name:
-      fqdn_warn = True
-      instance_name = self.op.instance_name
+    group = self.cfg.GetNodeGroup(self.group_uuid)
 
-    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
-    exportlist = self.rpc.call_export_list(locked_nodes)
-    found = False
-    for node in exportlist:
-      msg = exportlist[node].fail_msg
-      if msg:
-        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
-        continue
-      if instance_name in exportlist[node].payload:
-        found = True
-        result = self.rpc.call_export_remove(node, instance_name)
-        msg = result.fail_msg
-        if msg:
-          logging.error("Could not remove export for instance %s"
-                        " on node %s: %s", instance_name, node, msg)
+    if group is None:
+      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
+                               (self.op.group_name, self.group_uuid))
 
-    if fqdn_warn and not found:
-      feedback_fn("Export not found. If trying to remove an export belonging"
-                  " to a deleted instance please use its Fully Qualified"
-                  " Domain Name.")
+    group.name = self.op.new_name
+    self.cfg.Update(group, feedback_fn)
+
+    return self.op.new_name
 
 
 class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
@@ -10141,8 +11170,8 @@ class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
   This is an abstract class which is the parent of all the other tags LUs.
 
   """
-
   def ExpandNames(self):
+    self.group_uuid = None
     self.needed_locks = {}
     if self.op.kind == constants.TAG_NODE:
       self.op.name = _ExpandNodeName(self.cfg, self.op.name)
@@ -10150,6 +11179,8 @@ class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
     elif self.op.kind == constants.TAG_INSTANCE:
       self.op.name = _ExpandInstanceName(self.cfg, self.op.name)
       self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name
+    elif self.op.kind == constants.TAG_NODEGROUP:
+      self.group_uuid = self.cfg.LookupNodeGroup(self.op.name)
 
     # FIXME: Acquire BGL for cluster tag operations (as of this writing it's
     # not possible to acquire the BGL based on opcode parameters)
@@ -10164,20 +11195,17 @@ class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
       self.target = self.cfg.GetNodeInfo(self.op.name)
     elif self.op.kind == constants.TAG_INSTANCE:
       self.target = self.cfg.GetInstanceInfo(self.op.name)
+    elif self.op.kind == constants.TAG_NODEGROUP:
+      self.target = self.cfg.GetNodeGroup(self.group_uuid)
     else:
       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                  str(self.op.kind), errors.ECODE_INVAL)
 
 
-class LUGetTags(TagsLU):
+class LUTagsGet(TagsLU):
   """Returns the tags of a given object.
 
   """
-  _OP_PARAMS = [
-    ("kind", ht.NoDefault, ht.TElemOf(constants.VALID_TAG_TYPES)),
-    # Name is only meaningful for nodes and instances
-    ("name", ht.NoDefault, ht.TMaybeString),
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -10193,13 +11221,10 @@ class LUGetTags(TagsLU):
     return list(self.target.GetTags())
 
 
-class LUSearchTags(NoHooksLU):
+class LUTagsSearch(NoHooksLU):
   """Searches the tags for a given pattern.
 
   """
-  _OP_PARAMS = [
-    ("pattern", ht.NoDefault, ht.TNonEmptyString),
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -10227,6 +11252,8 @@ class LUSearchTags(NoHooksLU):
     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
     nlist = cfg.GetAllNodesInfo().values()
     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
+    tgts.extend(("/nodegroup/%s" % n.name, n)
+                for n in cfg.GetAllNodeGroupsInfo().values())
     results = []
     for path, target in tgts:
       for tag in target.GetTags():
@@ -10235,16 +11262,10 @@ class LUSearchTags(NoHooksLU):
     return results
 
 
-class LUAddTags(TagsLU):
+class LUTagsSet(TagsLU):
   """Sets a tag on a given object.
 
   """
-  _OP_PARAMS = [
-    ("kind", ht.NoDefault, ht.TElemOf(constants.VALID_TAG_TYPES)),
-    # Name is only meaningful for nodes and instances
-    ("name", ht.NoDefault, ht.TMaybeString),
-    ("tags", ht.NoDefault, ht.TListOf(ht.TNonEmptyString)),
-    ]
   REQ_BGL = False
 
   def CheckPrereq(self):
@@ -10269,16 +11290,10 @@ class LUAddTags(TagsLU):
     self.cfg.Update(self.target, feedback_fn)
 
 
-class LUDelTags(TagsLU):
+class LUTagsDel(TagsLU):
   """Delete a list of tags from a given object.
 
   """
-  _OP_PARAMS = [
-    ("kind", ht.NoDefault, ht.TElemOf(constants.VALID_TAG_TYPES)),
-    # Name is only meaningful for nodes and instances
-    ("name", ht.NoDefault, ht.TMaybeString),
-    ("tags", ht.NoDefault, ht.TListOf(ht.TNonEmptyString)),
-    ]
   REQ_BGL = False
 
   def CheckPrereq(self):
@@ -10316,12 +11331,6 @@ class LUTestDelay(NoHooksLU):
   time.
 
   """
-  _OP_PARAMS = [
-    ("duration", ht.NoDefault, ht.TFloat),
-    ("on_master", True, ht.TBool),
-    ("on_nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
-    ("repeat", 0, ht.TPositiveInt)
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -10363,16 +11372,10 @@ class LUTestDelay(NoHooksLU):
         self._TestDelay()
 
 
-class LUTestJobqueue(NoHooksLU):
+class LUTestJqueue(NoHooksLU):
   """Utility LU to test some aspects of the job queue.
 
   """
-  _OP_PARAMS = [
-    ("notify_waitlock", False, ht.TBool),
-    ("notify_exec", False, ht.TBool),
-    ("log_messages", ht.EmptyList, ht.TListOf(ht.TString)),
-    ("fail", False, ht.TBool),
-    ]
   REQ_BGL = False
 
   # Must be lower than default timeout for WaitForJobChange to see whether it
@@ -10588,11 +11591,12 @@ class IAllocator(object):
       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
       # we don't have job IDs
       }
+    ninfo = cfg.GetAllNodesInfo()
     iinfo = cfg.GetAllInstancesInfo().values()
     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
 
     # node data
-    node_list = cfg.GetNodeList()
+    node_list = [n.name for n in ninfo.values() if n.vm_capable]
 
     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
       hypervisor_name = self.hypervisor
@@ -10609,7 +11613,11 @@ class IAllocator(object):
 
     data["nodegroups"] = self._ComputeNodeGroupData(cfg)
 
-    data["nodes"] = self._ComputeNodeData(cfg, node_data, node_iinfo, i_list)
+    config_ndata = self._ComputeBasicNodeData(ninfo)
+    data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
+                                                 i_list, config_ndata)
+    assert len(data["nodes"]) == len(ninfo), \
+        "Incomplete node data computed"
 
     data["instances"] = self._ComputeInstanceData(cluster_info, i_list)
 
@@ -10622,18 +11630,23 @@ class IAllocator(object):
     """
     ng = {}
     for guuid, gdata in cfg.GetAllNodeGroupsInfo().items():
-      ng[guuid] = { "name": gdata.name }
+      ng[guuid] = {
+        "name": gdata.name,
+        "alloc_policy": gdata.alloc_policy,
+        }
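+    # serialized, the result looks like (illustrative UUID and values):
+    #   {"uuid-1": {"name": "default", "alloc_policy": "preferred"}}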
     return ng
 
   @staticmethod
-  def _ComputeNodeData(cfg, node_data, node_iinfo, i_list):
+  def _ComputeBasicNodeData(node_cfg):
     """Compute global node data.
 
+    @rtype: dict
+    @return: a dict mapping node names to their static (config) attributes
+
     """
     node_results = {}
-    for nname, nresult in node_data.items():
-      # first fill in static (config-based) values
-      ninfo = cfg.GetNodeInfo(nname)
+    for ninfo in node_cfg.values():
+      # fill in static (config-based) values
       pnr = {
         "tags": list(ninfo.GetTags()),
         "primary_ip": ninfo.primary_ip,
@@ -10646,6 +11659,24 @@ class IAllocator(object):
         "vm_capable": ninfo.vm_capable,
         }
 
+      node_results[ninfo.name] = pnr
+
+    return node_results
+
+  @staticmethod
+  def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
+                              node_results):
+    """Compute global node data.
+
+    @param node_results: the basic node structures as filled from the config
+
+    """
+    # work on a copy, so the basic node structures are not modified
+    node_results = dict(node_results)
+    for nname, nresult in node_data.items():
+      assert nname in node_results, "Missing basic data for node %s" % nname
+      ninfo = node_cfg[nname]
+
       if not (ninfo.offline or ninfo.drained):
         nresult.Raise("Can't get data for node %s" % nname)
         node_iinfo[nname].Raise("Can't get node instance info from node %s" %
@@ -10687,9 +11718,8 @@ class IAllocator(object):
           "i_pri_memory": i_p_mem,
           "i_pri_up_memory": i_p_up_mem,
           }
-        pnr.update(pnr_dyn)
-
-      node_results[nname] = pnr
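+        # note: the update() direction means static (config-based) values
+        # take precedence over dynamic ones on key collisions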
+        pnr_dyn.update(node_results[nname])
+        node_results[nname] = pnr_dyn
 
     return node_results
 
@@ -10719,7 +11749,9 @@ class IAllocator(object):
         "os": iinfo.os,
         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
         "nics": nic_data,
-        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
+        "disks": [{constants.IDISK_SIZE: dsk.size,
+                   constants.IDISK_MODE: dsk.mode}
+                  for dsk in iinfo.disks],
         "disk_template": iinfo.disk_template,
         "hypervisor": iinfo.hypervisor,
         }
@@ -10741,7 +11773,7 @@ class IAllocator(object):
     """
     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
 
-    if self.disk_template in constants.DTS_NET_MIRROR:
+    if self.disk_template in constants.DTS_INT_MIRROR:
       self.required_nodes = 2
     else:
       self.required_nodes = 1
@@ -10774,16 +11806,17 @@ class IAllocator(object):
       raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                    " IAllocator" % self.name)
 
-    if instance.disk_template not in constants.DTS_NET_MIRROR:
+    if instance.disk_template not in constants.DTS_MIRRORED:
       raise errors.OpPrereqError("Can't relocate non-mirrored instances",
                                  errors.ECODE_INVAL)
 
-    if len(instance.secondary_nodes) != 1:
+    if instance.disk_template in constants.DTS_INT_MIRROR and \
+        len(instance.secondary_nodes) != 1:
       raise errors.OpPrereqError("Instance has not exactly one secondary node",
                                  errors.ECODE_STATE)
 
     self.required_nodes = 1
-    disk_sizes = [{'size': disk.size} for disk in instance.disks]
+    disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in instance.disks]
     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
 
     request = {
@@ -10858,8 +11891,61 @@ class IAllocator(object):
     if not isinstance(rdict["result"], list):
       raise errors.OpExecError("Can't parse iallocator results: 'result' key"
                                " is not a list")
+
+    if self.mode == constants.IALLOCATOR_MODE_RELOC:
+      assert self.relocate_from is not None
+      assert self.required_nodes == 1
+
+      node2group = dict((name, ndata["group"])
+                        for (name, ndata) in self.in_data["nodes"].items())
+
+      fn = compat.partial(self._NodesToGroups, node2group,
+                          self.in_data["nodegroups"])
+
+      request_groups = fn(self.relocate_from)
+      result_groups = fn(rdict["result"])
+
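+      # e.g. (hypothetical): relocating from ["node1"] in group "grp1" must
+      # return nodes whose groups are exactly ["grp1"]; a node from "grp2"
+      # would make result_groups == ["grp2"] and trigger the error below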
+      if result_groups != request_groups:
+        raise errors.OpExecError("Groups of nodes returned by iallocator (%s)"
+                                 " differ from original groups (%s)" %
+                                 (utils.CommaJoin(result_groups),
+                                  utils.CommaJoin(request_groups)))
+
     self.out_data = rdict
 
+  @staticmethod
+  def _NodesToGroups(node2group, groups, nodes):
+    """Returns a list of unique group names for a list of nodes.
+
+    @type node2group: dict
+    @param node2group: Map from node name to group UUID
+    @type groups: dict
+    @param groups: Group information
+    @type nodes: list
+    @param nodes: Node names
+
+    """
+    result = set()
+
+    for node in nodes:
+      try:
+        group_uuid = node2group[node]
+      except KeyError:
+        # Ignore unknown node
+        pass
+      else:
+        try:
+          group = groups[group_uuid]
+        except KeyError:
+          # Can't find group, let's use UUID
+          group_name = group_uuid
+        else:
+          group_name = group["name"]
+
+        result.add(group_name)
+
+    return sorted(result)
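+
+  # Doctest-style illustration with hypothetical data:
+  #   >>> n2g = {"n1": "g1-uuid", "n2": "g2-uuid"}
+  #   >>> groups = {"g1-uuid": {"name": "grp1"}}
+  #   >>> IAllocator._NodesToGroups(n2g, groups, ["n1", "n2", "bogus"])
+  #   ['g2-uuid', 'grp1']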
+
 
 class LUTestAllocator(NoHooksLU):
   """Run allocator tests.
@@ -10867,25 +11953,6 @@ class LUTestAllocator(NoHooksLU):
   This LU runs the allocator tests
 
   """
-  _OP_PARAMS = [
-    ("direction", ht.NoDefault,
-     ht.TElemOf(constants.VALID_IALLOCATOR_DIRECTIONS)),
-    ("mode", ht.NoDefault, ht.TElemOf(constants.VALID_IALLOCATOR_MODES)),
-    ("name", ht.NoDefault, ht.TNonEmptyString),
-    ("nics", ht.NoDefault, ht.TOr(ht.TNone, ht.TListOf(
-      ht.TDictOf(ht.TElemOf(["mac", "ip", "bridge"]),
-               ht.TOr(ht.TNone, ht.TNonEmptyString))))),
-    ("disks", ht.NoDefault, ht.TOr(ht.TNone, ht.TList)),
-    ("hypervisor", None, ht.TMaybeString),
-    ("allocator", None, ht.TMaybeString),
-    ("tags", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
-    ("mem_size", None, ht.TOr(ht.TNone, ht.TPositiveInt)),
-    ("vcpus", None, ht.TOr(ht.TNone, ht.TPositiveInt)),
-    ("os", None, ht.TMaybeString),
-    ("disk_template", None, ht.TMaybeString),
-    ("evac_nodes", None, ht.TOr(ht.TNone, ht.TListOf(ht.TNonEmptyString))),
-    ]
-
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -10975,3 +12042,27 @@ class LUTestAllocator(NoHooksLU):
       ial.Run(self.op.allocator, validate=False)
       result = ial.out_text
     return result
+
+
+#: Query type implementations
+_QUERY_IMPL = {
+  constants.QR_INSTANCE: _InstanceQuery,
+  constants.QR_NODE: _NodeQuery,
+  constants.QR_GROUP: _GroupQuery,
+  constants.QR_OS: _OsQuery,
+  }
+
+assert set(_QUERY_IMPL.keys()) == constants.QR_VIA_OP
+
+
+def _GetQueryImplementation(name):
+  """Returns the implemtnation for a query type.
+
+  @param name: Query type, must be one of L{constants.QR_VIA_OP}
+
+  """
+  try:
+    return _QUERY_IMPL[name]
+  except KeyError:
+    raise errors.OpPrereqError("Unknown query resource '%s'" % name,
+                               errors.ECODE_INVAL)
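+
+
+# Usage sketch (illustrative; mirrors LUGroupQuery.CheckArguments above):
+#   impl = _GetQueryImplementation(constants.QR_GROUP)
+#   gq = impl(qlang.MakeSimpleFilter("name", names), output_fields, False)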