Always_failover doesn't require --allow-failover anymore
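
If an instance has the always_failover backend parameter set, a migration
request no longer aborts with "please allow failover" when --allow-failover
is not given: TLMigrateInstance now logs "Instance configured to always
failover; fallback to failover" and carries the operation out as a failover
(see the TLMigrateInstance hunk below). A minimal sketch of the new decision
logic follows; the helper name and its arguments are illustrative only and do
not exist in cmdlib.py:

    def should_failover(cleanup, failover_requested, always_failover):
      """Decide whether a migration request is handled as a failover.

      Mirrors the new behaviour: when the instance is configured to always
      failover, fall back to failover instead of raising OpPrereqError.
      """
      if not cleanup and not failover_requested and always_failover:
        # the old code raised OpPrereqError here unless --allow-failover
        # was passed; now we silently fall back to failover
        return True
      return failover_requested

    # falls back to failover without requiring --allow-failover
    assert should_failover(False, False, True) is True
    # a plain live migration is unaffected
    assert should_failover(False, False, False) is False
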
diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index e11645e..8e0a622 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -32,7 +32,6 @@ import os
 import os.path
 import time
 import re
-import platform
 import logging
 import copy
 import OpenSSL
@@ -60,6 +59,7 @@ from ganeti import qlang
 from ganeti import opcodes
 from ganeti import ht
 from ganeti import rpc
+from ganeti import runtime
 
 import ganeti.masterd.instance # pylint: disable=W0611
 
@@ -82,7 +82,7 @@ class ResultWithJobs:
   """Data container for LU results with jobs.
 
   Instances of this class returned from L{LogicalUnit.Exec} will be recognized
-  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
+  by L{mcpu._ProcessResult}. The latter will then submit the jobs
   contained in the C{jobs} attribute and include the job IDs in the opcode
   result.
 
@@ -493,6 +493,9 @@ class _QueryBase:
   #: Attribute holding field definitions
   FIELDS = None
 
+  #: Field to sort by
+  SORT_FIELD = "name"
+
   def __init__(self, qfilter, fields, use_locking):
     """Initializes this class.
 
@@ -500,7 +503,7 @@ class _QueryBase:
     self.use_locking = use_locking
 
     self.query = query.Query(self.FIELDS, fields, qfilter=qfilter,
-                             namefield="name")
+                             namefield=self.SORT_FIELD)
     self.requested_data = self.query.RequestedData()
     self.names = self.query.RequestedNames()
 
@@ -596,6 +599,47 @@ def _MakeLegacyNodeInfo(data):
     })
 
 
+def _AnnotateDiskParams(instance, devs, cfg):
+  """Little helper wrapper to the rpc annotation method.
+
+  @param instance: The instance object
+  @type devs: List of L{objects.Disk}
+  @param devs: The root devices (not any of its children!)
+  @param cfg: The config object
+  @return: The annotated disk copies
+  @see L{rpc.AnnotateDiskParams}
+
+  """
+  return rpc.AnnotateDiskParams(instance.disk_template, devs,
+                                cfg.GetInstanceDiskParams(instance))
+
+
+def _CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes,
+                              cur_group_uuid):
+  """Checks if node groups for locked instances are still correct.
+
+  @type cfg: L{config.ConfigWriter}
+  @param cfg: Cluster configuration
+  @type instances: dict; string as key, L{objects.Instance} as value
+  @param instances: Dictionary, instance name as key, instance object as value
+  @type owned_groups: iterable of string
+  @param owned_groups: List of owned groups
+  @type owned_nodes: iterable of string
+  @param owned_nodes: List of owned nodes
+  @type cur_group_uuid: string or None
+  @param cur_group_uuid: Optional group UUID to check against instance's groups
+
+  """
+  for (name, inst) in instances.items():
+    assert owned_nodes.issuperset(inst.all_nodes), \
+      "Instance %s's nodes changed while we kept the lock" % name
+
+    inst_groups = _CheckInstanceNodeGroups(cfg, name, owned_groups)
+
+    assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
+      "Instance %s has no node in group %s" % (name, cur_group_uuid)
+
+
 def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
   """Checks if the owned node groups are still correct for an instance.
 
@@ -659,6 +703,18 @@ def _SupportsOob(cfg, node):
   return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
 
 
+def _CopyLockList(names):
+  """Makes a copy of a list of lock names.
+
+  Handles L{locking.ALL_SET} correctly.
+
+  """
+  if names == locking.ALL_SET:
+    return locking.ALL_SET
+  else:
+    return names[:]
+
+
 def _GetWantedNodes(lu, nodes):
   """Returns list of checked and expanded node names.
 
@@ -749,7 +805,8 @@ def _GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
                                        use_none=use_none,
                                        use_default=use_default)
     else:
-      if not value or value == [constants.VALUE_DEFAULT]:
+      if (not value or value == [constants.VALUE_DEFAULT] or
+          value == constants.VALUE_DEFAULT):
         if group_policy:
           del ipolicy[key]
         else:
@@ -770,7 +827,7 @@ def _GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
           # in a nicer way
           ipolicy[key] = list(value)
   try:
-    objects.InstancePolicy.CheckParameterSyntax(ipolicy)
+    objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
   except errors.ConfigurationError, err:
     raise errors.OpPrereqError("Invalid instance policy: %s" % err,
                                errors.ECODE_INVAL)
@@ -912,9 +969,8 @@ def _RunPostHook(lu, node_name):
   hm = lu.proc.BuildHooksManager(lu)
   try:
     hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
-  except:
-    # pylint: disable=W0702
-    lu.LogWarning("Errors occurred running hooks on %s" % node_name)
+  except Exception, err: # pylint: disable=W0703
+    lu.LogWarning("Errors occurred running hooks on %s: %s" % (node_name, err))
 
 
 def _CheckOutputFields(static, dynamic, selected):
@@ -1072,10 +1128,12 @@ def _CheckInstanceState(lu, instance, req_states, msg=None):
                                  (instance.name, msg), errors.ECODE_STATE)
 
 
-def _ComputeMinMaxSpec(name, ipolicy, value):
+def _ComputeMinMaxSpec(name, qualifier, ipolicy, value):
   """Computes if value is in the desired range.
 
   @param name: name of the parameter for which we perform the check
+  @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
+      not just 'disk')
   @param ipolicy: dictionary containing min, max and std values
   @param value: actual value that we want to use
   @return: None or element not meeting the criteria
@@ -1087,13 +1145,17 @@ def _ComputeMinMaxSpec(name, ipolicy, value):
   max_v = ipolicy[constants.ISPECS_MAX].get(name, value)
   min_v = ipolicy[constants.ISPECS_MIN].get(name, value)
   if value > max_v or min_v > value:
+    if qualifier:
+      fqn = "%s/%s" % (name, qualifier)
+    else:
+      fqn = name
     return ("%s value %s is not in range [%s, %s]" %
-            (name, value, min_v, max_v))
+            (fqn, value, min_v, max_v))
   return None
 
 
 def _ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
-                                 nic_count, disk_sizes,
+                                 nic_count, disk_sizes, spindle_use,
                                  _compute_fn=_ComputeMinMaxSpec):
   """Verifies ipolicy against provided specs.
 
@@ -1109,6 +1171,8 @@ def _ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
   @param nic_count: Number of nics used
   @type disk_sizes: list of ints
   @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
+  @type spindle_use: int
+  @param spindle_use: The number of spindles this instance uses
   @param _compute_fn: The compute function (unittest only)
   @return: A list of violations, or an empty list of no violations are found
 
@@ -1116,15 +1180,17 @@ def _ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
   assert disk_count == len(disk_sizes)
 
   test_settings = [
-    (constants.ISPEC_MEM_SIZE, mem_size),
-    (constants.ISPEC_CPU_COUNT, cpu_count),
-    (constants.ISPEC_DISK_COUNT, disk_count),
-    (constants.ISPEC_NIC_COUNT, nic_count),
-    ] + map((lambda d: (constants.ISPEC_DISK_SIZE, d)), disk_sizes)
+    (constants.ISPEC_MEM_SIZE, "", mem_size),
+    (constants.ISPEC_CPU_COUNT, "", cpu_count),
+    (constants.ISPEC_DISK_COUNT, "", disk_count),
+    (constants.ISPEC_NIC_COUNT, "", nic_count),
+    (constants.ISPEC_SPINDLE_USE, "", spindle_use),
+    ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
+         for idx, d in enumerate(disk_sizes)]
 
   return filter(None,
-                (_compute_fn(name, ipolicy, value)
-                 for (name, value) in test_settings))
+                (_compute_fn(name, qualifier, ipolicy, value)
+                 for (name, qualifier, value) in test_settings))
 
 
 def _ComputeIPolicyInstanceViolation(ipolicy, instance,
@@ -1141,12 +1207,13 @@ def _ComputeIPolicyInstanceViolation(ipolicy, instance,
   """
   mem_size = instance.beparams.get(constants.BE_MAXMEM, None)
   cpu_count = instance.beparams.get(constants.BE_VCPUS, None)
+  spindle_use = instance.beparams.get(constants.BE_SPINDLE_USE, None)
   disk_count = len(instance.disks)
   disk_sizes = [disk.size for disk in instance.disks]
   nic_count = len(instance.nics)
 
   return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
-                     disk_sizes)
+                     disk_sizes, spindle_use)
 
 
 def _ComputeIPolicyInstanceSpecViolation(ipolicy, instance_spec,
@@ -1166,9 +1233,10 @@ def _ComputeIPolicyInstanceSpecViolation(ipolicy, instance_spec,
   disk_count = instance_spec.get(constants.ISPEC_DISK_COUNT, 0)
   disk_sizes = instance_spec.get(constants.ISPEC_DISK_SIZE, [])
   nic_count = instance_spec.get(constants.ISPEC_NIC_COUNT, 0)
+  spindle_use = instance_spec.get(constants.ISPEC_SPINDLE_USE, None)
 
   return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
-                     disk_sizes)
+                     disk_sizes, spindle_use)
 
 
 def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
@@ -1220,11 +1288,12 @@ def _ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances):
   @param old_ipolicy: The current (still in-place) ipolicy
   @param new_ipolicy: The new (to become) ipolicy
   @param instances: List of instances to verify
-  @return: A list of instances which violates the new ipolicy but did not before
+  @return: A list of instances which violate the new ipolicy but
+      did not before
 
   """
-  return (_ComputeViolatingInstances(old_ipolicy, instances) -
-          _ComputeViolatingInstances(new_ipolicy, instances))
+  return (_ComputeViolatingInstances(new_ipolicy, instances) -
+          _ComputeViolatingInstances(old_ipolicy, instances))
 
 
 def _ExpandItemName(fn, name, kind):
@@ -1553,7 +1622,8 @@ def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
   for dev in instance.disks:
     cfg.SetDiskID(dev, node_name)
 
-  result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks)
+  result = rpc_runner.call_blockdev_getmirrorstatus(node_name, (instance.disks,
+                                                                instance))
   result.Raise("Failed to get disk status from node %s" % node_name,
                prereq=prereq, ecode=errors.ECODE_ENVIRON)
 
@@ -1880,7 +1950,7 @@ class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
   """Verifies the cluster config.
 
   """
-  REQ_BGL = True
+  REQ_BGL = False
 
   def _VerifyHVP(self, hvp_data):
     """Verifies locally the syntax of the hypervisor parameters.
@@ -1897,13 +1967,17 @@ class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
         self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
 
   def ExpandNames(self):
-    # Information can be safely retrieved as the BGL is acquired in exclusive
-    # mode
-    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
+    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
+    self.share_locks = _ShareAll()
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    # Retrieve all information
     self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
     self.all_node_info = self.cfg.GetAllNodesInfo()
     self.all_inst_info = self.cfg.GetAllInstancesInfo()
-    self.needed_locks = {}
 
   def Exec(self, feedback_fn):
     """Verify integrity of cluster, performing various test on nodes.
@@ -2031,7 +2105,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
     self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
 
     # Get instances in node group; this is unsafe and needs verification later
-    inst_names = self.cfg.GetNodeGroupInstances(self.group_uuid)
+    inst_names = \
+      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
 
     self.needed_locks = {
       locking.LEVEL_INSTANCE: inst_names,
@@ -2065,7 +2140,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
     self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
 
     group_nodes = set(self.group_info.members)
-    group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
+    group_instances = \
+      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
 
     unlocked_nodes = \
         group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
@@ -2075,11 +2151,13 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
 
     if unlocked_nodes:
       raise errors.OpPrereqError("Missing lock for nodes: %s" %
-                                 utils.CommaJoin(unlocked_nodes))
+                                 utils.CommaJoin(unlocked_nodes),
+                                 errors.ECODE_STATE)
 
     if unlocked_instances:
       raise errors.OpPrereqError("Missing lock for instances: %s" %
-                                 utils.CommaJoin(unlocked_instances))
+                                 utils.CommaJoin(unlocked_instances),
+                                 errors.ECODE_STATE)
 
     self.all_node_info = self.cfg.GetAllNodesInfo()
     self.all_inst_info = self.cfg.GetAllInstancesInfo()
@@ -2099,17 +2177,17 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
 
     for inst in self.my_inst_info.values():
       if inst.disk_template in constants.DTS_INT_MIRROR:
-        group = self.my_node_info[inst.primary_node].group
-        for nname in inst.secondary_nodes:
-          if self.all_node_info[nname].group != group:
+        for nname in inst.all_nodes:
+          if self.all_node_info[nname].group != self.group_uuid:
             extra_lv_nodes.add(nname)
 
     unlocked_lv_nodes = \
         extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
 
     if unlocked_lv_nodes:
-      raise errors.OpPrereqError("these nodes could be locked: %s" %
-                                 utils.CommaJoin(unlocked_lv_nodes))
+      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
+                                 utils.CommaJoin(unlocked_lv_nodes),
+                                 errors.ECODE_STATE)
     self.extra_lv_nodes = list(extra_lv_nodes)
 
   def _VerifyNode(self, ninfo, nresult):
@@ -2356,7 +2434,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
 
     ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), self.group_info)
     err = _ComputeIPolicyInstanceViolation(ipolicy, instanceconfig)
-    _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, err)
+    _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err))
 
     for node in node_vol_should:
       n_img = node_image[node]
@@ -2405,7 +2483,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
 
     """
     for node, n_img in node_image.items():
-      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
+      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
+          self.all_node_info[node].group != self.group_uuid):
         # skip non-healthy nodes
         continue
       for volume in n_img.volumes:
@@ -2432,11 +2511,11 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
       # WARNING: we currently take into account down instances as well
       # as up ones, considering that even if they're down someone
       # might want to start them even in the event of a node failure.
-      if n_img.offline:
-        # we're skipping offline nodes from the N+1 warning, since
-        # most likely we don't have good memory infromation from them;
-        # we already list instances living on such nodes, and that's
-        # enough warning
+      if n_img.offline or self.all_node_info[node].group != self.group_uuid:
+        # we're skipping nodes marked offline and nodes in other groups from
+        # the N+1 warning, since most likely we don't have good memory
+        # information from them; we already list instances living on such
+        # nodes, and that's enough warning
         continue
       #TODO(dynmem): also consider ballooning out other instances
       for prinode, instances in n_img.sbp.items():
@@ -2856,12 +2935,12 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
 
       node_disks[nname] = disks
 
-      # Creating copies as SetDiskID below will modify the objects and that can
-      # lead to incorrect data returned from nodes
-      devonly = [dev.Copy() for (_, dev) in disks]
-
-      for dev in devonly:
-        self.cfg.SetDiskID(dev, nname)
+      # _AnnotateDiskParams already makes copies of the disks
+      devonly = []
+      for (inst, dev) in disks:
+        (anno_disk,) = _AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
+        self.cfg.SetDiskID(anno_disk, nname)
+        devonly.append(anno_disk)
 
       node_disks_devonly[nname] = devonly
 
@@ -3085,6 +3164,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
 
     for instance in self.my_inst_names:
       inst_config = self.my_inst_info[instance]
+      if inst_config.admin_state == constants.ADMINST_OFFLINE:
+        i_offline += 1
 
       for nname in inst_config.all_nodes:
         if nname not in node_image:
@@ -3144,10 +3225,12 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
       if master_node not in self.my_node_info:
         additional_nodes.append(master_node)
         vf_node_info.append(self.all_node_info[master_node])
-      # Add the first vm_capable node we find which is not included
+      # Add the first vm_capable node we find which is not included,
+      # excluding the master node (which we already have)
       for node in absent_nodes:
         nodeinfo = self.all_node_info[node]
-        if nodeinfo.vm_capable and not nodeinfo.offline:
+        if (nodeinfo.vm_capable and not nodeinfo.offline and
+            node != master_node):
           additional_nodes.append(node)
           vf_node_info.append(self.all_node_info[node])
           break
@@ -3224,12 +3307,6 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
         non_primary_inst = set(nimg.instances).difference(nimg.pinst)
 
         for inst in non_primary_inst:
-          # FIXME: investigate best way to handle offline insts
-          if inst.admin_state == constants.ADMINST_OFFLINE:
-            if verbose:
-              feedback_fn("* Skipping offline instance %s" % inst.name)
-            i_offline += 1
-            continue
           test = inst in self.all_inst_info
           _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
                    "instance should not run on node %s", node_i.name)
@@ -3489,15 +3566,8 @@ class LUGroupVerifyDisks(NoHooksLU):
     self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
 
     # Check if node groups for locked instances are still correct
-    for (instance_name, inst) in self.instances.items():
-      assert owned_nodes.issuperset(inst.all_nodes), \
-        "Instance %s's nodes changed while we kept the lock" % instance_name
-
-      inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
-                                             owned_groups)
-
-      assert self.group_uuid in inst_groups, \
-        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
+    _CheckInstancesNodeGroups(self.cfg, self.instances,
+                              owned_groups, owned_nodes, self.group_uuid)
 
   def Exec(self, feedback_fn):
     """Verify integrity of cluster disks.
@@ -3800,6 +3870,11 @@ class LUClusterSetParams(LogicalUnit):
     if self.op.diskparams:
       for dt_params in self.op.diskparams.values():
         utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
+      try:
+        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
+      except errors.OpPrereqError, err:
+        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
+                                   errors.ECODE_INVAL)
 
   def ExpandNames(self):
     # FIXME: in the future maybe other cluster params won't require checking on
@@ -3936,7 +4011,7 @@ class LUClusterSetParams(LogicalUnit):
       if violations:
         self.LogWarning("After the ipolicy change the following instances"
                         " violate them: %s",
-                        utils.CommaJoin(violations))
+                        utils.CommaJoin(utils.NiceSort(violations)))
 
     if self.op.nicparams:
       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
@@ -4248,6 +4323,9 @@ def _ComputeAncillaryFiles(cluster, redist):
   if cluster.modify_etc_hosts:
     files_all.add(constants.ETC_HOSTS)
 
+  if cluster.use_external_mip_script:
+    files_all.add(constants.EXTERNAL_MASTER_SETUP_SCRIPT)
+
   # Files which are optional, these must:
   # - be present in one other category as well
   # - either exist or not exist on all nodes of that category (mc, vm all)
@@ -4261,10 +4339,6 @@ def _ComputeAncillaryFiles(cluster, redist):
   if not redist:
     files_mc.add(constants.CLUSTER_CONF_FILE)
 
-    # FIXME: this should also be replicated but Ganeti doesn't support files_mc
-    # replication
-    files_mc.add(constants.DEFAULT_MASTER_SETUP_SCRIPT)
-
   # Files which should only be on VM-capable nodes
   files_vm = set(filename
     for hv_name in cluster.enabled_hypervisors
@@ -4305,7 +4379,8 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
   master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
 
   online_nodes = lu.cfg.GetOnlineNodeList()
-  vm_nodes = lu.cfg.GetVmCapableNodeList()
+  online_set = frozenset(online_nodes)
+  vm_nodes = list(online_set.intersection(lu.cfg.GetVmCapableNodeList()))
 
   if additional_nodes is not None:
     online_nodes.extend(additional_nodes)
@@ -4414,7 +4489,7 @@ def _WaitForSync(lu, instance, disks=None, oneshot=False):
     max_time = 0
     done = True
     cumul_degraded = False
-    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
+    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
     msg = rstats.fail_msg
     if msg:
       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
@@ -4464,9 +4539,35 @@ def _WaitForSync(lu, instance, disks=None, oneshot=False):
   return not cumul_degraded
 
 
-def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
+def _BlockdevFind(lu, node, dev, instance):
+  """Wrapper around call_blockdev_find to annotate diskparams.
+
+  @param lu: A reference to the lu object
+  @param node: The node to call out
+  @param dev: The device to find
+  @param instance: The instance object the device belongs to
+  @return: The result of the rpc call
+
+  """
+  (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
+  return lu.rpc.call_blockdev_find(node, disk)
+
+
+def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
+  """Wrapper around L{_CheckDiskConsistencyInner}.
+
+  """
+  (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
+  return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
+                                    ldisk=ldisk)
+
+
+def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
+                               ldisk=False):
   """Check that mirrors are not degraded.
 
+  @attention: The device has to be annotated already.
+
   The ldisk parameter, if True, will change the test from the
   is_degraded attribute (which represents overall non-ok status for
   the device(s)) to the ldisk (representing the local storage status).
@@ -4493,7 +4594,8 @@ def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
 
   if dev.children:
     for child in dev.children:
-      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
+      result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
+                                                     on_primary)
 
   return result
 
@@ -4502,7 +4604,7 @@ class LUOobCommand(NoHooksLU):
   """Logical unit for OOB handling.
 
   """
-  REG_BGL = False
+  REQ_BGL = False
   _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
 
   def ExpandNames(self):
@@ -5579,6 +5681,19 @@ class LUNodeAdd(LogicalUnit):
     if self.op.disk_state:
       self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None)
 
+    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
+    #       it a property on the base class.
+    result = rpc.DnsOnlyRunner().call_version([node])[node]
+    result.Raise("Can't get version information from node %s" % node)
+    if constants.PROTOCOL_VERSION == result.payload:
+      logging.info("Communication to node %s fine, sw version %s match",
+                   node, result.payload)
+    else:
+      raise errors.OpPrereqError("Version mismatch master version %s,"
+                                 " node version %s" %
+                                 (constants.PROTOCOL_VERSION, result.payload),
+                                 errors.ECODE_ENVIRON)
+
   def Exec(self, feedback_fn):
     """Adds the new node to the cluster.
 
@@ -5623,17 +5738,6 @@ class LUNodeAdd(LogicalUnit):
     if self.op.disk_state:
       new_node.disk_state_static = self.new_disk_state
 
-    # check connectivity
-    result = self.rpc.call_version([node])[node]
-    result.Raise("Can't get version information from node %s" % node)
-    if constants.PROTOCOL_VERSION == result.payload:
-      logging.info("Communication to node %s fine, sw version %s match",
-                   node, result.payload)
-    else:
-      raise errors.OpExecError("Version mismatch master version %s,"
-                               " node version %s" %
-                               (constants.PROTOCOL_VERSION, result.payload))
-
     # Add node to our /etc/hosts, and add key to known_hosts
     if self.cfg.GetClusterInfo().modify_etc_hosts:
       master_node = self.cfg.GetMasterNode()
@@ -5840,7 +5944,8 @@ class LUNodeSetParams(LogicalUnit):
       if mc_remaining < mc_should:
         raise errors.OpPrereqError("Not enough master candidates, please"
                                    " pass auto promote option to allow"
-                                   " promotion", errors.ECODE_STATE)
+                                   " promotion (--auto-promote or RAPI"
+                                   " auto_promote=True)", errors.ECODE_STATE)
 
     self.old_flags = old_flags = (node.master_candidate,
                                   node.drained, node.offline)
@@ -5899,9 +6004,7 @@ class LUNodeSetParams(LogicalUnit):
 
     if old_role == self._ROLE_OFFLINE and new_role != old_role:
       # Trying to transition out of offline status
-      # TODO: Use standard RPC runner, but make sure it works when the node is
-      # still marked offline
-      result = rpc.BootstrapRunner().call_version([node.name])[node.name]
+      result = self.rpc.call_version([node.name])[node.name]
       if result.fail_msg:
         raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                    " to report its version: %s" %
@@ -6086,7 +6189,7 @@ class LUClusterQuery(NoHooksLU):
       "config_version": constants.CONFIG_VERSION,
       "os_api_version": max(constants.OS_API_VERSIONS),
       "export_version": constants.EXPORT_VERSION,
-      "architecture": (platform.architecture()[0], platform.machine()),
+      "architecture": runtime.GetArchInfo(),
       "name": cluster.cluster_name,
       "master": cluster.master_node,
       "default_hypervisor": cluster.primary_hypervisor,
@@ -6099,6 +6202,7 @@ class LUClusterQuery(NoHooksLU):
       "ipolicy": cluster.ipolicy,
       "nicparams": cluster.nicparams,
       "ndparams": cluster.ndparams,
+      "diskparams": cluster.diskparams,
       "candidate_pool_size": cluster.candidate_pool_size,
       "master_netdev": cluster.master_netdev,
       "master_netmask": cluster.master_netmask,
@@ -6129,38 +6233,70 @@ class LUClusterConfigQuery(NoHooksLU):
 
   """
   REQ_BGL = False
-  _FIELDS_DYNAMIC = utils.FieldSet()
-  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
-                                  "watcher_pause", "volume_group_name")
 
   def CheckArguments(self):
-    _CheckOutputFields(static=self._FIELDS_STATIC,
-                       dynamic=self._FIELDS_DYNAMIC,
-                       selected=self.op.output_fields)
+    self.cq = _ClusterQuery(None, self.op.output_fields, False)
 
   def ExpandNames(self):
-    self.needed_locks = {}
+    self.cq.ExpandNames(self)
+
+  def DeclareLocks(self, level):
+    self.cq.DeclareLocks(self, level)
 
   def Exec(self, feedback_fn):
-    """Dump a representation of the cluster config to the standard output.
-
-    """
-    values = []
-    for field in self.op.output_fields:
-      if field == "cluster_name":
-        entry = self.cfg.GetClusterName()
-      elif field == "master_node":
-        entry = self.cfg.GetMasterNode()
-      elif field == "drain_flag":
-        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
-      elif field == "watcher_pause":
-        entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
-      elif field == "volume_group_name":
-        entry = self.cfg.GetVGName()
-      else:
-        raise errors.ParameterError(field)
-      values.append(entry)
-    return values
+    result = self.cq.OldStyleQuery(self)
+
+    assert len(result) == 1
+
+    return result[0]
+
+
+class _ClusterQuery(_QueryBase):
+  FIELDS = query.CLUSTER_FIELDS
+
+  #: Do not sort (there is only one item)
+  SORT_FIELD = None
+
+  def ExpandNames(self, lu):
+    lu.needed_locks = {}
+
+    # The following variables interact with _QueryBase._GetNames
+    self.wanted = locking.ALL_SET
+    self.do_locking = self.use_locking
+
+    if self.do_locking:
+      raise errors.OpPrereqError("Can not use locking for cluster queries",
+                                 errors.ECODE_INVAL)
+
+  def DeclareLocks(self, lu, level):
+    pass
+
+  def _GetQueryData(self, lu):
+    """Computes the list of nodes and their attributes.
+
+    """
+    # Locking is not used
+    assert not (compat.any(lu.glm.is_owned(level)
+                           for level in locking.LEVELS
+                           if level != locking.LEVEL_CLUSTER) or
+                self.do_locking or self.use_locking)
+
+    if query.CQ_CONFIG in self.requested_data:
+      cluster = lu.cfg.GetClusterInfo()
+    else:
+      cluster = NotImplemented
+
+    if query.CQ_QUEUE_DRAINED in self.requested_data:
+      drain_flag = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
+    else:
+      drain_flag = NotImplemented
+
+    if query.CQ_WATCHER_PAUSE in self.requested_data:
+      watcher_pause = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
+    else:
+      watcher_pause = NotImplemented
+
+    return query.ClusterQueryData(cluster, drain_flag, watcher_pause)
 
 
 class LUInstanceActivateDisks(NoHooksLU):
@@ -6247,13 +6383,16 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
         node_disk = node_disk.Copy()
         node_disk.UnsetSize()
       lu.cfg.SetDiskID(node_disk, node)
-      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False, idx)
+      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
+                                             False, idx)
       msg = result.fail_msg
       if msg:
+        is_offline_secondary = (node in instance.secondary_nodes and
+                                result.offline)
         lu.proc.LogWarning("Could not prepare block device %s on node %s"
                            " (is_primary=False, pass=1): %s",
                            inst_disk.iv_name, node, msg)
-        if not ignore_secondaries:
+        if not (ignore_secondaries or is_offline_secondary):
           disks_ok = False
 
   # FIXME: race condition on drbd migration to primary
@@ -6269,7 +6408,8 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
         node_disk = node_disk.Copy()
         node_disk.UnsetSize()
       lu.cfg.SetDiskID(node_disk, node)
-      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True, idx)
+      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
+                                             True, idx)
       msg = result.fail_msg
       if msg:
         lu.proc.LogWarning("Could not prepare block device %s on node %s"
@@ -6385,7 +6525,7 @@ def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
   for disk in disks:
     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
       lu.cfg.SetDiskID(top_disk, node)
-      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
+      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
       msg = result.fail_msg
       if msg:
         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
@@ -6858,9 +6998,6 @@ class LUInstanceReinstall(LogicalUnit):
       "Cannot retrieve locked instance %s" % self.op.instance_name
     _CheckNodeOnline(self, instance.primary_node, "Instance primary node"
                      " offline, cannot reinstall")
-    for node in instance.secondary_nodes:
-      _CheckNodeOnline(self, node, "Instance secondary node offline,"
-                       " cannot reinstall")
 
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
@@ -6973,7 +7110,7 @@ class LUInstanceRecreateDisks(LogicalUnit):
     elif level == locking.LEVEL_NODE_RES:
       # Copy node locks
       self.needed_locks[locking.LEVEL_NODE_RES] = \
-        self.needed_locks[locking.LEVEL_NODE][:]
+        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -7245,7 +7382,7 @@ class LUInstanceRemove(LogicalUnit):
     elif level == locking.LEVEL_NODE_RES:
       # Copy node locks
       self.needed_locks[locking.LEVEL_NODE_RES] = \
-        self.needed_locks[locking.LEVEL_NODE][:]
+        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -7309,7 +7446,7 @@ def _RemoveInstance(lu, feedback_fn, instance, ignore_failures):
   """
   logging.info("Removing block devices for instance %s", instance.name)
 
-  if not _RemoveDisks(lu, instance):
+  if not _RemoveDisks(lu, instance, ignore_failures=ignore_failures):
     if not ignore_failures:
       raise errors.OpExecError("Can't remove instance's disks")
     feedback_fn("Warning: can't remove instance's disks")
@@ -7398,7 +7535,7 @@ class LUInstanceFailover(LogicalUnit):
     elif level == locking.LEVEL_NODE_RES:
       # Copy node locks
       self.needed_locks[locking.LEVEL_NODE_RES] = \
-        self.needed_locks[locking.LEVEL_NODE][:]
+        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -7482,7 +7619,7 @@ class LUInstanceMigrate(LogicalUnit):
     elif level == locking.LEVEL_NODE_RES:
       # Copy node locks
       self.needed_locks[locking.LEVEL_NODE_RES] = \
-        self.needed_locks[locking.LEVEL_NODE][:]
+        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -7541,7 +7678,7 @@ class LUInstanceMove(LogicalUnit):
     elif level == locking.LEVEL_NODE_RES:
       # Copy node locks
       self.needed_locks[locking.LEVEL_NODE_RES] = \
-        self.needed_locks[locking.LEVEL_NODE][:]
+        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -7664,7 +7801,7 @@ class LUInstanceMove(LogicalUnit):
     # activate, get path, copy the data over
     for idx, disk in enumerate(instance.disks):
       self.LogInfo("Copying data for disk %d", idx)
-      result = self.rpc.call_blockdev_assemble(target_node, disk,
+      result = self.rpc.call_blockdev_assemble(target_node, (disk, instance),
                                                instance.name, True, idx)
       if result.fail_msg:
         self.LogWarning("Can't assemble newly created disk %d: %s",
@@ -7672,7 +7809,7 @@ class LUInstanceMove(LogicalUnit):
         errs.append(result.fail_msg)
         break
       dev_path = result.payload
-      result = self.rpc.call_blockdev_export(source_node, disk,
+      result = self.rpc.call_blockdev_export(source_node, (disk, instance),
                                              target_node, dev_path,
                                              cluster_name)
       if result.fail_msg:
@@ -7935,14 +8072,9 @@ class TLMigrateInstance(Tasklet):
     # check if failover must be forced instead of migration
     if (not self.cleanup and not self.failover and
         i_be[constants.BE_ALWAYS_FAILOVER]):
-      if self.fallback:
-        self.lu.LogInfo("Instance configured to always failover; fallback"
-                        " to failover")
-        self.failover = True
-      else:
-        raise errors.OpPrereqError("This instance has been configured to"
-                                   " always failover, please allow failover",
-                                   errors.ECODE_STATE)
+      self.lu.LogInfo("Instance configured to always failover; fallback"
+                      " to failover")
+      self.failover = True
 
     # check bridge existance
     _CheckInstanceBridgesExist(self.lu, instance, node=target_node)
@@ -8003,9 +8135,7 @@ class TLMigrateInstance(Tasklet):
     ial = IAllocator(self.cfg, self.rpc,
                      mode=constants.IALLOCATOR_MODE_RELOC,
                      name=self.instance_name,
-                     # TODO See why hail breaks with a single node below
-                     relocate_from=[self.instance.primary_node,
-                                    self.instance.primary_node],
+                     relocate_from=[self.instance.primary_node],
                      )
 
     ial.Run(self.lu.op.iallocator)
@@ -8037,7 +8167,8 @@ class TLMigrateInstance(Tasklet):
       all_done = True
       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                             self.nodes_ip,
-                                            self.instance.disks)
+                                            (self.instance.disks,
+                                             self.instance))
       min_percent = 100
       for node, nres in result.items():
         nres.Raise("Cannot resync disks on node %s" % node)
@@ -8083,7 +8214,7 @@ class TLMigrateInstance(Tasklet):
       msg = "single-master"
     self.feedback_fn("* changing disks into %s mode" % msg)
     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
-                                           self.instance.disks,
+                                           (self.instance.disks, self.instance),
                                            self.instance.name, multimaster)
     for node, nres in result.items():
       nres.Raise("Cannot change disks config on node %s" % node)
@@ -8235,7 +8366,7 @@ class TLMigrateInstance(Tasklet):
 
     self.feedback_fn("* checking disk consistency between source and target")
     for (idx, dev) in enumerate(instance.disks):
-      if not _CheckDiskConsistency(self.lu, dev, target_node, False):
+      if not _CheckDiskConsistency(self.lu, instance, dev, target_node, False):
         raise errors.OpExecError("Disk %s is degraded or not fully"
                                  " synchronized on target node,"
                                  " aborting migration" % idx)
@@ -8370,7 +8501,7 @@ class TLMigrateInstance(Tasklet):
       disks = _ExpandCheckDisks(instance, instance.disks)
       self.feedback_fn("* unmapping instance's disks from %s" % source_node)
       for disk in disks:
-        result = self.rpc.call_blockdev_shutdown(source_node, disk)
+        result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
         msg = result.fail_msg
         if msg:
           logging.error("Migration was successful, but couldn't unmap the"
@@ -8398,7 +8529,8 @@ class TLMigrateInstance(Tasklet):
       self.feedback_fn("* checking disk consistency between source and target")
       for (idx, dev) in enumerate(instance.disks):
         # for drbd, these are drbd over lvm
-        if not _CheckDiskConsistency(self.lu, dev, target_node, False):
+        if not _CheckDiskConsistency(self.lu, instance, dev, target_node,
+                                     False):
           if primary_node.offline:
             self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
                              " target node %s" %
@@ -8488,8 +8620,20 @@ class TLMigrateInstance(Tasklet):
         return self._ExecMigration()
 
 
-def _CreateBlockDev(lu, node, instance, device, force_create,
-                    info, force_open):
+def _CreateBlockDev(lu, node, instance, device, force_create, info,
+                    force_open):
+  """Wrapper around L{_CreateBlockDevInner}.
+
+  This method annotates the root device first.
+
+  """
+  (disk,) = _AnnotateDiskParams(instance, [device], lu.cfg)
+  return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
+                              force_open)
+
+
+def _CreateBlockDevInner(lu, node, instance, device, force_create,
+                         info, force_open):
   """Create a tree of block devices on a given node.
 
   If this device type has to be created on secondaries, create it and
@@ -8497,6 +8641,8 @@ def _CreateBlockDev(lu, node, instance, device, force_create,
 
   If not, just recurse to children keeping the same 'force' value.
 
+  @attention: The device has to be annotated already.
+
   @param lu: the lu on whose behalf we execute
   @param node: the node on which to create the device
   @type instance: L{objects.Instance}
@@ -8521,8 +8667,8 @@ def _CreateBlockDev(lu, node, instance, device, force_create,
 
   if device.children:
     for child in device.children:
-      _CreateBlockDev(lu, node, instance, child, force_create,
-                      info, force_open)
+      _CreateBlockDevInner(lu, node, instance, child, force_create,
+                           info, force_open)
 
   if not force_create:
     return
@@ -8573,94 +8719,8 @@ def _GenerateUniqueNames(lu, exts):
   return results
 
 
-def _ComputeLDParams(disk_template, disk_params):
-  """Computes Logical Disk parameters from Disk Template parameters.
-
-  @type disk_template: string
-  @param disk_template: disk template, one of L{constants.DISK_TEMPLATES}
-  @type disk_params: dict
-  @param disk_params: disk template parameters; dict(template_name -> parameters
-  @rtype: list(dict)
-  @return: a list of dicts, one for each node of the disk hierarchy. Each dict
-    contains the LD parameters of the node. The tree is flattened in-order.
-
-  """
-  if disk_template not in constants.DISK_TEMPLATES:
-    raise errors.ProgrammerError("Unknown disk template %s" % disk_template)
-
-  result = list()
-  dt_params = disk_params[disk_template]
-  if disk_template == constants.DT_DRBD8:
-    drbd_params = {
-      constants.LDP_RESYNC_RATE: dt_params[constants.DRBD_RESYNC_RATE],
-      constants.LDP_BARRIERS: dt_params[constants.DRBD_DISK_BARRIERS],
-      constants.LDP_NO_META_FLUSH: dt_params[constants.DRBD_META_BARRIERS],
-      constants.LDP_DEFAULT_METAVG: dt_params[constants.DRBD_DEFAULT_METAVG],
-      constants.LDP_DISK_CUSTOM: dt_params[constants.DRBD_DISK_CUSTOM],
-      constants.LDP_NET_CUSTOM: dt_params[constants.DRBD_NET_CUSTOM],
-      constants.LDP_DYNAMIC_RESYNC: dt_params[constants.DRBD_DYNAMIC_RESYNC],
-      constants.LDP_PLAN_AHEAD: dt_params[constants.DRBD_PLAN_AHEAD],
-      constants.LDP_FILL_TARGET: dt_params[constants.DRBD_FILL_TARGET],
-      constants.LDP_DELAY_TARGET: dt_params[constants.DRBD_DELAY_TARGET],
-      constants.LDP_MAX_RATE: dt_params[constants.DRBD_MAX_RATE],
-      constants.LDP_MIN_RATE: dt_params[constants.DRBD_MIN_RATE],
-      }
-
-    drbd_params = \
-      objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_DRBD8],
-                       drbd_params)
-
-    result.append(drbd_params)
-
-    # data LV
-    data_params = {
-      constants.LDP_STRIPES: dt_params[constants.DRBD_DATA_STRIPES],
-      }
-    data_params = \
-      objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV],
-                       data_params)
-    result.append(data_params)
-
-    # metadata LV
-    meta_params = {
-      constants.LDP_STRIPES: dt_params[constants.DRBD_META_STRIPES],
-      }
-    meta_params = \
-      objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV],
-                       meta_params)
-    result.append(meta_params)
-
-  elif (disk_template == constants.DT_FILE or
-        disk_template == constants.DT_SHARED_FILE):
-    result.append(constants.DISK_LD_DEFAULTS[constants.LD_FILE])
-
-  elif disk_template == constants.DT_PLAIN:
-    params = {
-      constants.LDP_STRIPES: dt_params[constants.LV_STRIPES],
-      }
-    params = \
-      objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV],
-                       params)
-    result.append(params)
-
-  elif disk_template == constants.DT_BLOCK:
-    result.append(constants.DISK_LD_DEFAULTS[constants.LD_BLOCKDEV])
-
-  elif disk_template == constants.DT_RBD:
-    params = {
-      constants.LDP_POOL: dt_params[constants.RBD_POOL]
-      }
-    params = \
-      objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_RBD],
-                       params)
-    result.append(params)
-
-  return result
-
-
 def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
-                         iv_name, p_minor, s_minor, drbd_params, data_params,
-                         meta_params):
+                         iv_name, p_minor, s_minor):
   """Generate a drbd8 device complete with its children.
 
   """
@@ -8670,16 +8730,16 @@ def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
 
   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                           logical_id=(vgnames[0], names[0]),
-                          params=data_params)
+                          params={})
   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
                           logical_id=(vgnames[1], names[1]),
-                          params=meta_params)
+                          params={})
   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                           logical_id=(primary, secondary, port,
                                       p_minor, s_minor,
                                       shared_secret),
                           children=[dev_data, dev_meta],
-                          iv_name=iv_name, params=drbd_params)
+                          iv_name=iv_name, params={})
   return drbd_dev
 
 
@@ -8700,8 +8760,7 @@ _DISK_TEMPLATE_DEVICE_TYPE = {
 
 def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node,
     secondary_nodes, disk_info, file_storage_dir, file_driver, base_index,
-    feedback_fn, disk_params,
-    _req_file_storage=opcodes.RequireFileStorage,
+    feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
     _req_shr_file_storage=opcodes.RequireSharedFileStorage):
   """Generate the entire disk layout for a given template type.
 
@@ -8711,18 +8770,20 @@ def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node,
   vgname = lu.cfg.GetVGName()
   disk_count = len(disk_info)
   disks = []
-  ld_params = _ComputeLDParams(template_name, disk_params)
 
   if template_name == constants.DT_DISKLESS:
     pass
   elif template_name == constants.DT_DRBD8:
-    drbd_params, data_params, meta_params = ld_params
     if len(secondary_nodes) != 1:
       raise errors.ProgrammerError("Wrong template configuration")
     remote_node = secondary_nodes[0]
     minors = lu.cfg.AllocateDRBDMinor(
       [primary_node, remote_node] * len(disk_info), instance_name)
 
+    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
+                                                       full_disk_params)
+    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
+
     names = []
     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                                for i in range(disk_count)]):
@@ -8730,7 +8791,6 @@ def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node,
       names.append(lv_prefix + "_meta")
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
-      drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
       data_vg = disk.get(constants.IDISK_VG, vgname)
       meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
@@ -8738,8 +8798,7 @@ def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node,
                                       [data_vg, meta_vg],
                                       names[idx * 2:idx * 2 + 2],
                                       "disk/%d" % disk_index,
-                                      minors[idx * 2], minors[idx * 2 + 1],
-                                      drbd_params, data_params, meta_params)
+                                      minors[idx * 2], minors[idx * 2 + 1])
       disk_dev.mode = disk[constants.IDISK_MODE]
       disks.append(disk_dev)
   else:
@@ -8759,8 +8818,6 @@ def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node,
                                         (name_prefix, base_index + i)
                                         for i in range(disk_count)])
 
-    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
-
     if template_name == constants.DT_PLAIN:
       def logical_id_fn(idx, _, disk):
         vg = disk.get(constants.IDISK_VG, vgname)
@@ -8779,6 +8836,8 @@ def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node,
     else:
       raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
 
+    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
+
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
       size = disk[constants.IDISK_SIZE]
@@ -8788,7 +8847,7 @@ def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node,
                                 logical_id=logical_id_fn(idx, disk_index, disk),
                                 iv_name="disk/%d" % disk_index,
                                 mode=disk[constants.IDISK_MODE],
-                                params=ld_params[0]))
+                                params={}))
 
   return disks
 
@@ -8829,7 +8888,10 @@ def _WipeDisks(lu, instance):
     lu.cfg.SetDiskID(device, node)
 
   logging.info("Pause sync of instance %s disks", instance.name)
-  result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, True)
+  result = lu.rpc.call_blockdev_pause_resume_sync(node,
+                                                  (instance.disks, instance),
+                                                  True)
+  result.Raise("Failed RPC to node %s for pausing the disk syncing" % node)
 
   for idx, success in enumerate(result.payload):
     if not success:
@@ -8859,7 +8921,8 @@ def _WipeDisks(lu, instance):
         wipe_size = min(wipe_chunk_size, size - offset)
         logging.debug("Wiping disk %d, offset %s, chunk %s",
                       idx, offset, wipe_size)
-        result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size)
+        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
+                                           wipe_size)
         result.Raise("Could not wipe disk %d at offset %d for size %d" %
                      (idx, offset, wipe_size))
         now = time.time()
@@ -8872,14 +8935,21 @@ def _WipeDisks(lu, instance):
   finally:
     logging.info("Resume sync of instance %s disks", instance.name)
 
-    result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, False)
+    result = lu.rpc.call_blockdev_pause_resume_sync(node,
+                                                    (instance.disks, instance),
+                                                    False)
 
-    for idx, success in enumerate(result.payload):
-      if not success:
-        lu.LogWarning("Resume sync of disk %d failed, please have a"
-                      " look at the status and troubleshoot the issue", idx)
-        logging.warn("resume-sync of instance %s for disks %d failed",
-                     instance.name, idx)
+    if result.fail_msg:
+      lu.LogWarning("RPC call to %s for resuming disk syncing failed,"
+                    " please have a look at the status and troubleshoot"
+                    " the issue: %s", node, result.fail_msg)
+    else:
+      for idx, success in enumerate(result.payload):
+        if not success:
+          lu.LogWarning("Resume sync of disk %d failed, please have a"
+                        " look at the status and troubleshoot the issue", idx)
+          logging.warn("resume-sync of instance %s for disks %d failed",
+                       instance.name, idx)
 
 
 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
@@ -8926,7 +8996,7 @@ def _CreateDisks(lu, instance, to_skip=None, target_node=None):
       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
 
 
-def _RemoveDisks(lu, instance, target_node=None):
+def _RemoveDisks(lu, instance, target_node=None, ignore_failures=False):
   """Remove all disks for an instance.
 
   This abstracts away some work from `AddInstance()` and
@@ -8947,23 +9017,29 @@ def _RemoveDisks(lu, instance, target_node=None):
   logging.info("Removing block devices for instance %s", instance.name)
 
   all_result = True
-  for (idx, device) in enumerate(instance.disks):
+  ports_to_release = set()
+  anno_disks = _AnnotateDiskParams(instance, instance.disks, lu.cfg)
+  for (idx, device) in enumerate(anno_disks):
     if target_node:
       edata = [(target_node, device)]
     else:
       edata = device.ComputeNodeTree(instance.primary_node)
     for node, disk in edata:
       lu.cfg.SetDiskID(disk, node)
-      msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
-      if msg:
+      result = lu.rpc.call_blockdev_remove(node, disk)
+      if result.fail_msg:
         lu.LogWarning("Could not remove disk %s on node %s,"
-                      " continuing anyway: %s", idx, node, msg)
-        all_result = False
+                      " continuing anyway: %s", idx, node, result.fail_msg)
+        if not (result.offline and node != instance.primary_node):
+          all_result = False
 
     # if this is a DRBD disk, return its port to the pool
     if device.dev_type in constants.LDS_DRBD:
-      tcp_port = device.logical_id[2]
-      lu.cfg.AddTcpUdpPort(tcp_port)
+      ports_to_release.add(device.logical_id[2])
+
+  if all_result or ignore_failures:
+    for port in ports_to_release:
+      lu.cfg.AddTcpUdpPort(port)
 
   if instance.disk_template == constants.DT_FILE:
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
@@ -9013,7 +9089,7 @@ def _ComputeDiskSizePerVG(disk_template, disks):
 
 
 def _ComputeDiskSize(disk_template, disks):
-  """Compute disk size requirements in the volume group
+  """Compute disk size requirements according to disk template
 
   """
   # Required free disk space as a function of disk and swap space
@@ -9023,10 +9099,10 @@ def _ComputeDiskSize(disk_template, disks):
     # 128 MB are added for drbd metadata for each disk
     constants.DT_DRBD8:
       sum(d[constants.IDISK_SIZE] + DRBD_META_SIZE for d in disks),
-    constants.DT_FILE: None,
-    constants.DT_SHARED_FILE: 0,
+    constants.DT_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
+    constants.DT_SHARED_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
     constants.DT_BLOCK: 0,
-    constants.DT_RBD: 0,
+    constants.DT_RBD: sum(d[constants.IDISK_SIZE] for d in disks),
   }
 
   if disk_template not in req_size_dict:
@@ -9344,6 +9420,7 @@ class LUInstanceCreate(LogicalUnit):
                      os=self.op.os_type,
                      vcpus=self.be_full[constants.BE_VCPUS],
                      memory=self.be_full[constants.BE_MAXMEM],
+                     spindle_use=self.be_full[constants.BE_SPINDLE_USE],
                      disks=self.disks,
                      nics=nics,
                      hypervisor=self.op.hypervisor,
@@ -9607,6 +9684,9 @@ class LUInstanceCreate(LogicalUnit):
     if self.op.mode == constants.INSTANCE_IMPORT:
       export_info = self._ReadExportInfo()
       self._ReadExportParams(export_info)
+      self._old_instance_name = export_info.get(constants.INISECT_INS, "name")
+    else:
+      self._old_instance_name = None
 
     if (not self.cfg.GetVGName() and
         self.op.disk_template not in constants.DTS_NOT_LVM):
@@ -9760,8 +9840,7 @@ class LUInstanceCreate(LogicalUnit):
 
       self.src_images = disk_images
 
-      old_name = export_info.get(constants.INISECT_INS, "name")
-      if self.op.instance_name == old_name:
+      if self.op.instance_name == self._old_instance_name:
         for idx, nic in enumerate(self.nics):
           if nic.mac == constants.VALUE_AUTO:
             nic_mac_ini = "nic%d_mac" % idx
@@ -9839,12 +9918,14 @@ class LUInstanceCreate(LogicalUnit):
     nodenames = [pnode.name] + self.secondaries
 
     # Verify instance specs
+    spindle_use = self.be_full.get(constants.BE_SPINDLE_USE, None)
     ispec = {
       constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None),
       constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None),
       constants.ISPEC_DISK_COUNT: len(self.disks),
       constants.ISPEC_DISK_SIZE: [disk["size"] for disk in self.disks],
       constants.ISPEC_NIC_COUNT: len(self.nics),
+      constants.ISPEC_SPINDLE_USE: spindle_use,
       }
 
     group_info = self.cfg.GetNodeGroup(pnode.group)
@@ -9856,10 +9937,6 @@ class LUInstanceCreate(LogicalUnit):
                                                     utils.CommaJoin(res)),
                                   errors.ECODE_INVAL)
 
-    # disk parameters (not customizable at instance or node level)
-    # just use the primary node parameters, ignoring the secondary.
-    self.diskparams = group_info.diskparams
-
     if not self.adopt_disks:
       if self.op.disk_template == constants.DT_RBD:
         # _CheckRADOSFreeSpace() is just a placeholder.
@@ -9976,6 +10053,11 @@ class LUInstanceCreate(LogicalUnit):
     else:
       network_port = None
 
+    # This is ugly, but we have a chicken-and-egg problem here: we can
+    # only take the group disk parameters, as the instance has no disks
+    # yet (we are generating them right here).
+    node = self.cfg.GetNodeInfo(pnode_name)
+    nodegroup = self.cfg.GetNodeGroup(node.group)
     disks = _GenerateDiskTemplate(self,
                                   self.op.disk_template,
                                   instance, pnode_name,
@@ -9985,7 +10067,7 @@ class LUInstanceCreate(LogicalUnit):
                                   self.op.file_driver,
                                   0,
                                   feedback_fn,
-                                  self.diskparams)
+                                  self.cfg.GetGroupDiskParams(nodegroup))
 
     iobj = objects.Instance(name=instance, os=self.op.os_type,
                             primary_node=pnode_name,
@@ -10077,6 +10159,11 @@ class LUInstanceCreate(LogicalUnit):
     _ReleaseLocks(self, locking.LEVEL_NODE_RES)
 
     if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks:
+      # we need to set the disk IDs to the primary node, since the
+      # preceding code might or might not have done it, depending on
+      # the disk template and other options
+      for disk in iobj.disks:
+        self.cfg.SetDiskID(disk, pnode_name)
       if self.op.mode == constants.INSTANCE_CREATE:
         if not self.op.no_install:
           pause_sync = (iobj.disk_template in constants.DTS_INT_MIRROR and
@@ -10084,7 +10171,8 @@ class LUInstanceCreate(LogicalUnit):
           if pause_sync:
             feedback_fn("* pausing disk sync to install instance OS")
             result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
-                                                              iobj.disks, True)
+                                                              (iobj.disks,
+                                                               iobj), True)
             for idx, success in enumerate(result.payload):
               if not success:
                 logging.warn("pause-sync of instance %s for disk %d failed",
@@ -10098,7 +10186,8 @@ class LUInstanceCreate(LogicalUnit):
           if pause_sync:
             feedback_fn("* resuming disk sync")
             result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
-                                                              iobj.disks, False)
+                                                              (iobj.disks,
+                                                               iobj), False)
             for idx, success in enumerate(result.payload):
               if not success:
                 logging.warn("resume-sync of instance %s for disk %d failed",
@@ -10107,67 +10196,73 @@ class LUInstanceCreate(LogicalUnit):
           os_add_result.Raise("Could not add os for instance %s"
                               " on node %s" % (instance, pnode_name))
 
-      elif self.op.mode == constants.INSTANCE_IMPORT:
-        feedback_fn("* running the instance OS import scripts...")
+      else:
+        if self.op.mode == constants.INSTANCE_IMPORT:
+          feedback_fn("* running the instance OS import scripts...")
+
+          transfers = []
+
+          for idx, image in enumerate(self.src_images):
+            if not image:
+              continue
+
+            # FIXME: pass debug option from opcode to backend
+            dt = masterd.instance.DiskTransfer("disk/%s" % idx,
+                                               constants.IEIO_FILE, (image, ),
+                                               constants.IEIO_SCRIPT,
+                                               (iobj.disks[idx], idx),
+                                               None)
+            transfers.append(dt)
+
+          import_result = \
+            masterd.instance.TransferInstanceData(self, feedback_fn,
+                                                  self.op.src_node, pnode_name,
+                                                  self.pnode.secondary_ip,
+                                                  iobj, transfers)
+          if not compat.all(import_result):
+            self.LogWarning("Some disks for instance %s on node %s were not"
+                            " imported successfully" % (instance, pnode_name))
+
+          rename_from = self._old_instance_name
+
+        elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
+          feedback_fn("* preparing remote import...")
+          # The source cluster will stop the instance before attempting to make
+          # a connection. In some cases stopping an instance can take a long
+          # time, hence the shutdown timeout is added to the connection
+          # timeout.
+          connect_timeout = (constants.RIE_CONNECT_TIMEOUT +
+                             self.op.source_shutdown_timeout)
+          timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
 
-        transfers = []
+          assert iobj.primary_node == self.pnode.name
+          disk_results = \
+            masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode,
+                                          self.source_x509_ca,
+                                          self._cds, timeouts)
+          if not compat.all(disk_results):
+            # TODO: Should the instance still be started, even if some disks
+            # failed to import (valid for local imports, too)?
+            self.LogWarning("Some disks for instance %s on node %s were not"
+                            " imported successfully" % (instance, pnode_name))
 
-        for idx, image in enumerate(self.src_images):
-          if not image:
-            continue
+          rename_from = self.source_instance_name
 
-          # FIXME: pass debug option from opcode to backend
-          dt = masterd.instance.DiskTransfer("disk/%s" % idx,
-                                             constants.IEIO_FILE, (image, ),
-                                             constants.IEIO_SCRIPT,
-                                             (iobj.disks[idx], idx),
-                                             None)
-          transfers.append(dt)
-
-        import_result = \
-          masterd.instance.TransferInstanceData(self, feedback_fn,
-                                                self.op.src_node, pnode_name,
-                                                self.pnode.secondary_ip,
-                                                iobj, transfers)
-        if not compat.all(import_result):
-          self.LogWarning("Some disks for instance %s on node %s were not"
-                          " imported successfully" % (instance, pnode_name))
-
-      elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
-        feedback_fn("* preparing remote import...")
-        # The source cluster will stop the instance before attempting to make a
-        # connection. In some cases stopping an instance can take a long time,
-        # hence the shutdown timeout is added to the connection timeout.
-        connect_timeout = (constants.RIE_CONNECT_TIMEOUT +
-                           self.op.source_shutdown_timeout)
-        timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
-
-        assert iobj.primary_node == self.pnode.name
-        disk_results = \
-          masterd.instance.RemoteImport(self, feedback_fn, iobj, self.pnode,
-                                        self.source_x509_ca,
-                                        self._cds, timeouts)
-        if not compat.all(disk_results):
-          # TODO: Should the instance still be started, even if some disks
-          # failed to import (valid for local imports, too)?
-          self.LogWarning("Some disks for instance %s on node %s were not"
-                          " imported successfully" % (instance, pnode_name))
+        else:
+          # also checked in the prereq part
+          raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
+                                       % self.op.mode)
 
         # Run rename script on newly imported instance
         assert iobj.name == instance
         feedback_fn("Running rename script for %s" % instance)
         result = self.rpc.call_instance_run_rename(pnode_name, iobj,
-                                                   self.source_instance_name,
+                                                   rename_from,
                                                    self.op.debug_level)
         if result.fail_msg:
           self.LogWarning("Failed to run rename script for %s on node"
                           " %s: %s" % (instance, pnode_name, result.fail_msg))
 
-      else:
-        # also checked in the prereq part
-        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
-                                     % self.op.mode)
-
     assert not self.owned_locks(locking.LEVEL_NODE_RES)
 
     if self.op.start:
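The restructuring above folds both import modes into a single `else` branch so that the rename script is run once, with the name the disks were exported under. A minimal sketch of the resulting control flow (the mode strings and the `run_rename_script` callback are illustrative, not the Ganeti API):

```python
def finish_os_setup(mode, old_instance_name, source_instance_name,
                    run_rename_script):
    """Mirror the branch structure: install the OS, or import then rename."""
    if mode == "create":
        return "os-install"              # plain creation, no rename needed

    if mode == "import":
        rename_from = old_instance_name       # name stored in the export
    elif mode == "remote-import":
        rename_from = source_instance_name    # name on the source cluster
    else:
        raise ValueError("Unknown OS initialization mode %r" % mode)

    # only the import paths reach this point, so the rename script runs
    # exactly once, with the right source name
    run_rename_script(rename_from)
    return "imported"


def _show(name):
    print("renaming from %s" % name)

finish_os_setup("import", "old.example.com", None, _show)
```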
@@ -10486,7 +10581,7 @@ class TLReplaceDisks(Tasklet):
         self.lu.LogInfo("Checking disk/%d on %s", idx, node)
         self.cfg.SetDiskID(dev, node)
 
-        result = self.rpc.call_blockdev_find(node, dev)
+        result = _BlockdevFind(self, node, dev, instance)
 
         if result.offline:
           continue
@@ -10635,16 +10730,6 @@ class TLReplaceDisks(Tasklet):
       _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
                               ignore=self.ignore_ipolicy)
 
-    # TODO: compute disk parameters
-    primary_node_info = self.cfg.GetNodeInfo(instance.primary_node)
-    secondary_node_info = self.cfg.GetNodeInfo(secondary_node)
-    if primary_node_info.group != secondary_node_info.group:
-      self.lu.LogInfo("The instance primary and secondary nodes are in two"
-                      " different node groups; the disk parameters of the"
-                      " primary node's group will be applied.")
-
-    self.diskparams = self.cfg.GetNodeGroup(primary_node_info.group).diskparams
-
     for node in check_nodes:
       _CheckNodeOnline(self.lu, node)
 
@@ -10761,7 +10846,7 @@ class TLReplaceDisks(Tasklet):
         self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
         self.cfg.SetDiskID(dev, node)
 
-        result = self.rpc.call_blockdev_find(node, dev)
+        result = _BlockdevFind(self, node, dev, self.instance)
 
         msg = result.fail_msg
         if msg or not result.payload:
@@ -10778,8 +10863,8 @@ class TLReplaceDisks(Tasklet):
       self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                       (idx, node_name))
 
-      if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
-                                   ldisk=ldisk):
+      if not _CheckDiskConsistency(self.lu, self.instance, dev, node_name,
+                                   on_primary, ldisk=ldisk):
         raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                  " replace disks for instance %s" %
                                  (node_name, self.instance.name))
@@ -10793,7 +10878,8 @@ class TLReplaceDisks(Tasklet):
     """
     iv_names = {}
 
-    for idx, dev in enumerate(self.instance.disks):
+    disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
+    for idx, dev in enumerate(disks):
       if idx not in self.disks:
         continue
 
@@ -10804,14 +10890,15 @@ class TLReplaceDisks(Tasklet):
       lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
       names = _GenerateUniqueNames(self.lu, lv_names)
 
-      _, data_p, meta_p = _ComputeLDParams(constants.DT_DRBD8, self.diskparams)
-
-      vg_data = dev.children[0].logical_id[0]
+      (data_disk, meta_disk) = dev.children
+      vg_data = data_disk.logical_id[0]
       lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
-                             logical_id=(vg_data, names[0]), params=data_p)
-      vg_meta = dev.children[1].logical_id[0]
+                             logical_id=(vg_data, names[0]),
+                             params=data_disk.params)
+      vg_meta = meta_disk.logical_id[0]
       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
-                             logical_id=(vg_meta, names[1]), params=meta_p)
+                             logical_id=(vg_meta, names[1]),
+                             params=meta_disk.params)
 
       new_lvs = [lv_data, lv_meta]
       old_lvs = [child.Copy() for child in dev.children]
@@ -10819,8 +10906,8 @@ class TLReplaceDisks(Tasklet):
 
       # we pass force_create=True to force the LVM creation
       for new_lv in new_lvs:
-        _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
-                        _GetInstanceInfoText(self.instance), False)
+        _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
+                             _GetInstanceInfoText(self.instance), False)
 
     return iv_names
 
@@ -10828,7 +10915,7 @@ class TLReplaceDisks(Tasklet):
     for name, (dev, _, _) in iv_names.iteritems():
       self.cfg.SetDiskID(dev, node_name)
 
-      result = self.rpc.call_blockdev_find(node_name, dev)
+      result = _BlockdevFind(self, node_name, dev, self.instance)
 
       msg = result.fail_msg
       if msg or not result.payload:
@@ -10949,8 +11036,8 @@ class TLReplaceDisks(Tasklet):
 
       # Now that the new lvs have the old name, we can add them to the device
       self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
-      result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
-                                                  new_lvs)
+      result = self.rpc.call_blockdev_addchildren(self.target_node,
+                                                  (dev, self.instance), new_lvs)
       msg = result.fail_msg
       if msg:
         for new_lv in new_lvs:
@@ -11028,13 +11115,14 @@ class TLReplaceDisks(Tasklet):
 
     # Step: create new storage
     self.lu.LogStep(3, steps_total, "Allocate new storage")
-    for idx, dev in enumerate(self.instance.disks):
+    disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
+    for idx, dev in enumerate(disks):
       self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                       (self.new_node, idx))
       # we pass force_create=True to force LVM creation
       for new_lv in dev.children:
-        _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
-                        _GetInstanceInfoText(self.instance), False)
+        _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
+                             True, _GetInstanceInfoText(self.instance), False)
 
     # Step 4: drbd minors and drbd setup changes
     # after this, we must manually remove the drbd minors on both the
@@ -11068,14 +11156,16 @@ class TLReplaceDisks(Tasklet):
       iv_names[idx] = (dev, dev.children, new_net_id)
       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                     new_net_id)
-      drbd_params, _, _ = _ComputeLDParams(constants.DT_DRBD8, self.diskparams)
       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                               logical_id=new_alone_id,
                               children=dev.children,
                               size=dev.size,
-                              params=drbd_params)
+                              params={})
+      (anno_new_drbd,) = _AnnotateDiskParams(self.instance, [new_drbd],
+                                             self.cfg)
       try:
-        _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
+        _CreateSingleBlockDev(self.lu, self.new_node, self.instance,
+                              anno_new_drbd,
                               _GetInstanceInfoText(self.instance), False)
       except errors.GenericError:
         self.cfg.ReleaseDRBDMinors(self.instance.name)
@@ -11085,7 +11175,8 @@ class TLReplaceDisks(Tasklet):
     for idx, dev in enumerate(self.instance.disks):
       self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
       self.cfg.SetDiskID(dev, self.target_node)
-      msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
+      msg = self.rpc.call_blockdev_shutdown(self.target_node,
+                                            (dev, self.instance)).fail_msg
       if msg:
         self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                            "node: %s" % (idx, msg),
@@ -11121,7 +11212,7 @@ class TLReplaceDisks(Tasklet):
     result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                             self.new_node],
                                            self.node_secondary_ip,
-                                           self.instance.disks,
+                                           (self.instance.disks, self.instance),
                                            self.instance.name,
                                            False)
     for to_node, to_result in result.items():
@@ -11501,7 +11592,7 @@ class LUInstanceGrowDisk(LogicalUnit):
     elif level == locking.LEVEL_NODE_RES:
       # Copy node locks
       self.needed_locks[locking.LEVEL_NODE_RES] = \
-        self.needed_locks[locking.LEVEL_NODE][:]
+        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -11512,6 +11603,7 @@ class LUInstanceGrowDisk(LogicalUnit):
     env = {
       "DISK": self.op.disk,
       "AMOUNT": self.op.amount,
+      "ABSOLUTE": self.op.absolute,
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
     return env
@@ -11544,13 +11636,30 @@ class LUInstanceGrowDisk(LogicalUnit):
 
     self.disk = instance.FindDisk(self.op.disk)
 
+    if self.op.absolute:
+      self.target = self.op.amount
+      self.delta = self.target - self.disk.size
+      if self.delta < 0:
+        raise errors.OpPrereqError("Requested size (%s) is smaller than "
+                                   "current disk size (%s)" %
+                                   (utils.FormatUnit(self.target, "h"),
+                                    utils.FormatUnit(self.disk.size, "h")),
+                                   errors.ECODE_STATE)
+    else:
+      self.delta = self.op.amount
+      self.target = self.disk.size + self.delta
+      if self.delta < 0:
+        raise errors.OpPrereqError("Requested increment (%s) is negative" %
+                                   utils.FormatUnit(self.delta, "h"),
+                                   errors.ECODE_INVAL)
+
     if instance.disk_template not in (constants.DT_FILE,
                                       constants.DT_SHARED_FILE,
                                       constants.DT_RBD):
      # TODO: check the free disk space for file, when that feature is
      # supported
       _CheckNodesFreeDiskPerVG(self, nodenames,
-                               self.disk.ComputeGrowth(self.op.amount))
+                               self.disk.ComputeGrowth(self.delta))
 
   def Exec(self, feedback_fn):
     """Execute disk grow.
@@ -11567,21 +11676,24 @@ class LUInstanceGrowDisk(LogicalUnit):
     if not disks_ok:
       raise errors.OpExecError("Cannot activate block device to grow")
 
-    feedback_fn("Growing disk %s of instance '%s' by %s" %
+    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                 (self.op.disk, instance.name,
-                 utils.FormatUnit(self.op.amount, "h")))
+                 utils.FormatUnit(self.delta, "h"),
+                 utils.FormatUnit(self.target, "h")))
 
     # First run all grow ops in dry-run mode
     for node in instance.all_nodes:
       self.cfg.SetDiskID(disk, node)
-      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True)
+      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+                                           True)
       result.Raise("Grow request failed to node %s" % node)
 
     # We know that (as far as we can test) operations across different
     # nodes will succeed, time to run it for real
     for node in instance.all_nodes:
       self.cfg.SetDiskID(disk, node)
-      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False)
+      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+                                           False)
       result.Raise("Grow request failed to node %s" % node)
 
       # TODO: Rewrite code to work properly
@@ -11591,7 +11703,7 @@ class LUInstanceGrowDisk(LogicalUnit):
       # time is a work-around.
       time.sleep(5)
 
-    disk.RecordGrow(self.op.amount)
+    disk.RecordGrow(self.delta)
     self.cfg.Update(instance, feedback_fn)
 
     # Changes have been recorded, release node lock
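The new `--absolute` handling in `LUInstanceGrowDisk` reduces to deriving a (delta, target) pair from the request and rejecting shrink attempts. A self-contained sketch of that computation (no Ganeti imports, errors reduced to ValueError):

```python
def compute_grow(current_size, amount, absolute):
    """Return (delta, target) in MiB for a disk grow request.

    With absolute=True, `amount` is the desired final size; otherwise it
    is the increment added on top of the current size.
    """
    if absolute:
        target = amount
        delta = target - current_size
        if delta < 0:
            raise ValueError("requested size %d is smaller than current"
                             " size %d" % (target, current_size))
    else:
        delta = amount
        target = current_size + delta
        if delta < 0:
            raise ValueError("requested increment %d is negative" % delta)
    return (delta, target)


print(compute_grow(10240, 2048, absolute=False))   # (2048, 12288)
print(compute_grow(10240, 12288, absolute=True))   # (2048, 12288)
```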
@@ -11645,12 +11757,25 @@ class LUInstanceQueryData(NoHooksLU):
       else:
         self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
 
+      self.needed_locks[locking.LEVEL_NODEGROUP] = []
       self.needed_locks[locking.LEVEL_NODE] = []
       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
   def DeclareLocks(self, level):
-    if self.op.use_locking and level == locking.LEVEL_NODE:
-      self._LockInstancesNodes()
+    if self.op.use_locking:
+      if level == locking.LEVEL_NODEGROUP:
+        owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
+
+        # Lock all groups used by instances optimistically; this requires going
+        # via the node before it's locked, requiring verification later on
+        self.needed_locks[locking.LEVEL_NODEGROUP] = \
+          frozenset(group_uuid
+                    for instance_name in owned_instances
+                    for group_uuid in
+                      self.cfg.GetInstanceNodeGroups(instance_name))
+
+      elif level == locking.LEVEL_NODE:
+        self._LockInstancesNodes()
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -11658,14 +11783,25 @@ class LUInstanceQueryData(NoHooksLU):
     This only checks the optional instance list against the existing names.
 
     """
+    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
+    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
+    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
+
     if self.wanted_names is None:
       assert self.op.use_locking, "Locking was not used"
-      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
+      self.wanted_names = owned_instances
 
-    self.wanted_instances = \
-        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
+    instances = dict(self.cfg.GetMultiInstanceInfo(self.wanted_names))
+
+    if self.op.use_locking:
+      _CheckInstancesNodeGroups(self.cfg, instances, owned_groups, owned_nodes,
+                                None)
+    else:
+      assert not (owned_instances or owned_groups or owned_nodes)
+
+    self.wanted_instances = instances.values()
 
-  def _ComputeBlockdevStatus(self, node, instance_name, dev):
+  def _ComputeBlockdevStatus(self, node, instance, dev):
     """Returns the status of a block device
 
     """
@@ -11678,7 +11814,7 @@ class LUInstanceQueryData(NoHooksLU):
     if result.offline:
       return None
 
-    result.Raise("Can't compute disk status for %s" % instance_name)
+    result.Raise("Can't compute disk status for %s" % instance.name)
 
     status = result.payload
     if status is None:
@@ -11692,6 +11828,16 @@ class LUInstanceQueryData(NoHooksLU):
     """Compute block device status.
 
     """
+    (anno_dev,) = _AnnotateDiskParams(instance, [dev], self.cfg)
+
+    return self._ComputeDiskStatusInner(instance, snode, anno_dev)
+
+  def _ComputeDiskStatusInner(self, instance, snode, dev):
+    """Compute block device status.
+
+    @attention: The device has to be annotated already.
+
+    """
     if dev.dev_type in constants.LDS_DRBD:
       # we change the snode then (otherwise we use the one passed in)
       if dev.logical_id[0] == instance.primary_node:
@@ -11700,11 +11846,11 @@ class LUInstanceQueryData(NoHooksLU):
         snode = dev.logical_id[0]
 
     dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
-                                              instance.name, dev)
-    dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
+                                              instance, dev)
+    dev_sstatus = self._ComputeBlockdevStatus(snode, instance, dev)
 
     if dev.children:
-      dev_children = map(compat.partial(self._ComputeDiskStatus,
+      dev_children = map(compat.partial(self._ComputeDiskStatusInner,
                                         instance, snode),
                          dev.children)
     else:
@@ -11728,9 +11874,17 @@ class LUInstanceQueryData(NoHooksLU):
 
     cluster = self.cfg.GetClusterInfo()
 
-    pri_nodes = self.cfg.GetMultiNodeInfo(i.primary_node
-                                          for i in self.wanted_instances)
-    for instance, (_, pnode) in zip(self.wanted_instances, pri_nodes):
+    node_names = itertools.chain(*(i.all_nodes for i in self.wanted_instances))
+    nodes = dict(self.cfg.GetMultiNodeInfo(node_names))
+
+    groups = dict(self.cfg.GetMultiNodeGroupInfo(node.group
+                                                 for node in nodes.values()))
+
+    group2name_fn = lambda uuid: groups[uuid].name
+
+    for instance in self.wanted_instances:
+      pnode = nodes[instance.primary_node]
+
       if self.op.static or pnode.offline:
         remote_state = None
         if pnode.offline:
@@ -11754,12 +11908,19 @@ class LUInstanceQueryData(NoHooksLU):
       disks = map(compat.partial(self._ComputeDiskStatus, instance, None),
                   instance.disks)
 
+      snodes_group_uuids = [nodes[snode_name].group
+                            for snode_name in instance.secondary_nodes]
+
       result[instance.name] = {
         "name": instance.name,
         "config_state": instance.admin_state,
         "run_state": remote_state,
         "pnode": instance.primary_node,
+        "pnode_group_uuid": pnode.group,
+        "pnode_group_name": group2name_fn(pnode.group),
         "snodes": instance.secondary_nodes,
+        "snodes_group_uuids": snodes_group_uuids,
+        "snodes_group_names": map(group2name_fn, snodes_group_uuids),
         "os": instance.os,
         # this happens to be the same format used for hooks
         "nics": _NICListToTuple(self, instance.nics),
@@ -12097,7 +12258,7 @@ class LUInstanceSetParams(LogicalUnit):
     elif level == locking.LEVEL_NODE_RES and self.op.disk_template:
       # Copy node locks
       self.needed_locks[locking.LEVEL_NODE_RES] = \
-        self.needed_locks[locking.LEVEL_NODE][:]
+        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -12194,8 +12355,6 @@ class LUInstanceSetParams(LogicalUnit):
     private.params = new_params
     private.filled = new_filled_params
 
-    return (None, None)
-
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -12211,7 +12370,7 @@ class LUInstanceSetParams(LogicalUnit):
     pnode = instance.primary_node
     nodelist = list(instance.all_nodes)
     pnode_info = self.cfg.GetNodeInfo(pnode)
-    self.diskparams = self.cfg.GetNodeGroup(pnode_info.group).diskparams
+    self.diskparams = self.cfg.GetInstanceDiskParams(instance)
 
     # Prepare disk/NIC modifications
     self.diskmod = PrepareContainerMods(self.op.disks, None)
@@ -12298,7 +12457,7 @@ class LUInstanceSetParams(LogicalUnit):
       self.be_proposed = cluster.SimpleFillBE(instance.beparams)
     be_old = cluster.FillBE(instance)
 
-    # CPU param validation -- checking every time a paramtere is
+    # CPU param validation -- checking every time a parameter is
     # changed to cover all cases where either CPU mask or vcpus have
     # changed
     if (constants.BE_VCPUS in self.be_proposed and
@@ -12430,12 +12589,13 @@ class LUInstanceSetParams(LogicalUnit):
                                  errors.ECODE_INVAL)
 
     def _PrepareNicCreate(_, params, private):
-      return self._PrepareNicModification(params, private, None, {},
-                                          cluster, pnode)
+      self._PrepareNicModification(params, private, None, {}, cluster, pnode)
+      return (None, None)
 
     def _PrepareNicMod(_, nic, params, private):
-      return self._PrepareNicModification(params, private, nic.ip,
-                                          nic.nicparams, cluster, pnode)
+      self._PrepareNicModification(params, private, nic.ip,
+                                   nic.nicparams, cluster, pnode)
+      return None
 
     # Verify NIC changes (operating on copy)
     nics = instance.nics[:]
@@ -12491,10 +12651,12 @@ class LUInstanceSetParams(LogicalUnit):
                                       instance.name, pnode, [snode],
                                       disk_info, None, None, 0, feedback_fn,
                                       self.diskparams)
+    anno_disks = rpc.AnnotateDiskParams(constants.DT_DRBD8, new_disks,
+                                        self.diskparams)
     info = _GetInstanceInfoText(instance)
-    feedback_fn("Creating aditional volumes...")
+    feedback_fn("Creating additional volumes...")
     # first, create the missing data and meta devices
-    for disk in new_disks:
+    for disk in anno_disks:
       # unfortunately this is... not too nice
       _CreateSingleBlockDev(self, pnode, instance, disk.children[1],
                             info, True)
@@ -12510,7 +12672,7 @@ class LUInstanceSetParams(LogicalUnit):
 
     feedback_fn("Initializing DRBD devices...")
     # all child devices are in place, we can now create the DRBD devices
-    for disk in new_disks:
+    for disk in anno_disks:
       for node in [pnode, snode]:
         f_create = node == pnode
         _CreateSingleBlockDev(self, node, instance, disk, info, f_create)
@@ -12545,14 +12707,20 @@ class LUInstanceSetParams(LogicalUnit):
     snode = instance.secondary_nodes[0]
     feedback_fn("Converting template to plain")
 
-    old_disks = instance.disks
-    new_disks = [d.children[0] for d in old_disks]
+    old_disks = _AnnotateDiskParams(instance, instance.disks, self.cfg)
+    new_disks = [d.children[0] for d in instance.disks]
 
     # copy over size and mode
     for parent, child in zip(old_disks, new_disks):
       child.size = parent.size
       child.mode = parent.mode
 
+    # these are DRBD disks; return their ports to the pool
+    # NOTE: this must be done right before the call to cfg.Update!
+    for disk in old_disks:
+      tcp_port = disk.logical_id[2]
+      self.cfg.AddTcpUdpPort(tcp_port)
+
     # update instance structure
     instance.disks = new_disks
     instance.disk_template = constants.DT_PLAIN
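A condensed view of the DRBD-to-plain conversion order established above: copy size and mode from each DRBD disk onto its data child, release the DRBD ports, and only then write the updated instance back. The helpers below are illustrative stand-ins, not the Ganeti config API.

```python
def convert_drbd_to_plain(instance, release_port, update_config):
    """Swap an instance's DRBD disks for their underlying data volumes."""
    old_disks = instance["disks"]
    new_disks = [d["children"][0] for d in old_disks]

    # carry the user-visible attributes over to the plain volumes
    for parent, child in zip(old_disks, new_disks):
        child["size"] = parent["size"]
        child["mode"] = parent["mode"]

    # return the DRBD ports right before the config update, matching the
    # NOTE in the hunk above
    for disk in old_disks:
        release_port(disk["port"])

    instance["disks"] = new_disks
    instance["disk_template"] = "plain"
    update_config(instance)


instance = {"disks": [{"size": 1024, "mode": "rw", "port": 11000,
                       "children": [{"size": 0, "mode": None}]}],
            "disk_template": "drbd"}
ports = []
convert_drbd_to_plain(instance, ports.append, lambda inst: None)
print("%s %s" % (instance["disk_template"], ports))   # plain [11000]
```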
@@ -12578,13 +12746,6 @@ class LUInstanceSetParams(LogicalUnit):
         self.LogWarning("Could not remove metadata for disk %d on node %s,"
                         " continuing anyway: %s", idx, pnode, msg)
 
-    # this is a DRBD disk, return its port to the pool
-    for disk in old_disks:
-      tcp_port = disk.logical_id[2]
-      self.cfg.AddTcpUdpPort(tcp_port)
-
-    # Node resource locks will be released by caller
-
   def _CreateNewDisk(self, idx, params, _):
     """Creates a new disk.
 
@@ -12637,7 +12798,8 @@ class LUInstanceSetParams(LogicalUnit):
     """Removes a disk.
 
     """
-    for node, disk in root.ComputeNodeTree(self.instance.primary_node):
+    (anno_disk,) = _AnnotateDiskParams(self.instance, [root], self.cfg)
+    for node, disk in anno_disk.ComputeNodeTree(self.instance.primary_node):
       self.cfg.SetDiskID(disk, node)
       msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
       if msg:
@@ -12879,7 +13041,7 @@ class LUInstanceChangeGroup(LogicalUnit):
 
     if self.req_target_uuids:
       # User requested specific target groups
-      self.target_uuids = self.req_target_uuids
+      self.target_uuids = frozenset(self.req_target_uuids)
     else:
       # All groups except those used by the instance are potential targets
       self.target_uuids = owned_groups - inst_groups
@@ -12948,32 +13110,74 @@ class LUBackupQuery(NoHooksLU):
   """
   REQ_BGL = False
 
+  def CheckArguments(self):
+    self.expq = _ExportQuery(qlang.MakeSimpleFilter("node", self.op.nodes),
+                             ["node", "export"], self.op.use_locking)
+
   def ExpandNames(self):
-    self.needed_locks = {}
-    self.share_locks[locking.LEVEL_NODE] = 1
-    if not self.op.nodes:
-      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
-    else:
-      self.needed_locks[locking.LEVEL_NODE] = \
-        _GetWantedNodes(self, self.op.nodes)
+    self.expq.ExpandNames(self)
+
+  def DeclareLocks(self, level):
+    self.expq.DeclareLocks(self, level)
 
   def Exec(self, feedback_fn):
-    """Compute the list of all the exported system images.
+    result = {}
 
-    @rtype: dict
-    @return: a dictionary with the structure node->(export-list)
-        where export-list is a list of the instances exported on
-        that node.
+    for (node, expname) in self.expq.OldStyleQuery(self):
+      if expname is None:
+        result[node] = False
+      else:
+        result.setdefault(node, []).append(expname)
+
+    return result
+
+
+class _ExportQuery(_QueryBase):
+  FIELDS = query.EXPORT_FIELDS
+
+  #: The node name is not a unique key for this query
+  SORT_FIELD = "node"
+
+  def ExpandNames(self, lu):
+    lu.needed_locks = {}
+
+    # The following variables interact with _QueryBase._GetNames
+    if self.names:
+      self.wanted = _GetWantedNodes(lu, self.names)
+    else:
+      self.wanted = locking.ALL_SET
+
+    self.do_locking = self.use_locking
+
+    if self.do_locking:
+      lu.share_locks = _ShareAll()
+      lu.needed_locks = {
+        locking.LEVEL_NODE: self.wanted,
+        }
+
+  def DeclareLocks(self, lu, level):
+    pass
+
+  def _GetQueryData(self, lu):
+    """Computes the list of nodes and their attributes.
 
     """
-    self.nodes = self.owned_locks(locking.LEVEL_NODE)
-    rpcresult = self.rpc.call_export_list(self.nodes)
-    result = {}
-    for node in rpcresult:
-      if rpcresult[node].fail_msg:
-        result[node] = False
+    # Locking is not used
+    # TODO
+    assert not (compat.any(lu.glm.is_owned(level)
+                           for level in locking.LEVELS
+                           if level != locking.LEVEL_CLUSTER) or
+                self.do_locking or self.use_locking)
+
+    nodes = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE)
+
+    result = []
+
+    for (node, nres) in lu.rpc.call_export_list(nodes).items():
+      if nres.fail_msg:
+        result.append((node, None))
       else:
-        result[node] = rpcresult[node].payload
+        result.extend((node, expname) for expname in nres.payload)
 
     return result
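With the query moved into `_ExportQuery`, `LUBackupQuery.Exec` only has to reshape the (node, export) pairs into the legacy node-to-list mapping, with `False` marking nodes whose export list could not be retrieved. A tiny sketch of that conversion:

```python
def old_style_exports(pairs):
    """Turn (node, export_name_or_None) pairs into the legacy dict format."""
    result = {}
    for node, expname in pairs:
        if expname is None:
            result[node] = False                   # export listing failed
        else:
            result.setdefault(node, []).append(expname)
    return result


print(old_style_exports([("node1", "inst1.export"),
                         ("node1", "inst2.export"),
                         ("node2", None)]))
# {'node1': ['inst1.export', 'inst2.export'], 'node2': False}
```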
 
@@ -13421,17 +13625,23 @@ class LUGroupAdd(LogicalUnit):
 
     if self.op.diskparams:
       for templ in constants.DISK_TEMPLATES:
-        if templ not in self.op.diskparams:
-          self.op.diskparams[templ] = {}
-        utils.ForceDictType(self.op.diskparams[templ], constants.DISK_DT_TYPES)
+        if templ in self.op.diskparams:
+          utils.ForceDictType(self.op.diskparams[templ],
+                              constants.DISK_DT_TYPES)
+      self.new_diskparams = self.op.diskparams
+      try:
+        utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
+      except errors.OpPrereqError, err:
+        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
+                                   errors.ECODE_INVAL)
     else:
-      self.op.diskparams = self.cfg.GetClusterInfo().diskparams
+      self.new_diskparams = {}
 
     if self.op.ipolicy:
       cluster = self.cfg.GetClusterInfo()
       full_ipolicy = cluster.SimpleFillIPolicy(self.op.ipolicy)
       try:
-        objects.InstancePolicy.CheckParameterSyntax(full_ipolicy)
+        objects.InstancePolicy.CheckParameterSyntax(full_ipolicy, False)
       except errors.ConfigurationError, err:
         raise errors.OpPrereqError("Invalid instance policy: %s" % err,
                                    errors.ECODE_INVAL)
@@ -13459,7 +13669,7 @@ class LUGroupAdd(LogicalUnit):
                                   uuid=self.group_uuid,
                                   alloc_policy=self.op.alloc_policy,
                                   ndparams=self.op.ndparams,
-                                  diskparams=self.op.diskparams,
+                                  diskparams=self.new_diskparams,
                                   ipolicy=self.op.ipolicy,
                                   hv_state_static=self.new_hv_state,
                                   disk_state_static=self.new_disk_state)
@@ -13677,7 +13887,8 @@ class _GroupQuery(_QueryBase):
     return query.GroupQueryData(self._cluster,
                                 [self._all_groups[uuid]
                                  for uuid in self.wanted],
-                                group_to_nodes, group_to_instances)
+                                group_to_nodes, group_to_instances,
+                                query.GQ_DISKPARAMS in self.requested_data)
 
 
 class LUGroupQuery(NoHooksLU):
@@ -13742,6 +13953,15 @@ class LUGroupSetParams(LogicalUnit):
       self.needed_locks[locking.LEVEL_INSTANCE] = \
           self.cfg.GetNodeGroupInstances(self.group_uuid)
 
+  @staticmethod
+  def _UpdateAndVerifyDiskParams(old, new):
+    """Updates and verifies disk parameters.
+
+    """
+    new_params = _GetUpdatedParams(old, new)
+    utils.ForceDictType(new_params, constants.DISK_DT_TYPES)
+    return new_params
+
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -13760,18 +13980,26 @@ class LUGroupSetParams(LogicalUnit):
 
     if self.op.ndparams:
       new_ndparams = _GetUpdatedParams(self.group.ndparams, self.op.ndparams)
-      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
+      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
       self.new_ndparams = new_ndparams
 
     if self.op.diskparams:
-      self.new_diskparams = dict()
-      for templ in constants.DISK_TEMPLATES:
-        if templ not in self.op.diskparams:
-          self.op.diskparams[templ] = {}
-        new_templ_params = _GetUpdatedParams(self.group.diskparams[templ],
-                                             self.op.diskparams[templ])
-        utils.ForceDictType(new_templ_params, constants.DISK_DT_TYPES)
-        self.new_diskparams[templ] = new_templ_params
+      diskparams = self.group.diskparams
+      uavdp = self._UpdateAndVerifyDiskParams
+      # For each disk template, update and verify the values of its subdict
+      new_diskparams = dict((dt,
+                             uavdp(diskparams.get(dt, {}),
+                                   self.op.diskparams[dt]))
+                            for dt in constants.DISK_TEMPLATES
+                            if dt in self.op.diskparams)
+      # Now that all per-template subdicts are ready, merge them into the
+      # group's existing diskparams dict
+      self.new_diskparams = objects.FillDict(diskparams, new_diskparams)
+      try:
+        utils.VerifyDictOptions(self.new_diskparams, constants.DISK_DT_DEFAULTS)
+      except errors.OpPrereqError, err:
+        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
+                                   errors.ECODE_INVAL)
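The per-template update in `LUGroupSetParams` amounts to a dict-of-dicts merge followed by a key check. A simplified, self-contained sketch, using plain dict operations in place of `_GetUpdatedParams`, `objects.FillDict` and `utils.VerifyDictOptions`, and a hypothetical `KNOWN` table in place of DISK_DT_DEFAULTS:

```python
KNOWN = {"drbd": {"metavg": "xenvg", "resync-rate": 1024},
         "plain": {"stripes": 1}}   # hypothetical stand-in for DISK_DT_DEFAULTS

def update_group_diskparams(group_params, op_params):
    """Merge per-template overrides into a group's diskparams and verify."""
    updated = {}
    for templ, overrides in op_params.items():
        merged = dict(group_params.get(templ, {}))
        merged.update(overrides)              # _GetUpdatedParams, simplified
        updated[templ] = merged

    new_params = dict(group_params)
    new_params.update(updated)                # objects.FillDict, simplified

    # reject keys that no disk template defines (VerifyDictOptions, roughly)
    for templ, params in new_params.items():
        unknown = set(params) - set(KNOWN.get(templ, {}))
        if unknown:
            raise ValueError("unknown diskparams for %s: %s"
                             % (templ, ", ".join(sorted(unknown))))
    return new_params


print(update_group_diskparams({"drbd": {"metavg": "xenvg"}},
                              {"drbd": {"resync-rate": 2048}}))
# {'drbd': {'metavg': 'xenvg', 'resync-rate': 2048}}
```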
 
     if self.op.hv_state:
       self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
@@ -14070,16 +14298,8 @@ class LUGroupEvacuate(LogicalUnit):
     self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
 
     # Check if node groups for locked instances are still correct
-    for instance_name in owned_instances:
-      inst = self.instances[instance_name]
-      assert owned_nodes.issuperset(inst.all_nodes), \
-        "Instance %s's nodes changed while we kept the lock" % instance_name
-
-      inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
-                                             owned_groups)
-
-      assert self.group_uuid in inst_groups, \
-        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
+    _CheckInstancesNodeGroups(self.cfg, self.instances,
+                              owned_groups, owned_nodes, self.group_uuid)
 
     if self.req_target_uuids:
       # User requested specific target groups
@@ -14147,14 +14367,25 @@ class TagsLU(NoHooksLU): # pylint: disable=W0223
   def ExpandNames(self):
     self.group_uuid = None
     self.needed_locks = {}
+
     if self.op.kind == constants.TAG_NODE:
       self.op.name = _ExpandNodeName(self.cfg, self.op.name)
-      self.needed_locks[locking.LEVEL_NODE] = self.op.name
+      lock_level = locking.LEVEL_NODE
+      lock_name = self.op.name
     elif self.op.kind == constants.TAG_INSTANCE:
       self.op.name = _ExpandInstanceName(self.cfg, self.op.name)
-      self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name
+      lock_level = locking.LEVEL_INSTANCE
+      lock_name = self.op.name
     elif self.op.kind == constants.TAG_NODEGROUP:
       self.group_uuid = self.cfg.LookupNodeGroup(self.op.name)
+      lock_level = locking.LEVEL_NODEGROUP
+      lock_name = self.group_uuid
+    else:
+      lock_level = None
+      lock_name = None
+
+    if lock_level and getattr(self.op, "use_locking", True):
+      self.needed_locks[lock_level] = lock_name
 
     # FIXME: Acquire BGL for cluster tag operations (as of this writing it's
     # not possible to acquire the BGL based on opcode parameters)
@@ -14505,7 +14736,7 @@ class IAllocator(object):
     self.in_text = self.out_text = self.in_data = self.out_data = None
     # init all input fields so that pylint is happy
     self.mode = mode
-    self.memory = self.disks = self.disk_template = None
+    self.memory = self.disks = self.disk_template = self.spindle_use = None
     self.os = self.tags = self.nics = self.vcpus = None
     self.hypervisor = None
     self.relocate_from = None
@@ -14711,6 +14942,7 @@ class IAllocator(object):
         "admin_state": iinfo.admin_state,
         "vcpus": beinfo[constants.BE_VCPUS],
         "memory": beinfo[constants.BE_MAXMEM],
+        "spindle_use": beinfo[constants.BE_SPINDLE_USE],
         "os": iinfo.os,
         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
         "nics": nic_data,
@@ -14750,6 +14982,7 @@ class IAllocator(object):
       "os": self.os,
       "vcpus": self.vcpus,
       "memory": self.memory,
+      "spindle_use": self.spindle_use,
       "disks": self.disks,
       "disk_space_total": disk_space,
       "nics": self.nics,
@@ -14863,6 +15096,7 @@ class IAllocator(object):
        [
         ("name", ht.TString),
         ("memory", ht.TInt),
+        ("spindle_use", ht.TInt),
         ("disks", ht.TListOf(ht.TDict)),
         ("disk_template", ht.TString),
         ("os", ht.TString),
@@ -15068,6 +15302,7 @@ class LUTestAllocator(NoHooksLU):
                        nics=self.op.nics,
                        vcpus=self.op.vcpus,
                        hypervisor=self.op.hypervisor,
+                       spindle_use=self.op.spindle_use,
                        )
     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
       ial = IAllocator(self.cfg, self.rpc,
@@ -15099,10 +15334,12 @@ class LUTestAllocator(NoHooksLU):
 
 #: Query type implementations
 _QUERY_IMPL = {
+  constants.QR_CLUSTER: _ClusterQuery,
   constants.QR_INSTANCE: _InstanceQuery,
   constants.QR_NODE: _NodeQuery,
   constants.QR_GROUP: _GroupQuery,
   constants.QR_OS: _OsQuery,
+  constants.QR_EXPORT: _ExportQuery,
   }
 
 assert set(_QUERY_IMPL.keys()) == constants.QR_VIA_OP