X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/e3303a4ebcdec1fb257a41b6fbf7031cab810553..3f1e065d5095b2c0cda036a130575458c8f270af:/lib/cmdlib.py diff --git a/lib/cmdlib.py b/lib/cmdlib.py index 2e43215..98a056c 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -57,6 +57,7 @@ from ganeti import netutils from ganeti import query from ganeti import qlang from ganeti import opcodes +from ganeti import ht import ganeti.masterd.instance # pylint: disable-msg=W0611 @@ -129,11 +130,11 @@ class LogicalUnit(object): self.proc = processor self.op = op self.cfg = context.cfg + self.glm = context.glm self.context = context self.rpc = rpc # Dicts used to declare locking needs to mcpu self.needed_locks = None - self.acquired_locks = {} self.share_locks = dict.fromkeys(locking.LEVELS, 0) self.add_locks = {} self.remove_locks = {} @@ -385,7 +386,7 @@ class LogicalUnit(object): # future we might want to have different behaviors depending on the value # of self.recalculate_locks[locking.LEVEL_NODE] wanted_nodes = [] - for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]: + for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE): instance = self.context.cfg.GetInstanceInfo(instance_name) wanted_nodes.append(instance.primary_node) if not primary_only: @@ -499,7 +500,7 @@ class _QueryBase: """ if self.do_locking: - names = lu.acquired_locks[lock_level] + names = lu.glm.list_owned(lock_level) else: names = all_names @@ -510,7 +511,7 @@ class _QueryBase: # caller specified names and we must keep the same order assert self.names - assert not self.do_locking or lu.acquired_locks[lock_level] + assert not self.do_locking or lu.glm.is_owned(lock_level) missing = set(self.wanted).difference(names) if missing: @@ -656,25 +657,23 @@ def _ReleaseLocks(lu, level, names=None, keep=None): release = [] # Determine which locks to release - for name in lu.acquired_locks[level]: + for name in lu.glm.list_owned(level): if should_release(name): release.append(name) else: retain.append(name) - assert len(lu.acquired_locks[level]) == (len(retain) + len(release)) + assert len(lu.glm.list_owned(level)) == (len(retain) + len(release)) # Release just some locks - lu.context.glm.release(level, names=release) - lu.acquired_locks[level] = retain + lu.glm.release(level, names=release) - assert frozenset(lu.context.glm.list_owned(level)) == frozenset(retain) + assert frozenset(lu.glm.list_owned(level)) == frozenset(retain) else: # Release everything - lu.context.glm.release(level) - del lu.acquired_locks[level] + lu.glm.release(level) - assert not lu.context.glm.list_owned(level), "No locks should be owned" + assert not lu.glm.is_owned(level), "No locks should be owned" def _RunPostHook(lu, node_name): @@ -1155,7 +1154,7 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot): iallocator = getattr(lu.op, iallocator_slot, None) if node is not None and iallocator is not None: - raise errors.OpPrereqError("Do not specify both, iallocator and node.", + raise errors.OpPrereqError("Do not specify both, iallocator and node", errors.ECODE_INVAL) elif node is None and iallocator is None: default_iallocator = lu.cfg.GetDefaultIAllocator() @@ -1163,10 +1162,10 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot): setattr(lu.op, iallocator_slot, default_iallocator) else: raise errors.OpPrereqError("No iallocator or node given and no" - " cluster-wide default iallocator found." 
- " Please specify either an iallocator or a" + " cluster-wide default iallocator found;" + " please specify either an iallocator or a" " node, or set a cluster-wide default" - " iallocator.") + " iallocator") class LUClusterPostInit(LogicalUnit): @@ -1255,7 +1254,7 @@ class LUClusterDestroy(LogicalUnit): def _VerifyCertificate(filename): - """Verifies a certificate for LUClusterVerify. + """Verifies a certificate for LUClusterVerifyConfig. @type filename: string @param filename: Path to PEM file @@ -1265,7 +1264,7 @@ def _VerifyCertificate(filename): cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, utils.ReadFile(filename)) except Exception, err: # pylint: disable-msg=W0703 - return (LUClusterVerify.ETYPE_ERROR, + return (LUClusterVerifyConfig.ETYPE_ERROR, "Failed to load X509 certificate %s: %s" % (filename, err)) (errcode, msg) = \ @@ -1280,21 +1279,52 @@ def _VerifyCertificate(filename): if errcode is None: return (None, fnamemsg) elif errcode == utils.CERT_WARNING: - return (LUClusterVerify.ETYPE_WARNING, fnamemsg) + return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg) elif errcode == utils.CERT_ERROR: - return (LUClusterVerify.ETYPE_ERROR, fnamemsg) + return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg) raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode) -class LUClusterVerify(LogicalUnit): - """Verifies the cluster status. +def _GetAllHypervisorParameters(cluster, instances): + """Compute the set of all hypervisor parameters. + + @type cluster: L{objects.Cluster} + @param cluster: the cluster object + @param instances: list of L{objects.Instance} + @param instances: additional instances from which to obtain parameters + @rtype: list of (origin, hypervisor, parameters) + @return: a list with all parameters found, indicating the hypervisor they + apply to, and the origin (can be "cluster", "os X", or "instance Y") """ - HPATH = "cluster-verify" - HTYPE = constants.HTYPE_CLUSTER - REQ_BGL = False + hvp_data = [] + + for hv_name in cluster.enabled_hypervisors: + hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name))) + for os_name, os_hvp in cluster.os_hvp.items(): + for hv_name, hv_params in os_hvp.items(): + if hv_params: + full_params = cluster.GetHVDefaults(hv_name, os_name=os_name) + hvp_data.append(("os %s" % os_name, hv_name, full_params)) + + # TODO: collapse identical parameter values in a single one + for instance in instances: + if instance.hvparams: + hvp_data.append(("instance %s" % instance.name, instance.hypervisor, + cluster.FillHV(instance))) + + return hvp_data + + +class _VerifyErrors(object): + """Mix-in for cluster/group verify LUs. + + It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects + self.op and self._feedback_fn to be available.) + + """ TCLUSTER = "cluster" TNODE = "node" TINSTANCE = "instance" @@ -1302,6 +1332,8 @@ class LUClusterVerify(LogicalUnit): ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG") ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT") ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK") + ECLUSTERDANGLINGNODES = (TNODE, "ECLUSTERDANGLINGNODES") + ECLUSTERDANGLINGINST = (TNODE, "ECLUSTERDANGLINGINST") EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE") EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN") EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT") @@ -1331,6 +1363,138 @@ class LUClusterVerify(LogicalUnit): ETYPE_ERROR = "ERROR" ETYPE_WARNING = "WARNING" + def _Error(self, ecode, item, msg, *args, **kwargs): + """Format an error message. 
+ + Based on the opcode's error_codes parameter, either format a + parseable error code, or a simpler error string. + + This must be called only from Exec and functions called from Exec. + + """ + ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) + itype, etxt = ecode + # first complete the msg + if args: + msg = msg % args + # then format the whole message + if self.op.error_codes: # This is a mix-in. pylint: disable-msg=E1101 + msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg) + else: + if item: + item = " " + item + else: + item = "" + msg = "%s: %s%s: %s" % (ltype, itype, item, msg) + # and finally report it via the feedback_fn + self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable-msg=E1101 + + def _ErrorIf(self, cond, *args, **kwargs): + """Log an error message if the passed condition is True. + + """ + cond = (bool(cond) + or self.op.debug_simulate_errors) # pylint: disable-msg=E1101 + if cond: + self._Error(*args, **kwargs) + # do not mark the operation as failed for WARN cases only + if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR: + self.bad = self.bad or cond + + +class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors): + """Verifies the cluster config. + + """ + REQ_BGL = False + + def _VerifyHVP(self, hvp_data): + """Verifies locally the syntax of the hypervisor parameters. + + """ + for item, hv_name, hv_params in hvp_data: + msg = ("hypervisor %s parameters syntax check (source %s): %%s" % + (item, hv_name)) + try: + hv_class = hypervisor.GetHypervisor(hv_name) + utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) + hv_class.CheckParameterSyntax(hv_params) + except errors.GenericError, err: + self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err)) + + def ExpandNames(self): + self.all_group_info = self.cfg.GetAllNodeGroupsInfo() + self.all_node_info = self.cfg.GetAllNodesInfo() + self.all_inst_info = self.cfg.GetAllInstancesInfo() + self.needed_locks = {} + + def Exec(self, feedback_fn): + """Verify integrity of cluster, performing various test on nodes. + + """ + self.bad = False + self._feedback_fn = feedback_fn + + feedback_fn("* Verifying cluster config") + + for msg in self.cfg.VerifyConfig(): + self._ErrorIf(True, self.ECLUSTERCFG, None, msg) + + feedback_fn("* Verifying cluster certificate files") + + for cert_filename in constants.ALL_CERT_FILES: + (errcode, msg) = _VerifyCertificate(cert_filename) + self._ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode) + + feedback_fn("* Verifying hypervisor parameters") + + self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(), + self.all_inst_info.values())) + + feedback_fn("* Verifying all nodes belong to an existing group") + + # We do this verification here because, should this bogus circumstance + # occur, it would never be catched by VerifyGroup, which only acts on + # nodes/instances reachable from existing node groups. 
+ + dangling_nodes = set(node.name for node in self.all_node_info.values() + if node.group not in self.all_group_info) + + dangling_instances = {} + no_node_instances = [] + + for inst in self.all_inst_info.values(): + if inst.primary_node in dangling_nodes: + dangling_instances.setdefault(inst.primary_node, []).append(inst.name) + elif inst.primary_node not in self.all_node_info: + no_node_instances.append(inst.name) + + pretty_dangling = [ + "%s (%s)" % + (node.name, + utils.CommaJoin(dangling_instances.get(node.name, + ["no instances"]))) + for node in dangling_nodes] + + self._ErrorIf(bool(dangling_nodes), self.ECLUSTERDANGLINGNODES, None, + "the following nodes (and their instances) belong to a non" + " existing group: %s", utils.CommaJoin(pretty_dangling)) + + self._ErrorIf(bool(no_node_instances), self.ECLUSTERDANGLINGINST, None, + "the following instances have a non-existing primary-node:" + " %s", utils.CommaJoin(no_node_instances)) + + return (not self.bad, [g.name for g in self.all_group_info.values()]) + + +class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): + """Verifies the status of a node group. + + """ + HPATH = "cluster-verify" + HTYPE = constants.HTYPE_CLUSTER + REQ_BGL = False + _HOOKS_INDENT_RE = re.compile("^", re.M) class NodeImage(object): @@ -1384,48 +1548,90 @@ class LUClusterVerify(LogicalUnit): self.oslist = {} def ExpandNames(self): + # This raises errors.OpPrereqError on its own: + self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name) + + all_node_info = self.cfg.GetAllNodesInfo() + all_inst_info = self.cfg.GetAllInstancesInfo() + + node_names = set(node.name + for node in all_node_info.values() + if node.group == self.group_uuid) + + inst_names = [inst.name + for inst in all_inst_info.values() + if inst.primary_node in node_names] + + # In Exec(), we warn about mirrored instances that have primary and + # secondary living in separate node groups. To fully verify that + # volumes for these instances are healthy, we will need to do an + # extra call to their secondaries. We ensure here those nodes will + # be locked. + for inst in inst_names: + if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR: + node_names.update(all_inst_info[inst].secondary_nodes) + self.needed_locks = { - locking.LEVEL_NODE: locking.ALL_SET, - locking.LEVEL_INSTANCE: locking.ALL_SET, + locking.LEVEL_NODEGROUP: [self.group_uuid], + locking.LEVEL_NODE: list(node_names), + locking.LEVEL_INSTANCE: inst_names, } + self.share_locks = dict.fromkeys(locking.LEVELS, 1) - def _Error(self, ecode, item, msg, *args, **kwargs): - """Format an error message. + def CheckPrereq(self): + self.all_node_info = self.cfg.GetAllNodesInfo() + self.all_inst_info = self.cfg.GetAllInstancesInfo() - Based on the opcode's error_codes parameter, either format a - parseable error code, or a simpler error string. + group_nodes = set(node.name + for node in self.all_node_info.values() + if node.group == self.group_uuid) - This must be called only from Exec and functions called from Exec. 
+ group_instances = set(inst.name + for inst in self.all_inst_info.values() + if inst.primary_node in group_nodes) - """ - ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) - itype, etxt = ecode - # first complete the msg - if args: - msg = msg % args - # then format the whole message - if self.op.error_codes: - msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg) - else: - if item: - item = " " + item - else: - item = "" - msg = "%s: %s%s: %s" % (ltype, itype, item, msg) - # and finally report it via the feedback_fn - self._feedback_fn(" - %s" % msg) + unlocked_nodes = \ + group_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE)) - def _ErrorIf(self, cond, *args, **kwargs): - """Log an error message if the passed condition is True. + unlocked_instances = \ + group_instances.difference(self.glm.list_owned(locking.LEVEL_INSTANCE)) - """ - cond = bool(cond) or self.op.debug_simulate_errors - if cond: - self._Error(*args, **kwargs) - # do not mark the operation as failed for WARN cases only - if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR: - self.bad = self.bad or cond + if unlocked_nodes: + raise errors.OpPrereqError("missing lock for nodes: %s" % + utils.CommaJoin(unlocked_nodes)) + + if unlocked_instances: + raise errors.OpPrereqError("missing lock for instances: %s" % + utils.CommaJoin(unlocked_instances)) + + self.my_node_names = utils.NiceSort(group_nodes) + self.my_inst_names = utils.NiceSort(group_instances) + + self.my_node_info = dict((name, self.all_node_info[name]) + for name in self.my_node_names) + + self.my_inst_info = dict((name, self.all_inst_info[name]) + for name in self.my_inst_names) + + # We detect here the nodes that will need the extra RPC calls for verifying + # split LV volumes; they should be locked. + extra_lv_nodes = set() + + for inst in self.my_inst_info.values(): + if inst.disk_template in constants.DTS_INT_MIRROR: + group = self.my_node_info[inst.primary_node].group + for nname in inst.secondary_nodes: + if self.all_node_info[nname].group != group: + extra_lv_nodes.add(nname) + + unlocked_lv_nodes = \ + extra_lv_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE)) + + if unlocked_lv_nodes: + raise errors.OpPrereqError("these nodes could be locked: %s" % + utils.CommaJoin(unlocked_lv_nodes)) + self.extra_lv_nodes = list(extra_lv_nodes) def _VerifyNode(self, ninfo, nresult): """Perform some basic validation on data returned from a node. @@ -1533,7 +1739,7 @@ class LUClusterVerify(LogicalUnit): ntime_diff) def _VerifyNodeLVM(self, ninfo, nresult, vg_name): - """Check the node time. + """Check the node LVM results. @type ninfo: L{objects.Node} @param ninfo: the node to check @@ -1569,8 +1775,31 @@ class LUClusterVerify(LogicalUnit): _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV" " '%s' of VG '%s'", pvname, owner_vg) + def _VerifyNodeBridges(self, ninfo, nresult, bridges): + """Check the node bridges. 
+ + @type ninfo: L{objects.Node} + @param ninfo: the node to check + @param nresult: the remote results for the node + @param bridges: the expected list of bridges + + """ + if not bridges: + return + + node = ninfo.name + _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + + missing = nresult.get(constants.NV_BRIDGES, None) + test = not isinstance(missing, list) + _ErrorIf(test, self.ENODENET, node, + "did not return valid bridge information") + if not test: + _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" % + utils.CommaJoin(sorted(missing))) + def _VerifyNodeNetwork(self, ninfo, nresult): - """Check the node time. + """Check the node network connectivity results. @type ninfo: L{objects.Node} @param ninfo: the node to check @@ -1642,12 +1871,6 @@ class LUClusterVerify(LogicalUnit): "instance not running on its primary node %s", node_current) - for node, n_img in node_image.items(): - if node != node_current: - test = instance in n_img.instances - _ErrorIf(test, self.EINSTANCEWRONGNODE, instance, - "instance should not run on node %s", node) - diskdata = [(nname, success, status, idx) for (nname, disks) in diskstatus.items() for idx, (success, status) in enumerate(disks)] @@ -1687,18 +1910,6 @@ class LUClusterVerify(LogicalUnit): self._ErrorIf(test, self.ENODEORPHANLV, node, "volume %s is unknown", volume) - def _VerifyOrphanInstances(self, instancelist, node_image): - """Verify the list of running instances. - - This checks what instances are running but unknown to the cluster. - - """ - for node, n_img in node_image.items(): - for o_inst in n_img.instances: - test = o_inst not in instancelist - self._ErrorIf(test, self.ENODEORPHANINSTANCE, node, - "instance %s on node %s should not exist", o_inst, node) - def _VerifyNPlusOneMemory(self, node_image, instance_cfg): """Verify N+1 Memory Resilience. @@ -2181,20 +2392,6 @@ class LUClusterVerify(LogicalUnit): return instdisk - def _VerifyHVP(self, hvp_data): - """Verifies locally the syntax of the hypervisor parameters. - - """ - for item, hv_name, hv_params in hvp_data: - msg = ("hypervisor %s parameters syntax check (source %s): %%s" % - (item, hv_name)) - try: - hv_class = hypervisor.GetHypervisor(hv_name) - utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) - hv_class.CheckParameterSyntax(hv_params) - except errors.GenericError, err: - self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err)) - def BuildHooksEnv(self): """Build hooks env. @@ -2202,14 +2399,12 @@ class LUClusterVerify(LogicalUnit): the output be logged in the verify output and the verification to fail. """ - cfg = self.cfg - env = { - "CLUSTER_TAGS": " ".join(cfg.GetClusterInfo().GetTags()) + "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()) } env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags())) - for node in cfg.GetAllNodesInfo().values()) + for node in self.my_node_info.values()) return env @@ -2217,10 +2412,12 @@ class LUClusterVerify(LogicalUnit): """Build hooks nodes. """ - return ([], self.cfg.GetNodeList()) + assert self.my_node_names, ("Node list not gathered," + " has CheckPrereq been executed?") + return ([], self.my_node_names) def Exec(self, feedback_fn): - """Verify integrity of cluster, performing various test on nodes. + """Verify integrity of the node group, performing various test on nodes. """ # This method has too many local variables. 
pylint: disable-msg=R0914 @@ -2228,26 +2425,14 @@ class LUClusterVerify(LogicalUnit): _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 verbose = self.op.verbose self._feedback_fn = feedback_fn - feedback_fn("* Verifying global settings") - for msg in self.cfg.VerifyConfig(): - _ErrorIf(True, self.ECLUSTERCFG, None, msg) - - # Check the cluster certificates - for cert_filename in constants.ALL_CERT_FILES: - (errcode, msg) = _VerifyCertificate(cert_filename) - _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode) vg_name = self.cfg.GetVGName() drbd_helper = self.cfg.GetDRBDHelper() - hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors cluster = self.cfg.GetClusterInfo() - nodelist = utils.NiceSort(self.cfg.GetNodeList()) - nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist] - nodeinfo_byname = dict(zip(nodelist, nodeinfo)) - instancelist = utils.NiceSort(self.cfg.GetInstanceList()) - instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname)) - for iname in instancelist) groupinfo = self.cfg.GetAllNodeGroupsInfo() + hypervisors = cluster.enabled_hypervisors + node_data_list = [self.my_node_info[name] for name in self.my_node_names] + i_non_redundant = [] # Non redundant instances i_non_a_balanced = [] # Non auto-balanced instances n_offline = 0 # Count of offline nodes @@ -2263,37 +2448,32 @@ class LUClusterVerify(LogicalUnit): master_node = self.master_node = self.cfg.GetMasterNode() master_ip = self.cfg.GetMasterIP() - # Compute the set of hypervisor parameters - hvp_data = [] - for hv_name in hypervisors: - hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name))) - for os_name, os_hvp in cluster.os_hvp.items(): - for hv_name, hv_params in os_hvp.items(): - if not hv_params: - continue - full_params = cluster.GetHVDefaults(hv_name, os_name=os_name) - hvp_data.append(("os %s" % os_name, hv_name, full_params)) - # TODO: collapse identical parameter values in a single one - for instance in instanceinfo.values(): - if not instance.hvparams: - continue - hvp_data.append(("instance %s" % instance.name, instance.hypervisor, - cluster.FillHV(instance))) - # and verify them locally - self._VerifyHVP(hvp_data) + feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names)) + + # We will make nodes contact all nodes in their group, and one node from + # every other group. + # TODO: should it be a *random* node, different every time? 
+ online_nodes = [node.name for node in node_data_list if not node.offline] + other_group_nodes = {} + + for name in sorted(self.all_node_info): + node = self.all_node_info[name] + if (node.group not in other_group_nodes + and node.group != self.group_uuid + and not node.offline): + other_group_nodes[node.group] = node.name - feedback_fn("* Gathering data (%d nodes)" % len(nodelist)) node_verify_param = { constants.NV_FILELIST: utils.UniqueSequence(filename for files in filemap for filename in files), - constants.NV_NODELIST: [node.name for node in nodeinfo - if not node.offline], + constants.NV_NODELIST: online_nodes + other_group_nodes.values(), constants.NV_HYPERVISOR: hypervisors, - constants.NV_HVPARAMS: hvp_data, - constants.NV_NODENETTEST: [(node.name, node.primary_ip, - node.secondary_ip) for node in nodeinfo + constants.NV_HVPARAMS: + _GetAllHypervisorParameters(cluster, self.all_inst_info.values()), + constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip) + for node in node_data_list if not node.offline], constants.NV_INSTANCELIST: hypervisors, constants.NV_VERSION: None, @@ -2314,15 +2494,30 @@ class LUClusterVerify(LogicalUnit): if drbd_helper: node_verify_param[constants.NV_DRBDHELPER] = drbd_helper + # bridge checks + # FIXME: this needs to be changed per node-group, not cluster-wide + bridges = set() + default_nicpp = cluster.nicparams[constants.PP_DEFAULT] + if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: + bridges.add(default_nicpp[constants.NIC_LINK]) + for instance in self.my_inst_info.values(): + for nic in instance.nics: + full_nic = cluster.SimpleFillNIC(nic.nicparams) + if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: + bridges.add(full_nic[constants.NIC_LINK]) + + if bridges: + node_verify_param[constants.NV_BRIDGES] = list(bridges) + # Build our expected cluster state node_image = dict((node.name, self.NodeImage(offline=node.offline, name=node.name, vm_capable=node.vm_capable)) - for node in nodeinfo) + for node in node_data_list) # Gather OOB paths oob_paths = [] - for node in nodeinfo: + for node in self.all_node_info.values(): path = _SupportsOob(self.cfg, node) if path and path not in oob_paths: oob_paths.append(path) @@ -2330,14 +2525,13 @@ class LUClusterVerify(LogicalUnit): if oob_paths: node_verify_param[constants.NV_OOB_PATHS] = oob_paths - for instance in instancelist: - inst_config = instanceinfo[instance] + for instance in self.my_inst_names: + inst_config = self.my_inst_info[instance] for nname in inst_config.all_nodes: if nname not in node_image: - # ghost node gnode = self.NodeImage(name=nname) - gnode.ghost = True + gnode.ghost = (nname not in self.all_node_info) node_image[nname] = gnode inst_config.MapLVsByNode(node_vol_should) @@ -2360,23 +2554,59 @@ class LUClusterVerify(LogicalUnit): # time before and after executing the request, we can at least have a time # window. 
nvinfo_starttime = time.time() - all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param, + all_nvinfo = self.rpc.call_node_verify(self.my_node_names, + node_verify_param, self.cfg.GetClusterName()) + if self.extra_lv_nodes and vg_name is not None: + extra_lv_nvinfo = \ + self.rpc.call_node_verify(self.extra_lv_nodes, + {constants.NV_LVLIST: vg_name}, + self.cfg.GetClusterName()) + else: + extra_lv_nvinfo = {} nvinfo_endtime = time.time() all_drbd_map = self.cfg.ComputeDRBDMap() - feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist)) - instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo) + feedback_fn("* Gathering disk information (%s nodes)" % + len(self.my_node_names)) + instdisk = self._CollectDiskInfo(self.my_node_names, node_image, + self.my_inst_info) feedback_fn("* Verifying configuration file consistency") - self._VerifyFiles(_ErrorIf, nodeinfo, master_node, all_nvinfo, filemap) + + # If not all nodes are being checked, we need to make sure the master node + # and a non-checked vm_capable node are in the list. + absent_nodes = set(self.all_node_info).difference(self.my_node_info) + if absent_nodes: + vf_nvinfo = all_nvinfo.copy() + vf_node_info = list(self.my_node_info.values()) + additional_nodes = [] + if master_node not in self.my_node_info: + additional_nodes.append(master_node) + vf_node_info.append(self.all_node_info[master_node]) + # Add the first vm_capable node we find which is not included + for node in absent_nodes: + nodeinfo = self.all_node_info[node] + if nodeinfo.vm_capable and not nodeinfo.offline: + additional_nodes.append(node) + vf_node_info.append(self.all_node_info[node]) + break + key = constants.NV_FILELIST + vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes, + {key: node_verify_param[key]}, + self.cfg.GetClusterName())) + else: + vf_nvinfo = all_nvinfo + vf_node_info = self.my_node_info.values() + + self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap) feedback_fn("* Verifying node status") refos_img = None - for node_i in nodeinfo: + for node_i in node_data_list: node = node_i.name nimg = node_image[node] @@ -2413,23 +2643,41 @@ class LUClusterVerify(LogicalUnit): if nimg.vm_capable: self._VerifyNodeLVM(node_i, nresult, vg_name) - self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper, + self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper, all_drbd_map) self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name) self._UpdateNodeInstances(node_i, nresult, nimg) self._UpdateNodeInfo(node_i, nresult, nimg, vg_name) self._UpdateNodeOS(node_i, nresult, nimg) + if not nimg.os_fail: if refos_img is None: refos_img = nimg self._VerifyNodeOS(node_i, nimg, refos_img) + self._VerifyNodeBridges(node_i, nresult, bridges) + + # Check whether all running instancies are primary for the node. (This + # can no longer be done from _VerifyInstance below, since some of the + # wrong instances could be from other node groups.) 
+ non_primary_inst = set(nimg.instances).difference(nimg.pinst) + + for inst in non_primary_inst: + test = inst in self.all_inst_info + _ErrorIf(test, self.EINSTANCEWRONGNODE, inst, + "instance should not run on node %s", node_i.name) + _ErrorIf(not test, self.ENODEORPHANINSTANCE, node_i.name, + "node is running unknown instance %s", inst) + + for node, result in extra_lv_nvinfo.items(): + self._UpdateNodeVolumes(self.all_node_info[node], result.payload, + node_image[node], vg_name) feedback_fn("* Verifying instance status") - for instance in instancelist: + for instance in self.my_inst_names: if verbose: feedback_fn("* Verifying instance %s" % instance) - inst_config = instanceinfo[instance] + inst_config = self.my_inst_info[instance] self._VerifyInstance(instance, inst_config, node_image, instdisk[instance]) inst_nodes_offline = [] @@ -2464,7 +2712,7 @@ class LUClusterVerify(LogicalUnit): instance_groups = {} for node in instance_nodes: - instance_groups.setdefault(nodeinfo_byname[node].group, + instance_groups.setdefault(self.all_node_info[node].group, []).append(node) pretty_list = [ @@ -2503,14 +2751,22 @@ class LUClusterVerify(LogicalUnit): feedback_fn("* Verifying orphan volumes") reserved = utils.FieldSet(*cluster.reserved_lvs) - self._VerifyOrphanVolumes(node_vol_should, node_image, reserved) - feedback_fn("* Verifying orphan instances") - self._VerifyOrphanInstances(instancelist, node_image) + # We will get spurious "unknown volume" warnings if any node of this group + # is secondary for an instance whose primary is in another group. To avoid + # them, we find these instances and add their volumes to node_vol_should. + for inst in self.all_inst_info.values(): + for secondary in inst.secondary_nodes: + if (secondary in self.my_node_info + and inst.name not in self.my_inst_info): + inst.MapLVsByNode(node_vol_should) + break + + self._VerifyOrphanVolumes(node_vol_should, node_image, reserved) if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks: feedback_fn("* Verifying N+1 Memory redundancy") - self._VerifyNPlusOneMemory(node_image, instanceinfo) + self._VerifyNPlusOneMemory(node_image, self.my_inst_info) feedback_fn("* Other Notes") if i_non_redundant: @@ -2651,10 +2907,7 @@ class LUClusterRepairDiskSizes(NoHooksLU): def ExpandNames(self): if self.op.instances: - self.wanted_names = [] - for name in self.op.instances: - full_name = _ExpandInstanceName(self.cfg, name) - self.wanted_names.append(full_name) + self.wanted_names = _GetWantedInstances(self, self.op.instances) self.needed_locks = { locking.LEVEL_NODE: [], locking.LEVEL_INSTANCE: self.wanted_names, @@ -2666,7 +2919,7 @@ class LUClusterRepairDiskSizes(NoHooksLU): locking.LEVEL_NODE: locking.ALL_SET, locking.LEVEL_INSTANCE: locking.ALL_SET, } - self.share_locks = dict(((i, 1) for i in locking.LEVELS)) + self.share_locks = dict.fromkeys(locking.LEVELS, 1) def DeclareLocks(self, level): if level == locking.LEVEL_NODE and self.wanted_names is not None: @@ -2679,7 +2932,7 @@ class LUClusterRepairDiskSizes(NoHooksLU): """ if self.wanted_names is None: - self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE] + self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE) self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name in self.wanted_names] @@ -2904,7 +3157,7 @@ class LUClusterSetParams(LogicalUnit): " drbd-based instances exist", errors.ECODE_INVAL) - node_list = self.acquired_locks[locking.LEVEL_NODE] + node_list = self.glm.list_owned(locking.LEVEL_NODE) # if vg_name not None, checks given 
volume group on all nodes if self.op.vg_name: @@ -2979,8 +3232,8 @@ class LUClusterSetParams(LogicalUnit): # if we're moving instances to routed, check that they have an ip target_mode = params_filled[constants.NIC_MODE] if target_mode == constants.NIC_MODE_ROUTED and not nic.ip: - nic_errors.append("Instance %s, nic/%d: routed nick with no ip" % - (instance.name, nic_idx)) + nic_errors.append("Instance %s, nic/%d: routed NIC with no ip" + " address" % (instance.name, nic_idx)) if nic_errors: raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" % "\n".join(nic_errors)) @@ -3431,6 +3684,20 @@ class LUOobCommand(NoHooksLU): REG_BGL = False _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE) + def ExpandNames(self): + """Gather locks we need. + + """ + if self.op.node_names: + self.op.node_names = _GetWantedNodes(self, self.op.node_names) + lock_names = self.op.node_names + else: + lock_names = locking.ALL_SET + + self.needed_locks = { + locking.LEVEL_NODE: lock_names, + } + def CheckPrereq(self): """Check prerequisites. @@ -3487,21 +3754,6 @@ class LUOobCommand(NoHooksLU): " not marked offline") % node_name, errors.ECODE_STATE) - def ExpandNames(self): - """Gather locks we need. - - """ - if self.op.node_names: - self.op.node_names = [_ExpandNodeName(self.cfg, name) - for name in self.op.node_names] - lock_names = self.op.node_names - else: - lock_names = locking.ALL_SET - - self.needed_locks = { - locking.LEVEL_NODE: lock_names, - } - def Exec(self, feedback_fn): """Execute OOB and return result if we expect any. @@ -3509,7 +3761,8 @@ class LUOobCommand(NoHooksLU): master_node = self.master_node ret = [] - for idx, node in enumerate(self.nodes): + for idx, node in enumerate(utils.NiceSort(self.nodes, + key=lambda node: node.name)): node_entry = [(constants.RS_NORMAL, node.name)] ret.append(node_entry) @@ -3672,7 +3925,10 @@ class _OsQuery(_QueryBase): """ # Locking is not used - assert not (lu.acquired_locks or self.do_locking or self.use_locking) + assert not (compat.any(lu.glm.is_owned(level) + for level in locking.LEVELS + if level != locking.LEVEL_CLUSTER) or + self.do_locking or self.use_locking) valid_nodes = [node.name for node in lu.cfg.GetAllNodesInfo().values() @@ -3978,7 +4234,7 @@ class LUNodeQueryvols(NoHooksLU): """Computes the list of nodes and their attributes. """ - nodenames = self.acquired_locks[locking.LEVEL_NODE] + nodenames = self.glm.list_owned(locking.LEVEL_NODE) volumes = self.rpc.call_node_volumes(nodenames) ilist = [self.cfg.GetInstanceInfo(iname) for iname @@ -4056,7 +4312,7 @@ class LUNodeQueryStorage(NoHooksLU): """Computes the list of nodes and their attributes. 
""" - self.nodes = self.acquired_locks[locking.LEVEL_NODE] + self.nodes = self.glm.list_owned(locking.LEVEL_NODE) # Always get name to sort by if constants.SF_NAME in self.op.output_fields: @@ -4631,7 +4887,7 @@ class LUNodeSetParams(LogicalUnit): instances_keep = [] # Build list of instances to release - for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]: + for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE): instance = self.context.cfg.GetInstanceInfo(instance_name) if (instance.disk_template in constants.DTS_INT_MIRROR and self.op.node_name in instance.all_nodes): @@ -4640,7 +4896,7 @@ class LUNodeSetParams(LogicalUnit): _ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep) - assert (set(self.acquired_locks.get(locking.LEVEL_INSTANCE, [])) == + assert (set(self.glm.list_owned(locking.LEVEL_INSTANCE)) == set(instances_keep)) def BuildHooksEnv(self): @@ -5422,7 +5678,8 @@ class LUInstanceStartup(LogicalUnit): instance = self.instance force = self.op.force - self.cfg.MarkInstanceUp(instance.name) + if not self.op.no_remember: + self.cfg.MarkInstanceUp(instance.name) if self.primary_offline: assert self.op.ignore_offline_nodes @@ -5587,7 +5844,8 @@ class LUInstanceShutdown(LogicalUnit): node_current = instance.primary_node timeout = self.op.timeout - self.cfg.MarkInstanceDown(instance.name) + if not self.op.no_remember: + self.cfg.MarkInstanceDown(instance.name) if self.primary_offline: assert self.op.ignore_offline_nodes @@ -5700,8 +5958,25 @@ class LUInstanceRecreateDisks(LogicalUnit): HTYPE = constants.HTYPE_INSTANCE REQ_BGL = False + def CheckArguments(self): + # normalise the disk list + self.op.disks = sorted(frozenset(self.op.disks)) + def ExpandNames(self): self._ExpandAndLockInstance() + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + if self.op.nodes: + self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes] + self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes) + else: + self.needed_locks[locking.LEVEL_NODE] = [] + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + # if we replace the nodes, we only need to lock the old primary, + # otherwise we need to lock all nodes for disk re-creation + primary_only = bool(self.op.nodes) + self._LockInstancesNodes(primary_only=primary_only) def BuildHooksEnv(self): """Build hooks env. 
@@ -5727,12 +6002,31 @@ class LUInstanceRecreateDisks(LogicalUnit): instance = self.cfg.GetInstanceInfo(self.op.instance_name) assert instance is not None, \ "Cannot retrieve locked instance %s" % self.op.instance_name - _CheckNodeOnline(self, instance.primary_node) + if self.op.nodes: + if len(self.op.nodes) != len(instance.all_nodes): + raise errors.OpPrereqError("Instance %s currently has %d nodes, but" + " %d replacement nodes were specified" % + (instance.name, len(instance.all_nodes), + len(self.op.nodes)), + errors.ECODE_INVAL) + assert instance.disk_template != constants.DT_DRBD8 or \ + len(self.op.nodes) == 2 + assert instance.disk_template != constants.DT_PLAIN or \ + len(self.op.nodes) == 1 + primary_node = self.op.nodes[0] + else: + primary_node = instance.primary_node + _CheckNodeOnline(self, primary_node) if instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Instance '%s' has no disks" % self.op.instance_name, errors.ECODE_INVAL) - _CheckInstanceDown(self, instance, "cannot recreate disks") + # if we replace nodes *and* the old primary is offline, we don't + # check + assert instance.primary_node in self.needed_locks[locking.LEVEL_NODE] + old_pnode = self.cfg.GetNodeInfo(instance.primary_node) + if not (self.op.nodes and old_pnode.offline): + _CheckInstanceDown(self, instance, "cannot recreate disks") if not self.op.disks: self.op.disks = range(len(instance.disks)) @@ -5741,18 +6035,39 @@ class LUInstanceRecreateDisks(LogicalUnit): if idx >= len(instance.disks): raise errors.OpPrereqError("Invalid disk index '%s'" % idx, errors.ECODE_INVAL) - + if self.op.disks != range(len(instance.disks)) and self.op.nodes: + raise errors.OpPrereqError("Can't recreate disks partially and" + " change the nodes at the same time", + errors.ECODE_INVAL) self.instance = instance def Exec(self, feedback_fn): """Recreate the disks. """ + # change primary node, if needed + if self.op.nodes: + self.instance.primary_node = self.op.nodes[0] + self.LogWarning("Changing the instance's nodes, you will have to" + " remove any disks left on the older nodes manually") + to_skip = [] - for idx, _ in enumerate(self.instance.disks): + for idx, disk in enumerate(self.instance.disks): if idx not in self.op.disks: # disk idx has not been passed in to_skip.append(idx) continue + # update secondaries for disks, if needed + if self.op.nodes: + if disk.dev_type == constants.LD_DRBD8: + # need to update the nodes + assert len(self.op.nodes) == 2 + logical_id = list(disk.logical_id) + logical_id[0] = self.op.nodes[0] + logical_id[1] = self.op.nodes[1] + disk.logical_id = tuple(logical_id) + + if self.op.nodes: + self.cfg.Update(self.instance, feedback_fn) _CreateDisks(self, self.instance, to_skip=to_skip) @@ -5807,8 +6122,9 @@ class LUInstanceRename(LogicalUnit): new_name = self.op.new_name if self.op.name_check: hostname = netutils.GetHostname(name=new_name) - self.LogInfo("Resolved given name '%s' to '%s'", new_name, - hostname.name) + if hostname != new_name: + self.LogInfo("Resolved given name '%s' to '%s'", new_name, + hostname.name) if not utils.MatchNameComponent(self.op.new_name, [hostname.name]): raise errors.OpPrereqError(("Resolved hostname '%s' does not look the" " same as given hostname '%s'") % @@ -5843,8 +6159,8 @@ class LUInstanceRename(LogicalUnit): # Change the instance lock. This is definitely safe while we hold the BGL. # Otherwise the new lock would have to be added in acquired mode. 
assert self.REQ_BGL - self.context.glm.remove(locking.LEVEL_INSTANCE, old_name) - self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name) + self.glm.remove(locking.LEVEL_INSTANCE, old_name) + self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name) # re-read the instance from the configuration after rename inst = self.cfg.GetInstanceInfo(self.op.new_name) @@ -6471,11 +6787,15 @@ class TLMigrateInstance(Tasklet): # self.target_node is already populated, either directly or by the # iallocator run target_node = self.target_node + if self.target_node == instance.primary_node: + raise errors.OpPrereqError("Cannot migrate instance %s" + " to its primary (%s)" % + (instance.name, instance.primary_node)) if len(self.lu.tasklets) == 1: # It is safe to release locks only when we're the only tasklet # in the LU - _ReleaseLocks(self, locking.LEVEL_NODE, + _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=[instance.primary_node, self.target_node]) else: @@ -7769,7 +8089,7 @@ class LUInstanceCreate(LogicalUnit): src_path = self.op.src_path if src_node is None: - locked_nodes = self.acquired_locks[locking.LEVEL_NODE] + locked_nodes = self.glm.list_owned(locking.LEVEL_NODE) exp_list = self.rpc.call_export_list(locked_nodes) found = False for node in exp_list: @@ -8552,7 +8872,7 @@ class LUInstanceReplaceDisks(LogicalUnit): # Lock member nodes of all locked groups self.needed_locks[locking.LEVEL_NODE] = [node_name - for group_uuid in self.acquired_locks[locking.LEVEL_NODEGROUP] + for group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP) for node_name in self.cfg.GetNodeGroup(group_uuid).members] else: self._LockInstancesNodes() @@ -8589,19 +8909,19 @@ class LUInstanceReplaceDisks(LogicalUnit): """Check prerequisites. """ - assert (locking.LEVEL_NODEGROUP in self.acquired_locks or + assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or self.op.iallocator is None) - if locking.LEVEL_NODEGROUP in self.acquired_locks: + owned_groups = self.glm.list_owned(locking.LEVEL_NODEGROUP) + if owned_groups: groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name) - prevgroups = self.acquired_locks[locking.LEVEL_NODEGROUP] - if prevgroups != groups: + if owned_groups != groups: raise errors.OpExecError("Node groups used by instance '%s' changed" " since lock was acquired, current list is %r," " used to be '%s'" % (self.op.instance_name, utils.CommaJoin(groups), - utils.CommaJoin(prevgroups))) + utils.CommaJoin(owned_groups))) return LogicalUnit.CheckPrereq(self) @@ -8760,7 +9080,7 @@ class TLReplaceDisks(Tasklet): if remote_node is None: self.remote_node_info = None else: - assert remote_node in self.lu.acquired_locks[locking.LEVEL_NODE], \ + assert remote_node in self.lu.glm.list_owned(locking.LEVEL_NODE), \ "Remote node '%s' is not locked" % remote_node self.remote_node_info = self.cfg.GetNodeInfo(remote_node) @@ -8858,7 +9178,7 @@ class TLReplaceDisks(Tasklet): _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes) # Release any owned node group - if self.lu.context.glm.is_owned(locking.LEVEL_NODEGROUP): + if self.lu.glm.is_owned(locking.LEVEL_NODEGROUP): _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP) # Check whether disks are valid @@ -8881,16 +9201,16 @@ class TLReplaceDisks(Tasklet): if __debug__: # Verify owned locks before starting operation - owned_locks = self.lu.context.glm.list_owned(locking.LEVEL_NODE) + owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE) assert set(owned_locks) == set(self.node_secondary_ip), \ ("Incorrect node locks, owning %s, expected %s" % (owned_locks, 
self.node_secondary_ip.keys())) - owned_locks = self.lu.context.glm.list_owned(locking.LEVEL_INSTANCE) + owned_locks = self.lu.glm.list_owned(locking.LEVEL_INSTANCE) assert list(owned_locks) == [self.instance_name], \ "Instance '%s' not locked" % self.instance_name - assert not self.lu.context.glm.is_owned(locking.LEVEL_NODEGROUP), \ + assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \ "Should not own any node group lock at this point" if not self.disks: @@ -8922,7 +9242,7 @@ class TLReplaceDisks(Tasklet): if __debug__: # Verify owned locks - owned_locks = self.lu.context.glm.list_owned(locking.LEVEL_NODE) + owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE) nodes = frozenset(self.node_secondary_ip) assert ((self.early_release and not owned_locks) or (not self.early_release and not (set(owned_locks) - nodes))), \ @@ -9519,9 +9839,17 @@ class LUInstanceGrowDisk(LogicalUnit): if not disks_ok: raise errors.OpExecError("Cannot activate block device to grow") + # First run all grow ops in dry-run mode for node in instance.all_nodes: self.cfg.SetDiskID(disk, node) - result = self.rpc.call_blockdev_grow(node, disk, self.op.amount) + result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True) + result.Raise("Grow request failed to node %s" % node) + + # We know that (as far as we can test) operations across different + # nodes will succeed, time to run it for real + for node in instance.all_nodes: + self.cfg.SetDiskID(disk, node) + result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False) result.Raise("Grow request failed to node %s" % node) # TODO: Rewrite code to work properly @@ -9591,7 +9919,7 @@ class LUInstanceQueryData(NoHooksLU): """ if self.wanted_names is None: assert self.op.use_locking, "Locking was not used" - self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE] + self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE) self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name in self.wanted_names] @@ -9977,6 +10305,7 @@ class LUInstanceSetParams(LogicalUnit): self.be_inst = i_bedict # the new dict (without defaults) else: self.be_new = self.be_inst = {} + be_old = cluster.FillBE(instance) # osparams processing if self.op.osparams: @@ -9988,7 +10317,8 @@ class LUInstanceSetParams(LogicalUnit): self.warn = [] - if constants.BE_MEMORY in self.op.beparams and not self.op.force: + if (constants.BE_MEMORY in self.op.beparams and not self.op.force and + be_new[constants.BE_MEMORY] > be_old[constants.BE_MEMORY]): mem_check_list = [pnode] if be_new[constants.BE_AUTO_BALANCE]: # either we changed auto_balance to yes or it was from before @@ -10029,16 +10359,17 @@ class LUInstanceSetParams(LogicalUnit): for node, nres in nodeinfo.items(): if node not in instance.secondary_nodes: continue - msg = nres.fail_msg - if msg: - self.warn.append("Can't get info from secondary node %s: %s" % - (node, msg)) - elif not isinstance(nres.payload.get('memory_free', None), int): - self.warn.append("Secondary node %s didn't return free" - " memory information" % node) + nres.Raise("Can't get info from secondary node %s" % node, + prereq=True, ecode=errors.ECODE_STATE) + if not isinstance(nres.payload.get('memory_free', None), int): + raise errors.OpPrereqError("Secondary node %s didn't return free" + " memory information" % node, + errors.ECODE_STATE) elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']: - self.warn.append("Not enough memory to failover instance to" - " secondary node %s" % node) + raise errors.OpPrereqError("This change 
will prevent the instance" + " from failover to its secondary node" + " %s, due to not enough memory" % node, + errors.ECODE_STATE) # NIC processing self.nic_pnew = {} @@ -10190,7 +10521,8 @@ class LUInstanceSetParams(LogicalUnit): self.cfg.Update(instance, feedback_fn) # disks are created, waiting for sync - disk_abort = not _WaitForSync(self, instance) + disk_abort = not _WaitForSync(self, instance, + oneshot=not self.op.wait_for_sync) if disk_abort: raise errors.OpExecError("There are some degraded disks for" " this instance, please cleanup manually") @@ -10399,7 +10731,7 @@ class LUBackupQuery(NoHooksLU): that node. """ - self.nodes = self.acquired_locks[locking.LEVEL_NODE] + self.nodes = self.glm.list_owned(locking.LEVEL_NODE) rpcresult = self.rpc.call_export_list(self.nodes) result = {} for node in rpcresult: @@ -10781,7 +11113,7 @@ class LUBackupRemove(NoHooksLU): fqdn_warn = True instance_name = self.op.instance_name - locked_nodes = self.acquired_locks[locking.LEVEL_NODE] + locked_nodes = self.glm.list_owned(locking.LEVEL_NODE) exportlist = self.rpc.call_export_list(locked_nodes) found = False for node in exportlist: @@ -10880,20 +11212,40 @@ class LUGroupAssignNodes(NoHooksLU): # We want to lock all the affected nodes and groups. We have readily # available the list of nodes, and the *destination* group. To gather the - # list of "source" groups, we need to fetch node information. - self.node_data = self.cfg.GetAllNodesInfo() - affected_groups = set(self.node_data[node].group for node in self.op.nodes) - affected_groups.add(self.group_uuid) - + # list of "source" groups, we need to fetch node information later on. self.needed_locks = { - locking.LEVEL_NODEGROUP: list(affected_groups), + locking.LEVEL_NODEGROUP: set([self.group_uuid]), locking.LEVEL_NODE: self.op.nodes, } + def DeclareLocks(self, level): + if level == locking.LEVEL_NODEGROUP: + assert len(self.needed_locks[locking.LEVEL_NODEGROUP]) == 1 + + # Try to get all affected nodes' groups without having the group or node + # lock yet. Needs verification later in the code flow. + groups = self.cfg.GetNodeGroupsFromNodes(self.op.nodes) + + self.needed_locks[locking.LEVEL_NODEGROUP].update(groups) + def CheckPrereq(self): """Check prerequisites. """ + assert self.needed_locks[locking.LEVEL_NODEGROUP] + assert (frozenset(self.glm.list_owned(locking.LEVEL_NODE)) == + frozenset(self.op.nodes)) + + expected_locks = (set([self.group_uuid]) | + self.cfg.GetNodeGroupsFromNodes(self.op.nodes)) + actual_locks = self.glm.list_owned(locking.LEVEL_NODEGROUP) + if actual_locks != expected_locks: + raise errors.OpExecError("Nodes changed groups since locks were acquired," + " current groups are '%s', used to be '%s'" % + (utils.CommaJoin(expected_locks), + utils.CommaJoin(actual_locks))) + + self.node_data = self.cfg.GetAllNodesInfo() self.group = self.cfg.GetNodeGroup(self.group_uuid) instance_data = self.cfg.GetAllInstancesInfo() @@ -10929,6 +11281,9 @@ class LUGroupAssignNodes(NoHooksLU): for node in self.op.nodes: self.node_data[node].group = self.group_uuid + # FIXME: Depends on side-effects of modifying the result of + # C{cfg.GetAllNodesInfo} + self.cfg.Update(self.group, feedback_fn) # Saves all modified nodes. 
@staticmethod @@ -11645,16 +12000,6 @@ class IAllocator(object): """ # pylint: disable-msg=R0902 # lots of instance attributes - _ALLO_KEYS = [ - "name", "mem_size", "disks", "disk_template", - "os", "tags", "nics", "vcpus", "hypervisor", - ] - _RELO_KEYS = [ - "name", "relocate_from", - ] - _EVAC_KEYS = [ - "evac_nodes", - ] def __init__(self, cfg, rpc, mode, **kwargs): self.cfg = cfg @@ -11669,22 +12014,20 @@ class IAllocator(object): self.relocate_from = None self.name = None self.evac_nodes = None + self.instances = None + self.reloc_mode = None + self.target_groups = None # computed fields self.required_nodes = None # init result fields self.success = self.info = self.result = None - if self.mode == constants.IALLOCATOR_MODE_ALLOC: - keyset = self._ALLO_KEYS - fn = self._AddNewInstance - elif self.mode == constants.IALLOCATOR_MODE_RELOC: - keyset = self._RELO_KEYS - fn = self._AddRelocateInstance - elif self.mode == constants.IALLOCATOR_MODE_MEVAC: - keyset = self._EVAC_KEYS - fn = self._AddEvacuateNodes - else: + + try: + (fn, keyset, self._result_check) = self._MODE_DATA[self.mode] + except KeyError: raise errors.ProgrammerError("Unknown mode '%s' passed to the" " IAllocator" % self.mode) + for key in kwargs: if key not in keyset: raise errors.ProgrammerError("Invalid input parameter '%s' to" @@ -11695,7 +12038,7 @@ class IAllocator(object): if key not in kwargs: raise errors.ProgrammerError("Missing input parameter '%s' to" " IAllocator" % key) - self._BuildInputData(fn) + self._BuildInputData(compat.partial(fn, self)) def _ComputeClusterData(self): """Compute the generic allocator input data. @@ -11724,7 +12067,8 @@ class IAllocator(object): hypervisor_name = self.hypervisor elif self.mode == constants.IALLOCATOR_MODE_RELOC: hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor - elif self.mode == constants.IALLOCATOR_MODE_MEVAC: + elif self.mode in (constants.IALLOCATOR_MODE_MEVAC, + constants.IALLOCATOR_MODE_MRELOC): hypervisor_name = cluster_info.enabled_hypervisors[0] node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(), @@ -11750,12 +12094,12 @@ class IAllocator(object): """Compute node groups data. 
""" - ng = {} - for guuid, gdata in cfg.GetAllNodeGroupsInfo().items(): - ng[guuid] = { - "name": gdata.name, - "alloc_policy": gdata.alloc_policy, - } + ng = dict((guuid, { + "name": gdata.name, + "alloc_policy": gdata.alloc_policy, + }) + for guuid, gdata in cfg.GetAllNodeGroupsInfo().items()) + return ng @staticmethod @@ -11766,22 +12110,19 @@ class IAllocator(object): @returns: a dict of name: (node dict, node config) """ - node_results = {} - for ninfo in node_cfg.values(): - # fill in static (config-based) values - pnr = { - "tags": list(ninfo.GetTags()), - "primary_ip": ninfo.primary_ip, - "secondary_ip": ninfo.secondary_ip, - "offline": ninfo.offline, - "drained": ninfo.drained, - "master_candidate": ninfo.master_candidate, - "group": ninfo.group, - "master_capable": ninfo.master_capable, - "vm_capable": ninfo.vm_capable, - } - - node_results[ninfo.name] = pnr + # fill in static (config-based) values + node_results = dict((ninfo.name, { + "tags": list(ninfo.GetTags()), + "primary_ip": ninfo.primary_ip, + "secondary_ip": ninfo.secondary_ip, + "offline": ninfo.offline, + "drained": ninfo.drained, + "master_candidate": ninfo.master_candidate, + "group": ninfo.group, + "master_capable": ninfo.master_capable, + "vm_capable": ninfo.vm_capable, + }) + for ninfo in node_cfg.values()) return node_results @@ -11855,11 +12196,12 @@ class IAllocator(object): nic_data = [] for nic in iinfo.nics: filled_params = cluster_info.SimpleFillNIC(nic.nicparams) - nic_dict = {"mac": nic.mac, - "ip": nic.ip, - "mode": filled_params[constants.NIC_MODE], - "link": filled_params[constants.NIC_LINK], - } + nic_dict = { + "mac": nic.mac, + "ip": nic.ip, + "mode": filled_params[constants.NIC_MODE], + "link": filled_params[constants.NIC_LINK], + } if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: nic_dict["bridge"] = filled_params[constants.NIC_LINK] nic_data.append(nic_dict) @@ -11899,6 +12241,7 @@ class IAllocator(object): self.required_nodes = 2 else: self.required_nodes = 1 + request = { "name": self.name, "disk_template": self.disk_template, @@ -11911,6 +12254,7 @@ class IAllocator(object): "nics": self.nics, "required_nodes": self.required_nodes, } + return request def _AddRelocateInstance(self): @@ -11958,6 +12302,16 @@ class IAllocator(object): } return request + def _AddMultiRelocate(self): + """Get data for multi-relocate requests. + + """ + return { + "instances": self.instances, + "reloc_mode": self.reloc_mode, + "target_groups": self.target_groups, + } + def _BuildInputData(self, fn): """Build input data structures. @@ -11970,6 +12324,28 @@ class IAllocator(object): self.in_text = serializer.Dump(self.in_data) + _MODE_DATA = { + constants.IALLOCATOR_MODE_ALLOC: + (_AddNewInstance, + ["name", "mem_size", "disks", "disk_template", "os", "tags", "nics", + "vcpus", "hypervisor"], ht.TList), + constants.IALLOCATOR_MODE_RELOC: + (_AddRelocateInstance, ["name", "relocate_from"], ht.TList), + constants.IALLOCATOR_MODE_MEVAC: + (_AddEvacuateNodes, ["evac_nodes"], + ht.TListOf(ht.TAnd(ht.TIsLength(2), + ht.TListOf(ht.TString)))), + constants.IALLOCATOR_MODE_MRELOC: + (_AddMultiRelocate, ["instances", "reloc_mode", "target_groups"], + ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, { + # pylint: disable-msg=E1101 + # Class '...' 
has no 'OP_ID' member + "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID, + opcodes.OpInstanceMigrate.OP_ID, + opcodes.OpInstanceReplaceDisks.OP_ID]) + })))), + } + def Run(self, name, validate=True, call_fn=None): """Run an instance allocator and return the results. @@ -12010,28 +12386,45 @@ class IAllocator(object): " missing key '%s'" % key) setattr(self, key, rdict[key]) - if not isinstance(rdict["result"], list): - raise errors.OpExecError("Can't parse iallocator results: 'result' key" - " is not a list") - - if self.mode == constants.IALLOCATOR_MODE_RELOC: - assert self.relocate_from is not None - assert self.required_nodes == 1 + if not self._result_check(self.result): + raise errors.OpExecError("Iallocator returned invalid result," + " expected %s, got %s" % + (self._result_check, self.result), + errors.ECODE_INVAL) + if self.mode in (constants.IALLOCATOR_MODE_RELOC, + constants.IALLOCATOR_MODE_MEVAC): node2group = dict((name, ndata["group"]) for (name, ndata) in self.in_data["nodes"].items()) fn = compat.partial(self._NodesToGroups, node2group, self.in_data["nodegroups"]) - request_groups = fn(self.relocate_from) - result_groups = fn(rdict["result"]) - - if result_groups != request_groups: - raise errors.OpExecError("Groups of nodes returned by iallocator (%s)" - " differ from original groups (%s)" % - (utils.CommaJoin(result_groups), - utils.CommaJoin(request_groups))) + if self.mode == constants.IALLOCATOR_MODE_RELOC: + assert self.relocate_from is not None + assert self.required_nodes == 1 + + request_groups = fn(self.relocate_from) + result_groups = fn(rdict["result"]) + + if result_groups != request_groups: + raise errors.OpExecError("Groups of nodes returned by iallocator (%s)" + " differ from original groups (%s)" % + (utils.CommaJoin(result_groups), + utils.CommaJoin(request_groups))) + elif self.mode == constants.IALLOCATOR_MODE_MEVAC: + request_groups = fn(self.evac_nodes) + for (instance_name, secnode) in self.result: + result_groups = fn([secnode]) + if result_groups != request_groups: + raise errors.OpExecError("Iallocator returned new secondary node" + " '%s' (group '%s') for instance '%s'" + " which is not in original group '%s'" % + (secnode, utils.CommaJoin(result_groups), + instance_name, + utils.CommaJoin(request_groups))) + else: + raise errors.ProgrammerError("Unhandled mode '%s'" % self.mode) self.out_data = rdict @@ -12115,6 +12508,12 @@ class LUTestAllocator(NoHooksLU): if not hasattr(self.op, "evac_nodes"): raise errors.OpPrereqError("Missing attribute 'evac_nodes' on" " opcode input", errors.ECODE_INVAL) + elif self.op.mode == constants.IALLOCATOR_MODE_MRELOC: + if self.op.instances: + self.op.instances = _GetWantedInstances(self, self.op.instances) + else: + raise errors.OpPrereqError("Missing instances to relocate", + errors.ECODE_INVAL) else: raise errors.OpPrereqError("Invalid test allocator mode '%s'" % self.op.mode, errors.ECODE_INVAL) @@ -12154,6 +12553,12 @@ class LUTestAllocator(NoHooksLU): ial = IAllocator(self.cfg, self.rpc, mode=self.op.mode, evac_nodes=self.op.evac_nodes) + elif self.op.mode == constants.IALLOCATOR_MODE_MRELOC: + ial = IAllocator(self.cfg, self.rpc, + mode=self.op.mode, + instances=self.op.instances, + reloc_mode=self.op.reloc_mode, + target_groups=self.op.target_groups) else: raise errors.ProgrammerError("Uncatched mode %s in" " LUTestAllocator.Exec", self.op.mode)
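
A large part of this patch moves the error-reporting helpers of the old LUClusterVerify into the _VerifyErrors mix-in shared by LUClusterVerifyConfig and LUClusterVerifyGroup. The following self-contained sketch is illustrative only and not part of the patch: it mirrors the _Error/_ErrorIf code added above so the two message formats (parseable error codes vs. plain strings, selected by the opcode's error_codes flag) are easy to compare. The FakeOp class and _Feedback function are hypothetical stand-ins for the real opcode and feedback_fn the logical units receive.

class _VerifyErrorsSketch(object):
  # Constants mirroring the mix-in introduced in this patch
  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"
  ENODENET = ("node", "ENODENET")

  def __init__(self, op, feedback_fn):
    self.op = op
    self._feedback_fn = feedback_fn
    self.bad = False

  def _Error(self, ecode, item, msg, *args, **kwargs):
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt = ecode
    # complete the message with its arguments first
    if args:
      msg = msg % args
    if self.op.error_codes:
      # machine-parseable form: TYPE:ECODE:ITEMTYPE:ITEM:MESSAGE
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      # human-readable form
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    self._feedback_fn(" - %s" % msg)

  def _ErrorIf(self, cond, *args, **kwargs):
    if bool(cond) or self.op.debug_simulate_errors:
      self._Error(*args, **kwargs)
      # only ERROR-level messages mark the operation as failed
      if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
        self.bad = True


class FakeOp(object):  # hypothetical opcode stand-in
  error_codes = True
  debug_simulate_errors = False


def _Feedback(msg):  # hypothetical feedback_fn stand-in
  print msg

lu = _VerifyErrorsSketch(FakeOp(), _Feedback)
lu._ErrorIf(True, lu.ENODENET, "node1.example.com",
            "unknown parameter %s", "foo")
# with error_codes=True  ->  - ERROR:ENODENET:node:node1.example.com:unknown parameter foo
# with error_codes=False ->  - ERROR: node node1.example.com: unknown parameter foo

LUClusterVerifyGroup.Exec (above) feeds such a mix-in instance per node group, which is why it only expects self.op and self._feedback_fn to be available on the host class.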