X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/a2e885ee62cdbf304cc93a0837bd78f8acebd472..3f1e065d5095b2c0cda036a130575458c8f270af:/lib/cmdlib.py

diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 08b6bf0..98a056c 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -57,6 +57,7 @@ from ganeti import netutils
 from ganeti import query
 from ganeti import qlang
 from ganeti import opcodes
+from ganeti import ht
 
 import ganeti.masterd.instance # pylint: disable-msg=W0611
 
@@ -129,17 +130,16 @@ class LogicalUnit(object):
     self.proc = processor
     self.op = op
     self.cfg = context.cfg
+    self.glm = context.glm
     self.context = context
     self.rpc = rpc
     # Dicts used to declare locking needs to mcpu
     self.needed_locks = None
-    self.acquired_locks = {}
     self.share_locks = dict.fromkeys(locking.LEVELS, 0)
     self.add_locks = {}
     self.remove_locks = {}
     # Used to force good behavior when calling helper functions
     self.recalculate_locks = {}
-    self.__ssh = None
     # logging
     self.Log = processor.Log # pylint: disable-msg=C0103
     self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
@@ -160,16 +160,6 @@ class LogicalUnit(object):
 
     self.CheckArguments()
 
-  def __GetSSH(self):
-    """Returns the SshRunner object
-
-    """
-    if not self.__ssh:
-      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
-    return self.__ssh
-
-  ssh = property(fget=__GetSSH)
-
   def CheckArguments(self):
     """Check syntactic validity for the opcode arguments.
 
@@ -396,7 +386,7 @@ class LogicalUnit(object):
     # future we might want to have different behaviors depending on the value
     # of self.recalculate_locks[locking.LEVEL_NODE]
     wanted_nodes = []
-    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+    for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE):
       instance = self.context.cfg.GetInstanceInfo(instance_name)
       wanted_nodes.append(instance.primary_node)
       if not primary_only:
@@ -510,7 +500,7 @@ class _QueryBase:
 
     """
     if self.do_locking:
-      names = lu.acquired_locks[lock_level]
+      names = lu.glm.list_owned(lock_level)
     else:
       names = all_names
 
@@ -521,7 +511,7 @@ class _QueryBase:
       # caller specified names and we must keep the same order
       assert self.names
 
-      assert not self.do_locking or lu.acquired_locks[lock_level]
+      assert not self.do_locking or lu.glm.is_owned(lock_level)
 
       missing = set(self.wanted).difference(names)
       if missing:
@@ -641,6 +631,51 @@ def _GetUpdatedParams(old_params, update_dict,
   return params_copy
 
 
+def _ReleaseLocks(lu, level, names=None, keep=None):
+  """Releases locks owned by an LU.
+
+  @type lu: L{LogicalUnit}
+  @param level: Lock level
+  @type names: list or None
+  @param names: Names of locks to release
+  @type keep: list or None
+  @param keep: Names of locks to retain
+
+  """
+  assert not (keep is not None and names is not None), \
+    "Only one of the 'names' and the 'keep' parameters can be given"
+
+  if names is not None:
+    should_release = names.__contains__
+  elif keep:
+    should_release = lambda name: name not in keep
+  else:
+    should_release = None
+
+  if should_release:
+    retain = []
+    release = []
+
+    # Determine which locks to release
+    for name in lu.glm.list_owned(level):
+      if should_release(name):
+        release.append(name)
+      else:
+        retain.append(name)
+
+    assert len(lu.glm.list_owned(level)) == (len(retain) + len(release))
+
+    # Release just some locks
+    lu.glm.release(level, names=release)
+
+    assert frozenset(lu.glm.list_owned(level)) == frozenset(retain)
+  else:
+    # Release everything
+    lu.glm.release(level)
+
+    assert not lu.glm.is_owned(level), "No locks should be owned"
+
+
 def _RunPostHook(lu, node_name):
   """Runs the post-hook for an opcode on a single node.
 
@@ -1119,7 +1154,7 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
   iallocator = getattr(lu.op, iallocator_slot, None)
 
   if node is not None and iallocator is not None:
-    raise errors.OpPrereqError("Do not specify both, iallocator and node.",
+    raise errors.OpPrereqError("Do not specify both, iallocator and node",
                               errors.ECODE_INVAL)
   elif node is None and iallocator is None:
     default_iallocator = lu.cfg.GetDefaultIAllocator()
@@ -1127,10 +1162,10 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
       setattr(lu.op, iallocator_slot, default_iallocator)
     else:
       raise errors.OpPrereqError("No iallocator or node given and no"
-                                 " cluster-wide default iallocator found."
-                                 " Please specify either an iallocator or a"
+                                 " cluster-wide default iallocator found;"
+                                 " please specify either an iallocator or a"
                                  " node, or set a cluster-wide default"
-                                 " iallocator.")
+                                 " iallocator")
 
 
 class LUClusterPostInit(LogicalUnit):
@@ -1219,7 +1254,7 @@ class LUClusterDestroy(LogicalUnit):
 
 
 def _VerifyCertificate(filename):
-  """Verifies a certificate for LUClusterVerify.
+  """Verifies a certificate for LUClusterVerifyConfig.
 
   @type filename: string
   @param filename: Path to PEM file
@@ -1229,7 +1264,7 @@ def _VerifyCertificate(filename):
     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                            utils.ReadFile(filename))
   except Exception, err: # pylint: disable-msg=W0703
-    return (LUClusterVerify.ETYPE_ERROR,
+    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))
 
   (errcode, msg) = \
@@ -1244,21 +1279,52 @@ def _VerifyCertificate(filename):
   if errcode is None:
     return (None, fnamemsg)
   elif errcode == utils.CERT_WARNING:
-    return (LUClusterVerify.ETYPE_WARNING, fnamemsg)
+    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
   elif errcode == utils.CERT_ERROR:
-    return (LUClusterVerify.ETYPE_ERROR, fnamemsg)
+    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
 
   raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
 
 
-class LUClusterVerify(LogicalUnit):
-  """Verifies the cluster status.
+def _GetAllHypervisorParameters(cluster, instances):
+  """Compute the set of all hypervisor parameters.
+ + @type cluster: L{objects.Cluster} + @param cluster: the cluster object + @param instances: list of L{objects.Instance} + @param instances: additional instances from which to obtain parameters + @rtype: list of (origin, hypervisor, parameters) + @return: a list with all parameters found, indicating the hypervisor they + apply to, and the origin (can be "cluster", "os X", or "instance Y") """ - HPATH = "cluster-verify" - HTYPE = constants.HTYPE_CLUSTER - REQ_BGL = False + hvp_data = [] + + for hv_name in cluster.enabled_hypervisors: + hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name))) + + for os_name, os_hvp in cluster.os_hvp.items(): + for hv_name, hv_params in os_hvp.items(): + if hv_params: + full_params = cluster.GetHVDefaults(hv_name, os_name=os_name) + hvp_data.append(("os %s" % os_name, hv_name, full_params)) + # TODO: collapse identical parameter values in a single one + for instance in instances: + if instance.hvparams: + hvp_data.append(("instance %s" % instance.name, instance.hypervisor, + cluster.FillHV(instance))) + + return hvp_data + + +class _VerifyErrors(object): + """Mix-in for cluster/group verify LUs. + + It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects + self.op and self._feedback_fn to be available.) + + """ TCLUSTER = "cluster" TNODE = "node" TINSTANCE = "instance" @@ -1266,6 +1332,8 @@ class LUClusterVerify(LogicalUnit): ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG") ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT") ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK") + ECLUSTERDANGLINGNODES = (TNODE, "ECLUSTERDANGLINGNODES") + ECLUSTERDANGLINGINST = (TNODE, "ECLUSTERDANGLINGINST") EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE") EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN") EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT") @@ -1295,6 +1363,138 @@ class LUClusterVerify(LogicalUnit): ETYPE_ERROR = "ERROR" ETYPE_WARNING = "WARNING" + def _Error(self, ecode, item, msg, *args, **kwargs): + """Format an error message. + + Based on the opcode's error_codes parameter, either format a + parseable error code, or a simpler error string. + + This must be called only from Exec and functions called from Exec. + + """ + ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) + itype, etxt = ecode + # first complete the msg + if args: + msg = msg % args + # then format the whole message + if self.op.error_codes: # This is a mix-in. pylint: disable-msg=E1101 + msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg) + else: + if item: + item = " " + item + else: + item = "" + msg = "%s: %s%s: %s" % (ltype, itype, item, msg) + # and finally report it via the feedback_fn + self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable-msg=E1101 + + def _ErrorIf(self, cond, *args, **kwargs): + """Log an error message if the passed condition is True. + + """ + cond = (bool(cond) + or self.op.debug_simulate_errors) # pylint: disable-msg=E1101 + if cond: + self._Error(*args, **kwargs) + # do not mark the operation as failed for WARN cases only + if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR: + self.bad = self.bad or cond + + +class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors): + """Verifies the cluster config. + + """ + REQ_BGL = False + + def _VerifyHVP(self, hvp_data): + """Verifies locally the syntax of the hypervisor parameters. 
+ + """ + for item, hv_name, hv_params in hvp_data: + msg = ("hypervisor %s parameters syntax check (source %s): %%s" % + (item, hv_name)) + try: + hv_class = hypervisor.GetHypervisor(hv_name) + utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) + hv_class.CheckParameterSyntax(hv_params) + except errors.GenericError, err: + self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err)) + + def ExpandNames(self): + self.all_group_info = self.cfg.GetAllNodeGroupsInfo() + self.all_node_info = self.cfg.GetAllNodesInfo() + self.all_inst_info = self.cfg.GetAllInstancesInfo() + self.needed_locks = {} + + def Exec(self, feedback_fn): + """Verify integrity of cluster, performing various test on nodes. + + """ + self.bad = False + self._feedback_fn = feedback_fn + + feedback_fn("* Verifying cluster config") + + for msg in self.cfg.VerifyConfig(): + self._ErrorIf(True, self.ECLUSTERCFG, None, msg) + + feedback_fn("* Verifying cluster certificate files") + + for cert_filename in constants.ALL_CERT_FILES: + (errcode, msg) = _VerifyCertificate(cert_filename) + self._ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode) + + feedback_fn("* Verifying hypervisor parameters") + + self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(), + self.all_inst_info.values())) + + feedback_fn("* Verifying all nodes belong to an existing group") + + # We do this verification here because, should this bogus circumstance + # occur, it would never be catched by VerifyGroup, which only acts on + # nodes/instances reachable from existing node groups. + + dangling_nodes = set(node.name for node in self.all_node_info.values() + if node.group not in self.all_group_info) + + dangling_instances = {} + no_node_instances = [] + + for inst in self.all_inst_info.values(): + if inst.primary_node in dangling_nodes: + dangling_instances.setdefault(inst.primary_node, []).append(inst.name) + elif inst.primary_node not in self.all_node_info: + no_node_instances.append(inst.name) + + pretty_dangling = [ + "%s (%s)" % + (node.name, + utils.CommaJoin(dangling_instances.get(node.name, + ["no instances"]))) + for node in dangling_nodes] + + self._ErrorIf(bool(dangling_nodes), self.ECLUSTERDANGLINGNODES, None, + "the following nodes (and their instances) belong to a non" + " existing group: %s", utils.CommaJoin(pretty_dangling)) + + self._ErrorIf(bool(no_node_instances), self.ECLUSTERDANGLINGINST, None, + "the following instances have a non-existing primary-node:" + " %s", utils.CommaJoin(no_node_instances)) + + return (not self.bad, [g.name for g in self.all_group_info.values()]) + + +class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): + """Verifies the status of a node group. + + """ + HPATH = "cluster-verify" + HTYPE = constants.HTYPE_CLUSTER + REQ_BGL = False + _HOOKS_INDENT_RE = re.compile("^", re.M) class NodeImage(object): @@ -1348,48 +1548,90 @@ class LUClusterVerify(LogicalUnit): self.oslist = {} def ExpandNames(self): + # This raises errors.OpPrereqError on its own: + self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name) + + all_node_info = self.cfg.GetAllNodesInfo() + all_inst_info = self.cfg.GetAllInstancesInfo() + + node_names = set(node.name + for node in all_node_info.values() + if node.group == self.group_uuid) + + inst_names = [inst.name + for inst in all_inst_info.values() + if inst.primary_node in node_names] + + # In Exec(), we warn about mirrored instances that have primary and + # secondary living in separate node groups. 
To fully verify that + # volumes for these instances are healthy, we will need to do an + # extra call to their secondaries. We ensure here those nodes will + # be locked. + for inst in inst_names: + if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR: + node_names.update(all_inst_info[inst].secondary_nodes) + self.needed_locks = { - locking.LEVEL_NODE: locking.ALL_SET, - locking.LEVEL_INSTANCE: locking.ALL_SET, + locking.LEVEL_NODEGROUP: [self.group_uuid], + locking.LEVEL_NODE: list(node_names), + locking.LEVEL_INSTANCE: inst_names, } + self.share_locks = dict.fromkeys(locking.LEVELS, 1) - def _Error(self, ecode, item, msg, *args, **kwargs): - """Format an error message. + def CheckPrereq(self): + self.all_node_info = self.cfg.GetAllNodesInfo() + self.all_inst_info = self.cfg.GetAllInstancesInfo() - Based on the opcode's error_codes parameter, either format a - parseable error code, or a simpler error string. + group_nodes = set(node.name + for node in self.all_node_info.values() + if node.group == self.group_uuid) - This must be called only from Exec and functions called from Exec. + group_instances = set(inst.name + for inst in self.all_inst_info.values() + if inst.primary_node in group_nodes) - """ - ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) - itype, etxt = ecode - # first complete the msg - if args: - msg = msg % args - # then format the whole message - if self.op.error_codes: - msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg) - else: - if item: - item = " " + item - else: - item = "" - msg = "%s: %s%s: %s" % (ltype, itype, item, msg) - # and finally report it via the feedback_fn - self._feedback_fn(" - %s" % msg) + unlocked_nodes = \ + group_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE)) - def _ErrorIf(self, cond, *args, **kwargs): - """Log an error message if the passed condition is True. + unlocked_instances = \ + group_instances.difference(self.glm.list_owned(locking.LEVEL_INSTANCE)) - """ - cond = bool(cond) or self.op.debug_simulate_errors - if cond: - self._Error(*args, **kwargs) - # do not mark the operation as failed for WARN cases only - if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR: - self.bad = self.bad or cond + if unlocked_nodes: + raise errors.OpPrereqError("missing lock for nodes: %s" % + utils.CommaJoin(unlocked_nodes)) + + if unlocked_instances: + raise errors.OpPrereqError("missing lock for instances: %s" % + utils.CommaJoin(unlocked_instances)) + + self.my_node_names = utils.NiceSort(group_nodes) + self.my_inst_names = utils.NiceSort(group_instances) + + self.my_node_info = dict((name, self.all_node_info[name]) + for name in self.my_node_names) + + self.my_inst_info = dict((name, self.all_inst_info[name]) + for name in self.my_inst_names) + + # We detect here the nodes that will need the extra RPC calls for verifying + # split LV volumes; they should be locked. 
+ extra_lv_nodes = set() + + for inst in self.my_inst_info.values(): + if inst.disk_template in constants.DTS_INT_MIRROR: + group = self.my_node_info[inst.primary_node].group + for nname in inst.secondary_nodes: + if self.all_node_info[nname].group != group: + extra_lv_nodes.add(nname) + + unlocked_lv_nodes = \ + extra_lv_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE)) + + if unlocked_lv_nodes: + raise errors.OpPrereqError("these nodes could be locked: %s" % + utils.CommaJoin(unlocked_lv_nodes)) + self.extra_lv_nodes = list(extra_lv_nodes) def _VerifyNode(self, ninfo, nresult): """Perform some basic validation on data returned from a node. @@ -1458,7 +1700,7 @@ class LUClusterVerify(LogicalUnit): hv_name, item, hv_result) test = nresult.get(constants.NV_NODESETUP, - ["Missing NODESETUP results"]) + ["Missing NODESETUP results"]) _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s", "; ".join(test)) @@ -1497,7 +1739,7 @@ class LUClusterVerify(LogicalUnit): ntime_diff) def _VerifyNodeLVM(self, ninfo, nresult, vg_name): - """Check the node time. + """Check the node LVM results. @type ninfo: L{objects.Node} @param ninfo: the node to check @@ -1533,8 +1775,31 @@ class LUClusterVerify(LogicalUnit): _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV" " '%s' of VG '%s'", pvname, owner_vg) + def _VerifyNodeBridges(self, ninfo, nresult, bridges): + """Check the node bridges. + + @type ninfo: L{objects.Node} + @param ninfo: the node to check + @param nresult: the remote results for the node + @param bridges: the expected list of bridges + + """ + if not bridges: + return + + node = ninfo.name + _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + + missing = nresult.get(constants.NV_BRIDGES, None) + test = not isinstance(missing, list) + _ErrorIf(test, self.ENODENET, node, + "did not return valid bridge information") + if not test: + _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" % + utils.CommaJoin(sorted(missing))) + def _VerifyNodeNetwork(self, ninfo, nresult): - """Check the node time. + """Check the node network connectivity results. @type ninfo: L{objects.Node} @param ninfo: the node to check @@ -1606,12 +1871,6 @@ class LUClusterVerify(LogicalUnit): "instance not running on its primary node %s", node_current) - for node, n_img in node_image.items(): - if node != node_current: - test = instance in n_img.instances - _ErrorIf(test, self.EINSTANCEWRONGNODE, instance, - "instance should not run on node %s", node) - diskdata = [(nname, success, status, idx) for (nname, disks) in diskstatus.items() for idx, (success, status) in enumerate(disks)] @@ -1651,18 +1910,6 @@ class LUClusterVerify(LogicalUnit): self._ErrorIf(test, self.ENODEORPHANLV, node, "volume %s is unknown", volume) - def _VerifyOrphanInstances(self, instancelist, node_image): - """Verify the list of running instances. - - This checks what instances are running but unknown to the cluster. - - """ - for node, n_img in node_image.items(): - for o_inst in n_img.instances: - test = o_inst not in instancelist - self._ErrorIf(test, self.ENODEORPHANINSTANCE, node, - "instance %s on node %s should not exist", o_inst, node) - def _VerifyNPlusOneMemory(self, node_image, instance_cfg): """Verify N+1 Memory Resilience. @@ -1902,6 +2149,7 @@ class LUClusterVerify(LogicalUnit): assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?" 
+ beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l] for os_name, os_data in nimg.oslist.items(): assert os_data, "Empty OS status for OS %s?!" % os_name f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0] @@ -1929,11 +2177,12 @@ class LUClusterVerify(LogicalUnit): continue for kind, a, b in [("API version", f_api, b_api), ("variants list", f_var, b_var), - ("parameters", f_param, b_param)]: + ("parameters", beautify_params(f_param), + beautify_params(b_param))]: _ErrorIf(a != b, self.ENODEOS, node, - "OS %s %s differs from reference node %s: %s vs. %s", + "OS %s for %s differs from reference node %s: [%s] vs. [%s]", kind, os_name, base.name, - utils.CommaJoin(a), utils.CommaJoin(b)) + utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b))) # check any missing OSes missing = set(base.oslist.keys()).difference(nimg.oslist.keys()) @@ -2143,20 +2392,6 @@ class LUClusterVerify(LogicalUnit): return instdisk - def _VerifyHVP(self, hvp_data): - """Verifies locally the syntax of the hypervisor parameters. - - """ - for item, hv_name, hv_params in hvp_data: - msg = ("hypervisor %s parameters syntax check (source %s): %%s" % - (item, hv_name)) - try: - hv_class = hypervisor.GetHypervisor(hv_name) - utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) - hv_class.CheckParameterSyntax(hv_params) - except errors.GenericError, err: - self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err)) - def BuildHooksEnv(self): """Build hooks env. @@ -2164,14 +2399,12 @@ class LUClusterVerify(LogicalUnit): the output be logged in the verify output and the verification to fail. """ - cfg = self.cfg - env = { - "CLUSTER_TAGS": " ".join(cfg.GetClusterInfo().GetTags()) + "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()) } env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags())) - for node in cfg.GetAllNodesInfo().values()) + for node in self.my_node_info.values()) return env @@ -2179,10 +2412,12 @@ class LUClusterVerify(LogicalUnit): """Build hooks nodes. """ - return ([], self.cfg.GetNodeList()) + assert self.my_node_names, ("Node list not gathered," + " has CheckPrereq been executed?") + return ([], self.my_node_names) def Exec(self, feedback_fn): - """Verify integrity of cluster, performing various test on nodes. + """Verify integrity of the node group, performing various test on nodes. """ # This method has too many local variables. 
pylint: disable-msg=R0914 @@ -2190,26 +2425,14 @@ class LUClusterVerify(LogicalUnit): _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 verbose = self.op.verbose self._feedback_fn = feedback_fn - feedback_fn("* Verifying global settings") - for msg in self.cfg.VerifyConfig(): - _ErrorIf(True, self.ECLUSTERCFG, None, msg) - - # Check the cluster certificates - for cert_filename in constants.ALL_CERT_FILES: - (errcode, msg) = _VerifyCertificate(cert_filename) - _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode) vg_name = self.cfg.GetVGName() drbd_helper = self.cfg.GetDRBDHelper() - hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors cluster = self.cfg.GetClusterInfo() - nodelist = utils.NiceSort(self.cfg.GetNodeList()) - nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist] - nodeinfo_byname = dict(zip(nodelist, nodeinfo)) - instancelist = utils.NiceSort(self.cfg.GetInstanceList()) - instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname)) - for iname in instancelist) groupinfo = self.cfg.GetAllNodeGroupsInfo() + hypervisors = cluster.enabled_hypervisors + node_data_list = [self.my_node_info[name] for name in self.my_node_names] + i_non_redundant = [] # Non redundant instances i_non_a_balanced = [] # Non auto-balanced instances n_offline = 0 # Count of offline nodes @@ -2225,37 +2448,32 @@ class LUClusterVerify(LogicalUnit): master_node = self.master_node = self.cfg.GetMasterNode() master_ip = self.cfg.GetMasterIP() - # Compute the set of hypervisor parameters - hvp_data = [] - for hv_name in hypervisors: - hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name))) - for os_name, os_hvp in cluster.os_hvp.items(): - for hv_name, hv_params in os_hvp.items(): - if not hv_params: - continue - full_params = cluster.GetHVDefaults(hv_name, os_name=os_name) - hvp_data.append(("os %s" % os_name, hv_name, full_params)) - # TODO: collapse identical parameter values in a single one - for instance in instanceinfo.values(): - if not instance.hvparams: - continue - hvp_data.append(("instance %s" % instance.name, instance.hypervisor, - cluster.FillHV(instance))) - # and verify them locally - self._VerifyHVP(hvp_data) + feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names)) + + # We will make nodes contact all nodes in their group, and one node from + # every other group. + # TODO: should it be a *random* node, different every time? 
+ online_nodes = [node.name for node in node_data_list if not node.offline] + other_group_nodes = {} + + for name in sorted(self.all_node_info): + node = self.all_node_info[name] + if (node.group not in other_group_nodes + and node.group != self.group_uuid + and not node.offline): + other_group_nodes[node.group] = node.name - feedback_fn("* Gathering data (%d nodes)" % len(nodelist)) node_verify_param = { constants.NV_FILELIST: utils.UniqueSequence(filename for files in filemap for filename in files), - constants.NV_NODELIST: [node.name for node in nodeinfo - if not node.offline], + constants.NV_NODELIST: online_nodes + other_group_nodes.values(), constants.NV_HYPERVISOR: hypervisors, - constants.NV_HVPARAMS: hvp_data, - constants.NV_NODENETTEST: [(node.name, node.primary_ip, - node.secondary_ip) for node in nodeinfo + constants.NV_HVPARAMS: + _GetAllHypervisorParameters(cluster, self.all_inst_info.values()), + constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip) + for node in node_data_list if not node.offline], constants.NV_INSTANCELIST: hypervisors, constants.NV_VERSION: None, @@ -2276,15 +2494,30 @@ class LUClusterVerify(LogicalUnit): if drbd_helper: node_verify_param[constants.NV_DRBDHELPER] = drbd_helper + # bridge checks + # FIXME: this needs to be changed per node-group, not cluster-wide + bridges = set() + default_nicpp = cluster.nicparams[constants.PP_DEFAULT] + if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: + bridges.add(default_nicpp[constants.NIC_LINK]) + for instance in self.my_inst_info.values(): + for nic in instance.nics: + full_nic = cluster.SimpleFillNIC(nic.nicparams) + if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: + bridges.add(full_nic[constants.NIC_LINK]) + + if bridges: + node_verify_param[constants.NV_BRIDGES] = list(bridges) + # Build our expected cluster state node_image = dict((node.name, self.NodeImage(offline=node.offline, name=node.name, vm_capable=node.vm_capable)) - for node in nodeinfo) + for node in node_data_list) # Gather OOB paths oob_paths = [] - for node in nodeinfo: + for node in self.all_node_info.values(): path = _SupportsOob(self.cfg, node) if path and path not in oob_paths: oob_paths.append(path) @@ -2292,14 +2525,13 @@ class LUClusterVerify(LogicalUnit): if oob_paths: node_verify_param[constants.NV_OOB_PATHS] = oob_paths - for instance in instancelist: - inst_config = instanceinfo[instance] + for instance in self.my_inst_names: + inst_config = self.my_inst_info[instance] for nname in inst_config.all_nodes: if nname not in node_image: - # ghost node gnode = self.NodeImage(name=nname) - gnode.ghost = True + gnode.ghost = (nname not in self.all_node_info) node_image[nname] = gnode inst_config.MapLVsByNode(node_vol_should) @@ -2322,23 +2554,59 @@ class LUClusterVerify(LogicalUnit): # time before and after executing the request, we can at least have a time # window. 
nvinfo_starttime = time.time() - all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param, + all_nvinfo = self.rpc.call_node_verify(self.my_node_names, + node_verify_param, self.cfg.GetClusterName()) + if self.extra_lv_nodes and vg_name is not None: + extra_lv_nvinfo = \ + self.rpc.call_node_verify(self.extra_lv_nodes, + {constants.NV_LVLIST: vg_name}, + self.cfg.GetClusterName()) + else: + extra_lv_nvinfo = {} nvinfo_endtime = time.time() all_drbd_map = self.cfg.ComputeDRBDMap() - feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist)) - instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo) + feedback_fn("* Gathering disk information (%s nodes)" % + len(self.my_node_names)) + instdisk = self._CollectDiskInfo(self.my_node_names, node_image, + self.my_inst_info) feedback_fn("* Verifying configuration file consistency") - self._VerifyFiles(_ErrorIf, nodeinfo, master_node, all_nvinfo, filemap) + + # If not all nodes are being checked, we need to make sure the master node + # and a non-checked vm_capable node are in the list. + absent_nodes = set(self.all_node_info).difference(self.my_node_info) + if absent_nodes: + vf_nvinfo = all_nvinfo.copy() + vf_node_info = list(self.my_node_info.values()) + additional_nodes = [] + if master_node not in self.my_node_info: + additional_nodes.append(master_node) + vf_node_info.append(self.all_node_info[master_node]) + # Add the first vm_capable node we find which is not included + for node in absent_nodes: + nodeinfo = self.all_node_info[node] + if nodeinfo.vm_capable and not nodeinfo.offline: + additional_nodes.append(node) + vf_node_info.append(self.all_node_info[node]) + break + key = constants.NV_FILELIST + vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes, + {key: node_verify_param[key]}, + self.cfg.GetClusterName())) + else: + vf_nvinfo = all_nvinfo + vf_node_info = self.my_node_info.values() + + self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap) feedback_fn("* Verifying node status") refos_img = None - for node_i in nodeinfo: + for node_i in node_data_list: node = node_i.name nimg = node_image[node] @@ -2375,23 +2643,41 @@ class LUClusterVerify(LogicalUnit): if nimg.vm_capable: self._VerifyNodeLVM(node_i, nresult, vg_name) - self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper, + self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper, all_drbd_map) self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name) self._UpdateNodeInstances(node_i, nresult, nimg) self._UpdateNodeInfo(node_i, nresult, nimg, vg_name) self._UpdateNodeOS(node_i, nresult, nimg) + if not nimg.os_fail: if refos_img is None: refos_img = nimg self._VerifyNodeOS(node_i, nimg, refos_img) + self._VerifyNodeBridges(node_i, nresult, bridges) + + # Check whether all running instancies are primary for the node. (This + # can no longer be done from _VerifyInstance below, since some of the + # wrong instances could be from other node groups.) 
+ non_primary_inst = set(nimg.instances).difference(nimg.pinst) + + for inst in non_primary_inst: + test = inst in self.all_inst_info + _ErrorIf(test, self.EINSTANCEWRONGNODE, inst, + "instance should not run on node %s", node_i.name) + _ErrorIf(not test, self.ENODEORPHANINSTANCE, node_i.name, + "node is running unknown instance %s", inst) + + for node, result in extra_lv_nvinfo.items(): + self._UpdateNodeVolumes(self.all_node_info[node], result.payload, + node_image[node], vg_name) feedback_fn("* Verifying instance status") - for instance in instancelist: + for instance in self.my_inst_names: if verbose: feedback_fn("* Verifying instance %s" % instance) - inst_config = instanceinfo[instance] + inst_config = self.my_inst_info[instance] self._VerifyInstance(instance, inst_config, node_image, instdisk[instance]) inst_nodes_offline = [] @@ -2426,7 +2712,7 @@ class LUClusterVerify(LogicalUnit): instance_groups = {} for node in instance_nodes: - instance_groups.setdefault(nodeinfo_byname[node].group, + instance_groups.setdefault(self.all_node_info[node].group, []).append(node) pretty_list = [ @@ -2465,14 +2751,22 @@ class LUClusterVerify(LogicalUnit): feedback_fn("* Verifying orphan volumes") reserved = utils.FieldSet(*cluster.reserved_lvs) - self._VerifyOrphanVolumes(node_vol_should, node_image, reserved) - feedback_fn("* Verifying orphan instances") - self._VerifyOrphanInstances(instancelist, node_image) + # We will get spurious "unknown volume" warnings if any node of this group + # is secondary for an instance whose primary is in another group. To avoid + # them, we find these instances and add their volumes to node_vol_should. + for inst in self.all_inst_info.values(): + for secondary in inst.secondary_nodes: + if (secondary in self.my_node_info + and inst.name not in self.my_inst_info): + inst.MapLVsByNode(node_vol_should) + break + + self._VerifyOrphanVolumes(node_vol_should, node_image, reserved) if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks: feedback_fn("* Verifying N+1 Memory redundancy") - self._VerifyNPlusOneMemory(node_image, instanceinfo) + self._VerifyNPlusOneMemory(node_image, self.my_inst_info) feedback_fn("* Other Notes") if i_non_redundant: @@ -2613,10 +2907,7 @@ class LUClusterRepairDiskSizes(NoHooksLU): def ExpandNames(self): if self.op.instances: - self.wanted_names = [] - for name in self.op.instances: - full_name = _ExpandInstanceName(self.cfg, name) - self.wanted_names.append(full_name) + self.wanted_names = _GetWantedInstances(self, self.op.instances) self.needed_locks = { locking.LEVEL_NODE: [], locking.LEVEL_INSTANCE: self.wanted_names, @@ -2628,7 +2919,7 @@ class LUClusterRepairDiskSizes(NoHooksLU): locking.LEVEL_NODE: locking.ALL_SET, locking.LEVEL_INSTANCE: locking.ALL_SET, } - self.share_locks = dict(((i, 1) for i in locking.LEVELS)) + self.share_locks = dict.fromkeys(locking.LEVELS, 1) def DeclareLocks(self, level): if level == locking.LEVEL_NODE and self.wanted_names is not None: @@ -2641,7 +2932,7 @@ class LUClusterRepairDiskSizes(NoHooksLU): """ if self.wanted_names is None: - self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE] + self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE) self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name in self.wanted_names] @@ -2866,7 +3157,7 @@ class LUClusterSetParams(LogicalUnit): " drbd-based instances exist", errors.ECODE_INVAL) - node_list = self.acquired_locks[locking.LEVEL_NODE] + node_list = self.glm.list_owned(locking.LEVEL_NODE) # if vg_name not None, checks given 
volume group on all nodes if self.op.vg_name: @@ -2941,8 +3232,8 @@ class LUClusterSetParams(LogicalUnit): # if we're moving instances to routed, check that they have an ip target_mode = params_filled[constants.NIC_MODE] if target_mode == constants.NIC_MODE_ROUTED and not nic.ip: - nic_errors.append("Instance %s, nic/%d: routed nick with no ip" % - (instance.name, nic_idx)) + nic_errors.append("Instance %s, nic/%d: routed NIC with no ip" + " address" % (instance.name, nic_idx)) if nic_errors: raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" % "\n".join(nic_errors)) @@ -3393,6 +3684,20 @@ class LUOobCommand(NoHooksLU): REG_BGL = False _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE) + def ExpandNames(self): + """Gather locks we need. + + """ + if self.op.node_names: + self.op.node_names = _GetWantedNodes(self, self.op.node_names) + lock_names = self.op.node_names + else: + lock_names = locking.ALL_SET + + self.needed_locks = { + locking.LEVEL_NODE: lock_names, + } + def CheckPrereq(self): """Check prerequisites. @@ -3409,23 +3714,23 @@ class LUOobCommand(NoHooksLU): assert self.op.power_delay >= 0.0 if self.op.node_names: - if self.op.command in self._SKIP_MASTER: - if self.master_node in self.op.node_names: - master_node_obj = self.cfg.GetNodeInfo(self.master_node) - master_oob_handler = _SupportsOob(self.cfg, master_node_obj) - - if master_oob_handler: - additional_text = ("Run '%s %s %s' if you want to operate on the" - " master regardless") % (master_oob_handler, - self.op.command, - self.master_node) - else: - additional_text = "The master node does not support out-of-band" + if (self.op.command in self._SKIP_MASTER and + self.master_node in self.op.node_names): + master_node_obj = self.cfg.GetNodeInfo(self.master_node) + master_oob_handler = _SupportsOob(self.cfg, master_node_obj) + + if master_oob_handler: + additional_text = ("run '%s %s %s' if you want to operate on the" + " master regardless") % (master_oob_handler, + self.op.command, + self.master_node) + else: + additional_text = "it does not support out-of-band operations" - raise errors.OpPrereqError(("Operating on the master node %s is not" - " allowed for %s\n%s") % - (self.master_node, self.op.command, - additional_text), errors.ECODE_INVAL) + raise errors.OpPrereqError(("Operating on the master node %s is not" + " allowed for %s; %s") % + (self.master_node, self.op.command, + additional_text), errors.ECODE_INVAL) else: self.op.node_names = self.cfg.GetNodeList() if self.op.command in self._SKIP_MASTER: @@ -3449,21 +3754,6 @@ class LUOobCommand(NoHooksLU): " not marked offline") % node_name, errors.ECODE_STATE) - def ExpandNames(self): - """Gather locks we need. - - """ - if self.op.node_names: - self.op.node_names = [_ExpandNodeName(self.cfg, name) - for name in self.op.node_names] - lock_names = self.op.node_names - else: - lock_names = locking.ALL_SET - - self.needed_locks = { - locking.LEVEL_NODE: lock_names, - } - def Exec(self, feedback_fn): """Execute OOB and return result if we expect any. 
@@ -3471,7 +3761,8 @@ class LUOobCommand(NoHooksLU): master_node = self.master_node ret = [] - for idx, node in enumerate(self.nodes): + for idx, node in enumerate(utils.NiceSort(self.nodes, + key=lambda node: node.name)): node_entry = [(constants.RS_NORMAL, node.name)] ret.append(node_entry) @@ -3488,14 +3779,14 @@ class LUOobCommand(NoHooksLU): self.op.timeout) if result.fail_msg: - self.LogWarning("On node '%s' out-of-band RPC failed with: %s", + self.LogWarning("Out-of-band RPC failed on node '%s': %s", node.name, result.fail_msg) node_entry.append((constants.RS_NODATA, None)) else: try: self._CheckPayload(result) except errors.OpExecError, err: - self.LogWarning("The payload returned by '%s' is not valid: %s", + self.LogWarning("Payload returned by node '%s' is not valid: %s", node.name, err) node_entry.append((constants.RS_NODATA, None)) else: @@ -3504,8 +3795,8 @@ class LUOobCommand(NoHooksLU): for item, status in result.payload: if status in [constants.OOB_STATUS_WARNING, constants.OOB_STATUS_CRITICAL]: - self.LogWarning("On node '%s' item '%s' has status '%s'", - node.name, item, status) + self.LogWarning("Item '%s' on node '%s' has status '%s'", + item, node.name, status) if self.op.command == constants.OOB_POWER_ON: node.powered = True @@ -3634,7 +3925,10 @@ class _OsQuery(_QueryBase): """ # Locking is not used - assert not (lu.acquired_locks or self.do_locking or self.use_locking) + assert not (compat.any(lu.glm.is_owned(level) + for level in locking.LEVELS + if level != locking.LEVEL_CLUSTER) or + self.do_locking or self.use_locking) valid_nodes = [node.name for node in lu.cfg.GetAllNodesInfo().values() @@ -3775,15 +4069,14 @@ class LUNodeRemove(LogicalUnit): masternode = self.cfg.GetMasterNode() if node.name == masternode: - raise errors.OpPrereqError("Node is the master node," - " you need to failover first.", - errors.ECODE_INVAL) + raise errors.OpPrereqError("Node is the master node, failover to another" + " node is required", errors.ECODE_INVAL) for instance_name in instance_list: instance = self.cfg.GetInstanceInfo(instance_name) if node.name in instance.all_nodes: raise errors.OpPrereqError("Instance %s is still running on the node," - " please remove first." % instance_name, + " please remove first" % instance_name, errors.ECODE_INVAL) self.op.node_name = node.name self.node = node @@ -3941,7 +4234,7 @@ class LUNodeQueryvols(NoHooksLU): """Computes the list of nodes and their attributes. """ - nodenames = self.acquired_locks[locking.LEVEL_NODE] + nodenames = self.glm.list_owned(locking.LEVEL_NODE) volumes = self.rpc.call_node_volumes(nodenames) ilist = [self.cfg.GetInstanceInfo(iname) for iname @@ -4019,7 +4312,7 @@ class LUNodeQueryStorage(NoHooksLU): """Computes the list of nodes and their attributes. 
""" - self.nodes = self.acquired_locks[locking.LEVEL_NODE] + self.nodes = self.glm.list_owned(locking.LEVEL_NODE) # Always get name to sort by if constants.SF_NAME in self.op.output_fields: @@ -4266,6 +4559,11 @@ class LUNodeAdd(LogicalUnit): self.hostname = netutils.GetHostname(name=self.op.node_name, family=self.primary_ip_family) self.op.node_name = self.hostname.name + + if self.op.readd and self.op.node_name == self.cfg.GetMasterNode(): + raise errors.OpPrereqError("Cannot readd the master node", + errors.ECODE_STATE) + if self.op.readd and self.op.group: raise errors.OpPrereqError("Cannot pass a node group when a node is" " being readded", errors.ECODE_INVAL) @@ -4501,7 +4799,7 @@ class LUNodeAdd(LogicalUnit): feedback_fn("ssh/hostname verification failed" " (checking from %s): %s" % (verifier, nl_payload[failed])) - raise errors.OpExecError("ssh/hostname verification failed.") + raise errors.OpExecError("ssh/hostname verification failed") if self.op.readd: _RedistributeAncillaryFiles(self) @@ -4584,21 +4882,22 @@ class LUNodeSetParams(LogicalUnit): # If we have locked all instances, before waiting to lock nodes, release # all the ones living on nodes unrelated to the current operation. if level == locking.LEVEL_NODE and self.lock_instances: - instances_release = [] - instances_keep = [] self.affected_instances = [] if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET: - for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]: + instances_keep = [] + + # Build list of instances to release + for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE): instance = self.context.cfg.GetInstanceInfo(instance_name) - i_mirrored = instance.disk_template in constants.DTS_INT_MIRROR - if i_mirrored and self.op.node_name in instance.all_nodes: + if (instance.disk_template in constants.DTS_INT_MIRROR and + self.op.node_name in instance.all_nodes): instances_keep.append(instance_name) self.affected_instances.append(instance) - else: - instances_release.append(instance_name) - if instances_release: - self.context.glm.release(locking.LEVEL_INSTANCE, instances_release) - self.acquired_locks[locking.LEVEL_INSTANCE] = instances_keep + + _ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep) + + assert (set(self.glm.list_owned(locking.LEVEL_INSTANCE)) == + set(instances_keep)) def BuildHooksEnv(self): """Build hooks env. 
@@ -4664,7 +4963,7 @@ class LUNodeSetParams(LogicalUnit): self.old_flags = old_flags = (node.master_candidate, node.drained, node.offline) - assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags) + assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags) self.old_role = old_role = self._F2R[old_flags] # Check for ineffective changes @@ -4680,12 +4979,12 @@ class LUNodeSetParams(LogicalUnit): if _SupportsOob(self.cfg, node): if self.op.offline is False and not (node.powered or self.op.powered == True): - raise errors.OpPrereqError(("Please power on node %s first before you" - " can reset offline state") % + raise errors.OpPrereqError(("Node %s needs to be turned on before its" + " offline status can be reset") % self.op.node_name) elif self.op.powered is not None: raise errors.OpPrereqError(("Unable to change powered state for node %s" - " which does not support out-of-band" + " as it does not support out-of-band" " handling") % self.op.node_name) # If we're being deofflined/drained, we'll MC ourself if needed @@ -5379,7 +5678,8 @@ class LUInstanceStartup(LogicalUnit): instance = self.instance force = self.op.force - self.cfg.MarkInstanceUp(instance.name) + if not self.op.no_remember: + self.cfg.MarkInstanceUp(instance.name) if self.primary_offline: assert self.op.ignore_offline_nodes @@ -5544,7 +5844,8 @@ class LUInstanceShutdown(LogicalUnit): node_current = instance.primary_node timeout = self.op.timeout - self.cfg.MarkInstanceDown(instance.name) + if not self.op.no_remember: + self.cfg.MarkInstanceDown(instance.name) if self.primary_offline: assert self.op.ignore_offline_nodes @@ -5657,8 +5958,25 @@ class LUInstanceRecreateDisks(LogicalUnit): HTYPE = constants.HTYPE_INSTANCE REQ_BGL = False + def CheckArguments(self): + # normalise the disk list + self.op.disks = sorted(frozenset(self.op.disks)) + def ExpandNames(self): self._ExpandAndLockInstance() + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + if self.op.nodes: + self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes] + self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes) + else: + self.needed_locks[locking.LEVEL_NODE] = [] + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + # if we replace the nodes, we only need to lock the old primary, + # otherwise we need to lock all nodes for disk re-creation + primary_only = bool(self.op.nodes) + self._LockInstancesNodes(primary_only=primary_only) def BuildHooksEnv(self): """Build hooks env. 
@@ -5684,32 +6002,72 @@ class LUInstanceRecreateDisks(LogicalUnit): instance = self.cfg.GetInstanceInfo(self.op.instance_name) assert instance is not None, \ "Cannot retrieve locked instance %s" % self.op.instance_name - _CheckNodeOnline(self, instance.primary_node) + if self.op.nodes: + if len(self.op.nodes) != len(instance.all_nodes): + raise errors.OpPrereqError("Instance %s currently has %d nodes, but" + " %d replacement nodes were specified" % + (instance.name, len(instance.all_nodes), + len(self.op.nodes)), + errors.ECODE_INVAL) + assert instance.disk_template != constants.DT_DRBD8 or \ + len(self.op.nodes) == 2 + assert instance.disk_template != constants.DT_PLAIN or \ + len(self.op.nodes) == 1 + primary_node = self.op.nodes[0] + else: + primary_node = instance.primary_node + _CheckNodeOnline(self, primary_node) if instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Instance '%s' has no disks" % self.op.instance_name, errors.ECODE_INVAL) - _CheckInstanceDown(self, instance, "cannot recreate disks") + # if we replace nodes *and* the old primary is offline, we don't + # check + assert instance.primary_node in self.needed_locks[locking.LEVEL_NODE] + old_pnode = self.cfg.GetNodeInfo(instance.primary_node) + if not (self.op.nodes and old_pnode.offline): + _CheckInstanceDown(self, instance, "cannot recreate disks") if not self.op.disks: self.op.disks = range(len(instance.disks)) else: for idx in self.op.disks: if idx >= len(instance.disks): - raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx, + raise errors.OpPrereqError("Invalid disk index '%s'" % idx, errors.ECODE_INVAL) - + if self.op.disks != range(len(instance.disks)) and self.op.nodes: + raise errors.OpPrereqError("Can't recreate disks partially and" + " change the nodes at the same time", + errors.ECODE_INVAL) self.instance = instance def Exec(self, feedback_fn): """Recreate the disks. 
""" + # change primary node, if needed + if self.op.nodes: + self.instance.primary_node = self.op.nodes[0] + self.LogWarning("Changing the instance's nodes, you will have to" + " remove any disks left on the older nodes manually") + to_skip = [] - for idx, _ in enumerate(self.instance.disks): + for idx, disk in enumerate(self.instance.disks): if idx not in self.op.disks: # disk idx has not been passed in to_skip.append(idx) continue + # update secondaries for disks, if needed + if self.op.nodes: + if disk.dev_type == constants.LD_DRBD8: + # need to update the nodes + assert len(self.op.nodes) == 2 + logical_id = list(disk.logical_id) + logical_id[0] = self.op.nodes[0] + logical_id[1] = self.op.nodes[1] + disk.logical_id = tuple(logical_id) + + if self.op.nodes: + self.cfg.Update(self.instance, feedback_fn) _CreateDisks(self, self.instance, to_skip=to_skip) @@ -5727,7 +6085,7 @@ class LUInstanceRename(LogicalUnit): """ if self.op.ip_check and not self.op.name_check: # TODO: make the ip check more flexible and not depend on the name check - raise errors.OpPrereqError("Cannot do ip check without a name check", + raise errors.OpPrereqError("IP address check requires a name check", errors.ECODE_INVAL) def BuildHooksEnv(self): @@ -5764,8 +6122,9 @@ class LUInstanceRename(LogicalUnit): new_name = self.op.new_name if self.op.name_check: hostname = netutils.GetHostname(name=new_name) - self.LogInfo("Resolved given name '%s' to '%s'", new_name, - hostname.name) + if hostname != new_name: + self.LogInfo("Resolved given name '%s' to '%s'", new_name, + hostname.name) if not utils.MatchNameComponent(self.op.new_name, [hostname.name]): raise errors.OpPrereqError(("Resolved hostname '%s' does not look the" " same as given hostname '%s'") % @@ -5800,8 +6159,8 @@ class LUInstanceRename(LogicalUnit): # Change the instance lock. This is definitely safe while we hold the BGL. # Otherwise the new lock would have to be added in acquired mode. 
assert self.REQ_BGL - self.context.glm.remove(locking.LEVEL_INSTANCE, old_name) - self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name) + self.glm.remove(locking.LEVEL_INSTANCE, old_name) + self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name) # re-read the instance from the configuration after rename inst = self.cfg.GetInstanceInfo(self.op.new_name) @@ -5970,8 +6329,6 @@ class LUInstanceFailover(LogicalUnit): shutdown_timeout = self.op.shutdown_timeout self._migrater = TLMigrateInstance(self, self.op.instance_name, cleanup=False, - iallocator=self.op.iallocator, - target_node=self.op.target_node, failover=True, ignore_consistency=ignore_consistency, shutdown_timeout=shutdown_timeout) @@ -5998,7 +6355,7 @@ class LUInstanceFailover(LogicalUnit): """ instance = self._migrater.instance source_node = instance.primary_node - target_node = self._migrater.target_node + target_node = self.op.target_node env = { "IGNORE_CONSISTENCY": self.op.ignore_consistency, "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout, @@ -6047,8 +6404,6 @@ class LUInstanceMigrate(LogicalUnit): self._migrater = TLMigrateInstance(self, self.op.instance_name, cleanup=self.op.cleanup, - iallocator=self.op.iallocator, - target_node=self.op.target_node, failover=False, fallback=self.op.allow_failover) self.tasklets = [self._migrater] @@ -6074,7 +6429,7 @@ class LUInstanceMigrate(LogicalUnit): """ instance = self._migrater.instance source_node = instance.primary_node - target_node = self._migrater.target_node + target_node = self.op.target_node env = _BuildInstanceHookEnvByObject(self, instance) env.update({ "MIGRATE_LIVE": self._migrater.live, @@ -6310,9 +6665,7 @@ class LUNodeMigrate(LogicalUnit): logging.debug("Migrating instance %s", inst.name) names.append(inst.name) - tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False, - iallocator=self.op.iallocator, - taget_node=None)) + tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False)) if inst.disk_template in constants.DTS_EXT_MIRROR: # We need to lock all nodes, as the iallocator will choose the @@ -6377,8 +6730,8 @@ class TLMigrateInstance(Tasklet): @ivar shutdown_timeout: In case of failover timeout of the shutdown """ - def __init__(self, lu, instance_name, cleanup=False, iallocator=None, - target_node=None, failover=False, fallback=False, + def __init__(self, lu, instance_name, cleanup=False, + failover=False, fallback=False, ignore_consistency=False, shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT): """Initializes this class. 
@@ -6390,8 +6743,6 @@ class TLMigrateInstance(Tasklet): self.instance_name = instance_name self.cleanup = cleanup self.live = False # will be overridden later - self.iallocator = iallocator - self.target_node = target_node self.failover = failover self.fallback = fallback self.ignore_consistency = ignore_consistency @@ -6426,20 +6777,26 @@ class TLMigrateInstance(Tasklet): if instance.disk_template in constants.DTS_EXT_MIRROR: _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node") - if self.iallocator: + if self.lu.op.iallocator: self._RunAllocator() + else: + # We set set self.target_node as it is required by + # BuildHooksEnv + self.target_node = self.lu.op.target_node # self.target_node is already populated, either directly or by the # iallocator run target_node = self.target_node + if self.target_node == instance.primary_node: + raise errors.OpPrereqError("Cannot migrate instance %s" + " to its primary (%s)" % + (instance.name, instance.primary_node)) if len(self.lu.tasklets) == 1: - # It is safe to remove locks only when we're the only tasklet in the LU - nodes_keep = [instance.primary_node, self.target_node] - nodes_rel = [node for node in self.lu.acquired_locks[locking.LEVEL_NODE] - if node not in nodes_keep] - self.lu.context.glm.release(locking.LEVEL_NODE, nodes_rel) - self.lu.acquired_locks[locking.LEVEL_NODE] = nodes_keep + # It is safe to release locks only when we're the only tasklet + # in the LU + _ReleaseLocks(self.lu, locking.LEVEL_NODE, + keep=[instance.primary_node, self.target_node]) else: secondary_nodes = instance.secondary_nodes @@ -6448,17 +6805,17 @@ class TLMigrateInstance(Tasklet): " %s disk template" % instance.disk_template) target_node = secondary_nodes[0] - if self.iallocator or (self.target_node and - self.target_node != target_node): + if self.lu.op.iallocator or (self.lu.op.target_node and + self.lu.op.target_node != target_node): if self.failover: text = "failed over" else: text = "migrated" raise errors.OpPrereqError("Instances with disk template %s cannot" - " be %s over to arbitrary nodes" + " be %s to arbitrary nodes" " (neither an iallocator nor a target" " node can be passed)" % - (text, instance.disk_template), + (instance.disk_template, text), errors.ECODE_INVAL) i_be = self.cfg.GetClusterInfo().FillBE(instance) @@ -6490,6 +6847,30 @@ class TLMigrateInstance(Tasklet): assert not (self.failover and self.cleanup) + if not self.failover: + if self.lu.op.live is not None and self.lu.op.mode is not None: + raise errors.OpPrereqError("Only one of the 'live' and 'mode'" + " parameters are accepted", + errors.ECODE_INVAL) + if self.lu.op.live is not None: + if self.lu.op.live: + self.lu.op.mode = constants.HT_MIGRATION_LIVE + else: + self.lu.op.mode = constants.HT_MIGRATION_NONLIVE + # reset the 'live' parameter to None so that repeated + # invocations of CheckPrereq do not raise an exception + self.lu.op.live = None + elif self.lu.op.mode is None: + # read the default value from the hypervisor + i_hv = self.cfg.GetClusterInfo().FillHV(self.instance, + skip_globals=False) + self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE] + + self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE + else: + # Failover is never live + self.live = False + def _RunAllocator(self): """Run the allocator based on input opcode. 
@@ -6502,47 +6883,23 @@ class TLMigrateInstance(Tasklet): self.instance.primary_node], ) - ial.Run(self.iallocator) + ial.Run(self.lu.op.iallocator) if not ial.success: raise errors.OpPrereqError("Can't compute nodes using" " iallocator '%s': %s" % - (self.iallocator, ial.info), + (self.lu.op.iallocator, ial.info), errors.ECODE_NORES) if len(ial.result) != ial.required_nodes: raise errors.OpPrereqError("iallocator '%s' returned invalid number" " of nodes (%s), required %s" % - (self.iallocator, len(ial.result), + (self.lu.op.iallocator, len(ial.result), ial.required_nodes), errors.ECODE_FAULT) self.target_node = ial.result[0] self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s", - self.instance_name, self.iallocator, + self.instance_name, self.lu.op.iallocator, utils.CommaJoin(ial.result)) - if not self.failover: - if self.lu.op.live is not None and self.lu.op.mode is not None: - raise errors.OpPrereqError("Only one of the 'live' and 'mode'" - " parameters are accepted", - errors.ECODE_INVAL) - if self.lu.op.live is not None: - if self.lu.op.live: - self.lu.op.mode = constants.HT_MIGRATION_LIVE - else: - self.lu.op.mode = constants.HT_MIGRATION_NONLIVE - # reset the 'live' parameter to None so that repeated - # invocations of CheckPrereq do not raise an exception - self.lu.op.live = None - elif self.lu.op.mode is None: - # read the default value from the hypervisor - i_hv = self.cfg.GetClusterInfo().FillHV(self.instance, - skip_globals=False) - self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE] - - self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE - else: - # Failover is never live - self.live = False - def _WaitUntilSync(self): """Poll with custom rpc for disk sync. @@ -6636,15 +6993,15 @@ class TLMigrateInstance(Tasklet): if runningon_source and runningon_target: raise errors.OpExecError("Instance seems to be running on two nodes," - " or the hypervisor is confused. You will have" + " or the hypervisor is confused; you will have" " to ensure manually that it runs only on one" - " and restart this operation.") + " and restart this operation") if not (runningon_source or runningon_target): - raise errors.OpExecError("Instance does not seem to be running at all." - " In this case, it's safer to repair by" + raise errors.OpExecError("Instance does not seem to be running at all;" + " in this case it's safer to repair by" " running 'gnt-instance stop' to ensure disk" - " shutdown, and then restarting it.") + " shutdown, and then restarting it") if runningon_target: # the migration has actually succeeded, we need to update the config @@ -6686,10 +7043,9 @@ class TLMigrateInstance(Tasklet): self._GoReconnect(False) self._WaitUntilSync() except errors.OpExecError, err: - self.lu.LogWarning("Migration failed and I can't reconnect the" - " drives: error '%s'\n" - "Please look and recover the instance status" % - str(err)) + self.lu.LogWarning("Migration failed and I can't reconnect the drives," + " please try to recover the instance manually;" + " error '%s'" % str(err)) def _AbortMigration(self): """Call the hypervisor code to abort a started migration. @@ -6731,7 +7087,7 @@ class TLMigrateInstance(Tasklet): if not _CheckDiskConsistency(self.lu, dev, target_node, False): raise errors.OpExecError("Disk %s is degraded or not fully" " synchronized on target node," - " aborting migrate." 
% dev.iv_name) + " aborting migration" % dev.iv_name) # First get the migration information from the remote node result = self.rpc.call_migration_info(source_node, instance) @@ -6825,7 +7181,7 @@ class TLMigrateInstance(Tasklet): if not _CheckDiskConsistency(self, dev, target_node, False): if not self.ignore_consistency: raise errors.OpExecError("Disk %s is degraded on target node," - " aborting failover." % dev.iv_name) + " aborting failover" % dev.iv_name) else: self.feedback_fn("* not checking disk consistency as instance is not" " running") @@ -6839,9 +7195,9 @@ class TLMigrateInstance(Tasklet): msg = result.fail_msg if msg: if self.ignore_consistency or primary_node.offline: - self.lu.LogWarning("Could not shutdown instance %s on node %s." - " Proceeding anyway. Please make sure node" - " %s is down. Error details: %s", + self.lu.LogWarning("Could not shutdown instance %s on node %s," + " proceeding anyway; please make sure node" + " %s is down; error details: %s", instance.name, source_node, source_node, msg) else: raise errors.OpExecError("Could not shutdown instance %s on" @@ -6992,17 +7348,18 @@ def _GenerateUniqueNames(lu, exts): return results -def _GenerateDRBD8Branch(lu, primary, secondary, size, vgname, names, iv_name, - p_minor, s_minor): +def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names, + iv_name, p_minor, s_minor): """Generate a drbd8 device complete with its children. """ + assert len(vgnames) == len(names) == 2 port = lu.cfg.AllocatePort() shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId()) dev_data = objects.Disk(dev_type=constants.LD_LV, size=size, - logical_id=(vgname, names[0])) + logical_id=(vgnames[0], names[0])) dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128, - logical_id=(vgname, names[1])) + logical_id=(vgnames[1], names[1])) drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size, logical_id=(primary, secondary, port, p_minor, s_minor, @@ -7057,9 +7414,11 @@ def _GenerateDiskTemplate(lu, template_name, names.append(lv_prefix + "_meta") for idx, disk in enumerate(disk_info): disk_index = idx + base_index - vg = disk.get(constants.IDISK_VG, vgname) + data_vg = disk.get(constants.IDISK_VG, vgname) + meta_vg = disk.get(constants.IDISK_METAVG, data_vg) disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node, - disk[constants.IDISK_SIZE], vg, + disk[constants.IDISK_SIZE], + [data_vg, meta_vg], names[idx * 2:idx * 2 + 2], "disk/%d" % disk_index, minors[idx * 2], minors[idx * 2 + 1]) @@ -7161,14 +7520,17 @@ def _WipeDisks(lu, instance): try: for idx, device in enumerate(instance.disks): - lu.LogInfo("* Wiping disk %d", idx) - logging.info("Wiping disk %d for instance %s, node %s", - idx, instance.name, node) - # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but # MAX_WIPE_CHUNK at max wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT) + # we _must_ make this an int, otherwise rounding errors will + # occur + wipe_chunk_size = int(wipe_chunk_size) + + lu.LogInfo("* Wiping disk %d", idx) + logging.info("Wiping disk %d for instance %s, node %s using" + " chunk size %s", idx, instance.name, node, wipe_chunk_size) offset = 0 size = device.size @@ -7177,6 +7539,8 @@ def _WipeDisks(lu, instance): while offset < size: wipe_size = min(wipe_chunk_size, size - offset) + logging.debug("Wiping disk %d, offset %s, chunk %s", + idx, offset, wipe_size) result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size) result.Raise("Could not wipe disk %d at 
offset %d for size %d" % (idx, offset, wipe_size)) @@ -7194,8 +7558,8 @@ def _WipeDisks(lu, instance): for idx, success in enumerate(result.payload): if not success: - lu.LogWarning("Warning: Resume sync of disk %d failed. Please have a" - " look at the status and troubleshoot the issue.", idx) + lu.LogWarning("Resume sync of disk %d failed, please have a" + " look at the status and troubleshoot the issue", idx) logging.warn("resume-sync of instance %s for disks %d failed", instance.name, idx) @@ -7444,8 +7808,8 @@ class LUInstanceCreate(LogicalUnit): if self.op.ip_check and not self.op.name_check: # TODO: make the ip check more flexible and not depend on the name check - raise errors.OpPrereqError("Cannot do ip check without a name check", - errors.ECODE_INVAL) + raise errors.OpPrereqError("Cannot do IP address check without a name" + " check", errors.ECODE_INVAL) # check nics' parameter names for nic in self.op.nics: @@ -7623,7 +7987,7 @@ class LUInstanceCreate(LogicalUnit): self.op.src_node = None if os.path.isabs(src_path): raise errors.OpPrereqError("Importing an instance from an absolute" - " path requires a source node option.", + " path requires a source node option", errors.ECODE_INVAL) else: self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node) @@ -7725,7 +8089,7 @@ class LUInstanceCreate(LogicalUnit): src_path = self.op.src_path if src_node is None: - locked_nodes = self.acquired_locks[locking.LEVEL_NODE] + locked_nodes = self.glm.list_owned(locking.LEVEL_NODE) exp_list = self.rpc.call_export_list(locked_nodes) found = False for node in exp_list: @@ -7975,10 +8339,13 @@ class LUInstanceCreate(LogicalUnit): except (TypeError, ValueError): raise errors.OpPrereqError("Invalid disk size '%s'" % size, errors.ECODE_INVAL) + + data_vg = disk.get(constants.IDISK_VG, default_vg) new_disk = { constants.IDISK_SIZE: size, constants.IDISK_MODE: mode, - constants.IDISK_VG: disk.get(constants.IDISK_VG, default_vg), + constants.IDISK_VG: data_vg, + constants.IDISK_METAVG: disk.get(constants.IDISK_METAVG, data_vg), } if constants.IDISK_ADOPT in disk: new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT] @@ -8069,7 +8436,7 @@ class LUInstanceCreate(LogicalUnit): if self.op.disk_template in constants.DTS_INT_MIRROR: if self.op.snode == pnode.name: raise errors.OpPrereqError("The secondary node cannot be the" - " primary node.", errors.ECODE_INVAL) + " primary node", errors.ECODE_INVAL) _CheckNodeOnline(self, self.op.snode) _CheckNodeNotDrained(self, self.op.snode) _CheckNodeVmCapable(self, self.op.snode) @@ -8247,18 +8614,6 @@ class LUInstanceCreate(LogicalUnit): self.cfg.ReleaseDRBDMinors(instance) raise - if self.cfg.GetClusterInfo().prealloc_wipe_disks: - feedback_fn("* wiping instance disks...") - try: - _WipeDisks(self, iobj) - except errors.OpExecError: - self.LogWarning("Device wiping failed, reverting...") - try: - _RemoveDisks(self, iobj) - finally: - self.cfg.ReleaseDRBDMinors(instance) - raise - feedback_fn("adding instance %s to cluster config" % instance) self.cfg.AddInstance(iobj, self.proc.GetECId()) @@ -8266,18 +8621,28 @@ class LUInstanceCreate(LogicalUnit): # Declare that we don't want to remove the instance lock anymore, as we've # added the instance to the config del self.remove_locks[locking.LEVEL_INSTANCE] - # Unlock all the nodes + if self.op.mode == constants.INSTANCE_IMPORT: - nodes_keep = [self.op.src_node] - nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE] - if node != self.op.src_node] - 
self.context.glm.release(locking.LEVEL_NODE, nodes_release) - self.acquired_locks[locking.LEVEL_NODE] = nodes_keep + # Release unused nodes + _ReleaseLocks(self, locking.LEVEL_NODE, keep=[self.op.src_node]) else: - self.context.glm.release(locking.LEVEL_NODE) - del self.acquired_locks[locking.LEVEL_NODE] + # Release all nodes + _ReleaseLocks(self, locking.LEVEL_NODE) - if self.op.wait_for_sync: + disk_abort = False + if not self.adopt_disks and self.cfg.GetClusterInfo().prealloc_wipe_disks: + feedback_fn("* wiping instance disks...") + try: + _WipeDisks(self, iobj) + except errors.OpExecError, err: + logging.exception("Wiping disks failed") + self.LogWarning("Wiping instance disks failed (%s)", err) + disk_abort = True + + if disk_abort: + # Something is already wrong with the disks, don't do anything else + pass + elif self.op.wait_for_sync: disk_abort = not _WaitForSync(self, iobj) elif iobj.disk_template in constants.DTS_INT_MIRROR: # make sure the disks are not degraded (still sync-ing is ok) @@ -8461,24 +8826,29 @@ class LUInstanceReplaceDisks(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() - if self.op.iallocator is not None: - self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + assert locking.LEVEL_NODE not in self.needed_locks + assert locking.LEVEL_NODEGROUP not in self.needed_locks - elif self.op.remote_node is not None: - remote_node = _ExpandNodeName(self.cfg, self.op.remote_node) - self.op.remote_node = remote_node + assert self.op.iallocator is None or self.op.remote_node is None, \ + "Conflicting options" + + if self.op.remote_node is not None: + self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node) # Warning: do not remove the locking of the new secondary here # unless DRBD8.AddChildren is changed to work in parallel; # currently it doesn't since parallel invocations of # FindUnusedMinor will conflict - self.needed_locks[locking.LEVEL_NODE] = [remote_node] + self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND - else: self.needed_locks[locking.LEVEL_NODE] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + if self.op.iallocator is not None: + # iallocator will select a new node in the same group + self.needed_locks[locking.LEVEL_NODEGROUP] = [] + self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode, self.op.iallocator, self.op.remote_node, self.op.disks, False, self.op.early_release) @@ -8486,11 +8856,26 @@ class LUInstanceReplaceDisks(LogicalUnit): self.tasklets = [self.replacer] def DeclareLocks(self, level): - # If we're not already locking all nodes in the set we have to declare the - # instance's primary/secondary nodes. 
- if (level == locking.LEVEL_NODE and - self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET): - self._LockInstancesNodes() + if level == locking.LEVEL_NODEGROUP: + assert self.op.remote_node is None + assert self.op.iallocator is not None + assert not self.needed_locks[locking.LEVEL_NODEGROUP] + + self.share_locks[locking.LEVEL_NODEGROUP] = 1 + self.needed_locks[locking.LEVEL_NODEGROUP] = \ + self.cfg.GetInstanceNodeGroups(self.op.instance_name) + + elif level == locking.LEVEL_NODE: + if self.op.iallocator is not None: + assert self.op.remote_node is None + assert not self.needed_locks[locking.LEVEL_NODE] + + # Lock member nodes of all locked groups + self.needed_locks[locking.LEVEL_NODE] = [node_name + for group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP) + for node_name in self.cfg.GetNodeGroup(group_uuid).members] + else: + self._LockInstancesNodes() def BuildHooksEnv(self): """Build hooks env. @@ -8520,6 +8905,26 @@ class LUInstanceReplaceDisks(LogicalUnit): nl.append(self.op.remote_node) return nl, nl + def CheckPrereq(self): + """Check prerequisites. + + """ + assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or + self.op.iallocator is None) + + owned_groups = self.glm.list_owned(locking.LEVEL_NODEGROUP) + if owned_groups: + groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name) + if owned_groups != groups: + raise errors.OpExecError("Node groups used by instance '%s' changed" + " since lock was acquired, current list is %r," + " used to be '%s'" % + (self.op.instance_name, + utils.CommaJoin(groups), + utils.CommaJoin(owned_groups))) + + return LogicalUnit.CheckPrereq(self) + class TLReplaceDisks(Tasklet): """Replaces disks for an instance. @@ -8631,7 +9036,6 @@ class TLReplaceDisks(Tasklet): return True - def CheckPrereq(self): """Check prerequisites. 
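The DeclareLocks change above first takes the instance's node groups at LEVEL_NODEGROUP and only then, at LEVEL_NODE, locks every member node of the groups actually owned. A rough sketch of that two-step declaration, with a toy config object instead of self.cfg and self.glm (ToyConfig, GetNodeGroupMembers and declare_node_locks are invented for illustration; the real code reads cfg.GetNodeGroup(uuid).members):

class ToyConfig(object):
    # Minimal stand-in for the parts of the cluster config used here.
    def __init__(self, instance_groups, group_members):
        self._instance_groups = instance_groups  # instance -> groups
        self._group_members = group_members      # group uuid -> [nodes]

    def GetInstanceNodeGroups(self, instance_name):
        return set(self._instance_groups[instance_name])

    def GetNodeGroupMembers(self, group_uuid):
        return list(self._group_members[group_uuid])

def declare_node_locks(cfg, owned_groups):
    # Mirrors the LEVEL_NODE branch of LUInstanceReplaceDisks.DeclareLocks
    # when an iallocator is used: lock member nodes of all owned groups.
    return [node_name
            for group_uuid in owned_groups
            for node_name in cfg.GetNodeGroupMembers(group_uuid)]

cfg = ToyConfig({"inst1": ["g1"]},
                {"g1": ["node1", "node2"], "g2": ["node3"]})
groups = cfg.GetInstanceNodeGroups("inst1")   # LEVEL_NODEGROUP declaration
assert declare_node_locks(cfg, groups) == ["node1", "node2"]

CheckPrereq then re-reads GetInstanceNodeGroups and fails if the owned groups no longer match, since group membership can change between the two lock levels.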
@@ -8673,20 +9077,23 @@ class TLReplaceDisks(Tasklet): remote_node = self._RunAllocator(self.lu, self.iallocator_name, instance.name, instance.secondary_nodes) - if remote_node is not None: + if remote_node is None: + self.remote_node_info = None + else: + assert remote_node in self.lu.glm.list_owned(locking.LEVEL_NODE), \ + "Remote node '%s' is not locked" % remote_node + self.remote_node_info = self.cfg.GetNodeInfo(remote_node) assert self.remote_node_info is not None, \ "Cannot retrieve locked node %s" % remote_node - else: - self.remote_node_info = None if remote_node == self.instance.primary_node: raise errors.OpPrereqError("The specified node is the primary node of" - " the instance.", errors.ECODE_INVAL) + " the instance", errors.ECODE_INVAL) if remote_node == secondary_node: raise errors.OpPrereqError("The specified node is already the" - " secondary node of the instance.", + " secondary node of the instance", errors.ECODE_INVAL) if self.disks and self.mode in (constants.REPLACE_DISK_AUTO, @@ -8762,18 +9169,26 @@ class TLReplaceDisks(Tasklet): for node in check_nodes: _CheckNodeOnline(self.lu, node) + touched_nodes = frozenset(node_name for node_name in [self.new_node, + self.other_node, + self.target_node] + if node_name is not None) + + # Release unneeded node locks + _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes) + + # Release any owned node group + if self.lu.glm.is_owned(locking.LEVEL_NODEGROUP): + _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP) + # Check whether disks are valid for disk_idx in self.disks: instance.FindDisk(disk_idx) # Get secondary node IP addresses - node_2nd_ip = {} - - for node_name in [self.target_node, self.other_node, self.new_node]: - if node_name is not None: - node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip - - self.node_secondary_ip = node_2nd_ip + self.node_secondary_ip = \ + dict((node_name, self.cfg.GetNodeInfo(node_name).secondary_ip) + for node_name in touched_nodes) def Exec(self, feedback_fn): """Execute disk replacement. 
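The CheckPrereq hunk above narrows the node locks down to the nodes the tasklet actually touches and builds node_secondary_ip in one pass over the same set. A small sketch of that "keep only what we still need" step, assuming a release callback in place of _ReleaseLocks and a lookup function in place of cfg.GetNodeInfo; the helper names below are hypothetical:

def keep_needed_node_locks(owned_nodes, new_node, other_node, target_node,
                           secondary_ip_fn, release_fn):
    # Mirrors the end of TLReplaceDisks.CheckPrereq: compute the nodes the
    # tasklet still needs, drop every other node lock, and cache the
    # secondary IPs of the survivors.
    touched_nodes = frozenset(node_name
                              for node_name in (new_node, other_node,
                                                target_node)
                              if node_name is not None)
    release_fn([node for node in owned_nodes if node not in touched_nodes])
    return dict((node_name, secondary_ip_fn(node_name))
                for node_name in touched_nodes)

released = []
ips = keep_needed_node_locks(["n1", "n2", "n3", "n4"],
                             new_node=None, other_node="n1", target_node="n2",
                             secondary_ip_fn=lambda n: "192.0.2." + n[1:],
                             release_fn=released.extend)
assert released == ["n3", "n4"] and sorted(ips) == ["n1", "n2"]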
@@ -8784,6 +9199,20 @@ class TLReplaceDisks(Tasklet): if self.delay_iallocator: self._CheckPrereq2() + if __debug__: + # Verify owned locks before starting operation + owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE) + assert set(owned_locks) == set(self.node_secondary_ip), \ + ("Incorrect node locks, owning %s, expected %s" % + (owned_locks, self.node_secondary_ip.keys())) + + owned_locks = self.lu.glm.list_owned(locking.LEVEL_INSTANCE) + assert list(owned_locks) == [self.instance_name], \ + "Instance '%s' not locked" % self.instance_name + + assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \ + "Should not own any node group lock at this point" + if not self.disks: feedback_fn("No disks need replacement") return @@ -8804,14 +9233,24 @@ class TLReplaceDisks(Tasklet): else: fn = self._ExecDrbd8DiskOnly - return fn(feedback_fn) - + result = fn(feedback_fn) finally: # Deactivate the instance disks if we're replacing them on a # down instance if activate_disks: _SafeShutdownInstanceDisks(self.lu, self.instance) + if __debug__: + # Verify owned locks + owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE) + nodes = frozenset(self.node_secondary_ip) + assert ((self.early_release and not owned_locks) or + (not self.early_release and not (set(owned_locks) - nodes))), \ + ("Not owning the correct locks, early_release=%s, owned=%r," + " nodes=%r" % (self.early_release, owned_locks, nodes)) + + return result + def _CheckVolumeGroup(self, nodes): self.lu.LogInfo("Checking volume groups") @@ -8863,7 +9302,6 @@ class TLReplaceDisks(Tasklet): (node_name, self.instance.name)) def _CreateNewStorage(self, node_name): - vgname = self.cfg.GetVGName() iv_names = {} for idx, dev in enumerate(self.instance.disks): @@ -8877,10 +9315,12 @@ class TLReplaceDisks(Tasklet): lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]] names = _GenerateUniqueNames(self.lu, lv_names) + vg_data = dev.children[0].logical_id[0] lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size, - logical_id=(vgname, names[0])) + logical_id=(vg_data, names[0])) + vg_meta = dev.children[1].logical_id[0] lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128, - logical_id=(vgname, names[1])) + logical_id=(vg_meta, names[1])) new_lvs = [lv_data, lv_meta] old_lvs = dev.children @@ -8921,10 +9361,6 @@ class TLReplaceDisks(Tasklet): self.lu.LogWarning("Can't remove old LV: %s" % msg, hint="remove unused LVs manually") - def _ReleaseNodeLock(self, node_name): - """Releases the lock for a given node.""" - self.lu.context.glm.release(locking.LEVEL_NODE, node_name) - def _ExecDrbd8DiskOnly(self, feedback_fn): """Replace a disk on the primary or secondary for DRBD 8. 
@@ -9042,7 +9478,8 @@ class TLReplaceDisks(Tasklet): self._RemoveOldStorage(self.target_node, iv_names) # WARNING: we release both node locks here, do not do other RPCs # than WaitForSync to the primary node - self._ReleaseNodeLock([self.target_node, self.other_node]) + _ReleaseLocks(self.lu, locking.LEVEL_NODE, + names=[self.target_node, self.other_node]) # Wait for sync # This can fail as the old devices are degraded and _WaitForSync @@ -9199,9 +9636,10 @@ class TLReplaceDisks(Tasklet): self._RemoveOldStorage(self.target_node, iv_names) # WARNING: we release all node locks here, do not do other RPCs # than WaitForSync to the primary node - self._ReleaseNodeLock([self.instance.primary_node, - self.target_node, - self.new_node]) + _ReleaseLocks(self.lu, locking.LEVEL_NODE, + names=[self.instance.primary_node, + self.target_node, + self.new_node]) # Wait for sync # This can fail as the old devices are degraded and _WaitForSync @@ -9379,7 +9817,7 @@ class LUInstanceGrowDisk(LogicalUnit): if instance.disk_template not in constants.DTS_GROWABLE: raise errors.OpPrereqError("Instance's disk layout does not support" - " growing.", errors.ECODE_INVAL) + " growing", errors.ECODE_INVAL) self.disk = instance.FindDisk(self.op.disk) @@ -9401,9 +9839,17 @@ class LUInstanceGrowDisk(LogicalUnit): if not disks_ok: raise errors.OpExecError("Cannot activate block device to grow") + # First run all grow ops in dry-run mode for node in instance.all_nodes: self.cfg.SetDiskID(disk, node) - result = self.rpc.call_blockdev_grow(node, disk, self.op.amount) + result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True) + result.Raise("Grow request failed to node %s" % node) + + # We know that (as far as we can test) operations across different + # nodes will succeed, time to run it for real + for node in instance.all_nodes: + self.cfg.SetDiskID(disk, node) + result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False) result.Raise("Grow request failed to node %s" % node) # TODO: Rewrite code to work properly @@ -9418,14 +9864,14 @@ class LUInstanceGrowDisk(LogicalUnit): if self.op.wait_for_sync: disk_abort = not _WaitForSync(self, instance, disks=[disk]) if disk_abort: - self.proc.LogWarning("Warning: disk sync-ing has not returned a good" - " status.\nPlease check the instance.") + self.proc.LogWarning("Disk sync-ing has not returned a good" + " status; please check the instance") if not instance.admin_up: _SafeShutdownInstanceDisks(self, instance, disks=[disk]) elif not instance.admin_up: self.proc.LogWarning("Not shutting down the disk even if the instance is" " not supposed to be running because no wait for" - " sync mode was requested.") + " sync mode was requested") class LUInstanceQueryData(NoHooksLU): @@ -9473,7 +9919,7 @@ class LUInstanceQueryData(NoHooksLU): """ if self.wanted_names is None: assert self.op.use_locking, "Locking was not used" - self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE] + self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE) self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name in self.wanted_names] @@ -9859,6 +10305,7 @@ class LUInstanceSetParams(LogicalUnit): self.be_inst = i_bedict # the new dict (without defaults) else: self.be_new = self.be_inst = {} + be_old = cluster.FillBE(instance) # osparams processing if self.op.osparams: @@ -9870,7 +10317,8 @@ class LUInstanceSetParams(LogicalUnit): self.warn = [] - if constants.BE_MEMORY in self.op.beparams and not self.op.force: + if (constants.BE_MEMORY in self.op.beparams and not 
self.op.force and + be_new[constants.BE_MEMORY] > be_old[constants.BE_MEMORY]): mem_check_list = [pnode] if be_new[constants.BE_AUTO_BALANCE]: # either we changed auto_balance to yes or it was from before @@ -9911,16 +10359,17 @@ class LUInstanceSetParams(LogicalUnit): for node, nres in nodeinfo.items(): if node not in instance.secondary_nodes: continue - msg = nres.fail_msg - if msg: - self.warn.append("Can't get info from secondary node %s: %s" % - (node, msg)) - elif not isinstance(nres.payload.get('memory_free', None), int): - self.warn.append("Secondary node %s didn't return free" - " memory information" % node) + nres.Raise("Can't get info from secondary node %s" % node, + prereq=True, ecode=errors.ECODE_STATE) + if not isinstance(nres.payload.get('memory_free', None), int): + raise errors.OpPrereqError("Secondary node %s didn't return free" + " memory information" % node, + errors.ECODE_STATE) elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']: - self.warn.append("Not enough memory to failover instance to" - " secondary node %s" % node) + raise errors.OpPrereqError("This change will prevent the instance" + " from failover to its secondary node" + " %s, due to not enough memory" % node, + errors.ECODE_STATE) # NIC processing self.nic_pnew = {} @@ -10036,7 +10485,8 @@ class LUInstanceSetParams(LogicalUnit): snode = self.op.remote_node # create a fake disk info for _GenerateDiskTemplate - disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode} + disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode, + constants.IDISK_VG: d.logical_id[0]} for d in instance.disks] new_disks = _GenerateDiskTemplate(self, self.op.disk_template, instance.name, pnode, [snode], @@ -10071,7 +10521,8 @@ class LUInstanceSetParams(LogicalUnit): self.cfg.Update(instance, feedback_fn) # disks are created, waiting for sync - disk_abort = not _WaitForSync(self, instance) + disk_abort = not _WaitForSync(self, instance, + oneshot=not self.op.wait_for_sync) if disk_abort: raise errors.OpExecError("There are some degraded disks for" " this instance, please cleanup manually") @@ -10280,7 +10731,7 @@ class LUBackupQuery(NoHooksLU): that node. """ - self.nodes = self.acquired_locks[locking.LEVEL_NODE] + self.nodes = self.glm.list_owned(locking.LEVEL_NODE) rpcresult = self.rpc.call_export_list(self.nodes) result = {} for node in rpcresult: @@ -10662,7 +11113,7 @@ class LUBackupRemove(NoHooksLU): fqdn_warn = True instance_name = self.op.instance_name - locked_nodes = self.acquired_locks[locking.LEVEL_NODE] + locked_nodes = self.glm.list_owned(locking.LEVEL_NODE) exportlist = self.rpc.call_export_list(locked_nodes) found = False for node in exportlist: @@ -10761,20 +11212,40 @@ class LUGroupAssignNodes(NoHooksLU): # We want to lock all the affected nodes and groups. We have readily # available the list of nodes, and the *destination* group. To gather the - # list of "source" groups, we need to fetch node information. - self.node_data = self.cfg.GetAllNodesInfo() - affected_groups = set(self.node_data[node].group for node in self.op.nodes) - affected_groups.add(self.group_uuid) - + # list of "source" groups, we need to fetch node information later on. 
self.needed_locks = { - locking.LEVEL_NODEGROUP: list(affected_groups), + locking.LEVEL_NODEGROUP: set([self.group_uuid]), locking.LEVEL_NODE: self.op.nodes, } + def DeclareLocks(self, level): + if level == locking.LEVEL_NODEGROUP: + assert len(self.needed_locks[locking.LEVEL_NODEGROUP]) == 1 + + # Try to get all affected nodes' groups without having the group or node + # lock yet. Needs verification later in the code flow. + groups = self.cfg.GetNodeGroupsFromNodes(self.op.nodes) + + self.needed_locks[locking.LEVEL_NODEGROUP].update(groups) + def CheckPrereq(self): """Check prerequisites. """ + assert self.needed_locks[locking.LEVEL_NODEGROUP] + assert (frozenset(self.glm.list_owned(locking.LEVEL_NODE)) == + frozenset(self.op.nodes)) + + expected_locks = (set([self.group_uuid]) | + self.cfg.GetNodeGroupsFromNodes(self.op.nodes)) + actual_locks = self.glm.list_owned(locking.LEVEL_NODEGROUP) + if actual_locks != expected_locks: + raise errors.OpExecError("Nodes changed groups since locks were acquired," + " current groups are '%s', used to be '%s'" % + (utils.CommaJoin(expected_locks), + utils.CommaJoin(actual_locks))) + + self.node_data = self.cfg.GetAllNodesInfo() self.group = self.cfg.GetNodeGroup(self.group_uuid) instance_data = self.cfg.GetAllInstancesInfo() @@ -10800,7 +11271,7 @@ class LUGroupAssignNodes(NoHooksLU): if previous_splits: self.LogWarning("In addition, these already-split instances continue" - " to be spit across groups: %s", + " to be split across groups: %s", utils.CommaJoin(utils.NiceSort(previous_splits))) def Exec(self, feedback_fn): @@ -10810,6 +11281,9 @@ class LUGroupAssignNodes(NoHooksLU): for node in self.op.nodes: self.node_data[node].group = self.group_uuid + # FIXME: Depends on side-effects of modifying the result of + # C{cfg.GetAllNodesInfo} + self.cfg.Update(self.group, feedback_fn) # Saves all modified nodes. 
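LUGroupAssignNodes now guesses the affected source groups before holding any lock and re-checks them in CheckPrereq once the locks are held: an optimistic lookup followed by a pessimistic verification. A sketch of that check, with a plain function in place of cfg.GetNodeGroupsFromNodes; verify_group_locks is an invented helper name:

def verify_group_locks(target_group, nodes, groups_of_nodes_fn, owned_groups):
    # Mirrors the check added to LUGroupAssignNodes.CheckPrereq: the groups
    # guessed before locking (and therefore locked) must still match the
    # groups the nodes belong to now that the locks are held.
    expected = set([target_group]) | groups_of_nodes_fn(nodes)
    if set(owned_groups) != expected:
        raise RuntimeError("Nodes changed groups since locks were acquired,"
                           " expected %s, owning %s" %
                           (sorted(expected), sorted(owned_groups)))

# success: the opportunistic guess made before locking still holds
verify_group_locks("g1", ["node1", "node2"],
                   lambda nodes: set(["g2"]),     # current source groups
                   owned_groups=["g1", "g2"])     # target group + sources

# had a node been moved to a third group in the meantime, the same call
# would raise and the caller would have to retry with fresh locks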
@staticmethod @@ -10889,7 +11363,8 @@ class _GroupQuery(_QueryBase): missing.append(name) if missing: - raise errors.OpPrereqError("Some groups do not exist: %s" % missing, + raise errors.OpPrereqError("Some groups do not exist: %s" % + utils.CommaJoin(missing), errors.ECODE_NOENT) def DeclareLocks(self, lu, level): @@ -11525,16 +12000,6 @@ class IAllocator(object): """ # pylint: disable-msg=R0902 # lots of instance attributes - _ALLO_KEYS = [ - "name", "mem_size", "disks", "disk_template", - "os", "tags", "nics", "vcpus", "hypervisor", - ] - _RELO_KEYS = [ - "name", "relocate_from", - ] - _EVAC_KEYS = [ - "evac_nodes", - ] def __init__(self, cfg, rpc, mode, **kwargs): self.cfg = cfg @@ -11549,22 +12014,20 @@ class IAllocator(object): self.relocate_from = None self.name = None self.evac_nodes = None + self.instances = None + self.reloc_mode = None + self.target_groups = None # computed fields self.required_nodes = None # init result fields self.success = self.info = self.result = None - if self.mode == constants.IALLOCATOR_MODE_ALLOC: - keyset = self._ALLO_KEYS - fn = self._AddNewInstance - elif self.mode == constants.IALLOCATOR_MODE_RELOC: - keyset = self._RELO_KEYS - fn = self._AddRelocateInstance - elif self.mode == constants.IALLOCATOR_MODE_MEVAC: - keyset = self._EVAC_KEYS - fn = self._AddEvacuateNodes - else: + + try: + (fn, keyset, self._result_check) = self._MODE_DATA[self.mode] + except KeyError: raise errors.ProgrammerError("Unknown mode '%s' passed to the" " IAllocator" % self.mode) + for key in kwargs: if key not in keyset: raise errors.ProgrammerError("Invalid input parameter '%s' to" @@ -11575,7 +12038,7 @@ class IAllocator(object): if key not in kwargs: raise errors.ProgrammerError("Missing input parameter '%s' to" " IAllocator" % key) - self._BuildInputData(fn) + self._BuildInputData(compat.partial(fn, self)) def _ComputeClusterData(self): """Compute the generic allocator input data. @@ -11604,7 +12067,8 @@ class IAllocator(object): hypervisor_name = self.hypervisor elif self.mode == constants.IALLOCATOR_MODE_RELOC: hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor - elif self.mode == constants.IALLOCATOR_MODE_MEVAC: + elif self.mode in (constants.IALLOCATOR_MODE_MEVAC, + constants.IALLOCATOR_MODE_MRELOC): hypervisor_name = cluster_info.enabled_hypervisors[0] node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(), @@ -11630,12 +12094,12 @@ class IAllocator(object): """Compute node groups data. 
""" - ng = {} - for guuid, gdata in cfg.GetAllNodeGroupsInfo().items(): - ng[guuid] = { - "name": gdata.name, - "alloc_policy": gdata.alloc_policy, - } + ng = dict((guuid, { + "name": gdata.name, + "alloc_policy": gdata.alloc_policy, + }) + for guuid, gdata in cfg.GetAllNodeGroupsInfo().items()) + return ng @staticmethod @@ -11646,22 +12110,19 @@ class IAllocator(object): @returns: a dict of name: (node dict, node config) """ - node_results = {} - for ninfo in node_cfg.values(): - # fill in static (config-based) values - pnr = { - "tags": list(ninfo.GetTags()), - "primary_ip": ninfo.primary_ip, - "secondary_ip": ninfo.secondary_ip, - "offline": ninfo.offline, - "drained": ninfo.drained, - "master_candidate": ninfo.master_candidate, - "group": ninfo.group, - "master_capable": ninfo.master_capable, - "vm_capable": ninfo.vm_capable, - } - - node_results[ninfo.name] = pnr + # fill in static (config-based) values + node_results = dict((ninfo.name, { + "tags": list(ninfo.GetTags()), + "primary_ip": ninfo.primary_ip, + "secondary_ip": ninfo.secondary_ip, + "offline": ninfo.offline, + "drained": ninfo.drained, + "master_candidate": ninfo.master_candidate, + "group": ninfo.group, + "master_capable": ninfo.master_capable, + "vm_capable": ninfo.vm_capable, + }) + for ninfo in node_cfg.values()) return node_results @@ -11735,11 +12196,12 @@ class IAllocator(object): nic_data = [] for nic in iinfo.nics: filled_params = cluster_info.SimpleFillNIC(nic.nicparams) - nic_dict = {"mac": nic.mac, - "ip": nic.ip, - "mode": filled_params[constants.NIC_MODE], - "link": filled_params[constants.NIC_LINK], - } + nic_dict = { + "mac": nic.mac, + "ip": nic.ip, + "mode": filled_params[constants.NIC_MODE], + "link": filled_params[constants.NIC_LINK], + } if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: nic_dict["bridge"] = filled_params[constants.NIC_LINK] nic_data.append(nic_dict) @@ -11779,6 +12241,7 @@ class IAllocator(object): self.required_nodes = 2 else: self.required_nodes = 1 + request = { "name": self.name, "disk_template": self.disk_template, @@ -11791,6 +12254,7 @@ class IAllocator(object): "nics": self.nics, "required_nodes": self.required_nodes, } + return request def _AddRelocateInstance(self): @@ -11838,6 +12302,16 @@ class IAllocator(object): } return request + def _AddMultiRelocate(self): + """Get data for multi-relocate requests. + + """ + return { + "instances": self.instances, + "reloc_mode": self.reloc_mode, + "target_groups": self.target_groups, + } + def _BuildInputData(self, fn): """Build input data structures. @@ -11850,6 +12324,28 @@ class IAllocator(object): self.in_text = serializer.Dump(self.in_data) + _MODE_DATA = { + constants.IALLOCATOR_MODE_ALLOC: + (_AddNewInstance, + ["name", "mem_size", "disks", "disk_template", "os", "tags", "nics", + "vcpus", "hypervisor"], ht.TList), + constants.IALLOCATOR_MODE_RELOC: + (_AddRelocateInstance, ["name", "relocate_from"], ht.TList), + constants.IALLOCATOR_MODE_MEVAC: + (_AddEvacuateNodes, ["evac_nodes"], + ht.TListOf(ht.TAnd(ht.TIsLength(2), + ht.TListOf(ht.TString)))), + constants.IALLOCATOR_MODE_MRELOC: + (_AddMultiRelocate, ["instances", "reloc_mode", "target_groups"], + ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, { + # pylint: disable-msg=E1101 + # Class '...' 
has no 'OP_ID' member + "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID, + opcodes.OpInstanceMigrate.OP_ID, + opcodes.OpInstanceReplaceDisks.OP_ID]) + })))), + } + def Run(self, name, validate=True, call_fn=None): """Run an instance allocator and return the results. @@ -11890,28 +12386,45 @@ class IAllocator(object): " missing key '%s'" % key) setattr(self, key, rdict[key]) - if not isinstance(rdict["result"], list): - raise errors.OpExecError("Can't parse iallocator results: 'result' key" - " is not a list") - - if self.mode == constants.IALLOCATOR_MODE_RELOC: - assert self.relocate_from is not None - assert self.required_nodes == 1 + if not self._result_check(self.result): + raise errors.OpExecError("Iallocator returned invalid result," + " expected %s, got %s" % + (self._result_check, self.result), + errors.ECODE_INVAL) + if self.mode in (constants.IALLOCATOR_MODE_RELOC, + constants.IALLOCATOR_MODE_MEVAC): node2group = dict((name, ndata["group"]) for (name, ndata) in self.in_data["nodes"].items()) fn = compat.partial(self._NodesToGroups, node2group, self.in_data["nodegroups"]) - request_groups = fn(self.relocate_from) - result_groups = fn(rdict["result"]) - - if result_groups != request_groups: - raise errors.OpExecError("Groups of nodes returned by iallocator (%s)" - " differ from original groups (%s)" % - (utils.CommaJoin(result_groups), - utils.CommaJoin(request_groups))) + if self.mode == constants.IALLOCATOR_MODE_RELOC: + assert self.relocate_from is not None + assert self.required_nodes == 1 + + request_groups = fn(self.relocate_from) + result_groups = fn(rdict["result"]) + + if result_groups != request_groups: + raise errors.OpExecError("Groups of nodes returned by iallocator (%s)" + " differ from original groups (%s)" % + (utils.CommaJoin(result_groups), + utils.CommaJoin(request_groups))) + elif self.mode == constants.IALLOCATOR_MODE_MEVAC: + request_groups = fn(self.evac_nodes) + for (instance_name, secnode) in self.result: + result_groups = fn([secnode]) + if result_groups != request_groups: + raise errors.OpExecError("Iallocator returned new secondary node" + " '%s' (group '%s') for instance '%s'" + " which is not in original group '%s'" % + (secnode, utils.CommaJoin(result_groups), + instance_name, + utils.CommaJoin(request_groups))) + else: + raise errors.ProgrammerError("Unhandled mode '%s'" % self.mode) self.out_data = rdict @@ -11995,6 +12508,12 @@ class LUTestAllocator(NoHooksLU): if not hasattr(self.op, "evac_nodes"): raise errors.OpPrereqError("Missing attribute 'evac_nodes' on" " opcode input", errors.ECODE_INVAL) + elif self.op.mode == constants.IALLOCATOR_MODE_MRELOC: + if self.op.instances: + self.op.instances = _GetWantedInstances(self, self.op.instances) + else: + raise errors.OpPrereqError("Missing instances to relocate", + errors.ECODE_INVAL) else: raise errors.OpPrereqError("Invalid test allocator mode '%s'" % self.op.mode, errors.ECODE_INVAL) @@ -12034,6 +12553,12 @@ class LUTestAllocator(NoHooksLU): ial = IAllocator(self.cfg, self.rpc, mode=self.op.mode, evac_nodes=self.op.evac_nodes) + elif self.op.mode == constants.IALLOCATOR_MODE_MRELOC: + ial = IAllocator(self.cfg, self.rpc, + mode=self.op.mode, + instances=self.op.instances, + reloc_mode=self.op.reloc_mode, + target_groups=self.op.target_groups) else: raise errors.ProgrammerError("Uncatched mode %s in" " LUTestAllocator.Exec", self.op.mode)
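The _ALLO_KEYS/_RELO_KEYS/_EVAC_KEYS triplet is replaced above by a single _MODE_DATA table mapping each mode to its request builder, its required keyword arguments and a result-shape check that Run() applies to the iallocator output. The same table-driven constructor can be sketched without the ganeti.ht combinators by using plain callables as validators; AllocatorStub and the mode strings below are illustrative only, not the Ganeti API.

class AllocatorStub(object):
    # Table-driven constructor in the spirit of IAllocator._MODE_DATA:
    # each mode maps to (request builder, required kwargs, result check).
    # Plain callables stand in for ht.TList and friends.
    _MODE_DATA = {
        "allocate": (lambda self: {"name": self.name,
                                   "mem_size": self.mem_size},
                     ["name", "mem_size"],
                     lambda result: isinstance(result, list)),
        "relocate": (lambda self: {"name": self.name,
                                   "relocate_from": self.relocate_from},
                     ["name", "relocate_from"],
                     lambda result: isinstance(result, list)),
    }

    def __init__(self, mode, **kwargs):
        try:
            (fn, keyset, self._result_check) = self._MODE_DATA[mode]
        except KeyError:
            raise ValueError("Unknown mode '%s'" % mode)
        missing = [key for key in keyset if key not in kwargs]
        extra = [key for key in kwargs if key not in keyset]
        if missing or extra:
            raise ValueError("Bad parameters: missing %s, unexpected %s" %
                             (missing, extra))
        for key, value in kwargs.items():
            setattr(self, key, value)
        self.request = fn(self)

    def check_result(self, result):
        # mirrors the "expected %s, got %s" validation added to Run()
        if not self._result_check(result):
            raise ValueError("Invalid result %r for this mode" % (result,))

ia = AllocatorStub("relocate", name="inst1", relocate_from=["node2"])
assert ia.request == {"name": "inst1", "relocate_from": ["node2"]}
ia.check_result(["node3"])

Keeping the builder, the parameter list and the validator in one table means adding a mode such as the new multi-relocate request is a single dictionary entry rather than another elif branch in the constructor.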