Removed _CheckNodesFreeDisk function
diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 84d6f05..7cc7943 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -53,191 +53,38 @@ from ganeti import uidpool
 from ganeti import compat
 from ganeti import masterd
 from ganeti import netutils
+from ganeti import ht
 
 import ganeti.masterd.instance # pylint: disable-msg=W0611
 
-
-# Modifiable default values; need to define these here before the
-# actual LUs
-
-def _EmptyList():
-  """Returns an empty list.
-
-  """
-  return []
-
-
-def _EmptyDict():
-  """Returns an empty dict.
-
-  """
-  return {}
-
-
-#: The without-default default value
-_NoDefault = object()
-
-
-#: The no-type (value too complex to check in the type system)
-_NoType = object()
-
-
-# Some basic types
-def _TNotNone(val):
-  """Checks if the given value is not None.
-
-  """
-  return val is not None
-
-
-def _TNone(val):
-  """Checks if the given value is None.
-
-  """
-  return val is None
-
-
-def _TBool(val):
-  """Checks if the given value is a boolean.
-
-  """
-  return isinstance(val, bool)
-
-
-def _TInt(val):
-  """Checks if the given value is an integer.
-
-  """
-  return isinstance(val, int)
-
-
-def _TFloat(val):
-  """Checks if the given value is a float.
-
-  """
-  return isinstance(val, float)
-
-
-def _TString(val):
-  """Checks if the given value is a string.
-
-  """
-  return isinstance(val, basestring)
-
-
-def _TTrue(val):
-  """Checks if a given value evaluates to a boolean True value.
-
-  """
-  return bool(val)
-
-
-def _TElemOf(target_list):
-  """Builds a function that checks if a given value is a member of a list.
-
-  """
-  return lambda val: val in target_list
-
-
-# Container types
-def _TList(val):
-  """Checks if the given value is a list.
-
-  """
-  return isinstance(val, list)
-
-
-def _TDict(val):
-  """Checks if the given value is a dictionary.
-
-  """
-  return isinstance(val, dict)
-
-
-# Combinator types
-def _TAnd(*args):
-  """Combine multiple functions using an AND operation.
-
-  """
-  def fn(val):
-    return compat.all(t(val) for t in args)
-  return fn
-
-
-def _TOr(*args):
-  """Combine multiple functions using an AND operation.
-
-  """
-  def fn(val):
-    return compat.any(t(val) for t in args)
-  return fn
-
-
-# Type aliases
-
-#: a non-empty string
-_TNonEmptyString = _TAnd(_TString, _TTrue)
-
-
-#: a maybe non-empty string
-_TMaybeString = _TOr(_TNonEmptyString, _TNone)
-
-
-#: a maybe boolean (bool or none)
-_TMaybeBool = _TOr(_TBool, _TNone)
-
-
-#: a positive integer
-_TPositiveInt = _TAnd(_TInt, lambda v: v >= 0)
-
-#: a strictly positive integer
-_TStrictPositiveInt = _TAnd(_TInt, lambda v: v > 0)
-
-
-def _TListOf(my_type):
-  """Checks if a given value is a list with all elements of the same type.
-
-  """
-  return _TAnd(_TList,
-               lambda lst: compat.all(my_type(v) for v in lst))
-
-
-def _TDictOf(key_type, val_type):
-  """Checks a dict type for the type of its key/values.
-
-  """
-  return _TAnd(_TDict,
-               lambda my_dict: (compat.all(key_type(v) for v in my_dict.keys())
-                                and compat.all(val_type(v)
-                                               for v in my_dict.values())))
-
-
 # Common opcode attributes
 
 #: output fields for a query operation
-_POutputFields = ("output_fields", _NoDefault, _TListOf(_TNonEmptyString))
+_POutputFields = ("output_fields", ht.NoDefault, ht.TListOf(ht.TNonEmptyString))
 
 
 #: the shutdown timeout
 _PShutdownTimeout = ("shutdown_timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT,
-                     _TPositiveInt)
+                     ht.TPositiveInt)
 
 #: the force parameter
-_PForce = ("force", False, _TBool)
+_PForce = ("force", False, ht.TBool)
 
 #: a required instance name (for single-instance LUs)
-_PInstanceName = ("instance_name", _NoDefault, _TNonEmptyString)
+_PInstanceName = ("instance_name", ht.NoDefault, ht.TNonEmptyString)
 
+#: Whether to ignore offline nodes
+_PIgnoreOfflineNodes = ("ignore_offline_nodes", False, ht.TBool)
 
 #: a required node name (for single-node LUs)
-_PNodeName = ("node_name", _NoDefault, _TNonEmptyString)
+_PNodeName = ("node_name", ht.NoDefault, ht.TNonEmptyString)
 
 #: the migration type (live/non-live)
-_PMigrationMode = ("mode", None, _TOr(_TNone,
-                                      _TElemOf(constants.HT_MIGRATION_MODES)))
+_PMigrationMode = ("mode", None,
+                   ht.TOr(ht.TNone, ht.TElemOf(constants.HT_MIGRATION_MODES)))
 
 #: the obsolete 'live' mode (boolean)
-_PMigrationLive = ("live", None, _TMaybeBool)
+_PMigrationLive = ("live", None, ht.TMaybeBool)
 
 
 # End types
@@ -306,7 +153,7 @@ class LogicalUnit(object):
     op_id = self.op.OP_ID
     for attr_name, aval, test in self._OP_PARAMS:
       if not hasattr(op, attr_name):
-        if aval == _NoDefault:
+        if aval == ht.NoDefault:
           raise errors.OpPrereqError("Required parameter '%s.%s' missing" %
                                      (op_id, attr_name), errors.ECODE_INVAL)
         else:
@@ -316,7 +163,7 @@ class LogicalUnit(object):
             dval = aval
           setattr(self.op, attr_name, dval)
       attr_val = getattr(op, attr_name)
-      if test == _NoType:
+      if test == ht.NoType:
         # no tests here
         continue
       if not callable(test):
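For readers new to this machinery, here is a minimal standalone sketch (not part of the patch) of how a (name, default, check) tuple from _OP_PARAMS is applied to an opcode. ValueError stands in for errors.OpPrereqError, NO_DEFAULT/NO_TYPE for ht.NoDefault/ht.NoType, and the factory treatment of callable defaults (e.g. ht.EmptyList) is an assumption based on the branch elided above:

    NO_DEFAULT = object()
    NO_TYPE = object()

    def fill_and_check(op, params):
      # params is a list of (name, default, check) tuples
      for name, default, check in params:
        if not hasattr(op, name):
          if default is NO_DEFAULT:
            raise ValueError("Required parameter %r missing" % name)
          # callable defaults act as factories for fresh containers
          setattr(op, name, default() if callable(default) else default)
        value = getattr(op, name)
        if check is not NO_TYPE and not check(value):
          raise ValueError("Parameter %r fails validation: %r" % (name, value))

    class _Op(object):
      node_name = "node1.example.com"

    op = _Op()
    fill_and_check(op,
                   [("node_name", NO_DEFAULT,
                     lambda v: isinstance(v, str) and bool(v)),
                    ("force", False, lambda v: isinstance(v, bool))])
    assert op.force is False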
@@ -746,17 +593,19 @@ def _CheckGlobalHvParams(params):
     raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
 
 
-def _CheckNodeOnline(lu, node):
+def _CheckNodeOnline(lu, node, msg=None):
   """Ensure that a given node is online.
 
   @param lu: the LU on behalf of which we make the check
   @param node: the node to check
+  @param msg: if passed, should be a message to replace the default one
   @raise errors.OpPrereqError: if the node is offline
 
   """
+  if msg is None:
+    msg = "Can't use offline node"
   if lu.cfg.GetNodeInfo(node).offline:
-    raise errors.OpPrereqError("Can't use offline node %s" % node,
-                               errors.ECODE_INVAL)
+    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
 
 
 def _CheckNodeNotDrained(lu, node):
@@ -769,7 +618,20 @@ def _CheckNodeNotDrained(lu, node):
   """
   if lu.cfg.GetNodeInfo(node).drained:
     raise errors.OpPrereqError("Can't use drained node %s" % node,
-                               errors.ECODE_INVAL)
+                               errors.ECODE_STATE)
+
+
+def _CheckNodeVmCapable(lu, node):
+  """Ensure that a given node is vm capable.
+
+  @param lu: the LU on behalf of which we make the check
+  @param node: the node to check
+  @raise errors.OpPrereqError: if the node is not vm capable
+
+  """
+  if not lu.cfg.GetNodeInfo(node).vm_capable:
+    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
+                               errors.ECODE_STATE)
 
 
 def _CheckNodeHasOS(lu, node, os_name, force_variant):
@@ -790,6 +652,33 @@ def _CheckNodeHasOS(lu, node, os_name, force_variant):
     _CheckOSVariant(result.payload, os_name)
 
 
+def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
+  """Ensure that a node has the given secondary ip.
+
+  @type lu: L{LogicalUnit}
+  @param lu: the LU on behalf of which we make the check
+  @type node: string
+  @param node: the node to check
+  @type secondary_ip: string
+  @param secondary_ip: the ip to check
+  @type prereq: boolean
+  @param prereq: whether to throw a prerequisite or an execute error
+  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
+  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
+
+  """
+  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
+  result.Raise("Failure checking secondary ip on node %s" % node,
+               prereq=prereq, ecode=errors.ECODE_ENVIRON)
+  if not result.payload:
+    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
+           " please fix and re-run this command" % secondary_ip)
+    if prereq:
+      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
+    else:
+      raise errors.OpExecError(msg)
+
+
 def _RequireFileStorage():
   """Checks that file storage is enabled.
 
@@ -1088,9 +977,8 @@ def _CheckOSVariant(os_obj, name):
   """
   if not os_obj.supported_variants:
     return
-  try:
-    variant = name.split("+", 1)[1]
-  except IndexError:
+  variant = objects.OS.GetVariant(name)
+  if not variant:
     raise errors.OpPrereqError("OS name must include a variant",
                                errors.ECODE_INVAL)
 
@@ -1249,7 +1137,6 @@ class LUDestroyCluster(LogicalUnit):
 
     """
     master = self.cfg.GetMasterNode()
-    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
 
     # Run post hooks on master node before it's removed
     hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
@@ -1262,11 +1149,6 @@ class LUDestroyCluster(LogicalUnit):
     result = self.rpc.call_node_stop_master(master, False)
     result.Raise("Could not disable the master role")
 
-    if modify_ssh_setup:
-      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
-      utils.CreateBackup(priv_key)
-      utils.CreateBackup(pub_key)
-
     return master
 
 
@@ -1310,11 +1192,11 @@ class LUVerifyCluster(LogicalUnit):
   HPATH = "cluster-verify"
   HTYPE = constants.HTYPE_CLUSTER
   _OP_PARAMS = [
-    ("skip_checks", _EmptyList,
-     _TListOf(_TElemOf(constants.VERIFY_OPTIONAL_CHECKS))),
-    ("verbose", False, _TBool),
-    ("error_codes", False, _TBool),
-    ("debug_simulate_errors", False, _TBool),
+    ("skip_checks", ht.EmptyList,
+     ht.TListOf(ht.TElemOf(constants.VERIFY_OPTIONAL_CHECKS))),
+    ("verbose", False, ht.TBool),
+    ("error_codes", False, ht.TBool),
+    ("debug_simulate_errors", False, ht.TBool),
     ]
   REQ_BGL = False
 
@@ -1328,7 +1210,7 @@ class LUVerifyCluster(LogicalUnit):
   EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
   EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
   EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
-  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
+  EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
   EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
   ENODEDRBD = (TNODE, "ENODEDRBD")
   ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
@@ -1379,9 +1261,11 @@ class LUVerifyCluster(LogicalUnit):
     @ivar os_fail: whether the RPC call didn't return valid OS data
     @type oslist: list
     @ivar oslist: list of OSes as diagnosed by DiagnoseOS
+    @type vm_capable: boolean
+    @ivar vm_capable: whether the node can host instances
 
     """
-    def __init__(self, offline=False, name=None):
+    def __init__(self, offline=False, name=None, vm_capable=True):
       self.name = name
       self.volumes = {}
       self.instances = []
@@ -1391,6 +1275,7 @@ class LUVerifyCluster(LogicalUnit):
       self.mfree = 0
       self.dfree = 0
       self.offline = offline
+      self.vm_capable = vm_capable
       self.rpc_fail = False
       self.lvm_fail = False
       self.hyp_fail = False
@@ -1495,13 +1380,12 @@ class LUVerifyCluster(LogicalUnit):
                   code=self.ETYPE_WARNING)
 
     hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
-    if isinstance(hyp_result, dict):
+    if ninfo.vm_capable and isinstance(hyp_result, dict):
       for hv_name, hv_result in hyp_result.iteritems():
         test = hv_result is not None
         _ErrorIf(test, self.ENODEHV, node,
                  "hypervisor %s verify failure: '%s'", hv_name, hv_result)
 
-
     test = nresult.get(constants.NV_NODESETUP,
                            ["Missing NODESETUP results"])
     _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
@@ -1620,8 +1504,8 @@ class LUVerifyCluster(LogicalUnit):
           msg = "cannot reach the master IP"
         _ErrorIf(True, self.ENODENET, node, msg)
 
-
-  def _VerifyInstance(self, instance, instanceconfig, node_image):
+  def _VerifyInstance(self, instance, instanceconfig, node_image,
+                      diskstatus):
     """Verify an instance.
 
     This function checks to see if the required block devices are
@@ -1657,6 +1541,20 @@ class LUVerifyCluster(LogicalUnit):
         _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                  "instance should not run on node %s", node)
 
+    diskdata = [(nname, success, status, idx)
+                for (nname, disks) in diskstatus.items()
+                for idx, (success, status) in enumerate(disks)]
+
+    for nname, success, bdev_status, idx in diskdata:
+      _ErrorIf(instanceconfig.admin_up and not success,
+               self.EINSTANCEFAULTYDISK, instance,
+               "couldn't retrieve status for disk/%s on %s: %s",
+               idx, nname, bdev_status)
+      _ErrorIf((instanceconfig.admin_up and success and
+                bdev_status.ldisk_status == constants.LDS_FAULTY),
+               self.EINSTANCEFAULTYDISK, instance,
+               "disk/%s on %s is faulty", idx, nname)
+
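As an illustration (not part of the patch), the comprehension above flattens the per-node disk status mapping into (node, success, status, index) tuples; the sample values below are made up:

    diskstatus = {"node1": [(True, "ok"), (False, "timeout")]}
    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]
    assert diskdata == [("node1", True, "ok", 0),
                        ("node1", False, "timeout", 1)]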
   def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
     """Verify if there are any unknown volumes in the cluster.
 
@@ -2007,6 +1905,80 @@ class LUVerifyCluster(LogicalUnit):
           _ErrorIf(True, self.ENODERPC, node,
                    "node returned invalid LVM info, check LVM status")
 
+  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
+    """Gets per-disk status information for all instances.
+
+    @type nodelist: list of strings
+    @param nodelist: Node names
+    @type node_image: dict of (name, L{objects.Node})
+    @param node_image: Node objects
+    @type instanceinfo: dict of (name, L{objects.Instance})
+    @param instanceinfo: Instance objects
+
+    """
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    node_disks = {}
+    node_disks_devonly = {}
+
+    for nname in nodelist:
+      disks = [(inst, disk)
+               for instlist in [node_image[nname].pinst,
+                                node_image[nname].sinst]
+               for inst in instlist
+               for disk in instanceinfo[inst].disks]
+
+      if not disks:
+        # No need to collect data
+        continue
+
+      node_disks[nname] = disks
+
+      # Creating copies as SetDiskID below will modify the objects and that can
+      # lead to incorrect data returned from nodes
+      devonly = [dev.Copy() for (_, dev) in disks]
+
+      for dev in devonly:
+        self.cfg.SetDiskID(dev, nname)
+
+      node_disks_devonly[nname] = devonly
+
+    assert len(node_disks) == len(node_disks_devonly)
+
+    # Collect data from all nodes with disks
+    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
+                                                          node_disks_devonly)
+
+    assert len(result) == len(node_disks)
+
+    instdisk = {}
+
+    for (nname, nres) in result.items():
+      if nres.offline:
+        # Ignore offline node
+        continue
+
+      disks = node_disks[nname]
+
+      msg = nres.fail_msg
+      _ErrorIf(msg, self.ENODERPC, nname,
+               "while getting disk information: %s", nres.fail_msg)
+      if msg:
+        # No data from this node
+        data = len(disks) * [None]
+      else:
+        data = nres.payload
+
+      for ((inst, _), status) in zip(disks, data):
+        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
+
+    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
+                      len(nnames) <= len(instanceinfo[inst].all_nodes)
+                      for inst, nnames in instdisk.items()
+                      for nname, statuses in nnames.items())
+
+    return instdisk
+
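A quick illustration (not part of the patch) of the nested structure built by the setdefault chain above; the instance, node and status values are made up:

    instdisk = {}
    for nname, inst, status in [("node1", "inst1", "ok"),
                                ("node1", "inst1", "ok"),
                                ("node2", "inst1", None)]:
      instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
    # instance name -> node name -> list of per-disk status (None: no data)
    assert instdisk == {"inst1": {"node1": ["ok", "ok"], "node2": [None]}}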
   def BuildHooksEnv(self):
     """Build hooks env.
 
@@ -2085,6 +2057,7 @@ class LUVerifyCluster(LogicalUnit):
       constants.NV_TIME: None,
       constants.NV_MASTERIP: (master_node, master_ip),
       constants.NV_OSLIST: None,
+      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
       }
 
     if vg_name is not None:
@@ -2098,7 +2071,8 @@ class LUVerifyCluster(LogicalUnit):
 
     # Build our expected cluster state
     node_image = dict((node.name, self.NodeImage(offline=node.offline,
-                                                 name=node.name))
+                                                 name=node.name,
+                                                 vm_capable=node.vm_capable))
                       for node in nodeinfo)
 
     for instance in instancelist:
@@ -2137,6 +2111,9 @@ class LUVerifyCluster(LogicalUnit):
 
     all_drbd_map = self.cfg.ComputeDRBDMap()
 
+    feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist))
+    instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo)
+
     feedback_fn("* Verifying node status")
 
     refos_img = None
@@ -2172,29 +2149,32 @@ class LUVerifyCluster(LogicalUnit):
       nresult = all_nvinfo[node].payload
 
       nimg.call_ok = self._VerifyNode(node_i, nresult)
+      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
       self._VerifyNodeNetwork(node_i, nresult)
-      self._VerifyNodeLVM(node_i, nresult, vg_name)
       self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
                             master_files)
-      self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper,
-                           all_drbd_map)
-      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
 
-      self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
-      self._UpdateNodeInstances(node_i, nresult, nimg)
-      self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
-      self._UpdateNodeOS(node_i, nresult, nimg)
-      if not nimg.os_fail:
-        if refos_img is None:
-          refos_img = nimg
-        self._VerifyNodeOS(node_i, nimg, refos_img)
+      if nimg.vm_capable:
+        self._VerifyNodeLVM(node_i, nresult, vg_name)
+        self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper,
+                             all_drbd_map)
+
+        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
+        self._UpdateNodeInstances(node_i, nresult, nimg)
+        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
+        self._UpdateNodeOS(node_i, nresult, nimg)
+        if not nimg.os_fail:
+          if refos_img is None:
+            refos_img = nimg
+          self._VerifyNodeOS(node_i, nimg, refos_img)
 
     feedback_fn("* Verifying instance status")
     for instance in instancelist:
       if verbose:
         feedback_fn("* Verifying instance %s" % instance)
       inst_config = instanceinfo[instance]
-      self._VerifyInstance(instance, inst_config, node_image)
+      self._VerifyInstance(instance, inst_config, node_image,
+                           instdisk[instance])
       inst_nodes_offline = []
 
       pnode = inst_config.primary_node
@@ -2233,10 +2213,12 @@ class LUVerifyCluster(LogicalUnit):
       _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
                "instance lives on offline node(s) %s",
                utils.CommaJoin(inst_nodes_offline))
-      # ... or ghost nodes
+      # ... or ghost/non-vm_capable nodes
       for node in inst_config.all_nodes:
         _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
                  "instance lives on ghost node %s", node)
+        _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
+                 instance, "instance lives on non-vm_capable node %s", node)
 
     feedback_fn("* Verifying orphan volumes")
     reserved = utils.FieldSet(*cluster.reserved_lvs)
@@ -2391,7 +2373,7 @@ class LURepairDiskSizes(NoHooksLU):
   """Verifies the cluster disks sizes.
 
   """
-  _OP_PARAMS = [("instances", _EmptyList, _TListOf(_TNonEmptyString))]
+  _OP_PARAMS = [("instances", ht.EmptyList, ht.TListOf(ht.TNonEmptyString))]
   REQ_BGL = False
 
   def ExpandNames(self):
@@ -2509,7 +2491,7 @@ class LURenameCluster(LogicalUnit):
   """
   HPATH = "cluster-rename"
   HTYPE = constants.HTYPE_CLUSTER
-  _OP_PARAMS = [("name", _NoDefault, _TNonEmptyString)]
+  _OP_PARAMS = [("name", ht.NoDefault, ht.TNonEmptyString)]
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -2541,7 +2523,7 @@ class LURenameCluster(LogicalUnit):
     if new_ip != old_ip:
       if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
         raise errors.OpPrereqError("The given cluster IP address (%s) is"
-                                   " reachable on the network. Aborting." %
+                                   " reachable on the network" %
                                    new_ip, errors.ECODE_NOTUNIQUE)
 
     self.op.name = new_name
@@ -2571,15 +2553,7 @@ class LURenameCluster(LogicalUnit):
         node_list.remove(master)
       except ValueError:
         pass
-      result = self.rpc.call_upload_file(node_list,
-                                         constants.SSH_KNOWN_HOSTS_FILE)
-      for to_node, to_result in result.iteritems():
-        msg = to_result.fail_msg
-        if msg:
-          msg = ("Copy of file %s to node %s failed: %s" %
-                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
-          self.proc.LogWarning(msg)
-
+      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
     finally:
       result = self.rpc.call_node_start_master(master, False, False)
       msg = result.fail_msg
@@ -2597,22 +2571,38 @@ class LUSetClusterParams(LogicalUnit):
   HPATH = "cluster-modify"
   HTYPE = constants.HTYPE_CLUSTER
   _OP_PARAMS = [
-    ("vg_name", None, _TMaybeString),
+    ("vg_name", None, ht.TMaybeString),
     ("enabled_hypervisors", None,
-     _TOr(_TAnd(_TListOf(_TElemOf(constants.HYPER_TYPES)), _TTrue), _TNone)),
-    ("hvparams", None, _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)),
-    ("beparams", None, _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)),
-    ("os_hvp", None, _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)),
-    ("osparams", None, _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)),
-    ("candidate_pool_size", None, _TOr(_TStrictPositiveInt, _TNone)),
-    ("uid_pool", None, _NoType),
-    ("add_uids", None, _NoType),
-    ("remove_uids", None, _NoType),
-    ("maintain_node_health", None, _TMaybeBool),
-    ("nicparams", None, _TOr(_TDict, _TNone)),
-    ("drbd_helper", None, _TOr(_TString, _TNone)),
-    ("default_iallocator", None, _TMaybeString),
-    ("reserved_lvs", None, _TOr(_TListOf(_TNonEmptyString), _TNone)),
+     ht.TOr(ht.TAnd(ht.TListOf(ht.TElemOf(constants.HYPER_TYPES)), ht.TTrue),
+            ht.TNone)),
+    ("hvparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
+                              ht.TNone)),
+    ("beparams", None, ht.TOr(ht.TDict, ht.TNone)),
+    ("os_hvp", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
+                            ht.TNone)),
+    ("osparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
+                              ht.TNone)),
+    ("candidate_pool_size", None, ht.TOr(ht.TStrictPositiveInt, ht.TNone)),
+    ("uid_pool", None, ht.NoType),
+    ("add_uids", None, ht.NoType),
+    ("remove_uids", None, ht.NoType),
+    ("maintain_node_health", None, ht.TMaybeBool),
+    ("prealloc_wipe_disks", None, ht.TMaybeBool),
+    ("nicparams", None, ht.TOr(ht.TDict, ht.TNone)),
+    ("ndparams", None, ht.TOr(ht.TDict, ht.TNone)),
+    ("drbd_helper", None, ht.TOr(ht.TString, ht.TNone)),
+    ("default_iallocator", None, ht.TOr(ht.TString, ht.TNone)),
+    ("reserved_lvs", None, ht.TOr(ht.TListOf(ht.TNonEmptyString), ht.TNone)),
+    ("hidden_os", None, ht.TOr(ht.TListOf(\
+          ht.TAnd(ht.TList,
+                ht.TIsLength(2),
+                ht.TMap(lambda v: v[0], ht.TElemOf(constants.DDMS_VALUES)))),
+          ht.TNone)),
+    ("blacklisted_os", None, ht.TOr(ht.TListOf(\
+          ht.TAnd(ht.TList,
+                ht.TIsLength(2),
+                ht.TMap(lambda v: v[0], ht.TElemOf(constants.DDMS_VALUES)))),
+          ht.TNone)),
     ]
   REQ_BGL = False
 
@@ -2709,6 +2699,10 @@ class LUSetClusterParams(LogicalUnit):
       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
       self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
 
+    if self.op.ndparams:
+      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
+      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
+
     if self.op.nicparams:
       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
       self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
@@ -2862,6 +2856,8 @@ class LUSetClusterParams(LogicalUnit):
       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
     if self.op.osparams:
       self.cluster.osparams = self.new_osp
+    if self.op.ndparams:
+      self.cluster.ndparams = self.new_ndparams
 
     if self.op.candidate_pool_size is not None:
       self.cluster.candidate_pool_size = self.op.candidate_pool_size
@@ -2871,6 +2867,9 @@ class LUSetClusterParams(LogicalUnit):
     if self.op.maintain_node_health is not None:
       self.cluster.maintain_node_health = self.op.maintain_node_health
 
+    if self.op.prealloc_wipe_disks is not None:
+      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
+
     if self.op.add_uids is not None:
       uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
 
@@ -2886,10 +2885,47 @@ class LUSetClusterParams(LogicalUnit):
     if self.op.reserved_lvs is not None:
       self.cluster.reserved_lvs = self.op.reserved_lvs
 
+    def helper_os(aname, mods, desc):
+      desc += " OS list"
+      lst = getattr(self.cluster, aname)
+      for key, val in mods:
+        if key == constants.DDM_ADD:
+          if val in lst:
+            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
+          else:
+            lst.append(val)
+        elif key == constants.DDM_REMOVE:
+          if val in lst:
+            lst.remove(val)
+          else:
+            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
+        else:
+          raise errors.ProgrammerError("Invalid modification '%s'" % key)
+
+    if self.op.hidden_os:
+      helper_os("hidden_os", self.op.hidden_os, "hidden")
+
+    if self.op.blacklisted_os:
+      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
+
     self.cfg.Update(self.cluster, feedback_fn)
 
 
-def _RedistributeAncillaryFiles(lu, additional_nodes=None):
+def _UploadHelper(lu, nodes, fname):
+  """Helper for uploading a file and showing warnings.
+
+  """
+  if os.path.exists(fname):
+    result = lu.rpc.call_upload_file(nodes, fname)
+    for to_node, to_result in result.items():
+      msg = to_result.fail_msg
+      if msg:
+        msg = ("Copy of file %s to node %s failed: %s" %
+               (fname, to_node, msg))
+        lu.proc.LogWarning(msg)
+
+
+def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
   """Distribute additional files which are part of the cluster configuration.
 
   ConfigWriter takes care of distributing the config and ssconf files, but
@@ -2898,15 +2934,23 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None):
 
   @param lu: calling logical unit
   @param additional_nodes: list of nodes not in the config to distribute to
+  @type additional_vm: boolean
+  @param additional_vm: whether the additional nodes are vm-capable or not
 
   """
   # 1. Gather target nodes
   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
   dist_nodes = lu.cfg.GetOnlineNodeList()
+  nvm_nodes = lu.cfg.GetNonVmCapableNodeList()
+  vm_nodes = [name for name in dist_nodes if name not in nvm_nodes]
   if additional_nodes is not None:
     dist_nodes.extend(additional_nodes)
+    if additional_vm:
+      vm_nodes.extend(additional_nodes)
   if myself.name in dist_nodes:
     dist_nodes.remove(myself.name)
+  if myself.name in vm_nodes:
+    vm_nodes.remove(myself.name)
 
   # 2. Gather files to distribute
   dist_files = set([constants.ETC_HOSTS,
@@ -2917,21 +2961,17 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None):
                     constants.CLUSTER_DOMAIN_SECRET_FILE,
                    ])
 
+  vm_files = set()
   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
   for hv_name in enabled_hypervisors:
     hv_class = hypervisor.GetHypervisor(hv_name)
-    dist_files.update(hv_class.GetAncillaryFiles())
+    vm_files.update(hv_class.GetAncillaryFiles())
 
   # 3. Perform the files upload
   for fname in dist_files:
-    if os.path.exists(fname):
-      result = lu.rpc.call_upload_file(dist_nodes, fname)
-      for to_node, to_result in result.items():
-        msg = to_result.fail_msg
-        if msg:
-          msg = ("Copy of file %s to node %s failed: %s" %
-                 (fname, to_node, msg))
-          lu.proc.LogWarning(msg)
+    _UploadHelper(lu, dist_nodes, fname)
+  for fname in vm_files:
+    _UploadHelper(lu, vm_nodes, fname)
 
 
 class LURedistributeConfig(NoHooksLU):
@@ -3071,12 +3111,15 @@ class LUDiagnoseOS(NoHooksLU):
   """
   _OP_PARAMS = [
     _POutputFields,
-    ("names", _EmptyList, _TListOf(_TNonEmptyString)),
+    ("names", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
     ]
   REQ_BGL = False
+  _HID = "hidden"
+  _BLK = "blacklisted"
+  _VLD = "valid"
   _FIELDS_STATIC = utils.FieldSet()
-  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants",
-                                   "parameters", "api_versions")
+  _FIELDS_DYNAMIC = utils.FieldSet("name", _VLD, "node_status", "variants",
+                                   "parameters", "api_versions", _HID, _BLK)
 
   def CheckArguments(self):
     if self.op.names:
@@ -3143,8 +3186,10 @@ class LUDiagnoseOS(NoHooksLU):
     node_data = self.rpc.call_os_diagnose(valid_nodes)
     pol = self._DiagnoseByOS(node_data)
     output = []
+    cluster = self.cfg.GetClusterInfo()
 
-    for os_name, os_data in pol.items():
+    for os_name in utils.NiceSort(pol.keys()):
+      os_data = pol[os_name]
       row = []
       valid = True
       (variants, params, api_versions) = null_state = (set(), set(), set())
@@ -3163,10 +3208,17 @@ class LUDiagnoseOS(NoHooksLU):
           params.intersection_update(node_params)
           api_versions.intersection_update(node_api)
 
+      is_hid = os_name in cluster.hidden_os
+      is_blk = os_name in cluster.blacklisted_os
+      if ((self._HID not in self.op.output_fields and is_hid) or
+          (self._BLK not in self.op.output_fields and is_blk) or
+          (self._VLD not in self.op.output_fields and not valid)):
+        continue
+
       for field in self.op.output_fields:
         if field == "name":
           val = os_name
-        elif field == "valid":
+        elif field == self._VLD:
           val = valid
         elif field == "node_status":
           # this is just a copy of the dict
@@ -3174,11 +3226,15 @@ class LUDiagnoseOS(NoHooksLU):
           for node_name, nos_list in os_data.items():
             val[node_name] = nos_list
         elif field == "variants":
-          val = list(variants)
+          val = utils.NiceSort(list(variants))
         elif field == "parameters":
           val = list(params)
         elif field == "api_versions":
           val = list(api_versions)
+        elif field == self._HID:
+          val = is_hid
+        elif field == self._BLK:
+          val = is_blk
         else:
           raise errors.ParameterError(field)
         row.append(val)
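For clarity (not part of the patch), the new row filter above can be read as: keep an OS row only if every "negative" property it has (hidden, blacklisted, invalid) was explicitly requested as an output field. A small sketch with made-up inputs:

    def keep_row(output_fields, is_hid, is_blk, valid):
      return not ((is_hid and "hidden" not in output_fields) or
                  (is_blk and "blacklisted" not in output_fields) or
                  (not valid and "valid" not in output_fields))

    assert keep_row(["name"], False, False, True)
    assert not keep_row(["name"], True, False, True)   # hidden, not asked for
    assert keep_row(["name", "hidden"], True, False, True)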
@@ -3278,8 +3334,11 @@ class LURemoveNode(LogicalUnit):
 
     # Remove node from our /etc/hosts
     if self.cfg.GetClusterInfo().modify_etc_hosts:
-      # FIXME: this should be done via an rpc call to node daemon
-      utils.RemoveHostFromEtcHosts(node.name)
+      master_node = self.cfg.GetMasterNode()
+      result = self.rpc.call_etc_hosts_modify(master_node,
+                                              constants.ETC_HOSTS_REMOVE,
+                                              node.name, None)
+      result.Raise("Can't update hosts file with new host data")
       _RedistributeAncillaryFiles(self)
 
 
@@ -3290,13 +3349,14 @@ class LUQueryNodes(NoHooksLU):
   # pylint: disable-msg=W0142
   _OP_PARAMS = [
     _POutputFields,
-    ("names", _EmptyList, _TListOf(_TNonEmptyString)),
-    ("use_locking", False, _TBool),
+    ("names", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
+    ("use_locking", False, ht.TBool),
     ]
   REQ_BGL = False
 
   _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
-                    "master_candidate", "offline", "drained"]
+                    "master_candidate", "offline", "drained",
+                    "master_capable", "vm_capable"]
 
   _FIELDS_DYNAMIC = utils.FieldSet(
     "dtotal", "dfree",
@@ -3309,8 +3369,9 @@ class LUQueryNodes(NoHooksLU):
     "pinst_cnt", "sinst_cnt",
     "pinst_list", "sinst_list",
     "pip", "sip", "tags",
-    "master",
-    "role"] + _SIMPLE_FIELDS
+    "master", "role",
+    "group.uuid", "group",
+    ] + _SIMPLE_FIELDS
     )
 
   def CheckArguments(self):
@@ -3352,6 +3413,11 @@ class LUQueryNodes(NoHooksLU):
     nodenames = utils.NiceSort(nodenames)
     nodelist = [all_info[name] for name in nodenames]
 
+    if "group" in self.op.output_fields:
+      groups = self.cfg.GetAllNodeGroupsInfo()
+    else:
+      groups = {}
+
     # begin data gathering
 
     if self.do_node_query:
@@ -3433,6 +3499,14 @@ class LUQueryNodes(NoHooksLU):
             val = "O"
           else:
             val = "R"
+        elif field == "group.uuid":
+          val = node.group
+        elif field == "group":
+          ng = groups.get(node.group, None)
+          if ng is None:
+            val = "<unknown>"
+          else:
+            val = ng.name
         else:
           raise errors.ParameterError(field)
         node_output.append(val)
@@ -3446,8 +3520,8 @@ class LUQueryNodeVolumes(NoHooksLU):
 
   """
   _OP_PARAMS = [
-    ("nodes", _EmptyList, _TListOf(_TNonEmptyString)),
-    ("output_fields", _NoDefault, _TListOf(_TNonEmptyString)),
+    _POutputFields,
+    ("nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
     ]
   REQ_BGL = False
   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
@@ -3529,10 +3603,10 @@ class LUQueryNodeStorage(NoHooksLU):
   """
   _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
   _OP_PARAMS = [
-    ("nodes", _EmptyList, _TListOf(_TNonEmptyString)),
-    ("storage_type", _NoDefault, _CheckStorageType),
-    ("output_fields", _NoDefault, _TListOf(_TNonEmptyString)),
-    ("name", None, _TMaybeString),
+    _POutputFields,
+    ("nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
+    ("storage_type", ht.NoDefault, _CheckStorageType),
+    ("name", None, ht.TMaybeString),
     ]
   REQ_BGL = False
 
@@ -3618,9 +3692,9 @@ class LUModifyNodeStorage(NoHooksLU):
   """
   _OP_PARAMS = [
     _PNodeName,
-    ("storage_type", _NoDefault, _CheckStorageType),
-    ("name", _NoDefault, _TNonEmptyString),
-    ("changes", _NoDefault, _TDict),
+    ("storage_type", ht.NoDefault, _CheckStorageType),
+    ("name", ht.NoDefault, ht.TNonEmptyString),
+    ("changes", ht.NoDefault, ht.TDict),
     ]
   REQ_BGL = False
 
@@ -3668,16 +3742,25 @@ class LUAddNode(LogicalUnit):
   HTYPE = constants.HTYPE_NODE
   _OP_PARAMS = [
     _PNodeName,
-    ("primary_ip", None, _NoType),
-    ("secondary_ip", None, _TMaybeString),
-    ("readd", False, _TBool),
+    ("primary_ip", None, ht.NoType),
+    ("secondary_ip", None, ht.TMaybeString),
+    ("readd", False, ht.TBool),
+    ("group", None, ht.TMaybeString),
+    ("master_capable", None, ht.TMaybeBool),
+    ("vm_capable", None, ht.TMaybeBool),
+    ("ndparams", None, ht.TOr(ht.TDict, ht.TNone)),
     ]
+  _NFLAGS = ["master_capable", "vm_capable"]
 
   def CheckArguments(self):
+    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
     # validate/normalize the node name
     self.hostname = netutils.GetHostname(name=self.op.node_name,
-                                         family=self.cfg.GetPrimaryIPFamily())
+                                         family=self.primary_ip_family)
     self.op.node_name = self.hostname.name
+    if self.op.readd and self.op.group:
+      raise errors.OpPrereqError("Cannot pass a node group when a node is"
+                                 " being readded", errors.ECODE_INVAL)
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -3690,6 +3773,8 @@ class LUAddNode(LogicalUnit):
       "NODE_NAME": self.op.node_name,
       "NODE_PIP": self.op.primary_ip,
       "NODE_SIP": self.op.secondary_ip,
+      "MASTER_CAPABLE": str(self.op.master_capable),
+      "VM_CAPABLE": str(self.op.vm_capable),
       }
     nodes_0 = self.cfg.GetNodeList()
     nodes_1 = nodes_0 + [self.op.node_name, ]
@@ -3711,6 +3796,10 @@ class LUAddNode(LogicalUnit):
     node = hostname.name
     primary_ip = self.op.primary_ip = hostname.ip
     if self.op.secondary_ip is None:
+      if self.primary_ip_family == netutils.IP6Address.family:
+        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
+                                   " IPv4 address must be given as secondary",
+                                   errors.ECODE_INVAL)
       self.op.secondary_ip = primary_ip
 
     secondary_ip = self.op.secondary_ip
@@ -3749,6 +3838,27 @@ class LUAddNode(LogicalUnit):
                                    " existing node %s" % existing_node.name,
                                    errors.ECODE_NOTUNIQUE)
 
+    # After this 'if' block, None is no longer a valid value for the
+    # _capable op attributes
+    if self.op.readd:
+      old_node = self.cfg.GetNodeInfo(node)
+      assert old_node is not None, "Can't retrieve locked node %s" % node
+      for attr in self._NFLAGS:
+        if getattr(self.op, attr) is None:
+          setattr(self.op, attr, getattr(old_node, attr))
+    else:
+      for attr in self._NFLAGS:
+        if getattr(self.op, attr) is None:
+          setattr(self.op, attr, True)
+
+    if self.op.readd and not self.op.vm_capable:
+      pri, sec = cfg.GetNodeInstances(node)
+      if pri or sec:
+        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
+                                   " flag set to false, but it already holds"
+                                   " instances" % node,
+                                   errors.ECODE_STATE)
+
     # check that the type of the node (single versus dual homed) is the
     # same as for the master
     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
@@ -3756,11 +3866,11 @@ class LUAddNode(LogicalUnit):
     newbie_singlehomed = secondary_ip == primary_ip
     if master_singlehomed != newbie_singlehomed:
       if master_singlehomed:
-        raise errors.OpPrereqError("The master has no private ip but the"
+        raise errors.OpPrereqError("The master has no secondary ip but the"
                                    " new node has one",
                                    errors.ECODE_INVAL)
       else:
-        raise errors.OpPrereqError("The master has a private ip but the"
+        raise errors.OpPrereqError("The master has a secondary ip but the"
                                    " new node doesn't have one",
                                    errors.ECODE_INVAL)
 
@@ -3774,7 +3884,7 @@ class LUAddNode(LogicalUnit):
       if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                            source=myself.secondary_ip):
         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
-                                   " based ping to noded port",
+                                   " based ping to node daemon port",
                                    errors.ECODE_ENVIRON)
 
     if self.op.readd:
@@ -3782,17 +3892,24 @@ class LUAddNode(LogicalUnit):
     else:
       exceptions = []
 
-    self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
+    if self.op.master_capable:
+      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
+    else:
+      self.master_candidate = False
 
     if self.op.readd:
-      self.new_node = self.cfg.GetNodeInfo(node)
-      assert self.new_node is not None, "Can't retrieve locked node %s" % node
+      self.new_node = old_node
     else:
+      node_group = cfg.LookupNodeGroup(self.op.group)
       self.new_node = objects.Node(name=node,
                                    primary_ip=primary_ip,
                                    secondary_ip=secondary_ip,
                                    master_candidate=self.master_candidate,
-                                   offline=False, drained=False)
+                                   offline=False, drained=False,
+                                   group=node_group)
+
+    if self.op.ndparams:
+      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
 
   def Exec(self, feedback_fn):
     """Adds the new node to the cluster.
@@ -3813,10 +3930,17 @@ class LUAddNode(LogicalUnit):
       if self.changed_primary_ip:
         new_node.primary_ip = self.op.primary_ip
 
+    # copy the master/vm_capable flags
+    for attr in self._NFLAGS:
+      setattr(new_node, attr, getattr(self.op, attr))
+
     # notify the user about any possible mc promotion
     if new_node.master_candidate:
       self.LogInfo("Node will be a master candidate")
 
+    if self.op.ndparams:
+      new_node.ndparams = self.op.ndparams
+
     # check connectivity
     result = self.rpc.call_version([node])[node]
     result.Raise("Can't get version information from node %s" % node)
@@ -3828,37 +3952,18 @@ class LUAddNode(LogicalUnit):
                                " node version %s" %
                                (constants.PROTOCOL_VERSION, result.payload))
 
-    # setup ssh on node
-    if self.cfg.GetClusterInfo().modify_ssh_setup:
-      logging.info("Copy ssh key to node %s", node)
-      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
-      keyarray = []
-      keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
-                  constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
-                  priv_key, pub_key]
-
-      for i in keyfiles:
-        keyarray.append(utils.ReadFile(i))
-
-      result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
-                                      keyarray[2], keyarray[3], keyarray[4],
-                                      keyarray[5])
-      result.Raise("Cannot transfer ssh keys to the new node")
-
     # Add node to our /etc/hosts, and add key to known_hosts
     if self.cfg.GetClusterInfo().modify_etc_hosts:
-      # FIXME: this should be done via an rpc call to node daemon
-      utils.AddHostToEtcHosts(self.hostname)
+      master_node = self.cfg.GetMasterNode()
+      result = self.rpc.call_etc_hosts_modify(master_node,
+                                              constants.ETC_HOSTS_ADD,
+                                              self.hostname.name,
+                                              self.hostname.ip)
+      result.Raise("Can't update hosts file with new host data")
 
     if new_node.secondary_ip != new_node.primary_ip:
-      result = self.rpc.call_node_has_ip_address(new_node.name,
-                                                 new_node.secondary_ip)
-      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
-                   prereq=True, ecode=errors.ECODE_ENVIRON)
-      if not result.payload:
-        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
-                                 " you gave (%s). Please fix and re-run this"
-                                 " command." % new_node.secondary_ip)
+      _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip,
+                               False)
 
     node_verify_list = [self.cfg.GetMasterNode()]
     node_verify_param = {
@@ -3891,30 +3996,51 @@ class LUAddNode(LogicalUnit):
           self.LogWarning("Node failed to demote itself from master"
                           " candidate status: %s" % msg)
     else:
-      _RedistributeAncillaryFiles(self, additional_nodes=[node])
+      _RedistributeAncillaryFiles(self, additional_nodes=[node],
+                                  additional_vm=self.op.vm_capable)
       self.context.AddNode(new_node, self.proc.GetECId())
 
 
 class LUSetNodeParams(LogicalUnit):
   """Modifies the parameters of a node.
 
+  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
+      to the node role (as _ROLE_*)
+  @cvar _R2F: a dictionary from node role to tuples of flags
+  @cvar _FLAGS: a list of attribute names corresponding to the flags
+
   """
   HPATH = "node-modify"
   HTYPE = constants.HTYPE_NODE
   _OP_PARAMS = [
     _PNodeName,
-    ("master_candidate", None, _TMaybeBool),
-    ("offline", None, _TMaybeBool),
-    ("drained", None, _TMaybeBool),
-    ("auto_promote", False, _TBool),
+    ("master_candidate", None, ht.TMaybeBool),
+    ("offline", None, ht.TMaybeBool),
+    ("drained", None, ht.TMaybeBool),
+    ("auto_promote", False, ht.TBool),
+    ("master_capable", None, ht.TMaybeBool),
+    ("vm_capable", None, ht.TMaybeBool),
+    ("secondary_ip", None, ht.TMaybeString),
+    ("ndparams", None, ht.TOr(ht.TDict, ht.TNone)),
     _PForce,
     ]
   REQ_BGL = False
+  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
+  _F2R = {
+    (True, False, False): _ROLE_CANDIDATE,
+    (False, True, False): _ROLE_DRAINED,
+    (False, False, True): _ROLE_OFFLINE,
+    (False, False, False): _ROLE_REGULAR,
+    }
+  _R2F = dict((v, k) for k, v in _F2R.items())
+  _FLAGS = ["master_candidate", "drained", "offline"]
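Aside (not part of the patch): these constants give a bijection between the (master_candidate, drained, offline) flag tuple and a single role value, which the rewritten CheckPrereq/Exec below rely on; a minimal round-trip check:

    (ROLE_CANDIDATE, ROLE_DRAINED, ROLE_OFFLINE, ROLE_REGULAR) = range(4)
    F2R = {
      (True, False, False): ROLE_CANDIDATE,
      (False, True, False): ROLE_DRAINED,
      (False, False, True): ROLE_OFFLINE,
      (False, False, False): ROLE_REGULAR,
      }
    R2F = dict((v, k) for k, v in F2R.items())
    assert all(R2F[F2R[flags]] == flags for flags in F2R)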
 
   def CheckArguments(self):
     self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
-    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
-    if all_mods.count(None) == 3:
+    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
+                self.op.master_capable, self.op.vm_capable,
+                self.op.secondary_ip]
+    if all_mods.count(None) == len(all_mods):
       raise errors.OpPrereqError("Please pass at least one modification",
                                  errors.ECODE_INVAL)
     if all_mods.count(True) > 1:
@@ -3922,16 +4048,20 @@ class LUSetNodeParams(LogicalUnit):
                                  " state at the same time",
                                  errors.ECODE_INVAL)
 
-    # Boolean value that tells us whether we're offlining or draining the node
-    self.offline_or_drain = (self.op.offline == True or
-                             self.op.drained == True)
-    self.deoffline_or_drain = (self.op.offline == False or
-                               self.op.drained == False)
+    # Boolean value that tells us whether we might be demoting from MC
     self.might_demote = (self.op.master_candidate == False or
-                         self.offline_or_drain)
+                         self.op.offline == True or
+                         self.op.drained == True or
+                         self.op.master_capable == False)
+
+    if self.op.secondary_ip:
+      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
+        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
+                                   " address" % self.op.secondary_ip,
+                                   errors.ECODE_INVAL)
 
     self.lock_all = self.op.auto_promote and self.might_demote
-
+    self.lock_instances = self.op.secondary_ip is not None
 
   def ExpandNames(self):
     if self.lock_all:
@@ -3939,6 +4069,29 @@ class LUSetNodeParams(LogicalUnit):
     else:
       self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
 
+    if self.lock_instances:
+      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
+
+  def DeclareLocks(self, level):
+    # If we have locked all instances, before waiting to lock nodes, release
+    # all the ones living on nodes unrelated to the current operation.
+    if level == locking.LEVEL_NODE and self.lock_instances:
+      instances_release = []
+      instances_keep = []
+      self.affected_instances = []
+      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
+        for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+          instance = self.context.cfg.GetInstanceInfo(instance_name)
+          i_mirrored = instance.disk_template in constants.DTS_NET_MIRROR
+          if i_mirrored and self.op.node_name in instance.all_nodes:
+            instances_keep.append(instance_name)
+            self.affected_instances.append(instance)
+          else:
+            instances_release.append(instance_name)
+        if instances_release:
+          self.context.glm.release(locking.LEVEL_INSTANCE, instances_release)
+          self.acquired_locks[locking.LEVEL_INSTANCE] = instances_keep
+
   def BuildHooksEnv(self):
     """Build hooks env.
 
@@ -3950,6 +4103,8 @@ class LUSetNodeParams(LogicalUnit):
       "MASTER_CANDIDATE": str(self.op.master_candidate),
       "OFFLINE": str(self.op.offline),
       "DRAINED": str(self.op.drained),
+      "MASTER_CAPABLE": str(self.op.master_capable),
+      "VM_CAPABLE": str(self.op.vm_capable),
       }
     nl = [self.cfg.GetMasterNode(),
           self.op.node_name]
@@ -3972,6 +4127,17 @@ class LUSetNodeParams(LogicalUnit):
                                    " only via master-failover",
                                    errors.ECODE_INVAL)
 
+    if self.op.master_candidate and not node.master_capable:
+      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
+                                 " it a master candidate" % node.name,
+                                 errors.ECODE_STATE)
+
+    if self.op.vm_capable == False:
+      (ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name)
+      if ipri or isec:
+        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
+                                   " the vm_capable flag" % node.name,
+                                   errors.ECODE_STATE)
 
     if node.master_candidate and self.might_demote and not self.lock_all:
       assert not self.op.auto_promote, "auto-promote set but lock_all not"
@@ -3982,80 +4148,144 @@ class LUSetNodeParams(LogicalUnit):
       if mc_remaining < mc_should:
         raise errors.OpPrereqError("Not enough master candidates, please"
                                    " pass auto_promote to allow promotion",
-                                   errors.ECODE_INVAL)
+                                   errors.ECODE_STATE)
 
-    if (self.op.master_candidate == True and
-        ((node.offline and not self.op.offline == False) or
-         (node.drained and not self.op.drained == False))):
-      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
-                                 " to master_candidate" % node.name,
-                                 errors.ECODE_INVAL)
+    self.old_flags = old_flags = (node.master_candidate,
+                                  node.drained, node.offline)
+    assert old_flags in self._F2R, "Un-handled old flags  %s" % str(old_flags)
+    self.old_role = old_role = self._F2R[old_flags]
 
-    # If we're being deofflined/drained, we'll MC ourself if needed
-    if (self.deoffline_or_drain and not self.offline_or_drain and not
-        self.op.master_candidate == True and not node.master_candidate):
-      self.op.master_candidate = _DecideSelfPromotion(self)
-      if self.op.master_candidate:
-        self.LogInfo("Autopromoting node to master candidate")
+    # Check for ineffective changes
+    for attr in self._FLAGS:
+      if (getattr(self.op, attr) == False and getattr(node, attr) == False):
+        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
+        setattr(self.op, attr, None)
 
-    return
+    # Past this point, any flag change to False means a transition
+    # away from the respective state, as only real changes are kept
+
+    # If we're being deofflined/drained, we'll MC ourself if needed
+    if (self.op.drained == False or self.op.offline == False or
+        (self.op.master_capable and not node.master_capable)):
+      if _DecideSelfPromotion(self):
+        self.op.master_candidate = True
+        self.LogInfo("Auto-promoting node to master candidate")
+
+    # If we're no longer master capable, we'll demote ourselves from MC
+    if self.op.master_capable == False and node.master_candidate:
+      self.LogInfo("Demoting from master candidate")
+      self.op.master_candidate = False
+
+    # Compute new role
+    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
+    if self.op.master_candidate:
+      new_role = self._ROLE_CANDIDATE
+    elif self.op.drained:
+      new_role = self._ROLE_DRAINED
+    elif self.op.offline:
+      new_role = self._ROLE_OFFLINE
+    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
+      # False is still in new flags, which means we're un-setting (the
+      # only) True flag
+      new_role = self._ROLE_REGULAR
+    else: # no new flags, nothing, keep old role
+      new_role = old_role
+
+    self.new_role = new_role
+
+    if old_role == self._ROLE_OFFLINE and new_role != old_role:
+      # Trying to transition out of offline status
+      result = self.rpc.call_version([node.name])[node.name]
+      if result.fail_msg:
+        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
+                                   " to report its version: %s" %
+                                   (node.name, result.fail_msg),
+                                   errors.ECODE_STATE)
+      else:
+        self.LogWarning("Transitioning node from offline to online state"
+                        " without using re-add. Please make sure the node"
+                        " is healthy!")
+
+    if self.op.secondary_ip:
+      # Ok even without locking, because this can't be changed by any LU
+      master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
+      master_singlehomed = master.secondary_ip == master.primary_ip
+      if master_singlehomed and self.op.secondary_ip:
+        raise errors.OpPrereqError("Cannot change the secondary ip on a single"
+                                   " homed cluster", errors.ECODE_INVAL)
+
+      if node.offline:
+        if self.affected_instances:
+          raise errors.OpPrereqError("Cannot change secondary ip: offline"
+                                     " node has instances (%s) configured"
+                                     " to use it" % self.affected_instances)
+      else:
+        # On online nodes, check that no instances are running, and that
+        # the node has the new ip and we can reach it.
+        for instance in self.affected_instances:
+          _CheckInstanceDown(self, instance, "cannot change secondary ip")
+
+        _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
+        if master.name != node.name:
+          # check reachability from master secondary ip to new secondary ip
+          if not netutils.TcpPing(self.op.secondary_ip,
+                                  constants.DEFAULT_NODED_PORT,
+                                  source=master.secondary_ip):
+            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
+                                       " based ping to node daemon port",
+                                       errors.ECODE_ENVIRON)
+
+    if self.op.ndparams:
+      new_ndparams = _GetUpdatedParams(self.node.ndparams, self.op.ndparams)
+      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
+      self.new_ndparams = new_ndparams
 
   def Exec(self, feedback_fn):
     """Modifies a node.
 
     """
     node = self.node
+    old_role = self.old_role
+    new_role = self.new_role
 
     result = []
-    changed_mc = False
-
-    if self.op.offline is not None:
-      node.offline = self.op.offline
-      result.append(("offline", str(self.op.offline)))
-      if self.op.offline == True:
-        if node.master_candidate:
-          node.master_candidate = False
-          changed_mc = True
-          result.append(("master_candidate", "auto-demotion due to offline"))
-        if node.drained:
-          node.drained = False
-          result.append(("drained", "clear drained status due to offline"))
-
-    if self.op.master_candidate is not None:
-      node.master_candidate = self.op.master_candidate
-      changed_mc = True
-      result.append(("master_candidate", str(self.op.master_candidate)))
-      if self.op.master_candidate == False:
-        rrc = self.rpc.call_node_demote_from_mc(node.name)
-        msg = rrc.fail_msg
+
+    if self.op.ndparams:
+      node.ndparams = self.new_ndparams
+
+    for attr in ["master_capable", "vm_capable"]:
+      val = getattr(self.op, attr)
+      if val is not None:
+        setattr(node, attr, val)
+        result.append((attr, str(val)))
+
+    if new_role != old_role:
+      # Tell the node to demote itself, if no longer MC and not offline
+      if old_role == self._ROLE_CANDIDATE and new_role != self._ROLE_OFFLINE:
+        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
         if msg:
-          self.LogWarning("Node failed to demote itself: %s" % msg)
-
-    if self.op.drained is not None:
-      node.drained = self.op.drained
-      result.append(("drained", str(self.op.drained)))
-      if self.op.drained == True:
-        if node.master_candidate:
-          node.master_candidate = False
-          changed_mc = True
-          result.append(("master_candidate", "auto-demotion due to drain"))
-          rrc = self.rpc.call_node_demote_from_mc(node.name)
-          msg = rrc.fail_msg
-          if msg:
-            self.LogWarning("Node failed to demote itself: %s" % msg)
-        if node.offline:
-          node.offline = False
-          result.append(("offline", "clear offline status due to drain"))
+          self.LogWarning("Node failed to demote itself: %s", msg)
 
-    # we locked all nodes, we adjust the CP before updating this node
-    if self.lock_all:
-      _AdjustCandidatePool(self, [node.name])
+      new_flags = self._R2F[new_role]
+      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
+        if of != nf:
+          result.append((desc, str(nf)))
+      (node.master_candidate, node.drained, node.offline) = new_flags
+
+      # we locked all nodes, we adjust the CP before updating this node
+      if self.lock_all:
+        _AdjustCandidatePool(self, [node.name])
+
+    if self.op.secondary_ip:
+      node.secondary_ip = self.op.secondary_ip
+      result.append(("secondary_ip", self.op.secondary_ip))
 
     # this will trigger configuration file update, if needed
     self.cfg.Update(node, feedback_fn)
 
-    # this will trigger job queue propagation or cleanup
-    if changed_mc:
+    # this will trigger job queue propagation or cleanup if the mc
+    # flag changed
+    if [old_role, new_role].count(self._ROLE_CANDIDATE) == 1:
       self.context.ReaddNode(node)
 
     return result
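
Note on the role handling in this Exec: new_flags is looked up in the class-level _R2F table, and the zip over self.old_flags, new_flags and self._FLAGS records only the flags that actually changed. The table itself is defined in the class body outside this hunk; a rough sketch of its presumed shape, given the (master_candidate, drained, offline) order used in the tuple assignment above:

    # Assumed sketch only; the authoritative _FLAGS/_R2F definitions live in
    # LUSetNodeParams outside this diff.
    _FLAGS = ["master_candidate", "drained", "offline"]
    _R2F = {
      _ROLE_CANDIDATE: (True, False, False),
      _ROLE_DRAINED: (False, True, False),
      _ROLE_OFFLINE: (False, False, True),
      _ROLE_REGULAR: (False, False, False),
      }

With such a layout, demoting a master candidate to a regular node flips only the first flag, so result gains a single ("master_candidate", "False") entry.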
@@ -4156,6 +4386,7 @@ class LUQueryClusterInfo(NoHooksLU):
       "default_iallocator": cluster.default_iallocator,
       "reserved_lvs": cluster.reserved_lvs,
       "primary_ip_version": primary_ip_version,
+      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
       }
 
     return result
@@ -4169,7 +4400,7 @@ class LUQueryConfigValues(NoHooksLU):
   REQ_BGL = False
   _FIELDS_DYNAMIC = utils.FieldSet()
   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
-                                  "watcher_pause")
+                                  "watcher_pause", "volume_group_name")
 
   def CheckArguments(self):
     _CheckOutputFields(static=self._FIELDS_STATIC,
@@ -4193,6 +4424,8 @@ class LUQueryConfigValues(NoHooksLU):
         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
       elif field == "watcher_pause":
         entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
+      elif field == "volume_group_name":
+        entry = self.cfg.GetVGName()
       else:
         raise errors.ParameterError(field)
       values.append(entry)
@@ -4205,7 +4438,7 @@ class LUActivateInstanceDisks(NoHooksLU):
   """
   _OP_PARAMS = [
     _PInstanceName,
-    ("ignore_size", False, _TBool),
+    ("ignore_size", False, ht.TBool),
     ]
   REQ_BGL = False
 
@@ -4472,8 +4705,8 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
                                errors.ECODE_NORES)
 
 
-def _CheckNodesFreeDisk(lu, nodenames, requested):
-  """Checks if nodes have enough free disk space in the default VG.
+def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
+  """Checks if nodes have enough free disk space in the all VGs.
 
   This function checks if all given nodes have the needed amount of
   free disk. In case any node has less disk or we cannot get the
@@ -4484,13 +4717,39 @@ def _CheckNodesFreeDisk(lu, nodenames, requested):
   @param lu: a logical unit from which we get configuration data
   @type nodenames: C{list}
   @param nodenames: the list of node names to check
+  @type req_sizes: C{dict}
+  @param req_sizes: the dict mapping volume group names to the amount of
+      disk in MiB to check for
+  @raise errors.OpPrereqError: if the node doesn't have enough disk,
+      or we cannot check the node
+
+  """
+  if req_sizes is not None:
+    for vg, req_size in req_sizes.iteritems():
+      _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
+
+
+def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
+  """Checks if nodes have enough free disk space in the specified VG.
+
+  This function checks if all given nodes have the needed amount of
+  free disk. In case any node has less disk or we cannot get the
+  information from the node, this function raises an OpPrereqError
+  exception.
+
+  @type lu: C{LogicalUnit}
+  @param lu: a logical unit from which we get configuration data
+  @type nodenames: C{list}
+  @param nodenames: the list of node names to check
+  @type vg: C{str}
+  @param vg: the volume group to check
   @type requested: C{int}
   @param requested: the amount of disk in MiB to check for
-  @raise errors.OpPrereqError: if the node doesn't have enough disk, or
-      we cannot check the node
+  @raise errors.OpPrereqError: if the node doesn't have enough disk,
+      or we cannot check the node
 
   """
-  nodeinfo = lu.rpc.call_node_info(nodenames, lu.cfg.GetVGName(),
+  nodeinfo = lu.rpc.call_node_info(nodenames, vg,
                                    lu.cfg.GetHypervisorType())
   for node in nodenames:
     info = nodeinfo[node]
@@ -4498,13 +4757,13 @@ def _CheckNodesFreeDisk(lu, nodenames, requested):
                prereq=True, ecode=errors.ECODE_ENVIRON)
     vg_free = info.payload.get("vg_free", None)
     if not isinstance(vg_free, int):
-      raise errors.OpPrereqError("Can't compute free disk space on node %s,"
-                                 " result was '%s'" % (node, vg_free),
-                                 errors.ECODE_ENVIRON)
+      raise errors.OpPrereqError("Can't compute free disk space on node"
+                                 " %s for vg %s, result was '%s'" %
+                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
     if requested > vg_free:
-      raise errors.OpPrereqError("Not enough disk space on target node %s:"
-                                 " required %d MiB, available %d MiB" %
-                                 (node, requested, vg_free),
+      raise errors.OpPrereqError("Not enough disk space on target node %s"
+                                 " vg %s: required %d MiB, available %d MiB" %
+                                 (node, vg, requested, vg_free),
                                  errors.ECODE_NORES)
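
To make the calling convention of the two helpers above concrete: _CheckNodesFreeDiskPerVG takes a dict mapping each volume group name to the total MiB required on it, and skips the check entirely when None is passed (diskless and file-based templates). A minimal sketch of a caller, with invented VG names and sizes; real callers build req_sizes via _ComputeDiskSizePerVG further below:

    # Invented example data for illustration.
    req_sizes = {}
    for disk in [{"vg": "xenvg", "size": 1024}, {"vg": "ssdvg", "size": 512}]:
      req_sizes[disk["vg"]] = req_sizes.get(disk["vg"], 0) + disk["size"]

    # Raises OpPrereqError if any node in nodenames lacks the space in any VG.
    _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)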
 
 
@@ -4517,8 +4776,9 @@ class LUStartupInstance(LogicalUnit):
   _OP_PARAMS = [
     _PInstanceName,
     _PForce,
-    ("hvparams", _EmptyDict, _TDict),
-    ("beparams", _EmptyDict, _TDict),
+    _PIgnoreOfflineNodes,
+    ("hvparams", ht.EmptyDict, ht.TDict),
+    ("beparams", ht.EmptyDict, ht.TDict),
     ]
   REQ_BGL = False
 
@@ -4565,21 +4825,30 @@ class LUStartupInstance(LogicalUnit):
       hv_type.CheckParameterSyntax(filled_hvp)
       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
 
-    _CheckNodeOnline(self, instance.primary_node)
+    self.primary_offline = self.cfg.GetNodeInfo(instance.primary_node).offline
 
-    bep = self.cfg.GetClusterInfo().FillBE(instance)
-    # check bridges existence
-    _CheckInstanceBridgesExist(self, instance)
+    if self.primary_offline and self.op.ignore_offline_nodes:
+      self.proc.LogWarning("Ignoring offline primary node")
+
+      if self.op.hvparams or self.op.beparams:
+        self.proc.LogWarning("Overridden parameters are ignored")
+    else:
+      _CheckNodeOnline(self, instance.primary_node)
+
+      bep = self.cfg.GetClusterInfo().FillBE(instance)
+
+      # check bridges existence
+      _CheckInstanceBridgesExist(self, instance)
 
-    remote_info = self.rpc.call_instance_info(instance.primary_node,
-                                              instance.name,
-                                              instance.hypervisor)
-    remote_info.Raise("Error checking node %s" % instance.primary_node,
-                      prereq=True, ecode=errors.ECODE_ENVIRON)
-    if not remote_info.payload: # not running already
-      _CheckNodeFreeMemory(self, instance.primary_node,
-                           "starting instance %s" % instance.name,
-                           bep[constants.BE_MEMORY], instance.hypervisor)
+      remote_info = self.rpc.call_instance_info(instance.primary_node,
+                                                instance.name,
+                                                instance.hypervisor)
+      remote_info.Raise("Error checking node %s" % instance.primary_node,
+                        prereq=True, ecode=errors.ECODE_ENVIRON)
+      if not remote_info.payload: # not running already
+        _CheckNodeFreeMemory(self, instance.primary_node,
+                             "starting instance %s" % instance.name,
+                             bep[constants.BE_MEMORY], instance.hypervisor)
 
   def Exec(self, feedback_fn):
     """Start the instance.
@@ -4590,16 +4859,20 @@ class LUStartupInstance(LogicalUnit):
 
     self.cfg.MarkInstanceUp(instance.name)
 
-    node_current = instance.primary_node
+    if self.primary_offline:
+      assert self.op.ignore_offline_nodes
+      self.proc.LogInfo("Primary node offline, marked instance as started")
+    else:
+      node_current = instance.primary_node
 
-    _StartInstanceDisks(self, instance, force)
+      _StartInstanceDisks(self, instance, force)
 
-    result = self.rpc.call_instance_start(node_current, instance,
-                                          self.op.hvparams, self.op.beparams)
-    msg = result.fail_msg
-    if msg:
-      _ShutdownInstanceDisks(self, instance)
-      raise errors.OpExecError("Could not start instance: %s" % msg)
+      result = self.rpc.call_instance_start(node_current, instance,
+                                            self.op.hvparams, self.op.beparams)
+      msg = result.fail_msg
+      if msg:
+        _ShutdownInstanceDisks(self, instance)
+        raise errors.OpExecError("Could not start instance: %s" % msg)
 
 
 class LURebootInstance(LogicalUnit):
@@ -4610,8 +4883,8 @@ class LURebootInstance(LogicalUnit):
   HTYPE = constants.HTYPE_INSTANCE
   _OP_PARAMS = [
     _PInstanceName,
-    ("ignore_secondaries", False, _TBool),
-    ("reboot_type", _NoDefault, _TElemOf(constants.REBOOT_TYPES)),
+    ("ignore_secondaries", False, ht.TBool),
+    ("reboot_type", ht.NoDefault, ht.TElemOf(constants.REBOOT_TYPES)),
     _PShutdownTimeout,
     ]
   REQ_BGL = False
@@ -4691,7 +4964,8 @@ class LUShutdownInstance(LogicalUnit):
   HTYPE = constants.HTYPE_INSTANCE
   _OP_PARAMS = [
     _PInstanceName,
-    ("timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT, _TPositiveInt),
+    _PIgnoreOfflineNodes,
+    ("timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT, ht.TPositiveInt),
     ]
   REQ_BGL = False
 
@@ -4718,7 +4992,14 @@ class LUShutdownInstance(LogicalUnit):
     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
-    _CheckNodeOnline(self, self.instance.primary_node)
+
+    self.primary_offline = \
+      self.cfg.GetNodeInfo(self.instance.primary_node).offline
+
+    if self.primary_offline and self.op.ignore_offline_nodes:
+      self.proc.LogWarning("Ignoring offline primary node")
+    else:
+      _CheckNodeOnline(self, self.instance.primary_node)
 
   def Exec(self, feedback_fn):
     """Shutdown the instance.
@@ -4727,13 +5008,19 @@ class LUShutdownInstance(LogicalUnit):
     instance = self.instance
     node_current = instance.primary_node
     timeout = self.op.timeout
+
     self.cfg.MarkInstanceDown(instance.name)
-    result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
-    msg = result.fail_msg
-    if msg:
-      self.proc.LogWarning("Could not shutdown instance: %s" % msg)
 
-    _ShutdownInstanceDisks(self, instance)
+    if self.primary_offline:
+      assert self.op.ignore_offline_nodes
+      self.proc.LogInfo("Primary node offline, marked instance as stopped")
+    else:
+      result = self.rpc.call_instance_shutdown(node_current, instance, timeout)
+      msg = result.fail_msg
+      if msg:
+        self.proc.LogWarning("Could not shutdown instance: %s" % msg)
+
+      _ShutdownInstanceDisks(self, instance)
 
 
 class LUReinstallInstance(LogicalUnit):
@@ -4744,8 +5031,9 @@ class LUReinstallInstance(LogicalUnit):
   HTYPE = constants.HTYPE_INSTANCE
   _OP_PARAMS = [
     _PInstanceName,
-    ("os_type", None, _TMaybeString),
-    ("force_variant", False, _TBool),
+    ("os_type", None, ht.TMaybeString),
+    ("force_variant", False, ht.TBool),
+    ("osparams", None, ht.TOr(ht.TDict, ht.TNone)),
     ]
   REQ_BGL = False
 
@@ -4771,7 +5059,11 @@ class LUReinstallInstance(LogicalUnit):
     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
-    _CheckNodeOnline(self, instance.primary_node)
+    _CheckNodeOnline(self, instance.primary_node, "Instance primary node"
+                     " offline, cannot reinstall")
+    for node in instance.secondary_nodes:
+      _CheckNodeOnline(self, node, "Instance secondary node offline,"
+                       " cannot reinstall")
 
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
@@ -4783,6 +5075,18 @@ class LUReinstallInstance(LogicalUnit):
       # OS verification
       pnode = _ExpandNodeName(self.cfg, instance.primary_node)
       _CheckNodeHasOS(self, pnode, self.op.os_type, self.op.force_variant)
+      instance_os = self.op.os_type
+    else:
+      instance_os = instance.os
+
+    nodelist = list(instance.all_nodes)
+
+    if self.op.osparams:
+      i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams)
+      _CheckOSParams(self, True, nodelist, instance_os, i_osdict)
+      self.os_inst = i_osdict # the new dict (without defaults)
+    else:
+      self.os_inst = None
 
     self.instance = instance
 
@@ -4795,6 +5099,7 @@ class LUReinstallInstance(LogicalUnit):
     if self.op.os_type is not None:
       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
       inst.os = self.op.os_type
+      # Write to configuration
       self.cfg.Update(inst, feedback_fn)
 
     _StartInstanceDisks(self, inst, None)
@@ -4802,7 +5107,8 @@ class LUReinstallInstance(LogicalUnit):
       feedback_fn("Running the instance OS create scripts...")
       # FIXME: pass debug option from opcode to backend
       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True,
-                                             self.op.debug_level)
+                                             self.op.debug_level,
+                                             osparams=self.os_inst)
       result.Raise("Could not install OS for instance %s on node %s" %
                    (inst.name, inst.primary_node))
     finally:
@@ -4817,7 +5123,7 @@ class LURecreateInstanceDisks(LogicalUnit):
   HTYPE = constants.HTYPE_INSTANCE
   _OP_PARAMS = [
     _PInstanceName,
-    ("disks", _EmptyList, _TListOf(_TPositiveInt)),
+    ("disks", ht.EmptyList, ht.TListOf(ht.TPositiveInt)),
     ]
   REQ_BGL = False
 
@@ -4881,9 +5187,9 @@ class LURenameInstance(LogicalUnit):
   HTYPE = constants.HTYPE_INSTANCE
   _OP_PARAMS = [
     _PInstanceName,
-    ("new_name", _NoDefault, _TNonEmptyString),
-    ("ip_check", False, _TBool),
-    ("name_check", True, _TBool),
+    ("new_name", ht.NoDefault, ht.TNonEmptyString),
+    ("ip_check", False, ht.TBool),
+    ("name_check", True, ht.TBool),
     ]
 
   def CheckArguments(self):
@@ -4923,7 +5229,7 @@ class LURenameInstance(LogicalUnit):
     new_name = self.op.new_name
     if self.op.name_check:
       hostname = netutils.GetHostname(name=new_name)
-      new_name = hostname.name
+      new_name = self.op.new_name = hostname.name
       if (self.op.ip_check and
           netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)):
         raise errors.OpPrereqError("IP %s of instance %s already in use" %
@@ -4987,7 +5293,7 @@ class LURemoveInstance(LogicalUnit):
   HTYPE = constants.HTYPE_INSTANCE
   _OP_PARAMS = [
     _PInstanceName,
-    ("ignore_failures", False, _TBool),
+    ("ignore_failures", False, ht.TBool),
     _PShutdownTimeout,
     ]
   REQ_BGL = False
@@ -5073,9 +5379,9 @@ class LUQueryInstances(NoHooksLU):
   """
   # pylint: disable-msg=W0142
   _OP_PARAMS = [
-    ("output_fields", _NoDefault, _TListOf(_TNonEmptyString)),
-    ("names", _EmptyList, _TListOf(_TNonEmptyString)),
-    ("use_locking", False, _TBool),
+    _POutputFields,
+    ("names", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
+    ("use_locking", False, ht.TBool),
     ]
   REQ_BGL = False
   _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor",
@@ -5092,7 +5398,8 @@ class LUQueryInstances(NoHooksLU):
                                     r"(nic)\.(bridge)/([0-9]+)",
                                     r"(nic)\.(macs|ips|modes|links|bridges)",
                                     r"(disk|nic)\.(count)",
-                                    "hvparams",
+                                    "hvparams", "custom_hvparams",
+                                    "custom_beparams", "custom_nicparams",
                                     ] + _SIMPLE_FIELDS +
                                   ["hv/%s" % name
                                    for name in constants.HVS_PARAMETERS
@@ -5270,6 +5577,8 @@ class LUQueryInstances(NoHooksLU):
             val = instance.nics[0].mac
           else:
             val = None
+        elif field == "custom_nicparams":
+          val = [nic.nicparams for nic in instance.nics]
         elif field == "sda_size" or field == "sdb_size":
           idx = ord(field[2]) - ord('a')
           try:
@@ -5281,12 +5590,16 @@ class LUQueryInstances(NoHooksLU):
           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
         elif field == "tags":
           val = list(instance.GetTags())
+        elif field == "custom_hvparams":
+          val = instance.hvparams # not filled!
         elif field == "hvparams":
           val = i_hv
         elif (field.startswith(HVPREFIX) and
               field[len(HVPREFIX):] in constants.HVS_PARAMETERS and
               field[len(HVPREFIX):] not in constants.HVC_GLOBALS):
           val = i_hv.get(field[len(HVPREFIX):], None)
+        elif field == "custom_beparams":
+          val = instance.beparams
         elif field == "beparams":
           val = i_be
         elif (field.startswith(BEPREFIX) and
@@ -5366,7 +5679,7 @@ class LUFailoverInstance(LogicalUnit):
   HTYPE = constants.HTYPE_INSTANCE
   _OP_PARAMS = [
     _PInstanceName,
-    ("ignore_consistency", False, _TBool),
+    ("ignore_consistency", False, ht.TBool),
     _PShutdownTimeout,
     ]
   REQ_BGL = False
@@ -5447,6 +5760,7 @@ class LUFailoverInstance(LogicalUnit):
 
     """
     instance = self.instance
+    primary_node = self.cfg.GetNodeInfo(instance.primary_node)
 
     source_node = instance.primary_node
     target_node = instance.secondary_nodes[0]
@@ -5470,7 +5784,7 @@ class LUFailoverInstance(LogicalUnit):
                                              self.op.shutdown_timeout)
     msg = result.fail_msg
     if msg:
-      if self.op.ignore_consistency:
+      if self.op.ignore_consistency or primary_node.offline:
         self.proc.LogWarning("Could not shutdown instance %s on node %s."
                              " Proceeding anyway. Please make sure node"
                              " %s is down. Error details: %s",
@@ -5522,7 +5836,7 @@ class LUMigrateInstance(LogicalUnit):
     _PInstanceName,
     _PMigrationMode,
     _PMigrationLive,
-    ("cleanup", False, _TBool),
+    ("cleanup", False, ht.TBool),
     ]
 
   REQ_BGL = False
@@ -5573,7 +5887,7 @@ class LUMoveInstance(LogicalUnit):
   HTYPE = constants.HTYPE_INSTANCE
   _OP_PARAMS = [
     _PInstanceName,
-    ("target_node", _NoDefault, _TNonEmptyString),
+    ("target_node", ht.NoDefault, ht.TNonEmptyString),
     _PShutdownTimeout,
     ]
   REQ_BGL = False
@@ -5634,6 +5948,7 @@ class LUMoveInstance(LogicalUnit):
 
     _CheckNodeOnline(self, target_node)
     _CheckNodeNotDrained(self, target_node)
+    _CheckNodeVmCapable(self, target_node)
 
     if instance.admin_up:
       # check memory requirements on the secondary node
@@ -6242,13 +6557,12 @@ def _GenerateUniqueNames(lu, exts):
   return results
 
 
-def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
+def _GenerateDRBD8Branch(lu, primary, secondary, size, vgname, names, iv_name,
                          p_minor, s_minor):
   """Generate a drbd8 device complete with its children.
 
   """
   port = lu.cfg.AllocatePort()
-  vgname = lu.cfg.GetVGName()
   shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                           logical_id=(vgname, names[0]))
@@ -6267,7 +6581,7 @@ def _GenerateDiskTemplate(lu, template_name,
                           instance_name, primary_node,
                           secondary_nodes, disk_info,
                           file_storage_dir, file_driver,
-                          base_index):
+                          base_index, feedback_fn):
   """Generate the entire disk layout for a given template type.
 
   """
@@ -6286,8 +6600,10 @@ def _GenerateDiskTemplate(lu, template_name,
                                       for i in range(disk_count)])
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
+      vg = disk.get("vg", vgname)
+      feedback_fn("* disk %i, vg %s, name %s" % (idx, vg, names[idx]))
       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
-                              logical_id=(vgname, names[idx]),
+                              logical_id=(vg, names[idx]),
                               iv_name="disk/%d" % disk_index,
                               mode=disk["mode"])
       disks.append(disk_dev)
@@ -6305,8 +6621,9 @@ def _GenerateDiskTemplate(lu, template_name,
       names.append(lv_prefix + "_meta")
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
+      vg = disk.get("vg", vgname)
       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
-                                      disk["size"], names[idx*2:idx*2+2],
+                                      disk["size"], vg, names[idx*2:idx*2+2],
                                       "disk/%d" % disk_index,
                                       minors[idx*2], minors[idx*2+1])
       disk_dev.mode = disk["mode"]
@@ -6338,6 +6655,58 @@ def _GetInstanceInfoText(instance):
   return "originstname+%s" % instance.name
 
 
+def _CalcEta(time_taken, written, total_size):
+  """Calculates the ETA based on size written and total size.
+
+  @param time_taken: The time taken so far
+  @param written: amount written so far
+  @param total_size: The total size of data to be written
+  @return: The remaining time in seconds
+
+  """
+  avg_time = time_taken / float(written)
+  return (total_size - written) * avg_time
+
+
+def _WipeDisks(lu, instance):
+  """Wipes instance disks.
+
+  @type lu: L{LogicalUnit}
+  @param lu: the logical unit on whose behalf we execute
+  @type instance: L{objects.Instance}
+  @param instance: the instance whose disks we should wipe
+  @return: the success of the wipe
+
+  """
+  node = instance.primary_node
+  for idx, device in enumerate(instance.disks):
+    lu.LogInfo("* Wiping disk %d", idx)
+    logging.info("Wiping disk %d for instance %s", idx, instance.name)
+
+    # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
+    # MAX_WIPE_CHUNK at max
+    wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 *
+                          constants.MIN_WIPE_CHUNK_PERCENT)
+
+    offset = 0
+    size = device.size
+    last_output = 0
+    start_time = time.time()
+
+    while offset < size:
+      wipe_size = min(wipe_chunk_size, size - offset)
+      result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size)
+      result.Raise("Could not wipe disk %d at offset %d for size %d" %
+                   (idx, offset, wipe_size))
+      now = time.time()
+      offset += wipe_size
+      if now - last_output >= 60:
+        eta = _CalcEta(now - start_time, offset, size)
+        lu.LogInfo(" - done: %.1f%% ETA: %s" %
+                   (offset / float(size) * 100, utils.FormatSeconds(eta)))
+        last_output = now
+
+
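
A quick worked example of the progress math above, with invented numbers: if 2048 MiB of a 10240 MiB disk have been wiped after 60 seconds, the average cost is 60/2048 seconds per MiB, so the remaining 8192 MiB should take about four times as long as what has already been done:

    # Invented numbers, only to sanity-check _CalcEta; not part of the patch.
    eta = _CalcEta(60.0, 2048, 10240)
    assert eta == (10240 - 2048) * (60.0 / 2048)  # 240.0 seconds
    # _WipeDisks reports this at most once a minute via utils.FormatSeconds(eta).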
 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
   """Create all disks for an instance.
 
@@ -6432,6 +6801,35 @@ def _RemoveDisks(lu, instance, target_node=None):
   return all_result
 
 
+def _ComputeDiskSizePerVG(disk_template, disks):
+  """Compute disk size requirements in the volume group
+
+  """
+  def _compute(disks, payload):
+    """Universal algorithm
+
+    """
+    vgs = {}
+    for disk in disks:
+      vgs[disk["vg"]] = vgs.get(disk["vg"], 0) + disk["size"] + payload
+
+    return vgs
+
+  # Required free disk space as a function of disk and swap space
+  req_size_dict = {
+    constants.DT_DISKLESS: None,
+    constants.DT_PLAIN: _compute(disks, 0),
+    # 128 MB are added for drbd metadata for each disk
+    constants.DT_DRBD8: _compute(disks, 128),
+    constants.DT_FILE: None,
+  }
+
+  if disk_template not in req_size_dict:
+    raise errors.ProgrammerError("Disk template '%s' size requirement"
+                                 " is unknown" % disk_template)
+
+  return req_size_dict[disk_template]
+
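
A hypothetical example of the per-VG computation above (VG names and sizes invented): for a DRBD8 template each disk gets 128 MiB of metadata added, and sizes are accumulated per volume group:

    # Invented data for illustration only.
    disks = [{"size": 1024, "vg": "xenvg"},
             {"size": 512, "vg": "xenvg"},
             {"size": 2048, "vg": "ssdvg"}]
    _ComputeDiskSizePerVG(constants.DT_DRBD8, disks)
    # => {"xenvg": (1024 + 128) + (512 + 128), "ssdvg": 2048 + 128}
    # => {"xenvg": 1792, "ssdvg": 2176}
    # DT_DISKLESS and DT_FILE yield None, which _CheckNodesFreeDiskPerVG skips.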
 def _ComputeDiskSize(disk_template, disks):
   """Compute disk size requirements in the volume group
 
@@ -6516,33 +6914,32 @@ class LUCreateInstance(LogicalUnit):
   HTYPE = constants.HTYPE_INSTANCE
   _OP_PARAMS = [
     _PInstanceName,
-    ("mode", _NoDefault, _TElemOf(constants.INSTANCE_CREATE_MODES)),
-    ("start", True, _TBool),
-    ("wait_for_sync", True, _TBool),
-    ("ip_check", True, _TBool),
-    ("name_check", True, _TBool),
-    ("disks", _NoDefault, _TListOf(_TDict)),
-    ("nics", _NoDefault, _TListOf(_TDict)),
-    ("hvparams", _EmptyDict, _TDict),
-    ("beparams", _EmptyDict, _TDict),
-    ("osparams", _EmptyDict, _TDict),
-    ("no_install", None, _TMaybeBool),
-    ("os_type", None, _TMaybeString),
-    ("force_variant", False, _TBool),
-    ("source_handshake", None, _TOr(_TList, _TNone)),
-    ("source_x509_ca", None, _TMaybeString),
-    ("source_instance_name", None, _TMaybeString),
-    ("src_node", None, _TMaybeString),
-    ("src_path", None, _TMaybeString),
-    ("pnode", None, _TMaybeString),
-    ("snode", None, _TMaybeString),
-    ("iallocator", None, _TMaybeString),
-    ("hypervisor", None, _TMaybeString),
-    ("disk_template", _NoDefault, _CheckDiskTemplate),
-    ("identify_defaults", False, _TBool),
-    ("file_driver", None, _TOr(_TNone, _TElemOf(constants.FILE_DRIVER))),
-    ("file_storage_dir", None, _TMaybeString),
-    ("dry_run", False, _TBool),
+    ("mode", ht.NoDefault, ht.TElemOf(constants.INSTANCE_CREATE_MODES)),
+    ("start", True, ht.TBool),
+    ("wait_for_sync", True, ht.TBool),
+    ("ip_check", True, ht.TBool),
+    ("name_check", True, ht.TBool),
+    ("disks", ht.NoDefault, ht.TListOf(ht.TDict)),
+    ("nics", ht.NoDefault, ht.TListOf(ht.TDict)),
+    ("hvparams", ht.EmptyDict, ht.TDict),
+    ("beparams", ht.EmptyDict, ht.TDict),
+    ("osparams", ht.EmptyDict, ht.TDict),
+    ("no_install", None, ht.TMaybeBool),
+    ("os_type", None, ht.TMaybeString),
+    ("force_variant", False, ht.TBool),
+    ("source_handshake", None, ht.TOr(ht.TList, ht.TNone)),
+    ("source_x509_ca", None, ht.TMaybeString),
+    ("source_instance_name", None, ht.TMaybeString),
+    ("src_node", None, ht.TMaybeString),
+    ("src_path", None, ht.TMaybeString),
+    ("pnode", None, ht.TMaybeString),
+    ("snode", None, ht.TMaybeString),
+    ("iallocator", None, ht.TMaybeString),
+    ("hypervisor", None, ht.TMaybeString),
+    ("disk_template", ht.NoDefault, _CheckDiskTemplate),
+    ("identify_defaults", False, ht.TBool),
+    ("file_driver", None, ht.TOr(ht.TNone, ht.TElemOf(constants.FILE_DRIVER))),
+    ("file_storage_dir", None, ht.TMaybeString),
     ]
   REQ_BGL = False
 
@@ -6600,9 +6997,6 @@ class LUCreateInstance(LogicalUnit):
       self.op.instance_name = self.hostname1.name
       # used in CheckPrereq for ip ping check
       self.check_ip = self.hostname1.ip
-    elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
-      raise errors.OpPrereqError("Remote imports require names to be checked" %
-                                 errors.ECODE_INVAL)
     else:
       self.check_ip = None
 
@@ -6619,6 +7013,16 @@ class LUCreateInstance(LogicalUnit):
     ### Node/iallocator related checks
     _CheckIAllocatorOrNode(self, "iallocator", "pnode")
 
+    if self.op.pnode is not None:
+      if self.op.disk_template in constants.DTS_NET_MIRROR:
+        if self.op.snode is None:
+          raise errors.OpPrereqError("The networked disk templates need"
+                                     " a mirror node", errors.ECODE_INVAL)
+      elif self.op.snode:
+        self.LogWarning("Secondary node will be ignored on non-mirrored disk"
+                        " template")
+        self.op.snode = None
+
     self._cds = _GetClusterDomainSecret()
 
     if self.op.mode == constants.INSTANCE_IMPORT:
@@ -6634,6 +7038,10 @@ class LUCreateInstance(LogicalUnit):
       if self.op.os_type is None:
         raise errors.OpPrereqError("No guest OS specified",
                                    errors.ECODE_INVAL)
+      if self.op.os_type in self.cfg.GetClusterInfo().blacklisted_os:
+        raise errors.OpPrereqError("Guest OS '%s' is not allowed for"
+                                   " installation" % self.op.os_type,
+                                   errors.ECODE_STATE)
       if self.op.disk_template is None:
         raise errors.OpPrereqError("No disk template specified",
                                    errors.ECODE_INVAL)
@@ -7017,13 +7425,12 @@ class LUCreateInstance(LogicalUnit):
       elif ip.lower() == constants.VALUE_AUTO:
         if not self.op.name_check:
           raise errors.OpPrereqError("IP address set to auto but name checks"
-                                     " have been skipped. Aborting.",
+                                     " have been skipped",
                                      errors.ECODE_INVAL)
         nic_ip = self.hostname1.ip
       else:
-        if not netutils.IP4Address.IsValid(ip):
-          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
-                                     " like a valid IP" % ip,
+        if not netutils.IPAddress.IsValid(ip):
+          raise errors.OpPrereqError("Invalid IP address '%s'" % ip,
                                      errors.ECODE_INVAL)
         nic_ip = ip
 
@@ -7081,7 +7488,8 @@ class LUCreateInstance(LogicalUnit):
       except (TypeError, ValueError):
         raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                    errors.ECODE_INVAL)
-      new_disk = {"size": size, "mode": mode}
+      vg = disk.get("vg", self.cfg.GetVGName())
+      new_disk = {"size": size, "mode": mode, "vg": vg}
       if "adopt" in disk:
         new_disk["adopt"] = disk["adopt"]
       self.disks.append(new_disk)
@@ -7161,37 +7569,38 @@ class LUCreateInstance(LogicalUnit):
     if pnode.drained:
       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
                                  pnode.name, errors.ECODE_STATE)
+    if not pnode.vm_capable:
+      raise errors.OpPrereqError("Cannot use non-vm_capable primary node"
+                                 " '%s'" % pnode.name, errors.ECODE_STATE)
 
     self.secondaries = []
 
     # mirror node verification
     if self.op.disk_template in constants.DTS_NET_MIRROR:
-      if self.op.snode is None:
-        raise errors.OpPrereqError("The networked disk templates need"
-                                   " a mirror node", errors.ECODE_INVAL)
       if self.op.snode == pnode.name:
         raise errors.OpPrereqError("The secondary node cannot be the"
                                    " primary node.", errors.ECODE_INVAL)
       _CheckNodeOnline(self, self.op.snode)
       _CheckNodeNotDrained(self, self.op.snode)
+      _CheckNodeVmCapable(self, self.op.snode)
       self.secondaries.append(self.op.snode)
 
     nodenames = [pnode.name] + self.secondaries
 
-    req_size = _ComputeDiskSize(self.op.disk_template,
-                                self.disks)
+    if not self.adopt_disks:
+      # Check lv size requirements, if not adopting
+      req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks)
+      _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)
 
-    # Check lv size requirements, if not adopting
-    if req_size is not None and not self.adopt_disks:
-      _CheckNodesFreeDisk(self, nodenames, req_size)
-
-    if self.adopt_disks: # instead, we must check the adoption data
+    else: # instead, we must check the adoption data
       all_lvs = set([i["adopt"] for i in self.disks])
       if len(all_lvs) != len(self.disks):
         raise errors.OpPrereqError("Duplicate volume names given for adoption",
                                    errors.ECODE_INVAL)
       for lv_name in all_lvs:
         try:
+          # FIXME: VG must be provided here. Else all LVs with the
+          # same name will be locked on all VGs.
           self.cfg.ReserveLV(lv_name, self.proc.GetECId())
         except errors.ReservationError:
           raise errors.OpPrereqError("LV named %s used by another instance" %
@@ -7265,7 +7674,8 @@ class LUCreateInstance(LogicalUnit):
                                   self.disks,
                                   file_storage_dir,
                                   self.op.file_driver,
-                                  0)
+                                  0,
+                                  feedback_fn)
 
     iobj = objects.Instance(name=instance, os=self.op.os_type,
                             primary_node=pnode_name,
@@ -7303,6 +7713,18 @@ class LUCreateInstance(LogicalUnit):
           self.cfg.ReleaseDRBDMinors(instance)
           raise
 
+      if self.cfg.GetClusterInfo().prealloc_wipe_disks:
+        feedback_fn("* wiping instance disks...")
+        try:
+          _WipeDisks(self, iobj)
+        except errors.OpExecError:
+          self.LogWarning("Device wiping failed, reverting...")
+          try:
+            _RemoveDisks(self, iobj)
+          finally:
+            self.cfg.ReleaseDRBDMinors(instance)
+            raise
+
     feedback_fn("adding instance %s to cluster config" % instance)
 
     self.cfg.AddInstance(iobj, self.proc.GetECId())
@@ -7454,7 +7876,12 @@ class LUConnectConsole(NoHooksLU):
     node_insts.Raise("Can't get node information from %s" % node)
 
     if instance.name not in node_insts.payload:
-      raise errors.OpExecError("Instance %s is not running." % instance.name)
+      if instance.admin_up:
+        state = "ERROR_down"
+      else:
+        state = "ADMIN_down"
+      raise errors.OpExecError("Instance %s is not running (state %s)" %
+                               (instance.name, state))
 
     logging.debug("Connecting to console of %s on %s", instance.name, node)
 
@@ -7478,11 +7905,11 @@ class LUReplaceDisks(LogicalUnit):
   HTYPE = constants.HTYPE_INSTANCE
   _OP_PARAMS = [
     _PInstanceName,
-    ("mode", _NoDefault, _TElemOf(constants.REPLACE_MODES)),
-    ("disks", _EmptyList, _TListOf(_TPositiveInt)),
-    ("remote_node", None, _TMaybeString),
-    ("iallocator", None, _TMaybeString),
-    ("early_release", False, _TBool),
+    ("mode", ht.NoDefault, ht.TElemOf(constants.REPLACE_MODES)),
+    ("disks", ht.EmptyList, ht.TListOf(ht.TPositiveInt)),
+    ("remote_node", None, ht.TMaybeString),
+    ("iallocator", None, ht.TMaybeString),
+    ("early_release", False, ht.TBool),
     ]
   REQ_BGL = False
 
@@ -7738,6 +8165,7 @@ class TLReplaceDisks(Tasklet):
         check_nodes = [self.new_node, self.other_node]
 
         _CheckNodeNotDrained(self.lu, remote_node)
+        _CheckNodeVmCapable(self.lu, remote_node)
 
         old_node_info = self.cfg.GetNodeInfo(secondary_node)
         assert old_node_info is not None
@@ -8221,9 +8649,9 @@ class LURepairNodeStorage(NoHooksLU):
   """
   _OP_PARAMS = [
     _PNodeName,
-    ("storage_type", _NoDefault, _CheckStorageType),
-    ("name", _NoDefault, _TNonEmptyString),
-    ("ignore_consistency", False, _TBool),
+    ("storage_type", ht.NoDefault, _CheckStorageType),
+    ("name", ht.NoDefault, ht.TNonEmptyString),
+    ("ignore_consistency", False, ht.TBool),
     ]
   REQ_BGL = False
 
@@ -8288,9 +8716,9 @@ class LUNodeEvacuationStrategy(NoHooksLU):
 
   """
   _OP_PARAMS = [
-    ("nodes", _NoDefault, _TListOf(_TNonEmptyString)),
-    ("remote_node", None, _TMaybeString),
-    ("iallocator", None, _TMaybeString),
+    ("nodes", ht.NoDefault, ht.TListOf(ht.TNonEmptyString)),
+    ("remote_node", None, ht.TMaybeString),
+    ("iallocator", None, ht.TMaybeString),
     ]
   REQ_BGL = False
 
@@ -8340,9 +8768,9 @@ class LUGrowDisk(LogicalUnit):
   HTYPE = constants.HTYPE_INSTANCE
   _OP_PARAMS = [
     _PInstanceName,
-    ("disk", _NoDefault, _TInt),
-    ("amount", _NoDefault, _TInt),
-    ("wait_for_sync", True, _TBool),
+    ("disk", ht.NoDefault, ht.TInt),
+    ("amount", ht.NoDefault, ht.TInt),
+    ("wait_for_sync", True, ht.TBool),
     ]
   REQ_BGL = False
 
@@ -8391,9 +8819,10 @@ class LUGrowDisk(LogicalUnit):
     self.disk = instance.FindDisk(self.op.disk)
 
     if instance.disk_template != constants.DT_FILE:
-      # TODO: check the free disk space for file, when that feature will be
-      # supported
-      _CheckNodesFreeDisk(self, nodenames, self.op.amount)
+      # TODO: check the free disk space for file, when that feature
+      # will be supported
+      _CheckNodesFreeDiskPerVG(self, nodenames,
+                               {self.disk.physical_id[0]: self.op.amount})
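
For LV-backed disks the physical_id is the (vg_name, lv_name) pair, so the dict built here is simply a one-entry req_sizes mapping for the disk being grown:

    # Equivalent, with an invented VG name, to:
    # _CheckNodesFreeDiskPerVG(self, nodenames, {"xenvg": self.op.amount})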
 
   def Exec(self, feedback_fn):
     """Execute disk grow.
@@ -8438,8 +8867,8 @@ class LUQueryInstanceData(NoHooksLU):
 
   """
   _OP_PARAMS = [
-    ("instances", _EmptyList, _TListOf(_TNonEmptyString)),
-    ("static", False, _TBool),
+    ("instances", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
+    ("static", False, ht.TBool),
     ]
   REQ_BGL = False
 
@@ -8599,15 +9028,15 @@ class LUSetInstanceParams(LogicalUnit):
   HTYPE = constants.HTYPE_INSTANCE
   _OP_PARAMS = [
     _PInstanceName,
-    ("nics", _EmptyList, _TList),
-    ("disks", _EmptyList, _TList),
-    ("beparams", _EmptyDict, _TDict),
-    ("hvparams", _EmptyDict, _TDict),
-    ("disk_template", None, _TMaybeString),
-    ("remote_node", None, _TMaybeString),
-    ("os_name", None, _TMaybeString),
-    ("force_variant", False, _TBool),
-    ("osparams", None, _TOr(_TDict, _TNone)),
+    ("nics", ht.EmptyList, ht.TList),
+    ("disks", ht.EmptyList, ht.TList),
+    ("beparams", ht.EmptyDict, ht.TDict),
+    ("hvparams", ht.EmptyDict, ht.TDict),
+    ("disk_template", None, ht.TMaybeString),
+    ("remote_node", None, ht.TMaybeString),
+    ("os_name", None, ht.TMaybeString),
+    ("force_variant", False, ht.TBool),
+    ("osparams", None, ht.TOr(ht.TDict, ht.TNone)),
     _PForce,
     ]
   REQ_BGL = False
@@ -8696,7 +9125,7 @@ class LUSetInstanceParams(LogicalUnit):
         if nic_ip.lower() == constants.VALUE_NONE:
           nic_dict['ip'] = None
         else:
-          if not netutils.IP4Address.IsValid(nic_ip):
+          if not netutils.IPAddress.IsValid(nic_ip):
             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
                                        errors.ECODE_INVAL)
 
@@ -8835,9 +9264,9 @@ class LUSetInstanceParams(LogicalUnit):
                                      self.op.remote_node, errors.ECODE_STATE)
         _CheckNodeOnline(self, self.op.remote_node)
         _CheckNodeNotDrained(self, self.op.remote_node)
-        disks = [{"size": d.size} for d in instance.disks]
-        required = _ComputeDiskSize(self.op.disk_template, disks)
-        _CheckNodesFreeDisk(self, [self.op.remote_node], required)
+        disks = [{"size": d.size, "vg": d.vg} for d in instance.disks]
+        required = _ComputeDiskSizePerVG(self.op.disk_template, disks)
+        _CheckNodesFreeDiskPerVG(self, [self.op.remote_node], required)
 
     # hvparams processing
     if self.op.hvparams:
@@ -8869,10 +9298,9 @@ class LUSetInstanceParams(LogicalUnit):
     if self.op.osparams:
       i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams)
       _CheckOSParams(self, True, nodelist, instance_os, i_osdict)
-      self.os_new = cluster.SimpleFillOS(instance_os, i_osdict)
       self.os_inst = i_osdict # the new dict (without defaults)
     else:
-      self.os_new = self.os_inst = {}
+      self.os_inst = {}
 
     self.warn = []
 
@@ -9044,7 +9472,7 @@ class LUSetInstanceParams(LogicalUnit):
     disk_info = [{"size": d.size, "mode": d.mode} for d in instance.disks]
     new_disks = _GenerateDiskTemplate(self, self.op.disk_template,
                                       instance.name, pnode, [snode],
-                                      disk_info, None, None, 0)
+                                      disk_info, None, None, 0, feedback_fn)
     info = _GetInstanceInfoText(instance)
     feedback_fn("Creating additional volumes...")
     # first, create the missing data and meta devices
@@ -9162,7 +9590,7 @@ class LUSetInstanceParams(LogicalUnit):
                                          [disk_dict],
                                          file_path,
                                          file_driver,
-                                         disk_idx_base)[0]
+                                         disk_idx_base, feedback_fn)[0]
         instance.disks.append(new_disk)
         info = _GetInstanceInfoText(instance)
 
@@ -9264,8 +9692,8 @@ class LUQueryExports(NoHooksLU):
 
   """
   _OP_PARAMS = [
-    ("nodes", _EmptyList, _TListOf(_TNonEmptyString)),
-    ("use_locking", False, _TBool),
+    ("nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
+    ("use_locking", False, ht.TBool),
     ]
   REQ_BGL = False
 
@@ -9305,7 +9733,7 @@ class LUPrepareExport(NoHooksLU):
   """
   _OP_PARAMS = [
     _PInstanceName,
-    ("mode", _NoDefault, _TElemOf(constants.EXPORT_MODES)),
+    ("mode", ht.NoDefault, ht.TElemOf(constants.EXPORT_MODES)),
     ]
   REQ_BGL = False
 
@@ -9362,14 +9790,14 @@ class LUExportInstance(LogicalUnit):
   HTYPE = constants.HTYPE_INSTANCE
   _OP_PARAMS = [
     _PInstanceName,
-    ("target_node", _NoDefault, _TOr(_TNonEmptyString, _TList)),
-    ("shutdown", True, _TBool),
+    ("target_node", ht.NoDefault, ht.TOr(ht.TNonEmptyString, ht.TList)),
+    ("shutdown", True, ht.TBool),
     _PShutdownTimeout,
-    ("remove_instance", False, _TBool),
-    ("ignore_remove_failures", False, _TBool),
-    ("mode", constants.EXPORT_MODE_LOCAL, _TElemOf(constants.EXPORT_MODES)),
-    ("x509_key_name", None, _TOr(_TList, _TNone)),
-    ("destination_x509_ca", None, _TMaybeString),
+    ("remove_instance", False, ht.TBool),
+    ("ignore_remove_failures", False, ht.TBool),
+    ("mode", constants.EXPORT_MODE_LOCAL, ht.TElemOf(constants.EXPORT_MODES)),
+    ("x509_key_name", None, ht.TOr(ht.TList, ht.TNone)),
+    ("destination_x509_ca", None, ht.TMaybeString),
     ]
   REQ_BGL = False
 
@@ -9380,10 +9808,6 @@ class LUExportInstance(LogicalUnit):
     self.x509_key_name = self.op.x509_key_name
     self.dest_x509_ca_pem = self.op.destination_x509_ca
 
-    if self.op.remove_instance and not self.op.shutdown:
-      raise errors.OpPrereqError("Can not remove instance without shutting it"
-                                 " down before")
-
     if self.op.mode == constants.EXPORT_MODE_REMOTE:
       if not self.x509_key_name:
         raise errors.OpPrereqError("Missing X509 key name for encryption",
@@ -9449,6 +9873,11 @@ class LUExportInstance(LogicalUnit):
           "Cannot retrieve locked instance %s" % self.op.instance_name
     _CheckNodeOnline(self, self.instance.primary_node)
 
+    if (self.op.remove_instance and self.instance.admin_up and
+        not self.op.shutdown):
+      raise errors.OpPrereqError("Can not remove instance without shutting it"
+                                 " down before")
+
     if self.op.mode == constants.EXPORT_MODE_LOCAL:
       self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
       self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
@@ -9718,6 +10147,9 @@ class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
       self.op.name = _ExpandInstanceName(self.cfg, self.op.name)
       self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name
 
+    # FIXME: Acquire BGL for cluster tag operations (as of this writing it's
+    # not possible to acquire the BGL based on opcode parameters)
+
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -9738,11 +10170,18 @@ class LUGetTags(TagsLU):
 
   """
   _OP_PARAMS = [
-    ("kind", _NoDefault, _TElemOf(constants.VALID_TAG_TYPES)),
-    ("name", _NoDefault, _TNonEmptyString),
+    ("kind", ht.NoDefault, ht.TElemOf(constants.VALID_TAG_TYPES)),
+    # Name is only meaningful for nodes and instances
+    ("name", ht.NoDefault, ht.TMaybeString),
     ]
   REQ_BGL = False
 
+  def ExpandNames(self):
+    TagsLU.ExpandNames(self)
+
+    # Share locks as this is only a read operation
+    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+
   def Exec(self, feedback_fn):
     """Returns the tag list.
 
@@ -9755,7 +10194,7 @@ class LUSearchTags(NoHooksLU):
 
   """
   _OP_PARAMS = [
-    ("pattern", _NoDefault, _TNonEmptyString),
+    ("pattern", ht.NoDefault, ht.TNonEmptyString),
     ]
   REQ_BGL = False
 
@@ -9797,9 +10236,10 @@ class LUAddTags(TagsLU):
 
   """
   _OP_PARAMS = [
-    ("kind", _NoDefault, _TElemOf(constants.VALID_TAG_TYPES)),
-    ("name", _NoDefault, _TNonEmptyString),
-    ("tags", _NoDefault, _TListOf(_TNonEmptyString)),
+    ("kind", ht.NoDefault, ht.TElemOf(constants.VALID_TAG_TYPES)),
+    # Name is only meaningful for nodes and instances
+    ("name", ht.NoDefault, ht.TMaybeString),
+    ("tags", ht.NoDefault, ht.TListOf(ht.TNonEmptyString)),
     ]
   REQ_BGL = False
 
@@ -9830,9 +10270,10 @@ class LUDelTags(TagsLU):
 
   """
   _OP_PARAMS = [
-    ("kind", _NoDefault, _TElemOf(constants.VALID_TAG_TYPES)),
-    ("name", _NoDefault, _TNonEmptyString),
-    ("tags", _NoDefault, _TListOf(_TNonEmptyString)),
+    ("kind", ht.NoDefault, ht.TElemOf(constants.VALID_TAG_TYPES)),
+    # Name is only meaningful for nodes and instances
+    ("name", ht.NoDefault, ht.TMaybeString),
+    ("tags", ht.NoDefault, ht.TListOf(ht.TNonEmptyString)),
     ]
   REQ_BGL = False
 
@@ -9847,12 +10288,13 @@ class LUDelTags(TagsLU):
       objects.TaggableObject.ValidateTag(tag)
     del_tags = frozenset(self.op.tags)
     cur_tags = self.target.GetTags()
-    if not del_tags <= cur_tags:
-      diff_tags = del_tags - cur_tags
-      diff_names = ["'%s'" % tag for tag in diff_tags]
-      diff_names.sort()
+
+    diff_tags = del_tags - cur_tags
+    if diff_tags:
+      diff_names = ("'%s'" % i for i in sorted(diff_tags))
       raise errors.OpPrereqError("Tag(s) %s not found" %
-                                 (",".join(diff_names)), errors.ECODE_NOENT)
+                                 (utils.CommaJoin(diff_names), ),
+                                 errors.ECODE_NOENT)
 
   def Exec(self, feedback_fn):
     """Remove the tag from the object.
@@ -9871,10 +10313,10 @@ class LUTestDelay(NoHooksLU):
 
   """
   _OP_PARAMS = [
-    ("duration", _NoDefault, _TFloat),
-    ("on_master", True, _TBool),
-    ("on_nodes", _EmptyList, _TListOf(_TNonEmptyString)),
-    ("repeat", 0, _TPositiveInt)
+    ("duration", ht.NoDefault, ht.TFloat),
+    ("on_master", True, ht.TBool),
+    ("on_nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
+    ("repeat", 0, ht.TPositiveInt)
     ]
   REQ_BGL = False
 
@@ -9922,10 +10364,10 @@ class LUTestJobqueue(NoHooksLU):
 
   """
   _OP_PARAMS = [
-    ("notify_waitlock", False, _TBool),
-    ("notify_exec", False, _TBool),
-    ("log_messages", _EmptyList, _TListOf(_TString)),
-    ("fail", False, _TBool),
+    ("notify_waitlock", False, ht.TBool),
+    ("notify_exec", False, ht.TBool),
+    ("log_messages", ht.EmptyList, ht.TListOf(ht.TString)),
+    ("fail", False, ht.TBool),
     ]
   REQ_BGL = False
 
@@ -10146,7 +10588,6 @@ class IAllocator(object):
     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
 
     # node data
-    node_results = {}
     node_list = cfg.GetNodeList()
 
     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
@@ -10161,6 +10602,31 @@ class IAllocator(object):
     node_iinfo = \
       self.rpc.call_all_instances_info(node_list,
                                        cluster_info.enabled_hypervisors)
+
+    data["nodegroups"] = self._ComputeNodeGroupData(cfg)
+
+    data["nodes"] = self._ComputeNodeData(cfg, node_data, node_iinfo, i_list)
+
+    data["instances"] = self._ComputeInstanceData(cluster_info, i_list)
+
+    self.in_data = data
+
+  @staticmethod
+  def _ComputeNodeGroupData(cfg):
+    """Compute node groups data.
+
+    """
+    ng = {}
+    for guuid, gdata in cfg.GetAllNodeGroupsInfo().items():
+      ng[guuid] = { "name": gdata.name }
+    return ng
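
The nodegroups entry added to the IAllocator input is deliberately minimal at this stage; for a cluster with just the default group the resulting structure would look roughly like this (group name and key shape shown for illustration only):

    # e.g. data["nodegroups"] == {"<group-uuid>": {"name": "default"}}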
+
+  @staticmethod
+  def _ComputeNodeData(cfg, node_data, node_iinfo, i_list):
+    """Compute global node data.
+
+    """
+    node_results = {}
     for nname, nresult in node_data.items():
       # first fill in static (config-based) values
       ninfo = cfg.GetNodeInfo(nname)
@@ -10171,6 +10637,9 @@ class IAllocator(object):
         "offline": ninfo.offline,
         "drained": ninfo.drained,
         "master_candidate": ninfo.master_candidate,
+        "group": ninfo.group,
+        "master_capable": ninfo.master_capable,
+        "vm_capable": ninfo.vm_capable,
         }
 
       if not (ninfo.offline or ninfo.drained):
@@ -10217,9 +10686,14 @@ class IAllocator(object):
         pnr.update(pnr_dyn)
 
       node_results[nname] = pnr
-    data["nodes"] = node_results
 
-    # instance data
+    return node_results
+
+  @staticmethod
+  def _ComputeInstanceData(cluster_info, i_list):
+    """Compute global instance data.
+
+    """
     instance_data = {}
     for iinfo, beinfo in i_list:
       nic_data = []
@@ -10249,9 +10723,7 @@ class IAllocator(object):
                                                  pir["disks"])
       instance_data[iinfo.name] = pir
 
-    data["instances"] = instance_data
-
-    self.in_data = data
+    return instance_data
 
   def _AddNewInstance(self):
     """Add new instance data to allocator structure.
@@ -10392,21 +10864,22 @@ class LUTestAllocator(NoHooksLU):
 
   """
   _OP_PARAMS = [
-    ("direction", _NoDefault, _TElemOf(constants.VALID_IALLOCATOR_DIRECTIONS)),
-    ("mode", _NoDefault, _TElemOf(constants.VALID_IALLOCATOR_MODES)),
-    ("name", _NoDefault, _TNonEmptyString),
-    ("nics", _NoDefault, _TOr(_TNone, _TListOf(
-      _TDictOf(_TElemOf(["mac", "ip", "bridge"]),
-               _TOr(_TNone, _TNonEmptyString))))),
-    ("disks", _NoDefault, _TOr(_TNone, _TList)),
-    ("hypervisor", None, _TMaybeString),
-    ("allocator", None, _TMaybeString),
-    ("tags", _EmptyList, _TListOf(_TNonEmptyString)),
-    ("mem_size", None, _TOr(_TNone, _TPositiveInt)),
-    ("vcpus", None, _TOr(_TNone, _TPositiveInt)),
-    ("os", None, _TMaybeString),
-    ("disk_template", None, _TMaybeString),
-    ("evac_nodes", None, _TOr(_TNone, _TListOf(_TNonEmptyString))),
+    ("direction", ht.NoDefault,
+     ht.TElemOf(constants.VALID_IALLOCATOR_DIRECTIONS)),
+    ("mode", ht.NoDefault, ht.TElemOf(constants.VALID_IALLOCATOR_MODES)),
+    ("name", ht.NoDefault, ht.TNonEmptyString),
+    ("nics", ht.NoDefault, ht.TOr(ht.TNone, ht.TListOf(
+      ht.TDictOf(ht.TElemOf(["mac", "ip", "bridge"]),
+               ht.TOr(ht.TNone, ht.TNonEmptyString))))),
+    ("disks", ht.NoDefault, ht.TOr(ht.TNone, ht.TList)),
+    ("hypervisor", None, ht.TMaybeString),
+    ("allocator", None, ht.TMaybeString),
+    ("tags", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
+    ("mem_size", None, ht.TOr(ht.TNone, ht.TPositiveInt)),
+    ("vcpus", None, ht.TOr(ht.TNone, ht.TPositiveInt)),
+    ("os", None, ht.TMaybeString),
+    ("disk_template", None, ht.TMaybeString),
+    ("evac_nodes", None, ht.TOr(ht.TNone, ht.TListOf(ht.TNonEmptyString))),
     ]
 
   def CheckPrereq(self):