cmdlib: Remove all diskparams calculations that are no longer required
diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index e68974b..b672fdd 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -32,7 +32,6 @@ import os
 import os.path
 import time
 import re
-import platform
 import logging
 import copy
 import OpenSSL
@@ -60,6 +59,7 @@ from ganeti import qlang
 from ganeti import opcodes
 from ganeti import ht
 from ganeti import rpc
+from ganeti import runtime
 
 import ganeti.masterd.instance # pylint: disable=W0611
 
@@ -82,7 +82,7 @@ class ResultWithJobs:
   """Data container for LU results with jobs.
 
   Instances of this class returned from L{LogicalUnit.Exec} will be recognized
-  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
+  by L{mcpu._ProcessResult}. The latter will then submit the jobs
   contained in the C{jobs} attribute and include the job IDs in the opcode
   result.
 
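The pattern this docstring describes looks roughly as follows (a hedged sketch; the LU name and the opcode are placeholders):

    class LUExample(LogicalUnit):
      def Exec(self, feedback_fn):
        # Each inner list is one job; mcpu submits the jobs after Exec
        # returns and includes the resulting job IDs in the opcode result.
        return ResultWithJobs([[opcodes.OpTestDelay(duration=1.0)]])
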
@@ -493,6 +493,9 @@ class _QueryBase:
   #: Attribute holding field definitions
   FIELDS = None
 
+  #: Field to sort by
+  SORT_FIELD = "name"
+
   def __init__(self, qfilter, fields, use_locking):
     """Initializes this class.
 
@@ -500,7 +503,7 @@ class _QueryBase:
     self.use_locking = use_locking
 
     self.query = query.Query(self.FIELDS, fields, qfilter=qfilter,
-                             namefield="name")
+                             namefield=self.SORT_FIELD)
     self.requested_data = self.query.RequestedData()
     self.names = self.query.RequestedNames()
 
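SORT_FIELD is the new per-subclass hook: it is handed to query.Query as the name field, which determines the sort order, so a subclass only has to override the attribute. A minimal sketch (the FIELDS choice is illustrative):

    class _ExampleQuery(_QueryBase):
      FIELDS = query.NODE_FIELDS  # whichever field definitions apply
      SORT_FIELD = "node"         # sort by "node" instead of "name"
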
@@ -596,6 +599,32 @@ def _MakeLegacyNodeInfo(data):
     })
 
 
+def _CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes,
+                              cur_group_uuid):
+  """Checks if node groups for locked instances are still correct.
+
+  @type cfg: L{config.ConfigWriter}
+  @param cfg: Cluster configuration
+  @type instances: dict; string as key, L{objects.Instance} as value
+  @param instances: Dictionary, instance name as key, instance object as value
+  @type owned_groups: iterable of string
+  @param owned_groups: List of owned groups
+  @type owned_nodes: iterable of string
+  @param owned_nodes: List of owned nodes
+  @type cur_group_uuid: string or None
+  @param cur_group_uuid: Optional group UUID to check against instance's groups
+
+  """
+  for (name, inst) in instances.items():
+    assert owned_nodes.issuperset(inst.all_nodes), \
+      "Instance %s's nodes changed while we kept the lock" % name
+
+    inst_groups = _CheckInstanceNodeGroups(cfg, name, owned_groups)
+
+    assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
+      "Instance %s has no node in group %s" % (name, cur_group_uuid)
+
+
 def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups):
   """Checks if the owned node groups are still correct for an instance.
 
@@ -1885,7 +1914,7 @@ class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
   """Verifies the cluster config.
 
   """
-  REQ_BGL = True
+  REQ_BGL = False
 
   def _VerifyHVP(self, hvp_data):
     """Verifies locally the syntax of the hypervisor parameters.
@@ -1902,13 +1931,17 @@ class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
         self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
 
   def ExpandNames(self):
-    # Information can be safely retrieved as the BGL is acquired in exclusive
-    # mode
-    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
+    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
+    self.share_locks = _ShareAll()
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    # Retrieve all information
     self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
     self.all_node_info = self.cfg.GetAllNodesInfo()
     self.all_inst_info = self.cfg.GetAllInstancesInfo()
-    self.needed_locks = {}
 
   def Exec(self, feedback_fn):
     """Verify integrity of cluster, performing various test on nodes.
@@ -3499,15 +3532,8 @@ class LUGroupVerifyDisks(NoHooksLU):
     self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
 
     # Check if node groups for locked instances are still correct
-    for (instance_name, inst) in self.instances.items():
-      assert owned_nodes.issuperset(inst.all_nodes), \
-        "Instance %s's nodes changed while we kept the lock" % instance_name
-
-      inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
-                                             owned_groups)
-
-      assert self.group_uuid in inst_groups, \
-        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
+    _CheckInstancesNodeGroups(self.cfg, self.instances,
+                              owned_groups, owned_nodes, self.group_uuid)
 
   def Exec(self, feedback_fn):
     """Verify integrity of cluster disks.
@@ -4474,7 +4500,7 @@ def _WaitForSync(lu, instance, disks=None, oneshot=False):
   return not cumul_degraded
 
 
-def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
+def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
   """Check that mirrors are not degraded.
 
   The ldisk parameter, if True, will change the test from the
@@ -4503,7 +4529,8 @@ def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
 
   if dev.children:
     for child in dev.children:
-      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
+      result = result and _CheckDiskConsistency(lu, instance, child, node,
+                                                on_primary)
 
   return result
 
@@ -4512,7 +4539,7 @@ class LUOobCommand(NoHooksLU):
   """Logical unit for OOB handling.
 
   """
-  REG_BGL = False
+  REQ_BGL = False
   _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
 
   def ExpandNames(self):
@@ -5589,6 +5616,19 @@ class LUNodeAdd(LogicalUnit):
     if self.op.disk_state:
       self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None)
 
+    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
+    #       it a property on the base class.
+    result = rpc.DnsOnlyRunner().call_version([node])[node]
+    result.Raise("Can't get version information from node %s" % node)
+    if constants.PROTOCOL_VERSION == result.payload:
+      logging.info("Communication to node %s fine, sw version %s match",
+                   node, result.payload)
+    else:
+      raise errors.OpPrereqError("Version mismatch master version %s,"
+                                 " node version %s" %
+                                 (constants.PROTOCOL_VERSION, result.payload),
+                                 errors.ECODE_ENVIRON)
+
   def Exec(self, feedback_fn):
     """Adds the new node to the cluster.
 
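Two things change here: the version check moves from Exec to CheckPrereq, so a mismatch now aborts before anything is modified (OpPrereqError instead of OpExecError), and the call goes through rpc.DnsOnlyRunner because self.rpc resolves node addresses via the cluster configuration, which cannot yet know the node being added. The contrast, using only calls that appear in this patch:

    # Config-based runner: usable only for nodes already in the config
    result = self.rpc.call_version([node.name])[node.name]
    # DNS-only runner: works for a node not yet part of the cluster
    result = rpc.DnsOnlyRunner().call_version([node])[node]
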
@@ -5633,17 +5673,6 @@ class LUNodeAdd(LogicalUnit):
     if self.op.disk_state:
       new_node.disk_state_static = self.new_disk_state
 
-    # check connectivity
-    result = self.rpc.call_version([node])[node]
-    result.Raise("Can't get version information from node %s" % node)
-    if constants.PROTOCOL_VERSION == result.payload:
-      logging.info("Communication to node %s fine, sw version %s match",
-                   node, result.payload)
-    else:
-      raise errors.OpExecError("Version mismatch master version %s,"
-                               " node version %s" %
-                               (constants.PROTOCOL_VERSION, result.payload))
-
     # Add node to our /etc/hosts, and add key to known_hosts
     if self.cfg.GetClusterInfo().modify_etc_hosts:
       master_node = self.cfg.GetMasterNode()
@@ -5909,9 +5938,7 @@ class LUNodeSetParams(LogicalUnit):
 
     if old_role == self._ROLE_OFFLINE and new_role != old_role:
       # Trying to transition out of offline status
-      # TODO: Use standard RPC runner, but make sure it works when the node is
-      # still marked offline
-      result = rpc.BootstrapRunner().call_version([node.name])[node.name]
+      result = self.rpc.call_version([node.name])[node.name]
       if result.fail_msg:
         raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                    " to report its version: %s" %
@@ -6096,7 +6123,7 @@ class LUClusterQuery(NoHooksLU):
       "config_version": constants.CONFIG_VERSION,
       "os_api_version": max(constants.OS_API_VERSIONS),
       "export_version": constants.EXPORT_VERSION,
-      "architecture": (platform.architecture()[0], platform.machine()),
+      "architecture": runtime.GetArchInfo(),
       "name": cluster.cluster_name,
       "master": cluster.master_node,
       "default_hypervisor": cluster.primary_hypervisor,
@@ -6139,38 +6166,70 @@ class LUClusterConfigQuery(NoHooksLU):
 
   """
   REQ_BGL = False
-  _FIELDS_DYNAMIC = utils.FieldSet()
-  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag",
-                                  "watcher_pause", "volume_group_name")
 
   def CheckArguments(self):
-    _CheckOutputFields(static=self._FIELDS_STATIC,
-                       dynamic=self._FIELDS_DYNAMIC,
-                       selected=self.op.output_fields)
+    self.cq = _ClusterQuery(None, self.op.output_fields, False)
 
   def ExpandNames(self):
-    self.needed_locks = {}
+    self.cq.ExpandNames(self)
+
+  def DeclareLocks(self, level):
+    self.cq.DeclareLocks(self, level)
 
   def Exec(self, feedback_fn):
-    """Dump a representation of the cluster config to the standard output.
-
-    """
-    values = []
-    for field in self.op.output_fields:
-      if field == "cluster_name":
-        entry = self.cfg.GetClusterName()
-      elif field == "master_node":
-        entry = self.cfg.GetMasterNode()
-      elif field == "drain_flag":
-        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
-      elif field == "watcher_pause":
-        entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
-      elif field == "volume_group_name":
-        entry = self.cfg.GetVGName()
-      else:
-        raise errors.ParameterError(field)
-      values.append(entry)
-    return values
+    result = self.cq.OldStyleQuery(self)
+
+    assert len(result) == 1
+
+    return result[0]
+
+
+class _ClusterQuery(_QueryBase):
+  FIELDS = query.CLUSTER_FIELDS
+
+  #: Do not sort (there is only one item)
+  SORT_FIELD = None
+
+  def ExpandNames(self, lu):
+    lu.needed_locks = {}
+
+    # The following variables interact with _QueryBase._GetNames
+    self.wanted = locking.ALL_SET
+    self.do_locking = self.use_locking
+
+    if self.do_locking:
+      raise errors.OpPrereqError("Can not use locking for cluster queries",
+                                 errors.ECODE_INVAL)
+
+  def DeclareLocks(self, lu, level):
+    pass
+
+  def _GetQueryData(self, lu):
+    """Computes the list of nodes and their attributes.
+
+    """
+    # Locking is not used
+    assert not (compat.any(lu.glm.is_owned(level)
+                           for level in locking.LEVELS
+                           if level != locking.LEVEL_CLUSTER) or
+                self.do_locking or self.use_locking)
+
+    if query.CQ_CONFIG in self.requested_data:
+      cluster = lu.cfg.GetClusterInfo()
+    else:
+      cluster = NotImplemented
+
+    if query.CQ_QUEUE_DRAINED in self.requested_data:
+      drain_flag = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
+    else:
+      drain_flag = NotImplemented
+
+    if query.CQ_WATCHER_PAUSE in self.requested_data:
+      watcher_pause = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
+    else:
+      watcher_pause = NotImplemented
+
+    return query.ClusterQueryData(cluster, drain_flag, watcher_pause)
 
 
 class LUInstanceActivateDisks(NoHooksLU):
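With the logic moved into _ClusterQuery, LUClusterConfigQuery is a thin wrapper: OldStyleQuery returns one row per queried item, and a cluster query has exactly one item, hence the unwrap in Exec. A hedged sketch of the flow:

    # Rows are ordered like op.output_fields, e.g.
    #   output_fields = ["cluster_name", "master_node"]
    rows = self.cq.OldStyleQuery(self)  # e.g. [["cluster", "node1"]]
    assert len(rows) == 1               # exactly one cluster
    return rows[0]
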
@@ -6257,7 +6316,8 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
         node_disk = node_disk.Copy()
         node_disk.UnsetSize()
       lu.cfg.SetDiskID(node_disk, node)
-      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False, idx)
+      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
+                                             False, idx)
       msg = result.fail_msg
       if msg:
         lu.proc.LogWarning("Could not prepare block device %s on node %s"
@@ -6279,7 +6339,8 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
         node_disk = node_disk.Copy()
         node_disk.UnsetSize()
       lu.cfg.SetDiskID(node_disk, node)
-      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True, idx)
+      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
+                                             True, idx)
       msg = result.fail_msg
       if msg:
         lu.proc.LogWarning("Could not prepare block device %s on node %s"
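This hunk shows the pattern applied throughout the patch: block-device RPCs now receive (disk, instance) tuples rather than bare Disk objects. The point of the commit is that effective disk parameters are no longer stored on each Disk (hence the params={} changes further down) but are derived from the instance's node group at call time. Conceptually, the RPC layer can then do something like the following (a sketch with a hypothetical helper name, not the actual rpc.py code):

    def _AnnotateDiskParams(instance, disks, cfg):
      """Hypothetical sketch: compute effective LD params at RPC time."""
      diskparams = cfg.GetInstanceDiskParams(instance)
      ld_params = objects.Disk.ComputeLDParams(instance.disk_template,
                                               diskparams)
      # ld_params holds one dict per level of the disk hierarchy (for
      # DRBD8: the DRBD device, the data LV and the meta LV)
      for disk in disks:
        disk.params = ld_params[0]
      return disks
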
@@ -7319,7 +7380,7 @@ def _RemoveInstance(lu, feedback_fn, instance, ignore_failures):
   """
   logging.info("Removing block devices for instance %s", instance.name)
 
-  if not _RemoveDisks(lu, instance):
+  if not _RemoveDisks(lu, instance, ignore_failures=ignore_failures):
     if not ignore_failures:
       raise errors.OpExecError("Can't remove instance's disks")
     feedback_fn("Warning: can't remove instance's disks")
@@ -7674,7 +7735,7 @@ class LUInstanceMove(LogicalUnit):
     # activate, get path, copy the data over
     for idx, disk in enumerate(instance.disks):
       self.LogInfo("Copying data for disk %d", idx)
-      result = self.rpc.call_blockdev_assemble(target_node, disk,
+      result = self.rpc.call_blockdev_assemble(target_node, (disk, instance),
                                                instance.name, True, idx)
       if result.fail_msg:
         self.LogWarning("Can't assemble newly created disk %d: %s",
@@ -7682,7 +7743,7 @@ class LUInstanceMove(LogicalUnit):
         errs.append(result.fail_msg)
         break
       dev_path = result.payload
-      result = self.rpc.call_blockdev_export(source_node, disk,
+      result = self.rpc.call_blockdev_export(source_node, (disk, instance),
                                              target_node, dev_path,
                                              cluster_name)
       if result.fail_msg:
@@ -8045,7 +8106,8 @@ class TLMigrateInstance(Tasklet):
       all_done = True
       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                             self.nodes_ip,
-                                            self.instance.disks)
+                                            (self.instance.disks,
+                                             self.instance))
       min_percent = 100
       for node, nres in result.items():
         nres.Raise("Cannot resync disks on node %s" % node)
@@ -8091,7 +8153,7 @@ class TLMigrateInstance(Tasklet):
       msg = "single-master"
     self.feedback_fn("* changing disks into %s mode" % msg)
     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
-                                           self.instance.disks,
+                                           (self.instance.disks, self.instance),
                                            self.instance.name, multimaster)
     for node, nres in result.items():
       nres.Raise("Cannot change disks config on node %s" % node)
@@ -8243,7 +8305,7 @@ class TLMigrateInstance(Tasklet):
 
     self.feedback_fn("* checking disk consistency between source and target")
     for (idx, dev) in enumerate(instance.disks):
-      if not _CheckDiskConsistency(self.lu, dev, target_node, False):
+      if not _CheckDiskConsistency(self.lu, instance, dev, target_node, False):
         raise errors.OpExecError("Disk %s is degraded or not fully"
                                  " synchronized on target node,"
                                  " aborting migration" % idx)
@@ -8406,7 +8468,8 @@ class TLMigrateInstance(Tasklet):
       self.feedback_fn("* checking disk consistency between source and target")
       for (idx, dev) in enumerate(instance.disks):
         # for drbd, these are drbd over lvm
-        if not _CheckDiskConsistency(self.lu, dev, target_node, False):
+        if not _CheckDiskConsistency(self.lu, instance, dev, target_node,
+                                     False):
           if primary_node.offline:
             self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
                              " target node %s" %
@@ -8581,94 +8644,8 @@ def _GenerateUniqueNames(lu, exts):
   return results
 
 
-def _ComputeLDParams(disk_template, disk_params):
-  """Computes Logical Disk parameters from Disk Template parameters.
-
-  @type disk_template: string
-  @param disk_template: disk template, one of L{constants.DISK_TEMPLATES}
-  @type disk_params: dict
-  @param disk_params: disk template parameters; dict(template_name -> parameters
-  @rtype: list(dict)
-  @return: a list of dicts, one for each node of the disk hierarchy. Each dict
-    contains the LD parameters of the node. The tree is flattened in-order.
-
-  """
-  if disk_template not in constants.DISK_TEMPLATES:
-    raise errors.ProgrammerError("Unknown disk template %s" % disk_template)
-
-  result = list()
-  dt_params = disk_params[disk_template]
-  if disk_template == constants.DT_DRBD8:
-    drbd_params = {
-      constants.LDP_RESYNC_RATE: dt_params[constants.DRBD_RESYNC_RATE],
-      constants.LDP_BARRIERS: dt_params[constants.DRBD_DISK_BARRIERS],
-      constants.LDP_NO_META_FLUSH: dt_params[constants.DRBD_META_BARRIERS],
-      constants.LDP_DEFAULT_METAVG: dt_params[constants.DRBD_DEFAULT_METAVG],
-      constants.LDP_DISK_CUSTOM: dt_params[constants.DRBD_DISK_CUSTOM],
-      constants.LDP_NET_CUSTOM: dt_params[constants.DRBD_NET_CUSTOM],
-      constants.LDP_DYNAMIC_RESYNC: dt_params[constants.DRBD_DYNAMIC_RESYNC],
-      constants.LDP_PLAN_AHEAD: dt_params[constants.DRBD_PLAN_AHEAD],
-      constants.LDP_FILL_TARGET: dt_params[constants.DRBD_FILL_TARGET],
-      constants.LDP_DELAY_TARGET: dt_params[constants.DRBD_DELAY_TARGET],
-      constants.LDP_MAX_RATE: dt_params[constants.DRBD_MAX_RATE],
-      constants.LDP_MIN_RATE: dt_params[constants.DRBD_MIN_RATE],
-      }
-
-    drbd_params = \
-      objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_DRBD8],
-                       drbd_params)
-
-    result.append(drbd_params)
-
-    # data LV
-    data_params = {
-      constants.LDP_STRIPES: dt_params[constants.DRBD_DATA_STRIPES],
-      }
-    data_params = \
-      objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV],
-                       data_params)
-    result.append(data_params)
-
-    # metadata LV
-    meta_params = {
-      constants.LDP_STRIPES: dt_params[constants.DRBD_META_STRIPES],
-      }
-    meta_params = \
-      objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV],
-                       meta_params)
-    result.append(meta_params)
-
-  elif (disk_template == constants.DT_FILE or
-        disk_template == constants.DT_SHARED_FILE):
-    result.append(constants.DISK_LD_DEFAULTS[constants.LD_FILE])
-
-  elif disk_template == constants.DT_PLAIN:
-    params = {
-      constants.LDP_STRIPES: dt_params[constants.LV_STRIPES],
-      }
-    params = \
-      objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV],
-                       params)
-    result.append(params)
-
-  elif disk_template == constants.DT_BLOCK:
-    result.append(constants.DISK_LD_DEFAULTS[constants.LD_BLOCKDEV])
-
-  elif disk_template == constants.DT_RBD:
-    params = {
-      constants.LDP_POOL: dt_params[constants.RBD_POOL]
-      }
-    params = \
-      objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_RBD],
-                       params)
-    result.append(params)
-
-  return result
-
-
 def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
-                         iv_name, p_minor, s_minor, drbd_params, data_params,
-                         meta_params):
+                         iv_name, p_minor, s_minor):
   """Generate a drbd8 device complete with its children.
 
   """
@@ -8678,16 +8655,16 @@ def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
 
   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                           logical_id=(vgnames[0], names[0]),
-                          params=data_params)
+                          params={})
   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
                           logical_id=(vgnames[1], names[1]),
-                          params=meta_params)
+                          params={})
   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                           logical_id=(primary, secondary, port,
                                       p_minor, s_minor,
                                       shared_secret),
                           children=[dev_data, dev_meta],
-                          iv_name=iv_name, params=drbd_params)
+                          iv_name=iv_name, params={})
   return drbd_dev
 
 
@@ -8708,8 +8685,7 @@ _DISK_TEMPLATE_DEVICE_TYPE = {
 
 def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node,
     secondary_nodes, disk_info, file_storage_dir, file_driver, base_index,
-    feedback_fn, disk_params,
-    _req_file_storage=opcodes.RequireFileStorage,
+    feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
     _req_shr_file_storage=opcodes.RequireSharedFileStorage):
   """Generate the entire disk layout for a given template type.
 
@@ -8719,18 +8695,20 @@ def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node,
   vgname = lu.cfg.GetVGName()
   disk_count = len(disk_info)
   disks = []
-  ld_params = _ComputeLDParams(template_name, disk_params)
 
   if template_name == constants.DT_DISKLESS:
     pass
   elif template_name == constants.DT_DRBD8:
-    drbd_params, data_params, meta_params = ld_params
     if len(secondary_nodes) != 1:
       raise errors.ProgrammerError("Wrong template configuration")
     remote_node = secondary_nodes[0]
     minors = lu.cfg.AllocateDRBDMinor(
       [primary_node, remote_node] * len(disk_info), instance_name)
 
+    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
+                                                       full_disk_params)
+    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
+
     names = []
     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                                for i in range(disk_count)]):
@@ -8738,7 +8716,6 @@ def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node,
       names.append(lv_prefix + "_meta")
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
-      drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
       data_vg = disk.get(constants.IDISK_VG, vgname)
       meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
@@ -8746,8 +8723,7 @@ def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node,
                                       [data_vg, meta_vg],
                                       names[idx * 2:idx * 2 + 2],
                                       "disk/%d" % disk_index,
-                                      minors[idx * 2], minors[idx * 2 + 1],
-                                      drbd_params, data_params, meta_params)
+                                      minors[idx * 2], minors[idx * 2 + 1])
       disk_dev.mode = disk[constants.IDISK_MODE]
       disks.append(disk_dev)
   else:
@@ -8767,8 +8743,6 @@ def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node,
                                         (name_prefix, base_index + i)
                                         for i in range(disk_count)])
 
-    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
-
     if template_name == constants.DT_PLAIN:
       def logical_id_fn(idx, _, disk):
         vg = disk.get(constants.IDISK_VG, vgname)
@@ -8787,6 +8761,8 @@ def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node,
     else:
       raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
 
+    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
+
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
       size = disk[constants.IDISK_SIZE]
@@ -8796,7 +8772,7 @@ def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node,
                                 logical_id=logical_id_fn(idx, disk_index, disk),
                                 iv_name="disk/%d" % disk_index,
                                 mode=disk[constants.IDISK_MODE],
-                                params=ld_params[0]))
+                                params={}))
 
   return disks
 
@@ -8837,7 +8813,9 @@ def _WipeDisks(lu, instance):
     lu.cfg.SetDiskID(device, node)
 
   logging.info("Pause sync of instance %s disks", instance.name)
-  result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, True)
+  result = lu.rpc.call_blockdev_pause_resume_sync(node,
+                                                  (instance.disks, instance),
+                                                  True)
 
   for idx, success in enumerate(result.payload):
     if not success:
@@ -8867,7 +8845,8 @@ def _WipeDisks(lu, instance):
         wipe_size = min(wipe_chunk_size, size - offset)
         logging.debug("Wiping disk %d, offset %s, chunk %s",
                       idx, offset, wipe_size)
-        result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size)
+        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
+                                           wipe_size)
         result.Raise("Could not wipe disk %d at offset %d for size %d" %
                      (idx, offset, wipe_size))
         now = time.time()
@@ -8880,7 +8859,9 @@ def _WipeDisks(lu, instance):
   finally:
     logging.info("Resume sync of instance %s disks", instance.name)
 
-    result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, False)
+    result = lu.rpc.call_blockdev_pause_resume_sync(node,
+                                                    (instance.disks, instance),
+                                                    False)
 
     for idx, success in enumerate(result.payload):
       if not success:
@@ -8934,7 +8915,7 @@ def _CreateDisks(lu, instance, to_skip=None, target_node=None):
       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
 
 
-def _RemoveDisks(lu, instance, target_node=None):
+def _RemoveDisks(lu, instance, target_node=None, ignore_failures=False):
   """Remove all disks for an instance.
 
   This abstracts away some work from `AddInstance()` and
@@ -8955,6 +8936,7 @@ def _RemoveDisks(lu, instance, target_node=None):
   logging.info("Removing block devices for instance %s", instance.name)
 
   all_result = True
+  ports_to_release = set()
   for (idx, device) in enumerate(instance.disks):
     if target_node:
       edata = [(target_node, device)]
@@ -8970,8 +8952,11 @@ def _RemoveDisks(lu, instance, target_node=None):
 
     # if this is a DRBD disk, return its port to the pool
     if device.dev_type in constants.LDS_DRBD:
-      tcp_port = device.logical_id[2]
-      lu.cfg.AddTcpUdpPort(tcp_port)
+      ports_to_release.add(device.logical_id[2])
+
+  if all_result or ignore_failures:
+    for port in ports_to_release:
+      lu.cfg.AddTcpUdpPort(port)
 
   if instance.disk_template == constants.DT_FILE:
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
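For reference, device.logical_id[2] is the TCP port because the logical_id of an LD_DRBD8 device is the tuple assembled in _GenerateDRBD8Branch above:

    # logical_id layout for LD_DRBD8 devices:
    #   (primary, secondary, port, p_minor, s_minor, shared_secret)
    tcp_port = device.logical_id[2]

Collecting the ports and releasing them only when all_result or ignore_failures holds avoids handing a port back to the pool while the DRBD device using it may still exist after a partial failure.
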
@@ -9867,10 +9852,6 @@ class LUInstanceCreate(LogicalUnit):
                                                     utils.CommaJoin(res)),
                                   errors.ECODE_INVAL)
 
-    # disk parameters (not customizable at instance or node level)
-    # just use the primary node parameters, ignoring the secondary.
-    self.diskparams = group_info.diskparams
-
     if not self.adopt_disks:
       if self.op.disk_template == constants.DT_RBD:
         # _CheckRADOSFreeSpace() is just a placeholder.
@@ -9987,6 +9968,11 @@ class LUInstanceCreate(LogicalUnit):
     else:
       network_port = None
 
+    # This is ugly, but we have a chicken-and-egg problem here:
+    # We can only take the group disk parameters, as the instance
+    # has no disks yet (we are generating them right here).
+    node = self.cfg.GetNodeInfo(pnode_name)
+    nodegroup = self.cfg.GetNodeGroup(node.group)
     disks = _GenerateDiskTemplate(self,
                                   self.op.disk_template,
                                   instance, pnode_name,
@@ -9996,7 +9982,7 @@ class LUInstanceCreate(LogicalUnit):
                                   self.op.file_driver,
                                   0,
                                   feedback_fn,
-                                  self.diskparams)
+                                  self.cfg.GetGroupDiskParams(nodegroup))
 
     iobj = objects.Instance(name=instance, os=self.op.os_type,
                             primary_node=pnode_name,
@@ -10095,7 +10081,8 @@ class LUInstanceCreate(LogicalUnit):
           if pause_sync:
             feedback_fn("* pausing disk sync to install instance OS")
             result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
-                                                              iobj.disks, True)
+                                                              (iobj.disks,
+                                                               iobj), True)
             for idx, success in enumerate(result.payload):
               if not success:
                 logging.warn("pause-sync of instance %s for disk %d failed",
@@ -10109,7 +10096,8 @@ class LUInstanceCreate(LogicalUnit):
           if pause_sync:
             feedback_fn("* resuming disk sync")
             result = self.rpc.call_blockdev_pause_resume_sync(pnode_name,
-                                                              iobj.disks, False)
+                                                              (iobj.disks,
+                                                               iobj), False)
             for idx, success in enumerate(result.payload):
               if not success:
                 logging.warn("resume-sync of instance %s for disk %d failed",
@@ -10646,16 +10634,6 @@ class TLReplaceDisks(Tasklet):
       _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
                               ignore=self.ignore_ipolicy)
 
-    # TODO: compute disk parameters
-    primary_node_info = self.cfg.GetNodeInfo(instance.primary_node)
-    secondary_node_info = self.cfg.GetNodeInfo(secondary_node)
-    if primary_node_info.group != secondary_node_info.group:
-      self.lu.LogInfo("The instance primary and secondary nodes are in two"
-                      " different node groups; the disk parameters of the"
-                      " primary node's group will be applied.")
-
-    self.diskparams = self.cfg.GetNodeGroup(primary_node_info.group).diskparams
-
     for node in check_nodes:
       _CheckNodeOnline(self.lu, node)
 
@@ -10789,8 +10767,8 @@ class TLReplaceDisks(Tasklet):
       self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                       (idx, node_name))
 
-      if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
-                                   ldisk=ldisk):
+      if not _CheckDiskConsistency(self.lu, self.instance, dev, node_name,
+                                   on_primary, ldisk=ldisk):
         raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                  " replace disks for instance %s" %
                                  (node_name, self.instance.name))
@@ -10815,14 +10793,12 @@ class TLReplaceDisks(Tasklet):
       lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
       names = _GenerateUniqueNames(self.lu, lv_names)
 
-      _, data_p, meta_p = _ComputeLDParams(constants.DT_DRBD8, self.diskparams)
-
       vg_data = dev.children[0].logical_id[0]
       lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
-                             logical_id=(vg_data, names[0]), params=data_p)
+                             logical_id=(vg_data, names[0]), params={})
       vg_meta = dev.children[1].logical_id[0]
       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE,
-                             logical_id=(vg_meta, names[1]), params=meta_p)
+                             logical_id=(vg_meta, names[1]), params={})
 
       new_lvs = [lv_data, lv_meta]
       old_lvs = [child.Copy() for child in dev.children]
@@ -10960,8 +10936,9 @@ class TLReplaceDisks(Tasklet):
 
       # Now that the new lvs have the old name, we can add them to the device
       self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
-      result = self.rpc.call_blockdev_addchildren(self.target_node, dev,
-                                                  new_lvs)
+      result = self.rpc.call_blockdev_addchildren(self.target_node,
+                                                  (dev, self.instance),
+                                                  (new_lvs, self.instance))
       msg = result.fail_msg
       if msg:
         for new_lv in new_lvs:
@@ -11079,12 +11056,11 @@ class TLReplaceDisks(Tasklet):
       iv_names[idx] = (dev, dev.children, new_net_id)
       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                     new_net_id)
-      drbd_params, _, _ = _ComputeLDParams(constants.DT_DRBD8, self.diskparams)
       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                               logical_id=new_alone_id,
                               children=dev.children,
                               size=dev.size,
-                              params=drbd_params)
+                              params={})
       try:
         _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
                               _GetInstanceInfoText(self.instance), False)
@@ -11132,7 +11108,7 @@ class TLReplaceDisks(Tasklet):
     result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                             self.new_node],
                                            self.node_secondary_ip,
-                                           self.instance.disks,
+                                           (self.instance.disks, self.instance),
                                            self.instance.name,
                                            False)
     for to_node, to_result in result.items():
@@ -11523,6 +11499,7 @@ class LUInstanceGrowDisk(LogicalUnit):
     env = {
       "DISK": self.op.disk,
       "AMOUNT": self.op.amount,
+      "ABSOLUTE": self.op.absolute,
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
     return env
@@ -11555,13 +11532,30 @@ class LUInstanceGrowDisk(LogicalUnit):
 
     self.disk = instance.FindDisk(self.op.disk)
 
+    if self.op.absolute:
+      self.target = self.op.amount
+      self.delta = self.target - self.disk.size
+      if self.delta < 0:
+        raise errors.OpPrereqError("Requested size (%s) is smaller than "
+                                   "current disk size (%s)" %
+                                   (utils.FormatUnit(self.target, "h"),
+                                    utils.FormatUnit(self.disk.size, "h")),
+                                   errors.ECODE_STATE)
+    else:
+      self.delta = self.op.amount
+      self.target = self.disk.size + self.delta
+      if self.delta < 0:
+        raise errors.OpPrereqError("Requested increment (%s) is negative" %
+                                   utils.FormatUnit(self.delta, "h"),
+                                   errors.ECODE_INVAL)
+
     if instance.disk_template not in (constants.DT_FILE,
                                       constants.DT_SHARED_FILE,
                                       constants.DT_RBD):
       # TODO: check the free disk space for file, when that feature will be
       # supported
       _CheckNodesFreeDiskPerVG(self, nodenames,
-                               self.disk.ComputeGrowth(self.op.amount))
+                               self.disk.ComputeGrowth(self.delta))
 
   def Exec(self, feedback_fn):
     """Execute disk grow.
@@ -11578,21 +11572,24 @@ class LUInstanceGrowDisk(LogicalUnit):
     if not disks_ok:
       raise errors.OpExecError("Cannot activate block device to grow")
 
-    feedback_fn("Growing disk %s of instance '%s' by %s" %
+    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                 (self.op.disk, instance.name,
-                 utils.FormatUnit(self.op.amount, "h")))
+                 utils.FormatUnit(self.delta, "h"),
+                 utils.FormatUnit(self.target, "h")))
 
     # First run all grow ops in dry-run mode
     for node in instance.all_nodes:
       self.cfg.SetDiskID(disk, node)
-      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True)
+      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+                                           True)
       result.Raise("Grow request failed to node %s" % node)
 
     # We know that (as far as we can test) operations across different
     # nodes will succeed, time to run it for real
     for node in instance.all_nodes:
       self.cfg.SetDiskID(disk, node)
-      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False)
+      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+                                           False)
       result.Raise("Grow request failed to node %s" % node)
 
       # TODO: Rewrite code to work properly
@@ -11602,7 +11599,7 @@ class LUInstanceGrowDisk(LogicalUnit):
       # time is a work-around.
       time.sleep(5)
 
-    disk.RecordGrow(self.op.amount)
+    disk.RecordGrow(self.delta)
     self.cfg.Update(instance, feedback_fn)
 
     # Changes have been recorded, release node lock
@@ -11656,12 +11653,25 @@ class LUInstanceQueryData(NoHooksLU):
       else:
         self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
 
+      self.needed_locks[locking.LEVEL_NODEGROUP] = []
       self.needed_locks[locking.LEVEL_NODE] = []
       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
   def DeclareLocks(self, level):
-    if self.op.use_locking and level == locking.LEVEL_NODE:
-      self._LockInstancesNodes()
+    if self.op.use_locking:
+      if level == locking.LEVEL_NODEGROUP:
+        owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
+
+        # Lock all groups used by instances optimistically; this requires going
+        # via the node before it's locked, requiring verification later on
+        self.needed_locks[locking.LEVEL_NODEGROUP] = \
+          frozenset(group_uuid
+                    for instance_name in owned_instances
+                    for group_uuid in
+                      self.cfg.GetInstanceNodeGroups(instance_name))
+
+      elif level == locking.LEVEL_NODE:
+        self._LockInstancesNodes()
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -11669,14 +11679,25 @@ class LUInstanceQueryData(NoHooksLU):
     This only checks the optional instance list against the existing names.
 
     """
+    owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
+    owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
+    owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
+
     if self.wanted_names is None:
       assert self.op.use_locking, "Locking was not used"
-      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
+      self.wanted_names = owned_instances
 
-    self.wanted_instances = \
-        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
+    instances = dict(self.cfg.GetMultiInstanceInfo(self.wanted_names))
 
-  def _ComputeBlockdevStatus(self, node, instance_name, dev):
+    if self.op.use_locking:
+      _CheckInstancesNodeGroups(self.cfg, instances, owned_groups, owned_nodes,
+                                None)
+    else:
+      assert not (owned_instances or owned_groups or owned_nodes)
+
+    self.wanted_instances = instances.values()
+
+  def _ComputeBlockdevStatus(self, node, instance, dev):
     """Returns the status of a block device
 
     """
@@ -11689,7 +11710,7 @@ class LUInstanceQueryData(NoHooksLU):
     if result.offline:
       return None
 
-    result.Raise("Can't compute disk status for %s" % instance_name)
+    result.Raise("Can't compute disk status for %s" % instance.name)
 
     status = result.payload
     if status is None:
@@ -11711,8 +11732,8 @@ class LUInstanceQueryData(NoHooksLU):
         snode = dev.logical_id[0]
 
     dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node,
-                                              instance.name, dev)
-    dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev)
+                                              instance, dev)
+    dev_sstatus = self._ComputeBlockdevStatus(snode, instance, dev)
 
     if dev.children:
       dev_children = map(compat.partial(self._ComputeDiskStatus,
@@ -11739,9 +11760,17 @@ class LUInstanceQueryData(NoHooksLU):
 
     cluster = self.cfg.GetClusterInfo()
 
-    pri_nodes = self.cfg.GetMultiNodeInfo(i.primary_node
-                                          for i in self.wanted_instances)
-    for instance, (_, pnode) in zip(self.wanted_instances, pri_nodes):
+    node_names = itertools.chain(*(i.all_nodes for i in self.wanted_instances))
+    nodes = dict(self.cfg.GetMultiNodeInfo(node_names))
+
+    groups = dict(self.cfg.GetMultiNodeGroupInfo(node.group
+                                                 for node in nodes.values()))
+
+    group2name_fn = lambda uuid: groups[uuid].name
+
+    for instance in self.wanted_instances:
+      pnode = nodes[instance.primary_node]
+
       if self.op.static or pnode.offline:
         remote_state = None
         if pnode.offline:
@@ -11765,12 +11794,19 @@ class LUInstanceQueryData(NoHooksLU):
       disks = map(compat.partial(self._ComputeDiskStatus, instance, None),
                   instance.disks)
 
+      snodes_group_uuids = [nodes[snode_name].group
+                            for snode_name in instance.secondary_nodes]
+
       result[instance.name] = {
         "name": instance.name,
         "config_state": instance.admin_state,
         "run_state": remote_state,
         "pnode": instance.primary_node,
+        "pnode_group_uuid": pnode.group,
+        "pnode_group_name": group2name_fn(pnode.group),
         "snodes": instance.secondary_nodes,
+        "snodes_group_uuids": snodes_group_uuids,
+        "snodes_group_names": map(group2name_fn, snodes_group_uuids),
         "os": instance.os,
         # this happens to be the same format used for hooks
         "nics": _NICListToTuple(self, instance.nics),
@@ -12222,7 +12258,7 @@ class LUInstanceSetParams(LogicalUnit):
     pnode = instance.primary_node
     nodelist = list(instance.all_nodes)
     pnode_info = self.cfg.GetNodeInfo(pnode)
-    self.diskparams = self.cfg.GetNodeGroup(pnode_info.group).diskparams
+    self.diskparams = self.cfg.GetInstanceDiskParams(instance)
 
     # Prepare disk/NIC modifications
     self.diskmod = PrepareContainerMods(self.op.disks, None)
@@ -12503,7 +12539,7 @@ class LUInstanceSetParams(LogicalUnit):
                                       disk_info, None, None, 0, feedback_fn,
                                       self.diskparams)
     info = _GetInstanceInfoText(instance)
-    feedback_fn("Creating aditional volumes...")
+    feedback_fn("Creating additional volumes...")
     # first, create the missing data and meta devices
     for disk in new_disks:
       # unfortunately this is... not too nice
@@ -12564,6 +12600,12 @@ class LUInstanceSetParams(LogicalUnit):
       child.size = parent.size
       child.mode = parent.mode
 
+    # this is a DRBD disk, return its port to the pool
+    # NOTE: this must be done right before the call to cfg.Update!
+    for disk in old_disks:
+      tcp_port = disk.logical_id[2]
+      self.cfg.AddTcpUdpPort(tcp_port)
+
     # update instance structure
     instance.disks = new_disks
     instance.disk_template = constants.DT_PLAIN
@@ -12589,13 +12631,6 @@ class LUInstanceSetParams(LogicalUnit):
         self.LogWarning("Could not remove metadata for disk %d on node %s,"
                         " continuing anyway: %s", idx, pnode, msg)
 
-    # this is a DRBD disk, return its port to the pool
-    for disk in old_disks:
-      tcp_port = disk.logical_id[2]
-      self.cfg.AddTcpUdpPort(tcp_port)
-
-    # Node resource locks will be released by caller
-
   def _CreateNewDisk(self, idx, params, _):
     """Creates a new disk.
 
@@ -12890,7 +12925,7 @@ class LUInstanceChangeGroup(LogicalUnit):
 
     if self.req_target_uuids:
       # User requested specific target groups
-      self.target_uuids = self.req_target_uuids
+      self.target_uuids = frozenset(self.req_target_uuids)
     else:
       # All groups except those used by the instance are potential targets
       self.target_uuids = owned_groups - inst_groups
@@ -12959,32 +12994,74 @@ class LUBackupQuery(NoHooksLU):
   """
   REQ_BGL = False
 
+  def CheckArguments(self):
+    self.expq = _ExportQuery(qlang.MakeSimpleFilter("node", self.op.nodes),
+                             ["node", "export"], self.op.use_locking)
+
   def ExpandNames(self):
-    self.needed_locks = {}
-    self.share_locks[locking.LEVEL_NODE] = 1
-    if not self.op.nodes:
-      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
-    else:
-      self.needed_locks[locking.LEVEL_NODE] = \
-        _GetWantedNodes(self, self.op.nodes)
+    self.expq.ExpandNames(self)
+
+  def DeclareLocks(self, level):
+    self.expq.DeclareLocks(self, level)
 
   def Exec(self, feedback_fn):
-    """Compute the list of all the exported system images.
+    result = {}
 
-    @rtype: dict
-    @return: a dictionary with the structure node->(export-list)
-        where export-list is a list of the instances exported on
-        that node.
+    for (node, expname) in self.expq.OldStyleQuery(self):
+      if expname is None:
+        result[node] = False
+      else:
+        result.setdefault(node, []).append(expname)
+
+    return result
+
+
+class _ExportQuery(_QueryBase):
+  FIELDS = query.EXPORT_FIELDS
+
+  #: The node name is not a unique key for this query
+  SORT_FIELD = "node"
+
+  def ExpandNames(self, lu):
+    lu.needed_locks = {}
+
+    # The following variables interact with _QueryBase._GetNames
+    if self.names:
+      self.wanted = _GetWantedNodes(lu, self.names)
+    else:
+      self.wanted = locking.ALL_SET
+
+    self.do_locking = self.use_locking
+
+    if self.do_locking:
+      lu.share_locks = _ShareAll()
+      lu.needed_locks = {
+        locking.LEVEL_NODE: self.wanted,
+        }
+
+  def DeclareLocks(self, lu, level):
+    pass
+
+  def _GetQueryData(self, lu):
+    """Computes the list of nodes and their attributes.
 
     """
-    self.nodes = self.owned_locks(locking.LEVEL_NODE)
-    rpcresult = self.rpc.call_export_list(self.nodes)
-    result = {}
-    for node in rpcresult:
-      if rpcresult[node].fail_msg:
-        result[node] = False
+    # Locking is not used
+    # TODO
+    assert not (compat.any(lu.glm.is_owned(level)
+                           for level in locking.LEVELS
+                           if level != locking.LEVEL_CLUSTER) or
+                self.do_locking or self.use_locking)
+
+    nodes = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE)
+
+    result = []
+
+    for (node, nres) in lu.rpc.call_export_list(nodes).items():
+      if nres.fail_msg:
+        result.append((node, None))
       else:
-        result[node] = rpcresult[node].payload
+        result.extend((node, expname) for expname in nres.payload)
 
     return result
 
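The old-style result keeps its legacy shape: a dict mapping each node name to either False (the export list could not be retrieved) or the list of export names. A hypothetical example:

    # {
    #   "node1.example.com": ["instance1.export", "instance2.export"],
    #   "node2.example.com": False,  # RPC to this node failed
    # }
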
@@ -14081,16 +14158,8 @@ class LUGroupEvacuate(LogicalUnit):
     self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
 
     # Check if node groups for locked instances are still correct
-    for instance_name in owned_instances:
-      inst = self.instances[instance_name]
-      assert owned_nodes.issuperset(inst.all_nodes), \
-        "Instance %s's nodes changed while we kept the lock" % instance_name
-
-      inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name,
-                                             owned_groups)
-
-      assert self.group_uuid in inst_groups, \
-        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
+    _CheckInstancesNodeGroups(self.cfg, self.instances,
+                              owned_groups, owned_nodes, self.group_uuid)
 
     if self.req_target_uuids:
       # User requested specific target groups
@@ -14158,14 +14227,25 @@ class TagsLU(NoHooksLU): # pylint: disable=W0223
   def ExpandNames(self):
     self.group_uuid = None
     self.needed_locks = {}
+
     if self.op.kind == constants.TAG_NODE:
       self.op.name = _ExpandNodeName(self.cfg, self.op.name)
-      self.needed_locks[locking.LEVEL_NODE] = self.op.name
+      lock_level = locking.LEVEL_NODE
+      lock_name = self.op.name
     elif self.op.kind == constants.TAG_INSTANCE:
       self.op.name = _ExpandInstanceName(self.cfg, self.op.name)
-      self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name
+      lock_level = locking.LEVEL_INSTANCE
+      lock_name = self.op.name
     elif self.op.kind == constants.TAG_NODEGROUP:
       self.group_uuid = self.cfg.LookupNodeGroup(self.op.name)
+      lock_level = locking.LEVEL_NODEGROUP
+      lock_name = self.group_uuid
+    else:
+      lock_level = None
+      lock_name = None
+
+    if lock_level and getattr(self.op, "use_locking", True):
+      self.needed_locks[lock_level] = lock_name
 
     # FIXME: Acquire BGL for cluster tag operations (as of this writing it's
     # not possible to acquire the BGL based on opcode parameters)
@@ -15113,10 +15193,12 @@ class LUTestAllocator(NoHooksLU):
 
 #: Query type implementations
 _QUERY_IMPL = {
+  constants.QR_CLUSTER: _ClusterQuery,
   constants.QR_INSTANCE: _InstanceQuery,
   constants.QR_NODE: _NodeQuery,
   constants.QR_GROUP: _GroupQuery,
   constants.QR_OS: _OsQuery,
+  constants.QR_EXPORT: _ExportQuery,
   }
 
 assert set(_QUERY_IMPL.keys()) == constants.QR_VIA_OP
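The two new entries make QR_CLUSTER and QR_EXPORT resolvable through the generic query dispatch; the lookup presumably remains as simple as this (a sketch modeled on the existing helper in this file):

    def _GetQueryImplementation(name):
      """Returns the implementation for a query type."""
      try:
        return _QUERY_IMPL[name]
      except KeyError:
        raise errors.OpPrereqError("Unknown query resource '%s'" % name,
                                   errors.ECODE_INVAL)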