X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/7ee045a0207190af8ee1f24c4793b5f09cd14f7f..b6267745ede04b3c943bc02e004bdb9347e0f564:/lib/cmdlib.py diff --git a/lib/cmdlib.py b/lib/cmdlib.py index 3e603bd..4f33e5f 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -21,7 +21,7 @@ """Module implementing the master-side code.""" -# pylint: disable-msg=W0201,C0302 +# pylint: disable=W0201,C0302 # W0201 since most LU attributes are defined in CheckPrereq or similar # functions @@ -40,6 +40,7 @@ import socket import tempfile import shutil import itertools +import operator from ganeti import ssh from ganeti import utils @@ -57,21 +58,9 @@ from ganeti import netutils from ganeti import query from ganeti import qlang from ganeti import opcodes +from ganeti import ht -import ganeti.masterd.instance # pylint: disable-msg=W0611 - - -def _SupportsOob(cfg, node): - """Tells if node supports OOB. - - @type cfg: L{config.ConfigWriter} - @param cfg: The cluster configuration - @type node: L{objects.Node} - @param node: The node - @return: The OOB script if supported or an empty string otherwise - - """ - return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM] +import ganeti.masterd.instance # pylint: disable=W0611 class ResultWithJobs: @@ -130,6 +119,8 @@ class LogicalUnit(object): self.op = op self.cfg = context.cfg self.glm = context.glm + # readability alias + self.owned_locks = context.glm.list_owned self.context = context self.rpc = rpc # Dicts used to declare locking needs to mcpu @@ -140,10 +131,10 @@ class LogicalUnit(object): # Used to force good behavior when calling helper functions self.recalculate_locks = {} # logging - self.Log = processor.Log # pylint: disable-msg=C0103 - self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103 - self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103 - self.LogStep = processor.LogStep # pylint: disable-msg=C0103 + self.Log = processor.Log # pylint: disable=C0103 + self.LogWarning = processor.LogWarning # pylint: disable=C0103 + self.LogInfo = processor.LogInfo # pylint: disable=C0103 + self.LogStep = processor.LogStep # pylint: disable=C0103 # support for dry-run self.dry_run_result = None # support for generic debug attribute @@ -331,7 +322,7 @@ class LogicalUnit(object): """ # API must be kept, thus we ignore the unused argument and could # be a function warnings - # pylint: disable-msg=W0613,R0201 + # pylint: disable=W0613,R0201 return lu_result def _ExpandAndLockInstance(self): @@ -385,8 +376,8 @@ class LogicalUnit(object): # future we might want to have different behaviors depending on the value # of self.recalculate_locks[locking.LEVEL_NODE] wanted_nodes = [] - for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE): - instance = self.context.cfg.GetInstanceInfo(instance_name) + locked_i = self.owned_locks(locking.LEVEL_INSTANCE) + for _, instance in self.cfg.GetMultiInstanceInfo(locked_i): wanted_nodes.append(instance.primary_node) if not primary_only: wanted_nodes.extend(instance.secondary_nodes) @@ -399,7 +390,7 @@ class LogicalUnit(object): del self.recalculate_locks[locking.LEVEL_NODE] -class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223 +class NoHooksLU(LogicalUnit): # pylint: disable=W0223 """Simple LU which runs no hooks. 
This LU is intended as a parent for other LogicalUnits which will @@ -499,7 +490,7 @@ class _QueryBase: """ if self.do_locking: - names = lu.glm.list_owned(lock_level) + names = lu.owned_locks(lock_level) else: names = all_names @@ -559,6 +550,76 @@ class _QueryBase: sort_by_name=self.sort_by_name) +def _ShareAll(): + """Returns a dict declaring all lock levels shared. + + """ + return dict.fromkeys(locking.LEVELS, 1) + + +def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups): + """Checks if the owned node groups are still correct for an instance. + + @type cfg: L{config.ConfigWriter} + @param cfg: The cluster configuration + @type instance_name: string + @param instance_name: Instance name + @type owned_groups: set or frozenset + @param owned_groups: List of currently owned node groups + + """ + inst_groups = cfg.GetInstanceNodeGroups(instance_name) + + if not owned_groups.issuperset(inst_groups): + raise errors.OpPrereqError("Instance %s's node groups changed since" + " locks were acquired, current groups are" + " are '%s', owning groups '%s'; retry the" + " operation" % + (instance_name, + utils.CommaJoin(inst_groups), + utils.CommaJoin(owned_groups)), + errors.ECODE_STATE) + + return inst_groups + + +def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances): + """Checks if the instances in a node group are still correct. + + @type cfg: L{config.ConfigWriter} + @param cfg: The cluster configuration + @type group_uuid: string + @param group_uuid: Node group UUID + @type owned_instances: set or frozenset + @param owned_instances: List of currently owned instances + + """ + wanted_instances = cfg.GetNodeGroupInstances(group_uuid) + if owned_instances != wanted_instances: + raise errors.OpPrereqError("Instances in node group '%s' changed since" + " locks were acquired, wanted '%s', have '%s';" + " retry the operation" % + (group_uuid, + utils.CommaJoin(wanted_instances), + utils.CommaJoin(owned_instances)), + errors.ECODE_STATE) + + return wanted_instances + + +def _SupportsOob(cfg, node): + """Tells if node supports OOB. + + @type cfg: L{config.ConfigWriter} + @param cfg: The cluster configuration + @type node: L{objects.Node} + @param node: The node + @return: The OOB script if supported or an empty string otherwise + + """ + return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM] + + def _GetWantedNodes(lu, nodes): """Returns list of checked and expanded node names. @@ -656,18 +717,18 @@ def _ReleaseLocks(lu, level, names=None, keep=None): release = [] # Determine which locks to release - for name in lu.glm.list_owned(level): + for name in lu.owned_locks(level): if should_release(name): release.append(name) else: retain.append(name) - assert len(lu.glm.list_owned(level)) == (len(retain) + len(release)) + assert len(lu.owned_locks(level)) == (len(retain) + len(release)) # Release just some locks lu.glm.release(level, names=release) - assert frozenset(lu.glm.list_owned(level)) == frozenset(retain) + assert frozenset(lu.owned_locks(level)) == frozenset(retain) else: # Release everything lu.glm.release(level) @@ -675,6 +736,19 @@ def _ReleaseLocks(lu, level, names=None, keep=None): assert not lu.glm.is_owned(level), "No locks should be owned" +def _MapInstanceDisksToNodes(instances): + """Creates a map from (node, volume) to instance name. 
+ + @type instances: list of L{objects.Instance} + @rtype: dict; tuple of (node name, volume name) as key, instance name as value + + """ + return dict(((node, vol), inst.name) + for inst in instances + for (node, vols) in inst.MapLVsByNode().items() + for vol in vols) + + def _RunPostHook(lu, node_name): """Runs the post-hook for an opcode on a single node. @@ -683,7 +757,7 @@ def _RunPostHook(lu, node_name): try: hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name]) except: - # pylint: disable-msg=W0702 + # pylint: disable=W0702 lu.LogWarning("Errors occurred running hooks on %s" % node_name) @@ -860,7 +934,7 @@ def _ExpandInstanceName(cfg, name): def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, memory, vcpus, nics, disk_template, disks, - bep, hvp, hypervisor_name): + bep, hvp, hypervisor_name, tags): """Builds instance related env variables for hooks This builds the hook environment from individual variables. @@ -892,6 +966,8 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, @param hvp: the hypervisor parameters for the instance @type hypervisor_name: string @param hypervisor_name: the hypervisor for the instance + @type tags: list + @param tags: list of instance tags as strings @rtype: dict @return: the hook environment for this instance @@ -939,6 +1015,11 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, env["INSTANCE_DISK_COUNT"] = disk_count + if not tags: + tags = [] + + env["INSTANCE_TAGS"] = " ".join(tags) + for source, kind in [(bep, "BE"), (hvp, "HV")]: for key, value in source.items(): env["INSTANCE_%s_%s" % (kind, key)] = value @@ -989,23 +1070,24 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None): bep = cluster.FillBE(instance) hvp = cluster.FillHV(instance) args = { - 'name': instance.name, - 'primary_node': instance.primary_node, - 'secondary_nodes': instance.secondary_nodes, - 'os_type': instance.os, - 'status': instance.admin_up, - 'memory': bep[constants.BE_MEMORY], - 'vcpus': bep[constants.BE_VCPUS], - 'nics': _NICListToTuple(lu, instance.nics), - 'disk_template': instance.disk_template, - 'disks': [(disk.size, disk.mode) for disk in instance.disks], - 'bep': bep, - 'hvp': hvp, - 'hypervisor_name': instance.hypervisor, + "name": instance.name, + "primary_node": instance.primary_node, + "secondary_nodes": instance.secondary_nodes, + "os_type": instance.os, + "status": instance.admin_up, + "memory": bep[constants.BE_MEMORY], + "vcpus": bep[constants.BE_VCPUS], + "nics": _NICListToTuple(lu, instance.nics), + "disk_template": instance.disk_template, + "disks": [(disk.size, disk.mode) for disk in instance.disks], + "bep": bep, + "hvp": hvp, + "hypervisor_name": instance.hypervisor, + "tags": instance.tags, } if override: args.update(override) - return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142 + return _BuildInstanceHookEnv(**args) # pylint: disable=W0142 def _AdjustCandidatePool(lu, exceptions): @@ -1067,9 +1149,13 @@ def _CheckOSVariant(os_obj, name): @param name: OS name passed by the user, to check for validity """ + variant = objects.OS.GetVariant(name) if not os_obj.supported_variants: + if variant: + raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'" + " passed)" % (os_obj.name, variant), + errors.ECODE_INVAL) return - variant = objects.OS.GetVariant(name) if not variant: raise errors.OpPrereqError("OS name must include a variant", errors.ECODE_INVAL) @@ -1153,7 +1239,7 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, 
node_slot): iallocator = getattr(lu.op, iallocator_slot, None) if node is not None and iallocator is not None: - raise errors.OpPrereqError("Do not specify both, iallocator and node.", + raise errors.OpPrereqError("Do not specify both, iallocator and node", errors.ECODE_INVAL) elif node is None and iallocator is None: default_iallocator = lu.cfg.GetDefaultIAllocator() @@ -1161,10 +1247,33 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot): setattr(lu.op, iallocator_slot, default_iallocator) else: raise errors.OpPrereqError("No iallocator or node given and no" - " cluster-wide default iallocator found." - " Please specify either an iallocator or a" + " cluster-wide default iallocator found;" + " please specify either an iallocator or a" " node, or set a cluster-wide default" - " iallocator.") + " iallocator") + + +def _GetDefaultIAllocator(cfg, iallocator): + """Decides on which iallocator to use. + + @type cfg: L{config.ConfigWriter} + @param cfg: Cluster configuration object + @type iallocator: string or None + @param iallocator: Iallocator specified in opcode + @rtype: string + @return: Iallocator name + + """ + if not iallocator: + # Use default iallocator + iallocator = cfg.GetDefaultIAllocator() + + if not iallocator: + raise errors.OpPrereqError("No iallocator was specified, neither in the" + " opcode nor as a cluster-wide default", + errors.ECODE_INVAL) + + return iallocator class LUClusterPostInit(LogicalUnit): @@ -1253,7 +1362,7 @@ class LUClusterDestroy(LogicalUnit): def _VerifyCertificate(filename): - """Verifies a certificate for LUClusterVerify. + """Verifies a certificate for L{LUClusterVerifyConfig}. @type filename: string @param filename: Path to PEM file @@ -1262,8 +1371,8 @@ def _VerifyCertificate(filename): try: cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, utils.ReadFile(filename)) - except Exception, err: # pylint: disable-msg=W0703 - return (LUClusterVerify.ETYPE_ERROR, + except Exception, err: # pylint: disable=W0703 + return (LUClusterVerifyConfig.ETYPE_ERROR, "Failed to load X509 certificate %s: %s" % (filename, err)) (errcode, msg) = \ @@ -1278,21 +1387,52 @@ def _VerifyCertificate(filename): if errcode is None: return (None, fnamemsg) elif errcode == utils.CERT_WARNING: - return (LUClusterVerify.ETYPE_WARNING, fnamemsg) + return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg) elif errcode == utils.CERT_ERROR: - return (LUClusterVerify.ETYPE_ERROR, fnamemsg) + return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg) raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode) -class LUClusterVerify(LogicalUnit): - """Verifies the cluster status. +def _GetAllHypervisorParameters(cluster, instances): + """Compute the set of all hypervisor parameters. 
+ + @type cluster: L{objects.Cluster} + @param cluster: the cluster object + @param instances: list of L{objects.Instance} + @param instances: additional instances from which to obtain parameters + @rtype: list of (origin, hypervisor, parameters) + @return: a list with all parameters found, indicating the hypervisor they + apply to, and the origin (can be "cluster", "os X", or "instance Y") """ - HPATH = "cluster-verify" - HTYPE = constants.HTYPE_CLUSTER - REQ_BGL = False + hvp_data = [] + + for hv_name in cluster.enabled_hypervisors: + hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name))) + + for os_name, os_hvp in cluster.os_hvp.items(): + for hv_name, hv_params in os_hvp.items(): + if hv_params: + full_params = cluster.GetHVDefaults(hv_name, os_name=os_name) + hvp_data.append(("os %s" % os_name, hv_name, full_params)) + + # TODO: collapse identical parameter values in a single one + for instance in instances: + if instance.hvparams: + hvp_data.append(("instance %s" % instance.name, instance.hypervisor, + cluster.FillHV(instance))) + + return hvp_data + +class _VerifyErrors(object): + """Mix-in for cluster/group verify LUs. + + It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects + self.op and self._feedback_fn to be available.) + + """ TCLUSTER = "cluster" TNODE = "node" TINSTANCE = "instance" @@ -1300,6 +1440,8 @@ class LUClusterVerify(LogicalUnit): ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG") ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT") ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK") + ECLUSTERDANGLINGNODES = (TNODE, "ECLUSTERDANGLINGNODES") + ECLUSTERDANGLINGINST = (TNODE, "ECLUSTERDANGLINGINST") EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE") EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN") EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT") @@ -1329,6 +1471,182 @@ class LUClusterVerify(LogicalUnit): ETYPE_ERROR = "ERROR" ETYPE_WARNING = "WARNING" + def _Error(self, ecode, item, msg, *args, **kwargs): + """Format an error message. + + Based on the opcode's error_codes parameter, either format a + parseable error code, or a simpler error string. + + This must be called only from Exec and functions called from Exec. + + """ + ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) + itype, etxt = ecode + # first complete the msg + if args: + msg = msg % args + # then format the whole message + if self.op.error_codes: # This is a mix-in. pylint: disable=E1101 + msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg) + else: + if item: + item = " " + item + else: + item = "" + msg = "%s: %s%s: %s" % (ltype, itype, item, msg) + # and finally report it via the feedback_fn + self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable=E1101 + + def _ErrorIf(self, cond, *args, **kwargs): + """Log an error message if the passed condition is True. + + """ + cond = (bool(cond) + or self.op.debug_simulate_errors) # pylint: disable=E1101 + if cond: + self._Error(*args, **kwargs) + # do not mark the operation as failed for WARN cases only + if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR: + self.bad = self.bad or cond + + +class LUClusterVerify(NoHooksLU): + """Submits all jobs necessary to verify the cluster. 
+ + """ + REQ_BGL = False + + def ExpandNames(self): + self.needed_locks = {} + + def Exec(self, feedback_fn): + jobs = [] + + if self.op.group_name: + groups = [self.op.group_name] + depends_fn = lambda: None + else: + groups = self.cfg.GetNodeGroupList() + + # Verify global configuration + jobs.append([opcodes.OpClusterVerifyConfig()]) + + # Always depend on global verification + depends_fn = lambda: [(-len(jobs), [])] + + jobs.extend([opcodes.OpClusterVerifyGroup(group_name=group, + depends=depends_fn())] + for group in groups) + + # Fix up all parameters + for op in itertools.chain(*jobs): # pylint: disable=W0142 + op.debug_simulate_errors = self.op.debug_simulate_errors + op.verbose = self.op.verbose + op.error_codes = self.op.error_codes + try: + op.skip_checks = self.op.skip_checks + except AttributeError: + assert not isinstance(op, opcodes.OpClusterVerifyGroup) + + return ResultWithJobs(jobs) + + +class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors): + """Verifies the cluster config. + + """ + REQ_BGL = True + + def _VerifyHVP(self, hvp_data): + """Verifies locally the syntax of the hypervisor parameters. + + """ + for item, hv_name, hv_params in hvp_data: + msg = ("hypervisor %s parameters syntax check (source %s): %%s" % + (item, hv_name)) + try: + hv_class = hypervisor.GetHypervisor(hv_name) + utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) + hv_class.CheckParameterSyntax(hv_params) + except errors.GenericError, err: + self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err)) + + def ExpandNames(self): + # Information can be safely retrieved as the BGL is acquired in exclusive + # mode + assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER) + self.all_group_info = self.cfg.GetAllNodeGroupsInfo() + self.all_node_info = self.cfg.GetAllNodesInfo() + self.all_inst_info = self.cfg.GetAllInstancesInfo() + self.needed_locks = {} + + def Exec(self, feedback_fn): + """Verify integrity of cluster, performing various test on nodes. + + """ + self.bad = False + self._feedback_fn = feedback_fn + + feedback_fn("* Verifying cluster config") + + for msg in self.cfg.VerifyConfig(): + self._ErrorIf(True, self.ECLUSTERCFG, None, msg) + + feedback_fn("* Verifying cluster certificate files") + + for cert_filename in constants.ALL_CERT_FILES: + (errcode, msg) = _VerifyCertificate(cert_filename) + self._ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode) + + feedback_fn("* Verifying hypervisor parameters") + + self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(), + self.all_inst_info.values())) + + feedback_fn("* Verifying all nodes belong to an existing group") + + # We do this verification here because, should this bogus circumstance + # occur, it would never be caught by VerifyGroup, which only acts on + # nodes/instances reachable from existing node groups. 
+ + dangling_nodes = set(node.name for node in self.all_node_info.values() + if node.group not in self.all_group_info) + + dangling_instances = {} + no_node_instances = [] + + for inst in self.all_inst_info.values(): + if inst.primary_node in dangling_nodes: + dangling_instances.setdefault(inst.primary_node, []).append(inst.name) + elif inst.primary_node not in self.all_node_info: + no_node_instances.append(inst.name) + + pretty_dangling = [ + "%s (%s)" % + (node.name, + utils.CommaJoin(dangling_instances.get(node.name, + ["no instances"]))) + for node in dangling_nodes] + + self._ErrorIf(bool(dangling_nodes), self.ECLUSTERDANGLINGNODES, None, + "the following nodes (and their instances) belong to a non" + " existing group: %s", utils.CommaJoin(pretty_dangling)) + + self._ErrorIf(bool(no_node_instances), self.ECLUSTERDANGLINGINST, None, + "the following instances have a non-existing primary-node:" + " %s", utils.CommaJoin(no_node_instances)) + + return not self.bad + + +class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): + """Verifies the status of a node group. + + """ + HPATH = "cluster-verify" + HTYPE = constants.HTYPE_CLUSTER + REQ_BGL = False + _HOOKS_INDENT_RE = re.compile("^", re.M) class NodeImage(object): @@ -1382,48 +1700,90 @@ class LUClusterVerify(LogicalUnit): self.oslist = {} def ExpandNames(self): + # This raises errors.OpPrereqError on its own: + self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name) + + # Get instances in node group; this is unsafe and needs verification later + inst_names = self.cfg.GetNodeGroupInstances(self.group_uuid) + self.needed_locks = { - locking.LEVEL_NODE: locking.ALL_SET, - locking.LEVEL_INSTANCE: locking.ALL_SET, - } - self.share_locks = dict.fromkeys(locking.LEVELS, 1) + locking.LEVEL_INSTANCE: inst_names, + locking.LEVEL_NODEGROUP: [self.group_uuid], + locking.LEVEL_NODE: [], + } - def _Error(self, ecode, item, msg, *args, **kwargs): - """Format an error message. + self.share_locks = _ShareAll() - Based on the opcode's error_codes parameter, either format a - parseable error code, or a simpler error string. + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + # Get members of node group; this is unsafe and needs verification later + nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members) - This must be called only from Exec and functions called from Exec. + all_inst_info = self.cfg.GetAllInstancesInfo() - """ - ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) - itype, etxt = ecode - # first complete the msg - if args: - msg = msg % args - # then format the whole message - if self.op.error_codes: - msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg) - else: - if item: - item = " " + item - else: - item = "" - msg = "%s: %s%s: %s" % (ltype, itype, item, msg) - # and finally report it via the feedback_fn - self._feedback_fn(" - %s" % msg) + # In Exec(), we warn about mirrored instances that have primary and + # secondary living in separate node groups. To fully verify that + # volumes for these instances are healthy, we will need to do an + # extra call to their secondaries. We ensure here those nodes will + # be locked. + for inst in self.owned_locks(locking.LEVEL_INSTANCE): + # Important: access only the instances whose lock is owned + if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR: + nodes.update(all_inst_info[inst].secondary_nodes) - def _ErrorIf(self, cond, *args, **kwargs): - """Log an error message if the passed condition is True. 
+ self.needed_locks[locking.LEVEL_NODE] = nodes - """ - cond = bool(cond) or self.op.debug_simulate_errors - if cond: - self._Error(*args, **kwargs) - # do not mark the operation as failed for WARN cases only - if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR: - self.bad = self.bad or cond + def CheckPrereq(self): + assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP) + self.group_info = self.cfg.GetNodeGroup(self.group_uuid) + + group_nodes = set(self.group_info.members) + group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid) + + unlocked_nodes = \ + group_nodes.difference(self.owned_locks(locking.LEVEL_NODE)) + + unlocked_instances = \ + group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE)) + + if unlocked_nodes: + raise errors.OpPrereqError("Missing lock for nodes: %s" % + utils.CommaJoin(unlocked_nodes)) + + if unlocked_instances: + raise errors.OpPrereqError("Missing lock for instances: %s" % + utils.CommaJoin(unlocked_instances)) + + self.all_node_info = self.cfg.GetAllNodesInfo() + self.all_inst_info = self.cfg.GetAllInstancesInfo() + + self.my_node_names = utils.NiceSort(group_nodes) + self.my_inst_names = utils.NiceSort(group_instances) + + self.my_node_info = dict((name, self.all_node_info[name]) + for name in self.my_node_names) + + self.my_inst_info = dict((name, self.all_inst_info[name]) + for name in self.my_inst_names) + + # We detect here the nodes that will need the extra RPC calls for verifying + # split LV volumes; they should be locked. + extra_lv_nodes = set() + + for inst in self.my_inst_info.values(): + if inst.disk_template in constants.DTS_INT_MIRROR: + group = self.my_node_info[inst.primary_node].group + for nname in inst.secondary_nodes: + if self.all_node_info[nname].group != group: + extra_lv_nodes.add(nname) + + unlocked_lv_nodes = \ + extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE)) + + if unlocked_lv_nodes: + raise errors.OpPrereqError("these nodes could be locked: %s" % + utils.CommaJoin(unlocked_lv_nodes)) + self.extra_lv_nodes = list(extra_lv_nodes) def _VerifyNode(self, ninfo, nresult): """Perform some basic validation on data returned from a node. @@ -1441,7 +1801,7 @@ class LUClusterVerify(LogicalUnit): """ node = ninfo.name - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 # main result, nresult should be a non-empty dict test = not nresult or not isinstance(nresult, dict) @@ -1510,7 +1870,7 @@ class LUClusterVerify(LogicalUnit): """ node = ninfo.name - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 ntime = nresult.get(constants.NV_TIME, None) try: @@ -1531,7 +1891,7 @@ class LUClusterVerify(LogicalUnit): ntime_diff) def _VerifyNodeLVM(self, ninfo, nresult, vg_name): - """Check the node time. + """Check the node LVM results. @type ninfo: L{objects.Node} @param ninfo: the node to check @@ -1543,7 +1903,7 @@ class LUClusterVerify(LogicalUnit): return node = ninfo.name - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 # checks vg existence and size > 20G vglist = nresult.get(constants.NV_VGLIST, None) @@ -1567,8 +1927,31 @@ class LUClusterVerify(LogicalUnit): _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV" " '%s' of VG '%s'", pvname, owner_vg) + def _VerifyNodeBridges(self, ninfo, nresult, bridges): + """Check the node bridges. 
+ + @type ninfo: L{objects.Node} + @param ninfo: the node to check + @param nresult: the remote results for the node + @param bridges: the expected list of bridges + + """ + if not bridges: + return + + node = ninfo.name + _ErrorIf = self._ErrorIf # pylint: disable=C0103 + + missing = nresult.get(constants.NV_BRIDGES, None) + test = not isinstance(missing, list) + _ErrorIf(test, self.ENODENET, node, + "did not return valid bridge information") + if not test: + _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" % + utils.CommaJoin(sorted(missing))) + def _VerifyNodeNetwork(self, ninfo, nresult): - """Check the node time. + """Check the node network connectivity results. @type ninfo: L{objects.Node} @param ninfo: the node to check @@ -1576,7 +1959,7 @@ class LUClusterVerify(LogicalUnit): """ node = ninfo.name - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 test = constants.NV_NODELIST not in nresult _ErrorIf(test, self.ENODESSH, node, @@ -1617,7 +2000,7 @@ class LUClusterVerify(LogicalUnit): available on the instance's node. """ - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 node_current = instanceconfig.primary_node node_vol_should = {} @@ -1640,12 +2023,6 @@ class LUClusterVerify(LogicalUnit): "instance not running on its primary node %s", node_current) - for node, n_img in node_image.items(): - if node != node_current: - test = instance in n_img.instances - _ErrorIf(test, self.EINSTANCEWRONGNODE, instance, - "instance should not run on node %s", node) - diskdata = [(nname, success, status, idx) for (nname, disks) in diskstatus.items() for idx, (success, status) in enumerate(disks)] @@ -1685,18 +2062,6 @@ class LUClusterVerify(LogicalUnit): self._ErrorIf(test, self.ENODEORPHANLV, node, "volume %s is unknown", volume) - def _VerifyOrphanInstances(self, instancelist, node_image): - """Verify the list of running instances. - - This checks what instances are running but unknown to the cluster. - - """ - for node, n_img in node_image.items(): - for o_inst in n_img.instances: - test = o_inst not in instancelist - self._ErrorIf(test, self.ENODEORPHANINSTANCE, node, - "instance %s on node %s should not exist", o_inst, node) - def _VerifyNPlusOneMemory(self, node_image, instance_cfg): """Verify N+1 Memory Resilience. 
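Note: the hunks above add a bridge check to group verification. A minimal sketch of the idea, with plain dictionaries standing in for the cluster NIC parameters and for the RPC payload (expected_bridges and bridge_errors are illustrative names, not part of the patch):

NIC_MODE = "mode"
NIC_LINK = "link"
NIC_MODE_BRIDGED = "bridged"

def expected_bridges(default_nicparams, instance_nicparams):
    # every NIC in bridged mode (cluster default or per-instance) contributes
    # its link, i.e. the bridge it expects to find on the node
    bridges = set()
    for nicparams in [default_nicparams] + list(instance_nicparams):
        if nicparams.get(NIC_MODE) == NIC_MODE_BRIDGED:
            bridges.add(nicparams[NIC_LINK])
    return bridges

def bridge_errors(node_name, reported_missing):
    # mirrors _VerifyNodeBridges: a non-list payload means the RPC returned
    # invalid data, a non-empty list names bridges missing on the node
    if not isinstance(reported_missing, list):
        return ["%s: did not return valid bridge information" % node_name]
    if reported_missing:
        return ["%s: missing bridges: %s"
                % (node_name, ", ".join(sorted(reported_missing)))]
    return []

print(expected_bridges({"mode": "bridged", "link": "xen-br0"}, []))
print(bridge_errors("node1", ["xen-br1"]))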
@@ -1743,7 +2108,7 @@ class LUClusterVerify(LogicalUnit): @param all_nvinfo: RPC results """ - node_names = frozenset(node.name for node in nodeinfo) + node_names = frozenset(node.name for node in nodeinfo if not node.offline) assert master_node in node_names assert (len(files_all | files_all_opt | files_mc | files_vm) == @@ -1762,6 +2127,9 @@ class LUClusterVerify(LogicalUnit): fileinfo = dict((filename, {}) for filename in file2nodefn.keys()) for node in nodeinfo: + if node.offline: + continue + nresult = all_nvinfo[node.name] if nresult.fail_msg or not nresult.payload: @@ -1796,8 +2164,8 @@ class LUClusterVerify(LogicalUnit): # All or no nodes errorif(missing_file and missing_file != node_names, cls.ECLUSTERFILECHECK, None, - "File %s is optional, but it must exist on all or no nodes (not" - " found on %s)", + "File %s is optional, but it must exist on all or no" + " nodes (not found on %s)", filename, utils.CommaJoin(utils.NiceSort(missing_file))) else: errorif(missing_file, cls.ECLUSTERFILECHECK, None, @@ -1832,7 +2200,7 @@ class LUClusterVerify(LogicalUnit): """ node = ninfo.name - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 if drbd_helper: helper_result = nresult.get(constants.NV_DRBDHELPER, None) @@ -1891,7 +2259,7 @@ class LUClusterVerify(LogicalUnit): """ node = ninfo.name - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 remote_os = nresult.get(constants.NV_OSLIST, None) test = (not isinstance(remote_os, list) or @@ -1932,7 +2300,7 @@ class LUClusterVerify(LogicalUnit): """ node = ninfo.name - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?" 
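Note: the _VerifyFiles hunk above now skips offline nodes and enforces that an optional shared file exists either on every checked node or on none of them. A small illustrative helper under that assumption (the wording of the non-optional error is paraphrased, not taken from the patch):

def file_check_errors(filename, optional, checked_nodes, nodes_with_file):
    # nodes that were checked but did not report the file
    missing = frozenset(checked_nodes) - frozenset(nodes_with_file)
    if not missing:
        return []
    if optional:
        if missing != frozenset(checked_nodes):
            return ["File %s is optional, but it must exist on all or no"
                    " nodes (not found on %s)"
                    % (filename, ", ".join(sorted(missing)))]
        return []
    return ["File %s is missing on nodes: %s"
            % (filename, ", ".join(sorted(missing)))]

print(file_check_errors("example.conf", True, ["node1", "node2"], ["node1"]))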
@@ -1945,11 +2313,6 @@ class LUClusterVerify(LogicalUnit): _ErrorIf(len(os_data) > 1, self.ENODEOS, node, "OS '%s' has multiple entries (first one shadows the rest): %s", os_name, utils.CommaJoin([v[0] for v in os_data])) - # this will catched in backend too - _ErrorIf(compat.any(v >= constants.OS_API_V15 for v in f_api) - and not f_var, self.ENODEOS, node, - "OS %s with API at least %d does not declare any variant", - os_name, constants.OS_API_V15) # comparisons with the 'base' image test = os_name not in base.oslist _ErrorIf(test, self.ENODEOS, node, @@ -2007,7 +2370,7 @@ class LUClusterVerify(LogicalUnit): """ node = ninfo.name - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 nimg.lvm_fail = True lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data") @@ -2055,7 +2418,7 @@ class LUClusterVerify(LogicalUnit): """ node = ninfo.name - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 # try to read free memory (from the hypervisor) hv_info = nresult.get(constants.NV_HVINFO, None) @@ -2097,7 +2460,7 @@ class LUClusterVerify(LogicalUnit): list of tuples (success, payload) """ - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 node_disks = {} node_disks_devonly = {} @@ -2173,25 +2536,11 @@ class LUClusterVerify(LogicalUnit): len(nnames) <= len(instanceinfo[inst].all_nodes) and compat.all(isinstance(s, (tuple, list)) and len(s) == 2 for s in statuses) - for inst, nnames in instdisk.items() - for nname, statuses in nnames.items()) - assert set(instdisk) == set(instanceinfo), "instdisk consistency failure" - - return instdisk - - def _VerifyHVP(self, hvp_data): - """Verifies locally the syntax of the hypervisor parameters. - - """ - for item, hv_name, hv_params in hvp_data: - msg = ("hypervisor %s parameters syntax check (source %s): %%s" % - (item, hv_name)) - try: - hv_class = hypervisor.GetHypervisor(hv_name) - utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) - hv_class.CheckParameterSyntax(hv_params) - except errors.GenericError, err: - self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err)) + for inst, nnames in instdisk.items() + for nname, statuses in nnames.items()) + assert set(instdisk) == set(instanceinfo), "instdisk consistency failure" + + return instdisk def BuildHooksEnv(self): """Build hooks env. @@ -2200,14 +2549,12 @@ class LUClusterVerify(LogicalUnit): the output be logged in the verify output and the verification to fail. """ - cfg = self.cfg - env = { - "CLUSTER_TAGS": " ".join(cfg.GetClusterInfo().GetTags()) + "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()) } env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags())) - for node in cfg.GetAllNodesInfo().values()) + for node in self.my_node_info.values()) return env @@ -2215,37 +2562,32 @@ class LUClusterVerify(LogicalUnit): """Build hooks nodes. """ - return ([], self.cfg.GetNodeList()) + return ([], self.my_node_names) def Exec(self, feedback_fn): - """Verify integrity of cluster, performing various test on nodes. + """Verify integrity of the node group, performing various test on nodes. """ - # This method has too many local variables. pylint: disable-msg=R0914 + # This method has too many local variables. 
pylint: disable=R0914 + feedback_fn("* Verifying group '%s'" % self.group_info.name) + + if not self.my_node_names: + # empty node group + feedback_fn("* Empty node group, skipping verification") + return True + self.bad = False - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + _ErrorIf = self._ErrorIf # pylint: disable=C0103 verbose = self.op.verbose self._feedback_fn = feedback_fn - feedback_fn("* Verifying global settings") - for msg in self.cfg.VerifyConfig(): - _ErrorIf(True, self.ECLUSTERCFG, None, msg) - - # Check the cluster certificates - for cert_filename in constants.ALL_CERT_FILES: - (errcode, msg) = _VerifyCertificate(cert_filename) - _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode) vg_name = self.cfg.GetVGName() drbd_helper = self.cfg.GetDRBDHelper() - hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors cluster = self.cfg.GetClusterInfo() - nodelist = utils.NiceSort(self.cfg.GetNodeList()) - nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist] - nodeinfo_byname = dict(zip(nodelist, nodeinfo)) - instancelist = utils.NiceSort(self.cfg.GetInstanceList()) - instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname)) - for iname in instancelist) groupinfo = self.cfg.GetAllNodeGroupsInfo() + hypervisors = cluster.enabled_hypervisors + node_data_list = [self.my_node_info[name] for name in self.my_node_names] + i_non_redundant = [] # Non redundant instances i_non_a_balanced = [] # Non auto-balanced instances n_offline = 0 # Count of offline nodes @@ -2261,37 +2603,32 @@ class LUClusterVerify(LogicalUnit): master_node = self.master_node = self.cfg.GetMasterNode() master_ip = self.cfg.GetMasterIP() - # Compute the set of hypervisor parameters - hvp_data = [] - for hv_name in hypervisors: - hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name))) - for os_name, os_hvp in cluster.os_hvp.items(): - for hv_name, hv_params in os_hvp.items(): - if not hv_params: - continue - full_params = cluster.GetHVDefaults(hv_name, os_name=os_name) - hvp_data.append(("os %s" % os_name, hv_name, full_params)) - # TODO: collapse identical parameter values in a single one - for instance in instanceinfo.values(): - if not instance.hvparams: - continue - hvp_data.append(("instance %s" % instance.name, instance.hypervisor, - cluster.FillHV(instance))) - # and verify them locally - self._VerifyHVP(hvp_data) + feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names)) + + # We will make nodes contact all nodes in their group, and one node from + # every other group. + # TODO: should it be a *random* node, different every time? 
+ online_nodes = [node.name for node in node_data_list if not node.offline] + other_group_nodes = {} + + for name in sorted(self.all_node_info): + node = self.all_node_info[name] + if (node.group not in other_group_nodes + and node.group != self.group_uuid + and not node.offline): + other_group_nodes[node.group] = node.name - feedback_fn("* Gathering data (%d nodes)" % len(nodelist)) node_verify_param = { constants.NV_FILELIST: utils.UniqueSequence(filename for files in filemap for filename in files), - constants.NV_NODELIST: [node.name for node in nodeinfo - if not node.offline], + constants.NV_NODELIST: online_nodes + other_group_nodes.values(), constants.NV_HYPERVISOR: hypervisors, - constants.NV_HVPARAMS: hvp_data, - constants.NV_NODENETTEST: [(node.name, node.primary_ip, - node.secondary_ip) for node in nodeinfo + constants.NV_HVPARAMS: + _GetAllHypervisorParameters(cluster, self.all_inst_info.values()), + constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip) + for node in node_data_list if not node.offline], constants.NV_INSTANCELIST: hypervisors, constants.NV_VERSION: None, @@ -2312,15 +2649,30 @@ class LUClusterVerify(LogicalUnit): if drbd_helper: node_verify_param[constants.NV_DRBDHELPER] = drbd_helper + # bridge checks + # FIXME: this needs to be changed per node-group, not cluster-wide + bridges = set() + default_nicpp = cluster.nicparams[constants.PP_DEFAULT] + if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: + bridges.add(default_nicpp[constants.NIC_LINK]) + for instance in self.my_inst_info.values(): + for nic in instance.nics: + full_nic = cluster.SimpleFillNIC(nic.nicparams) + if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: + bridges.add(full_nic[constants.NIC_LINK]) + + if bridges: + node_verify_param[constants.NV_BRIDGES] = list(bridges) + # Build our expected cluster state node_image = dict((node.name, self.NodeImage(offline=node.offline, name=node.name, vm_capable=node.vm_capable)) - for node in nodeinfo) + for node in node_data_list) # Gather OOB paths oob_paths = [] - for node in nodeinfo: + for node in self.all_node_info.values(): path = _SupportsOob(self.cfg, node) if path and path not in oob_paths: oob_paths.append(path) @@ -2328,14 +2680,13 @@ class LUClusterVerify(LogicalUnit): if oob_paths: node_verify_param[constants.NV_OOB_PATHS] = oob_paths - for instance in instancelist: - inst_config = instanceinfo[instance] + for instance in self.my_inst_names: + inst_config = self.my_inst_info[instance] for nname in inst_config.all_nodes: if nname not in node_image: - # ghost node gnode = self.NodeImage(name=nname) - gnode.ghost = True + gnode.ghost = (nname not in self.all_node_info) node_image[nname] = gnode inst_config.MapLVsByNode(node_vol_should) @@ -2358,23 +2709,60 @@ class LUClusterVerify(LogicalUnit): # time before and after executing the request, we can at least have a time # window. 
nvinfo_starttime = time.time() - all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param, + all_nvinfo = self.rpc.call_node_verify(self.my_node_names, + node_verify_param, self.cfg.GetClusterName()) nvinfo_endtime = time.time() + if self.extra_lv_nodes and vg_name is not None: + extra_lv_nvinfo = \ + self.rpc.call_node_verify(self.extra_lv_nodes, + {constants.NV_LVLIST: vg_name}, + self.cfg.GetClusterName()) + else: + extra_lv_nvinfo = {} + all_drbd_map = self.cfg.ComputeDRBDMap() - feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist)) - instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo) + feedback_fn("* Gathering disk information (%s nodes)" % + len(self.my_node_names)) + instdisk = self._CollectDiskInfo(self.my_node_names, node_image, + self.my_inst_info) feedback_fn("* Verifying configuration file consistency") - self._VerifyFiles(_ErrorIf, nodeinfo, master_node, all_nvinfo, filemap) + + # If not all nodes are being checked, we need to make sure the master node + # and a non-checked vm_capable node are in the list. + absent_nodes = set(self.all_node_info).difference(self.my_node_info) + if absent_nodes: + vf_nvinfo = all_nvinfo.copy() + vf_node_info = list(self.my_node_info.values()) + additional_nodes = [] + if master_node not in self.my_node_info: + additional_nodes.append(master_node) + vf_node_info.append(self.all_node_info[master_node]) + # Add the first vm_capable node we find which is not included + for node in absent_nodes: + nodeinfo = self.all_node_info[node] + if nodeinfo.vm_capable and not nodeinfo.offline: + additional_nodes.append(node) + vf_node_info.append(self.all_node_info[node]) + break + key = constants.NV_FILELIST + vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes, + {key: node_verify_param[key]}, + self.cfg.GetClusterName())) + else: + vf_nvinfo = all_nvinfo + vf_node_info = self.my_node_info.values() + + self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap) feedback_fn("* Verifying node status") refos_img = None - for node_i in nodeinfo: + for node_i in node_data_list: node = node_i.name nimg = node_image[node] @@ -2411,23 +2799,41 @@ class LUClusterVerify(LogicalUnit): if nimg.vm_capable: self._VerifyNodeLVM(node_i, nresult, vg_name) - self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper, + self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper, all_drbd_map) self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name) self._UpdateNodeInstances(node_i, nresult, nimg) self._UpdateNodeInfo(node_i, nresult, nimg, vg_name) self._UpdateNodeOS(node_i, nresult, nimg) + if not nimg.os_fail: if refos_img is None: refos_img = nimg self._VerifyNodeOS(node_i, nimg, refos_img) + self._VerifyNodeBridges(node_i, nresult, bridges) + + # Check whether all running instancies are primary for the node. (This + # can no longer be done from _VerifyInstance below, since some of the + # wrong instances could be from other node groups.) 
+ non_primary_inst = set(nimg.instances).difference(nimg.pinst) + + for inst in non_primary_inst: + test = inst in self.all_inst_info + _ErrorIf(test, self.EINSTANCEWRONGNODE, inst, + "instance should not run on node %s", node_i.name) + _ErrorIf(not test, self.ENODEORPHANINSTANCE, node_i.name, + "node is running unknown instance %s", inst) + + for node, result in extra_lv_nvinfo.items(): + self._UpdateNodeVolumes(self.all_node_info[node], result.payload, + node_image[node], vg_name) feedback_fn("* Verifying instance status") - for instance in instancelist: + for instance in self.my_inst_names: if verbose: feedback_fn("* Verifying instance %s" % instance) - inst_config = instanceinfo[instance] + inst_config = self.my_inst_info[instance] self._VerifyInstance(instance, inst_config, node_image, instdisk[instance]) inst_nodes_offline = [] @@ -2462,7 +2868,7 @@ class LUClusterVerify(LogicalUnit): instance_groups = {} for node in instance_nodes: - instance_groups.setdefault(nodeinfo_byname[node].group, + instance_groups.setdefault(self.all_node_info[node].group, []).append(node) pretty_list = [ @@ -2501,14 +2907,22 @@ class LUClusterVerify(LogicalUnit): feedback_fn("* Verifying orphan volumes") reserved = utils.FieldSet(*cluster.reserved_lvs) - self._VerifyOrphanVolumes(node_vol_should, node_image, reserved) - feedback_fn("* Verifying orphan instances") - self._VerifyOrphanInstances(instancelist, node_image) + # We will get spurious "unknown volume" warnings if any node of this group + # is secondary for an instance whose primary is in another group. To avoid + # them, we find these instances and add their volumes to node_vol_should. + for inst in self.all_inst_info.values(): + for secondary in inst.secondary_nodes: + if (secondary in self.my_node_info + and inst.name not in self.my_inst_info): + inst.MapLVsByNode(node_vol_should) + break + + self._VerifyOrphanVolumes(node_vol_should, node_image, reserved) if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks: feedback_fn("* Verifying N+1 Memory redundancy") - self._VerifyNPlusOneMemory(node_image, instanceinfo) + self._VerifyNPlusOneMemory(node_image, self.my_inst_info) feedback_fn("* Other Notes") if i_non_redundant: @@ -2542,9 +2956,12 @@ class LUClusterVerify(LogicalUnit): and hook results """ - # We only really run POST phase hooks, and are only interested in - # their results - if phase == constants.HOOKS_PHASE_POST: + # We only really run POST phase hooks, only for non-empty groups, + # and are only interested in their results + if not self.my_node_names: + # empty node group + pass + elif phase == constants.HOOKS_PHASE_POST: # Used to change hooks' output to proper indentation feedback_fn("* Hooks Results") assert hooks_results, "invalid result from hooks" @@ -2566,11 +2983,11 @@ class LUClusterVerify(LogicalUnit): self._ErrorIf(test, self.ENODEHOOKS, node_name, "Script %s failed, output:", script) if test: - output = self._HOOKS_INDENT_RE.sub(' ', output) + output = self._HOOKS_INDENT_RE.sub(" ", output) feedback_fn("%s" % output) lu_result = 0 - return lu_result + return lu_result class LUClusterVerifyDisks(NoHooksLU): @@ -2580,11 +2997,91 @@ class LUClusterVerifyDisks(NoHooksLU): REQ_BGL = False def ExpandNames(self): + self.share_locks = _ShareAll() self.needed_locks = { - locking.LEVEL_NODE: locking.ALL_SET, - locking.LEVEL_INSTANCE: locking.ALL_SET, - } - self.share_locks = dict.fromkeys(locking.LEVELS, 1) + locking.LEVEL_NODEGROUP: locking.ALL_SET, + } + + def Exec(self, feedback_fn): + group_names = 
self.owned_locks(locking.LEVEL_NODEGROUP) + + # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group + return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)] + for group in group_names]) + + +class LUGroupVerifyDisks(NoHooksLU): + """Verifies the status of all disks in a node group. + + """ + REQ_BGL = False + + def ExpandNames(self): + # Raises errors.OpPrereqError on its own if group can't be found + self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name) + + self.share_locks = _ShareAll() + self.needed_locks = { + locking.LEVEL_INSTANCE: [], + locking.LEVEL_NODEGROUP: [], + locking.LEVEL_NODE: [], + } + + def DeclareLocks(self, level): + if level == locking.LEVEL_INSTANCE: + assert not self.needed_locks[locking.LEVEL_INSTANCE] + + # Lock instances optimistically, needs verification once node and group + # locks have been acquired + self.needed_locks[locking.LEVEL_INSTANCE] = \ + self.cfg.GetNodeGroupInstances(self.group_uuid) + + elif level == locking.LEVEL_NODEGROUP: + assert not self.needed_locks[locking.LEVEL_NODEGROUP] + + self.needed_locks[locking.LEVEL_NODEGROUP] = \ + set([self.group_uuid] + + # Lock all groups used by instances optimistically; this requires + # going via the node before it's locked, requiring verification + # later on + [group_uuid + for instance_name in self.owned_locks(locking.LEVEL_INSTANCE) + for group_uuid in self.cfg.GetInstanceNodeGroups(instance_name)]) + + elif level == locking.LEVEL_NODE: + # This will only lock the nodes in the group to be verified which contain + # actual instances + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + self._LockInstancesNodes() + + # Lock all nodes in group to be verified + assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP) + member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members + self.needed_locks[locking.LEVEL_NODE].extend(member_nodes) + + def CheckPrereq(self): + owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE)) + owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) + owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE)) + + assert self.group_uuid in owned_groups + + # Check if locked instances are still correct + _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances) + + # Get instance information + self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances)) + + # Check if node groups for locked instances are still correct + for (instance_name, inst) in self.instances.items(): + assert owned_nodes.issuperset(inst.all_nodes), \ + "Instance %s's nodes changed while we kept the lock" % instance_name + + inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name, + owned_groups) + + assert self.group_uuid in inst_groups, \ + "Instance %s has no node in group %s" % (instance_name, self.group_uuid) def Exec(self, feedback_fn): """Verify integrity of cluster disks. 
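Note: LUGroupVerifyDisks above locks the group's instances optimistically (their names are read before the locks are taken), so CheckPrereq re-validates them via _CheckNodeGroupInstances. A self-contained sketch of that re-check, with get_group_instances as a hypothetical stand-in for the config query:

def check_group_instances_unchanged(get_group_instances, group_uuid,
                                    owned_instances):
    # re-read the group's instance list after the locks were acquired and
    # refuse to continue if it no longer matches what was actually locked
    wanted = set(get_group_instances(group_uuid))
    if set(owned_instances) != wanted:
        raise RuntimeError("Instances in node group '%s' changed since locks"
                           " were acquired, wanted %s, have %s; retry the"
                           " operation"
                           % (group_uuid, sorted(wanted),
                              sorted(owned_instances)))
    return wanted

# usage: lock the names that were read before locking, then verify that
# nothing was added to or removed from the group in between
current = {"group-1": ["inst1", "inst2"]}
print(sorted(check_group_instances_unchanged(lambda g: current[g],
                                             "group-1", ["inst1", "inst2"])))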
@@ -2595,50 +3092,41 @@ class LUClusterVerifyDisks(NoHooksLU): missing volumes """ - result = res_nodes, res_instances, res_missing = {}, [], {} + res_nodes = {} + res_instances = set() + res_missing = {} - nodes = utils.NiceSort(self.cfg.GetVmCapableNodeList()) - instances = self.cfg.GetAllInstancesInfo().values() + nv_dict = _MapInstanceDisksToNodes([inst + for inst in self.instances.values() + if inst.admin_up]) - nv_dict = {} - for inst in instances: - inst_lvs = {} - if not inst.admin_up: - continue - inst.MapLVsByNode(inst_lvs) - # transform { iname: {node: [vol,],},} to {(node, vol): iname} - for node, vol_list in inst_lvs.iteritems(): - for vol in vol_list: - nv_dict[(node, vol)] = inst - - if not nv_dict: - return result - - node_lvs = self.rpc.call_lv_list(nodes, []) - for node, node_res in node_lvs.items(): - if node_res.offline: - continue - msg = node_res.fail_msg - if msg: - logging.warning("Error enumerating LVs on node %s: %s", node, msg) - res_nodes[node] = msg - continue + if nv_dict: + nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) & + set(self.cfg.GetVmCapableNodeList())) + + node_lvs = self.rpc.call_lv_list(nodes, []) - lvs = node_res.payload - for lv_name, (_, _, lv_online) in lvs.items(): - inst = nv_dict.pop((node, lv_name), None) - if (not lv_online and inst is not None - and inst.name not in res_instances): - res_instances.append(inst.name) + for (node, node_res) in node_lvs.items(): + if node_res.offline: + continue - # any leftover items in nv_dict are missing LVs, let's arrange the - # data better - for key, inst in nv_dict.iteritems(): - if inst.name not in res_missing: - res_missing[inst.name] = [] - res_missing[inst.name].append(key) + msg = node_res.fail_msg + if msg: + logging.warning("Error enumerating LVs on node %s: %s", node, msg) + res_nodes[node] = msg + continue - return result + for lv_name, (_, _, lv_online) in node_res.payload.items(): + inst = nv_dict.pop((node, lv_name), None) + if not (lv_online or inst is None): + res_instances.add(inst) + + # any leftover items in nv_dict are missing LVs, let's arrange the data + # better + for key, inst in nv_dict.iteritems(): + res_missing.setdefault(inst, []).append(key) + + return (res_nodes, list(res_instances), res_missing) class LUClusterRepairDiskSizes(NoHooksLU): @@ -2661,7 +3149,7 @@ class LUClusterRepairDiskSizes(NoHooksLU): locking.LEVEL_NODE: locking.ALL_SET, locking.LEVEL_INSTANCE: locking.ALL_SET, } - self.share_locks = dict.fromkeys(locking.LEVELS, 1) + self.share_locks = _ShareAll() def DeclareLocks(self, level): if level == locking.LEVEL_NODE and self.wanted_names is not None: @@ -2674,10 +3162,10 @@ class LUClusterRepairDiskSizes(NoHooksLU): """ if self.wanted_names is None: - self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE) + self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE) - self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name - in self.wanted_names] + self.wanted_instances = \ + map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names)) def _EnsureChildSizes(self, disk): """Ensure children of the disk have the needed disk size. 
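Note: the Exec hunk above relies on the new _MapInstanceDisksToNodes helper introduced earlier in the patch. A runnable sketch of what it produces and how the missing-LV bookkeeping consumes it (FakeInstance is an illustrative stand-in for objects.Instance):

def map_instance_disks_to_nodes(instances):
    # same expression as the helper: {(node name, volume name): instance name}
    return dict(((node, vol), inst.name)
                for inst in instances
                for (node, vols) in inst.MapLVsByNode().items()
                for vol in vols)

class FakeInstance(object):
    def __init__(self, name, lvs_by_node):
        self.name = name
        self._lvs_by_node = lvs_by_node

    def MapLVsByNode(self):
        return self._lvs_by_node

inst = FakeInstance("web1", {"node1": ["xenvg/disk0"], "node2": ["xenvg/disk0"]})
nv_dict = map_instance_disks_to_nodes([inst])
# a (node, volume) pair reported online by the node is popped from the dict;
# whatever remains at the end is reported as missing
print(nv_dict.pop(("node1", "xenvg/disk0"), None))   # -> web1
print(nv_dict)                                        # -> LVs still unaccounted for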
@@ -2899,7 +3387,7 @@ class LUClusterSetParams(LogicalUnit): " drbd-based instances exist", errors.ECODE_INVAL) - node_list = self.glm.list_owned(locking.LEVEL_NODE) + node_list = self.owned_locks(locking.LEVEL_NODE) # if vg_name not None, checks given volume group on all nodes if self.op.vg_name: @@ -2921,8 +3409,7 @@ class LUClusterSetParams(LogicalUnit): if self.op.drbd_helper: # checks given drbd helper on all nodes helpers = self.rpc.call_drbd_helper(node_list) - for node in node_list: - ninfo = self.cfg.GetNodeInfo(node) + for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list): if ninfo.offline: self.LogInfo("Not checking drbd helper on offline node %s", node) continue @@ -2974,8 +3461,8 @@ class LUClusterSetParams(LogicalUnit): # if we're moving instances to routed, check that they have an ip target_mode = params_filled[constants.NIC_MODE] if target_mode == constants.NIC_MODE_ROUTED and not nic.ip: - nic_errors.append("Instance %s, nic/%d: routed nick with no ip" % - (instance.name, nic_idx)) + nic_errors.append("Instance %s, nic/%d: routed NIC with no ip" + " address" % (instance.name, nic_idx)) if nic_errors: raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" % "\n".join(nic_errors)) @@ -3426,6 +3913,20 @@ class LUOobCommand(NoHooksLU): REG_BGL = False _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE) + def ExpandNames(self): + """Gather locks we need. + + """ + if self.op.node_names: + self.op.node_names = _GetWantedNodes(self, self.op.node_names) + lock_names = self.op.node_names + else: + lock_names = locking.ALL_SET + + self.needed_locks = { + locking.LEVEL_NODE: lock_names, + } + def CheckPrereq(self): """Check prerequisites. @@ -3467,9 +3968,7 @@ class LUOobCommand(NoHooksLU): if self.op.command in self._SKIP_MASTER: assert self.master_node not in self.op.node_names - for node_name in self.op.node_names: - node = self.cfg.GetNodeInfo(node_name) - + for (node_name, node) in self.cfg.GetMultiNodeInfo(self.op.node_names): if node is None: raise errors.OpPrereqError("Node %s not found" % node_name, errors.ECODE_NOENT) @@ -3482,20 +3981,6 @@ class LUOobCommand(NoHooksLU): " not marked offline") % node_name, errors.ECODE_STATE) - def ExpandNames(self): - """Gather locks we need. - - """ - if self.op.node_names: - self.op.node_names = _GetWantedNodes(self, self.op.node_names) - lock_names = self.op.node_names - else: - lock_names = locking.ALL_SET - - self.needed_locks = { - locking.LEVEL_NODE: lock_names, - } - def Exec(self, feedback_fn): """Execute OOB and return result if we expect any. 
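Note: several hunks in this patch (LUClusterSetParams above, _LockInstancesNodes and LUNodeAdd elsewhere) replace per-name GetNodeInfo/GetInstanceInfo loops with a single GetMulti*Info call returning (name, object) pairs. A sketch of the shape of that refactoring, with node_lookup as a hypothetical per-name lookup:

def get_multi_node_info(node_lookup, names):
    # hypothetical equivalent of config.GetMultiNodeInfo: one call yielding
    # (name, node object) pairs instead of N separate GetNodeInfo lookups
    return [(name, node_lookup(name)) for name in names]

# before the patch:
#   for name in node_list:
#       ninfo = cfg.GetNodeInfo(name)
#       ...
# after the patch:
#   for (name, ninfo) in cfg.GetMultiNodeInfo(node_list):
#       ...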
@@ -3503,7 +3988,8 @@ class LUOobCommand(NoHooksLU): master_node = self.master_node ret = [] - for idx, node in enumerate(self.nodes): + for idx, node in enumerate(utils.NiceSort(self.nodes, + key=lambda node: node.name)): node_entry = [(constants.RS_NORMAL, node.name)] ret.append(node_entry) @@ -3599,6 +4085,7 @@ class LUOobCommand(NoHooksLU): raise errors.OpExecError("Check of out-of-band payload failed due to %s" % utils.CommaJoin(errs)) + class _OsQuery(_QueryBase): FIELDS = query.OS_FIELDS @@ -3806,15 +4293,12 @@ class LUNodeRemove(LogicalUnit): node = self.cfg.GetNodeInfo(self.op.node_name) assert node is not None - instance_list = self.cfg.GetInstanceList() - masternode = self.cfg.GetMasterNode() if node.name == masternode: raise errors.OpPrereqError("Node is the master node, failover to another" " node is required", errors.ECODE_INVAL) - for instance_name in instance_list: - instance = self.cfg.GetInstanceInfo(instance_name) + for instance_name, instance in self.cfg.GetAllInstancesInfo(): if node.name in instance.all_nodes: raise errors.OpPrereqError("Instance %s is still running on the node," " please remove first" % instance_name, @@ -3860,7 +4344,7 @@ class _NodeQuery(_QueryBase): def ExpandNames(self, lu): lu.needed_locks = {} - lu.share_locks[locking.LEVEL_NODE] = 1 + lu.share_locks = _ShareAll() if self.names: self.wanted = _GetWantedNodes(lu, self.names) @@ -3871,7 +4355,7 @@ class _NodeQuery(_QueryBase): query.NQ_LIVE in self.requested_data) if self.do_locking: - # if we don't request only static fields, we need to lock the nodes + # If any non-static field is requested we need to lock the nodes lu.needed_locks[locking.LEVEL_NODE] = self.wanted def DeclareLocks(self, lu, level): @@ -3935,7 +4419,7 @@ class LUNodeQuery(NoHooksLU): """Logical unit for querying nodes. """ - # pylint: disable-msg=W0142 + # pylint: disable=W0142 REQ_BGL = False def CheckArguments(self): @@ -3975,13 +4459,11 @@ class LUNodeQueryvols(NoHooksLU): """Computes the list of nodes and their attributes. 
""" - nodenames = self.glm.list_owned(locking.LEVEL_NODE) + nodenames = self.owned_locks(locking.LEVEL_NODE) volumes = self.rpc.call_node_volumes(nodenames) - ilist = [self.cfg.GetInstanceInfo(iname) for iname - in self.cfg.GetInstanceList()] - - lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist]) + ilist = self.cfg.GetAllInstancesInfo() + vol2inst = _MapInstanceDisksToNodes(ilist.values()) output = [] for node in nodenames: @@ -3993,8 +4475,8 @@ class LUNodeQueryvols(NoHooksLU): self.LogWarning("Can't compute volume data on node %s: %s", node, msg) continue - node_vols = nresult.payload[:] - node_vols.sort(key=lambda vol: vol['dev']) + node_vols = sorted(nresult.payload, + key=operator.itemgetter("dev")) for vol in node_vols: node_output = [] @@ -4002,22 +4484,15 @@ class LUNodeQueryvols(NoHooksLU): if field == "node": val = node elif field == "phys": - val = vol['dev'] + val = vol["dev"] elif field == "vg": - val = vol['vg'] + val = vol["vg"] elif field == "name": - val = vol['name'] + val = vol["name"] elif field == "size": - val = int(float(vol['size'])) + val = int(float(vol["size"])) elif field == "instance": - for inst in ilist: - if node not in lv_by_node[inst]: - continue - if vol['name'] in lv_by_node[inst][node]: - val = inst.name - break - else: - val = '-' + val = vol2inst.get((node, vol["vg"] + "/" + vol["name"]), "-") else: raise errors.ParameterError(field) node_output.append(str(val)) @@ -4053,7 +4528,7 @@ class LUNodeQueryStorage(NoHooksLU): """Computes the list of nodes and their attributes. """ - self.nodes = self.glm.list_owned(locking.LEVEL_NODE) + self.nodes = self.owned_locks(locking.LEVEL_NODE) # Always get name to sort by if constants.SF_NAME in self.op.output_fields: @@ -4115,8 +4590,7 @@ class _InstanceQuery(_QueryBase): def ExpandNames(self, lu): lu.needed_locks = {} - lu.share_locks[locking.LEVEL_INSTANCE] = 1 - lu.share_locks[locking.LEVEL_NODE] = 1 + lu.share_locks = _ShareAll() if self.names: self.wanted = _GetWantedInstances(lu, self.names) @@ -4127,17 +4601,43 @@ class _InstanceQuery(_QueryBase): query.IQ_LIVE in self.requested_data) if self.do_locking: lu.needed_locks[locking.LEVEL_INSTANCE] = self.wanted + lu.needed_locks[locking.LEVEL_NODEGROUP] = [] lu.needed_locks[locking.LEVEL_NODE] = [] lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + self.do_grouplocks = (self.do_locking and + query.IQ_NODES in self.requested_data) + def DeclareLocks(self, lu, level): - if level == locking.LEVEL_NODE and self.do_locking: - lu._LockInstancesNodes() # pylint: disable-msg=W0212 + if self.do_locking: + if level == locking.LEVEL_NODEGROUP and self.do_grouplocks: + assert not lu.needed_locks[locking.LEVEL_NODEGROUP] + + # Lock all groups used by instances optimistically; this requires going + # via the node before it's locked, requiring verification later on + lu.needed_locks[locking.LEVEL_NODEGROUP] = \ + set(group_uuid + for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE) + for group_uuid in lu.cfg.GetInstanceNodeGroups(instance_name)) + elif level == locking.LEVEL_NODE: + lu._LockInstancesNodes() # pylint: disable=W0212 + + @staticmethod + def _CheckGroupLocks(lu): + owned_instances = frozenset(lu.owned_locks(locking.LEVEL_INSTANCE)) + owned_groups = frozenset(lu.owned_locks(locking.LEVEL_NODEGROUP)) + + # Check if node groups for locked instances are still correct + for instance_name in owned_instances: + _CheckInstanceNodeGroups(lu.cfg, instance_name, owned_groups) def _GetQueryData(self, lu): """Computes the list of 
instances and their attributes. """ + if self.do_grouplocks: + self._CheckGroupLocks(lu) + cluster = lu.cfg.GetClusterInfo() all_info = lu.cfg.GetAllInstancesInfo() @@ -4200,22 +4700,34 @@ class _InstanceQuery(_QueryBase): else: consinfo = None + if query.IQ_NODES in self.requested_data: + node_names = set(itertools.chain(*map(operator.attrgetter("all_nodes"), + instance_list))) + nodes = dict(lu.cfg.GetMultiNodeInfo(node_names)) + groups = dict((uuid, lu.cfg.GetNodeGroup(uuid)) + for uuid in set(map(operator.attrgetter("group"), + nodes.values()))) + else: + nodes = None + groups = None + return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(), disk_usage, offline_nodes, bad_nodes, - live_data, wrongnode_inst, consinfo) + live_data, wrongnode_inst, consinfo, + nodes, groups) class LUQuery(NoHooksLU): """Query for resources/items of a certain kind. """ - # pylint: disable-msg=W0142 + # pylint: disable=W0142 REQ_BGL = False def CheckArguments(self): qcls = _GetQueryImplementation(self.op.what) - self.impl = qcls(self.op.filter, self.op.fields, False) + self.impl = qcls(self.op.filter, self.op.fields, self.op.use_locking) def ExpandNames(self): self.impl.ExpandNames(self) @@ -4231,7 +4743,7 @@ class LUQueryFields(NoHooksLU): """Query for resources/items of a certain kind. """ - # pylint: disable-msg=W0142 + # pylint: disable=W0142 REQ_BGL = False def CheckArguments(self): @@ -4371,9 +4883,7 @@ class LUNodeAdd(LogicalUnit): self.changed_primary_ip = False - for existing_node_name in node_list: - existing_node = cfg.GetNodeInfo(existing_node_name) - + for existing_node_name, existing_node in cfg.GetMultiNodeInfo(node_list): if self.op.readd and node == existing_node_name: if existing_node.secondary_ip != secondary_ip: raise errors.OpPrereqError("Readded node doesn't have the same IP" @@ -4480,7 +4990,7 @@ class LUNodeAdd(LogicalUnit): # later in the procedure; this also means that if the re-add # fails, we are left with a non-offlined, broken node if self.op.readd: - new_node.drained = new_node.offline = False # pylint: disable-msg=W0201 + new_node.drained = new_node.offline = False # pylint: disable=W0201 self.LogInfo("Readding a node, the offline/drained flags were reset") # if we demote the node, we do cleanup later in the procedure new_node.master_candidate = self.master_candidate @@ -4628,8 +5138,8 @@ class LUNodeSetParams(LogicalUnit): instances_keep = [] # Build list of instances to release - for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE): - instance = self.context.cfg.GetInstanceInfo(instance_name) + locked_i = self.owned_locks(locking.LEVEL_INSTANCE) + for instance_name, instance in self.cfg.GetMultiInstanceInfo(locked_i): if (instance.disk_template in constants.DTS_INT_MIRROR and self.op.node_name in instance.all_nodes): instances_keep.append(instance_name) @@ -4637,7 +5147,7 @@ class LUNodeSetParams(LogicalUnit): _ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep) - assert (set(self.glm.list_owned(locking.LEVEL_INSTANCE)) == + assert (set(self.owned_locks(locking.LEVEL_INSTANCE)) == set(instances_keep)) def BuildHooksEnv(self): @@ -5256,7 +5766,7 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name): nodeinfo = lu.rpc.call_node_info([node], None, hypervisor_name) nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True, ecode=errors.ECODE_ENVIRON) - free_mem = nodeinfo[node].payload.get('memory_free', None) + free_mem = nodeinfo[node].payload.get("memory_free", None) if not isinstance(free_mem, int): 
raise errors.OpPrereqError("Can't compute free memory on node %s, result" " was '%s'" % (node, free_mem), @@ -5419,7 +5929,8 @@ class LUInstanceStartup(LogicalUnit): instance = self.instance force = self.op.force - self.cfg.MarkInstanceUp(instance.name) + if not self.op.no_remember: + self.cfg.MarkInstanceUp(instance.name) if self.primary_offline: assert self.op.ignore_offline_nodes @@ -5430,7 +5941,8 @@ class LUInstanceStartup(LogicalUnit): _StartInstanceDisks(self, instance, force) result = self.rpc.call_instance_start(node_current, instance, - self.op.hvparams, self.op.beparams) + self.op.hvparams, self.op.beparams, + self.op.startup_paused) msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) @@ -5520,7 +6032,8 @@ class LUInstanceReboot(LogicalUnit): self.LogInfo("Instance %s was already stopped, starting now", instance.name) _StartInstanceDisks(self, instance, ignore_secondaries) - result = self.rpc.call_instance_start(node_current, instance, None, None) + result = self.rpc.call_instance_start(node_current, instance, + None, None, False) msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) @@ -5584,7 +6097,8 @@ class LUInstanceShutdown(LogicalUnit): node_current = instance.primary_node timeout = self.op.timeout - self.cfg.MarkInstanceDown(instance.name) + if not self.op.no_remember: + self.cfg.MarkInstanceDown(instance.name) if self.primary_offline: assert self.op.ignore_offline_nodes @@ -5697,8 +6211,25 @@ class LUInstanceRecreateDisks(LogicalUnit): HTYPE = constants.HTYPE_INSTANCE REQ_BGL = False + def CheckArguments(self): + # normalise the disk list + self.op.disks = sorted(frozenset(self.op.disks)) + def ExpandNames(self): self._ExpandAndLockInstance() + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + if self.op.nodes: + self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes] + self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes) + else: + self.needed_locks[locking.LEVEL_NODE] = [] + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + # if we replace the nodes, we only need to lock the old primary, + # otherwise we need to lock all nodes for disk re-creation + primary_only = bool(self.op.nodes) + self._LockInstancesNodes(primary_only=primary_only) def BuildHooksEnv(self): """Build hooks env. 
@@ -5724,12 +6255,31 @@ class LUInstanceRecreateDisks(LogicalUnit): instance = self.cfg.GetInstanceInfo(self.op.instance_name) assert instance is not None, \ "Cannot retrieve locked instance %s" % self.op.instance_name - _CheckNodeOnline(self, instance.primary_node) + if self.op.nodes: + if len(self.op.nodes) != len(instance.all_nodes): + raise errors.OpPrereqError("Instance %s currently has %d nodes, but" + " %d replacement nodes were specified" % + (instance.name, len(instance.all_nodes), + len(self.op.nodes)), + errors.ECODE_INVAL) + assert instance.disk_template != constants.DT_DRBD8 or \ + len(self.op.nodes) == 2 + assert instance.disk_template != constants.DT_PLAIN or \ + len(self.op.nodes) == 1 + primary_node = self.op.nodes[0] + else: + primary_node = instance.primary_node + _CheckNodeOnline(self, primary_node) if instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Instance '%s' has no disks" % self.op.instance_name, errors.ECODE_INVAL) - _CheckInstanceDown(self, instance, "cannot recreate disks") + # if we replace nodes *and* the old primary is offline, we don't + # check + assert instance.primary_node in self.needed_locks[locking.LEVEL_NODE] + old_pnode = self.cfg.GetNodeInfo(instance.primary_node) + if not (self.op.nodes and old_pnode.offline): + _CheckInstanceDown(self, instance, "cannot recreate disks") if not self.op.disks: self.op.disks = range(len(instance.disks)) @@ -5738,20 +6288,54 @@ class LUInstanceRecreateDisks(LogicalUnit): if idx >= len(instance.disks): raise errors.OpPrereqError("Invalid disk index '%s'" % idx, errors.ECODE_INVAL) - + if self.op.disks != range(len(instance.disks)) and self.op.nodes: + raise errors.OpPrereqError("Can't recreate disks partially and" + " change the nodes at the same time", + errors.ECODE_INVAL) self.instance = instance def Exec(self, feedback_fn): """Recreate the disks. 
""" + instance = self.instance + to_skip = [] - for idx, _ in enumerate(self.instance.disks): + mods = [] # keeps track of needed logical_id changes + + for idx, disk in enumerate(instance.disks): if idx not in self.op.disks: # disk idx has not been passed in to_skip.append(idx) continue + # update secondaries for disks, if needed + if self.op.nodes: + if disk.dev_type == constants.LD_DRBD8: + # need to update the nodes and minors + assert len(self.op.nodes) == 2 + assert len(disk.logical_id) == 6 # otherwise disk internals + # have changed + (_, _, old_port, _, _, old_secret) = disk.logical_id + new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name) + new_id = (self.op.nodes[0], self.op.nodes[1], old_port, + new_minors[0], new_minors[1], old_secret) + assert len(disk.logical_id) == len(new_id) + mods.append((idx, new_id)) + + # now that we have passed all asserts above, we can apply the mods + # in a single run (to avoid partial changes) + for idx, new_id in mods: + instance.disks[idx].logical_id = new_id + + # change primary node, if needed + if self.op.nodes: + instance.primary_node = self.op.nodes[0] + self.LogWarning("Changing the instance's nodes, you will have to" + " remove any disks left on the older nodes manually") - _CreateDisks(self, self.instance, to_skip=to_skip) + if self.op.nodes: + self.cfg.Update(instance, feedback_fn) + + _CreateDisks(self, instance, to_skip=to_skip) class LUInstanceRename(LogicalUnit): @@ -5804,8 +6388,9 @@ class LUInstanceRename(LogicalUnit): new_name = self.op.new_name if self.op.name_check: hostname = netutils.GetHostname(name=new_name) - self.LogInfo("Resolved given name '%s' to '%s'", new_name, - hostname.name) + if hostname != new_name: + self.LogInfo("Resolved given name '%s' to '%s'", new_name, + hostname.name) if not utils.MatchNameComponent(self.op.new_name, [hostname.name]): raise errors.OpPrereqError(("Resolved hostname '%s' does not look the" " same as given hostname '%s'") % @@ -5831,7 +6416,7 @@ class LUInstanceRename(LogicalUnit): old_name = inst.name rename_file_storage = False - if (inst.disk_template in (constants.DT_FILE, constants.DT_SHARED_FILE) and + if (inst.disk_template in constants.DTS_FILEBASED and self.op.new_name != inst.name): old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1]) rename_file_storage = True @@ -5965,7 +6550,7 @@ class LUInstanceQuery(NoHooksLU): """Logical unit for querying instances. 
""" - # pylint: disable-msg=W0142 + # pylint: disable=W0142 REQ_BGL = False def CheckArguments(self): @@ -6312,7 +6897,8 @@ class LUInstanceMove(LogicalUnit): _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Can't activate the instance's disks") - result = self.rpc.call_instance_start(target_node, instance, None, None) + result = self.rpc.call_instance_start(target_node, instance, + None, None, False) msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) @@ -6329,45 +6915,15 @@ class LUNodeMigrate(LogicalUnit): REQ_BGL = False def CheckArguments(self): - _CheckIAllocatorOrNode(self, "iallocator", "remote_node") + pass def ExpandNames(self): self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) - self.needed_locks = {} - - # Create tasklets for migrating instances for all instances on this node - names = [] - tasklets = [] - - self.lock_all_nodes = False - - for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name): - logging.debug("Migrating instance %s", inst.name) - names.append(inst.name) - - tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False)) - - if inst.disk_template in constants.DTS_EXT_MIRROR: - # We need to lock all nodes, as the iallocator will choose the - # destination nodes afterwards - self.lock_all_nodes = True - - self.tasklets = tasklets - - # Declare node locks - if self.lock_all_nodes: - self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET - else: - self.needed_locks[locking.LEVEL_NODE] = [self.op.node_name] - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND - - # Declare instance locks - self.needed_locks[locking.LEVEL_INSTANCE] = names - - def DeclareLocks(self, level): - if level == locking.LEVEL_NODE and not self.lock_all_nodes: - self._LockInstancesNodes() + self.share_locks = _ShareAll() + self.needed_locks = { + locking.LEVEL_NODE: [self.op.node_name], + } def BuildHooksEnv(self): """Build hooks env. @@ -6386,6 +6942,30 @@ class LUNodeMigrate(LogicalUnit): nl = [self.cfg.GetMasterNode()] return (nl, nl) + def CheckPrereq(self): + pass + + def Exec(self, feedback_fn): + # Prepare jobs for migration instances + jobs = [ + [opcodes.OpInstanceMigrate(instance_name=inst.name, + mode=self.op.mode, + live=self.op.live, + iallocator=self.op.iallocator, + target_node=self.op.target_node)] + for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name) + ] + + # TODO: Run iallocator in this opcode and pass correct placement options to + # OpInstanceMigrate. Since other jobs can modify the cluster between + # running the iallocator and the actual migration, a good consistency model + # will have to be found. + + assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) == + frozenset([self.op.node_name])) + + return ResultWithJobs(jobs) + class TLMigrateInstance(Tasklet): """Tasklet class for instance migration. 
@@ -6468,6 +7048,10 @@ class TLMigrateInstance(Tasklet): # self.target_node is already populated, either directly or by the # iallocator run target_node = self.target_node + if self.target_node == instance.primary_node: + raise errors.OpPrereqError("Cannot migrate instance %s" + " to its primary (%s)" % + (instance.name, instance.primary_node)) if len(self.lu.tasklets) == 1: # It is safe to release locks only when we're the only tasklet @@ -6855,8 +7439,12 @@ class TLMigrateInstance(Tasklet): self.feedback_fn("* checking disk consistency between source and target") for dev in instance.disks: # for drbd, these are drbd over lvm - if not _CheckDiskConsistency(self, dev, target_node, False): - if not self.ignore_consistency: + if not _CheckDiskConsistency(self.lu, dev, target_node, False): + if primary_node.offline: + self.feedback_fn("Node %s is offline, ignoring degraded disk %s on" + " target node %s" % + (primary_node.name, dev.iv_name, target_node)) + elif not self.ignore_consistency: raise errors.OpExecError("Disk %s is degraded on target node," " aborting failover" % dev.iv_name) else: @@ -6882,8 +7470,8 @@ class TLMigrateInstance(Tasklet): (instance.name, source_node, msg)) self.feedback_fn("* deactivating the instance's disks on source node") - if not _ShutdownInstanceDisks(self, instance, ignore_primary=True): - raise errors.OpExecError("Can't shut down the instance's disks.") + if not _ShutdownInstanceDisks(self.lu, instance, ignore_primary=True): + raise errors.OpExecError("Can't shut down the instance's disks") instance.primary_node = target_node # distribute new instance config to the other nodes @@ -6891,21 +7479,24 @@ class TLMigrateInstance(Tasklet): # Only start the instance if it's marked as up if instance.admin_up: - self.feedback_fn("* activating the instance's disks on target node") + self.feedback_fn("* activating the instance's disks on target node %s" % + target_node) logging.info("Starting instance %s on node %s", instance.name, target_node) - disks_ok, _ = _AssembleInstanceDisks(self, instance, + disks_ok, _ = _AssembleInstanceDisks(self.lu, instance, ignore_secondaries=True) if not disks_ok: - _ShutdownInstanceDisks(self, instance) + _ShutdownInstanceDisks(self.lu, instance) raise errors.OpExecError("Can't activate the instance's disks") - self.feedback_fn("* starting the instance on the target node") - result = self.rpc.call_instance_start(target_node, instance, None, None) + self.feedback_fn("* starting the instance on the target node %s" % + target_node) + result = self.rpc.call_instance_start(target_node, instance, None, None, + False) msg = result.fail_msg if msg: - _ShutdownInstanceDisks(self, instance) + _ShutdownInstanceDisks(self.lu, instance) raise errors.OpExecError("Could not start instance %s on node %s: %s" % (instance.name, target_node, msg)) @@ -6923,10 +7514,8 @@ class TLMigrateInstance(Tasklet): # directly, or through an iallocator. 
self.all_nodes = [self.source_node, self.target_node] - self.nodes_ip = { - self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip, - self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip, - } + self.nodes_ip = dict((name, node.secondary_ip) for (name, node) + in self.cfg.GetMultiNodeInfo(self.all_nodes)) if self.failover: feedback_fn("Failover instance %s" % self.instance.name) @@ -7266,7 +7855,7 @@ def _CreateDisks(lu, instance, to_skip=None, target_node=None): pnode = target_node all_nodes = [pnode] - if instance.disk_template in (constants.DT_FILE, constants.DT_SHARED_FILE): + if instance.disk_template in constants.DTS_FILEBASED: file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir) @@ -7362,7 +7951,7 @@ def _ComputeDiskSizePerVG(disk_template, disks): if disk_template not in req_size_dict: raise errors.ProgrammerError("Disk template '%s' size requirement" - " is unknown" % disk_template) + " is unknown" % disk_template) return req_size_dict[disk_template] @@ -7384,7 +7973,7 @@ def _ComputeDiskSize(disk_template, disks): if disk_template not in req_size_dict: raise errors.ProgrammerError("Disk template '%s' size requirement" - " is unknown" % disk_template) + " is unknown" % disk_template) return req_size_dict[disk_template] @@ -7539,9 +8128,10 @@ class LUInstanceCreate(LogicalUnit): raise errors.OpPrereqError("Invalid file driver name '%s'" % self.op.file_driver, errors.ECODE_INVAL) - if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir): - raise errors.OpPrereqError("File storage directory path not absolute", - errors.ECODE_INVAL) + if self.op.disk_template == constants.DT_FILE: + opcodes.RequireFileStorage() + elif self.op.disk_template == constants.DT_SHARED_FILE: + opcodes.RequireSharedFileStorage() ### Node/iallocator related checks _CheckIAllocatorOrNode(self, "iallocator", "pnode") @@ -7663,8 +8253,8 @@ class LUInstanceCreate(LogicalUnit): self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET self.op.src_node = None if os.path.isabs(src_path): - raise errors.OpPrereqError("Importing an instance from an absolute" - " path requires a source node option", + raise errors.OpPrereqError("Importing an instance from a path" + " requires a source node option", errors.ECODE_INVAL) else: self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node) @@ -7683,10 +8273,10 @@ class LUInstanceCreate(LogicalUnit): mode=constants.IALLOCATOR_MODE_ALLOC, name=self.op.instance_name, disk_template=self.op.disk_template, - tags=[], + tags=self.op.tags, os=self.op.os_type, vcpus=self.be_full[constants.BE_VCPUS], - mem_size=self.be_full[constants.BE_MEMORY], + memory=self.be_full[constants.BE_MEMORY], disks=self.disks, nics=nics, hypervisor=self.op.hypervisor, @@ -7740,6 +8330,7 @@ class LUInstanceCreate(LogicalUnit): bep=self.be_full, hvp=self.hv_full, hypervisor_name=self.op.hypervisor, + tags=self.op.tags, )) return env @@ -7766,7 +8357,7 @@ class LUInstanceCreate(LogicalUnit): src_path = self.op.src_path if src_node is None: - locked_nodes = self.glm.list_owned(locking.LEVEL_NODE) + locked_nodes = self.owned_locks(locking.LEVEL_NODE) exp_list = self.rpc.call_export_list(locked_nodes) found = False for node in exp_list: @@ -7841,9 +8432,13 @@ class LUInstanceCreate(LogicalUnit): nics.append(ndict) self.op.nics = nics + if not self.op.tags and einfo.has_option(constants.INISECT_INS, "tags"): + self.op.tags = einfo.get(constants.INISECT_INS, "tags").split() + if 
(self.op.hypervisor is None and einfo.has_option(constants.INISECT_INS, "hypervisor")): self.op.hypervisor = einfo.get(constants.INISECT_INS, "hypervisor") + if einfo.has_section(constants.INISECT_HYP): # use the export parameters but do not override the ones # specified by the user @@ -7895,10 +8490,40 @@ class LUInstanceCreate(LogicalUnit): if name in os_defs and os_defs[name] == self.op.osparams[name]: del self.op.osparams[name] + def _CalculateFileStorageDir(self): + """Calculate final instance file storage dir. + + """ + # file storage dir calculation/check + self.instance_file_storage_dir = None + if self.op.disk_template in constants.DTS_FILEBASED: + # build the full file storage dir path + joinargs = [] + + if self.op.disk_template == constants.DT_SHARED_FILE: + get_fsd_fn = self.cfg.GetSharedFileStorageDir + else: + get_fsd_fn = self.cfg.GetFileStorageDir + + cfg_storagedir = get_fsd_fn() + if not cfg_storagedir: + raise errors.OpPrereqError("Cluster file storage dir not defined") + joinargs.append(cfg_storagedir) + + if self.op.file_storage_dir is not None: + joinargs.append(self.op.file_storage_dir) + + joinargs.append(self.op.instance_name) + + # pylint: disable=W0142 + self.instance_file_storage_dir = utils.PathJoin(*joinargs) + def CheckPrereq(self): """Check prerequisites. """ + self._CalculateFileStorageDir() + if self.op.mode == constants.INSTANCE_IMPORT: export_info = self._ReadExportInfo() self._ReadExportParams(export_info) @@ -7919,6 +8544,10 @@ class LUInstanceCreate(LogicalUnit): ",".join(enabled_hvs)), errors.ECODE_STATE) + # Check tag validity + for tag in self.op.tags: + objects.TaggableObject.ValidateTag(tag) + # check hypervisor parameter syntax (locally) utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES) filled_hvp = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type, @@ -8041,7 +8670,7 @@ class LUInstanceCreate(LogicalUnit): disk_images = [] for idx in range(export_disks): - option = 'disk%d_dump' % idx + option = "disk%d_dump" % idx if export_info.has_option(constants.INISECT_INS, option): # FIXME: are the old os-es, disk sizes, etc. useful? 
export_name = export_info.get(constants.INISECT_INS, option) @@ -8052,9 +8681,9 @@ class LUInstanceCreate(LogicalUnit): self.src_images = disk_images - old_name = export_info.get(constants.INISECT_INS, 'name') + old_name = export_info.get(constants.INISECT_INS, "name") try: - exp_nic_count = export_info.getint(constants.INISECT_INS, 'nic_count') + exp_nic_count = export_info.getint(constants.INISECT_INS, "nic_count") except (TypeError, ValueError), err: raise errors.OpPrereqError("Invalid export file, nic_count is not" " an integer: %s" % str(err), @@ -8062,7 +8691,7 @@ class LUInstanceCreate(LogicalUnit): if self.op.instance_name == old_name: for idx, nic in enumerate(self.nics): if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx: - nic_mac_ini = 'nic%d_mac' % idx + nic_mac_ini = "nic%d_mac" % idx nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini) # ENDIF: self.op.mode == constants.INSTANCE_IMPORT @@ -8226,30 +8855,12 @@ class LUInstanceCreate(LogicalUnit): else: network_port = None - if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE: - # this is needed because os.path.join does not accept None arguments - if self.op.file_storage_dir is None: - string_file_storage_dir = "" - else: - string_file_storage_dir = self.op.file_storage_dir - - # build the full file storage dir path - if self.op.disk_template == constants.DT_SHARED_FILE: - get_fsd_fn = self.cfg.GetSharedFileStorageDir - else: - get_fsd_fn = self.cfg.GetFileStorageDir - - file_storage_dir = utils.PathJoin(get_fsd_fn(), - string_file_storage_dir, instance) - else: - file_storage_dir = "" - disks = _GenerateDiskTemplate(self, self.op.disk_template, instance, pnode_name, self.secondaries, self.disks, - file_storage_dir, + self.instance_file_storage_dir, self.op.file_driver, 0, feedback_fn) @@ -8266,13 +8877,17 @@ class LUInstanceCreate(LogicalUnit): osparams=self.op.osparams, ) + if self.op.tags: + for tag in self.op.tags: + iobj.AddTag(tag) + if self.adopt_disks: if self.op.disk_template == constants.DT_PLAIN: # rename LVs to the newly-generated names; we need to construct # 'fake' LV disks with the old data, plus the new unique_id tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks] rename_to = [] - for t_dsk, a_dsk in zip (tmp_disks, self.disks): + for t_dsk, a_dsk in zip(tmp_disks, self.disks): rename_to.append(t_dsk.logical_id) t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk[constants.IDISK_ADOPT]) self.cfg.SetDiskID(t_dsk, pnode_name) @@ -8323,7 +8938,6 @@ class LUInstanceCreate(LogicalUnit): disk_abort = not _WaitForSync(self, iobj) elif iobj.disk_template in constants.DTS_INT_MIRROR: # make sure the disks are not degraded (still sync-ing is ok) - time.sleep(15) feedback_fn("* checking mirrors status") disk_abort = not _WaitForSync(self, iobj, oneshot=True) else: @@ -8340,12 +8954,33 @@ class LUInstanceCreate(LogicalUnit): if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks: if self.op.mode == constants.INSTANCE_CREATE: if not self.op.no_install: + pause_sync = (iobj.disk_template in constants.DTS_INT_MIRROR and + not self.op.wait_for_sync) + if pause_sync: + feedback_fn("* pausing disk sync to install instance OS") + result = self.rpc.call_blockdev_pause_resume_sync(pnode_name, + iobj.disks, True) + for idx, success in enumerate(result.payload): + if not success: + logging.warn("pause-sync of instance %s for disk %d failed", + instance, idx) + feedback_fn("* running the instance OS create scripts...") # FIXME: pass debug option from opcode to backend - 
result = self.rpc.call_instance_os_add(pnode_name, iobj, False, - self.op.debug_level) - result.Raise("Could not add os for instance %s" - " on node %s" % (instance, pnode_name)) + os_add_result = \ + self.rpc.call_instance_os_add(pnode_name, iobj, False, + self.op.debug_level) + if pause_sync: + feedback_fn("* resuming disk sync") + result = self.rpc.call_blockdev_pause_resume_sync(pnode_name, + iobj.disks, False) + for idx, success in enumerate(result.payload): + if not success: + logging.warn("resume-sync of instance %s for disk %d failed", + instance, idx) + + os_add_result.Raise("Could not add os for instance %s" + " on node %s" % (instance, pnode_name)) elif self.op.mode == constants.INSTANCE_IMPORT: feedback_fn("* running the instance OS import scripts...") @@ -8413,7 +9048,8 @@ class LUInstanceCreate(LogicalUnit): self.cfg.Update(iobj, feedback_fn) logging.info("Starting instance %s on node %s", instance, pnode_name) feedback_fn("* starting instance...") - result = self.rpc.call_instance_start(pnode_name, iobj, None, None) + result = self.rpc.call_instance_start(pnode_name, iobj, + None, None, False) result.Raise("Could not start instance") return list(iobj.all_nodes) @@ -8549,7 +9185,7 @@ class LUInstanceReplaceDisks(LogicalUnit): # Lock member nodes of all locked groups self.needed_locks[locking.LEVEL_NODE] = [node_name - for group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP) + for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP) for node_name in self.cfg.GetNodeGroup(group_uuid).members] else: self._LockInstancesNodes() @@ -8589,16 +9225,9 @@ class LUInstanceReplaceDisks(LogicalUnit): assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or self.op.iallocator is None) - owned_groups = self.glm.list_owned(locking.LEVEL_NODEGROUP) + owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) if owned_groups: - groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name) - if owned_groups != groups: - raise errors.OpExecError("Node groups used by instance '%s' changed" - " since lock was acquired, current list is %r," - " used to be '%s'" % - (self.op.instance_name, - utils.CommaJoin(groups), - utils.CommaJoin(owned_groups))) + _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups) return LogicalUnit.CheckPrereq(self) @@ -8663,7 +9292,7 @@ class TLReplaceDisks(Tasklet): ial = IAllocator(lu.cfg, lu.rpc, mode=constants.IALLOCATOR_MODE_RELOC, name=instance_name, - relocate_from=relocate_from) + relocate_from=list(relocate_from)) ial.Run(iallocator_name) @@ -8687,6 +9316,9 @@ class TLReplaceDisks(Tasklet): return remote_node_name def _FindFaultyDisks(self, node_name): + """Wrapper for L{_FindFaultyInstanceDisks}. 
+ + """ return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance, node_name, True) @@ -8757,7 +9389,7 @@ class TLReplaceDisks(Tasklet): if remote_node is None: self.remote_node_info = None else: - assert remote_node in self.lu.glm.list_owned(locking.LEVEL_NODE), \ + assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \ "Remote node '%s' is not locked" % remote_node self.remote_node_info = self.cfg.GetNodeInfo(remote_node) @@ -8863,9 +9495,8 @@ class TLReplaceDisks(Tasklet): instance.FindDisk(disk_idx) # Get secondary node IP addresses - self.node_secondary_ip = \ - dict((node_name, self.cfg.GetNodeInfo(node_name).secondary_ip) - for node_name in touched_nodes) + self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node) + in self.cfg.GetMultiNodeInfo(touched_nodes)) def Exec(self, feedback_fn): """Execute disk replacement. @@ -8878,13 +9509,13 @@ class TLReplaceDisks(Tasklet): if __debug__: # Verify owned locks before starting operation - owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE) - assert set(owned_locks) == set(self.node_secondary_ip), \ + owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE) + assert set(owned_nodes) == set(self.node_secondary_ip), \ ("Incorrect node locks, owning %s, expected %s" % - (owned_locks, self.node_secondary_ip.keys())) + (owned_nodes, self.node_secondary_ip.keys())) - owned_locks = self.lu.glm.list_owned(locking.LEVEL_INSTANCE) - assert list(owned_locks) == [self.instance_name], \ + owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE) + assert list(owned_instances) == [self.instance_name], \ "Instance '%s' not locked" % self.instance_name assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \ @@ -8919,12 +9550,12 @@ class TLReplaceDisks(Tasklet): if __debug__: # Verify owned locks - owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE) + owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE) nodes = frozenset(self.node_secondary_ip) - assert ((self.early_release and not owned_locks) or - (not self.early_release and not (set(owned_locks) - nodes))), \ + assert ((self.early_release and not owned_nodes) or + (not self.early_release and not (set(owned_nodes) - nodes))), \ ("Not owning the correct locks, early_release=%s, owned=%r," - " nodes=%r" % (self.early_release, owned_locks, nodes)) + " nodes=%r" % (self.early_release, owned_nodes, nodes)) return result @@ -8979,6 +9610,12 @@ class TLReplaceDisks(Tasklet): (node_name, self.instance.name)) def _CreateNewStorage(self, node_name): + """Create new storage on the primary or secondary node. + + This is only used for same-node replaces, not for changing the + secondary node, hence we don't want to modify the existing disk. + + """ iv_names = {} for idx, dev in enumerate(self.instance.disks): @@ -9000,7 +9637,7 @@ class TLReplaceDisks(Tasklet): logical_id=(vg_meta, names[1])) new_lvs = [lv_data, lv_meta] - old_lvs = dev.children + old_lvs = [child.Copy() for child in dev.children] iv_names[dev.iv_name] = (dev, old_lvs, new_lvs) # we pass force_create=True to force the LVM creation @@ -9038,7 +9675,7 @@ class TLReplaceDisks(Tasklet): self.lu.LogWarning("Can't remove old LV: %s" % msg, hint="remove unused LVs manually") - def _ExecDrbd8DiskOnly(self, feedback_fn): + def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613 """Replace a disk on the primary or secondary for DRBD 8. 
The algorithm for replace is quite complicated: @@ -9121,10 +9758,14 @@ class TLReplaceDisks(Tasklet): rename_new_to_old) result.Raise("Can't rename new LVs on node %s" % self.target_node) + # Intermediate steps of in memory modifications for old, new in zip(old_lvs, new_lvs): new.logical_id = old.logical_id self.cfg.SetDiskID(new, self.target_node) + # We need to modify old_lvs so that removal later removes the + # right LVs, not the newly added ones; note that old_lvs is a + # copy here for disk in old_lvs: disk.logical_id = ren_fn(disk, temp_suffix) self.cfg.SetDiskID(disk, self.target_node) @@ -9144,10 +9785,6 @@ class TLReplaceDisks(Tasklet): "volumes")) raise errors.OpExecError("Can't add local storage to drbd: %s" % msg) - dev.children = new_lvs - - self.cfg.Update(self.instance, feedback_fn) - cstep = 5 if self.early_release: self.lu.LogStep(cstep, steps_total, "Removing old storage") @@ -9195,6 +9832,8 @@ class TLReplaceDisks(Tasklet): """ steps_total = 6 + pnode = self.instance.primary_node + # Step: check device activation self.lu.LogStep(1, steps_total, "Check device existence") self._CheckDisksExistence([self.instance.primary_node]) @@ -9269,10 +9908,8 @@ class TLReplaceDisks(Tasklet): " soon as possible")) self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)") - result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node], - self.node_secondary_ip, - self.instance.disks)\ - [self.instance.primary_node] + result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip, + self.instance.disks)[pnode] msg = result.fail_msg if msg: @@ -9396,48 +10033,223 @@ class LURepairNodeStorage(NoHooksLU): (self.op.name, self.op.node_name)) -class LUNodeEvacStrategy(NoHooksLU): - """Computes the node evacuation strategy. +class LUNodeEvacuate(NoHooksLU): + """Evacuates instances off a list of nodes. + + """ + REQ_BGL = False + + def CheckArguments(self): + _CheckIAllocatorOrNode(self, "iallocator", "remote_node") + + def ExpandNames(self): + self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) + + if self.op.remote_node is not None: + self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node) + assert self.op.remote_node + + if self.op.remote_node == self.op.node_name: + raise errors.OpPrereqError("Can not use evacuated node as a new" + " secondary node", errors.ECODE_INVAL) + + if self.op.mode != constants.IALLOCATOR_NEVAC_SEC: + raise errors.OpPrereqError("Without the use of an iallocator only" + " secondary instances can be evacuated", + errors.ECODE_INVAL) + + # Declare locks + self.share_locks = _ShareAll() + self.needed_locks = { + locking.LEVEL_INSTANCE: [], + locking.LEVEL_NODEGROUP: [], + locking.LEVEL_NODE: [], + } + + if self.op.remote_node is None: + # Iallocator will choose any node(s) in the same group + group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_name]) + else: + group_nodes = frozenset([self.op.remote_node]) + + # Determine nodes to be locked + self.lock_nodes = set([self.op.node_name]) | group_nodes + + def _DetermineInstances(self): + """Builds list of instances to operate on. 
- """ - REQ_BGL = False + """ + assert self.op.mode in constants.IALLOCATOR_NEVAC_MODES - def CheckArguments(self): - _CheckIAllocatorOrNode(self, "iallocator", "remote_node") + if self.op.mode == constants.IALLOCATOR_NEVAC_PRI: + # Primary instances only + inst_fn = _GetNodePrimaryInstances + assert self.op.remote_node is None, \ + "Evacuating primary instances requires iallocator" + elif self.op.mode == constants.IALLOCATOR_NEVAC_SEC: + # Secondary instances only + inst_fn = _GetNodeSecondaryInstances + else: + # All instances + assert self.op.mode == constants.IALLOCATOR_NEVAC_ALL + inst_fn = _GetNodeInstances - def ExpandNames(self): - self.op.nodes = _GetWantedNodes(self, self.op.nodes) - self.needed_locks = locks = {} - if self.op.remote_node is None: - locks[locking.LEVEL_NODE] = locking.ALL_SET + return inst_fn(self.cfg, self.op.node_name) + + def DeclareLocks(self, level): + if level == locking.LEVEL_INSTANCE: + # Lock instances optimistically, needs verification once node and group + # locks have been acquired + self.needed_locks[locking.LEVEL_INSTANCE] = \ + set(i.name for i in self._DetermineInstances()) + + elif level == locking.LEVEL_NODEGROUP: + # Lock node groups optimistically, needs verification once nodes have + # been acquired + self.needed_locks[locking.LEVEL_NODEGROUP] = \ + self.cfg.GetNodeGroupsFromNodes(self.lock_nodes) + + elif level == locking.LEVEL_NODE: + self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes + + def CheckPrereq(self): + # Verify locks + owned_instances = self.owned_locks(locking.LEVEL_INSTANCE) + owned_nodes = self.owned_locks(locking.LEVEL_NODE) + owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) + + assert owned_nodes == self.lock_nodes + + wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes) + if owned_groups != wanted_groups: + raise errors.OpExecError("Node groups changed since locks were acquired," + " current groups are '%s', used to be '%s'" % + (utils.CommaJoin(wanted_groups), + utils.CommaJoin(owned_groups))) + + # Determine affected instances + self.instances = self._DetermineInstances() + self.instance_names = [i.name for i in self.instances] + + if set(self.instance_names) != owned_instances: + raise errors.OpExecError("Instances on node '%s' changed since locks" + " were acquired, current instances are '%s'," + " used to be '%s'" % + (self.op.node_name, + utils.CommaJoin(self.instance_names), + utils.CommaJoin(owned_instances))) + + if self.instance_names: + self.LogInfo("Evacuating instances from node '%s': %s", + self.op.node_name, + utils.CommaJoin(utils.NiceSort(self.instance_names))) else: - self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node) - locks[locking.LEVEL_NODE] = self.op.nodes + [self.op.remote_node] + self.LogInfo("No instances to evacuate from node '%s'", + self.op.node_name) - def Exec(self, feedback_fn): if self.op.remote_node is not None: - instances = [] - for node in self.op.nodes: - instances.extend(_GetNodeSecondaryInstances(self.cfg, node)) - result = [] - for i in instances: + for i in self.instances: if i.primary_node == self.op.remote_node: raise errors.OpPrereqError("Node %s is the primary node of" " instance %s, cannot use it as" " secondary" % (self.op.remote_node, i.name), errors.ECODE_INVAL) - result.append([i.name, self.op.remote_node]) - else: - ial = IAllocator(self.cfg, self.rpc, - mode=constants.IALLOCATOR_MODE_MEVAC, - evac_nodes=self.op.nodes) - ial.Run(self.op.iallocator, validate=True) + + def Exec(self, feedback_fn): + assert (self.op.iallocator is not 
None) ^ (self.op.remote_node is not None) + + if not self.instance_names: + # No instances to evacuate + jobs = [] + + elif self.op.iallocator is not None: + # TODO: Implement relocation to other group + ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_NODE_EVAC, + evac_mode=self.op.mode, + instances=list(self.instance_names)) + + ial.Run(self.op.iallocator) + if not ial.success: - raise errors.OpExecError("No valid evacuation solution: %s" % ial.info, - errors.ECODE_NORES) - result = ial.result - return result + raise errors.OpPrereqError("Can't compute node evacuation using" + " iallocator '%s': %s" % + (self.op.iallocator, ial.info), + errors.ECODE_NORES) + + jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, True) + + elif self.op.remote_node is not None: + assert self.op.mode == constants.IALLOCATOR_NEVAC_SEC + jobs = [ + [opcodes.OpInstanceReplaceDisks(instance_name=instance_name, + remote_node=self.op.remote_node, + disks=[], + mode=constants.REPLACE_DISK_CHG, + early_release=self.op.early_release)] + for instance_name in self.instance_names + ] + + else: + raise errors.ProgrammerError("No iallocator or remote node") + + return ResultWithJobs(jobs) + + +def _SetOpEarlyRelease(early_release, op): + """Sets C{early_release} flag on opcodes if available. + + """ + try: + op.early_release = early_release + except AttributeError: + assert not isinstance(op, opcodes.OpInstanceReplaceDisks) + + return op + + +def _NodeEvacDest(use_nodes, group, nodes): + """Returns group or nodes depending on caller's choice. + + """ + if use_nodes: + return utils.CommaJoin(nodes) + else: + return group + + +def _LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes): + """Unpacks the result of change-group and node-evacuate iallocator requests. + + Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and + L{constants.IALLOCATOR_MODE_CHG_GROUP}. 
+ + @type lu: L{LogicalUnit} + @param lu: Logical unit instance + @type alloc_result: tuple/list + @param alloc_result: Result from iallocator + @type early_release: bool + @param early_release: Whether to release locks early if possible + @type use_nodes: bool + @param use_nodes: Whether to display node names instead of groups + + """ + (moved, failed, jobs) = alloc_result + + if failed: + lu.LogWarning("Unable to evacuate instances %s", + utils.CommaJoin("%s (%s)" % (name, reason) + for (name, reason) in failed)) + + if moved: + lu.LogInfo("Instances to be moved: %s", + utils.CommaJoin("%s (to %s)" % + (name, _NodeEvacDest(use_nodes, group, nodes)) + for (name, group, nodes) in moved)) + + return [map(compat.partial(_SetOpEarlyRelease, early_release), + map(opcodes.OpCode.LoadOpCode, ops)) + for ops in jobs] class LUInstanceGrowDisk(LogicalUnit): @@ -9516,9 +10328,17 @@ class LUInstanceGrowDisk(LogicalUnit): if not disks_ok: raise errors.OpExecError("Cannot activate block device to grow") + # First run all grow ops in dry-run mode + for node in instance.all_nodes: + self.cfg.SetDiskID(disk, node) + result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True) + result.Raise("Grow request failed to node %s" % node) + + # We know that (as far as we can test) operations across different + # nodes will succeed, time to run it for real for node in instance.all_nodes: self.cfg.SetDiskID(disk, node) - result = self.rpc.call_blockdev_grow(node, disk, self.op.amount) + result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False) result.Raise("Grow request failed to node %s" % node) # TODO: Rewrite code to work properly @@ -9565,7 +10385,7 @@ class LUInstanceQueryData(NoHooksLU): self.wanted_names = None if self.op.use_locking: - self.share_locks = dict.fromkeys(locking.LEVELS, 1) + self.share_locks = _ShareAll() if self.wanted_names is None: self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET @@ -9573,7 +10393,6 @@ class LUInstanceQueryData(NoHooksLU): self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names self.needed_locks[locking.LEVEL_NODE] = [] - self.share_locks = dict.fromkeys(locking.LEVELS, 1) self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE def DeclareLocks(self, level): @@ -9588,10 +10407,10 @@ class LUInstanceQueryData(NoHooksLU): """ if self.wanted_names is None: assert self.op.use_locking, "Locking was not used" - self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE) + self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE) - self.wanted_instances = [self.cfg.GetInstanceInfo(name) - for name in self.wanted_names] + self.wanted_instances = \ + map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names)) def _ComputeBlockdevStatus(self, node, instance_name, dev): """Returns the status of a block device @@ -9632,8 +10451,9 @@ class LUInstanceQueryData(NoHooksLU): dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev) if dev.children: - dev_children = [self._ComputeDiskStatus(instance, snode, child) - for child in dev.children] + dev_children = map(compat.partial(self._ComputeDiskStatus, + instance, snode), + dev.children) else: dev_children = [] @@ -9655,8 +10475,16 @@ class LUInstanceQueryData(NoHooksLU): cluster = self.cfg.GetClusterInfo() - for instance in self.wanted_instances: - if not self.op.static: + pri_nodes = self.cfg.GetMultiNodeInfo(i.primary_node + for i in self.wanted_instances) + for instance, (_, pnode) in zip(self.wanted_instances, pri_nodes): + if self.op.static or pnode.offline: 
+ remote_state = None + if pnode.offline: + self.LogWarning("Primary node %s is marked offline, returning static" + " information only for instance %s" % + (pnode.name, instance.name)) + else: remote_info = self.rpc.call_instance_info(instance.primary_node, instance.name, instance.hypervisor) @@ -9666,15 +10494,14 @@ class LUInstanceQueryData(NoHooksLU): remote_state = "up" else: remote_state = "down" - else: - remote_state = None + if instance.admin_up: config_state = "up" else: config_state = "down" - disks = [self._ComputeDiskStatus(instance, None, device) - for device in instance.disks] + disks = map(compat.partial(self._ComputeDiskStatus, instance, None), + instance.disks) result[instance.name] = { "name": instance.name, @@ -9799,13 +10626,13 @@ class LUInstanceSetParams(LogicalUnit): raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip, errors.ECODE_INVAL) - nic_bridge = nic_dict.get('bridge', None) + nic_bridge = nic_dict.get("bridge", None) nic_link = nic_dict.get(constants.INIC_LINK, None) if nic_bridge and nic_link: raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" " at the same time", errors.ECODE_INVAL) elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE: - nic_dict['bridge'] = None + nic_dict["bridge"] = None elif nic_link and nic_link.lower() == constants.VALUE_NONE: nic_dict[constants.INIC_LINK] = None @@ -9848,13 +10675,13 @@ class LUInstanceSetParams(LogicalUnit): """ args = dict() if constants.BE_MEMORY in self.be_new: - args['memory'] = self.be_new[constants.BE_MEMORY] + args["memory"] = self.be_new[constants.BE_MEMORY] if constants.BE_VCPUS in self.be_new: - args['vcpus'] = self.be_new[constants.BE_VCPUS] + args["vcpus"] = self.be_new[constants.BE_VCPUS] # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk # information at all. 
if self.op.nics: - args['nics'] = [] + args["nics"] = [] nic_override = dict(self.op.nics) for idx, nic in enumerate(self.instance.nics): if idx in nic_override: @@ -9875,16 +10702,16 @@ class LUInstanceSetParams(LogicalUnit): nicparams = self.cluster.SimpleFillNIC(nic.nicparams) mode = nicparams[constants.NIC_MODE] link = nicparams[constants.NIC_LINK] - args['nics'].append((ip, mac, mode, link)) + args["nics"].append((ip, mac, mode, link)) if constants.DDM_ADD in nic_override: ip = nic_override[constants.DDM_ADD].get(constants.INIC_IP, None) mac = nic_override[constants.DDM_ADD][constants.INIC_MAC] nicparams = self.nic_pnew[constants.DDM_ADD] mode = nicparams[constants.NIC_MODE] link = nicparams[constants.NIC_LINK] - args['nics'].append((ip, mac, mode, link)) + args["nics"].append((ip, mac, mode, link)) elif constants.DDM_REMOVE in nic_override: - del args['nics'][-1] + del args["nics"][-1] env = _BuildInstanceHookEnvByObject(self, self.instance, override=args) if self.op.disk_template: @@ -9974,6 +10801,7 @@ class LUInstanceSetParams(LogicalUnit): self.be_inst = i_bedict # the new dict (without defaults) else: self.be_new = self.be_inst = {} + be_old = cluster.FillBE(instance) # osparams processing if self.op.osparams: @@ -9985,7 +10813,8 @@ class LUInstanceSetParams(LogicalUnit): self.warn = [] - if constants.BE_MEMORY in self.op.beparams and not self.op.force: + if (constants.BE_MEMORY in self.op.beparams and not self.op.force and + be_new[constants.BE_MEMORY] > be_old[constants.BE_MEMORY]): mem_check_list = [pnode] if be_new[constants.BE_AUTO_BALANCE]: # either we changed auto_balance to yes or it was from before @@ -9999,8 +10828,8 @@ class LUInstanceSetParams(LogicalUnit): if msg: # Assume the primary node is unreachable and go ahead self.warn.append("Can't get info from primary node %s: %s" % - (pnode, msg)) - elif not isinstance(pninfo.payload.get('memory_free', None), int): + (pnode, msg)) + elif not isinstance(pninfo.payload.get("memory_free", None), int): self.warn.append("Node data from primary node %s doesn't contain" " free memory information" % pnode) elif instance_info.fail_msg: @@ -10008,14 +10837,14 @@ class LUInstanceSetParams(LogicalUnit): instance_info.fail_msg) else: if instance_info.payload: - current_mem = int(instance_info.payload['memory']) + current_mem = int(instance_info.payload["memory"]) else: # Assume instance not running # (there is a slight race condition here, but it's not very probable, # and we have no other way to check) current_mem = 0 miss_mem = (be_new[constants.BE_MEMORY] - current_mem - - pninfo.payload['memory_free']) + pninfo.payload["memory_free"]) if miss_mem > 0: raise errors.OpPrereqError("This change will prevent the instance" " from starting, due to %d MB of memory" @@ -10026,16 +10855,17 @@ class LUInstanceSetParams(LogicalUnit): for node, nres in nodeinfo.items(): if node not in instance.secondary_nodes: continue - msg = nres.fail_msg - if msg: - self.warn.append("Can't get info from secondary node %s: %s" % - (node, msg)) - elif not isinstance(nres.payload.get('memory_free', None), int): - self.warn.append("Secondary node %s didn't return free" - " memory information" % node) - elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']: - self.warn.append("Not enough memory to failover instance to" - " secondary node %s" % node) + nres.Raise("Can't get info from secondary node %s" % node, + prereq=True, ecode=errors.ECODE_STATE) + if not isinstance(nres.payload.get("memory_free", None), int): + raise errors.OpPrereqError("Secondary 
node %s didn't return free" + " memory information" % node, + errors.ECODE_STATE) + elif be_new[constants.BE_MEMORY] > nres.payload["memory_free"]: + raise errors.OpPrereqError("This change will prevent the instance" + " from failover to its secondary node" + " %s, due to not enough memory" % node, + errors.ECODE_STATE) # NIC processing self.nic_pnew = {} @@ -10067,8 +10897,8 @@ class LUInstanceSetParams(LogicalUnit): for key in constants.NICS_PARAMETERS if key in nic_dict]) - if 'bridge' in nic_dict: - update_params_dict[constants.NIC_LINK] = nic_dict['bridge'] + if "bridge" in nic_dict: + update_params_dict[constants.NIC_LINK] = nic_dict["bridge"] new_nic_params = _GetUpdatedParams(old_nic_params, update_params_dict) @@ -10094,12 +10924,12 @@ class LUInstanceSetParams(LogicalUnit): else: nic_ip = old_nic_ip if nic_ip is None: - raise errors.OpPrereqError('Cannot set the nic ip to None' - ' on a routed nic', errors.ECODE_INVAL) + raise errors.OpPrereqError("Cannot set the nic ip to None" + " on a routed nic", errors.ECODE_INVAL) if constants.INIC_MAC in nic_dict: nic_mac = nic_dict[constants.INIC_MAC] if nic_mac is None: - raise errors.OpPrereqError('Cannot set the nic mac to None', + raise errors.OpPrereqError("Cannot set the nic mac to None", errors.ECODE_INVAL) elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): # otherwise generate the mac @@ -10187,7 +11017,8 @@ class LUInstanceSetParams(LogicalUnit): self.cfg.Update(instance, feedback_fn) # disks are created, waiting for sync - disk_abort = not _WaitForSync(self, instance) + disk_abort = not _WaitForSync(self, instance, + oneshot=not self.op.wait_for_sync) if disk_abort: raise errors.OpExecError("There are some degraded disks for" " this instance, please cleanup manually") @@ -10372,6 +11203,147 @@ class LUInstanceSetParams(LogicalUnit): } +class LUInstanceChangeGroup(LogicalUnit): + HPATH = "instance-change-group" + HTYPE = constants.HTYPE_INSTANCE + REQ_BGL = False + + def ExpandNames(self): + self.share_locks = _ShareAll() + self.needed_locks = { + locking.LEVEL_NODEGROUP: [], + locking.LEVEL_NODE: [], + } + + self._ExpandAndLockInstance() + + if self.op.target_groups: + self.req_target_uuids = map(self.cfg.LookupNodeGroup, + self.op.target_groups) + else: + self.req_target_uuids = None + + self.op.iallocator = _GetDefaultIAllocator(self.cfg, self.op.iallocator) + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODEGROUP: + assert not self.needed_locks[locking.LEVEL_NODEGROUP] + + if self.req_target_uuids: + lock_groups = set(self.req_target_uuids) + + # Lock all groups used by instance optimistically; this requires going + # via the node before it's locked, requiring verification later on + instance_groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name) + lock_groups.update(instance_groups) + else: + # No target groups, need to lock all of them + lock_groups = locking.ALL_SET + + self.needed_locks[locking.LEVEL_NODEGROUP] = lock_groups + + elif level == locking.LEVEL_NODE: + if self.req_target_uuids: + # Lock all nodes used by instances + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + self._LockInstancesNodes() + + # Lock all nodes in all potential target groups + lock_groups = (frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) - + self.cfg.GetInstanceNodeGroups(self.op.instance_name)) + member_nodes = [node_name + for group in lock_groups + for node_name in self.cfg.GetNodeGroup(group).members] + self.needed_locks[locking.LEVEL_NODE].extend(member_nodes) + else: + # 
Lock all nodes as all groups are potential targets + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + + def CheckPrereq(self): + owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE)) + owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) + owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE)) + + assert (self.req_target_uuids is None or + owned_groups.issuperset(self.req_target_uuids)) + assert owned_instances == set([self.op.instance_name]) + + # Get instance information + self.instance = self.cfg.GetInstanceInfo(self.op.instance_name) + + # Check if node groups for locked instance are still correct + assert owned_nodes.issuperset(self.instance.all_nodes), \ + ("Instance %s's nodes changed while we kept the lock" % + self.op.instance_name) + + inst_groups = _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, + owned_groups) + + if self.req_target_uuids: + # User requested specific target groups + self.target_uuids = self.req_target_uuids + else: + # All groups except those used by the instance are potential targets + self.target_uuids = owned_groups - inst_groups + + conflicting_groups = self.target_uuids & inst_groups + if conflicting_groups: + raise errors.OpPrereqError("Can't use group(s) '%s' as targets, they are" + " used by the instance '%s'" % + (utils.CommaJoin(conflicting_groups), + self.op.instance_name), + errors.ECODE_INVAL) + + if not self.target_uuids: + raise errors.OpPrereqError("There are no possible target groups", + errors.ECODE_INVAL) + + def BuildHooksEnv(self): + """Build hooks env. + + """ + assert self.target_uuids + + env = { + "TARGET_GROUPS": " ".join(self.target_uuids), + } + + env.update(_BuildInstanceHookEnvByObject(self, self.instance)) + + return env + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ + mn = self.cfg.GetMasterNode() + return ([mn], [mn]) + + def Exec(self, feedback_fn): + instances = list(self.owned_locks(locking.LEVEL_INSTANCE)) + + assert instances == [self.op.instance_name], "Instance not locked" + + ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_CHG_GROUP, + instances=instances, target_groups=list(self.target_uuids)) + + ial.Run(self.op.iallocator) + + if not ial.success: + raise errors.OpPrereqError("Can't compute solution for changing group of" + " instance '%s' using iallocator '%s': %s" % + (self.op.instance_name, self.op.iallocator, + ial.info), + errors.ECODE_NORES) + + jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, False) + + self.LogInfo("Iallocator returned %s job(s) for changing group of" + " instance '%s'", len(jobs), self.op.instance_name) + + return ResultWithJobs(jobs) + + class LUBackupQuery(NoHooksLU): """Query the exports list @@ -10396,7 +11368,7 @@ class LUBackupQuery(NoHooksLU): that node. 
""" - self.nodes = self.glm.list_owned(locking.LEVEL_NODE) + self.nodes = self.owned_locks(locking.LEVEL_NODE) rpcresult = self.rpc.call_export_list(self.nodes) result = {} for node in rpcresult: @@ -10692,7 +11664,8 @@ class LUBackupExport(LogicalUnit): not self.op.remove_instance): assert not activate_disks feedback_fn("Starting instance %s" % instance.name) - result = self.rpc.call_instance_start(src_node, instance, None, None) + result = self.rpc.call_instance_start(src_node, instance, + None, None, False) msg = result.fail_msg if msg: feedback_fn("Failed to start instance: %s" % msg) @@ -10778,7 +11751,7 @@ class LUBackupRemove(NoHooksLU): fqdn_warn = True instance_name = self.op.instance_name - locked_nodes = self.glm.list_owned(locking.LEVEL_NODE) + locked_nodes = self.owned_locks(locking.LEVEL_NODE) exportlist = self.rpc.call_export_list(locked_nodes) found = False for node in exportlist: @@ -10877,20 +11850,40 @@ class LUGroupAssignNodes(NoHooksLU): # We want to lock all the affected nodes and groups. We have readily # available the list of nodes, and the *destination* group. To gather the - # list of "source" groups, we need to fetch node information. - self.node_data = self.cfg.GetAllNodesInfo() - affected_groups = set(self.node_data[node].group for node in self.op.nodes) - affected_groups.add(self.group_uuid) - + # list of "source" groups, we need to fetch node information later on. self.needed_locks = { - locking.LEVEL_NODEGROUP: list(affected_groups), + locking.LEVEL_NODEGROUP: set([self.group_uuid]), locking.LEVEL_NODE: self.op.nodes, } + def DeclareLocks(self, level): + if level == locking.LEVEL_NODEGROUP: + assert len(self.needed_locks[locking.LEVEL_NODEGROUP]) == 1 + + # Try to get all affected nodes' groups without having the group or node + # lock yet. Needs verification later in the code flow. + groups = self.cfg.GetNodeGroupsFromNodes(self.op.nodes) + + self.needed_locks[locking.LEVEL_NODEGROUP].update(groups) + def CheckPrereq(self): """Check prerequisites. """ + assert self.needed_locks[locking.LEVEL_NODEGROUP] + assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) == + frozenset(self.op.nodes)) + + expected_locks = (set([self.group_uuid]) | + self.cfg.GetNodeGroupsFromNodes(self.op.nodes)) + actual_locks = self.owned_locks(locking.LEVEL_NODEGROUP) + if actual_locks != expected_locks: + raise errors.OpExecError("Nodes changed groups since locks were acquired," + " current groups are '%s', used to be '%s'" % + (utils.CommaJoin(expected_locks), + utils.CommaJoin(actual_locks))) + + self.node_data = self.cfg.GetAllNodesInfo() self.group = self.cfg.GetNodeGroup(self.group_uuid) instance_data = self.cfg.GetAllInstancesInfo() @@ -10926,6 +11919,9 @@ class LUGroupAssignNodes(NoHooksLU): for node in self.op.nodes: self.node_data[node].group = self.group_uuid + # FIXME: Depends on side-effects of modifying the result of + # C{cfg.GetAllNodesInfo} + self.cfg.Update(self.group, feedback_fn) # Saves all modified nodes. 
@staticmethod @@ -11068,6 +12064,9 @@ class LUGroupQuery(NoHooksLU): def ExpandNames(self): self.gq.ExpandNames(self) + def DeclareLocks(self, level): + self.gq.DeclareLocks(self, level) + def Exec(self, feedback_fn): return self.gq.OldStyleQuery(self) @@ -11146,7 +12145,6 @@ class LUGroupSetParams(LogicalUnit): return result - class LUGroupRemove(LogicalUnit): HPATH = "group-remove" HTYPE = constants.HTYPE_GROUP @@ -11283,7 +12281,163 @@ class LUGroupRename(LogicalUnit): return self.op.new_name -class TagsLU(NoHooksLU): # pylint: disable-msg=W0223 +class LUGroupEvacuate(LogicalUnit): + HPATH = "group-evacuate" + HTYPE = constants.HTYPE_GROUP + REQ_BGL = False + + def ExpandNames(self): + # This raises errors.OpPrereqError on its own: + self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name) + + if self.op.target_groups: + self.req_target_uuids = map(self.cfg.LookupNodeGroup, + self.op.target_groups) + else: + self.req_target_uuids = [] + + if self.group_uuid in self.req_target_uuids: + raise errors.OpPrereqError("Group to be evacuated (%s) can not be used" + " as a target group (targets are %s)" % + (self.group_uuid, + utils.CommaJoin(self.req_target_uuids)), + errors.ECODE_INVAL) + + self.op.iallocator = _GetDefaultIAllocator(self.cfg, self.op.iallocator) + + self.share_locks = _ShareAll() + self.needed_locks = { + locking.LEVEL_INSTANCE: [], + locking.LEVEL_NODEGROUP: [], + locking.LEVEL_NODE: [], + } + + def DeclareLocks(self, level): + if level == locking.LEVEL_INSTANCE: + assert not self.needed_locks[locking.LEVEL_INSTANCE] + + # Lock instances optimistically, needs verification once node and group + # locks have been acquired + self.needed_locks[locking.LEVEL_INSTANCE] = \ + self.cfg.GetNodeGroupInstances(self.group_uuid) + + elif level == locking.LEVEL_NODEGROUP: + assert not self.needed_locks[locking.LEVEL_NODEGROUP] + + if self.req_target_uuids: + lock_groups = set([self.group_uuid] + self.req_target_uuids) + + # Lock all groups used by instances optimistically; this requires going + # via the node before it's locked, requiring verification later on + lock_groups.update(group_uuid + for instance_name in + self.owned_locks(locking.LEVEL_INSTANCE) + for group_uuid in + self.cfg.GetInstanceNodeGroups(instance_name)) + else: + # No target groups, need to lock all of them + lock_groups = locking.ALL_SET + + self.needed_locks[locking.LEVEL_NODEGROUP] = lock_groups + + elif level == locking.LEVEL_NODE: + # This will only lock the nodes in the group to be evacuated which + # contain actual instances + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + self._LockInstancesNodes() + + # Lock all nodes in group to be evacuated and target groups + owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) + assert self.group_uuid in owned_groups + member_nodes = [node_name + for group in owned_groups + for node_name in self.cfg.GetNodeGroup(group).members] + self.needed_locks[locking.LEVEL_NODE].extend(member_nodes) + + def CheckPrereq(self): + owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE)) + owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP)) + owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE)) + + assert owned_groups.issuperset(self.req_target_uuids) + assert self.group_uuid in owned_groups + + # Check if locked instances are still correct + _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances) + + # Get instance information + self.instances = 
dict(self.cfg.GetMultiInstanceInfo(owned_instances)) + + # Check if node groups for locked instances are still correct + for instance_name in owned_instances: + inst = self.instances[instance_name] + assert owned_nodes.issuperset(inst.all_nodes), \ + "Instance %s's nodes changed while we kept the lock" % instance_name + + inst_groups = _CheckInstanceNodeGroups(self.cfg, instance_name, + owned_groups) + + assert self.group_uuid in inst_groups, \ + "Instance %s has no node in group %s" % (instance_name, self.group_uuid) + + if self.req_target_uuids: + # User requested specific target groups + self.target_uuids = self.req_target_uuids + else: + # All groups except the one to be evacuated are potential targets + self.target_uuids = [group_uuid for group_uuid in owned_groups + if group_uuid != self.group_uuid] + + if not self.target_uuids: + raise errors.OpPrereqError("There are no possible target groups", + errors.ECODE_INVAL) + + def BuildHooksEnv(self): + """Build hooks env. + + """ + return { + "GROUP_NAME": self.op.group_name, + "TARGET_GROUPS": " ".join(self.target_uuids), + } + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ + mn = self.cfg.GetMasterNode() + + assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP) + + run_nodes = [mn] + self.cfg.GetNodeGroup(self.group_uuid).members + + return (run_nodes, run_nodes) + + def Exec(self, feedback_fn): + instances = list(self.owned_locks(locking.LEVEL_INSTANCE)) + + assert self.group_uuid not in self.target_uuids + + ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_CHG_GROUP, + instances=instances, target_groups=self.target_uuids) + + ial.Run(self.op.iallocator) + + if not ial.success: + raise errors.OpPrereqError("Can't compute group evacuation using" + " iallocator '%s': %s" % + (self.op.iallocator, ial.info), + errors.ECODE_NORES) + + jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, False) + + self.LogInfo("Iallocator returned %s job(s) for evacuating node group %s", + len(jobs), self.op.group_name) + + return ResultWithJobs(jobs) + + +class TagsLU(NoHooksLU): # pylint: disable=W0223 """Generic tags LU. This is an abstract class which is the parent of all the other tags LUs. @@ -11331,7 +12485,7 @@ class LUTagsGet(TagsLU): TagsLU.ExpandNames(self) # Share locks as this is only a read operation - self.share_locks = dict.fromkeys(locking.LEVELS, 1) + self.share_locks = _ShareAll() def Exec(self, feedback_fn): """Returns the tag list. @@ -11543,7 +12697,7 @@ class LUTestJqueue(NoHooksLU): # Wait for client to close try: try: - # pylint: disable-msg=E1101 + # pylint: disable=E1101 # Instance of '_socketobject' has no ... 
member conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT) conn.recv(1) @@ -11640,18 +12794,8 @@ class IAllocator(object): easy usage """ - # pylint: disable-msg=R0902 + # pylint: disable=R0902 # lots of instance attributes - _ALLO_KEYS = [ - "name", "mem_size", "disks", "disk_template", - "os", "tags", "nics", "vcpus", "hypervisor", - ] - _RELO_KEYS = [ - "name", "relocate_from", - ] - _EVAC_KEYS = [ - "evac_nodes", - ] def __init__(self, cfg, rpc, mode, **kwargs): self.cfg = cfg @@ -11660,28 +12804,27 @@ class IAllocator(object): self.in_text = self.out_text = self.in_data = self.out_data = None # init all input fields so that pylint is happy self.mode = mode - self.mem_size = self.disks = self.disk_template = None + self.memory = self.disks = self.disk_template = None self.os = self.tags = self.nics = self.vcpus = None self.hypervisor = None self.relocate_from = None self.name = None - self.evac_nodes = None + self.instances = None + self.evac_mode = None + self.target_groups = [] # computed fields self.required_nodes = None # init result fields self.success = self.info = self.result = None - if self.mode == constants.IALLOCATOR_MODE_ALLOC: - keyset = self._ALLO_KEYS - fn = self._AddNewInstance - elif self.mode == constants.IALLOCATOR_MODE_RELOC: - keyset = self._RELO_KEYS - fn = self._AddRelocateInstance - elif self.mode == constants.IALLOCATOR_MODE_MEVAC: - keyset = self._EVAC_KEYS - fn = self._AddEvacuateNodes - else: + + try: + (fn, keydata, self._result_check) = self._MODE_DATA[self.mode] + except KeyError: raise errors.ProgrammerError("Unknown mode '%s' passed to the" " IAllocator" % self.mode) + + keyset = [n for (n, _) in keydata] + for key in kwargs: if key not in keyset: raise errors.ProgrammerError("Invalid input parameter '%s' to" @@ -11692,7 +12835,7 @@ class IAllocator(object): if key not in kwargs: raise errors.ProgrammerError("Missing input parameter '%s' to" " IAllocator" % key) - self._BuildInputData(fn) + self._BuildInputData(compat.partial(fn, self), keydata) def _ComputeClusterData(self): """Compute the generic allocator input data. @@ -11721,7 +12864,7 @@ class IAllocator(object): hypervisor_name = self.hypervisor elif self.mode == constants.IALLOCATOR_MODE_RELOC: hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor - elif self.mode == constants.IALLOCATOR_MODE_MEVAC: + else: hypervisor_name = cluster_info.enabled_hypervisors[0] node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(), @@ -11747,12 +12890,12 @@ class IAllocator(object): """Compute node groups data. 
""" - ng = {} - for guuid, gdata in cfg.GetAllNodeGroupsInfo().items(): - ng[guuid] = { - "name": gdata.name, - "alloc_policy": gdata.alloc_policy, - } + ng = dict((guuid, { + "name": gdata.name, + "alloc_policy": gdata.alloc_policy, + }) + for guuid, gdata in cfg.GetAllNodeGroupsInfo().items()) + return ng @staticmethod @@ -11763,22 +12906,19 @@ class IAllocator(object): @returns: a dict of name: (node dict, node config) """ - node_results = {} - for ninfo in node_cfg.values(): - # fill in static (config-based) values - pnr = { - "tags": list(ninfo.GetTags()), - "primary_ip": ninfo.primary_ip, - "secondary_ip": ninfo.secondary_ip, - "offline": ninfo.offline, - "drained": ninfo.drained, - "master_candidate": ninfo.master_candidate, - "group": ninfo.group, - "master_capable": ninfo.master_capable, - "vm_capable": ninfo.vm_capable, - } - - node_results[ninfo.name] = pnr + # fill in static (config-based) values + node_results = dict((ninfo.name, { + "tags": list(ninfo.GetTags()), + "primary_ip": ninfo.primary_ip, + "secondary_ip": ninfo.secondary_ip, + "offline": ninfo.offline, + "drained": ninfo.drained, + "master_candidate": ninfo.master_candidate, + "group": ninfo.group, + "master_capable": ninfo.master_capable, + "vm_capable": ninfo.vm_capable, + }) + for ninfo in node_cfg.values()) return node_results @@ -11802,8 +12942,8 @@ class IAllocator(object): nname) remote_info = nresult.payload - for attr in ['memory_total', 'memory_free', 'memory_dom0', - 'vg_size', 'vg_free', 'cpu_total']: + for attr in ["memory_total", "memory_free", "memory_dom0", + "vg_size", "vg_free", "cpu_total"]: if attr not in remote_info: raise errors.OpExecError("Node '%s' didn't return attribute" " '%s'" % (nname, attr)) @@ -11819,21 +12959,21 @@ class IAllocator(object): if iinfo.name not in node_iinfo[nname].payload: i_used_mem = 0 else: - i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory']) + i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"]) i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem - remote_info['memory_free'] -= max(0, i_mem_diff) + remote_info["memory_free"] -= max(0, i_mem_diff) if iinfo.admin_up: i_p_up_mem += beinfo[constants.BE_MEMORY] # compute memory used by instances pnr_dyn = { - "total_memory": remote_info['memory_total'], - "reserved_memory": remote_info['memory_dom0'], - "free_memory": remote_info['memory_free'], - "total_disk": remote_info['vg_size'], - "free_disk": remote_info['vg_free'], - "total_cpus": remote_info['cpu_total'], + "total_memory": remote_info["memory_total"], + "reserved_memory": remote_info["memory_dom0"], + "free_memory": remote_info["memory_free"], + "total_disk": remote_info["vg_size"], + "free_disk": remote_info["vg_free"], + "total_cpus": remote_info["cpu_total"], "i_pri_memory": i_p_mem, "i_pri_up_memory": i_p_up_mem, } @@ -11852,11 +12992,12 @@ class IAllocator(object): nic_data = [] for nic in iinfo.nics: filled_params = cluster_info.SimpleFillNIC(nic.nicparams) - nic_dict = {"mac": nic.mac, - "ip": nic.ip, - "mode": filled_params[constants.NIC_MODE], - "link": filled_params[constants.NIC_LINK], - } + nic_dict = { + "mac": nic.mac, + "ip": nic.ip, + "mode": filled_params[constants.NIC_MODE], + "link": filled_params[constants.NIC_LINK], + } if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: nic_dict["bridge"] = filled_params[constants.NIC_LINK] nic_data.append(nic_dict) @@ -11896,18 +13037,21 @@ class IAllocator(object): self.required_nodes = 2 else: self.required_nodes = 1 + request = { "name": self.name, 
"disk_template": self.disk_template, "tags": self.tags, "os": self.os, "vcpus": self.vcpus, - "memory": self.mem_size, + "memory": self.memory, "disks": self.disks, "disk_space_total": disk_space, "nics": self.nics, "required_nodes": self.required_nodes, + "hypervisor": self.hypervisor, } + return request def _AddRelocateInstance(self): @@ -11946,16 +13090,25 @@ class IAllocator(object): } return request - def _AddEvacuateNodes(self): - """Add evacuate nodes data to allocator structure. + def _AddNodeEvacuate(self): + """Get data for node-evacuate requests. """ - request = { - "evac_nodes": self.evac_nodes + return { + "instances": self.instances, + "evac_mode": self.evac_mode, + } + + def _AddChangeGroup(self): + """Get data for node-evacuate requests. + + """ + return { + "instances": self.instances, + "target_groups": self.target_groups, } - return request - def _BuildInputData(self, fn): + def _BuildInputData(self, fn, keydata): """Build input data structures. """ @@ -11963,10 +13116,72 @@ class IAllocator(object): request = fn() request["type"] = self.mode + for keyname, keytype in keydata: + if keyname not in request: + raise errors.ProgrammerError("Request parameter %s is missing" % + keyname) + val = request[keyname] + if not keytype(val): + raise errors.ProgrammerError("Request parameter %s doesn't pass" + " validation, value %s, expected" + " type %s" % (keyname, val, keytype)) self.in_data["request"] = request self.in_text = serializer.Dump(self.in_data) + _STRING_LIST = ht.TListOf(ht.TString) + _JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, { + # pylint: disable=E1101 + # Class '...' has no 'OP_ID' member + "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID, + opcodes.OpInstanceMigrate.OP_ID, + opcodes.OpInstanceReplaceDisks.OP_ID]) + }))) + + _NEVAC_MOVED = \ + ht.TListOf(ht.TAnd(ht.TIsLength(3), + ht.TItems([ht.TNonEmptyString, + ht.TNonEmptyString, + ht.TListOf(ht.TNonEmptyString), + ]))) + _NEVAC_FAILED = \ + ht.TListOf(ht.TAnd(ht.TIsLength(2), + ht.TItems([ht.TNonEmptyString, + ht.TMaybeString, + ]))) + _NEVAC_RESULT = ht.TAnd(ht.TIsLength(3), + ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST])) + + _MODE_DATA = { + constants.IALLOCATOR_MODE_ALLOC: + (_AddNewInstance, + [ + ("name", ht.TString), + ("memory", ht.TInt), + ("disks", ht.TListOf(ht.TDict)), + ("disk_template", ht.TString), + ("os", ht.TString), + ("tags", _STRING_LIST), + ("nics", ht.TListOf(ht.TDict)), + ("vcpus", ht.TInt), + ("hypervisor", ht.TString), + ], ht.TList), + constants.IALLOCATOR_MODE_RELOC: + (_AddRelocateInstance, + [("name", ht.TString), ("relocate_from", _STRING_LIST)], + ht.TList), + constants.IALLOCATOR_MODE_NODE_EVAC: + (_AddNodeEvacuate, [ + ("instances", _STRING_LIST), + ("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)), + ], _NEVAC_RESULT), + constants.IALLOCATOR_MODE_CHG_GROUP: + (_AddChangeGroup, [ + ("instances", _STRING_LIST), + ("target_groups", _STRING_LIST), + ], _NEVAC_RESULT), + } + def Run(self, name, validate=True, call_fn=None): """Run an instance allocator and return the results. 
@@ -12007,9 +13222,11 @@ class IAllocator(object): " missing key '%s'" % key) setattr(self, key, rdict[key]) - if not isinstance(rdict["result"], list): - raise errors.OpExecError("Can't parse iallocator results: 'result' key" - " is not a list") + if not self._result_check(self.result): + raise errors.OpExecError("Iallocator returned invalid result," + " expected %s, got %s" % + (self._result_check, self.result), + errors.ECODE_INVAL) if self.mode == constants.IALLOCATOR_MODE_RELOC: assert self.relocate_from is not None @@ -12021,15 +13238,19 @@ class IAllocator(object): fn = compat.partial(self._NodesToGroups, node2group, self.in_data["nodegroups"]) - request_groups = fn(self.relocate_from) - result_groups = fn(rdict["result"]) + instance = self.cfg.GetInstanceInfo(self.name) + request_groups = fn(self.relocate_from + [instance.primary_node]) + result_groups = fn(rdict["result"] + [instance.primary_node]) - if result_groups != request_groups: + if self.success and not set(result_groups).issubset(request_groups): raise errors.OpExecError("Groups of nodes returned by iallocator (%s)" " differ from original groups (%s)" % (utils.CommaJoin(result_groups), utils.CommaJoin(request_groups))) + elif self.mode == constants.IALLOCATOR_MODE_NODE_EVAC: + assert self.evac_mode in constants.IALLOCATOR_NEVAC_MODES + self.out_data = rdict @staticmethod @@ -12079,7 +13300,7 @@ class LUTestAllocator(NoHooksLU): """ if self.op.mode == constants.IALLOCATOR_MODE_ALLOC: - for attr in ["mem_size", "disks", "disk_template", + for attr in ["memory", "disks", "disk_template", "os", "tags", "nics", "vcpus"]: if not hasattr(self.op, attr): raise errors.OpPrereqError("Missing attribute '%s' on opcode input" % @@ -12096,10 +13317,10 @@ class LUTestAllocator(NoHooksLU): errors.ECODE_INVAL) for row in self.op.disks: if (not isinstance(row, dict) or - "size" not in row or - not isinstance(row["size"], int) or - "mode" not in row or - row["mode"] not in ['r', 'w']): + constants.IDISK_SIZE not in row or + not isinstance(row[constants.IDISK_SIZE], int) or + constants.IDISK_MODE not in row or + row[constants.IDISK_MODE] not in constants.DISK_ACCESS_SET): raise errors.OpPrereqError("Invalid contents of the 'disks'" " parameter", errors.ECODE_INVAL) if self.op.hypervisor is None: @@ -12107,11 +13328,13 @@ class LUTestAllocator(NoHooksLU): elif self.op.mode == constants.IALLOCATOR_MODE_RELOC: fname = _ExpandInstanceName(self.cfg, self.op.name) self.op.name = fname - self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes - elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC: - if not hasattr(self.op, "evac_nodes"): - raise errors.OpPrereqError("Missing attribute 'evac_nodes' on" - " opcode input", errors.ECODE_INVAL) + self.relocate_from = \ + list(self.cfg.GetInstanceInfo(fname).secondary_nodes) + elif self.op.mode in (constants.IALLOCATOR_MODE_CHG_GROUP, + constants.IALLOCATOR_MODE_NODE_EVAC): + if not self.op.instances: + raise errors.OpPrereqError("Missing instances", errors.ECODE_INVAL) + self.op.instances = _GetWantedInstances(self, self.op.instances) else: raise errors.OpPrereqError("Invalid test allocator mode '%s'" % self.op.mode, errors.ECODE_INVAL) @@ -12132,7 +13355,7 @@ class LUTestAllocator(NoHooksLU): ial = IAllocator(self.cfg, self.rpc, mode=self.op.mode, name=self.op.name, - mem_size=self.op.mem_size, + memory=self.op.memory, disks=self.op.disks, disk_template=self.op.disk_template, os=self.op.os, @@ -12147,10 +13370,16 @@ class LUTestAllocator(NoHooksLU): name=self.op.name, 
                        relocate_from=list(self.relocate_from),
                        )
-    elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC:
+    elif self.op.mode == constants.IALLOCATOR_MODE_CHG_GROUP:
+      ial = IAllocator(self.cfg, self.rpc,
+                       mode=self.op.mode,
+                       instances=self.op.instances,
+                       target_groups=self.op.target_groups)
+    elif self.op.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
       ial = IAllocator(self.cfg, self.rpc,
                        mode=self.op.mode,
-                       evac_nodes=self.op.evac_nodes)
+                       instances=self.op.instances,
+                       evac_mode=self.op.evac_mode)
     else:
       raise errors.ProgrammerError("Uncatched mode %s in"
                                    " LUTestAllocator.Exec", self.op.mode)
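# Hedged example of the request bodies implied by _AddChangeGroup and
# _AddNodeEvacuate above, once "type" has been added by _BuildInputData.
# The literal mode and evac_mode strings are placeholders; the real values
# come from constants.IALLOCATOR_MODE_* and constants.IALLOCATOR_NEVAC_MODES,
# and the real code serializes via ganeti.serializer (json is a stand-in).
import json

chg_group_request = {
  "type": "change-group",            # assumed value of IALLOCATOR_MODE_CHG_GROUP
  "instances": ["inst1.example.com"],
  "target_groups": ["target-group-uuid"],
  }

node_evac_request = {
  "type": "node-evacuate",           # assumed value of IALLOCATOR_MODE_NODE_EVAC
  "instances": ["inst1.example.com"],
  "evac_mode": "all",                # assumed member of IALLOCATOR_NEVAC_MODES
  }

print(json.dumps([chg_group_request, node_evac_request], indent=2))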