X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/3b61ee44b36146db4b83b957fba991be0ded6e76..5a85b99e9085e221208b957a7fc0fe87436843f5:/lib/cmdlib.py

diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 0cbe515..d147c43 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -21,12 +21,12 @@
 
 """Module implementing the master-side code."""
 
-# pylint: disable-msg=W0201,C0302
+# pylint: disable=W0201,C0302
 
 # W0201 since most LU attributes are defined in CheckPrereq or similar
 # functions
 
-# C0302: since we have waaaay to many lines in this module
+# C0302: since we have waaaay too many lines in this module
 
 import os
 import os.path
@@ -59,21 +59,13 @@ from ganeti import query
 from ganeti import qlang
 from ganeti import opcodes
 from ganeti import ht
+from ganeti import rpc
 
-import ganeti.masterd.instance # pylint: disable-msg=W0611
+import ganeti.masterd.instance # pylint: disable=W0611
 
 
-def _SupportsOob(cfg, node):
-  """Tells if node supports OOB.
-
-  @type cfg: L{config.ConfigWriter}
-  @param cfg: The cluster configuration
-  @type node: L{objects.Node}
-  @param node: The node
-  @return: The OOB script if supported or an empty string otherwise
-
-  """
-  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
+#: Size of DRBD meta block device
+DRBD_META_SIZE = 128
 
 
 class ResultWithJobs:
@@ -121,7 +113,7 @@ class LogicalUnit(object):
   HTYPE = None
   REQ_BGL = True
 
-  def __init__(self, processor, op, context, rpc):
+  def __init__(self, processor, op, context, rpc_runner):
     """Constructor for LogicalUnit.
 
     This needs to be overridden in derived classes in order to check op
@@ -132,8 +124,10 @@ class LogicalUnit(object):
     self.op = op
     self.cfg = context.cfg
     self.glm = context.glm
+    # readability alias
+    self.owned_locks = context.glm.list_owned
     self.context = context
-    self.rpc = rpc
+    self.rpc = rpc_runner
     # Dicts used to declare locking needs to mcpu
     self.needed_locks = None
     self.share_locks = dict.fromkeys(locking.LEVELS, 0)
@@ -142,10 +136,10 @@ class LogicalUnit(object):
     # Used to force good behavior when calling helper functions
     self.recalculate_locks = {}
     # logging
-    self.Log = processor.Log # pylint: disable-msg=C0103
-    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
-    self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
-    self.LogStep = processor.LogStep # pylint: disable-msg=C0103
+    self.Log = processor.Log # pylint: disable=C0103
+    self.LogWarning = processor.LogWarning # pylint: disable=C0103
+    self.LogInfo = processor.LogInfo # pylint: disable=C0103
+    self.LogStep = processor.LogStep # pylint: disable=C0103
     # support for dry-run
     self.dry_run_result = None
     # support for generic debug attribute
@@ -333,7 +327,7 @@ class LogicalUnit(object):
     """
    # API must be kept, thus we ignore the unused argument and could
    # be a function warnings
-    # pylint: disable-msg=W0613,R0201
+    # pylint: disable=W0613,R0201
     return lu_result
 
   def _ExpandAndLockInstance(self):
@@ -355,7 +349,8 @@ class LogicalUnit(object):
                                                 self.op.instance_name)
     self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
 
-  def _LockInstancesNodes(self, primary_only=False):
+  def _LockInstancesNodes(self, primary_only=False,
+                          level=locking.LEVEL_NODE):
     """Helper function to declare instances' nodes for locking.
This function should be called after locking one or more instances to lock @@ -376,9 +371,10 @@ class LogicalUnit(object): @type primary_only: boolean @param primary_only: only lock primary nodes of locked instances + @param level: Which lock level to use for locking nodes """ - assert locking.LEVEL_NODE in self.recalculate_locks, \ + assert level in self.recalculate_locks, \ "_LockInstancesNodes helper function called with no nodes to recalculate" # TODO: check if we're really been called with the instance locks held @@ -387,21 +383,23 @@ class LogicalUnit(object): # future we might want to have different behaviors depending on the value # of self.recalculate_locks[locking.LEVEL_NODE] wanted_nodes = [] - for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE): - instance = self.context.cfg.GetInstanceInfo(instance_name) + locked_i = self.owned_locks(locking.LEVEL_INSTANCE) + for _, instance in self.cfg.GetMultiInstanceInfo(locked_i): wanted_nodes.append(instance.primary_node) if not primary_only: wanted_nodes.extend(instance.secondary_nodes) - if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE: - self.needed_locks[locking.LEVEL_NODE] = wanted_nodes - elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND: - self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes) + if self.recalculate_locks[level] == constants.LOCKS_REPLACE: + self.needed_locks[level] = wanted_nodes + elif self.recalculate_locks[level] == constants.LOCKS_APPEND: + self.needed_locks[level].extend(wanted_nodes) + else: + raise errors.ProgrammerError("Unknown recalculation mode") - del self.recalculate_locks[locking.LEVEL_NODE] + del self.recalculate_locks[level] -class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223 +class NoHooksLU(LogicalUnit): # pylint: disable=W0223 """Simple LU which runs no hooks. This LU is intended as a parent for other LogicalUnits which will @@ -479,13 +477,13 @@ class _QueryBase: #: Attribute holding field definitions FIELDS = None - def __init__(self, filter_, fields, use_locking): + def __init__(self, qfilter, fields, use_locking): """Initializes this class. """ self.use_locking = use_locking - self.query = query.Query(self.FIELDS, fields, filter_=filter_, + self.query = query.Query(self.FIELDS, fields, qfilter=qfilter, namefield="name") self.requested_data = self.query.RequestedData() self.names = self.query.RequestedNames() @@ -501,7 +499,7 @@ class _QueryBase: """ if self.do_locking: - names = lu.glm.list_owned(lock_level) + names = lu.owned_locks(lock_level) else: names = all_names @@ -561,6 +559,76 @@ class _QueryBase: sort_by_name=self.sort_by_name) +def _ShareAll(): + """Returns a dict declaring all lock levels shared. + + """ + return dict.fromkeys(locking.LEVELS, 1) + + +def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups): + """Checks if the owned node groups are still correct for an instance. 
+ + @type cfg: L{config.ConfigWriter} + @param cfg: The cluster configuration + @type instance_name: string + @param instance_name: Instance name + @type owned_groups: set or frozenset + @param owned_groups: List of currently owned node groups + + """ + inst_groups = cfg.GetInstanceNodeGroups(instance_name) + + if not owned_groups.issuperset(inst_groups): + raise errors.OpPrereqError("Instance %s's node groups changed since" + " locks were acquired, current groups are" + " are '%s', owning groups '%s'; retry the" + " operation" % + (instance_name, + utils.CommaJoin(inst_groups), + utils.CommaJoin(owned_groups)), + errors.ECODE_STATE) + + return inst_groups + + +def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances): + """Checks if the instances in a node group are still correct. + + @type cfg: L{config.ConfigWriter} + @param cfg: The cluster configuration + @type group_uuid: string + @param group_uuid: Node group UUID + @type owned_instances: set or frozenset + @param owned_instances: List of currently owned instances + + """ + wanted_instances = cfg.GetNodeGroupInstances(group_uuid) + if owned_instances != wanted_instances: + raise errors.OpPrereqError("Instances in node group '%s' changed since" + " locks were acquired, wanted '%s', have '%s';" + " retry the operation" % + (group_uuid, + utils.CommaJoin(wanted_instances), + utils.CommaJoin(owned_instances)), + errors.ECODE_STATE) + + return wanted_instances + + +def _SupportsOob(cfg, node): + """Tells if node supports OOB. + + @type cfg: L{config.ConfigWriter} + @param cfg: The cluster configuration + @type node: L{objects.Node} + @param node: The node + @return: The OOB script if supported or an empty string otherwise + + """ + return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM] + + def _GetWantedNodes(lu, nodes): """Returns list of checked and expanded node names. @@ -658,18 +726,18 @@ def _ReleaseLocks(lu, level, names=None, keep=None): release = [] # Determine which locks to release - for name in lu.glm.list_owned(level): + for name in lu.owned_locks(level): if should_release(name): release.append(name) else: retain.append(name) - assert len(lu.glm.list_owned(level)) == (len(retain) + len(release)) + assert len(lu.owned_locks(level)) == (len(retain) + len(release)) # Release just some locks lu.glm.release(level, names=release) - assert frozenset(lu.glm.list_owned(level)) == frozenset(retain) + assert frozenset(lu.owned_locks(level)) == frozenset(retain) else: # Release everything lu.glm.release(level) @@ -677,15 +745,28 @@ def _ReleaseLocks(lu, level, names=None, keep=None): assert not lu.glm.is_owned(level), "No locks should be owned" +def _MapInstanceDisksToNodes(instances): + """Creates a map from (node, volume) to instance name. + + @type instances: list of L{objects.Instance} + @rtype: dict; tuple of (node name, volume name) as key, instance name as value + + """ + return dict(((node, vol), inst.name) + for inst in instances + for (node, vols) in inst.MapLVsByNode().items() + for vol in vols) + + def _RunPostHook(lu, node_name): """Runs the post-hook for an opcode on a single node. 
""" - hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu) + hm = lu.proc.BuildHooksManager(lu) try: hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name]) except: - # pylint: disable-msg=W0702 + # pylint: disable=W0702 lu.LogWarning("Errors occurred running hooks on %s" % node_name) @@ -998,24 +1079,24 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None): bep = cluster.FillBE(instance) hvp = cluster.FillHV(instance) args = { - 'name': instance.name, - 'primary_node': instance.primary_node, - 'secondary_nodes': instance.secondary_nodes, - 'os_type': instance.os, - 'status': instance.admin_up, - 'memory': bep[constants.BE_MEMORY], - 'vcpus': bep[constants.BE_VCPUS], - 'nics': _NICListToTuple(lu, instance.nics), - 'disk_template': instance.disk_template, - 'disks': [(disk.size, disk.mode) for disk in instance.disks], - 'bep': bep, - 'hvp': hvp, - 'hypervisor_name': instance.hypervisor, - 'tags': instance.tags, + "name": instance.name, + "primary_node": instance.primary_node, + "secondary_nodes": instance.secondary_nodes, + "os_type": instance.os, + "status": instance.admin_up, + "memory": bep[constants.BE_MEMORY], + "vcpus": bep[constants.BE_VCPUS], + "nics": _NICListToTuple(lu, instance.nics), + "disk_template": instance.disk_template, + "disks": [(disk.size, disk.mode) for disk in instance.disks], + "bep": bep, + "hvp": hvp, + "hypervisor_name": instance.hypervisor, + "tags": instance.tags, } if override: args.update(override) - return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142 + return _BuildInstanceHookEnv(**args) # pylint: disable=W0142 def _AdjustCandidatePool(lu, exceptions): @@ -1077,9 +1158,13 @@ def _CheckOSVariant(os_obj, name): @param name: OS name passed by the user, to check for validity """ + variant = objects.OS.GetVariant(name) if not os_obj.supported_variants: + if variant: + raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'" + " passed)" % (os_obj.name, variant), + errors.ECODE_INVAL) return - variant = objects.OS.GetVariant(name) if not variant: raise errors.OpPrereqError("OS name must include a variant", errors.ECODE_INVAL) @@ -1128,13 +1213,13 @@ def _GetStorageTypeArgs(cfg, storage_type): return [] -def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq): +def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq): faulty = [] for dev in instance.disks: cfg.SetDiskID(dev, node_name) - result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks) + result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks) result.Raise("Failed to get disk status from node %s" % node_name, prereq=prereq, ecode=errors.ECODE_ENVIRON) @@ -1177,6 +1262,29 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot): " iallocator") +def _GetDefaultIAllocator(cfg, iallocator): + """Decides on which iallocator to use. + + @type cfg: L{config.ConfigWriter} + @param cfg: Cluster configuration object + @type iallocator: string or None + @param iallocator: Iallocator specified in opcode + @rtype: string + @return: Iallocator name + + """ + if not iallocator: + # Use default iallocator + iallocator = cfg.GetDefaultIAllocator() + + if not iallocator: + raise errors.OpPrereqError("No iallocator was specified, neither in the" + " opcode nor as a cluster-wide default", + errors.ECODE_INVAL) + + return iallocator + + class LUClusterPostInit(LogicalUnit): """Logical unit for running hooks after cluster initialization. @@ -1251,15 +1359,16 @@ class LUClusterDestroy(LogicalUnit): """Destroys the cluster. 
""" - master = self.cfg.GetMasterNode() + master_params = self.cfg.GetMasterNetworkParameters() # Run post hooks on master node before it's removed - _RunPostHook(self, master) + _RunPostHook(self, master_params.name) - result = self.rpc.call_node_stop_master(master, False) + result = self.rpc.call_node_deactivate_master_ip(master_params.name, + master_params) result.Raise("Could not disable the master role") - return master + return master_params.name def _VerifyCertificate(filename): @@ -1272,7 +1381,7 @@ def _VerifyCertificate(filename): try: cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, utils.ReadFile(filename)) - except Exception, err: # pylint: disable-msg=W0703 + except Exception, err: # pylint: disable=W0703 return (LUClusterVerifyConfig.ETYPE_ERROR, "Failed to load X509 certificate %s: %s" % (filename, err)) @@ -1334,39 +1443,6 @@ class _VerifyErrors(object): self.op and self._feedback_fn to be available.) """ - TCLUSTER = "cluster" - TNODE = "node" - TINSTANCE = "instance" - - ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG") - ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT") - ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK") - ECLUSTERDANGLINGNODES = (TNODE, "ECLUSTERDANGLINGNODES") - ECLUSTERDANGLINGINST = (TNODE, "ECLUSTERDANGLINGINST") - EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE") - EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN") - EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT") - EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK") - EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK") - EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE") - EINSTANCESPLITGROUPS = (TINSTANCE, "EINSTANCESPLITGROUPS") - ENODEDRBD = (TNODE, "ENODEDRBD") - ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER") - ENODEFILECHECK = (TNODE, "ENODEFILECHECK") - ENODEHOOKS = (TNODE, "ENODEHOOKS") - ENODEHV = (TNODE, "ENODEHV") - ENODELVM = (TNODE, "ENODELVM") - ENODEN1 = (TNODE, "ENODEN1") - ENODENET = (TNODE, "ENODENET") - ENODEOS = (TNODE, "ENODEOS") - ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE") - ENODEORPHANLV = (TNODE, "ENODEORPHANLV") - ENODERPC = (TNODE, "ENODERPC") - ENODESSH = (TNODE, "ENODESSH") - ENODEVERSION = (TNODE, "ENODEVERSION") - ENODESETUP = (TNODE, "ENODESETUP") - ENODETIME = (TNODE, "ENODETIME") - ENODEOOBPATH = (TNODE, "ENODEOOBPATH") ETYPE_FIELD = "code" ETYPE_ERROR = "ERROR" @@ -1382,12 +1458,12 @@ class _VerifyErrors(object): """ ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) - itype, etxt = ecode + itype, etxt, _ = ecode # first complete the msg if args: msg = msg % args # then format the whole message - if self.op.error_codes: # This is a mix-in. pylint: disable-msg=E1101 + if self.op.error_codes: # This is a mix-in. pylint: disable=E1101 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg) else: if item: @@ -1396,21 +1472,73 @@ class _VerifyErrors(object): item = "" msg = "%s: %s%s: %s" % (ltype, itype, item, msg) # and finally report it via the feedback_fn - self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable-msg=E1101 + self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable=E1101 - def _ErrorIf(self, cond, *args, **kwargs): + def _ErrorIf(self, cond, ecode, *args, **kwargs): """Log an error message if the passed condition is True. 
""" cond = (bool(cond) - or self.op.debug_simulate_errors) # pylint: disable-msg=E1101 + or self.op.debug_simulate_errors) # pylint: disable=E1101 + + # If the error code is in the list of ignored errors, demote the error to a + # warning + (_, etxt, _) = ecode + if etxt in self.op.ignore_errors: # pylint: disable=E1101 + kwargs[self.ETYPE_FIELD] = self.ETYPE_WARNING + if cond: - self._Error(*args, **kwargs) + self._Error(ecode, *args, **kwargs) + # do not mark the operation as failed for WARN cases only if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR: self.bad = self.bad or cond +class LUClusterVerify(NoHooksLU): + """Submits all jobs necessary to verify the cluster. + + """ + REQ_BGL = False + + def ExpandNames(self): + self.needed_locks = {} + + def Exec(self, feedback_fn): + jobs = [] + + if self.op.group_name: + groups = [self.op.group_name] + depends_fn = lambda: None + else: + groups = self.cfg.GetNodeGroupList() + + # Verify global configuration + jobs.append([ + opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors) + ]) + + # Always depend on global verification + depends_fn = lambda: [(-len(jobs), [])] + + jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group, + ignore_errors=self.op.ignore_errors, + depends=depends_fn())] + for group in groups) + + # Fix up all parameters + for op in itertools.chain(*jobs): # pylint: disable=W0142 + op.debug_simulate_errors = self.op.debug_simulate_errors + op.verbose = self.op.verbose + op.error_codes = self.op.error_codes + try: + op.skip_checks = self.op.skip_checks + except AttributeError: + assert not isinstance(op, opcodes.OpClusterVerifyGroup) + + return ResultWithJobs(jobs) + + class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors): """Verifies the cluster config. 
@@ -1429,11 +1557,12 @@ class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors): utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) hv_class.CheckParameterSyntax(hv_params) except errors.GenericError, err: - self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err)) + self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err)) def ExpandNames(self): # Information can be safely retrieved as the BGL is acquired in exclusive # mode + assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER) self.all_group_info = self.cfg.GetAllNodeGroupsInfo() self.all_node_info = self.cfg.GetAllNodesInfo() self.all_inst_info = self.cfg.GetAllInstancesInfo() @@ -1449,13 +1578,13 @@ class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors): feedback_fn("* Verifying cluster config") for msg in self.cfg.VerifyConfig(): - self._ErrorIf(True, self.ECLUSTERCFG, None, msg) + self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg) feedback_fn("* Verifying cluster certificate files") for cert_filename in constants.ALL_CERT_FILES: (errcode, msg) = _VerifyCertificate(cert_filename) - self._ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode) + self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode) feedback_fn("* Verifying hypervisor parameters") @@ -1487,15 +1616,17 @@ class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors): ["no instances"]))) for node in dangling_nodes] - self._ErrorIf(bool(dangling_nodes), self.ECLUSTERDANGLINGNODES, None, + self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES, + None, "the following nodes (and their instances) belong to a non" " existing group: %s", utils.CommaJoin(pretty_dangling)) - self._ErrorIf(bool(no_node_instances), self.ECLUSTERDANGLINGINST, None, + self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST, + None, "the following instances have a non-existing primary-node:" " %s", utils.CommaJoin(no_node_instances)) - return (not self.bad, [g.name for g in self.all_group_info.values()]) + return not self.bad class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): @@ -1571,7 +1702,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): locking.LEVEL_NODE: [], } - self.share_locks = dict.fromkeys(locking.LEVELS, 1) + self.share_locks = _ShareAll() def DeclareLocks(self, level): if level == locking.LEVEL_NODE: @@ -1585,7 +1716,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): # volumes for these instances are healthy, we will need to do an # extra call to their secondaries. We ensure here those nodes will # be locked. 
- for inst in self.glm.list_owned(locking.LEVEL_INSTANCE): + for inst in self.owned_locks(locking.LEVEL_INSTANCE): # Important: access only the instances whose lock is owned if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR: nodes.update(all_inst_info[inst].secondary_nodes) @@ -1593,14 +1724,17 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): self.needed_locks[locking.LEVEL_NODE] = nodes def CheckPrereq(self): - group_nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members) + assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP) + self.group_info = self.cfg.GetNodeGroup(self.group_uuid) + + group_nodes = set(self.group_info.members) group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid) unlocked_nodes = \ - group_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE)) + group_nodes.difference(self.owned_locks(locking.LEVEL_NODE)) unlocked_instances = \ - group_instances.difference(self.glm.list_owned(locking.LEVEL_INSTANCE)) + group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE)) if unlocked_nodes: raise errors.OpPrereqError("Missing lock for nodes: %s" % @@ -1634,7 +1768,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): extra_lv_nodes.add(nname) unlocked_lv_nodes = \ - extra_lv_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE)) + extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE)) if unlocked_lv_nodes: raise errors.OpPrereqError("these nodes could be locked: %s" % @@ -1657,11 +1791,11 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): """ node = ninfo.name - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 # main result, nresult should be a non-empty dict test = not nresult or not isinstance(nresult, dict) - _ErrorIf(test, self.ENODERPC, node, + _ErrorIf(test, constants.CV_ENODERPC, node, "unable to verify node: no data returned") if test: return False @@ -1672,13 +1806,13 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): test = not (remote_version and isinstance(remote_version, (list, tuple)) and len(remote_version) == 2) - _ErrorIf(test, self.ENODERPC, node, + _ErrorIf(test, constants.CV_ENODERPC, node, "connection to node returned invalid data") if test: return False test = local_version != remote_version[0] - _ErrorIf(test, self.ENODEVERSION, node, + _ErrorIf(test, constants.CV_ENODEVERSION, node, "incompatible protocol versions: master %s," " node %s", local_version, remote_version[0]) if test: @@ -1688,7 +1822,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): # full package version self._ErrorIf(constants.RELEASE_VERSION != remote_version[1], - self.ENODEVERSION, node, + constants.CV_ENODEVERSION, node, "software version mismatch: master %s, node %s", constants.RELEASE_VERSION, remote_version[1], code=self.ETYPE_WARNING) @@ -1697,19 +1831,19 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): if ninfo.vm_capable and isinstance(hyp_result, dict): for hv_name, hv_result in hyp_result.iteritems(): test = hv_result is not None - _ErrorIf(test, self.ENODEHV, node, + _ErrorIf(test, constants.CV_ENODEHV, node, "hypervisor %s verify failure: '%s'", hv_name, hv_result) hvp_result = nresult.get(constants.NV_HVPARAMS, None) if ninfo.vm_capable and isinstance(hvp_result, list): for item, hv_name, hv_result in hvp_result: - _ErrorIf(True, self.ENODEHV, node, + _ErrorIf(True, constants.CV_ENODEHV, node, "hypervisor %s parameter verify failure (source %s): %s", hv_name, item, hv_result) test = 
nresult.get(constants.NV_NODESETUP, ["Missing NODESETUP results"]) - _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s", + _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s", "; ".join(test)) return True @@ -1726,13 +1860,13 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): """ node = ninfo.name - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 ntime = nresult.get(constants.NV_TIME, None) try: ntime_merged = utils.MergeTime(ntime) except (ValueError, TypeError): - _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time") + _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time") return if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW): @@ -1742,7 +1876,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): else: ntime_diff = None - _ErrorIf(ntime_diff is not None, self.ENODETIME, node, + _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node, "Node time diverges by at least %s from master node time", ntime_diff) @@ -1759,29 +1893,30 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): return node = ninfo.name - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 # checks vg existence and size > 20G vglist = nresult.get(constants.NV_VGLIST, None) test = not vglist - _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups") + _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups") if not test: vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name, constants.MIN_VG_SIZE) - _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus) + _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus) # check pv names pvlist = nresult.get(constants.NV_PVLIST, None) test = pvlist is None - _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node") + _ErrorIf(test, constants.CV_ENODELVM, node, "Can't get PV list from node") if not test: # check that ':' is not present in PV names, since it's a # special character for lvcreate (denotes the range of PEs to # use on the PV) for _, pvname, owner_vg in pvlist: test = ":" in pvname - _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV" - " '%s' of VG '%s'", pvname, owner_vg) + _ErrorIf(test, constants.CV_ENODELVM, node, + "Invalid character ':' in PV '%s' of VG '%s'", + pvname, owner_vg) def _VerifyNodeBridges(self, ninfo, nresult, bridges): """Check the node bridges. @@ -1796,15 +1931,15 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): return node = ninfo.name - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 missing = nresult.get(constants.NV_BRIDGES, None) test = not isinstance(missing, list) - _ErrorIf(test, self.ENODENET, node, + _ErrorIf(test, constants.CV_ENODENET, node, "did not return valid bridge information") if not test: - _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" % - utils.CommaJoin(sorted(missing))) + _ErrorIf(bool(missing), constants.CV_ENODENET, node, + "missing bridges: %s" % utils.CommaJoin(sorted(missing))) def _VerifyNodeNetwork(self, ninfo, nresult): """Check the node network connectivity results. 
@@ -1815,30 +1950,30 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): """ node = ninfo.name - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 test = constants.NV_NODELIST not in nresult - _ErrorIf(test, self.ENODESSH, node, + _ErrorIf(test, constants.CV_ENODESSH, node, "node hasn't returned node ssh connectivity data") if not test: if nresult[constants.NV_NODELIST]: for a_node, a_msg in nresult[constants.NV_NODELIST].items(): - _ErrorIf(True, self.ENODESSH, node, + _ErrorIf(True, constants.CV_ENODESSH, node, "ssh communication with node '%s': %s", a_node, a_msg) test = constants.NV_NODENETTEST not in nresult - _ErrorIf(test, self.ENODENET, node, + _ErrorIf(test, constants.CV_ENODENET, node, "node hasn't returned node tcp connectivity data") if not test: if nresult[constants.NV_NODENETTEST]: nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys()) for anode in nlist: - _ErrorIf(True, self.ENODENET, node, + _ErrorIf(True, constants.CV_ENODENET, node, "tcp communication with node '%s': %s", anode, nresult[constants.NV_NODENETTEST][anode]) test = constants.NV_MASTERIP not in nresult - _ErrorIf(test, self.ENODENET, node, + _ErrorIf(test, constants.CV_ENODENET, node, "node hasn't returned node master IP reachability data") if not test: if not nresult[constants.NV_MASTERIP]: @@ -1846,7 +1981,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): msg = "the master node cannot reach the master IP (not configured?)" else: msg = "cannot reach the master IP" - _ErrorIf(True, self.ENODENET, node, msg) + _ErrorIf(True, constants.CV_ENODENET, node, msg) def _VerifyInstance(self, instance, instanceconfig, node_image, diskstatus): @@ -1856,7 +1991,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): available on the instance's node. 
""" - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 node_current = instanceconfig.primary_node node_vol_should = {} @@ -1869,13 +2004,13 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): continue for volume in node_vol_should[node]: test = volume not in n_img.volumes - _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance, + _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance, "volume %s missing on node %s", volume, node) if instanceconfig.admin_up: pri_img = node_image[node_current] test = instance not in pri_img.instances and not pri_img.offline - _ErrorIf(test, self.EINSTANCEDOWN, instance, + _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance, "instance not running on its primary node %s", node_current) @@ -1889,12 +2024,12 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): snode = node_image[nname] bad_snode = snode.ghost or snode.offline _ErrorIf(instanceconfig.admin_up and not success and not bad_snode, - self.EINSTANCEFAULTYDISK, instance, + constants.CV_EINSTANCEFAULTYDISK, instance, "couldn't retrieve status for disk/%s on %s: %s", idx, nname, bdev_status) _ErrorIf((instanceconfig.admin_up and success and bdev_status.ldisk_status == constants.LDS_FAULTY), - self.EINSTANCEFAULTYDISK, instance, + constants.CV_EINSTANCEFAULTYDISK, instance, "disk/%s on %s is faulty", idx, nname) def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved): @@ -1915,7 +2050,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): test = ((node not in node_vol_should or volume not in node_vol_should[node]) and not reserved.Matches(volume)) - self._ErrorIf(test, self.ENODEORPHANLV, node, + self._ErrorIf(test, constants.CV_ENODEORPHANLV, node, "volume %s is unknown", volume) def _VerifyNPlusOneMemory(self, node_image, instance_cfg): @@ -1948,14 +2083,14 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): if bep[constants.BE_AUTO_BALANCE]: needed_mem += bep[constants.BE_MEMORY] test = n_img.mfree < needed_mem - self._ErrorIf(test, self.ENODEN1, node, + self._ErrorIf(test, constants.CV_ENODEN1, node, "not enough memory to accomodate instance failovers" " should node %s fail (%dMiB needed, %dMiB available)", prinode, needed_mem, n_img.mfree) @classmethod def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo, - (files_all, files_all_opt, files_mc, files_vm)): + (files_all, files_opt, files_mc, files_vm)): """Verifies file checksums collected from all nodes. 
@param errorif: Callback for reporting errors @@ -1964,25 +2099,35 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): @param all_nvinfo: RPC results """ - node_names = frozenset(node.name for node in nodeinfo) + # Define functions determining which nodes to consider for a file + files2nodefn = [ + (files_all, None), + (files_mc, lambda node: (node.master_candidate or + node.name == master_node)), + (files_vm, lambda node: node.vm_capable), + ] - assert master_node in node_names - assert (len(files_all | files_all_opt | files_mc | files_vm) == - sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \ - "Found file listed in more than one file list" + # Build mapping from filename to list of nodes which should have the file + nodefiles = {} + for (files, fn) in files2nodefn: + if fn is None: + filenodes = nodeinfo + else: + filenodes = filter(fn, nodeinfo) + nodefiles.update((filename, + frozenset(map(operator.attrgetter("name"), filenodes))) + for filename in files) - # Define functions determining which nodes to consider for a file - file2nodefn = dict([(filename, fn) - for (files, fn) in [(files_all, None), - (files_all_opt, None), - (files_mc, lambda node: (node.master_candidate or - node.name == master_node)), - (files_vm, lambda node: node.vm_capable)] - for filename in files]) + assert set(nodefiles) == (files_all | files_mc | files_vm) - fileinfo = dict((filename, {}) for filename in file2nodefn.keys()) + fileinfo = dict((filename, {}) for filename in nodefiles) + ignore_nodes = set() for node in nodeinfo: + if node.offline: + ignore_nodes.add(node.name) + continue + nresult = all_nvinfo[node.name] if nresult.fail_msg or not nresult.payload: @@ -1991,16 +2136,16 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): node_files = nresult.payload.get(constants.NV_FILELIST, None) test = not (node_files and isinstance(node_files, dict)) - errorif(test, cls.ENODEFILECHECK, node.name, + errorif(test, constants.CV_ENODEFILECHECK, node.name, "Node did not return file checksum data") if test: + ignore_nodes.add(node.name) continue + # Build per-checksum mapping from filename to nodes having it for (filename, checksum) in node_files.items(): - # Check if the file should be considered for a node - fn = file2nodefn[filename] - if fn is None or fn(node): - fileinfo[filename].setdefault(checksum, set()).add(node.name) + assert filename in nodefiles + fileinfo[filename].setdefault(checksum, set()).add(node.name) for (filename, checksums) in fileinfo.items(): assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum" @@ -2008,23 +2153,32 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): # Nodes having the file with_file = frozenset(node_name for nodes in fileinfo[filename].values() - for node_name in nodes) + for node_name in nodes) - ignore_nodes + + expected_nodes = nodefiles[filename] - ignore_nodes # Nodes missing file - missing_file = node_names - with_file + missing_file = expected_nodes - with_file - if filename in files_all_opt: + if filename in files_opt: # All or no nodes - errorif(missing_file and missing_file != node_names, - cls.ECLUSTERFILECHECK, None, + errorif(missing_file and missing_file != expected_nodes, + constants.CV_ECLUSTERFILECHECK, None, "File %s is optional, but it must exist on all or no" " nodes (not found on %s)", filename, utils.CommaJoin(utils.NiceSort(missing_file))) else: - errorif(missing_file, cls.ECLUSTERFILECHECK, None, + errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None, "File %s is missing from node(s) %s", 
filename, utils.CommaJoin(utils.NiceSort(missing_file))) + # Warn if a node has a file it shouldn't + unexpected = with_file - expected_nodes + errorif(unexpected, + constants.CV_ECLUSTERFILECHECK, None, + "File %s should not exist on node(s) %s", + filename, utils.CommaJoin(utils.NiceSort(unexpected))) + # See if there are multiple versions of the file test = len(checksums) > 1 if test: @@ -2035,7 +2189,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): else: variants = [] - errorif(test, cls.ECLUSTERFILECHECK, None, + errorif(test, constants.CV_ECLUSTERFILECHECK, None, "File %s found with %s different checksums (%s)", filename, len(checksums), "; ".join(variants)) @@ -2053,27 +2207,27 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): """ node = ninfo.name - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 if drbd_helper: helper_result = nresult.get(constants.NV_DRBDHELPER, None) test = (helper_result == None) - _ErrorIf(test, self.ENODEDRBDHELPER, node, + _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node, "no drbd usermode helper returned") if helper_result: status, payload = helper_result test = not status - _ErrorIf(test, self.ENODEDRBDHELPER, node, + _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node, "drbd usermode helper check unsuccessful: %s", payload) test = status and (payload != drbd_helper) - _ErrorIf(test, self.ENODEDRBDHELPER, node, + _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node, "wrong drbd usermode helper: %s", payload) # compute the DRBD minors node_drbd = {} for minor, instance in drbd_map[node].items(): test = instance not in instanceinfo - _ErrorIf(test, self.ECLUSTERCFG, None, + _ErrorIf(test, constants.CV_ECLUSTERCFG, None, "ghost instance '%s' in temporary DRBD map", instance) # ghost instance should not be running, but otherwise we # don't give double warnings (both ghost instance and @@ -2087,7 +2241,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): # and now check them used_minors = nresult.get(constants.NV_DRBDLIST, []) test = not isinstance(used_minors, (tuple, list)) - _ErrorIf(test, self.ENODEDRBD, node, + _ErrorIf(test, constants.CV_ENODEDRBD, node, "cannot parse drbd status file: %s", str(used_minors)) if test: # we cannot check drbd status @@ -2095,11 +2249,11 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): for minor, (iname, must_exist) in node_drbd.items(): test = minor not in used_minors and must_exist - _ErrorIf(test, self.ENODEDRBD, node, + _ErrorIf(test, constants.CV_ENODEDRBD, node, "drbd minor %d of instance %s is not active", minor, iname) for minor in used_minors: test = minor not in node_drbd - _ErrorIf(test, self.ENODEDRBD, node, + _ErrorIf(test, constants.CV_ENODEDRBD, node, "unallocated drbd minor %d is in use", minor) def _UpdateNodeOS(self, ninfo, nresult, nimg): @@ -2112,14 +2266,14 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): """ node = ninfo.name - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 remote_os = nresult.get(constants.NV_OSLIST, None) test = (not isinstance(remote_os, list) or not compat.all(isinstance(v, list) and len(v) == 7 for v in remote_os)) - _ErrorIf(test, self.ENODEOS, node, + _ErrorIf(test, constants.CV_ENODEOS, node, "node hasn't returned valid OS data") nimg.os_fail = test @@ -2153,7 +2307,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): """ node = ninfo.name - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = 
self._ErrorIf # pylint: disable=C0103 assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?" @@ -2161,19 +2315,14 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): for os_name, os_data in nimg.oslist.items(): assert os_data, "Empty OS status for OS %s?!" % os_name f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0] - _ErrorIf(not f_status, self.ENODEOS, node, + _ErrorIf(not f_status, constants.CV_ENODEOS, node, "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag) - _ErrorIf(len(os_data) > 1, self.ENODEOS, node, + _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node, "OS '%s' has multiple entries (first one shadows the rest): %s", os_name, utils.CommaJoin([v[0] for v in os_data])) - # this will catched in backend too - _ErrorIf(compat.any(v >= constants.OS_API_V15 for v in f_api) - and not f_var, self.ENODEOS, node, - "OS %s with API at least %d does not declare any variant", - os_name, constants.OS_API_V15) # comparisons with the 'base' image test = os_name not in base.oslist - _ErrorIf(test, self.ENODEOS, node, + _ErrorIf(test, constants.CV_ENODEOS, node, "Extra OS %s not present on reference node (%s)", os_name, base.name) if test: @@ -2187,14 +2336,14 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): ("variants list", f_var, b_var), ("parameters", beautify_params(f_param), beautify_params(b_param))]: - _ErrorIf(a != b, self.ENODEOS, node, + _ErrorIf(a != b, constants.CV_ENODEOS, node, "OS %s for %s differs from reference node %s: [%s] vs. [%s]", kind, os_name, base.name, utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b))) # check any missing OSes missing = set(base.oslist.keys()).difference(nimg.oslist.keys()) - _ErrorIf(missing, self.ENODEOS, node, + _ErrorIf(missing, constants.CV_ENODEOS, node, "OSes present on reference node %s but missing on this node: %s", base.name, utils.CommaJoin(missing)) @@ -2212,7 +2361,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): if ((ninfo.master_candidate or ninfo.master_capable) and constants.NV_OOB_PATHS in nresult): for path_result in nresult[constants.NV_OOB_PATHS]: - self._ErrorIf(path_result, self.ENODEOOBPATH, node, path_result) + self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result) def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name): """Verifies and updates the node volume data. 
@@ -2228,17 +2377,18 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): """ node = ninfo.name - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 nimg.lvm_fail = True lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data") if vg_name is None: pass elif isinstance(lvdata, basestring): - _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s", + _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s", utils.SafeEncode(lvdata)) elif not isinstance(lvdata, dict): - _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)") + _ErrorIf(True, constants.CV_ENODELVM, node, + "rpc call to node failed (lvlist)") else: nimg.volumes = lvdata nimg.lvm_fail = False @@ -2258,8 +2408,9 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): """ idata = nresult.get(constants.NV_INSTANCELIST, None) test = not isinstance(idata, list) - self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed" - " (instancelist): %s", utils.SafeEncode(str(idata))) + self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name, + "rpc call to node failed (instancelist): %s", + utils.SafeEncode(str(idata))) if test: nimg.hyp_fail = True else: @@ -2276,31 +2427,32 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): """ node = ninfo.name - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 # try to read free memory (from the hypervisor) hv_info = nresult.get(constants.NV_HVINFO, None) test = not isinstance(hv_info, dict) or "memory_free" not in hv_info - _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)") + _ErrorIf(test, constants.CV_ENODEHV, node, + "rpc call to node failed (hvinfo)") if not test: try: nimg.mfree = int(hv_info["memory_free"]) except (ValueError, TypeError): - _ErrorIf(True, self.ENODERPC, node, + _ErrorIf(True, constants.CV_ENODERPC, node, "node returned invalid nodeinfo, check hypervisor") # FIXME: devise a free space model for file based instances as well if vg_name is not None: test = (constants.NV_VGLIST not in nresult or vg_name not in nresult[constants.NV_VGLIST]) - _ErrorIf(test, self.ENODELVM, node, + _ErrorIf(test, constants.CV_ENODELVM, node, "node didn't return data for the volume group '%s'" " - it is either missing or broken", vg_name) if not test: try: nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name]) except (ValueError, TypeError): - _ErrorIf(True, self.ENODERPC, node, + _ErrorIf(True, constants.CV_ENODERPC, node, "node returned invalid LVM info, check LVM status") def _CollectDiskInfo(self, nodelist, node_image, instanceinfo): @@ -2318,7 +2470,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): list of tuples (success, payload) """ - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 node_disks = {} node_disks_devonly = {} @@ -2367,7 +2519,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): data = len(disks) * [(False, "node offline")] else: msg = nres.fail_msg - _ErrorIf(msg, self.ENODERPC, nname, + _ErrorIf(msg, constants.CV_ENODERPC, nname, "while getting disk information: %s", msg) if msg: # No data from this node @@ -2400,6 +2552,40 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): return instdisk + @staticmethod + def _SshNodeSelector(group_uuid, all_nodes): + """Create endless iterators for all potential SSH check hosts. 
+ + """ + nodes = [node for node in all_nodes + if (node.group != group_uuid and + not node.offline)] + keyfunc = operator.attrgetter("group") + + return map(itertools.cycle, + [sorted(map(operator.attrgetter("name"), names)) + for _, names in itertools.groupby(sorted(nodes, key=keyfunc), + keyfunc)]) + + @classmethod + def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes): + """Choose which nodes should talk to which other nodes. + + We will make nodes contact all nodes in their group, and one node from + every other group. + + @warning: This algorithm has a known issue if one node group is much + smaller than others (e.g. just one node). In such a case all other + nodes will talk to the single node. + + """ + online_nodes = sorted(node.name for node in group_nodes if not node.offline) + sel = cls._SshNodeSelector(group_uuid, all_nodes) + + return (online_nodes, + dict((name, sorted([i.next() for i in sel])) + for name in online_nodes)) + def BuildHooksEnv(self): """Build hooks env. @@ -2426,7 +2612,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): """Verify integrity of the node group, performing various test on nodes. """ - # This method has too many local variables. pylint: disable-msg=R0914 + # This method has too many local variables. pylint: disable=R0914 + feedback_fn("* Verifying group '%s'" % self.group_info.name) if not self.my_node_names: # empty node group @@ -2434,7 +2621,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): return True self.bad = False - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 verbose = self.op.verbose self._feedback_fn = feedback_fn @@ -2462,25 +2649,14 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names)) - # We will make nodes contact all nodes in their group, and one node from - # every other group. - # TODO: should it be a *random* node, different every time? 
- online_nodes = [node.name for node in node_data_list if not node.offline] - other_group_nodes = {} - - for name in sorted(self.all_node_info): - node = self.all_node_info[name] - if (node.group not in other_group_nodes - and node.group != self.group_uuid - and not node.offline): - other_group_nodes[node.group] = node.name - node_verify_param = { constants.NV_FILELIST: utils.UniqueSequence(filename for files in filemap for filename in files), - constants.NV_NODELIST: online_nodes + other_group_nodes.values(), + constants.NV_NODELIST: + self._SelectSshCheckNodes(node_data_list, self.group_uuid, + self.all_node_info.values()), constants.NV_HYPERVISOR: hypervisors, constants.NV_HVPARAMS: _GetAllHypervisorParameters(cluster, self.all_inst_info.values()), @@ -2642,7 +2818,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): feedback_fn("* Verifying node %s (%s)" % (node, ntype)) msg = all_nvinfo[node].fail_msg - _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg) + _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s", + msg) if msg: nimg.rpc_fail = True continue @@ -2677,9 +2854,9 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): for inst in non_primary_inst: test = inst in self.all_inst_info - _ErrorIf(test, self.EINSTANCEWRONGNODE, inst, + _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst, "instance should not run on node %s", node_i.name) - _ErrorIf(not test, self.ENODEORPHANINSTANCE, node_i.name, + _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name, "node is running unknown instance %s", inst) for node, result in extra_lv_nvinfo.items(): @@ -2698,11 +2875,11 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): pnode = inst_config.primary_node pnode_img = node_image[pnode] _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline, - self.ENODERPC, pnode, "instance %s, connection to" + constants.CV_ENODERPC, pnode, "instance %s, connection to" " primary node failed", instance) _ErrorIf(inst_config.admin_up and pnode_img.offline, - self.EINSTANCEBADNODE, instance, + constants.CV_EINSTANCEBADNODE, instance, "instance is marked as running and lives on offline node %s", inst_config.primary_node) @@ -2714,7 +2891,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): if not inst_config.secondary_nodes: i_non_redundant.append(instance) - _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT, + _ErrorIf(len(inst_config.secondary_nodes) > 1, + constants.CV_EINSTANCELAYOUT, instance, "instance has multiple secondary nodes: %s", utils.CommaJoin(inst_config.secondary_nodes), code=self.ETYPE_WARNING) @@ -2735,7 +2913,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): key=lambda (_, nodes): pnode in nodes, reverse=True)] - self._ErrorIf(len(instance_groups) > 1, self.EINSTANCESPLITGROUPS, + self._ErrorIf(len(instance_groups) > 1, + constants.CV_EINSTANCESPLITGROUPS, instance, "instance has primary and secondary nodes in" " different groups: %s", utils.CommaJoin(pretty_list), code=self.ETYPE_WARNING) @@ -2745,21 +2924,22 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): for snode in inst_config.secondary_nodes: s_img = node_image[snode] - _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode, - "instance %s, connection to secondary node failed", instance) + _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC, + snode, "instance %s, connection to secondary node failed", + instance) if s_img.offline: inst_nodes_offline.append(snode) # warn that the instance lives on 
offline nodes - _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance, + _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance, "instance has offline secondary node(s) %s", utils.CommaJoin(inst_nodes_offline)) # ... or ghost/non-vm_capable nodes for node in inst_config.all_nodes: - _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance, - "instance lives on ghost node %s", node) - _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE, + _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE, + instance, "instance lives on ghost node %s", node) + _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE, instance, "instance lives on non-vm_capable node %s", node) feedback_fn("* Verifying orphan volumes") @@ -2827,22 +3007,20 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): res = hooks_results[node_name] msg = res.fail_msg test = msg and not res.offline - self._ErrorIf(test, self.ENODEHOOKS, node_name, + self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name, "Communication failure in hooks execution: %s", msg) if res.offline or msg: - # No need to investigate payload if node is offline or gave an error. - # override manually lu_result here as _ErrorIf only - # overrides self.bad - lu_result = 1 + # No need to investigate payload if node is offline or gave + # an error. continue for script, hkr, output in res.payload: test = hkr == constants.HKR_FAIL - self._ErrorIf(test, self.ENODEHOOKS, node_name, + self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name, "Script %s failed, output:", script) if test: - output = self._HOOKS_INDENT_RE.sub(' ', output) + output = self._HOOKS_INDENT_RE.sub(" ", output) feedback_fn("%s" % output) - lu_result = 0 + lu_result = False return lu_result @@ -2854,11 +3032,91 @@ class LUClusterVerifyDisks(NoHooksLU): REQ_BGL = False def ExpandNames(self): + self.share_locks = _ShareAll() self.needed_locks = { - locking.LEVEL_NODE: locking.ALL_SET, - locking.LEVEL_INSTANCE: locking.ALL_SET, - } - self.share_locks = dict.fromkeys(locking.LEVELS, 1) + locking.LEVEL_NODEGROUP: locking.ALL_SET, + } + + def Exec(self, feedback_fn): + group_names = self.owned_locks(locking.LEVEL_NODEGROUP) + + # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group + return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)] + for group in group_names]) + + +class LUGroupVerifyDisks(NoHooksLU): + """Verifies the status of all disks in a node group. 
+ + """ + REQ_BGL = False + + def ExpandNames(self): + # Raises errors.OpPrereqError on its own if group can't be found + self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name) + + self.share_locks = _ShareAll() + self.needed_locks = { + locking.LEVEL_INSTANCE: [], + locking.LEVEL_NODEGROUP: [], + locking.LEVEL_NODE: [], + } + + def DeclareLocks(self, level): + if level == locking.LEVEL_INSTANCE: + assert not self.needed_locks[locking.LEVEL_INSTANCE] + + # Lock instances optimistically, needs verification once node and group + # locks have been acquired + self.needed_locks[locking.LEVEL_INSTANCE] = \ + self.cfg.GetNodeGroupInstances(self.group_uuid) + + elif level == locking.LEVEL_NODEGROUP: + assert not self.needed_locks[locking.LEVEL_NODEGROUP] + + self.needed_locks[locking.LEVEL_NODEGROUP] = \ + set([self.group_uuid] + + # Lock all groups used by instances optimistically; this requires + # going via the node before it's locked, requiring verification + # later on + [group_uuid + for instance_name in self.owned_locks(locking.LEVEL_INSTANCE) + for group_uuid in self.cfg.GetInstanceNodeGroups(instance_name)]) + + elif level == locking.LEVEL_NODE: + # This will only lock the nodes in the group to be verified which contain + # actual instances + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + self._LockInstancesNodes() + + # Lock all nodes in group to be verified + assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP) + member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members + self.needed_locks[locking.LEVEL_NODE].extend(member_nodes) + + def CheckPrereq(self): + owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE)) + owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) + owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE)) + + assert self.group_uuid in owned_groups + + # Check if locked instances are still correct + _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances) + + # Get instance information + self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances)) + + # Check if node groups for locked instances are still correct + for (instance_name, inst) in self.instances.items(): + assert owned_nodes.issuperset(inst.all_nodes), \ + "Instance %s's nodes changed while we kept the lock" % instance_name + + inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name, + owned_groups) + + assert self.group_uuid in inst_groups, \ + "Instance %s has no node in group %s" % (instance_name, self.group_uuid) def Exec(self, feedback_fn): """Verify integrity of cluster disks. 
@@ -2869,50 +3127,41 @@ class LUClusterVerifyDisks(NoHooksLU): missing volumes """ - result = res_nodes, res_instances, res_missing = {}, [], {} + res_nodes = {} + res_instances = set() + res_missing = {} - nodes = utils.NiceSort(self.cfg.GetVmCapableNodeList()) - instances = self.cfg.GetAllInstancesInfo().values() + nv_dict = _MapInstanceDisksToNodes([inst + for inst in self.instances.values() + if inst.admin_up]) - nv_dict = {} - for inst in instances: - inst_lvs = {} - if not inst.admin_up: - continue - inst.MapLVsByNode(inst_lvs) - # transform { iname: {node: [vol,],},} to {(node, vol): iname} - for node, vol_list in inst_lvs.iteritems(): - for vol in vol_list: - nv_dict[(node, vol)] = inst - - if not nv_dict: - return result - - node_lvs = self.rpc.call_lv_list(nodes, []) - for node, node_res in node_lvs.items(): - if node_res.offline: - continue - msg = node_res.fail_msg - if msg: - logging.warning("Error enumerating LVs on node %s: %s", node, msg) - res_nodes[node] = msg - continue + if nv_dict: + nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) & + set(self.cfg.GetVmCapableNodeList())) - lvs = node_res.payload - for lv_name, (_, _, lv_online) in lvs.items(): - inst = nv_dict.pop((node, lv_name), None) - if (not lv_online and inst is not None - and inst.name not in res_instances): - res_instances.append(inst.name) + node_lvs = self.rpc.call_lv_list(nodes, []) - # any leftover items in nv_dict are missing LVs, let's arrange the - # data better - for key, inst in nv_dict.iteritems(): - if inst.name not in res_missing: - res_missing[inst.name] = [] - res_missing[inst.name].append(key) + for (node, node_res) in node_lvs.items(): + if node_res.offline: + continue - return result + msg = node_res.fail_msg + if msg: + logging.warning("Error enumerating LVs on node %s: %s", node, msg) + res_nodes[node] = msg + continue + + for lv_name, (_, _, lv_online) in node_res.payload.items(): + inst = nv_dict.pop((node, lv_name), None) + if not (lv_online or inst is None): + res_instances.add(inst) + + # any leftover items in nv_dict are missing LVs, let's arrange the data + # better + for key, inst in nv_dict.iteritems(): + res_missing.setdefault(inst, []).append(list(key)) + + return (res_nodes, list(res_instances), res_missing) class LUClusterRepairDiskSizes(NoHooksLU): @@ -2935,7 +3184,7 @@ class LUClusterRepairDiskSizes(NoHooksLU): locking.LEVEL_NODE: locking.ALL_SET, locking.LEVEL_INSTANCE: locking.ALL_SET, } - self.share_locks = dict.fromkeys(locking.LEVELS, 1) + self.share_locks = _ShareAll() def DeclareLocks(self, level): if level == locking.LEVEL_NODE and self.wanted_names is not None: @@ -2948,10 +3197,10 @@ class LUClusterRepairDiskSizes(NoHooksLU): """ if self.wanted_names is None: - self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE) + self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE) - self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name - in self.wanted_names] + self.wanted_instances = \ + map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names)) def _EnsureChildSizes(self, disk): """Ensure children of the disk have the needed disk size. 
@@ -3079,29 +3328,32 @@ class LUClusterRename(LogicalUnit): """ clustername = self.op.name - ip = self.ip + new_ip = self.ip # shutdown the master IP - master = self.cfg.GetMasterNode() - result = self.rpc.call_node_stop_master(master, False) + master_params = self.cfg.GetMasterNetworkParameters() + result = self.rpc.call_node_deactivate_master_ip(master_params.name, + master_params) result.Raise("Could not disable the master role") try: cluster = self.cfg.GetClusterInfo() cluster.cluster_name = clustername - cluster.master_ip = ip + cluster.master_ip = new_ip self.cfg.Update(cluster, feedback_fn) # update the known hosts file ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE) node_list = self.cfg.GetOnlineNodeList() try: - node_list.remove(master) + node_list.remove(master_params.name) except ValueError: pass _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE) finally: - result = self.rpc.call_node_start_master(master, False, False) + master_params.ip = new_ip + result = self.rpc.call_node_activate_master_ip(master_params.name, + master_params) msg = result.fail_msg if msg: self.LogWarning("Could not re-enable the master role on" @@ -3110,6 +3362,27 @@ class LUClusterRename(LogicalUnit): return clustername +def _ValidateNetmask(cfg, netmask): + """Checks if a netmask is valid. + + @type cfg: L{config.ConfigWriter} + @param cfg: The cluster configuration + @type netmask: int + @param netmask: the netmask to be verified + @raise errors.OpPrereqError: if the validation fails + + """ + ip_family = cfg.GetPrimaryIPFamily() + try: + ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family) + except errors.ProgrammerError: + raise errors.OpPrereqError("Invalid primary ip family: %s." % + ip_family) + if not ipcls.ValidateNetmask(netmask): + raise errors.OpPrereqError("CIDR netmask (%s) not valid" % + (netmask)) + + class LUClusterSetParams(LogicalUnit): """Change the parameters of the cluster. @@ -3131,6 +3404,9 @@ class LUClusterSetParams(LogicalUnit): if self.op.remove_uids: uidpool.CheckUidPool(self.op.remove_uids) + if self.op.master_netmask is not None: + _ValidateNetmask(self.cfg, self.op.master_netmask) + def ExpandNames(self): # FIXME: in the future maybe other cluster params won't require checking on # all nodes to be modified. 
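Besides wiring _ValidateNetmask into LUClusterSetParams.CheckArguments above, note that the master netmask is treated as a plain CIDR prefix length. Outside of the netutils wrapper used by the patch, the check amounts to a bounds test per address family; a small illustrative sketch (the function name and the use of the socket constants are my own, not part of the patch):

import socket

def ValidatePrefixLength(prefix, family=socket.AF_INET):
  """Bounds-check a CIDR prefix length for an address family.

  Illustrative only: the patch itself goes through
  netutils.IPAddress.GetClassFromIpFamily(family).ValidateNetmask(prefix).

  """
  max_len = {socket.AF_INET: 32, socket.AF_INET6: 128}.get(family)
  if max_len is None:
    raise ValueError("Invalid primary ip family: %s" % family)
  if not isinstance(prefix, int) or not 0 < prefix <= max_len:
    raise ValueError("CIDR netmask (%s) not valid" % prefix)
  return prefix

For example, ValidatePrefixLength(24) passes, while ValidatePrefixLength(33) raises.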
@@ -3173,7 +3449,7 @@ class LUClusterSetParams(LogicalUnit): " drbd-based instances exist", errors.ECODE_INVAL) - node_list = self.glm.list_owned(locking.LEVEL_NODE) + node_list = self.owned_locks(locking.LEVEL_NODE) # if vg_name not None, checks given volume group on all nodes if self.op.vg_name: @@ -3195,8 +3471,7 @@ class LUClusterSetParams(LogicalUnit): if self.op.drbd_helper: # checks given drbd helper on all nodes helpers = self.rpc.call_drbd_helper(node_list) - for node in node_list: - ninfo = self.cfg.GetNodeInfo(node) + for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list): if ninfo.offline: self.LogInfo("Not checking drbd helper on offline node %s", node) continue @@ -3432,21 +3707,39 @@ class LUClusterSetParams(LogicalUnit): helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted") if self.op.master_netdev: - master = self.cfg.GetMasterNode() + master_params = self.cfg.GetMasterNetworkParameters() feedback_fn("Shutting down master ip on the current netdev (%s)" % self.cluster.master_netdev) - result = self.rpc.call_node_stop_master(master, False) + result = self.rpc.call_node_deactivate_master_ip(master_params.name, + master_params) result.Raise("Could not disable the master ip") feedback_fn("Changing master_netdev from %s to %s" % - (self.cluster.master_netdev, self.op.master_netdev)) + (master_params.netdev, self.op.master_netdev)) self.cluster.master_netdev = self.op.master_netdev + if self.op.master_netmask: + master_params = self.cfg.GetMasterNetworkParameters() + feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask) + result = self.rpc.call_node_change_master_netmask(master_params.name, + master_params.netmask, + self.op.master_netmask, + master_params.ip, + master_params.netdev) + if result.fail_msg: + msg = "Could not change the master IP netmask: %s" % result.fail_msg + self.LogWarning(msg) + feedback_fn(msg) + else: + self.cluster.master_netmask = self.op.master_netmask + self.cfg.Update(self.cluster, feedback_fn) if self.op.master_netdev: + master_params = self.cfg.GetMasterNetworkParameters() feedback_fn("Starting the master ip on the new master netdev (%s)" % self.op.master_netdev) - result = self.rpc.call_node_start_master(master, False, False) + result = self.rpc.call_node_activate_master_ip(master_params.name, + master_params) if result.fail_msg: self.LogWarning("Could not re-enable the master ip on" " the master, please restart manually: %s", @@ -3479,17 +3772,25 @@ def _ComputeAncillaryFiles(cluster, redist): constants.SSH_KNOWN_HOSTS_FILE, constants.CONFD_HMAC_KEY, constants.CLUSTER_DOMAIN_SECRET_FILE, + constants.SPICE_CERT_FILE, + constants.SPICE_CACERT_FILE, + constants.RAPI_USERS_FILE, ]) if not redist: files_all.update(constants.ALL_CERT_FILES) files_all.update(ssconf.SimpleStore().GetFileList()) + else: + # we need to ship at least the RAPI certificate + files_all.add(constants.RAPI_CERT_FILE) if cluster.modify_etc_hosts: files_all.add(constants.ETC_HOSTS) - # Files which must either exist on all nodes or on none - files_all_opt = set([ + # Files which are optional, these must: + # - be present in one other category as well + # - either exist or not exist on all nodes of that category (mc, vm all) + files_opt = set([ constants.RAPI_USERS_FILE, ]) @@ -3501,14 +3802,23 @@ def _ComputeAncillaryFiles(cluster, redist): # Files which should only be on VM-capable nodes files_vm = set(filename for hv_name in cluster.enabled_hypervisors - for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()) + for filename in 
hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0]) - # Filenames must be unique - assert (len(files_all | files_all_opt | files_mc | files_vm) == - sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \ + files_opt |= set(filename + for hv_name in cluster.enabled_hypervisors + for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1]) + + # Filenames in each category must be unique + all_files_set = files_all | files_mc | files_vm + assert (len(all_files_set) == + sum(map(len, [files_all, files_mc, files_vm]))), \ "Found file listed in more than one file list" - return (files_all, files_all_opt, files_mc, files_vm) + # Optional files must be present in one other category + assert all_files_set.issuperset(files_opt), \ + "Optional file not in a different required list" + + return (files_all, files_opt, files_mc, files_vm) def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True): @@ -3542,7 +3852,7 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True): nodelist.remove(master_info.name) # Gather file lists - (files_all, files_all_opt, files_mc, files_vm) = \ + (files_all, _, files_mc, files_vm) = \ _ComputeAncillaryFiles(cluster, True) # Never re-distribute configuration file from here @@ -3552,7 +3862,6 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True): filemap = [ (online_nodes, files_all), - (online_nodes, files_all_opt), (vm_nodes, files_vm), ] @@ -3584,6 +3893,31 @@ class LUClusterRedistConf(NoHooksLU): _RedistributeAncillaryFiles(self) +class LUClusterActivateMasterIp(NoHooksLU): + """Activate the master IP on the master node. + + """ + def Exec(self, feedback_fn): + """Activate the master IP. + + """ + master_params = self.cfg.GetMasterNetworkParameters() + self.rpc.call_node_activate_master_ip(master_params.name, + master_params) + + +class LUClusterDeactivateMasterIp(NoHooksLU): + """Deactivate the master IP on the master node. + + """ + def Exec(self, feedback_fn): + """Deactivate the master IP. + + """ + master_params = self.cfg.GetMasterNetworkParameters() + self.rpc.call_node_deactivate_master_ip(master_params.name, master_params) + + def _WaitForSync(lu, instance, disks=None, oneshot=False): """Sleep and poll for an instance's disk to sync. 
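The _ComputeAncillaryFiles changes above move the optional files into their own set and guard the result with two asserts: filenames must be unique across the required categories, and every optional file must also be present in one of them. The same invariants on a toy set of names (the names are illustrative stand-ins for the real constants.* paths, not taken from the patch):

files_all = set(["known_hosts", "hmac.key", "rapi.pem", "rapi_users"])
files_mc = set(["config.data"])        # master candidates only
files_vm = set(["kvm-ifup-script"])    # VM-capable nodes only
files_opt = set(["rapi_users"])        # may be absent, per node category

all_files_set = files_all | files_mc | files_vm

# Filenames in each category must be unique
assert (len(all_files_set) ==
        sum(map(len, [files_all, files_mc, files_vm])))

# Optional files must be present in one other category as well
assert all_files_set.issuperset(files_opt)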
@@ -3755,9 +4089,7 @@ class LUOobCommand(NoHooksLU): if self.op.command in self._SKIP_MASTER: assert self.master_node not in self.op.node_names - for node_name in self.op.node_names: - node = self.cfg.GetNodeInfo(node_name) - + for (node_name, node) in self.cfg.GetMultiNodeInfo(self.op.node_names): if node is None: raise errors.OpPrereqError("Node %s not found" % node_name, errors.ECODE_NOENT) @@ -3874,6 +4206,7 @@ class LUOobCommand(NoHooksLU): raise errors.OpExecError("Check of out-of-band payload failed due to %s" % utils.CommaJoin(errs)) + class _OsQuery(_QueryBase): FIELDS = query.OS_FIELDS @@ -4081,15 +4414,12 @@ class LUNodeRemove(LogicalUnit): node = self.cfg.GetNodeInfo(self.op.node_name) assert node is not None - instance_list = self.cfg.GetInstanceList() - masternode = self.cfg.GetMasterNode() if node.name == masternode: raise errors.OpPrereqError("Node is the master node, failover to another" " node is required", errors.ECODE_INVAL) - for instance_name in instance_list: - instance = self.cfg.GetInstanceInfo(instance_name) + for instance_name, instance in self.cfg.GetAllInstancesInfo(): if node.name in instance.all_nodes: raise errors.OpPrereqError("Instance %s is still running on the node," " please remove first" % instance_name, @@ -4135,7 +4465,7 @@ class _NodeQuery(_QueryBase): def ExpandNames(self, lu): lu.needed_locks = {} - lu.share_locks[locking.LEVEL_NODE] = 1 + lu.share_locks = _ShareAll() if self.names: self.wanted = _GetWantedNodes(lu, self.names) @@ -4146,7 +4476,7 @@ class _NodeQuery(_QueryBase): query.NQ_LIVE in self.requested_data) if self.do_locking: - # if we don't request only static fields, we need to lock the nodes + # If any non-static field is requested we need to lock the nodes lu.needed_locks[locking.LEVEL_NODE] = self.wanted def DeclareLocks(self, lu, level): @@ -4210,7 +4540,7 @@ class LUNodeQuery(NoHooksLU): """Logical unit for querying nodes. """ - # pylint: disable-msg=W0142 + # pylint: disable=W0142 REQ_BGL = False def CheckArguments(self): @@ -4250,15 +4580,11 @@ class LUNodeQueryvols(NoHooksLU): """Computes the list of nodes and their attributes. """ - nodenames = self.glm.list_owned(locking.LEVEL_NODE) + nodenames = self.owned_locks(locking.LEVEL_NODE) volumes = self.rpc.call_node_volumes(nodenames) ilist = self.cfg.GetAllInstancesInfo() - - vol2inst = dict(((node, vol), inst.name) - for inst in ilist.values() - for (node, vols) in inst.MapLVsByNode().items() - for vol in vols) + vol2inst = _MapInstanceDisksToNodes(ilist.values()) output = [] for node in nodenames: @@ -4279,13 +4605,13 @@ class LUNodeQueryvols(NoHooksLU): if field == "node": val = node elif field == "phys": - val = vol['dev'] + val = vol["dev"] elif field == "vg": - val = vol['vg'] + val = vol["vg"] elif field == "name": - val = vol['name'] + val = vol["name"] elif field == "size": - val = int(float(vol['size'])) + val = int(float(vol["size"])) elif field == "instance": val = vol2inst.get((node, vol["vg"] + "/" + vol["name"]), "-") else: @@ -4323,7 +4649,7 @@ class LUNodeQueryStorage(NoHooksLU): """Computes the list of nodes and their attributes. 
""" - self.nodes = self.glm.list_owned(locking.LEVEL_NODE) + self.nodes = self.owned_locks(locking.LEVEL_NODE) # Always get name to sort by if constants.SF_NAME in self.op.output_fields: @@ -4385,8 +4711,7 @@ class _InstanceQuery(_QueryBase): def ExpandNames(self, lu): lu.needed_locks = {} - lu.share_locks[locking.LEVEL_INSTANCE] = 1 - lu.share_locks[locking.LEVEL_NODE] = 1 + lu.share_locks = _ShareAll() if self.names: self.wanted = _GetWantedInstances(lu, self.names) @@ -4397,17 +4722,43 @@ class _InstanceQuery(_QueryBase): query.IQ_LIVE in self.requested_data) if self.do_locking: lu.needed_locks[locking.LEVEL_INSTANCE] = self.wanted + lu.needed_locks[locking.LEVEL_NODEGROUP] = [] lu.needed_locks[locking.LEVEL_NODE] = [] lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + self.do_grouplocks = (self.do_locking and + query.IQ_NODES in self.requested_data) + def DeclareLocks(self, lu, level): - if level == locking.LEVEL_NODE and self.do_locking: - lu._LockInstancesNodes() # pylint: disable-msg=W0212 + if self.do_locking: + if level == locking.LEVEL_NODEGROUP and self.do_grouplocks: + assert not lu.needed_locks[locking.LEVEL_NODEGROUP] + + # Lock all groups used by instances optimistically; this requires going + # via the node before it's locked, requiring verification later on + lu.needed_locks[locking.LEVEL_NODEGROUP] = \ + set(group_uuid + for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE) + for group_uuid in lu.cfg.GetInstanceNodeGroups(instance_name)) + elif level == locking.LEVEL_NODE: + lu._LockInstancesNodes() # pylint: disable=W0212 + + @staticmethod + def _CheckGroupLocks(lu): + owned_instances = frozenset(lu.owned_locks(locking.LEVEL_INSTANCE)) + owned_groups = frozenset(lu.owned_locks(locking.LEVEL_NODEGROUP)) + + # Check if node groups for locked instances are still correct + for instance_name in owned_instances: + _CheckInstanceNodeGroups(lu.cfg, instance_name, owned_groups) def _GetQueryData(self, lu): """Computes the list of instances and their attributes. """ + if self.do_grouplocks: + self._CheckGroupLocks(lu) + cluster = lu.cfg.GetClusterInfo() all_info = lu.cfg.GetAllInstancesInfo() @@ -4470,22 +4821,34 @@ class _InstanceQuery(_QueryBase): else: consinfo = None + if query.IQ_NODES in self.requested_data: + node_names = set(itertools.chain(*map(operator.attrgetter("all_nodes"), + instance_list))) + nodes = dict(lu.cfg.GetMultiNodeInfo(node_names)) + groups = dict((uuid, lu.cfg.GetNodeGroup(uuid)) + for uuid in set(map(operator.attrgetter("group"), + nodes.values()))) + else: + nodes = None + groups = None + return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(), disk_usage, offline_nodes, bad_nodes, - live_data, wrongnode_inst, consinfo) + live_data, wrongnode_inst, consinfo, + nodes, groups) class LUQuery(NoHooksLU): """Query for resources/items of a certain kind. """ - # pylint: disable-msg=W0142 + # pylint: disable=W0142 REQ_BGL = False def CheckArguments(self): qcls = _GetQueryImplementation(self.op.what) - self.impl = qcls(self.op.filter, self.op.fields, False) + self.impl = qcls(self.op.qfilter, self.op.fields, self.op.use_locking) def ExpandNames(self): self.impl.ExpandNames(self) @@ -4501,7 +4864,7 @@ class LUQueryFields(NoHooksLU): """Query for resources/items of a certain kind. 
""" - # pylint: disable-msg=W0142 + # pylint: disable=W0142 REQ_BGL = False def CheckArguments(self): @@ -4641,9 +5004,7 @@ class LUNodeAdd(LogicalUnit): self.changed_primary_ip = False - for existing_node_name in node_list: - existing_node = cfg.GetNodeInfo(existing_node_name) - + for existing_node_name, existing_node in cfg.GetMultiNodeInfo(node_list): if self.op.readd and node == existing_node_name: if existing_node.secondary_ip != secondary_ip: raise errors.OpPrereqError("Readded node doesn't have the same IP" @@ -4750,7 +5111,7 @@ class LUNodeAdd(LogicalUnit): # later in the procedure; this also means that if the re-add # fails, we are left with a non-offlined, broken node if self.op.readd: - new_node.drained = new_node.offline = False # pylint: disable-msg=W0201 + new_node.drained = new_node.offline = False # pylint: disable=W0201 self.LogInfo("Readding a node, the offline/drained flags were reset") # if we demote the node, we do cleanup later in the procedure new_node.master_candidate = self.master_candidate @@ -4796,7 +5157,7 @@ class LUNodeAdd(LogicalUnit): node_verify_list = [self.cfg.GetMasterNode()] node_verify_param = { - constants.NV_NODELIST: [node], + constants.NV_NODELIST: ([node], {}), # TODO: do a node-net-test as well? } @@ -4898,8 +5259,8 @@ class LUNodeSetParams(LogicalUnit): instances_keep = [] # Build list of instances to release - for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE): - instance = self.context.cfg.GetInstanceInfo(instance_name) + locked_i = self.owned_locks(locking.LEVEL_INSTANCE) + for instance_name, instance in self.cfg.GetMultiInstanceInfo(locked_i): if (instance.disk_template in constants.DTS_INT_MIRROR and self.op.node_name in instance.all_nodes): instances_keep.append(instance_name) @@ -4907,7 +5268,7 @@ class LUNodeSetParams(LogicalUnit): _ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep) - assert (set(self.glm.list_owned(locking.LEVEL_INSTANCE)) == + assert (set(self.owned_locks(locking.LEVEL_INSTANCE)) == set(instances_keep)) def BuildHooksEnv(self): @@ -5029,7 +5390,9 @@ class LUNodeSetParams(LogicalUnit): if old_role == self._ROLE_OFFLINE and new_role != old_role: # Trying to transition out of offline status - result = self.rpc.call_version([node.name])[node.name] + # TODO: Use standard RPC runner, but make sure it works when the node is + # still marked offline + result = rpc.BootstrapRunner().call_version([node.name])[node.name] if result.fail_msg: raise errors.OpPrereqError("Node %s is being de-offlined but fails" " to report its version: %s" % @@ -5208,6 +5571,7 @@ class LUClusterQuery(NoHooksLU): "ndparams": cluster.ndparams, "candidate_pool_size": cluster.candidate_pool_size, "master_netdev": cluster.master_netdev, + "master_netmask": cluster.master_netmask, "volume_group_name": cluster.volume_group_name, "drbd_usermode_helper": cluster.drbd_usermode_helper, "file_storage_dir": cluster.file_storage_dir, @@ -5526,7 +5890,7 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name): nodeinfo = lu.rpc.call_node_info([node], None, hypervisor_name) nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True, ecode=errors.ECODE_ENVIRON) - free_mem = nodeinfo[node].payload.get('memory_free', None) + free_mem = nodeinfo[node].payload.get("memory_free", None) if not isinstance(free_mem, int): raise errors.OpPrereqError("Can't compute free memory on node %s, result" " was '%s'" % (node, free_mem), @@ -5598,6 +5962,40 @@ def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested): 
errors.ECODE_NORES) +def _CheckNodesPhysicalCPUs(lu, nodenames, requested, hypervisor_name): + """Checks if nodes have enough physical CPUs + + This function checks if all given nodes have the needed number of + physical CPUs. In case any node has less CPUs or we cannot get the + information from the node, this function raises an OpPrereqError + exception. + + @type lu: C{LogicalUnit} + @param lu: a logical unit from which we get configuration data + @type nodenames: C{list} + @param nodenames: the list of node names to check + @type requested: C{int} + @param requested: the minimum acceptable number of physical CPUs + @raise errors.OpPrereqError: if the node doesn't have enough CPUs, + or we cannot check the node + + """ + nodeinfo = lu.rpc.call_node_info(nodenames, None, hypervisor_name) + for node in nodenames: + info = nodeinfo[node] + info.Raise("Cannot get current information from node %s" % node, + prereq=True, ecode=errors.ECODE_ENVIRON) + num_cpus = info.payload.get("cpu_total", None) + if not isinstance(num_cpus, int): + raise errors.OpPrereqError("Can't compute the number of physical CPUs" + " on node %s, result was '%s'" % + (node, num_cpus), errors.ECODE_ENVIRON) + if requested > num_cpus: + raise errors.OpPrereqError("Node %s has %s physical CPUs, but %s are " + "required" % (node, num_cpus, requested), + errors.ECODE_NORES) + + class LUInstanceStartup(LogicalUnit): """Starts an instance. @@ -5700,9 +6098,11 @@ class LUInstanceStartup(LogicalUnit): _StartInstanceDisks(self, instance, force) - result = self.rpc.call_instance_start(node_current, instance, - self.op.hvparams, self.op.beparams, - self.op.startup_paused) + result = \ + self.rpc.call_instance_start(node_current, + (instance, self.op.hvparams, + self.op.beparams), + self.op.startup_paused) msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) @@ -5792,8 +6192,8 @@ class LUInstanceReboot(LogicalUnit): self.LogInfo("Instance %s was already stopped, starting now", instance.name) _StartInstanceDisks(self, instance, ignore_secondaries) - result = self.rpc.call_instance_start(node_current, instance, - None, None, False) + result = self.rpc.call_instance_start(node_current, + (instance, None, None), False) msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) @@ -5954,9 +6354,9 @@ class LUInstanceReinstall(LogicalUnit): try: feedback_fn("Running the instance OS create scripts...") # FIXME: pass debug option from opcode to backend - result = self.rpc.call_instance_os_add(inst.primary_node, inst, True, - self.op.debug_level, - osparams=self.os_inst) + result = self.rpc.call_instance_os_add(inst.primary_node, + (inst, self.os_inst), True, + self.op.debug_level) result.Raise("Could not install OS for instance %s on node %s" % (inst.name, inst.primary_node)) finally: @@ -6310,7 +6710,7 @@ class LUInstanceQuery(NoHooksLU): """Logical unit for querying instances. 
""" - # pylint: disable-msg=W0142 + # pylint: disable=W0142 REQ_BGL = False def CheckArguments(self): @@ -6657,8 +7057,8 @@ class LUInstanceMove(LogicalUnit): _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Can't activate the instance's disks") - result = self.rpc.call_instance_start(target_node, instance, - None, None, False) + result = self.rpc.call_instance_start(target_node, + (instance, None, None), False) msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) @@ -6680,7 +7080,7 @@ class LUNodeMigrate(LogicalUnit): def ExpandNames(self): self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) - self.share_locks = dict.fromkeys(locking.LEVELS, 1) + self.share_locks = _ShareAll() self.needed_locks = { locking.LEVEL_NODE: [self.op.node_name], } @@ -6721,7 +7121,7 @@ class LUNodeMigrate(LogicalUnit): # running the iallocator and the actual migration, a good consistency model # will have to be found. - assert (frozenset(self.glm.list_owned(locking.LEVEL_NODE)) == + assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) == frozenset([self.op.node_name])) return ResultWithJobs(jobs) @@ -6751,6 +7151,11 @@ class TLMigrateInstance(Tasklet): @ivar shutdown_timeout: In case of failover timeout of the shutdown """ + + # Constants + _MIGRATION_POLL_INTERVAL = 1 # seconds + _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds + def __init__(self, lu, instance_name, cleanup=False, failover=False, fallback=False, ignore_consistency=False, @@ -7074,12 +7479,13 @@ class TLMigrateInstance(Tasklet): """ instance = self.instance target_node = self.target_node + source_node = self.source_node migration_info = self.migration_info - abort_result = self.rpc.call_finalize_migration(target_node, - instance, - migration_info, - False) + abort_result = self.rpc.call_instance_finalize_migration_dst(target_node, + instance, + migration_info, + False) abort_msg = abort_result.fail_msg if abort_msg: logging.error("Aborting migration failed on target node %s: %s", @@ -7087,6 +7493,13 @@ class TLMigrateInstance(Tasklet): # Don't raise an exception here, as we stil have to try to revert the # disk status, even if this step failed. + abort_result = self.rpc.call_instance_finalize_migration_src(source_node, + instance, False, self.live) + abort_msg = abort_result.fail_msg + if abort_msg: + logging.error("Aborting migration failed on source node %s: %s", + source_node, abort_msg) + def _ExecMigration(self): """Migrate an instance. @@ -7103,6 +7516,21 @@ class TLMigrateInstance(Tasklet): target_node = self.target_node source_node = self.source_node + # Check for hypervisor version mismatch and warn the user. 
+ nodeinfo = self.rpc.call_node_info([source_node, target_node], + None, self.instance.hypervisor) + src_info = nodeinfo[source_node] + dst_info = nodeinfo[target_node] + + if ((constants.HV_NODEINFO_KEY_VERSION in src_info.payload) and + (constants.HV_NODEINFO_KEY_VERSION in dst_info.payload)): + src_version = src_info.payload[constants.HV_NODEINFO_KEY_VERSION] + dst_version = dst_info.payload[constants.HV_NODEINFO_KEY_VERSION] + if src_version != dst_version: + self.feedback_fn("* warning: hypervisor version mismatch between" + " source (%s) and target (%s) node" % + (src_version, dst_version)) + self.feedback_fn("* checking disk consistency between source and target") for dev in instance.disks: if not _CheckDiskConsistency(self.lu, dev, target_node, False): @@ -7158,18 +7586,59 @@ class TLMigrateInstance(Tasklet): raise errors.OpExecError("Could not migrate instance %s: %s" % (instance.name, msg)) + self.feedback_fn("* starting memory transfer") + last_feedback = time.time() + while True: + result = self.rpc.call_instance_get_migration_status(source_node, + instance) + msg = result.fail_msg + ms = result.payload # MigrationStatus instance + if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES): + logging.error("Instance migration failed, trying to revert" + " disk status: %s", msg) + self.feedback_fn("Migration failed, aborting") + self._AbortMigration() + self._RevertDiskStatus() + raise errors.OpExecError("Could not migrate instance %s: %s" % + (instance.name, msg)) + + if result.payload.status != constants.HV_MIGRATION_ACTIVE: + self.feedback_fn("* memory transfer complete") + break + + if (utils.TimeoutExpired(last_feedback, + self._MIGRATION_FEEDBACK_INTERVAL) and + ms.transferred_ram is not None): + mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram) + self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress) + last_feedback = time.time() + + time.sleep(self._MIGRATION_POLL_INTERVAL) + + result = self.rpc.call_instance_finalize_migration_src(source_node, + instance, + True, + self.live) + msg = result.fail_msg + if msg: + logging.error("Instance migration succeeded, but finalization failed" + " on the source node: %s", msg) + raise errors.OpExecError("Could not finalize instance migration: %s" % + msg) + instance.primary_node = target_node + # distribute new instance config to the other nodes self.cfg.Update(instance, self.feedback_fn) - result = self.rpc.call_finalize_migration(target_node, - instance, - migration_info, - True) + result = self.rpc.call_instance_finalize_migration_dst(target_node, + instance, + migration_info, + True) msg = result.fail_msg if msg: - logging.error("Instance migration succeeded, but finalization failed:" - " %s", msg) + logging.error("Instance migration succeeded, but finalization failed" + " on the target node: %s", msg) raise errors.OpExecError("Could not finalize instance migration: %s" % msg) @@ -7252,7 +7721,7 @@ class TLMigrateInstance(Tasklet): self.feedback_fn("* starting the instance on the target node %s" % target_node) - result = self.rpc.call_instance_start(target_node, instance, None, None, + result = self.rpc.call_instance_start(target_node, (instance, None, None), False) msg = result.fail_msg if msg: @@ -7274,10 +7743,8 @@ class TLMigrateInstance(Tasklet): # directly, or through an iallocator. 
self.all_nodes = [self.source_node, self.target_node] - self.nodes_ip = { - self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip, - self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip, - } + self.nodes_ip = dict((name, node.secondary_ip) for (name, node) + in self.cfg.GetMultiNodeInfo(self.all_nodes)) if self.failover: feedback_fn("Failover instance %s" % self.instance.name) @@ -7386,7 +7853,7 @@ def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names, shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId()) dev_data = objects.Disk(dev_type=constants.LD_LV, size=size, logical_id=(vgnames[0], names[0])) - dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128, + dev_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE, logical_id=(vgnames[1], names[1])) drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size, logical_id=(primary, secondary, port, @@ -7706,14 +8173,14 @@ def _ComputeDiskSizePerVG(disk_template, disks): constants.DT_DISKLESS: {}, constants.DT_PLAIN: _compute(disks, 0), # 128 MB are added for drbd metadata for each disk - constants.DT_DRBD8: _compute(disks, 128), + constants.DT_DRBD8: _compute(disks, DRBD_META_SIZE), constants.DT_FILE: {}, constants.DT_SHARED_FILE: {}, } if disk_template not in req_size_dict: raise errors.ProgrammerError("Disk template '%s' size requirement" - " is unknown" % disk_template) + " is unknown" % disk_template) return req_size_dict[disk_template] @@ -7727,7 +8194,8 @@ def _ComputeDiskSize(disk_template, disks): constants.DT_DISKLESS: None, constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks), # 128 MB are added for drbd metadata for each disk - constants.DT_DRBD8: sum(d[constants.IDISK_SIZE] + 128 for d in disks), + constants.DT_DRBD8: + sum(d[constants.IDISK_SIZE] + DRBD_META_SIZE for d in disks), constants.DT_FILE: None, constants.DT_SHARED_FILE: 0, constants.DT_BLOCK: 0, @@ -7735,7 +8203,7 @@ def _ComputeDiskSize(disk_template, disks): if disk_template not in req_size_dict: raise errors.ProgrammerError("Disk template '%s' size requirement" - " is unknown" % disk_template) + " is unknown" % disk_template) return req_size_dict[disk_template] @@ -7773,9 +8241,11 @@ def _CheckHVParams(lu, nodenames, hvname, hvparams): """ nodenames = _FilterVmNodes(lu, nodenames) - hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames, - hvname, - hvparams) + + cluster = lu.cfg.GetClusterInfo() + hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams) + + hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames, hvname, hvfull) for node in nodenames: info = hvinfo[node] if info.offline: @@ -7801,7 +8271,7 @@ def _CheckOSParams(lu, required, nodenames, osname, osparams): """ nodenames = _FilterVmNodes(lu, nodenames) - result = lu.rpc.call_os_validate(required, nodenames, osname, + result = lu.rpc.call_os_validate(nodenames, required, osname, [constants.OS_VALIDATE_PARAMETERS], osparams) for node, nres in result.items(): @@ -8015,8 +8485,8 @@ class LUInstanceCreate(LogicalUnit): self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET self.op.src_node = None if os.path.isabs(src_path): - raise errors.OpPrereqError("Importing an instance from an absolute" - " path requires a source node option", + raise errors.OpPrereqError("Importing an instance from a path" + " requires a source node option", errors.ECODE_INVAL) else: self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node) @@ -8119,7 +8589,7 @@ class LUInstanceCreate(LogicalUnit): src_path = 
self.op.src_path if src_node is None: - locked_nodes = self.glm.list_owned(locking.LEVEL_NODE) + locked_nodes = self.owned_locks(locking.LEVEL_NODE) exp_list = self.rpc.call_export_list(locked_nodes) found = False for node in exp_list: @@ -8165,33 +8635,39 @@ class LUInstanceCreate(LogicalUnit): if einfo.has_option(constants.INISECT_INS, "disk_template"): self.op.disk_template = einfo.get(constants.INISECT_INS, "disk_template") + if self.op.disk_template not in constants.DISK_TEMPLATES: + raise errors.OpPrereqError("Disk template specified in configuration" + " file is not one of the allowed values:" + " %s" % " ".join(constants.DISK_TEMPLATES)) else: raise errors.OpPrereqError("No disk template specified and the export" " is missing the disk_template information", errors.ECODE_INVAL) if not self.op.disks: - if einfo.has_option(constants.INISECT_INS, "disk_count"): - disks = [] - # TODO: import the disk iv_name too - for idx in range(einfo.getint(constants.INISECT_INS, "disk_count")): + disks = [] + # TODO: import the disk iv_name too + for idx in range(constants.MAX_DISKS): + if einfo.has_option(constants.INISECT_INS, "disk%d_size" % idx): disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx) disks.append({constants.IDISK_SIZE: disk_sz}) - self.op.disks = disks - else: + self.op.disks = disks + if not disks and self.op.disk_template != constants.DT_DISKLESS: raise errors.OpPrereqError("No disk info specified and the export" " is missing the disk information", errors.ECODE_INVAL) - if (not self.op.nics and - einfo.has_option(constants.INISECT_INS, "nic_count")): + if not self.op.nics: nics = [] - for idx in range(einfo.getint(constants.INISECT_INS, "nic_count")): - ndict = {} - for name in list(constants.NICS_PARAMETERS) + ["ip", "mac"]: - v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name)) - ndict[name] = v - nics.append(ndict) + for idx in range(constants.MAX_NICS): + if einfo.has_option(constants.INISECT_INS, "nic%d_mac" % idx): + ndict = {} + for name in list(constants.NICS_PARAMETERS) + ["ip", "mac"]: + v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name)) + ndict[name] = v + nics.append(ndict) + else: + break self.op.nics = nics if not self.op.tags and einfo.has_option(constants.INISECT_INS, "tags"): @@ -8277,7 +8753,7 @@ class LUInstanceCreate(LogicalUnit): joinargs.append(self.op.instance_name) - # pylint: disable-msg=W0142 + # pylint: disable=W0142 self.instance_file_storage_dir = utils.PathJoin(*joinargs) def CheckPrereq(self): @@ -8295,7 +8771,8 @@ class LUInstanceCreate(LogicalUnit): raise errors.OpPrereqError("Cluster does not support lvm-based" " instances", errors.ECODE_STATE) - if self.op.hypervisor is None: + if (self.op.hypervisor is None or + self.op.hypervisor == constants.VALUE_AUTO): self.op.hypervisor = self.cfg.GetHypervisorType() cluster = self.cfg.GetClusterInfo() @@ -8321,6 +8798,10 @@ class LUInstanceCreate(LogicalUnit): _CheckGlobalHvParams(self.op.hvparams) # fill and remember the beparams dict + default_beparams = cluster.beparams[constants.PP_DEFAULT] + for param, value in self.op.beparams.iteritems(): + if value == constants.VALUE_AUTO: + self.op.beparams[param] = default_beparams[param] utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) self.be_full = cluster.SimpleFillBE(self.op.beparams) @@ -8337,7 +8818,7 @@ class LUInstanceCreate(LogicalUnit): for idx, nic in enumerate(self.op.nics): nic_mode_req = nic.get(constants.INIC_MODE, None) nic_mode = nic_mode_req - if nic_mode is None: + if nic_mode is None 
or nic_mode == constants.VALUE_AUTO: nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE] # in routed mode, for the first nic, the default ip is 'auto' @@ -8381,9 +8862,11 @@ class LUInstanceCreate(LogicalUnit): # Build nic parameters link = nic.get(constants.INIC_LINK, None) + if link == constants.VALUE_AUTO: + link = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_LINK] nicparams = {} if nic_mode_req: - nicparams[constants.NIC_MODE] = nic_mode_req + nicparams[constants.NIC_MODE] = nic_mode if link: nicparams[constants.NIC_LINK] = link @@ -8420,19 +8903,9 @@ class LUInstanceCreate(LogicalUnit): self.disks.append(new_disk) if self.op.mode == constants.INSTANCE_IMPORT: - - # Check that the new instance doesn't have less disks than the export - instance_disks = len(self.disks) - export_disks = export_info.getint(constants.INISECT_INS, 'disk_count') - if instance_disks < export_disks: - raise errors.OpPrereqError("Not enough disks to import." - " (instance: %d, export: %d)" % - (instance_disks, export_disks), - errors.ECODE_INVAL) - disk_images = [] - for idx in range(export_disks): - option = 'disk%d_dump' % idx + for idx in range(len(self.disks)): + option = "disk%d_dump" % idx if export_info.has_option(constants.INISECT_INS, option): # FIXME: are the old os-es, disk sizes, etc. useful? export_name = export_info.get(constants.INISECT_INS, option) @@ -8443,17 +8916,11 @@ class LUInstanceCreate(LogicalUnit): self.src_images = disk_images - old_name = export_info.get(constants.INISECT_INS, 'name') - try: - exp_nic_count = export_info.getint(constants.INISECT_INS, 'nic_count') - except (TypeError, ValueError), err: - raise errors.OpPrereqError("Invalid export file, nic_count is not" - " an integer: %s" % str(err), - errors.ECODE_STATE) + old_name = export_info.get(constants.INISECT_INS, "name") if self.op.instance_name == old_name: for idx, nic in enumerate(self.nics): - if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx: - nic_mac_ini = 'nic%d_mac' % idx + if nic.mac == constants.VALUE_AUTO: + nic_mac_ini = "nic%d_mac" % idx nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini) # ENDIF: self.op.mode == constants.INSTANCE_IMPORT @@ -8649,7 +9116,7 @@ class LUInstanceCreate(LogicalUnit): # 'fake' LV disks with the old data, plus the new unique_id tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks] rename_to = [] - for t_dsk, a_dsk in zip (tmp_disks, self.disks): + for t_dsk, a_dsk in zip(tmp_disks, self.disks): rename_to.append(t_dsk.logical_id) t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk[constants.IDISK_ADOPT]) self.cfg.SetDiskID(t_dsk, pnode_name) @@ -8700,7 +9167,6 @@ class LUInstanceCreate(LogicalUnit): disk_abort = not _WaitForSync(self, iobj) elif iobj.disk_template in constants.DTS_INT_MIRROR: # make sure the disks are not degraded (still sync-ing is ok) - time.sleep(15) feedback_fn("* checking mirrors status") disk_abort = not _WaitForSync(self, iobj, oneshot=True) else: @@ -8717,12 +9183,33 @@ class LUInstanceCreate(LogicalUnit): if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks: if self.op.mode == constants.INSTANCE_CREATE: if not self.op.no_install: + pause_sync = (iobj.disk_template in constants.DTS_INT_MIRROR and + not self.op.wait_for_sync) + if pause_sync: + feedback_fn("* pausing disk sync to install instance OS") + result = self.rpc.call_blockdev_pause_resume_sync(pnode_name, + iobj.disks, True) + for idx, success in enumerate(result.payload): + if not success: + logging.warn("pause-sync of instance %s 
for disk %d failed", + instance, idx) + feedback_fn("* running the instance OS create scripts...") # FIXME: pass debug option from opcode to backend - result = self.rpc.call_instance_os_add(pnode_name, iobj, False, - self.op.debug_level) - result.Raise("Could not add os for instance %s" - " on node %s" % (instance, pnode_name)) + os_add_result = \ + self.rpc.call_instance_os_add(pnode_name, (iobj, None), False, + self.op.debug_level) + if pause_sync: + feedback_fn("* resuming disk sync") + result = self.rpc.call_blockdev_pause_resume_sync(pnode_name, + iobj.disks, False) + for idx, success in enumerate(result.payload): + if not success: + logging.warn("resume-sync of instance %s for disk %d failed", + instance, idx) + + os_add_result.Raise("Could not add os for instance %s" + " on node %s" % (instance, pnode_name)) elif self.op.mode == constants.INSTANCE_IMPORT: feedback_fn("* running the instance OS import scripts...") @@ -8790,8 +9277,8 @@ class LUInstanceCreate(LogicalUnit): self.cfg.Update(iobj, feedback_fn) logging.info("Starting instance %s on node %s", instance, pnode_name) feedback_fn("* starting instance...") - result = self.rpc.call_instance_start(pnode_name, iobj, - None, None, False) + result = self.rpc.call_instance_start(pnode_name, (iobj, None, None), + False) result.Raise("Could not start instance") return list(iobj.all_nodes) @@ -8927,7 +9414,7 @@ class LUInstanceReplaceDisks(LogicalUnit): # Lock member nodes of all locked groups self.needed_locks[locking.LEVEL_NODE] = [node_name - for group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP) + for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP) for node_name in self.cfg.GetNodeGroup(group_uuid).members] else: self._LockInstancesNodes() @@ -8967,16 +9454,9 @@ class LUInstanceReplaceDisks(LogicalUnit): assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or self.op.iallocator is None) - owned_groups = self.glm.list_owned(locking.LEVEL_NODEGROUP) + owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) if owned_groups: - groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name) - if owned_groups != groups: - raise errors.OpExecError("Node groups used by instance '%s' changed" - " since lock was acquired, current list is %r," - " used to be '%s'" % - (self.op.instance_name, - utils.CommaJoin(groups), - utils.CommaJoin(owned_groups))) + _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups) return LogicalUnit.CheckPrereq(self) @@ -9041,7 +9521,7 @@ class TLReplaceDisks(Tasklet): ial = IAllocator(lu.cfg, lu.rpc, mode=constants.IALLOCATOR_MODE_RELOC, name=instance_name, - relocate_from=relocate_from) + relocate_from=list(relocate_from)) ial.Run(iallocator_name) @@ -9065,6 +9545,9 @@ class TLReplaceDisks(Tasklet): return remote_node_name def _FindFaultyDisks(self, node_name): + """Wrapper for L{_FindFaultyInstanceDisks}. 
+ + """ return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance, node_name, True) @@ -9135,7 +9618,7 @@ class TLReplaceDisks(Tasklet): if remote_node is None: self.remote_node_info = None else: - assert remote_node in self.lu.glm.list_owned(locking.LEVEL_NODE), \ + assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \ "Remote node '%s' is not locked" % remote_node self.remote_node_info = self.cfg.GetNodeInfo(remote_node) @@ -9241,9 +9724,8 @@ class TLReplaceDisks(Tasklet): instance.FindDisk(disk_idx) # Get secondary node IP addresses - self.node_secondary_ip = \ - dict((node_name, self.cfg.GetNodeInfo(node_name).secondary_ip) - for node_name in touched_nodes) + self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node) + in self.cfg.GetMultiNodeInfo(touched_nodes)) def Exec(self, feedback_fn): """Execute disk replacement. @@ -9256,13 +9738,13 @@ class TLReplaceDisks(Tasklet): if __debug__: # Verify owned locks before starting operation - owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE) - assert set(owned_locks) == set(self.node_secondary_ip), \ + owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE) + assert set(owned_nodes) == set(self.node_secondary_ip), \ ("Incorrect node locks, owning %s, expected %s" % - (owned_locks, self.node_secondary_ip.keys())) + (owned_nodes, self.node_secondary_ip.keys())) - owned_locks = self.lu.glm.list_owned(locking.LEVEL_INSTANCE) - assert list(owned_locks) == [self.instance_name], \ + owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE) + assert list(owned_instances) == [self.instance_name], \ "Instance '%s' not locked" % self.instance_name assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \ @@ -9297,12 +9779,12 @@ class TLReplaceDisks(Tasklet): if __debug__: # Verify owned locks - owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE) + owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE) nodes = frozenset(self.node_secondary_ip) - assert ((self.early_release and not owned_locks) or - (not self.early_release and not (set(owned_locks) - nodes))), \ + assert ((self.early_release and not owned_nodes) or + (not self.early_release and not (set(owned_nodes) - nodes))), \ ("Not owning the correct locks, early_release=%s, owned=%r," - " nodes=%r" % (self.early_release, owned_locks, nodes)) + " nodes=%r" % (self.early_release, owned_nodes, nodes)) return result @@ -9380,7 +9862,7 @@ class TLReplaceDisks(Tasklet): lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size, logical_id=(vg_data, names[0])) vg_meta = dev.children[1].logical_id[0] - lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128, + lv_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE, logical_id=(vg_meta, names[1])) new_lvs = [lv_data, lv_meta] @@ -9422,7 +9904,7 @@ class TLReplaceDisks(Tasklet): self.lu.LogWarning("Can't remove old LV: %s" % msg, hint="remove unused LVs manually") - def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable-msg=W0613 + def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613 """Replace a disk on the primary or secondary for DRBD 8. 
The algorithm for replace is quite complicated: @@ -9579,6 +10061,8 @@ class TLReplaceDisks(Tasklet): """ steps_total = 6 + pnode = self.instance.primary_node + # Step: check device activation self.lu.LogStep(1, steps_total, "Check device existence") self._CheckDisksExistence([self.instance.primary_node]) @@ -9653,10 +10137,8 @@ class TLReplaceDisks(Tasklet): " soon as possible")) self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)") - result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node], - self.node_secondary_ip, - self.instance.disks)\ - [self.instance.primary_node] + result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip, + self.instance.disks)[pnode] msg = result.fail_msg if msg: @@ -9806,7 +10288,7 @@ class LUNodeEvacuate(NoHooksLU): errors.ECODE_INVAL) # Declare locks - self.share_locks = dict.fromkeys(locking.LEVELS, 1) + self.share_locks = _ShareAll() self.needed_locks = { locking.LEVEL_INSTANCE: [], locking.LEVEL_NODEGROUP: [], @@ -9861,9 +10343,9 @@ class LUNodeEvacuate(NoHooksLU): def CheckPrereq(self): # Verify locks - owned_instances = self.glm.list_owned(locking.LEVEL_INSTANCE) - owned_nodes = self.glm.list_owned(locking.LEVEL_NODE) - owned_groups = self.glm.list_owned(locking.LEVEL_NODEGROUP) + owned_instances = self.owned_locks(locking.LEVEL_INSTANCE) + owned_nodes = self.owned_locks(locking.LEVEL_NODE) + owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) assert owned_nodes == self.lock_nodes @@ -10132,7 +10614,7 @@ class LUInstanceQueryData(NoHooksLU): self.wanted_names = None if self.op.use_locking: - self.share_locks = dict.fromkeys(locking.LEVELS, 1) + self.share_locks = _ShareAll() if self.wanted_names is None: self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET @@ -10140,7 +10622,6 @@ class LUInstanceQueryData(NoHooksLU): self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names self.needed_locks[locking.LEVEL_NODE] = [] - self.share_locks = dict.fromkeys(locking.LEVELS, 1) self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE def DeclareLocks(self, level): @@ -10155,10 +10636,10 @@ class LUInstanceQueryData(NoHooksLU): """ if self.wanted_names is None: assert self.op.use_locking, "Locking was not used" - self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE) + self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE) - self.wanted_instances = [self.cfg.GetInstanceInfo(name) - for name in self.wanted_names] + self.wanted_instances = \ + map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names)) def _ComputeBlockdevStatus(self, node, instance_name, dev): """Returns the status of a block device @@ -10199,8 +10680,9 @@ class LUInstanceQueryData(NoHooksLU): dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev) if dev.children: - dev_children = [self._ComputeDiskStatus(instance, snode, child) - for child in dev.children] + dev_children = map(compat.partial(self._ComputeDiskStatus, + instance, snode), + dev.children) else: dev_children = [] @@ -10222,8 +10704,16 @@ class LUInstanceQueryData(NoHooksLU): cluster = self.cfg.GetClusterInfo() - for instance in self.wanted_instances: - if not self.op.static: + pri_nodes = self.cfg.GetMultiNodeInfo(i.primary_node + for i in self.wanted_instances) + for instance, (_, pnode) in zip(self.wanted_instances, pri_nodes): + if self.op.static or pnode.offline: + remote_state = None + if pnode.offline: + self.LogWarning("Primary node %s is marked offline, returning static" + " information only for instance %s" % + 
(pnode.name, instance.name)) + else: remote_info = self.rpc.call_instance_info(instance.primary_node, instance.name, instance.hypervisor) @@ -10233,15 +10723,14 @@ class LUInstanceQueryData(NoHooksLU): remote_state = "up" else: remote_state = "down" - else: - remote_state = None + if instance.admin_up: config_state = "up" else: config_state = "down" - disks = [self._ComputeDiskStatus(instance, None, device) - for device in instance.disks] + disks = map(compat.partial(self._ComputeDiskStatus, instance, None), + instance.disks) result[instance.name] = { "name": instance.name, @@ -10366,13 +10855,13 @@ class LUInstanceSetParams(LogicalUnit): raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip, errors.ECODE_INVAL) - nic_bridge = nic_dict.get('bridge', None) + nic_bridge = nic_dict.get("bridge", None) nic_link = nic_dict.get(constants.INIC_LINK, None) if nic_bridge and nic_link: raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" " at the same time", errors.ECODE_INVAL) elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE: - nic_dict['bridge'] = None + nic_dict["bridge"] = None elif nic_link and nic_link.lower() == constants.VALUE_NONE: nic_dict[constants.INIC_LINK] = None @@ -10415,13 +10904,13 @@ class LUInstanceSetParams(LogicalUnit): """ args = dict() if constants.BE_MEMORY in self.be_new: - args['memory'] = self.be_new[constants.BE_MEMORY] + args["memory"] = self.be_new[constants.BE_MEMORY] if constants.BE_VCPUS in self.be_new: - args['vcpus'] = self.be_new[constants.BE_VCPUS] + args["vcpus"] = self.be_new[constants.BE_VCPUS] # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk # information at all. if self.op.nics: - args['nics'] = [] + args["nics"] = [] nic_override = dict(self.op.nics) for idx, nic in enumerate(self.instance.nics): if idx in nic_override: @@ -10442,16 +10931,16 @@ class LUInstanceSetParams(LogicalUnit): nicparams = self.cluster.SimpleFillNIC(nic.nicparams) mode = nicparams[constants.NIC_MODE] link = nicparams[constants.NIC_LINK] - args['nics'].append((ip, mac, mode, link)) + args["nics"].append((ip, mac, mode, link)) if constants.DDM_ADD in nic_override: ip = nic_override[constants.DDM_ADD].get(constants.INIC_IP, None) mac = nic_override[constants.DDM_ADD][constants.INIC_MAC] nicparams = self.nic_pnew[constants.DDM_ADD] mode = nicparams[constants.NIC_MODE] link = nicparams[constants.NIC_LINK] - args['nics'].append((ip, mac, mode, link)) + args["nics"].append((ip, mac, mode, link)) elif constants.DDM_REMOVE in nic_override: - del args['nics'][-1] + del args["nics"][-1] env = _BuildInstanceHookEnvByObject(self, self.instance, override=args) if self.op.disk_template: @@ -10526,9 +11015,11 @@ class LUInstanceSetParams(LogicalUnit): # local check hypervisor.GetHypervisor(hv_type).CheckParameterSyntax(hv_new) _CheckHVParams(self, nodelist, instance.hypervisor, hv_new) - self.hv_new = hv_new # the new actual values + self.hv_proposed = self.hv_new = hv_new # the new actual values self.hv_inst = i_hvdict # the new dict (without defaults) else: + self.hv_proposed = cluster.SimpleFillHV(instance.hypervisor, instance.os, + instance.hvparams) self.hv_new = self.hv_inst = {} # beparams processing @@ -10537,12 +11028,40 @@ class LUInstanceSetParams(LogicalUnit): use_none=True) utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES) be_new = cluster.SimpleFillBE(i_bedict) - self.be_new = be_new # the new actual values + self.be_proposed = self.be_new = be_new # the new actual values self.be_inst = i_bedict # the new dict 
(without defaults) else: self.be_new = self.be_inst = {} + self.be_proposed = cluster.SimpleFillBE(instance.beparams) be_old = cluster.FillBE(instance) + # CPU param validation -- checking every time a paramtere is + # changed to cover all cases where either CPU mask or vcpus have + # changed + if (constants.BE_VCPUS in self.be_proposed and + constants.HV_CPU_MASK in self.hv_proposed): + cpu_list = \ + utils.ParseMultiCpuMask(self.hv_proposed[constants.HV_CPU_MASK]) + # Verify mask is consistent with number of vCPUs. Can skip this + # test if only 1 entry in the CPU mask, which means same mask + # is applied to all vCPUs. + if (len(cpu_list) > 1 and + len(cpu_list) != self.be_proposed[constants.BE_VCPUS]): + raise errors.OpPrereqError("Number of vCPUs [%d] does not match the" + " CPU mask [%s]" % + (self.be_proposed[constants.BE_VCPUS], + self.hv_proposed[constants.HV_CPU_MASK]), + errors.ECODE_INVAL) + + # Only perform this test if a new CPU mask is given + if constants.HV_CPU_MASK in self.hv_new: + # Calculate the largest CPU number requested + max_requested_cpu = max(map(max, cpu_list)) + # Check that all of the instance's nodes have enough physical CPUs to + # satisfy the requested CPU mask + _CheckNodesPhysicalCPUs(self, instance.all_nodes, + max_requested_cpu + 1, instance.hypervisor) + # osparams processing if self.op.osparams: i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams) @@ -10568,8 +11087,8 @@ class LUInstanceSetParams(LogicalUnit): if msg: # Assume the primary node is unreachable and go ahead self.warn.append("Can't get info from primary node %s: %s" % - (pnode, msg)) - elif not isinstance(pninfo.payload.get('memory_free', None), int): + (pnode, msg)) + elif not isinstance(pninfo.payload.get("memory_free", None), int): self.warn.append("Node data from primary node %s doesn't contain" " free memory information" % pnode) elif instance_info.fail_msg: @@ -10577,14 +11096,14 @@ class LUInstanceSetParams(LogicalUnit): instance_info.fail_msg) else: if instance_info.payload: - current_mem = int(instance_info.payload['memory']) + current_mem = int(instance_info.payload["memory"]) else: # Assume instance not running # (there is a slight race condition here, but it's not very probable, # and we have no other way to check) current_mem = 0 miss_mem = (be_new[constants.BE_MEMORY] - current_mem - - pninfo.payload['memory_free']) + pninfo.payload["memory_free"]) if miss_mem > 0: raise errors.OpPrereqError("This change will prevent the instance" " from starting, due to %d MB of memory" @@ -10597,11 +11116,11 @@ class LUInstanceSetParams(LogicalUnit): continue nres.Raise("Can't get info from secondary node %s" % node, prereq=True, ecode=errors.ECODE_STATE) - if not isinstance(nres.payload.get('memory_free', None), int): + if not isinstance(nres.payload.get("memory_free", None), int): raise errors.OpPrereqError("Secondary node %s didn't return free" " memory information" % node, errors.ECODE_STATE) - elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']: + elif be_new[constants.BE_MEMORY] > nres.payload["memory_free"]: raise errors.OpPrereqError("This change will prevent the instance" " from failover to its secondary node" " %s, due to not enough memory" % node, @@ -10637,8 +11156,8 @@ class LUInstanceSetParams(LogicalUnit): for key in constants.NICS_PARAMETERS if key in nic_dict]) - if 'bridge' in nic_dict: - update_params_dict[constants.NIC_LINK] = nic_dict['bridge'] + if "bridge" in nic_dict: + update_params_dict[constants.NIC_LINK] = nic_dict["bridge"] 
new_nic_params = _GetUpdatedParams(old_nic_params, update_params_dict) @@ -10664,12 +11183,12 @@ class LUInstanceSetParams(LogicalUnit): else: nic_ip = old_nic_ip if nic_ip is None: - raise errors.OpPrereqError('Cannot set the nic ip to None' - ' on a routed nic', errors.ECODE_INVAL) + raise errors.OpPrereqError("Cannot set the nic ip to None" + " on a routed nic", errors.ECODE_INVAL) if constants.INIC_MAC in nic_dict: nic_mac = nic_dict[constants.INIC_MAC] if nic_mac is None: - raise errors.OpPrereqError('Cannot set the nic mac to None', + raise errors.OpPrereqError("Cannot set the nic mac to None", errors.ECODE_INVAL) elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): # otherwise generate the mac @@ -10943,6 +11462,147 @@ class LUInstanceSetParams(LogicalUnit): } +class LUInstanceChangeGroup(LogicalUnit): + HPATH = "instance-change-group" + HTYPE = constants.HTYPE_INSTANCE + REQ_BGL = False + + def ExpandNames(self): + self.share_locks = _ShareAll() + self.needed_locks = { + locking.LEVEL_NODEGROUP: [], + locking.LEVEL_NODE: [], + } + + self._ExpandAndLockInstance() + + if self.op.target_groups: + self.req_target_uuids = map(self.cfg.LookupNodeGroup, + self.op.target_groups) + else: + self.req_target_uuids = None + + self.op.iallocator = _GetDefaultIAllocator(self.cfg, self.op.iallocator) + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODEGROUP: + assert not self.needed_locks[locking.LEVEL_NODEGROUP] + + if self.req_target_uuids: + lock_groups = set(self.req_target_uuids) + + # Lock all groups used by instance optimistically; this requires going + # via the node before it's locked, requiring verification later on + instance_groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name) + lock_groups.update(instance_groups) + else: + # No target groups, need to lock all of them + lock_groups = locking.ALL_SET + + self.needed_locks[locking.LEVEL_NODEGROUP] = lock_groups + + elif level == locking.LEVEL_NODE: + if self.req_target_uuids: + # Lock all nodes used by instances + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + self._LockInstancesNodes() + + # Lock all nodes in all potential target groups + lock_groups = (frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) - + self.cfg.GetInstanceNodeGroups(self.op.instance_name)) + member_nodes = [node_name + for group in lock_groups + for node_name in self.cfg.GetNodeGroup(group).members] + self.needed_locks[locking.LEVEL_NODE].extend(member_nodes) + else: + # Lock all nodes as all groups are potential targets + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + + def CheckPrereq(self): + owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE)) + owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) + owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE)) + + assert (self.req_target_uuids is None or + owned_groups.issuperset(self.req_target_uuids)) + assert owned_instances == set([self.op.instance_name]) + + # Get instance information + self.instance = self.cfg.GetInstanceInfo(self.op.instance_name) + + # Check if node groups for locked instance are still correct + assert owned_nodes.issuperset(self.instance.all_nodes), \ + ("Instance %s's nodes changed while we kept the lock" % + self.op.instance_name) + + inst_groups = _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, + owned_groups) + + if self.req_target_uuids: + # User requested specific target groups + self.target_uuids = self.req_target_uuids + else: + # All groups except those used by 
the instance are potential targets + self.target_uuids = owned_groups - inst_groups + + conflicting_groups = self.target_uuids & inst_groups + if conflicting_groups: + raise errors.OpPrereqError("Can't use group(s) '%s' as targets, they are" + " used by the instance '%s'" % + (utils.CommaJoin(conflicting_groups), + self.op.instance_name), + errors.ECODE_INVAL) + + if not self.target_uuids: + raise errors.OpPrereqError("There are no possible target groups", + errors.ECODE_INVAL) + + def BuildHooksEnv(self): + """Build hooks env. + + """ + assert self.target_uuids + + env = { + "TARGET_GROUPS": " ".join(self.target_uuids), + } + + env.update(_BuildInstanceHookEnvByObject(self, self.instance)) + + return env + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ + mn = self.cfg.GetMasterNode() + return ([mn], [mn]) + + def Exec(self, feedback_fn): + instances = list(self.owned_locks(locking.LEVEL_INSTANCE)) + + assert instances == [self.op.instance_name], "Instance not locked" + + ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_CHG_GROUP, + instances=instances, target_groups=list(self.target_uuids)) + + ial.Run(self.op.iallocator) + + if not ial.success: + raise errors.OpPrereqError("Can't compute solution for changing group of" + " instance '%s' using iallocator '%s': %s" % + (self.op.instance_name, self.op.iallocator, + ial.info), + errors.ECODE_NORES) + + jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, False) + + self.LogInfo("Iallocator returned %s job(s) for changing group of" + " instance '%s'", len(jobs), self.op.instance_name) + + return ResultWithJobs(jobs) + + class LUBackupQuery(NoHooksLU): """Query the exports list @@ -10967,7 +11627,7 @@ class LUBackupQuery(NoHooksLU): that node. """ - self.nodes = self.glm.list_owned(locking.LEVEL_NODE) + self.nodes = self.owned_locks(locking.LEVEL_NODE) rpcresult = self.rpc.call_export_list(self.nodes) result = {} for node in rpcresult: @@ -11263,8 +11923,8 @@ class LUBackupExport(LogicalUnit): not self.op.remove_instance): assert not activate_disks feedback_fn("Starting instance %s" % instance.name) - result = self.rpc.call_instance_start(src_node, instance, - None, None, False) + result = self.rpc.call_instance_start(src_node, + (instance, None, None), False) msg = result.fail_msg if msg: feedback_fn("Failed to start instance: %s" % msg) @@ -11350,7 +12010,7 @@ class LUBackupRemove(NoHooksLU): fqdn_warn = True instance_name = self.op.instance_name - locked_nodes = self.glm.list_owned(locking.LEVEL_NODE) + locked_nodes = self.owned_locks(locking.LEVEL_NODE) exportlist = self.rpc.call_export_list(locked_nodes) found = False for node in exportlist: @@ -11470,12 +12130,12 @@ class LUGroupAssignNodes(NoHooksLU): """ assert self.needed_locks[locking.LEVEL_NODEGROUP] - assert (frozenset(self.glm.list_owned(locking.LEVEL_NODE)) == + assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) == frozenset(self.op.nodes)) expected_locks = (set([self.group_uuid]) | self.cfg.GetNodeGroupsFromNodes(self.op.nodes)) - actual_locks = self.glm.list_owned(locking.LEVEL_NODEGROUP) + actual_locks = self.owned_locks(locking.LEVEL_NODEGROUP) if actual_locks != expected_locks: raise errors.OpExecError("Nodes changed groups since locks were acquired," " current groups are '%s', used to be '%s'" % @@ -11663,6 +12323,9 @@ class LUGroupQuery(NoHooksLU): def ExpandNames(self): self.gq.ExpandNames(self) + def DeclareLocks(self, level): + self.gq.DeclareLocks(self, level) + def Exec(self, feedback_fn): return 
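
The CheckPrereq logic of LUInstanceChangeGroup above reduces to a small amount of set arithmetic: explicitly requested targets must not overlap the groups the instance already uses, and without explicit targets every other owned group is a candidate. A self-contained sketch of just that computation (function and variable names are illustrative):

def compute_target_groups(owned_groups, inst_groups, requested=None):
    # Mirrors the prerequisite check above: explicit targets must not overlap
    # the instance's own groups; otherwise all other owned groups qualify.
    if requested is not None:
        targets = set(requested)
    else:
        targets = set(owned_groups) - set(inst_groups)
    conflicting = targets & set(inst_groups)
    if conflicting:
        raise ValueError("Can't use group(s) '%s' as targets, they are used by"
                         " the instance" % ", ".join(sorted(conflicting)))
    if not targets:
        raise ValueError("There are no possible target groups")
    return targets

assert compute_target_groups({"g1", "g2", "g3"}, {"g1"}) == {"g2", "g3"}
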
self.gq.OldStyleQuery(self) @@ -11741,7 +12404,6 @@ class LUGroupSetParams(LogicalUnit): return result - class LUGroupRemove(LogicalUnit): HPATH = "group-remove" HTYPE = constants.HTYPE_GROUP @@ -11900,16 +12562,9 @@ class LUGroupEvacuate(LogicalUnit): utils.CommaJoin(self.req_target_uuids)), errors.ECODE_INVAL) - if not self.op.iallocator: - # Use default iallocator - self.op.iallocator = self.cfg.GetDefaultIAllocator() + self.op.iallocator = _GetDefaultIAllocator(self.cfg, self.op.iallocator) - if not self.op.iallocator: - raise errors.OpPrereqError("No iallocator was specified, neither in the" - " opcode nor as a cluster-wide default", - errors.ECODE_INVAL) - - self.share_locks = dict.fromkeys(locking.LEVELS, 1) + self.share_locks = _ShareAll() self.needed_locks = { locking.LEVEL_INSTANCE: [], locking.LEVEL_NODEGROUP: [], @@ -11935,7 +12590,7 @@ class LUGroupEvacuate(LogicalUnit): # via the node before it's locked, requiring verification later on lock_groups.update(group_uuid for instance_name in - self.glm.list_owned(locking.LEVEL_INSTANCE) + self.owned_locks(locking.LEVEL_INSTANCE) for group_uuid in self.cfg.GetInstanceNodeGroups(instance_name)) else: @@ -11950,50 +12605,39 @@ class LUGroupEvacuate(LogicalUnit): self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND self._LockInstancesNodes() - # Lock all nodes in group to be evacuated - assert self.group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP) - member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members + # Lock all nodes in group to be evacuated and target groups + owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) + assert self.group_uuid in owned_groups + member_nodes = [node_name + for group in owned_groups + for node_name in self.cfg.GetNodeGroup(group).members] self.needed_locks[locking.LEVEL_NODE].extend(member_nodes) def CheckPrereq(self): - owned_instances = frozenset(self.glm.list_owned(locking.LEVEL_INSTANCE)) - owned_groups = frozenset(self.glm.list_owned(locking.LEVEL_NODEGROUP)) - owned_nodes = frozenset(self.glm.list_owned(locking.LEVEL_NODE)) + owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE)) + owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) + owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE)) assert owned_groups.issuperset(self.req_target_uuids) assert self.group_uuid in owned_groups # Check if locked instances are still correct - wanted_instances = self.cfg.GetNodeGroupInstances(self.group_uuid) - if owned_instances != wanted_instances: - raise errors.OpPrereqError("Instances in node group to be evacuated (%s)" - " changed since locks were acquired, wanted" - " %s, have %s; retry the operation" % - (self.group_uuid, - utils.CommaJoin(wanted_instances), - utils.CommaJoin(owned_instances)), - errors.ECODE_STATE) + _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances) # Get instance information - self.instances = dict((name, self.cfg.GetInstanceInfo(name)) - for name in owned_instances) + self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances)) # Check if node groups for locked instances are still correct for instance_name in owned_instances: inst = self.instances[instance_name] - assert self.group_uuid in self.cfg.GetInstanceNodeGroups(instance_name), \ - "Instance %s has no node in group %s" % (instance_name, self.group_uuid) assert owned_nodes.issuperset(inst.all_nodes), \ "Instance %s's nodes changed while we kept the lock" % instance_name - inst_groups = 
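
The removed lines in LUGroupEvacuate show the logic that the new _GetDefaultIAllocator helper consolidates: prefer the allocator named in the opcode, fall back to the cluster-wide default, and fail if neither is set. A standalone stand-in for that helper, with invented names:

def get_default_iallocator(cluster_default, requested):
    # Use the explicitly requested allocator if given, otherwise fall back to
    # the cluster-wide default; fail if neither is set.
    name = requested or cluster_default
    if not name:
        raise ValueError("No iallocator was specified, neither in the opcode"
                         " nor as a cluster-wide default")
    return name

assert get_default_iallocator("hail", None) == "hail"
assert get_default_iallocator("hail", "custom") == "custom"
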
self.cfg.GetInstanceNodeGroups(instance_name) - if not owned_groups.issuperset(inst_groups): - raise errors.OpPrereqError("Instance's node groups changed since locks" - " were acquired, current groups are '%s'," - " owning groups '%s'; retry the operation" % - (utils.CommaJoin(inst_groups), - utils.CommaJoin(owned_groups)), - errors.ECODE_STATE) + inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name, + owned_groups) + + assert self.group_uuid in inst_groups, \ + "Instance %s has no node in group %s" % (instance_name, self.group_uuid) if self.req_target_uuids: # User requested specific target groups @@ -12004,7 +12648,8 @@ class LUGroupEvacuate(LogicalUnit): if group_uuid != self.group_uuid] if not self.target_uuids: - raise errors.OpExecError("There are no possible target groups") + raise errors.OpPrereqError("There are no possible target groups", + errors.ECODE_INVAL) def BuildHooksEnv(self): """Build hooks env. @@ -12021,14 +12666,14 @@ class LUGroupEvacuate(LogicalUnit): """ mn = self.cfg.GetMasterNode() - assert self.group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP) + assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP) run_nodes = [mn] + self.cfg.GetNodeGroup(self.group_uuid).members return (run_nodes, run_nodes) def Exec(self, feedback_fn): - instances = list(self.glm.list_owned(locking.LEVEL_INSTANCE)) + instances = list(self.owned_locks(locking.LEVEL_INSTANCE)) assert self.group_uuid not in self.target_uuids @@ -12051,7 +12696,7 @@ class LUGroupEvacuate(LogicalUnit): return ResultWithJobs(jobs) -class TagsLU(NoHooksLU): # pylint: disable-msg=W0223 +class TagsLU(NoHooksLU): # pylint: disable=W0223 """Generic tags LU. This is an abstract class which is the parent of all the other tags LUs. @@ -12099,7 +12744,7 @@ class LUTagsGet(TagsLU): TagsLU.ExpandNames(self) # Share locks as this is only a read operation - self.share_locks = dict.fromkeys(locking.LEVELS, 1) + self.share_locks = _ShareAll() def Exec(self, feedback_fn): """Returns the tag list. @@ -12311,7 +12956,7 @@ class LUTestJqueue(NoHooksLU): # Wait for client to close try: try: - # pylint: disable-msg=E1101 + # pylint: disable=E1101 # Instance of '_socketobject' has no ... 
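
The inline "node groups changed since locks were acquired" check removed above is replaced by the shared _CheckInstanceNodeGroups helper. A toy equivalent of that consistency check, using plain sets rather than the cluster configuration:

def check_instance_node_groups(inst_groups, owned_groups):
    # All groups the instance currently spans must be covered by the group
    # locks we hold; otherwise the instance moved and the caller must retry.
    if not set(owned_groups).issuperset(inst_groups):
        raise ValueError("Instance's node groups changed since locks were"
                         " acquired, current groups are '%s', owning groups"
                         " '%s'; retry the operation"
                         % (", ".join(sorted(inst_groups)),
                            ", ".join(sorted(owned_groups))))
    return set(inst_groups)

assert check_instance_node_groups({"g1"}, {"g1", "g2"}) == {"g1"}
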
member conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT) conn.recv(1) @@ -12408,12 +13053,12 @@ class IAllocator(object): easy usage """ - # pylint: disable-msg=R0902 + # pylint: disable=R0902 # lots of instance attributes - def __init__(self, cfg, rpc, mode, **kwargs): + def __init__(self, cfg, rpc_runner, mode, **kwargs): self.cfg = cfg - self.rpc = rpc + self.rpc = rpc_runner # init buffer variables self.in_text = self.out_text = self.in_data = self.out_data = None # init all input fields so that pylint is happy @@ -12423,7 +13068,6 @@ class IAllocator(object): self.hypervisor = None self.relocate_from = None self.name = None - self.evac_nodes = None self.instances = None self.evac_mode = None self.target_groups = [] @@ -12557,8 +13201,8 @@ class IAllocator(object): nname) remote_info = nresult.payload - for attr in ['memory_total', 'memory_free', 'memory_dom0', - 'vg_size', 'vg_free', 'cpu_total']: + for attr in ["memory_total", "memory_free", "memory_dom0", + "vg_size", "vg_free", "cpu_total"]: if attr not in remote_info: raise errors.OpExecError("Node '%s' didn't return attribute" " '%s'" % (nname, attr)) @@ -12574,21 +13218,21 @@ class IAllocator(object): if iinfo.name not in node_iinfo[nname].payload: i_used_mem = 0 else: - i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory']) + i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"]) i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem - remote_info['memory_free'] -= max(0, i_mem_diff) + remote_info["memory_free"] -= max(0, i_mem_diff) if iinfo.admin_up: i_p_up_mem += beinfo[constants.BE_MEMORY] # compute memory used by instances pnr_dyn = { - "total_memory": remote_info['memory_total'], - "reserved_memory": remote_info['memory_dom0'], - "free_memory": remote_info['memory_free'], - "total_disk": remote_info['vg_size'], - "free_disk": remote_info['vg_free'], - "total_cpus": remote_info['cpu_total'], + "total_memory": remote_info["memory_total"], + "reserved_memory": remote_info["memory_dom0"], + "free_memory": remote_info["memory_free"], + "total_disk": remote_info["vg_size"], + "free_disk": remote_info["vg_free"], + "total_cpus": remote_info["cpu_total"], "i_pri_memory": i_p_mem, "i_pri_up_memory": i_p_up_mem, } @@ -12705,15 +13349,6 @@ class IAllocator(object): } return request - def _AddEvacuateNodes(self): - """Add evacuate nodes data to allocator structure. - - """ - request = { - "evac_nodes": self.evac_nodes - } - return request - def _AddNodeEvacuate(self): """Get data for node-evacuate requests. @@ -12755,7 +13390,7 @@ class IAllocator(object): _STRING_LIST = ht.TListOf(ht.TString) _JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, { - # pylint: disable-msg=E1101 + # pylint: disable=E1101 # Class '...' 
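
When IAllocator builds the per-node dynamic data above, the reported free memory is reduced by the gap between each instance's configured memory and what it currently uses, so stopped or under-committed instances still reserve their full allocation. A minimal sketch of that accounting, with a simplified data model ('instances' as (configured_mb, used_mb) pairs is an assumption for illustration):

def effective_free_memory(reported_free, instances):
    # Subtract, per instance, any memory it is entitled to but not currently
    # using; a stopped instance (used == 0) reserves its whole allocation.
    free = reported_free
    for configured_mb, used_mb in instances:
        free -= max(0, configured_mb - used_mb)
    return free

assert effective_free_memory(4096, [(1024, 0), (512, 512)]) == 3072
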
has no 'OP_ID' member "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID, opcodes.OpInstanceMigrate.OP_ID, @@ -12794,9 +13429,6 @@ class IAllocator(object): (_AddRelocateInstance, [("name", ht.TString), ("relocate_from", _STRING_LIST)], ht.TList), - constants.IALLOCATOR_MODE_MEVAC: - (_AddEvacuateNodes, [("evac_nodes", _STRING_LIST)], - ht.TListOf(ht.TAnd(ht.TIsLength(2), _STRING_LIST))), constants.IALLOCATOR_MODE_NODE_EVAC: (_AddNodeEvacuate, [ ("instances", _STRING_LIST), @@ -12855,39 +13487,25 @@ class IAllocator(object): (self._result_check, self.result), errors.ECODE_INVAL) - if self.mode in (constants.IALLOCATOR_MODE_RELOC, - constants.IALLOCATOR_MODE_MEVAC): + if self.mode == constants.IALLOCATOR_MODE_RELOC: + assert self.relocate_from is not None + assert self.required_nodes == 1 + node2group = dict((name, ndata["group"]) for (name, ndata) in self.in_data["nodes"].items()) fn = compat.partial(self._NodesToGroups, node2group, self.in_data["nodegroups"]) - if self.mode == constants.IALLOCATOR_MODE_RELOC: - assert self.relocate_from is not None - assert self.required_nodes == 1 - - request_groups = fn(self.relocate_from) - result_groups = fn(rdict["result"]) - - if result_groups != request_groups: - raise errors.OpExecError("Groups of nodes returned by iallocator (%s)" - " differ from original groups (%s)" % - (utils.CommaJoin(result_groups), - utils.CommaJoin(request_groups))) - elif self.mode == constants.IALLOCATOR_MODE_MEVAC: - request_groups = fn(self.evac_nodes) - for (instance_name, secnode) in self.result: - result_groups = fn([secnode]) - if result_groups != request_groups: - raise errors.OpExecError("Iallocator returned new secondary node" - " '%s' (group '%s') for instance '%s'" - " which is not in original group '%s'" % - (secnode, utils.CommaJoin(result_groups), - instance_name, - utils.CommaJoin(request_groups))) - else: - raise errors.ProgrammerError("Unhandled mode '%s'" % self.mode) + instance = self.cfg.GetInstanceInfo(self.name) + request_groups = fn(self.relocate_from + [instance.primary_node]) + result_groups = fn(rdict["result"] + [instance.primary_node]) + + if self.success and not set(result_groups).issubset(request_groups): + raise errors.OpExecError("Groups of nodes returned by iallocator (%s)" + " differ from original groups (%s)" % + (utils.CommaJoin(result_groups), + utils.CommaJoin(request_groups))) elif self.mode == constants.IALLOCATOR_MODE_NODE_EVAC: assert self.evac_mode in constants.IALLOCATOR_NEVAC_MODES @@ -12969,11 +13587,8 @@ class LUTestAllocator(NoHooksLU): elif self.op.mode == constants.IALLOCATOR_MODE_RELOC: fname = _ExpandInstanceName(self.cfg, self.op.name) self.op.name = fname - self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes - elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC: - if not hasattr(self.op, "evac_nodes"): - raise errors.OpPrereqError("Missing attribute 'evac_nodes' on" - " opcode input", errors.ECODE_INVAL) + self.relocate_from = \ + list(self.cfg.GetInstanceInfo(fname).secondary_nodes) elif self.op.mode in (constants.IALLOCATOR_MODE_CHG_GROUP, constants.IALLOCATOR_MODE_NODE_EVAC): if not self.op.instances: @@ -13014,10 +13629,6 @@ class LUTestAllocator(NoHooksLU): name=self.op.name, relocate_from=list(self.relocate_from), ) - elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC: - ial = IAllocator(self.cfg, self.rpc, - mode=self.op.mode, - evac_nodes=self.op.evac_nodes) elif self.op.mode == constants.IALLOCATOR_MODE_CHG_GROUP: ial = IAllocator(self.cfg, self.rpc, mode=self.op.mode,
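
The reworked relocation validation above maps both the original nodes (plus the primary node) and the iallocator's answer to node groups and requires the result to stay within the requested groups. A self-contained sketch of that subset check; the function name and the plain node-to-group dict are illustrative only:

def check_relocation_groups(node2group, relocate_from, primary_node, result_nodes):
    # The primary node is added to both sides, so a result that stays in the
    # instance's own group is always acceptable.
    request_groups = set(node2group[n] for n in relocate_from + [primary_node])
    result_groups = set(node2group[n] for n in result_nodes + [primary_node])
    if not result_groups.issubset(request_groups):
        raise RuntimeError("Groups of nodes returned by iallocator (%s) differ"
                           " from original groups (%s)"
                           % (", ".join(sorted(result_groups)),
                              ", ".join(sorted(request_groups))))

node2group = {"n1": "g1", "n2": "g1", "n3": "g2"}
check_relocation_groups(node2group, ["n2"], "n1", ["n2"])  # same group: fine
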