cmdlib: Extract storage related functionality
author: Thomas Thrainer <thomasth@google.com>
Thu, 16 May 2013 07:13:48 +0000 (09:13 +0200)
committer: Thomas Thrainer <thomasth@google.com>
Fri, 17 May 2013 09:32:09 +0000 (11:32 +0200)
Split instance.py further by extracting storage related logical units
and functions to instance_storage.py.

Signed-off-by: Thomas Thrainer <thomasth@google.com>
Reviewed-by: Bernardo Dal Seno <bdalseno@google.com>

Makefile.am
lib/cmdlib/__init__.py
lib/cmdlib/backup.py
lib/cmdlib/instance.py
lib/cmdlib/instance_storage.py [new file with mode: 0644]
lib/cmdlib/instance_utils.py
test/py/ganeti.cmdlib_unittest.py

index 1f16894..64306df 100644 (file)
@@ -315,6 +315,7 @@ cmdlib_PYTHON = \
        lib/cmdlib/group.py \
        lib/cmdlib/node.py \
        lib/cmdlib/instance.py \
+       lib/cmdlib/instance_storage.py \
        lib/cmdlib/instance_utils.py \
        lib/cmdlib/backup.py \
        lib/cmdlib/query.py \
index 95f7713..b248063 100644 (file)
@@ -83,7 +83,8 @@ from ganeti.cmdlib.instance import \
   LUInstanceMigrate, \
   LUInstanceMultiAlloc, \
   LUInstanceSetParams, \
-  LUInstanceChangeGroup, \
+  LUInstanceChangeGroup
+from ganeti.cmdlib.instance_storage import \
   LUInstanceRecreateDisks, \
   LUInstanceGrowDisk, \
   LUInstanceReplaceDisks, \
index 3cbc664..e5954f5 100644 (file)
@@ -36,9 +36,10 @@ from ganeti import utils
 from ganeti.cmdlib.base import _QueryBase, NoHooksLU, LogicalUnit
 from ganeti.cmdlib.common import _GetWantedNodes, _ShareAll, \
   _CheckNodeOnline, _ExpandNodeName
+from ganeti.cmdlib.instance_storage import _StartInstanceDisks, \
+  _ShutdownInstanceDisks
 from ganeti.cmdlib.instance_utils import _GetClusterDomainSecret, \
-  _BuildInstanceHookEnvByObject, _CheckNodeNotDrained, _StartInstanceDisks, \
-  _ShutdownInstanceDisks, _RemoveInstance
+  _BuildInstanceHookEnvByObject, _CheckNodeNotDrained, _RemoveInstance
 
 
 class _ExportQuery(_QueryBase):
index b5dc099..9edf524 100644 (file)
@@ -55,35 +55,23 @@ from ganeti.cmdlib.common import INSTANCE_ONLINE, INSTANCE_DOWN, \
   _LoadNodeEvacResult, _CheckIAllocatorOrNode, _CheckParamsNotGlobal, \
   _IsExclusiveStorageEnabledNode, _CheckHVParams, _CheckOSParams, \
   _GetWantedInstances, _CheckInstancesNodeGroups, _AnnotateDiskParams, \
-  _GetUpdatedParams, _ExpandInstanceName, _FindFaultyInstanceDisks, \
-  _ComputeIPolicySpecViolation, _ComputeIPolicyInstanceViolation, \
+  _GetUpdatedParams, _ExpandInstanceName, _ComputeIPolicySpecViolation, \
   _CheckInstanceState, _ExpandNodeName
-from ganeti.cmdlib.instance_utils import _AssembleInstanceDisks, \
-  _BuildInstanceHookEnvByObject, _GetClusterDomainSecret, \
-  _BuildInstanceHookEnv, _NICListToTuple, _NICToTuple, _CheckNodeNotDrained, \
-  _RemoveDisks, _StartInstanceDisks, _ShutdownInstanceDisks, \
-  _RemoveInstance, _ExpandCheckDisks
+from ganeti.cmdlib.instance_storage import _CreateDisks, \
+  _CheckNodesFreeDiskPerVG, _WipeDisks, _WaitForSync, _CheckDiskConsistency, \
+  _IsExclusiveStorageEnabledNodeName, _CreateSingleBlockDev, _ComputeDisks, \
+  _CheckRADOSFreeSpace, _ComputeDiskSizePerVG, _GenerateDiskTemplate, \
+  _CreateBlockDev, _StartInstanceDisks, _ShutdownInstanceDisks, \
+  _AssembleInstanceDisks, _ExpandCheckDisks
+from ganeti.cmdlib.instance_utils import _BuildInstanceHookEnvByObject, \
+  _GetClusterDomainSecret, _BuildInstanceHookEnv, _NICListToTuple, \
+  _NICToTuple, _CheckNodeNotDrained, _RemoveInstance, _CopyLockList, \
+  _ReleaseLocks, _CheckNodeVmCapable, _CheckTargetNodeIPolicy, \
+  _GetInstanceInfoText, _RemoveDisks
 
 import ganeti.masterd.instance
 
 
-_DISK_TEMPLATE_NAME_PREFIX = {
-  constants.DT_PLAIN: "",
-  constants.DT_RBD: ".rbd",
-  constants.DT_EXT: ".ext",
-  }
-
-
-_DISK_TEMPLATE_DEVICE_TYPE = {
-  constants.DT_PLAIN: constants.LD_LV,
-  constants.DT_FILE: constants.LD_FILE,
-  constants.DT_SHARED_FILE: constants.LD_FILE,
-  constants.DT_BLOCK: constants.LD_BLOCKDEV,
-  constants.DT_RBD: constants.LD_RBD,
-  constants.DT_EXT: constants.LD_EXT,
-  }
-
-
 #: Type description for changes as returned by L{ApplyContainerMods}'s
 #: callbacks
 _TApplyContModsCbChanges = \
@@ -93,68 +81,6 @@ _TApplyContModsCbChanges = \
     ])))
 
 
-def _CopyLockList(names):
-  """Makes a copy of a list of lock names.
-
-  Handles L{locking.ALL_SET} correctly.
-
-  """
-  if names == locking.ALL_SET:
-    return locking.ALL_SET
-  else:
-    return names[:]
-
-
-def _ReleaseLocks(lu, level, names=None, keep=None):
-  """Releases locks owned by an LU.
-
-  @type lu: L{LogicalUnit}
-  @param level: Lock level
-  @type names: list or None
-  @param names: Names of locks to release
-  @type keep: list or None
-  @param keep: Names of locks to retain
-
-  """
-  assert not (keep is not None and names is not None), \
-    "Only one of the 'names' and the 'keep' parameters can be given"
-
-  if names is not None:
-    should_release = names.__contains__
-  elif keep:
-    should_release = lambda name: name not in keep
-  else:
-    should_release = None
-
-  owned = lu.owned_locks(level)
-  if not owned:
-    # Not owning any lock at this level, do nothing
-    pass
-
-  elif should_release:
-    retain = []
-    release = []
-
-    # Determine which locks to release
-    for name in owned:
-      if should_release(name):
-        release.append(name)
-      else:
-        retain.append(name)
-
-    assert len(lu.owned_locks(level)) == (len(retain) + len(release))
-
-    # Release just some locks
-    lu.glm.release(level, names=release)
-
-    assert frozenset(lu.owned_locks(level)) == frozenset(retain)
-  else:
-    # Release everything
-    lu.glm.release(level)
-
-    assert not lu.glm.is_owned(level), "No locks should be owned"
-
-
 def _CheckHostnameSane(lu, name):
   """Ensures that a given hostname resolves to a 'sane' name.
 
@@ -344,261 +270,6 @@ def _CheckForConflictingIp(lu, ip, node):
   return (None, None)
 
 
-def _CheckRADOSFreeSpace():
-  """Compute disk size requirements inside the RADOS cluster.
-
-  """
-  # For the RADOS cluster we assume there is always enough space.
-  pass
-
-
-def _WaitForSync(lu, instance, disks=None, oneshot=False):
-  """Sleep and poll for an instance's disk to sync.
-
-  """
-  if not instance.disks or disks is not None and not disks:
-    return True
-
-  disks = _ExpandCheckDisks(instance, disks)
-
-  if not oneshot:
-    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
-
-  node = instance.primary_node
-
-  for dev in disks:
-    lu.cfg.SetDiskID(dev, node)
-
-  # TODO: Convert to utils.Retry
-
-  retries = 0
-  degr_retries = 10 # in seconds, as we sleep 1 second each time
-  while True:
-    max_time = 0
-    done = True
-    cumul_degraded = False
-    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
-    msg = rstats.fail_msg
-    if msg:
-      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
-      retries += 1
-      if retries >= 10:
-        raise errors.RemoteError("Can't contact node %s for mirror data,"
-                                 " aborting." % node)
-      time.sleep(6)
-      continue
-    rstats = rstats.payload
-    retries = 0
-    for i, mstat in enumerate(rstats):
-      if mstat is None:
-        lu.LogWarning("Can't compute data for node %s/%s",
-                      node, disks[i].iv_name)
-        continue
-
-      cumul_degraded = (cumul_degraded or
-                        (mstat.is_degraded and mstat.sync_percent is None))
-      if mstat.sync_percent is not None:
-        done = False
-        if mstat.estimated_time is not None:
-          rem_time = ("%s remaining (estimated)" %
-                      utils.FormatSeconds(mstat.estimated_time))
-          max_time = mstat.estimated_time
-        else:
-          rem_time = "no time estimate"
-        lu.LogInfo("- device %s: %5.2f%% done, %s",
-                   disks[i].iv_name, mstat.sync_percent, rem_time)
-
-    # if we're done but degraded, let's do a few small retries, to
-    # make sure we see a stable and not transient situation; therefore
-    # we force restart of the loop
-    if (done or oneshot) and cumul_degraded and degr_retries > 0:
-      logging.info("Degraded disks found, %d retries left", degr_retries)
-      degr_retries -= 1
-      time.sleep(1)
-      continue
-
-    if done or oneshot:
-      break
-
-    time.sleep(min(60, max_time))
-
-  if done:
-    lu.LogInfo("Instance %s's disks are in sync", instance.name)
-
-  return not cumul_degraded
-
-
-def _ComputeDisks(op, default_vg):
-  """Computes the instance disks.
-
-  @param op: The instance opcode
-  @param default_vg: The default_vg to assume
-
-  @return: The computed disks
-
-  """
-  disks = []
-  for disk in op.disks:
-    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
-    if mode not in constants.DISK_ACCESS_SET:
-      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
-                                 mode, errors.ECODE_INVAL)
-    size = disk.get(constants.IDISK_SIZE, None)
-    if size is None:
-      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
-    try:
-      size = int(size)
-    except (TypeError, ValueError):
-      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
-                                 errors.ECODE_INVAL)
-
-    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
-    if ext_provider and op.disk_template != constants.DT_EXT:
-      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
-                                 " disk template, not %s" %
-                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
-                                  op.disk_template), errors.ECODE_INVAL)
-
-    data_vg = disk.get(constants.IDISK_VG, default_vg)
-    name = disk.get(constants.IDISK_NAME, None)
-    if name is not None and name.lower() == constants.VALUE_NONE:
-      name = None
-    new_disk = {
-      constants.IDISK_SIZE: size,
-      constants.IDISK_MODE: mode,
-      constants.IDISK_VG: data_vg,
-      constants.IDISK_NAME: name,
-      }
-
-    if constants.IDISK_METAVG in disk:
-      new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
-    if constants.IDISK_ADOPT in disk:
-      new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
-
-    # For extstorage, demand the `provider' option and add any
-    # additional parameters (ext-params) to the dict
-    if op.disk_template == constants.DT_EXT:
-      if ext_provider:
-        new_disk[constants.IDISK_PROVIDER] = ext_provider
-        for key in disk:
-          if key not in constants.IDISK_PARAMS:
-            new_disk[key] = disk[key]
-      else:
-        raise errors.OpPrereqError("Missing provider for template '%s'" %
-                                   constants.DT_EXT, errors.ECODE_INVAL)
-
-    disks.append(new_disk)
-
-  return disks
-
-
-def _ComputeDiskSizePerVG(disk_template, disks):
-  """Compute disk size requirements in the volume group
-
-  """
-  def _compute(disks, payload):
-    """Universal algorithm.
-
-    """
-    vgs = {}
-    for disk in disks:
-      vgs[disk[constants.IDISK_VG]] = \
-        vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload
-
-    return vgs
-
-  # Required free disk space as a function of disk and swap space
-  req_size_dict = {
-    constants.DT_DISKLESS: {},
-    constants.DT_PLAIN: _compute(disks, 0),
-    # 128 MB are added for drbd metadata for each disk
-    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
-    constants.DT_FILE: {},
-    constants.DT_SHARED_FILE: {},
-    }
-
-  if disk_template not in req_size_dict:
-    raise errors.ProgrammerError("Disk template '%s' size requirement"
-                                 " is unknown" % disk_template)
-
-  return req_size_dict[disk_template]
-
-
-def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
-  """Checks if nodes have enough free disk space in the specified VG.
-
-  This function checks if all given nodes have the needed amount of
-  free disk. In case any node has less disk or we cannot get the
-  information from the node, this function raises an OpPrereqError
-  exception.
-
-  @type lu: C{LogicalUnit}
-  @param lu: a logical unit from which we get configuration data
-  @type nodenames: C{list}
-  @param nodenames: the list of node names to check
-  @type vg: C{str}
-  @param vg: the volume group to check
-  @type requested: C{int}
-  @param requested: the amount of disk in MiB to check for
-  @raise errors.OpPrereqError: if the node doesn't have enough disk,
-      or we cannot check the node
-
-  """
-  es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
-  nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
-  for node in nodenames:
-    info = nodeinfo[node]
-    info.Raise("Cannot get current information from node %s" % node,
-               prereq=True, ecode=errors.ECODE_ENVIRON)
-    (_, (vg_info, ), _) = info.payload
-    vg_free = vg_info.get("vg_free", None)
-    if not isinstance(vg_free, int):
-      raise errors.OpPrereqError("Can't compute free disk space on node"
-                                 " %s for vg %s, result was '%s'" %
-                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
-    if requested > vg_free:
-      raise errors.OpPrereqError("Not enough disk space on target node %s"
-                                 " vg %s: required %d MiB, available %d MiB" %
-                                 (node, vg, requested, vg_free),
-                                 errors.ECODE_NORES)
-
-
-def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
-  """Checks if nodes have enough free disk space in all the VGs.
-
-  This function checks if all given nodes have the needed amount of
-  free disk. In case any node has less disk or we cannot get the
-  information from the node, this function raises an OpPrereqError
-  exception.
-
-  @type lu: C{LogicalUnit}
-  @param lu: a logical unit from which we get configuration data
-  @type nodenames: C{list}
-  @param nodenames: the list of node names to check
-  @type req_sizes: C{dict}
-  @param req_sizes: the hash of vg and corresponding amount of disk in
-      MiB to check for
-  @raise errors.OpPrereqError: if the node doesn't have enough disk,
-      or we cannot check the node
-
-  """
-  for vg, req_size in req_sizes.items():
-    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
-
-
-def _CheckNodeVmCapable(lu, node):
-  """Ensure that a given node is vm capable.
-
-  @param lu: the LU on behalf of which we make the check
-  @param node: the node to check
-  @raise errors.OpPrereqError: if the node is not vm capable
-
-  """
-  if not lu.cfg.GetNodeInfo(node).vm_capable:
-    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
-                               errors.ECODE_STATE)
-
-
 def _ComputeIPolicyInstanceSpecViolation(
   ipolicy, instance_spec, disk_template,
   _compute_fn=_ComputeIPolicySpecViolation):
@@ -723,457 +394,6 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
   return free_mem
 
 
-def _GenerateUniqueNames(lu, exts):
-  """Generate a suitable LV name.
-
-  This will generate a logical volume name for the given instance.
-
-  """
-  results = []
-  for val in exts:
-    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
-    results.append("%s%s" % (new_id, val))
-  return results
-
-
-def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
-                         iv_name, p_minor, s_minor):
-  """Generate a drbd8 device complete with its children.
-
-  """
-  assert len(vgnames) == len(names) == 2
-  port = lu.cfg.AllocatePort()
-  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
-
-  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
-                          logical_id=(vgnames[0], names[0]),
-                          params={})
-  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
-  dev_meta = objects.Disk(dev_type=constants.LD_LV,
-                          size=constants.DRBD_META_SIZE,
-                          logical_id=(vgnames[1], names[1]),
-                          params={})
-  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
-  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
-                          logical_id=(primary, secondary, port,
-                                      p_minor, s_minor,
-                                      shared_secret),
-                          children=[dev_data, dev_meta],
-                          iv_name=iv_name, params={})
-  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
-  return drbd_dev
-
-
-def _GenerateDiskTemplate(
-  lu, template_name, instance_name, primary_node, secondary_nodes,
-  disk_info, file_storage_dir, file_driver, base_index,
-  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
-  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
-  """Generate the entire disk layout for a given template type.
-
-  """
-  vgname = lu.cfg.GetVGName()
-  disk_count = len(disk_info)
-  disks = []
-
-  if template_name == constants.DT_DISKLESS:
-    pass
-  elif template_name == constants.DT_DRBD8:
-    if len(secondary_nodes) != 1:
-      raise errors.ProgrammerError("Wrong template configuration")
-    remote_node = secondary_nodes[0]
-    minors = lu.cfg.AllocateDRBDMinor(
-      [primary_node, remote_node] * len(disk_info), instance_name)
-
-    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
-                                                       full_disk_params)
-    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
-
-    names = []
-    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
-                                               for i in range(disk_count)]):
-      names.append(lv_prefix + "_data")
-      names.append(lv_prefix + "_meta")
-    for idx, disk in enumerate(disk_info):
-      disk_index = idx + base_index
-      data_vg = disk.get(constants.IDISK_VG, vgname)
-      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
-      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
-                                      disk[constants.IDISK_SIZE],
-                                      [data_vg, meta_vg],
-                                      names[idx * 2:idx * 2 + 2],
-                                      "disk/%d" % disk_index,
-                                      minors[idx * 2], minors[idx * 2 + 1])
-      disk_dev.mode = disk[constants.IDISK_MODE]
-      disk_dev.name = disk.get(constants.IDISK_NAME, None)
-      disks.append(disk_dev)
-  else:
-    if secondary_nodes:
-      raise errors.ProgrammerError("Wrong template configuration")
-
-    if template_name == constants.DT_FILE:
-      _req_file_storage()
-    elif template_name == constants.DT_SHARED_FILE:
-      _req_shr_file_storage()
-
-    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
-    if name_prefix is None:
-      names = None
-    else:
-      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
-                                        (name_prefix, base_index + i)
-                                        for i in range(disk_count)])
-
-    if template_name == constants.DT_PLAIN:
-
-      def logical_id_fn(idx, _, disk):
-        vg = disk.get(constants.IDISK_VG, vgname)
-        return (vg, names[idx])
-
-    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
-      logical_id_fn = \
-        lambda _, disk_index, disk: (file_driver,
-                                     "%s/disk%d" % (file_storage_dir,
-                                                    disk_index))
-    elif template_name == constants.DT_BLOCK:
-      logical_id_fn = \
-        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
-                                       disk[constants.IDISK_ADOPT])
-    elif template_name == constants.DT_RBD:
-      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
-    elif template_name == constants.DT_EXT:
-      def logical_id_fn(idx, _, disk):
-        provider = disk.get(constants.IDISK_PROVIDER, None)
-        if provider is None:
-          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
-                                       " not found", constants.DT_EXT,
-                                       constants.IDISK_PROVIDER)
-        return (provider, names[idx])
-    else:
-      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
-
-    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
-
-    for idx, disk in enumerate(disk_info):
-      params = {}
-      # Only for the Ext template add disk_info to params
-      if template_name == constants.DT_EXT:
-        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
-        for key in disk:
-          if key not in constants.IDISK_PARAMS:
-            params[key] = disk[key]
-      disk_index = idx + base_index
-      size = disk[constants.IDISK_SIZE]
-      feedback_fn("* disk %s, size %s" %
-                  (disk_index, utils.FormatUnit(size, "h")))
-      disk_dev = objects.Disk(dev_type=dev_type, size=size,
-                              logical_id=logical_id_fn(idx, disk_index, disk),
-                              iv_name="disk/%d" % disk_index,
-                              mode=disk[constants.IDISK_MODE],
-                              params=params)
-      disk_dev.name = disk.get(constants.IDISK_NAME, None)
-      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
-      disks.append(disk_dev)
-
-  return disks
-
-
-def _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
-                          excl_stor):
-  """Create a single block device on a given node.
-
-  This will not recurse over children of the device, so they must be
-  created in advance.
-
-  @param lu: the lu on whose behalf we execute
-  @param node: the node on which to create the device
-  @type instance: L{objects.Instance}
-  @param instance: the instance which owns the device
-  @type device: L{objects.Disk}
-  @param device: the device to create
-  @param info: the extra 'metadata' we should attach to the device
-      (this will be represented as a LVM tag)
-  @type force_open: boolean
-  @param force_open: this parameter will be passes to the
-      L{backend.BlockdevCreate} function where it specifies
-      whether we run on primary or not, and it affects both
-      the child assembly and the device own Open() execution
-  @type excl_stor: boolean
-  @param excl_stor: Whether exclusive_storage is active for the node
-
-  """
-  lu.cfg.SetDiskID(device, node)
-  result = lu.rpc.call_blockdev_create(node, device, device.size,
-                                       instance.name, force_open, info,
-                                       excl_stor)
-  result.Raise("Can't create block device %s on"
-               " node %s for instance %s" % (device, node, instance.name))
-  if device.physical_id is None:
-    device.physical_id = result.payload
-
-
-def _CreateBlockDevInner(lu, node, instance, device, force_create,
-                         info, force_open, excl_stor):
-  """Create a tree of block devices on a given node.
-
-  If this device type has to be created on secondaries, create it and
-  all its children.
-
-  If not, just recurse to children keeping the same 'force' value.
-
-  @attention: The device has to be annotated already.
-
-  @param lu: the lu on whose behalf we execute
-  @param node: the node on which to create the device
-  @type instance: L{objects.Instance}
-  @param instance: the instance which owns the device
-  @type device: L{objects.Disk}
-  @param device: the device to create
-  @type force_create: boolean
-  @param force_create: whether to force creation of this device; this
-      will be change to True whenever we find a device which has
-      CreateOnSecondary() attribute
-  @param info: the extra 'metadata' we should attach to the device
-      (this will be represented as a LVM tag)
-  @type force_open: boolean
-  @param force_open: this parameter will be passes to the
-      L{backend.BlockdevCreate} function where it specifies
-      whether we run on primary or not, and it affects both
-      the child assembly and the device own Open() execution
-  @type excl_stor: boolean
-  @param excl_stor: Whether exclusive_storage is active for the node
-
-  @return: list of created devices
-  """
-  created_devices = []
-  try:
-    if device.CreateOnSecondary():
-      force_create = True
-
-    if device.children:
-      for child in device.children:
-        devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
-                                    info, force_open, excl_stor)
-        created_devices.extend(devs)
-
-    if not force_create:
-      return created_devices
-
-    _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
-                          excl_stor)
-    # The device has been completely created, so there is no point in keeping
-    # its subdevices in the list. We just add the device itself instead.
-    created_devices = [(node, device)]
-    return created_devices
-
-  except errors.DeviceCreationError, e:
-    e.created_devices.extend(created_devices)
-    raise e
-  except errors.OpExecError, e:
-    raise errors.DeviceCreationError(str(e), created_devices)
-
-
-def _IsExclusiveStorageEnabledNodeName(cfg, nodename):
-  """Whether exclusive_storage is in effect for the given node.
-
-  @type cfg: L{config.ConfigWriter}
-  @param cfg: The cluster configuration
-  @type nodename: string
-  @param nodename: The node
-  @rtype: bool
-  @return: The effective value of exclusive_storage
-  @raise errors.OpPrereqError: if no node exists with the given name
-
-  """
-  ni = cfg.GetNodeInfo(nodename)
-  if ni is None:
-    raise errors.OpPrereqError("Invalid node name %s" % nodename,
-                               errors.ECODE_NOENT)
-  return _IsExclusiveStorageEnabledNode(cfg, ni)
-
-
-def _CreateBlockDev(lu, node, instance, device, force_create, info,
-                    force_open):
-  """Wrapper around L{_CreateBlockDevInner}.
-
-  This method annotates the root device first.
-
-  """
-  (disk,) = _AnnotateDiskParams(instance, [device], lu.cfg)
-  excl_stor = _IsExclusiveStorageEnabledNodeName(lu.cfg, node)
-  return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
-                              force_open, excl_stor)
-
-
-def _CreateDisks(lu, instance, to_skip=None, target_node=None):
-  """Create all disks for an instance.
-
-  This abstracts away some work from AddInstance.
-
-  @type lu: L{LogicalUnit}
-  @param lu: the logical unit on whose behalf we execute
-  @type instance: L{objects.Instance}
-  @param instance: the instance whose disks we should create
-  @type to_skip: list
-  @param to_skip: list of indices to skip
-  @type target_node: string
-  @param target_node: if passed, overrides the target node for creation
-  @rtype: boolean
-  @return: the success of the creation
-
-  """
-  info = _GetInstanceInfoText(instance)
-  if target_node is None:
-    pnode = instance.primary_node
-    all_nodes = instance.all_nodes
-  else:
-    pnode = target_node
-    all_nodes = [pnode]
-
-  if instance.disk_template in constants.DTS_FILEBASED:
-    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
-    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
-
-    result.Raise("Failed to create directory '%s' on"
-                 " node %s" % (file_storage_dir, pnode))
-
-  disks_created = []
-  # Note: this needs to be kept in sync with adding of disks in
-  # LUInstanceSetParams
-  for idx, device in enumerate(instance.disks):
-    if to_skip and idx in to_skip:
-      continue
-    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
-    #HARDCODE
-    for node in all_nodes:
-      f_create = node == pnode
-      try:
-        _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
-        disks_created.append((node, device))
-      except errors.OpExecError:
-        logging.warning("Creating disk %s for instance '%s' failed",
-                        idx, instance.name)
-      except errors.DeviceCreationError, e:
-        logging.warning("Creating disk %s for instance '%s' failed",
-                        idx, instance.name)
-        disks_created.extend(e.created_devices)
-        for (node, disk) in disks_created:
-          lu.cfg.SetDiskID(disk, node)
-          result = lu.rpc.call_blockdev_remove(node, disk)
-          if result.fail_msg:
-            logging.warning("Failed to remove newly-created disk %s on node %s:"
-                            " %s", device, node, result.fail_msg)
-        raise errors.OpExecError(e.message)
-
-
-def _CalcEta(time_taken, written, total_size):
-  """Calculates the ETA based on size written and total size.
-
-  @param time_taken: The time taken so far
-  @param written: amount written so far
-  @param total_size: The total size of data to be written
-  @return: The remaining time in seconds
-
-  """
-  avg_time = time_taken / float(written)
-  return (total_size - written) * avg_time
-
-
-def _WipeDisks(lu, instance, disks=None):
-  """Wipes instance disks.
-
-  @type lu: L{LogicalUnit}
-  @param lu: the logical unit on whose behalf we execute
-  @type instance: L{objects.Instance}
-  @param instance: the instance whose disks we should create
-  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
-  @param disks: Disk details; tuple contains disk index, disk object and the
-    start offset
-
-  """
-  node = instance.primary_node
-
-  if disks is None:
-    disks = [(idx, disk, 0)
-             for (idx, disk) in enumerate(instance.disks)]
-
-  for (_, device, _) in disks:
-    lu.cfg.SetDiskID(device, node)
-
-  logging.info("Pausing synchronization of disks of instance '%s'",
-               instance.name)
-  result = lu.rpc.call_blockdev_pause_resume_sync(node,
-                                                  (map(compat.snd, disks),
-                                                   instance),
-                                                  True)
-  result.Raise("Failed to pause disk synchronization on node '%s'" % node)
-
-  for idx, success in enumerate(result.payload):
-    if not success:
-      logging.warn("Pausing synchronization of disk %s of instance '%s'"
-                   " failed", idx, instance.name)
-
-  try:
-    for (idx, device, offset) in disks:
-      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
-      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
-      wipe_chunk_size = \
-        int(min(constants.MAX_WIPE_CHUNK,
-                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
-
-      size = device.size
-      last_output = 0
-      start_time = time.time()
-
-      if offset == 0:
-        info_text = ""
-      else:
-        info_text = (" (from %s to %s)" %
-                     (utils.FormatUnit(offset, "h"),
-                      utils.FormatUnit(size, "h")))
-
-      lu.LogInfo("* Wiping disk %s%s", idx, info_text)
-
-      logging.info("Wiping disk %d for instance %s on node %s using"
-                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)
-
-      while offset < size:
-        wipe_size = min(wipe_chunk_size, size - offset)
-
-        logging.debug("Wiping disk %d, offset %s, chunk %s",
-                      idx, offset, wipe_size)
-
-        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
-                                           wipe_size)
-        result.Raise("Could not wipe disk %d at offset %d for size %d" %
-                     (idx, offset, wipe_size))
-
-        now = time.time()
-        offset += wipe_size
-        if now - last_output >= 60:
-          eta = _CalcEta(now - start_time, offset, size)
-          lu.LogInfo(" - done: %.1f%% ETA: %s",
-                     offset / float(size) * 100, utils.FormatSeconds(eta))
-          last_output = now
-  finally:
-    logging.info("Resuming synchronization of disks for instance '%s'",
-                 instance.name)
-
-    result = lu.rpc.call_blockdev_pause_resume_sync(node,
-                                                    (map(compat.snd, disks),
-                                                     instance),
-                                                    False)
-
-    if result.fail_msg:
-      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
-                    node, result.fail_msg)
-    else:
-      for idx, success in enumerate(result.payload):
-        if not success:
-          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
-                        " failed", idx, instance.name)
-
-
 class LUInstanceCreate(LogicalUnit):
   """Create an instance.
 
@@ -2259,13 +1479,6 @@ class LUInstanceCreate(LogicalUnit):
     return list(iobj.all_nodes)
 
 
-def _GetInstanceInfoText(instance):
-  """Compute that text that should be added to the disk's metadata.
-
-  """
-  return "originstname+%s" % instance.name
-
-
 class LUInstanceRename(LogicalUnit):
   """Rename an instance.
 
@@ -2476,53 +1689,6 @@ def _CheckInstanceBridgesExist(lu, instance, node=None):
   _CheckNicsBridgesExist(lu, instance.nics, node)
 
 
-def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
-                                 target_group, cfg,
-                                 _compute_fn=_ComputeIPolicyInstanceViolation):
-  """Compute if instance meets the specs of the new target group.
-
-  @param ipolicy: The ipolicy to verify
-  @param instance: The instance object to verify
-  @param current_group: The current group of the instance
-  @param target_group: The new group of the instance
-  @type cfg: L{config.ConfigWriter}
-  @param cfg: Cluster configuration
-  @param _compute_fn: The function to verify ipolicy (unittest only)
-  @see: L{_ComputeIPolicySpecViolation}
-
-  """
-  if current_group == target_group:
-    return []
-  else:
-    return _compute_fn(ipolicy, instance, cfg)
-
-
-def _CheckTargetNodeIPolicy(lu, ipolicy, instance, node, cfg, ignore=False,
-                            _compute_fn=_ComputeIPolicyNodeViolation):
-  """Checks that the target node is correct in terms of instance policy.
-
-  @param ipolicy: The ipolicy to verify
-  @param instance: The instance object to verify
-  @param node: The new node to relocate
-  @type cfg: L{config.ConfigWriter}
-  @param cfg: Cluster configuration
-  @param ignore: Ignore violations of the ipolicy
-  @param _compute_fn: The function to verify ipolicy (unittest only)
-  @see: L{_ComputeIPolicySpecViolation}
-
-  """
-  primary_node = lu.cfg.GetNodeInfo(instance.primary_node)
-  res = _compute_fn(ipolicy, instance, primary_node.group, node.group, cfg)
-
-  if res:
-    msg = ("Instance does not meet target node group's (%s) instance"
-           " policy: %s") % (node.group, utils.CommaJoin(res))
-    if ignore:
-      lu.LogWarning(msg)
-    else:
-      raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
-
-
 class LUInstanceMove(LogicalUnit):
   """Move an instance by data-copying.
 
@@ -3072,821 +2238,72 @@ class LUInstanceQueryData(NoHooksLU):
 
     cluster = self.cfg.GetClusterInfo()
 
-    node_names = itertools.chain(*(i.all_nodes for i in self.wanted_instances))
-    nodes = dict(self.cfg.GetMultiNodeInfo(node_names))
-
-    groups = dict(self.cfg.GetMultiNodeGroupInfo(node.group
-                                                 for node in nodes.values()))
-
-    group2name_fn = lambda uuid: groups[uuid].name
-    for instance in self.wanted_instances:
-      pnode = nodes[instance.primary_node]
-
-      if self.op.static or pnode.offline:
-        remote_state = None
-        if pnode.offline:
-          self.LogWarning("Primary node %s is marked offline, returning static"
-                          " information only for instance %s" %
-                          (pnode.name, instance.name))
-      else:
-        remote_info = self.rpc.call_instance_info(instance.primary_node,
-                                                  instance.name,
-                                                  instance.hypervisor)
-        remote_info.Raise("Error checking node %s" % instance.primary_node)
-        remote_info = remote_info.payload
-        if remote_info and "state" in remote_info:
-          remote_state = "up"
-        else:
-          if instance.admin_state == constants.ADMINST_UP:
-            remote_state = "down"
-          else:
-            remote_state = instance.admin_state
-
-      disks = map(compat.partial(self._ComputeDiskStatus, instance, None),
-                  instance.disks)
-
-      snodes_group_uuids = [nodes[snode_name].group
-                            for snode_name in instance.secondary_nodes]
-
-      result[instance.name] = {
-        "name": instance.name,
-        "config_state": instance.admin_state,
-        "run_state": remote_state,
-        "pnode": instance.primary_node,
-        "pnode_group_uuid": pnode.group,
-        "pnode_group_name": group2name_fn(pnode.group),
-        "snodes": instance.secondary_nodes,
-        "snodes_group_uuids": snodes_group_uuids,
-        "snodes_group_names": map(group2name_fn, snodes_group_uuids),
-        "os": instance.os,
-        # this happens to be the same format used for hooks
-        "nics": _NICListToTuple(self, instance.nics),
-        "disk_template": instance.disk_template,
-        "disks": disks,
-        "hypervisor": instance.hypervisor,
-        "network_port": instance.network_port,
-        "hv_instance": instance.hvparams,
-        "hv_actual": cluster.FillHV(instance, skip_globals=True),
-        "be_instance": instance.beparams,
-        "be_actual": cluster.FillBE(instance),
-        "os_instance": instance.osparams,
-        "os_actual": cluster.SimpleFillOS(instance.os, instance.osparams),
-        "serial_no": instance.serial_no,
-        "mtime": instance.mtime,
-        "ctime": instance.ctime,
-        "uuid": instance.uuid,
-        }
-
-    return result
-
-
-class LUInstanceRecreateDisks(LogicalUnit):
-  """Recreate an instance's missing disks.
-
-  """
-  HPATH = "instance-recreate-disks"
-  HTYPE = constants.HTYPE_INSTANCE
-  REQ_BGL = False
-
-  _MODIFYABLE = compat.UniqueFrozenset([
-    constants.IDISK_SIZE,
-    constants.IDISK_MODE,
-    ])
-
-  # New or changed disk parameters may have different semantics
-  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
-    constants.IDISK_ADOPT,
-
-    # TODO: Implement support changing VG while recreating
-    constants.IDISK_VG,
-    constants.IDISK_METAVG,
-    constants.IDISK_PROVIDER,
-    constants.IDISK_NAME,
-    ]))
-
-  def _RunAllocator(self):
-    """Run the allocator based on input opcode.
-
-    """
-    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
-
-    # FIXME
-    # The allocator should actually run in "relocate" mode, but current
-    # allocators don't support relocating all the nodes of an instance at
-    # the same time. As a workaround we use "allocate" mode, but this is
-    # suboptimal for two reasons:
-    # - The instance name passed to the allocator is present in the list of
-    #   existing instances, so there could be a conflict within the
-    #   internal structures of the allocator. This doesn't happen with the
-    #   current allocators, but it's a liability.
-    # - The allocator counts the resources used by the instance twice: once
-    #   because the instance exists already, and once because it tries to
-    #   allocate a new instance.
-    # The allocator could choose some of the nodes on which the instance is
-    # running, but that's not a problem. If the instance nodes are broken,
-    # they should be already be marked as drained or offline, and hence
-    # skipped by the allocator. If instance disks have been lost for other
-    # reasons, then recreating the disks on the same nodes should be fine.
-    disk_template = self.instance.disk_template
-    spindle_use = be_full[constants.BE_SPINDLE_USE]
-    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
-                                        disk_template=disk_template,
-                                        tags=list(self.instance.GetTags()),
-                                        os=self.instance.os,
-                                        nics=[{}],
-                                        vcpus=be_full[constants.BE_VCPUS],
-                                        memory=be_full[constants.BE_MAXMEM],
-                                        spindle_use=spindle_use,
-                                        disks=[{constants.IDISK_SIZE: d.size,
-                                                constants.IDISK_MODE: d.mode}
-                                               for d in self.instance.disks],
-                                        hypervisor=self.instance.hypervisor,
-                                        node_whitelist=None)
-    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
-
-    ial.Run(self.op.iallocator)
-
-    assert req.RequiredNodes() == len(self.instance.all_nodes)
-
-    if not ial.success:
-      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
-                                 " %s" % (self.op.iallocator, ial.info),
-                                 errors.ECODE_NORES)
-
-    self.op.nodes = ial.result
-    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
-                 self.op.instance_name, self.op.iallocator,
-                 utils.CommaJoin(ial.result))
-
-  def CheckArguments(self):
-    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
-      # Normalize and convert deprecated list of disk indices
-      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
-
-    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
-    if duplicates:
-      raise errors.OpPrereqError("Some disks have been specified more than"
-                                 " once: %s" % utils.CommaJoin(duplicates),
-                                 errors.ECODE_INVAL)
-
-    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
-    # when neither iallocator nor nodes are specified
-    if self.op.iallocator or self.op.nodes:
-      _CheckIAllocatorOrNode(self, "iallocator", "nodes")
-
-    for (idx, params) in self.op.disks:
-      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
-      unsupported = frozenset(params.keys()) - self._MODIFYABLE
-      if unsupported:
-        raise errors.OpPrereqError("Parameters for disk %s try to change"
-                                   " unmodifyable parameter(s): %s" %
-                                   (idx, utils.CommaJoin(unsupported)),
-                                   errors.ECODE_INVAL)
-
-  def ExpandNames(self):
-    self._ExpandAndLockInstance()
-    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
-
-    if self.op.nodes:
-      self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
-      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
-    else:
-      self.needed_locks[locking.LEVEL_NODE] = []
-      if self.op.iallocator:
-        # iallocator will select a new node in the same group
-        self.needed_locks[locking.LEVEL_NODEGROUP] = []
-        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
-
-    self.needed_locks[locking.LEVEL_NODE_RES] = []
-
-  def DeclareLocks(self, level):
-    if level == locking.LEVEL_NODEGROUP:
-      assert self.op.iallocator is not None
-      assert not self.op.nodes
-      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
-      self.share_locks[locking.LEVEL_NODEGROUP] = 1
-      # Lock the primary group used by the instance optimistically; this
-      # requires going via the node before it's locked, requiring
-      # verification later on
-      self.needed_locks[locking.LEVEL_NODEGROUP] = \
-        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
-
-    elif level == locking.LEVEL_NODE:
-      # If an allocator is used, then we lock all the nodes in the current
-      # instance group, as we don't know yet which ones will be selected;
-      # if we replace the nodes without using an allocator, locks are
-      # already declared in ExpandNames; otherwise, we need to lock all the
-      # instance nodes for disk re-creation
-      if self.op.iallocator:
-        assert not self.op.nodes
-        assert not self.needed_locks[locking.LEVEL_NODE]
-        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
-
-        # Lock member nodes of the group of the primary node
-        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
-          self.needed_locks[locking.LEVEL_NODE].extend(
-            self.cfg.GetNodeGroup(group_uuid).members)
-
-        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
-      elif not self.op.nodes:
-        self._LockInstancesNodes(primary_only=False)
-    elif level == locking.LEVEL_NODE_RES:
-      # Copy node locks
-      self.needed_locks[locking.LEVEL_NODE_RES] = \
-        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
-
-  def BuildHooksEnv(self):
-    """Build hooks env.
-
-    This runs on master, primary and secondary nodes of the instance.
-
-    """
-    return _BuildInstanceHookEnvByObject(self, self.instance)
-
-  def BuildHooksNodes(self):
-    """Build hooks nodes.
-
-    """
-    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
-    return (nl, nl)
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    This checks that the instance is in the cluster and is not running.
-
-    """
-    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
-    assert instance is not None, \
-      "Cannot retrieve locked instance %s" % self.op.instance_name
-    if self.op.nodes:
-      if len(self.op.nodes) != len(instance.all_nodes):
-        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
-                                   " %d replacement nodes were specified" %
-                                   (instance.name, len(instance.all_nodes),
-                                    len(self.op.nodes)),
-                                   errors.ECODE_INVAL)
-      assert instance.disk_template != constants.DT_DRBD8 or \
-             len(self.op.nodes) == 2
-      assert instance.disk_template != constants.DT_PLAIN or \
-             len(self.op.nodes) == 1
-      primary_node = self.op.nodes[0]
-    else:
-      primary_node = instance.primary_node
-    if not self.op.iallocator:
-      _CheckNodeOnline(self, primary_node)
-
-    if instance.disk_template == constants.DT_DISKLESS:
-      raise errors.OpPrereqError("Instance '%s' has no disks" %
-                                 self.op.instance_name, errors.ECODE_INVAL)
-
-    # Verify if node group locks are still correct
-    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
-    if owned_groups:
-      # Node group locks are acquired only for the primary node (and only
-      # when the allocator is used)
-      _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
-                               primary_only=True)
-
-    # if we replace nodes *and* the old primary is offline, we don't
-    # check the instance state
-    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
-    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
-      _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
-                          msg="cannot recreate disks")
-
-    if self.op.disks:
-      self.disks = dict(self.op.disks)
-    else:
-      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
-
-    maxidx = max(self.disks.keys())
-    if maxidx >= len(instance.disks):
-      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
-                                 errors.ECODE_INVAL)
-
-    if ((self.op.nodes or self.op.iallocator) and
-         sorted(self.disks.keys()) != range(len(instance.disks))):
-      raise errors.OpPrereqError("Can't recreate disks partially and"
-                                 " change the nodes at the same time",
-                                 errors.ECODE_INVAL)
-
-    self.instance = instance
-
-    if self.op.iallocator:
-      self._RunAllocator()
-      # Release unneeded node and node resource locks
-      _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
-      _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
-      _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
-
-    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
-
-  def Exec(self, feedback_fn):
-    """Recreate the disks.
-
-    """
-    instance = self.instance
-
-    assert (self.owned_locks(locking.LEVEL_NODE) ==
-            self.owned_locks(locking.LEVEL_NODE_RES))
-
-    to_skip = []
-    mods = [] # keeps track of needed changes
-
-    for idx, disk in enumerate(instance.disks):
-      try:
-        changes = self.disks[idx]
-      except KeyError:
-        # Disk should not be recreated
-        to_skip.append(idx)
-        continue
-
-      # update secondaries for disks, if needed
-      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
-        # need to update the nodes and minors
-        assert len(self.op.nodes) == 2
-        assert len(disk.logical_id) == 6 # otherwise disk internals
-                                         # have changed
-        (_, _, old_port, _, _, old_secret) = disk.logical_id
-        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
-        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
-                  new_minors[0], new_minors[1], old_secret)
-        assert len(disk.logical_id) == len(new_id)
-      else:
-        new_id = None
-
-      mods.append((idx, new_id, changes))
-
-    # now that we have passed all asserts above, we can apply the mods
-    # in a single run (to avoid partial changes)
-    for idx, new_id, changes in mods:
-      disk = instance.disks[idx]
-      if new_id is not None:
-        assert disk.dev_type == constants.LD_DRBD8
-        disk.logical_id = new_id
-      if changes:
-        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
-                    mode=changes.get(constants.IDISK_MODE, None))
-
-    # change primary node, if needed
-    if self.op.nodes:
-      instance.primary_node = self.op.nodes[0]
-      self.LogWarning("Changing the instance's nodes, you will have to"
-                      " remove any disks left on the older nodes manually")
-
-    if self.op.nodes:
-      self.cfg.Update(instance, feedback_fn)
-
-    # All touched nodes must be locked
-    mylocks = self.owned_locks(locking.LEVEL_NODE)
-    assert mylocks.issuperset(frozenset(instance.all_nodes))
-    _CreateDisks(self, instance, to_skip=to_skip)
-
-
-def _SafeShutdownInstanceDisks(lu, instance, disks=None):
-  """Shutdown block devices of an instance.
-
-  This function checks if an instance is running, before calling
-  _ShutdownInstanceDisks.
-
-  """
-  _CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
-  _ShutdownInstanceDisks(lu, instance, disks=disks)
-
-
-def _DiskSizeInBytesToMebibytes(lu, size):
-  """Converts a disk size in bytes to mebibytes.
-
-  Warns and rounds up if the size isn't an even multiple of 1 MiB.
-
-  """
-  (mib, remainder) = divmod(size, 1024 * 1024)
-
-  if remainder != 0:
-    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
-                  " to not overwrite existing data (%s bytes will not be"
-                  " wiped)", (1024 * 1024) - remainder)
-    mib += 1
-
-  return mib
-
-
-class LUInstanceGrowDisk(LogicalUnit):
-  """Grow a disk of an instance.
-
-  """
-  HPATH = "disk-grow"
-  HTYPE = constants.HTYPE_INSTANCE
-  REQ_BGL = False
-
-  def ExpandNames(self):
-    self._ExpandAndLockInstance()
-    self.needed_locks[locking.LEVEL_NODE] = []
-    self.needed_locks[locking.LEVEL_NODE_RES] = []
-    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
-    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
-
-  def DeclareLocks(self, level):
-    if level == locking.LEVEL_NODE:
-      self._LockInstancesNodes()
-    elif level == locking.LEVEL_NODE_RES:
-      # Copy node locks
-      self.needed_locks[locking.LEVEL_NODE_RES] = \
-        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
-
-  def BuildHooksEnv(self):
-    """Build hooks env.
-
-    This runs on the master, the primary and all the secondaries.
-
-    """
-    env = {
-      "DISK": self.op.disk,
-      "AMOUNT": self.op.amount,
-      "ABSOLUTE": self.op.absolute,
-      }
-    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
-    return env
-
-  def BuildHooksNodes(self):
-    """Build hooks nodes.
-
-    """
-    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
-    return (nl, nl)
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    This checks that the instance is in the cluster.
-
-    """
-    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
-    assert instance is not None, \
-      "Cannot retrieve locked instance %s" % self.op.instance_name
-    nodenames = list(instance.all_nodes)
-    for node in nodenames:
-      _CheckNodeOnline(self, node)
-
-    self.instance = instance
-
-    if instance.disk_template not in constants.DTS_GROWABLE:
-      raise errors.OpPrereqError("Instance's disk layout does not support"
-                                 " growing", errors.ECODE_INVAL)
-
-    self.disk = instance.FindDisk(self.op.disk)
-
-    if self.op.absolute:
-      self.target = self.op.amount
-      self.delta = self.target - self.disk.size
-      if self.delta < 0:
-        raise errors.OpPrereqError("Requested size (%s) is smaller than "
-                                   "current disk size (%s)" %
-                                   (utils.FormatUnit(self.target, "h"),
-                                    utils.FormatUnit(self.disk.size, "h")),
-                                   errors.ECODE_STATE)
-    else:
-      self.delta = self.op.amount
-      self.target = self.disk.size + self.delta
-      if self.delta < 0:
-        raise errors.OpPrereqError("Requested increment (%s) is negative" %
-                                   utils.FormatUnit(self.delta, "h"),
-                                   errors.ECODE_INVAL)
-
-    self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
-
-  def _CheckDiskSpace(self, nodenames, req_vgspace):
-    template = self.instance.disk_template
-    if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
-      # TODO: check the free disk space for file, when that feature will be
-      # supported
-      nodes = map(self.cfg.GetNodeInfo, nodenames)
-      es_nodes = filter(lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n),
-                        nodes)
-      if es_nodes:
-        # With exclusive storage we need to something smarter than just looking
-        # at free space; for now, let's simply abort the operation.
-        raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
-                                   " is enabled", errors.ECODE_STATE)
-      _CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
-
-  def Exec(self, feedback_fn):
-    """Execute disk grow.
-
-    """
-    instance = self.instance
-    disk = self.disk
-
-    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
-    assert (self.owned_locks(locking.LEVEL_NODE) ==
-            self.owned_locks(locking.LEVEL_NODE_RES))
-
-    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
-
-    disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk])
-    if not disks_ok:
-      raise errors.OpExecError("Cannot activate block device to grow")
-
-    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
-                (self.op.disk, instance.name,
-                 utils.FormatUnit(self.delta, "h"),
-                 utils.FormatUnit(self.target, "h")))
-
-    # First run all grow ops in dry-run mode
-    for node in instance.all_nodes:
-      self.cfg.SetDiskID(disk, node)
-      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
-                                           True, True)
-      result.Raise("Dry-run grow request failed to node %s" % node)
-
-    if wipe_disks:
-      # Get disk size from primary node for wiping
-      result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
-      result.Raise("Failed to retrieve disk size from node '%s'" %
-                   instance.primary_node)
-
-      (disk_size_in_bytes, ) = result.payload
-
-      if disk_size_in_bytes is None:
-        raise errors.OpExecError("Failed to retrieve disk size from primary"
-                                 " node '%s'" % instance.primary_node)
-
-      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
-
-      assert old_disk_size >= disk.size, \
-        ("Retrieved disk size too small (got %s, should be at least %s)" %
-         (old_disk_size, disk.size))
-    else:
-      old_disk_size = None
-
-    # We know that (as far as we can test) operations across different
-    # nodes will succeed, time to run it for real on the backing storage
-    for node in instance.all_nodes:
-      self.cfg.SetDiskID(disk, node)
-      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
-                                           False, True)
-      result.Raise("Grow request failed to node %s" % node)
-
-    # And now execute it for logical storage, on the primary node
-    node = instance.primary_node
-    self.cfg.SetDiskID(disk, node)
-    result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
-                                         False, False)
-    result.Raise("Grow request failed to node %s" % node)
-
-    disk.RecordGrow(self.delta)
-    self.cfg.Update(instance, feedback_fn)
-
-    # Changes have been recorded, release node lock
-    _ReleaseLocks(self, locking.LEVEL_NODE)
-
-    # Downgrade lock while waiting for sync
-    self.glm.downgrade(locking.LEVEL_INSTANCE)
-
-    assert wipe_disks ^ (old_disk_size is None)
-
-    if wipe_disks:
-      assert instance.disks[self.op.disk] == disk
-
-      # Wipe newly added disk space
-      _WipeDisks(self, instance,
-                 disks=[(self.op.disk, disk, old_disk_size)])
-
-    if self.op.wait_for_sync:
-      disk_abort = not _WaitForSync(self, instance, disks=[disk])
-      if disk_abort:
-        self.LogWarning("Disk syncing has not returned a good status; check"
-                        " the instance")
-      if instance.admin_state != constants.ADMINST_UP:
-        _SafeShutdownInstanceDisks(self, instance, disks=[disk])
-    elif instance.admin_state != constants.ADMINST_UP:
-      self.LogWarning("Not shutting down the disk even if the instance is"
-                      " not supposed to be running because no wait for"
-                      " sync mode was requested")
-
-    assert self.owned_locks(locking.LEVEL_NODE_RES)
-    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
-
-
-class LUInstanceReplaceDisks(LogicalUnit):
-  """Replace the disks of an instance.
-
-  """
-  HPATH = "mirrors-replace"
-  HTYPE = constants.HTYPE_INSTANCE
-  REQ_BGL = False
-
-  def CheckArguments(self):
-    """Check arguments.
-
-    """
-    remote_node = self.op.remote_node
-    ialloc = self.op.iallocator
-    if self.op.mode == constants.REPLACE_DISK_CHG:
-      if remote_node is None and ialloc is None:
-        raise errors.OpPrereqError("When changing the secondary either an"
-                                   " iallocator script must be used or the"
-                                   " new node given", errors.ECODE_INVAL)
-      else:
-        _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
-
-    elif remote_node is not None or ialloc is not None:
-      # Not replacing the secondary
-      raise errors.OpPrereqError("The iallocator and new node options can"
-                                 " only be used when changing the"
-                                 " secondary node", errors.ECODE_INVAL)
-
-  def ExpandNames(self):
-    self._ExpandAndLockInstance()
-
-    assert locking.LEVEL_NODE not in self.needed_locks
-    assert locking.LEVEL_NODE_RES not in self.needed_locks
-    assert locking.LEVEL_NODEGROUP not in self.needed_locks
-
-    assert self.op.iallocator is None or self.op.remote_node is None, \
-      "Conflicting options"
-
-    if self.op.remote_node is not None:
-      self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
-
-      # Warning: do not remove the locking of the new secondary here
-      # unless DRBD8.AddChildren is changed to work in parallel;
-      # currently it doesn't since parallel invocations of
-      # FindUnusedMinor will conflict
-      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
-      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
-    else:
-      self.needed_locks[locking.LEVEL_NODE] = []
-      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
-
-      if self.op.iallocator is not None:
-        # iallocator will select a new node in the same group
-        self.needed_locks[locking.LEVEL_NODEGROUP] = []
-        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
-
-    self.needed_locks[locking.LEVEL_NODE_RES] = []
-
-    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
-                                   self.op.iallocator, self.op.remote_node,
-                                   self.op.disks, self.op.early_release,
-                                   self.op.ignore_ipolicy)
-
-    self.tasklets = [self.replacer]
-
-  def DeclareLocks(self, level):
-    if level == locking.LEVEL_NODEGROUP:
-      assert self.op.remote_node is None
-      assert self.op.iallocator is not None
-      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
-
-      self.share_locks[locking.LEVEL_NODEGROUP] = 1
-      # Lock all groups used by instance optimistically; this requires going
-      # via the node before it's locked, requiring verification later on
-      self.needed_locks[locking.LEVEL_NODEGROUP] = \
-        self.cfg.GetInstanceNodeGroups(self.op.instance_name)
-
-    elif level == locking.LEVEL_NODE:
-      if self.op.iallocator is not None:
-        assert self.op.remote_node is None
-        assert not self.needed_locks[locking.LEVEL_NODE]
-        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
-
-        # Lock member nodes of all locked groups
-        self.needed_locks[locking.LEVEL_NODE] = \
-          [node_name
-           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
-           for node_name in self.cfg.GetNodeGroup(group_uuid).members]
-      else:
-        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
-
-        self._LockInstancesNodes()
-
-    elif level == locking.LEVEL_NODE_RES:
-      # Reuse node locks
-      self.needed_locks[locking.LEVEL_NODE_RES] = \
-        self.needed_locks[locking.LEVEL_NODE]
-
-  def BuildHooksEnv(self):
-    """Build hooks env.
-
-    This runs on the master, the primary and all the secondaries.
-
-    """
-    instance = self.replacer.instance
-    env = {
-      "MODE": self.op.mode,
-      "NEW_SECONDARY": self.op.remote_node,
-      "OLD_SECONDARY": instance.secondary_nodes[0],
-      }
-    env.update(_BuildInstanceHookEnvByObject(self, instance))
-    return env
-
-  def BuildHooksNodes(self):
-    """Build hooks nodes.
-
-    """
-    instance = self.replacer.instance
-    nl = [
-      self.cfg.GetMasterNode(),
-      instance.primary_node,
-      ]
-    if self.op.remote_node is not None:
-      nl.append(self.op.remote_node)
-    return nl, nl
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    """
-    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
-            self.op.iallocator is None)
-
-    # Verify if node group locks are still correct
-    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
-    if owned_groups:
-      _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
-
-    return LogicalUnit.CheckPrereq(self)
-
-
-class LUInstanceActivateDisks(NoHooksLU):
-  """Bring up an instance's disks.
-
-  """
-  REQ_BGL = False
-
-  def ExpandNames(self):
-    self._ExpandAndLockInstance()
-    self.needed_locks[locking.LEVEL_NODE] = []
-    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
-
-  def DeclareLocks(self, level):
-    if level == locking.LEVEL_NODE:
-      self._LockInstancesNodes()
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    This checks that the instance is in the cluster.
-
-    """
-    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
-    assert self.instance is not None, \
-      "Cannot retrieve locked instance %s" % self.op.instance_name
-    _CheckNodeOnline(self, self.instance.primary_node)
-
-  def Exec(self, feedback_fn):
-    """Activate the disks.
-
-    """
-    disks_ok, disks_info = \
-              _AssembleInstanceDisks(self, self.instance,
-                                     ignore_size=self.op.ignore_size)
-    if not disks_ok:
-      raise errors.OpExecError("Cannot activate block devices")
-
-    if self.op.wait_for_sync:
-      if not _WaitForSync(self, self.instance):
-        raise errors.OpExecError("Some disks of the instance are degraded!")
-
-    return disks_info
-
-
-class LUInstanceDeactivateDisks(NoHooksLU):
-  """Shutdown an instance's disks.
-
-  """
-  REQ_BGL = False
-
-  def ExpandNames(self):
-    self._ExpandAndLockInstance()
-    self.needed_locks[locking.LEVEL_NODE] = []
-    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+    node_names = itertools.chain(*(i.all_nodes for i in self.wanted_instances))
+    nodes = dict(self.cfg.GetMultiNodeInfo(node_names))
 
-  def DeclareLocks(self, level):
-    if level == locking.LEVEL_NODE:
-      self._LockInstancesNodes()
+    groups = dict(self.cfg.GetMultiNodeGroupInfo(node.group
+                                                 for node in nodes.values()))
 
-  def CheckPrereq(self):
-    """Check prerequisites.
+    group2name_fn = lambda uuid: groups[uuid].name
+    for instance in self.wanted_instances:
+      pnode = nodes[instance.primary_node]
 
-    This checks that the instance is in the cluster.
+      if self.op.static or pnode.offline:
+        remote_state = None
+        if pnode.offline:
+          self.LogWarning("Primary node %s is marked offline, returning static"
+                          " information only for instance %s" %
+                          (pnode.name, instance.name))
+      else:
+        remote_info = self.rpc.call_instance_info(instance.primary_node,
+                                                  instance.name,
+                                                  instance.hypervisor)
+        remote_info.Raise("Error checking node %s" % instance.primary_node)
+        remote_info = remote_info.payload
+        if remote_info and "state" in remote_info:
+          remote_state = "up"
+        else:
+          if instance.admin_state == constants.ADMINST_UP:
+            remote_state = "down"
+          else:
+            remote_state = instance.admin_state
 
-    """
-    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
-    assert self.instance is not None, \
-      "Cannot retrieve locked instance %s" % self.op.instance_name
+      disks = map(compat.partial(self._ComputeDiskStatus, instance, None),
+                  instance.disks)
 
-  def Exec(self, feedback_fn):
-    """Deactivate the disks
+      snodes_group_uuids = [nodes[snode_name].group
+                            for snode_name in instance.secondary_nodes]
 
-    """
-    instance = self.instance
-    if self.op.force:
-      _ShutdownInstanceDisks(self, instance)
-    else:
-      _SafeShutdownInstanceDisks(self, instance)
+      result[instance.name] = {
+        "name": instance.name,
+        "config_state": instance.admin_state,
+        "run_state": remote_state,
+        "pnode": instance.primary_node,
+        "pnode_group_uuid": pnode.group,
+        "pnode_group_name": group2name_fn(pnode.group),
+        "snodes": instance.secondary_nodes,
+        "snodes_group_uuids": snodes_group_uuids,
+        "snodes_group_names": map(group2name_fn, snodes_group_uuids),
+        "os": instance.os,
+        # this happens to be the same format used for hooks
+        "nics": _NICListToTuple(self, instance.nics),
+        "disk_template": instance.disk_template,
+        "disks": disks,
+        "hypervisor": instance.hypervisor,
+        "network_port": instance.network_port,
+        "hv_instance": instance.hvparams,
+        "hv_actual": cluster.FillHV(instance, skip_globals=True),
+        "be_instance": instance.beparams,
+        "be_actual": cluster.FillBE(instance),
+        "os_instance": instance.osparams,
+        "os_actual": cluster.SimpleFillOS(instance.os, instance.osparams),
+        "serial_no": instance.serial_no,
+        "mtime": instance.mtime,
+        "ctime": instance.ctime,
+        "uuid": instance.uuid,
+        }
+
+    return result
 
 
 class LUInstanceStartup(LogicalUnit):
@@ -6177,53 +4594,6 @@ class LUInstanceChangeGroup(LogicalUnit):
     return ResultWithJobs(jobs)
 
 
-def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
-                               ldisk=False):
-  """Check that mirrors are not degraded.
-
-  @attention: The device has to be annotated already.
-
-  The ldisk parameter, if True, will change the test from the
-  is_degraded attribute (which represents overall non-ok status for
-  the device(s)) to the ldisk (representing the local storage status).
-
-  """
-  lu.cfg.SetDiskID(dev, node)
-
-  result = True
-
-  if on_primary or dev.AssembleOnSecondary():
-    rstats = lu.rpc.call_blockdev_find(node, dev)
-    msg = rstats.fail_msg
-    if msg:
-      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
-      result = False
-    elif not rstats.payload:
-      lu.LogWarning("Can't find disk on node %s", node)
-      result = False
-    else:
-      if ldisk:
-        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
-      else:
-        result = result and not rstats.payload.is_degraded
-
-  if dev.children:
-    for child in dev.children:
-      result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
-                                                     on_primary)
-
-  return result
-
-
-def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
-  """Wrapper around L{_CheckDiskConsistencyInner}.
-
-  """
-  (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
-  return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
-                                    ldisk=ldisk)
-
-
 class TLMigrateInstance(Tasklet):
   """Tasklet class for instance migration.
 
@@ -6926,760 +5296,3 @@ class TLMigrateInstance(Tasklet):
         return self._ExecCleanup()
       else:
         return self._ExecMigration()
-
-
-def _BlockdevFind(lu, node, dev, instance):
-  """Wrapper around call_blockdev_find to annotate diskparams.
-
-  @param lu: A reference to the lu object
-  @param node: The node to call out
-  @param dev: The device to find
-  @param instance: The instance object the device belongs to
-  @returns The result of the rpc call
-
-  """
-  (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
-  return lu.rpc.call_blockdev_find(node, disk)
-
-
-class TLReplaceDisks(Tasklet):
-  """Replaces disks for an instance.
-
-  Note: Locking is not within the scope of this class.
-
-  """
-  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
-               disks, early_release, ignore_ipolicy):
-    """Initializes this class.
-
-    """
-    Tasklet.__init__(self, lu)
-
-    # Parameters
-    self.instance_name = instance_name
-    self.mode = mode
-    self.iallocator_name = iallocator_name
-    self.remote_node = remote_node
-    self.disks = disks
-    self.early_release = early_release
-    self.ignore_ipolicy = ignore_ipolicy
-
-    # Runtime data
-    self.instance = None
-    self.new_node = None
-    self.target_node = None
-    self.other_node = None
-    self.remote_node_info = None
-    self.node_secondary_ip = None
-
-  @staticmethod
-  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
-    """Compute a new secondary node using an IAllocator.
-
-    """
-    req = iallocator.IAReqRelocate(name=instance_name,
-                                   relocate_from=list(relocate_from))
-    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
-
-    ial.Run(iallocator_name)
-
-    if not ial.success:
-      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
-                                 " %s" % (iallocator_name, ial.info),
-                                 errors.ECODE_NORES)
-
-    remote_node_name = ial.result[0]
-
-    lu.LogInfo("Selected new secondary for instance '%s': %s",
-               instance_name, remote_node_name)
-
-    return remote_node_name
-
-  def _FindFaultyDisks(self, node_name):
-    """Wrapper for L{_FindFaultyInstanceDisks}.
-
-    """
-    return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
-                                    node_name, True)
-
-  def _CheckDisksActivated(self, instance):
-    """Checks if the instance disks are activated.
-
-    @param instance: The instance to check disks
-    @return: True if they are activated, False otherwise
-
-    """
-    nodes = instance.all_nodes
-
-    for idx, dev in enumerate(instance.disks):
-      for node in nodes:
-        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
-        self.cfg.SetDiskID(dev, node)
-
-        result = _BlockdevFind(self, node, dev, instance)
-
-        if result.offline:
-          continue
-        elif result.fail_msg or not result.payload:
-          return False
-
-    return True
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    This checks that the instance is in the cluster.
-
-    """
-    self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
-    assert instance is not None, \
-      "Cannot retrieve locked instance %s" % self.instance_name
-
-    if instance.disk_template != constants.DT_DRBD8:
-      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
-                                 " instances", errors.ECODE_INVAL)
-
-    if len(instance.secondary_nodes) != 1:
-      raise errors.OpPrereqError("The instance has a strange layout,"
-                                 " expected one secondary but found %d" %
-                                 len(instance.secondary_nodes),
-                                 errors.ECODE_FAULT)
-
-    instance = self.instance
-    secondary_node = instance.secondary_nodes[0]
-
-    if self.iallocator_name is None:
-      remote_node = self.remote_node
-    else:
-      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
-                                       instance.name, instance.secondary_nodes)
-
-    if remote_node is None:
-      self.remote_node_info = None
-    else:
-      assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
-             "Remote node '%s' is not locked" % remote_node
-
-      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
-      assert self.remote_node_info is not None, \
-        "Cannot retrieve locked node %s" % remote_node
-
-    if remote_node == self.instance.primary_node:
-      raise errors.OpPrereqError("The specified node is the primary node of"
-                                 " the instance", errors.ECODE_INVAL)
-
-    if remote_node == secondary_node:
-      raise errors.OpPrereqError("The specified node is already the"
-                                 " secondary node of the instance",
-                                 errors.ECODE_INVAL)
-
-    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
-                                    constants.REPLACE_DISK_CHG):
-      raise errors.OpPrereqError("Cannot specify disks to be replaced",
-                                 errors.ECODE_INVAL)
-
-    if self.mode == constants.REPLACE_DISK_AUTO:
-      if not self._CheckDisksActivated(instance):
-        raise errors.OpPrereqError("Please run activate-disks on instance %s"
-                                   " first" % self.instance_name,
-                                   errors.ECODE_STATE)
-      faulty_primary = self._FindFaultyDisks(instance.primary_node)
-      faulty_secondary = self._FindFaultyDisks(secondary_node)
-
-      if faulty_primary and faulty_secondary:
-        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
-                                   " one node and can not be repaired"
-                                   " automatically" % self.instance_name,
-                                   errors.ECODE_STATE)
-
-      if faulty_primary:
-        self.disks = faulty_primary
-        self.target_node = instance.primary_node
-        self.other_node = secondary_node
-        check_nodes = [self.target_node, self.other_node]
-      elif faulty_secondary:
-        self.disks = faulty_secondary
-        self.target_node = secondary_node
-        self.other_node = instance.primary_node
-        check_nodes = [self.target_node, self.other_node]
-      else:
-        self.disks = []
-        check_nodes = []
-
-    else:
-      # Non-automatic modes
-      if self.mode == constants.REPLACE_DISK_PRI:
-        self.target_node = instance.primary_node
-        self.other_node = secondary_node
-        check_nodes = [self.target_node, self.other_node]
-
-      elif self.mode == constants.REPLACE_DISK_SEC:
-        self.target_node = secondary_node
-        self.other_node = instance.primary_node
-        check_nodes = [self.target_node, self.other_node]
-
-      elif self.mode == constants.REPLACE_DISK_CHG:
-        self.new_node = remote_node
-        self.other_node = instance.primary_node
-        self.target_node = secondary_node
-        check_nodes = [self.new_node, self.other_node]
-
-        _CheckNodeNotDrained(self.lu, remote_node)
-        _CheckNodeVmCapable(self.lu, remote_node)
-
-        old_node_info = self.cfg.GetNodeInfo(secondary_node)
-        assert old_node_info is not None
-        if old_node_info.offline and not self.early_release:
-          # doesn't make sense to delay the release
-          self.early_release = True
-          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
-                          " early-release mode", secondary_node)
-
-      else:
-        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
-                                     self.mode)
-
-      # If not specified all disks should be replaced
-      if not self.disks:
-        self.disks = range(len(self.instance.disks))
-
-    # TODO: This is ugly, but right now we can't distinguish between internal
-    # submitted opcode and external one. We should fix that.
-    if self.remote_node_info:
-      # We change the node, lets verify it still meets instance policy
-      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
-      cluster = self.cfg.GetClusterInfo()
-      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
-                                                              new_group_info)
-      _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
-                              self.cfg, ignore=self.ignore_ipolicy)
-
-    for node in check_nodes:
-      _CheckNodeOnline(self.lu, node)
-
-    touched_nodes = frozenset(node_name for node_name in [self.new_node,
-                                                          self.other_node,
-                                                          self.target_node]
-                              if node_name is not None)
-
-    # Release unneeded node and node resource locks
-    _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
-    _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
-    _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
-
-    # Release any owned node group
-    _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
-
-    # Check whether disks are valid
-    for disk_idx in self.disks:
-      instance.FindDisk(disk_idx)
-
-    # Get secondary node IP addresses
-    self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
-                                  in self.cfg.GetMultiNodeInfo(touched_nodes))
-
-  def Exec(self, feedback_fn):
-    """Execute disk replacement.
-
-    This dispatches the disk replacement to the appropriate handler.
-
-    """
-    if __debug__:
-      # Verify owned locks before starting operation
-      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
-      assert set(owned_nodes) == set(self.node_secondary_ip), \
-          ("Incorrect node locks, owning %s, expected %s" %
-           (owned_nodes, self.node_secondary_ip.keys()))
-      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
-              self.lu.owned_locks(locking.LEVEL_NODE_RES))
-      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
-
-      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
-      assert list(owned_instances) == [self.instance_name], \
-          "Instance '%s' not locked" % self.instance_name
-
-      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
-          "Should not own any node group lock at this point"
-
-    if not self.disks:
-      feedback_fn("No disks need replacement for instance '%s'" %
-                  self.instance.name)
-      return
-
-    feedback_fn("Replacing disk(s) %s for instance '%s'" %
-                (utils.CommaJoin(self.disks), self.instance.name))
-    feedback_fn("Current primary node: %s" % self.instance.primary_node)
-    feedback_fn("Current seconary node: %s" %
-                utils.CommaJoin(self.instance.secondary_nodes))
-
-    activate_disks = (self.instance.admin_state != constants.ADMINST_UP)
-
-    # Activate the instance disks if we're replacing them on a down instance
-    if activate_disks:
-      _StartInstanceDisks(self.lu, self.instance, True)
-
-    try:
-      # Should we replace the secondary node?
-      if self.new_node is not None:
-        fn = self._ExecDrbd8Secondary
-      else:
-        fn = self._ExecDrbd8DiskOnly
-
-      result = fn(feedback_fn)
-    finally:
-      # Deactivate the instance disks if we're replacing them on a
-      # down instance
-      if activate_disks:
-        _SafeShutdownInstanceDisks(self.lu, self.instance)
-
-    assert not self.lu.owned_locks(locking.LEVEL_NODE)
-
-    if __debug__:
-      # Verify owned locks
-      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
-      nodes = frozenset(self.node_secondary_ip)
-      assert ((self.early_release and not owned_nodes) or
-              (not self.early_release and not (set(owned_nodes) - nodes))), \
-        ("Not owning the correct locks, early_release=%s, owned=%r,"
-         " nodes=%r" % (self.early_release, owned_nodes, nodes))
-
-    return result
-
-  def _CheckVolumeGroup(self, nodes):
-    self.lu.LogInfo("Checking volume groups")
-
-    vgname = self.cfg.GetVGName()
-
-    # Make sure volume group exists on all involved nodes
-    results = self.rpc.call_vg_list(nodes)
-    if not results:
-      raise errors.OpExecError("Can't list volume groups on the nodes")
-
-    for node in nodes:
-      res = results[node]
-      res.Raise("Error checking node %s" % node)
-      if vgname not in res.payload:
-        raise errors.OpExecError("Volume group '%s' not found on node %s" %
-                                 (vgname, node))
-
-  def _CheckDisksExistence(self, nodes):
-    # Check disk existence
-    for idx, dev in enumerate(self.instance.disks):
-      if idx not in self.disks:
-        continue
-
-      for node in nodes:
-        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
-        self.cfg.SetDiskID(dev, node)
-
-        result = _BlockdevFind(self, node, dev, self.instance)
-
-        msg = result.fail_msg
-        if msg or not result.payload:
-          if not msg:
-            msg = "disk not found"
-          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
-                                   (idx, node, msg))
-
-  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
-    for idx, dev in enumerate(self.instance.disks):
-      if idx not in self.disks:
-        continue
-
-      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
-                      (idx, node_name))
-
-      if not _CheckDiskConsistency(self.lu, self.instance, dev, node_name,
-                                   on_primary, ldisk=ldisk):
-        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
-                                 " replace disks for instance %s" %
-                                 (node_name, self.instance.name))
-
-  def _CreateNewStorage(self, node_name):
-    """Create new storage on the primary or secondary node.
-
-    This is only used for same-node replaces, not for changing the
-    secondary node, hence we don't want to modify the existing disk.
-
-    """
-    iv_names = {}
-
-    disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
-    for idx, dev in enumerate(disks):
-      if idx not in self.disks:
-        continue
-
-      self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)
-
-      self.cfg.SetDiskID(dev, node_name)
-
-      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
-      names = _GenerateUniqueNames(self.lu, lv_names)
-
-      (data_disk, meta_disk) = dev.children
-      vg_data = data_disk.logical_id[0]
-      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
-                             logical_id=(vg_data, names[0]),
-                             params=data_disk.params)
-      vg_meta = meta_disk.logical_id[0]
-      lv_meta = objects.Disk(dev_type=constants.LD_LV,
-                             size=constants.DRBD_META_SIZE,
-                             logical_id=(vg_meta, names[1]),
-                             params=meta_disk.params)
-
-      new_lvs = [lv_data, lv_meta]
-      old_lvs = [child.Copy() for child in dev.children]
-      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
-      excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
-
-      # we pass force_create=True to force the LVM creation
-      for new_lv in new_lvs:
-        _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
-                             _GetInstanceInfoText(self.instance), False,
-                             excl_stor)
-
-    return iv_names
-
-  def _CheckDevices(self, node_name, iv_names):
-    for name, (dev, _, _) in iv_names.iteritems():
-      self.cfg.SetDiskID(dev, node_name)
-
-      result = _BlockdevFind(self, node_name, dev, self.instance)
-
-      msg = result.fail_msg
-      if msg or not result.payload:
-        if not msg:
-          msg = "disk not found"
-        raise errors.OpExecError("Can't find DRBD device %s: %s" %
-                                 (name, msg))
-
-      if result.payload.is_degraded:
-        raise errors.OpExecError("DRBD device %s is degraded!" % name)
-
-  def _RemoveOldStorage(self, node_name, iv_names):
-    for name, (_, old_lvs, _) in iv_names.iteritems():
-      self.lu.LogInfo("Remove logical volumes for %s", name)
-
-      for lv in old_lvs:
-        self.cfg.SetDiskID(lv, node_name)
-
-        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
-        if msg:
-          self.lu.LogWarning("Can't remove old LV: %s", msg,
-                             hint="remove unused LVs manually")
-
-  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
-    """Replace a disk on the primary or secondary for DRBD 8.
-
-    The algorithm for replace is quite complicated:
-
-      1. for each disk to be replaced:
-
-        1. create new LVs on the target node with unique names
-        1. detach old LVs from the drbd device
-        1. rename old LVs to name_replaced.<time_t>
-        1. rename new LVs to old LVs
-        1. attach the new LVs (with the old names now) to the drbd device
-
-      1. wait for sync across all devices
-
-      1. for each modified disk:
-
-        1. remove old LVs (which have the name name_replaces.<time_t>)
-
-    Failures are not very well handled.
-
-    """
-    steps_total = 6
-
-    # Step: check device activation
-    self.lu.LogStep(1, steps_total, "Check device existence")
-    self._CheckDisksExistence([self.other_node, self.target_node])
-    self._CheckVolumeGroup([self.target_node, self.other_node])
-
-    # Step: check other node consistency
-    self.lu.LogStep(2, steps_total, "Check peer consistency")
-    self._CheckDisksConsistency(self.other_node,
-                                self.other_node == self.instance.primary_node,
-                                False)
-
-    # Step: create new storage
-    self.lu.LogStep(3, steps_total, "Allocate new storage")
-    iv_names = self._CreateNewStorage(self.target_node)
-
-    # Step: for each lv, detach+rename*2+attach
-    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
-    for dev, old_lvs, new_lvs in iv_names.itervalues():
-      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
-
-      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
-                                                     old_lvs)
-      result.Raise("Can't detach drbd from local storage on node"
-                   " %s for device %s" % (self.target_node, dev.iv_name))
-      #dev.children = []
-      #cfg.Update(instance)
-
-      # ok, we created the new LVs, so now we know we have the needed
-      # storage; as such, we proceed on the target node to rename
-      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
-      # using the assumption that logical_id == physical_id (which in
-      # turn is the unique_id on that node)
-
-      # FIXME(iustin): use a better name for the replaced LVs
-      temp_suffix = int(time.time())
-      ren_fn = lambda d, suff: (d.physical_id[0],
-                                d.physical_id[1] + "_replaced-%s" % suff)
-
-      # Build the rename list based on what LVs exist on the node
-      rename_old_to_new = []
-      for to_ren in old_lvs:
-        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
-        if not result.fail_msg and result.payload:
-          # device exists
-          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
-
-      self.lu.LogInfo("Renaming the old LVs on the target node")
-      result = self.rpc.call_blockdev_rename(self.target_node,
-                                             rename_old_to_new)
-      result.Raise("Can't rename old LVs on node %s" % self.target_node)
-
-      # Now we rename the new LVs to the old LVs
-      self.lu.LogInfo("Renaming the new LVs on the target node")
-      rename_new_to_old = [(new, old.physical_id)
-                           for old, new in zip(old_lvs, new_lvs)]
-      result = self.rpc.call_blockdev_rename(self.target_node,
-                                             rename_new_to_old)
-      result.Raise("Can't rename new LVs on node %s" % self.target_node)
-
-      # Intermediate steps of in memory modifications
-      for old, new in zip(old_lvs, new_lvs):
-        new.logical_id = old.logical_id
-        self.cfg.SetDiskID(new, self.target_node)
-
-      # We need to modify old_lvs so that removal later removes the
-      # right LVs, not the newly added ones; note that old_lvs is a
-      # copy here
-      for disk in old_lvs:
-        disk.logical_id = ren_fn(disk, temp_suffix)
-        self.cfg.SetDiskID(disk, self.target_node)
-
-      # Now that the new lvs have the old name, we can add them to the device
-      self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
-      result = self.rpc.call_blockdev_addchildren(self.target_node,
-                                                  (dev, self.instance), new_lvs)
-      msg = result.fail_msg
-      if msg:
-        for new_lv in new_lvs:
-          msg2 = self.rpc.call_blockdev_remove(self.target_node,
-                                               new_lv).fail_msg
-          if msg2:
-            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
-                               hint=("cleanup manually the unused logical"
-                                     "volumes"))
-        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
-
-    cstep = itertools.count(5)
-
-    if self.early_release:
-      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
-      self._RemoveOldStorage(self.target_node, iv_names)
-      # TODO: Check if releasing locks early still makes sense
-      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
-    else:
-      # Release all resource locks except those used by the instance
-      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
-                    keep=self.node_secondary_ip.keys())
-
-    # Release all node locks while waiting for sync
-    _ReleaseLocks(self.lu, locking.LEVEL_NODE)
-
-    # TODO: Can the instance lock be downgraded here? Take the optional disk
-    # shutdown in the caller into consideration.
-
-    # Wait for sync
-    # This can fail as the old devices are degraded and _WaitForSync
-    # does a combined result over all disks, so we don't check its return value
-    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
-    _WaitForSync(self.lu, self.instance)
-
-    # Check all devices manually
-    self._CheckDevices(self.instance.primary_node, iv_names)
-
-    # Step: remove old storage
-    if not self.early_release:
-      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
-      self._RemoveOldStorage(self.target_node, iv_names)
-
-  def _ExecDrbd8Secondary(self, feedback_fn):
-    """Replace the secondary node for DRBD 8.
-
-    The algorithm for replace is quite complicated:
-      - for all disks of the instance:
-        - create new LVs on the new node with same names
-        - shutdown the drbd device on the old secondary
-        - disconnect the drbd network on the primary
-        - create the drbd device on the new secondary
-        - network attach the drbd on the primary, using an artifice:
-          the drbd code for Attach() will connect to the network if it
-          finds a device which is connected to the good local disks but
-          not network enabled
-      - wait for sync across all devices
-      - remove all disks from the old secondary
-
-    Failures are not very well handled.
-
-    """
-    steps_total = 6
-
-    pnode = self.instance.primary_node
-
-    # Step: check device activation
-    self.lu.LogStep(1, steps_total, "Check device existence")
-    self._CheckDisksExistence([self.instance.primary_node])
-    self._CheckVolumeGroup([self.instance.primary_node])
-
-    # Step: check other node consistency
-    self.lu.LogStep(2, steps_total, "Check peer consistency")
-    self._CheckDisksConsistency(self.instance.primary_node, True, True)
-
-    # Step: create new storage
-    self.lu.LogStep(3, steps_total, "Allocate new storage")
-    disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
-    excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
-    for idx, dev in enumerate(disks):
-      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
-                      (self.new_node, idx))
-      # we pass force_create=True to force LVM creation
-      for new_lv in dev.children:
-        _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
-                             True, _GetInstanceInfoText(self.instance), False,
-                             excl_stor)
-
-    # Step 4: dbrd minors and drbd setups changes
-    # after this, we must manually remove the drbd minors on both the
-    # error and the success paths
-    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
-    minors = self.cfg.AllocateDRBDMinor([self.new_node
-                                         for dev in self.instance.disks],
-                                        self.instance.name)
-    logging.debug("Allocated minors %r", minors)
-
-    iv_names = {}
-    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
-      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
-                      (self.new_node, idx))
-      # create new devices on new_node; note that we create two IDs:
-      # one without port, so the drbd will be activated without
-      # networking information on the new node at this stage, and one
-      # with network, for the latter activation in step 4
-      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
-      if self.instance.primary_node == o_node1:
-        p_minor = o_minor1
-      else:
-        assert self.instance.primary_node == o_node2, "Three-node instance?"
-        p_minor = o_minor2
-
-      new_alone_id = (self.instance.primary_node, self.new_node, None,
-                      p_minor, new_minor, o_secret)
-      new_net_id = (self.instance.primary_node, self.new_node, o_port,
-                    p_minor, new_minor, o_secret)
-
-      iv_names[idx] = (dev, dev.children, new_net_id)
-      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
-                    new_net_id)
-      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
-                              logical_id=new_alone_id,
-                              children=dev.children,
-                              size=dev.size,
-                              params={})
-      (anno_new_drbd,) = _AnnotateDiskParams(self.instance, [new_drbd],
-                                             self.cfg)
-      try:
-        _CreateSingleBlockDev(self.lu, self.new_node, self.instance,
-                              anno_new_drbd,
-                              _GetInstanceInfoText(self.instance), False,
-                              excl_stor)
-      except errors.GenericError:
-        self.cfg.ReleaseDRBDMinors(self.instance.name)
-        raise
-
-    # We have new devices, shutdown the drbd on the old secondary
-    for idx, dev in enumerate(self.instance.disks):
-      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
-      self.cfg.SetDiskID(dev, self.target_node)
-      msg = self.rpc.call_blockdev_shutdown(self.target_node,
-                                            (dev, self.instance)).fail_msg
-      if msg:
-        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
-                           "node: %s" % (idx, msg),
-                           hint=("Please cleanup this device manually as"
-                                 " soon as possible"))
-
-    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
-    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
-                                               self.instance.disks)[pnode]
-
-    msg = result.fail_msg
-    if msg:
-      # detaches didn't succeed (unlikely)
-      self.cfg.ReleaseDRBDMinors(self.instance.name)
-      raise errors.OpExecError("Can't detach the disks from the network on"
-                               " old node: %s" % (msg,))
-
-    # if we managed to detach at least one, we update all the disks of
-    # the instance to point to the new secondary
-    self.lu.LogInfo("Updating instance configuration")
-    for dev, _, new_logical_id in iv_names.itervalues():
-      dev.logical_id = new_logical_id
-      self.cfg.SetDiskID(dev, self.instance.primary_node)
-
-    self.cfg.Update(self.instance, feedback_fn)
-
-    # Release all node locks (the configuration has been updated)
-    _ReleaseLocks(self.lu, locking.LEVEL_NODE)
-
-    # and now perform the drbd attach
-    self.lu.LogInfo("Attaching primary drbds to new secondary"
-                    " (standalone => connected)")
-    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
-                                            self.new_node],
-                                           self.node_secondary_ip,
-                                           (self.instance.disks, self.instance),
-                                           self.instance.name,
-                                           False)
-    for to_node, to_result in result.items():
-      msg = to_result.fail_msg
-      if msg:
-        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
-                           to_node, msg,
-                           hint=("please do a gnt-instance info to see the"
-                                 " status of disks"))
-
-    cstep = itertools.count(5)
-
-    if self.early_release:
-      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
-      self._RemoveOldStorage(self.target_node, iv_names)
-      # TODO: Check if releasing locks early still makes sense
-      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
-    else:
-      # Release all resource locks except those used by the instance
-      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
-                    keep=self.node_secondary_ip.keys())
-
-    # TODO: Can the instance lock be downgraded here? Take the optional disk
-    # shutdown in the caller into consideration.
-
-    # Wait for sync
-    # This can fail as the old devices are degraded and _WaitForSync
-    # does a combined result over all disks, so we don't check its return value
-    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
-    _WaitForSync(self.lu, self.instance)
-
-    # Check all devices manually
-    self._CheckDevices(self.instance.primary_node, iv_names)
-
-    # Step: remove old storage
-    if not self.early_release:
-      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
-      self._RemoveOldStorage(self.target_node, iv_names)
diff --git a/lib/cmdlib/instance_storage.py b/lib/cmdlib/instance_storage.py
new file mode 100644 (file)
index 0000000..bfdee08
--- /dev/null
@@ -0,0 +1,2462 @@
+#
+#
+
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+
+"""Logical units dealing with storage of instances."""
+
+import itertools
+import logging
+import os
+import time
+
+from ganeti import compat
+from ganeti import constants
+from ganeti import errors
+from ganeti import ht
+from ganeti import locking
+from ganeti.masterd import iallocator
+from ganeti import objects
+from ganeti import utils
+from ganeti import opcodes
+from ganeti import rpc
+from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
+from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
+  _AnnotateDiskParams, _CheckIAllocatorOrNode, _ExpandNodeName, \
+  _CheckNodeOnline, _CheckInstanceNodeGroups, _CheckInstanceState, \
+  _IsExclusiveStorageEnabledNode, _FindFaultyInstanceDisks
+from ganeti.cmdlib.instance_utils import _GetInstanceInfoText, \
+  _CopyLockList, _ReleaseLocks, _CheckNodeVmCapable, \
+  _BuildInstanceHookEnvByObject, _CheckNodeNotDrained, _CheckTargetNodeIPolicy
+
+import ganeti.masterd.instance
+
+
+# Prefix put in front of ".disk<N>" when unique names are generated for the
+# disks of a template; no names are generated for templates not listed here
+_DISK_TEMPLATE_NAME_PREFIX = {
+  constants.DT_PLAIN: "",
+  constants.DT_RBD: ".rbd",
+  constants.DT_EXT: ".ext",
+  }
+
+
+# Maps a disk template to the logical device type given to its disks when
+# the disk layout is generated
+_DISK_TEMPLATE_DEVICE_TYPE = {
+  constants.DT_PLAIN: constants.LD_LV,
+  constants.DT_FILE: constants.LD_FILE,
+  constants.DT_SHARED_FILE: constants.LD_FILE,
+  constants.DT_BLOCK: constants.LD_BLOCKDEV,
+  constants.DT_RBD: constants.LD_RBD,
+  constants.DT_EXT: constants.LD_EXT,
+  }
+
+
+def _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
+                          excl_stor):
+  """Create one block device on a node, without touching its children.
+
+  The children of the device are not created here; the caller has to
+  create them beforehand.
+
+  @param lu: the lu on whose behalf we execute
+  @param node: the node on which to create the device
+  @type instance: L{objects.Instance}
+  @param instance: the instance which owns the device
+  @type device: L{objects.Disk}
+  @param device: the device to create; its C{physical_id} is set from the
+      RPC result payload if it is still C{None} after the creation
+  @param info: the extra 'metadata' we should attach to the device
+      (this will be represented as a LVM tag)
+  @type force_open: boolean
+  @param force_open: this parameter will be passed to the
+      L{backend.BlockdevCreate} function where it specifies
+      whether we run on primary or not, and it affects both
+      the child assembly and the device's own Open() execution
+  @type excl_stor: boolean
+  @param excl_stor: Whether exclusive_storage is active for the node
+
+  """
+  lu.cfg.SetDiskID(device, node)
+  creation = lu.rpc.call_blockdev_create(node, device, device.size,
+                                         instance.name, force_open, info,
+                                         excl_stor)
+  failure_text = ("Can't create block device %s on"
+                  " node %s for instance %s" % (device, node, instance.name))
+  creation.Raise(failure_text)
+  if device.physical_id is None:
+    device.physical_id = creation.payload
+
+
+def _CreateBlockDevInner(lu, node, instance, device, force_create,
+                         info, force_open, excl_stor):
+  """Create a tree of block devices on a given node.
+
+  If this device type has to be created on secondaries, create it and
+  all its children.
+
+  If not, just recurse to children keeping the same 'force' value.
+
+  @attention: The device has to be annotated already.
+
+  @param lu: the lu on whose behalf we execute
+  @param node: the node on which to create the device
+  @type instance: L{objects.Instance}
+  @param instance: the instance which owns the device
+  @type device: L{objects.Disk}
+  @param device: the device to create
+  @type force_create: boolean
+  @param force_create: whether to force creation of this device; this
+      will be changed to True whenever we find a device for which
+      CreateOnSecondary() returns a true value
+  @param info: the extra 'metadata' we should attach to the device
+      (this will be represented as a LVM tag)
+  @type force_open: boolean
+  @param force_open: this parameter will be passed to the
+      L{backend.BlockdevCreate} function where it specifies
+      whether we run on primary or not, and it affects both
+      the child assembly and the device's own Open() execution
+  @type excl_stor: boolean
+  @param excl_stor: Whether exclusive_storage is active for the node
+
+  @return: list of C{(node, device)} tuples for the created devices
+  @raise errors.DeviceCreationError: if the creation fails; the exception's
+      C{created_devices} lists the devices created before the failure
+  """
+  created_devices = []
+  try:
+    if device.CreateOnSecondary():
+      force_create = True
+
+    if device.children:
+      for child in device.children:
+        devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
+                                    info, force_open, excl_stor)
+        created_devices.extend(devs)
+
+    if not force_create:
+      return created_devices
+
+    _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
+                          excl_stor)
+    # The device has been completely created, so there is no point in keeping
+    # its subdevices in the list. We just add the device itself instead.
+    created_devices = [(node, device)]
+    return created_devices
+
+  except errors.DeviceCreationError, e:
+    # A nested call failed: add what this level created and propagate the
+    # same exception; a bare "raise" keeps the original traceback
+    e.created_devices.extend(created_devices)
+    raise
+  except errors.OpExecError, e:
+    raise errors.DeviceCreationError(str(e), created_devices)
+
+
+def _IsExclusiveStorageEnabledNodeName(cfg, nodename):
+  """Look up a node by name and tell whether it uses exclusive_storage.
+
+  @type cfg: L{config.ConfigWriter}
+  @param cfg: The cluster configuration
+  @type nodename: string
+  @param nodename: The name of the node to look up
+  @rtype: bool
+  @return: The effective value of exclusive_storage
+  @raise errors.OpPrereqError: if no node exists with the given name
+
+  """
+  node_info = cfg.GetNodeInfo(nodename)
+  if node_info is not None:
+    return _IsExclusiveStorageEnabledNode(cfg, node_info)
+
+  raise errors.OpPrereqError("Invalid node name %s" % nodename,
+                             errors.ECODE_NOENT)
+
+
+def _CreateBlockDev(lu, node, instance, device, force_create, info,
+                    force_open):
+  """Annotate C{device} and create its block device tree on C{node}.
+
+  Thin wrapper around L{_CreateBlockDevInner}: the root device is annotated
+  and the exclusive_storage setting of the node is looked up first.
+
+  """
+  (annotated,) = _AnnotateDiskParams(instance, [device], lu.cfg)
+  exclusive = _IsExclusiveStorageEnabledNodeName(lu.cfg, node)
+  return _CreateBlockDevInner(lu, node, instance, annotated, force_create,
+                              info, force_open, exclusive)
+
+
+def _CreateDisks(lu, instance, to_skip=None, target_node=None):
+  """Create all disks for an instance.
+
+  This abstracts away some work from AddInstance.
+
+  @type lu: L{LogicalUnit}
+  @param lu: the logical unit on whose behalf we execute
+  @type instance: L{objects.Instance}
+  @param instance: the instance whose disks we should create
+  @type to_skip: list
+  @param to_skip: list of indices to skip
+  @type target_node: string
+  @param target_node: if passed, overrides the target node for creation
+  @raise errors.OpExecError: if a device could not be created; removal of
+      the devices created up to that point is attempted before raising
+
+  """
+  info = _GetInstanceInfoText(instance)
+  if target_node is None:
+    pnode = instance.primary_node
+    all_nodes = instance.all_nodes
+  else:
+    pnode = target_node
+    all_nodes = [pnode]
+
+  if instance.disk_template in constants.DTS_FILEBASED:
+    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
+    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
+
+    result.Raise("Failed to create directory '%s' on"
+                 " node %s" % (file_storage_dir, pnode))
+
+  disks_created = []
+  # Note: this needs to be kept in sync with adding of disks in
+  # LUInstanceSetParams
+  for idx, device in enumerate(instance.disks):
+    if to_skip and idx in to_skip:
+      continue
+    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
+    #HARDCODE
+    for node in all_nodes:
+      # Creation (and opening) is only forced on the primary/target node
+      f_create = node == pnode
+      try:
+        _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
+        disks_created.append((node, device))
+      except errors.OpExecError:
+        logging.warning("Creating disk %s for instance '%s' failed",
+                        idx, instance.name)
+      except errors.DeviceCreationError, e:
+        logging.warning("Creating disk %s for instance '%s' failed",
+                        idx, instance.name)
+        disks_created.extend(e.created_devices)
+        # Best-effort rollback of everything created so far. Dedicated loop
+        # names are used so that the warning below reports the disk whose
+        # removal failed, not the disk that was being created.
+        for (rb_node, rb_disk) in disks_created:
+          lu.cfg.SetDiskID(rb_disk, rb_node)
+          result = lu.rpc.call_blockdev_remove(rb_node, rb_disk)
+          if result.fail_msg:
+            logging.warning("Failed to remove newly-created disk %s on node %s:"
+                            " %s", rb_disk, rb_node, result.fail_msg)
+        raise errors.OpExecError(e.message)
+
+
+def _ComputeDiskSizePerVG(disk_template, disks):
+  """Compute the disk space required in each volume group.
+
+  @param disk_template: the disk template of the instance
+  @param disks: list of disk definition dicts; C{constants.IDISK_VG} and
+      C{constants.IDISK_SIZE} are read from each of them
+  @return: dict mapping volume group name to the space required in it
+  @raise errors.ProgrammerError: if the size requirements of the disk
+      template are not known
+
+  """
+  def _compute(disks, payload):
+    """Sum C{size + payload} of all disks, grouped by volume group.
+
+    """
+    vgs = {}
+    for disk in disks:
+      # Accumulate per volume group: the lookup key has to be the disk's VG
+      # name, not the name of the "vg" parameter itself
+      vg = disk[constants.IDISK_VG]
+      vgs[vg] = vgs.get(vg, 0) + disk[constants.IDISK_SIZE] + payload
+
+    return vgs
+
+  # Required free disk space as a function of disk and swap space
+  req_size_dict = {
+    constants.DT_DISKLESS: {},
+    constants.DT_PLAIN: _compute(disks, 0),
+    # constants.DRBD_META_SIZE is added for the drbd metadata of each disk
+    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
+    constants.DT_FILE: {},
+    constants.DT_SHARED_FILE: {},
+    }
+
+  if disk_template not in req_size_dict:
+    raise errors.ProgrammerError("Disk template '%s' size requirement"
+                                 " is unknown" % disk_template)
+
+  return req_size_dict[disk_template]
+
+
+def _ComputeDisks(op, default_vg):
+  """Validate and normalize the disk definitions of an instance opcode.
+
+  @param op: The instance opcode; C{op.disks} and C{op.disk_template}
+      are read
+  @param default_vg: Volume group used for disks which do not specify one
+
+  @return: List of dicts, one per disk, holding the normalized parameters
+  @raise errors.OpPrereqError: On an invalid access mode, a missing or
+      invalid size, a provider given for a non-ext template or a missing
+      provider for the ext template
+
+  """
+  disks = []
+  for disk in op.disks:
+    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
+    if mode not in constants.DISK_ACCESS_SET:
+      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
+                                 mode, errors.ECODE_INVAL)
+    size = disk.get(constants.IDISK_SIZE, None)
+    if size is None:
+      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
+    try:
+      size = int(size)
+    except (TypeError, ValueError):
+      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
+                                 errors.ECODE_INVAL)
+
+    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
+    if ext_provider and op.disk_template != constants.DT_EXT:
+      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
+                                 " disk template, not %s" %
+                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
+                                  op.disk_template), errors.ECODE_INVAL)
+
+    data_vg = disk.get(constants.IDISK_VG, default_vg)
+    name = disk.get(constants.IDISK_NAME, None)
+    # A name equal to constants.VALUE_NONE (ignoring case) means "no name"
+    if name is not None and name.lower() == constants.VALUE_NONE:
+      name = None
+    new_disk = {
+      constants.IDISK_SIZE: size,
+      constants.IDISK_MODE: mode,
+      constants.IDISK_VG: data_vg,
+      constants.IDISK_NAME: name,
+      }
+
+    # Optional parameters are only copied if they were given
+    if constants.IDISK_METAVG in disk:
+      new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
+    if constants.IDISK_ADOPT in disk:
+      new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
+
+    # For extstorage, demand the `provider' option and add any
+    # additional parameters (ext-params) to the dict
+    if op.disk_template == constants.DT_EXT:
+      if ext_provider:
+        new_disk[constants.IDISK_PROVIDER] = ext_provider
+        for key in disk:
+          if key not in constants.IDISK_PARAMS:
+            new_disk[key] = disk[key]
+      else:
+        raise errors.OpPrereqError("Missing provider for template '%s'" %
+                                   constants.DT_EXT, errors.ECODE_INVAL)
+
+    disks.append(new_disk)
+
+  return disks
+
+
+def _CheckRADOSFreeSpace():
+  """Check the free space inside the RADOS cluster (currently a no-op).
+
+  """
+  # For the RADOS cluster we assume there is always enough space.
+  pass
+
+
+def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
+                         iv_name, p_minor, s_minor):
+  """Generate a drbd8 device complete with its children.
+
+  The result is a DRBD8 disk object with two LV children: a data volume of
+  the requested size and a metadata volume of C{constants.DRBD_META_SIZE}.
+  The port, the shared secret and the disk UUIDs are obtained from C{lu.cfg}.
+
+  @param lu: the logical unit on whose behalf we execute
+  @param primary: first node stored in the logical_id of the DRBD device
+  @param secondary: second node stored in the logical_id of the DRBD device
+  @param size: size of the data volume and of the DRBD device
+  @param vgnames: two volume group names, for the data and meta volumes
+  @param names: two LV names, for the data and meta volumes
+  @param iv_name: the iv_name to set on the DRBD device
+  @param p_minor: first DRBD minor stored in the logical_id
+  @param s_minor: second DRBD minor stored in the logical_id
+  @return: the DRBD8 L{objects.Disk} with its children attached
+
+  """
+  assert len(vgnames) == len(names) == 2
+  port = lu.cfg.AllocatePort()
+  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
+
+  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
+                          logical_id=(vgnames[0], names[0]),
+                          params={})
+  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
+  dev_meta = objects.Disk(dev_type=constants.LD_LV,
+                          size=constants.DRBD_META_SIZE,
+                          logical_id=(vgnames[1], names[1]),
+                          params={})
+  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
+  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
+                          logical_id=(primary, secondary, port,
+                                      p_minor, s_minor,
+                                      shared_secret),
+                          children=[dev_data, dev_meta],
+                          iv_name=iv_name, params={})
+  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
+  return drbd_dev
+
+
+def _GenerateDiskTemplate(
+  lu, template_name, instance_name, primary_node, secondary_nodes,
+  disk_info, file_storage_dir, file_driver, base_index,
+  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
+  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
+  """Generate the entire disk layout for a given template type.
+
+  @param lu: the logical unit on whose behalf we execute
+  @param template_name: the disk template (one of C{constants.DT_*})
+  @param instance_name: instance name, used when allocating DRBD minors
+  @param primary_node: the primary node of the instance
+  @param secondary_nodes: list of secondary nodes; it must contain exactly
+      one node for DRBD8 and must be empty for all templates other than
+      DRBD8 and diskless
+  @param disk_info: list of disk definition dicts (C{constants.IDISK_*} keys)
+  @param file_storage_dir: directory for the paths of file-based disks
+  @param file_driver: driver stored in the logical_id of file-based disks
+  @param base_index: index of the first generated disk
+  @param feedback_fn: function called with a message for each generated
+      disk (not called for the DRBD8 template)
+  @param full_disk_params: disk parameters passed to
+      L{objects.Disk.ComputeLDParams} for the DRBD8 template
+  @param _req_file_storage: called without arguments for the file template
+  @param _req_shr_file_storage: called without arguments for the shared
+      file template
+  @return: list of L{objects.Disk} objects (empty for diskless)
+  @raise errors.ProgrammerError: on a node list which does not fit the
+      template, an unknown template or an ext disk without provider
+
+  """
+  vgname = lu.cfg.GetVGName()
+  disk_count = len(disk_info)
+  disks = []
+
+  if template_name == constants.DT_DISKLESS:
+    pass
+  elif template_name == constants.DT_DRBD8:
+    if len(secondary_nodes) != 1:
+      raise errors.ProgrammerError("Wrong template configuration")
+    remote_node = secondary_nodes[0]
+    # Two minors per disk: one on the primary and one on the secondary
+    minors = lu.cfg.AllocateDRBDMinor(
+      [primary_node, remote_node] * len(disk_info), instance_name)
+
+    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
+                                                       full_disk_params)
+    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
+
+    # Two LV names per disk, stored as consecutive (data, meta) pairs
+    names = []
+    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
+                                               for i in range(disk_count)]):
+      names.append(lv_prefix + "_data")
+      names.append(lv_prefix + "_meta")
+    for idx, disk in enumerate(disk_info):
+      disk_index = idx + base_index
+      data_vg = disk.get(constants.IDISK_VG, vgname)
+      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
+      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
+                                      disk[constants.IDISK_SIZE],
+                                      [data_vg, meta_vg],
+                                      names[idx * 2:idx * 2 + 2],
+                                      "disk/%d" % disk_index,
+                                      minors[idx * 2], minors[idx * 2 + 1])
+      disk_dev.mode = disk[constants.IDISK_MODE]
+      disk_dev.name = disk.get(constants.IDISK_NAME, None)
+      disks.append(disk_dev)
+  else:
+    if secondary_nodes:
+      raise errors.ProgrammerError("Wrong template configuration")
+
+    if template_name == constants.DT_FILE:
+      _req_file_storage()
+    elif template_name == constants.DT_SHARED_FILE:
+      _req_shr_file_storage()
+
+    # Unique names are only generated for templates which have a name prefix
+    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
+    if name_prefix is None:
+      names = None
+    else:
+      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
+                                        (name_prefix, base_index + i)
+                                        for i in range(disk_count)])
+
+    # logical_id_fn(idx, disk_index, disk) computes the logical_id of a disk
+    if template_name == constants.DT_PLAIN:
+
+      def logical_id_fn(idx, _, disk):
+        vg = disk.get(constants.IDISK_VG, vgname)
+        return (vg, names[idx])
+
+    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
+      logical_id_fn = \
+        lambda _, disk_index, disk: (file_driver,
+                                     "%s/disk%d" % (file_storage_dir,
+                                                    disk_index))
+    elif template_name == constants.DT_BLOCK:
+      logical_id_fn = \
+        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
+                                       disk[constants.IDISK_ADOPT])
+    elif template_name == constants.DT_RBD:
+      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
+    elif template_name == constants.DT_EXT:
+      def logical_id_fn(idx, _, disk):
+        provider = disk.get(constants.IDISK_PROVIDER, None)
+        if provider is None:
+          # The values have to be interpolated with "%"; passing them as
+          # extra exception arguments leaves the placeholders unformatted
+          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
+                                       " not found" %
+                                       (constants.DT_EXT,
+                                        constants.IDISK_PROVIDER))
+        return (provider, names[idx])
+    else:
+      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
+
+    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
+
+    for idx, disk in enumerate(disk_info):
+      params = {}
+      # Only for the Ext template add disk_info to params
+      if template_name == constants.DT_EXT:
+        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
+        for key in disk:
+          if key not in constants.IDISK_PARAMS:
+            params[key] = disk[key]
+      disk_index = idx + base_index
+      size = disk[constants.IDISK_SIZE]
+      feedback_fn("* disk %s, size %s" %
+                  (disk_index, utils.FormatUnit(size, "h")))
+      disk_dev = objects.Disk(dev_type=dev_type, size=size,
+                              logical_id=logical_id_fn(idx, disk_index, disk),
+                              iv_name="disk/%d" % disk_index,
+                              mode=disk[constants.IDISK_MODE],
+                              params=params)
+      disk_dev.name = disk.get(constants.IDISK_NAME, None)
+      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
+      disks.append(disk_dev)
+
+  return disks
+
+
+class LUInstanceRecreateDisks(LogicalUnit):
+  """Recreate an instance's missing disks.
+
+  """
+  HPATH = "instance-recreate-disks"
+  HTYPE = constants.HTYPE_INSTANCE
+  REQ_BGL = False
+
+  _MODIFYABLE = compat.UniqueFrozenset([
+    constants.IDISK_SIZE,
+    constants.IDISK_MODE,
+    ])
+
+  # New or changed disk parameters may have different semantics
+  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
+    constants.IDISK_ADOPT,
+
+    # TODO: Implement support changing VG while recreating
+    constants.IDISK_VG,
+    constants.IDISK_METAVG,
+    constants.IDISK_PROVIDER,
+    constants.IDISK_NAME,
+    ]))
+
+  def _RunAllocator(self):
+    """Run the allocator based on input opcode.
+
+    An instance allocation request is built from the current instance and
+    the nodes returned by the allocator are stored in C{self.op.nodes}.
+
+    @raise errors.OpPrereqError: if the allocator run was not successful
+
+    """
+    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
+
+    # FIXME
+    # The allocator should actually run in "relocate" mode, but current
+    # allocators don't support relocating all the nodes of an instance at
+    # the same time. As a workaround we use "allocate" mode, but this is
+    # suboptimal for two reasons:
+    # - The instance name passed to the allocator is present in the list of
+    #   existing instances, so there could be a conflict within the
+    #   internal structures of the allocator. This doesn't happen with the
+    #   current allocators, but it's a liability.
+    # - The allocator counts the resources used by the instance twice: once
+    #   because the instance exists already, and once because it tries to
+    #   allocate a new instance.
+    # The allocator could choose some of the nodes on which the instance is
+    # running, but that's not a problem. If the instance nodes are broken,
+    # they should already be marked as drained or offline, and hence
+    # skipped by the allocator. If instance disks have been lost for other
+    # reasons, then recreating the disks on the same nodes should be fine.
+    disk_template = self.instance.disk_template
+    spindle_use = be_full[constants.BE_SPINDLE_USE]
+    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
+                                        disk_template=disk_template,
+                                        tags=list(self.instance.GetTags()),
+                                        os=self.instance.os,
+                                        nics=[{}],
+                                        vcpus=be_full[constants.BE_VCPUS],
+                                        memory=be_full[constants.BE_MAXMEM],
+                                        spindle_use=spindle_use,
+                                        disks=[{constants.IDISK_SIZE: d.size,
+                                                constants.IDISK_MODE: d.mode}
+                                               for d in self.instance.disks],
+                                        hypervisor=self.instance.hypervisor,
+                                        node_whitelist=None)
+    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
+
+    ial.Run(self.op.iallocator)
+
+    assert req.RequiredNodes() == len(self.instance.all_nodes)
+
+    if not ial.success:
+      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
+                                 " %s" % (self.op.iallocator, ial.info),
+                                 errors.ECODE_NORES)
+
+    self.op.nodes = ial.result
+    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
+                 self.op.instance_name, self.op.iallocator,
+                 utils.CommaJoin(ial.result))
+
+  def CheckArguments(self):
+    """Normalize the disk list and validate the opcode arguments.
+
+    A list of plain disk indices (deprecated form) is converted into a
+    list of C{(index, params)} pairs with empty parameters.
+
+    @raise errors.OpPrereqError: for duplicate disk indices or for disk
+        parameters which are not in C{_MODIFYABLE}
+
+    """
+    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
+      # Normalize and convert deprecated list of disk indices
+      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
+
+    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
+    if duplicates:
+      raise errors.OpPrereqError("Some disks have been specified more than"
+                                 " once: %s" % utils.CommaJoin(duplicates),
+                                 errors.ECODE_INVAL)
+
+    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
+    # when neither iallocator nor nodes are specified
+    if self.op.iallocator or self.op.nodes:
+      _CheckIAllocatorOrNode(self, "iallocator", "nodes")
+
+    for (idx, params) in self.op.disks:
+      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
+      unsupported = frozenset(params.keys()) - self._MODIFYABLE
+      if unsupported:
+        raise errors.OpPrereqError("Parameters for disk %s try to change"
+                                   " unmodifyable parameter(s): %s" %
+                                   (idx, utils.CommaJoin(unsupported)),
+                                   errors.ECODE_INVAL)
+
+  def ExpandNames(self):
+    """Expand the node names of the opcode and initialize the needed locks.
+
+    Explicitly given nodes are locked directly; otherwise the node lock
+    list starts empty and, when an iallocator is used, node group and node
+    allocation locks are requested as well.
+
+    """
+    self._ExpandAndLockInstance()
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+
+    if self.op.nodes:
+      self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
+      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
+    else:
+      self.needed_locks[locking.LEVEL_NODE] = []
+      if self.op.iallocator:
+        # iallocator will select a new node in the same group
+        self.needed_locks[locking.LEVEL_NODEGROUP] = []
+        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
+
+    self.needed_locks[locking.LEVEL_NODE_RES] = []
+
+  def DeclareLocks(self, level):
+    """Fill in the needed locks for the given locking level.
+
+    Handles the node group, node and node resource levels; other levels
+    are left unchanged.
+
+    @param level: the locking level for which locks are being declared
+
+    """
+    if level == locking.LEVEL_NODEGROUP:
+      assert self.op.iallocator is not None
+      assert not self.op.nodes
+      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+      self.share_locks[locking.LEVEL_NODEGROUP] = 1
+      # Lock the primary group used by the instance optimistically; this
+      # requires going via the node before it's locked, requiring
+      # verification later on
+      self.needed_locks[locking.LEVEL_NODEGROUP] = \
+        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
+
+    elif level == locking.LEVEL_NODE:
+      # If an allocator is used, then we lock all the nodes in the current
+      # instance group, as we don't know yet which ones will be selected;
+      # if we replace the nodes without using an allocator, locks are
+      # already declared in ExpandNames; otherwise, we need to lock all the
+      # instance nodes for disk re-creation
+      if self.op.iallocator:
+        assert not self.op.nodes
+        assert not self.needed_locks[locking.LEVEL_NODE]
+        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
+
+        # Lock member nodes of the group of the primary node
+        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
+          self.needed_locks[locking.LEVEL_NODE].extend(
+            self.cfg.GetNodeGroup(group_uuid).members)
+
+        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
+      elif not self.op.nodes:
+        self._LockInstancesNodes(primary_only=False)
+    elif level == locking.LEVEL_NODE_RES:
+      # Copy node locks
+      self.needed_locks[locking.LEVEL_NODE_RES] = \
+        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This runs on master, primary and secondary nodes of the instance.
+
+    @return: whatever L{_BuildInstanceHookEnvByObject} computes for
+        C{self.instance}; nothing is added to it here
+
+    """
+    return _BuildInstanceHookEnvByObject(self, self.instance)
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    @rtype: tuple
+    @return: a pair containing the same list twice: the master node
+        followed by all nodes of the instance
+
+    """
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
+    # The very same list object is used for both elements of the pair
+    return (nl, nl)
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks that the instance is in the cluster and is not running.
+
+    In addition it verifies the number of replacement nodes (if given),
+    rejects diskless instances, re-checks the optimistically acquired node
+    group locks, validates the requested disk indices and, when an
+    iallocator is used, runs it and releases the locks that are no longer
+    needed. On success C{self.instance} and C{self.disks} are set.
+
+    @raise errors.OpPrereqError: if one of the explicit checks below fails
+
+    """
+    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
+    if self.op.nodes:
+      # Replacement nodes must match the current node count one-to-one
+      if len(self.op.nodes) != len(instance.all_nodes):
+        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
+                                   " %d replacement nodes were specified" %
+                                   (instance.name, len(instance.all_nodes),
+                                    len(self.op.nodes)),
+                                   errors.ECODE_INVAL)
+      assert instance.disk_template != constants.DT_DRBD8 or \
+             len(self.op.nodes) == 2
+      assert instance.disk_template != constants.DT_PLAIN or \
+             len(self.op.nodes) == 1
+      # The first replacement node becomes the new primary (see Exec)
+      primary_node = self.op.nodes[0]
+    else:
+      primary_node = instance.primary_node
+    if not self.op.iallocator:
+      _CheckNodeOnline(self, primary_node)
+
+    if instance.disk_template == constants.DT_DISKLESS:
+      raise errors.OpPrereqError("Instance '%s' has no disks" %
+                                 self.op.instance_name, errors.ECODE_INVAL)
+
+    # Verify if node group locks are still correct
+    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
+    if owned_groups:
+      # Node group locks are acquired only for the primary node (and only
+      # when the allocator is used)
+      _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
+                               primary_only=True)
+
+    # if we replace nodes *and* the old primary is offline, we don't
+    # check the instance state
+    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
+    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
+      _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
+                          msg="cannot recreate disks")
+
+    # self.disks maps disk index -> dict of requested changes; without an
+    # explicit selection every disk is included with no changes
+    if self.op.disks:
+      self.disks = dict(self.op.disks)
+    else:
+      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
+
+    # NOTE(review): max() raises ValueError on an empty mapping; presumably
+    # a non-diskless instance always has at least one disk here - confirm
+    maxidx = max(self.disks.keys())
+    if maxidx >= len(instance.disks):
+      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
+                                 errors.ECODE_INVAL)
+
+    # NOTE(review): comparing a list against range() only behaves as
+    # intended where range() returns a list (Python 2)
+    if ((self.op.nodes or self.op.iallocator) and
+         sorted(self.disks.keys()) != range(len(instance.disks))):
+      raise errors.OpPrereqError("Can't recreate disks partially and"
+                                 " change the nodes at the same time",
+                                 errors.ECODE_INVAL)
+
+    self.instance = instance
+
+    if self.op.iallocator:
+      self._RunAllocator()
+      # Release unneeded node and node resource locks
+      _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
+      _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
+      _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
+
+    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
+
+  def Exec(self, feedback_fn):
+    """Recreate the disks.
+
+    The needed modifications are first collected for all selected disks and
+    only then applied, after which the disks are created through
+    L{_CreateDisks}; disks not selected in C{self.disks} are skipped.
+
+    @param feedback_fn: passed on to C{self.cfg.Update} when the instance
+        configuration is written
+
+    """
+    instance = self.instance
+
+    assert (self.owned_locks(locking.LEVEL_NODE) ==
+            self.owned_locks(locking.LEVEL_NODE_RES))
+
+    to_skip = []
+    mods = [] # keeps track of needed changes
+
+    for idx, disk in enumerate(instance.disks):
+      try:
+        changes = self.disks[idx]
+      except KeyError:
+        # Disk should not be recreated
+        to_skip.append(idx)
+        continue
+
+      # update secondaries for disks, if needed
+      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
+        # need to update the nodes and minors
+        assert len(self.op.nodes) == 2
+        assert len(disk.logical_id) == 6 # otherwise disk internals
+                                         # have changed
+        # Port and secret are kept; nodes and minors are replaced
+        (_, _, old_port, _, _, old_secret) = disk.logical_id
+        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
+        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
+                  new_minors[0], new_minors[1], old_secret)
+        assert len(disk.logical_id) == len(new_id)
+      else:
+        new_id = None
+
+      mods.append((idx, new_id, changes))
+
+    # now that we have passed all asserts above, we can apply the mods
+    # in a single run (to avoid partial changes)
+    for idx, new_id, changes in mods:
+      disk = instance.disks[idx]
+      if new_id is not None:
+        assert disk.dev_type == constants.LD_DRBD8
+        disk.logical_id = new_id
+      if changes:
+        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
+                    mode=changes.get(constants.IDISK_MODE, None))
+
+    # change primary node, if needed
+    if self.op.nodes:
+      instance.primary_node = self.op.nodes[0]
+      self.LogWarning("Changing the instance's nodes, you will have to"
+                      " remove any disks left on the older nodes manually")
+
+    # The configuration is only written back when the nodes were changed
+    if self.op.nodes:
+      self.cfg.Update(instance, feedback_fn)
+
+    # All touched nodes must be locked
+    mylocks = self.owned_locks(locking.LEVEL_NODE)
+    assert mylocks.issuperset(frozenset(instance.all_nodes))
+    _CreateDisks(self, instance, to_skip=to_skip)
+
+
+def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
+  """Checks if nodes have enough free disk space in the specified VG.
+
+  This function checks if all given nodes have the needed amount of
+  free disk. In case any node has less disk or we cannot get the
+  information from the node, this function raises an OpPrereqError
+  exception.
+
+  @type lu: C{LogicalUnit}
+  @param lu: a logical unit from which we get configuration data
+  @type nodenames: C{list}
+  @param nodenames: the list of node names to check
+  @type vg: C{str}
+  @param vg: the volume group to check
+  @type requested: C{int}
+  @param requested: the amount of disk in MiB to check for
+  @raise errors.OpPrereqError: if the node doesn't have enough disk,
+      or we cannot check the node
+
+  """
+  es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
+  # A single RPC queries all nodes; results are looked up per node below
+  nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
+  for node in nodenames:
+    info = nodeinfo[node]
+    info.Raise("Cannot get current information from node %s" % node,
+               prereq=True, ecode=errors.ECODE_ENVIRON)
+    # The payload is unpacked as a triple whose middle element holds exactly
+    # one entry, the data for the single VG requested above
+    (_, (vg_info, ), _) = info.payload
+    vg_free = vg_info.get("vg_free", None)
+    # A missing or non-integer value means the free space is unknown
+    if not isinstance(vg_free, int):
+      raise errors.OpPrereqError("Can't compute free disk space on node"
+                                 " %s for vg %s, result was '%s'" %
+                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
+    if requested > vg_free:
+      raise errors.OpPrereqError("Not enough disk space on target node %s"
+                                 " vg %s: required %d MiB, available %d MiB" %
+                                 (node, vg, requested, vg_free),
+                                 errors.ECODE_NORES)
+
+
+def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
+  """Checks if nodes have enough free disk space in all the VGs.
+
+  This function checks if all given nodes have the needed amount of
+  free disk. In case any node has less disk or we cannot get the
+  information from the node, this function raises an OpPrereqError
+  exception.
+
+  @type lu: C{LogicalUnit}
+  @param lu: a logical unit from which we get configuration data
+  @type nodenames: C{list}
+  @param nodenames: the list of node names to check
+  @type req_sizes: C{dict}
+  @param req_sizes: the hash of vg and corresponding amount of disk in
+      MiB to check for
+  @raise errors.OpPrereqError: if the node doesn't have enough disk,
+      or we cannot check the node
+
+  """
+  # Each VG is checked separately; the first failing check raises and
+  # thereby stops the iteration
+  for vg, req_size in req_sizes.items():
+    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
+
+
+def _DiskSizeInBytesToMebibytes(lu, size):
+  """Converts a disk size in bytes to mebibytes.
+
+  Warns and rounds up if the size isn't an even multiple of 1 MiB.
+
+  @param lu: the logical unit whose C{LogWarning} method is used to report
+      the rounding
+  @param size: the disk size in bytes
+  @return: the size in mebibytes, rounded up to the next full MiB
+
+  """
+  (mib, remainder) = divmod(size, 1024 * 1024)
+
+  if remainder != 0:
+    # The value logged is the number of bytes between the real size and
+    # the rounded-up size
+    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
+                  " to not overwrite existing data (%s bytes will not be"
+                  " wiped)", (1024 * 1024) - remainder)
+    mib += 1
+
+  return mib
+
+
+def _CalcEta(time_taken, written, total_size):
+  """Calculates the ETA based on size written and total size.
+
+  The estimate is linear: the average time per unit written so far is
+  multiplied by the amount still to be written.
+
+  @param time_taken: The time taken so far
+  @param written: amount written so far; must not be zero, as it is used
+      as a divisor (a zero value raises C{ZeroDivisionError})
+  @param total_size: The total size of data to be written
+  @return: The remaining time in seconds
+
+  """
+  # float() forces true division even for integer arguments
+  avg_time = time_taken / float(written)
+  return (total_size - written) * avg_time
+
+
+def _WipeDisks(lu, instance, disks=None):
+  """Wipes instance disks.
+
+  Disk synchronization is paused on the primary node before wiping and
+  resumed afterwards, also when wiping fails (the resume is done in a
+  C{finally} clause). Wiping is done in chunks, with progress being
+  reported at most once per minute.
+
+  @type lu: L{LogicalUnit}
+  @param lu: the logical unit on whose behalf we execute
+  @type instance: L{objects.Instance}
+  @param instance: the instance whose disks we should wipe
+  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
+  @param disks: Disk details; tuple contains disk index, disk object and the
+    start offset; if None, all disks of the instance are wiped from
+    offset 0
+
+  """
+  node = instance.primary_node
+
+  if disks is None:
+    disks = [(idx, disk, 0)
+             for (idx, disk) in enumerate(instance.disks)]
+
+  for (_, device, _) in disks:
+    lu.cfg.SetDiskID(device, node)
+
+  logging.info("Pausing synchronization of disks of instance '%s'",
+               instance.name)
+  # compat.snd is mapped over the (index, disk, offset) tuples, presumably
+  # picking the disk object (second element) of each
+  result = lu.rpc.call_blockdev_pause_resume_sync(node,
+                                                  (map(compat.snd, disks),
+                                                   instance),
+                                                  True)
+  result.Raise("Failed to pause disk synchronization on node '%s'" % node)
+
+  # Note: idx is the position within the payload, i.e. within the "disks"
+  # list, which is not necessarily the disk index stored in the tuples
+  for idx, success in enumerate(result.payload):
+    if not success:
+      logging.warn("Pausing synchronization of disk %s of instance '%s'"
+                   " failed", idx, instance.name)
+
+  try:
+    for (idx, device, offset) in disks:
+      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
+      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
+      wipe_chunk_size = \
+        int(min(constants.MAX_WIPE_CHUNK,
+                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
+
+      size = device.size
+      # Starting at 0 makes the first progress message appear right after
+      # the first chunk
+      last_output = 0
+      start_time = time.time()
+
+      if offset == 0:
+        info_text = ""
+      else:
+        info_text = (" (from %s to %s)" %
+                     (utils.FormatUnit(offset, "h"),
+                      utils.FormatUnit(size, "h")))
+
+      lu.LogInfo("* Wiping disk %s%s", idx, info_text)
+
+      logging.info("Wiping disk %d for instance %s on node %s using"
+                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)
+
+      while offset < size:
+        # The last chunk may be shorter than the regular chunk size
+        wipe_size = min(wipe_chunk_size, size - offset)
+
+        logging.debug("Wiping disk %d, offset %s, chunk %s",
+                      idx, offset, wipe_size)
+
+        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
+                                           wipe_size)
+        result.Raise("Could not wipe disk %d at offset %d for size %d" %
+                     (idx, offset, wipe_size))
+
+        now = time.time()
+        offset += wipe_size
+        if now - last_output >= 60:
+          # NOTE(review): offset includes the start offset, so with a
+          # non-zero start the skipped part counts as already written
+          eta = _CalcEta(now - start_time, offset, size)
+          lu.LogInfo(" - done: %.1f%% ETA: %s",
+                     offset / float(size) * 100, utils.FormatSeconds(eta))
+          last_output = now
+  finally:
+    logging.info("Resuming synchronization of disks for instance '%s'",
+                 instance.name)
+
+    result = lu.rpc.call_blockdev_pause_resume_sync(node,
+                                                    (map(compat.snd, disks),
+                                                     instance),
+                                                    False)
+
+    # Failures to resume are only reported as warnings, never raised
+    if result.fail_msg:
+      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
+                    node, result.fail_msg)
+    else:
+      for idx, success in enumerate(result.payload):
+        if not success:
+          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
+                        " failed", idx, instance.name)
+
+
+def _ExpandCheckDisks(instance, disks):
+  """Return the instance disks selected by the disks list
+
+  @param instance: the instance whose C{disks} attribute is used both as
+      the default selection and for validating C{disks}
+  @type disks: list of L{objects.Disk} or None
+  @param disks: selected disks
+  @rtype: list of L{objects.Disk}
+  @return: selected instance disks to act on; all disks of the instance
+      if C{disks} is None, otherwise C{disks} itself
+  @raise errors.ProgrammerError: if C{disks} contains a disk which is not
+      among the instance's disks
+
+  """
+  if disks is None:
+    return instance.disks
+  else:
+    # Building a set requires the disk objects to be hashable
+    if not set(disks).issubset(instance.disks):
+      raise errors.ProgrammerError("Can only act on disks belonging to the"
+                                   " target instance")
+    return disks
+
+
+def _WaitForSync(lu, instance, disks=None, oneshot=False):
+  """Sleep and poll for an instance's disk to sync.
+
+  The mirror status is queried on the primary node of the instance.
+
+  @param lu: the logical unit on whose behalf we execute
+  @param instance: the instance whose disks are checked
+  @param disks: the disks to wait for; None selects all disks of the
+      instance, an empty list makes the function return immediately
+  @param oneshot: if true, the status is not polled until the sync is
+      finished; the loop is left after one successful status query (plus
+      the short retries done for degraded disks)
+  @return: True if no disk was found degraded (or there was nothing to
+      check), False otherwise
+  @raise errors.RemoteError: if the node could not be queried ten times
+      in a row
+
+  """
+  # "and" binds tighter than "or": return if the instance has no disks,
+  # or if an explicit but empty disk list was passed
+  if not instance.disks or disks is not None and not disks:
+    return True
+
+  disks = _ExpandCheckDisks(instance, disks)
+
+  if not oneshot:
+    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
+
+  node = instance.primary_node
+
+  for dev in disks:
+    lu.cfg.SetDiskID(dev, node)
+
+  # TODO: Convert to utils.Retry
+
+  retries = 0
+  degr_retries = 10 # in seconds, as we sleep 1 second each time
+  while True:
+    max_time = 0
+    done = True
+    cumul_degraded = False
+    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
+    msg = rstats.fail_msg
+    if msg:
+      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
+      retries += 1
+      if retries >= 10:
+        raise errors.RemoteError("Can't contact node %s for mirror data,"
+                                 " aborting." % node)
+      time.sleep(6)
+      continue
+    rstats = rstats.payload
+    # A successful query resets the count of consecutive failures
+    retries = 0
+    for i, mstat in enumerate(rstats):
+      if mstat is None:
+        lu.LogWarning("Can't compute data for node %s/%s",
+                      node, disks[i].iv_name)
+        continue
+
+      # Only disks which are degraded without being in the middle of a
+      # resync count as degraded
+      cumul_degraded = (cumul_degraded or
+                        (mstat.is_degraded and mstat.sync_percent is None))
+      if mstat.sync_percent is not None:
+        done = False
+        if mstat.estimated_time is not None:
+          rem_time = ("%s remaining (estimated)" %
+                      utils.FormatSeconds(mstat.estimated_time))
+          # NOTE(review): this keeps the estimate of the last device
+          # reporting one, not the maximum over all devices
+          max_time = mstat.estimated_time
+        else:
+          rem_time = "no time estimate"
+        lu.LogInfo("- device %s: %5.2f%% done, %s",
+                   disks[i].iv_name, mstat.sync_percent, rem_time)
+
+    # if we're done but degraded, let's do a few small retries, to
+    # make sure we see a stable and not transient situation; therefore
+    # we force restart of the loop
+    if (done or oneshot) and cumul_degraded and degr_retries > 0:
+      logging.info("Degraded disks found, %d retries left", degr_retries)
+      degr_retries -= 1
+      time.sleep(1)
+      continue
+
+    if done or oneshot:
+      break
+
+    # Sleep for the estimated time, but never longer than a minute
+    time.sleep(min(60, max_time))
+
+  if done:
+    lu.LogInfo("Instance %s's disks are in sync", instance.name)
+
+  return not cumul_degraded
+
+
+def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
+  """Shutdown block devices of an instance.
+
+  This does the shutdown on all nodes of the instance.
+
+  If ignore_primary is true, errors on the primary node are ignored.
+  Errors on other nodes are ignored only if the RPC result marks the node
+  as offline. Every failure is logged as a warning in any case.
+
+  @param lu: the logical unit on whose behalf we execute
+  @param instance: the instance whose disks are shut down
+  @param disks: the disks to shut down, or None for all disks of the
+      instance
+  @param ignore_primary: whether failures on the primary node should not
+      affect the return value
+  @return: False if at least one failure that is not ignored occurred,
+      True otherwise
+
+  """
+  all_result = True
+  disks = _ExpandCheckDisks(instance, disks)
+
+  for disk in disks:
+    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
+      lu.cfg.SetDiskID(top_disk, node)
+      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
+      msg = result.fail_msg
+      if msg:
+        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
+                      disk.iv_name, node, msg)
+        # The failure counts for the primary node unless ignore_primary is
+        # set, and for any other node unless that node is offline
+        if ((node == instance.primary_node and not ignore_primary) or
+            (node != instance.primary_node and not result.offline)):
+          all_result = False
+  return all_result
+
+
+def _SafeShutdownInstanceDisks(lu, instance, disks=None):
+  """Shutdown block devices of an instance.
+
+  This function checks the instance state against C{INSTANCE_DOWN} via
+  L{_CheckInstanceState} before calling _ShutdownInstanceDisks.
+
+  @param lu: the logical unit on whose behalf we execute
+  @param instance: the instance whose disks are shut down
+  @param disks: the disks to shut down, or None for all disks of the
+      instance
+
+  """
+  _CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
+  # The boolean result of the shutdown is not propagated to the caller
+  _ShutdownInstanceDisks(lu, instance, disks=disks)
+
+
+def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
+                           ignore_size=False):
+  """Prepare the block devices for an instance.
+
+  This sets up the block devices on all nodes.
+
+  @type lu: L{LogicalUnit}
+  @param lu: the logical unit on whose behalf we execute
+  @type instance: L{objects.Instance}
+  @param instance: the instance for whose disks we assemble
+  @type disks: list of L{objects.Disk} or None
+  @param disks: which disks to assemble (or all, if None)
+  @type ignore_secondaries: boolean
+  @param ignore_secondaries: if true, errors on secondary nodes
+      won't result in an error return from the function
+  @type ignore_size: boolean
+  @param ignore_size: if true, the current known size of the disk
+      will not be used during the disk activation, useful for cases
+      when the size is wrong
+  @rtype: tuple
+  @return: a tuple of (disks_ok, device_info); disks_ok is False if the
+      operation failed, device_info is a list of
+      (host, instance_visible_name, node_visible_name)
+      with the mapping from node devices to instance devices, in which
+      node_visible_name is None for disks that could not be assembled
+      on the primary node
+
+  """
+  device_info = []
+  disks_ok = True
+  iname = instance.name
+  disks = _ExpandCheckDisks(instance, disks)
+
+  # With the two passes mechanism we try to reduce the window of
+  # opportunity for the race condition of switching DRBD to primary
+  # before handshaking occurred, but we do not eliminate it
+
+  # The proper fix would be to wait (with some limits) until the
+  # connection has been made and drbd transitions from WFConnection
+  # into any other network-connected state (Connected, SyncTarget,
+  # SyncSource, etc.)
+
+  # 1st pass, assemble on all nodes in secondary mode
+  for idx, inst_disk in enumerate(disks):
+    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
+      if ignore_size:
+        # Work on a copy so that the size of the real disk object is kept
+        node_disk = node_disk.Copy()
+        node_disk.UnsetSize()
+      lu.cfg.SetDiskID(node_disk, node)
+      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
+                                             False, idx)
+      msg = result.fail_msg
+      if msg:
+        is_offline_secondary = (node in instance.secondary_nodes and
+                                result.offline)
+        lu.LogWarning("Could not prepare block device %s on node %s"
+                      " (is_primary=False, pass=1): %s",
+                      inst_disk.iv_name, node, msg)
+        # Failures are tolerated if the caller asked for it or if they
+        # happened on an offline secondary node
+        if not (ignore_secondaries or is_offline_secondary):
+          disks_ok = False
+
+  # FIXME: race condition on drbd migration to primary
+
+  # 2nd pass, do only the primary node
+  for idx, inst_disk in enumerate(disks):
+    dev_path = None
+
+    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
+      if node != instance.primary_node:
+        continue
+      if ignore_size:
+        node_disk = node_disk.Copy()
+        node_disk.UnsetSize()
+      lu.cfg.SetDiskID(node_disk, node)
+      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
+                                             True, idx)
+      msg = result.fail_msg
+      if msg:
+        lu.LogWarning("Could not prepare block device %s on node %s"
+                      " (is_primary=True, pass=2): %s",
+                      inst_disk.iv_name, node, msg)
+        # Failures on the primary node are never ignored
+        disks_ok = False
+      else:
+        dev_path = result.payload
+
+    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
+
+  # leave the disks configured for the primary node
+  # this is a workaround that would be fixed better by
+  # improving the logical/physical id handling
+  for disk in disks:
+    lu.cfg.SetDiskID(disk, instance.primary_node)
+
+  return disks_ok, device_info
+
+
+def _StartInstanceDisks(lu, instance, force):
+  """Start the disks of an instance.
+
+  If assembling the disks fails, they are shut down again and an error is
+  raised.
+
+  @param lu: the logical unit on whose behalf we execute
+  @param instance: the instance whose disks are started
+  @param force: passed as C{ignore_secondaries} to
+      L{_AssembleInstanceDisks}; if it is false (but not None), a hint
+      about using '--force' is logged on failure
+  @raise errors.OpExecError: if the disks could not be assembled
+
+  """
+  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
+                                       ignore_secondaries=force)
+  if not disks_ok:
+    _ShutdownInstanceDisks(lu, instance)
+    # None suppresses the hint, unlike an explicit false value
+    if force is not None and not force:
+      lu.LogWarning("",
+                    hint=("If the message above refers to a secondary node,"
+                          " you can retry the operation using '--force'"))
+    raise errors.OpExecError("Disk consistency error")
+
+
+class LUInstanceGrowDisk(LogicalUnit):
+  """Grow a disk of an instance.
+
+  The disk can be grown either by a relative amount or to an absolute
+  target size, depending on C{self.op.absolute}.
+
+  """
+  HPATH = "disk-grow"
+  HTYPE = constants.HTYPE_INSTANCE
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    """Declare the instance lock and prepare the node-level locks.
+
+    The node and node resource lock lists start out empty and are filled
+    in by L{DeclareLocks}.
+
+    """
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.needed_locks[locking.LEVEL_NODE_RES] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    """Compute the node and node resource locks.
+
+    @param level: the locking level being processed
+
+    """
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
+    elif level == locking.LEVEL_NODE_RES:
+      # Copy node locks
+      self.needed_locks[locking.LEVEL_NODE_RES] = \
+        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This runs on the master, the primary and all the secondaries.
+
+    @return: a dict with the DISK, AMOUNT and ABSOLUTE opcode values,
+        updated with the result of L{_BuildInstanceHookEnvByObject}
+
+    """
+    env = {
+      "DISK": self.op.disk,
+      "AMOUNT": self.op.amount,
+      "ABSOLUTE": self.op.absolute,
+      }
+    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    @return: a pair containing the same list twice: the master node
+        followed by all nodes of the instance
+
+    """
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
+    return (nl, nl)
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks that the instance is in the cluster.
+
+    It also requires all instance nodes to be online and the disk template
+    to be growable, computes C{self.disk}, C{self.delta} and
+    C{self.target}, and checks the free disk space.
+
+    @raise errors.OpPrereqError: if the disk template is not growable or
+        the requested size change is negative
+
+    """
+    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
+    nodenames = list(instance.all_nodes)
+    for node in nodenames:
+      _CheckNodeOnline(self, node)
+
+    self.instance = instance
+
+    if instance.disk_template not in constants.DTS_GROWABLE:
+      raise errors.OpPrereqError("Instance's disk layout does not support"
+                                 " growing", errors.ECODE_INVAL)
+
+    self.disk = instance.FindDisk(self.op.disk)
+
+    # self.delta is the amount to grow by, self.target the resulting size
+    if self.op.absolute:
+      self.target = self.op.amount
+      self.delta = self.target - self.disk.size
+      if self.delta < 0:
+        raise errors.OpPrereqError("Requested size (%s) is smaller than "
+                                   "current disk size (%s)" %
+                                   (utils.FormatUnit(self.target, "h"),
+                                    utils.FormatUnit(self.disk.size, "h")),
+                                   errors.ECODE_STATE)
+    else:
+      self.delta = self.op.amount
+      self.target = self.disk.size + self.delta
+      if self.delta < 0:
+        raise errors.OpPrereqError("Requested increment (%s) is negative" %
+                                   utils.FormatUnit(self.delta, "h"),
+                                   errors.ECODE_INVAL)
+
+    self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
+
+  def _CheckDiskSpace(self, nodenames, req_vgspace):
+    """Check that the nodes have enough free space for growing the disk.
+
+    Nothing is checked for disk templates listed in
+    C{constants.DTS_NO_FREE_SPACE_CHECK}.
+
+    @param nodenames: the names of the nodes to check
+    @param req_vgspace: the required space, passed on as C{req_sizes} to
+        L{_CheckNodesFreeDiskPerVG}
+    @raise errors.OpPrereqError: if exclusive storage is enabled on any of
+        the nodes, or if the free space check fails
+
+    """
+    template = self.instance.disk_template
+    # The parentheses around the constant do not create a tuple; this is a
+    # plain membership test on DTS_NO_FREE_SPACE_CHECK
+    if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
+      # TODO: check the free disk space for file, when that feature will be
+      # supported
+      nodes = map(self.cfg.GetNodeInfo, nodenames)
+      es_nodes = filter(lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n),
+                        nodes)
+      if es_nodes:
+        # With exclusive storage we need to do something smarter than just
+        # looking at free space; for now, let's simply abort the operation.
+        raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
+                                   " is enabled", errors.ECODE_STATE)
+      _CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
+
+  def Exec(self, feedback_fn):
+    """Execute disk grow.
+
+    The grow request is first sent to all nodes in dry-run mode, then run
+    for real on all nodes and finally once more on the primary node only.
+    Afterwards the new size is recorded in the configuration and,
+    depending on the cluster and opcode settings, the added space is wiped
+    and the disk sync is waited for.
+
+    @param feedback_fn: function used to report progress; also passed on
+        to C{self.cfg.Update}
+    @raise errors.OpExecError: if the disk cannot be activated or its
+        current size cannot be retrieved for wiping
+
+    """
+    instance = self.instance
+    disk = self.disk
+
+    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
+    assert (self.owned_locks(locking.LEVEL_NODE) ==
+            self.owned_locks(locking.LEVEL_NODE_RES))
+
+    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
+
+    disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk])
+    if not disks_ok:
+      raise errors.OpExecError("Cannot activate block device to grow")
+
+    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
+                (self.op.disk, instance.name,
+                 utils.FormatUnit(self.delta, "h"),
+                 utils.FormatUnit(self.target, "h")))
+
+    # NOTE(review): judging by the comments around the three grow calls,
+    # the two trailing booleans are presumably (dry-run, backing storage) -
+    # confirm against the RPC definition
+    # First run all grow ops in dry-run mode
+    for node in instance.all_nodes:
+      self.cfg.SetDiskID(disk, node)
+      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+                                           True, True)
+      result.Raise("Dry-run grow request failed to node %s" % node)
+
+    if wipe_disks:
+      # Get disk size from primary node for wiping
+      result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
+      result.Raise("Failed to retrieve disk size from node '%s'" %
+                   instance.primary_node)
+
+      (disk_size_in_bytes, ) = result.payload
+
+      if disk_size_in_bytes is None:
+        raise errors.OpExecError("Failed to retrieve disk size from primary"
+                                 " node '%s'" % instance.primary_node)
+
+      # The size before growing is the offset at which wiping starts later
+      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
+
+      assert old_disk_size >= disk.size, \
+        ("Retrieved disk size too small (got %s, should be at least %s)" %
+         (old_disk_size, disk.size))
+    else:
+      old_disk_size = None
+
+    # We know that (as far as we can test) operations across different
+    # nodes will succeed, time to run it for real on the backing storage
+    for node in instance.all_nodes:
+      self.cfg.SetDiskID(disk, node)
+      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+                                           False, True)
+      result.Raise("Grow request failed to node %s" % node)
+
+    # And now execute it for logical storage, on the primary node
+    node = instance.primary_node
+    self.cfg.SetDiskID(disk, node)
+    result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+                                         False, False)
+    result.Raise("Grow request failed to node %s" % node)
+
+    disk.RecordGrow(self.delta)
+    self.cfg.Update(instance, feedback_fn)
+
+    # Changes have been recorded, release node lock
+    _ReleaseLocks(self, locking.LEVEL_NODE)
+
+    # Downgrade lock while waiting for sync
+    self.glm.downgrade(locking.LEVEL_INSTANCE)
+
+    # Exactly one of the two holds: either we wipe (and know the old
+    # size), or we don't (and the old size is None)
+    assert wipe_disks ^ (old_disk_size is None)
+
+    if wipe_disks:
+      assert instance.disks[self.op.disk] == disk
+
+      # Wipe newly added disk space
+      _WipeDisks(self, instance,
+                 disks=[(self.op.disk, disk, old_disk_size)])
+
+    if self.op.wait_for_sync:
+      disk_abort = not _WaitForSync(self, instance, disks=[disk])
+      if disk_abort:
+        self.LogWarning("Disk syncing has not returned a good status; check"
+                        " the instance")
+      # The disk was activated above; deactivate it again if the instance
+      # is not supposed to be up
+      if instance.admin_state != constants.ADMINST_UP:
+        _SafeShutdownInstanceDisks(self, instance, disks=[disk])
+    elif instance.admin_state != constants.ADMINST_UP:
+      self.LogWarning("Not shutting down the disk even if the instance is"
+                      " not supposed to be running because no wait for"
+                      " sync mode was requested")
+
+    assert self.owned_locks(locking.LEVEL_NODE_RES)
+    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
+
+
+class LUInstanceReplaceDisks(LogicalUnit):
+  """Replace the disks of an instance.
+
+  """
+  HPATH = "mirrors-replace"
+  HTYPE = constants.HTYPE_INSTANCE
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    """Check arguments.
+
+    A new node or an iallocator must be given when changing the secondary
+    (mode C{constants.REPLACE_DISK_CHG}), and neither of them may be given
+    in any other mode.
+
+    @raise errors.OpPrereqError: if the combination of mode, remote node
+        and iallocator is not valid
+
+    """
+    remote_node = self.op.remote_node
+    ialloc = self.op.iallocator
+    if self.op.mode == constants.REPLACE_DISK_CHG:
+      if remote_node is None and ialloc is None:
+        raise errors.OpPrereqError("When changing the secondary either an"
+                                   " iallocator script must be used or the"
+                                   " new node given", errors.ECODE_INVAL)
+      else:
+        # At least one of the two was given; the helper does the remaining
+        # validation of the iallocator/node pair
+        _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
+
+    elif remote_node is not None or ialloc is not None:
+      # Not replacing the secondary
+      raise errors.OpPrereqError("The iallocator and new node options can"
+                                 " only be used when changing the"
+                                 " secondary node", errors.ECODE_INVAL)
+
+  def ExpandNames(self):
+    """Declare the initial locks and set up the replacement tasklet.
+
+    The node locks depend on whether a new secondary node was given
+    explicitly, whether an iallocator is used, or neither. The actual work
+    is delegated to a L{TLReplaceDisks} tasklet, which is stored in
+    C{self.replacer} and C{self.tasklets}.
+
+    """
+    self._ExpandAndLockInstance()
+
+    assert locking.LEVEL_NODE not in self.needed_locks
+    assert locking.LEVEL_NODE_RES not in self.needed_locks
+    assert locking.LEVEL_NODEGROUP not in self.needed_locks
+
+    assert self.op.iallocator is None or self.op.remote_node is None, \
+      "Conflicting options"
+
+    if self.op.remote_node is not None:
+      self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
+
+      # Warning: do not remove the locking of the new secondary here
+      # unless DRBD8.AddChildren is changed to work in parallel;
+      # currently it doesn't since parallel invocations of
+      # FindUnusedMinor will conflict
+      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+    else:
+      self.needed_locks[locking.LEVEL_NODE] = []
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+      if self.op.iallocator is not None:
+        # iallocator will select a new node in the same group
+        self.needed_locks[locking.LEVEL_NODEGROUP] = []
+        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
+
+    # Filled in by DeclareLocks once the node locks are known
+    self.needed_locks[locking.LEVEL_NODE_RES] = []
+
+    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
+                                   self.op.iallocator, self.op.remote_node,
+                                   self.op.disks, self.op.early_release,
+                                   self.op.ignore_ipolicy)
+
+    self.tasklets = [self.replacer]
+
+  def DeclareLocks(self, level):
+    """Fill in C{self.needed_locks} for the given locking level.
+
+    Only the node group, node and node resource levels are handled here.
+
+    @param level: the locking level being processed, compared against the
+        C{locking.LEVEL_*} constants
+
+    """
+    if level == locking.LEVEL_NODEGROUP:
+      # The asserts document what this branch expects: an iallocator and
+      # no explicitly given new node
+      assert self.op.remote_node is None
+      assert self.op.iallocator is not None
+      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+
+      self.share_locks[locking.LEVEL_NODEGROUP] = 1
+      # Lock all groups used by instance optimistically; this requires going
+      # via the node before it's locked, requiring verification later on
+      self.needed_locks[locking.LEVEL_NODEGROUP] = \
+        self.cfg.GetInstanceNodeGroups(self.op.instance_name)
+
+    elif level == locking.LEVEL_NODE:
+      if self.op.iallocator is not None:
+        assert self.op.remote_node is None
+        assert not self.needed_locks[locking.LEVEL_NODE]
+        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
+
+        # Lock member nodes of all locked groups
+        self.needed_locks[locking.LEVEL_NODE] = \
+          [node_name
+           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
+           for node_name in self.cfg.GetNodeGroup(group_uuid).members]
+      else:
+        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
+
+        self._LockInstancesNodes()
+
+    elif level == locking.LEVEL_NODE_RES:
+      # Reuse node locks
+      # Note: this assigns the very same list object instead of a copy
+      self.needed_locks[locking.LEVEL_NODE_RES] = \
+        self.needed_locks[locking.LEVEL_NODE]
+
+  def BuildHooksEnv(self):
+    """Build the hooks environment.
+
+    The environment contains the replacement mode, the requested new
+    secondary and the current (old) secondary node, merged with the result
+    of L{_BuildInstanceHookEnvByObject} for the instance.
+
+    @return: the environment dictionary
+
+    """
+    inst = self.replacer.instance
+
+    hook_env = {}
+    hook_env["MODE"] = self.op.mode
+    hook_env["NEW_SECONDARY"] = self.op.remote_node
+    hook_env["OLD_SECONDARY"] = inst.secondary_nodes[0]
+    hook_env.update(_BuildInstanceHookEnvByObject(self, inst))
+
+    return hook_env
+
+  def BuildHooksNodes(self):
+    """Build the list of nodes the hooks run on.
+
+    @return: a pair whose two elements are the same list: the master node,
+        the primary node of the instance and, if one was given, the new
+        secondary node
+
+    """
+    primary = self.replacer.instance.primary_node
+    hook_nodes = [self.cfg.GetMasterNode(), primary]
+
+    new_secondary = self.op.remote_node
+    if new_secondary is not None:
+      hook_nodes.append(new_secondary)
+
+    return (hook_nodes, hook_nodes)
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    Re-validates the node group locks, if any are owned, against the
+    instance via L{_CheckInstanceNodeGroups} and then hands over to
+    C{LogicalUnit.CheckPrereq}.
+
+    """
+    assert (self.op.iallocator is None or
+            self.glm.is_owned(locking.LEVEL_NODEGROUP))
+
+    # The group locks were taken optimistically, so verify them now
+    locked_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
+    if locked_groups:
+      _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, locked_groups)
+
+    return LogicalUnit.CheckPrereq(self)
+
+
+class LUInstanceActivateDisks(NoHooksLU):
+  """Logical unit assembling the disks of an instance.
+
+  """
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    """Declare the instance lock and an empty, recalculated node lock list.
+
+    """
+    self._ExpandAndLockInstance()
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+    self.needed_locks[locking.LEVEL_NODE] = []
+
+  def DeclareLocks(self, level):
+    """Compute the node locks once the node level is reached.
+
+    @param level: the locking level being declared
+
+    """
+    if level != locking.LEVEL_NODE:
+      return
+
+    self._LockInstancesNodes()
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    Loads the instance object from the configuration and checks its
+    primary node with L{_CheckNodeOnline}.
+
+    """
+    inst = self.cfg.GetInstanceInfo(self.op.instance_name)
+    self.instance = inst
+    assert inst is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
+
+    _CheckNodeOnline(self, inst.primary_node)
+
+  def Exec(self, feedback_fn):
+    """Activate the disks.
+
+    @param feedback_fn: feedback function (not used here)
+    @return: the device information returned by L{_AssembleInstanceDisks}
+    @raise errors.OpExecError: if the disks could not be assembled, or if
+        waiting for the sync was requested and L{_WaitForSync} reported a
+        failure
+
+    """
+    (all_ok, device_info) = \
+      _AssembleInstanceDisks(self, self.instance,
+                             ignore_size=self.op.ignore_size)
+    if not all_ok:
+      raise errors.OpExecError("Cannot activate block devices")
+
+    if self.op.wait_for_sync and not _WaitForSync(self, self.instance):
+      raise errors.OpExecError("Some disks of the instance are degraded!")
+
+    return device_info
+
+
+class LUInstanceDeactivateDisks(NoHooksLU):
+  """Logical unit shutting down the disks of an instance.
+
+  """
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    """Declare the instance lock and an empty, recalculated node lock list.
+
+    """
+    self._ExpandAndLockInstance()
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+    self.needed_locks[locking.LEVEL_NODE] = []
+
+  def DeclareLocks(self, level):
+    """Compute the node locks once the node level is reached.
+
+    @param level: the locking level being declared
+
+    """
+    if level != locking.LEVEL_NODE:
+      return
+
+    self._LockInstancesNodes()
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    Loads the instance object from the configuration.
+
+    """
+    inst = self.cfg.GetInstanceInfo(self.op.instance_name)
+    self.instance = inst
+    assert inst is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
+
+  def Exec(self, feedback_fn):
+    """Deactivate the disks.
+
+    With the C{force} opcode parameter set, L{_ShutdownInstanceDisks} is
+    called directly, otherwise L{_SafeShutdownInstanceDisks} is used.
+
+    @param feedback_fn: feedback function (not used here)
+
+    """
+    if self.op.force:
+      shutdown_fn = _ShutdownInstanceDisks
+    else:
+      shutdown_fn = _SafeShutdownInstanceDisks
+
+    shutdown_fn(self, self.instance)
+
+
+def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
+                               ldisk=False):
+  """Check that mirrors are not degraded.
+
+  @attention: The device has to be annotated already.
+
+  By default the C{is_degraded} attribute of the device status (overall
+  non-ok status of the device(s)) is tested; if C{ldisk} is True, the
+  C{ldisk_status} attribute (local storage status) is compared against
+  C{constants.LDS_OKAY} instead.
+
+  Children are checked recursively, but only as long as no problem has
+  been found; note that C{ldisk} is not passed on to the checks of the
+  children, which therefore use the default.
+
+  @param lu: the logical unit providing C{cfg}, C{rpc} and logging
+  @param instance: the instance owning the device
+  @param dev: the (annotated) disk to check
+  @param node: the node on which to check the disk
+  @param on_primary: whether C{node} is the primary node; if False, the
+      device is only queried if C{dev.AssembleOnSecondary()} is true
+  @param ldisk: whether to test the local storage status
+  @return: False if the device or one of its children could not be found
+      or failed the test, True otherwise
+
+  """
+  lu.cfg.SetDiskID(dev, node)
+
+  consistent = True
+
+  if on_primary or dev.AssembleOnSecondary():
+    rstats = lu.rpc.call_blockdev_find(node, dev)
+    fail_msg = rstats.fail_msg
+    if fail_msg:
+      lu.LogWarning("Can't find disk on node %s: %s", node, fail_msg)
+      consistent = False
+    elif not rstats.payload:
+      lu.LogWarning("Can't find disk on node %s", node)
+      consistent = False
+    elif ldisk:
+      consistent = rstats.payload.ldisk_status == constants.LDS_OKAY
+    else:
+      consistent = not rstats.payload.is_degraded
+
+  if dev.children:
+    for child in dev.children:
+      if not consistent:
+        # Already failed, the remaining children are not queried
+        break
+      consistent = _CheckDiskConsistencyInner(lu, instance, child, node,
+                                              on_primary)
+
+  return consistent
+
+
+def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
+  """Annotate a disk and run L{_CheckDiskConsistencyInner} on it.
+
+  The disk parameters of C{dev} are annotated via L{_AnnotateDiskParams}
+  before the check; all other arguments are passed through unchanged.
+
+  @return: the result of L{_CheckDiskConsistencyInner}
+
+  """
+  [annotated_dev] = _AnnotateDiskParams(instance, [dev], lu.cfg)
+
+  return _CheckDiskConsistencyInner(lu, instance, annotated_dev, node,
+                                    on_primary, ldisk=ldisk)
+
+
+def _BlockdevFind(lu, node, dev, instance):
+  """Annotate the disk parameters and call C{call_blockdev_find}.
+
+  @param lu: A reference to the lu object
+  @param node: The node to call out
+  @param dev: The device to find
+  @param instance: The instance object the device belongs to
+  @return: The result of the rpc call
+
+  """
+  [annotated_dev] = _AnnotateDiskParams(instance, [dev], lu.cfg)
+
+  return lu.rpc.call_blockdev_find(node, annotated_dev)
+
+
+def _GenerateUniqueNames(lu, exts):
+  """Generate suitable LV names.
+
+  For every entry of C{exts} a new unique ID is requested from the
+  configuration (using the execution context ID of the LU's processor)
+  and the entry is appended to it.
+
+  @param lu: the logical unit providing C{cfg} and C{proc}
+  @param exts: the suffixes to append, one name is generated per suffix
+  @return: the list of generated names, in the order of C{exts}
+
+  """
+  return ["%s%s" % (lu.cfg.GenerateUniqueID(lu.proc.GetECId()), suffix)
+          for suffix in exts]
+
+
+class TLReplaceDisks(Tasklet):
+  """Replaces disks for an instance.
+
+  Note: Locking is not within the scope of this class.
+
+  """
+  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
+               disks, early_release, ignore_ipolicy):
+    """Initializes this class.
+
+    The arguments are only stored; all runtime attributes start out as
+    C{None} and are computed in L{CheckPrereq}.
+
+    @param lu: the logical unit owning this tasklet
+    @param instance_name: name of the instance whose disks are replaced
+    @param mode: the replacement mode (C{constants.REPLACE_DISK_*})
+    @param iallocator_name: name of the iallocator used for selecting a
+        new secondary node, or C{None}
+    @param remote_node: name of the new secondary node, or C{None}
+    @param disks: indices of the disks to replace
+    @param early_release: whether to remove the old storage and release
+        the node resource locks before waiting for the sync
+    @param ignore_ipolicy: passed as C{ignore} to the instance policy
+        check of the new node
+
+    """
+    Tasklet.__init__(self, lu)
+
+    # Runtime data
+    self.instance = self.remote_node_info = self.node_secondary_ip = None
+    self.new_node = self.target_node = self.other_node = None
+
+    # Parameters: what to replace and how
+    self.instance_name = instance_name
+    self.mode = mode
+    self.disks = disks
+
+    # Parameters: selection of the new secondary node
+    self.iallocator_name = iallocator_name
+    self.remote_node = remote_node
+
+    # Parameters: behaviour flags
+    self.early_release = early_release
+    self.ignore_ipolicy = ignore_ipolicy
+
+  @staticmethod
+  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
+    """Compute a new secondary node using an IAllocator.
+
+    @param lu: the logical unit providing C{cfg}, C{rpc} and logging
+    @param iallocator_name: name of the iallocator to run
+    @param instance_name: name of the instance to relocate
+    @param relocate_from: the nodes to relocate from
+    @return: the first entry of the iallocator result, i.e. the name of
+        the selected node
+    @raise errors.OpPrereqError: if the iallocator run was not successful
+
+    """
+    request = iallocator.IAReqRelocate(name=instance_name,
+                                       relocate_from=list(relocate_from))
+    allocator = iallocator.IAllocator(lu.cfg, lu.rpc, request)
+    allocator.Run(iallocator_name)
+
+    if not allocator.success:
+      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
+                                 " %s" % (iallocator_name, allocator.info),
+                                 errors.ECODE_NORES)
+
+    new_secondary = allocator.result[0]
+    lu.LogInfo("Selected new secondary for instance '%s': %s",
+               instance_name, new_secondary)
+
+    return new_secondary
+
+  def _FindFaultyDisks(self, node_name):
+    """Wrapper for L{_FindFaultyInstanceDisks}.
+
+    @param node_name: the node on which to look for faulty disks
+    @return: the result of L{_FindFaultyInstanceDisks} for this tasklet's
+        instance
+
+    """
+    faulty = _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
+                                      node_name, True)
+    return faulty
+
+  def _CheckDisksActivated(self, instance):
+    """Checks if the instance disks are activated.
+
+    Every disk is looked up on every node of the instance; results from
+    nodes reported as offline are ignored.
+
+    @param instance: The instance to check disks
+    @return: True if they are activated, False otherwise
+
+    """
+    all_nodes = instance.all_nodes
+
+    for idx, dev in enumerate(instance.disks):
+      for node in all_nodes:
+        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
+        self.cfg.SetDiskID(dev, node)
+
+        found = _BlockdevFind(self, node, dev, instance)
+
+        if found.offline:
+          # Nothing can be said about disks on offline nodes
+          continue
+
+        if found.fail_msg or not found.payload:
+          return False
+
+    return True
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks that the instance is DRBD8-based with exactly one
+    secondary node, determines the new secondary node (running the
+    iallocator if one was given), and computes, depending on the
+    replacement mode, the disks to replace as well as C{target_node},
+    C{other_node} and C{new_node}. It then verifies the instance policy of
+    the new node's group, checks that the involved nodes are online,
+    releases the locks which are no longer needed and stores the secondary
+    IP addresses of the touched nodes in C{node_secondary_ip}.
+
+    @raise errors.OpPrereqError: if the instance layout, the selected node
+        or the combination of options is not acceptable
+    @raise errors.ProgrammerError: if the replacement mode is unknown
+
+    """
+    self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
+    assert instance is not None, \
+      "Cannot retrieve locked instance %s" % self.instance_name
+
+    if instance.disk_template != constants.DT_DRBD8:
+      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
+                                 " instances", errors.ECODE_INVAL)
+
+    if len(instance.secondary_nodes) != 1:
+      raise errors.OpPrereqError("The instance has a strange layout,"
+                                 " expected one secondary but found %d" %
+                                 len(instance.secondary_nodes),
+                                 errors.ECODE_FAULT)
+
+    instance = self.instance
+    secondary_node = instance.secondary_nodes[0]
+
+    # The new secondary is either given explicitly or chosen by iallocator
+    if self.iallocator_name is None:
+      remote_node = self.remote_node
+    else:
+      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
+                                       instance.name, instance.secondary_nodes)
+
+    if remote_node is None:
+      self.remote_node_info = None
+    else:
+      assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
+             "Remote node '%s' is not locked" % remote_node
+
+      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
+      assert self.remote_node_info is not None, \
+        "Cannot retrieve locked node %s" % remote_node
+
+    if remote_node == self.instance.primary_node:
+      raise errors.OpPrereqError("The specified node is the primary node of"
+                                 " the instance", errors.ECODE_INVAL)
+
+    if remote_node == secondary_node:
+      raise errors.OpPrereqError("The specified node is already the"
+                                 " secondary node of the instance",
+                                 errors.ECODE_INVAL)
+
+    # In these two modes the set of disks is determined by the code below
+    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
+                                    constants.REPLACE_DISK_CHG):
+      raise errors.OpPrereqError("Cannot specify disks to be replaced",
+                                 errors.ECODE_INVAL)
+
+    if self.mode == constants.REPLACE_DISK_AUTO:
+      if not self._CheckDisksActivated(instance):
+        raise errors.OpPrereqError("Please run activate-disks on instance %s"
+                                   " first" % self.instance_name,
+                                   errors.ECODE_STATE)
+      faulty_primary = self._FindFaultyDisks(instance.primary_node)
+      faulty_secondary = self._FindFaultyDisks(secondary_node)
+
+      if faulty_primary and faulty_secondary:
+        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
+                                   " one node and can not be repaired"
+                                   " automatically" % self.instance_name,
+                                   errors.ECODE_STATE)
+
+      # The node with the faulty disks becomes the target node
+      if faulty_primary:
+        self.disks = faulty_primary
+        self.target_node = instance.primary_node
+        self.other_node = secondary_node
+        check_nodes = [self.target_node, self.other_node]
+      elif faulty_secondary:
+        self.disks = faulty_secondary
+        self.target_node = secondary_node
+        self.other_node = instance.primary_node
+        check_nodes = [self.target_node, self.other_node]
+      else:
+        # Nothing is faulty, Exec will report that there is nothing to do
+        self.disks = []
+        check_nodes = []
+
+    else:
+      # Non-automatic modes
+      if self.mode == constants.REPLACE_DISK_PRI:
+        self.target_node = instance.primary_node
+        self.other_node = secondary_node
+        check_nodes = [self.target_node, self.other_node]
+
+      elif self.mode == constants.REPLACE_DISK_SEC:
+        self.target_node = secondary_node
+        self.other_node = instance.primary_node
+        check_nodes = [self.target_node, self.other_node]
+
+      elif self.mode == constants.REPLACE_DISK_CHG:
+        self.new_node = remote_node
+        self.other_node = instance.primary_node
+        self.target_node = secondary_node
+        # The old secondary is deliberately not checked for being online
+        check_nodes = [self.new_node, self.other_node]
+
+        _CheckNodeNotDrained(self.lu, remote_node)
+        _CheckNodeVmCapable(self.lu, remote_node)
+
+        old_node_info = self.cfg.GetNodeInfo(secondary_node)
+        assert old_node_info is not None
+        if old_node_info.offline and not self.early_release:
+          # doesn't make sense to delay the release
+          self.early_release = True
+          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
+                          " early-release mode", secondary_node)
+
+      else:
+        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
+                                     self.mode)
+
+      # If not specified all disks should be replaced
+      if not self.disks:
+        self.disks = range(len(self.instance.disks))
+
+    # TODO: This is ugly, but right now we can't distinguish between internal
+    # submitted opcode and external one. We should fix that.
+    if self.remote_node_info:
+      # We change the node, lets verify it still meets instance policy
+      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
+      cluster = self.cfg.GetClusterInfo()
+      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
+                                                              new_group_info)
+      _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
+                              self.cfg, ignore=self.ignore_ipolicy)
+
+    for node in check_nodes:
+      _CheckNodeOnline(self.lu, node)
+
+    # Only the nodes actually involved in the replacement stay locked
+    touched_nodes = frozenset(node_name for node_name in [self.new_node,
+                                                          self.other_node,
+                                                          self.target_node]
+                              if node_name is not None)
+
+    # Release unneeded node and node resource locks
+    _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
+    _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
+    _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
+
+    # Release any owned node group
+    _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
+
+    # Check whether disks are valid
+    for disk_idx in self.disks:
+      instance.FindDisk(disk_idx)
+
+    # Get secondary node IP addresses
+    self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
+                                  in self.cfg.GetMultiNodeInfo(touched_nodes))
+
+  def Exec(self, feedback_fn):
+    """Execute disk replacement.
+
+    This dispatches the disk replacement to the appropriate handler:
+    L{_ExecDrbd8Secondary} if a new secondary node was selected in
+    L{CheckPrereq}, L{_ExecDrbd8DiskOnly} otherwise. If the instance is
+    not marked as up, its disks are activated before the replacement and
+    shut down again afterwards, even if the replacement failed.
+
+    @param feedback_fn: function used to report progress to the user
+    @return: the result of the selected handler, or C{None} if no disk
+        needs to be replaced
+
+    """
+    if __debug__:
+      # Verify owned locks before starting operation
+      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
+      assert set(owned_nodes) == set(self.node_secondary_ip), \
+          ("Incorrect node locks, owning %s, expected %s" %
+           (owned_nodes, self.node_secondary_ip.keys()))
+      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
+              self.lu.owned_locks(locking.LEVEL_NODE_RES))
+      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
+
+      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
+      assert list(owned_instances) == [self.instance_name], \
+          "Instance '%s' not locked" % self.instance_name
+
+      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
+          "Should not own any node group lock at this point"
+
+    if not self.disks:
+      feedback_fn("No disks need replacement for instance '%s'" %
+                  self.instance.name)
+      return
+
+    feedback_fn("Replacing disk(s) %s for instance '%s'" %
+                (utils.CommaJoin(self.disks), self.instance.name))
+    feedback_fn("Current primary node: %s" % self.instance.primary_node)
+    feedback_fn("Current secondary node: %s" %
+                utils.CommaJoin(self.instance.secondary_nodes))
+
+    activate_disks = (self.instance.admin_state != constants.ADMINST_UP)
+
+    # Activate the instance disks if we're replacing them on a down instance
+    if activate_disks:
+      _StartInstanceDisks(self.lu, self.instance, True)
+
+    try:
+      # Should we replace the secondary node?
+      if self.new_node is not None:
+        fn = self._ExecDrbd8Secondary
+      else:
+        fn = self._ExecDrbd8DiskOnly
+
+      result = fn(feedback_fn)
+    finally:
+      # Deactivate the instance disks if we're replacing them on a
+      # down instance
+      if activate_disks:
+        _SafeShutdownInstanceDisks(self.lu, self.instance)
+
+    # The handlers are expected to have released all node locks
+    assert not self.lu.owned_locks(locking.LEVEL_NODE)
+
+    if __debug__:
+      # Verify owned locks
+      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
+      nodes = frozenset(self.node_secondary_ip)
+      assert ((self.early_release and not owned_nodes) or
+              (not self.early_release and not (set(owned_nodes) - nodes))), \
+        ("Not owning the correct locks, early_release=%s, owned=%r,"
+         " nodes=%r" % (self.early_release, owned_nodes, nodes))
+
+    return result
+
+  def _CheckVolumeGroup(self, nodes):
+    """Make sure the cluster volume group exists on all given nodes.
+
+    @param nodes: names of the nodes to check
+    @raise errors.OpExecError: if the volume groups could not be listed
+        or the volume group is missing on one of the nodes
+
+    """
+    self.lu.LogInfo("Checking volume groups")
+
+    wanted_vg = self.cfg.GetVGName()
+
+    vg_lists = self.rpc.call_vg_list(nodes)
+    if not vg_lists:
+      raise errors.OpExecError("Can't list volume groups on the nodes")
+
+    for node in nodes:
+      node_result = vg_lists[node]
+      node_result.Raise("Error checking node %s" % node)
+      if wanted_vg in node_result.payload:
+        continue
+      raise errors.OpExecError("Volume group '%s' not found on node %s" %
+                               (wanted_vg, node))
+
+  def _CheckDisksExistence(self, nodes):
+    """Check that the disks to be replaced can be found on the nodes.
+
+    @param nodes: names of the nodes to check
+    @raise errors.OpExecError: if a disk can't be found on a node
+
+    """
+    for idx, dev in enumerate(self.instance.disks):
+      if idx in self.disks:
+        for node in nodes:
+          self.lu.LogInfo("Checking disk/%d on %s", idx, node)
+          self.cfg.SetDiskID(dev, node)
+
+          found = _BlockdevFind(self, node, dev, self.instance)
+
+          failure = found.fail_msg
+          if failure or not found.payload:
+            # An empty payload without error message means "not found"
+            raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
+                                     (idx, node, failure or "disk not found"))
+
+  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
+    """Check the consistency of the disks to be replaced on a node.
+
+    @param node_name: the node on which to check the disks
+    @param on_primary: passed through to L{_CheckDiskConsistency}
+    @param ldisk: passed through to L{_CheckDiskConsistency}
+    @raise errors.OpExecError: if a disk is reported as not consistent
+
+    """
+    for idx, dev in enumerate(self.instance.disks):
+      if idx in self.disks:
+        self.lu.LogInfo("Checking disk/%d consistency on node %s" %
+                        (idx, node_name))
+
+        is_ok = _CheckDiskConsistency(self.lu, self.instance, dev, node_name,
+                                      on_primary, ldisk=ldisk)
+        if not is_ok:
+          raise errors.OpExecError("Node %s has degraded storage, unsafe to"
+                                   " replace disks for instance %s" %
+                                   (node_name, self.instance.name))
+
+  def _CreateNewStorage(self, node_name):
+    """Create new storage on the primary or secondary node.
+
+    This is only used for same-node replaces, not for changing the
+    secondary node, hence we don't want to modify the existing disk.
+
+    For every disk to be replaced a new data and a new meta LV with
+    freshly generated names are created on the node.
+
+    @param node_name: the node on which to create the new LVs
+    @return: a dictionary mapping the C{iv_name} of each replaced disk to
+        a tuple of (annotated disk, copies of the old LVs, new LVs)
+
+    """
+    new_storage = {}
+
+    annotated = _AnnotateDiskParams(self.instance, self.instance.disks,
+                                    self.cfg)
+    for idx, drbd_dev in enumerate(annotated):
+      if idx not in self.disks:
+        continue
+
+      self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)
+
+      self.cfg.SetDiskID(drbd_dev, node_name)
+
+      suffixes = [".disk%d_%s" % (idx, kind) for kind in ("data", "meta")]
+      (data_name, meta_name) = _GenerateUniqueNames(self.lu, suffixes)
+
+      # The new LVs live in the same volume groups as the ones they replace
+      (old_data, old_meta) = drbd_dev.children
+      new_data = objects.Disk(dev_type=constants.LD_LV, size=drbd_dev.size,
+                              logical_id=(old_data.logical_id[0], data_name),
+                              params=old_data.params)
+      new_meta = objects.Disk(dev_type=constants.LD_LV,
+                              size=constants.DRBD_META_SIZE,
+                              logical_id=(old_meta.logical_id[0], meta_name),
+                              params=old_meta.params)
+
+      new_lvs = [new_data, new_meta]
+      old_lvs = [child.Copy() for child in drbd_dev.children]
+      new_storage[drbd_dev.iv_name] = (drbd_dev, old_lvs, new_lvs)
+      excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
+
+      # we pass force_create=True to force the LVM creation
+      for lv in new_lvs:
+        _CreateBlockDevInner(self.lu, node_name, self.instance, lv, True,
+                             _GetInstanceInfoText(self.instance), False,
+                             excl_stor)
+
+    return new_storage
+
+  def _CheckDevices(self, node_name, iv_names):
+    """Check that the replaced DRBD devices exist and are not degraded.
+
+    @param node_name: the node on which to look up the devices
+    @param iv_names: dictionary whose values are tuples with the device
+        as first element (as returned by L{_CreateNewStorage})
+    @raise errors.OpExecError: if a device can't be found or is degraded
+
+    """
+    for name, (dev, _, _) in iv_names.iteritems():
+      self.cfg.SetDiskID(dev, node_name)
+
+      found = _BlockdevFind(self, node_name, dev, self.instance)
+
+      failure = found.fail_msg
+      if failure or not found.payload:
+        raise errors.OpExecError("Can't find DRBD device %s: %s" %
+                                 (name, failure or "disk not found"))
+
+      if found.payload.is_degraded:
+        raise errors.OpExecError("DRBD device %s is degraded!" % name)
+
+  def _RemoveOldStorage(self, node_name, iv_names):
+    """Remove the old LVs of the replaced disks, on a best-effort basis.
+
+    Failures to remove an LV are only logged as warnings.
+
+    @param node_name: the node from which to remove the LVs
+    @param iv_names: dictionary whose values are tuples with the list of
+        old LVs as second element (as returned by L{_CreateNewStorage})
+
+    """
+    for name, (_, stale_lvs, _) in iv_names.iteritems():
+      self.lu.LogInfo("Remove logical volumes for %s", name)
+
+      for stale_lv in stale_lvs:
+        self.cfg.SetDiskID(stale_lv, node_name)
+
+        removal = self.rpc.call_blockdev_remove(node_name, stale_lv)
+        failure = removal.fail_msg
+        if not failure:
+          continue
+        self.lu.LogWarning("Can't remove old LV: %s", failure,
+                           hint="remove unused LVs manually")
+
+  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
+    """Replace a disk on the primary or secondary for DRBD 8.
+
+    The algorithm for replace is quite complicated:
+
+      1. for each disk to be replaced:
+
+        1. create new LVs on the target node with unique names
+        1. detach old LVs from the drbd device
+        1. rename old LVs to <name>_replaced-<time_t>
+        1. rename new LVs to old LVs
+        1. attach the new LVs (with the old names now) to the drbd device
+
+      1. wait for sync across all devices
+
+      1. for each modified disk:
+
+        1. remove old LVs (which have the name <name>_replaced-<time_t>)
+
+    If C{early_release} is set, the old LVs are removed and the node
+    resource locks released before waiting for the sync instead.
+
+    Failures are not very well handled.
+
+    @param feedback_fn: feedback function (unused, progress is reported
+        through the logical unit)
+    @raise errors.OpExecError: if one of the checks fails or the new LVs
+        can't be attached to the drbd device
+
+    """
+    steps_total = 6
+
+    # Step: check device activation
+    self.lu.LogStep(1, steps_total, "Check device existence")
+    self._CheckDisksExistence([self.other_node, self.target_node])
+    self._CheckVolumeGroup([self.target_node, self.other_node])
+
+    # Step: check other node consistency
+    self.lu.LogStep(2, steps_total, "Check peer consistency")
+    self._CheckDisksConsistency(self.other_node,
+                                self.other_node == self.instance.primary_node,
+                                False)
+
+    # Step: create new storage
+    self.lu.LogStep(3, steps_total, "Allocate new storage")
+    iv_names = self._CreateNewStorage(self.target_node)
+
+    # Step: for each lv, detach+rename*2+attach
+    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
+    for dev, old_lvs, new_lvs in iv_names.itervalues():
+      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
+
+      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
+                                                     old_lvs)
+      result.Raise("Can't detach drbd from local storage on node"
+                   " %s for device %s" % (self.target_node, dev.iv_name))
+      #dev.children = []
+      #cfg.Update(instance)
+
+      # ok, we created the new LVs, so now we know we have the needed
+      # storage; as such, we proceed on the target node to rename
+      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
+      # using the assumption that logical_id == physical_id (which in
+      # turn is the unique_id on that node)
+
+      # FIXME(iustin): use a better name for the replaced LVs
+      temp_suffix = int(time.time())
+      ren_fn = lambda d, suff: (d.physical_id[0],
+                                d.physical_id[1] + "_replaced-%s" % suff)
+
+      # Build the rename list based on what LVs exist on the node
+      rename_old_to_new = []
+      for to_ren in old_lvs:
+        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
+        if not result.fail_msg and result.payload:
+          # device exists
+          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
+
+      self.lu.LogInfo("Renaming the old LVs on the target node")
+      result = self.rpc.call_blockdev_rename(self.target_node,
+                                             rename_old_to_new)
+      result.Raise("Can't rename old LVs on node %s" % self.target_node)
+
+      # Now we rename the new LVs to the old LVs
+      self.lu.LogInfo("Renaming the new LVs on the target node")
+      rename_new_to_old = [(new, old.physical_id)
+                           for old, new in zip(old_lvs, new_lvs)]
+      result = self.rpc.call_blockdev_rename(self.target_node,
+                                             rename_new_to_old)
+      result.Raise("Can't rename new LVs on node %s" % self.target_node)
+
+      # Intermediate steps of in memory modifications
+      for old, new in zip(old_lvs, new_lvs):
+        new.logical_id = old.logical_id
+        self.cfg.SetDiskID(new, self.target_node)
+
+      # We need to modify old_lvs so that removal later removes the
+      # right LVs, not the newly added ones; note that old_lvs is a
+      # copy here
+      for disk in old_lvs:
+        disk.logical_id = ren_fn(disk, temp_suffix)
+        self.cfg.SetDiskID(disk, self.target_node)
+
+      # Now that the new lvs have the old name, we can add them to the device
+      self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
+      result = self.rpc.call_blockdev_addchildren(self.target_node,
+                                                  (dev, self.instance), new_lvs)
+      msg = result.fail_msg
+      if msg:
+        # Try to remove the just-created LVs again before giving up
+        for new_lv in new_lvs:
+          msg2 = self.rpc.call_blockdev_remove(self.target_node,
+                                               new_lv).fail_msg
+          if msg2:
+            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
+                               hint=("cleanup manually the unused logical"
+                                     " volumes"))
+        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
+
+    # Steps 5 and 6 are "remove old storage" and "sync devices", in an
+    # order depending on early_release
+    cstep = itertools.count(5)
+
+    if self.early_release:
+      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
+      self._RemoveOldStorage(self.target_node, iv_names)
+      # TODO: Check if releasing locks early still makes sense
+      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
+    else:
+      # Release all resource locks except those used by the instance
+      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
+                    keep=self.node_secondary_ip.keys())
+
+    # Release all node locks while waiting for sync
+    _ReleaseLocks(self.lu, locking.LEVEL_NODE)
+
+    # TODO: Can the instance lock be downgraded here? Take the optional disk
+    # shutdown in the caller into consideration.
+
+    # Wait for sync
+    # This can fail as the old devices are degraded and _WaitForSync
+    # does a combined result over all disks, so we don't check its return value
+    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
+    _WaitForSync(self.lu, self.instance)
+
+    # Check all devices manually
+    self._CheckDevices(self.instance.primary_node, iv_names)
+
+    # Step: remove old storage
+    if not self.early_release:
+      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
+      self._RemoveOldStorage(self.target_node, iv_names)
+
+  def _ExecDrbd8Secondary(self, feedback_fn):
+    """Replace the secondary node for DRBD 8.
+
+    The algorithm for replace is quite complicated:
+      - for all disks of the instance:
+        - create new LVs on the new node with same names
+        - shutdown the drbd device on the old secondary
+        - disconnect the drbd network on the primary
+        - create the drbd device on the new secondary
+        - network attach the drbd on the primary, using an artifice:
+          the drbd code for Attach() will connect to the network if it
+          finds a device which is connected to the good local disks but
+          not network enabled
+      - wait for sync across all devices
+      - remove all disks from the old secondary
+
+    Failures are not very well handled.
+
+    """
+    steps_total = 6
+
+    pnode = self.instance.primary_node
+
+    # Step: check device activation
+    self.lu.LogStep(1, steps_total, "Check device existence")
+    self._CheckDisksExistence([self.instance.primary_node])
+    self._CheckVolumeGroup([self.instance.primary_node])
+
+    # Step: check other node consistency
+    self.lu.LogStep(2, steps_total, "Check peer consistency")
+    self._CheckDisksConsistency(self.instance.primary_node, True, True)
+
+    # Step: create new storage
+    self.lu.LogStep(3, steps_total, "Allocate new storage")
+    disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
+    excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
+    for idx, dev in enumerate(disks):
+      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
+                      (self.new_node, idx))
+      # we pass force_create=True to force LVM creation
+      for new_lv in dev.children:
+        _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
+                             True, _GetInstanceInfoText(self.instance), False,
+                             excl_stor)
+
+    # Step 4: drbd minors and drbd setups changes
+    # after this, we must manually remove the drbd minors on both the
+    # error and the success paths
+    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
+    minors = self.cfg.AllocateDRBDMinor([self.new_node
+                                         for dev in self.instance.disks],
+                                        self.instance.name)
+    logging.debug("Allocated minors %r", minors)
+
+    iv_names = {}
+    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
+      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
+                      (self.new_node, idx))
+      # create new devices on new_node; note that we create two IDs:
+      # one without port, so the drbd will be activated without
+      # networking information on the new node at this stage, and one
+      # with network, for the later activation in step 4
+      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
+      if self.instance.primary_node == o_node1:
+        p_minor = o_minor1
+      else:
+        assert self.instance.primary_node == o_node2, "Three-node instance?"
+        p_minor = o_minor2
+
+      new_alone_id = (self.instance.primary_node, self.new_node, None,
+                      p_minor, new_minor, o_secret)
+      new_net_id = (self.instance.primary_node, self.new_node, o_port,
+                    p_minor, new_minor, o_secret)
+
+      iv_names[idx] = (dev, dev.children, new_net_id)
+      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
+                    new_net_id)
+      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
+                              logical_id=new_alone_id,
+                              children=dev.children,
+                              size=dev.size,
+                              params={})
+      (anno_new_drbd,) = _AnnotateDiskParams(self.instance, [new_drbd],
+                                             self.cfg)
+      try:
+        _CreateSingleBlockDev(self.lu, self.new_node, self.instance,
+                              anno_new_drbd,
+                              _GetInstanceInfoText(self.instance), False,
+                              excl_stor)
+      except errors.GenericError:
+        self.cfg.ReleaseDRBDMinors(self.instance.name)
+        raise
+
+    # We have new devices, shutdown the drbd on the old secondary
+    for idx, dev in enumerate(self.instance.disks):
+      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
+      self.cfg.SetDiskID(dev, self.target_node)
+      msg = self.rpc.call_blockdev_shutdown(self.target_node,
+                                            (dev, self.instance)).fail_msg
+      if msg:
+        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
+                           " node: %s" % (idx, msg),
+                           hint=("Please cleanup this device manually as"
+                                 " soon as possible"))
+
+    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
+    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
+                                               self.instance.disks)[pnode]
+
+    msg = result.fail_msg
+    if msg:
+      # detaches didn't succeed (unlikely)
+      self.cfg.ReleaseDRBDMinors(self.instance.name)
+      raise errors.OpExecError("Can't detach the disks from the network on"
+                               " old node: %s" % (msg,))
+
+    # if we managed to detach at least one, we update all the disks of
+    # the instance to point to the new secondary
+    self.lu.LogInfo("Updating instance configuration")
+    for dev, _, new_logical_id in iv_names.itervalues():
+      dev.logical_id = new_logical_id
+      self.cfg.SetDiskID(dev, self.instance.primary_node)
+
+    self.cfg.Update(self.instance, feedback_fn)
+
+    # Release all node locks (the configuration has been updated)
+    _ReleaseLocks(self.lu, locking.LEVEL_NODE)
+
+    # and now perform the drbd attach
+    self.lu.LogInfo("Attaching primary drbds to new secondary"
+                    " (standalone => connected)")
+    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
+                                            self.new_node],
+                                           self.node_secondary_ip,
+                                           (self.instance.disks, self.instance),
+                                           self.instance.name,
+                                           False)
+    for to_node, to_result in result.items():
+      msg = to_result.fail_msg
+      if msg:
+        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
+                           to_node, msg,
+                           hint=("please do a gnt-instance info to see the"
+                                 " status of disks"))
+
+    cstep = itertools.count(5)
+
+    if self.early_release:
+      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
+      self._RemoveOldStorage(self.target_node, iv_names)
+      # TODO: Check if releasing locks early still makes sense
+      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
+    else:
+      # Release all resource locks except those used by the instance
+      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
+                    keep=self.node_secondary_ip.keys())
+
+    # TODO: Can the instance lock be downgraded here? Take the optional disk
+    # shutdown in the caller into consideration.
+
+    # Wait for sync
+    # This can fail as the old devices are degraded and _WaitForSync
+    # does a combined result over all disks, so we don't check its return value
+    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
+    _WaitForSync(self.lu, self.instance)
+
+    # Check all devices manually
+    self._CheckDevices(self.instance.primary_node, iv_names)
+
+    # Step: remove old storage
+    if not self.early_release:
+      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
+      self._RemoveOldStorage(self.target_node, iv_names)
index 8718af3..bb760ff 100644 (file)
@@ -31,8 +31,8 @@ from ganeti import network
 from ganeti import objects
 from ganeti import pathutils
 from ganeti import utils
-
-from ganeti.cmdlib.common import _AnnotateDiskParams
+from ganeti.cmdlib.common import _AnnotateDiskParams, \
+  _ComputeIPolicyInstanceViolation
 
 
 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
@@ -202,45 +202,17 @@ def _CheckNodeNotDrained(lu, node):
                                errors.ECODE_STATE)
 
 
-def _StartInstanceDisks(lu, instance, force):
-  """Start the disks of an instance.
-
-  """
-  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
-                                           ignore_secondaries=force)
-  if not disks_ok:
-    _ShutdownInstanceDisks(lu, instance)
-    if force is not None and not force:
-      lu.LogWarning("",
-                    hint=("If the message above refers to a secondary node,"
-                          " you can retry the operation using '--force'"))
-    raise errors.OpExecError("Disk consistency error")
-
-
-def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
-  """Shutdown block devices of an instance.
+def _CheckNodeVmCapable(lu, node):
+  """Ensure that a given node is vm capable.
 
-  This does the shutdown on all nodes of the instance.
-
-  If the ignore_primary is false, errors on the primary node are
-  ignored.
+  @param lu: the LU on behalf of which we make the check
+  @param node: the node to check
+  @raise errors.OpPrereqError: if the node is not vm capable
 
   """
-  all_result = True
-  disks = _ExpandCheckDisks(instance, disks)
-
-  for disk in disks:
-    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
-      lu.cfg.SetDiskID(top_disk, node)
-      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
-      msg = result.fail_msg
-      if msg:
-        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
-                      disk.iv_name, node, msg)
-        if ((node == instance.primary_node and not ignore_primary) or
-            (node != instance.primary_node and not result.offline)):
-          all_result = False
-  return all_result
+  if not lu.cfg.GetNodeInfo(node).vm_capable:
+    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
+                               errors.ECODE_STATE)
 
 
 def _RemoveInstance(lu, feedback_fn, instance, ignore_failures):
@@ -265,98 +237,6 @@ def _RemoveInstance(lu, feedback_fn, instance, ignore_failures):
   lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name
 
 
-def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
-                           ignore_size=False):
-  """Prepare the block devices for an instance.
-
-  This sets up the block devices on all nodes.
-
-  @type lu: L{LogicalUnit}
-  @param lu: the logical unit on whose behalf we execute
-  @type instance: L{objects.Instance}
-  @param instance: the instance for whose disks we assemble
-  @type disks: list of L{objects.Disk} or None
-  @param disks: which disks to assemble (or all, if None)
-  @type ignore_secondaries: boolean
-  @param ignore_secondaries: if true, errors on secondary nodes
-      won't result in an error return from the function
-  @type ignore_size: boolean
-  @param ignore_size: if true, the current known size of the disk
-      will not be used during the disk activation, useful for cases
-      when the size is wrong
-  @return: False if the operation failed, otherwise a list of
-      (host, instance_visible_name, node_visible_name)
-      with the mapping from node devices to instance devices
-
-  """
-  device_info = []
-  disks_ok = True
-  iname = instance.name
-  disks = _ExpandCheckDisks(instance, disks)
-
-  # With the two passes mechanism we try to reduce the window of
-  # opportunity for the race condition of switching DRBD to primary
-  # before handshaking occured, but we do not eliminate it
-
-  # The proper fix would be to wait (with some limits) until the
-  # connection has been made and drbd transitions from WFConnection
-  # into any other network-connected state (Connected, SyncTarget,
-  # SyncSource, etc.)
-
-  # 1st pass, assemble on all nodes in secondary mode
-  for idx, inst_disk in enumerate(disks):
-    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
-      if ignore_size:
-        node_disk = node_disk.Copy()
-        node_disk.UnsetSize()
-      lu.cfg.SetDiskID(node_disk, node)
-      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
-                                             False, idx)
-      msg = result.fail_msg
-      if msg:
-        is_offline_secondary = (node in instance.secondary_nodes and
-                                result.offline)
-        lu.LogWarning("Could not prepare block device %s on node %s"
-                      " (is_primary=False, pass=1): %s",
-                      inst_disk.iv_name, node, msg)
-        if not (ignore_secondaries or is_offline_secondary):
-          disks_ok = False
-
-  # FIXME: race condition on drbd migration to primary
-
-  # 2nd pass, do only the primary node
-  for idx, inst_disk in enumerate(disks):
-    dev_path = None
-
-    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
-      if node != instance.primary_node:
-        continue
-      if ignore_size:
-        node_disk = node_disk.Copy()
-        node_disk.UnsetSize()
-      lu.cfg.SetDiskID(node_disk, node)
-      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
-                                             True, idx)
-      msg = result.fail_msg
-      if msg:
-        lu.LogWarning("Could not prepare block device %s on node %s"
-                      " (is_primary=True, pass=2): %s",
-                      inst_disk.iv_name, node, msg)
-        disks_ok = False
-      else:
-        dev_path = result.payload
-
-    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
-
-  # leave the disks configured for the primary node
-  # this is a workaround that would be fixed better by
-  # improving the logical/physical id handling
-  for disk in disks:
-    lu.cfg.SetDiskID(disk, instance.primary_node)
-
-  return disks_ok, device_info
-
-
 def _RemoveDisks(lu, instance, target_node=None, ignore_failures=False):
   """Remove all disks for an instance.
 
@@ -416,24 +296,6 @@ def _RemoveDisks(lu, instance, target_node=None, ignore_failures=False):
   return all_result
 
 
-def _ExpandCheckDisks(instance, disks):
-  """Return the instance disks selected by the disks list
-
-  @type disks: list of L{objects.Disk} or None
-  @param disks: selected disks
-  @rtype: list of L{objects.Disk}
-  @return: selected instance disks to act on
-
-  """
-  if disks is None:
-    return instance.disks
-  else:
-    if not set(disks).issubset(instance.disks):
-      raise errors.ProgrammerError("Can only act on disks belonging to the"
-                                   " target instance")
-    return disks
-
-
 def _NICToTuple(lu, nic):
   """Build a tupple of nic information.
 
@@ -470,3 +332,119 @@ def _NICListToTuple(lu, nics):
   for nic in nics:
     hooks_nics.append(_NICToTuple(lu, nic))
   return hooks_nics
+
+
+def _CopyLockList(names):
+  """Makes a copy of a list of lock names.
+
+  L{locking.ALL_SET} is returned as-is; any other list is copied via slicing.
+
+  """
+  if names == locking.ALL_SET:
+    return locking.ALL_SET
+  else:
+    return names[:]
+
+
+def _ReleaseLocks(lu, level, names=None, keep=None):
+  """Releases locks owned by an LU.
+
+  @type lu: L{LogicalUnit}
+  @param level: Lock level
+  @type names: list or None
+  @param names: Names of locks to release (mutually exclusive with C{keep})
+  @type keep: list or None
+  @param keep: Names of locks to retain; all other owned locks are released
+
+  """
+  assert not (keep is not None and names is not None), \
+         "Only one of the 'names' and the 'keep' parameters can be given"
+
+  if names is not None:
+    should_release = names.__contains__
+  elif keep:
+    should_release = lambda name: name not in keep
+  else:
+    should_release = None
+
+  owned = lu.owned_locks(level)
+  if not owned:
+    # Not owning any lock at this level, do nothing
+    pass
+
+  elif should_release:
+    retain = []
+    release = []
+
+    # Determine which locks to release
+    for name in owned:
+      if should_release(name):
+        release.append(name)
+      else:
+        retain.append(name)
+
+    assert len(lu.owned_locks(level)) == (len(retain) + len(release))
+
+    # Release just some locks
+    lu.glm.release(level, names=release)
+
+    assert frozenset(lu.owned_locks(level)) == frozenset(retain)
+  else:
+    # Release everything
+    lu.glm.release(level)
+
+    assert not lu.glm.is_owned(level), "No locks should be owned"
+
+
+def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
+                                 target_group, cfg,
+                                 _compute_fn=_ComputeIPolicyInstanceViolation):
+  """Compute if instance meets the specs of the new target group.
+
+  @param ipolicy: The ipolicy to verify
+  @param instance: The instance object to verify
+  @param current_group: The current group of the instance
+  @param target_group: The new group of the instance
+  @type cfg: L{config.ConfigWriter}
+  @param cfg: Cluster configuration
+  @param _compute_fn: The function to verify ipolicy (unittest only)
+  @see: L{ganeti.cmdlib.common._ComputeIPolicySpecViolation}
+
+  """
+  if current_group == target_group:
+    return []
+  else:
+    return _compute_fn(ipolicy, instance, cfg)
+
+
+def _CheckTargetNodeIPolicy(lu, ipolicy, instance, node, cfg, ignore=False,
+                            _compute_fn=_ComputeIPolicyNodeViolation):
+  """Checks that the target node is correct in terms of instance policy.
+
+  @param ipolicy: The ipolicy to verify
+  @param instance: The instance object to verify
+  @param node: The new node to relocate
+  @type cfg: L{config.ConfigWriter}
+  @param cfg: Cluster configuration
+  @param ignore: Log a warning instead of raising on ipolicy violations
+  @param _compute_fn: The function to verify ipolicy (unittest only)
+  @see: L{ganeti.cmdlib.common._ComputeIPolicySpecViolation}
+
+  """
+  primary_node = lu.cfg.GetNodeInfo(instance.primary_node)
+  res = _compute_fn(ipolicy, instance, primary_node.group, node.group, cfg)
+
+  if res:
+    msg = ("Instance does not meet target node group's (%s) instance"
+           " policy: %s") % (node.group, utils.CommaJoin(res))
+    if ignore:
+      lu.LogWarning(msg)
+    else:
+      raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
+
+
+def _GetInstanceInfoText(instance):
+  """Compute the text that should be added to the disk's metadata.
+
+  """
+  return "originstname+%s" % instance.name
index e888d72..bb73ed5 100755 (executable)
@@ -36,6 +36,8 @@ from ganeti import cmdlib
 from ganeti.cmdlib import cluster
 from ganeti.cmdlib import group
 from ganeti.cmdlib import instance
+from ganeti.cmdlib import instance_storage
+from ganeti.cmdlib import instance_utils
 from ganeti.cmdlib import common
 from ganeti.cmdlib import query
 from ganeti import opcodes
@@ -888,18 +890,20 @@ class TestComputeIPolicyNodeViolation(unittest.TestCase):
     self.recorder = _CallRecorder(return_value=[])
 
   def testSameGroup(self):
-    ret = instance._ComputeIPolicyNodeViolation(NotImplemented,
-                                                NotImplemented,
-                                                "foo", "foo", NotImplemented,
-                                                _compute_fn=self.recorder)
+    ret = instance_utils._ComputeIPolicyNodeViolation(
+      NotImplemented,
+      NotImplemented,
+      "foo", "foo", NotImplemented,
+      _compute_fn=self.recorder)
     self.assertFalse(self.recorder.called)
     self.assertEqual(ret, [])
 
   def testDifferentGroup(self):
-    ret = instance._ComputeIPolicyNodeViolation(NotImplemented,
-                                                NotImplemented,
-                                                "foo", "bar", NotImplemented,
-                                                _compute_fn=self.recorder)
+    ret = instance_utils._ComputeIPolicyNodeViolation(
+      NotImplemented,
+      NotImplemented,
+      "foo", "bar", NotImplemented,
+      _compute_fn=self.recorder)
     self.assertTrue(self.recorder.called)
     self.assertEqual(ret, [])
 
@@ -1605,7 +1609,7 @@ class TestDiskSizeInBytesToMebibytes(unittest.TestCase):
   def testLessThanOneMebibyte(self):
     for i in [1, 2, 7, 512, 1000, 1023]:
       lu = _FakeLU()
-      result = instance._DiskSizeInBytesToMebibytes(lu, i)
+      result = instance_storage._DiskSizeInBytesToMebibytes(lu, i)
       self.assertEqual(result, 1)
       self.assertEqual(len(lu.warning_log), 1)
       self.assertEqual(len(lu.warning_log[0]), 2)
@@ -1615,7 +1619,8 @@ class TestDiskSizeInBytesToMebibytes(unittest.TestCase):
   def testEven(self):
     for i in [1, 2, 7, 512, 1000, 1023]:
       lu = _FakeLU()
-      result = instance._DiskSizeInBytesToMebibytes(lu, i * 1024 * 1024)
+      result = instance_storage._DiskSizeInBytesToMebibytes(lu,
+                                                            i * 1024 * 1024)
       self.assertEqual(result, i)
       self.assertFalse(lu.warning_log)
 
@@ -1624,7 +1629,7 @@ class TestDiskSizeInBytesToMebibytes(unittest.TestCase):
       for j in [1, 2, 486, 326, 986, 1023]:
         lu = _FakeLU()
         size = (1024 * 1024 * i) + j
-        result = instance._DiskSizeInBytesToMebibytes(lu, size)
+        result = instance_storage._DiskSizeInBytesToMebibytes(lu, size)
         self.assertEqual(result, i + 1, msg="Amount was not rounded up")
         self.assertEqual(len(lu.warning_log), 1)
         self.assertEqual(len(lu.warning_log[0]), 2)