X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/2c2f257d065d8d16355dcf798c06aee796700936..99ccf8b915722aaed2029d189b3d522d5a6c8760:/lib/cmdlib.py diff --git a/lib/cmdlib.py b/lib/cmdlib.py index fb49df1..b672fdd 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc. +# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -26,13 +26,12 @@ # W0201 since most LU attributes are defined in CheckPrereq or similar # functions -# C0302: since we have waaaay to many lines in this module +# C0302: since we have waaaay too many lines in this module import os import os.path import time import re -import platform import logging import copy import OpenSSL @@ -59,15 +58,31 @@ from ganeti import query from ganeti import qlang from ganeti import opcodes from ganeti import ht +from ganeti import rpc +from ganeti import runtime import ganeti.masterd.instance # pylint: disable=W0611 +#: Size of DRBD meta block device +DRBD_META_SIZE = 128 + +# States of instance +INSTANCE_DOWN = [constants.ADMINST_DOWN] +INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP] +INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE] + +#: Instance status in which an instance can be marked as offline/online +CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([ + constants.ADMINST_OFFLINE, + ])) + + class ResultWithJobs: """Data container for LU results with jobs. Instances of this class returned from L{LogicalUnit.Exec} will be recognized - by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs + by L{mcpu._ProcessResult}. The latter will then submit the jobs contained in the C{jobs} attribute and include the job IDs in the opcode result. @@ -108,7 +123,7 @@ class LogicalUnit(object): HTYPE = None REQ_BGL = True - def __init__(self, processor, op, context, rpc): + def __init__(self, processor, op, context, rpc_runner): """Constructor for LogicalUnit. This needs to be overridden in derived classes in order to check op @@ -122,7 +137,7 @@ class LogicalUnit(object): # readability alias self.owned_locks = context.glm.list_owned self.context = context - self.rpc = rpc + self.rpc = rpc_runner # Dicts used to declare locking needs to mcpu self.needed_locks = None self.share_locks = dict.fromkeys(locking.LEVELS, 0) @@ -181,9 +196,15 @@ class LogicalUnit(object): as values. Rules: - use an empty dict if you don't need any lock - - if you don't need any lock at a particular level omit that level + - if you don't need any lock at a particular level omit that + level (note that in this case C{DeclareLocks} won't be called + at all for that level) + - if you need locks at a level, but you can't calculate it in + this function, initialise that level with an empty list and do + further processing in L{LogicalUnit.DeclareLocks} (see that + function's docstring) - don't put anything for the BGL level - - if you want all locks at a level use locking.ALL_SET as a value + - if you want all locks at a level use L{locking.ALL_SET} as a value If you need to share locks (rather than acquire them exclusively) at one level you can modify self.share_locks, setting a true value (usually 1) for @@ -230,7 +251,7 @@ class LogicalUnit(object): self.needed_locks for the level. 
@param level: Locking level which is going to be locked - @type level: member of ganeti.locking.LEVELS + @type level: member of L{ganeti.locking.LEVELS} """ @@ -344,7 +365,8 @@ class LogicalUnit(object): self.op.instance_name) self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name - def _LockInstancesNodes(self, primary_only=False): + def _LockInstancesNodes(self, primary_only=False, + level=locking.LEVEL_NODE): """Helper function to declare instances' nodes for locking. This function should be called after locking one or more instances to lock @@ -365,9 +387,10 @@ class LogicalUnit(object): @type primary_only: boolean @param primary_only: only lock primary nodes of locked instances + @param level: Which lock level to use for locking nodes """ - assert locking.LEVEL_NODE in self.recalculate_locks, \ + assert level in self.recalculate_locks, \ "_LockInstancesNodes helper function called with no nodes to recalculate" # TODO: check if we're really been called with the instance locks held @@ -382,12 +405,14 @@ class LogicalUnit(object): if not primary_only: wanted_nodes.extend(instance.secondary_nodes) - if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE: - self.needed_locks[locking.LEVEL_NODE] = wanted_nodes - elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND: - self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes) + if self.recalculate_locks[level] == constants.LOCKS_REPLACE: + self.needed_locks[level] = wanted_nodes + elif self.recalculate_locks[level] == constants.LOCKS_APPEND: + self.needed_locks[level].extend(wanted_nodes) + else: + raise errors.ProgrammerError("Unknown recalculation mode") - del self.recalculate_locks[locking.LEVEL_NODE] + del self.recalculate_locks[level] class NoHooksLU(LogicalUnit): # pylint: disable=W0223 @@ -468,14 +493,17 @@ class _QueryBase: #: Attribute holding field definitions FIELDS = None - def __init__(self, filter_, fields, use_locking): + #: Field to sort by + SORT_FIELD = "name" + + def __init__(self, qfilter, fields, use_locking): """Initializes this class. """ self.use_locking = use_locking - self.query = query.Query(self.FIELDS, fields, filter_=filter_, - namefield="name") + self.query = query.Query(self.FIELDS, fields, qfilter=qfilter, + namefield=self.SORT_FIELD) self.requested_data = self.query.RequestedData() self.names = self.query.RequestedNames() @@ -557,6 +585,46 @@ def _ShareAll(): return dict.fromkeys(locking.LEVELS, 1) +def _MakeLegacyNodeInfo(data): + """Formats the data returned by L{rpc.RpcRunner.call_node_info}. + + Converts the data into a single dictionary. This is fine for most use cases, + but some require information from more than one volume group or hypervisor. + + """ + (bootid, (vg_info, ), (hv_info, )) = data + + return utils.JoinDisjointDicts(utils.JoinDisjointDicts(vg_info, hv_info), { + "bootid": bootid, + }) + + +def _CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes, + cur_group_uuid): + """Checks if node groups for locked instances are still correct. 
+ + @type cfg: L{config.ConfigWriter} + @param cfg: Cluster configuration + @type instances: dict; string as key, L{objects.Instance} as value + @param instances: Dictionary, instance name as key, instance object as value + @type owned_groups: iterable of string + @param owned_groups: List of owned groups + @type owned_nodes: iterable of string + @param owned_nodes: List of owned nodes + @type cur_group_uuid: string or None + @param cur_group_uuid: Optional group UUID to check against instance's groups + + """ + for (name, inst) in instances.items(): + assert owned_nodes.issuperset(inst.all_nodes), \ + "Instance %s's nodes changed while we kept the lock" % name + + inst_groups = _CheckInstanceNodeGroups(cfg, name, owned_groups) + + assert cur_group_uuid is None or cur_group_uuid in inst_groups, \ + "Instance %s has no node in group %s" % (name, cur_group_uuid) + + def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups): """Checks if the owned node groups are still correct for an instance. @@ -691,6 +759,118 @@ def _GetUpdatedParams(old_params, update_dict, return params_copy +def _GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False): + """Return the new version of a instance policy. + + @param group_policy: whether this policy applies to a group and thus + we should support removal of policy entries + + """ + use_none = use_default = group_policy + ipolicy = copy.deepcopy(old_ipolicy) + for key, value in new_ipolicy.items(): + if key not in constants.IPOLICY_ALL_KEYS: + raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key, + errors.ECODE_INVAL) + if key in constants.IPOLICY_ISPECS: + utils.ForceDictType(value, constants.ISPECS_PARAMETER_TYPES) + ipolicy[key] = _GetUpdatedParams(old_ipolicy.get(key, {}), value, + use_none=use_none, + use_default=use_default) + else: + if not value or value == [constants.VALUE_DEFAULT]: + if group_policy: + del ipolicy[key] + else: + raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'" + " on the cluster'" % key, + errors.ECODE_INVAL) + else: + if key in constants.IPOLICY_PARAMETERS: + # FIXME: we assume all such values are float + try: + ipolicy[key] = float(value) + except (TypeError, ValueError), err: + raise errors.OpPrereqError("Invalid value for attribute" + " '%s': '%s', error: %s" % + (key, value, err), errors.ECODE_INVAL) + else: + # FIXME: we assume all others are lists; this should be redone + # in a nicer way + ipolicy[key] = list(value) + try: + objects.InstancePolicy.CheckParameterSyntax(ipolicy) + except errors.ConfigurationError, err: + raise errors.OpPrereqError("Invalid instance policy: %s" % err, + errors.ECODE_INVAL) + return ipolicy + + +def _UpdateAndVerifySubDict(base, updates, type_check): + """Updates and verifies a dict with sub dicts of the same type. 
+ + @param base: The dict with the old data + @param updates: The dict with the new data + @param type_check: Dict suitable to ForceDictType to verify correct types + @returns: A new dict with updated and verified values + + """ + def fn(old, value): + new = _GetUpdatedParams(old, value) + utils.ForceDictType(new, type_check) + return new + + ret = copy.deepcopy(base) + ret.update(dict((key, fn(base.get(key, {}), value)) + for key, value in updates.items())) + return ret + + +def _MergeAndVerifyHvState(op_input, obj_input): + """Combines the hv state from an opcode with the one of the object + + @param op_input: The input dict from the opcode + @param obj_input: The input dict from the objects + @return: The verified and updated dict + + """ + if op_input: + invalid_hvs = set(op_input) - constants.HYPER_TYPES + if invalid_hvs: + raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:" + " %s" % utils.CommaJoin(invalid_hvs), + errors.ECODE_INVAL) + if obj_input is None: + obj_input = {} + type_check = constants.HVSTS_PARAMETER_TYPES + return _UpdateAndVerifySubDict(obj_input, op_input, type_check) + + return None + + +def _MergeAndVerifyDiskState(op_input, obj_input): + """Combines the disk state from an opcode with the one of the object + + @param op_input: The input dict from the opcode + @param obj_input: The input dict from the objects + @return: The verified and updated dict + """ + if op_input: + invalid_dst = set(op_input) - constants.DS_VALID_TYPES + if invalid_dst: + raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" % + utils.CommaJoin(invalid_dst), + errors.ECODE_INVAL) + type_check = constants.DSS_PARAMETER_TYPES + if obj_input is None: + obj_input = {} + return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value, + type_check)) + for key, value in op_input.items()) + + return None + + def _ReleaseLocks(lu, level, names=None, keep=None): """Releases locks owned by an LU. @@ -712,12 +892,17 @@ def _ReleaseLocks(lu, level, names=None, keep=None): else: should_release = None - if should_release: + owned = lu.owned_locks(level) + if not owned: + # Not owning any lock at this level, do nothing + pass + + elif should_release: retain = [] release = [] # Determine which locks to release - for name in lu.owned_locks(level): + for name in owned: if should_release(name): release.append(name) else: @@ -753,7 +938,7 @@ def _RunPostHook(lu, node_name): """Runs the post-hook for an opcode on a single node. """ - hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu) + hm = lu.proc.BuildHooksManager(lu) try: hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name]) except: @@ -889,20 +1074,191 @@ def _GetClusterDomainSecret(): strict=True) -def _CheckInstanceDown(lu, instance, reason): - """Ensure that an instance is not running.""" - if instance.admin_up: - raise errors.OpPrereqError("Instance %s is marked to be up, %s" % - (instance.name, reason), errors.ECODE_STATE) +def _CheckInstanceState(lu, instance, req_states, msg=None): + """Ensure that an instance is in one of the required states. 
+ + @param lu: the LU on behalf of which we make the check + @param instance: the instance to check + @param msg: if passed, should be a message to replace the default one + @raise errors.OpPrereqError: if the instance is not in the required state + + """ + if msg is None: + msg = "can't use instance from outside %s states" % ", ".join(req_states) + if instance.admin_state not in req_states: + raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" % + (instance.name, instance.admin_state, msg), + errors.ECODE_STATE) + + if constants.ADMINST_UP not in req_states: + pnode = instance.primary_node + ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode] + ins_l.Raise("Can't contact node %s for instance information" % pnode, + prereq=True, ecode=errors.ECODE_ENVIRON) + + if instance.name in ins_l.payload: + raise errors.OpPrereqError("Instance %s is running, %s" % + (instance.name, msg), errors.ECODE_STATE) + + +def _ComputeMinMaxSpec(name, ipolicy, value): + """Computes if value is in the desired range. + + @param name: name of the parameter for which we perform the check + @param ipolicy: dictionary containing min, max and std values + @param value: actual value that we want to use + @return: None or element not meeting the criteria + + + """ + if value in [None, constants.VALUE_AUTO]: + return None + max_v = ipolicy[constants.ISPECS_MAX].get(name, value) + min_v = ipolicy[constants.ISPECS_MIN].get(name, value) + if value > max_v or min_v > value: + return ("%s value %s is not in range [%s, %s]" % + (name, value, min_v, max_v)) + return None + + +def _ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count, + nic_count, disk_sizes, spindle_use, + _compute_fn=_ComputeMinMaxSpec): + """Verifies ipolicy against provided specs. + + @type ipolicy: dict + @param ipolicy: The ipolicy + @type mem_size: int + @param mem_size: The memory size + @type cpu_count: int + @param cpu_count: Used cpu cores + @type disk_count: int + @param disk_count: Number of disks used + @type nic_count: int + @param nic_count: Number of nics used + @type disk_sizes: list of ints + @param disk_sizes: Disk sizes of used disk (len must match C{disk_count}) + @type spindle_use: int + @param spindle_use: The number of spindles this instance uses + @param _compute_fn: The compute function (unittest only) + @return: A list of violations, or an empty list of no violations are found + + """ + assert disk_count == len(disk_sizes) + + test_settings = [ + (constants.ISPEC_MEM_SIZE, mem_size), + (constants.ISPEC_CPU_COUNT, cpu_count), + (constants.ISPEC_DISK_COUNT, disk_count), + (constants.ISPEC_NIC_COUNT, nic_count), + (constants.ISPEC_SPINDLE_USE, spindle_use), + ] + map((lambda d: (constants.ISPEC_DISK_SIZE, d)), disk_sizes) + + return filter(None, + (_compute_fn(name, ipolicy, value) + for (name, value) in test_settings)) + + +def _ComputeIPolicyInstanceViolation(ipolicy, instance, + _compute_fn=_ComputeIPolicySpecViolation): + """Compute if instance meets the specs of ipolicy. 
+ + @type ipolicy: dict + @param ipolicy: The ipolicy to verify against + @type instance: L{objects.Instance} + @param instance: The instance to verify + @param _compute_fn: The function to verify ipolicy (unittest only) + @see: L{_ComputeIPolicySpecViolation} + + """ + mem_size = instance.beparams.get(constants.BE_MAXMEM, None) + cpu_count = instance.beparams.get(constants.BE_VCPUS, None) + spindle_use = instance.beparams.get(constants.BE_SPINDLE_USE, None) + disk_count = len(instance.disks) + disk_sizes = [disk.size for disk in instance.disks] + nic_count = len(instance.nics) + + return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count, + disk_sizes, spindle_use) + + +def _ComputeIPolicyInstanceSpecViolation(ipolicy, instance_spec, + _compute_fn=_ComputeIPolicySpecViolation): + """Compute if instance specs meets the specs of ipolicy. + + @type ipolicy: dict + @param ipolicy: The ipolicy to verify against + @param instance_spec: dict + @param instance_spec: The instance spec to verify + @param _compute_fn: The function to verify ipolicy (unittest only) + @see: L{_ComputeIPolicySpecViolation} + + """ + mem_size = instance_spec.get(constants.ISPEC_MEM_SIZE, None) + cpu_count = instance_spec.get(constants.ISPEC_CPU_COUNT, None) + disk_count = instance_spec.get(constants.ISPEC_DISK_COUNT, 0) + disk_sizes = instance_spec.get(constants.ISPEC_DISK_SIZE, []) + nic_count = instance_spec.get(constants.ISPEC_NIC_COUNT, 0) + spindle_use = instance_spec.get(constants.ISPEC_SPINDLE_USE, None) + + return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count, + disk_sizes, spindle_use) + + +def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group, + target_group, + _compute_fn=_ComputeIPolicyInstanceViolation): + """Compute if instance meets the specs of the new target group. + + @param ipolicy: The ipolicy to verify + @param instance: The instance object to verify + @param current_group: The current group of the instance + @param target_group: The new group of the instance + @param _compute_fn: The function to verify ipolicy (unittest only) + @see: L{_ComputeIPolicySpecViolation} + + """ + if current_group == target_group: + return [] + else: + return _compute_fn(ipolicy, instance) + + +def _CheckTargetNodeIPolicy(lu, ipolicy, instance, node, ignore=False, + _compute_fn=_ComputeIPolicyNodeViolation): + """Checks that the target node is correct in terms of instance policy. + + @param ipolicy: The ipolicy to verify + @param instance: The instance object to verify + @param node: The new node to relocate + @param ignore: Ignore violations of the ipolicy + @param _compute_fn: The function to verify ipolicy (unittest only) + @see: L{_ComputeIPolicySpecViolation} + + """ + primary_node = lu.cfg.GetNodeInfo(instance.primary_node) + res = _compute_fn(ipolicy, instance, primary_node.group, node.group) + + if res: + msg = ("Instance does not meet target node group's (%s) instance" + " policy: %s") % (node.group, utils.CommaJoin(res)) + if ignore: + lu.LogWarning(msg) + else: + raise errors.OpPrereqError(msg, errors.ECODE_INVAL) + - pnode = instance.primary_node - ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode] - ins_l.Raise("Can't contact node %s for instance information" % pnode, - prereq=True, ecode=errors.ECODE_ENVIRON) +def _ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances): + """Computes a set of any instances that would violate the new ipolicy. 
- if instance.name in ins_l.payload: - raise errors.OpPrereqError("Instance %s is running, %s" % - (instance.name, reason), errors.ECODE_STATE) + @param old_ipolicy: The current (still in-place) ipolicy + @param new_ipolicy: The new (to become) ipolicy + @param instances: List of instances to verify + @return: A list of instances which violates the new ipolicy but did not before + + """ + return (_ComputeViolatingInstances(old_ipolicy, instances) - + _ComputeViolatingInstances(new_ipolicy, instances)) def _ExpandItemName(fn, name, kind): @@ -933,7 +1289,7 @@ def _ExpandInstanceName(cfg, name): def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, - memory, vcpus, nics, disk_template, disks, + minmem, maxmem, vcpus, nics, disk_template, disks, bep, hvp, hypervisor_name, tags): """Builds instance related env variables for hooks @@ -947,10 +1303,12 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, @param secondary_nodes: list of secondary nodes as strings @type os_type: string @param os_type: the name of the instance's OS - @type status: boolean - @param status: the should_run status of the instance - @type memory: string - @param memory: the memory size of the instance + @type status: string + @param status: the desired status of the instance + @type minmem: string + @param minmem: the minimum memory size of the instance + @type maxmem: string + @param maxmem: the maximum memory size of the instance @type vcpus: string @param vcpus: the count of VCPUs the instance has @type nics: list @@ -972,23 +1330,21 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, @return: the hook environment for this instance """ - if status: - str_status = "up" - else: - str_status = "down" env = { "OP_TARGET": name, "INSTANCE_NAME": name, "INSTANCE_PRIMARY": primary_node, "INSTANCE_SECONDARIES": " ".join(secondary_nodes), "INSTANCE_OS_TYPE": os_type, - "INSTANCE_STATUS": str_status, - "INSTANCE_MEMORY": memory, + "INSTANCE_STATUS": status, + "INSTANCE_MINMEM": minmem, + "INSTANCE_MAXMEM": maxmem, + # TODO(2.7) remove deprecated "memory" value + "INSTANCE_MEMORY": maxmem, "INSTANCE_VCPUS": vcpus, "INSTANCE_DISK_TEMPLATE": disk_template, "INSTANCE_HYPERVISOR": hypervisor_name, } - if nics: nic_count = len(nics) for idx, (ip, mac, mode, link) in enumerate(nics): @@ -1074,8 +1430,9 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None): "primary_node": instance.primary_node, "secondary_nodes": instance.secondary_nodes, "os_type": instance.os, - "status": instance.admin_up, - "memory": bep[constants.BE_MEMORY], + "status": instance.admin_state, + "maxmem": bep[constants.BE_MAXMEM], + "minmem": bep[constants.BE_MINMEM], "vcpus": bep[constants.BE_VCPUS], "nics": _NICListToTuple(lu, instance.nics), "disk_template": instance.disk_template, @@ -1117,6 +1474,26 @@ def _DecideSelfPromotion(lu, exceptions=None): return mc_now < mc_should +def _CalculateGroupIPolicy(cluster, group): + """Calculate instance policy for group. + + """ + return cluster.SimpleFillIPolicy(group.ipolicy) + + +def _ComputeViolatingInstances(ipolicy, instances): + """Computes a set of instances who violates given ipolicy. 
+ + @param ipolicy: The ipolicy to verify + @type instances: object.Instance + @param instances: List of instances to verify + @return: A frozenset of instance names violating the ipolicy + + """ + return frozenset([inst.name for inst in instances + if _ComputeIPolicyInstanceViolation(ipolicy, inst)]) + + def _CheckNicsBridgesExist(lu, target_nics, target_node): """Check that the brigdes needed by a list of nics exist. @@ -1204,13 +1581,13 @@ def _GetStorageTypeArgs(cfg, storage_type): return [] -def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq): +def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq): faulty = [] for dev in instance.disks: cfg.SetDiskID(dev, node_name) - result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks) + result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks) result.Raise("Failed to get disk status from node %s" % node_name, prereq=prereq, ecode=errors.ECODE_ENVIRON) @@ -1350,15 +1727,19 @@ class LUClusterDestroy(LogicalUnit): """Destroys the cluster. """ - master = self.cfg.GetMasterNode() + master_params = self.cfg.GetMasterNetworkParameters() # Run post hooks on master node before it's removed - _RunPostHook(self, master) + _RunPostHook(self, master_params.name) - result = self.rpc.call_node_stop_master(master, False) - result.Raise("Could not disable the master role") + ems = self.cfg.GetUseExternalMipScript() + result = self.rpc.call_node_deactivate_master_ip(master_params.name, + master_params, ems) + if result.fail_msg: + self.LogWarning("Error disabling the master IP address: %s", + result.fail_msg) - return master + return master_params.name def _VerifyCertificate(filename): @@ -1433,39 +1814,6 @@ class _VerifyErrors(object): self.op and self._feedback_fn to be available.) 
""" - TCLUSTER = "cluster" - TNODE = "node" - TINSTANCE = "instance" - - ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG") - ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT") - ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK") - ECLUSTERDANGLINGNODES = (TNODE, "ECLUSTERDANGLINGNODES") - ECLUSTERDANGLINGINST = (TNODE, "ECLUSTERDANGLINGINST") - EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE") - EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN") - EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT") - EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK") - EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK") - EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE") - EINSTANCESPLITGROUPS = (TINSTANCE, "EINSTANCESPLITGROUPS") - ENODEDRBD = (TNODE, "ENODEDRBD") - ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER") - ENODEFILECHECK = (TNODE, "ENODEFILECHECK") - ENODEHOOKS = (TNODE, "ENODEHOOKS") - ENODEHV = (TNODE, "ENODEHV") - ENODELVM = (TNODE, "ENODELVM") - ENODEN1 = (TNODE, "ENODEN1") - ENODENET = (TNODE, "ENODENET") - ENODEOS = (TNODE, "ENODEOS") - ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE") - ENODEORPHANLV = (TNODE, "ENODEORPHANLV") - ENODERPC = (TNODE, "ENODERPC") - ENODESSH = (TNODE, "ENODESSH") - ENODEVERSION = (TNODE, "ENODEVERSION") - ENODESETUP = (TNODE, "ENODESETUP") - ENODETIME = (TNODE, "ENODETIME") - ENODEOOBPATH = (TNODE, "ENODEOOBPATH") ETYPE_FIELD = "code" ETYPE_ERROR = "ERROR" @@ -1481,7 +1829,7 @@ class _VerifyErrors(object): """ ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) - itype, etxt = ecode + itype, etxt, _ = ecode # first complete the msg if args: msg = msg % args @@ -1497,14 +1845,22 @@ class _VerifyErrors(object): # and finally report it via the feedback_fn self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable=E1101 - def _ErrorIf(self, cond, *args, **kwargs): + def _ErrorIf(self, cond, ecode, *args, **kwargs): """Log an error message if the passed condition is True. """ cond = (bool(cond) or self.op.debug_simulate_errors) # pylint: disable=E1101 + + # If the error code is in the list of ignored errors, demote the error to a + # warning + (_, etxt, _) = ecode + if etxt in self.op.ignore_errors: # pylint: disable=E1101 + kwargs[self.ETYPE_FIELD] = self.ETYPE_WARNING + if cond: - self._Error(*args, **kwargs) + self._Error(ecode, *args, **kwargs) + # do not mark the operation as failed for WARN cases only if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR: self.bad = self.bad or cond @@ -1529,13 +1885,16 @@ class LUClusterVerify(NoHooksLU): groups = self.cfg.GetNodeGroupList() # Verify global configuration - jobs.append([opcodes.OpClusterVerifyConfig()]) + jobs.append([ + opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors) + ]) # Always depend on global verification depends_fn = lambda: [(-len(jobs), [])] jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group, - depends=depends_fn())] + ignore_errors=self.op.ignore_errors, + depends=depends_fn())] for group in groups) # Fix up all parameters @@ -1555,7 +1914,7 @@ class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors): """Verifies the cluster config. """ - REQ_BGL = True + REQ_BGL = False def _VerifyHVP(self, hvp_data): """Verifies locally the syntax of the hypervisor parameters. 
@@ -1569,16 +1928,20 @@ class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors): utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) hv_class.CheckParameterSyntax(hv_params) except errors.GenericError, err: - self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err)) + self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err)) def ExpandNames(self): - # Information can be safely retrieved as the BGL is acquired in exclusive - # mode - assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER) + self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET) + self.share_locks = _ShareAll() + + def CheckPrereq(self): + """Check prerequisites. + + """ + # Retrieve all information self.all_group_info = self.cfg.GetAllNodeGroupsInfo() self.all_node_info = self.cfg.GetAllNodesInfo() self.all_inst_info = self.cfg.GetAllInstancesInfo() - self.needed_locks = {} def Exec(self, feedback_fn): """Verify integrity of cluster, performing various test on nodes. @@ -1590,13 +1953,13 @@ class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors): feedback_fn("* Verifying cluster config") for msg in self.cfg.VerifyConfig(): - self._ErrorIf(True, self.ECLUSTERCFG, None, msg) + self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg) feedback_fn("* Verifying cluster certificate files") for cert_filename in constants.ALL_CERT_FILES: (errcode, msg) = _VerifyCertificate(cert_filename) - self._ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode) + self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode) feedback_fn("* Verifying hypervisor parameters") @@ -1628,11 +1991,13 @@ class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors): ["no instances"]))) for node in dangling_nodes] - self._ErrorIf(bool(dangling_nodes), self.ECLUSTERDANGLINGNODES, None, + self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES, + None, "the following nodes (and their instances) belong to a non" " existing group: %s", utils.CommaJoin(pretty_dangling)) - self._ErrorIf(bool(no_node_instances), self.ECLUSTERDANGLINGINST, None, + self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST, + None, "the following instances have a non-existing primary-node:" " %s", utils.CommaJoin(no_node_instances)) @@ -1809,7 +2174,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): # main result, nresult should be a non-empty dict test = not nresult or not isinstance(nresult, dict) - _ErrorIf(test, self.ENODERPC, node, + _ErrorIf(test, constants.CV_ENODERPC, node, "unable to verify node: no data returned") if test: return False @@ -1820,13 +2185,13 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): test = not (remote_version and isinstance(remote_version, (list, tuple)) and len(remote_version) == 2) - _ErrorIf(test, self.ENODERPC, node, + _ErrorIf(test, constants.CV_ENODERPC, node, "connection to node returned invalid data") if test: return False test = local_version != remote_version[0] - _ErrorIf(test, self.ENODEVERSION, node, + _ErrorIf(test, constants.CV_ENODEVERSION, node, "incompatible protocol versions: master %s," " node %s", local_version, remote_version[0]) if test: @@ -1836,7 +2201,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): # full package version self._ErrorIf(constants.RELEASE_VERSION != remote_version[1], - self.ENODEVERSION, node, + constants.CV_ENODEVERSION, node, "software version mismatch: master %s, node %s", constants.RELEASE_VERSION, remote_version[1], code=self.ETYPE_WARNING) @@ -1845,19 +2210,19 @@ class 
LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): if ninfo.vm_capable and isinstance(hyp_result, dict): for hv_name, hv_result in hyp_result.iteritems(): test = hv_result is not None - _ErrorIf(test, self.ENODEHV, node, + _ErrorIf(test, constants.CV_ENODEHV, node, "hypervisor %s verify failure: '%s'", hv_name, hv_result) hvp_result = nresult.get(constants.NV_HVPARAMS, None) if ninfo.vm_capable and isinstance(hvp_result, list): for item, hv_name, hv_result in hvp_result: - _ErrorIf(True, self.ENODEHV, node, + _ErrorIf(True, constants.CV_ENODEHV, node, "hypervisor %s parameter verify failure (source %s): %s", hv_name, item, hv_result) test = nresult.get(constants.NV_NODESETUP, ["Missing NODESETUP results"]) - _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s", + _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s", "; ".join(test)) return True @@ -1880,7 +2245,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): try: ntime_merged = utils.MergeTime(ntime) except (ValueError, TypeError): - _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time") + _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time") return if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW): @@ -1890,7 +2255,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): else: ntime_diff = None - _ErrorIf(ntime_diff is not None, self.ENODETIME, node, + _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node, "Node time diverges by at least %s from master node time", ntime_diff) @@ -1912,24 +2277,25 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): # checks vg existence and size > 20G vglist = nresult.get(constants.NV_VGLIST, None) test = not vglist - _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups") + _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups") if not test: vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name, constants.MIN_VG_SIZE) - _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus) + _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus) # check pv names pvlist = nresult.get(constants.NV_PVLIST, None) test = pvlist is None - _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node") + _ErrorIf(test, constants.CV_ENODELVM, node, "Can't get PV list from node") if not test: # check that ':' is not present in PV names, since it's a # special character for lvcreate (denotes the range of PEs to # use on the PV) for _, pvname, owner_vg in pvlist: test = ":" in pvname - _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV" - " '%s' of VG '%s'", pvname, owner_vg) + _ErrorIf(test, constants.CV_ENODELVM, node, + "Invalid character ':' in PV '%s' of VG '%s'", + pvname, owner_vg) def _VerifyNodeBridges(self, ninfo, nresult, bridges): """Check the node bridges. 
@@ -1948,11 +2314,31 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): missing = nresult.get(constants.NV_BRIDGES, None) test = not isinstance(missing, list) - _ErrorIf(test, self.ENODENET, node, + _ErrorIf(test, constants.CV_ENODENET, node, "did not return valid bridge information") if not test: - _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" % - utils.CommaJoin(sorted(missing))) + _ErrorIf(bool(missing), constants.CV_ENODENET, node, + "missing bridges: %s" % utils.CommaJoin(sorted(missing))) + + def _VerifyNodeUserScripts(self, ninfo, nresult): + """Check the results of user scripts presence and executability on the node + + @type ninfo: L{objects.Node} + @param ninfo: the node to check + @param nresult: the remote results for the node + + """ + node = ninfo.name + + test = not constants.NV_USERSCRIPTS in nresult + self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node, + "did not return user scripts information") + + broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None) + if not test: + self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node, + "user scripts not present or not executable: %s" % + utils.CommaJoin(sorted(broken_scripts))) def _VerifyNodeNetwork(self, ninfo, nresult): """Check the node network connectivity results. @@ -1966,27 +2352,27 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): _ErrorIf = self._ErrorIf # pylint: disable=C0103 test = constants.NV_NODELIST not in nresult - _ErrorIf(test, self.ENODESSH, node, + _ErrorIf(test, constants.CV_ENODESSH, node, "node hasn't returned node ssh connectivity data") if not test: if nresult[constants.NV_NODELIST]: for a_node, a_msg in nresult[constants.NV_NODELIST].items(): - _ErrorIf(True, self.ENODESSH, node, + _ErrorIf(True, constants.CV_ENODESSH, node, "ssh communication with node '%s': %s", a_node, a_msg) test = constants.NV_NODENETTEST not in nresult - _ErrorIf(test, self.ENODENET, node, + _ErrorIf(test, constants.CV_ENODENET, node, "node hasn't returned node tcp connectivity data") if not test: if nresult[constants.NV_NODENETTEST]: nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys()) for anode in nlist: - _ErrorIf(True, self.ENODENET, node, + _ErrorIf(True, constants.CV_ENODENET, node, "tcp communication with node '%s': %s", anode, nresult[constants.NV_NODENETTEST][anode]) test = constants.NV_MASTERIP not in nresult - _ErrorIf(test, self.ENODENET, node, + _ErrorIf(test, constants.CV_ENODENET, node, "node hasn't returned node master IP reachability data") if not test: if not nresult[constants.NV_MASTERIP]: @@ -1994,7 +2380,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): msg = "the master node cannot reach the master IP (not configured?)" else: msg = "cannot reach the master IP" - _ErrorIf(True, self.ENODENET, node, msg) + _ErrorIf(True, constants.CV_ENODENET, node, msg) def _VerifyInstance(self, instance, instanceconfig, node_image, diskstatus): @@ -2010,6 +2396,10 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): node_vol_should = {} instanceconfig.MapLVsByNode(node_vol_should) + ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), self.group_info) + err = _ComputeIPolicyInstanceViolation(ipolicy, instanceconfig) + _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, err) + for node in node_vol_should: n_img = node_image[node] if n_img.offline or n_img.rpc_fail or n_img.lvm_fail: @@ -2017,13 +2407,13 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): continue for volume in node_vol_should[node]: test = volume not in 
n_img.volumes - _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance, + _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance, "volume %s missing on node %s", volume, node) - if instanceconfig.admin_up: + if instanceconfig.admin_state == constants.ADMINST_UP: pri_img = node_image[node_current] test = instance not in pri_img.instances and not pri_img.offline - _ErrorIf(test, self.EINSTANCEDOWN, instance, + _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance, "instance not running on its primary node %s", node_current) @@ -2036,13 +2426,14 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): # node here snode = node_image[nname] bad_snode = snode.ghost or snode.offline - _ErrorIf(instanceconfig.admin_up and not success and not bad_snode, - self.EINSTANCEFAULTYDISK, instance, + _ErrorIf(instanceconfig.admin_state == constants.ADMINST_UP and + not success and not bad_snode, + constants.CV_EINSTANCEFAULTYDISK, instance, "couldn't retrieve status for disk/%s on %s: %s", idx, nname, bdev_status) - _ErrorIf((instanceconfig.admin_up and success and - bdev_status.ldisk_status == constants.LDS_FAULTY), - self.EINSTANCEFAULTYDISK, instance, + _ErrorIf((instanceconfig.admin_state == constants.ADMINST_UP and + success and bdev_status.ldisk_status == constants.LDS_FAULTY), + constants.CV_EINSTANCEFAULTYDISK, instance, "disk/%s on %s is faulty", idx, nname) def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved): @@ -2064,7 +2455,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): test = ((node not in node_vol_should or volume not in node_vol_should[node]) and not reserved.Matches(volume)) - self._ErrorIf(test, self.ENODEORPHANLV, node, + self._ErrorIf(test, constants.CV_ENODEORPHANLV, node, "volume %s is unknown", volume) def _VerifyNPlusOneMemory(self, node_image, instance_cfg): @@ -2090,21 +2481,22 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): # infromation from them; we already list instances living on such # nodes, and that's enough warning continue + #TODO(dynmem): also consider ballooning out other instances for prinode, instances in n_img.sbp.items(): needed_mem = 0 for instance in instances: bep = cluster_info.FillBE(instance_cfg[instance]) if bep[constants.BE_AUTO_BALANCE]: - needed_mem += bep[constants.BE_MEMORY] + needed_mem += bep[constants.BE_MINMEM] test = n_img.mfree < needed_mem - self._ErrorIf(test, self.ENODEN1, node, + self._ErrorIf(test, constants.CV_ENODEN1, node, "not enough memory to accomodate instance failovers" " should node %s fail (%dMiB needed, %dMiB available)", prinode, needed_mem, n_img.mfree) @classmethod def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo, - (files_all, files_all_opt, files_mc, files_vm)): + (files_all, files_opt, files_mc, files_vm)): """Verifies file checksums collected from all nodes. 
@param errorif: Callback for reporting errors @@ -2113,14 +2505,9 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): @param all_nvinfo: RPC results """ - assert (len(files_all | files_all_opt | files_mc | files_vm) == - sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \ - "Found file listed in more than one file list" - # Define functions determining which nodes to consider for a file files2nodefn = [ (files_all, None), - (files_all_opt, None), (files_mc, lambda node: (node.master_candidate or node.name == master_node)), (files_vm, lambda node: node.vm_capable), @@ -2137,7 +2524,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): frozenset(map(operator.attrgetter("name"), filenodes))) for filename in files) - assert set(nodefiles) == (files_all | files_all_opt | files_mc | files_vm) + assert set(nodefiles) == (files_all | files_mc | files_vm) fileinfo = dict((filename, {}) for filename in nodefiles) ignore_nodes = set() @@ -2155,7 +2542,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): node_files = nresult.payload.get(constants.NV_FILELIST, None) test = not (node_files and isinstance(node_files, dict)) - errorif(test, cls.ENODEFILECHECK, node.name, + errorif(test, constants.CV_ENODEFILECHECK, node.name, "Node did not return file checksum data") if test: ignore_nodes.add(node.name) @@ -2179,23 +2566,22 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): # Nodes missing file missing_file = expected_nodes - with_file - if filename in files_all_opt: + if filename in files_opt: # All or no nodes errorif(missing_file and missing_file != expected_nodes, - cls.ECLUSTERFILECHECK, None, + constants.CV_ECLUSTERFILECHECK, None, "File %s is optional, but it must exist on all or no" " nodes (not found on %s)", filename, utils.CommaJoin(utils.NiceSort(missing_file))) else: - # Non-optional files - errorif(missing_file, cls.ECLUSTERFILECHECK, None, + errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None, "File %s is missing from node(s) %s", filename, utils.CommaJoin(utils.NiceSort(missing_file))) # Warn if a node has a file it shouldn't unexpected = with_file - expected_nodes errorif(unexpected, - cls.ECLUSTERFILECHECK, None, + constants.CV_ECLUSTERFILECHECK, None, "File %s should not exist on node(s) %s", filename, utils.CommaJoin(utils.NiceSort(unexpected))) @@ -2209,7 +2595,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): else: variants = [] - errorif(test, cls.ECLUSTERFILECHECK, None, + errorif(test, constants.CV_ECLUSTERFILECHECK, None, "File %s found with %s different checksums (%s)", filename, len(checksums), "; ".join(variants)) @@ -2232,22 +2618,22 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): if drbd_helper: helper_result = nresult.get(constants.NV_DRBDHELPER, None) test = (helper_result == None) - _ErrorIf(test, self.ENODEDRBDHELPER, node, + _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node, "no drbd usermode helper returned") if helper_result: status, payload = helper_result test = not status - _ErrorIf(test, self.ENODEDRBDHELPER, node, + _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node, "drbd usermode helper check unsuccessful: %s", payload) test = status and (payload != drbd_helper) - _ErrorIf(test, self.ENODEDRBDHELPER, node, + _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node, "wrong drbd usermode helper: %s", payload) # compute the DRBD minors node_drbd = {} for minor, instance in drbd_map[node].items(): test = instance not in instanceinfo - _ErrorIf(test, self.ECLUSTERCFG, None, + _ErrorIf(test, 
constants.CV_ECLUSTERCFG, None, "ghost instance '%s' in temporary DRBD map", instance) # ghost instance should not be running, but otherwise we # don't give double warnings (both ghost instance and @@ -2256,12 +2642,13 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): node_drbd[minor] = (instance, False) else: instance = instanceinfo[instance] - node_drbd[minor] = (instance.name, instance.admin_up) + node_drbd[minor] = (instance.name, + instance.admin_state == constants.ADMINST_UP) # and now check them used_minors = nresult.get(constants.NV_DRBDLIST, []) test = not isinstance(used_minors, (tuple, list)) - _ErrorIf(test, self.ENODEDRBD, node, + _ErrorIf(test, constants.CV_ENODEDRBD, node, "cannot parse drbd status file: %s", str(used_minors)) if test: # we cannot check drbd status @@ -2269,11 +2656,11 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): for minor, (iname, must_exist) in node_drbd.items(): test = minor not in used_minors and must_exist - _ErrorIf(test, self.ENODEDRBD, node, + _ErrorIf(test, constants.CV_ENODEDRBD, node, "drbd minor %d of instance %s is not active", minor, iname) for minor in used_minors: test = minor not in node_drbd - _ErrorIf(test, self.ENODEDRBD, node, + _ErrorIf(test, constants.CV_ENODEDRBD, node, "unallocated drbd minor %d is in use", minor) def _UpdateNodeOS(self, ninfo, nresult, nimg): @@ -2293,7 +2680,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): not compat.all(isinstance(v, list) and len(v) == 7 for v in remote_os)) - _ErrorIf(test, self.ENODEOS, node, + _ErrorIf(test, constants.CV_ENODEOS, node, "node hasn't returned valid OS data") nimg.os_fail = test @@ -2335,14 +2722,14 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): for os_name, os_data in nimg.oslist.items(): assert os_data, "Empty OS status for OS %s?!" % os_name f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0] - _ErrorIf(not f_status, self.ENODEOS, node, + _ErrorIf(not f_status, constants.CV_ENODEOS, node, "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag) - _ErrorIf(len(os_data) > 1, self.ENODEOS, node, + _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node, "OS '%s' has multiple entries (first one shadows the rest): %s", os_name, utils.CommaJoin([v[0] for v in os_data])) # comparisons with the 'base' image test = os_name not in base.oslist - _ErrorIf(test, self.ENODEOS, node, + _ErrorIf(test, constants.CV_ENODEOS, node, "Extra OS %s not present on reference node (%s)", os_name, base.name) if test: @@ -2356,14 +2743,14 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): ("variants list", f_var, b_var), ("parameters", beautify_params(f_param), beautify_params(b_param))]: - _ErrorIf(a != b, self.ENODEOS, node, + _ErrorIf(a != b, constants.CV_ENODEOS, node, "OS %s for %s differs from reference node %s: [%s] vs. 
[%s]", kind, os_name, base.name, utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b))) # check any missing OSes missing = set(base.oslist.keys()).difference(nimg.oslist.keys()) - _ErrorIf(missing, self.ENODEOS, node, + _ErrorIf(missing, constants.CV_ENODEOS, node, "OSes present on reference node %s but missing on this node: %s", base.name, utils.CommaJoin(missing)) @@ -2381,7 +2768,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): if ((ninfo.master_candidate or ninfo.master_capable) and constants.NV_OOB_PATHS in nresult): for path_result in nresult[constants.NV_OOB_PATHS]: - self._ErrorIf(path_result, self.ENODEOOBPATH, node, path_result) + self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result) def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name): """Verifies and updates the node volume data. @@ -2404,10 +2791,11 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): if vg_name is None: pass elif isinstance(lvdata, basestring): - _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s", + _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s", utils.SafeEncode(lvdata)) elif not isinstance(lvdata, dict): - _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)") + _ErrorIf(True, constants.CV_ENODELVM, node, + "rpc call to node failed (lvlist)") else: nimg.volumes = lvdata nimg.lvm_fail = False @@ -2427,8 +2815,9 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): """ idata = nresult.get(constants.NV_INSTANCELIST, None) test = not isinstance(idata, list) - self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed" - " (instancelist): %s", utils.SafeEncode(str(idata))) + self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name, + "rpc call to node failed (instancelist): %s", + utils.SafeEncode(str(idata))) if test: nimg.hyp_fail = True else: @@ -2450,26 +2839,27 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): # try to read free memory (from the hypervisor) hv_info = nresult.get(constants.NV_HVINFO, None) test = not isinstance(hv_info, dict) or "memory_free" not in hv_info - _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)") + _ErrorIf(test, constants.CV_ENODEHV, node, + "rpc call to node failed (hvinfo)") if not test: try: nimg.mfree = int(hv_info["memory_free"]) except (ValueError, TypeError): - _ErrorIf(True, self.ENODERPC, node, + _ErrorIf(True, constants.CV_ENODERPC, node, "node returned invalid nodeinfo, check hypervisor") # FIXME: devise a free space model for file based instances as well if vg_name is not None: test = (constants.NV_VGLIST not in nresult or vg_name not in nresult[constants.NV_VGLIST]) - _ErrorIf(test, self.ENODELVM, node, + _ErrorIf(test, constants.CV_ENODELVM, node, "node didn't return data for the volume group '%s'" " - it is either missing or broken", vg_name) if not test: try: nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name]) except (ValueError, TypeError): - _ErrorIf(True, self.ENODERPC, node, + _ErrorIf(True, constants.CV_ENODERPC, node, "node returned invalid LVM info, check LVM status") def _CollectDiskInfo(self, nodelist, node_image, instanceinfo): @@ -2536,7 +2926,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): data = len(disks) * [(False, "node offline")] else: msg = nres.fail_msg - _ErrorIf(msg, self.ENODERPC, nname, + _ErrorIf(msg, constants.CV_ENODERPC, nname, "while getting disk information: %s", msg) if msg: # No data from this node @@ -2651,6 +3041,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): 
i_non_redundant = [] # Non redundant instances i_non_a_balanced = [] # Non auto-balanced instances + i_offline = 0 # Count of offline instances n_offline = 0 # Count of offline nodes n_drained = 0 # Count of nodes being drained node_vol_should = {} @@ -2666,6 +3057,10 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names)) + user_scripts = [] + if self.cfg.GetUseExternalMipScript(): + user_scripts.append(constants.EXTERNAL_MASTER_SETUP_SCRIPT) + node_verify_param = { constants.NV_FILELIST: utils.UniqueSequence(filename @@ -2688,6 +3083,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): constants.NV_MASTERIP: (master_node, master_ip), constants.NV_OSLIST: None, constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(), + constants.NV_USERSCRIPTS: user_scripts, } if vg_name is not None: @@ -2835,7 +3231,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): feedback_fn("* Verifying node %s (%s)" % (node, ntype)) msg = all_nvinfo[node].fail_msg - _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg) + _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s", + msg) if msg: nimg.rpc_fail = True continue @@ -2845,6 +3242,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): nimg.call_ok = self._VerifyNode(node_i, nresult) self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime) self._VerifyNodeNetwork(node_i, nresult) + self._VerifyNodeUserScripts(node_i, nresult) self._VerifyOob(node_i, nresult) if nimg.vm_capable: @@ -2869,10 +3267,16 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): non_primary_inst = set(nimg.instances).difference(nimg.pinst) for inst in non_primary_inst: + # FIXME: investigate best way to handle offline insts + if inst.admin_state == constants.ADMINST_OFFLINE: + if verbose: + feedback_fn("* Skipping offline instance %s" % inst.name) + i_offline += 1 + continue test = inst in self.all_inst_info - _ErrorIf(test, self.EINSTANCEWRONGNODE, inst, + _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst, "instance should not run on node %s", node_i.name) - _ErrorIf(not test, self.ENODEORPHANINSTANCE, node_i.name, + _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name, "node is running unknown instance %s", inst) for node, result in extra_lv_nvinfo.items(): @@ -2891,11 +3295,12 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): pnode = inst_config.primary_node pnode_img = node_image[pnode] _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline, - self.ENODERPC, pnode, "instance %s, connection to" + constants.CV_ENODERPC, pnode, "instance %s, connection to" " primary node failed", instance) - _ErrorIf(inst_config.admin_up and pnode_img.offline, - self.EINSTANCEBADNODE, instance, + _ErrorIf(inst_config.admin_state == constants.ADMINST_UP and + pnode_img.offline, + constants.CV_EINSTANCEBADNODE, instance, "instance is marked as running and lives on offline node %s", inst_config.primary_node) @@ -2907,7 +3312,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): if not inst_config.secondary_nodes: i_non_redundant.append(instance) - _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT, + _ErrorIf(len(inst_config.secondary_nodes) > 1, + constants.CV_EINSTANCELAYOUT, instance, "instance has multiple secondary nodes: %s", utils.CommaJoin(inst_config.secondary_nodes), code=self.ETYPE_WARNING) @@ -2928,7 +3334,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): key=lambda (_, nodes): pnode in nodes, 
reverse=True)] - self._ErrorIf(len(instance_groups) > 1, self.EINSTANCESPLITGROUPS, + self._ErrorIf(len(instance_groups) > 1, + constants.CV_EINSTANCESPLITGROUPS, instance, "instance has primary and secondary nodes in" " different groups: %s", utils.CommaJoin(pretty_list), code=self.ETYPE_WARNING) @@ -2938,21 +3345,22 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): for snode in inst_config.secondary_nodes: s_img = node_image[snode] - _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode, - "instance %s, connection to secondary node failed", instance) + _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC, + snode, "instance %s, connection to secondary node failed", + instance) if s_img.offline: inst_nodes_offline.append(snode) # warn that the instance lives on offline nodes - _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance, + _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance, "instance has offline secondary node(s) %s", utils.CommaJoin(inst_nodes_offline)) # ... or ghost/non-vm_capable nodes for node in inst_config.all_nodes: - _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance, - "instance lives on ghost node %s", node) - _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE, + _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE, + instance, "instance lives on ghost node %s", node) + _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE, instance, "instance lives on non-vm_capable node %s", node) feedback_fn("* Verifying orphan volumes") @@ -2983,6 +3391,9 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found." % len(i_non_a_balanced)) + if i_offline: + feedback_fn(" - NOTICE: %d offline instance(s) found." % i_offline) + if n_offline: feedback_fn(" - NOTICE: %d offline node(s) found." % n_offline) @@ -3020,7 +3431,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): res = hooks_results[node_name] msg = res.fail_msg test = msg and not res.offline - self._ErrorIf(test, self.ENODEHOOKS, node_name, + self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name, "Communication failure in hooks execution: %s", msg) if res.offline or msg: # No need to investigate payload if node is offline or gave @@ -3028,7 +3439,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): continue for script, hkr, output in res.payload: test = hkr == constants.HKR_FAIL - self._ErrorIf(test, self.ENODEHOOKS, node_name, + self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name, "Script %s failed, output:", script) if test: output = self._HOOKS_INDENT_RE.sub(" ", output) @@ -3121,15 +3532,8 @@ class LUGroupVerifyDisks(NoHooksLU): self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances)) # Check if node groups for locked instances are still correct - for (instance_name, inst) in self.instances.items(): - assert owned_nodes.issuperset(inst.all_nodes), \ - "Instance %s's nodes changed while we kept the lock" % instance_name - - inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name, - owned_groups) - - assert self.group_uuid in inst_groups, \ - "Instance %s has no node in group %s" % (instance_name, self.group_uuid) + _CheckInstancesNodeGroups(self.cfg, self.instances, + owned_groups, owned_nodes, self.group_uuid) def Exec(self, feedback_fn): """Verify integrity of cluster disks. 
@@ -3145,8 +3549,8 @@ class LUGroupVerifyDisks(NoHooksLU): res_missing = {} nv_dict = _MapInstanceDisksToNodes([inst - for inst in self.instances.values() - if inst.admin_up]) + for inst in self.instances.values() + if inst.admin_state == constants.ADMINST_UP]) if nv_dict: nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) & @@ -3187,24 +3591,24 @@ class LUClusterRepairDiskSizes(NoHooksLU): if self.op.instances: self.wanted_names = _GetWantedInstances(self, self.op.instances) self.needed_locks = { - locking.LEVEL_NODE: [], + locking.LEVEL_NODE_RES: [], locking.LEVEL_INSTANCE: self.wanted_names, } - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE else: self.wanted_names = None self.needed_locks = { - locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_NODE_RES: locking.ALL_SET, locking.LEVEL_INSTANCE: locking.ALL_SET, } self.share_locks = { - locking.LEVEL_NODE: 1, + locking.LEVEL_NODE_RES: 1, locking.LEVEL_INSTANCE: 0, } def DeclareLocks(self, level): - if level == locking.LEVEL_NODE and self.wanted_names is not None: - self._LockInstancesNodes(primary_only=True) + if level == locking.LEVEL_NODE_RES and self.wanted_names is not None: + self._LockInstancesNodes(primary_only=True, level=level) def CheckPrereq(self): """Check prerequisites. @@ -3255,6 +3659,11 @@ class LUClusterRepairDiskSizes(NoHooksLU): for idx, disk in enumerate(instance.disks): per_node_disks[pnode].append((instance, idx, disk)) + assert not (frozenset(per_node_disks.keys()) - + self.owned_locks(locking.LEVEL_NODE_RES)), \ + "Not owning correct locks" + assert not self.owned_locks(locking.LEVEL_NODE) + changed = [] for node, dskl in per_node_disks.items(): newl = [v[2].Copy() for v in dskl] @@ -3344,29 +3753,33 @@ class LUClusterRename(LogicalUnit): """ clustername = self.op.name - ip = self.ip + new_ip = self.ip # shutdown the master IP - master = self.cfg.GetMasterNode() - result = self.rpc.call_node_stop_master(master, False) + master_params = self.cfg.GetMasterNetworkParameters() + ems = self.cfg.GetUseExternalMipScript() + result = self.rpc.call_node_deactivate_master_ip(master_params.name, + master_params, ems) result.Raise("Could not disable the master role") try: cluster = self.cfg.GetClusterInfo() cluster.cluster_name = clustername - cluster.master_ip = ip + cluster.master_ip = new_ip self.cfg.Update(cluster, feedback_fn) # update the known hosts file ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE) node_list = self.cfg.GetOnlineNodeList() try: - node_list.remove(master) + node_list.remove(master_params.name) except ValueError: pass _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE) finally: - result = self.rpc.call_node_start_master(master, False, False) + master_params.ip = new_ip + result = self.rpc.call_node_activate_master_ip(master_params.name, + master_params, ems) msg = result.fail_msg if msg: self.LogWarning("Could not re-enable the master role on" @@ -3375,6 +3788,27 @@ class LUClusterRename(LogicalUnit): return clustername +def _ValidateNetmask(cfg, netmask): + """Checks if a netmask is valid. 
+ + @type cfg: L{config.ConfigWriter} + @param cfg: The cluster configuration + @type netmask: int + @param netmask: the netmask to be verified + @raise errors.OpPrereqError: if the validation fails + + """ + ip_family = cfg.GetPrimaryIPFamily() + try: + ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family) + except errors.ProgrammerError: + raise errors.OpPrereqError("Invalid primary ip family: %s." % + ip_family) + if not ipcls.ValidateNetmask(netmask): + raise errors.OpPrereqError("CIDR netmask (%s) not valid" % + (netmask)) + + class LUClusterSetParams(LogicalUnit): """Change the parameters of the cluster. @@ -3396,13 +3830,26 @@ class LUClusterSetParams(LogicalUnit): if self.op.remove_uids: uidpool.CheckUidPool(self.op.remove_uids) + if self.op.master_netmask is not None: + _ValidateNetmask(self.cfg, self.op.master_netmask) + + if self.op.diskparams: + for dt_params in self.op.diskparams.values(): + utils.ForceDictType(dt_params, constants.DISK_DT_TYPES) + def ExpandNames(self): # FIXME: in the future maybe other cluster params won't require checking on # all nodes to be modified. self.needed_locks = { locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_INSTANCE: locking.ALL_SET, + locking.LEVEL_NODEGROUP: locking.ALL_SET, + } + self.share_locks = { + locking.LEVEL_NODE: 1, + locking.LEVEL_INSTANCE: 1, + locking.LEVEL_NODEGROUP: 1, } - self.share_locks[locking.LEVEL_NODE] = 1 def BuildHooksEnv(self): """Build hooks env. @@ -3477,6 +3924,7 @@ class LUClusterSetParams(LogicalUnit): self.cluster = cluster = self.cfg.GetClusterInfo() # validate params changes if self.op.beparams: + objects.UpgradeBeParams(self.op.beparams) utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) self.new_beparams = cluster.SimpleFillBE(self.op.beparams) @@ -3490,6 +3938,42 @@ class LUClusterSetParams(LogicalUnit): self.new_ndparams["oob_program"] = \ constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM] + if self.op.hv_state: + new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, + self.cluster.hv_state_static) + self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values)) + for hv, values in new_hv_state.items()) + + if self.op.disk_state: + new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, + self.cluster.disk_state_static) + self.new_disk_state = \ + dict((storage, dict((name, cluster.SimpleFillDiskState(values)) + for name, values in svalues.items())) + for storage, svalues in new_disk_state.items()) + + if self.op.ipolicy: + self.new_ipolicy = _GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy, + group_policy=False) + + all_instances = self.cfg.GetAllInstancesInfo().values() + violations = set() + for group in self.cfg.GetAllNodeGroupsInfo().values(): + instances = frozenset([inst for inst in all_instances + if compat.any(node in group.members + for node in inst.all_nodes)]) + new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy) + new = _ComputeNewInstanceViolations(_CalculateGroupIPolicy(cluster, + group), + new_ipolicy, instances) + if new: + violations.update(new) + + if violations: + self.LogWarning("After the ipolicy change the following instances" + " violate them: %s", + utils.CommaJoin(violations)) + if self.op.nicparams: utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES) self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams) @@ -3527,6 +4011,15 @@ class LUClusterSetParams(LogicalUnit): else: self.new_hvparams[hv_name].update(hv_dict) + # disk template parameters + self.new_diskparams = 
objects.FillDict(cluster.diskparams, {}) + if self.op.diskparams: + for dt_name, dt_params in self.op.diskparams.items(): + if dt_name not in self.op.diskparams: + self.new_diskparams[dt_name] = dt_params + else: + self.new_diskparams[dt_name].update(dt_params) + # os hypervisor parameters self.new_os_hvp = objects.FillDict(cluster.os_hvp, {}) if self.op.os_hvp: @@ -3641,10 +4134,18 @@ class LUClusterSetParams(LogicalUnit): self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams if self.op.nicparams: self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams + if self.op.ipolicy: + self.cluster.ipolicy = self.new_ipolicy if self.op.osparams: self.cluster.osparams = self.new_osp if self.op.ndparams: self.cluster.ndparams = self.new_ndparams + if self.op.diskparams: + self.cluster.diskparams = self.new_diskparams + if self.op.hv_state: + self.cluster.hv_state_static = self.new_hv_state + if self.op.disk_state: + self.cluster.disk_state_static = self.new_disk_state if self.op.candidate_pool_size is not None: self.cluster.candidate_pool_size = self.op.candidate_pool_size @@ -3652,6 +4153,9 @@ class LUClusterSetParams(LogicalUnit): _AdjustCandidatePool(self, []) if self.op.maintain_node_health is not None: + if self.op.maintain_node_health and not constants.ENABLE_CONFD: + feedback_fn("Note: CONFD was disabled at build time, node health" + " maintenance is not useful (still enabling it)") self.cluster.maintain_node_health = self.op.maintain_node_health if self.op.prealloc_wipe_disks is not None: @@ -3672,6 +4176,9 @@ class LUClusterSetParams(LogicalUnit): if self.op.reserved_lvs is not None: self.cluster.reserved_lvs = self.op.reserved_lvs + if self.op.use_external_mip_script is not None: + self.cluster.use_external_mip_script = self.op.use_external_mip_script + def helper_os(aname, mods, desc): desc += " OS list" lst = getattr(self.cluster, aname) @@ -3696,21 +4203,40 @@ class LUClusterSetParams(LogicalUnit): helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted") if self.op.master_netdev: - master = self.cfg.GetMasterNode() + master_params = self.cfg.GetMasterNetworkParameters() + ems = self.cfg.GetUseExternalMipScript() feedback_fn("Shutting down master ip on the current netdev (%s)" % self.cluster.master_netdev) - result = self.rpc.call_node_stop_master(master, False) + result = self.rpc.call_node_deactivate_master_ip(master_params.name, + master_params, ems) result.Raise("Could not disable the master ip") feedback_fn("Changing master_netdev from %s to %s" % - (self.cluster.master_netdev, self.op.master_netdev)) + (master_params.netdev, self.op.master_netdev)) self.cluster.master_netdev = self.op.master_netdev + if self.op.master_netmask: + master_params = self.cfg.GetMasterNetworkParameters() + feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask) + result = self.rpc.call_node_change_master_netmask(master_params.name, + master_params.netmask, + self.op.master_netmask, + master_params.ip, + master_params.netdev) + if result.fail_msg: + msg = "Could not change the master IP netmask: %s" % result.fail_msg + feedback_fn(msg) + + self.cluster.master_netmask = self.op.master_netmask + self.cfg.Update(self.cluster, feedback_fn) if self.op.master_netdev: + master_params = self.cfg.GetMasterNetworkParameters() feedback_fn("Starting the master ip on the new master netdev (%s)" % self.op.master_netdev) - result = self.rpc.call_node_start_master(master, False, False) + ems = self.cfg.GetUseExternalMipScript() + result = 
self.rpc.call_node_activate_master_ip(master_params.name, + master_params, ems) if result.fail_msg: self.LogWarning("Could not re-enable the master ip on" " the master, please restart manually: %s", @@ -3743,6 +4269,9 @@ def _ComputeAncillaryFiles(cluster, redist): constants.SSH_KNOWN_HOSTS_FILE, constants.CONFD_HMAC_KEY, constants.CLUSTER_DOMAIN_SECRET_FILE, + constants.SPICE_CERT_FILE, + constants.SPICE_CACERT_FILE, + constants.RAPI_USERS_FILE, ]) if not redist: @@ -3755,27 +4284,43 @@ def _ComputeAncillaryFiles(cluster, redist): if cluster.modify_etc_hosts: files_all.add(constants.ETC_HOSTS) - # Files which must either exist on all nodes or on none - files_all_opt = set([ + # Files which are optional, these must: + # - be present in one other category as well + # - either exist or not exist on all nodes of that category (mc, vm all) + files_opt = set([ constants.RAPI_USERS_FILE, ]) # Files which should only be on master candidates files_mc = set() + if not redist: files_mc.add(constants.CLUSTER_CONF_FILE) + # FIXME: this should also be replicated but Ganeti doesn't support files_mc + # replication + files_mc.add(constants.DEFAULT_MASTER_SETUP_SCRIPT) + # Files which should only be on VM-capable nodes files_vm = set(filename for hv_name in cluster.enabled_hypervisors - for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()) + for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0]) + + files_opt |= set(filename + for hv_name in cluster.enabled_hypervisors + for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1]) - # Filenames must be unique - assert (len(files_all | files_all_opt | files_mc | files_vm) == - sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \ + # Filenames in each category must be unique + all_files_set = files_all | files_mc | files_vm + assert (len(all_files_set) == + sum(map(len, [files_all, files_mc, files_vm]))), \ "Found file listed in more than one file list" - return (files_all, files_all_opt, files_mc, files_vm) + # Optional files must be present in one other category + assert all_files_set.issuperset(files_opt), \ + "Optional file not in a different required list" + + return (files_all, files_opt, files_mc, files_vm) def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True): @@ -3809,7 +4354,7 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True): nodelist.remove(master_info.name) # Gather file lists - (files_all, files_all_opt, files_mc, files_vm) = \ + (files_all, _, files_mc, files_vm) = \ _ComputeAncillaryFiles(cluster, True) # Never re-distribute configuration file from here @@ -3819,7 +4364,6 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True): filemap = [ (online_nodes, files_all), - (online_nodes, files_all_opt), (vm_nodes, files_vm), ] @@ -3851,6 +4395,36 @@ class LUClusterRedistConf(NoHooksLU): _RedistributeAncillaryFiles(self) +class LUClusterActivateMasterIp(NoHooksLU): + """Activate the master IP on the master node. + + """ + def Exec(self, feedback_fn): + """Activate the master IP. + + """ + master_params = self.cfg.GetMasterNetworkParameters() + ems = self.cfg.GetUseExternalMipScript() + result = self.rpc.call_node_activate_master_ip(master_params.name, + master_params, ems) + result.Raise("Could not activate the master IP") + + +class LUClusterDeactivateMasterIp(NoHooksLU): + """Deactivate the master IP on the master node. + + """ + def Exec(self, feedback_fn): + """Deactivate the master IP. 
+ + """ + master_params = self.cfg.GetMasterNetworkParameters() + ems = self.cfg.GetUseExternalMipScript() + result = self.rpc.call_node_deactivate_master_ip(master_params.name, + master_params, ems) + result.Raise("Could not deactivate the master IP") + + def _WaitForSync(lu, instance, disks=None, oneshot=False): """Sleep and poll for an instance's disk to sync. @@ -3926,7 +4500,7 @@ def _WaitForSync(lu, instance, disks=None, oneshot=False): return not cumul_degraded -def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False): +def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False): """Check that mirrors are not degraded. The ldisk parameter, if True, will change the test from the @@ -3955,7 +4529,8 @@ def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False): if dev.children: for child in dev.children: - result = result and _CheckDiskConsistency(lu, child, node, on_primary) + result = result and _CheckDiskConsistency(lu, instance, child, node, + on_primary) return result @@ -3964,7 +4539,7 @@ class LUOobCommand(NoHooksLU): """Logical unit for OOB handling. """ - REG_BGL = False + REQ_BGL = False _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE) def ExpandNames(self): @@ -4311,9 +4886,6 @@ class LUNodeRemove(LogicalUnit): def BuildHooksEnv(self): """Build hooks env. - This doesn't run on the target node in the pre phase as a failed - node would then be impossible to remove. - """ return { "OP_TARGET": self.op.node_name, @@ -4323,13 +4895,15 @@ class LUNodeRemove(LogicalUnit): def BuildHooksNodes(self): """Build hooks nodes. + This doesn't run on the target node in the pre phase as a failed + node would then be impossible to remove. + """ all_nodes = self.cfg.GetNodeList() try: all_nodes.remove(self.op.node_name) except ValueError: - logging.warning("Node '%s', which is about to be removed, was not found" - " in the list of all nodes", self.op.node_name) + pass return (all_nodes, all_nodes) def CheckPrereq(self): @@ -4370,6 +4944,9 @@ class LUNodeRemove(LogicalUnit): modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup + assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \ + "Not owning BGL" + # Promote nodes to master candidate as needed _AdjustCandidatePool(self, exceptions=[node.name]) self.context.RemoveNode(node.name) @@ -4428,9 +5005,9 @@ class _NodeQuery(_QueryBase): # filter out non-vm_capable nodes toquery_nodes = [name for name in nodenames if all_info[name].vm_capable] - node_data = lu.rpc.call_node_info(toquery_nodes, lu.cfg.GetVGName(), - lu.cfg.GetHypervisorType()) - live_data = dict((name, nresult.payload) + node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()], + [lu.cfg.GetHypervisorType()]) + live_data = dict((name, _MakeLegacyNodeInfo(nresult.payload)) for (name, nresult) in node_data.items() if not nresult.fail_msg and nresult.payload) else: @@ -4483,6 +5060,9 @@ class LUNodeQuery(NoHooksLU): def ExpandNames(self): self.nq.ExpandNames(self) + def DeclareLocks(self, level): + self.nq.DeclareLocks(self, level) + def Exec(self, feedback_fn): return self.nq.OldStyleQuery(self) @@ -4501,8 +5081,9 @@ class LUNodeQueryvols(NoHooksLU): selected=self.op.output_fields) def ExpandNames(self): + self.share_locks = _ShareAll() self.needed_locks = {} - self.share_locks[locking.LEVEL_NODE] = 1 + if not self.op.nodes: self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET else: @@ -4569,8 +5150,8 @@ class LUNodeQueryStorage(NoHooksLU): selected=self.op.output_fields) def ExpandNames(self): + 
self.share_locks = _ShareAll() self.needed_locks = {} - self.share_locks[locking.LEVEL_NODE] = 1 if self.op.nodes: self.needed_locks[locking.LEVEL_NODE] = \ @@ -4781,7 +5362,7 @@ class LUQuery(NoHooksLU): def CheckArguments(self): qcls = _GetQueryImplementation(self.op.what) - self.impl = qcls(self.op.filter, self.op.fields, self.op.use_locking) + self.impl = qcls(self.op.qfilter, self.op.fields, self.op.use_locking) def ExpandNames(self): self.impl.ExpandNames(self) @@ -5029,6 +5610,25 @@ class LUNodeAdd(LogicalUnit): if self.op.ndparams: utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES) + if self.op.hv_state: + self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, None) + + if self.op.disk_state: + self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None) + + # TODO: If we need to have multiple DnsOnlyRunner we probably should make + # it a property on the base class. + result = rpc.DnsOnlyRunner().call_version([node])[node] + result.Raise("Can't get version information from node %s" % node) + if constants.PROTOCOL_VERSION == result.payload: + logging.info("Communication to node %s fine, sw version %s match", + node, result.payload) + else: + raise errors.OpPrereqError("Version mismatch master version %s," + " node version %s" % + (constants.PROTOCOL_VERSION, result.payload), + errors.ECODE_ENVIRON) + def Exec(self, feedback_fn): """Adds the new node to the cluster. @@ -5036,6 +5636,9 @@ class LUNodeAdd(LogicalUnit): new_node = self.new_node node = new_node.name + assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \ + "Not owning BGL" + # We adding a new node so we assume it's powered new_node.powered = True @@ -5064,16 +5667,11 @@ class LUNodeAdd(LogicalUnit): else: new_node.ndparams = {} - # check connectivity - result = self.rpc.call_version([node])[node] - result.Raise("Can't get version information from node %s" % node) - if constants.PROTOCOL_VERSION == result.payload: - logging.info("Communication to node %s fine, sw version %s match", - node, result.payload) - else: - raise errors.OpExecError("Version mismatch master version %s," - " node version %s" % - (constants.PROTOCOL_VERSION, result.payload)) + if self.op.hv_state: + new_node.hv_state_static = self.new_hv_state + + if self.op.disk_state: + new_node.disk_state_static = self.new_disk_state # Add node to our /etc/hosts, and add key to known_hosts if self.cfg.GetClusterInfo().modify_etc_hosts: @@ -5150,7 +5748,8 @@ class LUNodeSetParams(LogicalUnit): self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) all_mods = [self.op.offline, self.op.master_candidate, self.op.drained, self.op.master_capable, self.op.vm_capable, - self.op.secondary_ip, self.op.ndparams] + self.op.secondary_ip, self.op.ndparams, self.op.hv_state, + self.op.disk_state] if all_mods.count(None) == len(all_mods): raise errors.OpPrereqError("Please pass at least one modification", errors.ECODE_INVAL) @@ -5174,35 +5773,32 @@ class LUNodeSetParams(LogicalUnit): self.lock_all = self.op.auto_promote and self.might_demote self.lock_instances = self.op.secondary_ip is not None + def _InstanceFilter(self, instance): + """Filter for getting affected instances. 
+ + """ + return (instance.disk_template in constants.DTS_INT_MIRROR and + self.op.node_name in instance.all_nodes) + def ExpandNames(self): if self.lock_all: self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET} else: self.needed_locks = {locking.LEVEL_NODE: self.op.node_name} - if self.lock_instances: - self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET + # Since modifying a node can have severe effects on currently running + # operations the resource lock is at least acquired in shared mode + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE] - def DeclareLocks(self, level): - # If we have locked all instances, before waiting to lock nodes, release - # all the ones living on nodes unrelated to the current operation. - if level == locking.LEVEL_NODE and self.lock_instances: - self.affected_instances = [] - if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET: - instances_keep = [] - - # Build list of instances to release - locked_i = self.owned_locks(locking.LEVEL_INSTANCE) - for instance_name, instance in self.cfg.GetMultiInstanceInfo(locked_i): - if (instance.disk_template in constants.DTS_INT_MIRROR and - self.op.node_name in instance.all_nodes): - instances_keep.append(instance_name) - self.affected_instances.append(instance) - - _ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep) - - assert (set(self.owned_locks(locking.LEVEL_INSTANCE)) == - set(instances_keep)) + # Get node resource and instance locks in shared mode; they are not used + # for anything but read-only access + self.share_locks[locking.LEVEL_NODE_RES] = 1 + self.share_locks[locking.LEVEL_INSTANCE] = 1 + + if self.lock_instances: + self.needed_locks[locking.LEVEL_INSTANCE] = \ + frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)) def BuildHooksEnv(self): """Build hooks env. @@ -5234,6 +5830,25 @@ class LUNodeSetParams(LogicalUnit): """ node = self.node = self.cfg.GetNodeInfo(self.op.node_name) + if self.lock_instances: + affected_instances = \ + self.cfg.GetInstancesInfoByFilter(self._InstanceFilter) + + # Verify instance locks + owned_instances = self.owned_locks(locking.LEVEL_INSTANCE) + wanted_instances = frozenset(affected_instances.keys()) + if wanted_instances - owned_instances: + raise errors.OpPrereqError("Instances affected by changing node %s's" + " secondary IP address have changed since" + " locks were acquired, wanted '%s', have" + " '%s'; retry the operation" % + (self.op.node_name, + utils.CommaJoin(wanted_instances), + utils.CommaJoin(owned_instances)), + errors.ECODE_STATE) + else: + affected_instances = None + if (self.op.master_candidate is not None or self.op.drained is not None or self.op.offline is not None): @@ -5342,16 +5957,21 @@ class LUNodeSetParams(LogicalUnit): raise errors.OpPrereqError("Cannot change the secondary ip on a single" " homed cluster", errors.ECODE_INVAL) + assert not (frozenset(affected_instances) - + self.owned_locks(locking.LEVEL_INSTANCE)) + if node.offline: - if self.affected_instances: - raise errors.OpPrereqError("Cannot change secondary ip: offline" - " node has instances (%s) configured" - " to use it" % self.affected_instances) + if affected_instances: + raise errors.OpPrereqError("Cannot change secondary IP address:" + " offline node has instances (%s)" + " configured to use it" % + utils.CommaJoin(affected_instances.keys())) else: # On online nodes, check that no instances are running, and that # the node has the new ip and we can reach it. 
- for instance in self.affected_instances: - _CheckInstanceDown(self, instance, "cannot change secondary ip") + for instance in affected_instances.values(): + _CheckInstanceState(self, instance, INSTANCE_DOWN, + msg="cannot change secondary ip") _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True) if master.name != node.name: @@ -5368,6 +5988,15 @@ class LUNodeSetParams(LogicalUnit): utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES) self.new_ndparams = new_ndparams + if self.op.hv_state: + self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, + self.node.hv_state_static) + + if self.op.disk_state: + self.new_disk_state = \ + _MergeAndVerifyDiskState(self.op.disk_state, + self.node.disk_state_static) + def Exec(self, feedback_fn): """Modifies a node. @@ -5384,6 +6013,12 @@ class LUNodeSetParams(LogicalUnit): if self.op.powered is not None: node.powered = self.op.powered + if self.op.hv_state: + node.hv_state_static = self.new_hv_state + + if self.op.disk_state: + node.disk_state_static = self.new_disk_state + for attr in ["master_capable", "vm_capable"]: val = getattr(self.op, attr) if val is not None: @@ -5488,20 +6123,23 @@ class LUClusterQuery(NoHooksLU): "config_version": constants.CONFIG_VERSION, "os_api_version": max(constants.OS_API_VERSIONS), "export_version": constants.EXPORT_VERSION, - "architecture": (platform.architecture()[0], platform.machine()), + "architecture": runtime.GetArchInfo(), "name": cluster.cluster_name, "master": cluster.master_node, - "default_hypervisor": cluster.enabled_hypervisors[0], + "default_hypervisor": cluster.primary_hypervisor, "enabled_hypervisors": cluster.enabled_hypervisors, "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name]) for hypervisor_name in cluster.enabled_hypervisors]), "os_hvp": os_hvp, "beparams": cluster.beparams, "osparams": cluster.osparams, + "ipolicy": cluster.ipolicy, "nicparams": cluster.nicparams, "ndparams": cluster.ndparams, "candidate_pool_size": cluster.candidate_pool_size, "master_netdev": cluster.master_netdev, + "master_netmask": cluster.master_netmask, + "use_external_mip_script": cluster.use_external_mip_script, "volume_group_name": cluster.volume_group_name, "drbd_usermode_helper": cluster.drbd_usermode_helper, "file_storage_dir": cluster.file_storage_dir, @@ -5528,56 +6166,88 @@ class LUClusterConfigQuery(NoHooksLU): """ REQ_BGL = False - _FIELDS_DYNAMIC = utils.FieldSet() - _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag", - "watcher_pause", "volume_group_name") def CheckArguments(self): - _CheckOutputFields(static=self._FIELDS_STATIC, - dynamic=self._FIELDS_DYNAMIC, - selected=self.op.output_fields) + self.cq = _ClusterQuery(None, self.op.output_fields, False) def ExpandNames(self): - self.needed_locks = {} + self.cq.ExpandNames(self) + + def DeclareLocks(self, level): + self.cq.DeclareLocks(self, level) def Exec(self, feedback_fn): - """Dump a representation of the cluster config to the standard output. 
- - """ - values = [] - for field in self.op.output_fields: - if field == "cluster_name": - entry = self.cfg.GetClusterName() - elif field == "master_node": - entry = self.cfg.GetMasterNode() - elif field == "drain_flag": - entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE) - elif field == "watcher_pause": - entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE) - elif field == "volume_group_name": - entry = self.cfg.GetVGName() - else: - raise errors.ParameterError(field) - values.append(entry) - return values + result = self.cq.OldStyleQuery(self) + assert len(result) == 1 -class LUInstanceActivateDisks(NoHooksLU): - """Bring up an instance's disks. + return result[0] - """ - REQ_BGL = False - def ExpandNames(self): - self._ExpandAndLockInstance() - self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE +class _ClusterQuery(_QueryBase): + FIELDS = query.CLUSTER_FIELDS - def DeclareLocks(self, level): - if level == locking.LEVEL_NODE: - self._LockInstancesNodes() + #: Do not sort (there is only one item) + SORT_FIELD = None - def CheckPrereq(self): + def ExpandNames(self, lu): + lu.needed_locks = {} + + # The following variables interact with _QueryBase._GetNames + self.wanted = locking.ALL_SET + self.do_locking = self.use_locking + + if self.do_locking: + raise errors.OpPrereqError("Can not use locking for cluster queries", + errors.ECODE_INVAL) + + def DeclareLocks(self, lu, level): + pass + + def _GetQueryData(self, lu): + """Computes the list of nodes and their attributes. + + """ + # Locking is not used + assert not (compat.any(lu.glm.is_owned(level) + for level in locking.LEVELS + if level != locking.LEVEL_CLUSTER) or + self.do_locking or self.use_locking) + + if query.CQ_CONFIG in self.requested_data: + cluster = lu.cfg.GetClusterInfo() + else: + cluster = NotImplemented + + if query.CQ_QUEUE_DRAINED in self.requested_data: + drain_flag = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE) + else: + drain_flag = NotImplemented + + if query.CQ_WATCHER_PAUSE in self.requested_data: + watcher_pause = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE) + else: + watcher_pause = NotImplemented + + return query.ClusterQueryData(cluster, drain_flag, watcher_pause) + + +class LUInstanceActivateDisks(NoHooksLU): + """Bring up an instance's disks. + + """ + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() + + def CheckPrereq(self): """Check prerequisites. This checks that the instance is in the cluster. 
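# Illustrative sketch only (the helper and field names are made up, not
# Ganeti's query API): _ClusterQuery._GetQueryData above computes each piece
# of data only if it was requested and leaves the rest as NotImplemented.
# The same pattern in miniature:
def _GatherRequested(requested, loaders):
  """Run only the loaders whose field was actually requested."""
  return dict((field, fn() if field in requested else NotImplemented)
              for (field, fn) in loaders.items())

data = _GatherRequested(frozenset(["drain_flag"]), {
  "config": lambda: {"cluster_name": "example"},  # expensive, skipped
  "drain_flag": lambda: False,                    # cheap, requested
  })
assert data["config"] is NotImplemented
assert data["drain_flag"] is False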
@@ -5646,7 +6316,8 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False, node_disk = node_disk.Copy() node_disk.UnsetSize() lu.cfg.SetDiskID(node_disk, node) - result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False, idx) + result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname, + False, idx) msg = result.fail_msg if msg: lu.proc.LogWarning("Could not prepare block device %s on node %s" @@ -5668,7 +6339,8 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False, node_disk = node_disk.Copy() node_disk.UnsetSize() lu.cfg.SetDiskID(node_disk, node) - result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True, idx) + result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname, + True, idx) msg = result.fail_msg if msg: lu.proc.LogWarning("Could not prepare block device %s on node %s" @@ -5747,7 +6419,7 @@ def _SafeShutdownInstanceDisks(lu, instance, disks=None): _ShutdownInstanceDisks. """ - _CheckInstanceDown(lu, instance, "cannot shutdown disks") + _CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks") _ShutdownInstanceDisks(lu, instance, disks=disks) @@ -5813,14 +6485,18 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name): @param requested: the amount of memory in MiB to check for @type hypervisor_name: C{str} @param hypervisor_name: the hypervisor to ask for memory stats + @rtype: integer + @return: node current free memory @raise errors.OpPrereqError: if the node doesn't have enough memory, or we cannot check the node """ - nodeinfo = lu.rpc.call_node_info([node], None, hypervisor_name) + nodeinfo = lu.rpc.call_node_info([node], None, [hypervisor_name]) nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True, ecode=errors.ECODE_ENVIRON) - free_mem = nodeinfo[node].payload.get("memory_free", None) + (_, _, (hv_info, )) = nodeinfo[node].payload + + free_mem = hv_info.get("memory_free", None) if not isinstance(free_mem, int): raise errors.OpPrereqError("Can't compute free memory on node %s, result" " was '%s'" % (node, free_mem), @@ -5830,6 +6506,7 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name): " needed %s MiB, available %s MiB" % (node, reason, requested, free_mem), errors.ECODE_NORES) + return free_mem def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes): @@ -5875,12 +6552,13 @@ def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested): or we cannot check the node """ - nodeinfo = lu.rpc.call_node_info(nodenames, vg, None) + nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None) for node in nodenames: info = nodeinfo[node] info.Raise("Cannot get current information from node %s" % node, prereq=True, ecode=errors.ECODE_ENVIRON) - vg_free = info.payload.get("vg_free", None) + (_, (vg_info, ), _) = info.payload + vg_free = vg_info.get("vg_free", None) if not isinstance(vg_free, int): raise errors.OpPrereqError("Can't compute free disk space on node" " %s for vg %s, result was '%s'" % @@ -5892,6 +6570,41 @@ def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested): errors.ECODE_NORES) +def _CheckNodesPhysicalCPUs(lu, nodenames, requested, hypervisor_name): + """Checks if nodes have enough physical CPUs + + This function checks if all given nodes have the needed number of + physical CPUs. In case any node has less CPUs or we cannot get the + information from the node, this function raises an OpPrereqError + exception. 
+ + @type lu: C{LogicalUnit} + @param lu: a logical unit from which we get configuration data + @type nodenames: C{list} + @param nodenames: the list of node names to check + @type requested: C{int} + @param requested: the minimum acceptable number of physical CPUs + @raise errors.OpPrereqError: if the node doesn't have enough CPUs, + or we cannot check the node + + """ + nodeinfo = lu.rpc.call_node_info(nodenames, None, [hypervisor_name]) + for node in nodenames: + info = nodeinfo[node] + info.Raise("Cannot get current information from node %s" % node, + prereq=True, ecode=errors.ECODE_ENVIRON) + (_, _, (hv_info, )) = info.payload + num_cpus = hv_info.get("cpu_total", None) + if not isinstance(num_cpus, int): + raise errors.OpPrereqError("Can't compute the number of physical CPUs" + " on node %s, result was '%s'" % + (node, num_cpus), errors.ECODE_ENVIRON) + if requested > num_cpus: + raise errors.OpPrereqError("Node %s has %s physical CPUs, but %s are " + "required" % (node, num_cpus, requested), + errors.ECODE_NORES) + + class LUInstanceStartup(LogicalUnit): """Starts an instance. @@ -5904,10 +6617,16 @@ class LUInstanceStartup(LogicalUnit): # extra beparams if self.op.beparams: # fill the beparams dict + objects.UpgradeBeParams(self.op.beparams) utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) def ExpandNames(self): self._ExpandAndLockInstance() + self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE_RES: + self._LockInstancesNodes(primary_only=True, level=locking.LEVEL_NODE_RES) def BuildHooksEnv(self): """Build hooks env. @@ -5951,6 +6670,8 @@ class LUInstanceStartup(LogicalUnit): hv_type.CheckParameterSyntax(filled_hvp) _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp) + _CheckInstanceState(self, instance, INSTANCE_ONLINE) + self.primary_offline = self.cfg.GetNodeInfo(instance.primary_node).offline if self.primary_offline and self.op.ignore_offline_nodes: @@ -5962,6 +6683,7 @@ class LUInstanceStartup(LogicalUnit): _CheckNodeOnline(self, instance.primary_node) bep = self.cfg.GetClusterInfo().FillBE(instance) + bep.update(self.op.beparams) # check bridges existence _CheckInstanceBridgesExist(self, instance) @@ -5974,7 +6696,7 @@ class LUInstanceStartup(LogicalUnit): if not remote_info.payload: # not running already _CheckNodeFreeMemory(self, instance.primary_node, "starting instance %s" % instance.name, - bep[constants.BE_MEMORY], instance.hypervisor) + bep[constants.BE_MINMEM], instance.hypervisor) def Exec(self, feedback_fn): """Start the instance. 
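# Illustrative sketch only (stand-in exception class, not ganeti.errors): the
# CPU and memory prerequisite checks above follow the same shape -- validate
# that the node reported a usable integer, then compare it with the requested
# amount and raise otherwise:
class _PrereqError(Exception):
  pass

def _CheckNodeResource(node, name, available, requested):
  """Raise if a node's reported resource is unusable or insufficient."""
  if not isinstance(available, int):
    raise _PrereqError("Can't compute %s on node %s, result was '%s'" %
                       (name, node, available))
  if requested > available:
    raise _PrereqError("Node %s has %s %s, but %s are required" %
                       (node, available, name, requested))
  return available

_CheckNodeResource("node1", "physical CPUs", 8, 4)  # passes, returns 8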
@@ -5994,9 +6716,11 @@ class LUInstanceStartup(LogicalUnit): _StartInstanceDisks(self, instance, force) - result = self.rpc.call_instance_start(node_current, instance, - self.op.hvparams, self.op.beparams, - self.op.startup_paused) + result = \ + self.rpc.call_instance_start(node_current, + (instance, self.op.hvparams, + self.op.beparams), + self.op.startup_paused) msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) @@ -6046,7 +6770,7 @@ class LUInstanceReboot(LogicalUnit): self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name) assert self.instance is not None, \ "Cannot retrieve locked instance %s" % self.op.instance_name - + _CheckInstanceState(self, instance, INSTANCE_ONLINE) _CheckNodeOnline(self, instance.primary_node) # check bridges existence @@ -6086,8 +6810,8 @@ class LUInstanceReboot(LogicalUnit): self.LogInfo("Instance %s was already stopped, starting now", instance.name) _StartInstanceDisks(self, instance, ignore_secondaries) - result = self.rpc.call_instance_start(node_current, instance, - None, None, False) + result = self.rpc.call_instance_start(node_current, + (instance, None, None), False) msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) @@ -6135,6 +6859,8 @@ class LUInstanceShutdown(LogicalUnit): assert self.instance is not None, \ "Cannot retrieve locked instance %s" % self.op.instance_name + _CheckInstanceState(self, self.instance, INSTANCE_ONLINE) + self.primary_offline = \ self.cfg.GetNodeInfo(self.instance.primary_node).offline @@ -6211,7 +6937,7 @@ class LUInstanceReinstall(LogicalUnit): raise errors.OpPrereqError("Instance '%s' has no disks" % self.op.instance_name, errors.ECODE_INVAL) - _CheckInstanceDown(self, instance, "cannot reinstall") + _CheckInstanceState(self, instance, INSTANCE_DOWN, msg="cannot reinstall") if self.op.os_type is not None: # OS verification @@ -6248,9 +6974,9 @@ class LUInstanceReinstall(LogicalUnit): try: feedback_fn("Running the instance OS create scripts...") # FIXME: pass debug option from opcode to backend - result = self.rpc.call_instance_os_add(inst.primary_node, inst, True, - self.op.debug_level, - osparams=self.os_inst) + result = self.rpc.call_instance_os_add(inst.primary_node, + (inst, self.os_inst), True, + self.op.debug_level) result.Raise("Could not install OS for instance %s on node %s" % (inst.name, inst.primary_node)) finally: @@ -6265,9 +6991,39 @@ class LUInstanceRecreateDisks(LogicalUnit): HTYPE = constants.HTYPE_INSTANCE REQ_BGL = False + _MODIFYABLE = frozenset([ + constants.IDISK_SIZE, + constants.IDISK_MODE, + ]) + + # New or changed disk parameters may have different semantics + assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([ + constants.IDISK_ADOPT, + + # TODO: Implement support changing VG while recreating + constants.IDISK_VG, + constants.IDISK_METAVG, + ])) + def CheckArguments(self): - # normalise the disk list - self.op.disks = sorted(frozenset(self.op.disks)) + if self.op.disks and ht.TPositiveInt(self.op.disks[0]): + # Normalize and convert deprecated list of disk indices + self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))] + + duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks)) + if duplicates: + raise errors.OpPrereqError("Some disks have been specified more than" + " once: %s" % utils.CommaJoin(duplicates), + errors.ECODE_INVAL) + + for (idx, params) in self.op.disks: + utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES) + unsupported = frozenset(params.keys()) - self._MODIFYABLE + if 
unsupported: + raise errors.OpPrereqError("Parameters for disk %s try to change" + " unmodifyable parameter(s): %s" % + (idx, utils.CommaJoin(unsupported)), + errors.ECODE_INVAL) def ExpandNames(self): self._ExpandAndLockInstance() @@ -6277,6 +7033,7 @@ class LUInstanceRecreateDisks(LogicalUnit): self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes) else: self.needed_locks[locking.LEVEL_NODE] = [] + self.needed_locks[locking.LEVEL_NODE_RES] = [] def DeclareLocks(self, level): if level == locking.LEVEL_NODE: @@ -6284,6 +7041,10 @@ class LUInstanceRecreateDisks(LogicalUnit): # otherwise we need to lock all nodes for disk re-creation primary_only = bool(self.op.nodes) self._LockInstancesNodes(primary_only=primary_only) + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE][:] def BuildHooksEnv(self): """Build hooks env. @@ -6328,24 +7089,32 @@ class LUInstanceRecreateDisks(LogicalUnit): if instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Instance '%s' has no disks" % self.op.instance_name, errors.ECODE_INVAL) + # if we replace nodes *and* the old primary is offline, we don't # check - assert instance.primary_node in self.needed_locks[locking.LEVEL_NODE] + assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE) + assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE_RES) old_pnode = self.cfg.GetNodeInfo(instance.primary_node) if not (self.op.nodes and old_pnode.offline): - _CheckInstanceDown(self, instance, "cannot recreate disks") + _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING, + msg="cannot recreate disks") - if not self.op.disks: - self.op.disks = range(len(instance.disks)) + if self.op.disks: + self.disks = dict(self.op.disks) else: - for idx in self.op.disks: - if idx >= len(instance.disks): - raise errors.OpPrereqError("Invalid disk index '%s'" % idx, - errors.ECODE_INVAL) - if self.op.disks != range(len(instance.disks)) and self.op.nodes: + self.disks = dict((idx, {}) for idx in range(len(instance.disks))) + + maxidx = max(self.disks.keys()) + if maxidx >= len(instance.disks): + raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx, + errors.ECODE_INVAL) + + if (self.op.nodes and + sorted(self.disks.keys()) != range(len(instance.disks))): raise errors.OpPrereqError("Can't recreate disks partially and" " change the nodes at the same time", errors.ECODE_INVAL) + self.instance = instance def Exec(self, feedback_fn): @@ -6354,31 +7123,46 @@ class LUInstanceRecreateDisks(LogicalUnit): """ instance = self.instance + assert (self.owned_locks(locking.LEVEL_NODE) == + self.owned_locks(locking.LEVEL_NODE_RES)) + to_skip = [] - mods = [] # keeps track of needed logical_id changes + mods = [] # keeps track of needed changes for idx, disk in enumerate(instance.disks): - if idx not in self.op.disks: # disk idx has not been passed in + try: + changes = self.disks[idx] + except KeyError: + # Disk should not be recreated to_skip.append(idx) continue + # update secondaries for disks, if needed - if self.op.nodes: - if disk.dev_type == constants.LD_DRBD8: - # need to update the nodes and minors - assert len(self.op.nodes) == 2 - assert len(disk.logical_id) == 6 # otherwise disk internals - # have changed - (_, _, old_port, _, _, old_secret) = disk.logical_id - new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name) - new_id = (self.op.nodes[0], self.op.nodes[1], old_port, - new_minors[0], new_minors[1], old_secret) - 
assert len(disk.logical_id) == len(new_id) - mods.append((idx, new_id)) + if self.op.nodes and disk.dev_type == constants.LD_DRBD8: + # need to update the nodes and minors + assert len(self.op.nodes) == 2 + assert len(disk.logical_id) == 6 # otherwise disk internals + # have changed + (_, _, old_port, _, _, old_secret) = disk.logical_id + new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name) + new_id = (self.op.nodes[0], self.op.nodes[1], old_port, + new_minors[0], new_minors[1], old_secret) + assert len(disk.logical_id) == len(new_id) + else: + new_id = None + + mods.append((idx, new_id, changes)) # now that we have passed all asserts above, we can apply the mods # in a single run (to avoid partial changes) - for idx, new_id in mods: - instance.disks[idx].logical_id = new_id + for idx, new_id, changes in mods: + disk = instance.disks[idx] + if new_id is not None: + assert disk.dev_type == constants.LD_DRBD8 + disk.logical_id = new_id + if changes: + disk.Update(size=changes.get(constants.IDISK_SIZE, None), + mode=changes.get(constants.IDISK_MODE, None)) # change primary node, if needed if self.op.nodes: @@ -6436,7 +7220,8 @@ class LUInstanceRename(LogicalUnit): instance = self.cfg.GetInstanceInfo(self.op.instance_name) assert instance is not None _CheckNodeOnline(self, instance.primary_node) - _CheckInstanceDown(self, instance, "cannot rename") + _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING, + msg="cannot rename") self.instance = instance new_name = self.op.new_name @@ -6522,11 +7307,16 @@ class LUInstanceRemove(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() self.needed_locks[locking.LEVEL_NODE] = [] + self.needed_locks[locking.LEVEL_NODE_RES] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE def DeclareLocks(self, level): if level == locking.LEVEL_NODE: self._LockInstancesNodes() + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE][:] def BuildHooksEnv(self): """Build hooks env. 
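# Illustrative sketch only (the disk class here is a stand-in, not
# ganeti.objects.Disk): the disk-recreation hunk above validates every
# requested change before mutating anything, so a failed assertion leaves the
# configuration untouched:
class _FakeDisk(object):
  def __init__(self, size, mode):
    self.size = size
    self.mode = mode

def _ApplyDiskChanges(disks, per_disk_changes):
  """Collect and validate all changes first, then apply them in one pass."""
  mods = []
  for (idx, changes) in per_disk_changes.items():
    assert 0 <= idx < len(disks), "Invalid disk index %s" % idx
    mods.append((idx, changes))
  for (idx, changes) in mods:  # nothing has been modified until this point
    for (key, value) in changes.items():
      setattr(disks[idx], key, value)

disks = [_FakeDisk(1024, "rw"), _FakeDisk(2048, "rw")]
_ApplyDiskChanges(disks, {1: {"size": 4096}})
assert disks[1].size == 4096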
@@ -6575,6 +7365,12 @@ class LUInstanceRemove(LogicalUnit): " node %s: %s" % (instance.name, instance.primary_node, msg)) + assert (self.owned_locks(locking.LEVEL_NODE) == + self.owned_locks(locking.LEVEL_NODE_RES)) + assert not (set(instance.all_nodes) - + self.owned_locks(locking.LEVEL_NODE)), \ + "Not owning correct locks" + _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures) @@ -6584,7 +7380,7 @@ def _RemoveInstance(lu, feedback_fn, instance, ignore_failures): """ logging.info("Removing block devices for instance %s", instance.name) - if not _RemoveDisks(lu, instance): + if not _RemoveDisks(lu, instance, ignore_failures=ignore_failures): if not ignore_failures: raise errors.OpExecError("Can't remove instance's disks") feedback_fn("Warning: can't remove instance's disks") @@ -6645,13 +7441,17 @@ class LUInstanceFailover(LogicalUnit): self.needed_locks[locking.LEVEL_NODE] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + self.needed_locks[locking.LEVEL_NODE_RES] = [] + self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE + ignore_consistency = self.op.ignore_consistency shutdown_timeout = self.op.shutdown_timeout self._migrater = TLMigrateInstance(self, self.op.instance_name, cleanup=False, failover=True, ignore_consistency=ignore_consistency, - shutdown_timeout=shutdown_timeout) + shutdown_timeout=shutdown_timeout, + ignore_ipolicy=self.op.ignore_ipolicy) self.tasklets = [self._migrater] def DeclareLocks(self, level): @@ -6666,6 +7466,10 @@ class LUInstanceFailover(LogicalUnit): del self.recalculate_locks[locking.LEVEL_NODE] else: self._LockInstancesNodes() + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE][:] def BuildHooksEnv(self): """Build hooks env. @@ -6722,10 +7526,16 @@ class LUInstanceMigrate(LogicalUnit): self.needed_locks[locking.LEVEL_NODE] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE - self._migrater = TLMigrateInstance(self, self.op.instance_name, - cleanup=self.op.cleanup, - failover=False, - fallback=self.op.allow_failover) + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + self._migrater = \ + TLMigrateInstance(self, self.op.instance_name, + cleanup=self.op.cleanup, + failover=False, + fallback=self.op.allow_failover, + allow_runtime_changes=self.op.allow_runtime_changes, + ignore_ipolicy=self.op.ignore_ipolicy) self.tasklets = [self._migrater] def DeclareLocks(self, level): @@ -6740,6 +7550,10 @@ class LUInstanceMigrate(LogicalUnit): del self.recalculate_locks[locking.LEVEL_NODE] else: self._LockInstancesNodes() + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE][:] def BuildHooksEnv(self): """Build hooks env. 
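# Illustrative note (plain Python, nothing Ganeti-specific): the hunks above
# copy the node lock list with [:] before reusing it for LEVEL_NODE_RES. A
# plain assignment would alias the same list, so later edits to one lock
# level would leak into the other:
node_locks = ["node1", "node2"]
aliased = node_locks        # same list object
copied = node_locks[:]      # independent shallow copy

node_locks.append("node3")
assert aliased == ["node1", "node2", "node3"]
assert copied == ["node1", "node2"]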
@@ -6756,6 +7570,7 @@ class LUInstanceMigrate(LogicalUnit): "MIGRATE_CLEANUP": self.op.cleanup, "OLD_PRIMARY": source_node, "NEW_PRIMARY": target_node, + "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes, }) if instance.disk_template in constants.DTS_INT_MIRROR: @@ -6788,11 +7603,16 @@ class LUInstanceMove(LogicalUnit): target_node = _ExpandNodeName(self.cfg, self.op.target_node) self.op.target_node = target_node self.needed_locks[locking.LEVEL_NODE] = [target_node] + self.needed_locks[locking.LEVEL_NODE_RES] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND def DeclareLocks(self, level): if level == locking.LEVEL_NODE: self._LockInstancesNodes(primary_only=True) + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE][:] def BuildHooksEnv(self): """Build hooks env. @@ -6849,11 +7669,15 @@ class LUInstanceMove(LogicalUnit): _CheckNodeOnline(self, target_node) _CheckNodeNotDrained(self, target_node) _CheckNodeVmCapable(self, target_node) + ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), + self.cfg.GetNodeGroup(node.group)) + _CheckTargetNodeIPolicy(self, ipolicy, instance, node, + ignore=self.op.ignore_ipolicy) - if instance.admin_up: + if instance.admin_state == constants.ADMINST_UP: # check memory requirements on the secondary node _CheckNodeFreeMemory(self, target_node, "failing over instance %s" % - instance.name, bep[constants.BE_MEMORY], + instance.name, bep[constants.BE_MAXMEM], instance.hypervisor) else: self.LogInfo("Not checking memory on the secondary node as" @@ -6877,6 +7701,9 @@ class LUInstanceMove(LogicalUnit): self.LogInfo("Shutting down instance %s on source node %s", instance.name, source_node) + assert (self.owned_locks(locking.LEVEL_NODE) == + self.owned_locks(locking.LEVEL_NODE_RES)) + result = self.rpc.call_instance_shutdown(source_node, instance, self.op.shutdown_timeout) msg = result.fail_msg @@ -6908,7 +7735,7 @@ class LUInstanceMove(LogicalUnit): # activate, get path, copy the data over for idx, disk in enumerate(instance.disks): self.LogInfo("Copying data for disk %d", idx) - result = self.rpc.call_blockdev_assemble(target_node, disk, + result = self.rpc.call_blockdev_assemble(target_node, (disk, instance), instance.name, True, idx) if result.fail_msg: self.LogWarning("Can't assemble newly created disk %d: %s", @@ -6916,7 +7743,7 @@ class LUInstanceMove(LogicalUnit): errs.append(result.fail_msg) break dev_path = result.payload - result = self.rpc.call_blockdev_export(source_node, disk, + result = self.rpc.call_blockdev_export(source_node, (disk, instance), target_node, dev_path, cluster_name) if result.fail_msg: @@ -6941,7 +7768,7 @@ class LUInstanceMove(LogicalUnit): _RemoveDisks(self, instance, target_node=source_node) # Only start the instance if it's marked as up - if instance.admin_up: + if instance.admin_state == constants.ADMINST_UP: self.LogInfo("Starting instance %s on node %s", instance.name, target_node) @@ -6951,8 +7778,8 @@ class LUInstanceMove(LogicalUnit): _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Can't activate the instance's disks") - result = self.rpc.call_instance_start(target_node, instance, - None, None, False) + result = self.rpc.call_instance_start(target_node, + (instance, None, None), False) msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) @@ -6987,6 +7814,7 @@ class LUNodeMigrate(LogicalUnit): """ return { "NODE_NAME": self.op.node_name, + "ALLOW_RUNTIME_CHANGES": 
self.op.allow_runtime_changes, } def BuildHooksNodes(self): @@ -7001,12 +7829,15 @@ class LUNodeMigrate(LogicalUnit): def Exec(self, feedback_fn): # Prepare jobs for migration instances + allow_runtime_changes = self.op.allow_runtime_changes jobs = [ [opcodes.OpInstanceMigrate(instance_name=inst.name, mode=self.op.mode, live=self.op.live, iallocator=self.op.iallocator, - target_node=self.op.target_node)] + target_node=self.op.target_node, + allow_runtime_changes=allow_runtime_changes, + ignore_ipolicy=self.op.ignore_ipolicy)] for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name) ] @@ -7043,12 +7874,21 @@ class TLMigrateInstance(Tasklet): and target node @type shutdown_timeout: int @ivar shutdown_timeout: In case of failover timeout of the shutdown + @type ignore_ipolicy: bool + @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating """ + + # Constants + _MIGRATION_POLL_INTERVAL = 1 # seconds + _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds + def __init__(self, lu, instance_name, cleanup=False, failover=False, fallback=False, ignore_consistency=False, - shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT): + allow_runtime_changes=True, + shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT, + ignore_ipolicy=False): """Initializes this class. """ @@ -7062,6 +7902,8 @@ class TLMigrateInstance(Tasklet): self.fallback = fallback self.ignore_consistency = ignore_consistency self.shutdown_timeout = shutdown_timeout + self.ignore_ipolicy = ignore_ipolicy + self.allow_runtime_changes = allow_runtime_changes def CheckPrereq(self): """Check prerequisites. @@ -7073,11 +7915,13 @@ class TLMigrateInstance(Tasklet): instance = self.cfg.GetInstanceInfo(instance_name) assert instance is not None self.instance = instance + cluster = self.cfg.GetClusterInfo() - if (not self.cleanup and not instance.admin_up and not self.failover and - self.fallback): - self.lu.LogInfo("Instance is marked down, fallback allowed, switching" - " to failover") + if (not self.cleanup and + not instance.admin_state == constants.ADMINST_UP and + not self.failover and self.fallback): + self.lu.LogInfo("Instance is marked down or offline, fallback allowed," + " switching to failover") self.failover = True if instance.disk_template not in constants.DTS_MIRRORED: @@ -7099,6 +7943,13 @@ class TLMigrateInstance(Tasklet): # BuildHooksEnv self.target_node = self.lu.op.target_node + # Check that the target node is correct in terms of instance policy + nodeinfo = self.cfg.GetNodeInfo(self.target_node) + group_info = self.cfg.GetNodeGroup(nodeinfo.group) + ipolicy = _CalculateGroupIPolicy(cluster, group_info) + _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, + ignore=self.ignore_ipolicy) + # self.target_node is already populated, either directly or by the # iallocator run target_node = self.target_node @@ -7132,18 +7983,38 @@ class TLMigrateInstance(Tasklet): " node can be passed)" % (instance.disk_template, text), errors.ECODE_INVAL) + nodeinfo = self.cfg.GetNodeInfo(target_node) + group_info = self.cfg.GetNodeGroup(nodeinfo.group) + ipolicy = _CalculateGroupIPolicy(cluster, group_info) + _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, + ignore=self.ignore_ipolicy) - i_be = self.cfg.GetClusterInfo().FillBE(instance) + i_be = cluster.FillBE(instance) # check memory requirements on the secondary node - if not self.cleanup and (not self.failover or instance.admin_up): - _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" % - instance.name, i_be[constants.BE_MEMORY], - 
instance.hypervisor) + if (not self.cleanup and + (not self.failover or instance.admin_state == constants.ADMINST_UP)): + self.tgt_free_mem = _CheckNodeFreeMemory(self.lu, target_node, + "migrating instance %s" % + instance.name, + i_be[constants.BE_MINMEM], + instance.hypervisor) else: self.lu.LogInfo("Not checking memory on the secondary node as" " instance will not be started") + # check if failover must be forced instead of migration + if (not self.cleanup and not self.failover and + i_be[constants.BE_ALWAYS_FAILOVER]): + if self.fallback: + self.lu.LogInfo("Instance configured to always failover; fallback" + " to failover") + self.failover = True + else: + raise errors.OpPrereqError("This instance has been configured to" + " always failover, please allow failover", + errors.ECODE_STATE) + # check bridge existance _CheckInstanceBridgesExist(self.lu, instance, node=target_node) @@ -7177,8 +8048,7 @@ class TLMigrateInstance(Tasklet): self.lu.op.live = None elif self.lu.op.mode is None: # read the default value from the hypervisor - i_hv = self.cfg.GetClusterInfo().FillHV(self.instance, - skip_globals=False) + i_hv = cluster.FillHV(self.instance, skip_globals=False) self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE] self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE @@ -7186,16 +8056,25 @@ class TLMigrateInstance(Tasklet): # Failover is never live self.live = False + if not (self.failover or self.cleanup): + remote_info = self.rpc.call_instance_info(instance.primary_node, + instance.name, + instance.hypervisor) + remote_info.Raise("Error checking instance on node %s" % + instance.primary_node) + instance_running = bool(remote_info.payload) + if instance_running: + self.current_mem = int(remote_info.payload["memory"]) + def _RunAllocator(self): """Run the allocator based on input opcode. 
""" + # FIXME: add a self.ignore_ipolicy option ial = IAllocator(self.cfg, self.rpc, mode=constants.IALLOCATOR_MODE_RELOC, name=self.instance_name, - # TODO See why hail breaks with a single node below - relocate_from=[self.instance.primary_node, - self.instance.primary_node], + relocate_from=[self.instance.primary_node], ) ial.Run(self.lu.op.iallocator) @@ -7227,7 +8106,8 @@ class TLMigrateInstance(Tasklet): all_done = True result = self.rpc.call_drbd_wait_sync(self.all_nodes, self.nodes_ip, - self.instance.disks) + (self.instance.disks, + self.instance)) min_percent = 100 for node, nres in result.items(): nres.Raise("Cannot resync disks on node %s" % node) @@ -7273,7 +8153,7 @@ class TLMigrateInstance(Tasklet): msg = "single-master" self.feedback_fn("* changing disks into %s mode" % msg) result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip, - self.instance.disks, + (self.instance.disks, self.instance), self.instance.name, multimaster) for node, nres in result.items(): nres.Raise("Cannot change disks config on node %s" % node) @@ -7368,12 +8248,13 @@ class TLMigrateInstance(Tasklet): """ instance = self.instance target_node = self.target_node + source_node = self.source_node migration_info = self.migration_info - abort_result = self.rpc.call_finalize_migration(target_node, - instance, - migration_info, - False) + abort_result = self.rpc.call_instance_finalize_migration_dst(target_node, + instance, + migration_info, + False) abort_msg = abort_result.fail_msg if abort_msg: logging.error("Aborting migration failed on target node %s: %s", @@ -7381,6 +8262,13 @@ class TLMigrateInstance(Tasklet): # Don't raise an exception here, as we stil have to try to revert the # disk status, even if this step failed. + abort_result = self.rpc.call_instance_finalize_migration_src(source_node, + instance, False, self.live) + abort_msg = abort_result.fail_msg + if abort_msg: + logging.error("Aborting migration failed on source node %s: %s", + source_node, abort_msg) + def _ExecMigration(self): """Migrate an instance. @@ -7397,12 +8285,43 @@ class TLMigrateInstance(Tasklet): target_node = self.target_node source_node = self.source_node + # Check for hypervisor version mismatch and warn the user. 
+ nodeinfo = self.rpc.call_node_info([source_node, target_node], + None, [self.instance.hypervisor]) + for ninfo in nodeinfo.values(): + ninfo.Raise("Unable to retrieve node information from node '%s'" % + ninfo.node) + (_, _, (src_info, )) = nodeinfo[source_node].payload + (_, _, (dst_info, )) = nodeinfo[target_node].payload + + if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and + (constants.HV_NODEINFO_KEY_VERSION in dst_info)): + src_version = src_info[constants.HV_NODEINFO_KEY_VERSION] + dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION] + if src_version != dst_version: + self.feedback_fn("* warning: hypervisor version mismatch between" + " source (%s) and target (%s) node" % + (src_version, dst_version)) + self.feedback_fn("* checking disk consistency between source and target") - for dev in instance.disks: - if not _CheckDiskConsistency(self.lu, dev, target_node, False): + for (idx, dev) in enumerate(instance.disks): + if not _CheckDiskConsistency(self.lu, instance, dev, target_node, False): raise errors.OpExecError("Disk %s is degraded or not fully" " synchronized on target node," - " aborting migration" % dev.iv_name) + " aborting migration" % idx) + + if self.current_mem > self.tgt_free_mem: + if not self.allow_runtime_changes: + raise errors.OpExecError("Memory ballooning not allowed and not enough" + " free memory to fit instance %s on target" + " node %s (have %dMB, need %dMB)" % + (instance.name, target_node, + self.tgt_free_mem, self.current_mem)) + self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem) + rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node, + instance, + self.tgt_free_mem) + rpcres.Raise("Cannot modify instance runtime memory") # First get the migration information from the remote node result = self.rpc.call_migration_info(source_node, instance) @@ -7452,18 +8371,59 @@ class TLMigrateInstance(Tasklet): raise errors.OpExecError("Could not migrate instance %s: %s" % (instance.name, msg)) + self.feedback_fn("* starting memory transfer") + last_feedback = time.time() + while True: + result = self.rpc.call_instance_get_migration_status(source_node, + instance) + msg = result.fail_msg + ms = result.payload # MigrationStatus instance + if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES): + logging.error("Instance migration failed, trying to revert" + " disk status: %s", msg) + self.feedback_fn("Migration failed, aborting") + self._AbortMigration() + self._RevertDiskStatus() + raise errors.OpExecError("Could not migrate instance %s: %s" % + (instance.name, msg)) + + if result.payload.status != constants.HV_MIGRATION_ACTIVE: + self.feedback_fn("* memory transfer complete") + break + + if (utils.TimeoutExpired(last_feedback, + self._MIGRATION_FEEDBACK_INTERVAL) and + ms.transferred_ram is not None): + mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram) + self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress) + last_feedback = time.time() + + time.sleep(self._MIGRATION_POLL_INTERVAL) + + result = self.rpc.call_instance_finalize_migration_src(source_node, + instance, + True, + self.live) + msg = result.fail_msg + if msg: + logging.error("Instance migration succeeded, but finalization failed" + " on the source node: %s", msg) + raise errors.OpExecError("Could not finalize instance migration: %s" % + msg) + instance.primary_node = target_node + # distribute new instance config to the other nodes self.cfg.Update(instance, self.feedback_fn) - result = 
self.rpc.call_finalize_migration(target_node, - instance, - migration_info, - True) + result = self.rpc.call_instance_finalize_migration_dst(target_node, + instance, + migration_info, + True) msg = result.fail_msg if msg: - logging.error("Instance migration succeeded, but finalization failed:" - " %s", msg) + logging.error("Instance migration succeeded, but finalization failed" + " on the target node: %s", msg) raise errors.OpExecError("Could not finalize instance migration: %s" % msg) @@ -7474,6 +8434,21 @@ class TLMigrateInstance(Tasklet): self._GoReconnect(False) self._WaitUntilSync() + # If the instance's disk template is `rbd' and there was a successful + # migration, unmap the device from the source node. + if self.instance.disk_template == constants.DT_RBD: + disks = _ExpandCheckDisks(instance, instance.disks) + self.feedback_fn("* unmapping instance's disks from %s" % source_node) + for disk in disks: + result = self.rpc.call_blockdev_shutdown(source_node, disk) + msg = result.fail_msg + if msg: + logging.error("Migration was successful, but couldn't unmap the" + " block device %s on source node %s: %s", + disk.iv_name, source_node, msg) + logging.error("You need to unmap the device %s manually on %s", + disk.iv_name, source_node) + self.feedback_fn("* done") def _ExecFailover(self): @@ -7489,18 +8464,19 @@ class TLMigrateInstance(Tasklet): source_node = instance.primary_node target_node = self.target_node - if instance.admin_up: + if instance.admin_state == constants.ADMINST_UP: self.feedback_fn("* checking disk consistency between source and target") - for dev in instance.disks: + for (idx, dev) in enumerate(instance.disks): # for drbd, these are drbd over lvm - if not _CheckDiskConsistency(self.lu, dev, target_node, False): + if not _CheckDiskConsistency(self.lu, instance, dev, target_node, + False): if primary_node.offline: self.feedback_fn("Node %s is offline, ignoring degraded disk %s on" " target node %s" % - (primary_node.name, dev.iv_name, target_node)) + (primary_node.name, idx, target_node)) elif not self.ignore_consistency: raise errors.OpExecError("Disk %s is degraded on target node," - " aborting failover" % dev.iv_name) + " aborting failover" % idx) else: self.feedback_fn("* not checking disk consistency as instance is not" " running") @@ -7532,7 +8508,7 @@ class TLMigrateInstance(Tasklet): self.cfg.Update(instance, self.feedback_fn) # Only start the instance if it's marked as up - if instance.admin_up: + if instance.admin_state == constants.ADMINST_UP: self.feedback_fn("* activating the instance's disks on target node %s" % target_node) logging.info("Starting instance %s on node %s", @@ -7546,7 +8522,7 @@ class TLMigrateInstance(Tasklet): self.feedback_fn("* starting the instance on the target node %s" % target_node) - result = self.rpc.call_instance_start(target_node, instance, None, None, + result = self.rpc.call_instance_start(target_node, (instance, None, None), False) msg = result.fail_msg if msg: @@ -7676,24 +8652,41 @@ def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names, assert len(vgnames) == len(names) == 2 port = lu.cfg.AllocatePort() shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId()) + dev_data = objects.Disk(dev_type=constants.LD_LV, size=size, - logical_id=(vgnames[0], names[0])) - dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128, - logical_id=(vgnames[1], names[1])) + logical_id=(vgnames[0], names[0]), + params={}) + dev_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE, + logical_id=(vgnames[1], 
names[1]), + params={}) drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size, logical_id=(primary, secondary, port, p_minor, s_minor, shared_secret), children=[dev_data, dev_meta], - iv_name=iv_name) + iv_name=iv_name, params={}) return drbd_dev -def _GenerateDiskTemplate(lu, template_name, - instance_name, primary_node, - secondary_nodes, disk_info, - file_storage_dir, file_driver, - base_index, feedback_fn): +_DISK_TEMPLATE_NAME_PREFIX = { + constants.DT_PLAIN: "", + constants.DT_RBD: ".rbd", + } + + +_DISK_TEMPLATE_DEVICE_TYPE = { + constants.DT_PLAIN: constants.LD_LV, + constants.DT_FILE: constants.LD_FILE, + constants.DT_SHARED_FILE: constants.LD_FILE, + constants.DT_BLOCK: constants.LD_BLOCKDEV, + constants.DT_RBD: constants.LD_RBD, + } + + +def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node, + secondary_nodes, disk_info, file_storage_dir, file_driver, base_index, + feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage, + _req_shr_file_storage=opcodes.RequireSharedFileStorage): """Generate the entire disk layout for a given template type. """ @@ -7702,24 +8695,9 @@ def _GenerateDiskTemplate(lu, template_name, vgname = lu.cfg.GetVGName() disk_count = len(disk_info) disks = [] + if template_name == constants.DT_DISKLESS: pass - elif template_name == constants.DT_PLAIN: - if len(secondary_nodes) != 0: - raise errors.ProgrammerError("Wrong template configuration") - - names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i) - for i in range(disk_count)]) - for idx, disk in enumerate(disk_info): - disk_index = idx + base_index - vg = disk.get(constants.IDISK_VG, vgname) - feedback_fn("* disk %i, vg %s, name %s" % (idx, vg, names[idx])) - disk_dev = objects.Disk(dev_type=constants.LD_LV, - size=disk[constants.IDISK_SIZE], - logical_id=(vg, names[idx]), - iv_name="disk/%d" % disk_index, - mode=disk[constants.IDISK_MODE]) - disks.append(disk_dev) elif template_name == constants.DT_DRBD8: if len(secondary_nodes) != 1: raise errors.ProgrammerError("Wrong template configuration") @@ -7727,6 +8705,10 @@ def _GenerateDiskTemplate(lu, template_name, minors = lu.cfg.AllocateDRBDMinor( [primary_node, remote_node] * len(disk_info), instance_name) + (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name, + full_disk_params) + drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG] + names = [] for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i) for i in range(disk_count)]): @@ -7735,7 +8717,7 @@ def _GenerateDiskTemplate(lu, template_name, for idx, disk in enumerate(disk_info): disk_index = idx + base_index data_vg = disk.get(constants.IDISK_VG, vgname) - meta_vg = disk.get(constants.IDISK_METAVG, data_vg) + meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg) disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node, disk[constants.IDISK_SIZE], [data_vg, meta_vg], @@ -7744,54 +8726,54 @@ def _GenerateDiskTemplate(lu, template_name, minors[idx * 2], minors[idx * 2 + 1]) disk_dev.mode = disk[constants.IDISK_MODE] disks.append(disk_dev) - elif template_name == constants.DT_FILE: - if len(secondary_nodes) != 0: + else: + if secondary_nodes: raise errors.ProgrammerError("Wrong template configuration") - opcodes.RequireFileStorage() - - for idx, disk in enumerate(disk_info): - disk_index = idx + base_index - disk_dev = objects.Disk(dev_type=constants.LD_FILE, - size=disk[constants.IDISK_SIZE], - iv_name="disk/%d" % disk_index, - logical_id=(file_driver, - "%s/disk%d" % (file_storage_dir, - 
disk_index)), - mode=disk[constants.IDISK_MODE]) - disks.append(disk_dev) - elif template_name == constants.DT_SHARED_FILE: - if len(secondary_nodes) != 0: - raise errors.ProgrammerError("Wrong template configuration") + if template_name == constants.DT_FILE: + _req_file_storage() + elif template_name == constants.DT_SHARED_FILE: + _req_shr_file_storage() - opcodes.RequireSharedFileStorage() + name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None) + if name_prefix is None: + names = None + else: + names = _GenerateUniqueNames(lu, ["%s.disk%s" % + (name_prefix, base_index + i) + for i in range(disk_count)]) + + if template_name == constants.DT_PLAIN: + def logical_id_fn(idx, _, disk): + vg = disk.get(constants.IDISK_VG, vgname) + return (vg, names[idx]) + elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE): + logical_id_fn = \ + lambda _, disk_index, disk: (file_driver, + "%s/disk%d" % (file_storage_dir, + disk_index)) + elif template_name == constants.DT_BLOCK: + logical_id_fn = \ + lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL, + disk[constants.IDISK_ADOPT]) + elif template_name == constants.DT_RBD: + logical_id_fn = lambda idx, _, disk: ("rbd", names[idx]) + else: + raise errors.ProgrammerError("Unknown disk template '%s'" % template_name) - for idx, disk in enumerate(disk_info): - disk_index = idx + base_index - disk_dev = objects.Disk(dev_type=constants.LD_FILE, - size=disk[constants.IDISK_SIZE], - iv_name="disk/%d" % disk_index, - logical_id=(file_driver, - "%s/disk%d" % (file_storage_dir, - disk_index)), - mode=disk[constants.IDISK_MODE]) - disks.append(disk_dev) - elif template_name == constants.DT_BLOCK: - if len(secondary_nodes) != 0: - raise errors.ProgrammerError("Wrong template configuration") + dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name] for idx, disk in enumerate(disk_info): disk_index = idx + base_index - disk_dev = objects.Disk(dev_type=constants.LD_BLOCKDEV, - size=disk[constants.IDISK_SIZE], - logical_id=(constants.BLOCKDEV_DRIVER_MANUAL, - disk[constants.IDISK_ADOPT]), - iv_name="disk/%d" % disk_index, - mode=disk[constants.IDISK_MODE]) - disks.append(disk_dev) + size = disk[constants.IDISK_SIZE] + feedback_fn("* disk %s, size %s" % + (disk_index, utils.FormatUnit(size, "h"))) + disks.append(objects.Disk(dev_type=dev_type, size=size, + logical_id=logical_id_fn(idx, disk_index, disk), + iv_name="disk/%d" % disk_index, + mode=disk[constants.IDISK_MODE], + params={})) - else: - raise errors.ProgrammerError("Invalid disk template '%s'" % template_name) return disks @@ -7831,7 +8813,9 @@ def _WipeDisks(lu, instance): lu.cfg.SetDiskID(device, node) logging.info("Pause sync of instance %s disks", instance.name) - result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, True) + result = lu.rpc.call_blockdev_pause_resume_sync(node, + (instance.disks, instance), + True) for idx, success in enumerate(result.payload): if not success: @@ -7861,7 +8845,8 @@ def _WipeDisks(lu, instance): wipe_size = min(wipe_chunk_size, size - offset) logging.debug("Wiping disk %d, offset %s, chunk %s", idx, offset, wipe_size) - result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size) + result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset, + wipe_size) result.Raise("Could not wipe disk %d at offset %d for size %d" % (idx, offset, wipe_size)) now = time.time() @@ -7874,7 +8859,9 @@ def _WipeDisks(lu, instance): finally: logging.info("Resume sync of instance %s disks", instance.name) - result = 
lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, False) + result = lu.rpc.call_blockdev_pause_resume_sync(node, + (instance.disks, instance), + False) for idx, success in enumerate(result.payload): if not success: @@ -7921,15 +8908,14 @@ def _CreateDisks(lu, instance, to_skip=None, target_node=None): for idx, device in enumerate(instance.disks): if to_skip and idx in to_skip: continue - logging.info("Creating volume %s for instance %s", - device.iv_name, instance.name) + logging.info("Creating disk %s for instance '%s'", idx, instance.name) #HARDCODE for node in all_nodes: f_create = node == pnode _CreateBlockDev(lu, node, instance, device, f_create, info, f_create) -def _RemoveDisks(lu, instance, target_node=None): +def _RemoveDisks(lu, instance, target_node=None, ignore_failures=False): """Remove all disks for an instance. This abstracts away some work from `AddInstance()` and @@ -7950,7 +8936,8 @@ def _RemoveDisks(lu, instance, target_node=None): logging.info("Removing block devices for instance %s", instance.name) all_result = True - for device in instance.disks: + ports_to_release = set() + for (idx, device) in enumerate(instance.disks): if target_node: edata = [(target_node, device)] else: @@ -7959,14 +8946,17 @@ def _RemoveDisks(lu, instance, target_node=None): lu.cfg.SetDiskID(disk, node) msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg if msg: - lu.LogWarning("Could not remove block device %s on node %s," - " continuing anyway: %s", device.iv_name, node, msg) + lu.LogWarning("Could not remove disk %s on node %s," + " continuing anyway: %s", idx, node, msg) all_result = False # if this is a DRBD disk, return its port to the pool if device.dev_type in constants.LDS_DRBD: - tcp_port = device.logical_id[2] - lu.cfg.AddTcpUdpPort(tcp_port) + ports_to_release.add(device.logical_id[2]) + + if all_result or ignore_failures: + for port in ports_to_release: + lu.cfg.AddTcpUdpPort(port) if instance.disk_template == constants.DT_FILE: file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) @@ -8003,7 +8993,7 @@ def _ComputeDiskSizePerVG(disk_template, disks): constants.DT_DISKLESS: {}, constants.DT_PLAIN: _compute(disks, 0), # 128 MB are added for drbd metadata for each disk - constants.DT_DRBD8: _compute(disks, 128), + constants.DT_DRBD8: _compute(disks, DRBD_META_SIZE), constants.DT_FILE: {}, constants.DT_SHARED_FILE: {}, } @@ -8024,10 +9014,12 @@ def _ComputeDiskSize(disk_template, disks): constants.DT_DISKLESS: None, constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks), # 128 MB are added for drbd metadata for each disk - constants.DT_DRBD8: sum(d[constants.IDISK_SIZE] + 128 for d in disks), + constants.DT_DRBD8: + sum(d[constants.IDISK_SIZE] + DRBD_META_SIZE for d in disks), constants.DT_FILE: None, constants.DT_SHARED_FILE: 0, constants.DT_BLOCK: 0, + constants.DT_RBD: 0, } if disk_template not in req_size_dict: @@ -8070,9 +9062,11 @@ def _CheckHVParams(lu, nodenames, hvname, hvparams): """ nodenames = _FilterVmNodes(lu, nodenames) - hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames, - hvname, - hvparams) + + cluster = lu.cfg.GetClusterInfo() + hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams) + + hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames, hvname, hvfull) for node in nodenames: info = hvinfo[node] if info.offline: @@ -8098,7 +9092,7 @@ def _CheckOSParams(lu, required, nodenames, osname, osparams): """ nodenames = _FilterVmNodes(lu, nodenames) - result = lu.rpc.call_os_validate(required, nodenames, 
osname, + result = lu.rpc.call_os_validate(nodenames, required, osname, [constants.OS_VALIDATE_PARAMETERS], osparams) for node, nres in result.items(): @@ -8291,7 +9285,11 @@ class LUInstanceCreate(LogicalUnit): self.add_locks[locking.LEVEL_INSTANCE] = instance_name if self.op.iallocator: + # TODO: Find a solution to not lock all nodes in the cluster, e.g. by + # specifying a group on instance creation and then selecting nodes from + # that group self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + self.needed_locks[locking.LEVEL_NODE_RES] = locking.ALL_SET else: self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode) nodelist = [self.op.pnode] @@ -8299,6 +9297,9 @@ class LUInstanceCreate(LogicalUnit): self.op.snode = _ExpandNodeName(self.cfg, self.op.snode) nodelist.append(self.op.snode) self.needed_locks[locking.LEVEL_NODE] = nodelist + # Lock resources of instance's primary and secondary nodes (copy to + # prevent accidential modification) + self.needed_locks[locking.LEVEL_NODE_RES] = list(nodelist) # in case of import lock the source node too if self.op.mode == constants.INSTANCE_IMPORT: @@ -8335,7 +9336,8 @@ class LUInstanceCreate(LogicalUnit): tags=self.op.tags, os=self.op.os_type, vcpus=self.be_full[constants.BE_VCPUS], - memory=self.be_full[constants.BE_MEMORY], + memory=self.be_full[constants.BE_MAXMEM], + spindle_use=self.be_full[constants.BE_SPINDLE_USE], disks=self.disks, nics=nics, hypervisor=self.op.hypervisor, @@ -8380,7 +9382,8 @@ class LUInstanceCreate(LogicalUnit): secondary_nodes=self.secondaries, status=self.op.start, os_type=self.op.os_type, - memory=self.be_full[constants.BE_MEMORY], + minmem=self.be_full[constants.BE_MINMEM], + maxmem=self.be_full[constants.BE_MAXMEM], vcpus=self.be_full[constants.BE_VCPUS], nics=_NICListToTuple(self, self.nics), disk_template=self.op.disk_template, @@ -8462,33 +9465,39 @@ class LUInstanceCreate(LogicalUnit): if einfo.has_option(constants.INISECT_INS, "disk_template"): self.op.disk_template = einfo.get(constants.INISECT_INS, "disk_template") + if self.op.disk_template not in constants.DISK_TEMPLATES: + raise errors.OpPrereqError("Disk template specified in configuration" + " file is not one of the allowed values:" + " %s" % " ".join(constants.DISK_TEMPLATES)) else: raise errors.OpPrereqError("No disk template specified and the export" " is missing the disk_template information", errors.ECODE_INVAL) if not self.op.disks: - if einfo.has_option(constants.INISECT_INS, "disk_count"): - disks = [] - # TODO: import the disk iv_name too - for idx in range(einfo.getint(constants.INISECT_INS, "disk_count")): + disks = [] + # TODO: import the disk iv_name too + for idx in range(constants.MAX_DISKS): + if einfo.has_option(constants.INISECT_INS, "disk%d_size" % idx): disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx) disks.append({constants.IDISK_SIZE: disk_sz}) - self.op.disks = disks - else: + self.op.disks = disks + if not disks and self.op.disk_template != constants.DT_DISKLESS: raise errors.OpPrereqError("No disk info specified and the export" " is missing the disk information", errors.ECODE_INVAL) - if (not self.op.nics and - einfo.has_option(constants.INISECT_INS, "nic_count")): + if not self.op.nics: nics = [] - for idx in range(einfo.getint(constants.INISECT_INS, "nic_count")): - ndict = {} - for name in list(constants.NICS_PARAMETERS) + ["ip", "mac"]: - v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name)) - ndict[name] = v - nics.append(ndict) + for idx in range(constants.MAX_NICS): + if 
einfo.has_option(constants.INISECT_INS, "nic%d_mac" % idx): + ndict = {} + for name in list(constants.NICS_PARAMETERS) + ["ip", "mac"]: + v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name)) + ndict[name] = v + nics.append(ndict) + else: + break self.op.nics = nics if not self.op.tags and einfo.has_option(constants.INISECT_INS, "tags"): @@ -8510,6 +9519,12 @@ class LUInstanceCreate(LogicalUnit): for name, value in einfo.items(constants.INISECT_BEP): if name not in self.op.beparams: self.op.beparams[name] = value + # Compatibility for the old "memory" be param + if name == constants.BE_MEMORY: + if constants.BE_MAXMEM not in self.op.beparams: + self.op.beparams[constants.BE_MAXMEM] = value + if constants.BE_MINMEM not in self.op.beparams: + self.op.beparams[constants.BE_MINMEM] = value else: # try to read the parameters old style, from the main section for name in constants.BES_PARAMETERS: @@ -8577,7 +9592,7 @@ class LUInstanceCreate(LogicalUnit): # pylint: disable=W0142 self.instance_file_storage_dir = utils.PathJoin(*joinargs) - def CheckPrereq(self): + def CheckPrereq(self): # pylint: disable=R0914 """Check prerequisites. """ @@ -8592,7 +9607,8 @@ class LUInstanceCreate(LogicalUnit): raise errors.OpPrereqError("Cluster does not support lvm-based" " instances", errors.ECODE_STATE) - if self.op.hypervisor is None: + if (self.op.hypervisor is None or + self.op.hypervisor == constants.VALUE_AUTO): self.op.hypervisor = self.cfg.GetHypervisorType() cluster = self.cfg.GetClusterInfo() @@ -8618,6 +9634,11 @@ class LUInstanceCreate(LogicalUnit): _CheckGlobalHvParams(self.op.hvparams) # fill and remember the beparams dict + default_beparams = cluster.beparams[constants.PP_DEFAULT] + for param, value in self.op.beparams.iteritems(): + if value == constants.VALUE_AUTO: + self.op.beparams[param] = default_beparams[param] + objects.UpgradeBeParams(self.op.beparams) utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) self.be_full = cluster.SimpleFillBE(self.op.beparams) @@ -8634,7 +9655,7 @@ class LUInstanceCreate(LogicalUnit): for idx, nic in enumerate(self.op.nics): nic_mode_req = nic.get(constants.INIC_MODE, None) nic_mode = nic_mode_req - if nic_mode is None: + if nic_mode is None or nic_mode == constants.VALUE_AUTO: nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE] # in routed mode, for the first nic, the default ip is 'auto' @@ -8678,9 +9699,11 @@ class LUInstanceCreate(LogicalUnit): # Build nic parameters link = nic.get(constants.INIC_LINK, None) + if link == constants.VALUE_AUTO: + link = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_LINK] nicparams = {} if nic_mode_req: - nicparams[constants.NIC_MODE] = nic_mode_req + nicparams[constants.NIC_MODE] = nic_mode if link: nicparams[constants.NIC_LINK] = link @@ -8710,25 +9733,16 @@ class LUInstanceCreate(LogicalUnit): constants.IDISK_SIZE: size, constants.IDISK_MODE: mode, constants.IDISK_VG: data_vg, - constants.IDISK_METAVG: disk.get(constants.IDISK_METAVG, data_vg), } + if constants.IDISK_METAVG in disk: + new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG] if constants.IDISK_ADOPT in disk: new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT] self.disks.append(new_disk) if self.op.mode == constants.INSTANCE_IMPORT: - - # Check that the new instance doesn't have less disks than the export - instance_disks = len(self.disks) - export_disks = export_info.getint(constants.INISECT_INS, 'disk_count') - if instance_disks < export_disks: - raise errors.OpPrereqError("Not enough 
disks to import." - " (instance: %d, export: %d)" % - (instance_disks, export_disks), - errors.ECODE_INVAL) - disk_images = [] - for idx in range(export_disks): + for idx in range(len(self.disks)): option = "disk%d_dump" % idx if export_info.has_option(constants.INISECT_INS, option): # FIXME: are the old os-es, disk sizes, etc. useful? @@ -8741,15 +9755,9 @@ class LUInstanceCreate(LogicalUnit): self.src_images = disk_images old_name = export_info.get(constants.INISECT_INS, "name") - try: - exp_nic_count = export_info.getint(constants.INISECT_INS, "nic_count") - except (TypeError, ValueError), err: - raise errors.OpPrereqError("Invalid export file, nic_count is not" - " an integer: %s" % str(err), - errors.ECODE_STATE) if self.op.instance_name == old_name: for idx, nic in enumerate(self.nics): - if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx: + if nic.mac == constants.VALUE_AUTO: nic_mac_ini = "nic%d_mac" % idx nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini) @@ -8779,6 +9787,14 @@ class LUInstanceCreate(LogicalUnit): if self.op.iallocator is not None: self._RunAllocator() + # Release all unneeded node locks + _ReleaseLocks(self, locking.LEVEL_NODE, + keep=filter(None, [self.op.pnode, self.op.snode, + self.op.src_node])) + _ReleaseLocks(self, locking.LEVEL_NODE_RES, + keep=filter(None, [self.op.pnode, self.op.snode, + self.op.src_node])) + #### node related checks # check primary node @@ -8807,12 +9823,45 @@ class LUInstanceCreate(LogicalUnit): _CheckNodeVmCapable(self, self.op.snode) self.secondaries.append(self.op.snode) + snode = self.cfg.GetNodeInfo(self.op.snode) + if pnode.group != snode.group: + self.LogWarning("The primary and secondary nodes are in two" + " different node groups; the disk parameters" + " from the first disk's node group will be" + " used") + nodenames = [pnode.name] + self.secondaries + # Verify instance specs + spindle_use = self.be_full.get(constants.BE_SPINDLE_USE, None) + ispec = { + constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None), + constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None), + constants.ISPEC_DISK_COUNT: len(self.disks), + constants.ISPEC_DISK_SIZE: [disk["size"] for disk in self.disks], + constants.ISPEC_NIC_COUNT: len(self.nics), + constants.ISPEC_SPINDLE_USE: spindle_use, + } + + group_info = self.cfg.GetNodeGroup(pnode.group) + ipolicy = _CalculateGroupIPolicy(cluster, group_info) + res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec) + if not self.op.ignore_ipolicy and res: + raise errors.OpPrereqError(("Instance allocation to group %s violates" + " policy: %s") % (pnode.group, + utils.CommaJoin(res)), + errors.ECODE_INVAL) + if not self.adopt_disks: - # Check lv size requirements, if not adopting - req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks) - _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes) + if self.op.disk_template == constants.DT_RBD: + # _CheckRADOSFreeSpace() is just a placeholder. + # Any function that checks prerequisites can be placed here. + # Check if there is enough space on the RADOS cluster. 
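The creation-time prerequisite check above assembles an instance spec (memory, vCPU count, disk count and sizes, NIC count, spindle use) and compares it against the instance policy of the primary node's group. A rough illustration of that spec-versus-policy idea; the flat (min, max) policy format used here is an assumption for this sketch only, the real ipolicy structure is richer:

def compute_spec_violations(policy, spec):
    # `policy` maps spec keys to (min, max) bounds -- an assumption made
    # for this sketch; unknown or unset values are skipped.
    violations = []
    for key, value in spec.items():
        if key not in policy or value is None:
            continue
        lo, hi = policy[key]
        values = value if isinstance(value, list) else [value]
        for val in values:
            if not lo <= val <= hi:
                violations.append("%s value %s not in range [%s, %s]" %
                                  (key, val, lo, hi))
    return violations

# A two-disk instance checked against hypothetical bounds:
spec = {"memory-size": 4096, "cpu-count": 2, "nic-count": 1,
        "disk-count": 2, "disk-size": [10240, 20480]}
policy = {"memory-size": (128, 32768), "cpu-count": (1, 8),
          "nic-count": (0, 8), "disk-count": (1, 16),
          "disk-size": (1024, 1048576)}
assert compute_spec_violations(policy, spec) == []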
+ _CheckRADOSFreeSpace() + else: + # Check lv size requirements, if not adopting + req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks) + _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes) elif self.op.disk_template == constants.DT_PLAIN: # Check the adoption data all_lvs = set(["%s/%s" % (disk[constants.IDISK_VG], @@ -8893,10 +9942,11 @@ class LUInstanceCreate(LogicalUnit): _CheckNicsBridgesExist(self, self.nics, self.pnode.name) # memory check on primary node + #TODO(dynmem): use MINMEM for checking if self.op.start: _CheckNodeFreeMemory(self, self.pnode.name, "creating instance %s" % self.op.instance_name, - self.be_full[constants.BE_MEMORY], + self.be_full[constants.BE_MAXMEM], self.op.hypervisor) self.dry_run_result = list(nodenames) @@ -8908,12 +9958,21 @@ class LUInstanceCreate(LogicalUnit): instance = self.op.instance_name pnode_name = self.pnode.name + assert not (self.owned_locks(locking.LEVEL_NODE_RES) - + self.owned_locks(locking.LEVEL_NODE)), \ + "Node locks differ from node resource locks" + ht_kind = self.op.hypervisor if ht_kind in constants.HTS_REQ_PORT: network_port = self.cfg.AllocatePort() else: network_port = None + # This is ugly but we got a chicken-egg problem here + # We can only take the group disk parameters, as the instance + # has no disks yet (we are generating them right here). + node = self.cfg.GetNodeInfo(pnode_name) + nodegroup = self.cfg.GetNodeGroup(node.group) disks = _GenerateDiskTemplate(self, self.op.disk_template, instance, pnode_name, @@ -8922,13 +9981,14 @@ class LUInstanceCreate(LogicalUnit): self.instance_file_storage_dir, self.op.file_driver, 0, - feedback_fn) + feedback_fn, + self.cfg.GetGroupDiskParams(nodegroup)) iobj = objects.Instance(name=instance, os=self.op.os_type, primary_node=pnode_name, nics=self.nics, disks=disks, disk_template=self.op.disk_template, - admin_up=False, + admin_state=constants.ADMINST_DOWN, network_port=network_port, beparams=self.op.beparams, hvparams=self.op.hvparams, @@ -9010,6 +10070,9 @@ class LUInstanceCreate(LogicalUnit): raise errors.OpExecError("There are some degraded disks for" " this instance") + # Release all node resource locks + _ReleaseLocks(self, locking.LEVEL_NODE_RES) + if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks: if self.op.mode == constants.INSTANCE_CREATE: if not self.op.no_install: @@ -9018,7 +10081,8 @@ class LUInstanceCreate(LogicalUnit): if pause_sync: feedback_fn("* pausing disk sync to install instance OS") result = self.rpc.call_blockdev_pause_resume_sync(pnode_name, - iobj.disks, True) + (iobj.disks, + iobj), True) for idx, success in enumerate(result.payload): if not success: logging.warn("pause-sync of instance %s for disk %d failed", @@ -9026,19 +10090,21 @@ class LUInstanceCreate(LogicalUnit): feedback_fn("* running the instance OS create scripts...") # FIXME: pass debug option from opcode to backend - result = self.rpc.call_instance_os_add(pnode_name, iobj, False, - self.op.debug_level) + os_add_result = \ + self.rpc.call_instance_os_add(pnode_name, (iobj, None), False, + self.op.debug_level) if pause_sync: feedback_fn("* resuming disk sync") result = self.rpc.call_blockdev_pause_resume_sync(pnode_name, - iobj.disks, False) + (iobj.disks, + iobj), False) for idx, success in enumerate(result.payload): if not success: logging.warn("resume-sync of instance %s for disk %d failed", instance, idx) - result.Raise("Could not add os for instance %s" - " on node %s" % (instance, pnode_name)) + os_add_result.Raise("Could not add os for instance %s" 
+ " on node %s" % (instance, pnode_name)) elif self.op.mode == constants.INSTANCE_IMPORT: feedback_fn("* running the instance OS import scripts...") @@ -9101,18 +10167,28 @@ class LUInstanceCreate(LogicalUnit): raise errors.ProgrammerError("Unknown OS initialization mode '%s'" % self.op.mode) + assert not self.owned_locks(locking.LEVEL_NODE_RES) + if self.op.start: - iobj.admin_up = True + iobj.admin_state = constants.ADMINST_UP self.cfg.Update(iobj, feedback_fn) logging.info("Starting instance %s on node %s", instance, pnode_name) feedback_fn("* starting instance...") - result = self.rpc.call_instance_start(pnode_name, iobj, - None, None, False) + result = self.rpc.call_instance_start(pnode_name, (iobj, None, None), + False) result.Raise("Could not start instance") return list(iobj.all_nodes) +def _CheckRADOSFreeSpace(): + """Compute disk size requirements inside the RADOS cluster. + + """ + # For the RADOS cluster we assume there is always enough space. + pass + + class LUInstanceConsole(NoHooksLU): """Connect to an instance's console. @@ -9124,6 +10200,7 @@ class LUInstanceConsole(NoHooksLU): REQ_BGL = False def ExpandNames(self): + self.share_locks = _ShareAll() self._ExpandAndLockInstance() def CheckPrereq(self): @@ -9149,10 +10226,12 @@ class LUInstanceConsole(NoHooksLU): node_insts.Raise("Can't get node information from %s" % node) if instance.name not in node_insts.payload: - if instance.admin_up: + if instance.admin_state == constants.ADMINST_UP: state = constants.INSTST_ERRORDOWN - else: + elif instance.admin_state == constants.ADMINST_DOWN: state = constants.INSTST_ADMINDOWN + else: + state = constants.INSTST_ADMINOFFLINE raise errors.OpExecError("Instance %s is not running (state %s)" % (instance.name, state)) @@ -9198,6 +10277,7 @@ class LUInstanceReplaceDisks(LogicalUnit): self._ExpandAndLockInstance() assert locking.LEVEL_NODE not in self.needed_locks + assert locking.LEVEL_NODE_RES not in self.needed_locks assert locking.LEVEL_NODEGROUP not in self.needed_locks assert self.op.iallocator is None or self.op.remote_node is None, \ @@ -9220,9 +10300,12 @@ class LUInstanceReplaceDisks(LogicalUnit): # iallocator will select a new node in the same group self.needed_locks[locking.LEVEL_NODEGROUP] = [] + self.needed_locks[locking.LEVEL_NODE_RES] = [] + self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode, self.op.iallocator, self.op.remote_node, - self.op.disks, False, self.op.early_release) + self.op.disks, False, self.op.early_release, + self.op.ignore_ipolicy) self.tasklets = [self.replacer] @@ -9233,6 +10316,8 @@ class LUInstanceReplaceDisks(LogicalUnit): assert not self.needed_locks[locking.LEVEL_NODEGROUP] self.share_locks[locking.LEVEL_NODEGROUP] = 1 + # Lock all groups used by instance optimistically; this requires going + # via the node before it's locked, requiring verification later on self.needed_locks[locking.LEVEL_NODEGROUP] = \ self.cfg.GetInstanceNodeGroups(self.op.instance_name) @@ -9247,6 +10332,10 @@ class LUInstanceReplaceDisks(LogicalUnit): for node_name in self.cfg.GetNodeGroup(group_uuid).members] else: self._LockInstancesNodes() + elif level == locking.LEVEL_NODE_RES: + # Reuse node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE] def BuildHooksEnv(self): """Build hooks env. 
@@ -9283,6 +10372,7 @@ class LUInstanceReplaceDisks(LogicalUnit): assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or self.op.iallocator is None) + # Verify if node group locks are still correct owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) if owned_groups: _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups) @@ -9297,7 +10387,7 @@ class TLReplaceDisks(Tasklet): """ def __init__(self, lu, instance_name, mode, iallocator_name, remote_node, - disks, delay_iallocator, early_release): + disks, delay_iallocator, early_release, ignore_ipolicy): """Initializes this class. """ @@ -9311,6 +10401,7 @@ class TLReplaceDisks(Tasklet): self.disks = disks self.delay_iallocator = delay_iallocator self.early_release = early_release + self.ignore_ipolicy = ignore_ipolicy # Runtime data self.instance = None @@ -9533,6 +10624,16 @@ class TLReplaceDisks(Tasklet): if not self.disks: self.disks = range(len(self.instance.disks)) + # TODO: This is ugly, but right now we can't distinguish between internal + # submitted opcode and external one. We should fix that. + if self.remote_node_info: + # We change the node, lets verify it still meets instance policy + new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group) + ipolicy = _CalculateGroupIPolicy(self.cfg.GetClusterInfo(), + new_group_info) + _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info, + ignore=self.ignore_ipolicy) + for node in check_nodes: _CheckNodeOnline(self.lu, node) @@ -9541,8 +10642,9 @@ class TLReplaceDisks(Tasklet): self.target_node] if node_name is not None) - # Release unneeded node locks + # Release unneeded node and node resource locks _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes) + _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes) # Release any owned node group if self.lu.glm.is_owned(locking.LEVEL_NODEGROUP): @@ -9571,6 +10673,8 @@ class TLReplaceDisks(Tasklet): assert set(owned_nodes) == set(self.node_secondary_ip), \ ("Incorrect node locks, owning %s, expected %s" % (owned_nodes, self.node_secondary_ip.keys())) + assert (self.lu.owned_locks(locking.LEVEL_NODE) == + self.lu.owned_locks(locking.LEVEL_NODE_RES)) owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE) assert list(owned_instances) == [self.instance_name], \ @@ -9586,7 +10690,7 @@ class TLReplaceDisks(Tasklet): feedback_fn("Replacing disk(s) %s for %s" % (utils.CommaJoin(self.disks), self.instance.name)) - activate_disks = (not self.instance.admin_up) + activate_disks = (self.instance.admin_state != constants.ADMINST_UP) # Activate the instance disks if we're replacing them on a down instance if activate_disks: @@ -9606,9 +10710,11 @@ class TLReplaceDisks(Tasklet): if activate_disks: _SafeShutdownInstanceDisks(self.lu, self.instance) + assert not self.lu.owned_locks(locking.LEVEL_NODE) + if __debug__: # Verify owned locks - owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE) + owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES) nodes = frozenset(self.node_secondary_ip) assert ((self.early_release and not owned_nodes) or (not self.early_release and not (set(owned_nodes) - nodes))), \ @@ -9661,8 +10767,8 @@ class TLReplaceDisks(Tasklet): self.lu.LogInfo("Checking disk/%d consistency on node %s" % (idx, node_name)) - if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary, - ldisk=ldisk): + if not _CheckDiskConsistency(self.lu, self.instance, dev, node_name, + on_primary, ldisk=ldisk): raise errors.OpExecError("Node %s has degraded storage, unsafe to" " replace 
disks for instance %s" % (node_name, self.instance.name)) @@ -9689,10 +10795,10 @@ class TLReplaceDisks(Tasklet): vg_data = dev.children[0].logical_id[0] lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size, - logical_id=(vg_data, names[0])) + logical_id=(vg_data, names[0]), params={}) vg_meta = dev.children[1].logical_id[0] - lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128, - logical_id=(vg_meta, names[1])) + lv_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE, + logical_id=(vg_meta, names[1]), params={}) new_lvs = [lv_data, lv_meta] old_lvs = [child.Copy() for child in dev.children] @@ -9830,8 +10936,9 @@ class TLReplaceDisks(Tasklet): # Now that the new lvs have the old name, we can add them to the device self.lu.LogInfo("Adding new mirror component on %s" % self.target_node) - result = self.rpc.call_blockdev_addchildren(self.target_node, dev, - new_lvs) + result = self.rpc.call_blockdev_addchildren(self.target_node, + (dev, self.instance), + (new_lvs, self.instance)) msg = result.fail_msg if msg: for new_lv in new_lvs: @@ -9843,21 +10950,28 @@ class TLReplaceDisks(Tasklet): "volumes")) raise errors.OpExecError("Can't add local storage to drbd: %s" % msg) - cstep = 5 + cstep = itertools.count(5) + if self.early_release: - self.lu.LogStep(cstep, steps_total, "Removing old storage") - cstep += 1 + self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") self._RemoveOldStorage(self.target_node, iv_names) - # WARNING: we release both node locks here, do not do other RPCs - # than WaitForSync to the primary node - _ReleaseLocks(self.lu, locking.LEVEL_NODE, - names=[self.target_node, self.other_node]) + # TODO: Check if releasing locks early still makes sense + _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES) + else: + # Release all resource locks except those used by the instance + _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, + keep=self.node_secondary_ip.keys()) + + # Release all node locks while waiting for sync + _ReleaseLocks(self.lu, locking.LEVEL_NODE) + + # TODO: Can the instance lock be downgraded here? Take the optional disk + # shutdown in the caller into consideration. 
# Wait for sync # This can fail as the old devices are degraded and _WaitForSync # does a combined result over all disks, so we don't check its return value - self.lu.LogStep(cstep, steps_total, "Sync devices") - cstep += 1 + self.lu.LogStep(cstep.next(), steps_total, "Sync devices") _WaitForSync(self.lu, self.instance) # Check all devices manually @@ -9865,8 +10979,7 @@ class TLReplaceDisks(Tasklet): # Step: remove old storage if not self.early_release: - self.lu.LogStep(cstep, steps_total, "Removing old storage") - cstep += 1 + self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") self._RemoveOldStorage(self.target_node, iv_names) def _ExecDrbd8Secondary(self, feedback_fn): @@ -9946,7 +11059,8 @@ class TLReplaceDisks(Tasklet): new_drbd = objects.Disk(dev_type=constants.LD_DRBD8, logical_id=new_alone_id, children=dev.children, - size=dev.size) + size=dev.size, + params={}) try: _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd, _GetInstanceInfoText(self.instance), False) @@ -9985,13 +11099,16 @@ class TLReplaceDisks(Tasklet): self.cfg.Update(self.instance, feedback_fn) + # Release all node locks (the configuration has been updated) + _ReleaseLocks(self.lu, locking.LEVEL_NODE) + # and now perform the drbd attach self.lu.LogInfo("Attaching primary drbds to new secondary" " (standalone => connected)") result = self.rpc.call_drbd_attach_net([self.instance.primary_node, self.new_node], self.node_secondary_ip, - self.instance.disks, + (self.instance.disks, self.instance), self.instance.name, False) for to_node, to_result in result.items(): @@ -10001,23 +11118,26 @@ class TLReplaceDisks(Tasklet): to_node, msg, hint=("please do a gnt-instance info to see the" " status of disks")) - cstep = 5 + + cstep = itertools.count(5) + if self.early_release: - self.lu.LogStep(cstep, steps_total, "Removing old storage") - cstep += 1 + self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") self._RemoveOldStorage(self.target_node, iv_names) - # WARNING: we release all node locks here, do not do other RPCs - # than WaitForSync to the primary node - _ReleaseLocks(self.lu, locking.LEVEL_NODE, - names=[self.instance.primary_node, - self.target_node, - self.new_node]) + # TODO: Check if releasing locks early still makes sense + _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES) + else: + # Release all resource locks except those used by the instance + _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, + keep=self.node_secondary_ip.keys()) + + # TODO: Can the instance lock be downgraded here? Take the optional disk + # shutdown in the caller into consideration. 
# Wait for sync # This can fail as the old devices are degraded and _WaitForSync # does a combined result over all disks, so we don't check its return value - self.lu.LogStep(cstep, steps_total, "Sync devices") - cstep += 1 + self.lu.LogStep(cstep.next(), steps_total, "Sync devices") _WaitForSync(self.lu, self.instance) # Check all devices manually @@ -10025,7 +11145,7 @@ class TLReplaceDisks(Tasklet): # Step: remove old storage if not self.early_release: - self.lu.LogStep(cstep, steps_total, "Removing old storage") + self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") self._RemoveOldStorage(self.target_node, iv_names) @@ -10071,7 +11191,7 @@ class LURepairNodeStorage(NoHooksLU): """ # Check whether any instance on this node has faulty disks for inst in _GetNodeInstances(self.cfg, self.op.node_name): - if not inst.admin_up: + if inst.admin_state != constants.ADMINST_UP: continue check_nodes = set(inst.all_nodes) check_nodes.discard(self.op.node_name) @@ -10097,6 +11217,15 @@ class LUNodeEvacuate(NoHooksLU): """ REQ_BGL = False + _MODE2IALLOCATOR = { + constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI, + constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC, + constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL, + } + assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES + assert (frozenset(_MODE2IALLOCATOR.values()) == + constants.IALLOCATOR_NEVAC_MODES) + def CheckArguments(self): _CheckIAllocatorOrNode(self, "iallocator", "remote_node") @@ -10111,7 +11240,7 @@ class LUNodeEvacuate(NoHooksLU): raise errors.OpPrereqError("Can not use evacuated node as a new" " secondary node", errors.ECODE_INVAL) - if self.op.mode != constants.IALLOCATOR_NEVAC_SEC: + if self.op.mode != constants.NODE_EVAC_SEC: raise errors.OpPrereqError("Without the use of an iallocator only" " secondary instances can be evacuated", errors.ECODE_INVAL) @@ -10145,19 +11274,19 @@ class LUNodeEvacuate(NoHooksLU): """Builds list of instances to operate on. 
""" - assert self.op.mode in constants.IALLOCATOR_NEVAC_MODES + assert self.op.mode in constants.NODE_EVAC_MODES - if self.op.mode == constants.IALLOCATOR_NEVAC_PRI: + if self.op.mode == constants.NODE_EVAC_PRI: # Primary instances only inst_fn = _GetNodePrimaryInstances assert self.op.remote_node is None, \ "Evacuating primary instances requires iallocator" - elif self.op.mode == constants.IALLOCATOR_NEVAC_SEC: + elif self.op.mode == constants.NODE_EVAC_SEC: # Secondary instances only inst_fn = _GetNodeSecondaryInstances else: # All instances - assert self.op.mode == constants.IALLOCATOR_NEVAC_ALL + assert self.op.mode == constants.NODE_EVAC_ALL inst_fn = _GetNodeInstances # TODO: In 2.6, change the iallocator interface to take an evacuation mode # per instance @@ -10251,7 +11380,7 @@ class LUNodeEvacuate(NoHooksLU): elif self.op.iallocator is not None: # TODO: Implement relocation to other group ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_NODE_EVAC, - evac_mode=self.op.mode, + evac_mode=self._MODE2IALLOCATOR[self.op.mode], instances=list(self.instance_names)) ial.Run(self.op.iallocator) @@ -10265,7 +11394,7 @@ class LUNodeEvacuate(NoHooksLU): jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, True) elif self.op.remote_node is not None: - assert self.op.mode == constants.IALLOCATOR_NEVAC_SEC + assert self.op.mode == constants.NODE_EVAC_SEC jobs = [ [opcodes.OpInstanceReplaceDisks(instance_name=instance_name, remote_node=self.op.remote_node, @@ -10349,11 +11478,17 @@ class LUInstanceGrowDisk(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() self.needed_locks[locking.LEVEL_NODE] = [] + self.needed_locks[locking.LEVEL_NODE_RES] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE def DeclareLocks(self, level): if level == locking.LEVEL_NODE: self._LockInstancesNodes() + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE][:] def BuildHooksEnv(self): """Build hooks env. @@ -10364,6 +11499,7 @@ class LUInstanceGrowDisk(LogicalUnit): env = { "DISK": self.op.disk, "AMOUNT": self.op.amount, + "ABSOLUTE": self.op.absolute, } env.update(_BuildInstanceHookEnvByObject(self, self.instance)) return env @@ -10396,12 +11532,30 @@ class LUInstanceGrowDisk(LogicalUnit): self.disk = instance.FindDisk(self.op.disk) + if self.op.absolute: + self.target = self.op.amount + self.delta = self.target - self.disk.size + if self.delta < 0: + raise errors.OpPrereqError("Requested size (%s) is smaller than " + "current disk size (%s)" % + (utils.FormatUnit(self.target, "h"), + utils.FormatUnit(self.disk.size, "h")), + errors.ECODE_STATE) + else: + self.delta = self.op.amount + self.target = self.disk.size + self.delta + if self.delta < 0: + raise errors.OpPrereqError("Requested increment (%s) is negative" % + utils.FormatUnit(self.delta, "h"), + errors.ECODE_INVAL) + if instance.disk_template not in (constants.DT_FILE, - constants.DT_SHARED_FILE): + constants.DT_SHARED_FILE, + constants.DT_RBD): # TODO: check the free disk space for file, when that feature will be # supported _CheckNodesFreeDiskPerVG(self, nodenames, - self.disk.ComputeGrowth(self.op.amount)) + self.disk.ComputeGrowth(self.delta)) def Exec(self, feedback_fn): """Execute disk grow. 
@@ -10410,21 +11564,32 @@ class LUInstanceGrowDisk(LogicalUnit): instance = self.instance disk = self.disk + assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE) + assert (self.owned_locks(locking.LEVEL_NODE) == + self.owned_locks(locking.LEVEL_NODE_RES)) + disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk]) if not disks_ok: raise errors.OpExecError("Cannot activate block device to grow") + feedback_fn("Growing disk %s of instance '%s' by %s to %s" % + (self.op.disk, instance.name, + utils.FormatUnit(self.delta, "h"), + utils.FormatUnit(self.target, "h"))) + # First run all grow ops in dry-run mode for node in instance.all_nodes: self.cfg.SetDiskID(disk, node) - result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True) + result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta, + True) result.Raise("Grow request failed to node %s" % node) # We know that (as far as we can test) operations across different # nodes will succeed, time to run it for real for node in instance.all_nodes: self.cfg.SetDiskID(disk, node) - result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False) + result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta, + False) result.Raise("Grow request failed to node %s" % node) # TODO: Rewrite code to work properly @@ -10434,20 +11599,30 @@ class LUInstanceGrowDisk(LogicalUnit): # time is a work-around. time.sleep(5) - disk.RecordGrow(self.op.amount) + disk.RecordGrow(self.delta) self.cfg.Update(instance, feedback_fn) + + # Changes have been recorded, release node lock + _ReleaseLocks(self, locking.LEVEL_NODE) + + # Downgrade lock while waiting for sync + self.glm.downgrade(locking.LEVEL_INSTANCE) + if self.op.wait_for_sync: disk_abort = not _WaitForSync(self, instance, disks=[disk]) if disk_abort: self.proc.LogWarning("Disk sync-ing has not returned a good" " status; please check the instance") - if not instance.admin_up: + if instance.admin_state != constants.ADMINST_UP: _SafeShutdownInstanceDisks(self, instance, disks=[disk]) - elif not instance.admin_up: + elif instance.admin_state != constants.ADMINST_UP: self.proc.LogWarning("Not shutting down the disk even if the instance is" " not supposed to be running because no wait for" " sync mode was requested") + assert self.owned_locks(locking.LEVEL_NODE_RES) + assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE) + class LUInstanceQueryData(NoHooksLU): """Query runtime instance data. @@ -10478,12 +11653,25 @@ class LUInstanceQueryData(NoHooksLU): else: self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names + self.needed_locks[locking.LEVEL_NODEGROUP] = [] self.needed_locks[locking.LEVEL_NODE] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE def DeclareLocks(self, level): - if self.op.use_locking and level == locking.LEVEL_NODE: - self._LockInstancesNodes() + if self.op.use_locking: + if level == locking.LEVEL_NODEGROUP: + owned_instances = self.owned_locks(locking.LEVEL_INSTANCE) + + # Lock all groups used by instances optimistically; this requires going + # via the node before it's locked, requiring verification later on + self.needed_locks[locking.LEVEL_NODEGROUP] = \ + frozenset(group_uuid + for instance_name in owned_instances + for group_uuid in + self.cfg.GetInstanceNodeGroups(instance_name)) + + elif level == locking.LEVEL_NODE: + self._LockInstancesNodes() def CheckPrereq(self): """Check prerequisites. 
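The grow itself is executed twice on every node holding the disk: a first dry-run pass detects nodes that cannot grow the device before anything is changed, and only then is the request applied for real. A hedged sketch of that two-pass pattern with a hypothetical per-node callable:

def grow_disk_everywhere(nodes, grow_fn, delta):
    # `grow_fn(node, delta, dryrun)` stands in for the real per-node RPC
    # and is assumed to raise on failure.
    for node in nodes:
        grow_fn(node, delta, dryrun=True)    # pass 1: dry run only
    for node in nodes:
        grow_fn(node, delta, dryrun=False)   # pass 2: all nodes agreed, apply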
@@ -10491,14 +11679,25 @@ class LUInstanceQueryData(NoHooksLU): This only checks the optional instance list against the existing names. """ + owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE)) + owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) + owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE)) + if self.wanted_names is None: assert self.op.use_locking, "Locking was not used" - self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE) + self.wanted_names = owned_instances - self.wanted_instances = \ - map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names)) + instances = dict(self.cfg.GetMultiInstanceInfo(self.wanted_names)) + + if self.op.use_locking: + _CheckInstancesNodeGroups(self.cfg, instances, owned_groups, owned_nodes, + None) + else: + assert not (owned_instances or owned_groups or owned_nodes) - def _ComputeBlockdevStatus(self, node, instance_name, dev): + self.wanted_instances = instances.values() + + def _ComputeBlockdevStatus(self, node, instance, dev): """Returns the status of a block device """ @@ -10511,7 +11710,7 @@ class LUInstanceQueryData(NoHooksLU): if result.offline: return None - result.Raise("Can't compute disk status for %s" % instance_name) + result.Raise("Can't compute disk status for %s" % instance.name) status = result.payload if status is None: @@ -10533,8 +11732,8 @@ class LUInstanceQueryData(NoHooksLU): snode = dev.logical_id[0] dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node, - instance.name, dev) - dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev) + instance, dev) + dev_sstatus = self._ComputeBlockdevStatus(snode, instance, dev) if dev.children: dev_children = map(compat.partial(self._ComputeDiskStatus, @@ -10561,9 +11760,17 @@ class LUInstanceQueryData(NoHooksLU): cluster = self.cfg.GetClusterInfo() - pri_nodes = self.cfg.GetMultiNodeInfo(i.primary_node - for i in self.wanted_instances) - for instance, (_, pnode) in zip(self.wanted_instances, pri_nodes): + node_names = itertools.chain(*(i.all_nodes for i in self.wanted_instances)) + nodes = dict(self.cfg.GetMultiNodeInfo(node_names)) + + groups = dict(self.cfg.GetMultiNodeGroupInfo(node.group + for node in nodes.values())) + + group2name_fn = lambda uuid: groups[uuid].name + + for instance in self.wanted_instances: + pnode = nodes[instance.primary_node] + if self.op.static or pnode.offline: remote_state = None if pnode.offline: @@ -10579,22 +11786,27 @@ class LUInstanceQueryData(NoHooksLU): if remote_info and "state" in remote_info: remote_state = "up" else: - remote_state = "down" - - if instance.admin_up: - config_state = "up" - else: - config_state = "down" + if instance.admin_state == constants.ADMINST_UP: + remote_state = "down" + else: + remote_state = instance.admin_state disks = map(compat.partial(self._ComputeDiskStatus, instance, None), instance.disks) + snodes_group_uuids = [nodes[snode_name].group + for snode_name in instance.secondary_nodes] + result[instance.name] = { "name": instance.name, - "config_state": config_state, + "config_state": instance.admin_state, "run_state": remote_state, "pnode": instance.primary_node, + "pnode_group_uuid": pnode.group, + "pnode_group_name": group2name_fn(pnode.group), "snodes": instance.secondary_nodes, + "snodes_group_uuids": snodes_group_uuids, + "snodes_group_names": map(group2name_fn, snodes_group_uuids), "os": instance.os, # this happens to be the same format used for hooks "nics": _NICListToTuple(self, instance.nics), @@ -10617,62 +11829,286 @@ class 
LUInstanceQueryData(NoHooksLU): return result -class LUInstanceSetParams(LogicalUnit): - """Modifies an instances's parameters. +def PrepareContainerMods(mods, private_fn): + """Prepares a list of container modifications by adding a private data field. + + @type mods: list of tuples; (operation, index, parameters) + @param mods: List of modifications + @type private_fn: callable or None + @param private_fn: Callable for constructing a private data field for a + modification + @rtype: list """ - HPATH = "instance-modify" - HTYPE = constants.HTYPE_INSTANCE - REQ_BGL = False + if private_fn is None: + fn = lambda: None + else: + fn = private_fn + + return [(op, idx, params, fn()) for (op, idx, params) in mods] + + +#: Type description for changes as returned by L{ApplyContainerMods}'s +#: callbacks +_TApplyContModsCbChanges = \ + ht.TMaybeListOf(ht.TAnd(ht.TIsLength(2), ht.TItems([ + ht.TNonEmptyString, + ht.TAny, + ]))) + + +def ApplyContainerMods(kind, container, chgdesc, mods, + create_fn, modify_fn, remove_fn): + """Applies descriptions in C{mods} to C{container}. + + @type kind: string + @param kind: One-word item description + @type container: list + @param container: Container to modify + @type chgdesc: None or list + @param chgdesc: List of applied changes + @type mods: list + @param mods: Modifications as returned by L{PrepareContainerMods} + @type create_fn: callable + @param create_fn: Callback for creating a new item (L{constants.DDM_ADD}); + receives absolute item index, parameters and private data object as added + by L{PrepareContainerMods}, returns tuple containing new item and changes + as list + @type modify_fn: callable + @param modify_fn: Callback for modifying an existing item + (L{constants.DDM_MODIFY}); receives absolute item index, item, parameters + and private data object as added by L{PrepareContainerMods}, returns + changes as list + @type remove_fn: callable + @param remove_fn: Callback on removing item; receives absolute item index, + item and private data object as added by L{PrepareContainerMods} + + """ + for (op, idx, params, private) in mods: + if idx == -1: + # Append + absidx = len(container) - 1 + elif idx < 0: + raise IndexError("Not accepting negative indices other than -1") + elif idx > len(container): + raise IndexError("Got %s index %s, but there are only %s" % + (kind, idx, len(container))) + else: + absidx = idx - def CheckArguments(self): + changes = None + + if op == constants.DDM_ADD: + # Calculate where item will be added + if idx == -1: + addidx = len(container) + else: + addidx = idx + + if create_fn is None: + item = params + else: + (item, changes) = create_fn(addidx, params, private) + + if idx == -1: + container.append(item) + else: + assert idx >= 0 + assert idx <= len(container) + # list.insert does so before the specified index + container.insert(idx, item) + else: + # Retrieve existing item + try: + item = container[absidx] + except IndexError: + raise IndexError("Invalid %s index %s" % (kind, idx)) + + if op == constants.DDM_REMOVE: + assert not params + + if remove_fn is not None: + remove_fn(absidx, item, private) + + changes = [("%s/%s" % (kind, absidx), "remove")] + + assert container[absidx] == item + del container[absidx] + elif op == constants.DDM_MODIFY: + if modify_fn is not None: + changes = modify_fn(absidx, item, params, private) + else: + raise errors.ProgrammerError("Unhandled operation '%s'" % op) + + assert _TApplyContModsCbChanges(changes) + + if not (chgdesc is None or changes is None): + chgdesc.extend(changes) + + 
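ApplyContainerMods above funnels every disk and NIC change through one generic loop over (operation, index, params, private) tuples, with callbacks doing the type-specific work. A rough usage illustration with trivial callbacks instead of the real disk/NIC ones; it assumes the DDM_* constants are the strings "add", "modify" and "remove":

def create_fn(idx, params, private):
    # The real callback would build an objects.Disk or objects.NIC here.
    return (dict(params), [("item/%d" % idx, "add")])

def modify_fn(idx, item, params, private):
    item.update(params)
    return [("item/%d" % idx, "modify")]

def remove_fn(idx, item, private):
    pass

container = [{"size": 1024}]
changes = []
mods = PrepareContainerMods([("add", -1, {"size": 2048}),
                             ("modify", 0, {"size": 4096})], None)
ApplyContainerMods("disk", container, changes, mods,
                   create_fn, modify_fn, remove_fn)
# container is now [{"size": 4096}, {"size": 2048}] and `changes` holds the
# human-readable change descriptions returned by the callbacks.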
+def _UpdateIvNames(base_index, disks): + """Updates the C{iv_name} attribute of disks. + + @type disks: list of L{objects.Disk} + + """ + for (idx, disk) in enumerate(disks): + disk.iv_name = "disk/%s" % (base_index + idx, ) + + +class _InstNicModPrivate: + """Data structure for network interface modifications. + + Used by L{LUInstanceSetParams}. + + """ + def __init__(self): + self.params = None + self.filled = None + + +class LUInstanceSetParams(LogicalUnit): + """Modifies an instances's parameters. + + """ + HPATH = "instance-modify" + HTYPE = constants.HTYPE_INSTANCE + REQ_BGL = False + + @staticmethod + def _UpgradeDiskNicMods(kind, mods, verify_fn): + assert ht.TList(mods) + assert not mods or len(mods[0]) in (2, 3) + + if mods and len(mods[0]) == 2: + result = [] + + addremove = 0 + for op, params in mods: + if op in (constants.DDM_ADD, constants.DDM_REMOVE): + result.append((op, -1, params)) + addremove += 1 + + if addremove > 1: + raise errors.OpPrereqError("Only one %s add or remove operation is" + " supported at a time" % kind, + errors.ECODE_INVAL) + else: + result.append((constants.DDM_MODIFY, op, params)) + + assert verify_fn(result) + else: + result = mods + + return result + + @staticmethod + def _CheckMods(kind, mods, key_types, item_fn): + """Ensures requested disk/NIC modifications are valid. + + """ + for (op, _, params) in mods: + assert ht.TDict(params) + + utils.ForceDictType(params, key_types) + + if op == constants.DDM_REMOVE: + if params: + raise errors.OpPrereqError("No settings should be passed when" + " removing a %s" % kind, + errors.ECODE_INVAL) + elif op in (constants.DDM_ADD, constants.DDM_MODIFY): + item_fn(op, params) + else: + raise errors.ProgrammerError("Unhandled operation '%s'" % op) + + @staticmethod + def _VerifyDiskModification(op, params): + """Verifies a disk modification. + + """ + if op == constants.DDM_ADD: + mode = params.setdefault(constants.IDISK_MODE, constants.DISK_RDWR) + if mode not in constants.DISK_ACCESS_SET: + raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode, + errors.ECODE_INVAL) + + size = params.get(constants.IDISK_SIZE, None) + if size is None: + raise errors.OpPrereqError("Required disk parameter '%s' missing" % + constants.IDISK_SIZE, errors.ECODE_INVAL) + + try: + size = int(size) + except (TypeError, ValueError), err: + raise errors.OpPrereqError("Invalid disk size parameter: %s" % err, + errors.ECODE_INVAL) + + params[constants.IDISK_SIZE] = size + + elif op == constants.DDM_MODIFY and constants.IDISK_SIZE in params: + raise errors.OpPrereqError("Disk size change not possible, use" + " grow-disk", errors.ECODE_INVAL) + + @staticmethod + def _VerifyNicModification(op, params): + """Verifies a network interface modification. 
+ + """ + if op in (constants.DDM_ADD, constants.DDM_MODIFY): + ip = params.get(constants.INIC_IP, None) + if ip is None: + pass + elif ip.lower() == constants.VALUE_NONE: + params[constants.INIC_IP] = None + elif not netutils.IPAddress.IsValid(ip): + raise errors.OpPrereqError("Invalid IP address '%s'" % ip, + errors.ECODE_INVAL) + + bridge = params.get("bridge", None) + link = params.get(constants.INIC_LINK, None) + if bridge and link: + raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" + " at the same time", errors.ECODE_INVAL) + elif bridge and bridge.lower() == constants.VALUE_NONE: + params["bridge"] = None + elif link and link.lower() == constants.VALUE_NONE: + params[constants.INIC_LINK] = None + + if op == constants.DDM_ADD: + macaddr = params.get(constants.INIC_MAC, None) + if macaddr is None: + params[constants.INIC_MAC] = constants.VALUE_AUTO + + if constants.INIC_MAC in params: + macaddr = params[constants.INIC_MAC] + if macaddr not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): + macaddr = utils.NormalizeAndValidateMac(macaddr) + + if op == constants.DDM_MODIFY and macaddr == constants.VALUE_AUTO: + raise errors.OpPrereqError("'auto' is not a valid MAC address when" + " modifying an existing NIC", + errors.ECODE_INVAL) + + def CheckArguments(self): if not (self.op.nics or self.op.disks or self.op.disk_template or - self.op.hvparams or self.op.beparams or self.op.os_name): + self.op.hvparams or self.op.beparams or self.op.os_name or + self.op.offline is not None or self.op.runtime_mem): raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL) if self.op.hvparams: _CheckGlobalHvParams(self.op.hvparams) - # Disk validation - disk_addremove = 0 - for disk_op, disk_dict in self.op.disks: - utils.ForceDictType(disk_dict, constants.IDISK_PARAMS_TYPES) - if disk_op == constants.DDM_REMOVE: - disk_addremove += 1 - continue - elif disk_op == constants.DDM_ADD: - disk_addremove += 1 - else: - if not isinstance(disk_op, int): - raise errors.OpPrereqError("Invalid disk index", errors.ECODE_INVAL) - if not isinstance(disk_dict, dict): - msg = "Invalid disk value: expected dict, got '%s'" % disk_dict - raise errors.OpPrereqError(msg, errors.ECODE_INVAL) - - if disk_op == constants.DDM_ADD: - mode = disk_dict.setdefault(constants.IDISK_MODE, constants.DISK_RDWR) - if mode not in constants.DISK_ACCESS_SET: - raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode, - errors.ECODE_INVAL) - size = disk_dict.get(constants.IDISK_SIZE, None) - if size is None: - raise errors.OpPrereqError("Required disk parameter size missing", - errors.ECODE_INVAL) - try: - size = int(size) - except (TypeError, ValueError), err: - raise errors.OpPrereqError("Invalid disk size parameter: %s" % - str(err), errors.ECODE_INVAL) - disk_dict[constants.IDISK_SIZE] = size - else: - # modification of disk - if constants.IDISK_SIZE in disk_dict: - raise errors.OpPrereqError("Disk size change not possible, use" - " grow-disk", errors.ECODE_INVAL) + self.op.disks = \ + self._UpgradeDiskNicMods("disk", self.op.disks, + opcodes.OpInstanceSetParams.TestDiskModifications) + self.op.nics = \ + self._UpgradeDiskNicMods("NIC", self.op.nics, + opcodes.OpInstanceSetParams.TestNicModifications) - if disk_addremove > 1: - raise errors.OpPrereqError("Only one disk add or remove operation" - " supported at a time", errors.ECODE_INVAL) + # Check disk modifications + self._CheckMods("disk", self.op.disks, constants.IDISK_PARAMS_TYPES, + self._VerifyDiskModification) if self.op.disks and 
self.op.disk_template is not None: raise errors.OpPrereqError("Disk template conversion and other disk" @@ -10686,72 +12122,29 @@ class LUInstanceSetParams(LogicalUnit): " one requires specifying a secondary node", errors.ECODE_INVAL) - # NIC validation - nic_addremove = 0 - for nic_op, nic_dict in self.op.nics: - utils.ForceDictType(nic_dict, constants.INIC_PARAMS_TYPES) - if nic_op == constants.DDM_REMOVE: - nic_addremove += 1 - continue - elif nic_op == constants.DDM_ADD: - nic_addremove += 1 - else: - if not isinstance(nic_op, int): - raise errors.OpPrereqError("Invalid nic index", errors.ECODE_INVAL) - if not isinstance(nic_dict, dict): - msg = "Invalid nic value: expected dict, got '%s'" % nic_dict - raise errors.OpPrereqError(msg, errors.ECODE_INVAL) - - # nic_dict should be a dict - nic_ip = nic_dict.get(constants.INIC_IP, None) - if nic_ip is not None: - if nic_ip.lower() == constants.VALUE_NONE: - nic_dict[constants.INIC_IP] = None - else: - if not netutils.IPAddress.IsValid(nic_ip): - raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip, - errors.ECODE_INVAL) - - nic_bridge = nic_dict.get("bridge", None) - nic_link = nic_dict.get(constants.INIC_LINK, None) - if nic_bridge and nic_link: - raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" - " at the same time", errors.ECODE_INVAL) - elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE: - nic_dict["bridge"] = None - elif nic_link and nic_link.lower() == constants.VALUE_NONE: - nic_dict[constants.INIC_LINK] = None - - if nic_op == constants.DDM_ADD: - nic_mac = nic_dict.get(constants.INIC_MAC, None) - if nic_mac is None: - nic_dict[constants.INIC_MAC] = constants.VALUE_AUTO - - if constants.INIC_MAC in nic_dict: - nic_mac = nic_dict[constants.INIC_MAC] - if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): - nic_mac = utils.NormalizeAndValidateMac(nic_mac) - - if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO: - raise errors.OpPrereqError("'auto' is not a valid MAC address when" - " modifying an existing nic", - errors.ECODE_INVAL) - - if nic_addremove > 1: - raise errors.OpPrereqError("Only one NIC add or remove operation" - " supported at a time", errors.ECODE_INVAL) + # Check NIC modifications + self._CheckMods("NIC", self.op.nics, constants.INIC_PARAMS_TYPES, + self._VerifyNicModification) def ExpandNames(self): self._ExpandAndLockInstance() + # Can't even acquire node locks in shared mode as upcoming changes in + # Ganeti 2.6 will start to modify the node object on disk conversion self.needed_locks[locking.LEVEL_NODE] = [] + self.needed_locks[locking.LEVEL_NODE_RES] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE def DeclareLocks(self, level): + # TODO: Acquire group lock in shared mode (disk parameters) if level == locking.LEVEL_NODE: self._LockInstancesNodes() if self.op.disk_template and self.op.remote_node: self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node) self.needed_locks[locking.LEVEL_NODE].append(self.op.remote_node) + elif level == locking.LEVEL_NODE_RES and self.op.disk_template: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE][:] def BuildHooksEnv(self): """Build hooks env. 
@@ -10760,48 +12153,31 @@ class LUInstanceSetParams(LogicalUnit): """ args = dict() - if constants.BE_MEMORY in self.be_new: - args["memory"] = self.be_new[constants.BE_MEMORY] + if constants.BE_MINMEM in self.be_new: + args["minmem"] = self.be_new[constants.BE_MINMEM] + if constants.BE_MAXMEM in self.be_new: + args["maxmem"] = self.be_new[constants.BE_MAXMEM] if constants.BE_VCPUS in self.be_new: args["vcpus"] = self.be_new[constants.BE_VCPUS] # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk # information at all. - if self.op.nics: - args["nics"] = [] - nic_override = dict(self.op.nics) - for idx, nic in enumerate(self.instance.nics): - if idx in nic_override: - this_nic_override = nic_override[idx] - else: - this_nic_override = {} - if constants.INIC_IP in this_nic_override: - ip = this_nic_override[constants.INIC_IP] - else: - ip = nic.ip - if constants.INIC_MAC in this_nic_override: - mac = this_nic_override[constants.INIC_MAC] - else: - mac = nic.mac - if idx in self.nic_pnew: - nicparams = self.nic_pnew[idx] - else: - nicparams = self.cluster.SimpleFillNIC(nic.nicparams) - mode = nicparams[constants.NIC_MODE] - link = nicparams[constants.NIC_LINK] - args["nics"].append((ip, mac, mode, link)) - if constants.DDM_ADD in nic_override: - ip = nic_override[constants.DDM_ADD].get(constants.INIC_IP, None) - mac = nic_override[constants.DDM_ADD][constants.INIC_MAC] - nicparams = self.nic_pnew[constants.DDM_ADD] + + if self._new_nics is not None: + nics = [] + + for nic in self._new_nics: + nicparams = self.cluster.SimpleFillNIC(nic.nicparams) mode = nicparams[constants.NIC_MODE] link = nicparams[constants.NIC_LINK] - args["nics"].append((ip, mac, mode, link)) - elif constants.DDM_REMOVE in nic_override: - del args["nics"][-1] + nics.append((nic.ip, nic.mac, mode, link)) + + args["nics"] = nics env = _BuildInstanceHookEnvByObject(self, self.instance, override=args) if self.op.disk_template: env["NEW_DISK_TEMPLATE"] = self.op.disk_template + if self.op.runtime_mem: + env["RUNTIME_MEMORY"] = self.op.runtime_mem return env @@ -10812,6 +12188,61 @@ class LUInstanceSetParams(LogicalUnit): nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) return (nl, nl) + def _PrepareNicModification(self, params, private, old_ip, old_params, + cluster, pnode): + update_params_dict = dict([(key, params[key]) + for key in constants.NICS_PARAMETERS + if key in params]) + + if "bridge" in params: + update_params_dict[constants.NIC_LINK] = params["bridge"] + + new_params = _GetUpdatedParams(old_params, update_params_dict) + utils.ForceDictType(new_params, constants.NICS_PARAMETER_TYPES) + + new_filled_params = cluster.SimpleFillNIC(new_params) + objects.NIC.CheckParameterSyntax(new_filled_params) + + new_mode = new_filled_params[constants.NIC_MODE] + if new_mode == constants.NIC_MODE_BRIDGED: + bridge = new_filled_params[constants.NIC_LINK] + msg = self.rpc.call_bridges_exist(pnode, [bridge]).fail_msg + if msg: + msg = "Error checking bridges on node '%s': %s" % (pnode, msg) + if self.op.force: + self.warn.append(msg) + else: + raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON) + + elif new_mode == constants.NIC_MODE_ROUTED: + ip = params.get(constants.INIC_IP, old_ip) + if ip is None: + raise errors.OpPrereqError("Cannot set the NIC IP address to None" + " on a routed NIC", errors.ECODE_INVAL) + + if constants.INIC_MAC in params: + mac = params[constants.INIC_MAC] + if mac is None: + raise errors.OpPrereqError("Cannot unset the NIC MAC address", + errors.ECODE_INVAL) + elif 
mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): + # otherwise generate the MAC address + params[constants.INIC_MAC] = \ + self.cfg.GenerateMAC(self.proc.GetECId()) + else: + # or validate/reserve the current one + try: + self.cfg.ReserveMAC(mac, self.proc.GetECId()) + except errors.ReservationError: + raise errors.OpPrereqError("MAC address '%s' already in use" + " in cluster" % mac, + errors.ECODE_NOTUNIQUE) + + private.params = new_params + private.filled = new_filled_params + + return (None, None) + def CheckPrereq(self): """Check prerequisites. @@ -10826,6 +12257,12 @@ class LUInstanceSetParams(LogicalUnit): "Cannot retrieve locked instance %s" % self.op.instance_name pnode = instance.primary_node nodelist = list(instance.all_nodes) + pnode_info = self.cfg.GetNodeInfo(pnode) + self.diskparams = self.cfg.GetInstanceDiskParams(instance) + + # Prepare disk/NIC modifications + self.diskmod = PrepareContainerMods(self.op.disks, None) + self.nicmod = PrepareContainerMods(self.op.nics, _InstNicModPrivate) # OS change if self.op.os_name and not self.op.force: @@ -10835,6 +12272,9 @@ class LUInstanceSetParams(LogicalUnit): else: instance_os = instance.os + assert not (self.op.disk_template and self.op.disks), \ + "Can't modify disk template and apply disk changes at the same time" + if self.op.disk_template: if instance.disk_template == self.op.disk_template: raise errors.OpPrereqError("Instance already has disk template %s" % @@ -10846,7 +12286,8 @@ class LUInstanceSetParams(LogicalUnit): " %s to %s" % (instance.disk_template, self.op.disk_template), errors.ECODE_INVAL) - _CheckInstanceDown(self, instance, "cannot change disk template") + _CheckInstanceState(self, instance, INSTANCE_DOWN, + msg="cannot change disk template") if self.op.disk_template in constants.DTS_INT_MIRROR: if self.op.remote_node == pnode: raise errors.OpPrereqError("Given new secondary node %s is the same" @@ -10862,6 +12303,17 @@ class LUInstanceSetParams(LogicalUnit): required = _ComputeDiskSizePerVG(self.op.disk_template, disks) _CheckNodesFreeDiskPerVG(self, [self.op.remote_node], required) + snode_info = self.cfg.GetNodeInfo(self.op.remote_node) + snode_group = self.cfg.GetNodeGroup(snode_info.group) + ipolicy = _CalculateGroupIPolicy(cluster, snode_group) + _CheckTargetNodeIPolicy(self, ipolicy, instance, snode_info, + ignore=self.op.ignore_ipolicy) + if pnode_info.group != snode_info.group: + self.LogWarning("The primary and secondary nodes are in two" + " different node groups; the disk parameters" + " from the first disk's node group will be" + " used") + # hvparams processing if self.op.hvparams: hv_type = instance.hypervisor @@ -10872,23 +12324,54 @@ class LUInstanceSetParams(LogicalUnit): # local check hypervisor.GetHypervisor(hv_type).CheckParameterSyntax(hv_new) _CheckHVParams(self, nodelist, instance.hypervisor, hv_new) - self.hv_new = hv_new # the new actual values + self.hv_proposed = self.hv_new = hv_new # the new actual values self.hv_inst = i_hvdict # the new dict (without defaults) else: + self.hv_proposed = cluster.SimpleFillHV(instance.hypervisor, instance.os, + instance.hvparams) self.hv_new = self.hv_inst = {} # beparams processing if self.op.beparams: i_bedict = _GetUpdatedParams(instance.beparams, self.op.beparams, use_none=True) + objects.UpgradeBeParams(i_bedict) utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES) be_new = cluster.SimpleFillBE(i_bedict) - self.be_new = be_new # the new actual values + self.be_proposed = self.be_new = be_new # the new actual values 
self.be_inst = i_bedict # the new dict (without defaults) else: self.be_new = self.be_inst = {} + self.be_proposed = cluster.SimpleFillBE(instance.beparams) be_old = cluster.FillBE(instance) + # CPU param validation -- checking every time a paramtere is + # changed to cover all cases where either CPU mask or vcpus have + # changed + if (constants.BE_VCPUS in self.be_proposed and + constants.HV_CPU_MASK in self.hv_proposed): + cpu_list = \ + utils.ParseMultiCpuMask(self.hv_proposed[constants.HV_CPU_MASK]) + # Verify mask is consistent with number of vCPUs. Can skip this + # test if only 1 entry in the CPU mask, which means same mask + # is applied to all vCPUs. + if (len(cpu_list) > 1 and + len(cpu_list) != self.be_proposed[constants.BE_VCPUS]): + raise errors.OpPrereqError("Number of vCPUs [%d] does not match the" + " CPU mask [%s]" % + (self.be_proposed[constants.BE_VCPUS], + self.hv_proposed[constants.HV_CPU_MASK]), + errors.ECODE_INVAL) + + # Only perform this test if a new CPU mask is given + if constants.HV_CPU_MASK in self.hv_new: + # Calculate the largest CPU number requested + max_requested_cpu = max(map(max, cpu_list)) + # Check that all of the instance's nodes have enough physical CPUs to + # satisfy the requested CPU mask + _CheckNodesPhysicalCPUs(self, instance.all_nodes, + max_requested_cpu + 1, instance.hypervisor) + # osparams processing if self.op.osparams: i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams) @@ -10899,8 +12382,9 @@ class LUInstanceSetParams(LogicalUnit): self.warn = [] - if (constants.BE_MEMORY in self.op.beparams and not self.op.force and - be_new[constants.BE_MEMORY] > be_old[constants.BE_MEMORY]): + #TODO(dynmem): do the appropriate check involving MINMEM + if (constants.BE_MAXMEM in self.op.beparams and not self.op.force and + be_new[constants.BE_MAXMEM] > be_old[constants.BE_MAXMEM]): mem_check_list = [pnode] if be_new[constants.BE_AUTO_BALANCE]: # either we changed auto_balance to yes or it was from before @@ -10908,34 +12392,39 @@ class LUInstanceSetParams(LogicalUnit): instance_info = self.rpc.call_instance_info(pnode, instance.name, instance.hypervisor) nodeinfo = self.rpc.call_node_info(mem_check_list, None, - instance.hypervisor) + [instance.hypervisor]) pninfo = nodeinfo[pnode] msg = pninfo.fail_msg if msg: # Assume the primary node is unreachable and go ahead self.warn.append("Can't get info from primary node %s: %s" % (pnode, msg)) - elif not isinstance(pninfo.payload.get("memory_free", None), int): - self.warn.append("Node data from primary node %s doesn't contain" - " free memory information" % pnode) - elif instance_info.fail_msg: - self.warn.append("Can't get instance runtime information: %s" % - instance_info.fail_msg) else: - if instance_info.payload: - current_mem = int(instance_info.payload["memory"]) + (_, _, (pnhvinfo, )) = pninfo.payload + if not isinstance(pnhvinfo.get("memory_free", None), int): + self.warn.append("Node data from primary node %s doesn't contain" + " free memory information" % pnode) + elif instance_info.fail_msg: + self.warn.append("Can't get instance runtime information: %s" % + instance_info.fail_msg) else: - # Assume instance not running - # (there is a slight race condition here, but it's not very probable, - # and we have no other way to check) - current_mem = 0 - miss_mem = (be_new[constants.BE_MEMORY] - current_mem - - pninfo.payload["memory_free"]) - if miss_mem > 0: - raise errors.OpPrereqError("This change will prevent the instance" - " from starting, due to %d MB of memory" - " missing on 
its primary node" % miss_mem, - errors.ECODE_NORES) + if instance_info.payload: + current_mem = int(instance_info.payload["memory"]) + else: + # Assume instance not running + # (there is a slight race condition here, but it's not very + # probable, and we have no other way to check) + # TODO: Describe race condition + current_mem = 0 + #TODO(dynmem): do the appropriate check involving MINMEM + miss_mem = (be_new[constants.BE_MAXMEM] - current_mem - + pnhvinfo["memory_free"]) + if miss_mem > 0: + raise errors.OpPrereqError("This change will prevent the instance" + " from starting, due to %d MB of memory" + " missing on its primary node" % + miss_mem, + errors.ECODE_NORES) if be_new[constants.BE_AUTO_BALANCE]: for node, nres in nodeinfo.items(): @@ -10943,119 +12432,92 @@ class LUInstanceSetParams(LogicalUnit): continue nres.Raise("Can't get info from secondary node %s" % node, prereq=True, ecode=errors.ECODE_STATE) - if not isinstance(nres.payload.get("memory_free", None), int): + (_, _, (nhvinfo, )) = nres.payload + if not isinstance(nhvinfo.get("memory_free", None), int): raise errors.OpPrereqError("Secondary node %s didn't return free" " memory information" % node, errors.ECODE_STATE) - elif be_new[constants.BE_MEMORY] > nres.payload["memory_free"]: + #TODO(dynmem): do the appropriate check involving MINMEM + elif be_new[constants.BE_MAXMEM] > nhvinfo["memory_free"]: raise errors.OpPrereqError("This change will prevent the instance" " from failover to its secondary node" " %s, due to not enough memory" % node, errors.ECODE_STATE) - # NIC processing - self.nic_pnew = {} - self.nic_pinst = {} - for nic_op, nic_dict in self.op.nics: - if nic_op == constants.DDM_REMOVE: - if not instance.nics: - raise errors.OpPrereqError("Instance has no NICs, cannot remove", - errors.ECODE_INVAL) - continue - if nic_op != constants.DDM_ADD: - # an existing nic - if not instance.nics: - raise errors.OpPrereqError("Invalid NIC index %s, instance has" - " no NICs" % nic_op, - errors.ECODE_INVAL) - if nic_op < 0 or nic_op >= len(instance.nics): - raise errors.OpPrereqError("Invalid NIC index %s, valid values" - " are 0 to %d" % - (nic_op, len(instance.nics) - 1), - errors.ECODE_INVAL) - old_nic_params = instance.nics[nic_op].nicparams - old_nic_ip = instance.nics[nic_op].ip - else: - old_nic_params = {} - old_nic_ip = None - - update_params_dict = dict([(key, nic_dict[key]) - for key in constants.NICS_PARAMETERS - if key in nic_dict]) - - if "bridge" in nic_dict: - update_params_dict[constants.NIC_LINK] = nic_dict["bridge"] - - new_nic_params = _GetUpdatedParams(old_nic_params, - update_params_dict) - utils.ForceDictType(new_nic_params, constants.NICS_PARAMETER_TYPES) - new_filled_nic_params = cluster.SimpleFillNIC(new_nic_params) - objects.NIC.CheckParameterSyntax(new_filled_nic_params) - self.nic_pinst[nic_op] = new_nic_params - self.nic_pnew[nic_op] = new_filled_nic_params - new_nic_mode = new_filled_nic_params[constants.NIC_MODE] - - if new_nic_mode == constants.NIC_MODE_BRIDGED: - nic_bridge = new_filled_nic_params[constants.NIC_LINK] - msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg - if msg: - msg = "Error checking bridges on node %s: %s" % (pnode, msg) - if self.op.force: - self.warn.append(msg) - else: - raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON) - if new_nic_mode == constants.NIC_MODE_ROUTED: - if constants.INIC_IP in nic_dict: - nic_ip = nic_dict[constants.INIC_IP] - else: - nic_ip = old_nic_ip - if nic_ip is None: - raise errors.OpPrereqError("Cannot set the nic ip to 
None" - " on a routed nic", errors.ECODE_INVAL) - if constants.INIC_MAC in nic_dict: - nic_mac = nic_dict[constants.INIC_MAC] - if nic_mac is None: - raise errors.OpPrereqError("Cannot set the nic mac to None", - errors.ECODE_INVAL) - elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): - # otherwise generate the mac - nic_dict[constants.INIC_MAC] = \ - self.cfg.GenerateMAC(self.proc.GetECId()) - else: - # or validate/reserve the current one - try: - self.cfg.ReserveMAC(nic_mac, self.proc.GetECId()) - except errors.ReservationError: - raise errors.OpPrereqError("MAC address %s already in use" - " in cluster" % nic_mac, - errors.ECODE_NOTUNIQUE) + if self.op.runtime_mem: + remote_info = self.rpc.call_instance_info(instance.primary_node, + instance.name, + instance.hypervisor) + remote_info.Raise("Error checking node %s" % instance.primary_node) + if not remote_info.payload: # not running already + raise errors.OpPrereqError("Instance %s is not running" % instance.name, + errors.ECODE_STATE) + + current_memory = remote_info.payload["memory"] + if (not self.op.force and + (self.op.runtime_mem > self.be_proposed[constants.BE_MAXMEM] or + self.op.runtime_mem < self.be_proposed[constants.BE_MINMEM])): + raise errors.OpPrereqError("Instance %s must have memory between %d" + " and %d MB of memory unless --force is" + " given" % (instance.name, + self.be_proposed[constants.BE_MINMEM], + self.be_proposed[constants.BE_MAXMEM]), + errors.ECODE_INVAL) + + if self.op.runtime_mem > current_memory: + _CheckNodeFreeMemory(self, instance.primary_node, + "ballooning memory for instance %s" % + instance.name, + self.op.memory - current_memory, + instance.hypervisor) - # DISK processing if self.op.disks and instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Disk operations not supported for" " diskless instances", errors.ECODE_INVAL) - for disk_op, _ in self.op.disks: - if disk_op == constants.DDM_REMOVE: - if len(instance.disks) == 1: - raise errors.OpPrereqError("Cannot remove the last disk of" - " an instance", errors.ECODE_INVAL) - _CheckInstanceDown(self, instance, "cannot remove disks") - - if (disk_op == constants.DDM_ADD and - len(instance.disks) >= constants.MAX_DISKS): - raise errors.OpPrereqError("Instance has too many disks (%d), cannot" - " add more" % constants.MAX_DISKS, - errors.ECODE_STATE) - if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE): - # an existing disk - if disk_op < 0 or disk_op >= len(instance.disks): - raise errors.OpPrereqError("Invalid disk index %s, valid values" - " are 0 to %d" % - (disk_op, len(instance.disks)), - errors.ECODE_INVAL) - return + def _PrepareNicCreate(_, params, private): + return self._PrepareNicModification(params, private, None, {}, + cluster, pnode) + + def _PrepareNicMod(_, nic, params, private): + return self._PrepareNicModification(params, private, nic.ip, + nic.nicparams, cluster, pnode) + + # Verify NIC changes (operating on copy) + nics = instance.nics[:] + ApplyContainerMods("NIC", nics, None, self.nicmod, + _PrepareNicCreate, _PrepareNicMod, None) + if len(nics) > constants.MAX_NICS: + raise errors.OpPrereqError("Instance has too many network interfaces" + " (%d), cannot add more" % constants.MAX_NICS, + errors.ECODE_STATE) + + # Verify disk changes (operating on a copy) + disks = instance.disks[:] + ApplyContainerMods("disk", disks, None, self.diskmod, None, None, None) + if len(disks) > constants.MAX_DISKS: + raise errors.OpPrereqError("Instance has too many disks (%d), cannot add" + " more" % 
constants.MAX_DISKS, + errors.ECODE_STATE) + + if self.op.offline is not None: + if self.op.offline: + msg = "can't change to offline" + else: + msg = "can't change to online" + _CheckInstanceState(self, instance, CAN_CHANGE_INSTANCE_OFFLINE, msg=msg) + + # Pre-compute NIC changes (necessary to use result in hooks) + self._nic_chgdesc = [] + if self.nicmod: + # Operate on copies as this is still in prereq + nics = [nic.Copy() for nic in instance.nics] + ApplyContainerMods("NIC", nics, self._nic_chgdesc, self.nicmod, + self._CreateNewNic, self._ApplyNicMods, None) + self._new_nics = nics + else: + self._new_nics = None def _ConvertPlainToDrbd(self, feedback_fn): """Converts an instance from plain to drbd. @@ -11066,15 +12528,18 @@ class LUInstanceSetParams(LogicalUnit): pnode = instance.primary_node snode = self.op.remote_node + assert instance.disk_template == constants.DT_PLAIN + # create a fake disk info for _GenerateDiskTemplate disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode, constants.IDISK_VG: d.logical_id[0]} for d in instance.disks] new_disks = _GenerateDiskTemplate(self, self.op.disk_template, instance.name, pnode, [snode], - disk_info, None, None, 0, feedback_fn) + disk_info, None, None, 0, feedback_fn, + self.diskparams) info = _GetInstanceInfoText(instance) - feedback_fn("Creating aditional volumes...") + feedback_fn("Creating additional volumes...") # first, create the missing data and meta devices for disk in new_disks: # unfortunately this is... not too nice @@ -11102,6 +12567,9 @@ class LUInstanceSetParams(LogicalUnit): instance.disks = new_disks self.cfg.Update(instance, feedback_fn) + # Release node locks while waiting for sync + _ReleaseLocks(self, locking.LEVEL_NODE) + # disks are created, waiting for sync disk_abort = not _WaitForSync(self, instance, oneshot=not self.op.wait_for_sync) @@ -11109,12 +12577,17 @@ class LUInstanceSetParams(LogicalUnit): raise errors.OpExecError("There are some degraded disks for" " this instance, please cleanup manually") + # Node resource locks will be released by caller + def _ConvertDrbdToPlain(self, feedback_fn): """Converts an instance from drbd to plain. """ instance = self.instance + assert len(instance.secondary_nodes) == 1 + assert instance.disk_template == constants.DT_DRBD8 + pnode = instance.primary_node snode = instance.secondary_nodes[0] feedback_fn("Converting template to plain") @@ -11127,11 +12600,20 @@ class LUInstanceSetParams(LogicalUnit): child.size = parent.size child.mode = parent.mode + # this is a DRBD disk, return its port to the pool + # NOTE: this must be done right before the call to cfg.Update! + for disk in old_disks: + tcp_port = disk.logical_id[2] + self.cfg.AddTcpUdpPort(tcp_port) + # update instance structure instance.disks = new_disks instance.disk_template = constants.DT_PLAIN self.cfg.Update(instance, feedback_fn) + # Release locks in case removing disks takes a while + _ReleaseLocks(self, locking.LEVEL_NODE) + feedback_fn("Removing volumes on the secondary node...") for disk in old_disks: self.cfg.SetDiskID(disk, snode) @@ -11149,10 +12631,104 @@ class LUInstanceSetParams(LogicalUnit): self.LogWarning("Could not remove metadata for disk %d on node %s," " continuing anyway: %s", idx, pnode, msg) - # this is a DRBD disk, return its port to the pool - for disk in old_disks: - tcp_port = disk.logical_id[2] - self.cfg.AddTcpUdpPort(tcp_port) + def _CreateNewDisk(self, idx, params, _): + """Creates a new disk. 
+ + """ + instance = self.instance + + # add a new disk + if instance.disk_template in constants.DTS_FILEBASED: + (file_driver, file_path) = instance.disks[0].logical_id + file_path = os.path.dirname(file_path) + else: + file_driver = file_path = None + + disk = \ + _GenerateDiskTemplate(self, instance.disk_template, instance.name, + instance.primary_node, instance.secondary_nodes, + [params], file_path, file_driver, idx, + self.Log, self.diskparams)[0] + + info = _GetInstanceInfoText(instance) + + logging.info("Creating volume %s for instance %s", + disk.iv_name, instance.name) + # Note: this needs to be kept in sync with _CreateDisks + #HARDCODE + for node in instance.all_nodes: + f_create = (node == instance.primary_node) + try: + _CreateBlockDev(self, node, instance, disk, f_create, info, f_create) + except errors.OpExecError, err: + self.LogWarning("Failed to create volume %s (%s) on node '%s': %s", + disk.iv_name, disk, node, err) + + return (disk, [ + ("disk/%d" % idx, "add:size=%s,mode=%s" % (disk.size, disk.mode)), + ]) + + @staticmethod + def _ModifyDisk(idx, disk, params, _): + """Modifies a disk. + + """ + disk.mode = params[constants.IDISK_MODE] + + return [ + ("disk.mode/%d" % idx, disk.mode), + ] + + def _RemoveDisk(self, idx, root, _): + """Removes a disk. + + """ + for node, disk in root.ComputeNodeTree(self.instance.primary_node): + self.cfg.SetDiskID(disk, node) + msg = self.rpc.call_blockdev_remove(node, disk).fail_msg + if msg: + self.LogWarning("Could not remove disk/%d on node '%s': %s," + " continuing anyway", idx, node, msg) + + # if this is a DRBD disk, return its port to the pool + if root.dev_type in constants.LDS_DRBD: + self.cfg.AddTcpUdpPort(root.logical_id[2]) + + @staticmethod + def _CreateNewNic(idx, params, private): + """Creates data structure for a new network interface. + + """ + mac = params[constants.INIC_MAC] + ip = params.get(constants.INIC_IP, None) + nicparams = private.params + + return (objects.NIC(mac=mac, ip=ip, nicparams=nicparams), [ + ("nic.%d" % idx, + "add:mac=%s,ip=%s,mode=%s,link=%s" % + (mac, ip, private.filled[constants.NIC_MODE], + private.filled[constants.NIC_LINK])), + ]) + + @staticmethod + def _ApplyNicMods(idx, nic, params, private): + """Modifies a network interface. + + """ + changes = [] + + for key in [constants.INIC_MAC, constants.INIC_IP]: + if key in params: + changes.append(("nic.%s/%d" % (key, idx), params[key])) + setattr(nic, key, params[key]) + + if private.params: + nic.nicparams = private.params + + for (key, val) in params.items(): + changes.append(("nic.%s/%d" % (key, idx), val)) + + return changes def Exec(self, feedback_fn): """Modifies an instance. @@ -11162,71 +12738,41 @@ class LUInstanceSetParams(LogicalUnit): """ # Process here the warnings from CheckPrereq, as we don't have a # feedback_fn there. 
+ # TODO: Replace with self.LogWarning for warn in self.warn: feedback_fn("WARNING: %s" % warn) + assert ((self.op.disk_template is None) ^ + bool(self.owned_locks(locking.LEVEL_NODE_RES))), \ + "Not owning any node resource locks" + result = [] instance = self.instance - # disk changes - for disk_op, disk_dict in self.op.disks: - if disk_op == constants.DDM_REMOVE: - # remove the last disk - device = instance.disks.pop() - device_idx = len(instance.disks) - for node, disk in device.ComputeNodeTree(instance.primary_node): - self.cfg.SetDiskID(disk, node) - msg = self.rpc.call_blockdev_remove(node, disk).fail_msg - if msg: - self.LogWarning("Could not remove disk/%d on node %s: %s," - " continuing anyway", device_idx, node, msg) - result.append(("disk/%d" % device_idx, "remove")) - - # if this is a DRBD disk, return its port to the pool - if device.dev_type in constants.LDS_DRBD: - tcp_port = device.logical_id[2] - self.cfg.AddTcpUdpPort(tcp_port) - elif disk_op == constants.DDM_ADD: - # add a new disk - if instance.disk_template in (constants.DT_FILE, - constants.DT_SHARED_FILE): - file_driver, file_path = instance.disks[0].logical_id - file_path = os.path.dirname(file_path) - else: - file_driver = file_path = None - disk_idx_base = len(instance.disks) - new_disk = _GenerateDiskTemplate(self, - instance.disk_template, - instance.name, instance.primary_node, - instance.secondary_nodes, - [disk_dict], - file_path, - file_driver, - disk_idx_base, feedback_fn)[0] - instance.disks.append(new_disk) - info = _GetInstanceInfoText(instance) - - logging.info("Creating volume %s for instance %s", - new_disk.iv_name, instance.name) - # Note: this needs to be kept in sync with _CreateDisks - #HARDCODE - for node in instance.all_nodes: - f_create = node == instance.primary_node - try: - _CreateBlockDev(self, node, instance, new_disk, - f_create, info, f_create) - except errors.OpExecError, err: - self.LogWarning("Failed to create volume %s (%s) on" - " node %s: %s", - new_disk.iv_name, new_disk, node, err) - result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" % - (new_disk.size, new_disk.mode))) - else: - # change a given disk - instance.disks[disk_op].mode = disk_dict[constants.IDISK_MODE] - result.append(("disk.mode/%d" % disk_op, - disk_dict[constants.IDISK_MODE])) + + # runtime memory + if self.op.runtime_mem: + rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node, + instance, + self.op.runtime_mem) + rpcres.Raise("Cannot modify instance runtime memory") + result.append(("runtime_memory", self.op.runtime_mem)) + + # Apply disk changes + ApplyContainerMods("disk", instance.disks, result, self.diskmod, + self._CreateNewDisk, self._ModifyDisk, self._RemoveDisk) + _UpdateIvNames(0, instance.disks) if self.op.disk_template: + if __debug__: + check_nodes = set(instance.all_nodes) + if self.op.remote_node: + check_nodes.add(self.op.remote_node) + for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]: + owned = self.owned_locks(level) + assert not (check_nodes - owned), \ + ("Not owning the correct locks, owning %r, expected at least %r" % + (owned, check_nodes)) + r_shut = _ShutdownInstanceDisks(self, instance) if not r_shut: raise errors.OpExecError("Cannot shutdown instance disks, unable to" @@ -11239,33 +12785,19 @@ class LUInstanceSetParams(LogicalUnit): raise result.append(("disk_template", self.op.disk_template)) - # NIC changes - for nic_op, nic_dict in self.op.nics: - if nic_op == constants.DDM_REMOVE: - # remove the last nic - del instance.nics[-1] - 
result.append(("nic.%d" % len(instance.nics), "remove")) - elif nic_op == constants.DDM_ADD: - # mac and bridge should be set, by now - mac = nic_dict[constants.INIC_MAC] - ip = nic_dict.get(constants.INIC_IP, None) - nicparams = self.nic_pinst[constants.DDM_ADD] - new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams) - instance.nics.append(new_nic) - result.append(("nic.%d" % (len(instance.nics) - 1), - "add:mac=%s,ip=%s,mode=%s,link=%s" % - (new_nic.mac, new_nic.ip, - self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE], - self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK] - ))) - else: - for key in (constants.INIC_MAC, constants.INIC_IP): - if key in nic_dict: - setattr(instance.nics[nic_op], key, nic_dict[key]) - if nic_op in self.nic_pinst: - instance.nics[nic_op].nicparams = self.nic_pinst[nic_op] - for key, val in nic_dict.iteritems(): - result.append(("nic.%s/%d" % (key, nic_op), val)) + assert instance.disk_template == self.op.disk_template, \ + ("Expected disk template '%s', found '%s'" % + (self.op.disk_template, instance.disk_template)) + + # Release node and resource locks if there are any (they might already have + # been released during disk conversion) + _ReleaseLocks(self, locking.LEVEL_NODE) + _ReleaseLocks(self, locking.LEVEL_NODE_RES) + + # Apply NIC changes + if self._new_nics is not None: + instance.nics = self._new_nics + result.extend(self._nic_chgdesc) # hvparams changes if self.op.hvparams: @@ -11289,8 +12821,24 @@ class LUInstanceSetParams(LogicalUnit): for key, val in self.op.osparams.iteritems(): result.append(("os/%s" % key, val)) + if self.op.offline is None: + # Ignore + pass + elif self.op.offline: + # Mark instance as offline + self.cfg.MarkInstanceOffline(instance.name) + result.append(("admin_state", constants.ADMINST_OFFLINE)) + else: + # Mark instance as online, but stopped + self.cfg.MarkInstanceDown(instance.name) + result.append(("admin_state", constants.ADMINST_DOWN)) + self.cfg.Update(instance, feedback_fn) + assert not (self.owned_locks(locking.LEVEL_NODE_RES) or + self.owned_locks(locking.LEVEL_NODE)), \ + "All node locks should have been released by now" + return result _DISK_CONVERSIONS = { @@ -11377,7 +12925,7 @@ class LUInstanceChangeGroup(LogicalUnit): if self.req_target_uuids: # User requested specific target groups - self.target_uuids = self.req_target_uuids + self.target_uuids = frozenset(self.req_target_uuids) else: # All groups except those used by the instance are potential targets self.target_uuids = owned_groups - inst_groups @@ -11446,32 +12994,74 @@ class LUBackupQuery(NoHooksLU): """ REQ_BGL = False + def CheckArguments(self): + self.expq = _ExportQuery(qlang.MakeSimpleFilter("node", self.op.nodes), + ["node", "export"], self.op.use_locking) + def ExpandNames(self): - self.needed_locks = {} - self.share_locks[locking.LEVEL_NODE] = 1 - if not self.op.nodes: - self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET - else: - self.needed_locks[locking.LEVEL_NODE] = \ - _GetWantedNodes(self, self.op.nodes) + self.expq.ExpandNames(self) + + def DeclareLocks(self, level): + self.expq.DeclareLocks(self, level) def Exec(self, feedback_fn): - """Compute the list of all the exported system images. + result = {} - @rtype: dict - @return: a dictionary with the structure node->(export-list) - where export-list is a list of the instances exported on - that node. 
+ for (node, expname) in self.expq.OldStyleQuery(self): + if expname is None: + result[node] = False + else: + result.setdefault(node, []).append(expname) + + return result + + +class _ExportQuery(_QueryBase): + FIELDS = query.EXPORT_FIELDS + + #: The node name is not a unique key for this query + SORT_FIELD = "node" + + def ExpandNames(self, lu): + lu.needed_locks = {} + + # The following variables interact with _QueryBase._GetNames + if self.names: + self.wanted = _GetWantedNodes(lu, self.names) + else: + self.wanted = locking.ALL_SET + + self.do_locking = self.use_locking + + if self.do_locking: + lu.share_locks = _ShareAll() + lu.needed_locks = { + locking.LEVEL_NODE: self.wanted, + } + + def DeclareLocks(self, lu, level): + pass + + def _GetQueryData(self, lu): + """Computes the list of nodes and their attributes. """ - self.nodes = self.owned_locks(locking.LEVEL_NODE) - rpcresult = self.rpc.call_export_list(self.nodes) - result = {} - for node in rpcresult: - if rpcresult[node].fail_msg: - result[node] = False + # Locking is not used + # TODO + assert not (compat.any(lu.glm.is_owned(level) + for level in locking.LEVELS + if level != locking.LEVEL_CLUSTER) or + self.do_locking or self.use_locking) + + nodes = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE) + + result = [] + + for (node, nres) in lu.rpc.call_export_list(nodes).items(): + if nres.fail_msg: + result.append((node, None)) else: - result[node] = rpcresult[node].payload + result.extend((node, expname) for expname in nres.payload) return result @@ -11613,7 +13203,8 @@ class LUBackupExport(LogicalUnit): "Cannot retrieve locked instance %s" % self.op.instance_name _CheckNodeOnline(self, self.instance.primary_node) - if (self.op.remove_instance and self.instance.admin_up and + if (self.op.remove_instance and + self.instance.admin_state == constants.ADMINST_UP and not self.op.shutdown): raise errors.OpPrereqError("Can not remove instance without shutting it" " down before") @@ -11743,7 +13334,7 @@ class LUBackupExport(LogicalUnit): for disk in instance.disks: self.cfg.SetDiskID(disk, src_node) - activate_disks = (not instance.admin_up) + activate_disks = (instance.admin_state != constants.ADMINST_UP) if activate_disks: # Activate the instance disks if we'exporting a stopped instance @@ -11756,12 +13347,13 @@ class LUBackupExport(LogicalUnit): helper.CreateSnapshots() try: - if (self.op.shutdown and instance.admin_up and + if (self.op.shutdown and + instance.admin_state == constants.ADMINST_UP and not self.op.remove_instance): assert not activate_disks feedback_fn("Starting instance %s" % instance.name) - result = self.rpc.call_instance_start(src_node, instance, - None, None, False) + result = self.rpc.call_instance_start(src_node, + (instance, None, None), False) msg = result.fail_msg if msg: feedback_fn("Failed to start instance: %s" % msg) @@ -11905,6 +13497,33 @@ class LUGroupAdd(LogicalUnit): if self.op.ndparams: utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES) + if self.op.hv_state: + self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, None) + else: + self.new_hv_state = None + + if self.op.disk_state: + self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None) + else: + self.new_disk_state = None + + if self.op.diskparams: + for templ in constants.DISK_TEMPLATES: + if templ not in self.op.diskparams: + self.op.diskparams[templ] = {} + utils.ForceDictType(self.op.diskparams[templ], constants.DISK_DT_TYPES) + else: + self.op.diskparams = 
self.cfg.GetClusterInfo().diskparams + + if self.op.ipolicy: + cluster = self.cfg.GetClusterInfo() + full_ipolicy = cluster.SimpleFillIPolicy(self.op.ipolicy) + try: + objects.InstancePolicy.CheckParameterSyntax(full_ipolicy) + except errors.ConfigurationError, err: + raise errors.OpPrereqError("Invalid instance policy: %s" % err, + errors.ECODE_INVAL) + def BuildHooksEnv(self): """Build hooks env. @@ -11927,7 +13546,11 @@ class LUGroupAdd(LogicalUnit): group_obj = objects.NodeGroup(name=self.op.group_name, members=[], uuid=self.group_uuid, alloc_policy=self.op.alloc_policy, - ndparams=self.op.ndparams) + ndparams=self.op.ndparams, + diskparams=self.op.diskparams, + ipolicy=self.op.ipolicy, + hv_state_static=self.new_hv_state, + disk_state_static=self.new_disk_state) self.cfg.AddNodeGroup(group_obj, self.proc.GetECId(), check_uuid=False) del self.remove_locks[locking.LEVEL_NODEGROUP] @@ -12073,6 +13696,7 @@ class _GroupQuery(_QueryBase): lu.needed_locks = {} self._all_groups = lu.cfg.GetAllNodeGroupsInfo() + self._cluster = lu.cfg.GetClusterInfo() name_to_uuid = dict((g.name, g.uuid) for g in self._all_groups.values()) if not self.names: @@ -12138,7 +13762,8 @@ class _GroupQuery(_QueryBase): # Do not pass on node information if it was not requested. group_to_nodes = None - return query.GroupQueryData([self._all_groups[uuid] + return query.GroupQueryData(self._cluster, + [self._all_groups[uuid] for uuid in self.wanted], group_to_nodes, group_to_instances) @@ -12174,7 +13799,11 @@ class LUGroupSetParams(LogicalUnit): def CheckArguments(self): all_changes = [ self.op.ndparams, + self.op.diskparams, self.op.alloc_policy, + self.op.hv_state, + self.op.disk_state, + self.op.ipolicy, ] if all_changes.count(None) == len(all_changes): @@ -12186,14 +13815,32 @@ class LUGroupSetParams(LogicalUnit): self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name) self.needed_locks = { + locking.LEVEL_INSTANCE: [], locking.LEVEL_NODEGROUP: [self.group_uuid], } + self.share_locks[locking.LEVEL_INSTANCE] = 1 + + def DeclareLocks(self, level): + if level == locking.LEVEL_INSTANCE: + assert not self.needed_locks[locking.LEVEL_INSTANCE] + + # Lock instances optimistically, needs verification once group lock has + # been acquired + self.needed_locks[locking.LEVEL_INSTANCE] = \ + self.cfg.GetNodeGroupInstances(self.group_uuid) + def CheckPrereq(self): """Check prerequisites. 
""" + owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE)) + + # Check if locked instances are still correct + _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances) + self.group = self.cfg.GetNodeGroup(self.group_uuid) + cluster = self.cfg.GetClusterInfo() if self.group is None: raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" % @@ -12204,6 +13851,43 @@ class LUGroupSetParams(LogicalUnit): utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES) self.new_ndparams = new_ndparams + if self.op.diskparams: + self.new_diskparams = dict() + for templ in constants.DISK_TEMPLATES: + if templ not in self.op.diskparams: + self.op.diskparams[templ] = {} + new_templ_params = _GetUpdatedParams(self.group.diskparams[templ], + self.op.diskparams[templ]) + utils.ForceDictType(new_templ_params, constants.DISK_DT_TYPES) + self.new_diskparams[templ] = new_templ_params + + if self.op.hv_state: + self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, + self.group.hv_state_static) + + if self.op.disk_state: + self.new_disk_state = \ + _MergeAndVerifyDiskState(self.op.disk_state, + self.group.disk_state_static) + + if self.op.ipolicy: + self.new_ipolicy = _GetUpdatedIPolicy(self.group.ipolicy, + self.op.ipolicy, + group_policy=True) + + new_ipolicy = cluster.SimpleFillIPolicy(self.new_ipolicy) + inst_filter = lambda inst: inst.name in owned_instances + instances = self.cfg.GetInstancesInfoByFilter(inst_filter).values() + violations = \ + _ComputeNewInstanceViolations(_CalculateGroupIPolicy(cluster, + self.group), + new_ipolicy, instances) + + if violations: + self.LogWarning("After the ipolicy change the following instances" + " violate them: %s", + utils.CommaJoin(violations)) + def BuildHooksEnv(self): """Build hooks env. 
@@ -12230,9 +13914,22 @@ class LUGroupSetParams(LogicalUnit): self.group.ndparams = self.new_ndparams result.append(("ndparams", str(self.group.ndparams))) + if self.op.diskparams: + self.group.diskparams = self.new_diskparams + result.append(("diskparams", str(self.group.diskparams))) + if self.op.alloc_policy: self.group.alloc_policy = self.op.alloc_policy + if self.op.hv_state: + self.group.hv_state_static = self.new_hv_state + + if self.op.disk_state: + self.group.disk_state_static = self.new_disk_state + + if self.op.ipolicy: + self.group.ipolicy = self.new_ipolicy + self.cfg.Update(self.group, feedback_fn) return result @@ -12461,16 +14158,8 @@ class LUGroupEvacuate(LogicalUnit): self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances)) # Check if node groups for locked instances are still correct - for instance_name in owned_instances: - inst = self.instances[instance_name] - assert owned_nodes.issuperset(inst.all_nodes), \ - "Instance %s's nodes changed while we kept the lock" % instance_name - - inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name, - owned_groups) - - assert self.group_uuid in inst_groups, \ - "Instance %s has no node in group %s" % (instance_name, self.group_uuid) + _CheckInstancesNodeGroups(self.cfg, self.instances, + owned_groups, owned_nodes, self.group_uuid) if self.req_target_uuids: # User requested specific target groups @@ -12538,14 +14227,25 @@ class TagsLU(NoHooksLU): # pylint: disable=W0223 def ExpandNames(self): self.group_uuid = None self.needed_locks = {} + if self.op.kind == constants.TAG_NODE: self.op.name = _ExpandNodeName(self.cfg, self.op.name) - self.needed_locks[locking.LEVEL_NODE] = self.op.name + lock_level = locking.LEVEL_NODE + lock_name = self.op.name elif self.op.kind == constants.TAG_INSTANCE: self.op.name = _ExpandInstanceName(self.cfg, self.op.name) - self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name + lock_level = locking.LEVEL_INSTANCE + lock_name = self.op.name elif self.op.kind == constants.TAG_NODEGROUP: self.group_uuid = self.cfg.LookupNodeGroup(self.op.name) + lock_level = locking.LEVEL_NODEGROUP + lock_name = self.group_uuid + else: + lock_level = None + lock_name = None + + if lock_level and getattr(self.op, "use_locking", True): + self.needed_locks[lock_level] = lock_name # FIXME: Acquire BGL for cluster tag operations (as of this writing it's # not possible to acquire the BGL based on opcode parameters) @@ -12889,14 +14589,14 @@ class IAllocator(object): # pylint: disable=R0902 # lots of instance attributes - def __init__(self, cfg, rpc, mode, **kwargs): + def __init__(self, cfg, rpc_runner, mode, **kwargs): self.cfg = cfg - self.rpc = rpc + self.rpc = rpc_runner # init buffer variables self.in_text = self.out_text = self.in_data = self.out_data = None # init all input fields so that pylint is happy self.mode = mode - self.memory = self.disks = self.disk_template = None + self.memory = self.disks = self.disk_template = self.spindle_use = None self.os = self.tags = self.nics = self.vcpus = None self.hypervisor = None self.relocate_from = None @@ -12943,7 +14643,7 @@ class IAllocator(object): "cluster_name": cfg.GetClusterName(), "cluster_tags": list(cluster_info.GetTags()), "enabled_hypervisors": list(cluster_info.enabled_hypervisors), - # we don't have job IDs + "ipolicy": cluster_info.ipolicy, } ninfo = cfg.GetAllNodesInfo() iinfo = cfg.GetAllInstancesInfo().values() @@ -12957,17 +14657,17 @@ class IAllocator(object): elif self.mode == constants.IALLOCATOR_MODE_RELOC: hypervisor_name = 
cfg.GetInstanceInfo(self.name).hypervisor else: - hypervisor_name = cluster_info.enabled_hypervisors[0] + hypervisor_name = cluster_info.primary_hypervisor - node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(), - hypervisor_name) + node_data = self.rpc.call_node_info(node_list, [cfg.GetVGName()], + [hypervisor_name]) node_iinfo = \ self.rpc.call_all_instances_info(node_list, cluster_info.enabled_hypervisors) data["nodegroups"] = self._ComputeNodeGroupData(cfg) - config_ndata = self._ComputeBasicNodeData(ninfo) + config_ndata = self._ComputeBasicNodeData(cfg, ninfo) data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo, i_list, config_ndata) assert len(data["nodes"]) == len(ninfo), \ @@ -12982,16 +14682,18 @@ class IAllocator(object): """Compute node groups data. """ + cluster = cfg.GetClusterInfo() ng = dict((guuid, { "name": gdata.name, "alloc_policy": gdata.alloc_policy, + "ipolicy": _CalculateGroupIPolicy(cluster, gdata), }) for guuid, gdata in cfg.GetAllNodeGroupsInfo().items()) return ng @staticmethod - def _ComputeBasicNodeData(node_cfg): + def _ComputeBasicNodeData(cfg, node_cfg): """Compute global node data. @rtype: dict @@ -13009,6 +14711,7 @@ class IAllocator(object): "group": ninfo.group, "master_capable": ninfo.master_capable, "vm_capable": ninfo.vm_capable, + "ndparams": cfg.GetNdParams(ninfo), }) for ninfo in node_cfg.values()) @@ -13022,6 +14725,7 @@ class IAllocator(object): @param node_results: the basic node structures as filled from the config """ + #TODO(dynmem): compute the right data on MAX and MIN memory # make a copy of the current dict node_results = dict(node_results) for nname, nresult in node_data.items(): @@ -13032,7 +14736,7 @@ class IAllocator(object): nresult.Raise("Can't get data for node %s" % nname) node_iinfo[nname].Raise("Can't get node instance info from node %s" % nname) - remote_info = nresult.payload + remote_info = _MakeLegacyNodeInfo(nresult.payload) for attr in ["memory_total", "memory_free", "memory_dom0", "vg_size", "vg_free", "cpu_total"]: @@ -13047,16 +14751,16 @@ class IAllocator(object): i_p_mem = i_p_up_mem = 0 for iinfo, beinfo in i_list: if iinfo.primary_node == nname: - i_p_mem += beinfo[constants.BE_MEMORY] + i_p_mem += beinfo[constants.BE_MAXMEM] if iinfo.name not in node_iinfo[nname].payload: i_used_mem = 0 else: i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"]) - i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem + i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem remote_info["memory_free"] -= max(0, i_mem_diff) - if iinfo.admin_up: - i_p_up_mem += beinfo[constants.BE_MEMORY] + if iinfo.admin_state == constants.ADMINST_UP: + i_p_up_mem += beinfo[constants.BE_MAXMEM] # compute memory used by instances pnr_dyn = { @@ -13095,9 +14799,10 @@ class IAllocator(object): nic_data.append(nic_dict) pir = { "tags": list(iinfo.GetTags()), - "admin_up": iinfo.admin_up, + "admin_state": iinfo.admin_state, "vcpus": beinfo[constants.BE_VCPUS], - "memory": beinfo[constants.BE_MEMORY], + "memory": beinfo[constants.BE_MAXMEM], + "spindle_use": beinfo[constants.BE_SPINDLE_USE], "os": iinfo.os, "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes), "nics": nic_data, @@ -13137,6 +14842,7 @@ class IAllocator(object): "os": self.os, "vcpus": self.vcpus, "memory": self.memory, + "spindle_use": self.spindle_use, "disks": self.disks, "disk_space_total": disk_space, "nics": self.nics, @@ -13250,6 +14956,7 @@ class IAllocator(object): [ ("name", ht.TString), ("memory", ht.TInt), + ("spindle_use", 
ht.TInt), ("disks", ht.TListOf(ht.TDict)), ("disk_template", ht.TString), ("os", ht.TString), @@ -13486,10 +15193,12 @@ class LUTestAllocator(NoHooksLU): #: Query type implementations _QUERY_IMPL = { + constants.QR_CLUSTER: _ClusterQuery, constants.QR_INSTANCE: _InstanceQuery, constants.QR_NODE: _NodeQuery, constants.QR_GROUP: _GroupQuery, constants.QR_OS: _OsQuery, + constants.QR_EXPORT: _ExportQuery, } assert set(_QUERY_IMPL.keys()) == constants.QR_VIA_OP