lib/cmdlib/group.py \
lib/cmdlib/node.py \
lib/cmdlib/instance.py \
+ lib/cmdlib/instance_storage.py \
lib/cmdlib/instance_utils.py \
lib/cmdlib/backup.py \
lib/cmdlib/query.py \
LUInstanceMigrate, \
LUInstanceMultiAlloc, \
LUInstanceSetParams, \
- LUInstanceChangeGroup, \
+ LUInstanceChangeGroup
+from ganeti.cmdlib.instance_storage import \
LUInstanceRecreateDisks, \
LUInstanceGrowDisk, \
LUInstanceReplaceDisks, \
from ganeti.cmdlib.base import _QueryBase, NoHooksLU, LogicalUnit
from ganeti.cmdlib.common import _GetWantedNodes, _ShareAll, \
_CheckNodeOnline, _ExpandNodeName
+from ganeti.cmdlib.instance_storage import _StartInstanceDisks, \
+ _ShutdownInstanceDisks
from ganeti.cmdlib.instance_utils import _GetClusterDomainSecret, \
- _BuildInstanceHookEnvByObject, _CheckNodeNotDrained, _StartInstanceDisks, \
- _ShutdownInstanceDisks, _RemoveInstance
+ _BuildInstanceHookEnvByObject, _CheckNodeNotDrained, _RemoveInstance
class _ExportQuery(_QueryBase):
_LoadNodeEvacResult, _CheckIAllocatorOrNode, _CheckParamsNotGlobal, \
_IsExclusiveStorageEnabledNode, _CheckHVParams, _CheckOSParams, \
_GetWantedInstances, _CheckInstancesNodeGroups, _AnnotateDiskParams, \
- _GetUpdatedParams, _ExpandInstanceName, _FindFaultyInstanceDisks, \
- _ComputeIPolicySpecViolation, _ComputeIPolicyInstanceViolation, \
+ _GetUpdatedParams, _ExpandInstanceName, _ComputeIPolicySpecViolation, \
_CheckInstanceState, _ExpandNodeName
-from ganeti.cmdlib.instance_utils import _AssembleInstanceDisks, \
- _BuildInstanceHookEnvByObject, _GetClusterDomainSecret, \
- _BuildInstanceHookEnv, _NICListToTuple, _NICToTuple, _CheckNodeNotDrained, \
- _RemoveDisks, _StartInstanceDisks, _ShutdownInstanceDisks, \
- _RemoveInstance, _ExpandCheckDisks
+from ganeti.cmdlib.instance_storage import _CreateDisks, \
+ _CheckNodesFreeDiskPerVG, _WipeDisks, _WaitForSync, _CheckDiskConsistency, \
+ _IsExclusiveStorageEnabledNodeName, _CreateSingleBlockDev, _ComputeDisks, \
+ _CheckRADOSFreeSpace, _ComputeDiskSizePerVG, _GenerateDiskTemplate, \
+ _CreateBlockDev, _StartInstanceDisks, _ShutdownInstanceDisks, \
+ _AssembleInstanceDisks, _ExpandCheckDisks
+from ganeti.cmdlib.instance_utils import _BuildInstanceHookEnvByObject, \
+ _GetClusterDomainSecret, _BuildInstanceHookEnv, _NICListToTuple, \
+ _NICToTuple, _CheckNodeNotDrained, _RemoveInstance, _CopyLockList, \
+ _ReleaseLocks, _CheckNodeVmCapable, _CheckTargetNodeIPolicy, \
+ _GetInstanceInfoText, _RemoveDisks
import ganeti.masterd.instance
-_DISK_TEMPLATE_NAME_PREFIX = {
- constants.DT_PLAIN: "",
- constants.DT_RBD: ".rbd",
- constants.DT_EXT: ".ext",
- }
-
-
-_DISK_TEMPLATE_DEVICE_TYPE = {
- constants.DT_PLAIN: constants.LD_LV,
- constants.DT_FILE: constants.LD_FILE,
- constants.DT_SHARED_FILE: constants.LD_FILE,
- constants.DT_BLOCK: constants.LD_BLOCKDEV,
- constants.DT_RBD: constants.LD_RBD,
- constants.DT_EXT: constants.LD_EXT,
- }
-
-
#: Type description for changes as returned by L{ApplyContainerMods}'s
#: callbacks
_TApplyContModsCbChanges = \
])))
-def _CopyLockList(names):
- """Makes a copy of a list of lock names.
-
- Handles L{locking.ALL_SET} correctly.
-
- """
- if names == locking.ALL_SET:
- return locking.ALL_SET
- else:
- return names[:]
-
-
-def _ReleaseLocks(lu, level, names=None, keep=None):
- """Releases locks owned by an LU.
-
- @type lu: L{LogicalUnit}
- @param level: Lock level
- @type names: list or None
- @param names: Names of locks to release
- @type keep: list or None
- @param keep: Names of locks to retain
-
- """
- assert not (keep is not None and names is not None), \
- "Only one of the 'names' and the 'keep' parameters can be given"
-
- if names is not None:
- should_release = names.__contains__
- elif keep:
- should_release = lambda name: name not in keep
- else:
- should_release = None
-
- owned = lu.owned_locks(level)
- if not owned:
- # Not owning any lock at this level, do nothing
- pass
-
- elif should_release:
- retain = []
- release = []
-
- # Determine which locks to release
- for name in owned:
- if should_release(name):
- release.append(name)
- else:
- retain.append(name)
-
- assert len(lu.owned_locks(level)) == (len(retain) + len(release))
-
- # Release just some locks
- lu.glm.release(level, names=release)
-
- assert frozenset(lu.owned_locks(level)) == frozenset(retain)
- else:
- # Release everything
- lu.glm.release(level)
-
- assert not lu.glm.is_owned(level), "No locks should be owned"
-
-
def _CheckHostnameSane(lu, name):
"""Ensures that a given hostname resolves to a 'sane' name.
return (None, None)
-def _CheckRADOSFreeSpace():
- """Compute disk size requirements inside the RADOS cluster.
-
- """
- # For the RADOS cluster we assume there is always enough space.
- pass
-
-
-def _WaitForSync(lu, instance, disks=None, oneshot=False):
- """Sleep and poll for an instance's disk to sync.
-
- """
- if not instance.disks or disks is not None and not disks:
- return True
-
- disks = _ExpandCheckDisks(instance, disks)
-
- if not oneshot:
- lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
-
- node = instance.primary_node
-
- for dev in disks:
- lu.cfg.SetDiskID(dev, node)
-
- # TODO: Convert to utils.Retry
-
- retries = 0
- degr_retries = 10 # in seconds, as we sleep 1 second each time
- while True:
- max_time = 0
- done = True
- cumul_degraded = False
- rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
- msg = rstats.fail_msg
- if msg:
- lu.LogWarning("Can't get any data from node %s: %s", node, msg)
- retries += 1
- if retries >= 10:
- raise errors.RemoteError("Can't contact node %s for mirror data,"
- " aborting." % node)
- time.sleep(6)
- continue
- rstats = rstats.payload
- retries = 0
- for i, mstat in enumerate(rstats):
- if mstat is None:
- lu.LogWarning("Can't compute data for node %s/%s",
- node, disks[i].iv_name)
- continue
-
- cumul_degraded = (cumul_degraded or
- (mstat.is_degraded and mstat.sync_percent is None))
- if mstat.sync_percent is not None:
- done = False
- if mstat.estimated_time is not None:
- rem_time = ("%s remaining (estimated)" %
- utils.FormatSeconds(mstat.estimated_time))
- max_time = mstat.estimated_time
- else:
- rem_time = "no time estimate"
- lu.LogInfo("- device %s: %5.2f%% done, %s",
- disks[i].iv_name, mstat.sync_percent, rem_time)
-
- # if we're done but degraded, let's do a few small retries, to
- # make sure we see a stable and not transient situation; therefore
- # we force restart of the loop
- if (done or oneshot) and cumul_degraded and degr_retries > 0:
- logging.info("Degraded disks found, %d retries left", degr_retries)
- degr_retries -= 1
- time.sleep(1)
- continue
-
- if done or oneshot:
- break
-
- time.sleep(min(60, max_time))
-
- if done:
- lu.LogInfo("Instance %s's disks are in sync", instance.name)
-
- return not cumul_degraded
-
-
-def _ComputeDisks(op, default_vg):
- """Computes the instance disks.
-
- @param op: The instance opcode
- @param default_vg: The default_vg to assume
-
- @return: The computed disks
-
- """
- disks = []
- for disk in op.disks:
- mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
- if mode not in constants.DISK_ACCESS_SET:
- raise errors.OpPrereqError("Invalid disk access mode '%s'" %
- mode, errors.ECODE_INVAL)
- size = disk.get(constants.IDISK_SIZE, None)
- if size is None:
- raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
- try:
- size = int(size)
- except (TypeError, ValueError):
- raise errors.OpPrereqError("Invalid disk size '%s'" % size,
- errors.ECODE_INVAL)
-
- ext_provider = disk.get(constants.IDISK_PROVIDER, None)
- if ext_provider and op.disk_template != constants.DT_EXT:
- raise errors.OpPrereqError("The '%s' option is only valid for the %s"
- " disk template, not %s" %
- (constants.IDISK_PROVIDER, constants.DT_EXT,
- op.disk_template), errors.ECODE_INVAL)
-
- data_vg = disk.get(constants.IDISK_VG, default_vg)
- name = disk.get(constants.IDISK_NAME, None)
- if name is not None and name.lower() == constants.VALUE_NONE:
- name = None
- new_disk = {
- constants.IDISK_SIZE: size,
- constants.IDISK_MODE: mode,
- constants.IDISK_VG: data_vg,
- constants.IDISK_NAME: name,
- }
-
- if constants.IDISK_METAVG in disk:
- new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
- if constants.IDISK_ADOPT in disk:
- new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
-
- # For extstorage, demand the `provider' option and add any
- # additional parameters (ext-params) to the dict
- if op.disk_template == constants.DT_EXT:
- if ext_provider:
- new_disk[constants.IDISK_PROVIDER] = ext_provider
- for key in disk:
- if key not in constants.IDISK_PARAMS:
- new_disk[key] = disk[key]
- else:
- raise errors.OpPrereqError("Missing provider for template '%s'" %
- constants.DT_EXT, errors.ECODE_INVAL)
-
- disks.append(new_disk)
-
- return disks
-
-
-def _ComputeDiskSizePerVG(disk_template, disks):
- """Compute disk size requirements in the volume group
-
- """
- def _compute(disks, payload):
- """Universal algorithm.
-
- """
- vgs = {}
- for disk in disks:
- vgs[disk[constants.IDISK_VG]] = \
- vgs.get(disk[constants.IDISK_VG], 0) + disk[constants.IDISK_SIZE] + payload
-
- return vgs
-
- # Required free disk space as a function of disk and swap space
- req_size_dict = {
- constants.DT_DISKLESS: {},
- constants.DT_PLAIN: _compute(disks, 0),
- # 128 MB are added for drbd metadata for each disk
- constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
- constants.DT_FILE: {},
- constants.DT_SHARED_FILE: {},
- }
-
- if disk_template not in req_size_dict:
- raise errors.ProgrammerError("Disk template '%s' size requirement"
- " is unknown" % disk_template)
-
- return req_size_dict[disk_template]
-
-
-def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
- """Checks if nodes have enough free disk space in the specified VG.
-
- This function checks if all given nodes have the needed amount of
- free disk. In case any node has less disk or we cannot get the
- information from the node, this function raises an OpPrereqError
- exception.
-
- @type lu: C{LogicalUnit}
- @param lu: a logical unit from which we get configuration data
- @type nodenames: C{list}
- @param nodenames: the list of node names to check
- @type vg: C{str}
- @param vg: the volume group to check
- @type requested: C{int}
- @param requested: the amount of disk in MiB to check for
- @raise errors.OpPrereqError: if the node doesn't have enough disk,
- or we cannot check the node
-
- """
- es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
- nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
- for node in nodenames:
- info = nodeinfo[node]
- info.Raise("Cannot get current information from node %s" % node,
- prereq=True, ecode=errors.ECODE_ENVIRON)
- (_, (vg_info, ), _) = info.payload
- vg_free = vg_info.get("vg_free", None)
- if not isinstance(vg_free, int):
- raise errors.OpPrereqError("Can't compute free disk space on node"
- " %s for vg %s, result was '%s'" %
- (node, vg, vg_free), errors.ECODE_ENVIRON)
- if requested > vg_free:
- raise errors.OpPrereqError("Not enough disk space on target node %s"
- " vg %s: required %d MiB, available %d MiB" %
- (node, vg, requested, vg_free),
- errors.ECODE_NORES)
-
-
-def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
- """Checks if nodes have enough free disk space in all the VGs.
-
- This function checks if all given nodes have the needed amount of
- free disk. In case any node has less disk or we cannot get the
- information from the node, this function raises an OpPrereqError
- exception.
-
- @type lu: C{LogicalUnit}
- @param lu: a logical unit from which we get configuration data
- @type nodenames: C{list}
- @param nodenames: the list of node names to check
- @type req_sizes: C{dict}
- @param req_sizes: the hash of vg and corresponding amount of disk in
- MiB to check for
- @raise errors.OpPrereqError: if the node doesn't have enough disk,
- or we cannot check the node
-
- """
- for vg, req_size in req_sizes.items():
- _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
-
-
-def _CheckNodeVmCapable(lu, node):
- """Ensure that a given node is vm capable.
-
- @param lu: the LU on behalf of which we make the check
- @param node: the node to check
- @raise errors.OpPrereqError: if the node is not vm capable
-
- """
- if not lu.cfg.GetNodeInfo(node).vm_capable:
- raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
- errors.ECODE_STATE)
-
-
def _ComputeIPolicyInstanceSpecViolation(
ipolicy, instance_spec, disk_template,
_compute_fn=_ComputeIPolicySpecViolation):
return free_mem
-def _GenerateUniqueNames(lu, exts):
- """Generate a suitable LV name.
-
- This will generate a logical volume name for the given instance.
-
- """
- results = []
- for val in exts:
- new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
- results.append("%s%s" % (new_id, val))
- return results
-
-
-def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
- iv_name, p_minor, s_minor):
- """Generate a drbd8 device complete with its children.
-
- """
- assert len(vgnames) == len(names) == 2
- port = lu.cfg.AllocatePort()
- shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
-
- dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
- logical_id=(vgnames[0], names[0]),
- params={})
- dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
- dev_meta = objects.Disk(dev_type=constants.LD_LV,
- size=constants.DRBD_META_SIZE,
- logical_id=(vgnames[1], names[1]),
- params={})
- dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
- drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
- logical_id=(primary, secondary, port,
- p_minor, s_minor,
- shared_secret),
- children=[dev_data, dev_meta],
- iv_name=iv_name, params={})
- drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
- return drbd_dev
-
-
-def _GenerateDiskTemplate(
- lu, template_name, instance_name, primary_node, secondary_nodes,
- disk_info, file_storage_dir, file_driver, base_index,
- feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
- _req_shr_file_storage=opcodes.RequireSharedFileStorage):
- """Generate the entire disk layout for a given template type.
-
- """
- vgname = lu.cfg.GetVGName()
- disk_count = len(disk_info)
- disks = []
-
- if template_name == constants.DT_DISKLESS:
- pass
- elif template_name == constants.DT_DRBD8:
- if len(secondary_nodes) != 1:
- raise errors.ProgrammerError("Wrong template configuration")
- remote_node = secondary_nodes[0]
- minors = lu.cfg.AllocateDRBDMinor(
- [primary_node, remote_node] * len(disk_info), instance_name)
-
- (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
- full_disk_params)
- drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
-
- names = []
- for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
- for i in range(disk_count)]):
- names.append(lv_prefix + "_data")
- names.append(lv_prefix + "_meta")
- for idx, disk in enumerate(disk_info):
- disk_index = idx + base_index
- data_vg = disk.get(constants.IDISK_VG, vgname)
- meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
- disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
- disk[constants.IDISK_SIZE],
- [data_vg, meta_vg],
- names[idx * 2:idx * 2 + 2],
- "disk/%d" % disk_index,
- minors[idx * 2], minors[idx * 2 + 1])
- disk_dev.mode = disk[constants.IDISK_MODE]
- disk_dev.name = disk.get(constants.IDISK_NAME, None)
- disks.append(disk_dev)
- else:
- if secondary_nodes:
- raise errors.ProgrammerError("Wrong template configuration")
-
- if template_name == constants.DT_FILE:
- _req_file_storage()
- elif template_name == constants.DT_SHARED_FILE:
- _req_shr_file_storage()
-
- name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
- if name_prefix is None:
- names = None
- else:
- names = _GenerateUniqueNames(lu, ["%s.disk%s" %
- (name_prefix, base_index + i)
- for i in range(disk_count)])
-
- if template_name == constants.DT_PLAIN:
-
- def logical_id_fn(idx, _, disk):
- vg = disk.get(constants.IDISK_VG, vgname)
- return (vg, names[idx])
-
- elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
- logical_id_fn = \
- lambda _, disk_index, disk: (file_driver,
- "%s/disk%d" % (file_storage_dir,
- disk_index))
- elif template_name == constants.DT_BLOCK:
- logical_id_fn = \
- lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
- disk[constants.IDISK_ADOPT])
- elif template_name == constants.DT_RBD:
- logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
- elif template_name == constants.DT_EXT:
- def logical_id_fn(idx, _, disk):
- provider = disk.get(constants.IDISK_PROVIDER, None)
- if provider is None:
- raise errors.ProgrammerError("Disk template is %s, but '%s' is"
- " not found", constants.DT_EXT,
- constants.IDISK_PROVIDER)
- return (provider, names[idx])
- else:
- raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
-
- dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
-
- for idx, disk in enumerate(disk_info):
- params = {}
- # Only for the Ext template add disk_info to params
- if template_name == constants.DT_EXT:
- params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
- for key in disk:
- if key not in constants.IDISK_PARAMS:
- params[key] = disk[key]
- disk_index = idx + base_index
- size = disk[constants.IDISK_SIZE]
- feedback_fn("* disk %s, size %s" %
- (disk_index, utils.FormatUnit(size, "h")))
- disk_dev = objects.Disk(dev_type=dev_type, size=size,
- logical_id=logical_id_fn(idx, disk_index, disk),
- iv_name="disk/%d" % disk_index,
- mode=disk[constants.IDISK_MODE],
- params=params)
- disk_dev.name = disk.get(constants.IDISK_NAME, None)
- disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
- disks.append(disk_dev)
-
- return disks
-
-
-def _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
- excl_stor):
- """Create a single block device on a given node.
-
- This will not recurse over children of the device, so they must be
- created in advance.
-
- @param lu: the lu on whose behalf we execute
- @param node: the node on which to create the device
- @type instance: L{objects.Instance}
- @param instance: the instance which owns the device
- @type device: L{objects.Disk}
- @param device: the device to create
- @param info: the extra 'metadata' we should attach to the device
- (this will be represented as a LVM tag)
- @type force_open: boolean
- @param force_open: this parameter will be passed to the
- L{backend.BlockdevCreate} function where it specifies
- whether we run on primary or not, and it affects both
- the child assembly and the device's own Open() execution
- @type excl_stor: boolean
- @param excl_stor: Whether exclusive_storage is active for the node
-
- """
- lu.cfg.SetDiskID(device, node)
- result = lu.rpc.call_blockdev_create(node, device, device.size,
- instance.name, force_open, info,
- excl_stor)
- result.Raise("Can't create block device %s on"
- " node %s for instance %s" % (device, node, instance.name))
- if device.physical_id is None:
- device.physical_id = result.payload
-
-
-def _CreateBlockDevInner(lu, node, instance, device, force_create,
- info, force_open, excl_stor):
- """Create a tree of block devices on a given node.
-
- If this device type has to be created on secondaries, create it and
- all its children.
-
- If not, just recurse to children keeping the same 'force' value.
-
- @attention: The device has to be annotated already.
-
- @param lu: the lu on whose behalf we execute
- @param node: the node on which to create the device
- @type instance: L{objects.Instance}
- @param instance: the instance which owns the device
- @type device: L{objects.Disk}
- @param device: the device to create
- @type force_create: boolean
- @param force_create: whether to force creation of this device; this
- will be changed to True whenever we find a device which has
- the CreateOnSecondary() attribute
- @param info: the extra 'metadata' we should attach to the device
- (this will be represented as a LVM tag)
- @type force_open: boolean
- @param force_open: this parameter will be passed to the
- L{backend.BlockdevCreate} function where it specifies
- whether we run on primary or not, and it affects both
- the child assembly and the device's own Open() execution
- @type excl_stor: boolean
- @param excl_stor: Whether exclusive_storage is active for the node
-
- @return: list of created devices
- """
- created_devices = []
- try:
- if device.CreateOnSecondary():
- force_create = True
-
- if device.children:
- for child in device.children:
- devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
- info, force_open, excl_stor)
- created_devices.extend(devs)
-
- if not force_create:
- return created_devices
-
- _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
- excl_stor)
- # The device has been completely created, so there is no point in keeping
- # its subdevices in the list. We just add the device itself instead.
- created_devices = [(node, device)]
- return created_devices
-
- except errors.DeviceCreationError, e:
- e.created_devices.extend(created_devices)
- raise e
- except errors.OpExecError, e:
- raise errors.DeviceCreationError(str(e), created_devices)
-
-
-def _IsExclusiveStorageEnabledNodeName(cfg, nodename):
- """Whether exclusive_storage is in effect for the given node.
-
- @type cfg: L{config.ConfigWriter}
- @param cfg: The cluster configuration
- @type nodename: string
- @param nodename: The node
- @rtype: bool
- @return: The effective value of exclusive_storage
- @raise errors.OpPrereqError: if no node exists with the given name
-
- """
- ni = cfg.GetNodeInfo(nodename)
- if ni is None:
- raise errors.OpPrereqError("Invalid node name %s" % nodename,
- errors.ECODE_NOENT)
- return _IsExclusiveStorageEnabledNode(cfg, ni)
-
-
-def _CreateBlockDev(lu, node, instance, device, force_create, info,
- force_open):
- """Wrapper around L{_CreateBlockDevInner}.
-
- This method annotates the root device first.
-
- """
- (disk,) = _AnnotateDiskParams(instance, [device], lu.cfg)
- excl_stor = _IsExclusiveStorageEnabledNodeName(lu.cfg, node)
- return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
- force_open, excl_stor)
-
-
-def _CreateDisks(lu, instance, to_skip=None, target_node=None):
- """Create all disks for an instance.
-
- This abstracts away some work from AddInstance.
-
- @type lu: L{LogicalUnit}
- @param lu: the logical unit on whose behalf we execute
- @type instance: L{objects.Instance}
- @param instance: the instance whose disks we should create
- @type to_skip: list
- @param to_skip: list of indices to skip
- @type target_node: string
- @param target_node: if passed, overrides the target node for creation
- @rtype: boolean
- @return: the success of the creation
-
- """
- info = _GetInstanceInfoText(instance)
- if target_node is None:
- pnode = instance.primary_node
- all_nodes = instance.all_nodes
- else:
- pnode = target_node
- all_nodes = [pnode]
-
- if instance.disk_template in constants.DTS_FILEBASED:
- file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
- result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
-
- result.Raise("Failed to create directory '%s' on"
- " node %s" % (file_storage_dir, pnode))
-
- disks_created = []
- # Note: this needs to be kept in sync with adding of disks in
- # LUInstanceSetParams
- for idx, device in enumerate(instance.disks):
- if to_skip and idx in to_skip:
- continue
- logging.info("Creating disk %s for instance '%s'", idx, instance.name)
- #HARDCODE
- for node in all_nodes:
- f_create = node == pnode
- try:
- _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
- disks_created.append((node, device))
- except errors.OpExecError:
- logging.warning("Creating disk %s for instance '%s' failed",
- idx, instance.name)
- except errors.DeviceCreationError, e:
- logging.warning("Creating disk %s for instance '%s' failed",
- idx, instance.name)
- disks_created.extend(e.created_devices)
- for (node, disk) in disks_created:
- lu.cfg.SetDiskID(disk, node)
- result = lu.rpc.call_blockdev_remove(node, disk)
- if result.fail_msg:
- logging.warning("Failed to remove newly-created disk %s on node %s:"
- " %s", device, node, result.fail_msg)
- raise errors.OpExecError(e.message)
-
-
-def _CalcEta(time_taken, written, total_size):
- """Calculates the ETA based on size written and total size.
-
- @param time_taken: The time taken so far
- @param written: amount written so far
- @param total_size: The total size of data to be written
- @return: The remaining time in seconds
-
- """
- avg_time = time_taken / float(written)
- return (total_size - written) * avg_time
-
-
-def _WipeDisks(lu, instance, disks=None):
- """Wipes instance disks.
-
- @type lu: L{LogicalUnit}
- @param lu: the logical unit on whose behalf we execute
- @type instance: L{objects.Instance}
- @param instance: the instance whose disks we should wipe
- @type disks: None or list of tuple of (number, L{objects.Disk}, number)
- @param disks: Disk details; tuple contains disk index, disk object and the
- start offset
-
- """
- node = instance.primary_node
-
- if disks is None:
- disks = [(idx, disk, 0)
- for (idx, disk) in enumerate(instance.disks)]
-
- for (_, device, _) in disks:
- lu.cfg.SetDiskID(device, node)
-
- logging.info("Pausing synchronization of disks of instance '%s'",
- instance.name)
- result = lu.rpc.call_blockdev_pause_resume_sync(node,
- (map(compat.snd, disks),
- instance),
- True)
- result.Raise("Failed to pause disk synchronization on node '%s'" % node)
-
- for idx, success in enumerate(result.payload):
- if not success:
- logging.warn("Pausing synchronization of disk %s of instance '%s'"
- " failed", idx, instance.name)
-
- try:
- for (idx, device, offset) in disks:
- # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
- # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
- wipe_chunk_size = \
- int(min(constants.MAX_WIPE_CHUNK,
- device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
-
- size = device.size
- last_output = 0
- start_time = time.time()
-
- if offset == 0:
- info_text = ""
- else:
- info_text = (" (from %s to %s)" %
- (utils.FormatUnit(offset, "h"),
- utils.FormatUnit(size, "h")))
-
- lu.LogInfo("* Wiping disk %s%s", idx, info_text)
-
- logging.info("Wiping disk %d for instance %s on node %s using"
- " chunk size %s", idx, instance.name, node, wipe_chunk_size)
-
- while offset < size:
- wipe_size = min(wipe_chunk_size, size - offset)
-
- logging.debug("Wiping disk %d, offset %s, chunk %s",
- idx, offset, wipe_size)
-
- result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
- wipe_size)
- result.Raise("Could not wipe disk %d at offset %d for size %d" %
- (idx, offset, wipe_size))
-
- now = time.time()
- offset += wipe_size
- if now - last_output >= 60:
- eta = _CalcEta(now - start_time, offset, size)
- lu.LogInfo(" - done: %.1f%% ETA: %s",
- offset / float(size) * 100, utils.FormatSeconds(eta))
- last_output = now
- finally:
- logging.info("Resuming synchronization of disks for instance '%s'",
- instance.name)
-
- result = lu.rpc.call_blockdev_pause_resume_sync(node,
- (map(compat.snd, disks),
- instance),
- False)
-
- if result.fail_msg:
- lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
- node, result.fail_msg)
- else:
- for idx, success in enumerate(result.payload):
- if not success:
- lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
- " failed", idx, instance.name)
-
-
class LUInstanceCreate(LogicalUnit):
"""Create an instance.
return list(iobj.all_nodes)
-def _GetInstanceInfoText(instance):
- """Compute that text that should be added to the disk's metadata.
-
- """
- return "originstname+%s" % instance.name
-
-
class LUInstanceRename(LogicalUnit):
"""Rename an instance.
_CheckNicsBridgesExist(lu, instance.nics, node)
-def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
- target_group, cfg,
- _compute_fn=_ComputeIPolicyInstanceViolation):
- """Compute if instance meets the specs of the new target group.
-
- @param ipolicy: The ipolicy to verify
- @param instance: The instance object to verify
- @param current_group: The current group of the instance
- @param target_group: The new group of the instance
- @type cfg: L{config.ConfigWriter}
- @param cfg: Cluster configuration
- @param _compute_fn: The function to verify ipolicy (unittest only)
- @see: L{_ComputeIPolicySpecViolation}
-
- """
- if current_group == target_group:
- return []
- else:
- return _compute_fn(ipolicy, instance, cfg)
-
-
-def _CheckTargetNodeIPolicy(lu, ipolicy, instance, node, cfg, ignore=False,
- _compute_fn=_ComputeIPolicyNodeViolation):
- """Checks that the target node is correct in terms of instance policy.
-
- @param ipolicy: The ipolicy to verify
- @param instance: The instance object to verify
- @param node: The new node to relocate
- @type cfg: L{config.ConfigWriter}
- @param cfg: Cluster configuration
- @param ignore: Ignore violations of the ipolicy
- @param _compute_fn: The function to verify ipolicy (unittest only)
- @see: L{_ComputeIPolicySpecViolation}
-
- """
- primary_node = lu.cfg.GetNodeInfo(instance.primary_node)
- res = _compute_fn(ipolicy, instance, primary_node.group, node.group, cfg)
-
- if res:
- msg = ("Instance does not meet target node group's (%s) instance"
- " policy: %s") % (node.group, utils.CommaJoin(res))
- if ignore:
- lu.LogWarning(msg)
- else:
- raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
-
-
class LUInstanceMove(LogicalUnit):
"""Move an instance by data-copying.
cluster = self.cfg.GetClusterInfo()
- node_names = itertools.chain(*(i.all_nodes for i in self.wanted_instances))
- nodes = dict(self.cfg.GetMultiNodeInfo(node_names))
-
- groups = dict(self.cfg.GetMultiNodeGroupInfo(node.group
- for node in nodes.values()))
-
- group2name_fn = lambda uuid: groups[uuid].name
- for instance in self.wanted_instances:
- pnode = nodes[instance.primary_node]
-
- if self.op.static or pnode.offline:
- remote_state = None
- if pnode.offline:
- self.LogWarning("Primary node %s is marked offline, returning static"
- " information only for instance %s" %
- (pnode.name, instance.name))
- else:
- remote_info = self.rpc.call_instance_info(instance.primary_node,
- instance.name,
- instance.hypervisor)
- remote_info.Raise("Error checking node %s" % instance.primary_node)
- remote_info = remote_info.payload
- if remote_info and "state" in remote_info:
- remote_state = "up"
- else:
- if instance.admin_state == constants.ADMINST_UP:
- remote_state = "down"
- else:
- remote_state = instance.admin_state
-
- disks = map(compat.partial(self._ComputeDiskStatus, instance, None),
- instance.disks)
-
- snodes_group_uuids = [nodes[snode_name].group
- for snode_name in instance.secondary_nodes]
-
- result[instance.name] = {
- "name": instance.name,
- "config_state": instance.admin_state,
- "run_state": remote_state,
- "pnode": instance.primary_node,
- "pnode_group_uuid": pnode.group,
- "pnode_group_name": group2name_fn(pnode.group),
- "snodes": instance.secondary_nodes,
- "snodes_group_uuids": snodes_group_uuids,
- "snodes_group_names": map(group2name_fn, snodes_group_uuids),
- "os": instance.os,
- # this happens to be the same format used for hooks
- "nics": _NICListToTuple(self, instance.nics),
- "disk_template": instance.disk_template,
- "disks": disks,
- "hypervisor": instance.hypervisor,
- "network_port": instance.network_port,
- "hv_instance": instance.hvparams,
- "hv_actual": cluster.FillHV(instance, skip_globals=True),
- "be_instance": instance.beparams,
- "be_actual": cluster.FillBE(instance),
- "os_instance": instance.osparams,
- "os_actual": cluster.SimpleFillOS(instance.os, instance.osparams),
- "serial_no": instance.serial_no,
- "mtime": instance.mtime,
- "ctime": instance.ctime,
- "uuid": instance.uuid,
- }
-
- return result
-
-
-class LUInstanceRecreateDisks(LogicalUnit):
- """Recreate an instance's missing disks.
-
- """
- HPATH = "instance-recreate-disks"
- HTYPE = constants.HTYPE_INSTANCE
- REQ_BGL = False
-
- _MODIFYABLE = compat.UniqueFrozenset([
- constants.IDISK_SIZE,
- constants.IDISK_MODE,
- ])
-
- # New or changed disk parameters may have different semantics
- assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
- constants.IDISK_ADOPT,
-
- # TODO: Implement support changing VG while recreating
- constants.IDISK_VG,
- constants.IDISK_METAVG,
- constants.IDISK_PROVIDER,
- constants.IDISK_NAME,
- ]))
-
- def _RunAllocator(self):
- """Run the allocator based on input opcode.
-
- """
- be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
-
- # FIXME
- # The allocator should actually run in "relocate" mode, but current
- # allocators don't support relocating all the nodes of an instance at
- # the same time. As a workaround we use "allocate" mode, but this is
- # suboptimal for two reasons:
- # - The instance name passed to the allocator is present in the list of
- # existing instances, so there could be a conflict within the
- # internal structures of the allocator. This doesn't happen with the
- # current allocators, but it's a liability.
- # - The allocator counts the resources used by the instance twice: once
- # because the instance exists already, and once because it tries to
- # allocate a new instance.
- # The allocator could choose some of the nodes on which the instance is
- # running, but that's not a problem. If the instance nodes are broken,
- # they should already be marked as drained or offline, and hence
- # skipped by the allocator. If instance disks have been lost for other
- # reasons, then recreating the disks on the same nodes should be fine.
- disk_template = self.instance.disk_template
- spindle_use = be_full[constants.BE_SPINDLE_USE]
- req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
- disk_template=disk_template,
- tags=list(self.instance.GetTags()),
- os=self.instance.os,
- nics=[{}],
- vcpus=be_full[constants.BE_VCPUS],
- memory=be_full[constants.BE_MAXMEM],
- spindle_use=spindle_use,
- disks=[{constants.IDISK_SIZE: d.size,
- constants.IDISK_MODE: d.mode}
- for d in self.instance.disks],
- hypervisor=self.instance.hypervisor,
- node_whitelist=None)
- ial = iallocator.IAllocator(self.cfg, self.rpc, req)
-
- ial.Run(self.op.iallocator)
-
- assert req.RequiredNodes() == len(self.instance.all_nodes)
-
- if not ial.success:
- raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
- " %s" % (self.op.iallocator, ial.info),
- errors.ECODE_NORES)
-
- self.op.nodes = ial.result
- self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
- self.op.instance_name, self.op.iallocator,
- utils.CommaJoin(ial.result))
-
- def CheckArguments(self):
- if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
- # Normalize and convert deprecated list of disk indices
- self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
-
- duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
- if duplicates:
- raise errors.OpPrereqError("Some disks have been specified more than"
- " once: %s" % utils.CommaJoin(duplicates),
- errors.ECODE_INVAL)
-
- # We don't want _CheckIAllocatorOrNode selecting the default iallocator
- # when neither iallocator nor nodes are specified
- if self.op.iallocator or self.op.nodes:
- _CheckIAllocatorOrNode(self, "iallocator", "nodes")
-
- for (idx, params) in self.op.disks:
- utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
- unsupported = frozenset(params.keys()) - self._MODIFYABLE
- if unsupported:
- raise errors.OpPrereqError("Parameters for disk %s try to change"
- " unmodifyable parameter(s): %s" %
- (idx, utils.CommaJoin(unsupported)),
- errors.ECODE_INVAL)
-
- def ExpandNames(self):
- self._ExpandAndLockInstance()
- self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
-
- if self.op.nodes:
- self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
- self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
- else:
- self.needed_locks[locking.LEVEL_NODE] = []
- if self.op.iallocator:
- # iallocator will select a new node in the same group
- self.needed_locks[locking.LEVEL_NODEGROUP] = []
- self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
-
- self.needed_locks[locking.LEVEL_NODE_RES] = []
-
- def DeclareLocks(self, level):
- if level == locking.LEVEL_NODEGROUP:
- assert self.op.iallocator is not None
- assert not self.op.nodes
- assert not self.needed_locks[locking.LEVEL_NODEGROUP]
- self.share_locks[locking.LEVEL_NODEGROUP] = 1
- # Lock the primary group used by the instance optimistically; this
- # requires going via the node before it's locked, requiring
- # verification later on
- self.needed_locks[locking.LEVEL_NODEGROUP] = \
- self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
-
- elif level == locking.LEVEL_NODE:
- # If an allocator is used, then we lock all the nodes in the current
- # instance group, as we don't know yet which ones will be selected;
- # if we replace the nodes without using an allocator, locks are
- # already declared in ExpandNames; otherwise, we need to lock all the
- # instance nodes for disk re-creation
- if self.op.iallocator:
- assert not self.op.nodes
- assert not self.needed_locks[locking.LEVEL_NODE]
- assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
-
- # Lock member nodes of the group of the primary node
- for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
- self.needed_locks[locking.LEVEL_NODE].extend(
- self.cfg.GetNodeGroup(group_uuid).members)
-
- assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
- elif not self.op.nodes:
- self._LockInstancesNodes(primary_only=False)
- elif level == locking.LEVEL_NODE_RES:
- # Copy node locks
- self.needed_locks[locking.LEVEL_NODE_RES] = \
- _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
-
- def BuildHooksEnv(self):
- """Build hooks env.
-
- This runs on master, primary and secondary nodes of the instance.
-
- """
- return _BuildInstanceHookEnvByObject(self, self.instance)
-
- def BuildHooksNodes(self):
- """Build hooks nodes.
-
- """
- nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
- return (nl, nl)
-
- def CheckPrereq(self):
- """Check prerequisites.
-
- This checks that the instance is in the cluster and is not running.
-
- """
- instance = self.cfg.GetInstanceInfo(self.op.instance_name)
- assert instance is not None, \
- "Cannot retrieve locked instance %s" % self.op.instance_name
- if self.op.nodes:
- if len(self.op.nodes) != len(instance.all_nodes):
- raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
- " %d replacement nodes were specified" %
- (instance.name, len(instance.all_nodes),
- len(self.op.nodes)),
- errors.ECODE_INVAL)
- assert instance.disk_template != constants.DT_DRBD8 or \
- len(self.op.nodes) == 2
- assert instance.disk_template != constants.DT_PLAIN or \
- len(self.op.nodes) == 1
- primary_node = self.op.nodes[0]
- else:
- primary_node = instance.primary_node
- if not self.op.iallocator:
- _CheckNodeOnline(self, primary_node)
-
- if instance.disk_template == constants.DT_DISKLESS:
- raise errors.OpPrereqError("Instance '%s' has no disks" %
- self.op.instance_name, errors.ECODE_INVAL)
-
- # Verify if node group locks are still correct
- owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
- if owned_groups:
- # Node group locks are acquired only for the primary node (and only
- # when the allocator is used)
- _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
- primary_only=True)
-
- # if we replace nodes *and* the old primary is offline, we don't
- # check the instance state
- old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
- if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
- _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
- msg="cannot recreate disks")
-
- if self.op.disks:
- self.disks = dict(self.op.disks)
- else:
- self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
-
- maxidx = max(self.disks.keys())
- if maxidx >= len(instance.disks):
- raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
- errors.ECODE_INVAL)
-
- if ((self.op.nodes or self.op.iallocator) and
- sorted(self.disks.keys()) != range(len(instance.disks))):
- raise errors.OpPrereqError("Can't recreate disks partially and"
- " change the nodes at the same time",
- errors.ECODE_INVAL)
-
- self.instance = instance
-
- if self.op.iallocator:
- self._RunAllocator()
- # Release unneeded node and node resource locks
- _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
- _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
- _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
-
- assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
-
- def Exec(self, feedback_fn):
- """Recreate the disks.
-
- """
- instance = self.instance
-
- assert (self.owned_locks(locking.LEVEL_NODE) ==
- self.owned_locks(locking.LEVEL_NODE_RES))
-
- to_skip = []
- mods = [] # keeps track of needed changes
-
- for idx, disk in enumerate(instance.disks):
- try:
- changes = self.disks[idx]
- except KeyError:
- # Disk should not be recreated
- to_skip.append(idx)
- continue
-
- # update secondaries for disks, if needed
- if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
- # need to update the nodes and minors
- assert len(self.op.nodes) == 2
- assert len(disk.logical_id) == 6 # otherwise disk internals
- # have changed
- (_, _, old_port, _, _, old_secret) = disk.logical_id
- new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
- new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
- new_minors[0], new_minors[1], old_secret)
- assert len(disk.logical_id) == len(new_id)
- else:
- new_id = None
-
- mods.append((idx, new_id, changes))
-
- # now that we have passed all asserts above, we can apply the mods
- # in a single run (to avoid partial changes)
- for idx, new_id, changes in mods:
- disk = instance.disks[idx]
- if new_id is not None:
- assert disk.dev_type == constants.LD_DRBD8
- disk.logical_id = new_id
- if changes:
- disk.Update(size=changes.get(constants.IDISK_SIZE, None),
- mode=changes.get(constants.IDISK_MODE, None))
-
- # change primary node, if needed
- if self.op.nodes:
- instance.primary_node = self.op.nodes[0]
- self.LogWarning("Changing the instance's nodes, you will have to"
- " remove any disks left on the older nodes manually")
-
- if self.op.nodes:
- self.cfg.Update(instance, feedback_fn)
-
- # All touched nodes must be locked
- mylocks = self.owned_locks(locking.LEVEL_NODE)
- assert mylocks.issuperset(frozenset(instance.all_nodes))
- _CreateDisks(self, instance, to_skip=to_skip)
-
-
-def _SafeShutdownInstanceDisks(lu, instance, disks=None):
- """Shutdown block devices of an instance.
-
- This function checks if an instance is running, before calling
- _ShutdownInstanceDisks.
-
- """
- _CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
- _ShutdownInstanceDisks(lu, instance, disks=disks)
-
-
-def _DiskSizeInBytesToMebibytes(lu, size):
- """Converts a disk size in bytes to mebibytes.
-
- Warns and rounds up if the size isn't an even multiple of 1 MiB.
-
- """
- (mib, remainder) = divmod(size, 1024 * 1024)
-
- if remainder != 0:
- lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
- " to not overwrite existing data (%s bytes will not be"
- " wiped)", (1024 * 1024) - remainder)
- mib += 1
-
- return mib
-
-
-class LUInstanceGrowDisk(LogicalUnit):
- """Grow a disk of an instance.
-
- """
- HPATH = "disk-grow"
- HTYPE = constants.HTYPE_INSTANCE
- REQ_BGL = False
-
- def ExpandNames(self):
- self._ExpandAndLockInstance()
- self.needed_locks[locking.LEVEL_NODE] = []
- self.needed_locks[locking.LEVEL_NODE_RES] = []
- self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
- self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
-
- def DeclareLocks(self, level):
- if level == locking.LEVEL_NODE:
- self._LockInstancesNodes()
- elif level == locking.LEVEL_NODE_RES:
- # Copy node locks
- self.needed_locks[locking.LEVEL_NODE_RES] = \
- _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
-
- def BuildHooksEnv(self):
- """Build hooks env.
-
- This runs on the master, the primary and all the secondaries.
-
- """
- env = {
- "DISK": self.op.disk,
- "AMOUNT": self.op.amount,
- "ABSOLUTE": self.op.absolute,
- }
- env.update(_BuildInstanceHookEnvByObject(self, self.instance))
- return env
-
- def BuildHooksNodes(self):
- """Build hooks nodes.
-
- """
- nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
- return (nl, nl)
-
- def CheckPrereq(self):
- """Check prerequisites.
-
- This checks that the instance is in the cluster.
-
- """
- instance = self.cfg.GetInstanceInfo(self.op.instance_name)
- assert instance is not None, \
- "Cannot retrieve locked instance %s" % self.op.instance_name
- nodenames = list(instance.all_nodes)
- for node in nodenames:
- _CheckNodeOnline(self, node)
-
- self.instance = instance
-
- if instance.disk_template not in constants.DTS_GROWABLE:
- raise errors.OpPrereqError("Instance's disk layout does not support"
- " growing", errors.ECODE_INVAL)
-
- self.disk = instance.FindDisk(self.op.disk)
-
- if self.op.absolute:
- self.target = self.op.amount
- self.delta = self.target - self.disk.size
- if self.delta < 0:
- raise errors.OpPrereqError("Requested size (%s) is smaller than "
- "current disk size (%s)" %
- (utils.FormatUnit(self.target, "h"),
- utils.FormatUnit(self.disk.size, "h")),
- errors.ECODE_STATE)
- else:
- self.delta = self.op.amount
- self.target = self.disk.size + self.delta
- if self.delta < 0:
- raise errors.OpPrereqError("Requested increment (%s) is negative" %
- utils.FormatUnit(self.delta, "h"),
- errors.ECODE_INVAL)
-
- self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
-
- def _CheckDiskSpace(self, nodenames, req_vgspace):
- template = self.instance.disk_template
- if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
- # TODO: check the free disk space for file, when that feature will be
- # supported
- nodes = map(self.cfg.GetNodeInfo, nodenames)
- es_nodes = filter(lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n),
- nodes)
- if es_nodes:
- # With exclusive storage we need to do something smarter than just looking
- # at free space; for now, let's simply abort the operation.
- raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
- " is enabled", errors.ECODE_STATE)
- _CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
-
- def Exec(self, feedback_fn):
- """Execute disk grow.
-
- """
- instance = self.instance
- disk = self.disk
-
- assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
- assert (self.owned_locks(locking.LEVEL_NODE) ==
- self.owned_locks(locking.LEVEL_NODE_RES))
-
- wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
-
- disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk])
- if not disks_ok:
- raise errors.OpExecError("Cannot activate block device to grow")
-
- feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
- (self.op.disk, instance.name,
- utils.FormatUnit(self.delta, "h"),
- utils.FormatUnit(self.target, "h")))
-
- # First run all grow ops in dry-run mode
- for node in instance.all_nodes:
- self.cfg.SetDiskID(disk, node)
- result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
- True, True)
- result.Raise("Dry-run grow request failed to node %s" % node)
-
- if wipe_disks:
- # Get disk size from primary node for wiping
- result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
- result.Raise("Failed to retrieve disk size from node '%s'" %
- instance.primary_node)
-
- (disk_size_in_bytes, ) = result.payload
-
- if disk_size_in_bytes is None:
- raise errors.OpExecError("Failed to retrieve disk size from primary"
- " node '%s'" % instance.primary_node)
-
- old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
-
- assert old_disk_size >= disk.size, \
- ("Retrieved disk size too small (got %s, should be at least %s)" %
- (old_disk_size, disk.size))
- else:
- old_disk_size = None
-
- # We know that (as far as we can test) operations across different
- # nodes will succeed, time to run it for real on the backing storage
- for node in instance.all_nodes:
- self.cfg.SetDiskID(disk, node)
- result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
- False, True)
- result.Raise("Grow request failed to node %s" % node)
-
- # And now execute it for logical storage, on the primary node
- node = instance.primary_node
- self.cfg.SetDiskID(disk, node)
- result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
- False, False)
- result.Raise("Grow request failed to node %s" % node)
-
- disk.RecordGrow(self.delta)
- self.cfg.Update(instance, feedback_fn)
-
- # Changes have been recorded, release node lock
- _ReleaseLocks(self, locking.LEVEL_NODE)
-
- # Downgrade lock while waiting for sync
- self.glm.downgrade(locking.LEVEL_INSTANCE)
-
- assert wipe_disks ^ (old_disk_size is None)
-
- if wipe_disks:
- assert instance.disks[self.op.disk] == disk
-
- # Wipe newly added disk space
- _WipeDisks(self, instance,
- disks=[(self.op.disk, disk, old_disk_size)])
-
- if self.op.wait_for_sync:
- disk_abort = not _WaitForSync(self, instance, disks=[disk])
- if disk_abort:
- self.LogWarning("Disk syncing has not returned a good status; check"
- " the instance")
- if instance.admin_state != constants.ADMINST_UP:
- _SafeShutdownInstanceDisks(self, instance, disks=[disk])
- elif instance.admin_state != constants.ADMINST_UP:
- self.LogWarning("Not shutting down the disk even if the instance is"
- " not supposed to be running because no wait for"
- " sync mode was requested")
-
- assert self.owned_locks(locking.LEVEL_NODE_RES)
- assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
-
-
-class LUInstanceReplaceDisks(LogicalUnit):
- """Replace the disks of an instance.
-
- """
- HPATH = "mirrors-replace"
- HTYPE = constants.HTYPE_INSTANCE
- REQ_BGL = False
-
- def CheckArguments(self):
- """Check arguments.
-
- """
- remote_node = self.op.remote_node
- ialloc = self.op.iallocator
- if self.op.mode == constants.REPLACE_DISK_CHG:
- if remote_node is None and ialloc is None:
- raise errors.OpPrereqError("When changing the secondary either an"
- " iallocator script must be used or the"
- " new node given", errors.ECODE_INVAL)
- else:
- _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
-
- elif remote_node is not None or ialloc is not None:
- # Not replacing the secondary
- raise errors.OpPrereqError("The iallocator and new node options can"
- " only be used when changing the"
- " secondary node", errors.ECODE_INVAL)
-
- def ExpandNames(self):
- self._ExpandAndLockInstance()
-
- assert locking.LEVEL_NODE not in self.needed_locks
- assert locking.LEVEL_NODE_RES not in self.needed_locks
- assert locking.LEVEL_NODEGROUP not in self.needed_locks
-
- assert self.op.iallocator is None or self.op.remote_node is None, \
- "Conflicting options"
-
- if self.op.remote_node is not None:
- self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
-
- # Warning: do not remove the locking of the new secondary here
- # unless DRBD8.AddChildren is changed to work in parallel;
- # currently it doesn't since parallel invocations of
- # FindUnusedMinor will conflict
- self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
- self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
- else:
- self.needed_locks[locking.LEVEL_NODE] = []
- self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
-
- if self.op.iallocator is not None:
- # iallocator will select a new node in the same group
- self.needed_locks[locking.LEVEL_NODEGROUP] = []
- self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
-
- self.needed_locks[locking.LEVEL_NODE_RES] = []
-
- self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
- self.op.iallocator, self.op.remote_node,
- self.op.disks, self.op.early_release,
- self.op.ignore_ipolicy)
-
- self.tasklets = [self.replacer]
-
- def DeclareLocks(self, level):
- if level == locking.LEVEL_NODEGROUP:
- assert self.op.remote_node is None
- assert self.op.iallocator is not None
- assert not self.needed_locks[locking.LEVEL_NODEGROUP]
-
- self.share_locks[locking.LEVEL_NODEGROUP] = 1
- # Lock all groups used by instance optimistically; this requires going
- # via the node before it's locked, requiring verification later on
- self.needed_locks[locking.LEVEL_NODEGROUP] = \
- self.cfg.GetInstanceNodeGroups(self.op.instance_name)
-
- elif level == locking.LEVEL_NODE:
- if self.op.iallocator is not None:
- assert self.op.remote_node is None
- assert not self.needed_locks[locking.LEVEL_NODE]
- assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
-
- # Lock member nodes of all locked groups
- self.needed_locks[locking.LEVEL_NODE] = \
- [node_name
- for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
- for node_name in self.cfg.GetNodeGroup(group_uuid).members]
- else:
- assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
-
- self._LockInstancesNodes()
-
- elif level == locking.LEVEL_NODE_RES:
- # Reuse node locks
- self.needed_locks[locking.LEVEL_NODE_RES] = \
- self.needed_locks[locking.LEVEL_NODE]
-
- def BuildHooksEnv(self):
- """Build hooks env.
-
- This runs on the master, the primary and all the secondaries.
-
- """
- instance = self.replacer.instance
- env = {
- "MODE": self.op.mode,
- "NEW_SECONDARY": self.op.remote_node,
- "OLD_SECONDARY": instance.secondary_nodes[0],
- }
- env.update(_BuildInstanceHookEnvByObject(self, instance))
- return env
-
- def BuildHooksNodes(self):
- """Build hooks nodes.
-
- """
- instance = self.replacer.instance
- nl = [
- self.cfg.GetMasterNode(),
- instance.primary_node,
- ]
- if self.op.remote_node is not None:
- nl.append(self.op.remote_node)
- return nl, nl
-
- def CheckPrereq(self):
- """Check prerequisites.
-
- """
- assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
- self.op.iallocator is None)
-
- # Verify if node group locks are still correct
- owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
- if owned_groups:
- _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
-
- return LogicalUnit.CheckPrereq(self)
-
-
-class LUInstanceActivateDisks(NoHooksLU):
- """Bring up an instance's disks.
-
- """
- REQ_BGL = False
-
- def ExpandNames(self):
- self._ExpandAndLockInstance()
- self.needed_locks[locking.LEVEL_NODE] = []
- self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
-
- def DeclareLocks(self, level):
- if level == locking.LEVEL_NODE:
- self._LockInstancesNodes()
-
- def CheckPrereq(self):
- """Check prerequisites.
-
- This checks that the instance is in the cluster.
-
- """
- self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
- assert self.instance is not None, \
- "Cannot retrieve locked instance %s" % self.op.instance_name
- _CheckNodeOnline(self, self.instance.primary_node)
-
- def Exec(self, feedback_fn):
- """Activate the disks.
-
- """
- disks_ok, disks_info = \
- _AssembleInstanceDisks(self, self.instance,
- ignore_size=self.op.ignore_size)
- if not disks_ok:
- raise errors.OpExecError("Cannot activate block devices")
-
- if self.op.wait_for_sync:
- if not _WaitForSync(self, self.instance):
- raise errors.OpExecError("Some disks of the instance are degraded!")
-
- return disks_info
-
-
-class LUInstanceDeactivateDisks(NoHooksLU):
- """Shutdown an instance's disks.
-
- """
- REQ_BGL = False
-
- def ExpandNames(self):
- self._ExpandAndLockInstance()
- self.needed_locks[locking.LEVEL_NODE] = []
- self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ node_names = itertools.chain(*(i.all_nodes for i in self.wanted_instances))
+ nodes = dict(self.cfg.GetMultiNodeInfo(node_names))
- def DeclareLocks(self, level):
- if level == locking.LEVEL_NODE:
- self._LockInstancesNodes()
+ groups = dict(self.cfg.GetMultiNodeGroupInfo(node.group
+ for node in nodes.values()))
- def CheckPrereq(self):
- """Check prerequisites.
+ group2name_fn = lambda uuid: groups[uuid].name
+ for instance in self.wanted_instances:
+ pnode = nodes[instance.primary_node]
- This checks that the instance is in the cluster.
+ if self.op.static or pnode.offline:
+ remote_state = None
+ if pnode.offline:
+ self.LogWarning("Primary node %s is marked offline, returning static"
+ " information only for instance %s" %
+ (pnode.name, instance.name))
+ else:
+ remote_info = self.rpc.call_instance_info(instance.primary_node,
+ instance.name,
+ instance.hypervisor)
+ remote_info.Raise("Error checking node %s" % instance.primary_node)
+ remote_info = remote_info.payload
+ if remote_info and "state" in remote_info:
+ remote_state = "up"
+ else:
+ if instance.admin_state == constants.ADMINST_UP:
+ remote_state = "down"
+ else:
+ remote_state = instance.admin_state
- """
- self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
- assert self.instance is not None, \
- "Cannot retrieve locked instance %s" % self.op.instance_name
+ disks = map(compat.partial(self._ComputeDiskStatus, instance, None),
+ instance.disks)
- def Exec(self, feedback_fn):
- """Deactivate the disks
+ snodes_group_uuids = [nodes[snode_name].group
+ for snode_name in instance.secondary_nodes]
- """
- instance = self.instance
- if self.op.force:
- _ShutdownInstanceDisks(self, instance)
- else:
- _SafeShutdownInstanceDisks(self, instance)
+ result[instance.name] = {
+ "name": instance.name,
+ "config_state": instance.admin_state,
+ "run_state": remote_state,
+ "pnode": instance.primary_node,
+ "pnode_group_uuid": pnode.group,
+ "pnode_group_name": group2name_fn(pnode.group),
+ "snodes": instance.secondary_nodes,
+ "snodes_group_uuids": snodes_group_uuids,
+ "snodes_group_names": map(group2name_fn, snodes_group_uuids),
+ "os": instance.os,
+ # this happens to be the same format used for hooks
+ "nics": _NICListToTuple(self, instance.nics),
+ "disk_template": instance.disk_template,
+ "disks": disks,
+ "hypervisor": instance.hypervisor,
+ "network_port": instance.network_port,
+ "hv_instance": instance.hvparams,
+ "hv_actual": cluster.FillHV(instance, skip_globals=True),
+ "be_instance": instance.beparams,
+ "be_actual": cluster.FillBE(instance),
+ "os_instance": instance.osparams,
+ "os_actual": cluster.SimpleFillOS(instance.os, instance.osparams),
+ "serial_no": instance.serial_no,
+ "mtime": instance.mtime,
+ "ctime": instance.ctime,
+ "uuid": instance.uuid,
+ }
+
+ return result
class LUInstanceStartup(LogicalUnit):
return ResultWithJobs(jobs)
-def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
- ldisk=False):
- """Check that mirrors are not degraded.
-
- @attention: The device has to be annotated already.
-
- The ldisk parameter, if True, will change the test from the
- is_degraded attribute (which represents overall non-ok status for
- the device(s)) to the ldisk (representing the local storage status).
-
- """
- lu.cfg.SetDiskID(dev, node)
-
- result = True
-
- if on_primary or dev.AssembleOnSecondary():
- rstats = lu.rpc.call_blockdev_find(node, dev)
- msg = rstats.fail_msg
- if msg:
- lu.LogWarning("Can't find disk on node %s: %s", node, msg)
- result = False
- elif not rstats.payload:
- lu.LogWarning("Can't find disk on node %s", node)
- result = False
- else:
- if ldisk:
- result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
- else:
- result = result and not rstats.payload.is_degraded
-
- if dev.children:
- for child in dev.children:
- result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
- on_primary)
-
- return result
-
-
-def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
- """Wrapper around L{_CheckDiskConsistencyInner}.
-
- """
- (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
- return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
- ldisk=ldisk)
-
-
class TLMigrateInstance(Tasklet):
"""Tasklet class for instance migration.
return self._ExecCleanup()
else:
return self._ExecMigration()
-
-
-def _BlockdevFind(lu, node, dev, instance):
- """Wrapper around call_blockdev_find to annotate diskparams.
-
- @param lu: A reference to the lu object
- @param node: The node to call out
- @param dev: The device to find
- @param instance: The instance object the device belongs to
- @returns The result of the rpc call
-
- """
- (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
- return lu.rpc.call_blockdev_find(node, disk)
-
-
-class TLReplaceDisks(Tasklet):
- """Replaces disks for an instance.
-
- Note: Locking is not within the scope of this class.
-
- """
- def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
- disks, early_release, ignore_ipolicy):
- """Initializes this class.
-
- """
- Tasklet.__init__(self, lu)
-
- # Parameters
- self.instance_name = instance_name
- self.mode = mode
- self.iallocator_name = iallocator_name
- self.remote_node = remote_node
- self.disks = disks
- self.early_release = early_release
- self.ignore_ipolicy = ignore_ipolicy
-
- # Runtime data
- self.instance = None
- self.new_node = None
- self.target_node = None
- self.other_node = None
- self.remote_node_info = None
- self.node_secondary_ip = None
-
- @staticmethod
- def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
- """Compute a new secondary node using an IAllocator.
-
- """
- req = iallocator.IAReqRelocate(name=instance_name,
- relocate_from=list(relocate_from))
- ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
-
- ial.Run(iallocator_name)
-
- if not ial.success:
- raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
- " %s" % (iallocator_name, ial.info),
- errors.ECODE_NORES)
-
- remote_node_name = ial.result[0]
-
- lu.LogInfo("Selected new secondary for instance '%s': %s",
- instance_name, remote_node_name)
-
- return remote_node_name
-
- def _FindFaultyDisks(self, node_name):
- """Wrapper for L{_FindFaultyInstanceDisks}.
-
- """
- return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
- node_name, True)
-
- def _CheckDisksActivated(self, instance):
- """Checks if the instance disks are activated.
-
- @param instance: The instance to check disks
- @return: True if they are activated, False otherwise
-
- """
- nodes = instance.all_nodes
-
- for idx, dev in enumerate(instance.disks):
- for node in nodes:
- self.lu.LogInfo("Checking disk/%d on %s", idx, node)
- self.cfg.SetDiskID(dev, node)
-
- result = _BlockdevFind(self, node, dev, instance)
-
- if result.offline:
- continue
- elif result.fail_msg or not result.payload:
- return False
-
- return True
-
- def CheckPrereq(self):
- """Check prerequisites.
-
- This checks that the instance is in the cluster.
-
- """
- self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
- assert instance is not None, \
- "Cannot retrieve locked instance %s" % self.instance_name
-
- if instance.disk_template != constants.DT_DRBD8:
- raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
- " instances", errors.ECODE_INVAL)
-
- if len(instance.secondary_nodes) != 1:
- raise errors.OpPrereqError("The instance has a strange layout,"
- " expected one secondary but found %d" %
- len(instance.secondary_nodes),
- errors.ECODE_FAULT)
-
- instance = self.instance
- secondary_node = instance.secondary_nodes[0]
-
- if self.iallocator_name is None:
- remote_node = self.remote_node
- else:
- remote_node = self._RunAllocator(self.lu, self.iallocator_name,
- instance.name, instance.secondary_nodes)
-
- if remote_node is None:
- self.remote_node_info = None
- else:
- assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
- "Remote node '%s' is not locked" % remote_node
-
- self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
- assert self.remote_node_info is not None, \
- "Cannot retrieve locked node %s" % remote_node
-
- if remote_node == self.instance.primary_node:
- raise errors.OpPrereqError("The specified node is the primary node of"
- " the instance", errors.ECODE_INVAL)
-
- if remote_node == secondary_node:
- raise errors.OpPrereqError("The specified node is already the"
- " secondary node of the instance",
- errors.ECODE_INVAL)
-
- if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
- constants.REPLACE_DISK_CHG):
- raise errors.OpPrereqError("Cannot specify disks to be replaced",
- errors.ECODE_INVAL)
-
- if self.mode == constants.REPLACE_DISK_AUTO:
- if not self._CheckDisksActivated(instance):
- raise errors.OpPrereqError("Please run activate-disks on instance %s"
- " first" % self.instance_name,
- errors.ECODE_STATE)
- faulty_primary = self._FindFaultyDisks(instance.primary_node)
- faulty_secondary = self._FindFaultyDisks(secondary_node)
-
- if faulty_primary and faulty_secondary:
- raise errors.OpPrereqError("Instance %s has faulty disks on more than"
- " one node and can not be repaired"
- " automatically" % self.instance_name,
- errors.ECODE_STATE)
-
- if faulty_primary:
- self.disks = faulty_primary
- self.target_node = instance.primary_node
- self.other_node = secondary_node
- check_nodes = [self.target_node, self.other_node]
- elif faulty_secondary:
- self.disks = faulty_secondary
- self.target_node = secondary_node
- self.other_node = instance.primary_node
- check_nodes = [self.target_node, self.other_node]
- else:
- self.disks = []
- check_nodes = []
-
- else:
- # Non-automatic modes
- if self.mode == constants.REPLACE_DISK_PRI:
- self.target_node = instance.primary_node
- self.other_node = secondary_node
- check_nodes = [self.target_node, self.other_node]
-
- elif self.mode == constants.REPLACE_DISK_SEC:
- self.target_node = secondary_node
- self.other_node = instance.primary_node
- check_nodes = [self.target_node, self.other_node]
-
- elif self.mode == constants.REPLACE_DISK_CHG:
- self.new_node = remote_node
- self.other_node = instance.primary_node
- self.target_node = secondary_node
- check_nodes = [self.new_node, self.other_node]
-
- _CheckNodeNotDrained(self.lu, remote_node)
- _CheckNodeVmCapable(self.lu, remote_node)
-
- old_node_info = self.cfg.GetNodeInfo(secondary_node)
- assert old_node_info is not None
- if old_node_info.offline and not self.early_release:
- # doesn't make sense to delay the release
- self.early_release = True
- self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
- " early-release mode", secondary_node)
-
- else:
- raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
- self.mode)
-
- # If not specified all disks should be replaced
- if not self.disks:
- self.disks = range(len(self.instance.disks))
-
- # TODO: This is ugly, but right now we can't distinguish between internal
- # submitted opcode and external one. We should fix that.
- if self.remote_node_info:
- # We change the node, lets verify it still meets instance policy
- new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
- cluster = self.cfg.GetClusterInfo()
- ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
- new_group_info)
- _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
- self.cfg, ignore=self.ignore_ipolicy)
-
- for node in check_nodes:
- _CheckNodeOnline(self.lu, node)
-
- touched_nodes = frozenset(node_name for node_name in [self.new_node,
- self.other_node,
- self.target_node]
- if node_name is not None)
-
- # Release unneeded node and node resource locks
- _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
- _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
- _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
-
- # Release any owned node group
- _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
-
- # Check whether disks are valid
- for disk_idx in self.disks:
- instance.FindDisk(disk_idx)
-
- # Get secondary node IP addresses
- self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
- in self.cfg.GetMultiNodeInfo(touched_nodes))
-
- def Exec(self, feedback_fn):
- """Execute disk replacement.
-
- This dispatches the disk replacement to the appropriate handler.
-
- """
- if __debug__:
- # Verify owned locks before starting operation
- owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
- assert set(owned_nodes) == set(self.node_secondary_ip), \
- ("Incorrect node locks, owning %s, expected %s" %
- (owned_nodes, self.node_secondary_ip.keys()))
- assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
- self.lu.owned_locks(locking.LEVEL_NODE_RES))
- assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
-
- owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
- assert list(owned_instances) == [self.instance_name], \
- "Instance '%s' not locked" % self.instance_name
-
- assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
- "Should not own any node group lock at this point"
-
- if not self.disks:
- feedback_fn("No disks need replacement for instance '%s'" %
- self.instance.name)
- return
-
- feedback_fn("Replacing disk(s) %s for instance '%s'" %
- (utils.CommaJoin(self.disks), self.instance.name))
- feedback_fn("Current primary node: %s" % self.instance.primary_node)
- feedback_fn("Current seconary node: %s" %
- utils.CommaJoin(self.instance.secondary_nodes))
-
- activate_disks = (self.instance.admin_state != constants.ADMINST_UP)
-
- # Activate the instance disks if we're replacing them on a down instance
- if activate_disks:
- _StartInstanceDisks(self.lu, self.instance, True)
-
- try:
- # Should we replace the secondary node?
- if self.new_node is not None:
- fn = self._ExecDrbd8Secondary
- else:
- fn = self._ExecDrbd8DiskOnly
-
- result = fn(feedback_fn)
- finally:
- # Deactivate the instance disks if we're replacing them on a
- # down instance
- if activate_disks:
- _SafeShutdownInstanceDisks(self.lu, self.instance)
-
- assert not self.lu.owned_locks(locking.LEVEL_NODE)
-
- if __debug__:
- # Verify owned locks
- owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
- nodes = frozenset(self.node_secondary_ip)
- assert ((self.early_release and not owned_nodes) or
- (not self.early_release and not (set(owned_nodes) - nodes))), \
- ("Not owning the correct locks, early_release=%s, owned=%r,"
- " nodes=%r" % (self.early_release, owned_nodes, nodes))
-
- return result
-
- def _CheckVolumeGroup(self, nodes):
- self.lu.LogInfo("Checking volume groups")
-
- vgname = self.cfg.GetVGName()
-
- # Make sure volume group exists on all involved nodes
- results = self.rpc.call_vg_list(nodes)
- if not results:
- raise errors.OpExecError("Can't list volume groups on the nodes")
-
- for node in nodes:
- res = results[node]
- res.Raise("Error checking node %s" % node)
- if vgname not in res.payload:
- raise errors.OpExecError("Volume group '%s' not found on node %s" %
- (vgname, node))
-
- def _CheckDisksExistence(self, nodes):
- # Check disk existence
- for idx, dev in enumerate(self.instance.disks):
- if idx not in self.disks:
- continue
-
- for node in nodes:
- self.lu.LogInfo("Checking disk/%d on %s", idx, node)
- self.cfg.SetDiskID(dev, node)
-
- result = _BlockdevFind(self, node, dev, self.instance)
-
- msg = result.fail_msg
- if msg or not result.payload:
- if not msg:
- msg = "disk not found"
- raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
- (idx, node, msg))
-
- def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
- for idx, dev in enumerate(self.instance.disks):
- if idx not in self.disks:
- continue
-
- self.lu.LogInfo("Checking disk/%d consistency on node %s" %
- (idx, node_name))
-
- if not _CheckDiskConsistency(self.lu, self.instance, dev, node_name,
- on_primary, ldisk=ldisk):
- raise errors.OpExecError("Node %s has degraded storage, unsafe to"
- " replace disks for instance %s" %
- (node_name, self.instance.name))
-
- def _CreateNewStorage(self, node_name):
- """Create new storage on the primary or secondary node.
-
- This is only used for same-node replaces, not for changing the
- secondary node, hence we don't want to modify the existing disk.
-
- """
- iv_names = {}
-
- disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
- for idx, dev in enumerate(disks):
- if idx not in self.disks:
- continue
-
- self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)
-
- self.cfg.SetDiskID(dev, node_name)
-
- lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
- names = _GenerateUniqueNames(self.lu, lv_names)
-
- (data_disk, meta_disk) = dev.children
- vg_data = data_disk.logical_id[0]
- lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
- logical_id=(vg_data, names[0]),
- params=data_disk.params)
- vg_meta = meta_disk.logical_id[0]
- lv_meta = objects.Disk(dev_type=constants.LD_LV,
- size=constants.DRBD_META_SIZE,
- logical_id=(vg_meta, names[1]),
- params=meta_disk.params)
-
- new_lvs = [lv_data, lv_meta]
- old_lvs = [child.Copy() for child in dev.children]
- iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
- excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
-
- # we pass force_create=True to force the LVM creation
- for new_lv in new_lvs:
- _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
- _GetInstanceInfoText(self.instance), False,
- excl_stor)
-
- return iv_names
-
- def _CheckDevices(self, node_name, iv_names):
- for name, (dev, _, _) in iv_names.iteritems():
- self.cfg.SetDiskID(dev, node_name)
-
- result = _BlockdevFind(self, node_name, dev, self.instance)
-
- msg = result.fail_msg
- if msg or not result.payload:
- if not msg:
- msg = "disk not found"
- raise errors.OpExecError("Can't find DRBD device %s: %s" %
- (name, msg))
-
- if result.payload.is_degraded:
- raise errors.OpExecError("DRBD device %s is degraded!" % name)
-
- def _RemoveOldStorage(self, node_name, iv_names):
- for name, (_, old_lvs, _) in iv_names.iteritems():
- self.lu.LogInfo("Remove logical volumes for %s", name)
-
- for lv in old_lvs:
- self.cfg.SetDiskID(lv, node_name)
-
- msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
- if msg:
- self.lu.LogWarning("Can't remove old LV: %s", msg,
- hint="remove unused LVs manually")
-
- def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
- """Replace a disk on the primary or secondary for DRBD 8.
-
- The algorithm for replace is quite complicated:
-
- 1. for each disk to be replaced:
-
- 1. create new LVs on the target node with unique names
- 1. detach old LVs from the drbd device
- 1. rename old LVs to name_replaced.<time_t>
- 1. rename new LVs to old LVs
- 1. attach the new LVs (with the old names now) to the drbd device
-
- 1. wait for sync across all devices
-
- 1. for each modified disk:
-
- 1. remove old LVs (which have the name name_replaces.<time_t>)
-
- Failures are not very well handled.
-
- """
- steps_total = 6
-
- # Step: check device activation
- self.lu.LogStep(1, steps_total, "Check device existence")
- self._CheckDisksExistence([self.other_node, self.target_node])
- self._CheckVolumeGroup([self.target_node, self.other_node])
-
- # Step: check other node consistency
- self.lu.LogStep(2, steps_total, "Check peer consistency")
- self._CheckDisksConsistency(self.other_node,
- self.other_node == self.instance.primary_node,
- False)
-
- # Step: create new storage
- self.lu.LogStep(3, steps_total, "Allocate new storage")
- iv_names = self._CreateNewStorage(self.target_node)
-
- # Step: for each lv, detach+rename*2+attach
- self.lu.LogStep(4, steps_total, "Changing drbd configuration")
- for dev, old_lvs, new_lvs in iv_names.itervalues():
- self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
-
- result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
- old_lvs)
- result.Raise("Can't detach drbd from local storage on node"
- " %s for device %s" % (self.target_node, dev.iv_name))
- #dev.children = []
- #cfg.Update(instance)
-
- # ok, we created the new LVs, so now we know we have the needed
- # storage; as such, we proceed on the target node to rename
- # old_lv to _old, and new_lv to old_lv; note that we rename LVs
- # using the assumption that logical_id == physical_id (which in
- # turn is the unique_id on that node)
-
- # FIXME(iustin): use a better name for the replaced LVs
- temp_suffix = int(time.time())
- ren_fn = lambda d, suff: (d.physical_id[0],
- d.physical_id[1] + "_replaced-%s" % suff)
-
- # Build the rename list based on what LVs exist on the node
- rename_old_to_new = []
- for to_ren in old_lvs:
- result = self.rpc.call_blockdev_find(self.target_node, to_ren)
- if not result.fail_msg and result.payload:
- # device exists
- rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
-
- self.lu.LogInfo("Renaming the old LVs on the target node")
- result = self.rpc.call_blockdev_rename(self.target_node,
- rename_old_to_new)
- result.Raise("Can't rename old LVs on node %s" % self.target_node)
-
- # Now we rename the new LVs to the old LVs
- self.lu.LogInfo("Renaming the new LVs on the target node")
- rename_new_to_old = [(new, old.physical_id)
- for old, new in zip(old_lvs, new_lvs)]
- result = self.rpc.call_blockdev_rename(self.target_node,
- rename_new_to_old)
- result.Raise("Can't rename new LVs on node %s" % self.target_node)
-
- # Intermediate steps of in memory modifications
- for old, new in zip(old_lvs, new_lvs):
- new.logical_id = old.logical_id
- self.cfg.SetDiskID(new, self.target_node)
-
- # We need to modify old_lvs so that removal later removes the
- # right LVs, not the newly added ones; note that old_lvs is a
- # copy here
- for disk in old_lvs:
- disk.logical_id = ren_fn(disk, temp_suffix)
- self.cfg.SetDiskID(disk, self.target_node)
-
- # Now that the new lvs have the old name, we can add them to the device
- self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
- result = self.rpc.call_blockdev_addchildren(self.target_node,
- (dev, self.instance), new_lvs)
- msg = result.fail_msg
- if msg:
- for new_lv in new_lvs:
- msg2 = self.rpc.call_blockdev_remove(self.target_node,
- new_lv).fail_msg
- if msg2:
- self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
- hint=("cleanup manually the unused logical"
- "volumes"))
- raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
-
- cstep = itertools.count(5)
-
- if self.early_release:
- self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
- self._RemoveOldStorage(self.target_node, iv_names)
- # TODO: Check if releasing locks early still makes sense
- _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
- else:
- # Release all resource locks except those used by the instance
- _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
- keep=self.node_secondary_ip.keys())
-
- # Release all node locks while waiting for sync
- _ReleaseLocks(self.lu, locking.LEVEL_NODE)
-
- # TODO: Can the instance lock be downgraded here? Take the optional disk
- # shutdown in the caller into consideration.
-
- # Wait for sync
- # This can fail as the old devices are degraded and _WaitForSync
- # does a combined result over all disks, so we don't check its return value
- self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
- _WaitForSync(self.lu, self.instance)
-
- # Check all devices manually
- self._CheckDevices(self.instance.primary_node, iv_names)
-
- # Step: remove old storage
- if not self.early_release:
- self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
- self._RemoveOldStorage(self.target_node, iv_names)
-
- def _ExecDrbd8Secondary(self, feedback_fn):
- """Replace the secondary node for DRBD 8.
-
- The algorithm for replace is quite complicated:
- - for all disks of the instance:
- - create new LVs on the new node with same names
- - shutdown the drbd device on the old secondary
- - disconnect the drbd network on the primary
- - create the drbd device on the new secondary
- - network attach the drbd on the primary, using an artifice:
- the drbd code for Attach() will connect to the network if it
- finds a device which is connected to the good local disks but
- not network enabled
- - wait for sync across all devices
- - remove all disks from the old secondary
-
- Failures are not very well handled.
-
- """
- steps_total = 6
-
- pnode = self.instance.primary_node
-
- # Step: check device activation
- self.lu.LogStep(1, steps_total, "Check device existence")
- self._CheckDisksExistence([self.instance.primary_node])
- self._CheckVolumeGroup([self.instance.primary_node])
-
- # Step: check other node consistency
- self.lu.LogStep(2, steps_total, "Check peer consistency")
- self._CheckDisksConsistency(self.instance.primary_node, True, True)
-
- # Step: create new storage
- self.lu.LogStep(3, steps_total, "Allocate new storage")
- disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
- excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
- for idx, dev in enumerate(disks):
- self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
- (self.new_node, idx))
- # we pass force_create=True to force LVM creation
- for new_lv in dev.children:
- _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
- True, _GetInstanceInfoText(self.instance), False,
- excl_stor)
-
- # Step 4: dbrd minors and drbd setups changes
- # after this, we must manually remove the drbd minors on both the
- # error and the success paths
- self.lu.LogStep(4, steps_total, "Changing drbd configuration")
- minors = self.cfg.AllocateDRBDMinor([self.new_node
- for dev in self.instance.disks],
- self.instance.name)
- logging.debug("Allocated minors %r", minors)
-
- iv_names = {}
- for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
- self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
- (self.new_node, idx))
- # create new devices on new_node; note that we create two IDs:
- # one without port, so the drbd will be activated without
- # networking information on the new node at this stage, and one
- # with network, for the latter activation in step 4
- (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
- if self.instance.primary_node == o_node1:
- p_minor = o_minor1
- else:
- assert self.instance.primary_node == o_node2, "Three-node instance?"
- p_minor = o_minor2
-
- new_alone_id = (self.instance.primary_node, self.new_node, None,
- p_minor, new_minor, o_secret)
- new_net_id = (self.instance.primary_node, self.new_node, o_port,
- p_minor, new_minor, o_secret)
-
- iv_names[idx] = (dev, dev.children, new_net_id)
- logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
- new_net_id)
- new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
- logical_id=new_alone_id,
- children=dev.children,
- size=dev.size,
- params={})
- (anno_new_drbd,) = _AnnotateDiskParams(self.instance, [new_drbd],
- self.cfg)
- try:
- _CreateSingleBlockDev(self.lu, self.new_node, self.instance,
- anno_new_drbd,
- _GetInstanceInfoText(self.instance), False,
- excl_stor)
- except errors.GenericError:
- self.cfg.ReleaseDRBDMinors(self.instance.name)
- raise
-
- # We have new devices, shutdown the drbd on the old secondary
- for idx, dev in enumerate(self.instance.disks):
- self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
- self.cfg.SetDiskID(dev, self.target_node)
- msg = self.rpc.call_blockdev_shutdown(self.target_node,
- (dev, self.instance)).fail_msg
- if msg:
- self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
- "node: %s" % (idx, msg),
- hint=("Please cleanup this device manually as"
- " soon as possible"))
-
- self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
- result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
- self.instance.disks)[pnode]
-
- msg = result.fail_msg
- if msg:
- # detaches didn't succeed (unlikely)
- self.cfg.ReleaseDRBDMinors(self.instance.name)
- raise errors.OpExecError("Can't detach the disks from the network on"
- " old node: %s" % (msg,))
-
- # if we managed to detach at least one, we update all the disks of
- # the instance to point to the new secondary
- self.lu.LogInfo("Updating instance configuration")
- for dev, _, new_logical_id in iv_names.itervalues():
- dev.logical_id = new_logical_id
- self.cfg.SetDiskID(dev, self.instance.primary_node)
-
- self.cfg.Update(self.instance, feedback_fn)
-
- # Release all node locks (the configuration has been updated)
- _ReleaseLocks(self.lu, locking.LEVEL_NODE)
-
- # and now perform the drbd attach
- self.lu.LogInfo("Attaching primary drbds to new secondary"
- " (standalone => connected)")
- result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
- self.new_node],
- self.node_secondary_ip,
- (self.instance.disks, self.instance),
- self.instance.name,
- False)
- for to_node, to_result in result.items():
- msg = to_result.fail_msg
- if msg:
- self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
- to_node, msg,
- hint=("please do a gnt-instance info to see the"
- " status of disks"))
-
- cstep = itertools.count(5)
-
- if self.early_release:
- self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
- self._RemoveOldStorage(self.target_node, iv_names)
- # TODO: Check if releasing locks early still makes sense
- _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
- else:
- # Release all resource locks except those used by the instance
- _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
- keep=self.node_secondary_ip.keys())
-
- # TODO: Can the instance lock be downgraded here? Take the optional disk
- # shutdown in the caller into consideration.
-
- # Wait for sync
- # This can fail as the old devices are degraded and _WaitForSync
- # does a combined result over all disks, so we don't check its return value
- self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
- _WaitForSync(self.lu, self.instance)
-
- # Check all devices manually
- self._CheckDevices(self.instance.primary_node, iv_names)
-
- # Step: remove old storage
- if not self.early_release:
- self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
- self._RemoveOldStorage(self.target_node, iv_names)
--- /dev/null
+++ b/lib/cmdlib/instance_storage.py
+#
+#
+
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+
+"""Logical units dealing with storage of instances."""
+
+import itertools
+import logging
+import os
+import time
+
+from ganeti import compat
+from ganeti import constants
+from ganeti import errors
+from ganeti import ht
+from ganeti import locking
+from ganeti.masterd import iallocator
+from ganeti import objects
+from ganeti import utils
+from ganeti import opcodes
+from ganeti import rpc
+from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
+from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
+ _AnnotateDiskParams, _CheckIAllocatorOrNode, _ExpandNodeName, \
+ _CheckNodeOnline, _CheckInstanceNodeGroups, _CheckInstanceState, \
+ _IsExclusiveStorageEnabledNode, _FindFaultyInstanceDisks
+from ganeti.cmdlib.instance_utils import _GetInstanceInfoText, \
+ _CopyLockList, _ReleaseLocks, _CheckNodeVmCapable, \
+ _BuildInstanceHookEnvByObject, _CheckNodeNotDrained, _CheckTargetNodeIPolicy
+
+import ganeti.masterd.instance
+
+
+_DISK_TEMPLATE_NAME_PREFIX = {
+ constants.DT_PLAIN: "",
+ constants.DT_RBD: ".rbd",
+ constants.DT_EXT: ".ext",
+ }
+
+
+_DISK_TEMPLATE_DEVICE_TYPE = {
+ constants.DT_PLAIN: constants.LD_LV,
+ constants.DT_FILE: constants.LD_FILE,
+ constants.DT_SHARED_FILE: constants.LD_FILE,
+ constants.DT_BLOCK: constants.LD_BLOCKDEV,
+ constants.DT_RBD: constants.LD_RBD,
+ constants.DT_EXT: constants.LD_EXT,
+ }
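+ # Both maps above are consulted by _GenerateDiskTemplate further down: the
+ # first supplies a per-template prefix for the generated volume names, the
+ # second maps a disk template to the logical device type of its disks.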
+
+
+def _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
+ excl_stor):
+ """Create a single block device on a given node.
+
+ This will not recurse over children of the device, so they must be
+ created in advance.
+
+ @param lu: the lu on whose behalf we execute
+ @param node: the node on which to create the device
+ @type instance: L{objects.Instance}
+ @param instance: the instance which owns the device
+ @type device: L{objects.Disk}
+ @param device: the device to create
+ @param info: the extra 'metadata' we should attach to the device
+ (this will be represented as a LVM tag)
+ @type force_open: boolean
+ @param force_open: this parameter will be passed to the
+ L{backend.BlockdevCreate} function where it specifies
+ whether we run on primary or not, and it affects both
+ the child assembly and the device's own Open() execution
+ @type excl_stor: boolean
+ @param excl_stor: Whether exclusive_storage is active for the node
+
+ """
+ lu.cfg.SetDiskID(device, node)
+ result = lu.rpc.call_blockdev_create(node, device, device.size,
+ instance.name, force_open, info,
+ excl_stor)
+ result.Raise("Can't create block device %s on"
+ " node %s for instance %s" % (device, node, instance.name))
+ if device.physical_id is None:
+ device.physical_id = result.payload
+
+
+def _CreateBlockDevInner(lu, node, instance, device, force_create,
+ info, force_open, excl_stor):
+ """Create a tree of block devices on a given node.
+
+ If this device type has to be created on secondaries, create it and
+ all its children.
+
+ If not, just recurse to children keeping the same 'force' value.
+
+ @attention: The device has to be annotated already.
+
+ @param lu: the lu on whose behalf we execute
+ @param node: the node on which to create the device
+ @type instance: L{objects.Instance}
+ @param instance: the instance which owns the device
+ @type device: L{objects.Disk}
+ @param device: the device to create
+ @type force_create: boolean
+ @param force_create: whether to force creation of this device; this
+ will be changed to True whenever we find a device which has
+ CreateOnSecondary() attribute
+ @param info: the extra 'metadata' we should attach to the device
+ (this will be represented as a LVM tag)
+ @type force_open: boolean
+ @param force_open: this parameter will be passed to the
+ L{backend.BlockdevCreate} function where it specifies
+ whether we run on primary or not, and it affects both
+ the child assembly and the device's own Open() execution
+ @type excl_stor: boolean
+ @param excl_stor: Whether exclusive_storage is active for the node
+
+ @return: list of created devices
+ """
+ created_devices = []
+ try:
+ if device.CreateOnSecondary():
+ force_create = True
+
+ if device.children:
+ for child in device.children:
+ devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
+ info, force_open, excl_stor)
+ created_devices.extend(devs)
+
+ if not force_create:
+ return created_devices
+
+ _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
+ excl_stor)
+ # The device has been completely created, so there is no point in keeping
+ # its subdevices in the list. We just add the device itself instead.
+ created_devices = [(node, device)]
+ return created_devices
+
+ except errors.DeviceCreationError, e:
+ e.created_devices.extend(created_devices)
+ raise e
+ except errors.OpExecError, e:
+ raise errors.DeviceCreationError(str(e), created_devices)
+
+
+def _IsExclusiveStorageEnabledNodeName(cfg, nodename):
+ """Whether exclusive_storage is in effect for the given node.
+
+ @type cfg: L{config.ConfigWriter}
+ @param cfg: The cluster configuration
+ @type nodename: string
+ @param nodename: The node
+ @rtype: bool
+ @return: The effective value of exclusive_storage
+ @raise errors.OpPrereqError: if no node exists with the given name
+
+ """
+ ni = cfg.GetNodeInfo(nodename)
+ if ni is None:
+ raise errors.OpPrereqError("Invalid node name %s" % nodename,
+ errors.ECODE_NOENT)
+ return _IsExclusiveStorageEnabledNode(cfg, ni)
+
+
+def _CreateBlockDev(lu, node, instance, device, force_create, info,
+ force_open):
+ """Wrapper around L{_CreateBlockDevInner}.
+
+ This method annotates the root device first.
+
+ """
+ (disk,) = _AnnotateDiskParams(instance, [device], lu.cfg)
+ excl_stor = _IsExclusiveStorageEnabledNodeName(lu.cfg, node)
+ return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
+ force_open, excl_stor)
+
+
+def _CreateDisks(lu, instance, to_skip=None, target_node=None):
+ """Create all disks for an instance.
+
+ This abstracts away some work from AddInstance.
+
+ @type lu: L{LogicalUnit}
+ @param lu: the logical unit on whose behalf we execute
+ @type instance: L{objects.Instance}
+ @param instance: the instance whose disks we should create
+ @type to_skip: list
+ @param to_skip: list of indices to skip
+ @type target_node: string
+ @param target_node: if passed, overrides the target node for creation
+ @raise errors.OpExecError: in case any of the disks cannot be created
+
+ """
+ info = _GetInstanceInfoText(instance)
+ if target_node is None:
+ pnode = instance.primary_node
+ all_nodes = instance.all_nodes
+ else:
+ pnode = target_node
+ all_nodes = [pnode]
+
+ if instance.disk_template in constants.DTS_FILEBASED:
+ file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
+ result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
+
+ result.Raise("Failed to create directory '%s' on"
+ " node %s" % (file_storage_dir, pnode))
+
+ disks_created = []
+ # Note: this needs to be kept in sync with adding of disks in
+ # LUInstanceSetParams
+ for idx, device in enumerate(instance.disks):
+ if to_skip and idx in to_skip:
+ continue
+ logging.info("Creating disk %s for instance '%s'", idx, instance.name)
+ #HARDCODE
+ for node in all_nodes:
+ f_create = node == pnode
+ try:
+ _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
+ disks_created.append((node, device))
+ except errors.OpExecError:
+ logging.warning("Creating disk %s for instance '%s' failed",
+ idx, instance.name)
+ except errors.DeviceCreationError, e:
+ logging.warning("Creating disk %s for instance '%s' failed",
+ idx, instance.name)
+ disks_created.extend(e.created_devices)
+ for (node, disk) in disks_created:
+ lu.cfg.SetDiskID(disk, node)
+ result = lu.rpc.call_blockdev_remove(node, disk)
+ if result.fail_msg:
+ logging.warning("Failed to remove newly-created disk %s on node %s:"
+ " %s", device, node, result.fail_msg)
+ raise errors.OpExecError(e.message)
+
+
+def _ComputeDiskSizePerVG(disk_template, disks):
+ """Compute disk size requirements in the volume group
+
+ """
+ def _compute(disks, payload):
+ """Universal algorithm.
+
+ """
+ vgs = {}
+ for disk in disks:
+ vgs[disk[constants.IDISK_VG]] = \
+ vgs.get(disk[constants.IDISK_VG], 0) + disk[constants.IDISK_SIZE] + payload
+
+ return vgs
+
+ # Required free disk space as a function of disk and swap space
+ req_size_dict = {
+ constants.DT_DISKLESS: {},
+ constants.DT_PLAIN: _compute(disks, 0),
+ # 128 MB are added for drbd metadata for each disk
+ constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
+ constants.DT_FILE: {},
+ constants.DT_SHARED_FILE: {},
+ }
+
+ if disk_template not in req_size_dict:
+ raise errors.ProgrammerError("Disk template '%s' size requirement"
+ " is unknown" % disk_template)
+
+ return req_size_dict[disk_template]
+
+
+def _ComputeDisks(op, default_vg):
+ """Computes the instance disks.
+
+ @param op: The instance opcode
+ @param default_vg: The default_vg to assume
+
+ @return: The computed disks
+
+ """
+ disks = []
+ for disk in op.disks:
+ mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
+ if mode not in constants.DISK_ACCESS_SET:
+ raise errors.OpPrereqError("Invalid disk access mode '%s'" %
+ mode, errors.ECODE_INVAL)
+ size = disk.get(constants.IDISK_SIZE, None)
+ if size is None:
+ raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
+ try:
+ size = int(size)
+ except (TypeError, ValueError):
+ raise errors.OpPrereqError("Invalid disk size '%s'" % size,
+ errors.ECODE_INVAL)
+
+ ext_provider = disk.get(constants.IDISK_PROVIDER, None)
+ if ext_provider and op.disk_template != constants.DT_EXT:
+ raise errors.OpPrereqError("The '%s' option is only valid for the %s"
+ " disk template, not %s" %
+ (constants.IDISK_PROVIDER, constants.DT_EXT,
+ op.disk_template), errors.ECODE_INVAL)
+
+ data_vg = disk.get(constants.IDISK_VG, default_vg)
+ name = disk.get(constants.IDISK_NAME, None)
+ if name is not None and name.lower() == constants.VALUE_NONE:
+ name = None
+ new_disk = {
+ constants.IDISK_SIZE: size,
+ constants.IDISK_MODE: mode,
+ constants.IDISK_VG: data_vg,
+ constants.IDISK_NAME: name,
+ }
+
+ if constants.IDISK_METAVG in disk:
+ new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
+ if constants.IDISK_ADOPT in disk:
+ new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
+
+ # For extstorage, demand the `provider' option and add any
+ # additional parameters (ext-params) to the dict
+ if op.disk_template == constants.DT_EXT:
+ if ext_provider:
+ new_disk[constants.IDISK_PROVIDER] = ext_provider
+ for key in disk:
+ if key not in constants.IDISK_PARAMS:
+ new_disk[key] = disk[key]
+ else:
+ raise errors.OpPrereqError("Missing provider for template '%s'" %
+ constants.DT_EXT, errors.ECODE_INVAL)
+
+ disks.append(new_disk)
+
+ return disks
+
+
+def _CheckRADOSFreeSpace():
+ """Compute disk size requirements inside the RADOS cluster.
+
+ """
+ # For the RADOS cluster we assume there is always enough space.
+ pass
+
+
+def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
+ iv_name, p_minor, s_minor):
+ """Generate a drbd8 device complete with its children.
+
+ """
+ assert len(vgnames) == len(names) == 2
+ port = lu.cfg.AllocatePort()
+ shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
+
+ dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
+ logical_id=(vgnames[0], names[0]),
+ params={})
+ dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
+ dev_meta = objects.Disk(dev_type=constants.LD_LV,
+ size=constants.DRBD_META_SIZE,
+ logical_id=(vgnames[1], names[1]),
+ params={})
+ dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
+ drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
+ logical_id=(primary, secondary, port,
+ p_minor, s_minor,
+ shared_secret),
+ children=[dev_data, dev_meta],
+ iv_name=iv_name, params={})
+ drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
+ return drbd_dev
+
+
+def _GenerateDiskTemplate(
+ lu, template_name, instance_name, primary_node, secondary_nodes,
+ disk_info, file_storage_dir, file_driver, base_index,
+ feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
+ _req_shr_file_storage=opcodes.RequireSharedFileStorage):
+ """Generate the entire disk layout for a given template type.
+
+ """
+ vgname = lu.cfg.GetVGName()
+ disk_count = len(disk_info)
+ disks = []
+
+ if template_name == constants.DT_DISKLESS:
+ pass
+ elif template_name == constants.DT_DRBD8:
+ if len(secondary_nodes) != 1:
+ raise errors.ProgrammerError("Wrong template configuration")
+ remote_node = secondary_nodes[0]
+ minors = lu.cfg.AllocateDRBDMinor(
+ [primary_node, remote_node] * len(disk_info), instance_name)
+
+ (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
+ full_disk_params)
+ drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
+
+ names = []
+ for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
+ for i in range(disk_count)]):
+ names.append(lv_prefix + "_data")
+ names.append(lv_prefix + "_meta")
+ for idx, disk in enumerate(disk_info):
+ disk_index = idx + base_index
+ data_vg = disk.get(constants.IDISK_VG, vgname)
+ meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
+ disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
+ disk[constants.IDISK_SIZE],
+ [data_vg, meta_vg],
+ names[idx * 2:idx * 2 + 2],
+ "disk/%d" % disk_index,
+ minors[idx * 2], minors[idx * 2 + 1])
+ disk_dev.mode = disk[constants.IDISK_MODE]
+ disk_dev.name = disk.get(constants.IDISK_NAME, None)
+ disks.append(disk_dev)
+ else:
+ if secondary_nodes:
+ raise errors.ProgrammerError("Wrong template configuration")
+
+ if template_name == constants.DT_FILE:
+ _req_file_storage()
+ elif template_name == constants.DT_SHARED_FILE:
+ _req_shr_file_storage()
+
+ name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
+ if name_prefix is None:
+ names = None
+ else:
+ names = _GenerateUniqueNames(lu, ["%s.disk%s" %
+ (name_prefix, base_index + i)
+ for i in range(disk_count)])
+
+ if template_name == constants.DT_PLAIN:
+
+ def logical_id_fn(idx, _, disk):
+ vg = disk.get(constants.IDISK_VG, vgname)
+ return (vg, names[idx])
+
+ elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
+ logical_id_fn = \
+ lambda _, disk_index, disk: (file_driver,
+ "%s/disk%d" % (file_storage_dir,
+ disk_index))
+ elif template_name == constants.DT_BLOCK:
+ logical_id_fn = \
+ lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
+ disk[constants.IDISK_ADOPT])
+ elif template_name == constants.DT_RBD:
+ logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
+ elif template_name == constants.DT_EXT:
+ def logical_id_fn(idx, _, disk):
+ provider = disk.get(constants.IDISK_PROVIDER, None)
+ if provider is None:
+ raise errors.ProgrammerError("Disk template is %s, but '%s' is"
+ " not found", constants.DT_EXT,
+ constants.IDISK_PROVIDER)
+ return (provider, names[idx])
+ else:
+ raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
+
+ dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
+
+ for idx, disk in enumerate(disk_info):
+ params = {}
+ # Only for the Ext template add disk_info to params
+ if template_name == constants.DT_EXT:
+ params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
+ for key in disk:
+ if key not in constants.IDISK_PARAMS:
+ params[key] = disk[key]
+ disk_index = idx + base_index
+ size = disk[constants.IDISK_SIZE]
+ feedback_fn("* disk %s, size %s" %
+ (disk_index, utils.FormatUnit(size, "h")))
+ disk_dev = objects.Disk(dev_type=dev_type, size=size,
+ logical_id=logical_id_fn(idx, disk_index, disk),
+ iv_name="disk/%d" % disk_index,
+ mode=disk[constants.IDISK_MODE],
+ params=params)
+ disk_dev.name = disk.get(constants.IDISK_NAME, None)
+ disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
+ disks.append(disk_dev)
+
+ return disks
+
+
+class LUInstanceRecreateDisks(LogicalUnit):
+ """Recreate an instance's missing disks.
+
+ """
+ HPATH = "instance-recreate-disks"
+ HTYPE = constants.HTYPE_INSTANCE
+ REQ_BGL = False
+
+ _MODIFYABLE = compat.UniqueFrozenset([
+ constants.IDISK_SIZE,
+ constants.IDISK_MODE,
+ ])
+
+ # New or changed disk parameters may have different semantics
+ assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
+ constants.IDISK_ADOPT,
+
+ # TODO: Implement support changing VG while recreating
+ constants.IDISK_VG,
+ constants.IDISK_METAVG,
+ constants.IDISK_PROVIDER,
+ constants.IDISK_NAME,
+ ]))
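+ # In other words, only the size and access mode of an existing disk may be
+ # overridden when recreating it; all other disk parameters are rejected in
+ # CheckArguments below.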
+
+ def _RunAllocator(self):
+ """Run the allocator based on input opcode.
+
+ """
+ be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
+
+ # FIXME
+ # The allocator should actually run in "relocate" mode, but current
+ # allocators don't support relocating all the nodes of an instance at
+ # the same time. As a workaround we use "allocate" mode, but this is
+ # suboptimal for two reasons:
+ # - The instance name passed to the allocator is present in the list of
+ # existing instances, so there could be a conflict within the
+ # internal structures of the allocator. This doesn't happen with the
+ # current allocators, but it's a liability.
+ # - The allocator counts the resources used by the instance twice: once
+ # because the instance exists already, and once because it tries to
+ # allocate a new instance.
+ # The allocator could choose some of the nodes on which the instance is
+ # running, but that's not a problem. If the instance nodes are broken,
+ # they should already be marked as drained or offline, and hence
+ # skipped by the allocator. If instance disks have been lost for other
+ # reasons, then recreating the disks on the same nodes should be fine.
+ disk_template = self.instance.disk_template
+ spindle_use = be_full[constants.BE_SPINDLE_USE]
+ req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
+ disk_template=disk_template,
+ tags=list(self.instance.GetTags()),
+ os=self.instance.os,
+ nics=[{}],
+ vcpus=be_full[constants.BE_VCPUS],
+ memory=be_full[constants.BE_MAXMEM],
+ spindle_use=spindle_use,
+ disks=[{constants.IDISK_SIZE: d.size,
+ constants.IDISK_MODE: d.mode}
+ for d in self.instance.disks],
+ hypervisor=self.instance.hypervisor,
+ node_whitelist=None)
+ ial = iallocator.IAllocator(self.cfg, self.rpc, req)
+
+ ial.Run(self.op.iallocator)
+
+ assert req.RequiredNodes() == len(self.instance.all_nodes)
+
+ if not ial.success:
+ raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
+ " %s" % (self.op.iallocator, ial.info),
+ errors.ECODE_NORES)
+
+ self.op.nodes = ial.result
+ self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
+ self.op.instance_name, self.op.iallocator,
+ utils.CommaJoin(ial.result))
+
+ def CheckArguments(self):
+ if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
+ # Normalize and convert deprecated list of disk indices
+ self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
+
+ duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
+ if duplicates:
+ raise errors.OpPrereqError("Some disks have been specified more than"
+ " once: %s" % utils.CommaJoin(duplicates),
+ errors.ECODE_INVAL)
+
+ # We don't want _CheckIAllocatorOrNode selecting the default iallocator
+ # when neither iallocator nor nodes are specified
+ if self.op.iallocator or self.op.nodes:
+ _CheckIAllocatorOrNode(self, "iallocator", "nodes")
+
+ for (idx, params) in self.op.disks:
+ utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
+ unsupported = frozenset(params.keys()) - self._MODIFYABLE
+ if unsupported:
+ raise errors.OpPrereqError("Parameters for disk %s try to change"
+ " unmodifyable parameter(s): %s" %
+ (idx, utils.CommaJoin(unsupported)),
+ errors.ECODE_INVAL)
+
+ def ExpandNames(self):
+ self._ExpandAndLockInstance()
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+
+ if self.op.nodes:
+ self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
+ self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
+ else:
+ self.needed_locks[locking.LEVEL_NODE] = []
+ if self.op.iallocator:
+ # iallocator will select a new node in the same group
+ self.needed_locks[locking.LEVEL_NODEGROUP] = []
+ self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
+
+ self.needed_locks[locking.LEVEL_NODE_RES] = []
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_NODEGROUP:
+ assert self.op.iallocator is not None
+ assert not self.op.nodes
+ assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+ self.share_locks[locking.LEVEL_NODEGROUP] = 1
+ # Lock the primary group used by the instance optimistically; this
+ # requires going via the node before it's locked, requiring
+ # verification later on
+ self.needed_locks[locking.LEVEL_NODEGROUP] = \
+ self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
+
+ elif level == locking.LEVEL_NODE:
+ # If an allocator is used, then we lock all the nodes in the current
+ # instance group, as we don't know yet which ones will be selected;
+ # if we replace the nodes without using an allocator, locks are
+ # already declared in ExpandNames; otherwise, we need to lock all the
+ # instance nodes for disk re-creation
+ if self.op.iallocator:
+ assert not self.op.nodes
+ assert not self.needed_locks[locking.LEVEL_NODE]
+ assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
+
+ # Lock member nodes of the group of the primary node
+ for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
+ self.needed_locks[locking.LEVEL_NODE].extend(
+ self.cfg.GetNodeGroup(group_uuid).members)
+
+ assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
+ elif not self.op.nodes:
+ self._LockInstancesNodes(primary_only=False)
+ elif level == locking.LEVEL_NODE_RES:
+ # Copy node locks
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ This runs on master, primary and secondary nodes of the instance.
+
+ """
+ return _BuildInstanceHookEnvByObject(self, self.instance)
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
+ return (nl, nl)
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ This checks that the instance is in the cluster and is not running.
+
+ """
+ instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+ assert instance is not None, \
+ "Cannot retrieve locked instance %s" % self.op.instance_name
+ if self.op.nodes:
+ if len(self.op.nodes) != len(instance.all_nodes):
+ raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
+ " %d replacement nodes were specified" %
+ (instance.name, len(instance.all_nodes),
+ len(self.op.nodes)),
+ errors.ECODE_INVAL)
+ assert instance.disk_template != constants.DT_DRBD8 or \
+ len(self.op.nodes) == 2
+ assert instance.disk_template != constants.DT_PLAIN or \
+ len(self.op.nodes) == 1
+ primary_node = self.op.nodes[0]
+ else:
+ primary_node = instance.primary_node
+ if not self.op.iallocator:
+ _CheckNodeOnline(self, primary_node)
+
+ if instance.disk_template == constants.DT_DISKLESS:
+ raise errors.OpPrereqError("Instance '%s' has no disks" %
+ self.op.instance_name, errors.ECODE_INVAL)
+
+ # Verify if node group locks are still correct
+ owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
+ if owned_groups:
+ # Node group locks are acquired only for the primary node (and only
+ # when the allocator is used)
+ _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
+ primary_only=True)
+
+ # if we replace nodes *and* the old primary is offline, we don't
+ # check the instance state
+ old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
+ if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
+ _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
+ msg="cannot recreate disks")
+
+ if self.op.disks:
+ self.disks = dict(self.op.disks)
+ else:
+ self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
+
+ maxidx = max(self.disks.keys())
+ if maxidx >= len(instance.disks):
+ raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
+ errors.ECODE_INVAL)
+
+ if ((self.op.nodes or self.op.iallocator) and
+ sorted(self.disks.keys()) != range(len(instance.disks))):
+ raise errors.OpPrereqError("Can't recreate disks partially and"
+ " change the nodes at the same time",
+ errors.ECODE_INVAL)
+
+ self.instance = instance
+
+ if self.op.iallocator:
+ self._RunAllocator()
+ # Release unneeded node and node resource locks
+ _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
+ _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
+ _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
+
+ assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
+
+ def Exec(self, feedback_fn):
+ """Recreate the disks.
+
+ """
+ instance = self.instance
+
+ assert (self.owned_locks(locking.LEVEL_NODE) ==
+ self.owned_locks(locking.LEVEL_NODE_RES))
+
+ to_skip = []
+ mods = [] # keeps track of needed changes
+
+ for idx, disk in enumerate(instance.disks):
+ try:
+ changes = self.disks[idx]
+ except KeyError:
+ # Disk should not be recreated
+ to_skip.append(idx)
+ continue
+
+ # update secondaries for disks, if needed
+ if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
+ # need to update the nodes and minors
+ assert len(self.op.nodes) == 2
+ assert len(disk.logical_id) == 6 # otherwise disk internals
+ # have changed
+ (_, _, old_port, _, _, old_secret) = disk.logical_id
+ new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
+ new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
+ new_minors[0], new_minors[1], old_secret)
+ assert len(disk.logical_id) == len(new_id)
+ else:
+ new_id = None
+
+ mods.append((idx, new_id, changes))
+
+ # now that we have passed all asserts above, we can apply the mods
+ # in a single run (to avoid partial changes)
+ for idx, new_id, changes in mods:
+ disk = instance.disks[idx]
+ if new_id is not None:
+ assert disk.dev_type == constants.LD_DRBD8
+ disk.logical_id = new_id
+ if changes:
+ disk.Update(size=changes.get(constants.IDISK_SIZE, None),
+ mode=changes.get(constants.IDISK_MODE, None))
+
+ # change primary node, if needed
+ if self.op.nodes:
+ instance.primary_node = self.op.nodes[0]
+ self.LogWarning("Changing the instance's nodes, you will have to"
+ " remove any disks left on the older nodes manually")
+
+ if self.op.nodes:
+ self.cfg.Update(instance, feedback_fn)
+
+ # All touched nodes must be locked
+ mylocks = self.owned_locks(locking.LEVEL_NODE)
+ assert mylocks.issuperset(frozenset(instance.all_nodes))
+ _CreateDisks(self, instance, to_skip=to_skip)
+
+
+def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
+ """Checks if nodes have enough free disk space in the specified VG.
+
+ This function checks if all given nodes have the needed amount of
+ free disk. In case any node has less disk or we cannot get the
+ information from the node, this function raises an OpPrereqError
+ exception.
+
+ @type lu: C{LogicalUnit}
+ @param lu: a logical unit from which we get configuration data
+ @type nodenames: C{list}
+ @param nodenames: the list of node names to check
+ @type vg: C{str}
+ @param vg: the volume group to check
+ @type requested: C{int}
+ @param requested: the amount of disk in MiB to check for
+ @raise errors.OpPrereqError: if the node doesn't have enough disk,
+ or we cannot check the node
+
+ """
+ es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
+ nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
+ for node in nodenames:
+ info = nodeinfo[node]
+ info.Raise("Cannot get current information from node %s" % node,
+ prereq=True, ecode=errors.ECODE_ENVIRON)
+ (_, (vg_info, ), _) = info.payload
+ vg_free = vg_info.get("vg_free", None)
+ if not isinstance(vg_free, int):
+ raise errors.OpPrereqError("Can't compute free disk space on node"
+ " %s for vg %s, result was '%s'" %
+ (node, vg, vg_free), errors.ECODE_ENVIRON)
+ if requested > vg_free:
+ raise errors.OpPrereqError("Not enough disk space on target node %s"
+ " vg %s: required %d MiB, available %d MiB" %
+ (node, vg, requested, vg_free),
+ errors.ECODE_NORES)
+
+
+def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
+ """Checks if nodes have enough free disk space in all the VGs.
+
+ This function checks if all given nodes have the needed amount of
+ free disk. In case any node has less disk or we cannot get the
+ information from the node, this function raises an OpPrereqError
+ exception.
+
+ @type lu: C{LogicalUnit}
+ @param lu: a logical unit from which we get configuration data
+ @type nodenames: C{list}
+ @param nodenames: the list of node names to check
+ @type req_sizes: C{dict}
+ @param req_sizes: the hash of vg and corresponding amount of disk in
+ MiB to check for
+ @raise errors.OpPrereqError: if the node doesn't have enough disk,
+ or we cannot check the node
+
+ """
+ for vg, req_size in req_sizes.items():
+ _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
+
+
+def _DiskSizeInBytesToMebibytes(lu, size):
+ """Converts a disk size in bytes to mebibytes.
+
+ Warns and rounds up if the size isn't an even multiple of 1 MiB.
+
+ """
+ (mib, remainder) = divmod(size, 1024 * 1024)
+
+ if remainder != 0:
+ lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
+ " to not overwrite existing data (%s bytes will not be"
+ " wiped)", (1024 * 1024) - remainder)
+ mib += 1
+
+ return mib
+
+
+def _CalcEta(time_taken, written, total_size):
+ """Calculates the ETA based on size written and total size.
+
+ @param time_taken: The time taken so far
+ @param written: amount written so far
+ @param total_size: The total size of data to be written
+ @return: The remaining time in seconds
+
+ """
+ avg_time = time_taken / float(written)
+ return (total_size - written) * avg_time
+
+
+def _WipeDisks(lu, instance, disks=None):
+ """Wipes instance disks.
+
+ @type lu: L{LogicalUnit}
+ @param lu: the logical unit on whose behalf we execute
+ @type instance: L{objects.Instance}
+ @param instance: the instance whose disks we should create
+ @type disks: None or list of tuple of (number, L{objects.Disk}, number)
+ @param disks: Disk details; tuple contains disk index, disk object and the
+ start offset
+
+ """
+ node = instance.primary_node
+
+ if disks is None:
+ disks = [(idx, disk, 0)
+ for (idx, disk) in enumerate(instance.disks)]
+
+ for (_, device, _) in disks:
+ lu.cfg.SetDiskID(device, node)
+
+ logging.info("Pausing synchronization of disks of instance '%s'",
+ instance.name)
+ result = lu.rpc.call_blockdev_pause_resume_sync(node,
+ (map(compat.snd, disks),
+ instance),
+ True)
+ result.Raise("Failed to pause disk synchronization on node '%s'" % node)
+
+ for idx, success in enumerate(result.payload):
+ if not success:
+ logging.warn("Pausing synchronization of disk %s of instance '%s'"
+ " failed", idx, instance.name)
+
+ try:
+ for (idx, device, offset) in disks:
+ # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
+ # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
+ wipe_chunk_size = \
+ int(min(constants.MAX_WIPE_CHUNK,
+ device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
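+      # Illustrative, assuming MIN_WIPE_CHUNK_PERCENT is 10 and
+      # MAX_WIPE_CHUNK is 1024 MiB: a 2048 MiB disk is wiped in 204 MiB
+      # chunks, anything of 10240 MiB or larger in 1024 MiB chunks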
+
+ size = device.size
+ last_output = 0
+ start_time = time.time()
+
+ if offset == 0:
+ info_text = ""
+ else:
+ info_text = (" (from %s to %s)" %
+ (utils.FormatUnit(offset, "h"),
+ utils.FormatUnit(size, "h")))
+
+ lu.LogInfo("* Wiping disk %s%s", idx, info_text)
+
+ logging.info("Wiping disk %d for instance %s on node %s using"
+ " chunk size %s", idx, instance.name, node, wipe_chunk_size)
+
+ while offset < size:
+ wipe_size = min(wipe_chunk_size, size - offset)
+
+ logging.debug("Wiping disk %d, offset %s, chunk %s",
+ idx, offset, wipe_size)
+
+ result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
+ wipe_size)
+ result.Raise("Could not wipe disk %d at offset %d for size %d" %
+ (idx, offset, wipe_size))
+
+ now = time.time()
+ offset += wipe_size
+ if now - last_output >= 60:
+ eta = _CalcEta(now - start_time, offset, size)
+ lu.LogInfo(" - done: %.1f%% ETA: %s",
+ offset / float(size) * 100, utils.FormatSeconds(eta))
+ last_output = now
+ finally:
+ logging.info("Resuming synchronization of disks for instance '%s'",
+ instance.name)
+
+ result = lu.rpc.call_blockdev_pause_resume_sync(node,
+ (map(compat.snd, disks),
+ instance),
+ False)
+
+ if result.fail_msg:
+ lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
+ node, result.fail_msg)
+ else:
+ for idx, success in enumerate(result.payload):
+ if not success:
+ lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
+ " failed", idx, instance.name)
+
+
+def _ExpandCheckDisks(instance, disks):
+ """Return the instance disks selected by the disks list
+
+ @type disks: list of L{objects.Disk} or None
+ @param disks: selected disks
+ @rtype: list of L{objects.Disk}
+ @return: selected instance disks to act on
+
+ """
+ if disks is None:
+ return instance.disks
+ else:
+ if not set(disks).issubset(instance.disks):
+ raise errors.ProgrammerError("Can only act on disks belonging to the"
+ " target instance")
+ return disks
+
+
+def _WaitForSync(lu, instance, disks=None, oneshot=False):
+ """Sleep and poll for an instance's disk to sync.
+
+ """
+  if not instance.disks or (disks is not None and not disks):
+ return True
+
+ disks = _ExpandCheckDisks(instance, disks)
+
+ if not oneshot:
+ lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
+
+ node = instance.primary_node
+
+ for dev in disks:
+ lu.cfg.SetDiskID(dev, node)
+
+ # TODO: Convert to utils.Retry
+
+ retries = 0
+ degr_retries = 10 # in seconds, as we sleep 1 second each time
+ while True:
+ max_time = 0
+ done = True
+ cumul_degraded = False
+ rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
+ msg = rstats.fail_msg
+ if msg:
+ lu.LogWarning("Can't get any data from node %s: %s", node, msg)
+ retries += 1
+ if retries >= 10:
+ raise errors.RemoteError("Can't contact node %s for mirror data,"
+ " aborting." % node)
+ time.sleep(6)
+ continue
+ rstats = rstats.payload
+ retries = 0
+ for i, mstat in enumerate(rstats):
+ if mstat is None:
+ lu.LogWarning("Can't compute data for node %s/%s",
+ node, disks[i].iv_name)
+ continue
+
+ cumul_degraded = (cumul_degraded or
+ (mstat.is_degraded and mstat.sync_percent is None))
+ if mstat.sync_percent is not None:
+ done = False
+ if mstat.estimated_time is not None:
+ rem_time = ("%s remaining (estimated)" %
+ utils.FormatSeconds(mstat.estimated_time))
+ max_time = mstat.estimated_time
+ else:
+ rem_time = "no time estimate"
+ lu.LogInfo("- device %s: %5.2f%% done, %s",
+ disks[i].iv_name, mstat.sync_percent, rem_time)
+
+ # if we're done but degraded, let's do a few small retries, to
+ # make sure we see a stable and not transient situation; therefore
+ # we force restart of the loop
+ if (done or oneshot) and cumul_degraded and degr_retries > 0:
+ logging.info("Degraded disks found, %d retries left", degr_retries)
+ degr_retries -= 1
+ time.sleep(1)
+ continue
+
+ if done or oneshot:
+ break
+
+ time.sleep(min(60, max_time))
+
+ if done:
+ lu.LogInfo("Instance %s's disks are in sync", instance.name)
+
+ return not cumul_degraded
+
+
+def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
+ """Shutdown block devices of an instance.
+
+ This does the shutdown on all nodes of the instance.
+
+  If ignore_primary is true, errors on the primary node are
+  ignored.
+
+ """
+ all_result = True
+ disks = _ExpandCheckDisks(instance, disks)
+
+ for disk in disks:
+ for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
+ lu.cfg.SetDiskID(top_disk, node)
+ result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
+ msg = result.fail_msg
+ if msg:
+ lu.LogWarning("Could not shutdown block device %s on node %s: %s",
+ disk.iv_name, node, msg)
+ if ((node == instance.primary_node and not ignore_primary) or
+ (node != instance.primary_node and not result.offline)):
+ all_result = False
+ return all_result
+
+
+def _SafeShutdownInstanceDisks(lu, instance, disks=None):
+ """Shutdown block devices of an instance.
+
+  This function checks that the instance is down before calling
+  _ShutdownInstanceDisks.
+
+ """
+ _CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
+ _ShutdownInstanceDisks(lu, instance, disks=disks)
+
+
+def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
+ ignore_size=False):
+ """Prepare the block devices for an instance.
+
+ This sets up the block devices on all nodes.
+
+ @type lu: L{LogicalUnit}
+ @param lu: the logical unit on whose behalf we execute
+ @type instance: L{objects.Instance}
+ @param instance: the instance for whose disks we assemble
+ @type disks: list of L{objects.Disk} or None
+ @param disks: which disks to assemble (or all, if None)
+ @type ignore_secondaries: boolean
+ @param ignore_secondaries: if true, errors on secondary nodes
+ won't result in an error return from the function
+ @type ignore_size: boolean
+ @param ignore_size: if true, the current known size of the disk
+ will not be used during the disk activation, useful for cases
+ when the size is wrong
+  @return: a tuple of (disks_ok, device_info), where disks_ok is False if
+      the operation failed, and device_info is a list of
+      (host, instance_visible_name, node_visible_name) tuples with the
+      mapping from node devices to instance devices
+
+ """
+ device_info = []
+ disks_ok = True
+ iname = instance.name
+ disks = _ExpandCheckDisks(instance, disks)
+
+ # With the two passes mechanism we try to reduce the window of
+ # opportunity for the race condition of switching DRBD to primary
+  # before handshaking has occurred, but we do not eliminate it
+
+ # The proper fix would be to wait (with some limits) until the
+ # connection has been made and drbd transitions from WFConnection
+ # into any other network-connected state (Connected, SyncTarget,
+ # SyncSource, etc.)
+
+ # 1st pass, assemble on all nodes in secondary mode
+ for idx, inst_disk in enumerate(disks):
+ for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
+ if ignore_size:
+ node_disk = node_disk.Copy()
+ node_disk.UnsetSize()
+ lu.cfg.SetDiskID(node_disk, node)
+ result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
+ False, idx)
+ msg = result.fail_msg
+ if msg:
+ is_offline_secondary = (node in instance.secondary_nodes and
+ result.offline)
+ lu.LogWarning("Could not prepare block device %s on node %s"
+ " (is_primary=False, pass=1): %s",
+ inst_disk.iv_name, node, msg)
+ if not (ignore_secondaries or is_offline_secondary):
+ disks_ok = False
+
+ # FIXME: race condition on drbd migration to primary
+
+ # 2nd pass, do only the primary node
+ for idx, inst_disk in enumerate(disks):
+ dev_path = None
+
+ for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
+ if node != instance.primary_node:
+ continue
+ if ignore_size:
+ node_disk = node_disk.Copy()
+ node_disk.UnsetSize()
+ lu.cfg.SetDiskID(node_disk, node)
+ result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
+ True, idx)
+ msg = result.fail_msg
+ if msg:
+ lu.LogWarning("Could not prepare block device %s on node %s"
+ " (is_primary=True, pass=2): %s",
+ inst_disk.iv_name, node, msg)
+ disks_ok = False
+ else:
+ dev_path = result.payload
+
+ device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
+
+ # leave the disks configured for the primary node
+ # this is a workaround that would be fixed better by
+ # improving the logical/physical id handling
+ for disk in disks:
+ lu.cfg.SetDiskID(disk, instance.primary_node)
+
+ return disks_ok, device_info
+
+
+def _StartInstanceDisks(lu, instance, force):
+ """Start the disks of an instance.
+
+ """
+ disks_ok, _ = _AssembleInstanceDisks(lu, instance,
+ ignore_secondaries=force)
+ if not disks_ok:
+ _ShutdownInstanceDisks(lu, instance)
+ if force is not None and not force:
+ lu.LogWarning("",
+ hint=("If the message above refers to a secondary node,"
+ " you can retry the operation using '--force'"))
+ raise errors.OpExecError("Disk consistency error")
+
+
+class LUInstanceGrowDisk(LogicalUnit):
+ """Grow a disk of an instance.
+
+ """
+ HPATH = "disk-grow"
+ HTYPE = constants.HTYPE_INSTANCE
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ self._ExpandAndLockInstance()
+ self.needed_locks[locking.LEVEL_NODE] = []
+ self.needed_locks[locking.LEVEL_NODE_RES] = []
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+ self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_NODE:
+ self._LockInstancesNodes()
+ elif level == locking.LEVEL_NODE_RES:
+ # Copy node locks
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ This runs on the master, the primary and all the secondaries.
+
+ """
+ env = {
+ "DISK": self.op.disk,
+ "AMOUNT": self.op.amount,
+ "ABSOLUTE": self.op.absolute,
+ }
+ env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+ return env
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
+ return (nl, nl)
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ This checks that the instance is in the cluster.
+
+ """
+ instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+ assert instance is not None, \
+ "Cannot retrieve locked instance %s" % self.op.instance_name
+ nodenames = list(instance.all_nodes)
+ for node in nodenames:
+ _CheckNodeOnline(self, node)
+
+ self.instance = instance
+
+ if instance.disk_template not in constants.DTS_GROWABLE:
+ raise errors.OpPrereqError("Instance's disk layout does not support"
+ " growing", errors.ECODE_INVAL)
+
+ self.disk = instance.FindDisk(self.op.disk)
+
+ if self.op.absolute:
+ self.target = self.op.amount
+ self.delta = self.target - self.disk.size
+ if self.delta < 0:
+ raise errors.OpPrereqError("Requested size (%s) is smaller than "
+ "current disk size (%s)" %
+ (utils.FormatUnit(self.target, "h"),
+ utils.FormatUnit(self.disk.size, "h")),
+ errors.ECODE_STATE)
+ else:
+ self.delta = self.op.amount
+ self.target = self.disk.size + self.delta
+ if self.delta < 0:
+ raise errors.OpPrereqError("Requested increment (%s) is negative" %
+ utils.FormatUnit(self.delta, "h"),
+ errors.ECODE_INVAL)
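+
+    # Illustrative values: growing a 10240 MiB disk with amount=2048 and
+    # absolute=False gives delta=2048 and target=12288; absolute=True with
+    # amount=12288 yields the same delta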
+
+ self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
+
+ def _CheckDiskSpace(self, nodenames, req_vgspace):
+ template = self.instance.disk_template
+    if template not in constants.DTS_NO_FREE_SPACE_CHECK:
+ # TODO: check the free disk space for file, when that feature will be
+ # supported
+ nodes = map(self.cfg.GetNodeInfo, nodenames)
+ es_nodes = filter(lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n),
+ nodes)
+ if es_nodes:
+        # With exclusive storage we need something smarter than just looking
+ # at free space; for now, let's simply abort the operation.
+ raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
+ " is enabled", errors.ECODE_STATE)
+ _CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
+
+ def Exec(self, feedback_fn):
+ """Execute disk grow.
+
+ """
+ instance = self.instance
+ disk = self.disk
+
+ assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
+ assert (self.owned_locks(locking.LEVEL_NODE) ==
+ self.owned_locks(locking.LEVEL_NODE_RES))
+
+ wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
+
+ disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk])
+ if not disks_ok:
+ raise errors.OpExecError("Cannot activate block device to grow")
+
+ feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
+ (self.op.disk, instance.name,
+ utils.FormatUnit(self.delta, "h"),
+ utils.FormatUnit(self.target, "h")))
+
+ # First run all grow ops in dry-run mode
+ for node in instance.all_nodes:
+ self.cfg.SetDiskID(disk, node)
+ result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+ True, True)
+ result.Raise("Dry-run grow request failed to node %s" % node)
+
+ if wipe_disks:
+ # Get disk size from primary node for wiping
+ result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
+ result.Raise("Failed to retrieve disk size from node '%s'" %
+ instance.primary_node)
+
+ (disk_size_in_bytes, ) = result.payload
+
+ if disk_size_in_bytes is None:
+ raise errors.OpExecError("Failed to retrieve disk size from primary"
+ " node '%s'" % instance.primary_node)
+
+ old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
+
+ assert old_disk_size >= disk.size, \
+ ("Retrieved disk size too small (got %s, should be at least %s)" %
+ (old_disk_size, disk.size))
+ else:
+ old_disk_size = None
+
+ # We know that (as far as we can test) operations across different
+ # nodes will succeed, time to run it for real on the backing storage
+ for node in instance.all_nodes:
+ self.cfg.SetDiskID(disk, node)
+ result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+ False, True)
+ result.Raise("Grow request failed to node %s" % node)
+
+ # And now execute it for logical storage, on the primary node
+ node = instance.primary_node
+ self.cfg.SetDiskID(disk, node)
+ result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
+ False, False)
+ result.Raise("Grow request failed to node %s" % node)
+
+ disk.RecordGrow(self.delta)
+ self.cfg.Update(instance, feedback_fn)
+
+ # Changes have been recorded, release node lock
+ _ReleaseLocks(self, locking.LEVEL_NODE)
+
+ # Downgrade lock while waiting for sync
+ self.glm.downgrade(locking.LEVEL_INSTANCE)
+
+ assert wipe_disks ^ (old_disk_size is None)
+
+ if wipe_disks:
+ assert instance.disks[self.op.disk] == disk
+
+ # Wipe newly added disk space
+ _WipeDisks(self, instance,
+ disks=[(self.op.disk, disk, old_disk_size)])
+
+ if self.op.wait_for_sync:
+ disk_abort = not _WaitForSync(self, instance, disks=[disk])
+ if disk_abort:
+ self.LogWarning("Disk syncing has not returned a good status; check"
+ " the instance")
+ if instance.admin_state != constants.ADMINST_UP:
+ _SafeShutdownInstanceDisks(self, instance, disks=[disk])
+ elif instance.admin_state != constants.ADMINST_UP:
+ self.LogWarning("Not shutting down the disk even if the instance is"
+ " not supposed to be running because no wait for"
+ " sync mode was requested")
+
+ assert self.owned_locks(locking.LEVEL_NODE_RES)
+ assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
+
+
+class LUInstanceReplaceDisks(LogicalUnit):
+ """Replace the disks of an instance.
+
+ """
+ HPATH = "mirrors-replace"
+ HTYPE = constants.HTYPE_INSTANCE
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ """Check arguments.
+
+ """
+ remote_node = self.op.remote_node
+ ialloc = self.op.iallocator
+ if self.op.mode == constants.REPLACE_DISK_CHG:
+ if remote_node is None and ialloc is None:
+ raise errors.OpPrereqError("When changing the secondary either an"
+ " iallocator script must be used or the"
+ " new node given", errors.ECODE_INVAL)
+ else:
+ _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
+
+ elif remote_node is not None or ialloc is not None:
+ # Not replacing the secondary
+ raise errors.OpPrereqError("The iallocator and new node options can"
+ " only be used when changing the"
+ " secondary node", errors.ECODE_INVAL)
+
+ def ExpandNames(self):
+ self._ExpandAndLockInstance()
+
+ assert locking.LEVEL_NODE not in self.needed_locks
+ assert locking.LEVEL_NODE_RES not in self.needed_locks
+ assert locking.LEVEL_NODEGROUP not in self.needed_locks
+
+ assert self.op.iallocator is None or self.op.remote_node is None, \
+ "Conflicting options"
+
+ if self.op.remote_node is not None:
+ self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
+
+ # Warning: do not remove the locking of the new secondary here
+ # unless DRBD8.AddChildren is changed to work in parallel;
+ # currently it doesn't since parallel invocations of
+ # FindUnusedMinor will conflict
+ self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+ else:
+ self.needed_locks[locking.LEVEL_NODE] = []
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+ if self.op.iallocator is not None:
+ # iallocator will select a new node in the same group
+ self.needed_locks[locking.LEVEL_NODEGROUP] = []
+ self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
+
+ self.needed_locks[locking.LEVEL_NODE_RES] = []
+
+ self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
+ self.op.iallocator, self.op.remote_node,
+ self.op.disks, self.op.early_release,
+ self.op.ignore_ipolicy)
+
+ self.tasklets = [self.replacer]
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_NODEGROUP:
+ assert self.op.remote_node is None
+ assert self.op.iallocator is not None
+ assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+
+ self.share_locks[locking.LEVEL_NODEGROUP] = 1
+ # Lock all groups used by instance optimistically; this requires going
+ # via the node before it's locked, requiring verification later on
+ self.needed_locks[locking.LEVEL_NODEGROUP] = \
+ self.cfg.GetInstanceNodeGroups(self.op.instance_name)
+
+ elif level == locking.LEVEL_NODE:
+ if self.op.iallocator is not None:
+ assert self.op.remote_node is None
+ assert not self.needed_locks[locking.LEVEL_NODE]
+ assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
+
+ # Lock member nodes of all locked groups
+ self.needed_locks[locking.LEVEL_NODE] = \
+ [node_name
+ for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
+ for node_name in self.cfg.GetNodeGroup(group_uuid).members]
+ else:
+ assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
+
+ self._LockInstancesNodes()
+
+ elif level == locking.LEVEL_NODE_RES:
+ # Reuse node locks
+ self.needed_locks[locking.LEVEL_NODE_RES] = \
+ self.needed_locks[locking.LEVEL_NODE]
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ This runs on the master, the primary and all the secondaries.
+
+ """
+ instance = self.replacer.instance
+ env = {
+ "MODE": self.op.mode,
+ "NEW_SECONDARY": self.op.remote_node,
+ "OLD_SECONDARY": instance.secondary_nodes[0],
+ }
+ env.update(_BuildInstanceHookEnvByObject(self, instance))
+ return env
+
+ def BuildHooksNodes(self):
+ """Build hooks nodes.
+
+ """
+ instance = self.replacer.instance
+ nl = [
+ self.cfg.GetMasterNode(),
+ instance.primary_node,
+ ]
+ if self.op.remote_node is not None:
+ nl.append(self.op.remote_node)
+ return nl, nl
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ """
+ assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
+ self.op.iallocator is None)
+
+ # Verify if node group locks are still correct
+ owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
+ if owned_groups:
+ _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
+
+ return LogicalUnit.CheckPrereq(self)
+
+
+class LUInstanceActivateDisks(NoHooksLU):
+ """Bring up an instance's disks.
+
+ """
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ self._ExpandAndLockInstance()
+ self.needed_locks[locking.LEVEL_NODE] = []
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_NODE:
+ self._LockInstancesNodes()
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ This checks that the instance is in the cluster.
+
+ """
+ self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+ assert self.instance is not None, \
+ "Cannot retrieve locked instance %s" % self.op.instance_name
+ _CheckNodeOnline(self, self.instance.primary_node)
+
+ def Exec(self, feedback_fn):
+ """Activate the disks.
+
+ """
+ disks_ok, disks_info = \
+ _AssembleInstanceDisks(self, self.instance,
+ ignore_size=self.op.ignore_size)
+ if not disks_ok:
+ raise errors.OpExecError("Cannot activate block devices")
+
+ if self.op.wait_for_sync:
+ if not _WaitForSync(self, self.instance):
+ raise errors.OpExecError("Some disks of the instance are degraded!")
+
+ return disks_info
+
+
+class LUInstanceDeactivateDisks(NoHooksLU):
+ """Shutdown an instance's disks.
+
+ """
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ self._ExpandAndLockInstance()
+ self.needed_locks[locking.LEVEL_NODE] = []
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_NODE:
+ self._LockInstancesNodes()
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ This checks that the instance is in the cluster.
+
+ """
+ self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+ assert self.instance is not None, \
+ "Cannot retrieve locked instance %s" % self.op.instance_name
+
+ def Exec(self, feedback_fn):
+ """Deactivate the disks
+
+ """
+ instance = self.instance
+ if self.op.force:
+ _ShutdownInstanceDisks(self, instance)
+ else:
+ _SafeShutdownInstanceDisks(self, instance)
+
+
+def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
+ ldisk=False):
+ """Check that mirrors are not degraded.
+
+ @attention: The device has to be annotated already.
+
+ The ldisk parameter, if True, will change the test from the
+ is_degraded attribute (which represents overall non-ok status for
+ the device(s)) to the ldisk (representing the local storage status).
+
+ """
+ lu.cfg.SetDiskID(dev, node)
+
+ result = True
+
+ if on_primary or dev.AssembleOnSecondary():
+ rstats = lu.rpc.call_blockdev_find(node, dev)
+ msg = rstats.fail_msg
+ if msg:
+ lu.LogWarning("Can't find disk on node %s: %s", node, msg)
+ result = False
+ elif not rstats.payload:
+ lu.LogWarning("Can't find disk on node %s", node)
+ result = False
+ else:
+ if ldisk:
+ result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
+ else:
+ result = result and not rstats.payload.is_degraded
+
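+  # note that child devices are always recursed into with the default
+  # ldisk=False, i.e. only the top-level device is tested for local-disk
+  # status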
+ if dev.children:
+ for child in dev.children:
+ result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
+ on_primary)
+
+ return result
+
+
+def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
+ """Wrapper around L{_CheckDiskConsistencyInner}.
+
+ """
+ (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
+ return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
+ ldisk=ldisk)
+
+
+def _BlockdevFind(lu, node, dev, instance):
+ """Wrapper around call_blockdev_find to annotate diskparams.
+
+ @param lu: A reference to the lu object
+  @param node: The node to call out to
+  @param dev: The device to find
+  @param instance: The instance object the device belongs to
+  @return: The result of the rpc call
+
+ """
+ (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
+ return lu.rpc.call_blockdev_find(node, disk)
+
+
+def _GenerateUniqueNames(lu, exts):
+ """Generate a suitable LV name.
+
+ This will generate a logical volume name for the given instance.
+
+ """
+ results = []
+ for val in exts:
+ new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
+ results.append("%s%s" % (new_id, val))
+ return results
+
+
+class TLReplaceDisks(Tasklet):
+ """Replaces disks for an instance.
+
+ Note: Locking is not within the scope of this class.
+
+ """
+ def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
+ disks, early_release, ignore_ipolicy):
+ """Initializes this class.
+
+ """
+ Tasklet.__init__(self, lu)
+
+ # Parameters
+ self.instance_name = instance_name
+ self.mode = mode
+ self.iallocator_name = iallocator_name
+ self.remote_node = remote_node
+ self.disks = disks
+ self.early_release = early_release
+ self.ignore_ipolicy = ignore_ipolicy
+
+ # Runtime data
+ self.instance = None
+ self.new_node = None
+ self.target_node = None
+ self.other_node = None
+ self.remote_node_info = None
+ self.node_secondary_ip = None
+
+ @staticmethod
+ def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
+ """Compute a new secondary node using an IAllocator.
+
+ """
+ req = iallocator.IAReqRelocate(name=instance_name,
+ relocate_from=list(relocate_from))
+ ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
+
+ ial.Run(iallocator_name)
+
+ if not ial.success:
+ raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
+ " %s" % (iallocator_name, ial.info),
+ errors.ECODE_NORES)
+
+ remote_node_name = ial.result[0]
+
+ lu.LogInfo("Selected new secondary for instance '%s': %s",
+ instance_name, remote_node_name)
+
+ return remote_node_name
+
+ def _FindFaultyDisks(self, node_name):
+ """Wrapper for L{_FindFaultyInstanceDisks}.
+
+ """
+ return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
+ node_name, True)
+
+ def _CheckDisksActivated(self, instance):
+ """Checks if the instance disks are activated.
+
+ @param instance: The instance to check disks
+ @return: True if they are activated, False otherwise
+
+ """
+ nodes = instance.all_nodes
+
+ for idx, dev in enumerate(instance.disks):
+ for node in nodes:
+ self.lu.LogInfo("Checking disk/%d on %s", idx, node)
+ self.cfg.SetDiskID(dev, node)
+
+ result = _BlockdevFind(self, node, dev, instance)
+
+ if result.offline:
+ continue
+ elif result.fail_msg or not result.payload:
+ return False
+
+ return True
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ This checks that the instance is in the cluster.
+
+ """
+ self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
+ assert instance is not None, \
+ "Cannot retrieve locked instance %s" % self.instance_name
+
+ if instance.disk_template != constants.DT_DRBD8:
+ raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
+ " instances", errors.ECODE_INVAL)
+
+ if len(instance.secondary_nodes) != 1:
+ raise errors.OpPrereqError("The instance has a strange layout,"
+ " expected one secondary but found %d" %
+ len(instance.secondary_nodes),
+ errors.ECODE_FAULT)
+
+ instance = self.instance
+ secondary_node = instance.secondary_nodes[0]
+
+ if self.iallocator_name is None:
+ remote_node = self.remote_node
+ else:
+ remote_node = self._RunAllocator(self.lu, self.iallocator_name,
+ instance.name, instance.secondary_nodes)
+
+ if remote_node is None:
+ self.remote_node_info = None
+ else:
+ assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
+ "Remote node '%s' is not locked" % remote_node
+
+ self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
+ assert self.remote_node_info is not None, \
+ "Cannot retrieve locked node %s" % remote_node
+
+ if remote_node == self.instance.primary_node:
+ raise errors.OpPrereqError("The specified node is the primary node of"
+ " the instance", errors.ECODE_INVAL)
+
+ if remote_node == secondary_node:
+ raise errors.OpPrereqError("The specified node is already the"
+ " secondary node of the instance",
+ errors.ECODE_INVAL)
+
+ if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
+ constants.REPLACE_DISK_CHG):
+ raise errors.OpPrereqError("Cannot specify disks to be replaced",
+ errors.ECODE_INVAL)
+
+ if self.mode == constants.REPLACE_DISK_AUTO:
+ if not self._CheckDisksActivated(instance):
+ raise errors.OpPrereqError("Please run activate-disks on instance %s"
+ " first" % self.instance_name,
+ errors.ECODE_STATE)
+ faulty_primary = self._FindFaultyDisks(instance.primary_node)
+ faulty_secondary = self._FindFaultyDisks(secondary_node)
+
+ if faulty_primary and faulty_secondary:
+ raise errors.OpPrereqError("Instance %s has faulty disks on more than"
+ " one node and can not be repaired"
+ " automatically" % self.instance_name,
+ errors.ECODE_STATE)
+
+ if faulty_primary:
+ self.disks = faulty_primary
+ self.target_node = instance.primary_node
+ self.other_node = secondary_node
+ check_nodes = [self.target_node, self.other_node]
+ elif faulty_secondary:
+ self.disks = faulty_secondary
+ self.target_node = secondary_node
+ self.other_node = instance.primary_node
+ check_nodes = [self.target_node, self.other_node]
+ else:
+ self.disks = []
+ check_nodes = []
+
+ else:
+ # Non-automatic modes
+ if self.mode == constants.REPLACE_DISK_PRI:
+ self.target_node = instance.primary_node
+ self.other_node = secondary_node
+ check_nodes = [self.target_node, self.other_node]
+
+ elif self.mode == constants.REPLACE_DISK_SEC:
+ self.target_node = secondary_node
+ self.other_node = instance.primary_node
+ check_nodes = [self.target_node, self.other_node]
+
+ elif self.mode == constants.REPLACE_DISK_CHG:
+ self.new_node = remote_node
+ self.other_node = instance.primary_node
+ self.target_node = secondary_node
+ check_nodes = [self.new_node, self.other_node]
+
+ _CheckNodeNotDrained(self.lu, remote_node)
+ _CheckNodeVmCapable(self.lu, remote_node)
+
+ old_node_info = self.cfg.GetNodeInfo(secondary_node)
+ assert old_node_info is not None
+ if old_node_info.offline and not self.early_release:
+ # doesn't make sense to delay the release
+ self.early_release = True
+ self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
+ " early-release mode", secondary_node)
+
+ else:
+ raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
+ self.mode)
+
+ # If not specified all disks should be replaced
+ if not self.disks:
+ self.disks = range(len(self.instance.disks))
+
+    # TODO: This is ugly, but right now we can't distinguish between an
+    # internally submitted opcode and an external one. We should fix that.
+ if self.remote_node_info:
+      # We change the node; let's verify that it still meets instance policy
+ new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
+ cluster = self.cfg.GetClusterInfo()
+ ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
+ new_group_info)
+ _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
+ self.cfg, ignore=self.ignore_ipolicy)
+
+ for node in check_nodes:
+ _CheckNodeOnline(self.lu, node)
+
+ touched_nodes = frozenset(node_name for node_name in [self.new_node,
+ self.other_node,
+ self.target_node]
+ if node_name is not None)
+
+ # Release unneeded node and node resource locks
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
+
+ # Release any owned node group
+ _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
+
+ # Check whether disks are valid
+ for disk_idx in self.disks:
+ instance.FindDisk(disk_idx)
+
+ # Get secondary node IP addresses
+ self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
+ in self.cfg.GetMultiNodeInfo(touched_nodes))
+
+ def Exec(self, feedback_fn):
+ """Execute disk replacement.
+
+ This dispatches the disk replacement to the appropriate handler.
+
+ """
+ if __debug__:
+ # Verify owned locks before starting operation
+ owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
+ assert set(owned_nodes) == set(self.node_secondary_ip), \
+ ("Incorrect node locks, owning %s, expected %s" %
+ (owned_nodes, self.node_secondary_ip.keys()))
+ assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
+ self.lu.owned_locks(locking.LEVEL_NODE_RES))
+ assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
+
+ owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
+ assert list(owned_instances) == [self.instance_name], \
+ "Instance '%s' not locked" % self.instance_name
+
+ assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
+ "Should not own any node group lock at this point"
+
+ if not self.disks:
+ feedback_fn("No disks need replacement for instance '%s'" %
+ self.instance.name)
+ return
+
+ feedback_fn("Replacing disk(s) %s for instance '%s'" %
+ (utils.CommaJoin(self.disks), self.instance.name))
+ feedback_fn("Current primary node: %s" % self.instance.primary_node)
+ feedback_fn("Current seconary node: %s" %
+ utils.CommaJoin(self.instance.secondary_nodes))
+
+ activate_disks = (self.instance.admin_state != constants.ADMINST_UP)
+
+ # Activate the instance disks if we're replacing them on a down instance
+ if activate_disks:
+ _StartInstanceDisks(self.lu, self.instance, True)
+
+ try:
+ # Should we replace the secondary node?
+ if self.new_node is not None:
+ fn = self._ExecDrbd8Secondary
+ else:
+ fn = self._ExecDrbd8DiskOnly
+
+ result = fn(feedback_fn)
+ finally:
+ # Deactivate the instance disks if we're replacing them on a
+ # down instance
+ if activate_disks:
+ _SafeShutdownInstanceDisks(self.lu, self.instance)
+
+ assert not self.lu.owned_locks(locking.LEVEL_NODE)
+
+ if __debug__:
+ # Verify owned locks
+ owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
+ nodes = frozenset(self.node_secondary_ip)
+ assert ((self.early_release and not owned_nodes) or
+ (not self.early_release and not (set(owned_nodes) - nodes))), \
+ ("Not owning the correct locks, early_release=%s, owned=%r,"
+ " nodes=%r" % (self.early_release, owned_nodes, nodes))
+
+ return result
+
+ def _CheckVolumeGroup(self, nodes):
+ self.lu.LogInfo("Checking volume groups")
+
+ vgname = self.cfg.GetVGName()
+
+ # Make sure volume group exists on all involved nodes
+ results = self.rpc.call_vg_list(nodes)
+ if not results:
+ raise errors.OpExecError("Can't list volume groups on the nodes")
+
+ for node in nodes:
+ res = results[node]
+ res.Raise("Error checking node %s" % node)
+ if vgname not in res.payload:
+ raise errors.OpExecError("Volume group '%s' not found on node %s" %
+ (vgname, node))
+
+ def _CheckDisksExistence(self, nodes):
+ # Check disk existence
+ for idx, dev in enumerate(self.instance.disks):
+ if idx not in self.disks:
+ continue
+
+ for node in nodes:
+ self.lu.LogInfo("Checking disk/%d on %s", idx, node)
+ self.cfg.SetDiskID(dev, node)
+
+ result = _BlockdevFind(self, node, dev, self.instance)
+
+ msg = result.fail_msg
+ if msg or not result.payload:
+ if not msg:
+ msg = "disk not found"
+ raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
+ (idx, node, msg))
+
+ def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
+ for idx, dev in enumerate(self.instance.disks):
+ if idx not in self.disks:
+ continue
+
+ self.lu.LogInfo("Checking disk/%d consistency on node %s" %
+ (idx, node_name))
+
+ if not _CheckDiskConsistency(self.lu, self.instance, dev, node_name,
+ on_primary, ldisk=ldisk):
+ raise errors.OpExecError("Node %s has degraded storage, unsafe to"
+ " replace disks for instance %s" %
+ (node_name, self.instance.name))
+
+ def _CreateNewStorage(self, node_name):
+ """Create new storage on the primary or secondary node.
+
+ This is only used for same-node replaces, not for changing the
+ secondary node, hence we don't want to modify the existing disk.
+
+ """
+ iv_names = {}
+
+ disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
+ for idx, dev in enumerate(disks):
+ if idx not in self.disks:
+ continue
+
+ self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)
+
+ self.cfg.SetDiskID(dev, node_name)
+
+ lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
+ names = _GenerateUniqueNames(self.lu, lv_names)
+
+ (data_disk, meta_disk) = dev.children
+ vg_data = data_disk.logical_id[0]
+ lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
+ logical_id=(vg_data, names[0]),
+ params=data_disk.params)
+ vg_meta = meta_disk.logical_id[0]
+ lv_meta = objects.Disk(dev_type=constants.LD_LV,
+ size=constants.DRBD_META_SIZE,
+ logical_id=(vg_meta, names[1]),
+ params=meta_disk.params)
+
+ new_lvs = [lv_data, lv_meta]
+ old_lvs = [child.Copy() for child in dev.children]
+ iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
+ excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
+
+ # we pass force_create=True to force the LVM creation
+ for new_lv in new_lvs:
+ _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
+ _GetInstanceInfoText(self.instance), False,
+ excl_stor)
+
+ return iv_names
+
+ def _CheckDevices(self, node_name, iv_names):
+ for name, (dev, _, _) in iv_names.iteritems():
+ self.cfg.SetDiskID(dev, node_name)
+
+ result = _BlockdevFind(self, node_name, dev, self.instance)
+
+ msg = result.fail_msg
+ if msg or not result.payload:
+ if not msg:
+ msg = "disk not found"
+ raise errors.OpExecError("Can't find DRBD device %s: %s" %
+ (name, msg))
+
+ if result.payload.is_degraded:
+ raise errors.OpExecError("DRBD device %s is degraded!" % name)
+
+ def _RemoveOldStorage(self, node_name, iv_names):
+ for name, (_, old_lvs, _) in iv_names.iteritems():
+ self.lu.LogInfo("Remove logical volumes for %s", name)
+
+ for lv in old_lvs:
+ self.cfg.SetDiskID(lv, node_name)
+
+ msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
+ if msg:
+ self.lu.LogWarning("Can't remove old LV: %s", msg,
+ hint="remove unused LVs manually")
+
+ def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
+ """Replace a disk on the primary or secondary for DRBD 8.
+
+ The algorithm for replace is quite complicated:
+
+ 1. for each disk to be replaced:
+
+ 1. create new LVs on the target node with unique names
+ 1. detach old LVs from the drbd device
+ 1. rename old LVs to name_replaced.<time_t>
+ 1. rename new LVs to old LVs
+ 1. attach the new LVs (with the old names now) to the drbd device
+
+ 1. wait for sync across all devices
+
+ 1. for each modified disk:
+
+    1. remove old LVs (which have the name name_replaced.<time_t>)
+
+ Failures are not very well handled.
+
+ """
+ steps_total = 6
+
+ # Step: check device activation
+ self.lu.LogStep(1, steps_total, "Check device existence")
+ self._CheckDisksExistence([self.other_node, self.target_node])
+ self._CheckVolumeGroup([self.target_node, self.other_node])
+
+ # Step: check other node consistency
+ self.lu.LogStep(2, steps_total, "Check peer consistency")
+ self._CheckDisksConsistency(self.other_node,
+ self.other_node == self.instance.primary_node,
+ False)
+
+ # Step: create new storage
+ self.lu.LogStep(3, steps_total, "Allocate new storage")
+ iv_names = self._CreateNewStorage(self.target_node)
+
+ # Step: for each lv, detach+rename*2+attach
+ self.lu.LogStep(4, steps_total, "Changing drbd configuration")
+ for dev, old_lvs, new_lvs in iv_names.itervalues():
+ self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
+
+ result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
+ old_lvs)
+ result.Raise("Can't detach drbd from local storage on node"
+ " %s for device %s" % (self.target_node, dev.iv_name))
+ #dev.children = []
+ #cfg.Update(instance)
+
+ # ok, we created the new LVs, so now we know we have the needed
+ # storage; as such, we proceed on the target node to rename
+ # old_lv to _old, and new_lv to old_lv; note that we rename LVs
+ # using the assumption that logical_id == physical_id (which in
+ # turn is the unique_id on that node)
+
+ # FIXME(iustin): use a better name for the replaced LVs
+ temp_suffix = int(time.time())
+ ren_fn = lambda d, suff: (d.physical_id[0],
+ d.physical_id[1] + "_replaced-%s" % suff)
+
+ # Build the rename list based on what LVs exist on the node
+ rename_old_to_new = []
+ for to_ren in old_lvs:
+ result = self.rpc.call_blockdev_find(self.target_node, to_ren)
+ if not result.fail_msg and result.payload:
+ # device exists
+ rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
+
+ self.lu.LogInfo("Renaming the old LVs on the target node")
+ result = self.rpc.call_blockdev_rename(self.target_node,
+ rename_old_to_new)
+ result.Raise("Can't rename old LVs on node %s" % self.target_node)
+
+ # Now we rename the new LVs to the old LVs
+ self.lu.LogInfo("Renaming the new LVs on the target node")
+ rename_new_to_old = [(new, old.physical_id)
+ for old, new in zip(old_lvs, new_lvs)]
+ result = self.rpc.call_blockdev_rename(self.target_node,
+ rename_new_to_old)
+ result.Raise("Can't rename new LVs on node %s" % self.target_node)
+
+ # Intermediate steps of in memory modifications
+ for old, new in zip(old_lvs, new_lvs):
+ new.logical_id = old.logical_id
+ self.cfg.SetDiskID(new, self.target_node)
+
+ # We need to modify old_lvs so that removal later removes the
+ # right LVs, not the newly added ones; note that old_lvs is a
+ # copy here
+ for disk in old_lvs:
+ disk.logical_id = ren_fn(disk, temp_suffix)
+ self.cfg.SetDiskID(disk, self.target_node)
+
+ # Now that the new lvs have the old name, we can add them to the device
+ self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
+ result = self.rpc.call_blockdev_addchildren(self.target_node,
+ (dev, self.instance), new_lvs)
+ msg = result.fail_msg
+ if msg:
+ for new_lv in new_lvs:
+ msg2 = self.rpc.call_blockdev_remove(self.target_node,
+ new_lv).fail_msg
+ if msg2:
+ self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
+ hint=("cleanup manually the unused logical"
+                                     " volumes"))
+ raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
+
+ cstep = itertools.count(5)
+
+ if self.early_release:
+ self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
+ self._RemoveOldStorage(self.target_node, iv_names)
+ # TODO: Check if releasing locks early still makes sense
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
+ else:
+ # Release all resource locks except those used by the instance
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
+ keep=self.node_secondary_ip.keys())
+
+ # Release all node locks while waiting for sync
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE)
+
+ # TODO: Can the instance lock be downgraded here? Take the optional disk
+ # shutdown in the caller into consideration.
+
+ # Wait for sync
+ # This can fail as the old devices are degraded and _WaitForSync
+ # does a combined result over all disks, so we don't check its return value
+ self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
+ _WaitForSync(self.lu, self.instance)
+
+ # Check all devices manually
+ self._CheckDevices(self.instance.primary_node, iv_names)
+
+ # Step: remove old storage
+ if not self.early_release:
+ self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
+ self._RemoveOldStorage(self.target_node, iv_names)
+
+ def _ExecDrbd8Secondary(self, feedback_fn):
+ """Replace the secondary node for DRBD 8.
+
+ The algorithm for replace is quite complicated:
+ - for all disks of the instance:
+ - create new LVs on the new node with same names
+ - shutdown the drbd device on the old secondary
+ - disconnect the drbd network on the primary
+ - create the drbd device on the new secondary
+ - network attach the drbd on the primary, using an artifice:
+ the drbd code for Attach() will connect to the network if it
+      finds a device which is connected to the correct local disks but
+ not network enabled
+ - wait for sync across all devices
+ - remove all disks from the old secondary
+
+ Failures are not very well handled.
+
+ """
+ steps_total = 6
+
+ pnode = self.instance.primary_node
+
+ # Step: check device activation
+ self.lu.LogStep(1, steps_total, "Check device existence")
+ self._CheckDisksExistence([self.instance.primary_node])
+ self._CheckVolumeGroup([self.instance.primary_node])
+
+ # Step: check other node consistency
+ self.lu.LogStep(2, steps_total, "Check peer consistency")
+ self._CheckDisksConsistency(self.instance.primary_node, True, True)
+
+ # Step: create new storage
+ self.lu.LogStep(3, steps_total, "Allocate new storage")
+ disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
+ excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
+ for idx, dev in enumerate(disks):
+ self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
+ (self.new_node, idx))
+ # we pass force_create=True to force LVM creation
+ for new_lv in dev.children:
+ _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
+ True, _GetInstanceInfoText(self.instance), False,
+ excl_stor)
+
+    # Step 4: drbd minors and drbd setup changes
+ # after this, we must manually remove the drbd minors on both the
+ # error and the success paths
+ self.lu.LogStep(4, steps_total, "Changing drbd configuration")
+ minors = self.cfg.AllocateDRBDMinor([self.new_node
+ for dev in self.instance.disks],
+ self.instance.name)
+ logging.debug("Allocated minors %r", minors)
+
+ iv_names = {}
+ for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
+ self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
+ (self.new_node, idx))
+ # create new devices on new_node; note that we create two IDs:
+ # one without port, so the drbd will be activated without
+ # networking information on the new node at this stage, and one
+      # with network, for the network attach performed later in this step
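+      # (a DRBD8 logical_id is the 6-tuple
+      # (nodeA, nodeB, port, minorA, minorB, secret))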
+ (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
+ if self.instance.primary_node == o_node1:
+ p_minor = o_minor1
+ else:
+ assert self.instance.primary_node == o_node2, "Three-node instance?"
+ p_minor = o_minor2
+
+ new_alone_id = (self.instance.primary_node, self.new_node, None,
+ p_minor, new_minor, o_secret)
+ new_net_id = (self.instance.primary_node, self.new_node, o_port,
+ p_minor, new_minor, o_secret)
+
+ iv_names[idx] = (dev, dev.children, new_net_id)
+ logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
+ new_net_id)
+ new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
+ logical_id=new_alone_id,
+ children=dev.children,
+ size=dev.size,
+ params={})
+ (anno_new_drbd,) = _AnnotateDiskParams(self.instance, [new_drbd],
+ self.cfg)
+ try:
+ _CreateSingleBlockDev(self.lu, self.new_node, self.instance,
+ anno_new_drbd,
+ _GetInstanceInfoText(self.instance), False,
+ excl_stor)
+ except errors.GenericError:
+ self.cfg.ReleaseDRBDMinors(self.instance.name)
+ raise
+
+ # We have new devices, shutdown the drbd on the old secondary
+ for idx, dev in enumerate(self.instance.disks):
+ self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
+ self.cfg.SetDiskID(dev, self.target_node)
+ msg = self.rpc.call_blockdev_shutdown(self.target_node,
+ (dev, self.instance)).fail_msg
+ if msg:
+ self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
+                           " node: %s" % (idx, msg),
+ hint=("Please cleanup this device manually as"
+ " soon as possible"))
+
+ self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
+ result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
+ self.instance.disks)[pnode]
+
+ msg = result.fail_msg
+ if msg:
+ # detaches didn't succeed (unlikely)
+ self.cfg.ReleaseDRBDMinors(self.instance.name)
+ raise errors.OpExecError("Can't detach the disks from the network on"
+ " old node: %s" % (msg,))
+
+ # if we managed to detach at least one, we update all the disks of
+ # the instance to point to the new secondary
+ self.lu.LogInfo("Updating instance configuration")
+ for dev, _, new_logical_id in iv_names.itervalues():
+ dev.logical_id = new_logical_id
+ self.cfg.SetDiskID(dev, self.instance.primary_node)
+
+ self.cfg.Update(self.instance, feedback_fn)
+
+ # Release all node locks (the configuration has been updated)
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE)
+
+ # and now perform the drbd attach
+ self.lu.LogInfo("Attaching primary drbds to new secondary"
+ " (standalone => connected)")
+ result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
+ self.new_node],
+ self.node_secondary_ip,
+ (self.instance.disks, self.instance),
+ self.instance.name,
+ False)
+ for to_node, to_result in result.items():
+ msg = to_result.fail_msg
+ if msg:
+ self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
+ to_node, msg,
+ hint=("please do a gnt-instance info to see the"
+ " status of disks"))
+
+ cstep = itertools.count(5)
+
+ if self.early_release:
+ self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
+ self._RemoveOldStorage(self.target_node, iv_names)
+ # TODO: Check if releasing locks early still makes sense
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
+ else:
+ # Release all resource locks except those used by the instance
+ _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
+ keep=self.node_secondary_ip.keys())
+
+ # TODO: Can the instance lock be downgraded here? Take the optional disk
+ # shutdown in the caller into consideration.
+
+ # Wait for sync
+ # This can fail as the old devices are degraded and _WaitForSync
+ # does a combined result over all disks, so we don't check its return value
+ self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
+ _WaitForSync(self.lu, self.instance)
+
+ # Check all devices manually
+ self._CheckDevices(self.instance.primary_node, iv_names)
+
+ # Step: remove old storage
+ if not self.early_release:
+ self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
+ self._RemoveOldStorage(self.target_node, iv_names)
from ganeti import objects
from ganeti import pathutils
from ganeti import utils
-
-from ganeti.cmdlib.common import _AnnotateDiskParams
+from ganeti.cmdlib.common import _AnnotateDiskParams, \
+ _ComputeIPolicyInstanceViolation
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
errors.ECODE_STATE)
-def _StartInstanceDisks(lu, instance, force):
- """Start the disks of an instance.
-
- """
- disks_ok, _ = _AssembleInstanceDisks(lu, instance,
- ignore_secondaries=force)
- if not disks_ok:
- _ShutdownInstanceDisks(lu, instance)
- if force is not None and not force:
- lu.LogWarning("",
- hint=("If the message above refers to a secondary node,"
- " you can retry the operation using '--force'"))
- raise errors.OpExecError("Disk consistency error")
-
-
-def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
- """Shutdown block devices of an instance.
+def _CheckNodeVmCapable(lu, node):
+ """Ensure that a given node is vm capable.
- This does the shutdown on all nodes of the instance.
-
- If the ignore_primary is false, errors on the primary node are
- ignored.
+ @param lu: the LU on behalf of which we make the check
+ @param node: the node to check
+ @raise errors.OpPrereqError: if the node is not vm capable
"""
- all_result = True
- disks = _ExpandCheckDisks(instance, disks)
-
- for disk in disks:
- for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
- lu.cfg.SetDiskID(top_disk, node)
- result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
- msg = result.fail_msg
- if msg:
- lu.LogWarning("Could not shutdown block device %s on node %s: %s",
- disk.iv_name, node, msg)
- if ((node == instance.primary_node and not ignore_primary) or
- (node != instance.primary_node and not result.offline)):
- all_result = False
- return all_result
+ if not lu.cfg.GetNodeInfo(node).vm_capable:
+ raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
+ errors.ECODE_STATE)
def _RemoveInstance(lu, feedback_fn, instance, ignore_failures):
lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name
-def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
- ignore_size=False):
- """Prepare the block devices for an instance.
-
- This sets up the block devices on all nodes.
-
- @type lu: L{LogicalUnit}
- @param lu: the logical unit on whose behalf we execute
- @type instance: L{objects.Instance}
- @param instance: the instance for whose disks we assemble
- @type disks: list of L{objects.Disk} or None
- @param disks: which disks to assemble (or all, if None)
- @type ignore_secondaries: boolean
- @param ignore_secondaries: if true, errors on secondary nodes
- won't result in an error return from the function
- @type ignore_size: boolean
- @param ignore_size: if true, the current known size of the disk
- will not be used during the disk activation, useful for cases
- when the size is wrong
- @return: False if the operation failed, otherwise a list of
- (host, instance_visible_name, node_visible_name)
- with the mapping from node devices to instance devices
-
- """
- device_info = []
- disks_ok = True
- iname = instance.name
- disks = _ExpandCheckDisks(instance, disks)
-
- # With the two passes mechanism we try to reduce the window of
- # opportunity for the race condition of switching DRBD to primary
- # before handshaking occured, but we do not eliminate it
-
- # The proper fix would be to wait (with some limits) until the
- # connection has been made and drbd transitions from WFConnection
- # into any other network-connected state (Connected, SyncTarget,
- # SyncSource, etc.)
-
- # 1st pass, assemble on all nodes in secondary mode
- for idx, inst_disk in enumerate(disks):
- for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
- if ignore_size:
- node_disk = node_disk.Copy()
- node_disk.UnsetSize()
- lu.cfg.SetDiskID(node_disk, node)
- result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
- False, idx)
- msg = result.fail_msg
- if msg:
- is_offline_secondary = (node in instance.secondary_nodes and
- result.offline)
- lu.LogWarning("Could not prepare block device %s on node %s"
- " (is_primary=False, pass=1): %s",
- inst_disk.iv_name, node, msg)
- if not (ignore_secondaries or is_offline_secondary):
- disks_ok = False
-
- # FIXME: race condition on drbd migration to primary
-
- # 2nd pass, do only the primary node
- for idx, inst_disk in enumerate(disks):
- dev_path = None
-
- for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
- if node != instance.primary_node:
- continue
- if ignore_size:
- node_disk = node_disk.Copy()
- node_disk.UnsetSize()
- lu.cfg.SetDiskID(node_disk, node)
- result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
- True, idx)
- msg = result.fail_msg
- if msg:
- lu.LogWarning("Could not prepare block device %s on node %s"
- " (is_primary=True, pass=2): %s",
- inst_disk.iv_name, node, msg)
- disks_ok = False
- else:
- dev_path = result.payload
-
- device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
-
- # leave the disks configured for the primary node
- # this is a workaround that would be fixed better by
- # improving the logical/physical id handling
- for disk in disks:
- lu.cfg.SetDiskID(disk, instance.primary_node)
-
- return disks_ok, device_info
-
-
def _RemoveDisks(lu, instance, target_node=None, ignore_failures=False):
"""Remove all disks for an instance.
return all_result
-def _ExpandCheckDisks(instance, disks):
- """Return the instance disks selected by the disks list
-
- @type disks: list of L{objects.Disk} or None
- @param disks: selected disks
- @rtype: list of L{objects.Disk}
- @return: selected instance disks to act on
-
- """
- if disks is None:
- return instance.disks
- else:
- if not set(disks).issubset(instance.disks):
- raise errors.ProgrammerError("Can only act on disks belonging to the"
- " target instance")
- return disks
-
-
def _NICToTuple(lu, nic):
"""Build a tupple of nic information.
for nic in nics:
hooks_nics.append(_NICToTuple(lu, nic))
return hooks_nics
+
+
+def _CopyLockList(names):
+ """Makes a copy of a list of lock names.
+
+ Handles L{locking.ALL_SET} correctly.
+
+ """
+ if names == locking.ALL_SET:
+ return locking.ALL_SET
+ else:
+ return names[:]
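+
+# Illustrative sketch (editorial example, not part of the original patch): an
+# LU that received locking.ALL_SET keeps the sentinel unchanged, while a real
+# name list is copied so later mutations do not alias the caller's data. The
+# "self.needed_locks" attribute and "new_node_name" below are assumed only
+# for illustration:
+#
+#   node_locks = _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
+#   if node_locks is not locking.ALL_SET:
+#     node_locks.append(new_node_name)  # safe: the original list is untouched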
+
+
+def _ReleaseLocks(lu, level, names=None, keep=None):
+ """Releases locks owned by an LU.
+
+ @type lu: L{LogicalUnit}
+ @param level: Lock level
+ @type names: list or None
+ @param names: Names of locks to release
+ @type keep: list or None
+ @param keep: Names of locks to retain
+
+ """
+ assert not (keep is not None and names is not None), \
+ "Only one of the 'names' and the 'keep' parameters can be given"
+
+ if names is not None:
+ should_release = names.__contains__
+ elif keep:
+ should_release = lambda name: name not in keep
+ else:
+ should_release = None
+
+ owned = lu.owned_locks(level)
+ if not owned:
+ # Not owning any lock at this level, do nothing
+ pass
+
+ elif should_release:
+ retain = []
+ release = []
+
+ # Determine which locks to release
+ for name in owned:
+ if should_release(name):
+ release.append(name)
+ else:
+ retain.append(name)
+
+ assert len(lu.owned_locks(level)) == (len(retain) + len(release))
+
+ # Release just some locks
+ lu.glm.release(level, names=release)
+
+ assert frozenset(lu.owned_locks(level)) == frozenset(retain)
+ else:
+ # Release everything
+ lu.glm.release(level)
+
+ assert not lu.glm.is_owned(level), "No locks should be owned"
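+
+# Illustrative usage (sketch only, assumed caller, not from the original
+# patch): after CheckPrereq has narrowed down the relevant nodes, an LU
+# typically retains just those locks and drops the rest:
+#
+#   _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.instance.all_nodes)
+#
+# Alternatively an explicit list can be released via names=[...]; passing
+# both "names" and "keep" trips the assertion above.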
+
+
+def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
+ target_group, cfg,
+ _compute_fn=_ComputeIPolicyInstanceViolation):
+ """Compute if instance meets the specs of the new target group.
+
+ @param ipolicy: The ipolicy to verify
+ @param instance: The instance object to verify
+ @param current_group: The current group of the instance
+ @param target_group: The new group of the instance
+ @type cfg: L{config.ConfigWriter}
+ @param cfg: Cluster configuration
+ @param _compute_fn: The function to verify ipolicy (unittest only)
+ @see: L{ganeti.cmdlib.common._ComputeIPolicySpecViolation}
+
+ """
+ if current_group == target_group:
+ return []
+ else:
+ return _compute_fn(ipolicy, instance, cfg)
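+
+# Behaviour sketch (editorial note, not part of the original patch): a move
+# within the same group is always considered policy-clean, so
+#
+#   _ComputeIPolicyNodeViolation(ipolicy, inst, "group1", "group1", cfg)
+#
+# returns [] without invoking _compute_fn; only a genuine group change
+# delegates to _ComputeIPolicyInstanceViolation.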
+
+
+def _CheckTargetNodeIPolicy(lu, ipolicy, instance, node, cfg, ignore=False,
+ _compute_fn=_ComputeIPolicyNodeViolation):
+ """Checks that the target node is correct in terms of instance policy.
+
+ @param ipolicy: The ipolicy to verify
+ @param instance: The instance object to verify
+  @param node: The new node the instance is being relocated to
+ @type cfg: L{config.ConfigWriter}
+ @param cfg: Cluster configuration
+ @param ignore: Ignore violations of the ipolicy
+ @param _compute_fn: The function to verify ipolicy (unittest only)
+ @see: L{ganeti.cmdlib.common._ComputeIPolicySpecViolation}
+
+ """
+ primary_node = lu.cfg.GetNodeInfo(instance.primary_node)
+ res = _compute_fn(ipolicy, instance, primary_node.group, node.group, cfg)
+
+ if res:
+ msg = ("Instance does not meet target node group's (%s) instance"
+ " policy: %s") % (node.group, utils.CommaJoin(res))
+ if ignore:
+ lu.LogWarning(msg)
+ else:
+ raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
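+
+# Illustrative call site (sketch, not from the original patch): a failover or
+# migration LU would validate its chosen target node object against the
+# cluster/group ipolicy, optionally downgrading violations to warnings; the
+# variable names here are assumptions for the example:
+#
+#   _CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance, target_node,
+#                           self.cfg, ignore=self.ignore_ipolicy)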
+
+
+def _GetInstanceInfoText(instance):
+  """Compute the text that should be added to the disk's metadata.
+
+ """
+ return "originstname+%s" % instance.name
from ganeti.cmdlib import cluster
from ganeti.cmdlib import group
from ganeti.cmdlib import instance
+from ganeti.cmdlib import instance_storage
+from ganeti.cmdlib import instance_utils
from ganeti.cmdlib import common
from ganeti.cmdlib import query
from ganeti import opcodes
self.recorder = _CallRecorder(return_value=[])
def testSameGroup(self):
- ret = instance._ComputeIPolicyNodeViolation(NotImplemented,
- NotImplemented,
- "foo", "foo", NotImplemented,
- _compute_fn=self.recorder)
+ ret = instance_utils._ComputeIPolicyNodeViolation(
+ NotImplemented,
+ NotImplemented,
+ "foo", "foo", NotImplemented,
+ _compute_fn=self.recorder)
self.assertFalse(self.recorder.called)
self.assertEqual(ret, [])
def testDifferentGroup(self):
- ret = instance._ComputeIPolicyNodeViolation(NotImplemented,
- NotImplemented,
- "foo", "bar", NotImplemented,
- _compute_fn=self.recorder)
+ ret = instance_utils._ComputeIPolicyNodeViolation(
+ NotImplemented,
+ NotImplemented,
+ "foo", "bar", NotImplemented,
+ _compute_fn=self.recorder)
self.assertTrue(self.recorder.called)
self.assertEqual(ret, [])
def testLessThanOneMebibyte(self):
for i in [1, 2, 7, 512, 1000, 1023]:
lu = _FakeLU()
- result = instance._DiskSizeInBytesToMebibytes(lu, i)
+ result = instance_storage._DiskSizeInBytesToMebibytes(lu, i)
self.assertEqual(result, 1)
self.assertEqual(len(lu.warning_log), 1)
self.assertEqual(len(lu.warning_log[0]), 2)
def testEven(self):
for i in [1, 2, 7, 512, 1000, 1023]:
lu = _FakeLU()
- result = instance._DiskSizeInBytesToMebibytes(lu, i * 1024 * 1024)
+ result = instance_storage._DiskSizeInBytesToMebibytes(lu,
+ i * 1024 * 1024)
self.assertEqual(result, i)
self.assertFalse(lu.warning_log)
for j in [1, 2, 486, 326, 986, 1023]:
lu = _FakeLU()
size = (1024 * 1024 * i) + j
- result = instance._DiskSizeInBytesToMebibytes(lu, size)
+ result = instance_storage._DiskSizeInBytesToMebibytes(lu, size)
self.assertEqual(result, i + 1, msg="Amount was not rounded up")
self.assertEqual(len(lu.warning_log), 1)
self.assertEqual(len(lu.warning_log[0]), 2)