Shared block storage support
diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 8377896..be8e70c 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -39,6 +39,7 @@ import OpenSSL
 import socket
 import tempfile
 import shutil
+import itertools
 
 from ganeti import ssh
 from ganeti import utils
@@ -53,205 +54,24 @@ from ganeti import uidpool
 from ganeti import compat
 from ganeti import masterd
 from ganeti import netutils
+from ganeti import query
+from ganeti import qlang
+from ganeti import opcodes
 
 import ganeti.masterd.instance # pylint: disable-msg=W0611
 
 
-# Modifiable default values; need to define these here before the
-# actual LUs
+def _SupportsOob(cfg, node):
+  """Tells if node supports OOB.
 
-def _EmptyList():
-  """Returns an empty list.
+  @type cfg: L{config.ConfigWriter}
+  @param cfg: The cluster configuration
+  @type node: L{objects.Node}
+  @param node: The node
+  @return: The OOB script if supported or an empty string otherwise
 
   """
-  return []
-
-
-def _EmptyDict():
-  """Returns an empty dict.
-
-  """
-  return {}
-
-
-#: The without-default default value
-_NoDefault = object()
-
-
-#: The no-type (value to complex to check it in the type system)
-_NoType = object()
-
-
-# Some basic types
-def _TNotNone(val):
-  """Checks if the given value is not None.
-
-  """
-  return val is not None
-
-
-def _TNone(val):
-  """Checks if the given value is None.
-
-  """
-  return val is None
-
-
-def _TBool(val):
-  """Checks if the given value is a boolean.
-
-  """
-  return isinstance(val, bool)
-
-
-def _TInt(val):
-  """Checks if the given value is an integer.
-
-  """
-  return isinstance(val, int)
-
-
-def _TFloat(val):
-  """Checks if the given value is a float.
-
-  """
-  return isinstance(val, float)
-
-
-def _TString(val):
-  """Checks if the given value is a string.
-
-  """
-  return isinstance(val, basestring)
-
-
-def _TTrue(val):
-  """Checks if a given value evaluates to a boolean True value.
-
-  """
-  return bool(val)
-
-
-def _TElemOf(target_list):
-  """Builds a function that checks if a given value is a member of a list.
-
-  """
-  return lambda val: val in target_list
-
-
-# Container types
-def _TList(val):
-  """Checks if the given value is a list.
-
-  """
-  return isinstance(val, list)
-
-
-def _TDict(val):
-  """Checks if the given value is a dictionary.
-
-  """
-  return isinstance(val, dict)
-
-
-def _TIsLength(size):
-  """Check is the given container is of the given size.
-
-  """
-  return lambda container: len(container) == size
-
-
-# Combinator types
-def _TAnd(*args):
-  """Combine multiple functions using an AND operation.
-
-  """
-  def fn(val):
-    return compat.all(t(val) for t in args)
-  return fn
-
-
-def _TOr(*args):
-  """Combine multiple functions using an AND operation.
-
-  """
-  def fn(val):
-    return compat.any(t(val) for t in args)
-  return fn
-
-
-def _TMap(fn, test):
-  """Checks that a modified version of the argument passes the given test.
-
-  """
-  return lambda val: test(fn(val))
-
-
-# Type aliases
-
-#: a non-empty string
-_TNonEmptyString = _TAnd(_TString, _TTrue)
-
-
-#: a maybe non-empty string
-_TMaybeString = _TOr(_TNonEmptyString, _TNone)
-
-
-#: a maybe boolean (bool or none)
-_TMaybeBool = _TOr(_TBool, _TNone)
-
-
-#: a positive integer
-_TPositiveInt = _TAnd(_TInt, lambda v: v >= 0)
-
-#: a strictly positive integer
-_TStrictPositiveInt = _TAnd(_TInt, lambda v: v > 0)
-
-
-def _TListOf(my_type):
-  """Checks if a given value is a list with all elements of the same type.
-
-  """
-  return _TAnd(_TList,
-               lambda lst: compat.all(my_type(v) for v in lst))
-
-
-def _TDictOf(key_type, val_type):
-  """Checks a dict type for the type of its key/values.
-
-  """
-  return _TAnd(_TDict,
-               lambda my_dict: (compat.all(key_type(v) for v in my_dict.keys())
-                                and compat.all(val_type(v)
-                                               for v in my_dict.values())))
-
-
-# Common opcode attributes
-
-#: output fields for a query operation
-_POutputFields = ("output_fields", _NoDefault, _TListOf(_TNonEmptyString))
-
-
-#: the shutdown timeout
-_PShutdownTimeout = ("shutdown_timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT,
-                     _TPositiveInt)
-
-#: the force parameter
-_PForce = ("force", False, _TBool)
-
-#: a required instance name (for single-instance LUs)
-_PInstanceName = ("instance_name", _NoDefault, _TNonEmptyString)
-
-
-#: a required node name (for single-node LUs)
-_PNodeName = ("node_name", _NoDefault, _TNonEmptyString)
-
-#: the migration type (live/non-live)
-_PMigrationMode = ("mode", None, _TOr(_TNone,
-                                      _TElemOf(constants.HT_MIGRATION_MODES)))
-
-#: the obsolete 'live' mode (boolean)
-_PMigrationLive = ("live", None, _TMaybeBool)
+  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
 
 
 # End types
@@ -271,13 +91,10 @@ class LogicalUnit(object):
 
   @ivar dry_run_result: the value (if any) that will be returned to the caller
       in dry-run mode (signalled by opcode dry_run parameter)
-  @cvar _OP_PARAMS: a list of opcode attributes, their defaults values
-      they should get if not already defined, and types they must match
 
   """
   HPATH = None
   HTYPE = None
-  _OP_PARAMS = []
   REQ_BGL = True
 
   def __init__(self, processor, op, context, rpc):
@@ -316,32 +133,8 @@ class LogicalUnit(object):
     # Tasklets
     self.tasklets = None
 
-    # The new kind-of-type-system
-    op_id = self.op.OP_ID
-    for attr_name, aval, test in self._OP_PARAMS:
-      if not hasattr(op, attr_name):
-        if aval == _NoDefault:
-          raise errors.OpPrereqError("Required parameter '%s.%s' missing" %
-                                     (op_id, attr_name), errors.ECODE_INVAL)
-        else:
-          if callable(aval):
-            dval = aval()
-          else:
-            dval = aval
-          setattr(self.op, attr_name, dval)
-      attr_val = getattr(op, attr_name)
-      if test == _NoType:
-        # no tests here
-        continue
-      if not callable(test):
-        raise errors.ProgrammerError("Validation for parameter '%s.%s' failed,"
-                                     " given type is not a proper type (%s)" %
-                                     (op_id, attr_name, test))
-      if not test(attr_val):
-        logging.error("OpCode %s, parameter %s, has invalid type %s/value %s",
-                      self.op.OP_ID, attr_name, type(attr_val), attr_val)
-        raise errors.OpPrereqError("Parameter '%s.%s' fails validation" %
-                                   (op_id, attr_name), errors.ECODE_INVAL)
+    # Validate opcode parameters and set defaults
+    self.op.Validate(True)
 
     self.CheckArguments()
 
@@ -379,7 +172,7 @@ class LogicalUnit(object):
     This method is called before starting to execute the opcode, and it should
     update all the parameters of the opcode to their canonical form (e.g. a
     short node name must be fully expanded after this method has successfully
-    completed). This way locking, hooks, logging, ecc. can work correctly.
+    completed). This way locking, hooks, logging, etc. can work correctly.
 
     LUs which implement this method must also populate the self.needed_locks
     member, as a dict with lock levels as keys, and a list of needed lock names
@@ -653,6 +446,104 @@ class Tasklet:
     raise NotImplementedError
 
 
+class _QueryBase:
+  """Base for query utility classes.
+
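+  Concrete query implementations are expected to set L{FIELDS} and to
+  implement L{ExpandNames}, L{DeclareLocks} and L{_GetQueryData}; the
+  remaining helpers are inherited unchanged.
+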
+  """
+  #: Attribute holding field definitions
+  FIELDS = None
+
+  def __init__(self, filter_, fields, use_locking):
+    """Initializes this class.
+
+    """
+    self.use_locking = use_locking
+
+    self.query = query.Query(self.FIELDS, fields, filter_=filter_,
+                             namefield="name")
+    self.requested_data = self.query.RequestedData()
+    self.names = self.query.RequestedNames()
+
+    # Sort only if no names were requested
+    self.sort_by_name = not self.names
+
+    self.do_locking = None
+    self.wanted = None
+
+  def _GetNames(self, lu, all_names, lock_level):
+    """Helper function to determine names asked for in the query.
+
+    """
+    if self.do_locking:
+      names = lu.acquired_locks[lock_level]
+    else:
+      names = all_names
+
+    if self.wanted == locking.ALL_SET:
+      assert not self.names
+      # caller didn't specify names, so ordering is not important
+      return utils.NiceSort(names)
+
+    # caller specified names and we must keep the same order
+    assert self.names
+    assert not self.do_locking or lu.acquired_locks[lock_level]
+
+    missing = set(self.wanted).difference(names)
+    if missing:
+      raise errors.OpExecError("Some items were removed before retrieving"
+                               " their data: %s" % missing)
+
+    # Return expanded names
+    return self.wanted
+
+  @classmethod
+  def FieldsQuery(cls, fields):
+    """Returns list of available fields.
+
+    @return: List of L{objects.QueryFieldDefinition}
+
+    """
+    return query.QueryFields(cls.FIELDS, fields)
+
+  def ExpandNames(self, lu):
+    """Expand names for this query.
+
+    See L{LogicalUnit.ExpandNames}.
+
+    """
+    raise NotImplementedError()
+
+  def DeclareLocks(self, lu, level):
+    """Declare locks for this query.
+
+    See L{LogicalUnit.DeclareLocks}.
+
+    """
+    raise NotImplementedError()
+
+  def _GetQueryData(self, lu):
+    """Collects all data for this query.
+
+    @return: Query data object
+
+    """
+    raise NotImplementedError()
+
+  def NewStyleQuery(self, lu):
+    """Collect data and execute query.
+
+    """
+    return query.GetQueryResponse(self.query, self._GetQueryData(lu),
+                                  sort_by_name=self.sort_by_name)
+
+  def OldStyleQuery(self, lu):
+    """Collect data and execute query.
+
+    """
+    return self.query.OldStyleQuery(self._GetQueryData(lu),
+                                    sort_by_name=self.sort_by_name)
+
+
 def _GetWantedNodes(lu, nodes):
   """Returns list of checked and expanded node names.
 
@@ -665,12 +556,10 @@ def _GetWantedNodes(lu, nodes):
   @raise errors.ProgrammerError: if the nodes parameter is wrong type
 
   """
-  if not nodes:
-    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
-      " non-empty list of nodes whose name is to be expanded.")
+  if nodes:
+    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
 
-  wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes]
-  return utils.NiceSort(wanted)
+  return utils.NiceSort(lu.cfg.GetNodeList())
 
 
 def _GetWantedInstances(lu, instances):
@@ -760,17 +649,19 @@ def _CheckGlobalHvParams(params):
     raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
 
 
-def _CheckNodeOnline(lu, node):
+def _CheckNodeOnline(lu, node, msg=None):
   """Ensure that a given node is online.
 
   @param lu: the LU on behalf of which we make the check
   @param node: the node to check
+  @param msg: if passed, should be a message to replace the default one
   @raise errors.OpPrereqError: if the node is offline
 
   """
+  if msg is None:
+    msg = "Can't use offline node"
   if lu.cfg.GetNodeInfo(node).offline:
-    raise errors.OpPrereqError("Can't use offline node %s" % node,
-                               errors.ECODE_INVAL)
+    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
 
 
 def _CheckNodeNotDrained(lu, node):
@@ -783,7 +674,20 @@ def _CheckNodeNotDrained(lu, node):
   """
   if lu.cfg.GetNodeInfo(node).drained:
     raise errors.OpPrereqError("Can't use drained node %s" % node,
-                               errors.ECODE_INVAL)
+                               errors.ECODE_STATE)
+
+
+def _CheckNodeVmCapable(lu, node):
+  """Ensure that a given node is vm capable.
+
+  @param lu: the LU on behalf of which we make the check
+  @param node: the node to check
+  @raise errors.OpPrereqError: if the node is not vm capable
+
+  """
+  if not lu.cfg.GetNodeInfo(node).vm_capable:
+    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
+                               errors.ECODE_STATE)
 
 
 def _CheckNodeHasOS(lu, node, os_name, force_variant):
@@ -804,40 +708,31 @@ def _CheckNodeHasOS(lu, node, os_name, force_variant):
     _CheckOSVariant(result.payload, os_name)
 
 
-def _RequireFileStorage():
-  """Checks that file storage is enabled.
-
-  @raise errors.OpPrereqError: when file storage is disabled
-
-  """
-  if not constants.ENABLE_FILE_STORAGE:
-    raise errors.OpPrereqError("File storage disabled at configure time",
-                               errors.ECODE_INVAL)
-
-
-def _CheckDiskTemplate(template):
-  """Ensure a given disk template is valid.
-
-  """
-  if template not in constants.DISK_TEMPLATES:
-    msg = ("Invalid disk template name '%s', valid templates are: %s" %
-           (template, utils.CommaJoin(constants.DISK_TEMPLATES)))
-    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
-  if template == constants.DT_FILE:
-    _RequireFileStorage()
-  return True
-
+def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
+  """Ensure that a node has the given secondary ip.
 
-def _CheckStorageType(storage_type):
-  """Ensure a given storage type is valid.
+  @type lu: L{LogicalUnit}
+  @param lu: the LU on behalf of which we make the check
+  @type node: string
+  @param node: the node to check
+  @type secondary_ip: string
+  @param secondary_ip: the ip to check
+  @type prereq: boolean
+  @param prereq: whether to throw a prerequisite or an execute error
+  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
+  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
 
   """
-  if storage_type not in constants.VALID_STORAGE_TYPES:
-    raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
-                               errors.ECODE_INVAL)
-  if storage_type == constants.ST_FILE:
-    _RequireFileStorage()
-  return True
+  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
+  result.Raise("Failure checking secondary ip on node %s" % node,
+               prereq=prereq, ecode=errors.ECODE_ENVIRON)
+  if not result.payload:
+    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
+           " please fix and re-run this command" % secondary_ip)
+    if prereq:
+      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
+    else:
+      raise errors.OpExecError(msg)
 
 
 def _GetClusterDomainSecret():
@@ -983,7 +878,7 @@ def _NICListToTuple(lu, nics):
   """Build a list of nic information tuples.
 
   This list is suitable to be passed to _BuildInstanceHookEnv or as a return
-  value in LUQueryInstanceData.
+  value in LUInstanceQueryData.
 
   @type lu:  L{LogicalUnit}
   @param lu: the logical unit on whose behalf we execute
@@ -1146,7 +1041,7 @@ def _GetStorageTypeArgs(cfg, storage_type):
   # Special case for file storage
   if storage_type == constants.ST_FILE:
     # storage.FileStorage wants a list of storage directories
-    return [[cfg.GetFileStorageDir()]]
+    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
 
   return []
 
@@ -1200,7 +1095,7 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
                                  " iallocator.")
 
 
-class LUPostInitCluster(LogicalUnit):
+class LUClusterPostInit(LogicalUnit):
   """Logical unit for running hooks after cluster initialization.
 
   """
@@ -1222,7 +1117,7 @@ class LUPostInitCluster(LogicalUnit):
     return True
 
 
-class LUDestroyCluster(LogicalUnit):
+class LUClusterDestroy(LogicalUnit):
   """Logical unit for destroying the cluster.
 
   """
@@ -1262,7 +1157,6 @@ class LUDestroyCluster(LogicalUnit):
 
     """
     master = self.cfg.GetMasterNode()
-    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
 
     # Run post hooks on master node before it's removed
     hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
@@ -1275,16 +1169,11 @@ class LUDestroyCluster(LogicalUnit):
     result = self.rpc.call_node_stop_master(master, False)
     result.Raise("Could not disable the master role")
 
-    if modify_ssh_setup:
-      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
-      utils.CreateBackup(priv_key)
-      utils.CreateBackup(pub_key)
-
     return master
 
 
 def _VerifyCertificate(filename):
-  """Verifies a certificate for LUVerifyCluster.
+  """Verifies a certificate for LUClusterVerify.
 
   @type filename: string
   @param filename: Path to PEM file
@@ -1294,7 +1183,7 @@ def _VerifyCertificate(filename):
     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                            utils.ReadFile(filename))
   except Exception, err: # pylint: disable-msg=W0703
-    return (LUVerifyCluster.ETYPE_ERROR,
+    return (LUClusterVerify.ETYPE_ERROR,
             "Failed to load X509 certificate %s: %s" % (filename, err))
 
   (errcode, msg) = \
@@ -1309,26 +1198,19 @@ def _VerifyCertificate(filename):
   if errcode is None:
     return (None, fnamemsg)
   elif errcode == utils.CERT_WARNING:
-    return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
+    return (LUClusterVerify.ETYPE_WARNING, fnamemsg)
   elif errcode == utils.CERT_ERROR:
-    return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)
+    return (LUClusterVerify.ETYPE_ERROR, fnamemsg)
 
   raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
 
 
-class LUVerifyCluster(LogicalUnit):
+class LUClusterVerify(LogicalUnit):
   """Verifies the cluster status.
 
   """
   HPATH = "cluster-verify"
   HTYPE = constants.HTYPE_CLUSTER
-  _OP_PARAMS = [
-    ("skip_checks", _EmptyList,
-     _TListOf(_TElemOf(constants.VERIFY_OPTIONAL_CHECKS))),
-    ("verbose", False, _TBool),
-    ("error_codes", False, _TBool),
-    ("debug_simulate_errors", False, _TBool),
-    ]
   REQ_BGL = False
 
   TCLUSTER = "cluster"
@@ -1341,8 +1223,9 @@ class LUVerifyCluster(LogicalUnit):
   EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
   EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
   EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
-  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
+  EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
   EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
+  EINSTANCESPLITGROUPS = (TINSTANCE, "EINSTANCESPLITGROUPS")
   ENODEDRBD = (TNODE, "ENODEDRBD")
   ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
   ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
@@ -1359,11 +1242,14 @@ class LUVerifyCluster(LogicalUnit):
   ENODEVERSION = (TNODE, "ENODEVERSION")
   ENODESETUP = (TNODE, "ENODESETUP")
   ENODETIME = (TNODE, "ENODETIME")
+  ENODEOOBPATH = (TNODE, "ENODEOOBPATH")
 
   ETYPE_FIELD = "code"
   ETYPE_ERROR = "ERROR"
   ETYPE_WARNING = "WARNING"
 
+  _HOOKS_INDENT_RE = re.compile("^", re.M)
+
   class NodeImage(object):
     """A class representing the logical and physical status of a node.
 
@@ -1374,8 +1260,8 @@ class LUVerifyCluster(LogicalUnit):
     @ivar instances: a list of running instances (runtime)
     @ivar pinst: list of configured primary instances (config)
     @ivar sinst: list of configured secondary instances (config)
-    @ivar sbp: diction of {secondary-node: list of instances} of all peers
-        of this node (config)
+    @ivar sbp: dictionary of {primary-node: list of instances} for all
+        instances for which this node is secondary (config)
     @ivar mfree: free memory, as reported by hypervisor (runtime)
     @ivar dfree: free disk, as reported by the node (runtime)
     @ivar offline: the offline status (config)
@@ -1392,9 +1278,11 @@ class LUVerifyCluster(LogicalUnit):
     @ivar os_fail: whether the RPC call didn't return valid OS data
     @type oslist: list
     @ivar oslist: list of OSes as diagnosed by DiagnoseOS
+    @type vm_capable: boolean
+    @ivar vm_capable: whether the node can host instances
 
     """
-    def __init__(self, offline=False, name=None):
+    def __init__(self, offline=False, name=None, vm_capable=True):
       self.name = name
       self.volumes = {}
       self.instances = []
@@ -1404,6 +1292,7 @@ class LUVerifyCluster(LogicalUnit):
       self.mfree = 0
       self.dfree = 0
       self.offline = offline
+      self.vm_capable = vm_capable
       self.rpc_fail = False
       self.lvm_fail = False
       self.hyp_fail = False
@@ -1508,12 +1397,18 @@ class LUVerifyCluster(LogicalUnit):
                   code=self.ETYPE_WARNING)
 
     hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
-    if isinstance(hyp_result, dict):
+    if ninfo.vm_capable and isinstance(hyp_result, dict):
       for hv_name, hv_result in hyp_result.iteritems():
         test = hv_result is not None
         _ErrorIf(test, self.ENODEHV, node,
                  "hypervisor %s verify failure: '%s'", hv_name, hv_result)
 
+    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
+    if ninfo.vm_capable and isinstance(hvp_result, list):
+      for item, hv_name, hv_result in hvp_result:
+        _ErrorIf(True, self.ENODEHV, node,
+                 "hypervisor %s parameter verify failure (source %s): %s",
+                 hv_name, item, hv_result)
 
     test = nresult.get(constants.NV_NODESETUP,
                            ["Missing NODESETUP results"])
@@ -1633,8 +1528,8 @@ class LUVerifyCluster(LogicalUnit):
           msg = "cannot reach the master IP"
         _ErrorIf(True, self.ENODENET, node, msg)
 
-
-  def _VerifyInstance(self, instance, instanceconfig, node_image):
+  def _VerifyInstance(self, instance, instanceconfig, node_image,
+                      diskstatus):
     """Verify an instance.
 
     This function checks to see if the required block devices are
@@ -1665,11 +1560,29 @@ class LUVerifyCluster(LogicalUnit):
                node_current)
 
     for node, n_img in node_image.items():
-      if (not node == node_current):
+      if node != node_current:
         test = instance in n_img.instances
         _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                  "instance should not run on node %s", node)
 
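+    # flatten {node: [(success, status), ...]} into a list of
+    # (node, success, status, disk index) tuples for the checks below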
+    diskdata = [(nname, success, status, idx)
+                for (nname, disks) in diskstatus.items()
+                for idx, (success, status) in enumerate(disks)]
+
+    for nname, success, bdev_status, idx in diskdata:
+      # the 'ghost node' construction in Exec() ensures that we have a
+      # node here
+      snode = node_image[nname]
+      bad_snode = snode.ghost or snode.offline
+      _ErrorIf(instanceconfig.admin_up and not success and not bad_snode,
+               self.EINSTANCEFAULTYDISK, instance,
+               "couldn't retrieve status for disk/%s on %s: %s",
+               idx, nname, bdev_status)
+      _ErrorIf((instanceconfig.admin_up and success and
+                bdev_status.ldisk_status == constants.LDS_FAULTY),
+               self.EINSTANCEFAULTYDISK, instance,
+               "disk/%s on %s is faulty", idx, nname)
+
   def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
     """Verify if there are any unknown volumes in the cluster.
 
@@ -1710,6 +1623,7 @@ class LUVerifyCluster(LogicalUnit):
     instances it was primary for.
 
     """
+    cluster_info = self.cfg.GetClusterInfo()
     for node, n_img in node_image.items():
       # This code checks that every node which is now listed as
       # secondary has enough memory to host all instances it is
@@ -1719,16 +1633,22 @@ class LUVerifyCluster(LogicalUnit):
       # WARNING: we currently take into account down instances as well
       # as up ones, considering that even if they're down someone
       # might want to start them even in the event of a node failure.
+      if n_img.offline:
+        # we're skipping offline nodes from the N+1 warning, since
+        # most likely we don't have good memory infromation from them;
+        # we already list instances living on such nodes, and that's
+        # enough warning
+        continue
       for prinode, instances in n_img.sbp.items():
         needed_mem = 0
         for instance in instances:
-          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
+          bep = cluster_info.FillBE(instance_cfg[instance])
           if bep[constants.BE_AUTO_BALANCE]:
             needed_mem += bep[constants.BE_MEMORY]
         test = n_img.mfree < needed_mem
         self._ErrorIf(test, self.ENODEN1, node,
-                      "not enough memory on to accommodate"
-                      " failovers should peer node %s fail", prinode)
+                      "not enough memory to accommodate instance failovers"
+                      " should node %s fail", prinode)
 
   def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
                        master_files):
@@ -1931,6 +1851,22 @@ class LUVerifyCluster(LogicalUnit):
              "OSes present on reference node %s but missing on this node: %s",
              base.name, utils.CommaJoin(missing))
 
+  def _VerifyOob(self, ninfo, nresult):
+    """Verifies out of band functionality of a node.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+
+    """
+    node = ninfo.name
+    # We just have to verify the paths on master and/or master candidates
+    # as the oob helper is invoked on the master
+    if ((ninfo.master_candidate or ninfo.master_capable) and
+        constants.NV_OOB_PATHS in nresult):
+      for path_result in nresult[constants.NV_OOB_PATHS]:
+        self._ErrorIf(path_result, self.ENODEOOBPATH, node, path_result)
+
   def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
     """Verifies and updates the node volume data.
 
@@ -2020,6 +1956,118 @@ class LUVerifyCluster(LogicalUnit):
           _ErrorIf(True, self.ENODERPC, node,
                    "node returned invalid LVM info, check LVM status")
 
+  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
+    """Gets per-disk status information for all instances.
+
+    @type nodelist: list of strings
+    @param nodelist: Node names
+    @type node_image: dict of (name, L{NodeImage})
+    @param node_image: Node image objects
+    @type instanceinfo: dict of (name, L{objects.Instance})
+    @param instanceinfo: Instance objects
+    @rtype: {instance: {node: [(success, payload)]}}
+    @return: a dictionary of per-instance dictionaries with nodes as
+        keys and disk information as values; the disk information is a
+        list of tuples (success, payload)
+
+    """
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    node_disks = {}
+    node_disks_devonly = {}
+    diskless_instances = set()
+    diskless = constants.DT_DISKLESS
+
+    for nname in nodelist:
+      node_instances = list(itertools.chain(node_image[nname].pinst,
+                                            node_image[nname].sinst))
+      diskless_instances.update(inst for inst in node_instances
+                                if instanceinfo[inst].disk_template == diskless)
+      disks = [(inst, disk)
+               for inst in node_instances
+               for disk in instanceinfo[inst].disks]
+
+      if not disks:
+        # No need to collect data
+        continue
+
+      node_disks[nname] = disks
+
+      # Creating copies as SetDiskID below will modify the objects and that can
+      # lead to incorrect data returned from nodes
+      devonly = [dev.Copy() for (_, dev) in disks]
+
+      for dev in devonly:
+        self.cfg.SetDiskID(dev, nname)
+
+      node_disks_devonly[nname] = devonly
+
+    assert len(node_disks) == len(node_disks_devonly)
+
+    # Collect data from all nodes with disks
+    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
+                                                          node_disks_devonly)
+
+    assert len(result) == len(node_disks)
+
+    instdisk = {}
+
+    for (nname, nres) in result.items():
+      disks = node_disks[nname]
+
+      if nres.offline:
+        # No data from this node
+        data = len(disks) * [(False, "node offline")]
+      else:
+        msg = nres.fail_msg
+        _ErrorIf(msg, self.ENODERPC, nname,
+                 "while getting disk information: %s", msg)
+        if msg:
+          # No data from this node
+          data = len(disks) * [(False, msg)]
+        else:
+          data = []
+          for idx, i in enumerate(nres.payload):
+            if isinstance(i, (tuple, list)) and len(i) == 2:
+              data.append(i)
+            else:
+              logging.warning("Invalid result from node %s, entry %d: %s",
+                              nname, idx, i)
+              data.append((False, "Invalid result from the remote node"))
+
+      for ((inst, _), status) in zip(disks, data):
+        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
+
+    # Add empty entries for diskless instances.
+    for inst in diskless_instances:
+      assert inst not in instdisk
+      instdisk[inst] = {}
+
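+    # consistency checks: each node entry must contain one (success, payload)
+    # pair per instance disk, and only the instance's own nodes may appear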
+    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
+                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
+                      compat.all(isinstance(s, (tuple, list)) and
+                                 len(s) == 2 for s in statuses)
+                      for inst, nnames in instdisk.items()
+                      for nname, statuses in nnames.items())
+    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
+
+    return instdisk
+
+  def _VerifyHVP(self, hvp_data):
+    """Verifies locally the syntax of the hypervisor parameters.
+
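+    @param hvp_data: list of (source, hypervisor name, parameters) tuples,
+        as built in Exec(); only the parameter syntax is checked here
+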
+    """
+    for item, hv_name, hv_params in hvp_data:
+      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
+             (hv_name, item))
+      try:
+        hv_class = hypervisor.GetHypervisor(hv_name)
+        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
+        hv_class.CheckParameterSyntax(hv_params)
+      except errors.GenericError, err:
+        self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err))
+
   def BuildHooksEnv(self):
     """Build hooks env.
 
@@ -2040,6 +2088,7 @@ class LUVerifyCluster(LogicalUnit):
     """Verify integrity of cluster, performing various test on nodes.
 
     """
+    # This method has too many local variables. pylint: disable-msg=R0914
     self.bad = False
     _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
     verbose = self.op.verbose
@@ -2059,9 +2108,11 @@ class LUVerifyCluster(LogicalUnit):
     cluster = self.cfg.GetClusterInfo()
     nodelist = utils.NiceSort(self.cfg.GetNodeList())
     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
+    nodeinfo_byname = dict(zip(nodelist, nodeinfo))
     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                         for iname in instancelist)
+    groupinfo = self.cfg.GetAllNodeGroupsInfo()
     i_non_redundant = [] # Non redundant instances
     i_non_a_balanced = [] # Non auto-balanced instances
     n_offline = 0 # Count of offline nodes
@@ -2082,12 +2133,32 @@ class LUVerifyCluster(LogicalUnit):
 
     local_checksums = utils.FingerprintFiles(file_names)
 
+    # Compute the set of hypervisor parameters
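+    # (each entry is a (source, hypervisor name, parameters) tuple, for
+    # example ("cluster", "xen-pvm", {...}))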
+    hvp_data = []
+    for hv_name in hypervisors:
+      hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
+    for os_name, os_hvp in cluster.os_hvp.items():
+      for hv_name, hv_params in os_hvp.items():
+        if not hv_params:
+          continue
+        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
+        hvp_data.append(("os %s" % os_name, hv_name, full_params))
+    # TODO: collapse identical parameter values in a single one
+    for instance in instanceinfo.values():
+      if not instance.hvparams:
+        continue
+      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
+                       cluster.FillHV(instance)))
+    # and verify them locally
+    self._VerifyHVP(hvp_data)
+
     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
     node_verify_param = {
       constants.NV_FILELIST: file_names,
       constants.NV_NODELIST: [node.name for node in nodeinfo
                               if not node.offline],
       constants.NV_HYPERVISOR: hypervisors,
+      constants.NV_HVPARAMS: hvp_data,
       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                   node.secondary_ip) for node in nodeinfo
                                  if not node.offline],
@@ -2098,6 +2169,7 @@ class LUVerifyCluster(LogicalUnit):
       constants.NV_TIME: None,
       constants.NV_MASTERIP: (master_node, master_ip),
       constants.NV_OSLIST: None,
+      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
       }
 
     if vg_name is not None:
@@ -2111,9 +2183,20 @@ class LUVerifyCluster(LogicalUnit):
 
     # Build our expected cluster state
     node_image = dict((node.name, self.NodeImage(offline=node.offline,
-                                                 name=node.name))
+                                                 name=node.name,
+                                                 vm_capable=node.vm_capable))
                       for node in nodeinfo)
 
+    # Gather OOB paths
+    oob_paths = []
+    for node in nodeinfo:
+      path = _SupportsOob(self.cfg, node)
+      if path and path not in oob_paths:
+        oob_paths.append(path)
+
+    if oob_paths:
+      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
+
     for instance in instancelist:
       inst_config = instanceinfo[instance]
 
@@ -2150,6 +2233,9 @@ class LUVerifyCluster(LogicalUnit):
 
     all_drbd_map = self.cfg.ComputeDRBDMap()
 
+    feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist))
+    instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo)
+
     feedback_fn("* Verifying node status")
 
     refos_img = None
@@ -2185,29 +2271,34 @@ class LUVerifyCluster(LogicalUnit):
       nresult = all_nvinfo[node].payload
 
       nimg.call_ok = self._VerifyNode(node_i, nresult)
+      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
       self._VerifyNodeNetwork(node_i, nresult)
-      self._VerifyNodeLVM(node_i, nresult, vg_name)
       self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
                             master_files)
-      self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper,
-                           all_drbd_map)
-      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
 
-      self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
-      self._UpdateNodeInstances(node_i, nresult, nimg)
-      self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
-      self._UpdateNodeOS(node_i, nresult, nimg)
-      if not nimg.os_fail:
-        if refos_img is None:
-          refos_img = nimg
-        self._VerifyNodeOS(node_i, nimg, refos_img)
+      self._VerifyOob(node_i, nresult)
+
+      if nimg.vm_capable:
+        self._VerifyNodeLVM(node_i, nresult, vg_name)
+        self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper,
+                             all_drbd_map)
+
+        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
+        self._UpdateNodeInstances(node_i, nresult, nimg)
+        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
+        self._UpdateNodeOS(node_i, nresult, nimg)
+        if not nimg.os_fail:
+          if refos_img is None:
+            refos_img = nimg
+          self._VerifyNodeOS(node_i, nimg, refos_img)
 
     feedback_fn("* Verifying instance status")
     for instance in instancelist:
       if verbose:
         feedback_fn("* Verifying instance %s" % instance)
       inst_config = instanceinfo[instance]
-      self._VerifyInstance(instance, inst_config, node_image)
+      self._VerifyInstance(instance, inst_config, node_image,
+                           instdisk[instance])
       inst_nodes_offline = []
 
       pnode = inst_config.primary_node
@@ -2216,8 +2307,8 @@ class LUVerifyCluster(LogicalUnit):
                self.ENODERPC, pnode, "instance %s, connection to"
                " primary node failed", instance)
 
-      if pnode_img.offline:
-        inst_nodes_offline.append(pnode)
+      _ErrorIf(pnode_img.offline, self.EINSTANCEBADNODE, instance,
+               "instance lives on offline node %s", inst_config.primary_node)
 
       # If the instance is non-redundant we cannot survive losing its primary
       # node, so we are not N+1 compliant. On the other hand we have no disk
@@ -2226,11 +2317,33 @@ class LUVerifyCluster(LogicalUnit):
       # FIXME: does not support file-backed instances
       if not inst_config.secondary_nodes:
         i_non_redundant.append(instance)
+
       _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
                instance, "instance has multiple secondary nodes: %s",
                utils.CommaJoin(inst_config.secondary_nodes),
                code=self.ETYPE_WARNING)
 
+      if inst_config.disk_template in constants.DTS_NET_MIRROR:
+        pnode = inst_config.primary_node
+        instance_nodes = utils.NiceSort(inst_config.all_nodes)
+        instance_groups = {}
+
+        for node in instance_nodes:
+          instance_groups.setdefault(nodeinfo_byname[node].group,
+                                     []).append(node)
+
+        pretty_list = [
+          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
+          # Sort so that we always list the primary node first.
+          for group, nodes in sorted(instance_groups.items(),
+                                     key=lambda (_, nodes): pnode in nodes,
+                                     reverse=True)]
+
+        self._ErrorIf(len(instance_groups) > 1, self.EINSTANCESPLITGROUPS,
+                      instance, "instance has primary and secondary nodes in"
+                      " different groups: %s", utils.CommaJoin(pretty_list),
+                      code=self.ETYPE_WARNING)
+
       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
         i_non_a_balanced.append(instance)
 
@@ -2244,12 +2357,14 @@ class LUVerifyCluster(LogicalUnit):
 
       # warn that the instance lives on offline nodes
       _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
-               "instance lives on offline node(s) %s",
+               "instance has offline secondary node(s) %s",
                utils.CommaJoin(inst_nodes_offline))
-      # ... or ghost nodes
+      # ... or ghost/non-vm_capable nodes
       for node in inst_config.all_nodes:
         _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
                  "instance lives on ghost node %s", node)
+        _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
+                 instance, "instance lives on non-vm_capable node %s", node)
 
     feedback_fn("* Verifying orphan volumes")
     reserved = utils.FieldSet(*cluster.reserved_lvs)
@@ -2298,7 +2413,6 @@ class LUVerifyCluster(LogicalUnit):
     # their results
     if phase == constants.HOOKS_PHASE_POST:
       # Used to change hooks' output to proper indentation
-      indent_re = re.compile('^', re.M)
       feedback_fn("* Hooks Results")
       assert hooks_results, "invalid result from hooks"
 
@@ -2319,14 +2433,14 @@ class LUVerifyCluster(LogicalUnit):
           self._ErrorIf(test, self.ENODEHOOKS, node_name,
                         "Script %s failed, output:", script)
           if test:
-            output = indent_re.sub('      ', output)
+            output = self._HOOKS_INDENT_RE.sub('      ', output)
             feedback_fn("%s" % output)
             lu_result = 0
 
       return lu_result
 
 
-class LUVerifyDisks(NoHooksLU):
+class LUClusterVerifyDisks(NoHooksLU):
   """Verifies the cluster disks status.
 
   """
@@ -2350,16 +2464,13 @@ class LUVerifyDisks(NoHooksLU):
     """
     result = res_nodes, res_instances, res_missing = {}, [], {}
 
-    vg_name = self.cfg.GetVGName()
-    nodes = utils.NiceSort(self.cfg.GetNodeList())
-    instances = [self.cfg.GetInstanceInfo(name)
-                 for name in self.cfg.GetInstanceList()]
+    nodes = utils.NiceSort(self.cfg.GetVmCapableNodeList())
+    instances = self.cfg.GetAllInstancesInfo().values()
 
     nv_dict = {}
     for inst in instances:
       inst_lvs = {}
-      if (not inst.admin_up or
-          inst.disk_template not in constants.DTS_NET_MIRROR):
+      if not inst.admin_up:
         continue
       inst.MapLVsByNode(inst_lvs)
       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
@@ -2370,11 +2481,8 @@ class LUVerifyDisks(NoHooksLU):
     if not nv_dict:
       return result
 
-    node_lvs = self.rpc.call_lv_list(nodes, vg_name)
-
-    for node in nodes:
-      # node_volume
-      node_res = node_lvs[node]
+    node_lvs = self.rpc.call_lv_list(nodes, [])
+    for node, node_res in node_lvs.items():
       if node_res.offline:
         continue
       msg = node_res.fail_msg
@@ -2400,11 +2508,10 @@ class LUVerifyDisks(NoHooksLU):
     return result
 
 
-class LURepairDiskSizes(NoHooksLU):
+class LUClusterRepairDiskSizes(NoHooksLU):
   """Verifies the cluster disks sizes.
 
   """
-  _OP_PARAMS = [("instances", _EmptyList, _TListOf(_TNonEmptyString))]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -2489,11 +2596,13 @@ class LURepairDiskSizes(NoHooksLU):
         self.LogWarning("Failure in blockdev_getsize call to node"
                         " %s, ignoring", node)
         continue
-      if len(result.data) != len(dskl):
+      if len(result.payload) != len(dskl):
+        logging.warning("Invalid result from node %s: len(dskl)=%d,"
+                        " result.payload=%s", node, len(dskl), result.payload)
         self.LogWarning("Invalid result from node %s, ignoring node results",
                         node)
         continue
-      for ((instance, idx, disk), size) in zip(dskl, result.data):
+      for ((instance, idx, disk), size) in zip(dskl, result.payload):
         if size is None:
           self.LogWarning("Disk %d of instance %s did not return size"
                           " information, ignoring", idx, instance.name)
@@ -2516,13 +2625,12 @@ class LURepairDiskSizes(NoHooksLU):
     return changed
 
 
-class LURenameCluster(LogicalUnit):
+class LUClusterRename(LogicalUnit):
   """Rename the cluster.
 
   """
   HPATH = "cluster-rename"
   HTYPE = constants.HTYPE_CLUSTER
-  _OP_PARAMS = [("name", _NoDefault, _TNonEmptyString)]
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -2540,7 +2648,8 @@ class LURenameCluster(LogicalUnit):
     """Verify that the passed name is a valid one.
 
     """
-    hostname = netutils.GetHostInfo(self.op.name)
+    hostname = netutils.GetHostname(name=self.op.name,
+                                    family=self.cfg.GetPrimaryIPFamily())
 
     new_name = hostname.name
     self.ip = new_ip = hostname.ip
@@ -2553,7 +2662,7 @@ class LURenameCluster(LogicalUnit):
     if new_ip != old_ip:
       if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
         raise errors.OpPrereqError("The given cluster IP address (%s) is"
-                                   " reachable on the network. Aborting." %
+                                   " reachable on the network" %
                                    new_ip, errors.ECODE_NOTUNIQUE)
 
     self.op.name = new_name
@@ -2578,20 +2687,12 @@ class LURenameCluster(LogicalUnit):
 
       # update the known hosts file
       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
-      node_list = self.cfg.GetNodeList()
+      node_list = self.cfg.GetOnlineNodeList()
       try:
         node_list.remove(master)
       except ValueError:
         pass
-      result = self.rpc.call_upload_file(node_list,
-                                         constants.SSH_KNOWN_HOSTS_FILE)
-      for to_node, to_result in result.iteritems():
-        msg = to_result.fail_msg
-        if msg:
-          msg = ("Copy of file %s to node %s failed: %s" %
-                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
-          self.proc.LogWarning(msg)
-
+      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
     finally:
       result = self.rpc.call_node_start_master(master, False, False)
       msg = result.fail_msg
@@ -2602,40 +2703,12 @@ class LURenameCluster(LogicalUnit):
     return clustername
 
 
-class LUSetClusterParams(LogicalUnit):
+class LUClusterSetParams(LogicalUnit):
   """Change the parameters of the cluster.
 
   """
   HPATH = "cluster-modify"
   HTYPE = constants.HTYPE_CLUSTER
-  _OP_PARAMS = [
-    ("vg_name", None, _TMaybeString),
-    ("enabled_hypervisors", None,
-     _TOr(_TAnd(_TListOf(_TElemOf(constants.HYPER_TYPES)), _TTrue), _TNone)),
-    ("hvparams", None, _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)),
-    ("beparams", None, _TOr(_TDict, _TNone)),
-    ("os_hvp", None, _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)),
-    ("osparams", None, _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)),
-    ("candidate_pool_size", None, _TOr(_TStrictPositiveInt, _TNone)),
-    ("uid_pool", None, _NoType),
-    ("add_uids", None, _NoType),
-    ("remove_uids", None, _NoType),
-    ("maintain_node_health", None, _TMaybeBool),
-    ("nicparams", None, _TOr(_TDict, _TNone)),
-    ("drbd_helper", None, _TOr(_TString, _TNone)),
-    ("default_iallocator", None, _TMaybeString),
-    ("reserved_lvs", None, _TOr(_TListOf(_TNonEmptyString), _TNone)),
-    ("hidden_os", None, _TOr(_TListOf(\
-          _TAnd(_TList,
-                _TIsLength(2),
-                _TMap(lambda v: v[0], _TElemOf(constants.DDMS_VALUES)))),
-          _TNone)),
-    ("blacklisted_os", None, _TOr(_TListOf(\
-          _TAnd(_TList,
-                _TIsLength(2),
-                _TMap(lambda v: v[0], _TElemOf(constants.DDMS_VALUES)))),
-          _TNone)),
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -2731,6 +2804,10 @@ class LUSetClusterParams(LogicalUnit):
       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
       self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
 
+    if self.op.ndparams:
+      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
+      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
+
     if self.op.nicparams:
       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
       self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
@@ -2884,6 +2961,8 @@ class LUSetClusterParams(LogicalUnit):
       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
     if self.op.osparams:
       self.cluster.osparams = self.new_osp
+    if self.op.ndparams:
+      self.cluster.ndparams = self.new_ndparams
 
     if self.op.candidate_pool_size is not None:
       self.cluster.candidate_pool_size = self.op.candidate_pool_size
@@ -2893,6 +2972,9 @@ class LUSetClusterParams(LogicalUnit):
     if self.op.maintain_node_health is not None:
       self.cluster.maintain_node_health = self.op.maintain_node_health
 
+    if self.op.prealloc_wipe_disks is not None:
+      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
+
     if self.op.add_uids is not None:
       uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
 
@@ -2914,14 +2996,14 @@ class LUSetClusterParams(LogicalUnit):
       for key, val in mods:
         if key == constants.DDM_ADD:
           if val in lst:
-            feedback_fn("OS %s already in %s, ignoring", val, desc)
+            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
           else:
             lst.append(val)
         elif key == constants.DDM_REMOVE:
           if val in lst:
             lst.remove(val)
           else:
-            feedback_fn("OS %s not found in %s, ignoring", val, desc)
+            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
         else:
           raise errors.ProgrammerError("Invalid modification '%s'" % key)
 
@@ -2931,10 +3013,43 @@ class LUSetClusterParams(LogicalUnit):
     if self.op.blacklisted_os:
       helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
 
+    if self.op.master_netdev:
+      master = self.cfg.GetMasterNode()
+      feedback_fn("Shutting down master ip on the current netdev (%s)" %
+                  self.cluster.master_netdev)
+      result = self.rpc.call_node_stop_master(master, False)
+      result.Raise("Could not disable the master ip")
+      feedback_fn("Changing master_netdev from %s to %s" %
+                  (self.cluster.master_netdev, self.op.master_netdev))
+      self.cluster.master_netdev = self.op.master_netdev
+
     self.cfg.Update(self.cluster, feedback_fn)
 
+    if self.op.master_netdev:
+      feedback_fn("Starting the master ip on the new master netdev (%s)" %
+                  self.op.master_netdev)
+      result = self.rpc.call_node_start_master(master, False, False)
+      if result.fail_msg:
+        self.LogWarning("Could not re-enable the master ip on"
+                        " the master, please restart manually: %s",
+                        result.fail_msg)
+
+
+def _UploadHelper(lu, nodes, fname):
+  """Helper for uploading a file and showing warnings.
+
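+  @param lu: the calling LogicalUnit
+  @param nodes: list of node names to upload the file to
+  @param fname: path of the file to upload; skipped if it does not exist
+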
+  """
+  if os.path.exists(fname):
+    result = lu.rpc.call_upload_file(nodes, fname)
+    for to_node, to_result in result.items():
+      msg = to_result.fail_msg
+      if msg:
+        msg = ("Copy of file %s to node %s failed: %s" %
+               (fname, to_node, msg))
+        lu.proc.LogWarning(msg)
+
 
-def _RedistributeAncillaryFiles(lu, additional_nodes=None):
+def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
   """Distribute additional files which are part of the cluster configuration.
 
   ConfigWriter takes care of distributing the config and ssconf files, but
@@ -2943,15 +3058,23 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None):
 
   @param lu: calling logical unit
   @param additional_nodes: list of nodes not in the config to distribute to
+  @type additional_vm: boolean
+  @param additional_vm: whether the additional nodes are vm-capable or not
 
   """
   # 1. Gather target nodes
   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
   dist_nodes = lu.cfg.GetOnlineNodeList()
+  nvm_nodes = lu.cfg.GetNonVmCapableNodeList()
+  vm_nodes = [name for name in dist_nodes if name not in nvm_nodes]
   if additional_nodes is not None:
     dist_nodes.extend(additional_nodes)
+    if additional_vm:
+      vm_nodes.extend(additional_nodes)
   if myself.name in dist_nodes:
     dist_nodes.remove(myself.name)
+  if myself.name in vm_nodes:
+    vm_nodes.remove(myself.name)
 
   # 2. Gather files to distribute
   dist_files = set([constants.ETC_HOSTS,
@@ -2962,24 +3085,20 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None):
                     constants.CLUSTER_DOMAIN_SECRET_FILE,
                    ])
 
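+  # hypervisor-specific ancillary files only need to go to vm-capable nodes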
+  vm_files = set()
   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
   for hv_name in enabled_hypervisors:
     hv_class = hypervisor.GetHypervisor(hv_name)
-    dist_files.update(hv_class.GetAncillaryFiles())
+    vm_files.update(hv_class.GetAncillaryFiles())
 
   # 3. Perform the files upload
   for fname in dist_files:
-    if os.path.exists(fname):
-      result = lu.rpc.call_upload_file(dist_nodes, fname)
-      for to_node, to_result in result.items():
-        msg = to_result.fail_msg
-        if msg:
-          msg = ("Copy of file %s to node %s failed: %s" %
-                 (fname, to_node, msg))
-          lu.proc.LogWarning(msg)
+    _UploadHelper(lu, dist_nodes, fname)
+  for fname in vm_files:
+    _UploadHelper(lu, vm_nodes, fname)
 
 
-class LURedistributeConfig(NoHooksLU):
+class LUClusterRedistConf(NoHooksLU):
   """Force the redistribution of cluster configuration.
 
   This is a very simple LU.
@@ -3110,18 +3229,191 @@ def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
   return result
 
 
-class LUDiagnoseOS(NoHooksLU):
-  """Logical unit for OS diagnose/query.
+class LUOobCommand(NoHooksLU):
+  """Logical unit for OOB handling.
 
   """
-  _OP_PARAMS = [
-    _POutputFields,
-    ("names", _EmptyList, _TListOf(_TNonEmptyString)),
-    ]
-  REQ_BGL = False
-  _HID = "hidden"
-  _BLK = "blacklisted"
-  _VLD = "valid"
+  REQ_BGL = False
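+  # commands that are not allowed to target the master node through this LU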
+  _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks:
+     - the node exists in the configuration
+     - OOB is supported
+
+    Any errors are signaled by raising errors.OpPrereqError.
+
+    """
+    self.nodes = []
+    self.master_node = self.cfg.GetMasterNode()
+
+    if self.op.node_names:
+      if self.op.command in self._SKIP_MASTER:
+        if self.master_node in self.op.node_names:
+          master_node_obj = self.cfg.GetNodeInfo(self.master_node)
+          master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
+
+          if master_oob_handler:
+            additional_text = ("Run '%s %s %s' if you want to operate on the"
+                               " master regardless") % (master_oob_handler,
+                                                        self.op.command,
+                                                        self.master_node)
+          else:
+            additional_text = "The master node does not support out-of-band"
+
+          raise errors.OpPrereqError(("Operating on the master node %s is not"
+                                      " allowed for %s\n%s") %
+                                     (self.master_node, self.op.command,
+                                      additional_text), errors.ECODE_INVAL)
+    else:
+      self.op.node_names = self.cfg.GetNodeList()
+      if self.op.command in self._SKIP_MASTER:
+        self.op.node_names.remove(self.master_node)
+
+    if self.op.command in self._SKIP_MASTER:
+      assert self.master_node not in self.op.node_names
+
+    for node_name in self.op.node_names:
+      node = self.cfg.GetNodeInfo(node_name)
+
+      if node is None:
+        raise errors.OpPrereqError("Node %s not found" % node_name,
+                                   errors.ECODE_NOENT)
+      else:
+        self.nodes.append(node)
+
+      if (not self.op.ignore_status and
+          (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
+        raise errors.OpPrereqError(("Cannot power off node %s because it is"
+                                    " not marked offline") % node_name,
+                                   errors.ECODE_STATE)
+
+  def ExpandNames(self):
+    """Gather locks we need.
+
+    """
+    if self.op.node_names:
+      self.op.node_names = [_ExpandNodeName(self.cfg, name)
+                            for name in self.op.node_names]
+      lock_names = self.op.node_names
+    else:
+      lock_names = locking.ALL_SET
+
+    self.needed_locks = {
+      locking.LEVEL_NODE: lock_names,
+      }
+
+  def Exec(self, feedback_fn):
+    """Execute OOB and return result if we expect any.
+
+    """
+    master_node = self.master_node
+    ret = []
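+    # one entry per node, each a list of (status, data) tuples; a node
+    # without OOB support, for example, ends up as
+    # [(constants.RS_NORMAL, name), (constants.RS_UNAVAIL, None)]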
+
+    for node in self.nodes:
+      node_entry = [(constants.RS_NORMAL, node.name)]
+      ret.append(node_entry)
+
+      oob_program = _SupportsOob(self.cfg, node)
+
+      if not oob_program:
+        node_entry.append((constants.RS_UNAVAIL, None))
+        continue
+
+      logging.info("Executing out-of-band command '%s' using '%s' on %s",
+                   self.op.command, oob_program, node.name)
+      result = self.rpc.call_run_oob(master_node, oob_program,
+                                     self.op.command, node.name,
+                                     self.op.timeout)
+
+      if result.fail_msg:
+        self.LogWarning("On node '%s' out-of-band RPC failed with: %s",
+                        node.name, result.fail_msg)
+        node_entry.append((constants.RS_NODATA, None))
+      else:
+        try:
+          self._CheckPayload(result)
+        except errors.OpExecError, err:
+          self.LogWarning("The payload returned by '%s' is not valid: %s",
+                          node.name, err)
+          node_entry.append((constants.RS_NODATA, None))
+        else:
+          if self.op.command == constants.OOB_HEALTH:
+            # For health we should log important events
+            for item, status in result.payload:
+              if status in [constants.OOB_STATUS_WARNING,
+                            constants.OOB_STATUS_CRITICAL]:
+                self.LogWarning("On node '%s' item '%s' has status '%s'",
+                                node.name, item, status)
+
+          if self.op.command == constants.OOB_POWER_ON:
+            node.powered = True
+          elif self.op.command == constants.OOB_POWER_OFF:
+            node.powered = False
+          elif self.op.command == constants.OOB_POWER_STATUS:
+            powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
+            if powered != node.powered:
+              logging.warning(("Recorded power state (%s) of node '%s' does not"
+                               " match actual power state (%s)"), node.powered,
+                              node.name, powered)
+
+          # For configuration changing commands we should update the node
+          if self.op.command in (constants.OOB_POWER_ON,
+                                 constants.OOB_POWER_OFF):
+            self.cfg.Update(node, feedback_fn)
+
+          node_entry.append((constants.RS_NORMAL, result.payload))
+
+    return ret
+
+  def _CheckPayload(self, result):
+    """Checks if the payload is valid.
+
+    @param result: RPC result
+    @raises errors.OpExecError: If payload is not valid
+
+    """
+    errs = []
+    if self.op.command == constants.OOB_HEALTH:
+      if not isinstance(result.payload, list):
+        errs.append("command 'health' is expected to return a list but got %s" %
+                    type(result.payload))
+      else:
+        for item, status in result.payload:
+          if status not in constants.OOB_STATUSES:
+            errs.append("health item '%s' has invalid status '%s'" %
+                        (item, status))
+
+    if self.op.command == constants.OOB_POWER_STATUS:
+      if not isinstance(result.payload, dict):
+        errs.append("power-status is expected to return a dict but got %s" %
+                    type(result.payload))
+
+    if self.op.command in [
+        constants.OOB_POWER_ON,
+        constants.OOB_POWER_OFF,
+        constants.OOB_POWER_CYCLE,
+        ]:
+      if result.payload is not None:
+        errs.append("%s is expected to not return payload but got '%s'" %
+                    (self.op.command, result.payload))
+
+    if errs:
+      raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
+                               utils.CommaJoin(errs))
+
+
+class LUOsDiagnose(NoHooksLU):
+  """Logical unit for OS diagnose/query.
+
+  """
+  REQ_BGL = False
+  _HID = "hidden"
+  _BLK = "blacklisted"
+  _VLD = "valid"
   _FIELDS_STATIC = utils.FieldSet()
   _FIELDS_DYNAMIC = utils.FieldSet("name", _VLD, "node_status", "variants",
                                    "parameters", "api_versions", _HID, _BLK)
@@ -3187,7 +3479,9 @@ class LUDiagnoseOS(NoHooksLU):
     """Compute the list of OSes.
 
     """
-    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
+    valid_nodes = [node.name
+                   for node in self.cfg.GetAllNodesInfo().values()
+                   if not node.offline and node.vm_capable]
     node_data = self.rpc.call_os_diagnose(valid_nodes)
     pol = self._DiagnoseByOS(node_data)
     output = []
@@ -3248,15 +3542,12 @@ class LUDiagnoseOS(NoHooksLU):
     return output
 
 
-class LURemoveNode(LogicalUnit):
+class LUNodeRemove(LogicalUnit):
   """Logical unit for removing a node.
 
   """
   HPATH = "node-remove"
   HTYPE = constants.HTYPE_NODE
-  _OP_PARAMS = [
-    _PNodeName,
-    ]
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -3339,114 +3630,62 @@ class LURemoveNode(LogicalUnit):
 
     # Remove node from our /etc/hosts
     if self.cfg.GetClusterInfo().modify_etc_hosts:
-      # FIXME: this should be done via an rpc call to node daemon
-      utils.RemoveHostFromEtcHosts(node.name)
+      master_node = self.cfg.GetMasterNode()
+      result = self.rpc.call_etc_hosts_modify(master_node,
+                                              constants.ETC_HOSTS_REMOVE,
+                                              node.name, None)
+      result.Raise("Can't update hosts file with new host data")
       _RedistributeAncillaryFiles(self)
 
 
-class LUQueryNodes(NoHooksLU):
-  """Logical unit for querying nodes.
-
-  """
-  # pylint: disable-msg=W0142
-  _OP_PARAMS = [
-    _POutputFields,
-    ("names", _EmptyList, _TListOf(_TNonEmptyString)),
-    ("use_locking", False, _TBool),
-    ]
-  REQ_BGL = False
-
-  _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
-                    "master_candidate", "offline", "drained"]
-
-  _FIELDS_DYNAMIC = utils.FieldSet(
-    "dtotal", "dfree",
-    "mtotal", "mnode", "mfree",
-    "bootid",
-    "ctotal", "cnodes", "csockets",
-    )
-
-  _FIELDS_STATIC = utils.FieldSet(*[
-    "pinst_cnt", "sinst_cnt",
-    "pinst_list", "sinst_list",
-    "pip", "sip", "tags",
-    "master",
-    "role"] + _SIMPLE_FIELDS
-    )
-
-  def CheckArguments(self):
-    _CheckOutputFields(static=self._FIELDS_STATIC,
-                       dynamic=self._FIELDS_DYNAMIC,
-                       selected=self.op.output_fields)
+class _NodeQuery(_QueryBase):
+  FIELDS = query.NODE_FIELDS
 
-  def ExpandNames(self):
-    self.needed_locks = {}
-    self.share_locks[locking.LEVEL_NODE] = 1
+  def ExpandNames(self, lu):
+    lu.needed_locks = {}
+    lu.share_locks[locking.LEVEL_NODE] = 1
 
-    if self.op.names:
-      self.wanted = _GetWantedNodes(self, self.op.names)
+    if self.names:
+      self.wanted = _GetWantedNodes(lu, self.names)
     else:
       self.wanted = locking.ALL_SET
 
-    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
-    self.do_locking = self.do_node_query and self.op.use_locking
+    self.do_locking = (self.use_locking and
+                       query.NQ_LIVE in self.requested_data)
+
     if self.do_locking:
       # if we don't request only static fields, we need to lock the nodes
-      self.needed_locks[locking.LEVEL_NODE] = self.wanted
+      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
 
-  def Exec(self, feedback_fn):
+  def DeclareLocks(self, lu, level):
+    pass
+
+  def _GetQueryData(self, lu):
     """Computes the list of nodes and their attributes.
 
     """
-    all_info = self.cfg.GetAllNodesInfo()
-    if self.do_locking:
-      nodenames = self.acquired_locks[locking.LEVEL_NODE]
-    elif self.wanted != locking.ALL_SET:
-      nodenames = self.wanted
-      missing = set(nodenames).difference(all_info.keys())
-      if missing:
-        raise errors.OpExecError(
-          "Some nodes were removed before retrieving their data: %s" % missing)
-    else:
-      nodenames = all_info.keys()
+    all_info = lu.cfg.GetAllNodesInfo()
 
-    nodenames = utils.NiceSort(nodenames)
-    nodelist = [all_info[name] for name in nodenames]
+    nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
 
-    # begin data gathering
+    # Gather data as requested
+    if query.NQ_LIVE in self.requested_data:
+      # filter out non-vm_capable nodes
+      toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]
 
-    if self.do_node_query:
-      live_data = {}
-      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
-                                          self.cfg.GetHypervisorType())
-      for name in nodenames:
-        nodeinfo = node_data[name]
-        if not nodeinfo.fail_msg and nodeinfo.payload:
-          nodeinfo = nodeinfo.payload
-          fn = utils.TryConvert
-          live_data[name] = {
-            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
-            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
-            "mfree": fn(int, nodeinfo.get('memory_free', None)),
-            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
-            "dfree": fn(int, nodeinfo.get('vg_free', None)),
-            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
-            "bootid": nodeinfo.get('bootid', None),
-            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
-            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
-            }
-        else:
-          live_data[name] = {}
+      node_data = lu.rpc.call_node_info(toquery_nodes, lu.cfg.GetVGName(),
+                                        lu.cfg.GetHypervisorType())
+      live_data = dict((name, nresult.payload)
+                       for (name, nresult) in node_data.items()
+                       if not nresult.fail_msg and nresult.payload)
     else:
-      live_data = dict.fromkeys(nodenames, {})
+      live_data = None
 
-    node_to_primary = dict([(name, set()) for name in nodenames])
-    node_to_secondary = dict([(name, set()) for name in nodenames])
+    if query.NQ_INST in self.requested_data:
+      node_to_primary = dict([(name, set()) for name in nodenames])
+      node_to_secondary = dict([(name, set()) for name in nodenames])
 
-    inst_fields = frozenset(("pinst_cnt", "pinst_list",
-                             "sinst_cnt", "sinst_list"))
-    if inst_fields & frozenset(self.op.output_fields):
-      inst_data = self.cfg.GetAllInstancesInfo()
+      inst_data = lu.cfg.GetAllInstancesInfo()
 
       for inst in inst_data.values():
         if inst.primary_node in node_to_primary:
@@ -3454,62 +3693,49 @@ class LUQueryNodes(NoHooksLU):
         for secnode in inst.secondary_nodes:
           if secnode in node_to_secondary:
             node_to_secondary[secnode].add(inst.name)
+    else:
+      node_to_primary = None
+      node_to_secondary = None
+
+    if query.NQ_OOB in self.requested_data:
+      oob_support = dict((name, bool(_SupportsOob(lu.cfg, node)))
+                         for name, node in all_info.iteritems())
+    else:
+      oob_support = None
 
-    master_node = self.cfg.GetMasterNode()
+    if query.NQ_GROUP in self.requested_data:
+      groups = lu.cfg.GetAllNodeGroupsInfo()
+    else:
+      groups = {}
 
-    # end data gathering
+    return query.NodeQueryData([all_info[name] for name in nodenames],
+                               live_data, lu.cfg.GetMasterNode(),
+                               node_to_primary, node_to_secondary, groups,
+                               oob_support, lu.cfg.GetClusterInfo())
 
-    output = []
-    for node in nodelist:
-      node_output = []
-      for field in self.op.output_fields:
-        if field in self._SIMPLE_FIELDS:
-          val = getattr(node, field)
-        elif field == "pinst_list":
-          val = list(node_to_primary[node.name])
-        elif field == "sinst_list":
-          val = list(node_to_secondary[node.name])
-        elif field == "pinst_cnt":
-          val = len(node_to_primary[node.name])
-        elif field == "sinst_cnt":
-          val = len(node_to_secondary[node.name])
-        elif field == "pip":
-          val = node.primary_ip
-        elif field == "sip":
-          val = node.secondary_ip
-        elif field == "tags":
-          val = list(node.GetTags())
-        elif field == "master":
-          val = node.name == master_node
-        elif self._FIELDS_DYNAMIC.Matches(field):
-          val = live_data[node.name].get(field, None)
-        elif field == "role":
-          if node.name == master_node:
-            val = "M"
-          elif node.master_candidate:
-            val = "C"
-          elif node.drained:
-            val = "D"
-          elif node.offline:
-            val = "O"
-          else:
-            val = "R"
-        else:
-          raise errors.ParameterError(field)
-        node_output.append(val)
-      output.append(node_output)
 
-    return output
+class LUNodeQuery(NoHooksLU):
+  """Logical unit for querying nodes.
+
+  """
+  # pylint: disable-msg=W0142
+  REQ_BGL = False
+
+  def CheckArguments(self):
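+    # wrap the requested names into a simple name filter understood by
+    # the generic query layer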
+    self.nq = _NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
+                         self.op.output_fields, self.op.use_locking)
+
+  def ExpandNames(self):
+    self.nq.ExpandNames(self)
+
+  def Exec(self, feedback_fn):
+    return self.nq.OldStyleQuery(self)
 
 
-class LUQueryNodeVolumes(NoHooksLU):
+class LUNodeQueryvols(NoHooksLU):
   """Logical unit for getting volumes on node(s).
 
   """
-  _OP_PARAMS = [
-    ("nodes", _EmptyList, _TListOf(_TNonEmptyString)),
-    ("output_fields", _NoDefault, _TListOf(_TNonEmptyString)),
-    ]
   REQ_BGL = False
   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
   _FIELDS_STATIC = utils.FieldSet("node")
@@ -3584,17 +3810,11 @@ class LUQueryNodeVolumes(NoHooksLU):
     return output
 
 
-class LUQueryNodeStorage(NoHooksLU):
+class LUNodeQueryStorage(NoHooksLU):
   """Logical unit for getting information on storage units on node(s).
 
   """
   _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
-  _OP_PARAMS = [
-    ("nodes", _EmptyList, _TListOf(_TNonEmptyString)),
-    ("storage_type", _NoDefault, _CheckStorageType),
-    ("output_fields", _NoDefault, _TListOf(_TNonEmptyString)),
-    ("name", None, _TMaybeString),
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -3673,16 +3893,138 @@ class LUQueryNodeStorage(NoHooksLU):
     return result
 
 
-class LUModifyNodeStorage(NoHooksLU):
+class _InstanceQuery(_QueryBase):
+  FIELDS = query.INSTANCE_FIELDS
+
+  def ExpandNames(self, lu):
+    lu.needed_locks = {}
+    lu.share_locks[locking.LEVEL_INSTANCE] = 1
+    lu.share_locks[locking.LEVEL_NODE] = 1
+
+    if self.names:
+      self.wanted = _GetWantedInstances(lu, self.names)
+    else:
+      self.wanted = locking.ALL_SET
+
+    self.do_locking = (self.use_locking and
+                       query.IQ_LIVE in self.requested_data)
+    if self.do_locking:
+      lu.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
+      lu.needed_locks[locking.LEVEL_NODE] = []
+      lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, lu, level):
+    if level == locking.LEVEL_NODE and self.do_locking:
+      lu._LockInstancesNodes() # pylint: disable-msg=W0212
+
+  def _GetQueryData(self, lu):
+    """Computes the list of instances and their attributes.
+
+    """
+    cluster = lu.cfg.GetClusterInfo()
+    all_info = lu.cfg.GetAllInstancesInfo()
+
+    instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)
+
+    instance_list = [all_info[name] for name in instance_names]
+    nodes = frozenset(itertools.chain(*(inst.all_nodes
+                                        for inst in instance_list)))
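+    # deduplicated list of hypervisors in use, for the live-data RPC below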
+    hv_list = list(set([inst.hypervisor for inst in instance_list]))
+    bad_nodes = []
+    offline_nodes = []
+    wrongnode_inst = set()
+
+    # Gather data as requested
+    if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]):
+      live_data = {}
+      node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
+      for name in nodes:
+        result = node_data[name]
+        if result.offline:
+          # offline nodes will be in both lists
+          assert result.fail_msg
+          offline_nodes.append(name)
+        if result.fail_msg:
+          bad_nodes.append(name)
+        elif result.payload:
+          for inst in result.payload:
+            if all_info[inst].primary_node == name:
+              live_data[inst] = result.payload[inst]
+            else:
+              wrongnode_inst.add(inst)
+        # else no instance is alive
+    else:
+      live_data = {}
+
+    if query.IQ_DISKUSAGE in self.requested_data:
+      disk_usage = dict((inst.name,
+                         _ComputeDiskSize(inst.disk_template,
+                                          [{"size": disk.size}
+                                           for disk in inst.disks]))
+                        for inst in instance_list)
+    else:
+      disk_usage = None
+
+    if query.IQ_CONSOLE in self.requested_data:
+      consinfo = {}
+      for inst in instance_list:
+        if inst.name in live_data:
+          # Instance is running
+          consinfo[inst.name] = _GetInstanceConsole(cluster, inst)
+        else:
+          consinfo[inst.name] = None
+      assert set(consinfo.keys()) == set(instance_names)
+    else:
+      consinfo = None
+
+    return query.InstanceQueryData(instance_list, cluster,
+                                   disk_usage, offline_nodes, bad_nodes,
+                                   live_data, wrongnode_inst, consinfo)
+
+
+class LUQuery(NoHooksLU):
+  """Query for resources/items of a certain kind.
+
+  """
+  # pylint: disable-msg=W0142
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    qcls = _GetQueryImplementation(self.op.what)
+
+    self.impl = qcls(self.op.filter, self.op.fields, False)
+
+  def ExpandNames(self):
+    self.impl.ExpandNames(self)
+
+  def DeclareLocks(self, level):
+    self.impl.DeclareLocks(self, level)
+
+  def Exec(self, feedback_fn):
+    return self.impl.NewStyleQuery(self)
+
+
+class LUQueryFields(NoHooksLU):
+  """Query for resources/items of a certain kind.
+
+  """
+  # pylint: disable-msg=W0142
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    self.qcls = _GetQueryImplementation(self.op.what)
+
+  def ExpandNames(self):
+    self.needed_locks = {}
+
+  def Exec(self, feedback_fn):
+    return self.qcls.FieldsQuery(self.op.fields)
+
+
+class LUNodeModifyStorage(NoHooksLU):
   """Logical unit for modifying a storage volume on a node.
 
   """
-  _OP_PARAMS = [
-    _PNodeName,
-    ("storage_type", _NoDefault, _CheckStorageType),
-    ("name", _NoDefault, _TNonEmptyString),
-    ("changes", _NoDefault, _TDict),
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -3721,22 +4063,23 @@ class LUModifyNodeStorage(NoHooksLU):
                  (self.op.name, self.op.node_name))
 
 
-class LUAddNode(LogicalUnit):
+class LUNodeAdd(LogicalUnit):
   """Logical unit for adding node to the cluster.
 
   """
   HPATH = "node-add"
   HTYPE = constants.HTYPE_NODE
-  _OP_PARAMS = [
-    _PNodeName,
-    ("primary_ip", None, _NoType),
-    ("secondary_ip", None, _TMaybeString),
-    ("readd", False, _TBool),
-    ]
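+  # node flags that keep their previous value on re-add and default to
+  # True for newly added nodes (see CheckPrereq)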
+  _NFLAGS = ["master_capable", "vm_capable"]
 
   def CheckArguments(self):
+    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
     # validate/normalize the node name
-    self.op.node_name = netutils.HostInfo.NormalizeName(self.op.node_name)
+    self.hostname = netutils.GetHostname(name=self.op.node_name,
+                                         family=self.primary_ip_family)
+    self.op.node_name = self.hostname.name
+    if self.op.readd and self.op.group:
+      raise errors.OpPrereqError("Cannot pass a node group when a node is"
+                                 " being readded", errors.ECODE_INVAL)
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -3749,6 +4092,8 @@ class LUAddNode(LogicalUnit):
       "NODE_NAME": self.op.node_name,
       "NODE_PIP": self.op.primary_ip,
       "NODE_SIP": self.op.secondary_ip,
+      "MASTER_CAPABLE": str(self.op.master_capable),
+      "VM_CAPABLE": str(self.op.vm_capable),
       }
     nodes_0 = self.cfg.GetNodeList()
     nodes_1 = nodes_0 + [self.op.node_name, ]
@@ -3765,19 +4110,21 @@ class LUAddNode(LogicalUnit):
     Any errors are signaled by raising errors.OpPrereqError.
 
     """
-    node_name = self.op.node_name
     cfg = self.cfg
-
-    dns_data = netutils.GetHostInfo(node_name)
-
-    node = dns_data.name
-    primary_ip = self.op.primary_ip = dns_data.ip
+    hostname = self.hostname
+    node = hostname.name
+    primary_ip = self.op.primary_ip = hostname.ip
     if self.op.secondary_ip is None:
+      if self.primary_ip_family == netutils.IP6Address.family:
+        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
+                                   " IPv4 address must be given as secondary",
+                                   errors.ECODE_INVAL)
       self.op.secondary_ip = primary_ip
-    if not netutils.IsValidIP4(self.op.secondary_ip):
-      raise errors.OpPrereqError("Invalid secondary IP given",
-                                 errors.ECODE_INVAL)
+
     secondary_ip = self.op.secondary_ip
+    if not netutils.IP4Address.IsValid(secondary_ip):
+      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
+                                 " address" % secondary_ip, errors.ECODE_INVAL)
 
     node_list = cfg.GetNodeList()
     if not self.op.readd and node in node_list:
@@ -3810,6 +4157,27 @@ class LUAddNode(LogicalUnit):
                                    " existing node %s" % existing_node.name,
                                    errors.ECODE_NOTUNIQUE)
 
+    # After this 'if' block, None is no longer a valid value for the
+    # _capable op attributes
+    if self.op.readd:
+      old_node = self.cfg.GetNodeInfo(node)
+      assert old_node is not None, "Can't retrieve locked node %s" % node
+      for attr in self._NFLAGS:
+        if getattr(self.op, attr) is None:
+          setattr(self.op, attr, getattr(old_node, attr))
+    else:
+      for attr in self._NFLAGS:
+        if getattr(self.op, attr) is None:
+          setattr(self.op, attr, True)
+
+    if self.op.readd and not self.op.vm_capable:
+      pri, sec = cfg.GetNodeInstances(node)
+      if pri or sec:
+        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
+                                   " flag set to false, but it already holds"
+                                   " instances" % node,
+                                   errors.ECODE_STATE)
+
     # check that the type of the node (single versus dual homed) is the
     # same as for the master
     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
@@ -3817,11 +4185,11 @@ class LUAddNode(LogicalUnit):
     newbie_singlehomed = secondary_ip == primary_ip
     if master_singlehomed != newbie_singlehomed:
       if master_singlehomed:
-        raise errors.OpPrereqError("The master has no private ip but the"
+        raise errors.OpPrereqError("The master has no secondary ip but the"
                                    " new node has one",
                                    errors.ECODE_INVAL)
       else:
-        raise errors.OpPrereqError("The master has a private ip but the"
+        raise errors.OpPrereqError("The master has a secondary ip but the"
                                    " new node doesn't have one",
                                    errors.ECODE_INVAL)
 
@@ -3835,7 +4203,7 @@ class LUAddNode(LogicalUnit):
       if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                            source=myself.secondary_ip):
         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
-                                   " based ping to noded port",
+                                   " based ping to node daemon port",
                                    errors.ECODE_ENVIRON)
 
     if self.op.readd:
@@ -3843,17 +4211,24 @@ class LUAddNode(LogicalUnit):
     else:
       exceptions = []
 
-    self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
+    if self.op.master_capable:
+      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
+    else:
+      self.master_candidate = False
 
     if self.op.readd:
-      self.new_node = self.cfg.GetNodeInfo(node)
-      assert self.new_node is not None, "Can't retrieve locked node %s" % node
+      self.new_node = old_node
     else:
+      node_group = cfg.LookupNodeGroup(self.op.group)
       self.new_node = objects.Node(name=node,
                                    primary_ip=primary_ip,
                                    secondary_ip=secondary_ip,
                                    master_candidate=self.master_candidate,
-                                   offline=False, drained=False)
+                                   offline=False, drained=False,
+                                   group=node_group)
+
+    if self.op.ndparams:
+      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
 
   def Exec(self, feedback_fn):
     """Adds the new node to the cluster.
@@ -3862,6 +4237,9 @@ class LUAddNode(LogicalUnit):
     new_node = self.new_node
     node = new_node.name
 
+    # We are adding a new node, so we assume it is powered
+    new_node.powered = True
+
     # for re-adds, reset the offline/drained/master-candidate flags;
     # we need to reset here, otherwise offline would prevent RPC calls
     # later in the procedure; this also means that if the re-add
@@ -3874,10 +4252,19 @@ class LUAddNode(LogicalUnit):
       if self.changed_primary_ip:
         new_node.primary_ip = self.op.primary_ip
 
+    # copy the master/vm_capable flags
+    for attr in self._NFLAGS:
+      setattr(new_node, attr, getattr(self.op, attr))
+
     # notify the user about any possible mc promotion
     if new_node.master_candidate:
       self.LogInfo("Node will be a master candidate")
 
+    if self.op.ndparams:
+      new_node.ndparams = self.op.ndparams
+    else:
+      new_node.ndparams = {}
+
     # check connectivity
     result = self.rpc.call_version([node])[node]
     result.Raise("Can't get version information from node %s" % node)
@@ -3889,37 +4276,18 @@ class LUAddNode(LogicalUnit):
                                " node version %s" %
                                (constants.PROTOCOL_VERSION, result.payload))
 
-    # setup ssh on node
-    if self.cfg.GetClusterInfo().modify_ssh_setup:
-      logging.info("Copy ssh key to node %s", node)
-      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
-      keyarray = []
-      keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
-                  constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
-                  priv_key, pub_key]
-
-      for i in keyfiles:
-        keyarray.append(utils.ReadFile(i))
-
-      result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
-                                      keyarray[2], keyarray[3], keyarray[4],
-                                      keyarray[5])
-      result.Raise("Cannot transfer ssh keys to the new node")
-
     # Add node to our /etc/hosts, and add key to known_hosts
     if self.cfg.GetClusterInfo().modify_etc_hosts:
-      # FIXME: this should be done via an rpc call to node daemon
-      utils.AddHostToEtcHosts(new_node.name)
+      master_node = self.cfg.GetMasterNode()
+      result = self.rpc.call_etc_hosts_modify(master_node,
+                                              constants.ETC_HOSTS_ADD,
+                                              self.hostname.name,
+                                              self.hostname.ip)
+      result.Raise("Can't update hosts file with new host data")
 
     if new_node.secondary_ip != new_node.primary_ip:
-      result = self.rpc.call_node_has_ip_address(new_node.name,
-                                                 new_node.secondary_ip)
-      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
-                   prereq=True, ecode=errors.ECODE_ENVIRON)
-      if not result.payload:
-        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
-                                 " you gave (%s). Please fix and re-run this"
-                                 " command." % new_node.secondary_ip)
+      _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip,
+                               False)
 
     node_verify_list = [self.cfg.GetMasterNode()]
     node_verify_param = {
@@ -3952,30 +4320,39 @@ class LUAddNode(LogicalUnit):
           self.LogWarning("Node failed to demote itself from master"
                           " candidate status: %s" % msg)
     else:
-      _RedistributeAncillaryFiles(self, additional_nodes=[node])
+      _RedistributeAncillaryFiles(self, additional_nodes=[node],
+                                  additional_vm=self.op.vm_capable)
       self.context.AddNode(new_node, self.proc.GetECId())
 
 
-class LUSetNodeParams(LogicalUnit):
+class LUNodeSetParams(LogicalUnit):
   """Modifies the parameters of a node.
 
+  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
+      to the node role (as _ROLE_*)
+  @cvar _R2F: a dictionary from node role to tuples of flags
+  @cvar _FLAGS: a list of attribute names corresponding to the flags
+
   """
   HPATH = "node-modify"
   HTYPE = constants.HTYPE_NODE
-  _OP_PARAMS = [
-    _PNodeName,
-    ("master_candidate", None, _TMaybeBool),
-    ("offline", None, _TMaybeBool),
-    ("drained", None, _TMaybeBool),
-    ("auto_promote", False, _TBool),
-    _PForce,
-    ]
   REQ_BGL = False
+  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
+  _F2R = {
+    (True, False, False): _ROLE_CANDIDATE,
+    (False, True, False): _ROLE_DRAINED,
+    (False, False, True): _ROLE_OFFLINE,
+    (False, False, False): _ROLE_REGULAR,
+    }
+  _R2F = dict((v, k) for k, v in _F2R.items())
+  _FLAGS = ["master_candidate", "drained", "offline"]
 
   def CheckArguments(self):
     self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
-    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
-    if all_mods.count(None) == 3:
+    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
+                self.op.master_capable, self.op.vm_capable,
+                self.op.secondary_ip, self.op.ndparams]
+    if all_mods.count(None) == len(all_mods):
       raise errors.OpPrereqError("Please pass at least one modification",
                                  errors.ECODE_INVAL)
     if all_mods.count(True) > 1:
@@ -3983,16 +4360,20 @@ class LUSetNodeParams(LogicalUnit):
                                  " state at the same time",
                                  errors.ECODE_INVAL)
 
-    # Boolean value that tells us whether we're offlining or draining the node
-    self.offline_or_drain = (self.op.offline == True or
-                             self.op.drained == True)
-    self.deoffline_or_drain = (self.op.offline == False or
-                               self.op.drained == False)
+    # Boolean value that tells us whether we might be demoting from MC
     self.might_demote = (self.op.master_candidate == False or
-                         self.offline_or_drain)
+                         self.op.offline == True or
+                         self.op.drained == True or
+                         self.op.master_capable == False)
+
+    if self.op.secondary_ip:
+      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
+        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
+                                   " address" % self.op.secondary_ip,
+                                   errors.ECODE_INVAL)
 
     self.lock_all = self.op.auto_promote and self.might_demote
-
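+    # changing the secondary IP requires inspecting the instances on this
+    # node, so their locks are needed as well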
+    self.lock_instances = self.op.secondary_ip is not None
 
   def ExpandNames(self):
     if self.lock_all:
@@ -4000,6 +4381,29 @@ class LUSetNodeParams(LogicalUnit):
     else:
       self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
 
+    if self.lock_instances:
+      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
+
+  def DeclareLocks(self, level):
+    # If we have locked all instances, before waiting to lock nodes, release
+    # all the ones living on nodes unrelated to the current operation.
+    if level == locking.LEVEL_NODE and self.lock_instances:
+      instances_release = []
+      instances_keep = []
+      self.affected_instances = []
+      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
+        for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+          instance = self.context.cfg.GetInstanceInfo(instance_name)
+          i_mirrored = instance.disk_template in constants.DTS_NET_MIRROR
+          if i_mirrored and self.op.node_name in instance.all_nodes:
+            instances_keep.append(instance_name)
+            self.affected_instances.append(instance)
+          else:
+            instances_release.append(instance_name)
+        if instances_release:
+          self.context.glm.release(locking.LEVEL_INSTANCE, instances_release)
+          self.acquired_locks[locking.LEVEL_INSTANCE] = instances_keep
+
   def BuildHooksEnv(self):
     """Build hooks env.
 
@@ -4011,6 +4415,8 @@ class LUSetNodeParams(LogicalUnit):
       "MASTER_CANDIDATE": str(self.op.master_candidate),
       "OFFLINE": str(self.op.offline),
       "DRAINED": str(self.op.drained),
+      "MASTER_CAPABLE": str(self.op.master_capable),
+      "VM_CAPABLE": str(self.op.vm_capable),
       }
     nl = [self.cfg.GetMasterNode(),
           self.op.node_name]
@@ -4033,103 +4439,189 @@ class LUSetNodeParams(LogicalUnit):
                                    " only via master-failover",
                                    errors.ECODE_INVAL)
 
+    if self.op.master_candidate and not node.master_capable:
+      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
+                                 " it a master candidate" % node.name,
+                                 errors.ECODE_STATE)
+
+    if self.op.vm_capable == False:
+      (ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name)
+      if ipri or isec:
+        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
+                                   " the vm_capable flag" % node.name,
+                                   errors.ECODE_STATE)
 
     if node.master_candidate and self.might_demote and not self.lock_all:
-      assert not self.op.auto_promote, "auto-promote set but lock_all not"
+      assert not self.op.auto_promote, "auto_promote set but lock_all not"
       # check if after removing the current node, we're missing master
       # candidates
       (mc_remaining, mc_should, _) = \
           self.cfg.GetMasterCandidateStats(exceptions=[node.name])
       if mc_remaining < mc_should:
         raise errors.OpPrereqError("Not enough master candidates, please"
-                                   " pass auto_promote to allow promotion",
-                                   errors.ECODE_INVAL)
-
-    if (self.op.master_candidate == True and
-        ((node.offline and not self.op.offline == False) or
-         (node.drained and not self.op.drained == False))):
-      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
-                                 " to master_candidate" % node.name,
-                                 errors.ECODE_INVAL)
+                                   " pass auto promote option to allow"
+                                   " promotion", errors.ECODE_STATE)
+
+    self.old_flags = old_flags = (node.master_candidate,
+                                  node.drained, node.offline)
+    assert old_flags in self._F2R, "Unhandled old flags %s" % str(old_flags)
+    self.old_role = old_role = self._F2R[old_flags]
+
+    # Check for ineffective changes
+    for attr in self._FLAGS:
+      if (getattr(self.op, attr) == False and getattr(node, attr) == False):
+        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
+        setattr(self.op, attr, None)
+
+    # Past this point, any flag change to False means a transition
+    # away from the respective state, as only real changes are kept
+
+    # TODO: We might query the real power state if it supports OOB
+    if _SupportsOob(self.cfg, node):
+      if self.op.offline is False and not (node.powered or
+                                           self.op.powered == True):
+        raise errors.OpPrereqError(("Please power on node %s first before you"
+                                    " can reset offline state") %
+                                   self.op.node_name)
+    elif self.op.powered is not None:
+      raise errors.OpPrereqError(("Unable to change powered state for node %s"
+                                  " which does not support out-of-band"
+                                  " handling") % self.op.node_name)
 
     # If we're being deofflined/drained, we'll MC ourself if needed
-    if (self.deoffline_or_drain and not self.offline_or_drain and not
-        self.op.master_candidate == True and not node.master_candidate):
-      self.op.master_candidate = _DecideSelfPromotion(self)
-      if self.op.master_candidate:
-        self.LogInfo("Autopromoting node to master candidate")
-
-    return
+    if (self.op.drained == False or self.op.offline == False or
+        (self.op.master_capable and not node.master_capable)):
+      if _DecideSelfPromotion(self):
+        self.op.master_candidate = True
+        self.LogInfo("Auto-promoting node to master candidate")
+
+    # If we're no longer master capable, we'll demote ourselves from MC
+    if self.op.master_capable == False and node.master_candidate:
+      self.LogInfo("Demoting from master candidate")
+      self.op.master_candidate = False
+
+    # Compute new role
+    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
+    if self.op.master_candidate:
+      new_role = self._ROLE_CANDIDATE
+    elif self.op.drained:
+      new_role = self._ROLE_DRAINED
+    elif self.op.offline:
+      new_role = self._ROLE_OFFLINE
+    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
+      # False is still in new flags, which means we're un-setting (the
+      # only) True flag
+      new_role = self._ROLE_REGULAR
+    else: # no new flags, nothing, keep old role
+      new_role = old_role
+
+    self.new_role = new_role
+
+    if old_role == self._ROLE_OFFLINE and new_role != old_role:
+      # Trying to transition out of offline status
+      result = self.rpc.call_version([node.name])[node.name]
+      if result.fail_msg:
+        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
+                                   " to report its version: %s" %
+                                   (node.name, result.fail_msg),
+                                   errors.ECODE_STATE)
+      else:
+        self.LogWarning("Transitioning node from offline to online state"
+                        " without using re-add. Please make sure the node"
+                        " is healthy!")
+
+    if self.op.secondary_ip:
+      # Ok even without locking, because this can't be changed by any LU
+      master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
+      master_singlehomed = master.secondary_ip == master.primary_ip
+      if master_singlehomed and self.op.secondary_ip:
+        raise errors.OpPrereqError("Cannot change the secondary ip on a single"
+                                   " homed cluster", errors.ECODE_INVAL)
+
+      if node.offline:
+        if self.affected_instances:
+          raise errors.OpPrereqError("Cannot change secondary ip: offline"
+                                     " node has instances (%s) configured"
+                                     " to use it" % self.affected_instances)
+      else:
+        # On online nodes, check that no instances are running, and that
+        # the node has the new ip and we can reach it.
+        for instance in self.affected_instances:
+          _CheckInstanceDown(self, instance, "cannot change secondary ip")
+
+        _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
+        if master.name != node.name:
+          # check reachability from master secondary ip to new secondary ip
+          if not netutils.TcpPing(self.op.secondary_ip,
+                                  constants.DEFAULT_NODED_PORT,
+                                  source=master.secondary_ip):
+            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
+                                       " based ping to node daemon port",
+                                       errors.ECODE_ENVIRON)
+
+    if self.op.ndparams:
+      new_ndparams = _GetUpdatedParams(self.node.ndparams, self.op.ndparams)
+      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
+      self.new_ndparams = new_ndparams
 
   def Exec(self, feedback_fn):
     """Modifies a node.
 
     """
     node = self.node
+    old_role = self.old_role
+    new_role = self.new_role
 
     result = []
-    changed_mc = False
-
-    if self.op.offline is not None:
-      node.offline = self.op.offline
-      result.append(("offline", str(self.op.offline)))
-      if self.op.offline == True:
-        if node.master_candidate:
-          node.master_candidate = False
-          changed_mc = True
-          result.append(("master_candidate", "auto-demotion due to offline"))
-        if node.drained:
-          node.drained = False
-          result.append(("drained", "clear drained status due to offline"))
-
-    if self.op.master_candidate is not None:
-      node.master_candidate = self.op.master_candidate
-      changed_mc = True
-      result.append(("master_candidate", str(self.op.master_candidate)))
-      if self.op.master_candidate == False:
-        rrc = self.rpc.call_node_demote_from_mc(node.name)
-        msg = rrc.fail_msg
+
+    if self.op.ndparams:
+      node.ndparams = self.new_ndparams
+
+    if self.op.powered is not None:
+      node.powered = self.op.powered
+
+    for attr in ["master_capable", "vm_capable"]:
+      val = getattr(self.op, attr)
+      if val is not None:
+        setattr(node, attr, val)
+        result.append((attr, str(val)))
+
+    if new_role != old_role:
+      # Tell the node to demote itself, if no longer MC and not offline
+      if old_role == self._ROLE_CANDIDATE and new_role != self._ROLE_OFFLINE:
+        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
         if msg:
-          self.LogWarning("Node failed to demote itself: %s" % msg)
-
-    if self.op.drained is not None:
-      node.drained = self.op.drained
-      result.append(("drained", str(self.op.drained)))
-      if self.op.drained == True:
-        if node.master_candidate:
-          node.master_candidate = False
-          changed_mc = True
-          result.append(("master_candidate", "auto-demotion due to drain"))
-          rrc = self.rpc.call_node_demote_from_mc(node.name)
-          msg = rrc.fail_msg
-          if msg:
-            self.LogWarning("Node failed to demote itself: %s" % msg)
-        if node.offline:
-          node.offline = False
-          result.append(("offline", "clear offline status due to drain"))
+          self.LogWarning("Node failed to demote itself: %s", msg)
 
-    # we locked all nodes, we adjust the CP before updating this node
-    if self.lock_all:
-      _AdjustCandidatePool(self, [node.name])
+      new_flags = self._R2F[new_role]
+      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
+        if of != nf:
+          result.append((desc, str(nf)))
+      (node.master_candidate, node.drained, node.offline) = new_flags
+
+      # we locked all nodes, we adjust the CP before updating this node
+      if self.lock_all:
+        _AdjustCandidatePool(self, [node.name])
+
+    if self.op.secondary_ip:
+      node.secondary_ip = self.op.secondary_ip
+      result.append(("secondary_ip", self.op.secondary_ip))
 
     # this will trigger configuration file update, if needed
     self.cfg.Update(node, feedback_fn)
 
-    # this will trigger job queue propagation or cleanup
-    if changed_mc:
+    # this will trigger job queue propagation or cleanup if the mc
+    # flag changed
+    if [old_role, new_role].count(self._ROLE_CANDIDATE) == 1:
       self.context.ReaddNode(node)
 
     return result
 
 
-class LUPowercycleNode(NoHooksLU):
+class LUNodePowercycle(NoHooksLU):
   """Powercycles a node.
 
   """
-  _OP_PARAMS = [
-    _PNodeName,
-    _PForce,
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -4158,7 +4650,7 @@ class LUPowercycleNode(NoHooksLU):
     return result.payload
 
 
-class LUQueryClusterInfo(NoHooksLU):
+class LUClusterQuery(NoHooksLU):
   """Query cluster configuration.
 
   """
@@ -4181,6 +4673,11 @@ class LUQueryClusterInfo(NoHooksLU):
         if hv_name in cluster.enabled_hypervisors:
           os_hvp[os_name][hv_name] = hv_params
 
+    # Convert ip_family to ip_version
+    primary_ip_version = constants.IP4_VERSION
+    if cluster.primary_ip_family == netutils.IP6Address.family:
+      primary_ip_version = constants.IP6_VERSION
+
     result = {
       "software_version": constants.RELEASE_VERSION,
       "protocol_version": constants.PROTOCOL_VERSION,
@@ -4198,11 +4695,13 @@ class LUQueryClusterInfo(NoHooksLU):
       "beparams": cluster.beparams,
       "osparams": cluster.osparams,
       "nicparams": cluster.nicparams,
+      "ndparams": cluster.ndparams,
       "candidate_pool_size": cluster.candidate_pool_size,
       "master_netdev": cluster.master_netdev,
       "volume_group_name": cluster.volume_group_name,
       "drbd_usermode_helper": cluster.drbd_usermode_helper,
       "file_storage_dir": cluster.file_storage_dir,
+      "shared_file_storage_dir": cluster.shared_file_storage_dir,
       "maintain_node_health": cluster.maintain_node_health,
       "ctime": cluster.ctime,
       "mtime": cluster.mtime,
@@ -4211,16 +4710,19 @@ class LUQueryClusterInfo(NoHooksLU):
       "uid_pool": cluster.uid_pool,
       "default_iallocator": cluster.default_iallocator,
       "reserved_lvs": cluster.reserved_lvs,
+      "primary_ip_version": primary_ip_version,
+      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
+      "hidden_os": cluster.hidden_os,
+      "blacklisted_os": cluster.blacklisted_os,
       }
 
     return result
 
 
-class LUQueryConfigValues(NoHooksLU):
+class LUClusterConfigQuery(NoHooksLU):
   """Return configuration values.
 
   """
-  _OP_PARAMS = [_POutputFields]
   REQ_BGL = False
   _FIELDS_DYNAMIC = utils.FieldSet()
   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
@@ -4256,14 +4758,10 @@ class LUQueryConfigValues(NoHooksLU):
     return values
 
 
-class LUActivateInstanceDisks(NoHooksLU):
+class LUInstanceActivateDisks(NoHooksLU):
   """Bring up an instance's disks.
 
   """
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("ignore_size", False, _TBool),
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -4338,13 +4836,13 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
   # SyncSource, etc.)
 
   # 1st pass, assemble on all nodes in secondary mode
-  for inst_disk in disks:
+  for idx, inst_disk in enumerate(disks):
     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
       if ignore_size:
         node_disk = node_disk.Copy()
         node_disk.UnsetSize()
       lu.cfg.SetDiskID(node_disk, node)
-      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
+      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False, idx)
       msg = result.fail_msg
       if msg:
         lu.proc.LogWarning("Could not prepare block device %s on node %s"
@@ -4356,7 +4854,7 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
   # FIXME: race condition on drbd migration to primary
 
   # 2nd pass, do only the primary node
-  for inst_disk in disks:
+  for idx, inst_disk in enumerate(disks):
     dev_path = None
 
     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
@@ -4366,7 +4864,7 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
         node_disk = node_disk.Copy()
         node_disk.UnsetSize()
       lu.cfg.SetDiskID(node_disk, node)
-      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
+      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True, idx)
       msg = result.fail_msg
       if msg:
         lu.proc.LogWarning("Could not prepare block device %s on node %s"
@@ -4402,13 +4900,10 @@ def _StartInstanceDisks(lu, instance, force):
     raise errors.OpExecError("Disk consistency error")
 
 
-class LUDeactivateInstanceDisks(NoHooksLU):
+class LUInstanceDeactivateDisks(NoHooksLU):
   """Shutdown an instance's disks.
 
   """
-  _OP_PARAMS = [
-    _PInstanceName,
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -4435,7 +4930,10 @@ class LUDeactivateInstanceDisks(NoHooksLU):
 
     """
     instance = self.instance
-    _SafeShutdownInstanceDisks(self, instance)
+    if self.op.force:
+      _ShutdownInstanceDisks(self, instance)
+    else:
+      _SafeShutdownInstanceDisks(self, instance)
 
 
 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
@@ -4487,7 +4985,8 @@ def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
       if msg:
         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                       disk.iv_name, node, msg)
-        if not ignore_primary or node != instance.primary_node:
+        if ((node == instance.primary_node and not ignore_primary) or
+            (node != instance.primary_node and not result.offline)):
           all_result = False
   return all_result
 
@@ -4514,7 +5013,7 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
       we cannot check the node
 
   """
-  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
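+  # only memory information is needed here, so no volume group is passed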
+  nodeinfo = lu.rpc.call_node_info([node], None, hypervisor_name)
   nodeinfo[node].Raise("Can't get data from node %s" % node,
                        prereq=True, ecode=errors.ECODE_ENVIRON)
   free_mem = nodeinfo[node].payload.get('memory_free', None)
@@ -4529,8 +5028,31 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
                                errors.ECODE_NORES)
 
 
-def _CheckNodesFreeDisk(lu, nodenames, requested):
-  """Checks if nodes have enough free disk space in the default VG.
+def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
+  """Checks if nodes have enough free disk space in the all VGs.
+
+  This function check if all given nodes have the needed amount of
+  free disk. In case any node has less disk or we cannot get the
+  information from the node, this function raise an OpPrereqError
+  exception.
+
+  @type lu: C{LogicalUnit}
+  @param lu: a logical unit from which we get configuration data
+  @type nodenames: C{list}
+  @param nodenames: the list of node names to check
+  @type req_sizes: C{dict}
+  @param req_sizes: the hash of vg and corresponding amount of disk in
+      MiB to check for
+  @raise errors.OpPrereqError: if the node doesn't have enough disk,
+      or we cannot check the node
+
+  """
+  for vg, req_size in req_sizes.items():
+    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
+
+
+def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
+  """Checks if nodes have enough free disk space in the specified VG.
 
   This function checks if all given nodes have the needed amount of
   free disk. In case any node has less disk or we cannot get the
@@ -4541,42 +5063,37 @@ def _CheckNodesFreeDisk(lu, nodenames, requested):
   @param lu: a logical unit from which we get configuration data
   @type nodenames: C{list}
   @param nodenames: the list of node names to check
+  @type vg: C{str}
+  @param vg: the volume group to check
   @type requested: C{int}
   @param requested: the amount of disk in MiB to check for
-  @raise errors.OpPrereqError: if the node doesn't have enough disk, or
-      we cannot check the node
+  @raise errors.OpPrereqError: if the node doesn't have enough disk,
+      or we cannot check the node
 
   """
-  nodeinfo = lu.rpc.call_node_info(nodenames, lu.cfg.GetVGName(),
-                                   lu.cfg.GetHypervisorType())
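+  # only volume group data is needed here, so no hypervisor is passed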
+  nodeinfo = lu.rpc.call_node_info(nodenames, vg, None)
   for node in nodenames:
     info = nodeinfo[node]
     info.Raise("Cannot get current information from node %s" % node,
                prereq=True, ecode=errors.ECODE_ENVIRON)
     vg_free = info.payload.get("vg_free", None)
     if not isinstance(vg_free, int):
-      raise errors.OpPrereqError("Can't compute free disk space on node %s,"
-                                 " result was '%s'" % (node, vg_free),
-                                 errors.ECODE_ENVIRON)
+      raise errors.OpPrereqError("Can't compute free disk space on node"
+                                 " %s for vg %s, result was '%s'" %
+                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
     if requested > vg_free:
-      raise errors.OpPrereqError("Not enough disk space on target node %s:"
-                                 " required %d MiB, available %d MiB" %
-                                 (node, requested, vg_free),
+      raise errors.OpPrereqError("Not enough disk space on target node %s"
+                                 " vg %s: required %d MiB, available %d MiB" %
+                                 (node, vg, requested, vg_free),
                                  errors.ECODE_NORES)
 
 
-class LUStartupInstance(LogicalUnit):
+class LUInstanceStartup(LogicalUnit):
   """Starts an instance.
 
   """
   HPATH = "instance-start"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    _PForce,
-    ("hvparams", _EmptyDict, _TDict),
-    ("beparams", _EmptyDict, _TDict),
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -4622,21 +5139,30 @@ class LUStartupInstance(LogicalUnit):
       hv_type.CheckParameterSyntax(filled_hvp)
       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
 
-    _CheckNodeOnline(self, instance.primary_node)
+    self.primary_offline = self.cfg.GetNodeInfo(instance.primary_node).offline
 
-    bep = self.cfg.GetClusterInfo().FillBE(instance)
-    # check bridges existence
-    _CheckInstanceBridgesExist(self, instance)
+    if self.primary_offline and self.op.ignore_offline_nodes:
+      self.proc.LogWarning("Ignoring offline primary node")
 
-    remote_info = self.rpc.call_instance_info(instance.primary_node,
-                                              instance.name,
-                                              instance.hypervisor)
-    remote_info.Raise("Error checking node %s" % instance.primary_node,
-                      prereq=True, ecode=errors.ECODE_ENVIRON)
-    if not remote_info.payload: # not running already
-      _CheckNodeFreeMemory(self, instance.primary_node,
-                           "starting instance %s" % instance.name,
-                           bep[constants.BE_MEMORY], instance.hypervisor)
+      if self.op.hvparams or self.op.beparams:
+        self.proc.LogWarning("Overridden parameters are ignored")
+    else:
+      _CheckNodeOnline(self, instance.primary_node)
+
+      bep = self.cfg.GetClusterInfo().FillBE(instance)
+
+      # check bridges existence
+      _CheckInstanceBridgesExist(self, instance)
+
+      remote_info = self.rpc.call_instance_info(instance.primary_node,
+                                                instance.name,
+                                                instance.hypervisor)
+      remote_info.Raise("Error checking node %s" % instance.primary_node,
+                        prereq=True, ecode=errors.ECODE_ENVIRON)
+      if not remote_info.payload: # not running already
+        _CheckNodeFreeMemory(self, instance.primary_node,
+                             "starting instance %s" % instance.name,
+                             bep[constants.BE_MEMORY], instance.hypervisor)
 
   def Exec(self, feedback_fn):
     """Start the instance.
@@ -4647,30 +5173,28 @@ class LUStartupInstance(LogicalUnit):
 
     self.cfg.MarkInstanceUp(instance.name)
 
-    node_current = instance.primary_node
+    if self.primary_offline:
+      assert self.op.ignore_offline_nodes
+      self.proc.LogInfo("Primary node offline, marked instance as started")
+    else:
+      node_current = instance.primary_node
 
-    _StartInstanceDisks(self, instance, force)
+      _StartInstanceDisks(self, instance, force)
 
-    result = self.rpc.call_instance_start(node_current, instance,
-                                          self.op.hvparams, self.op.beparams)
-    msg = result.fail_msg
-    if msg:
-      _ShutdownInstanceDisks(self, instance)
-      raise errors.OpExecError("Could not start instance: %s" % msg)
+      result = self.rpc.call_instance_start(node_current, instance,
+                                            self.op.hvparams, self.op.beparams)
+      msg = result.fail_msg
+      if msg:
+        _ShutdownInstanceDisks(self, instance)
+        raise errors.OpExecError("Could not start instance: %s" % msg)
 
 
-class LURebootInstance(LogicalUnit):
+class LUInstanceReboot(LogicalUnit):
   """Reboot an instance.
 
   """
   HPATH = "instance-reboot"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("ignore_secondaries", False, _TBool),
-    ("reboot_type", _NoDefault, _TElemOf(constants.REBOOT_TYPES)),
-    _PShutdownTimeout,
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -4714,10 +5238,16 @@ class LURebootInstance(LogicalUnit):
     ignore_secondaries = self.op.ignore_secondaries
     reboot_type = self.op.reboot_type
 
+    remote_info = self.rpc.call_instance_info(instance.primary_node,
+                                              instance.name,
+                                              instance.hypervisor)
+    remote_info.Raise("Error checking node %s" % instance.primary_node)
+    instance_running = bool(remote_info.payload)
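+    # when the instance is not running it is simply started again further
+    # below instead of being rebooted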
+
     node_current = instance.primary_node
 
-    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
-                       constants.INSTANCE_REBOOT_HARD]:
+    if instance_running and reboot_type in [constants.INSTANCE_REBOOT_SOFT,
+                                            constants.INSTANCE_REBOOT_HARD]:
       for disk in instance.disks:
         self.cfg.SetDiskID(disk, node_current)
       result = self.rpc.call_instance_reboot(node_current, instance,
@@ -4725,10 +5255,14 @@ class LURebootInstance(LogicalUnit):
                                              self.op.shutdown_timeout)
       result.Raise("Could not reboot instance")
     else:
-      result = self.rpc.call_instance_shutdown(node_current, instance,
-                                               self.op.shutdown_timeout)
-      result.Raise("Could not shutdown instance for full reboot")
-      _ShutdownInstanceDisks(self, instance)
+      if instance_running:
+        result = self.rpc.call_instance_shutdown(node_current, instance,
+                                                 self.op.shutdown_timeout)
+        result.Raise("Could not shutdown instance for full reboot")
+        _ShutdownInstanceDisks(self, instance)
+      else:
+        self.LogInfo("Instance %s was already stopped, starting now",
+                     instance.name)
       _StartInstanceDisks(self, instance, ignore_secondaries)
       result = self.rpc.call_instance_start(node_current, instance, None, None)
       msg = result.fail_msg
@@ -4740,16 +5274,12 @@ class LURebootInstance(LogicalUnit):
     self.cfg.MarkInstanceUp(instance.name)
 
 
-class LUShutdownInstance(LogicalUnit):
+class LUInstanceShutdown(LogicalUnit):
   """Shutdown an instance.
 
   """
   HPATH = "instance-stop"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT, _TPositiveInt),
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -4775,7 +5305,14 @@ class LUShutdownInstance(LogicalUnit):
     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
-    _CheckNodeOnline(self, self.instance.primary_node)
+
+    self.primary_offline = \
+      self.cfg.GetNodeInfo(self.instance.primary_node).offline
+
+    if self.primary_offline and self.op.ignore_offline_nodes:
+      self.proc.LogWarning("Ignoring offline primary node")
+    else:
+      _CheckNodeOnline(self, self.instance.primary_node)
 
   def Exec(self, feedback_fn):
     """Shutdown the instance.
@@ -4784,26 +5321,27 @@ class LUShutdownInstance(LogicalUnit):
     instance = self.instance
     node_current = instance.primary_node
     timeout = self.op.timeout
+
     self.cfg.MarkInstanceDown(instance.name)
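+    # the instance is marked as stopped in any case; the shutdown RPC below
+    # is only attempted when the primary node is not marked offline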
-    result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
-    msg = result.fail_msg
-    if msg:
-      self.proc.LogWarning("Could not shutdown instance: %s" % msg)
 
-    _ShutdownInstanceDisks(self, instance)
+    if self.primary_offline:
+      assert self.op.ignore_offline_nodes
+      self.proc.LogInfo("Primary node offline, marked instance as stopped")
+    else:
+      result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
+      msg = result.fail_msg
+      if msg:
+        self.proc.LogWarning("Could not shutdown instance: %s" % msg)
+
+      _ShutdownInstanceDisks(self, instance)
 
 
-class LUReinstallInstance(LogicalUnit):
+class LUInstanceReinstall(LogicalUnit):
   """Reinstall an instance.
 
   """
   HPATH = "instance-reinstall"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("os_type", None, _TMaybeString),
-    ("force_variant", False, _TBool),
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -4828,7 +5366,11 @@ class LUReinstallInstance(LogicalUnit):
     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
-    _CheckNodeOnline(self, instance.primary_node)
+    _CheckNodeOnline(self, instance.primary_node, "Instance primary node"
+                     " offline, cannot reinstall")
+    for node in instance.secondary_nodes:
+      _CheckNodeOnline(self, node, "Instance secondary node offline,"
+                       " cannot reinstall")
 
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
@@ -4840,6 +5382,18 @@ class LUReinstallInstance(LogicalUnit):
       # OS verification
       pnode = _ExpandNodeName(self.cfg, instance.primary_node)
       _CheckNodeHasOS(self, pnode, self.op.os_type, self.op.force_variant)
+      instance_os = self.op.os_type
+    else:
+      instance_os = instance.os
+
+    nodelist = list(instance.all_nodes)
+
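+    # merge any OS parameter overrides with the instance's current values and
+    # validate them against the target OS on all instance nodes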
+    if self.op.osparams:
+      i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams)
+      _CheckOSParams(self, True, nodelist, instance_os, i_osdict)
+      self.os_inst = i_osdict # the new dict (without defaults)
+    else:
+      self.os_inst = None
 
     self.instance = instance
 
@@ -4852,6 +5406,7 @@ class LUReinstallInstance(LogicalUnit):
     if self.op.os_type is not None:
       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
       inst.os = self.op.os_type
+      # Write to configuration
       self.cfg.Update(inst, feedback_fn)
 
     _StartInstanceDisks(self, inst, None)
@@ -4859,23 +5414,20 @@ class LUReinstallInstance(LogicalUnit):
       feedback_fn("Running the instance OS create scripts...")
       # FIXME: pass debug option from opcode to backend
       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True,
-                                             self.op.debug_level)
+                                             self.op.debug_level,
+                                             osparams=self.os_inst)
       result.Raise("Could not install OS for instance %s on node %s" %
                    (inst.name, inst.primary_node))
     finally:
       _ShutdownInstanceDisks(self, inst)
 
 
-class LURecreateInstanceDisks(LogicalUnit):
+class LUInstanceRecreateDisks(LogicalUnit):
   """Recreate an instance's missing disks.
 
   """
   HPATH = "instance-recreate-disks"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("disks", _EmptyList, _TListOf(_TPositiveInt)),
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -4930,18 +5482,12 @@ class LURecreateInstanceDisks(LogicalUnit):
     _CreateDisks(self, self.instance, to_skip=to_skip)
 
 
-class LURenameInstance(LogicalUnit):
+class LUInstanceRename(LogicalUnit):
   """Rename an instance.
 
   """
   HPATH = "instance-rename"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("new_name", _NoDefault, _TNonEmptyString),
-    ("ip_check", False, _TBool),
-    ("name_check", True, _TBool),
-    ]
 
   def CheckArguments(self):
     """Check arguments.
@@ -4979,29 +5525,38 @@ class LURenameInstance(LogicalUnit):
 
     new_name = self.op.new_name
     if self.op.name_check:
-      hostinfo = netutils.HostInfo(netutils.HostInfo.NormalizeName(new_name))
-      new_name = self.op.new_name = hostinfo.name
+      hostname = netutils.GetHostname(name=new_name)
+      self.LogInfo("Resolved given name '%s' to '%s'", new_name,
+                   hostname.name)
+      if not utils.MatchNameComponent(self.op.new_name, [hostname.name]):
+        raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
+                                    " same as given hostname '%s'") %
+                                    (hostname.name, self.op.new_name),
+                                    errors.ECODE_INVAL)
+      new_name = self.op.new_name = hostname.name
       if (self.op.ip_check and
-          netutils.TcpPing(hostinfo.ip, constants.DEFAULT_NODED_PORT)):
+          netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)):
         raise errors.OpPrereqError("IP %s of instance %s already in use" %
-                                   (hostinfo.ip, new_name),
+                                   (hostname.ip, new_name),
                                    errors.ECODE_NOTUNIQUE)
 
     instance_list = self.cfg.GetInstanceList()
-    if new_name in instance_list:
+    if new_name in instance_list and new_name != instance.name:
       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                  new_name, errors.ECODE_EXISTS)
 
-
   def Exec(self, feedback_fn):
-    """Reinstall the instance.
+    """Rename the instance.
 
     """
     inst = self.instance
     old_name = inst.name
 
-    if inst.disk_template == constants.DT_FILE:
+    rename_file_storage = False
+    if (inst.disk_template in (constants.DT_FILE, constants.DT_SHARED_FILE) and
+        self.op.new_name != inst.name):
       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
+      rename_file_storage = True
 
     self.cfg.RenameInstance(inst.name, self.op.new_name)
     # Change the instance lock. This is definitely safe while we hold the BGL
@@ -5011,7 +5566,7 @@ class LURenameInstance(LogicalUnit):
     # re-read the instance from the configuration after rename
     inst = self.cfg.GetInstanceInfo(self.op.new_name)
 
-    if inst.disk_template == constants.DT_FILE:
+    if rename_file_storage:
       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                      old_file_storage_dir,
@@ -5037,17 +5592,12 @@ class LURenameInstance(LogicalUnit):
     return inst.name
 
 
-class LURemoveInstance(LogicalUnit):
+class LUInstanceRemove(LogicalUnit):
   """Remove an instance.
 
   """
   HPATH = "instance-remove"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("ignore_failures", False, _TBool),
-    _PShutdownTimeout,
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -5125,308 +5675,33 @@ def _RemoveInstance(lu, feedback_fn, instance, ignore_failures):
   lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name
 
 
-class LUQueryInstances(NoHooksLU):
+class LUInstanceQuery(NoHooksLU):
   """Logical unit for querying instances.
 
   """
   # pylint: disable-msg=W0142
-  _OP_PARAMS = [
-    ("output_fields", _NoDefault, _TListOf(_TNonEmptyString)),
-    ("names", _EmptyList, _TListOf(_TNonEmptyString)),
-    ("use_locking", False, _TBool),
-    ]
   REQ_BGL = False
-  _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
-                    "serial_no", "ctime", "mtime", "uuid"]
-  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
-                                    "admin_state",
-                                    "disk_template", "ip", "mac", "bridge",
-                                    "nic_mode", "nic_link",
-                                    "sda_size", "sdb_size", "vcpus", "tags",
-                                    "network_port", "beparams",
-                                    r"(disk)\.(size)/([0-9]+)",
-                                    r"(disk)\.(sizes)", "disk_usage",
-                                    r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
-                                    r"(nic)\.(bridge)/([0-9]+)",
-                                    r"(nic)\.(macs|ips|modes|links|bridges)",
-                                    r"(disk|nic)\.(count)",
-                                    "hvparams",
-                                    ] + _SIMPLE_FIELDS +
-                                  ["hv/%s" % name
-                                   for name in constants.HVS_PARAMETERS
-                                   if name not in constants.HVC_GLOBALS] +
-                                  ["be/%s" % name
-                                   for name in constants.BES_PARAMETERS])
-  _FIELDS_DYNAMIC = utils.FieldSet("oper_state",
-                                   "oper_ram",
-                                   "oper_vcpus",
-                                   "status")
-
 
   def CheckArguments(self):
-    _CheckOutputFields(static=self._FIELDS_STATIC,
-                       dynamic=self._FIELDS_DYNAMIC,
-                       selected=self.op.output_fields)
+    self.iq = _InstanceQuery(qlang.MakeSimpleFilter("name", self.op.names),
+                             self.op.output_fields, self.op.use_locking)
 
   def ExpandNames(self):
-    self.needed_locks = {}
-    self.share_locks[locking.LEVEL_INSTANCE] = 1
-    self.share_locks[locking.LEVEL_NODE] = 1
-
-    if self.op.names:
-      self.wanted = _GetWantedInstances(self, self.op.names)
-    else:
-      self.wanted = locking.ALL_SET
-
-    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
-    self.do_locking = self.do_node_query and self.op.use_locking
-    if self.do_locking:
-      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
-      self.needed_locks[locking.LEVEL_NODE] = []
-      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+    self.iq.ExpandNames(self)
 
   def DeclareLocks(self, level):
-    if level == locking.LEVEL_NODE and self.do_locking:
-      self._LockInstancesNodes()
+    self.iq.DeclareLocks(self, level)
 
   def Exec(self, feedback_fn):
-    """Computes the list of nodes and their attributes.
-
-    """
-    # pylint: disable-msg=R0912
-    # way too many branches here
-    all_info = self.cfg.GetAllInstancesInfo()
-    if self.wanted == locking.ALL_SET:
-      # caller didn't specify instance names, so ordering is not important
-      if self.do_locking:
-        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
-      else:
-        instance_names = all_info.keys()
-      instance_names = utils.NiceSort(instance_names)
-    else:
-      # caller did specify names, so we must keep the ordering
-      if self.do_locking:
-        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
-      else:
-        tgt_set = all_info.keys()
-      missing = set(self.wanted).difference(tgt_set)
-      if missing:
-        raise errors.OpExecError("Some instances were removed before"
-                                 " retrieving their data: %s" % missing)
-      instance_names = self.wanted
-
-    instance_list = [all_info[iname] for iname in instance_names]
-
-    # begin data gathering
-
-    nodes = frozenset([inst.primary_node for inst in instance_list])
-    hv_list = list(set([inst.hypervisor for inst in instance_list]))
-
-    bad_nodes = []
-    off_nodes = []
-    if self.do_node_query:
-      live_data = {}
-      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
-      for name in nodes:
-        result = node_data[name]
-        if result.offline:
-          # offline nodes will be in both lists
-          off_nodes.append(name)
-        if result.fail_msg:
-          bad_nodes.append(name)
-        else:
-          if result.payload:
-            live_data.update(result.payload)
-          # else no instance is alive
-    else:
-      live_data = dict([(name, {}) for name in instance_names])
-
-    # end data gathering
-
-    HVPREFIX = "hv/"
-    BEPREFIX = "be/"
-    output = []
-    cluster = self.cfg.GetClusterInfo()
-    for instance in instance_list:
-      iout = []
-      i_hv = cluster.FillHV(instance, skip_globals=True)
-      i_be = cluster.FillBE(instance)
-      i_nicp = [cluster.SimpleFillNIC(nic.nicparams) for nic in instance.nics]
-      for field in self.op.output_fields:
-        st_match = self._FIELDS_STATIC.Matches(field)
-        if field in self._SIMPLE_FIELDS:
-          val = getattr(instance, field)
-        elif field == "pnode":
-          val = instance.primary_node
-        elif field == "snodes":
-          val = list(instance.secondary_nodes)
-        elif field == "admin_state":
-          val = instance.admin_up
-        elif field == "oper_state":
-          if instance.primary_node in bad_nodes:
-            val = None
-          else:
-            val = bool(live_data.get(instance.name))
-        elif field == "status":
-          if instance.primary_node in off_nodes:
-            val = "ERROR_nodeoffline"
-          elif instance.primary_node in bad_nodes:
-            val = "ERROR_nodedown"
-          else:
-            running = bool(live_data.get(instance.name))
-            if running:
-              if instance.admin_up:
-                val = "running"
-              else:
-                val = "ERROR_up"
-            else:
-              if instance.admin_up:
-                val = "ERROR_down"
-              else:
-                val = "ADMIN_down"
-        elif field == "oper_ram":
-          if instance.primary_node in bad_nodes:
-            val = None
-          elif instance.name in live_data:
-            val = live_data[instance.name].get("memory", "?")
-          else:
-            val = "-"
-        elif field == "oper_vcpus":
-          if instance.primary_node in bad_nodes:
-            val = None
-          elif instance.name in live_data:
-            val = live_data[instance.name].get("vcpus", "?")
-          else:
-            val = "-"
-        elif field == "vcpus":
-          val = i_be[constants.BE_VCPUS]
-        elif field == "disk_template":
-          val = instance.disk_template
-        elif field == "ip":
-          if instance.nics:
-            val = instance.nics[0].ip
-          else:
-            val = None
-        elif field == "nic_mode":
-          if instance.nics:
-            val = i_nicp[0][constants.NIC_MODE]
-          else:
-            val = None
-        elif field == "nic_link":
-          if instance.nics:
-            val = i_nicp[0][constants.NIC_LINK]
-          else:
-            val = None
-        elif field == "bridge":
-          if (instance.nics and
-              i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
-            val = i_nicp[0][constants.NIC_LINK]
-          else:
-            val = None
-        elif field == "mac":
-          if instance.nics:
-            val = instance.nics[0].mac
-          else:
-            val = None
-        elif field == "sda_size" or field == "sdb_size":
-          idx = ord(field[2]) - ord('a')
-          try:
-            val = instance.FindDisk(idx).size
-          except errors.OpPrereqError:
-            val = None
-        elif field == "disk_usage": # total disk usage per node
-          disk_sizes = [{'size': disk.size} for disk in instance.disks]
-          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
-        elif field == "tags":
-          val = list(instance.GetTags())
-        elif field == "hvparams":
-          val = i_hv
-        elif (field.startswith(HVPREFIX) and
-              field[len(HVPREFIX):] in constants.HVS_PARAMETERS and
-              field[len(HVPREFIX):] not in constants.HVC_GLOBALS):
-          val = i_hv.get(field[len(HVPREFIX):], None)
-        elif field == "beparams":
-          val = i_be
-        elif (field.startswith(BEPREFIX) and
-              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
-          val = i_be.get(field[len(BEPREFIX):], None)
-        elif st_match and st_match.groups():
-          # matches a variable list
-          st_groups = st_match.groups()
-          if st_groups and st_groups[0] == "disk":
-            if st_groups[1] == "count":
-              val = len(instance.disks)
-            elif st_groups[1] == "sizes":
-              val = [disk.size for disk in instance.disks]
-            elif st_groups[1] == "size":
-              try:
-                val = instance.FindDisk(st_groups[2]).size
-              except errors.OpPrereqError:
-                val = None
-            else:
-              assert False, "Unhandled disk parameter"
-          elif st_groups[0] == "nic":
-            if st_groups[1] == "count":
-              val = len(instance.nics)
-            elif st_groups[1] == "macs":
-              val = [nic.mac for nic in instance.nics]
-            elif st_groups[1] == "ips":
-              val = [nic.ip for nic in instance.nics]
-            elif st_groups[1] == "modes":
-              val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
-            elif st_groups[1] == "links":
-              val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
-            elif st_groups[1] == "bridges":
-              val = []
-              for nicp in i_nicp:
-                if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
-                  val.append(nicp[constants.NIC_LINK])
-                else:
-                  val.append(None)
-            else:
-              # index-based item
-              nic_idx = int(st_groups[2])
-              if nic_idx >= len(instance.nics):
-                val = None
-              else:
-                if st_groups[1] == "mac":
-                  val = instance.nics[nic_idx].mac
-                elif st_groups[1] == "ip":
-                  val = instance.nics[nic_idx].ip
-                elif st_groups[1] == "mode":
-                  val = i_nicp[nic_idx][constants.NIC_MODE]
-                elif st_groups[1] == "link":
-                  val = i_nicp[nic_idx][constants.NIC_LINK]
-                elif st_groups[1] == "bridge":
-                  nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
-                  if nic_mode == constants.NIC_MODE_BRIDGED:
-                    val = i_nicp[nic_idx][constants.NIC_LINK]
-                  else:
-                    val = None
-                else:
-                  assert False, "Unhandled NIC parameter"
-          else:
-            assert False, ("Declared but unhandled variable parameter '%s'" %
-                           field)
-        else:
-          assert False, "Declared but unhandled parameter '%s'" % field
-        iout.append(val)
-      output.append(iout)
-
-    return output
+    return self.iq.OldStyleQuery(self)
 
 
-class LUFailoverInstance(LogicalUnit):
+class LUInstanceFailover(LogicalUnit):
   """Failover an instance.
 
   """
   HPATH = "instance-failover"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("ignore_consistency", False, _TBool),
-    _PShutdownTimeout,
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -5505,6 +5780,7 @@ class LUFailoverInstance(LogicalUnit):
 
     """
     instance = self.instance
+    primary_node = self.cfg.GetNodeInfo(instance.primary_node)
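+    # the node object is used below to tolerate shutdown failures when the
+    # primary node is marked offline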
 
     source_node = instance.primary_node
     target_node = instance.secondary_nodes[0]
@@ -5528,7 +5804,7 @@ class LUFailoverInstance(LogicalUnit):
                                              self.op.shutdown_timeout)
     msg = result.fail_msg
     if msg:
-      if self.op.ignore_consistency:
+      if self.op.ignore_consistency or primary_node.offline:
         self.proc.LogWarning("Could not shutdown instance %s on node %s."
                              " Proceeding anyway. Please make sure node"
                              " %s is down. Error details: %s",
@@ -5567,7 +5843,7 @@ class LUFailoverInstance(LogicalUnit):
                                  (instance.name, target_node, msg))
 
 
-class LUMigrateInstance(LogicalUnit):
+class LUInstanceMigrate(LogicalUnit):
   """Migrate an instance.
 
   This is migration without shutting down, compared to the failover,
@@ -5576,13 +5852,6 @@ class LUMigrateInstance(LogicalUnit):
   """
   HPATH = "instance-migrate"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    _PMigrationMode,
-    _PMigrationLive,
-    ("cleanup", False, _TBool),
-    ]
-
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -5623,17 +5892,12 @@ class LUMigrateInstance(LogicalUnit):
     return env, nl, nl_post
 
 
-class LUMoveInstance(LogicalUnit):
+class LUInstanceMove(LogicalUnit):
   """Move an instance by data-copying.
 
   """
   HPATH = "instance-move"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("target_node", _NoDefault, _TNonEmptyString),
-    _PShutdownTimeout,
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -5692,6 +5956,7 @@ class LUMoveInstance(LogicalUnit):
 
     _CheckNodeOnline(self, target_node)
     _CheckNodeNotDrained(self, target_node)
+    _CheckNodeVmCapable(self, target_node)
 
     if instance.admin_up:
       # check memory requirements on the secondary node
@@ -5752,7 +6017,7 @@ class LUMoveInstance(LogicalUnit):
     for idx, disk in enumerate(instance.disks):
       self.LogInfo("Copying data for disk %d", idx)
       result = self.rpc.call_blockdev_assemble(target_node, disk,
-                                               instance.name, True)
+                                               instance.name, True, idx)
       if result.fail_msg:
         self.LogWarning("Can't assemble newly created disk %d: %s",
                         idx, result.fail_msg)
@@ -5802,17 +6067,12 @@ class LUMoveInstance(LogicalUnit):
                                  (instance.name, target_node, msg))
 
 
-class LUMigrateNode(LogicalUnit):
+class LUNodeMigrate(LogicalUnit):
   """Migrate all instances from a node.
 
   """
   HPATH = "node-migrate"
   HTYPE = constants.HTYPE_NODE
-  _OP_PARAMS = [
-    _PNodeName,
-    _PMigrationMode,
-    _PMigrationLive,
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -6300,13 +6560,12 @@ def _GenerateUniqueNames(lu, exts):
   return results
 
 
-def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
+def _GenerateDRBD8Branch(lu, primary, secondary, size, vgname, names, iv_name,
                          p_minor, s_minor):
   """Generate a drbd8 device complete with its children.
 
   """
   port = lu.cfg.AllocatePort()
-  vgname = lu.cfg.GetVGName()
   shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                           logical_id=(vgname, names[0]))
@@ -6325,7 +6584,7 @@ def _GenerateDiskTemplate(lu, template_name,
                           instance_name, primary_node,
                           secondary_nodes, disk_info,
                           file_storage_dir, file_driver,
-                          base_index):
+                          base_index, feedback_fn):
   """Generate the entire disk layout for a given template type.
 
   """
@@ -6344,8 +6603,10 @@ def _GenerateDiskTemplate(lu, template_name,
                                       for i in range(disk_count)])
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
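+      # each disk may request its own volume group via the "vg" key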
+      vg = disk.get("vg", vgname)
+      feedback_fn("* disk %i, vg %s, name %s" % (idx, vg, names[idx]))
       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
-                              logical_id=(vgname, names[idx]),
+                              logical_id=(vg, names[idx]),
                               iv_name="disk/%d" % disk_index,
                               mode=disk["mode"])
       disks.append(disk_dev)
@@ -6363,8 +6624,9 @@ def _GenerateDiskTemplate(lu, template_name,
       names.append(lv_prefix + "_meta")
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
+      vg = disk.get("vg", vgname)
       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
-                                      disk["size"], names[idx*2:idx*2+2],
+                                      disk["size"], vg, names[idx*2:idx*2+2],
                                       "disk/%d" % disk_index,
                                       minors[idx*2], minors[idx*2+1])
       disk_dev.mode = disk["mode"]
@@ -6373,7 +6635,22 @@ def _GenerateDiskTemplate(lu, template_name,
     if len(secondary_nodes) != 0:
       raise errors.ProgrammerError("Wrong template configuration")
 
-    _RequireFileStorage()
+    opcodes.RequireFileStorage()
+
+    for idx, disk in enumerate(disk_info):
+      disk_index = idx + base_index
+      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
+                              iv_name="disk/%d" % disk_index,
+                              logical_id=(file_driver,
+                                          "%s/disk%d" % (file_storage_dir,
+                                                         disk_index)),
+                              mode=disk["mode"])
+      disks.append(disk_dev)
+  elif template_name == constants.DT_SHARED_FILE:
+    if len(secondary_nodes) != 0:
+      raise errors.ProgrammerError("Wrong template configuration")
+
+    opcodes.RequireSharedFileStorage()
 
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
@@ -6384,6 +6661,19 @@ def _GenerateDiskTemplate(lu, template_name,
                                                          disk_index)),
                               mode=disk["mode"])
       disks.append(disk_dev)
+  elif template_name == constants.DT_BLOCK:
+    if len(secondary_nodes) != 0:
+      raise errors.ProgrammerError("Wrong template configuration")
+
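+    # existing block devices are adopted by device path, using the manual
+    # blockdev driver in the logical_id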
+    for idx, disk in enumerate(disk_info):
+      disk_index = idx + base_index
+      disk_dev = objects.Disk(dev_type=constants.LD_BLOCKDEV, size=disk["size"],
+                              logical_id=(constants.BLOCKDEV_DRIVER_MANUAL,
+                                          disk["adopt"]),
+                              iv_name="disk/%d" % disk_index,
+                              mode=disk["mode"])
+      disks.append(disk_dev)
+
   else:
     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
   return disks
@@ -6396,6 +6686,83 @@ def _GetInstanceInfoText(instance):
   return "originstname+%s" % instance.name
 
 
+def _CalcEta(time_taken, written, total_size):
+  """Calculates the ETA based on size written and total size.
+
+  @param time_taken: The time taken so far
+  @param written: amount written so far
+  @param total_size: The total size of data to be written
+  @return: The remaining time in seconds
+
+  """
+  avg_time = time_taken / float(written)
+  return (total_size - written) * avg_time
+
+
+def _WipeDisks(lu, instance):
+  """Wipes instance disks.
+
+  @type lu: L{LogicalUnit}
+  @param lu: the logical unit on whose behalf we execute
+  @type instance: L{objects.Instance}
+  @param instance: the instance whose disks we should create
+  @return: the success of the wipe
+
+  """
+  node = instance.primary_node
+
+  for device in instance.disks:
+    lu.cfg.SetDiskID(device, node)
+
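+  # mirror synchronisation (if any) is paused during the wipe and resumed in
+  # the finally clause below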
+  logging.info("Pause sync of instance %s disks", instance.name)
+  result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, True)
+
+  for idx, success in enumerate(result.payload):
+    if not success:
+      logging.warn("pause-sync of instance %s for disks %d failed",
+                   instance.name, idx)
+
+  try:
+    for idx, device in enumerate(instance.disks):
+      lu.LogInfo("* Wiping disk %d", idx)
+      logging.info("Wiping disk %d for instance %s, node %s",
+                   idx, instance.name, node)
+
+      # Wipe MIN_WIPE_CHUNK_PERCENT percent of the disk at a time, but never
+      # more than MAX_WIPE_CHUNK per request
+      wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 *
+                            constants.MIN_WIPE_CHUNK_PERCENT)
+
+      offset = 0
+      size = device.size
+      last_output = 0
+      start_time = time.time()
+
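+      # wipe the device chunk by chunk, reporting progress at most once a
+      # minute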
+      while offset < size:
+        wipe_size = min(wipe_chunk_size, size - offset)
+        result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size)
+        result.Raise("Could not wipe disk %d at offset %d for size %d" %
+                     (idx, offset, wipe_size))
+        now = time.time()
+        offset += wipe_size
+        if now - last_output >= 60:
+          eta = _CalcEta(now - start_time, offset, size)
+          lu.LogInfo(" - done: %.1f%% ETA: %s" %
+                     (offset / float(size) * 100, utils.FormatSeconds(eta)))
+          last_output = now
+  finally:
+    logging.info("Resume sync of instance %s disks", instance.name)
+
+    result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, False)
+
+    for idx, success in enumerate(result.payload):
+      if not success:
+        lu.LogWarning("Warning: Resume sync of disk %d failed. Please have a"
+                      " look at the status and troubleshoot the issue.", idx)
+        logging.warn("resume-sync of instance %s for disks %d failed",
+                     instance.name, idx)
+
+
 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
   """Create all disks for an instance.
 
@@ -6421,7 +6788,7 @@ def _CreateDisks(lu, instance, to_skip=None, target_node=None):
     pnode = target_node
     all_nodes = [pnode]
 
-  if instance.disk_template == constants.DT_FILE:
+  if instance.disk_template in (constants.DT_FILE, constants.DT_SHARED_FILE):
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
 
@@ -6429,7 +6796,7 @@ def _CreateDisks(lu, instance, to_skip=None, target_node=None):
                  " node %s" % (file_storage_dir, pnode))
 
   # Note: this needs to be kept in sync with adding of disks in
-  # LUSetInstanceParams
+  # LUInstanceSetParams
   for idx, device in enumerate(instance.disks):
     if to_skip and idx in to_skip:
       continue
@@ -6490,6 +6857,37 @@ def _RemoveDisks(lu, instance, target_node=None):
   return all_result
 
 
+def _ComputeDiskSizePerVG(disk_template, disks):
+  """Compute disk size requirements in the volume group
+
+  """
+  def _compute(disks, payload):
+    """Universal algorithm
+
+    """
+    vgs = {}
+    for disk in disks:
+      vgs[disk["vg"]] = vgs.get("vg", 0) + disk["size"] + payload
+
+    return vgs
+
+  # Required free disk space per volume group, depending on the disk template
+  req_size_dict = {
+    constants.DT_DISKLESS: {},
+    constants.DT_PLAIN: _compute(disks, 0),
+    # 128 MB are added for drbd metadata for each disk
+    constants.DT_DRBD8: _compute(disks, 128),
+    constants.DT_FILE: {},
+    constants.DT_SHARED_FILE: {},
+  }
+
+  if disk_template not in req_size_dict:
+    raise errors.ProgrammerError("Disk template '%s' size requirement"
+                                 " is unknown" %  disk_template)
+
+  return req_size_dict[disk_template]
+
+
 def _ComputeDiskSize(disk_template, disks):
   """Compute disk size requirements in the volume group
 
@@ -6501,6 +6899,8 @@ def _ComputeDiskSize(disk_template, disks):
     # 128 MB are added for drbd metadata for each disk
     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
     constants.DT_FILE: None,
+    constants.DT_SHARED_FILE: 0,
+    constants.DT_BLOCK: 0,
   }
 
   if disk_template not in req_size_dict:
@@ -6510,6 +6910,21 @@ def _ComputeDiskSize(disk_template, disks):
   return req_size_dict[disk_template]
 
 
+def _FilterVmNodes(lu, nodenames):
+  """Filters out non-vm_capable nodes from a list.
+
+  @type lu: L{LogicalUnit}
+  @param lu: the logical unit for which we check
+  @type nodenames: list
+  @param nodenames: the list of nodes on which we should check
+  @rtype: list
+  @return: the list of vm-capable nodes
+
+  """
+  non_vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
+  return [name for name in nodenames if name not in non_vm_nodes]
+
+
 def _CheckHVParams(lu, nodenames, hvname, hvparams):
   """Hypervisor parameter validation.
 
@@ -6527,6 +6942,7 @@ def _CheckHVParams(lu, nodenames, hvname, hvparams):
   @raise errors.OpPrereqError: if the parameters are not valid
 
   """
+  nodenames = _FilterVmNodes(lu, nodenames)
   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                   hvname,
                                                   hvparams)
@@ -6554,6 +6970,7 @@ def _CheckOSParams(lu, required, nodenames, osname, osparams):
   @raise errors.OpPrereqError: if the parameters are not valid
 
   """
+  nodenames = _FilterVmNodes(lu, nodenames)
   result = lu.rpc.call_os_validate(required, nodenames, osname,
                                    [constants.OS_VALIDATE_PARAMETERS],
                                    osparams)
@@ -6566,41 +6983,12 @@ def _CheckOSParams(lu, required, nodenames, osname, osparams):
                  osname, node)
 
 
-class LUCreateInstance(LogicalUnit):
+class LUInstanceCreate(LogicalUnit):
   """Create an instance.
 
   """
   HPATH = "instance-add"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("mode", _NoDefault, _TElemOf(constants.INSTANCE_CREATE_MODES)),
-    ("start", True, _TBool),
-    ("wait_for_sync", True, _TBool),
-    ("ip_check", True, _TBool),
-    ("name_check", True, _TBool),
-    ("disks", _NoDefault, _TListOf(_TDict)),
-    ("nics", _NoDefault, _TListOf(_TDict)),
-    ("hvparams", _EmptyDict, _TDict),
-    ("beparams", _EmptyDict, _TDict),
-    ("osparams", _EmptyDict, _TDict),
-    ("no_install", None, _TMaybeBool),
-    ("os_type", None, _TMaybeString),
-    ("force_variant", False, _TBool),
-    ("source_handshake", None, _TOr(_TList, _TNone)),
-    ("source_x509_ca", None, _TMaybeString),
-    ("source_instance_name", None, _TMaybeString),
-    ("src_node", None, _TMaybeString),
-    ("src_path", None, _TMaybeString),
-    ("pnode", None, _TMaybeString),
-    ("snode", None, _TMaybeString),
-    ("iallocator", None, _TMaybeString),
-    ("hypervisor", None, _TMaybeString),
-    ("disk_template", _NoDefault, _CheckDiskTemplate),
-    ("identify_defaults", False, _TBool),
-    ("file_driver", None, _TOr(_TNone, _TElemOf(constants.FILE_DRIVER))),
-    ("file_storage_dir", None, _TMaybeString),
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -6614,7 +7002,7 @@ class LUCreateInstance(LogicalUnit):
       self.op.start = False
     # validate/normalize the instance name
     self.op.instance_name = \
-      netutils.HostInfo.NormalizeName(self.op.instance_name)
+      netutils.Hostname.GetNormalizedName(self.op.instance_name)
 
     if self.op.ip_check and not self.op.name_check:
       # TODO: make the ip check more flexible and not depend on the name check
@@ -6648,12 +7036,18 @@ class LUCreateInstance(LogicalUnit):
       if self.op.mode == constants.INSTANCE_IMPORT:
         raise errors.OpPrereqError("Disk adoption not allowed for"
                                    " instance import", errors.ECODE_INVAL)
+    else:
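+      # templates listed in DTS_MUST_ADOPT can only reference existing devices
+      # and therefore require adoption data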
+      if self.op.disk_template in constants.DTS_MUST_ADOPT:
+        raise errors.OpPrereqError("Disk template %s requires disk adoption,"
+                                   " but no 'adopt' parameter given" %
+                                   self.op.disk_template,
+                                   errors.ECODE_INVAL)
 
     self.adopt_disks = has_adopt
 
     # instance name verification
     if self.op.name_check:
-      self.hostname1 = netutils.GetHostInfo(self.op.instance_name)
+      self.hostname1 = netutils.GetHostname(name=self.op.instance_name)
       self.op.instance_name = self.hostname1.name
       # used in CheckPrereq for ip ping check
       self.check_ip = self.hostname1.ip
@@ -6744,8 +7138,8 @@ class LUCreateInstance(LogicalUnit):
         raise errors.OpPrereqError("Missing source instance name",
                                    errors.ECODE_INVAL)
 
-      norm_name = netutils.HostInfo.NormalizeName(src_instance_name)
-      self.source_instance_name = netutils.GetHostInfo(norm_name).name
+      self.source_instance_name = \
+          netutils.GetHostname(name=src_instance_name).name
 
     else:
       raise errors.OpPrereqError("Invalid instance creation mode %r" %
@@ -7024,8 +7418,6 @@ class LUCreateInstance(LogicalUnit):
       export_info = self._ReadExportInfo()
       self._ReadExportParams(export_info)
 
-    _CheckDiskTemplate(self.op.disk_template)
-
     if (not self.cfg.GetVGName() and
         self.op.disk_template not in constants.DTS_NOT_LVM):
       raise errors.OpPrereqError("Cluster does not support lvm-based"
@@ -7085,13 +7477,12 @@ class LUCreateInstance(LogicalUnit):
       elif ip.lower() == constants.VALUE_AUTO:
         if not self.op.name_check:
           raise errors.OpPrereqError("IP address set to auto but name checks"
-                                     " have been skipped. Aborting.",
+                                     " have been skipped",
                                      errors.ECODE_INVAL)
         nic_ip = self.hostname1.ip
       else:
-        if not netutils.IsValidIP4(ip):
-          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
-                                     " like a valid IP" % ip,
+        if not netutils.IPAddress.IsValid(ip):
+          raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
                                      errors.ECODE_INVAL)
         nic_ip = ip
 
@@ -7112,18 +7503,8 @@ class LUCreateInstance(LogicalUnit):
                                      " in cluster" % mac,
                                      errors.ECODE_NOTUNIQUE)
 
-      # bridge verification
-      bridge = nic.get("bridge", None)
-      link = nic.get("link", None)
-      if bridge and link:
-        raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
-                                   " at the same time", errors.ECODE_INVAL)
-      elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
-        raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
-                                   errors.ECODE_INVAL)
-      elif bridge:
-        link = bridge
-
+      #  Build nic parameters
+      link = nic.get(constants.INIC_LINK, None)
       nicparams = {}
       if nic_mode_req:
         nicparams[constants.NIC_MODE] = nic_mode_req
@@ -7149,7 +7530,8 @@ class LUCreateInstance(LogicalUnit):
       except (TypeError, ValueError):
         raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                    errors.ECODE_INVAL)
-      new_disk = {"size": size, "mode": mode}
+      vg = disk.get("vg", self.cfg.GetVGName())
+      new_disk = {"size": size, "mode": mode, "vg": vg}
       if "adopt" in disk:
         new_disk["adopt"] = disk["adopt"]
       self.disks.append(new_disk)
@@ -7229,6 +7611,9 @@ class LUCreateInstance(LogicalUnit):
     if pnode.drained:
       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
                                  pnode.name, errors.ECODE_STATE)
+    if not pnode.vm_capable:
+      raise errors.OpPrereqError("Cannot use non-vm_capable primary node"
+                                 " '%s'" % pnode.name, errors.ECODE_STATE)
 
     self.secondaries = []
 
@@ -7239,33 +7624,38 @@ class LUCreateInstance(LogicalUnit):
                                    " primary node.", errors.ECODE_INVAL)
       _CheckNodeOnline(self, self.op.snode)
       _CheckNodeNotDrained(self, self.op.snode)
+      _CheckNodeVmCapable(self, self.op.snode)
       self.secondaries.append(self.op.snode)
 
     nodenames = [pnode.name] + self.secondaries
 
-    req_size = _ComputeDiskSize(self.op.disk_template,
-                                self.disks)
-
-    # Check lv size requirements, if not adopting
-    if req_size is not None and not self.adopt_disks:
-      _CheckNodesFreeDisk(self, nodenames, req_size)
+    if not self.adopt_disks:
+      # Check lv size requirements, if not adopting
+      req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks)
+      _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)
 
-    if self.adopt_disks: # instead, we must check the adoption data
-      all_lvs = set([i["adopt"] for i in self.disks])
+    elif self.op.disk_template == constants.DT_PLAIN: # Check the adoption data
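+      # volumes are identified as "vg/name" so that the same LV name may be
+      # adopted from different volume groups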
+      all_lvs = set([i["vg"] + "/" + i["adopt"] for i in self.disks])
       if len(all_lvs) != len(self.disks):
         raise errors.OpPrereqError("Duplicate volume names given for adoption",
                                    errors.ECODE_INVAL)
       for lv_name in all_lvs:
         try:
+          # FIXME: lv_name here is "vg/lv"; we need to ensure that other
+          # calls to ReserveLV use the same syntax
           self.cfg.ReserveLV(lv_name, self.proc.GetECId())
         except errors.ReservationError:
           raise errors.OpPrereqError("LV named %s used by another instance" %
                                      lv_name, errors.ECODE_NOTUNIQUE)
 
+      vg_names = self.rpc.call_vg_list([pnode.name])[pnode.name]
+      vg_names.Raise("Cannot get VG information from node %s" % pnode.name)
+
       node_lvs = self.rpc.call_lv_list([pnode.name],
-                                       self.cfg.GetVGName())[pnode.name]
+                                       vg_names.payload.keys())[pnode.name]
       node_lvs.Raise("Cannot get LV information from node %s" % pnode.name)
       node_lvs = node_lvs.payload
+
       delta = all_lvs.difference(node_lvs.keys())
       if delta:
         raise errors.OpPrereqError("Missing logical volume(s): %s" %
@@ -7278,7 +7668,35 @@ class LUCreateInstance(LogicalUnit):
                                    errors.ECODE_STATE)
       # update the size of disk based on what is found
       for dsk in self.disks:
-        dsk["size"] = int(float(node_lvs[dsk["adopt"]][0]))
+        dsk["size"] = int(float(node_lvs[dsk["vg"] + "/" + dsk["adopt"]][0]))
+
+    elif self.op.disk_template == constants.DT_BLOCK:
+      # Normalize and de-duplicate device paths
+      all_disks = set([os.path.abspath(i["adopt"]) for i in self.disks])
+      if len(all_disks) != len(self.disks):
+        raise errors.OpPrereqError("Duplicate disk names given for adoption",
+                                   errors.ECODE_INVAL)
+      baddisks = [d for d in all_disks
+                  if not d.startswith(constants.ADOPTABLE_BLOCKDEV_ROOT)]
+      if baddisks:
+        raise errors.OpPrereqError("Device node(s) %s lie outside %s and"
+                                   " cannot be adopted" %
+                                   (", ".join(baddisks),
+                                    constants.ADOPTABLE_BLOCKDEV_ROOT),
+                                   errors.ECODE_INVAL)
+
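+      # ask the primary node for the sizes of the devices to be adopted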
+      node_disks = self.rpc.call_bdev_sizes([pnode.name],
+                                            list(all_disks))[pnode.name]
+      node_disks.Raise("Cannot get block device information from node %s" %
+                       pnode.name)
+      node_disks = node_disks.payload
+      delta = all_disks.difference(node_disks.keys())
+      if delta:
+        raise errors.OpPrereqError("Missing block device(s): %s" %
+                                   utils.CommaJoin(delta),
+                                   errors.ECODE_INVAL)
+      for dsk in self.disks:
+        dsk["size"] = int(float(node_disks[dsk["adopt"]]))
 
     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
 
@@ -7310,7 +7728,7 @@ class LUCreateInstance(LogicalUnit):
     else:
       network_port = None
 
-    if constants.ENABLE_FILE_STORAGE:
+    if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
       # this is needed because os.path.join does not accept None arguments
       if self.op.file_storage_dir is None:
         string_file_storage_dir = ""
@@ -7318,7 +7736,12 @@ class LUCreateInstance(LogicalUnit):
         string_file_storage_dir = self.op.file_storage_dir
 
       # build the full file storage dir path
-      file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
+      if self.op.disk_template == constants.DT_SHARED_FILE:
+        get_fsd_fn = self.cfg.GetSharedFileStorageDir
+      else:
+        get_fsd_fn = self.cfg.GetFileStorageDir
+
+      file_storage_dir = utils.PathJoin(get_fsd_fn(),
                                         string_file_storage_dir, instance)
     else:
       file_storage_dir = ""
@@ -7330,7 +7753,8 @@ class LUCreateInstance(LogicalUnit):
                                   self.disks,
                                   file_storage_dir,
                                   self.op.file_driver,
-                                  0)
+                                  0,
+                                  feedback_fn)
 
     iobj = objects.Instance(name=instance, os=self.op.os_type,
                             primary_node=pnode_name,
@@ -7345,17 +7769,18 @@ class LUCreateInstance(LogicalUnit):
                             )
 
     if self.adopt_disks:
-      # rename LVs to the newly-generated names; we need to construct
-      # 'fake' LV disks with the old data, plus the new unique_id
-      tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks]
-      rename_to = []
-      for t_dsk, a_dsk in zip (tmp_disks, self.disks):
-        rename_to.append(t_dsk.logical_id)
-        t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk["adopt"])
-        self.cfg.SetDiskID(t_dsk, pnode_name)
-      result = self.rpc.call_blockdev_rename(pnode_name,
-                                             zip(tmp_disks, rename_to))
-      result.Raise("Failed to rename adoped LVs")
+      if self.op.disk_template == constants.DT_PLAIN:
+        # rename LVs to the newly-generated names; we need to construct
+        # 'fake' LV disks with the old data, plus the new unique_id
+        tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks]
+        rename_to = []
+        for t_dsk, a_dsk in zip(tmp_disks, self.disks):
+          rename_to.append(t_dsk.logical_id)
+          t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk["adopt"])
+          self.cfg.SetDiskID(t_dsk, pnode_name)
+        result = self.rpc.call_blockdev_rename(pnode_name,
+                                               zip(tmp_disks, rename_to))
+        result.Raise("Failed to rename adoped LVs")
     else:
       feedback_fn("* creating instance disks...")
       try:
@@ -7368,6 +7793,18 @@ class LUCreateInstance(LogicalUnit):
           self.cfg.ReleaseDRBDMinors(instance)
           raise
 
+      if self.cfg.GetClusterInfo().prealloc_wipe_disks:
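+        # the cluster can request that newly created disks be wiped before
+        # the instance is installed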
+        feedback_fn("* wiping instance disks...")
+        try:
+          _WipeDisks(self, iobj)
+        except errors.OpExecError:
+          self.LogWarning("Device wiping failed, reverting...")
+          try:
+            _RemoveDisks(self, iobj)
+          finally:
+            self.cfg.ReleaseDRBDMinors(instance)
+            raise
+
     feedback_fn("adding instance %s to cluster config" % instance)
 
     self.cfg.AddInstance(iobj, self.proc.GetECId())
@@ -7442,12 +7879,18 @@ class LUCreateInstance(LogicalUnit):
 
       elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
         feedback_fn("* preparing remote import...")
-        connect_timeout = constants.RIE_CONNECT_TIMEOUT
+        # The source cluster will stop the instance before attempting to make a
+        # connection. In some cases stopping an instance can take a long time,
+        # hence the shutdown timeout is added to the connection timeout.
+        connect_timeout = (constants.RIE_CONNECT_TIMEOUT +
+                           self.op.source_shutdown_timeout)
         timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
 
-        disk_results = masterd.instance.RemoteImport(self, feedback_fn, iobj,
-                                                     self.source_x509_ca,
-                                                     self._cds, timeouts)
+        assert iobj.primary_node == self.pnode.name
+        disk_results = \
+          masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode,
+                                        self.source_x509_ca,
+                                        self._cds, timeouts)
         if not compat.all(disk_results):
           # TODO: Should the instance still be started, even if some disks
           # failed to import (valid for local imports, too)?
@@ -7480,7 +7923,7 @@ class LUCreateInstance(LogicalUnit):
     return list(iobj.all_nodes)
 
 
-class LUConnectConsole(NoHooksLU):
+class LUInstanceConsole(NoHooksLU):
   """Connect to an instance's console.
 
   This is somewhat special in that it returns the command line that
@@ -7488,9 +7931,6 @@ class LUConnectConsole(NoHooksLU):
   console.
 
   """
-  _OP_PARAMS = [
-    _PInstanceName
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -7519,36 +7959,45 @@ class LUConnectConsole(NoHooksLU):
     node_insts.Raise("Can't get node information from %s" % node)
 
     if instance.name not in node_insts.payload:
-      raise errors.OpExecError("Instance %s is not running." % instance.name)
+      if instance.admin_up:
+        state = constants.INSTST_ERRORDOWN
+      else:
+        state = constants.INSTST_ADMINDOWN
+      raise errors.OpExecError("Instance %s is not running (state %s)" %
+                               (instance.name, state))
 
     logging.debug("Connecting to console of %s on %s", instance.name, node)
 
-    hyper = hypervisor.GetHypervisor(instance.hypervisor)
-    cluster = self.cfg.GetClusterInfo()
-    # beparams and hvparams are passed separately, to avoid editing the
-    # instance and then saving the defaults in the instance itself.
-    hvparams = cluster.FillHV(instance)
-    beparams = cluster.FillBE(instance)
-    console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
+    return _GetInstanceConsole(self.cfg.GetClusterInfo(), instance)
+
 
-    # build ssh cmdline
-    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
+def _GetInstanceConsole(cluster, instance):
+  """Returns console information for an instance.
 
+  @type cluster: L{objects.Cluster}
+  @type instance: L{objects.Instance}
+  @rtype: dict
+
+  """
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+  # beparams and hvparams are passed separately, to avoid editing the
+  # instance and then saving the defaults in the instance itself.
+  hvparams = cluster.FillHV(instance)
+  beparams = cluster.FillBE(instance)
+  console = hyper.GetInstanceConsole(instance, hvparams, beparams)
 
-class LUReplaceDisks(LogicalUnit):
+  assert console.instance == instance.name
+  assert console.Validate()
+
+  return console.ToDict()
+
+
+class LUInstanceReplaceDisks(LogicalUnit):
   """Replace the disks of an instance.
 
   """
   HPATH = "mirrors-replace"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("mode", _NoDefault, _TElemOf(constants.REPLACE_MODES)),
-    ("disks", _EmptyList, _TListOf(_TPositiveInt)),
-    ("remote_node", None, _TMaybeString),
-    ("iallocator", None, _TMaybeString),
-    ("early_release", False, _TBool),
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -7803,6 +8252,7 @@ class TLReplaceDisks(Tasklet):
         check_nodes = [self.new_node, self.other_node]
 
         _CheckNodeNotDrained(self.lu, remote_node)
+        _CheckNodeVmCapable(self.lu, remote_node)
 
         old_node_info = self.cfg.GetNodeInfo(secondary_node)
         assert old_node_info is not None
@@ -8284,12 +8734,6 @@ class LURepairNodeStorage(NoHooksLU):
   """Repairs the volume group on a node.
 
   """
-  _OP_PARAMS = [
-    _PNodeName,
-    ("storage_type", _NoDefault, _CheckStorageType),
-    ("name", _NoDefault, _TNonEmptyString),
-    ("ignore_consistency", False, _TBool),
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -8348,15 +8792,10 @@ class LURepairNodeStorage(NoHooksLU):
                  (self.op.name, self.op.node_name))
 
 
-class LUNodeEvacuationStrategy(NoHooksLU):
+class LUNodeEvacStrategy(NoHooksLU):
   """Computes the node evacuation strategy.
 
   """
-  _OP_PARAMS = [
-    ("nodes", _NoDefault, _TListOf(_TNonEmptyString)),
-    ("remote_node", None, _TMaybeString),
-    ("iallocator", None, _TMaybeString),
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -8397,18 +8836,12 @@ class LUNodeEvacuationStrategy(NoHooksLU):
     return result
 
 
-class LUGrowDisk(LogicalUnit):
+class LUInstanceGrowDisk(LogicalUnit):
   """Grow a disk of an instance.
 
   """
   HPATH = "disk-grow"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("disk", _NoDefault, _TInt),
-    ("amount", _NoDefault, _TInt),
-    ("wait_for_sync", True, _TBool),
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -8455,10 +8888,12 @@ class LUGrowDisk(LogicalUnit):
 
     self.disk = instance.FindDisk(self.op.disk)
 
-    if instance.disk_template != constants.DT_FILE:
+    if instance.disk_template not in (constants.DT_FILE,
+                                      constants.DT_SHARED_FILE):
       # TODO: check the free disk space for file, when that feature will be
       # supported
-      _CheckNodesFreeDisk(self, nodenames, self.op.amount)
+      _CheckNodesFreeDiskPerVG(self, nodenames,
+                               self.disk.ComputeGrowth(self.op.amount))
 
   def Exec(self, feedback_fn):
     """Execute disk grow.
@@ -8498,14 +8933,10 @@ class LUGrowDisk(LogicalUnit):
                            " sync mode was requested.")
 
 
-class LUQueryInstanceData(NoHooksLU):
+class LUInstanceQueryData(NoHooksLU):
   """Query runtime instance data.
 
   """
-  _OP_PARAMS = [
-    ("instances", _EmptyList, _TListOf(_TNonEmptyString)),
-    ("static", False, _TBool),
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -8656,25 +9087,12 @@ class LUQueryInstanceData(NoHooksLU):
     return result
 
 
-class LUSetInstanceParams(LogicalUnit):
+class LUInstanceSetParams(LogicalUnit):
   """Modifies an instances's parameters.
 
   """
   HPATH = "instance-modify"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("nics", _EmptyList, _TList),
-    ("disks", _EmptyList, _TList),
-    ("beparams", _EmptyDict, _TDict),
-    ("hvparams", _EmptyDict, _TDict),
-    ("disk_template", None, _TMaybeString),
-    ("remote_node", None, _TMaybeString),
-    ("os_name", None, _TMaybeString),
-    ("force_variant", False, _TBool),
-    ("osparams", None, _TOr(_TDict, _TNone)),
-    _PForce,
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -8731,13 +9149,12 @@ class LUSetInstanceParams(LogicalUnit):
                                  " changes not supported at the same time",
                                  errors.ECODE_INVAL)
 
-    if self.op.disk_template:
-      _CheckDiskTemplate(self.op.disk_template)
-      if (self.op.disk_template in constants.DTS_NET_MIRROR and
-          self.op.remote_node is None):
-        raise errors.OpPrereqError("Changing the disk template to a mirrored"
-                                   " one requires specifying a secondary node",
-                                   errors.ECODE_INVAL)
+    if (self.op.disk_template and
+        self.op.disk_template in constants.DTS_NET_MIRROR and
+        self.op.remote_node is None):
+      raise errors.OpPrereqError("Changing the disk template to a mirrored"
+                                 " one requires specifying a secondary node",
+                                 errors.ECODE_INVAL)
 
     # NIC validation
     nic_addremove = 0
@@ -8761,7 +9178,7 @@ class LUSetInstanceParams(LogicalUnit):
         if nic_ip.lower() == constants.VALUE_NONE:
           nic_dict['ip'] = None
         else:
-          if not netutils.IsValidIP4(nic_ip):
+          if not netutils.IPAddress.IsValid(nic_ip):
             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
                                        errors.ECODE_INVAL)
 
@@ -8900,9 +9317,12 @@ class LUSetInstanceParams(LogicalUnit):
                                      self.op.remote_node, errors.ECODE_STATE)
         _CheckNodeOnline(self, self.op.remote_node)
         _CheckNodeNotDrained(self, self.op.remote_node)
-        disks = [{"size": d.size} for d in instance.disks]
-        required = _ComputeDiskSize(self.op.disk_template, disks)
-        _CheckNodesFreeDisk(self, [self.op.remote_node], required)
+        # FIXME: here we assume that the old instance type is DT_PLAIN
+        assert instance.disk_template == constants.DT_PLAIN
+        disks = [{"size": d.size, "vg": d.logical_id[0]}
+                 for d in instance.disks]
+        required = _ComputeDiskSizePerVG(self.op.disk_template, disks)
+        _CheckNodesFreeDiskPerVG(self, [self.op.remote_node], required)
 
     # hvparams processing
     if self.op.hvparams:
@@ -8934,10 +9354,9 @@ class LUSetInstanceParams(LogicalUnit):
     if self.op.osparams:
       i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams)
       _CheckOSParams(self, True, nodelist, instance_os, i_osdict)
-      self.os_new = cluster.SimpleFillOS(instance_os, i_osdict)
       self.os_inst = i_osdict # the new dict (without defaults)
     else:
-      self.os_new = self.os_inst = {}
+      self.os_inst = {}
 
     self.warn = []
 
@@ -8948,7 +9367,7 @@ class LUSetInstanceParams(LogicalUnit):
         mem_check_list.extend(instance.secondary_nodes)
       instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                   instance.hypervisor)
-      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
+      nodeinfo = self.rpc.call_node_info(mem_check_list, None,
                                          instance.hypervisor)
       pninfo = nodeinfo[pnode]
       msg = pninfo.fail_msg
@@ -9082,7 +9501,7 @@ class LUSetInstanceParams(LogicalUnit):
         _CheckInstanceDown(self, instance, "cannot remove disks")
 
       if (disk_op == constants.DDM_ADD and
-          len(instance.nics) >= constants.MAX_DISKS):
+          len(instance.disks) >= constants.MAX_DISKS):
         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                    " add more" % constants.MAX_DISKS,
                                    errors.ECODE_STATE)
@@ -9109,7 +9528,7 @@ class LUSetInstanceParams(LogicalUnit):
     disk_info = [{"size": d.size, "mode": d.mode} for d in instance.disks]
     new_disks = _GenerateDiskTemplate(self, self.op.disk_template,
                                       instance.name, pnode, [snode],
-                                      disk_info, None, None, 0)
+                                      disk_info, None, None, 0, feedback_fn)
     info = _GetInstanceInfoText(instance)
     feedback_fn("Creating additional volumes...")
     # first, create the missing data and meta devices
@@ -9185,7 +9604,6 @@ class LUSetInstanceParams(LogicalUnit):
         self.LogWarning("Could not remove metadata for disk %d on node %s,"
                         " continuing anyway: %s", idx, pnode, msg)
 
-
   def Exec(self, feedback_fn):
     """Modifies an instance.
 
@@ -9214,7 +9632,8 @@ class LUSetInstanceParams(LogicalUnit):
         result.append(("disk/%d" % device_idx, "remove"))
       elif disk_op == constants.DDM_ADD:
         # add a new disk
-        if instance.disk_template == constants.DT_FILE:
+        if instance.disk_template in (constants.DT_FILE,
+                                        constants.DT_SHARED_FILE):
           file_driver, file_path = instance.disks[0].logical_id
           file_path = os.path.dirname(file_path)
         else:
@@ -9227,7 +9646,7 @@ class LUSetInstanceParams(LogicalUnit):
                                          [disk_dict],
                                          file_path,
                                          file_driver,
-                                         disk_idx_base)[0]
+                                         disk_idx_base, feedback_fn)[0]
         instance.disks.append(new_disk)
         info = _GetInstanceInfoText(instance)
 
@@ -9254,7 +9673,7 @@ class LUSetInstanceParams(LogicalUnit):
     if self.op.disk_template:
       r_shut = _ShutdownInstanceDisks(self, instance)
       if not r_shut:
-        raise errors.OpExecError("Cannot shutdow instance disks, unable to"
+        raise errors.OpExecError("Cannot shutdown instance disks, unable to"
                                  " proceed with disk template conversion")
       mode = (instance.disk_template, self.op.disk_template)
       try:
@@ -9324,14 +9743,10 @@ class LUSetInstanceParams(LogicalUnit):
     }
 
 
-class LUQueryExports(NoHooksLU):
+class LUBackupQuery(NoHooksLU):
   """Query the exports list
 
   """
-  _OP_PARAMS = [
-    ("nodes", _EmptyList, _TListOf(_TNonEmptyString)),
-    ("use_locking", False, _TBool),
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -9364,14 +9779,10 @@ class LUQueryExports(NoHooksLU):
     return result
 
 
-class LUPrepareExport(NoHooksLU):
+class LUBackupPrepare(NoHooksLU):
   """Prepares an instance for an export and returns useful information.
 
   """
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("mode", _NoDefault, _TElemOf(constants.EXPORT_MODES)),
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -9419,23 +9830,12 @@ class LUPrepareExport(NoHooksLU):
     return None
 
 
-class LUExportInstance(LogicalUnit):
+class LUBackupExport(LogicalUnit):
   """Export an instance to an image in the cluster.
 
   """
   HPATH = "instance-export"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_PARAMS = [
-    _PInstanceName,
-    ("target_node", _NoDefault, _TOr(_TNonEmptyString, _TList)),
-    ("shutdown", True, _TBool),
-    _PShutdownTimeout,
-    ("remove_instance", False, _TBool),
-    ("ignore_remove_failures", False, _TBool),
-    ("mode", constants.EXPORT_MODE_LOCAL, _TElemOf(constants.EXPORT_MODES)),
-    ("x509_key_name", None, _TOr(_TList, _TNone)),
-    ("destination_x509_ca", None, _TMaybeString),
-    ]
   REQ_BGL = False
 
   def CheckArguments(self):
@@ -9445,10 +9845,6 @@ class LUExportInstance(LogicalUnit):
     self.x509_key_name = self.op.x509_key_name
     self.dest_x509_ca_pem = self.op.destination_x509_ca
 
-    if self.op.remove_instance and not self.op.shutdown:
-      raise errors.OpPrereqError("Can not remove instance without shutting it"
-                                 " down before")
-
     if self.op.mode == constants.EXPORT_MODE_REMOTE:
       if not self.x509_key_name:
         raise errors.OpPrereqError("Missing X509 key name for encryption",
@@ -9514,6 +9910,11 @@ class LUExportInstance(LogicalUnit):
           "Cannot retrieve locked instance %s" % self.op.instance_name
     _CheckNodeOnline(self, self.instance.primary_node)
 
+    if (self.op.remove_instance and self.instance.admin_up and
+        not self.op.shutdown):
+      raise errors.OpPrereqError("Can not remove instance without shutting it"
+                                 " down before")
+
     if self.op.mode == constants.EXPORT_MODE_LOCAL:
       self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
       self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
@@ -9601,7 +10002,7 @@ class LUExportInstance(LogicalUnit):
     nodelist.remove(self.dst_node.name)
 
     # on one-node clusters nodelist will be empty after the removal
-    # if we proceed the backup would be removed because OpQueryExports
+    # if we proceed the backup would be removed because OpBackupQuery
     # substitutes an empty list with the full cluster node list.
     iname = self.instance.name
     if nodelist:
@@ -9717,13 +10118,10 @@ class LUExportInstance(LogicalUnit):
     return fin_resu, dresults
 
 
-class LURemoveExport(NoHooksLU):
+class LUBackupRemove(NoHooksLU):
   """Remove exports related to the named instance.
 
   """
-  _OP_PARAMS = [
-    _PInstanceName,
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -9767,10 +10165,473 @@ class LURemoveExport(NoHooksLU):
                   " Domain Name.")
 
 
-class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
-  """Generic tags LU.
-
-  This is an abstract class which is the parent of all the other tags LUs.
+class LUGroupAdd(LogicalUnit):
+  """Logical unit for creating node groups.
+
+  """
+  HPATH = "group-add"
+  HTYPE = constants.HTYPE_GROUP
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    # We need the new group's UUID here so that we can create and acquire the
+    # corresponding lock. Later, in Exec(), we'll indicate to cfg.AddNodeGroup
+    # that it should not check whether the UUID exists in the configuration.
+    self.group_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId())
+    self.needed_locks = {}
+    self.add_locks[locking.LEVEL_NODEGROUP] = self.group_uuid
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks that the given group name does not already exist as a node
+    group.
+
+    """
+    try:
+      existing_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+    except errors.OpPrereqError:
+      pass
+    else:
+      raise errors.OpPrereqError("Desired group name '%s' already exists as a"
+                                 " node group (UUID: %s)" %
+                                 (self.op.group_name, existing_uuid),
+                                 errors.ECODE_EXISTS)
+
+    if self.op.ndparams:
+      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    """
+    env = {
+      "GROUP_NAME": self.op.group_name,
+      }
+    mn = self.cfg.GetMasterNode()
+    return env, [mn], [mn]
+
+  def Exec(self, feedback_fn):
+    """Add the node group to the cluster.
+
+    """
+    group_obj = objects.NodeGroup(name=self.op.group_name, members=[],
+                                  uuid=self.group_uuid,
+                                  alloc_policy=self.op.alloc_policy,
+                                  ndparams=self.op.ndparams)
+
+    self.cfg.AddNodeGroup(group_obj, self.proc.GetECId(), check_uuid=False)
+    del self.remove_locks[locking.LEVEL_NODEGROUP]
+
+
+class LUGroupAssignNodes(NoHooksLU):
+  """Logical unit for assigning nodes to groups.
+
+  """
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    # These raise errors.OpPrereqError on their own:
+    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+    self.op.nodes = _GetWantedNodes(self, self.op.nodes)
+
+    # We want to lock all the affected nodes and groups. We have readily
+    # available the list of nodes, and the *destination* group. To gather the
+    # list of "source" groups, we need to fetch node information.
+    self.node_data = self.cfg.GetAllNodesInfo()
+    affected_groups = set(self.node_data[node].group for node in self.op.nodes)
+    affected_groups.add(self.group_uuid)
+
+    self.needed_locks = {
+      locking.LEVEL_NODEGROUP: list(affected_groups),
+      locking.LEVEL_NODE: self.op.nodes,
+      }
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    self.group = self.cfg.GetNodeGroup(self.group_uuid)
+    instance_data = self.cfg.GetAllInstancesInfo()
+
+    if self.group is None:
+      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
+                               (self.op.group_name, self.group_uuid))
+
+    (new_splits, previous_splits) = \
+      self.CheckAssignmentForSplitInstances([(node, self.group_uuid)
+                                             for node in self.op.nodes],
+                                            self.node_data, instance_data)
+
+    if new_splits:
+      fmt_new_splits = utils.CommaJoin(utils.NiceSort(new_splits))
+
+      if not self.op.force:
+        raise errors.OpExecError("The following instances get split by this"
+                                 " change and --force was not given: %s" %
+                                 fmt_new_splits)
+      else:
+        self.LogWarning("This operation will split the following instances: %s",
+                        fmt_new_splits)
+
+        if previous_splits:
+          self.LogWarning("In addition, these already-split instances continue"
+                          " to be split across groups: %s",
+                          utils.CommaJoin(utils.NiceSort(previous_splits)))
+
+  def Exec(self, feedback_fn):
+    """Assign nodes to a new group.
+
+    """
+    for node in self.op.nodes:
+      self.node_data[node].group = self.group_uuid
+
+    self.cfg.Update(self.group, feedback_fn) # Saves all modified nodes.
+
+  @staticmethod
+  def CheckAssignmentForSplitInstances(changes, node_data, instance_data):
+    """Check for split instances after a node assignment.
+
+    This method considers a series of node assignments as an atomic operation,
+    and returns information about split instances after applying the set of
+    changes.
+
+    In particular, it returns information about newly split instances, and
+    instances that were already split, and remain so after the change.
+
+    Only instances whose disk template is listed in constants.DTS_NET_MIRROR are
+    considered.
+
+    @type changes: list of (node_name, new_group_uuid) pairs.
+    @param changes: list of node assignments to consider.
+    @param node_data: a dict with data for all nodes
+    @param instance_data: a dict with all instances to consider
+    @rtype: a two-tuple
+    @return: a pair: the list of instances that would become newly split by
+      this change, and the list of instances that were already split and that
+      this change does not fix.
+
+    """
+    changed_nodes = dict((node, group) for node, group in changes
+                         if node_data[node].group != group)
+
+    all_split_instances = set()
+    previously_split_instances = set()
+
+    def InstanceNodes(instance):
+      return [instance.primary_node] + list(instance.secondary_nodes)
+
+    for inst in instance_data.values():
+      if inst.disk_template not in constants.DTS_NET_MIRROR:
+        continue
+
+      instance_nodes = InstanceNodes(inst)
+
+      if len(set(node_data[node].group for node in instance_nodes)) > 1:
+        previously_split_instances.add(inst.name)
+
+      if len(set(changed_nodes.get(node, node_data[node].group)
+                 for node in instance_nodes)) > 1:
+        all_split_instances.add(inst.name)
+
+    return (list(all_split_instances - previously_split_instances),
+            list(previously_split_instances & all_split_instances))
+
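# Editor's note: an illustrative sketch, not part of the patch. It models the
# split detection performed by CheckAssignmentForSplitInstances above using
# plain dicts and ignores the DTS_NET_MIRROR filter; all names below are
# hypothetical.
def _sketch_split_check(changes, node_to_group, instance_nodes):
  # changes: dict of node name -> new group UUID
  # node_to_group: current node name -> group UUID mapping
  # instance_nodes: dict of instance name -> list of its nodes
  changed = dict((n, g) for n, g in changes.items() if node_to_group[n] != g)
  new_split = set()
  old_split = set()
  for inst, nodes in instance_nodes.items():
    if len(set(node_to_group[n] for n in nodes)) > 1:
      old_split.add(inst)
    if len(set(changed.get(n, node_to_group[n]) for n in nodes)) > 1:
      new_split.add(inst)
  return (sorted(new_split - old_split), sorted(new_split & old_split))

# Example: moving node2 into group-b splits inst1, whose other node stays in
# group-a:
#   _sketch_split_check({"node2": "group-b"},
#                       {"node1": "group-a", "node2": "group-a"},
#                       {"inst1": ["node1", "node2"]})
#   -> (["inst1"], [])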
+
+class _GroupQuery(_QueryBase):
+  FIELDS = query.GROUP_FIELDS
+
+  def ExpandNames(self, lu):
+    lu.needed_locks = {}
+
+    self._all_groups = lu.cfg.GetAllNodeGroupsInfo()
+    name_to_uuid = dict((g.name, g.uuid) for g in self._all_groups.values())
+
+    if not self.names:
+      self.wanted = [name_to_uuid[name]
+                     for name in utils.NiceSort(name_to_uuid.keys())]
+    else:
+      # Accept the given names as either group names or UUIDs.
+      missing = []
+      self.wanted = []
+      all_uuid = frozenset(self._all_groups.keys())
+
+      for name in self.names:
+        if name in all_uuid:
+          self.wanted.append(name)
+        elif name in name_to_uuid:
+          self.wanted.append(name_to_uuid[name])
+        else:
+          missing.append(name)
+
+      if missing:
+        raise errors.OpPrereqError("Some groups do not exist: %s" % missing,
+                                   errors.ECODE_NOENT)
+
+  def DeclareLocks(self, lu, level):
+    pass
+
+  def _GetQueryData(self, lu):
+    """Computes the list of node groups and their attributes.
+
+    """
+    do_nodes = query.GQ_NODE in self.requested_data
+    do_instances = query.GQ_INST in self.requested_data
+
+    group_to_nodes = None
+    group_to_instances = None
+
+    # For GQ_NODE, we need to map group->[nodes], and group->[instances] for
+    # GQ_INST. The former is attainable with just GetAllNodesInfo(), but for the
+    # latter GetAllInstancesInfo() is not enough, for we have to go through
+    # instance->node. Hence, we will need to process nodes even if we only need
+    # instance information.
+    if do_nodes or do_instances:
+      all_nodes = lu.cfg.GetAllNodesInfo()
+      group_to_nodes = dict((uuid, []) for uuid in self.wanted)
+      node_to_group = {}
+
+      for node in all_nodes.values():
+        if node.group in group_to_nodes:
+          group_to_nodes[node.group].append(node.name)
+          node_to_group[node.name] = node.group
+
+      if do_instances:
+        all_instances = lu.cfg.GetAllInstancesInfo()
+        group_to_instances = dict((uuid, []) for uuid in self.wanted)
+
+        for instance in all_instances.values():
+          node = instance.primary_node
+          if node in node_to_group:
+            group_to_instances[node_to_group[node]].append(instance.name)
+
+        if not do_nodes:
+          # Do not pass on node information if it was not requested.
+          group_to_nodes = None
+
+    return query.GroupQueryData([self._all_groups[uuid]
+                                 for uuid in self.wanted],
+                                group_to_nodes, group_to_instances)
+
+
+class LUGroupQuery(NoHooksLU):
+  """Logical unit for querying node groups.
+
+  """
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    self.gq = _GroupQuery(qlang.MakeSimpleFilter("name", self.op.names),
+                          self.op.output_fields, False)
+
+  def ExpandNames(self):
+    self.gq.ExpandNames(self)
+
+  def Exec(self, feedback_fn):
+    return self.gq.OldStyleQuery(self)
+
+
+class LUGroupSetParams(LogicalUnit):
+  """Modifies the parameters of a node group.
+
+  """
+  HPATH = "group-modify"
+  HTYPE = constants.HTYPE_GROUP
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    all_changes = [
+      self.op.ndparams,
+      self.op.alloc_policy,
+      ]
+
+    if all_changes.count(None) == len(all_changes):
+      raise errors.OpPrereqError("Please pass at least one modification",
+                                 errors.ECODE_INVAL)
+
+  def ExpandNames(self):
+    # This raises errors.OpPrereqError on its own:
+    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+
+    self.needed_locks = {
+      locking.LEVEL_NODEGROUP: [self.group_uuid],
+      }
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    self.group = self.cfg.GetNodeGroup(self.group_uuid)
+
+    if self.group is None:
+      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
+                               (self.op.group_name, self.group_uuid))
+
+    if self.op.ndparams:
+      new_ndparams = _GetUpdatedParams(self.group.ndparams, self.op.ndparams)
+      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
+      self.new_ndparams = new_ndparams
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    """
+    env = {
+      "GROUP_NAME": self.op.group_name,
+      "NEW_ALLOC_POLICY": self.op.alloc_policy,
+      }
+    mn = self.cfg.GetMasterNode()
+    return env, [mn], [mn]
+
+  def Exec(self, feedback_fn):
+    """Modifies the node group.
+
+    """
+    result = []
+
+    if self.op.ndparams:
+      self.group.ndparams = self.new_ndparams
+      result.append(("ndparams", str(self.group.ndparams)))
+
+    if self.op.alloc_policy:
+      self.group.alloc_policy = self.op.alloc_policy
+
+    self.cfg.Update(self.group, feedback_fn)
+    return result
+
+
+class LUGroupRemove(LogicalUnit):
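+  """Logical unit for removing node groups.
+
+  """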
+  HPATH = "group-remove"
+  HTYPE = constants.HTYPE_GROUP
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    # This raises errors.OpPrereqError on its own:
+    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+    self.needed_locks = {
+      locking.LEVEL_NODEGROUP: [self.group_uuid],
+      }
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks that the given group name exists as a node group, that it is
+    empty (i.e., contains no nodes), and that it is not the last group of the
+    cluster.
+
+    """
+    # Verify that the group is empty.
+    group_nodes = [node.name
+                   for node in self.cfg.GetAllNodesInfo().values()
+                   if node.group == self.group_uuid]
+
+    if group_nodes:
+      raise errors.OpPrereqError("Group '%s' not empty, has the following"
+                                 " nodes: %s" %
+                                 (self.op.group_name,
+                                  utils.CommaJoin(utils.NiceSort(group_nodes))),
+                                 errors.ECODE_STATE)
+
+    # Verify the cluster would not be left group-less.
+    if len(self.cfg.GetNodeGroupList()) == 1:
+      raise errors.OpPrereqError("Group '%s' is the only group,"
+                                 " cannot be removed" %
+                                 self.op.group_name,
+                                 errors.ECODE_STATE)
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    """
+    env = {
+      "GROUP_NAME": self.op.group_name,
+      }
+    mn = self.cfg.GetMasterNode()
+    return env, [mn], [mn]
+
+  def Exec(self, feedback_fn):
+    """Remove the node group.
+
+    """
+    try:
+      self.cfg.RemoveNodeGroup(self.group_uuid)
+    except errors.ConfigurationError:
+      raise errors.OpExecError("Group '%s' with UUID %s disappeared" %
+                               (self.op.group_name, self.group_uuid))
+
+    self.remove_locks[locking.LEVEL_NODEGROUP] = self.group_uuid
+
+
+class LUGroupRename(LogicalUnit):
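+  """Logical unit for renaming node groups.
+
+  """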
+  HPATH = "group-rename"
+  HTYPE = constants.HTYPE_GROUP
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    # This raises errors.OpPrereqError on its own:
+    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+
+    self.needed_locks = {
+      locking.LEVEL_NODEGROUP: [self.group_uuid],
+      }
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    Ensures the requested new name is not yet used.
+
+    """
+    try:
+      new_name_uuid = self.cfg.LookupNodeGroup(self.op.new_name)
+    except errors.OpPrereqError:
+      pass
+    else:
+      raise errors.OpPrereqError("Desired new name '%s' clashes with existing"
+                                 " node group (UUID: %s)" %
+                                 (self.op.new_name, new_name_uuid),
+                                 errors.ECODE_EXISTS)
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    """
+    env = {
+      "OLD_NAME": self.op.group_name,
+      "NEW_NAME": self.op.new_name,
+      }
+
+    mn = self.cfg.GetMasterNode()
+    all_nodes = self.cfg.GetAllNodesInfo()
+    run_nodes = [mn]
+    all_nodes.pop(mn, None)
+
+    for node in all_nodes.values():
+      if node.group == self.group_uuid:
+        run_nodes.append(node.name)
+
+    return env, run_nodes, run_nodes
+
+  def Exec(self, feedback_fn):
+    """Rename the node group.
+
+    """
+    group = self.cfg.GetNodeGroup(self.group_uuid)
+
+    if group is None:
+      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
+                               (self.op.group_name, self.group_uuid))
+
+    group.name = self.op.new_name
+    self.cfg.Update(group, feedback_fn)
+
+    return self.op.new_name
+
+
+class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
+  """Generic tags LU.
+
+  This is an abstract class which is the parent of all the other tags LUs.
 
   """
 
@@ -9801,15 +10662,10 @@ class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
                                  str(self.op.kind), errors.ECODE_INVAL)
 
 
-class LUGetTags(TagsLU):
+class LUTagsGet(TagsLU):
   """Returns the tags of a given object.
 
   """
-  _OP_PARAMS = [
-    ("kind", _NoDefault, _TElemOf(constants.VALID_TAG_TYPES)),
-    # Name is only meaningful for nodes and instances
-    ("name", _NoDefault, _TMaybeString),
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -9825,13 +10681,10 @@ class LUGetTags(TagsLU):
     return list(self.target.GetTags())
 
 
-class LUSearchTags(NoHooksLU):
+class LUTagsSearch(NoHooksLU):
   """Searches the tags for a given pattern.
 
   """
-  _OP_PARAMS = [
-    ("pattern", _NoDefault, _TNonEmptyString),
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -9867,16 +10720,10 @@ class LUSearchTags(NoHooksLU):
     return results
 
 
-class LUAddTags(TagsLU):
+class LUTagsSet(TagsLU):
   """Sets a tag on a given object.
 
   """
-  _OP_PARAMS = [
-    ("kind", _NoDefault, _TElemOf(constants.VALID_TAG_TYPES)),
-    # Name is only meaningful for nodes and instances
-    ("name", _NoDefault, _TMaybeString),
-    ("tags", _NoDefault, _TListOf(_TNonEmptyString)),
-    ]
   REQ_BGL = False
 
   def CheckPrereq(self):
@@ -9901,16 +10748,10 @@ class LUAddTags(TagsLU):
     self.cfg.Update(self.target, feedback_fn)
 
 
-class LUDelTags(TagsLU):
+class LUTagsDel(TagsLU):
   """Delete a list of tags from a given object.
 
   """
-  _OP_PARAMS = [
-    ("kind", _NoDefault, _TElemOf(constants.VALID_TAG_TYPES)),
-    # Name is only meaningful for nodes and instances
-    ("name", _NoDefault, _TMaybeString),
-    ("tags", _NoDefault, _TListOf(_TNonEmptyString)),
-    ]
   REQ_BGL = False
 
   def CheckPrereq(self):
@@ -9948,12 +10789,6 @@ class LUTestDelay(NoHooksLU):
   time.
 
   """
-  _OP_PARAMS = [
-    ("duration", _NoDefault, _TFloat),
-    ("on_master", True, _TBool),
-    ("on_nodes", _EmptyList, _TListOf(_TNonEmptyString)),
-    ("repeat", 0, _TPositiveInt)
-    ]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -9995,16 +10830,10 @@ class LUTestDelay(NoHooksLU):
         self._TestDelay()
 
 
-class LUTestJobqueue(NoHooksLU):
+class LUTestJqueue(NoHooksLU):
   """Utility LU to test some aspects of the job queue.
 
   """
-  _OP_PARAMS = [
-    ("notify_waitlock", False, _TBool),
-    ("notify_exec", False, _TBool),
-    ("log_messages", _EmptyList, _TListOf(_TString)),
-    ("fail", False, _TBool),
-    ]
   REQ_BGL = False
 
   # Must be lower than default timeout for WaitForJobChange to see whether it
@@ -10220,12 +11049,12 @@ class IAllocator(object):
       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
       # we don't have job IDs
       }
+    ninfo = cfg.GetAllNodesInfo()
     iinfo = cfg.GetAllInstancesInfo().values()
     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
 
     # node data
-    node_results = {}
-    node_list = cfg.GetNodeList()
+    node_list = [n.name for n in ninfo.values() if n.vm_capable]
 
     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
       hypervisor_name = self.hypervisor
@@ -10239,9 +11068,43 @@ class IAllocator(object):
     node_iinfo = \
       self.rpc.call_all_instances_info(node_list,
                                        cluster_info.enabled_hypervisors)
-    for nname, nresult in node_data.items():
-      # first fill in static (config-based) values
-      ninfo = cfg.GetNodeInfo(nname)
+
+    data["nodegroups"] = self._ComputeNodeGroupData(cfg)
+
+    config_ndata = self._ComputeBasicNodeData(ninfo)
+    data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
+                                                 i_list, config_ndata)
+    assert len(data["nodes"]) == len(ninfo), \
+        "Incomplete node data computed"
+
+    data["instances"] = self._ComputeInstanceData(cluster_info, i_list)
+
+    self.in_data = data
+
+  @staticmethod
+  def _ComputeNodeGroupData(cfg):
+    """Compute node group data.
+
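+    @rtype: dict
+    @return: a dict of group UUID to a dict with the group's "name" and
+      "alloc_policy", as included in the iallocator input under "nodegroups"
+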
+    """
+    ng = {}
+    for guuid, gdata in cfg.GetAllNodeGroupsInfo().items():
+      ng[guuid] = {
+        "name": gdata.name,
+        "alloc_policy": gdata.alloc_policy,
+        }
+    return ng
+
+  @staticmethod
+  def _ComputeBasicNodeData(node_cfg):
+    """Compute the static (config-based) node data.
+
+    @rtype: dict
+    @return: a dict mapping node name to a dict of static node attributes
+
+    """
+    node_results = {}
+    for ninfo in node_cfg.values():
+      # fill in static (config-based) values
       pnr = {
         "tags": list(ninfo.GetTags()),
         "primary_ip": ninfo.primary_ip,
@@ -10249,8 +11112,29 @@ class IAllocator(object):
         "offline": ninfo.offline,
         "drained": ninfo.drained,
         "master_candidate": ninfo.master_candidate,
+        "group": ninfo.group,
+        "master_capable": ninfo.master_capable,
+        "vm_capable": ninfo.vm_capable,
         }
 
+      node_results[ninfo.name] = pnr
+
+    return node_results
+
+  @staticmethod
+  def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
+                              node_results):
+    """Compute dynamic node data and merge it with the static node data.
+
+    @param node_results: the basic node structures as filled from the config
+
+    """
+    # make a copy of the current dict
+    node_results = dict(node_results)
+    for nname, nresult in node_data.items():
+      assert nname in node_results, "Missing basic data for node %s" % nname
+      ninfo = node_cfg[nname]
+
       if not (ninfo.offline or ninfo.drained):
         nresult.Raise("Can't get data for node %s" % nname)
         node_iinfo[nname].Raise("Can't get node instance info from node %s" %
@@ -10292,12 +11176,16 @@ class IAllocator(object):
           "i_pri_memory": i_p_mem,
           "i_pri_up_memory": i_p_up_mem,
           }
-        pnr.update(pnr_dyn)
+        pnr_dyn.update(node_results[nname])
+        node_results[nname] = pnr_dyn
 
-      node_results[nname] = pnr
-    data["nodes"] = node_results
+    return node_results
 
-    # instance data
+  @staticmethod
+  def _ComputeInstanceData(cluster_info, i_list):
+    """Compute global instance data.
+
+    """
     instance_data = {}
     for iinfo, beinfo in i_list:
       nic_data = []
@@ -10327,9 +11215,7 @@ class IAllocator(object):
                                                  pir["disks"])
       instance_data[iinfo.name] = pir
 
-    data["instances"] = instance_data
-
-    self.in_data = data
+    return instance_data
 
   def _AddNewInstance(self):
     """Add new instance data to allocator structure.
@@ -10469,24 +11355,6 @@ class LUTestAllocator(NoHooksLU):
   This LU runs the allocator tests
 
   """
-  _OP_PARAMS = [
-    ("direction", _NoDefault, _TElemOf(constants.VALID_IALLOCATOR_DIRECTIONS)),
-    ("mode", _NoDefault, _TElemOf(constants.VALID_IALLOCATOR_MODES)),
-    ("name", _NoDefault, _TNonEmptyString),
-    ("nics", _NoDefault, _TOr(_TNone, _TListOf(
-      _TDictOf(_TElemOf(["mac", "ip", "bridge"]),
-               _TOr(_TNone, _TNonEmptyString))))),
-    ("disks", _NoDefault, _TOr(_TNone, _TList)),
-    ("hypervisor", None, _TMaybeString),
-    ("allocator", None, _TMaybeString),
-    ("tags", _EmptyList, _TListOf(_TNonEmptyString)),
-    ("mem_size", None, _TOr(_TNone, _TPositiveInt)),
-    ("vcpus", None, _TOr(_TNone, _TPositiveInt)),
-    ("os", None, _TMaybeString),
-    ("disk_template", None, _TMaybeString),
-    ("evac_nodes", None, _TOr(_TNone, _TListOf(_TNonEmptyString))),
-    ]
-
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -10576,3 +11444,24 @@ class LUTestAllocator(NoHooksLU):
       ial.Run(self.op.allocator, validate=False)
       result = ial.out_text
     return result
+
+
+#: Query type implementations
+_QUERY_IMPL = {
+  constants.QR_INSTANCE: _InstanceQuery,
+  constants.QR_NODE: _NodeQuery,
+  constants.QR_GROUP: _GroupQuery,
+  }
+
+
+def _GetQueryImplementation(name):
+  """Returns the implementation for a query type.
+
+  @param name: Query type, must be one of L{constants.QR_OP_QUERY}
+
+  """
+  try:
+    return _QUERY_IMPL[name]
+  except KeyError:
+    raise errors.OpPrereqError("Unknown query resource '%s'" % name,
+                               errors.ECODE_INVAL)
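# Editor's note: an illustrative usage sketch, not part of the patch; it
# mirrors what LUGroupQuery above does and assumes the surrounding LU
# plumbing (lu, names, fields) is already in place:
#
#   impl_cls = _GetQueryImplementation(constants.QR_GROUP)  # -> _GroupQuery
#   gq = impl_cls(qlang.MakeSimpleFilter("name", names), fields, False)
#   gq.ExpandNames(lu)
#   result = gq.OldStyleQuery(lu)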