From: Thomas Thrainer
Date: Thu, 16 May 2013 07:13:48 +0000 (+0200)
Subject: cmdlib: Extract storage related functionality
X-Git-Tag: v2.8.0beta1~66
X-Git-Url: https://code.grnet.gr/git/ganeti-local/commitdiff_plain/763ad5befffbd2b18d86b327e2bd0c7c752cc374

cmdlib: Extract storage related functionality

Split instance.py further by extracting storage related logical units and functions to instance_storage.py.

Signed-off-by: Thomas Thrainer
Reviewed-by: Bernardo Dal Seno
---

diff --git a/Makefile.am b/Makefile.am
index 1f16894..64306df 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -315,6 +315,7 @@ cmdlib_PYTHON = \
 lib/cmdlib/group.py \
 lib/cmdlib/node.py \
 lib/cmdlib/instance.py \
+ lib/cmdlib/instance_storage.py \
 lib/cmdlib/instance_utils.py \
 lib/cmdlib/backup.py \
 lib/cmdlib/query.py \
diff --git a/lib/cmdlib/__init__.py b/lib/cmdlib/__init__.py
index 95f7713..b248063 100644
--- a/lib/cmdlib/__init__.py
+++ b/lib/cmdlib/__init__.py
@@ -83,7 +83,8 @@ from ganeti.cmdlib.instance import \
 LUInstanceMigrate, \
 LUInstanceMultiAlloc, \
 LUInstanceSetParams, \
- LUInstanceChangeGroup, \
+ LUInstanceChangeGroup
+from ganeti.cmdlib.instance_storage import \
 LUInstanceRecreateDisks, \
 LUInstanceGrowDisk, \
 LUInstanceReplaceDisks, \
diff --git a/lib/cmdlib/backup.py b/lib/cmdlib/backup.py
index 3cbc664..e5954f5 100644
--- a/lib/cmdlib/backup.py
+++ b/lib/cmdlib/backup.py
@@ -36,9 +36,10 @@ from ganeti import utils
 from ganeti.cmdlib.base import _QueryBase, NoHooksLU, LogicalUnit
 from ganeti.cmdlib.common import _GetWantedNodes, _ShareAll, \
 _CheckNodeOnline, _ExpandNodeName
+from ganeti.cmdlib.instance_storage import _StartInstanceDisks, \
+ _ShutdownInstanceDisks
 from ganeti.cmdlib.instance_utils import _GetClusterDomainSecret, \
- _BuildInstanceHookEnvByObject, _CheckNodeNotDrained, _StartInstanceDisks, \
- _ShutdownInstanceDisks, _RemoveInstance
+ _BuildInstanceHookEnvByObject, _CheckNodeNotDrained, _RemoveInstance

 class _ExportQuery(_QueryBase):
diff --git a/lib/cmdlib/instance.py b/lib/cmdlib/instance.py
index b5dc099..9edf524 100644
--- a/lib/cmdlib/instance.py
+++ b/lib/cmdlib/instance.py
@@ -55,35 +55,23 @@ from ganeti.cmdlib.common import INSTANCE_ONLINE, INSTANCE_DOWN, \
 _LoadNodeEvacResult, _CheckIAllocatorOrNode, _CheckParamsNotGlobal, \
 _IsExclusiveStorageEnabledNode, _CheckHVParams, _CheckOSParams, \
 _GetWantedInstances, _CheckInstancesNodeGroups, _AnnotateDiskParams, \
- _GetUpdatedParams, _ExpandInstanceName, _FindFaultyInstanceDisks, \
- _ComputeIPolicySpecViolation, _ComputeIPolicyInstanceViolation, \
+ _GetUpdatedParams, _ExpandInstanceName, _ComputeIPolicySpecViolation, \
 _CheckInstanceState, _ExpandNodeName
-from ganeti.cmdlib.instance_utils import _AssembleInstanceDisks, \
- _BuildInstanceHookEnvByObject, _GetClusterDomainSecret, \
- _BuildInstanceHookEnv, _NICListToTuple, _NICToTuple, _CheckNodeNotDrained, \
- _RemoveDisks, _StartInstanceDisks, _ShutdownInstanceDisks, \
- _RemoveInstance, _ExpandCheckDisks
+from ganeti.cmdlib.instance_storage import _CreateDisks, \
+ _CheckNodesFreeDiskPerVG, _WipeDisks, _WaitForSync, _CheckDiskConsistency, \
+ _IsExclusiveStorageEnabledNodeName, _CreateSingleBlockDev, _ComputeDisks, \
+ _CheckRADOSFreeSpace, _ComputeDiskSizePerVG, _GenerateDiskTemplate, \
+ _CreateBlockDev, _StartInstanceDisks, _ShutdownInstanceDisks, \
+ _AssembleInstanceDisks, _ExpandCheckDisks
+from ganeti.cmdlib.instance_utils import _BuildInstanceHookEnvByObject, \
+ _GetClusterDomainSecret, _BuildInstanceHookEnv, _NICListToTuple, \
+ _NICToTuple, _CheckNodeNotDrained, 
_RemoveInstance, _CopyLockList, \ + _ReleaseLocks, _CheckNodeVmCapable, _CheckTargetNodeIPolicy, \ + _GetInstanceInfoText, _RemoveDisks import ganeti.masterd.instance -_DISK_TEMPLATE_NAME_PREFIX = { - constants.DT_PLAIN: "", - constants.DT_RBD: ".rbd", - constants.DT_EXT: ".ext", - } - - -_DISK_TEMPLATE_DEVICE_TYPE = { - constants.DT_PLAIN: constants.LD_LV, - constants.DT_FILE: constants.LD_FILE, - constants.DT_SHARED_FILE: constants.LD_FILE, - constants.DT_BLOCK: constants.LD_BLOCKDEV, - constants.DT_RBD: constants.LD_RBD, - constants.DT_EXT: constants.LD_EXT, - } - - #: Type description for changes as returned by L{ApplyContainerMods}'s #: callbacks _TApplyContModsCbChanges = \ @@ -93,68 +81,6 @@ _TApplyContModsCbChanges = \ ]))) -def _CopyLockList(names): - """Makes a copy of a list of lock names. - - Handles L{locking.ALL_SET} correctly. - - """ - if names == locking.ALL_SET: - return locking.ALL_SET - else: - return names[:] - - -def _ReleaseLocks(lu, level, names=None, keep=None): - """Releases locks owned by an LU. - - @type lu: L{LogicalUnit} - @param level: Lock level - @type names: list or None - @param names: Names of locks to release - @type keep: list or None - @param keep: Names of locks to retain - - """ - assert not (keep is not None and names is not None), \ - "Only one of the 'names' and the 'keep' parameters can be given" - - if names is not None: - should_release = names.__contains__ - elif keep: - should_release = lambda name: name not in keep - else: - should_release = None - - owned = lu.owned_locks(level) - if not owned: - # Not owning any lock at this level, do nothing - pass - - elif should_release: - retain = [] - release = [] - - # Determine which locks to release - for name in owned: - if should_release(name): - release.append(name) - else: - retain.append(name) - - assert len(lu.owned_locks(level)) == (len(retain) + len(release)) - - # Release just some locks - lu.glm.release(level, names=release) - - assert frozenset(lu.owned_locks(level)) == frozenset(retain) - else: - # Release everything - lu.glm.release(level) - - assert not lu.glm.is_owned(level), "No locks should be owned" - - def _CheckHostnameSane(lu, name): """Ensures that a given hostname resolves to a 'sane' name. @@ -344,261 +270,6 @@ def _CheckForConflictingIp(lu, ip, node): return (None, None) -def _CheckRADOSFreeSpace(): - """Compute disk size requirements inside the RADOS cluster. - - """ - # For the RADOS cluster we assume there is always enough space. - pass - - -def _WaitForSync(lu, instance, disks=None, oneshot=False): - """Sleep and poll for an instance's disk to sync. - - """ - if not instance.disks or disks is not None and not disks: - return True - - disks = _ExpandCheckDisks(instance, disks) - - if not oneshot: - lu.LogInfo("Waiting for instance %s to sync disks", instance.name) - - node = instance.primary_node - - for dev in disks: - lu.cfg.SetDiskID(dev, node) - - # TODO: Convert to utils.Retry - - retries = 0 - degr_retries = 10 # in seconds, as we sleep 1 second each time - while True: - max_time = 0 - done = True - cumul_degraded = False - rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance)) - msg = rstats.fail_msg - if msg: - lu.LogWarning("Can't get any data from node %s: %s", node, msg) - retries += 1 - if retries >= 10: - raise errors.RemoteError("Can't contact node %s for mirror data," - " aborting." 
% node) - time.sleep(6) - continue - rstats = rstats.payload - retries = 0 - for i, mstat in enumerate(rstats): - if mstat is None: - lu.LogWarning("Can't compute data for node %s/%s", - node, disks[i].iv_name) - continue - - cumul_degraded = (cumul_degraded or - (mstat.is_degraded and mstat.sync_percent is None)) - if mstat.sync_percent is not None: - done = False - if mstat.estimated_time is not None: - rem_time = ("%s remaining (estimated)" % - utils.FormatSeconds(mstat.estimated_time)) - max_time = mstat.estimated_time - else: - rem_time = "no time estimate" - lu.LogInfo("- device %s: %5.2f%% done, %s", - disks[i].iv_name, mstat.sync_percent, rem_time) - - # if we're done but degraded, let's do a few small retries, to - # make sure we see a stable and not transient situation; therefore - # we force restart of the loop - if (done or oneshot) and cumul_degraded and degr_retries > 0: - logging.info("Degraded disks found, %d retries left", degr_retries) - degr_retries -= 1 - time.sleep(1) - continue - - if done or oneshot: - break - - time.sleep(min(60, max_time)) - - if done: - lu.LogInfo("Instance %s's disks are in sync", instance.name) - - return not cumul_degraded - - -def _ComputeDisks(op, default_vg): - """Computes the instance disks. - - @param op: The instance opcode - @param default_vg: The default_vg to assume - - @return: The computed disks - - """ - disks = [] - for disk in op.disks: - mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR) - if mode not in constants.DISK_ACCESS_SET: - raise errors.OpPrereqError("Invalid disk access mode '%s'" % - mode, errors.ECODE_INVAL) - size = disk.get(constants.IDISK_SIZE, None) - if size is None: - raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL) - try: - size = int(size) - except (TypeError, ValueError): - raise errors.OpPrereqError("Invalid disk size '%s'" % size, - errors.ECODE_INVAL) - - ext_provider = disk.get(constants.IDISK_PROVIDER, None) - if ext_provider and op.disk_template != constants.DT_EXT: - raise errors.OpPrereqError("The '%s' option is only valid for the %s" - " disk template, not %s" % - (constants.IDISK_PROVIDER, constants.DT_EXT, - op.disk_template), errors.ECODE_INVAL) - - data_vg = disk.get(constants.IDISK_VG, default_vg) - name = disk.get(constants.IDISK_NAME, None) - if name is not None and name.lower() == constants.VALUE_NONE: - name = None - new_disk = { - constants.IDISK_SIZE: size, - constants.IDISK_MODE: mode, - constants.IDISK_VG: data_vg, - constants.IDISK_NAME: name, - } - - if constants.IDISK_METAVG in disk: - new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG] - if constants.IDISK_ADOPT in disk: - new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT] - - # For extstorage, demand the `provider' option and add any - # additional parameters (ext-params) to the dict - if op.disk_template == constants.DT_EXT: - if ext_provider: - new_disk[constants.IDISK_PROVIDER] = ext_provider - for key in disk: - if key not in constants.IDISK_PARAMS: - new_disk[key] = disk[key] - else: - raise errors.OpPrereqError("Missing provider for template '%s'" % - constants.DT_EXT, errors.ECODE_INVAL) - - disks.append(new_disk) - - return disks - - -def _ComputeDiskSizePerVG(disk_template, disks): - """Compute disk size requirements in the volume group - - """ - def _compute(disks, payload): - """Universal algorithm. 
- - """ - vgs = {} - for disk in disks: - vgs[disk[constants.IDISK_VG]] = \ - vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload - - return vgs - - # Required free disk space as a function of disk and swap space - req_size_dict = { - constants.DT_DISKLESS: {}, - constants.DT_PLAIN: _compute(disks, 0), - # 128 MB are added for drbd metadata for each disk - constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE), - constants.DT_FILE: {}, - constants.DT_SHARED_FILE: {}, - } - - if disk_template not in req_size_dict: - raise errors.ProgrammerError("Disk template '%s' size requirement" - " is unknown" % disk_template) - - return req_size_dict[disk_template] - - -def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested): - """Checks if nodes have enough free disk space in the specified VG. - - This function checks if all given nodes have the needed amount of - free disk. In case any node has less disk or we cannot get the - information from the node, this function raises an OpPrereqError - exception. - - @type lu: C{LogicalUnit} - @param lu: a logical unit from which we get configuration data - @type nodenames: C{list} - @param nodenames: the list of node names to check - @type vg: C{str} - @param vg: the volume group to check - @type requested: C{int} - @param requested: the amount of disk in MiB to check for - @raise errors.OpPrereqError: if the node doesn't have enough disk, - or we cannot check the node - - """ - es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames) - nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags) - for node in nodenames: - info = nodeinfo[node] - info.Raise("Cannot get current information from node %s" % node, - prereq=True, ecode=errors.ECODE_ENVIRON) - (_, (vg_info, ), _) = info.payload - vg_free = vg_info.get("vg_free", None) - if not isinstance(vg_free, int): - raise errors.OpPrereqError("Can't compute free disk space on node" - " %s for vg %s, result was '%s'" % - (node, vg, vg_free), errors.ECODE_ENVIRON) - if requested > vg_free: - raise errors.OpPrereqError("Not enough disk space on target node %s" - " vg %s: required %d MiB, available %d MiB" % - (node, vg, requested, vg_free), - errors.ECODE_NORES) - - -def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes): - """Checks if nodes have enough free disk space in all the VGs. - - This function checks if all given nodes have the needed amount of - free disk. In case any node has less disk or we cannot get the - information from the node, this function raises an OpPrereqError - exception. - - @type lu: C{LogicalUnit} - @param lu: a logical unit from which we get configuration data - @type nodenames: C{list} - @param nodenames: the list of node names to check - @type req_sizes: C{dict} - @param req_sizes: the hash of vg and corresponding amount of disk in - MiB to check for - @raise errors.OpPrereqError: if the node doesn't have enough disk, - or we cannot check the node - - """ - for vg, req_size in req_sizes.items(): - _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size) - - -def _CheckNodeVmCapable(lu, node): - """Ensure that a given node is vm capable. 
- - @param lu: the LU on behalf of which we make the check - @param node: the node to check - @raise errors.OpPrereqError: if the node is not vm capable - - """ - if not lu.cfg.GetNodeInfo(node).vm_capable: - raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node, - errors.ECODE_STATE) - - def _ComputeIPolicyInstanceSpecViolation( ipolicy, instance_spec, disk_template, _compute_fn=_ComputeIPolicySpecViolation): @@ -723,457 +394,6 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name): return free_mem -def _GenerateUniqueNames(lu, exts): - """Generate a suitable LV name. - - This will generate a logical volume name for the given instance. - - """ - results = [] - for val in exts: - new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) - results.append("%s%s" % (new_id, val)) - return results - - -def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names, - iv_name, p_minor, s_minor): - """Generate a drbd8 device complete with its children. - - """ - assert len(vgnames) == len(names) == 2 - port = lu.cfg.AllocatePort() - shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId()) - - dev_data = objects.Disk(dev_type=constants.LD_LV, size=size, - logical_id=(vgnames[0], names[0]), - params={}) - dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) - dev_meta = objects.Disk(dev_type=constants.LD_LV, - size=constants.DRBD_META_SIZE, - logical_id=(vgnames[1], names[1]), - params={}) - dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) - drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size, - logical_id=(primary, secondary, port, - p_minor, s_minor, - shared_secret), - children=[dev_data, dev_meta], - iv_name=iv_name, params={}) - drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) - return drbd_dev - - -def _GenerateDiskTemplate( - lu, template_name, instance_name, primary_node, secondary_nodes, - disk_info, file_storage_dir, file_driver, base_index, - feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage, - _req_shr_file_storage=opcodes.RequireSharedFileStorage): - """Generate the entire disk layout for a given template type. 
- - """ - vgname = lu.cfg.GetVGName() - disk_count = len(disk_info) - disks = [] - - if template_name == constants.DT_DISKLESS: - pass - elif template_name == constants.DT_DRBD8: - if len(secondary_nodes) != 1: - raise errors.ProgrammerError("Wrong template configuration") - remote_node = secondary_nodes[0] - minors = lu.cfg.AllocateDRBDMinor( - [primary_node, remote_node] * len(disk_info), instance_name) - - (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name, - full_disk_params) - drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG] - - names = [] - for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i) - for i in range(disk_count)]): - names.append(lv_prefix + "_data") - names.append(lv_prefix + "_meta") - for idx, disk in enumerate(disk_info): - disk_index = idx + base_index - data_vg = disk.get(constants.IDISK_VG, vgname) - meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg) - disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node, - disk[constants.IDISK_SIZE], - [data_vg, meta_vg], - names[idx * 2:idx * 2 + 2], - "disk/%d" % disk_index, - minors[idx * 2], minors[idx * 2 + 1]) - disk_dev.mode = disk[constants.IDISK_MODE] - disk_dev.name = disk.get(constants.IDISK_NAME, None) - disks.append(disk_dev) - else: - if secondary_nodes: - raise errors.ProgrammerError("Wrong template configuration") - - if template_name == constants.DT_FILE: - _req_file_storage() - elif template_name == constants.DT_SHARED_FILE: - _req_shr_file_storage() - - name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None) - if name_prefix is None: - names = None - else: - names = _GenerateUniqueNames(lu, ["%s.disk%s" % - (name_prefix, base_index + i) - for i in range(disk_count)]) - - if template_name == constants.DT_PLAIN: - - def logical_id_fn(idx, _, disk): - vg = disk.get(constants.IDISK_VG, vgname) - return (vg, names[idx]) - - elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE): - logical_id_fn = \ - lambda _, disk_index, disk: (file_driver, - "%s/disk%d" % (file_storage_dir, - disk_index)) - elif template_name == constants.DT_BLOCK: - logical_id_fn = \ - lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL, - disk[constants.IDISK_ADOPT]) - elif template_name == constants.DT_RBD: - logical_id_fn = lambda idx, _, disk: ("rbd", names[idx]) - elif template_name == constants.DT_EXT: - def logical_id_fn(idx, _, disk): - provider = disk.get(constants.IDISK_PROVIDER, None) - if provider is None: - raise errors.ProgrammerError("Disk template is %s, but '%s' is" - " not found", constants.DT_EXT, - constants.IDISK_PROVIDER) - return (provider, names[idx]) - else: - raise errors.ProgrammerError("Unknown disk template '%s'" % template_name) - - dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name] - - for idx, disk in enumerate(disk_info): - params = {} - # Only for the Ext template add disk_info to params - if template_name == constants.DT_EXT: - params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER] - for key in disk: - if key not in constants.IDISK_PARAMS: - params[key] = disk[key] - disk_index = idx + base_index - size = disk[constants.IDISK_SIZE] - feedback_fn("* disk %s, size %s" % - (disk_index, utils.FormatUnit(size, "h"))) - disk_dev = objects.Disk(dev_type=dev_type, size=size, - logical_id=logical_id_fn(idx, disk_index, disk), - iv_name="disk/%d" % disk_index, - mode=disk[constants.IDISK_MODE], - params=params) - disk_dev.name = disk.get(constants.IDISK_NAME, None) - disk_dev.uuid = 
lu.cfg.GenerateUniqueID(lu.proc.GetECId()) - disks.append(disk_dev) - - return disks - - -def _CreateSingleBlockDev(lu, node, instance, device, info, force_open, - excl_stor): - """Create a single block device on a given node. - - This will not recurse over children of the device, so they must be - created in advance. - - @param lu: the lu on whose behalf we execute - @param node: the node on which to create the device - @type instance: L{objects.Instance} - @param instance: the instance which owns the device - @type device: L{objects.Disk} - @param device: the device to create - @param info: the extra 'metadata' we should attach to the device - (this will be represented as a LVM tag) - @type force_open: boolean - @param force_open: this parameter will be passes to the - L{backend.BlockdevCreate} function where it specifies - whether we run on primary or not, and it affects both - the child assembly and the device own Open() execution - @type excl_stor: boolean - @param excl_stor: Whether exclusive_storage is active for the node - - """ - lu.cfg.SetDiskID(device, node) - result = lu.rpc.call_blockdev_create(node, device, device.size, - instance.name, force_open, info, - excl_stor) - result.Raise("Can't create block device %s on" - " node %s for instance %s" % (device, node, instance.name)) - if device.physical_id is None: - device.physical_id = result.payload - - -def _CreateBlockDevInner(lu, node, instance, device, force_create, - info, force_open, excl_stor): - """Create a tree of block devices on a given node. - - If this device type has to be created on secondaries, create it and - all its children. - - If not, just recurse to children keeping the same 'force' value. - - @attention: The device has to be annotated already. - - @param lu: the lu on whose behalf we execute - @param node: the node on which to create the device - @type instance: L{objects.Instance} - @param instance: the instance which owns the device - @type device: L{objects.Disk} - @param device: the device to create - @type force_create: boolean - @param force_create: whether to force creation of this device; this - will be change to True whenever we find a device which has - CreateOnSecondary() attribute - @param info: the extra 'metadata' we should attach to the device - (this will be represented as a LVM tag) - @type force_open: boolean - @param force_open: this parameter will be passes to the - L{backend.BlockdevCreate} function where it specifies - whether we run on primary or not, and it affects both - the child assembly and the device own Open() execution - @type excl_stor: boolean - @param excl_stor: Whether exclusive_storage is active for the node - - @return: list of created devices - """ - created_devices = [] - try: - if device.CreateOnSecondary(): - force_create = True - - if device.children: - for child in device.children: - devs = _CreateBlockDevInner(lu, node, instance, child, force_create, - info, force_open, excl_stor) - created_devices.extend(devs) - - if not force_create: - return created_devices - - _CreateSingleBlockDev(lu, node, instance, device, info, force_open, - excl_stor) - # The device has been completely created, so there is no point in keeping - # its subdevices in the list. We just add the device itself instead. 
- created_devices = [(node, device)] - return created_devices - - except errors.DeviceCreationError, e: - e.created_devices.extend(created_devices) - raise e - except errors.OpExecError, e: - raise errors.DeviceCreationError(str(e), created_devices) - - -def _IsExclusiveStorageEnabledNodeName(cfg, nodename): - """Whether exclusive_storage is in effect for the given node. - - @type cfg: L{config.ConfigWriter} - @param cfg: The cluster configuration - @type nodename: string - @param nodename: The node - @rtype: bool - @return: The effective value of exclusive_storage - @raise errors.OpPrereqError: if no node exists with the given name - - """ - ni = cfg.GetNodeInfo(nodename) - if ni is None: - raise errors.OpPrereqError("Invalid node name %s" % nodename, - errors.ECODE_NOENT) - return _IsExclusiveStorageEnabledNode(cfg, ni) - - -def _CreateBlockDev(lu, node, instance, device, force_create, info, - force_open): - """Wrapper around L{_CreateBlockDevInner}. - - This method annotates the root device first. - - """ - (disk,) = _AnnotateDiskParams(instance, [device], lu.cfg) - excl_stor = _IsExclusiveStorageEnabledNodeName(lu.cfg, node) - return _CreateBlockDevInner(lu, node, instance, disk, force_create, info, - force_open, excl_stor) - - -def _CreateDisks(lu, instance, to_skip=None, target_node=None): - """Create all disks for an instance. - - This abstracts away some work from AddInstance. - - @type lu: L{LogicalUnit} - @param lu: the logical unit on whose behalf we execute - @type instance: L{objects.Instance} - @param instance: the instance whose disks we should create - @type to_skip: list - @param to_skip: list of indices to skip - @type target_node: string - @param target_node: if passed, overrides the target node for creation - @rtype: boolean - @return: the success of the creation - - """ - info = _GetInstanceInfoText(instance) - if target_node is None: - pnode = instance.primary_node - all_nodes = instance.all_nodes - else: - pnode = target_node - all_nodes = [pnode] - - if instance.disk_template in constants.DTS_FILEBASED: - file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) - result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir) - - result.Raise("Failed to create directory '%s' on" - " node %s" % (file_storage_dir, pnode)) - - disks_created = [] - # Note: this needs to be kept in sync with adding of disks in - # LUInstanceSetParams - for idx, device in enumerate(instance.disks): - if to_skip and idx in to_skip: - continue - logging.info("Creating disk %s for instance '%s'", idx, instance.name) - #HARDCODE - for node in all_nodes: - f_create = node == pnode - try: - _CreateBlockDev(lu, node, instance, device, f_create, info, f_create) - disks_created.append((node, device)) - except errors.OpExecError: - logging.warning("Creating disk %s for instance '%s' failed", - idx, instance.name) - except errors.DeviceCreationError, e: - logging.warning("Creating disk %s for instance '%s' failed", - idx, instance.name) - disks_created.extend(e.created_devices) - for (node, disk) in disks_created: - lu.cfg.SetDiskID(disk, node) - result = lu.rpc.call_blockdev_remove(node, disk) - if result.fail_msg: - logging.warning("Failed to remove newly-created disk %s on node %s:" - " %s", device, node, result.fail_msg) - raise errors.OpExecError(e.message) - - -def _CalcEta(time_taken, written, total_size): - """Calculates the ETA based on size written and total size. 
- - @param time_taken: The time taken so far - @param written: amount written so far - @param total_size: The total size of data to be written - @return: The remaining time in seconds - - """ - avg_time = time_taken / float(written) - return (total_size - written) * avg_time - - -def _WipeDisks(lu, instance, disks=None): - """Wipes instance disks. - - @type lu: L{LogicalUnit} - @param lu: the logical unit on whose behalf we execute - @type instance: L{objects.Instance} - @param instance: the instance whose disks we should create - @type disks: None or list of tuple of (number, L{objects.Disk}, number) - @param disks: Disk details; tuple contains disk index, disk object and the - start offset - - """ - node = instance.primary_node - - if disks is None: - disks = [(idx, disk, 0) - for (idx, disk) in enumerate(instance.disks)] - - for (_, device, _) in disks: - lu.cfg.SetDiskID(device, node) - - logging.info("Pausing synchronization of disks of instance '%s'", - instance.name) - result = lu.rpc.call_blockdev_pause_resume_sync(node, - (map(compat.snd, disks), - instance), - True) - result.Raise("Failed to pause disk synchronization on node '%s'" % node) - - for idx, success in enumerate(result.payload): - if not success: - logging.warn("Pausing synchronization of disk %s of instance '%s'" - " failed", idx, instance.name) - - try: - for (idx, device, offset) in disks: - # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but - # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors. - wipe_chunk_size = \ - int(min(constants.MAX_WIPE_CHUNK, - device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT)) - - size = device.size - last_output = 0 - start_time = time.time() - - if offset == 0: - info_text = "" - else: - info_text = (" (from %s to %s)" % - (utils.FormatUnit(offset, "h"), - utils.FormatUnit(size, "h"))) - - lu.LogInfo("* Wiping disk %s%s", idx, info_text) - - logging.info("Wiping disk %d for instance %s on node %s using" - " chunk size %s", idx, instance.name, node, wipe_chunk_size) - - while offset < size: - wipe_size = min(wipe_chunk_size, size - offset) - - logging.debug("Wiping disk %d, offset %s, chunk %s", - idx, offset, wipe_size) - - result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset, - wipe_size) - result.Raise("Could not wipe disk %d at offset %d for size %d" % - (idx, offset, wipe_size)) - - now = time.time() - offset += wipe_size - if now - last_output >= 60: - eta = _CalcEta(now - start_time, offset, size) - lu.LogInfo(" - done: %.1f%% ETA: %s", - offset / float(size) * 100, utils.FormatSeconds(eta)) - last_output = now - finally: - logging.info("Resuming synchronization of disks for instance '%s'", - instance.name) - - result = lu.rpc.call_blockdev_pause_resume_sync(node, - (map(compat.snd, disks), - instance), - False) - - if result.fail_msg: - lu.LogWarning("Failed to resume disk synchronization on node '%s': %s", - node, result.fail_msg) - else: - for idx, success in enumerate(result.payload): - if not success: - lu.LogWarning("Resuming synchronization of disk %s of instance '%s'" - " failed", idx, instance.name) - - class LUInstanceCreate(LogicalUnit): """Create an instance. @@ -2259,13 +1479,6 @@ class LUInstanceCreate(LogicalUnit): return list(iobj.all_nodes) -def _GetInstanceInfoText(instance): - """Compute that text that should be added to the disk's metadata. - - """ - return "originstname+%s" % instance.name - - class LUInstanceRename(LogicalUnit): """Rename an instance. 
@@ -2476,53 +1689,6 @@ def _CheckInstanceBridgesExist(lu, instance, node=None): _CheckNicsBridgesExist(lu, instance.nics, node) -def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group, - target_group, cfg, - _compute_fn=_ComputeIPolicyInstanceViolation): - """Compute if instance meets the specs of the new target group. - - @param ipolicy: The ipolicy to verify - @param instance: The instance object to verify - @param current_group: The current group of the instance - @param target_group: The new group of the instance - @type cfg: L{config.ConfigWriter} - @param cfg: Cluster configuration - @param _compute_fn: The function to verify ipolicy (unittest only) - @see: L{_ComputeIPolicySpecViolation} - - """ - if current_group == target_group: - return [] - else: - return _compute_fn(ipolicy, instance, cfg) - - -def _CheckTargetNodeIPolicy(lu, ipolicy, instance, node, cfg, ignore=False, - _compute_fn=_ComputeIPolicyNodeViolation): - """Checks that the target node is correct in terms of instance policy. - - @param ipolicy: The ipolicy to verify - @param instance: The instance object to verify - @param node: The new node to relocate - @type cfg: L{config.ConfigWriter} - @param cfg: Cluster configuration - @param ignore: Ignore violations of the ipolicy - @param _compute_fn: The function to verify ipolicy (unittest only) - @see: L{_ComputeIPolicySpecViolation} - - """ - primary_node = lu.cfg.GetNodeInfo(instance.primary_node) - res = _compute_fn(ipolicy, instance, primary_node.group, node.group, cfg) - - if res: - msg = ("Instance does not meet target node group's (%s) instance" - " policy: %s") % (node.group, utils.CommaJoin(res)) - if ignore: - lu.LogWarning(msg) - else: - raise errors.OpPrereqError(msg, errors.ECODE_INVAL) - - class LUInstanceMove(LogicalUnit): """Move an instance by data-copying. 
@@ -3072,821 +2238,72 @@ class LUInstanceQueryData(NoHooksLU): cluster = self.cfg.GetClusterInfo() - node_names = itertools.chain(*(i.all_nodes for i in self.wanted_instances)) - nodes = dict(self.cfg.GetMultiNodeInfo(node_names)) - - groups = dict(self.cfg.GetMultiNodeGroupInfo(node.group - for node in nodes.values())) - - group2name_fn = lambda uuid: groups[uuid].name - for instance in self.wanted_instances: - pnode = nodes[instance.primary_node] - - if self.op.static or pnode.offline: - remote_state = None - if pnode.offline: - self.LogWarning("Primary node %s is marked offline, returning static" - " information only for instance %s" % - (pnode.name, instance.name)) - else: - remote_info = self.rpc.call_instance_info(instance.primary_node, - instance.name, - instance.hypervisor) - remote_info.Raise("Error checking node %s" % instance.primary_node) - remote_info = remote_info.payload - if remote_info and "state" in remote_info: - remote_state = "up" - else: - if instance.admin_state == constants.ADMINST_UP: - remote_state = "down" - else: - remote_state = instance.admin_state - - disks = map(compat.partial(self._ComputeDiskStatus, instance, None), - instance.disks) - - snodes_group_uuids = [nodes[snode_name].group - for snode_name in instance.secondary_nodes] - - result[instance.name] = { - "name": instance.name, - "config_state": instance.admin_state, - "run_state": remote_state, - "pnode": instance.primary_node, - "pnode_group_uuid": pnode.group, - "pnode_group_name": group2name_fn(pnode.group), - "snodes": instance.secondary_nodes, - "snodes_group_uuids": snodes_group_uuids, - "snodes_group_names": map(group2name_fn, snodes_group_uuids), - "os": instance.os, - # this happens to be the same format used for hooks - "nics": _NICListToTuple(self, instance.nics), - "disk_template": instance.disk_template, - "disks": disks, - "hypervisor": instance.hypervisor, - "network_port": instance.network_port, - "hv_instance": instance.hvparams, - "hv_actual": cluster.FillHV(instance, skip_globals=True), - "be_instance": instance.beparams, - "be_actual": cluster.FillBE(instance), - "os_instance": instance.osparams, - "os_actual": cluster.SimpleFillOS(instance.os, instance.osparams), - "serial_no": instance.serial_no, - "mtime": instance.mtime, - "ctime": instance.ctime, - "uuid": instance.uuid, - } - - return result - - -class LUInstanceRecreateDisks(LogicalUnit): - """Recreate an instance's missing disks. - - """ - HPATH = "instance-recreate-disks" - HTYPE = constants.HTYPE_INSTANCE - REQ_BGL = False - - _MODIFYABLE = compat.UniqueFrozenset([ - constants.IDISK_SIZE, - constants.IDISK_MODE, - ]) - - # New or changed disk parameters may have different semantics - assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([ - constants.IDISK_ADOPT, - - # TODO: Implement support changing VG while recreating - constants.IDISK_VG, - constants.IDISK_METAVG, - constants.IDISK_PROVIDER, - constants.IDISK_NAME, - ])) - - def _RunAllocator(self): - """Run the allocator based on input opcode. - - """ - be_full = self.cfg.GetClusterInfo().FillBE(self.instance) - - # FIXME - # The allocator should actually run in "relocate" mode, but current - # allocators don't support relocating all the nodes of an instance at - # the same time. As a workaround we use "allocate" mode, but this is - # suboptimal for two reasons: - # - The instance name passed to the allocator is present in the list of - # existing instances, so there could be a conflict within the - # internal structures of the allocator. 
This doesn't happen with the - # current allocators, but it's a liability. - # - The allocator counts the resources used by the instance twice: once - # because the instance exists already, and once because it tries to - # allocate a new instance. - # The allocator could choose some of the nodes on which the instance is - # running, but that's not a problem. If the instance nodes are broken, - # they should be already be marked as drained or offline, and hence - # skipped by the allocator. If instance disks have been lost for other - # reasons, then recreating the disks on the same nodes should be fine. - disk_template = self.instance.disk_template - spindle_use = be_full[constants.BE_SPINDLE_USE] - req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name, - disk_template=disk_template, - tags=list(self.instance.GetTags()), - os=self.instance.os, - nics=[{}], - vcpus=be_full[constants.BE_VCPUS], - memory=be_full[constants.BE_MAXMEM], - spindle_use=spindle_use, - disks=[{constants.IDISK_SIZE: d.size, - constants.IDISK_MODE: d.mode} - for d in self.instance.disks], - hypervisor=self.instance.hypervisor, - node_whitelist=None) - ial = iallocator.IAllocator(self.cfg, self.rpc, req) - - ial.Run(self.op.iallocator) - - assert req.RequiredNodes() == len(self.instance.all_nodes) - - if not ial.success: - raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':" - " %s" % (self.op.iallocator, ial.info), - errors.ECODE_NORES) - - self.op.nodes = ial.result - self.LogInfo("Selected nodes for instance %s via iallocator %s: %s", - self.op.instance_name, self.op.iallocator, - utils.CommaJoin(ial.result)) - - def CheckArguments(self): - if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]): - # Normalize and convert deprecated list of disk indices - self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))] - - duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks)) - if duplicates: - raise errors.OpPrereqError("Some disks have been specified more than" - " once: %s" % utils.CommaJoin(duplicates), - errors.ECODE_INVAL) - - # We don't want _CheckIAllocatorOrNode selecting the default iallocator - # when neither iallocator nor nodes are specified - if self.op.iallocator or self.op.nodes: - _CheckIAllocatorOrNode(self, "iallocator", "nodes") - - for (idx, params) in self.op.disks: - utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES) - unsupported = frozenset(params.keys()) - self._MODIFYABLE - if unsupported: - raise errors.OpPrereqError("Parameters for disk %s try to change" - " unmodifyable parameter(s): %s" % - (idx, utils.CommaJoin(unsupported)), - errors.ECODE_INVAL) - - def ExpandNames(self): - self._ExpandAndLockInstance() - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND - - if self.op.nodes: - self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes] - self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes) - else: - self.needed_locks[locking.LEVEL_NODE] = [] - if self.op.iallocator: - # iallocator will select a new node in the same group - self.needed_locks[locking.LEVEL_NODEGROUP] = [] - self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET - - self.needed_locks[locking.LEVEL_NODE_RES] = [] - - def DeclareLocks(self, level): - if level == locking.LEVEL_NODEGROUP: - assert self.op.iallocator is not None - assert not self.op.nodes - assert not self.needed_locks[locking.LEVEL_NODEGROUP] - self.share_locks[locking.LEVEL_NODEGROUP] = 1 - # Lock the primary group used by the instance optimistically; this - 
# requires going via the node before it's locked, requiring - # verification later on - self.needed_locks[locking.LEVEL_NODEGROUP] = \ - self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True) - - elif level == locking.LEVEL_NODE: - # If an allocator is used, then we lock all the nodes in the current - # instance group, as we don't know yet which ones will be selected; - # if we replace the nodes without using an allocator, locks are - # already declared in ExpandNames; otherwise, we need to lock all the - # instance nodes for disk re-creation - if self.op.iallocator: - assert not self.op.nodes - assert not self.needed_locks[locking.LEVEL_NODE] - assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1 - - # Lock member nodes of the group of the primary node - for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP): - self.needed_locks[locking.LEVEL_NODE].extend( - self.cfg.GetNodeGroup(group_uuid).members) - - assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC) - elif not self.op.nodes: - self._LockInstancesNodes(primary_only=False) - elif level == locking.LEVEL_NODE_RES: - # Copy node locks - self.needed_locks[locking.LEVEL_NODE_RES] = \ - _CopyLockList(self.needed_locks[locking.LEVEL_NODE]) - - def BuildHooksEnv(self): - """Build hooks env. - - This runs on master, primary and secondary nodes of the instance. - - """ - return _BuildInstanceHookEnvByObject(self, self.instance) - - def BuildHooksNodes(self): - """Build hooks nodes. - - """ - nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) - return (nl, nl) - - def CheckPrereq(self): - """Check prerequisites. - - This checks that the instance is in the cluster and is not running. - - """ - instance = self.cfg.GetInstanceInfo(self.op.instance_name) - assert instance is not None, \ - "Cannot retrieve locked instance %s" % self.op.instance_name - if self.op.nodes: - if len(self.op.nodes) != len(instance.all_nodes): - raise errors.OpPrereqError("Instance %s currently has %d nodes, but" - " %d replacement nodes were specified" % - (instance.name, len(instance.all_nodes), - len(self.op.nodes)), - errors.ECODE_INVAL) - assert instance.disk_template != constants.DT_DRBD8 or \ - len(self.op.nodes) == 2 - assert instance.disk_template != constants.DT_PLAIN or \ - len(self.op.nodes) == 1 - primary_node = self.op.nodes[0] - else: - primary_node = instance.primary_node - if not self.op.iallocator: - _CheckNodeOnline(self, primary_node) - - if instance.disk_template == constants.DT_DISKLESS: - raise errors.OpPrereqError("Instance '%s' has no disks" % - self.op.instance_name, errors.ECODE_INVAL) - - # Verify if node group locks are still correct - owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) - if owned_groups: - # Node group locks are acquired only for the primary node (and only - # when the allocator is used) - _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups, - primary_only=True) - - # if we replace nodes *and* the old primary is offline, we don't - # check the instance state - old_pnode = self.cfg.GetNodeInfo(instance.primary_node) - if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline): - _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING, - msg="cannot recreate disks") - - if self.op.disks: - self.disks = dict(self.op.disks) - else: - self.disks = dict((idx, {}) for idx in range(len(instance.disks))) - - maxidx = max(self.disks.keys()) - if maxidx >= len(instance.disks): - raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx, - 
errors.ECODE_INVAL) - - if ((self.op.nodes or self.op.iallocator) and - sorted(self.disks.keys()) != range(len(instance.disks))): - raise errors.OpPrereqError("Can't recreate disks partially and" - " change the nodes at the same time", - errors.ECODE_INVAL) - - self.instance = instance - - if self.op.iallocator: - self._RunAllocator() - # Release unneeded node and node resource locks - _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes) - _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes) - _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC) - - assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC) - - def Exec(self, feedback_fn): - """Recreate the disks. - - """ - instance = self.instance - - assert (self.owned_locks(locking.LEVEL_NODE) == - self.owned_locks(locking.LEVEL_NODE_RES)) - - to_skip = [] - mods = [] # keeps track of needed changes - - for idx, disk in enumerate(instance.disks): - try: - changes = self.disks[idx] - except KeyError: - # Disk should not be recreated - to_skip.append(idx) - continue - - # update secondaries for disks, if needed - if self.op.nodes and disk.dev_type == constants.LD_DRBD8: - # need to update the nodes and minors - assert len(self.op.nodes) == 2 - assert len(disk.logical_id) == 6 # otherwise disk internals - # have changed - (_, _, old_port, _, _, old_secret) = disk.logical_id - new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name) - new_id = (self.op.nodes[0], self.op.nodes[1], old_port, - new_minors[0], new_minors[1], old_secret) - assert len(disk.logical_id) == len(new_id) - else: - new_id = None - - mods.append((idx, new_id, changes)) - - # now that we have passed all asserts above, we can apply the mods - # in a single run (to avoid partial changes) - for idx, new_id, changes in mods: - disk = instance.disks[idx] - if new_id is not None: - assert disk.dev_type == constants.LD_DRBD8 - disk.logical_id = new_id - if changes: - disk.Update(size=changes.get(constants.IDISK_SIZE, None), - mode=changes.get(constants.IDISK_MODE, None)) - - # change primary node, if needed - if self.op.nodes: - instance.primary_node = self.op.nodes[0] - self.LogWarning("Changing the instance's nodes, you will have to" - " remove any disks left on the older nodes manually") - - if self.op.nodes: - self.cfg.Update(instance, feedback_fn) - - # All touched nodes must be locked - mylocks = self.owned_locks(locking.LEVEL_NODE) - assert mylocks.issuperset(frozenset(instance.all_nodes)) - _CreateDisks(self, instance, to_skip=to_skip) - - -def _SafeShutdownInstanceDisks(lu, instance, disks=None): - """Shutdown block devices of an instance. - - This function checks if an instance is running, before calling - _ShutdownInstanceDisks. - - """ - _CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks") - _ShutdownInstanceDisks(lu, instance, disks=disks) - - -def _DiskSizeInBytesToMebibytes(lu, size): - """Converts a disk size in bytes to mebibytes. - - Warns and rounds up if the size isn't an even multiple of 1 MiB. - - """ - (mib, remainder) = divmod(size, 1024 * 1024) - - if remainder != 0: - lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up" - " to not overwrite existing data (%s bytes will not be" - " wiped)", (1024 * 1024) - remainder) - mib += 1 - - return mib - - -class LUInstanceGrowDisk(LogicalUnit): - """Grow a disk of an instance. 
- - """ - HPATH = "disk-grow" - HTYPE = constants.HTYPE_INSTANCE - REQ_BGL = False - - def ExpandNames(self): - self._ExpandAndLockInstance() - self.needed_locks[locking.LEVEL_NODE] = [] - self.needed_locks[locking.LEVEL_NODE_RES] = [] - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE - self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE - - def DeclareLocks(self, level): - if level == locking.LEVEL_NODE: - self._LockInstancesNodes() - elif level == locking.LEVEL_NODE_RES: - # Copy node locks - self.needed_locks[locking.LEVEL_NODE_RES] = \ - _CopyLockList(self.needed_locks[locking.LEVEL_NODE]) - - def BuildHooksEnv(self): - """Build hooks env. - - This runs on the master, the primary and all the secondaries. - - """ - env = { - "DISK": self.op.disk, - "AMOUNT": self.op.amount, - "ABSOLUTE": self.op.absolute, - } - env.update(_BuildInstanceHookEnvByObject(self, self.instance)) - return env - - def BuildHooksNodes(self): - """Build hooks nodes. - - """ - nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) - return (nl, nl) - - def CheckPrereq(self): - """Check prerequisites. - - This checks that the instance is in the cluster. - - """ - instance = self.cfg.GetInstanceInfo(self.op.instance_name) - assert instance is not None, \ - "Cannot retrieve locked instance %s" % self.op.instance_name - nodenames = list(instance.all_nodes) - for node in nodenames: - _CheckNodeOnline(self, node) - - self.instance = instance - - if instance.disk_template not in constants.DTS_GROWABLE: - raise errors.OpPrereqError("Instance's disk layout does not support" - " growing", errors.ECODE_INVAL) - - self.disk = instance.FindDisk(self.op.disk) - - if self.op.absolute: - self.target = self.op.amount - self.delta = self.target - self.disk.size - if self.delta < 0: - raise errors.OpPrereqError("Requested size (%s) is smaller than " - "current disk size (%s)" % - (utils.FormatUnit(self.target, "h"), - utils.FormatUnit(self.disk.size, "h")), - errors.ECODE_STATE) - else: - self.delta = self.op.amount - self.target = self.disk.size + self.delta - if self.delta < 0: - raise errors.OpPrereqError("Requested increment (%s) is negative" % - utils.FormatUnit(self.delta, "h"), - errors.ECODE_INVAL) - - self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta)) - - def _CheckDiskSpace(self, nodenames, req_vgspace): - template = self.instance.disk_template - if template not in (constants.DTS_NO_FREE_SPACE_CHECK): - # TODO: check the free disk space for file, when that feature will be - # supported - nodes = map(self.cfg.GetNodeInfo, nodenames) - es_nodes = filter(lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n), - nodes) - if es_nodes: - # With exclusive storage we need to something smarter than just looking - # at free space; for now, let's simply abort the operation. - raise errors.OpPrereqError("Cannot grow disks when exclusive_storage" - " is enabled", errors.ECODE_STATE) - _CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace) - - def Exec(self, feedback_fn): - """Execute disk grow. 
- - """ - instance = self.instance - disk = self.disk - - assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE) - assert (self.owned_locks(locking.LEVEL_NODE) == - self.owned_locks(locking.LEVEL_NODE_RES)) - - wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks - - disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk]) - if not disks_ok: - raise errors.OpExecError("Cannot activate block device to grow") - - feedback_fn("Growing disk %s of instance '%s' by %s to %s" % - (self.op.disk, instance.name, - utils.FormatUnit(self.delta, "h"), - utils.FormatUnit(self.target, "h"))) - - # First run all grow ops in dry-run mode - for node in instance.all_nodes: - self.cfg.SetDiskID(disk, node) - result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta, - True, True) - result.Raise("Dry-run grow request failed to node %s" % node) - - if wipe_disks: - # Get disk size from primary node for wiping - result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk]) - result.Raise("Failed to retrieve disk size from node '%s'" % - instance.primary_node) - - (disk_size_in_bytes, ) = result.payload - - if disk_size_in_bytes is None: - raise errors.OpExecError("Failed to retrieve disk size from primary" - " node '%s'" % instance.primary_node) - - old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes) - - assert old_disk_size >= disk.size, \ - ("Retrieved disk size too small (got %s, should be at least %s)" % - (old_disk_size, disk.size)) - else: - old_disk_size = None - - # We know that (as far as we can test) operations across different - # nodes will succeed, time to run it for real on the backing storage - for node in instance.all_nodes: - self.cfg.SetDiskID(disk, node) - result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta, - False, True) - result.Raise("Grow request failed to node %s" % node) - - # And now execute it for logical storage, on the primary node - node = instance.primary_node - self.cfg.SetDiskID(disk, node) - result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta, - False, False) - result.Raise("Grow request failed to node %s" % node) - - disk.RecordGrow(self.delta) - self.cfg.Update(instance, feedback_fn) - - # Changes have been recorded, release node lock - _ReleaseLocks(self, locking.LEVEL_NODE) - - # Downgrade lock while waiting for sync - self.glm.downgrade(locking.LEVEL_INSTANCE) - - assert wipe_disks ^ (old_disk_size is None) - - if wipe_disks: - assert instance.disks[self.op.disk] == disk - - # Wipe newly added disk space - _WipeDisks(self, instance, - disks=[(self.op.disk, disk, old_disk_size)]) - - if self.op.wait_for_sync: - disk_abort = not _WaitForSync(self, instance, disks=[disk]) - if disk_abort: - self.LogWarning("Disk syncing has not returned a good status; check" - " the instance") - if instance.admin_state != constants.ADMINST_UP: - _SafeShutdownInstanceDisks(self, instance, disks=[disk]) - elif instance.admin_state != constants.ADMINST_UP: - self.LogWarning("Not shutting down the disk even if the instance is" - " not supposed to be running because no wait for" - " sync mode was requested") - - assert self.owned_locks(locking.LEVEL_NODE_RES) - assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE) - - -class LUInstanceReplaceDisks(LogicalUnit): - """Replace the disks of an instance. - - """ - HPATH = "mirrors-replace" - HTYPE = constants.HTYPE_INSTANCE - REQ_BGL = False - - def CheckArguments(self): - """Check arguments. 
- - """ - remote_node = self.op.remote_node - ialloc = self.op.iallocator - if self.op.mode == constants.REPLACE_DISK_CHG: - if remote_node is None and ialloc is None: - raise errors.OpPrereqError("When changing the secondary either an" - " iallocator script must be used or the" - " new node given", errors.ECODE_INVAL) - else: - _CheckIAllocatorOrNode(self, "iallocator", "remote_node") - - elif remote_node is not None or ialloc is not None: - # Not replacing the secondary - raise errors.OpPrereqError("The iallocator and new node options can" - " only be used when changing the" - " secondary node", errors.ECODE_INVAL) - - def ExpandNames(self): - self._ExpandAndLockInstance() - - assert locking.LEVEL_NODE not in self.needed_locks - assert locking.LEVEL_NODE_RES not in self.needed_locks - assert locking.LEVEL_NODEGROUP not in self.needed_locks - - assert self.op.iallocator is None or self.op.remote_node is None, \ - "Conflicting options" - - if self.op.remote_node is not None: - self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node) - - # Warning: do not remove the locking of the new secondary here - # unless DRBD8.AddChildren is changed to work in parallel; - # currently it doesn't since parallel invocations of - # FindUnusedMinor will conflict - self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node] - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND - else: - self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE - - if self.op.iallocator is not None: - # iallocator will select a new node in the same group - self.needed_locks[locking.LEVEL_NODEGROUP] = [] - self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET - - self.needed_locks[locking.LEVEL_NODE_RES] = [] - - self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode, - self.op.iallocator, self.op.remote_node, - self.op.disks, self.op.early_release, - self.op.ignore_ipolicy) - - self.tasklets = [self.replacer] - - def DeclareLocks(self, level): - if level == locking.LEVEL_NODEGROUP: - assert self.op.remote_node is None - assert self.op.iallocator is not None - assert not self.needed_locks[locking.LEVEL_NODEGROUP] - - self.share_locks[locking.LEVEL_NODEGROUP] = 1 - # Lock all groups used by instance optimistically; this requires going - # via the node before it's locked, requiring verification later on - self.needed_locks[locking.LEVEL_NODEGROUP] = \ - self.cfg.GetInstanceNodeGroups(self.op.instance_name) - - elif level == locking.LEVEL_NODE: - if self.op.iallocator is not None: - assert self.op.remote_node is None - assert not self.needed_locks[locking.LEVEL_NODE] - assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC) - - # Lock member nodes of all locked groups - self.needed_locks[locking.LEVEL_NODE] = \ - [node_name - for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP) - for node_name in self.cfg.GetNodeGroup(group_uuid).members] - else: - assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC) - - self._LockInstancesNodes() - - elif level == locking.LEVEL_NODE_RES: - # Reuse node locks - self.needed_locks[locking.LEVEL_NODE_RES] = \ - self.needed_locks[locking.LEVEL_NODE] - - def BuildHooksEnv(self): - """Build hooks env. - - This runs on the master, the primary and all the secondaries. 
- - """ - instance = self.replacer.instance - env = { - "MODE": self.op.mode, - "NEW_SECONDARY": self.op.remote_node, - "OLD_SECONDARY": instance.secondary_nodes[0], - } - env.update(_BuildInstanceHookEnvByObject(self, instance)) - return env - - def BuildHooksNodes(self): - """Build hooks nodes. - - """ - instance = self.replacer.instance - nl = [ - self.cfg.GetMasterNode(), - instance.primary_node, - ] - if self.op.remote_node is not None: - nl.append(self.op.remote_node) - return nl, nl - - def CheckPrereq(self): - """Check prerequisites. - - """ - assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or - self.op.iallocator is None) - - # Verify if node group locks are still correct - owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) - if owned_groups: - _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups) - - return LogicalUnit.CheckPrereq(self) - - -class LUInstanceActivateDisks(NoHooksLU): - """Bring up an instance's disks. - - """ - REQ_BGL = False - - def ExpandNames(self): - self._ExpandAndLockInstance() - self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE - - def DeclareLocks(self, level): - if level == locking.LEVEL_NODE: - self._LockInstancesNodes() - - def CheckPrereq(self): - """Check prerequisites. - - This checks that the instance is in the cluster. - - """ - self.instance = self.cfg.GetInstanceInfo(self.op.instance_name) - assert self.instance is not None, \ - "Cannot retrieve locked instance %s" % self.op.instance_name - _CheckNodeOnline(self, self.instance.primary_node) - - def Exec(self, feedback_fn): - """Activate the disks. - - """ - disks_ok, disks_info = \ - _AssembleInstanceDisks(self, self.instance, - ignore_size=self.op.ignore_size) - if not disks_ok: - raise errors.OpExecError("Cannot activate block devices") - - if self.op.wait_for_sync: - if not _WaitForSync(self, self.instance): - raise errors.OpExecError("Some disks of the instance are degraded!") - - return disks_info - - -class LUInstanceDeactivateDisks(NoHooksLU): - """Shutdown an instance's disks. - - """ - REQ_BGL = False - - def ExpandNames(self): - self._ExpandAndLockInstance() - self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + node_names = itertools.chain(*(i.all_nodes for i in self.wanted_instances)) + nodes = dict(self.cfg.GetMultiNodeInfo(node_names)) - def DeclareLocks(self, level): - if level == locking.LEVEL_NODE: - self._LockInstancesNodes() + groups = dict(self.cfg.GetMultiNodeGroupInfo(node.group + for node in nodes.values())) - def CheckPrereq(self): - """Check prerequisites. + group2name_fn = lambda uuid: groups[uuid].name + for instance in self.wanted_instances: + pnode = nodes[instance.primary_node] - This checks that the instance is in the cluster. 
+ if self.op.static or pnode.offline: + remote_state = None + if pnode.offline: + self.LogWarning("Primary node %s is marked offline, returning static" + " information only for instance %s" % + (pnode.name, instance.name)) + else: + remote_info = self.rpc.call_instance_info(instance.primary_node, + instance.name, + instance.hypervisor) + remote_info.Raise("Error checking node %s" % instance.primary_node) + remote_info = remote_info.payload + if remote_info and "state" in remote_info: + remote_state = "up" + else: + if instance.admin_state == constants.ADMINST_UP: + remote_state = "down" + else: + remote_state = instance.admin_state - """ - self.instance = self.cfg.GetInstanceInfo(self.op.instance_name) - assert self.instance is not None, \ - "Cannot retrieve locked instance %s" % self.op.instance_name + disks = map(compat.partial(self._ComputeDiskStatus, instance, None), + instance.disks) - def Exec(self, feedback_fn): - """Deactivate the disks + snodes_group_uuids = [nodes[snode_name].group + for snode_name in instance.secondary_nodes] - """ - instance = self.instance - if self.op.force: - _ShutdownInstanceDisks(self, instance) - else: - _SafeShutdownInstanceDisks(self, instance) + result[instance.name] = { + "name": instance.name, + "config_state": instance.admin_state, + "run_state": remote_state, + "pnode": instance.primary_node, + "pnode_group_uuid": pnode.group, + "pnode_group_name": group2name_fn(pnode.group), + "snodes": instance.secondary_nodes, + "snodes_group_uuids": snodes_group_uuids, + "snodes_group_names": map(group2name_fn, snodes_group_uuids), + "os": instance.os, + # this happens to be the same format used for hooks + "nics": _NICListToTuple(self, instance.nics), + "disk_template": instance.disk_template, + "disks": disks, + "hypervisor": instance.hypervisor, + "network_port": instance.network_port, + "hv_instance": instance.hvparams, + "hv_actual": cluster.FillHV(instance, skip_globals=True), + "be_instance": instance.beparams, + "be_actual": cluster.FillBE(instance), + "os_instance": instance.osparams, + "os_actual": cluster.SimpleFillOS(instance.os, instance.osparams), + "serial_no": instance.serial_no, + "mtime": instance.mtime, + "ctime": instance.ctime, + "uuid": instance.uuid, + } + + return result class LUInstanceStartup(LogicalUnit): @@ -6177,53 +4594,6 @@ class LUInstanceChangeGroup(LogicalUnit): return ResultWithJobs(jobs) -def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary, - ldisk=False): - """Check that mirrors are not degraded. - - @attention: The device has to be annotated already. - - The ldisk parameter, if True, will change the test from the - is_degraded attribute (which represents overall non-ok status for - the device(s)) to the ldisk (representing the local storage status). 
- - """ - lu.cfg.SetDiskID(dev, node) - - result = True - - if on_primary or dev.AssembleOnSecondary(): - rstats = lu.rpc.call_blockdev_find(node, dev) - msg = rstats.fail_msg - if msg: - lu.LogWarning("Can't find disk on node %s: %s", node, msg) - result = False - elif not rstats.payload: - lu.LogWarning("Can't find disk on node %s", node) - result = False - else: - if ldisk: - result = result and rstats.payload.ldisk_status == constants.LDS_OKAY - else: - result = result and not rstats.payload.is_degraded - - if dev.children: - for child in dev.children: - result = result and _CheckDiskConsistencyInner(lu, instance, child, node, - on_primary) - - return result - - -def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False): - """Wrapper around L{_CheckDiskConsistencyInner}. - - """ - (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg) - return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary, - ldisk=ldisk) - - class TLMigrateInstance(Tasklet): """Tasklet class for instance migration. @@ -6926,760 +5296,3 @@ class TLMigrateInstance(Tasklet): return self._ExecCleanup() else: return self._ExecMigration() - - -def _BlockdevFind(lu, node, dev, instance): - """Wrapper around call_blockdev_find to annotate diskparams. - - @param lu: A reference to the lu object - @param node: The node to call out - @param dev: The device to find - @param instance: The instance object the device belongs to - @returns The result of the rpc call - - """ - (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg) - return lu.rpc.call_blockdev_find(node, disk) - - -class TLReplaceDisks(Tasklet): - """Replaces disks for an instance. - - Note: Locking is not within the scope of this class. - - """ - def __init__(self, lu, instance_name, mode, iallocator_name, remote_node, - disks, early_release, ignore_ipolicy): - """Initializes this class. - - """ - Tasklet.__init__(self, lu) - - # Parameters - self.instance_name = instance_name - self.mode = mode - self.iallocator_name = iallocator_name - self.remote_node = remote_node - self.disks = disks - self.early_release = early_release - self.ignore_ipolicy = ignore_ipolicy - - # Runtime data - self.instance = None - self.new_node = None - self.target_node = None - self.other_node = None - self.remote_node_info = None - self.node_secondary_ip = None - - @staticmethod - def _RunAllocator(lu, iallocator_name, instance_name, relocate_from): - """Compute a new secondary node using an IAllocator. - - """ - req = iallocator.IAReqRelocate(name=instance_name, - relocate_from=list(relocate_from)) - ial = iallocator.IAllocator(lu.cfg, lu.rpc, req) - - ial.Run(iallocator_name) - - if not ial.success: - raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':" - " %s" % (iallocator_name, ial.info), - errors.ECODE_NORES) - - remote_node_name = ial.result[0] - - lu.LogInfo("Selected new secondary for instance '%s': %s", - instance_name, remote_node_name) - - return remote_node_name - - def _FindFaultyDisks(self, node_name): - """Wrapper for L{_FindFaultyInstanceDisks}. - - """ - return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance, - node_name, True) - - def _CheckDisksActivated(self, instance): - """Checks if the instance disks are activated. 
- - @param instance: The instance to check disks - @return: True if they are activated, False otherwise - - """ - nodes = instance.all_nodes - - for idx, dev in enumerate(instance.disks): - for node in nodes: - self.lu.LogInfo("Checking disk/%d on %s", idx, node) - self.cfg.SetDiskID(dev, node) - - result = _BlockdevFind(self, node, dev, instance) - - if result.offline: - continue - elif result.fail_msg or not result.payload: - return False - - return True - - def CheckPrereq(self): - """Check prerequisites. - - This checks that the instance is in the cluster. - - """ - self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name) - assert instance is not None, \ - "Cannot retrieve locked instance %s" % self.instance_name - - if instance.disk_template != constants.DT_DRBD8: - raise errors.OpPrereqError("Can only run replace disks for DRBD8-based" - " instances", errors.ECODE_INVAL) - - if len(instance.secondary_nodes) != 1: - raise errors.OpPrereqError("The instance has a strange layout," - " expected one secondary but found %d" % - len(instance.secondary_nodes), - errors.ECODE_FAULT) - - instance = self.instance - secondary_node = instance.secondary_nodes[0] - - if self.iallocator_name is None: - remote_node = self.remote_node - else: - remote_node = self._RunAllocator(self.lu, self.iallocator_name, - instance.name, instance.secondary_nodes) - - if remote_node is None: - self.remote_node_info = None - else: - assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \ - "Remote node '%s' is not locked" % remote_node - - self.remote_node_info = self.cfg.GetNodeInfo(remote_node) - assert self.remote_node_info is not None, \ - "Cannot retrieve locked node %s" % remote_node - - if remote_node == self.instance.primary_node: - raise errors.OpPrereqError("The specified node is the primary node of" - " the instance", errors.ECODE_INVAL) - - if remote_node == secondary_node: - raise errors.OpPrereqError("The specified node is already the" - " secondary node of the instance", - errors.ECODE_INVAL) - - if self.disks and self.mode in (constants.REPLACE_DISK_AUTO, - constants.REPLACE_DISK_CHG): - raise errors.OpPrereqError("Cannot specify disks to be replaced", - errors.ECODE_INVAL) - - if self.mode == constants.REPLACE_DISK_AUTO: - if not self._CheckDisksActivated(instance): - raise errors.OpPrereqError("Please run activate-disks on instance %s" - " first" % self.instance_name, - errors.ECODE_STATE) - faulty_primary = self._FindFaultyDisks(instance.primary_node) - faulty_secondary = self._FindFaultyDisks(secondary_node) - - if faulty_primary and faulty_secondary: - raise errors.OpPrereqError("Instance %s has faulty disks on more than" - " one node and can not be repaired" - " automatically" % self.instance_name, - errors.ECODE_STATE) - - if faulty_primary: - self.disks = faulty_primary - self.target_node = instance.primary_node - self.other_node = secondary_node - check_nodes = [self.target_node, self.other_node] - elif faulty_secondary: - self.disks = faulty_secondary - self.target_node = secondary_node - self.other_node = instance.primary_node - check_nodes = [self.target_node, self.other_node] - else: - self.disks = [] - check_nodes = [] - - else: - # Non-automatic modes - if self.mode == constants.REPLACE_DISK_PRI: - self.target_node = instance.primary_node - self.other_node = secondary_node - check_nodes = [self.target_node, self.other_node] - - elif self.mode == constants.REPLACE_DISK_SEC: - self.target_node = secondary_node - self.other_node = instance.primary_node - check_nodes 
= [self.target_node, self.other_node] - - elif self.mode == constants.REPLACE_DISK_CHG: - self.new_node = remote_node - self.other_node = instance.primary_node - self.target_node = secondary_node - check_nodes = [self.new_node, self.other_node] - - _CheckNodeNotDrained(self.lu, remote_node) - _CheckNodeVmCapable(self.lu, remote_node) - - old_node_info = self.cfg.GetNodeInfo(secondary_node) - assert old_node_info is not None - if old_node_info.offline and not self.early_release: - # doesn't make sense to delay the release - self.early_release = True - self.lu.LogInfo("Old secondary %s is offline, automatically enabling" - " early-release mode", secondary_node) - - else: - raise errors.ProgrammerError("Unhandled disk replace mode (%s)" % - self.mode) - - # If not specified all disks should be replaced - if not self.disks: - self.disks = range(len(self.instance.disks)) - - # TODO: This is ugly, but right now we can't distinguish between internal - # submitted opcode and external one. We should fix that. - if self.remote_node_info: - # We change the node, lets verify it still meets instance policy - new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group) - cluster = self.cfg.GetClusterInfo() - ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, - new_group_info) - _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info, - self.cfg, ignore=self.ignore_ipolicy) - - for node in check_nodes: - _CheckNodeOnline(self.lu, node) - - touched_nodes = frozenset(node_name for node_name in [self.new_node, - self.other_node, - self.target_node] - if node_name is not None) - - # Release unneeded node and node resource locks - _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes) - _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes) - _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC) - - # Release any owned node group - _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP) - - # Check whether disks are valid - for disk_idx in self.disks: - instance.FindDisk(disk_idx) - - # Get secondary node IP addresses - self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node) - in self.cfg.GetMultiNodeInfo(touched_nodes)) - - def Exec(self, feedback_fn): - """Execute disk replacement. - - This dispatches the disk replacement to the appropriate handler. 
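# Illustrative sketch (editor's addition, not part of the patch): the node
# roles that CheckPrereq above derives for each replacement mode, condensed
# into a standalone helper.  Mode names are spelled out here instead of the
# constants.REPLACE_DISK_* values used by Ganeti.
def pick_replace_nodes(mode, primary, secondary, new_node=None):
  """Return (target_node, other_node, check_nodes) for a replace mode."""
  if mode == "replace-on-primary":
    return primary, secondary, [primary, secondary]
  if mode == "replace-on-secondary":
    return secondary, primary, [secondary, primary]
  if mode == "replace-new-secondary":
    # the old secondary is the target; checks run on the new node and primary
    return secondary, primary, [new_node, primary]
  raise ValueError("unhandled replace mode %r" % mode)

print(pick_replace_nodes("replace-on-primary", "node1", "node2"))
print(pick_replace_nodes("replace-new-secondary", "node1", "node2", "node3"))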
- - """ - if __debug__: - # Verify owned locks before starting operation - owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE) - assert set(owned_nodes) == set(self.node_secondary_ip), \ - ("Incorrect node locks, owning %s, expected %s" % - (owned_nodes, self.node_secondary_ip.keys())) - assert (self.lu.owned_locks(locking.LEVEL_NODE) == - self.lu.owned_locks(locking.LEVEL_NODE_RES)) - assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC) - - owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE) - assert list(owned_instances) == [self.instance_name], \ - "Instance '%s' not locked" % self.instance_name - - assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \ - "Should not own any node group lock at this point" - - if not self.disks: - feedback_fn("No disks need replacement for instance '%s'" % - self.instance.name) - return - - feedback_fn("Replacing disk(s) %s for instance '%s'" % - (utils.CommaJoin(self.disks), self.instance.name)) - feedback_fn("Current primary node: %s" % self.instance.primary_node) - feedback_fn("Current seconary node: %s" % - utils.CommaJoin(self.instance.secondary_nodes)) - - activate_disks = (self.instance.admin_state != constants.ADMINST_UP) - - # Activate the instance disks if we're replacing them on a down instance - if activate_disks: - _StartInstanceDisks(self.lu, self.instance, True) - - try: - # Should we replace the secondary node? - if self.new_node is not None: - fn = self._ExecDrbd8Secondary - else: - fn = self._ExecDrbd8DiskOnly - - result = fn(feedback_fn) - finally: - # Deactivate the instance disks if we're replacing them on a - # down instance - if activate_disks: - _SafeShutdownInstanceDisks(self.lu, self.instance) - - assert not self.lu.owned_locks(locking.LEVEL_NODE) - - if __debug__: - # Verify owned locks - owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES) - nodes = frozenset(self.node_secondary_ip) - assert ((self.early_release and not owned_nodes) or - (not self.early_release and not (set(owned_nodes) - nodes))), \ - ("Not owning the correct locks, early_release=%s, owned=%r," - " nodes=%r" % (self.early_release, owned_nodes, nodes)) - - return result - - def _CheckVolumeGroup(self, nodes): - self.lu.LogInfo("Checking volume groups") - - vgname = self.cfg.GetVGName() - - # Make sure volume group exists on all involved nodes - results = self.rpc.call_vg_list(nodes) - if not results: - raise errors.OpExecError("Can't list volume groups on the nodes") - - for node in nodes: - res = results[node] - res.Raise("Error checking node %s" % node) - if vgname not in res.payload: - raise errors.OpExecError("Volume group '%s' not found on node %s" % - (vgname, node)) - - def _CheckDisksExistence(self, nodes): - # Check disk existence - for idx, dev in enumerate(self.instance.disks): - if idx not in self.disks: - continue - - for node in nodes: - self.lu.LogInfo("Checking disk/%d on %s", idx, node) - self.cfg.SetDiskID(dev, node) - - result = _BlockdevFind(self, node, dev, self.instance) - - msg = result.fail_msg - if msg or not result.payload: - if not msg: - msg = "disk not found" - raise errors.OpExecError("Can't find disk/%d on node %s: %s" % - (idx, node, msg)) - - def _CheckDisksConsistency(self, node_name, on_primary, ldisk): - for idx, dev in enumerate(self.instance.disks): - if idx not in self.disks: - continue - - self.lu.LogInfo("Checking disk/%d consistency on node %s" % - (idx, node_name)) - - if not _CheckDiskConsistency(self.lu, self.instance, dev, node_name, - on_primary, ldisk=ldisk): - raise 
errors.OpExecError("Node %s has degraded storage, unsafe to" - " replace disks for instance %s" % - (node_name, self.instance.name)) - - def _CreateNewStorage(self, node_name): - """Create new storage on the primary or secondary node. - - This is only used for same-node replaces, not for changing the - secondary node, hence we don't want to modify the existing disk. - - """ - iv_names = {} - - disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg) - for idx, dev in enumerate(disks): - if idx not in self.disks: - continue - - self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx) - - self.cfg.SetDiskID(dev, node_name) - - lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]] - names = _GenerateUniqueNames(self.lu, lv_names) - - (data_disk, meta_disk) = dev.children - vg_data = data_disk.logical_id[0] - lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size, - logical_id=(vg_data, names[0]), - params=data_disk.params) - vg_meta = meta_disk.logical_id[0] - lv_meta = objects.Disk(dev_type=constants.LD_LV, - size=constants.DRBD_META_SIZE, - logical_id=(vg_meta, names[1]), - params=meta_disk.params) - - new_lvs = [lv_data, lv_meta] - old_lvs = [child.Copy() for child in dev.children] - iv_names[dev.iv_name] = (dev, old_lvs, new_lvs) - excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name) - - # we pass force_create=True to force the LVM creation - for new_lv in new_lvs: - _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True, - _GetInstanceInfoText(self.instance), False, - excl_stor) - - return iv_names - - def _CheckDevices(self, node_name, iv_names): - for name, (dev, _, _) in iv_names.iteritems(): - self.cfg.SetDiskID(dev, node_name) - - result = _BlockdevFind(self, node_name, dev, self.instance) - - msg = result.fail_msg - if msg or not result.payload: - if not msg: - msg = "disk not found" - raise errors.OpExecError("Can't find DRBD device %s: %s" % - (name, msg)) - - if result.payload.is_degraded: - raise errors.OpExecError("DRBD device %s is degraded!" % name) - - def _RemoveOldStorage(self, node_name, iv_names): - for name, (_, old_lvs, _) in iv_names.iteritems(): - self.lu.LogInfo("Remove logical volumes for %s", name) - - for lv in old_lvs: - self.cfg.SetDiskID(lv, node_name) - - msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg - if msg: - self.lu.LogWarning("Can't remove old LV: %s", msg, - hint="remove unused LVs manually") - - def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613 - """Replace a disk on the primary or secondary for DRBD 8. - - The algorithm for replace is quite complicated: - - 1. for each disk to be replaced: - - 1. create new LVs on the target node with unique names - 1. detach old LVs from the drbd device - 1. rename old LVs to name_replaced. - 1. rename new LVs to old LVs - 1. attach the new LVs (with the old names now) to the drbd device - - 1. wait for sync across all devices - - 1. for each modified disk: - - 1. remove old LVs (which have the name name_replaces.) - - Failures are not very well handled. 
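# Illustrative sketch (editor's addition, not part of the patch): the LV
# rename ordering described in the docstring above -- old LVs are first moved
# aside with a "_replaced-<timestamp>" suffix, then the new LVs take over the
# original names so they can be re-attached to the DRBD device.  Plain
# (vg, lv_name) tuples stand in for objects.Disk.
import time

def plan_lv_swap(old_lvs, new_lvs):
  """Return the two rename batches, to be executed in this order."""
  suffix = "_replaced-%s" % int(time.time())
  rename_old_away = [((vg, name), (vg, name + suffix))
                     for (vg, name) in old_lvs]
  rename_new_to_old = [(new, old) for (old, new) in zip(old_lvs, new_lvs)]
  return rename_old_away, rename_new_to_old

old = [("xenvg", "disk0_data"), ("xenvg", "disk0_meta")]
new = [("xenvg", "uuid1_data"), ("xenvg", "uuid1_meta")]
print(plan_lv_swap(old, new))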
- - """ - steps_total = 6 - - # Step: check device activation - self.lu.LogStep(1, steps_total, "Check device existence") - self._CheckDisksExistence([self.other_node, self.target_node]) - self._CheckVolumeGroup([self.target_node, self.other_node]) - - # Step: check other node consistency - self.lu.LogStep(2, steps_total, "Check peer consistency") - self._CheckDisksConsistency(self.other_node, - self.other_node == self.instance.primary_node, - False) - - # Step: create new storage - self.lu.LogStep(3, steps_total, "Allocate new storage") - iv_names = self._CreateNewStorage(self.target_node) - - # Step: for each lv, detach+rename*2+attach - self.lu.LogStep(4, steps_total, "Changing drbd configuration") - for dev, old_lvs, new_lvs in iv_names.itervalues(): - self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name) - - result = self.rpc.call_blockdev_removechildren(self.target_node, dev, - old_lvs) - result.Raise("Can't detach drbd from local storage on node" - " %s for device %s" % (self.target_node, dev.iv_name)) - #dev.children = [] - #cfg.Update(instance) - - # ok, we created the new LVs, so now we know we have the needed - # storage; as such, we proceed on the target node to rename - # old_lv to _old, and new_lv to old_lv; note that we rename LVs - # using the assumption that logical_id == physical_id (which in - # turn is the unique_id on that node) - - # FIXME(iustin): use a better name for the replaced LVs - temp_suffix = int(time.time()) - ren_fn = lambda d, suff: (d.physical_id[0], - d.physical_id[1] + "_replaced-%s" % suff) - - # Build the rename list based on what LVs exist on the node - rename_old_to_new = [] - for to_ren in old_lvs: - result = self.rpc.call_blockdev_find(self.target_node, to_ren) - if not result.fail_msg and result.payload: - # device exists - rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix))) - - self.lu.LogInfo("Renaming the old LVs on the target node") - result = self.rpc.call_blockdev_rename(self.target_node, - rename_old_to_new) - result.Raise("Can't rename old LVs on node %s" % self.target_node) - - # Now we rename the new LVs to the old LVs - self.lu.LogInfo("Renaming the new LVs on the target node") - rename_new_to_old = [(new, old.physical_id) - for old, new in zip(old_lvs, new_lvs)] - result = self.rpc.call_blockdev_rename(self.target_node, - rename_new_to_old) - result.Raise("Can't rename new LVs on node %s" % self.target_node) - - # Intermediate steps of in memory modifications - for old, new in zip(old_lvs, new_lvs): - new.logical_id = old.logical_id - self.cfg.SetDiskID(new, self.target_node) - - # We need to modify old_lvs so that removal later removes the - # right LVs, not the newly added ones; note that old_lvs is a - # copy here - for disk in old_lvs: - disk.logical_id = ren_fn(disk, temp_suffix) - self.cfg.SetDiskID(disk, self.target_node) - - # Now that the new lvs have the old name, we can add them to the device - self.lu.LogInfo("Adding new mirror component on %s", self.target_node) - result = self.rpc.call_blockdev_addchildren(self.target_node, - (dev, self.instance), new_lvs) - msg = result.fail_msg - if msg: - for new_lv in new_lvs: - msg2 = self.rpc.call_blockdev_remove(self.target_node, - new_lv).fail_msg - if msg2: - self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2, - hint=("cleanup manually the unused logical" - "volumes")) - raise errors.OpExecError("Can't add local storage to drbd: %s" % msg) - - cstep = itertools.count(5) - - if self.early_release: - self.lu.LogStep(cstep.next(), steps_total, 
"Removing old storage") - self._RemoveOldStorage(self.target_node, iv_names) - # TODO: Check if releasing locks early still makes sense - _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES) - else: - # Release all resource locks except those used by the instance - _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, - keep=self.node_secondary_ip.keys()) - - # Release all node locks while waiting for sync - _ReleaseLocks(self.lu, locking.LEVEL_NODE) - - # TODO: Can the instance lock be downgraded here? Take the optional disk - # shutdown in the caller into consideration. - - # Wait for sync - # This can fail as the old devices are degraded and _WaitForSync - # does a combined result over all disks, so we don't check its return value - self.lu.LogStep(cstep.next(), steps_total, "Sync devices") - _WaitForSync(self.lu, self.instance) - - # Check all devices manually - self._CheckDevices(self.instance.primary_node, iv_names) - - # Step: remove old storage - if not self.early_release: - self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") - self._RemoveOldStorage(self.target_node, iv_names) - - def _ExecDrbd8Secondary(self, feedback_fn): - """Replace the secondary node for DRBD 8. - - The algorithm for replace is quite complicated: - - for all disks of the instance: - - create new LVs on the new node with same names - - shutdown the drbd device on the old secondary - - disconnect the drbd network on the primary - - create the drbd device on the new secondary - - network attach the drbd on the primary, using an artifice: - the drbd code for Attach() will connect to the network if it - finds a device which is connected to the good local disks but - not network enabled - - wait for sync across all devices - - remove all disks from the old secondary - - Failures are not very well handled. 
- - """ - steps_total = 6 - - pnode = self.instance.primary_node - - # Step: check device activation - self.lu.LogStep(1, steps_total, "Check device existence") - self._CheckDisksExistence([self.instance.primary_node]) - self._CheckVolumeGroup([self.instance.primary_node]) - - # Step: check other node consistency - self.lu.LogStep(2, steps_total, "Check peer consistency") - self._CheckDisksConsistency(self.instance.primary_node, True, True) - - # Step: create new storage - self.lu.LogStep(3, steps_total, "Allocate new storage") - disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg) - excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node) - for idx, dev in enumerate(disks): - self.lu.LogInfo("Adding new local storage on %s for disk/%d" % - (self.new_node, idx)) - # we pass force_create=True to force LVM creation - for new_lv in dev.children: - _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv, - True, _GetInstanceInfoText(self.instance), False, - excl_stor) - - # Step 4: dbrd minors and drbd setups changes - # after this, we must manually remove the drbd minors on both the - # error and the success paths - self.lu.LogStep(4, steps_total, "Changing drbd configuration") - minors = self.cfg.AllocateDRBDMinor([self.new_node - for dev in self.instance.disks], - self.instance.name) - logging.debug("Allocated minors %r", minors) - - iv_names = {} - for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)): - self.lu.LogInfo("activating a new drbd on %s for disk/%d" % - (self.new_node, idx)) - # create new devices on new_node; note that we create two IDs: - # one without port, so the drbd will be activated without - # networking information on the new node at this stage, and one - # with network, for the latter activation in step 4 - (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id - if self.instance.primary_node == o_node1: - p_minor = o_minor1 - else: - assert self.instance.primary_node == o_node2, "Three-node instance?" 
- p_minor = o_minor2 - - new_alone_id = (self.instance.primary_node, self.new_node, None, - p_minor, new_minor, o_secret) - new_net_id = (self.instance.primary_node, self.new_node, o_port, - p_minor, new_minor, o_secret) - - iv_names[idx] = (dev, dev.children, new_net_id) - logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor, - new_net_id) - new_drbd = objects.Disk(dev_type=constants.LD_DRBD8, - logical_id=new_alone_id, - children=dev.children, - size=dev.size, - params={}) - (anno_new_drbd,) = _AnnotateDiskParams(self.instance, [new_drbd], - self.cfg) - try: - _CreateSingleBlockDev(self.lu, self.new_node, self.instance, - anno_new_drbd, - _GetInstanceInfoText(self.instance), False, - excl_stor) - except errors.GenericError: - self.cfg.ReleaseDRBDMinors(self.instance.name) - raise - - # We have new devices, shutdown the drbd on the old secondary - for idx, dev in enumerate(self.instance.disks): - self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx) - self.cfg.SetDiskID(dev, self.target_node) - msg = self.rpc.call_blockdev_shutdown(self.target_node, - (dev, self.instance)).fail_msg - if msg: - self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old" - "node: %s" % (idx, msg), - hint=("Please cleanup this device manually as" - " soon as possible")) - - self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)") - result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip, - self.instance.disks)[pnode] - - msg = result.fail_msg - if msg: - # detaches didn't succeed (unlikely) - self.cfg.ReleaseDRBDMinors(self.instance.name) - raise errors.OpExecError("Can't detach the disks from the network on" - " old node: %s" % (msg,)) - - # if we managed to detach at least one, we update all the disks of - # the instance to point to the new secondary - self.lu.LogInfo("Updating instance configuration") - for dev, _, new_logical_id in iv_names.itervalues(): - dev.logical_id = new_logical_id - self.cfg.SetDiskID(dev, self.instance.primary_node) - - self.cfg.Update(self.instance, feedback_fn) - - # Release all node locks (the configuration has been updated) - _ReleaseLocks(self.lu, locking.LEVEL_NODE) - - # and now perform the drbd attach - self.lu.LogInfo("Attaching primary drbds to new secondary" - " (standalone => connected)") - result = self.rpc.call_drbd_attach_net([self.instance.primary_node, - self.new_node], - self.node_secondary_ip, - (self.instance.disks, self.instance), - self.instance.name, - False) - for to_node, to_result in result.items(): - msg = to_result.fail_msg - if msg: - self.lu.LogWarning("Can't attach drbd disks on node %s: %s", - to_node, msg, - hint=("please do a gnt-instance info to see the" - " status of disks")) - - cstep = itertools.count(5) - - if self.early_release: - self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") - self._RemoveOldStorage(self.target_node, iv_names) - # TODO: Check if releasing locks early still makes sense - _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES) - else: - # Release all resource locks except those used by the instance - _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, - keep=self.node_secondary_ip.keys()) - - # TODO: Can the instance lock be downgraded here? Take the optional disk - # shutdown in the caller into consideration. 
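# Illustrative sketch (editor's addition, not part of the patch): the two
# logical IDs built above for the replacement DRBD8 device -- one without the
# network port ("standalone", used while the device is created on the new
# secondary) and one with the port (used for the final network attach).  The
# tuple layout (node1, node2, port, minor1, minor2, secret) mirrors the code
# above; the concrete values are made up.
def drbd8_logical_ids(primary, new_secondary, port, p_minor, new_minor, secret):
  new_alone_id = (primary, new_secondary, None, p_minor, new_minor, secret)
  new_net_id = (primary, new_secondary, port, p_minor, new_minor, secret)
  return new_alone_id, new_net_id

alone_id, net_id = drbd8_logical_ids("node1", "node3", 11000, 0, 2, "s3cr3t")
print(alone_id)  # ('node1', 'node3', None, 0, 2, 's3cr3t')
print(net_id)    # ('node1', 'node3', 11000, 0, 2, 's3cr3t')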
- - # Wait for sync - # This can fail as the old devices are degraded and _WaitForSync - # does a combined result over all disks, so we don't check its return value - self.lu.LogStep(cstep.next(), steps_total, "Sync devices") - _WaitForSync(self.lu, self.instance) - - # Check all devices manually - self._CheckDevices(self.instance.primary_node, iv_names) - - # Step: remove old storage - if not self.early_release: - self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") - self._RemoveOldStorage(self.target_node, iv_names) diff --git a/lib/cmdlib/instance_storage.py b/lib/cmdlib/instance_storage.py new file mode 100644 index 0000000..bfdee08 --- /dev/null +++ b/lib/cmdlib/instance_storage.py @@ -0,0 +1,2462 @@ +# +# + +# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + + +"""Logical units dealing with storage of instances.""" + +import itertools +import logging +import os +import time + +from ganeti import compat +from ganeti import constants +from ganeti import errors +from ganeti import ht +from ganeti import locking +from ganeti.masterd import iallocator +from ganeti import objects +from ganeti import utils +from ganeti import opcodes +from ganeti import rpc +from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet +from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \ + _AnnotateDiskParams, _CheckIAllocatorOrNode, _ExpandNodeName, \ + _CheckNodeOnline, _CheckInstanceNodeGroups, _CheckInstanceState, \ + _IsExclusiveStorageEnabledNode, _FindFaultyInstanceDisks +from ganeti.cmdlib.instance_utils import _GetInstanceInfoText, \ + _CopyLockList, _ReleaseLocks, _CheckNodeVmCapable, \ + _BuildInstanceHookEnvByObject, _CheckNodeNotDrained, _CheckTargetNodeIPolicy + +import ganeti.masterd.instance + + +_DISK_TEMPLATE_NAME_PREFIX = { + constants.DT_PLAIN: "", + constants.DT_RBD: ".rbd", + constants.DT_EXT: ".ext", + } + + +_DISK_TEMPLATE_DEVICE_TYPE = { + constants.DT_PLAIN: constants.LD_LV, + constants.DT_FILE: constants.LD_FILE, + constants.DT_SHARED_FILE: constants.LD_FILE, + constants.DT_BLOCK: constants.LD_BLOCKDEV, + constants.DT_RBD: constants.LD_RBD, + constants.DT_EXT: constants.LD_EXT, + } + + +def _CreateSingleBlockDev(lu, node, instance, device, info, force_open, + excl_stor): + """Create a single block device on a given node. + + This will not recurse over children of the device, so they must be + created in advance. 
+ + @param lu: the lu on whose behalf we execute + @param node: the node on which to create the device + @type instance: L{objects.Instance} + @param instance: the instance which owns the device + @type device: L{objects.Disk} + @param device: the device to create + @param info: the extra 'metadata' we should attach to the device + (this will be represented as a LVM tag) + @type force_open: boolean + @param force_open: this parameter will be passes to the + L{backend.BlockdevCreate} function where it specifies + whether we run on primary or not, and it affects both + the child assembly and the device own Open() execution + @type excl_stor: boolean + @param excl_stor: Whether exclusive_storage is active for the node + + """ + lu.cfg.SetDiskID(device, node) + result = lu.rpc.call_blockdev_create(node, device, device.size, + instance.name, force_open, info, + excl_stor) + result.Raise("Can't create block device %s on" + " node %s for instance %s" % (device, node, instance.name)) + if device.physical_id is None: + device.physical_id = result.payload + + +def _CreateBlockDevInner(lu, node, instance, device, force_create, + info, force_open, excl_stor): + """Create a tree of block devices on a given node. + + If this device type has to be created on secondaries, create it and + all its children. + + If not, just recurse to children keeping the same 'force' value. + + @attention: The device has to be annotated already. + + @param lu: the lu on whose behalf we execute + @param node: the node on which to create the device + @type instance: L{objects.Instance} + @param instance: the instance which owns the device + @type device: L{objects.Disk} + @param device: the device to create + @type force_create: boolean + @param force_create: whether to force creation of this device; this + will be change to True whenever we find a device which has + CreateOnSecondary() attribute + @param info: the extra 'metadata' we should attach to the device + (this will be represented as a LVM tag) + @type force_open: boolean + @param force_open: this parameter will be passes to the + L{backend.BlockdevCreate} function where it specifies + whether we run on primary or not, and it affects both + the child assembly and the device own Open() execution + @type excl_stor: boolean + @param excl_stor: Whether exclusive_storage is active for the node + + @return: list of created devices + """ + created_devices = [] + try: + if device.CreateOnSecondary(): + force_create = True + + if device.children: + for child in device.children: + devs = _CreateBlockDevInner(lu, node, instance, child, force_create, + info, force_open, excl_stor) + created_devices.extend(devs) + + if not force_create: + return created_devices + + _CreateSingleBlockDev(lu, node, instance, device, info, force_open, + excl_stor) + # The device has been completely created, so there is no point in keeping + # its subdevices in the list. We just add the device itself instead. + created_devices = [(node, device)] + return created_devices + + except errors.DeviceCreationError, e: + e.created_devices.extend(created_devices) + raise e + except errors.OpExecError, e: + raise errors.DeviceCreationError(str(e), created_devices) + + +def _IsExclusiveStorageEnabledNodeName(cfg, nodename): + """Whether exclusive_storage is in effect for the given node. 
+ + @type cfg: L{config.ConfigWriter} + @param cfg: The cluster configuration + @type nodename: string + @param nodename: The node + @rtype: bool + @return: The effective value of exclusive_storage + @raise errors.OpPrereqError: if no node exists with the given name + + """ + ni = cfg.GetNodeInfo(nodename) + if ni is None: + raise errors.OpPrereqError("Invalid node name %s" % nodename, + errors.ECODE_NOENT) + return _IsExclusiveStorageEnabledNode(cfg, ni) + + +def _CreateBlockDev(lu, node, instance, device, force_create, info, + force_open): + """Wrapper around L{_CreateBlockDevInner}. + + This method annotates the root device first. + + """ + (disk,) = _AnnotateDiskParams(instance, [device], lu.cfg) + excl_stor = _IsExclusiveStorageEnabledNodeName(lu.cfg, node) + return _CreateBlockDevInner(lu, node, instance, disk, force_create, info, + force_open, excl_stor) + + +def _CreateDisks(lu, instance, to_skip=None, target_node=None): + """Create all disks for an instance. + + This abstracts away some work from AddInstance. + + @type lu: L{LogicalUnit} + @param lu: the logical unit on whose behalf we execute + @type instance: L{objects.Instance} + @param instance: the instance whose disks we should create + @type to_skip: list + @param to_skip: list of indices to skip + @type target_node: string + @param target_node: if passed, overrides the target node for creation + @rtype: boolean + @return: the success of the creation + + """ + info = _GetInstanceInfoText(instance) + if target_node is None: + pnode = instance.primary_node + all_nodes = instance.all_nodes + else: + pnode = target_node + all_nodes = [pnode] + + if instance.disk_template in constants.DTS_FILEBASED: + file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) + result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir) + + result.Raise("Failed to create directory '%s' on" + " node %s" % (file_storage_dir, pnode)) + + disks_created = [] + # Note: this needs to be kept in sync with adding of disks in + # LUInstanceSetParams + for idx, device in enumerate(instance.disks): + if to_skip and idx in to_skip: + continue + logging.info("Creating disk %s for instance '%s'", idx, instance.name) + #HARDCODE + for node in all_nodes: + f_create = node == pnode + try: + _CreateBlockDev(lu, node, instance, device, f_create, info, f_create) + disks_created.append((node, device)) + except errors.OpExecError: + logging.warning("Creating disk %s for instance '%s' failed", + idx, instance.name) + except errors.DeviceCreationError, e: + logging.warning("Creating disk %s for instance '%s' failed", + idx, instance.name) + disks_created.extend(e.created_devices) + for (node, disk) in disks_created: + lu.cfg.SetDiskID(disk, node) + result = lu.rpc.call_blockdev_remove(node, disk) + if result.fail_msg: + logging.warning("Failed to remove newly-created disk %s on node %s:" + " %s", device, node, result.fail_msg) + raise errors.OpExecError(e.message) + + +def _ComputeDiskSizePerVG(disk_template, disks): + """Compute disk size requirements in the volume group + + """ + def _compute(disks, payload): + """Universal algorithm. 
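# Illustrative sketch (editor's addition, not part of the patch): the per-VG
# accumulation that _ComputeDiskSizePerVG performs, restated over plain dicts.
# Each disk's size is added to the VG it lives in, plus a fixed per-disk
# overhead (0 for plain LVM, the 128 MiB of DRBD metadata for DRBD8, per the
# comment below).
def compute_size_per_vg(disks, overhead=0):
  """disks: list of {"vg": ..., "size": MiB}; returns {vg: required MiB}."""
  vgs = {}
  for disk in disks:
    vgs[disk["vg"]] = vgs.get(disk["vg"], 0) + disk["size"] + overhead
  return vgs

example = [{"vg": "xenvg", "size": 1024},
           {"vg": "xenvg", "size": 2048},
           {"vg": "ssdvg", "size": 512}]
print(compute_size_per_vg(example, overhead=128))
# {'xenvg': 3328, 'ssdvg': 640}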
+ + """ + vgs = {} + for disk in disks: + vgs[disk[constants.IDISK_VG]] = \ + vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload + + return vgs + + # Required free disk space as a function of disk and swap space + req_size_dict = { + constants.DT_DISKLESS: {}, + constants.DT_PLAIN: _compute(disks, 0), + # 128 MB are added for drbd metadata for each disk + constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE), + constants.DT_FILE: {}, + constants.DT_SHARED_FILE: {}, + } + + if disk_template not in req_size_dict: + raise errors.ProgrammerError("Disk template '%s' size requirement" + " is unknown" % disk_template) + + return req_size_dict[disk_template] + + +def _ComputeDisks(op, default_vg): + """Computes the instance disks. + + @param op: The instance opcode + @param default_vg: The default_vg to assume + + @return: The computed disks + + """ + disks = [] + for disk in op.disks: + mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR) + if mode not in constants.DISK_ACCESS_SET: + raise errors.OpPrereqError("Invalid disk access mode '%s'" % + mode, errors.ECODE_INVAL) + size = disk.get(constants.IDISK_SIZE, None) + if size is None: + raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL) + try: + size = int(size) + except (TypeError, ValueError): + raise errors.OpPrereqError("Invalid disk size '%s'" % size, + errors.ECODE_INVAL) + + ext_provider = disk.get(constants.IDISK_PROVIDER, None) + if ext_provider and op.disk_template != constants.DT_EXT: + raise errors.OpPrereqError("The '%s' option is only valid for the %s" + " disk template, not %s" % + (constants.IDISK_PROVIDER, constants.DT_EXT, + op.disk_template), errors.ECODE_INVAL) + + data_vg = disk.get(constants.IDISK_VG, default_vg) + name = disk.get(constants.IDISK_NAME, None) + if name is not None and name.lower() == constants.VALUE_NONE: + name = None + new_disk = { + constants.IDISK_SIZE: size, + constants.IDISK_MODE: mode, + constants.IDISK_VG: data_vg, + constants.IDISK_NAME: name, + } + + if constants.IDISK_METAVG in disk: + new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG] + if constants.IDISK_ADOPT in disk: + new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT] + + # For extstorage, demand the `provider' option and add any + # additional parameters (ext-params) to the dict + if op.disk_template == constants.DT_EXT: + if ext_provider: + new_disk[constants.IDISK_PROVIDER] = ext_provider + for key in disk: + if key not in constants.IDISK_PARAMS: + new_disk[key] = disk[key] + else: + raise errors.OpPrereqError("Missing provider for template '%s'" % + constants.DT_EXT, errors.ECODE_INVAL) + + disks.append(new_disk) + + return disks + + +def _CheckRADOSFreeSpace(): + """Compute disk size requirements inside the RADOS cluster. + + """ + # For the RADOS cluster we assume there is always enough space. + pass + + +def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names, + iv_name, p_minor, s_minor): + """Generate a drbd8 device complete with its children. 
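# Illustrative sketch (editor's addition, not part of the patch): the shape of
# the device tree a "drbd8 branch" describes -- a DRBD8 device whose two LV
# children hold the data (instance-visible size) and the 128 MiB of DRBD
# metadata mentioned in the patch.  Plain dicts stand in for objects.Disk; the
# parent's logical_id additionally carries the (nodes, port, minors, secret)
# tuple built by _GenerateDRBD8Branch.
DRBD_META_SIZE = 128  # MiB, as stated in the patch comments

def drbd8_branch(size, vg_data, vg_meta, lv_data, lv_meta):
  data = {"dev_type": "lv", "size": size, "logical_id": (vg_data, lv_data)}
  meta = {"dev_type": "lv", "size": DRBD_META_SIZE,
          "logical_id": (vg_meta, lv_meta)}
  return {"dev_type": "drbd8", "size": size, "children": [data, meta]}

print(drbd8_branch(10240, "xenvg", "xenvg", "disk0_data", "disk0_meta"))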
+ + """ + assert len(vgnames) == len(names) == 2 + port = lu.cfg.AllocatePort() + shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId()) + + dev_data = objects.Disk(dev_type=constants.LD_LV, size=size, + logical_id=(vgnames[0], names[0]), + params={}) + dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) + dev_meta = objects.Disk(dev_type=constants.LD_LV, + size=constants.DRBD_META_SIZE, + logical_id=(vgnames[1], names[1]), + params={}) + dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) + drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size, + logical_id=(primary, secondary, port, + p_minor, s_minor, + shared_secret), + children=[dev_data, dev_meta], + iv_name=iv_name, params={}) + drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) + return drbd_dev + + +def _GenerateDiskTemplate( + lu, template_name, instance_name, primary_node, secondary_nodes, + disk_info, file_storage_dir, file_driver, base_index, + feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage, + _req_shr_file_storage=opcodes.RequireSharedFileStorage): + """Generate the entire disk layout for a given template type. + + """ + vgname = lu.cfg.GetVGName() + disk_count = len(disk_info) + disks = [] + + if template_name == constants.DT_DISKLESS: + pass + elif template_name == constants.DT_DRBD8: + if len(secondary_nodes) != 1: + raise errors.ProgrammerError("Wrong template configuration") + remote_node = secondary_nodes[0] + minors = lu.cfg.AllocateDRBDMinor( + [primary_node, remote_node] * len(disk_info), instance_name) + + (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name, + full_disk_params) + drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG] + + names = [] + for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i) + for i in range(disk_count)]): + names.append(lv_prefix + "_data") + names.append(lv_prefix + "_meta") + for idx, disk in enumerate(disk_info): + disk_index = idx + base_index + data_vg = disk.get(constants.IDISK_VG, vgname) + meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg) + disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node, + disk[constants.IDISK_SIZE], + [data_vg, meta_vg], + names[idx * 2:idx * 2 + 2], + "disk/%d" % disk_index, + minors[idx * 2], minors[idx * 2 + 1]) + disk_dev.mode = disk[constants.IDISK_MODE] + disk_dev.name = disk.get(constants.IDISK_NAME, None) + disks.append(disk_dev) + else: + if secondary_nodes: + raise errors.ProgrammerError("Wrong template configuration") + + if template_name == constants.DT_FILE: + _req_file_storage() + elif template_name == constants.DT_SHARED_FILE: + _req_shr_file_storage() + + name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None) + if name_prefix is None: + names = None + else: + names = _GenerateUniqueNames(lu, ["%s.disk%s" % + (name_prefix, base_index + i) + for i in range(disk_count)]) + + if template_name == constants.DT_PLAIN: + + def logical_id_fn(idx, _, disk): + vg = disk.get(constants.IDISK_VG, vgname) + return (vg, names[idx]) + + elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE): + logical_id_fn = \ + lambda _, disk_index, disk: (file_driver, + "%s/disk%d" % (file_storage_dir, + disk_index)) + elif template_name == constants.DT_BLOCK: + logical_id_fn = \ + lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL, + disk[constants.IDISK_ADOPT]) + elif template_name == constants.DT_RBD: + logical_id_fn = lambda idx, _, disk: ("rbd", names[idx]) + elif template_name == constants.DT_EXT: + 
def logical_id_fn(idx, _, disk): + provider = disk.get(constants.IDISK_PROVIDER, None) + if provider is None: + raise errors.ProgrammerError("Disk template is %s, but '%s' is" + " not found", constants.DT_EXT, + constants.IDISK_PROVIDER) + return (provider, names[idx]) + else: + raise errors.ProgrammerError("Unknown disk template '%s'" % template_name) + + dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name] + + for idx, disk in enumerate(disk_info): + params = {} + # Only for the Ext template add disk_info to params + if template_name == constants.DT_EXT: + params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER] + for key in disk: + if key not in constants.IDISK_PARAMS: + params[key] = disk[key] + disk_index = idx + base_index + size = disk[constants.IDISK_SIZE] + feedback_fn("* disk %s, size %s" % + (disk_index, utils.FormatUnit(size, "h"))) + disk_dev = objects.Disk(dev_type=dev_type, size=size, + logical_id=logical_id_fn(idx, disk_index, disk), + iv_name="disk/%d" % disk_index, + mode=disk[constants.IDISK_MODE], + params=params) + disk_dev.name = disk.get(constants.IDISK_NAME, None) + disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) + disks.append(disk_dev) + + return disks + + +class LUInstanceRecreateDisks(LogicalUnit): + """Recreate an instance's missing disks. + + """ + HPATH = "instance-recreate-disks" + HTYPE = constants.HTYPE_INSTANCE + REQ_BGL = False + + _MODIFYABLE = compat.UniqueFrozenset([ + constants.IDISK_SIZE, + constants.IDISK_MODE, + ]) + + # New or changed disk parameters may have different semantics + assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([ + constants.IDISK_ADOPT, + + # TODO: Implement support changing VG while recreating + constants.IDISK_VG, + constants.IDISK_METAVG, + constants.IDISK_PROVIDER, + constants.IDISK_NAME, + ])) + + def _RunAllocator(self): + """Run the allocator based on input opcode. + + """ + be_full = self.cfg.GetClusterInfo().FillBE(self.instance) + + # FIXME + # The allocator should actually run in "relocate" mode, but current + # allocators don't support relocating all the nodes of an instance at + # the same time. As a workaround we use "allocate" mode, but this is + # suboptimal for two reasons: + # - The instance name passed to the allocator is present in the list of + # existing instances, so there could be a conflict within the + # internal structures of the allocator. This doesn't happen with the + # current allocators, but it's a liability. + # - The allocator counts the resources used by the instance twice: once + # because the instance exists already, and once because it tries to + # allocate a new instance. + # The allocator could choose some of the nodes on which the instance is + # running, but that's not a problem. If the instance nodes are broken, + # they should be already be marked as drained or offline, and hence + # skipped by the allocator. If instance disks have been lost for other + # reasons, then recreating the disks on the same nodes should be fine. 
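# Illustrative sketch (editor's addition, not part of the patch): the
# logical_id shapes that _GenerateDiskTemplate above produces per disk
# template, over plain strings.  Driver/provider values and the generated
# names are simplified placeholders, not the real Ganeti constants.
def example_logical_id(template, idx, disk, vgname, file_dir, file_driver):
  if template == "plain":
    return (disk.get("vg", vgname), "disk%d" % idx)
  if template in ("file", "sharedfile"):
    return (file_driver, "%s/disk%d" % (file_dir, idx))
  if template == "blockdev":
    return ("manual", disk["adopt"])
  if template == "rbd":
    return ("rbd", ".rbd.disk%d" % idx)
  if template == "ext":
    return (disk["provider"], ".ext.disk%d" % idx)
  raise ValueError("unhandled disk template %r" % template)

print(example_logical_id("plain", 0, {}, "xenvg", "/srv/ganeti", "loop"))
print(example_logical_id("ext", 1, {"provider": "myprov"}, "xenvg", "", ""))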
+ disk_template = self.instance.disk_template + spindle_use = be_full[constants.BE_SPINDLE_USE] + req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name, + disk_template=disk_template, + tags=list(self.instance.GetTags()), + os=self.instance.os, + nics=[{}], + vcpus=be_full[constants.BE_VCPUS], + memory=be_full[constants.BE_MAXMEM], + spindle_use=spindle_use, + disks=[{constants.IDISK_SIZE: d.size, + constants.IDISK_MODE: d.mode} + for d in self.instance.disks], + hypervisor=self.instance.hypervisor, + node_whitelist=None) + ial = iallocator.IAllocator(self.cfg, self.rpc, req) + + ial.Run(self.op.iallocator) + + assert req.RequiredNodes() == len(self.instance.all_nodes) + + if not ial.success: + raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':" + " %s" % (self.op.iallocator, ial.info), + errors.ECODE_NORES) + + self.op.nodes = ial.result + self.LogInfo("Selected nodes for instance %s via iallocator %s: %s", + self.op.instance_name, self.op.iallocator, + utils.CommaJoin(ial.result)) + + def CheckArguments(self): + if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]): + # Normalize and convert deprecated list of disk indices + self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))] + + duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks)) + if duplicates: + raise errors.OpPrereqError("Some disks have been specified more than" + " once: %s" % utils.CommaJoin(duplicates), + errors.ECODE_INVAL) + + # We don't want _CheckIAllocatorOrNode selecting the default iallocator + # when neither iallocator nor nodes are specified + if self.op.iallocator or self.op.nodes: + _CheckIAllocatorOrNode(self, "iallocator", "nodes") + + for (idx, params) in self.op.disks: + utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES) + unsupported = frozenset(params.keys()) - self._MODIFYABLE + if unsupported: + raise errors.OpPrereqError("Parameters for disk %s try to change" + " unmodifyable parameter(s): %s" % + (idx, utils.CommaJoin(unsupported)), + errors.ECODE_INVAL) + + def ExpandNames(self): + self._ExpandAndLockInstance() + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + + if self.op.nodes: + self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes] + self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes) + else: + self.needed_locks[locking.LEVEL_NODE] = [] + if self.op.iallocator: + # iallocator will select a new node in the same group + self.needed_locks[locking.LEVEL_NODEGROUP] = [] + self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET + + self.needed_locks[locking.LEVEL_NODE_RES] = [] + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODEGROUP: + assert self.op.iallocator is not None + assert not self.op.nodes + assert not self.needed_locks[locking.LEVEL_NODEGROUP] + self.share_locks[locking.LEVEL_NODEGROUP] = 1 + # Lock the primary group used by the instance optimistically; this + # requires going via the node before it's locked, requiring + # verification later on + self.needed_locks[locking.LEVEL_NODEGROUP] = \ + self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True) + + elif level == locking.LEVEL_NODE: + # If an allocator is used, then we lock all the nodes in the current + # instance group, as we don't know yet which ones will be selected; + # if we replace the nodes without using an allocator, locks are + # already declared in ExpandNames; otherwise, we need to lock all the + # instance nodes for disk re-creation + if self.op.iallocator: + assert not 
self.op.nodes + assert not self.needed_locks[locking.LEVEL_NODE] + assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1 + + # Lock member nodes of the group of the primary node + for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP): + self.needed_locks[locking.LEVEL_NODE].extend( + self.cfg.GetNodeGroup(group_uuid).members) + + assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC) + elif not self.op.nodes: + self._LockInstancesNodes(primary_only=False) + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + _CopyLockList(self.needed_locks[locking.LEVEL_NODE]) + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on master, primary and secondary nodes of the instance. + + """ + return _BuildInstanceHookEnvByObject(self, self.instance) + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ + nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) + return (nl, nl) + + def CheckPrereq(self): + """Check prerequisites. + + This checks that the instance is in the cluster and is not running. + + """ + instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name + if self.op.nodes: + if len(self.op.nodes) != len(instance.all_nodes): + raise errors.OpPrereqError("Instance %s currently has %d nodes, but" + " %d replacement nodes were specified" % + (instance.name, len(instance.all_nodes), + len(self.op.nodes)), + errors.ECODE_INVAL) + assert instance.disk_template != constants.DT_DRBD8 or \ + len(self.op.nodes) == 2 + assert instance.disk_template != constants.DT_PLAIN or \ + len(self.op.nodes) == 1 + primary_node = self.op.nodes[0] + else: + primary_node = instance.primary_node + if not self.op.iallocator: + _CheckNodeOnline(self, primary_node) + + if instance.disk_template == constants.DT_DISKLESS: + raise errors.OpPrereqError("Instance '%s' has no disks" % + self.op.instance_name, errors.ECODE_INVAL) + + # Verify if node group locks are still correct + owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) + if owned_groups: + # Node group locks are acquired only for the primary node (and only + # when the allocator is used) + _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups, + primary_only=True) + + # if we replace nodes *and* the old primary is offline, we don't + # check the instance state + old_pnode = self.cfg.GetNodeInfo(instance.primary_node) + if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline): + _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING, + msg="cannot recreate disks") + + if self.op.disks: + self.disks = dict(self.op.disks) + else: + self.disks = dict((idx, {}) for idx in range(len(instance.disks))) + + maxidx = max(self.disks.keys()) + if maxidx >= len(instance.disks): + raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx, + errors.ECODE_INVAL) + + if ((self.op.nodes or self.op.iallocator) and + sorted(self.disks.keys()) != range(len(instance.disks))): + raise errors.OpPrereqError("Can't recreate disks partially and" + " change the nodes at the same time", + errors.ECODE_INVAL) + + self.instance = instance + + if self.op.iallocator: + self._RunAllocator() + # Release unneeded node and node resource locks + _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes) + _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes) + _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC) + + assert not 
self.glm.is_owned(locking.LEVEL_NODE_ALLOC) + + def Exec(self, feedback_fn): + """Recreate the disks. + + """ + instance = self.instance + + assert (self.owned_locks(locking.LEVEL_NODE) == + self.owned_locks(locking.LEVEL_NODE_RES)) + + to_skip = [] + mods = [] # keeps track of needed changes + + for idx, disk in enumerate(instance.disks): + try: + changes = self.disks[idx] + except KeyError: + # Disk should not be recreated + to_skip.append(idx) + continue + + # update secondaries for disks, if needed + if self.op.nodes and disk.dev_type == constants.LD_DRBD8: + # need to update the nodes and minors + assert len(self.op.nodes) == 2 + assert len(disk.logical_id) == 6 # otherwise disk internals + # have changed + (_, _, old_port, _, _, old_secret) = disk.logical_id + new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name) + new_id = (self.op.nodes[0], self.op.nodes[1], old_port, + new_minors[0], new_minors[1], old_secret) + assert len(disk.logical_id) == len(new_id) + else: + new_id = None + + mods.append((idx, new_id, changes)) + + # now that we have passed all asserts above, we can apply the mods + # in a single run (to avoid partial changes) + for idx, new_id, changes in mods: + disk = instance.disks[idx] + if new_id is not None: + assert disk.dev_type == constants.LD_DRBD8 + disk.logical_id = new_id + if changes: + disk.Update(size=changes.get(constants.IDISK_SIZE, None), + mode=changes.get(constants.IDISK_MODE, None)) + + # change primary node, if needed + if self.op.nodes: + instance.primary_node = self.op.nodes[0] + self.LogWarning("Changing the instance's nodes, you will have to" + " remove any disks left on the older nodes manually") + + if self.op.nodes: + self.cfg.Update(instance, feedback_fn) + + # All touched nodes must be locked + mylocks = self.owned_locks(locking.LEVEL_NODE) + assert mylocks.issuperset(frozenset(instance.all_nodes)) + _CreateDisks(self, instance, to_skip=to_skip) + + +def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested): + """Checks if nodes have enough free disk space in the specified VG. + + This function checks if all given nodes have the needed amount of + free disk. In case any node has less disk or we cannot get the + information from the node, this function raises an OpPrereqError + exception. 
+ + @type lu: C{LogicalUnit} + @param lu: a logical unit from which we get configuration data + @type nodenames: C{list} + @param nodenames: the list of node names to check + @type vg: C{str} + @param vg: the volume group to check + @type requested: C{int} + @param requested: the amount of disk in MiB to check for + @raise errors.OpPrereqError: if the node doesn't have enough disk, + or we cannot check the node + + """ + es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames) + nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags) + for node in nodenames: + info = nodeinfo[node] + info.Raise("Cannot get current information from node %s" % node, + prereq=True, ecode=errors.ECODE_ENVIRON) + (_, (vg_info, ), _) = info.payload + vg_free = vg_info.get("vg_free", None) + if not isinstance(vg_free, int): + raise errors.OpPrereqError("Can't compute free disk space on node" + " %s for vg %s, result was '%s'" % + (node, vg, vg_free), errors.ECODE_ENVIRON) + if requested > vg_free: + raise errors.OpPrereqError("Not enough disk space on target node %s" + " vg %s: required %d MiB, available %d MiB" % + (node, vg, requested, vg_free), + errors.ECODE_NORES) + + +def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes): + """Checks if nodes have enough free disk space in all the VGs. + + This function checks if all given nodes have the needed amount of + free disk. In case any node has less disk or we cannot get the + information from the node, this function raises an OpPrereqError + exception. + + @type lu: C{LogicalUnit} + @param lu: a logical unit from which we get configuration data + @type nodenames: C{list} + @param nodenames: the list of node names to check + @type req_sizes: C{dict} + @param req_sizes: the hash of vg and corresponding amount of disk in + MiB to check for + @raise errors.OpPrereqError: if the node doesn't have enough disk, + or we cannot check the node + + """ + for vg, req_size in req_sizes.items(): + _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size) + + +def _DiskSizeInBytesToMebibytes(lu, size): + """Converts a disk size in bytes to mebibytes. + + Warns and rounds up if the size isn't an even multiple of 1 MiB. + + """ + (mib, remainder) = divmod(size, 1024 * 1024) + + if remainder != 0: + lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up" + " to not overwrite existing data (%s bytes will not be" + " wiped)", (1024 * 1024) - remainder) + mib += 1 + + return mib + + +def _CalcEta(time_taken, written, total_size): + """Calculates the ETA based on size written and total size. + + @param time_taken: The time taken so far + @param written: amount written so far + @param total_size: The total size of data to be written + @return: The remaining time in seconds + + """ + avg_time = time_taken / float(written) + return (total_size - written) * avg_time + + +def _WipeDisks(lu, instance, disks=None): + """Wipes instance disks. 
+ + @type lu: L{LogicalUnit} + @param lu: the logical unit on whose behalf we execute + @type instance: L{objects.Instance} + @param instance: the instance whose disks we should create + @type disks: None or list of tuple of (number, L{objects.Disk}, number) + @param disks: Disk details; tuple contains disk index, disk object and the + start offset + + """ + node = instance.primary_node + + if disks is None: + disks = [(idx, disk, 0) + for (idx, disk) in enumerate(instance.disks)] + + for (_, device, _) in disks: + lu.cfg.SetDiskID(device, node) + + logging.info("Pausing synchronization of disks of instance '%s'", + instance.name) + result = lu.rpc.call_blockdev_pause_resume_sync(node, + (map(compat.snd, disks), + instance), + True) + result.Raise("Failed to pause disk synchronization on node '%s'" % node) + + for idx, success in enumerate(result.payload): + if not success: + logging.warn("Pausing synchronization of disk %s of instance '%s'" + " failed", idx, instance.name) + + try: + for (idx, device, offset) in disks: + # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but + # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors. + wipe_chunk_size = \ + int(min(constants.MAX_WIPE_CHUNK, + device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT)) + + size = device.size + last_output = 0 + start_time = time.time() + + if offset == 0: + info_text = "" + else: + info_text = (" (from %s to %s)" % + (utils.FormatUnit(offset, "h"), + utils.FormatUnit(size, "h"))) + + lu.LogInfo("* Wiping disk %s%s", idx, info_text) + + logging.info("Wiping disk %d for instance %s on node %s using" + " chunk size %s", idx, instance.name, node, wipe_chunk_size) + + while offset < size: + wipe_size = min(wipe_chunk_size, size - offset) + + logging.debug("Wiping disk %d, offset %s, chunk %s", + idx, offset, wipe_size) + + result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset, + wipe_size) + result.Raise("Could not wipe disk %d at offset %d for size %d" % + (idx, offset, wipe_size)) + + now = time.time() + offset += wipe_size + if now - last_output >= 60: + eta = _CalcEta(now - start_time, offset, size) + lu.LogInfo(" - done: %.1f%% ETA: %s", + offset / float(size) * 100, utils.FormatSeconds(eta)) + last_output = now + finally: + logging.info("Resuming synchronization of disks for instance '%s'", + instance.name) + + result = lu.rpc.call_blockdev_pause_resume_sync(node, + (map(compat.snd, disks), + instance), + False) + + if result.fail_msg: + lu.LogWarning("Failed to resume disk synchronization on node '%s': %s", + node, result.fail_msg) + else: + for idx, success in enumerate(result.payload): + if not success: + lu.LogWarning("Resuming synchronization of disk %s of instance '%s'" + " failed", idx, instance.name) + + +def _ExpandCheckDisks(instance, disks): + """Return the instance disks selected by the disks list + + @type disks: list of L{objects.Disk} or None + @param disks: selected disks + @rtype: list of L{objects.Disk} + @return: selected instance disks to act on + + """ + if disks is None: + return instance.disks + else: + if not set(disks).issubset(instance.disks): + raise errors.ProgrammerError("Can only act on disks belonging to the" + " target instance") + return disks + + +def _WaitForSync(lu, instance, disks=None, oneshot=False): + """Sleep and poll for an instance's disk to sync. 
+ + """ + if not instance.disks or disks is not None and not disks: + return True + + disks = _ExpandCheckDisks(instance, disks) + + if not oneshot: + lu.LogInfo("Waiting for instance %s to sync disks", instance.name) + + node = instance.primary_node + + for dev in disks: + lu.cfg.SetDiskID(dev, node) + + # TODO: Convert to utils.Retry + + retries = 0 + degr_retries = 10 # in seconds, as we sleep 1 second each time + while True: + max_time = 0 + done = True + cumul_degraded = False + rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance)) + msg = rstats.fail_msg + if msg: + lu.LogWarning("Can't get any data from node %s: %s", node, msg) + retries += 1 + if retries >= 10: + raise errors.RemoteError("Can't contact node %s for mirror data," + " aborting." % node) + time.sleep(6) + continue + rstats = rstats.payload + retries = 0 + for i, mstat in enumerate(rstats): + if mstat is None: + lu.LogWarning("Can't compute data for node %s/%s", + node, disks[i].iv_name) + continue + + cumul_degraded = (cumul_degraded or + (mstat.is_degraded and mstat.sync_percent is None)) + if mstat.sync_percent is not None: + done = False + if mstat.estimated_time is not None: + rem_time = ("%s remaining (estimated)" % + utils.FormatSeconds(mstat.estimated_time)) + max_time = mstat.estimated_time + else: + rem_time = "no time estimate" + lu.LogInfo("- device %s: %5.2f%% done, %s", + disks[i].iv_name, mstat.sync_percent, rem_time) + + # if we're done but degraded, let's do a few small retries, to + # make sure we see a stable and not transient situation; therefore + # we force restart of the loop + if (done or oneshot) and cumul_degraded and degr_retries > 0: + logging.info("Degraded disks found, %d retries left", degr_retries) + degr_retries -= 1 + time.sleep(1) + continue + + if done or oneshot: + break + + time.sleep(min(60, max_time)) + + if done: + lu.LogInfo("Instance %s's disks are in sync", instance.name) + + return not cumul_degraded + + +def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False): + """Shutdown block devices of an instance. + + This does the shutdown on all nodes of the instance. + + If the ignore_primary is false, errors on the primary node are + ignored. + + """ + all_result = True + disks = _ExpandCheckDisks(instance, disks) + + for disk in disks: + for node, top_disk in disk.ComputeNodeTree(instance.primary_node): + lu.cfg.SetDiskID(top_disk, node) + result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance)) + msg = result.fail_msg + if msg: + lu.LogWarning("Could not shutdown block device %s on node %s: %s", + disk.iv_name, node, msg) + if ((node == instance.primary_node and not ignore_primary) or + (node != instance.primary_node and not result.offline)): + all_result = False + return all_result + + +def _SafeShutdownInstanceDisks(lu, instance, disks=None): + """Shutdown block devices of an instance. + + This function checks if an instance is running, before calling + _ShutdownInstanceDisks. + + """ + _CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks") + _ShutdownInstanceDisks(lu, instance, disks=disks) + + +def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False, + ignore_size=False): + """Prepare the block devices for an instance. + + This sets up the block devices on all nodes. 
+ + @type lu: L{LogicalUnit} + @param lu: the logical unit on whose behalf we execute + @type instance: L{objects.Instance} + @param instance: the instance for whose disks we assemble + @type disks: list of L{objects.Disk} or None + @param disks: which disks to assemble (or all, if None) + @type ignore_secondaries: boolean + @param ignore_secondaries: if true, errors on secondary nodes + won't result in an error return from the function + @type ignore_size: boolean + @param ignore_size: if true, the current known size of the disk + will not be used during the disk activation, useful for cases + when the size is wrong + @return: False if the operation failed, otherwise a list of + (host, instance_visible_name, node_visible_name) + with the mapping from node devices to instance devices + + """ + device_info = [] + disks_ok = True + iname = instance.name + disks = _ExpandCheckDisks(instance, disks) + + # With the two passes mechanism we try to reduce the window of + # opportunity for the race condition of switching DRBD to primary + # before handshaking occured, but we do not eliminate it + + # The proper fix would be to wait (with some limits) until the + # connection has been made and drbd transitions from WFConnection + # into any other network-connected state (Connected, SyncTarget, + # SyncSource, etc.) + + # 1st pass, assemble on all nodes in secondary mode + for idx, inst_disk in enumerate(disks): + for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node): + if ignore_size: + node_disk = node_disk.Copy() + node_disk.UnsetSize() + lu.cfg.SetDiskID(node_disk, node) + result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname, + False, idx) + msg = result.fail_msg + if msg: + is_offline_secondary = (node in instance.secondary_nodes and + result.offline) + lu.LogWarning("Could not prepare block device %s on node %s" + " (is_primary=False, pass=1): %s", + inst_disk.iv_name, node, msg) + if not (ignore_secondaries or is_offline_secondary): + disks_ok = False + + # FIXME: race condition on drbd migration to primary + + # 2nd pass, do only the primary node + for idx, inst_disk in enumerate(disks): + dev_path = None + + for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node): + if node != instance.primary_node: + continue + if ignore_size: + node_disk = node_disk.Copy() + node_disk.UnsetSize() + lu.cfg.SetDiskID(node_disk, node) + result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname, + True, idx) + msg = result.fail_msg + if msg: + lu.LogWarning("Could not prepare block device %s on node %s" + " (is_primary=True, pass=2): %s", + inst_disk.iv_name, node, msg) + disks_ok = False + else: + dev_path = result.payload + + device_info.append((instance.primary_node, inst_disk.iv_name, dev_path)) + + # leave the disks configured for the primary node + # this is a workaround that would be fixed better by + # improving the logical/physical id handling + for disk in disks: + lu.cfg.SetDiskID(disk, instance.primary_node) + + return disks_ok, device_info + + +def _StartInstanceDisks(lu, instance, force): + """Start the disks of an instance. 
+ + """ + disks_ok, _ = _AssembleInstanceDisks(lu, instance, + ignore_secondaries=force) + if not disks_ok: + _ShutdownInstanceDisks(lu, instance) + if force is not None and not force: + lu.LogWarning("", + hint=("If the message above refers to a secondary node," + " you can retry the operation using '--force'")) + raise errors.OpExecError("Disk consistency error") + + +class LUInstanceGrowDisk(LogicalUnit): + """Grow a disk of an instance. + + """ + HPATH = "disk-grow" + HTYPE = constants.HTYPE_INSTANCE + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODE] = [] + self.needed_locks[locking.LEVEL_NODE_RES] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + _CopyLockList(self.needed_locks[locking.LEVEL_NODE]) + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on the master, the primary and all the secondaries. + + """ + env = { + "DISK": self.op.disk, + "AMOUNT": self.op.amount, + "ABSOLUTE": self.op.absolute, + } + env.update(_BuildInstanceHookEnvByObject(self, self.instance)) + return env + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ + nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) + return (nl, nl) + + def CheckPrereq(self): + """Check prerequisites. + + This checks that the instance is in the cluster. + + """ + instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name + nodenames = list(instance.all_nodes) + for node in nodenames: + _CheckNodeOnline(self, node) + + self.instance = instance + + if instance.disk_template not in constants.DTS_GROWABLE: + raise errors.OpPrereqError("Instance's disk layout does not support" + " growing", errors.ECODE_INVAL) + + self.disk = instance.FindDisk(self.op.disk) + + if self.op.absolute: + self.target = self.op.amount + self.delta = self.target - self.disk.size + if self.delta < 0: + raise errors.OpPrereqError("Requested size (%s) is smaller than " + "current disk size (%s)" % + (utils.FormatUnit(self.target, "h"), + utils.FormatUnit(self.disk.size, "h")), + errors.ECODE_STATE) + else: + self.delta = self.op.amount + self.target = self.disk.size + self.delta + if self.delta < 0: + raise errors.OpPrereqError("Requested increment (%s) is negative" % + utils.FormatUnit(self.delta, "h"), + errors.ECODE_INVAL) + + self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta)) + + def _CheckDiskSpace(self, nodenames, req_vgspace): + template = self.instance.disk_template + if template not in (constants.DTS_NO_FREE_SPACE_CHECK): + # TODO: check the free disk space for file, when that feature will be + # supported + nodes = map(self.cfg.GetNodeInfo, nodenames) + es_nodes = filter(lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n), + nodes) + if es_nodes: + # With exclusive storage we need to something smarter than just looking + # at free space; for now, let's simply abort the operation. + raise errors.OpPrereqError("Cannot grow disks when exclusive_storage" + " is enabled", errors.ECODE_STATE) + _CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace) + + def Exec(self, feedback_fn): + """Execute disk grow. 
+ + """ + instance = self.instance + disk = self.disk + + assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE) + assert (self.owned_locks(locking.LEVEL_NODE) == + self.owned_locks(locking.LEVEL_NODE_RES)) + + wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks + + disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk]) + if not disks_ok: + raise errors.OpExecError("Cannot activate block device to grow") + + feedback_fn("Growing disk %s of instance '%s' by %s to %s" % + (self.op.disk, instance.name, + utils.FormatUnit(self.delta, "h"), + utils.FormatUnit(self.target, "h"))) + + # First run all grow ops in dry-run mode + for node in instance.all_nodes: + self.cfg.SetDiskID(disk, node) + result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta, + True, True) + result.Raise("Dry-run grow request failed to node %s" % node) + + if wipe_disks: + # Get disk size from primary node for wiping + result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk]) + result.Raise("Failed to retrieve disk size from node '%s'" % + instance.primary_node) + + (disk_size_in_bytes, ) = result.payload + + if disk_size_in_bytes is None: + raise errors.OpExecError("Failed to retrieve disk size from primary" + " node '%s'" % instance.primary_node) + + old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes) + + assert old_disk_size >= disk.size, \ + ("Retrieved disk size too small (got %s, should be at least %s)" % + (old_disk_size, disk.size)) + else: + old_disk_size = None + + # We know that (as far as we can test) operations across different + # nodes will succeed, time to run it for real on the backing storage + for node in instance.all_nodes: + self.cfg.SetDiskID(disk, node) + result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta, + False, True) + result.Raise("Grow request failed to node %s" % node) + + # And now execute it for logical storage, on the primary node + node = instance.primary_node + self.cfg.SetDiskID(disk, node) + result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta, + False, False) + result.Raise("Grow request failed to node %s" % node) + + disk.RecordGrow(self.delta) + self.cfg.Update(instance, feedback_fn) + + # Changes have been recorded, release node lock + _ReleaseLocks(self, locking.LEVEL_NODE) + + # Downgrade lock while waiting for sync + self.glm.downgrade(locking.LEVEL_INSTANCE) + + assert wipe_disks ^ (old_disk_size is None) + + if wipe_disks: + assert instance.disks[self.op.disk] == disk + + # Wipe newly added disk space + _WipeDisks(self, instance, + disks=[(self.op.disk, disk, old_disk_size)]) + + if self.op.wait_for_sync: + disk_abort = not _WaitForSync(self, instance, disks=[disk]) + if disk_abort: + self.LogWarning("Disk syncing has not returned a good status; check" + " the instance") + if instance.admin_state != constants.ADMINST_UP: + _SafeShutdownInstanceDisks(self, instance, disks=[disk]) + elif instance.admin_state != constants.ADMINST_UP: + self.LogWarning("Not shutting down the disk even if the instance is" + " not supposed to be running because no wait for" + " sync mode was requested") + + assert self.owned_locks(locking.LEVEL_NODE_RES) + assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE) + + +class LUInstanceReplaceDisks(LogicalUnit): + """Replace the disks of an instance. + + """ + HPATH = "mirrors-replace" + HTYPE = constants.HTYPE_INSTANCE + REQ_BGL = False + + def CheckArguments(self): + """Check arguments. 
+ + """ + remote_node = self.op.remote_node + ialloc = self.op.iallocator + if self.op.mode == constants.REPLACE_DISK_CHG: + if remote_node is None and ialloc is None: + raise errors.OpPrereqError("When changing the secondary either an" + " iallocator script must be used or the" + " new node given", errors.ECODE_INVAL) + else: + _CheckIAllocatorOrNode(self, "iallocator", "remote_node") + + elif remote_node is not None or ialloc is not None: + # Not replacing the secondary + raise errors.OpPrereqError("The iallocator and new node options can" + " only be used when changing the" + " secondary node", errors.ECODE_INVAL) + + def ExpandNames(self): + self._ExpandAndLockInstance() + + assert locking.LEVEL_NODE not in self.needed_locks + assert locking.LEVEL_NODE_RES not in self.needed_locks + assert locking.LEVEL_NODEGROUP not in self.needed_locks + + assert self.op.iallocator is None or self.op.remote_node is None, \ + "Conflicting options" + + if self.op.remote_node is not None: + self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node) + + # Warning: do not remove the locking of the new secondary here + # unless DRBD8.AddChildren is changed to work in parallel; + # currently it doesn't since parallel invocations of + # FindUnusedMinor will conflict + self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + else: + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + if self.op.iallocator is not None: + # iallocator will select a new node in the same group + self.needed_locks[locking.LEVEL_NODEGROUP] = [] + self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET + + self.needed_locks[locking.LEVEL_NODE_RES] = [] + + self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode, + self.op.iallocator, self.op.remote_node, + self.op.disks, self.op.early_release, + self.op.ignore_ipolicy) + + self.tasklets = [self.replacer] + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODEGROUP: + assert self.op.remote_node is None + assert self.op.iallocator is not None + assert not self.needed_locks[locking.LEVEL_NODEGROUP] + + self.share_locks[locking.LEVEL_NODEGROUP] = 1 + # Lock all groups used by instance optimistically; this requires going + # via the node before it's locked, requiring verification later on + self.needed_locks[locking.LEVEL_NODEGROUP] = \ + self.cfg.GetInstanceNodeGroups(self.op.instance_name) + + elif level == locking.LEVEL_NODE: + if self.op.iallocator is not None: + assert self.op.remote_node is None + assert not self.needed_locks[locking.LEVEL_NODE] + assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC) + + # Lock member nodes of all locked groups + self.needed_locks[locking.LEVEL_NODE] = \ + [node_name + for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP) + for node_name in self.cfg.GetNodeGroup(group_uuid).members] + else: + assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC) + + self._LockInstancesNodes() + + elif level == locking.LEVEL_NODE_RES: + # Reuse node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE] + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on the master, the primary and all the secondaries. 
+ + """ + instance = self.replacer.instance + env = { + "MODE": self.op.mode, + "NEW_SECONDARY": self.op.remote_node, + "OLD_SECONDARY": instance.secondary_nodes[0], + } + env.update(_BuildInstanceHookEnvByObject(self, instance)) + return env + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ + instance = self.replacer.instance + nl = [ + self.cfg.GetMasterNode(), + instance.primary_node, + ] + if self.op.remote_node is not None: + nl.append(self.op.remote_node) + return nl, nl + + def CheckPrereq(self): + """Check prerequisites. + + """ + assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or + self.op.iallocator is None) + + # Verify if node group locks are still correct + owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) + if owned_groups: + _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups) + + return LogicalUnit.CheckPrereq(self) + + +class LUInstanceActivateDisks(NoHooksLU): + """Bring up an instance's disks. + + """ + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() + + def CheckPrereq(self): + """Check prerequisites. + + This checks that the instance is in the cluster. + + """ + self.instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name + _CheckNodeOnline(self, self.instance.primary_node) + + def Exec(self, feedback_fn): + """Activate the disks. + + """ + disks_ok, disks_info = \ + _AssembleInstanceDisks(self, self.instance, + ignore_size=self.op.ignore_size) + if not disks_ok: + raise errors.OpExecError("Cannot activate block devices") + + if self.op.wait_for_sync: + if not _WaitForSync(self, self.instance): + raise errors.OpExecError("Some disks of the instance are degraded!") + + return disks_info + + +class LUInstanceDeactivateDisks(NoHooksLU): + """Shutdown an instance's disks. + + """ + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() + + def CheckPrereq(self): + """Check prerequisites. + + This checks that the instance is in the cluster. + + """ + self.instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name + + def Exec(self, feedback_fn): + """Deactivate the disks + + """ + instance = self.instance + if self.op.force: + _ShutdownInstanceDisks(self, instance) + else: + _SafeShutdownInstanceDisks(self, instance) + + +def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary, + ldisk=False): + """Check that mirrors are not degraded. + + @attention: The device has to be annotated already. + + The ldisk parameter, if True, will change the test from the + is_degraded attribute (which represents overall non-ok status for + the device(s)) to the ldisk (representing the local storage status). 
+ + """ + lu.cfg.SetDiskID(dev, node) + + result = True + + if on_primary or dev.AssembleOnSecondary(): + rstats = lu.rpc.call_blockdev_find(node, dev) + msg = rstats.fail_msg + if msg: + lu.LogWarning("Can't find disk on node %s: %s", node, msg) + result = False + elif not rstats.payload: + lu.LogWarning("Can't find disk on node %s", node) + result = False + else: + if ldisk: + result = result and rstats.payload.ldisk_status == constants.LDS_OKAY + else: + result = result and not rstats.payload.is_degraded + + if dev.children: + for child in dev.children: + result = result and _CheckDiskConsistencyInner(lu, instance, child, node, + on_primary) + + return result + + +def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False): + """Wrapper around L{_CheckDiskConsistencyInner}. + + """ + (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg) + return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary, + ldisk=ldisk) + + +def _BlockdevFind(lu, node, dev, instance): + """Wrapper around call_blockdev_find to annotate diskparams. + + @param lu: A reference to the lu object + @param node: The node to call out + @param dev: The device to find + @param instance: The instance object the device belongs to + @returns The result of the rpc call + + """ + (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg) + return lu.rpc.call_blockdev_find(node, disk) + + +def _GenerateUniqueNames(lu, exts): + """Generate a suitable LV name. + + This will generate a logical volume name for the given instance. + + """ + results = [] + for val in exts: + new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) + results.append("%s%s" % (new_id, val)) + return results + + +class TLReplaceDisks(Tasklet): + """Replaces disks for an instance. + + Note: Locking is not within the scope of this class. + + """ + def __init__(self, lu, instance_name, mode, iallocator_name, remote_node, + disks, early_release, ignore_ipolicy): + """Initializes this class. + + """ + Tasklet.__init__(self, lu) + + # Parameters + self.instance_name = instance_name + self.mode = mode + self.iallocator_name = iallocator_name + self.remote_node = remote_node + self.disks = disks + self.early_release = early_release + self.ignore_ipolicy = ignore_ipolicy + + # Runtime data + self.instance = None + self.new_node = None + self.target_node = None + self.other_node = None + self.remote_node_info = None + self.node_secondary_ip = None + + @staticmethod + def _RunAllocator(lu, iallocator_name, instance_name, relocate_from): + """Compute a new secondary node using an IAllocator. + + """ + req = iallocator.IAReqRelocate(name=instance_name, + relocate_from=list(relocate_from)) + ial = iallocator.IAllocator(lu.cfg, lu.rpc, req) + + ial.Run(iallocator_name) + + if not ial.success: + raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':" + " %s" % (iallocator_name, ial.info), + errors.ECODE_NORES) + + remote_node_name = ial.result[0] + + lu.LogInfo("Selected new secondary for instance '%s': %s", + instance_name, remote_node_name) + + return remote_node_name + + def _FindFaultyDisks(self, node_name): + """Wrapper for L{_FindFaultyInstanceDisks}. + + """ + return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance, + node_name, True) + + def _CheckDisksActivated(self, instance): + """Checks if the instance disks are activated. 
+ + @param instance: The instance to check disks + @return: True if they are activated, False otherwise + + """ + nodes = instance.all_nodes + + for idx, dev in enumerate(instance.disks): + for node in nodes: + self.lu.LogInfo("Checking disk/%d on %s", idx, node) + self.cfg.SetDiskID(dev, node) + + result = _BlockdevFind(self, node, dev, instance) + + if result.offline: + continue + elif result.fail_msg or not result.payload: + return False + + return True + + def CheckPrereq(self): + """Check prerequisites. + + This checks that the instance is in the cluster. + + """ + self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name) + assert instance is not None, \ + "Cannot retrieve locked instance %s" % self.instance_name + + if instance.disk_template != constants.DT_DRBD8: + raise errors.OpPrereqError("Can only run replace disks for DRBD8-based" + " instances", errors.ECODE_INVAL) + + if len(instance.secondary_nodes) != 1: + raise errors.OpPrereqError("The instance has a strange layout," + " expected one secondary but found %d" % + len(instance.secondary_nodes), + errors.ECODE_FAULT) + + instance = self.instance + secondary_node = instance.secondary_nodes[0] + + if self.iallocator_name is None: + remote_node = self.remote_node + else: + remote_node = self._RunAllocator(self.lu, self.iallocator_name, + instance.name, instance.secondary_nodes) + + if remote_node is None: + self.remote_node_info = None + else: + assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \ + "Remote node '%s' is not locked" % remote_node + + self.remote_node_info = self.cfg.GetNodeInfo(remote_node) + assert self.remote_node_info is not None, \ + "Cannot retrieve locked node %s" % remote_node + + if remote_node == self.instance.primary_node: + raise errors.OpPrereqError("The specified node is the primary node of" + " the instance", errors.ECODE_INVAL) + + if remote_node == secondary_node: + raise errors.OpPrereqError("The specified node is already the" + " secondary node of the instance", + errors.ECODE_INVAL) + + if self.disks and self.mode in (constants.REPLACE_DISK_AUTO, + constants.REPLACE_DISK_CHG): + raise errors.OpPrereqError("Cannot specify disks to be replaced", + errors.ECODE_INVAL) + + if self.mode == constants.REPLACE_DISK_AUTO: + if not self._CheckDisksActivated(instance): + raise errors.OpPrereqError("Please run activate-disks on instance %s" + " first" % self.instance_name, + errors.ECODE_STATE) + faulty_primary = self._FindFaultyDisks(instance.primary_node) + faulty_secondary = self._FindFaultyDisks(secondary_node) + + if faulty_primary and faulty_secondary: + raise errors.OpPrereqError("Instance %s has faulty disks on more than" + " one node and can not be repaired" + " automatically" % self.instance_name, + errors.ECODE_STATE) + + if faulty_primary: + self.disks = faulty_primary + self.target_node = instance.primary_node + self.other_node = secondary_node + check_nodes = [self.target_node, self.other_node] + elif faulty_secondary: + self.disks = faulty_secondary + self.target_node = secondary_node + self.other_node = instance.primary_node + check_nodes = [self.target_node, self.other_node] + else: + self.disks = [] + check_nodes = [] + + else: + # Non-automatic modes + if self.mode == constants.REPLACE_DISK_PRI: + self.target_node = instance.primary_node + self.other_node = secondary_node + check_nodes = [self.target_node, self.other_node] + + elif self.mode == constants.REPLACE_DISK_SEC: + self.target_node = secondary_node + self.other_node = instance.primary_node + check_nodes 
= [self.target_node, self.other_node] + + elif self.mode == constants.REPLACE_DISK_CHG: + self.new_node = remote_node + self.other_node = instance.primary_node + self.target_node = secondary_node + check_nodes = [self.new_node, self.other_node] + + _CheckNodeNotDrained(self.lu, remote_node) + _CheckNodeVmCapable(self.lu, remote_node) + + old_node_info = self.cfg.GetNodeInfo(secondary_node) + assert old_node_info is not None + if old_node_info.offline and not self.early_release: + # doesn't make sense to delay the release + self.early_release = True + self.lu.LogInfo("Old secondary %s is offline, automatically enabling" + " early-release mode", secondary_node) + + else: + raise errors.ProgrammerError("Unhandled disk replace mode (%s)" % + self.mode) + + # If not specified all disks should be replaced + if not self.disks: + self.disks = range(len(self.instance.disks)) + + # TODO: This is ugly, but right now we can't distinguish between internal + # submitted opcode and external one. We should fix that. + if self.remote_node_info: + # We change the node, lets verify it still meets instance policy + new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group) + cluster = self.cfg.GetClusterInfo() + ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, + new_group_info) + _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info, + self.cfg, ignore=self.ignore_ipolicy) + + for node in check_nodes: + _CheckNodeOnline(self.lu, node) + + touched_nodes = frozenset(node_name for node_name in [self.new_node, + self.other_node, + self.target_node] + if node_name is not None) + + # Release unneeded node and node resource locks + _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes) + _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes) + _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC) + + # Release any owned node group + _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP) + + # Check whether disks are valid + for disk_idx in self.disks: + instance.FindDisk(disk_idx) + + # Get secondary node IP addresses + self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node) + in self.cfg.GetMultiNodeInfo(touched_nodes)) + + def Exec(self, feedback_fn): + """Execute disk replacement. + + This dispatches the disk replacement to the appropriate handler. 
+ + """ + if __debug__: + # Verify owned locks before starting operation + owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE) + assert set(owned_nodes) == set(self.node_secondary_ip), \ + ("Incorrect node locks, owning %s, expected %s" % + (owned_nodes, self.node_secondary_ip.keys())) + assert (self.lu.owned_locks(locking.LEVEL_NODE) == + self.lu.owned_locks(locking.LEVEL_NODE_RES)) + assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC) + + owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE) + assert list(owned_instances) == [self.instance_name], \ + "Instance '%s' not locked" % self.instance_name + + assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \ + "Should not own any node group lock at this point" + + if not self.disks: + feedback_fn("No disks need replacement for instance '%s'" % + self.instance.name) + return + + feedback_fn("Replacing disk(s) %s for instance '%s'" % + (utils.CommaJoin(self.disks), self.instance.name)) + feedback_fn("Current primary node: %s" % self.instance.primary_node) + feedback_fn("Current seconary node: %s" % + utils.CommaJoin(self.instance.secondary_nodes)) + + activate_disks = (self.instance.admin_state != constants.ADMINST_UP) + + # Activate the instance disks if we're replacing them on a down instance + if activate_disks: + _StartInstanceDisks(self.lu, self.instance, True) + + try: + # Should we replace the secondary node? + if self.new_node is not None: + fn = self._ExecDrbd8Secondary + else: + fn = self._ExecDrbd8DiskOnly + + result = fn(feedback_fn) + finally: + # Deactivate the instance disks if we're replacing them on a + # down instance + if activate_disks: + _SafeShutdownInstanceDisks(self.lu, self.instance) + + assert not self.lu.owned_locks(locking.LEVEL_NODE) + + if __debug__: + # Verify owned locks + owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES) + nodes = frozenset(self.node_secondary_ip) + assert ((self.early_release and not owned_nodes) or + (not self.early_release and not (set(owned_nodes) - nodes))), \ + ("Not owning the correct locks, early_release=%s, owned=%r," + " nodes=%r" % (self.early_release, owned_nodes, nodes)) + + return result + + def _CheckVolumeGroup(self, nodes): + self.lu.LogInfo("Checking volume groups") + + vgname = self.cfg.GetVGName() + + # Make sure volume group exists on all involved nodes + results = self.rpc.call_vg_list(nodes) + if not results: + raise errors.OpExecError("Can't list volume groups on the nodes") + + for node in nodes: + res = results[node] + res.Raise("Error checking node %s" % node) + if vgname not in res.payload: + raise errors.OpExecError("Volume group '%s' not found on node %s" % + (vgname, node)) + + def _CheckDisksExistence(self, nodes): + # Check disk existence + for idx, dev in enumerate(self.instance.disks): + if idx not in self.disks: + continue + + for node in nodes: + self.lu.LogInfo("Checking disk/%d on %s", idx, node) + self.cfg.SetDiskID(dev, node) + + result = _BlockdevFind(self, node, dev, self.instance) + + msg = result.fail_msg + if msg or not result.payload: + if not msg: + msg = "disk not found" + raise errors.OpExecError("Can't find disk/%d on node %s: %s" % + (idx, node, msg)) + + def _CheckDisksConsistency(self, node_name, on_primary, ldisk): + for idx, dev in enumerate(self.instance.disks): + if idx not in self.disks: + continue + + self.lu.LogInfo("Checking disk/%d consistency on node %s" % + (idx, node_name)) + + if not _CheckDiskConsistency(self.lu, self.instance, dev, node_name, + on_primary, ldisk=ldisk): + raise 
errors.OpExecError("Node %s has degraded storage, unsafe to" + " replace disks for instance %s" % + (node_name, self.instance.name)) + + def _CreateNewStorage(self, node_name): + """Create new storage on the primary or secondary node. + + This is only used for same-node replaces, not for changing the + secondary node, hence we don't want to modify the existing disk. + + """ + iv_names = {} + + disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg) + for idx, dev in enumerate(disks): + if idx not in self.disks: + continue + + self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx) + + self.cfg.SetDiskID(dev, node_name) + + lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]] + names = _GenerateUniqueNames(self.lu, lv_names) + + (data_disk, meta_disk) = dev.children + vg_data = data_disk.logical_id[0] + lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size, + logical_id=(vg_data, names[0]), + params=data_disk.params) + vg_meta = meta_disk.logical_id[0] + lv_meta = objects.Disk(dev_type=constants.LD_LV, + size=constants.DRBD_META_SIZE, + logical_id=(vg_meta, names[1]), + params=meta_disk.params) + + new_lvs = [lv_data, lv_meta] + old_lvs = [child.Copy() for child in dev.children] + iv_names[dev.iv_name] = (dev, old_lvs, new_lvs) + excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name) + + # we pass force_create=True to force the LVM creation + for new_lv in new_lvs: + _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True, + _GetInstanceInfoText(self.instance), False, + excl_stor) + + return iv_names + + def _CheckDevices(self, node_name, iv_names): + for name, (dev, _, _) in iv_names.iteritems(): + self.cfg.SetDiskID(dev, node_name) + + result = _BlockdevFind(self, node_name, dev, self.instance) + + msg = result.fail_msg + if msg or not result.payload: + if not msg: + msg = "disk not found" + raise errors.OpExecError("Can't find DRBD device %s: %s" % + (name, msg)) + + if result.payload.is_degraded: + raise errors.OpExecError("DRBD device %s is degraded!" % name) + + def _RemoveOldStorage(self, node_name, iv_names): + for name, (_, old_lvs, _) in iv_names.iteritems(): + self.lu.LogInfo("Remove logical volumes for %s", name) + + for lv in old_lvs: + self.cfg.SetDiskID(lv, node_name) + + msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg + if msg: + self.lu.LogWarning("Can't remove old LV: %s", msg, + hint="remove unused LVs manually") + + def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613 + """Replace a disk on the primary or secondary for DRBD 8. + + The algorithm for replace is quite complicated: + + 1. for each disk to be replaced: + + 1. create new LVs on the target node with unique names + 1. detach old LVs from the drbd device + 1. rename old LVs to name_replaced. + 1. rename new LVs to old LVs + 1. attach the new LVs (with the old names now) to the drbd device + + 1. wait for sync across all devices + + 1. for each modified disk: + + 1. remove old LVs (which have the name name_replaces.) + + Failures are not very well handled. 
+ + """ + steps_total = 6 + + # Step: check device activation + self.lu.LogStep(1, steps_total, "Check device existence") + self._CheckDisksExistence([self.other_node, self.target_node]) + self._CheckVolumeGroup([self.target_node, self.other_node]) + + # Step: check other node consistency + self.lu.LogStep(2, steps_total, "Check peer consistency") + self._CheckDisksConsistency(self.other_node, + self.other_node == self.instance.primary_node, + False) + + # Step: create new storage + self.lu.LogStep(3, steps_total, "Allocate new storage") + iv_names = self._CreateNewStorage(self.target_node) + + # Step: for each lv, detach+rename*2+attach + self.lu.LogStep(4, steps_total, "Changing drbd configuration") + for dev, old_lvs, new_lvs in iv_names.itervalues(): + self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name) + + result = self.rpc.call_blockdev_removechildren(self.target_node, dev, + old_lvs) + result.Raise("Can't detach drbd from local storage on node" + " %s for device %s" % (self.target_node, dev.iv_name)) + #dev.children = [] + #cfg.Update(instance) + + # ok, we created the new LVs, so now we know we have the needed + # storage; as such, we proceed on the target node to rename + # old_lv to _old, and new_lv to old_lv; note that we rename LVs + # using the assumption that logical_id == physical_id (which in + # turn is the unique_id on that node) + + # FIXME(iustin): use a better name for the replaced LVs + temp_suffix = int(time.time()) + ren_fn = lambda d, suff: (d.physical_id[0], + d.physical_id[1] + "_replaced-%s" % suff) + + # Build the rename list based on what LVs exist on the node + rename_old_to_new = [] + for to_ren in old_lvs: + result = self.rpc.call_blockdev_find(self.target_node, to_ren) + if not result.fail_msg and result.payload: + # device exists + rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix))) + + self.lu.LogInfo("Renaming the old LVs on the target node") + result = self.rpc.call_blockdev_rename(self.target_node, + rename_old_to_new) + result.Raise("Can't rename old LVs on node %s" % self.target_node) + + # Now we rename the new LVs to the old LVs + self.lu.LogInfo("Renaming the new LVs on the target node") + rename_new_to_old = [(new, old.physical_id) + for old, new in zip(old_lvs, new_lvs)] + result = self.rpc.call_blockdev_rename(self.target_node, + rename_new_to_old) + result.Raise("Can't rename new LVs on node %s" % self.target_node) + + # Intermediate steps of in memory modifications + for old, new in zip(old_lvs, new_lvs): + new.logical_id = old.logical_id + self.cfg.SetDiskID(new, self.target_node) + + # We need to modify old_lvs so that removal later removes the + # right LVs, not the newly added ones; note that old_lvs is a + # copy here + for disk in old_lvs: + disk.logical_id = ren_fn(disk, temp_suffix) + self.cfg.SetDiskID(disk, self.target_node) + + # Now that the new lvs have the old name, we can add them to the device + self.lu.LogInfo("Adding new mirror component on %s", self.target_node) + result = self.rpc.call_blockdev_addchildren(self.target_node, + (dev, self.instance), new_lvs) + msg = result.fail_msg + if msg: + for new_lv in new_lvs: + msg2 = self.rpc.call_blockdev_remove(self.target_node, + new_lv).fail_msg + if msg2: + self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2, + hint=("cleanup manually the unused logical" + "volumes")) + raise errors.OpExecError("Can't add local storage to drbd: %s" % msg) + + cstep = itertools.count(5) + + if self.early_release: + self.lu.LogStep(cstep.next(), steps_total, 
"Removing old storage") + self._RemoveOldStorage(self.target_node, iv_names) + # TODO: Check if releasing locks early still makes sense + _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES) + else: + # Release all resource locks except those used by the instance + _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, + keep=self.node_secondary_ip.keys()) + + # Release all node locks while waiting for sync + _ReleaseLocks(self.lu, locking.LEVEL_NODE) + + # TODO: Can the instance lock be downgraded here? Take the optional disk + # shutdown in the caller into consideration. + + # Wait for sync + # This can fail as the old devices are degraded and _WaitForSync + # does a combined result over all disks, so we don't check its return value + self.lu.LogStep(cstep.next(), steps_total, "Sync devices") + _WaitForSync(self.lu, self.instance) + + # Check all devices manually + self._CheckDevices(self.instance.primary_node, iv_names) + + # Step: remove old storage + if not self.early_release: + self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") + self._RemoveOldStorage(self.target_node, iv_names) + + def _ExecDrbd8Secondary(self, feedback_fn): + """Replace the secondary node for DRBD 8. + + The algorithm for replace is quite complicated: + - for all disks of the instance: + - create new LVs on the new node with same names + - shutdown the drbd device on the old secondary + - disconnect the drbd network on the primary + - create the drbd device on the new secondary + - network attach the drbd on the primary, using an artifice: + the drbd code for Attach() will connect to the network if it + finds a device which is connected to the good local disks but + not network enabled + - wait for sync across all devices + - remove all disks from the old secondary + + Failures are not very well handled. 
+ + """ + steps_total = 6 + + pnode = self.instance.primary_node + + # Step: check device activation + self.lu.LogStep(1, steps_total, "Check device existence") + self._CheckDisksExistence([self.instance.primary_node]) + self._CheckVolumeGroup([self.instance.primary_node]) + + # Step: check other node consistency + self.lu.LogStep(2, steps_total, "Check peer consistency") + self._CheckDisksConsistency(self.instance.primary_node, True, True) + + # Step: create new storage + self.lu.LogStep(3, steps_total, "Allocate new storage") + disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg) + excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node) + for idx, dev in enumerate(disks): + self.lu.LogInfo("Adding new local storage on %s for disk/%d" % + (self.new_node, idx)) + # we pass force_create=True to force LVM creation + for new_lv in dev.children: + _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv, + True, _GetInstanceInfoText(self.instance), False, + excl_stor) + + # Step 4: dbrd minors and drbd setups changes + # after this, we must manually remove the drbd minors on both the + # error and the success paths + self.lu.LogStep(4, steps_total, "Changing drbd configuration") + minors = self.cfg.AllocateDRBDMinor([self.new_node + for dev in self.instance.disks], + self.instance.name) + logging.debug("Allocated minors %r", minors) + + iv_names = {} + for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)): + self.lu.LogInfo("activating a new drbd on %s for disk/%d" % + (self.new_node, idx)) + # create new devices on new_node; note that we create two IDs: + # one without port, so the drbd will be activated without + # networking information on the new node at this stage, and one + # with network, for the latter activation in step 4 + (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id + if self.instance.primary_node == o_node1: + p_minor = o_minor1 + else: + assert self.instance.primary_node == o_node2, "Three-node instance?" 
+ p_minor = o_minor2 + + new_alone_id = (self.instance.primary_node, self.new_node, None, + p_minor, new_minor, o_secret) + new_net_id = (self.instance.primary_node, self.new_node, o_port, + p_minor, new_minor, o_secret) + + iv_names[idx] = (dev, dev.children, new_net_id) + logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor, + new_net_id) + new_drbd = objects.Disk(dev_type=constants.LD_DRBD8, + logical_id=new_alone_id, + children=dev.children, + size=dev.size, + params={}) + (anno_new_drbd,) = _AnnotateDiskParams(self.instance, [new_drbd], + self.cfg) + try: + _CreateSingleBlockDev(self.lu, self.new_node, self.instance, + anno_new_drbd, + _GetInstanceInfoText(self.instance), False, + excl_stor) + except errors.GenericError: + self.cfg.ReleaseDRBDMinors(self.instance.name) + raise + + # We have new devices, shutdown the drbd on the old secondary + for idx, dev in enumerate(self.instance.disks): + self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx) + self.cfg.SetDiskID(dev, self.target_node) + msg = self.rpc.call_blockdev_shutdown(self.target_node, + (dev, self.instance)).fail_msg + if msg: + self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old" + "node: %s" % (idx, msg), + hint=("Please cleanup this device manually as" + " soon as possible")) + + self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)") + result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip, + self.instance.disks)[pnode] + + msg = result.fail_msg + if msg: + # detaches didn't succeed (unlikely) + self.cfg.ReleaseDRBDMinors(self.instance.name) + raise errors.OpExecError("Can't detach the disks from the network on" + " old node: %s" % (msg,)) + + # if we managed to detach at least one, we update all the disks of + # the instance to point to the new secondary + self.lu.LogInfo("Updating instance configuration") + for dev, _, new_logical_id in iv_names.itervalues(): + dev.logical_id = new_logical_id + self.cfg.SetDiskID(dev, self.instance.primary_node) + + self.cfg.Update(self.instance, feedback_fn) + + # Release all node locks (the configuration has been updated) + _ReleaseLocks(self.lu, locking.LEVEL_NODE) + + # and now perform the drbd attach + self.lu.LogInfo("Attaching primary drbds to new secondary" + " (standalone => connected)") + result = self.rpc.call_drbd_attach_net([self.instance.primary_node, + self.new_node], + self.node_secondary_ip, + (self.instance.disks, self.instance), + self.instance.name, + False) + for to_node, to_result in result.items(): + msg = to_result.fail_msg + if msg: + self.lu.LogWarning("Can't attach drbd disks on node %s: %s", + to_node, msg, + hint=("please do a gnt-instance info to see the" + " status of disks")) + + cstep = itertools.count(5) + + if self.early_release: + self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") + self._RemoveOldStorage(self.target_node, iv_names) + # TODO: Check if releasing locks early still makes sense + _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES) + else: + # Release all resource locks except those used by the instance + _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, + keep=self.node_secondary_ip.keys()) + + # TODO: Can the instance lock be downgraded here? Take the optional disk + # shutdown in the caller into consideration. 
+ + # Wait for sync + # This can fail as the old devices are degraded and _WaitForSync + # does a combined result over all disks, so we don't check its return value + self.lu.LogStep(cstep.next(), steps_total, "Sync devices") + _WaitForSync(self.lu, self.instance) + + # Check all devices manually + self._CheckDevices(self.instance.primary_node, iv_names) + + # Step: remove old storage + if not self.early_release: + self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") + self._RemoveOldStorage(self.target_node, iv_names) diff --git a/lib/cmdlib/instance_utils.py b/lib/cmdlib/instance_utils.py index 8718af3..bb760ff 100644 --- a/lib/cmdlib/instance_utils.py +++ b/lib/cmdlib/instance_utils.py @@ -31,8 +31,8 @@ from ganeti import network from ganeti import objects from ganeti import pathutils from ganeti import utils - -from ganeti.cmdlib.common import _AnnotateDiskParams +from ganeti.cmdlib.common import _AnnotateDiskParams, \ + _ComputeIPolicyInstanceViolation def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, @@ -202,45 +202,17 @@ def _CheckNodeNotDrained(lu, node): errors.ECODE_STATE) -def _StartInstanceDisks(lu, instance, force): - """Start the disks of an instance. - - """ - disks_ok, _ = _AssembleInstanceDisks(lu, instance, - ignore_secondaries=force) - if not disks_ok: - _ShutdownInstanceDisks(lu, instance) - if force is not None and not force: - lu.LogWarning("", - hint=("If the message above refers to a secondary node," - " you can retry the operation using '--force'")) - raise errors.OpExecError("Disk consistency error") - - -def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False): - """Shutdown block devices of an instance. +def _CheckNodeVmCapable(lu, node): + """Ensure that a given node is vm capable. - This does the shutdown on all nodes of the instance. - - If the ignore_primary is false, errors on the primary node are - ignored. + @param lu: the LU on behalf of which we make the check + @param node: the node to check + @raise errors.OpPrereqError: if the node is not vm capable """ - all_result = True - disks = _ExpandCheckDisks(instance, disks) - - for disk in disks: - for node, top_disk in disk.ComputeNodeTree(instance.primary_node): - lu.cfg.SetDiskID(top_disk, node) - result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance)) - msg = result.fail_msg - if msg: - lu.LogWarning("Could not shutdown block device %s on node %s: %s", - disk.iv_name, node, msg) - if ((node == instance.primary_node and not ignore_primary) or - (node != instance.primary_node and not result.offline)): - all_result = False - return all_result + if not lu.cfg.GetNodeInfo(node).vm_capable: + raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node, + errors.ECODE_STATE) def _RemoveInstance(lu, feedback_fn, instance, ignore_failures): @@ -265,98 +237,6 @@ def _RemoveInstance(lu, feedback_fn, instance, ignore_failures): lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name -def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False, - ignore_size=False): - """Prepare the block devices for an instance. - - This sets up the block devices on all nodes. 
- - @type lu: L{LogicalUnit} - @param lu: the logical unit on whose behalf we execute - @type instance: L{objects.Instance} - @param instance: the instance for whose disks we assemble - @type disks: list of L{objects.Disk} or None - @param disks: which disks to assemble (or all, if None) - @type ignore_secondaries: boolean - @param ignore_secondaries: if true, errors on secondary nodes - won't result in an error return from the function - @type ignore_size: boolean - @param ignore_size: if true, the current known size of the disk - will not be used during the disk activation, useful for cases - when the size is wrong - @return: False if the operation failed, otherwise a list of - (host, instance_visible_name, node_visible_name) - with the mapping from node devices to instance devices - - """ - device_info = [] - disks_ok = True - iname = instance.name - disks = _ExpandCheckDisks(instance, disks) - - # With the two passes mechanism we try to reduce the window of - # opportunity for the race condition of switching DRBD to primary - # before handshaking occured, but we do not eliminate it - - # The proper fix would be to wait (with some limits) until the - # connection has been made and drbd transitions from WFConnection - # into any other network-connected state (Connected, SyncTarget, - # SyncSource, etc.) - - # 1st pass, assemble on all nodes in secondary mode - for idx, inst_disk in enumerate(disks): - for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node): - if ignore_size: - node_disk = node_disk.Copy() - node_disk.UnsetSize() - lu.cfg.SetDiskID(node_disk, node) - result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname, - False, idx) - msg = result.fail_msg - if msg: - is_offline_secondary = (node in instance.secondary_nodes and - result.offline) - lu.LogWarning("Could not prepare block device %s on node %s" - " (is_primary=False, pass=1): %s", - inst_disk.iv_name, node, msg) - if not (ignore_secondaries or is_offline_secondary): - disks_ok = False - - # FIXME: race condition on drbd migration to primary - - # 2nd pass, do only the primary node - for idx, inst_disk in enumerate(disks): - dev_path = None - - for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node): - if node != instance.primary_node: - continue - if ignore_size: - node_disk = node_disk.Copy() - node_disk.UnsetSize() - lu.cfg.SetDiskID(node_disk, node) - result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname, - True, idx) - msg = result.fail_msg - if msg: - lu.LogWarning("Could not prepare block device %s on node %s" - " (is_primary=True, pass=2): %s", - inst_disk.iv_name, node, msg) - disks_ok = False - else: - dev_path = result.payload - - device_info.append((instance.primary_node, inst_disk.iv_name, dev_path)) - - # leave the disks configured for the primary node - # this is a workaround that would be fixed better by - # improving the logical/physical id handling - for disk in disks: - lu.cfg.SetDiskID(disk, instance.primary_node) - - return disks_ok, device_info - - def _RemoveDisks(lu, instance, target_node=None, ignore_failures=False): """Remove all disks for an instance. 
@@ -416,24 +296,6 @@ def _RemoveDisks(lu, instance, target_node=None, ignore_failures=False): return all_result -def _ExpandCheckDisks(instance, disks): - """Return the instance disks selected by the disks list - - @type disks: list of L{objects.Disk} or None - @param disks: selected disks - @rtype: list of L{objects.Disk} - @return: selected instance disks to act on - - """ - if disks is None: - return instance.disks - else: - if not set(disks).issubset(instance.disks): - raise errors.ProgrammerError("Can only act on disks belonging to the" - " target instance") - return disks - - def _NICToTuple(lu, nic): """Build a tupple of nic information. @@ -470,3 +332,119 @@ def _NICListToTuple(lu, nics): for nic in nics: hooks_nics.append(_NICToTuple(lu, nic)) return hooks_nics + + +def _CopyLockList(names): + """Makes a copy of a list of lock names. + + Handles L{locking.ALL_SET} correctly. + + """ + if names == locking.ALL_SET: + return locking.ALL_SET + else: + return names[:] + + +def _ReleaseLocks(lu, level, names=None, keep=None): + """Releases locks owned by an LU. + + @type lu: L{LogicalUnit} + @param level: Lock level + @type names: list or None + @param names: Names of locks to release + @type keep: list or None + @param keep: Names of locks to retain + + """ + assert not (keep is not None and names is not None), \ + "Only one of the 'names' and the 'keep' parameters can be given" + + if names is not None: + should_release = names.__contains__ + elif keep: + should_release = lambda name: name not in keep + else: + should_release = None + + owned = lu.owned_locks(level) + if not owned: + # Not owning any lock at this level, do nothing + pass + + elif should_release: + retain = [] + release = [] + + # Determine which locks to release + for name in owned: + if should_release(name): + release.append(name) + else: + retain.append(name) + + assert len(lu.owned_locks(level)) == (len(retain) + len(release)) + + # Release just some locks + lu.glm.release(level, names=release) + + assert frozenset(lu.owned_locks(level)) == frozenset(retain) + else: + # Release everything + lu.glm.release(level) + + assert not lu.glm.is_owned(level), "No locks should be owned" + + +def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group, + target_group, cfg, + _compute_fn=_ComputeIPolicyInstanceViolation): + """Compute if instance meets the specs of the new target group. + + @param ipolicy: The ipolicy to verify + @param instance: The instance object to verify + @param current_group: The current group of the instance + @param target_group: The new group of the instance + @type cfg: L{config.ConfigWriter} + @param cfg: Cluster configuration + @param _compute_fn: The function to verify ipolicy (unittest only) + @see: L{ganeti.cmdlib.common._ComputeIPolicySpecViolation} + + """ + if current_group == target_group: + return [] + else: + return _compute_fn(ipolicy, instance, cfg) + + +def _CheckTargetNodeIPolicy(lu, ipolicy, instance, node, cfg, ignore=False, + _compute_fn=_ComputeIPolicyNodeViolation): + """Checks that the target node is correct in terms of instance policy. 
+ + @param ipolicy: The ipolicy to verify + @param instance: The instance object to verify + @param node: The new node to relocate + @type cfg: L{config.ConfigWriter} + @param cfg: Cluster configuration + @param ignore: Ignore violations of the ipolicy + @param _compute_fn: The function to verify ipolicy (unittest only) + @see: L{ganeti.cmdlib.common._ComputeIPolicySpecViolation} + + """ + primary_node = lu.cfg.GetNodeInfo(instance.primary_node) + res = _compute_fn(ipolicy, instance, primary_node.group, node.group, cfg) + + if res: + msg = ("Instance does not meet target node group's (%s) instance" + " policy: %s") % (node.group, utils.CommaJoin(res)) + if ignore: + lu.LogWarning(msg) + else: + raise errors.OpPrereqError(msg, errors.ECODE_INVAL) + + +def _GetInstanceInfoText(instance): + """Compute that text that should be added to the disk's metadata. + + """ + return "originstname+%s" % instance.name diff --git a/test/py/ganeti.cmdlib_unittest.py b/test/py/ganeti.cmdlib_unittest.py index e888d72..bb73ed5 100755 --- a/test/py/ganeti.cmdlib_unittest.py +++ b/test/py/ganeti.cmdlib_unittest.py @@ -36,6 +36,8 @@ from ganeti import cmdlib from ganeti.cmdlib import cluster from ganeti.cmdlib import group from ganeti.cmdlib import instance +from ganeti.cmdlib import instance_storage +from ganeti.cmdlib import instance_utils from ganeti.cmdlib import common from ganeti.cmdlib import query from ganeti import opcodes @@ -888,18 +890,20 @@ class TestComputeIPolicyNodeViolation(unittest.TestCase): self.recorder = _CallRecorder(return_value=[]) def testSameGroup(self): - ret = instance._ComputeIPolicyNodeViolation(NotImplemented, - NotImplemented, - "foo", "foo", NotImplemented, - _compute_fn=self.recorder) + ret = instance_utils._ComputeIPolicyNodeViolation( + NotImplemented, + NotImplemented, + "foo", "foo", NotImplemented, + _compute_fn=self.recorder) self.assertFalse(self.recorder.called) self.assertEqual(ret, []) def testDifferentGroup(self): - ret = instance._ComputeIPolicyNodeViolation(NotImplemented, - NotImplemented, - "foo", "bar", NotImplemented, - _compute_fn=self.recorder) + ret = instance_utils._ComputeIPolicyNodeViolation( + NotImplemented, + NotImplemented, + "foo", "bar", NotImplemented, + _compute_fn=self.recorder) self.assertTrue(self.recorder.called) self.assertEqual(ret, []) @@ -1605,7 +1609,7 @@ class TestDiskSizeInBytesToMebibytes(unittest.TestCase): def testLessThanOneMebibyte(self): for i in [1, 2, 7, 512, 1000, 1023]: lu = _FakeLU() - result = instance._DiskSizeInBytesToMebibytes(lu, i) + result = instance_storage._DiskSizeInBytesToMebibytes(lu, i) self.assertEqual(result, 1) self.assertEqual(len(lu.warning_log), 1) self.assertEqual(len(lu.warning_log[0]), 2) @@ -1615,7 +1619,8 @@ class TestDiskSizeInBytesToMebibytes(unittest.TestCase): def testEven(self): for i in [1, 2, 7, 512, 1000, 1023]: lu = _FakeLU() - result = instance._DiskSizeInBytesToMebibytes(lu, i * 1024 * 1024) + result = instance_storage._DiskSizeInBytesToMebibytes(lu, + i * 1024 * 1024) self.assertEqual(result, i) self.assertFalse(lu.warning_log) @@ -1624,7 +1629,7 @@ class TestDiskSizeInBytesToMebibytes(unittest.TestCase): for j in [1, 2, 486, 326, 986, 1023]: lu = _FakeLU() size = (1024 * 1024 * i) + j - result = instance._DiskSizeInBytesToMebibytes(lu, size) + result = instance_storage._DiskSizeInBytesToMebibytes(lu, size) self.assertEqual(result, i + 1, msg="Amount was not rounded up") self.assertEqual(len(lu.warning_log), 1) self.assertEqual(len(lu.warning_log[0]), 2)
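
A few notes on the helpers moved by this patch may help readers follow the refactoring. The _AssembleInstanceDisks function removed from instance_utils.py (it now lives in instance_storage.py, per the import changes) deliberately assembles every disk in secondary mode on all nodes first and only afterwards activates it as primary on the instance's primary node, to narrow the DRBD WFConnection race mentioned in its comments. The sketch below is a simplified, self-contained illustration of that two-pass ordering only; assemble_disks_two_pass, nodes_for and rpc_assemble are hypothetical stand-ins, not Ganeti RPC calls.

def assemble_disks_two_pass(disks, primary_node, nodes_for, rpc_assemble):
    """Illustrative two-pass disk assembly (simplified, not the real helper).

    Pass 1 brings every disk up in secondary mode on all involved nodes;
    pass 2 promotes the disks to primary only on the primary node.
    """
    disks_ok = True
    device_info = []

    # Pass 1: secondary mode everywhere, so peers can connect first.
    for idx, disk in enumerate(disks):
        for node in nodes_for(disk):
            if not rpc_assemble(node, disk, as_primary=False, idx=idx):
                disks_ok = False

    # Pass 2: promote on the primary node only, collecting device paths.
    for idx, disk in enumerate(disks):
        dev_path = rpc_assemble(primary_node, disk, as_primary=True, idx=idx)
        if dev_path:
            device_info.append((primary_node, disk, dev_path))
        else:
            disks_ok = False

    return disks_ok, device_info


if __name__ == "__main__":
    def fake_rpc(node, disk, as_primary, idx):
        return "/dev/fake-%s-%d" % (node, idx)  # always "succeeds"

    ok, info = assemble_disks_two_pass(["disk0"], "nodeA",
                                       lambda _disk: ["nodeA", "nodeB"],
                                       fake_rpc)
    assert ok and info == [("nodeA", "disk0", "/dev/fake-nodeA-0")]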
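
The _CopyLockList/_ReleaseLocks pair added to instance_utils.py releases either an explicit list of lock names, or everything except a keep list, or everything owned when neither is given. Here is a minimal, self-contained sketch of just that selection logic, with a plain list standing in for the LU's lock manager; none of the names below are Ganeti APIs.

ALL_SET = object()  # toy stand-in for the locking.ALL_SET sentinel


def copy_lock_list(names):
    """Copy a list of lock names, passing the ALL_SET sentinel through."""
    if names is ALL_SET:
        return ALL_SET
    return names[:]


def partition_locks(owned, names=None, keep=None):
    """Split owned locks into (retain, release), mirroring _ReleaseLocks."""
    assert not (names is not None and keep is not None), \
        "Only one of 'names' and 'keep' may be given"

    if names is not None:
        should_release = lambda lock: lock in names
    elif keep:
        should_release = lambda lock: lock not in keep
    else:
        return [], list(owned)  # neither given: release everything owned

    retain, release = [], []
    for lock in owned:
        (release if should_release(lock) else retain).append(lock)
    return retain, release


if __name__ == "__main__":
    owned = ["node1", "node2", "node3"]
    assert partition_locks(owned, keep=["node2"]) == (["node2"], ["node1", "node3"])
    assert partition_locks(owned, names=["node3"]) == (["node1", "node2"], ["node3"])
    assert partition_locks(owned) == ([], ["node1", "node2", "node3"])
    assert copy_lock_list(ALL_SET) is ALL_SET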
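
The _ComputeIPolicyNodeViolation helper now provided by instance_utils.py (and exercised by the updated TestComputeIPolicyNodeViolation cases) only invokes the underlying instance-policy check when the instance actually changes node groups; _CheckTargetNodeIPolicy then either warns or raises depending on its ignore flag. A rough sketch of that control flow follows, with toy stand-ins for the recorder and policy callback used in the tests; it is not the Ganeti implementation.

def compute_node_violation(current_group, target_group, compute_fn):
    """Return policy violations only when the instance changes node groups."""
    if current_group == target_group:
        return []            # same group: nothing to re-validate
    return compute_fn()      # group change: defer to the real policy check


def check_target_node_policy(violations, ignore, warn, fail):
    """Warn about or reject violations, in the spirit of _CheckTargetNodeIPolicy."""
    if not violations:
        return
    msg = ("Instance does not meet target node group's instance policy: %s"
           % ", ".join(violations))
    if ignore:
        warn(msg)
    else:
        fail(msg)


class CallRecorder(object):
    """Tiny test double modelled on the unit tests' _CallRecorder idea."""
    def __init__(self, return_value):
        self.called = False
        self.return_value = return_value

    def __call__(self):
        self.called = True
        return self.return_value


if __name__ == "__main__":
    rec = CallRecorder(return_value=[])
    assert compute_node_violation("foo", "foo", rec) == [] and not rec.called

    rec = CallRecorder(return_value=["disk-size too big"])
    assert compute_node_violation("foo", "bar", rec) and rec.called

    warnings = []
    def fail(msg):
        raise RuntimeError(msg)
    check_target_node_policy(["disk-size too big"], ignore=True,
                             warn=warnings.append, fail=fail)
    assert warnings and "instance policy" in warnings[0]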
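
Finally, the relocated TestDiskSizeInBytesToMebibytes cases pin down the conversion now imported from instance_storage: byte sizes are rounded up to whole mebibytes, and a warning is logged whenever the input is not an even MiB multiple. The sketch below reproduces only that rounding contract as inferred from the tests, with warning collection simplified to a list; it is not the Ganeti implementation.

MIB = 1024 * 1024


def disk_size_bytes_to_mebibytes(size_bytes, warnings):
    """Round a byte count up to whole MiB, warning when the division is inexact."""
    mib, remainder = divmod(size_bytes, MIB)
    if remainder:
        warnings.append("Disk size %d is not an even multiple of 1 MiB;"
                        " rounding up" % size_bytes)
        mib += 1
    return mib


if __name__ == "__main__":
    log = []
    assert disk_size_bytes_to_mebibytes(512, log) == 1 and len(log) == 1       # < 1 MiB
    log = []
    assert disk_size_bytes_to_mebibytes(7 * MIB, log) == 7 and not log         # even multiple
    log = []
    assert disk_size_bytes_to_mebibytes(3 * MIB + 486, log) == 4 and len(log) == 1  # rounded up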