X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/a7761c1205e3a8a627022f7c41d1950b1ef913c9..ff8ab07e6efd80535cf14e721daa4f23727164fb:/lib/cmdlib.py?ds=sidebyside diff --git a/lib/cmdlib.py b/lib/cmdlib.py index 3ba5248..7ba8630 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -129,17 +129,16 @@ class LogicalUnit(object): self.proc = processor self.op = op self.cfg = context.cfg + self.glm = context.glm self.context = context self.rpc = rpc # Dicts used to declare locking needs to mcpu self.needed_locks = None - self.acquired_locks = {} self.share_locks = dict.fromkeys(locking.LEVELS, 0) self.add_locks = {} self.remove_locks = {} # Used to force good behavior when calling helper functions self.recalculate_locks = {} - self.__ssh = None # logging self.Log = processor.Log # pylint: disable-msg=C0103 self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103 @@ -160,16 +159,6 @@ class LogicalUnit(object): self.CheckArguments() - def __GetSSH(self): - """Returns the SshRunner object - - """ - if not self.__ssh: - self.__ssh = ssh.SshRunner(self.cfg.GetClusterName()) - return self.__ssh - - ssh = property(fget=__GetSSH) - def CheckArguments(self): """Check syntactic validity for the opcode arguments. @@ -396,7 +385,7 @@ class LogicalUnit(object): # future we might want to have different behaviors depending on the value # of self.recalculate_locks[locking.LEVEL_NODE] wanted_nodes = [] - for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]: + for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE): instance = self.context.cfg.GetInstanceInfo(instance_name) wanted_nodes.append(instance.primary_node) if not primary_only: @@ -510,7 +499,7 @@ class _QueryBase: """ if self.do_locking: - names = lu.acquired_locks[lock_level] + names = lu.glm.list_owned(lock_level) else: names = all_names @@ -521,7 +510,7 @@ class _QueryBase: # caller specified names and we must keep the same order assert self.names - assert not self.do_locking or lu.acquired_locks[lock_level] + assert not self.do_locking or lu.glm.is_owned(lock_level) missing = set(self.wanted).difference(names) if missing: @@ -641,6 +630,51 @@ def _GetUpdatedParams(old_params, update_dict, return params_copy +def _ReleaseLocks(lu, level, names=None, keep=None): + """Releases locks owned by an LU. + + @type lu: L{LogicalUnit} + @param level: Lock level + @type names: list or None + @param names: Names of locks to release + @type keep: list or None + @param keep: Names of locks to retain + + """ + assert not (keep is not None and names is not None), \ + "Only one of the 'names' and the 'keep' parameters can be given" + + if names is not None: + should_release = names.__contains__ + elif keep: + should_release = lambda name: name not in keep + else: + should_release = None + + if should_release: + retain = [] + release = [] + + # Determine which locks to release + for name in lu.glm.list_owned(level): + if should_release(name): + release.append(name) + else: + retain.append(name) + + assert len(lu.glm.list_owned(level)) == (len(retain) + len(release)) + + # Release just some locks + lu.glm.release(level, names=release) + + assert frozenset(lu.glm.list_owned(level)) == frozenset(retain) + else: + # Release everything + lu.glm.release(level) + + assert not lu.glm.is_owned(level), "No locks should be owned" + + def _RunPostHook(lu, node_name): """Runs the post-hook for an opcode on a single node. 
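For illustration (not part of the patch): the new _ReleaseLocks helper above either releases an explicitly named set of locks or everything except a keep list. A minimal standalone sketch of that partition logic, assuming nothing beyond the names/keep semantics shown in the diff; partition_locks and the node names are made up, standing in for lu.glm.list_owned()/lu.glm.release():

def partition_locks(owned, names=None, keep=None):
    """Split owned lock names into (retain, release), mirroring _ReleaseLocks."""
    assert not (names is not None and keep is not None), \
        "Only one of 'names' and 'keep' may be given"
    if names is not None:
        release = [lock for lock in owned if lock in names]
    elif keep:
        release = [lock for lock in owned if lock not in keep]
    else:
        # neither given: release everything that is owned
        return [], list(owned)
    retain = [lock for lock in owned if lock not in release]
    return retain, release

# Example: keep only the locks of the instance's primary and target node
retain, release = partition_locks(["node1", "node2", "node3"],
                                  keep=["node1", "node3"])
assert retain == ["node1", "node3"] and release == ["node2"]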
@@ -1265,6 +1299,7 @@ class LUClusterVerify(LogicalUnit): ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG") ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT") + ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK") EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE") EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN") EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT") @@ -1457,7 +1492,7 @@ class LUClusterVerify(LogicalUnit): hv_name, item, hv_result) test = nresult.get(constants.NV_NODESETUP, - ["Missing NODESETUP results"]) + ["Missing NODESETUP results"]) _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s", "; ".join(test)) @@ -1694,51 +1729,94 @@ class LUClusterVerify(LogicalUnit): test = n_img.mfree < needed_mem self._ErrorIf(test, self.ENODEN1, node, "not enough memory to accomodate instance failovers" - " should node %s fail", prinode) + " should node %s fail (%dMiB needed, %dMiB available)", + prinode, needed_mem, n_img.mfree) - def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum, - master_files): - """Verifies and computes the node required file checksums. + @classmethod + def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo, + (files_all, files_all_opt, files_mc, files_vm)): + """Verifies file checksums collected from all nodes. - @type ninfo: L{objects.Node} - @param ninfo: the node to check - @param nresult: the remote results for the node - @param file_list: required list of files - @param local_cksum: dictionary of local files and their checksums - @param master_files: list of files that only masters should have + @param errorif: Callback for reporting errors + @param nodeinfo: List of L{objects.Node} objects + @param master_node: Name of master node + @param all_nvinfo: RPC results """ - node = ninfo.name - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + node_names = frozenset(node.name for node in nodeinfo) - remote_cksum = nresult.get(constants.NV_FILELIST, None) - test = not isinstance(remote_cksum, dict) - _ErrorIf(test, self.ENODEFILECHECK, node, - "node hasn't returned file checksum data") - if test: - return + assert master_node in node_names + assert (len(files_all | files_all_opt | files_mc | files_vm) == + sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \ + "Found file listed in more than one file list" - for file_name in file_list: - node_is_mc = ninfo.master_candidate - must_have = (file_name not in master_files) or node_is_mc - # missing - test1 = file_name not in remote_cksum - # invalid checksum - test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name] - # existing and good - test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name] - _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node, - "file '%s' missing", file_name) - _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node, - "file '%s' has wrong checksum", file_name) - # not candidate and this is not a must-have file - _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node, - "file '%s' should not exist on non master" - " candidates (and the file is outdated)", file_name) - # all good, except non-master/non-must have combination - _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node, - "file '%s' should not exist" - " on non master candidates", file_name) + # Define functions determining which nodes to consider for a file + file2nodefn = dict([(filename, fn) + for (files, fn) in [(files_all, None), + (files_all_opt, None), + (files_mc, lambda node: (node.master_candidate or + node.name == master_node)), + (files_vm, lambda node: 
node.vm_capable)] + for filename in files]) + + fileinfo = dict((filename, {}) for filename in file2nodefn.keys()) + + for node in nodeinfo: + nresult = all_nvinfo[node.name] + + if nresult.fail_msg or not nresult.payload: + node_files = None + else: + node_files = nresult.payload.get(constants.NV_FILELIST, None) + + test = not (node_files and isinstance(node_files, dict)) + errorif(test, cls.ENODEFILECHECK, node.name, + "Node did not return file checksum data") + if test: + continue + + for (filename, checksum) in node_files.items(): + # Check if the file should be considered for a node + fn = file2nodefn[filename] + if fn is None or fn(node): + fileinfo[filename].setdefault(checksum, set()).add(node.name) + + for (filename, checksums) in fileinfo.items(): + assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum" + + # Nodes having the file + with_file = frozenset(node_name + for nodes in fileinfo[filename].values() + for node_name in nodes) + + # Nodes missing file + missing_file = node_names - with_file + + if filename in files_all_opt: + # All or no nodes + errorif(missing_file and missing_file != node_names, + cls.ECLUSTERFILECHECK, None, + "File %s is optional, but it must exist on all or no nodes (not" + " found on %s)", + filename, utils.CommaJoin(utils.NiceSort(missing_file))) + else: + errorif(missing_file, cls.ECLUSTERFILECHECK, None, + "File %s is missing from node(s) %s", filename, + utils.CommaJoin(utils.NiceSort(missing_file))) + + # See if there are multiple versions of the file + test = len(checksums) > 1 + if test: + variants = ["variant %s on %s" % + (idx + 1, utils.CommaJoin(utils.NiceSort(nodes))) + for (idx, (checksum, nodes)) in + enumerate(sorted(checksums.items()))] + else: + variants = [] + + errorif(test, cls.ECLUSTERFILECHECK, None, + "File %s found with %s different checksums (%s)", + filename, len(checksums), "; ".join(variants)) def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper, drbd_map): @@ -1858,6 +1936,7 @@ class LUClusterVerify(LogicalUnit): assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?" + beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l] for os_name, os_data in nimg.oslist.items(): assert os_data, "Empty OS status for OS %s?!" % os_name f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0] @@ -1885,11 +1964,12 @@ class LUClusterVerify(LogicalUnit): continue for kind, a, b in [("API version", f_api, b_api), ("variants list", f_var, b_var), - ("parameters", f_param, b_param)]: + ("parameters", beautify_params(f_param), + beautify_params(b_param))]: _ErrorIf(a != b, self.ENODEOS, node, - "OS %s %s differs from reference node %s: %s vs. %s", + "OS %s for %s differs from reference node %s: [%s] vs. 
[%s]", kind, os_name, base.name, - utils.CommaJoin(a), utils.CommaJoin(b)) + utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b))) # check any missing OSes missing = set(base.oslist.keys()).difference(nimg.oslist.keys()) @@ -2173,19 +2253,14 @@ class LUClusterVerify(LogicalUnit): node_vol_should = {} # FIXME: verify OS list + + # File verification + filemap = _ComputeAncillaryFiles(cluster, False) + # do local checksums - master_files = [constants.CLUSTER_CONF_FILE] master_node = self.master_node = self.cfg.GetMasterNode() master_ip = self.cfg.GetMasterIP() - file_names = ssconf.SimpleStore().GetFileList() - file_names.extend(constants.ALL_CERT_FILES) - file_names.extend(master_files) - if cluster.modify_etc_hosts: - file_names.append(constants.ETC_HOSTS) - - local_checksums = utils.FingerprintFiles(file_names) - # Compute the set of hypervisor parameters hvp_data = [] for hv_name in hypervisors: @@ -2207,7 +2282,10 @@ class LUClusterVerify(LogicalUnit): feedback_fn("* Gathering data (%d nodes)" % len(nodelist)) node_verify_param = { - constants.NV_FILELIST: file_names, + constants.NV_FILELIST: + utils.UniqueSequence(filename + for files in filemap + for filename in files), constants.NV_NODELIST: [node.name for node in nodeinfo if not node.offline], constants.NV_HYPERVISOR: hypervisors, @@ -2289,6 +2367,9 @@ class LUClusterVerify(LogicalUnit): feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist)) instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo) + feedback_fn("* Verifying configuration file consistency") + self._VerifyFiles(_ErrorIf, nodeinfo, master_node, all_nvinfo, filemap) + feedback_fn("* Verifying node status") refos_img = None @@ -2326,9 +2407,6 @@ class LUClusterVerify(LogicalUnit): nimg.call_ok = self._VerifyNode(node_i, nresult) self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime) self._VerifyNodeNetwork(node_i, nresult) - self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums, - master_files) - self._VerifyOob(node_i, nresult) if nimg.vm_capable: @@ -2571,10 +2649,7 @@ class LUClusterRepairDiskSizes(NoHooksLU): def ExpandNames(self): if self.op.instances: - self.wanted_names = [] - for name in self.op.instances: - full_name = _ExpandInstanceName(self.cfg, name) - self.wanted_names.append(full_name) + self.wanted_names = _GetWantedInstances(self, self.op.instances) self.needed_locks = { locking.LEVEL_NODE: [], locking.LEVEL_INSTANCE: self.wanted_names, @@ -2586,7 +2661,7 @@ class LUClusterRepairDiskSizes(NoHooksLU): locking.LEVEL_NODE: locking.ALL_SET, locking.LEVEL_INSTANCE: locking.ALL_SET, } - self.share_locks = dict(((i, 1) for i in locking.LEVELS)) + self.share_locks = dict.fromkeys(locking.LEVELS, 1) def DeclareLocks(self, level): if level == locking.LEVEL_NODE and self.wanted_names is not None: @@ -2599,7 +2674,7 @@ class LUClusterRepairDiskSizes(NoHooksLU): """ if self.wanted_names is None: - self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE] + self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE) self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name in self.wanted_names] @@ -2824,7 +2899,7 @@ class LUClusterSetParams(LogicalUnit): " drbd-based instances exist", errors.ECODE_INVAL) - node_list = self.acquired_locks[locking.LEVEL_NODE] + node_list = self.glm.list_owned(locking.LEVEL_NODE) # if vg_name not None, checks given volume group on all nodes if self.op.vg_name: @@ -2871,6 +2946,12 @@ class LUClusterSetParams(LogicalUnit): utils.ForceDictType(self.op.ndparams, 
constants.NDS_PARAMETER_TYPES) self.new_ndparams = cluster.SimpleFillND(self.op.ndparams) + # TODO: we need a more general way to handle resetting + # cluster-level parameters to default values + if self.new_ndparams["oob_program"] == "": + self.new_ndparams["oob_program"] = \ + constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM] + if self.op.nicparams: utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES) self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams) @@ -3112,6 +3193,50 @@ def _UploadHelper(lu, nodes, fname): lu.proc.LogWarning(msg) +def _ComputeAncillaryFiles(cluster, redist): + """Compute files external to Ganeti which need to be consistent. + + @type redist: boolean + @param redist: Whether to include files which need to be redistributed + + """ + # Compute files for all nodes + files_all = set([ + constants.SSH_KNOWN_HOSTS_FILE, + constants.CONFD_HMAC_KEY, + constants.CLUSTER_DOMAIN_SECRET_FILE, + ]) + + if not redist: + files_all.update(constants.ALL_CERT_FILES) + files_all.update(ssconf.SimpleStore().GetFileList()) + + if cluster.modify_etc_hosts: + files_all.add(constants.ETC_HOSTS) + + # Files which must either exist on all nodes or on none + files_all_opt = set([ + constants.RAPI_USERS_FILE, + ]) + + # Files which should only be on master candidates + files_mc = set() + if not redist: + files_mc.add(constants.CLUSTER_CONF_FILE) + + # Files which should only be on VM-capable nodes + files_vm = set(filename + for hv_name in cluster.enabled_hypervisors + for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()) + + # Filenames must be unique + assert (len(files_all | files_all_opt | files_mc | files_vm) == + sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \ + "Found file listed in more than one file list" + + return (files_all, files_all_opt, files_mc, files_vm) + + def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True): """Distribute additional files which are part of the cluster configuration. @@ -3125,40 +3250,42 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True): @param additional_vm: whether the additional nodes are vm-capable or not """ - # 1. Gather target nodes - myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode()) - dist_nodes = lu.cfg.GetOnlineNodeList() - nvm_nodes = lu.cfg.GetNonVmCapableNodeList() - vm_nodes = [name for name in dist_nodes if name not in nvm_nodes] + # Gather target nodes + cluster = lu.cfg.GetClusterInfo() + master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode()) + + online_nodes = lu.cfg.GetOnlineNodeList() + vm_nodes = lu.cfg.GetVmCapableNodeList() + if additional_nodes is not None: - dist_nodes.extend(additional_nodes) + online_nodes.extend(additional_nodes) if additional_vm: vm_nodes.extend(additional_nodes) - if myself.name in dist_nodes: - dist_nodes.remove(myself.name) - if myself.name in vm_nodes: - vm_nodes.remove(myself.name) - - # 2. Gather files to distribute - dist_files = set([constants.ETC_HOSTS, - constants.SSH_KNOWN_HOSTS_FILE, - constants.RAPI_CERT_FILE, - constants.RAPI_USERS_FILE, - constants.CONFD_HMAC_KEY, - constants.CLUSTER_DOMAIN_SECRET_FILE, - ]) - - vm_files = set() - enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors - for hv_name in enabled_hypervisors: - hv_class = hypervisor.GetHypervisor(hv_name) - vm_files.update(hv_class.GetAncillaryFiles()) - - # 3. 
Perform the files upload - for fname in dist_files: - _UploadHelper(lu, dist_nodes, fname) - for fname in vm_files: - _UploadHelper(lu, vm_nodes, fname) + + # Never distribute to master node + for nodelist in [online_nodes, vm_nodes]: + if master_info.name in nodelist: + nodelist.remove(master_info.name) + + # Gather file lists + (files_all, files_all_opt, files_mc, files_vm) = \ + _ComputeAncillaryFiles(cluster, True) + + # Never re-distribute configuration file from here + assert not (constants.CLUSTER_CONF_FILE in files_all or + constants.CLUSTER_CONF_FILE in files_vm) + assert not files_mc, "Master candidates not handled in this function" + + filemap = [ + (online_nodes, files_all), + (online_nodes, files_all_opt), + (vm_nodes, files_vm), + ] + + # Upload the files + for (node_list, files) in filemap: + for fname in files: + _UploadHelper(lu, node_list, fname) class LUClusterRedistConf(NoHooksLU): @@ -3299,6 +3426,20 @@ class LUOobCommand(NoHooksLU): REG_BGL = False _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE) + def ExpandNames(self): + """Gather locks we need. + + """ + if self.op.node_names: + self.op.node_names = _GetWantedNodes(self, self.op.node_names) + lock_names = self.op.node_names + else: + lock_names = locking.ALL_SET + + self.needed_locks = { + locking.LEVEL_NODE: lock_names, + } + def CheckPrereq(self): """Check prerequisites. @@ -3315,23 +3456,23 @@ class LUOobCommand(NoHooksLU): assert self.op.power_delay >= 0.0 if self.op.node_names: - if self.op.command in self._SKIP_MASTER: - if self.master_node in self.op.node_names: - master_node_obj = self.cfg.GetNodeInfo(self.master_node) - master_oob_handler = _SupportsOob(self.cfg, master_node_obj) - - if master_oob_handler: - additional_text = ("Run '%s %s %s' if you want to operate on the" - " master regardless") % (master_oob_handler, - self.op.command, - self.master_node) - else: - additional_text = "The master node does not support out-of-band" + if (self.op.command in self._SKIP_MASTER and + self.master_node in self.op.node_names): + master_node_obj = self.cfg.GetNodeInfo(self.master_node) + master_oob_handler = _SupportsOob(self.cfg, master_node_obj) + + if master_oob_handler: + additional_text = ("run '%s %s %s' if you want to operate on the" + " master regardless") % (master_oob_handler, + self.op.command, + self.master_node) + else: + additional_text = "it does not support out-of-band operations" - raise errors.OpPrereqError(("Operating on the master node %s is not" - " allowed for %s\n%s") % - (self.master_node, self.op.command, - additional_text), errors.ECODE_INVAL) + raise errors.OpPrereqError(("Operating on the master node %s is not" + " allowed for %s; %s") % + (self.master_node, self.op.command, + additional_text), errors.ECODE_INVAL) else: self.op.node_names = self.cfg.GetNodeList() if self.op.command in self._SKIP_MASTER: @@ -3355,21 +3496,6 @@ class LUOobCommand(NoHooksLU): " not marked offline") % node_name, errors.ECODE_STATE) - def ExpandNames(self): - """Gather locks we need. - - """ - if self.op.node_names: - self.op.node_names = [_ExpandNodeName(self.cfg, name) - for name in self.op.node_names] - lock_names = self.op.node_names - else: - lock_names = locking.ALL_SET - - self.needed_locks = { - locking.LEVEL_NODE: lock_names, - } - def Exec(self, feedback_fn): """Execute OOB and return result if we expect any. 
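For illustration (not part of the patch): _RedistributeAncillaryFiles now drives the upload from a filemap of (node list, file set) pairs derived from _ComputeAncillaryFiles. A reduced sketch of that pattern under made-up file and node names; redistribute() and record_upload() are hypothetical stand-ins for the loop around _UploadHelper:

def redistribute(filemap, upload_fn):
    """Upload every file in each set to all nodes in the paired node list."""
    for (node_list, files) in filemap:
        for fname in files:
            upload_fn(node_list, fname)

uploaded = []

def record_upload(nodes, fname):
    uploaded.append((tuple(nodes), fname))

filemap = [
    (["node1", "node2"], set(["/etc/hosts", "/var/lib/ganeti/known_hosts"])),
    (["node2"], set(["/etc/ganeti/vm-only.conf"])),
]
redistribute(filemap, record_upload)
assert len(uploaded) == 3  # one upload call per (node list, file) combination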
@@ -3377,7 +3503,8 @@ class LUOobCommand(NoHooksLU): master_node = self.master_node ret = [] - for idx, node in enumerate(self.nodes): + for idx, node in enumerate(utils.NiceSort(self.nodes, + key=lambda node: node.name)): node_entry = [(constants.RS_NORMAL, node.name)] ret.append(node_entry) @@ -3394,14 +3521,14 @@ class LUOobCommand(NoHooksLU): self.op.timeout) if result.fail_msg: - self.LogWarning("On node '%s' out-of-band RPC failed with: %s", + self.LogWarning("Out-of-band RPC failed on node '%s': %s", node.name, result.fail_msg) node_entry.append((constants.RS_NODATA, None)) else: try: self._CheckPayload(result) except errors.OpExecError, err: - self.LogWarning("The payload returned by '%s' is not valid: %s", + self.LogWarning("Payload returned by node '%s' is not valid: %s", node.name, err) node_entry.append((constants.RS_NODATA, None)) else: @@ -3410,8 +3537,8 @@ class LUOobCommand(NoHooksLU): for item, status in result.payload: if status in [constants.OOB_STATUS_WARNING, constants.OOB_STATUS_CRITICAL]: - self.LogWarning("On node '%s' item '%s' has status '%s'", - node.name, item, status) + self.LogWarning("Item '%s' on node '%s' has status '%s'", + item, node.name, status) if self.op.command == constants.OOB_POWER_ON: node.powered = True @@ -3540,7 +3667,10 @@ class _OsQuery(_QueryBase): """ # Locking is not used - assert not (lu.acquired_locks or self.do_locking or self.use_locking) + assert not (compat.any(lu.glm.is_owned(level) + for level in locking.LEVELS + if level != locking.LEVEL_CLUSTER) or + self.do_locking or self.use_locking) valid_nodes = [node.name for node in lu.cfg.GetAllNodesInfo().values() @@ -3681,15 +3811,14 @@ class LUNodeRemove(LogicalUnit): masternode = self.cfg.GetMasterNode() if node.name == masternode: - raise errors.OpPrereqError("Node is the master node," - " you need to failover first.", - errors.ECODE_INVAL) + raise errors.OpPrereqError("Node is the master node, failover to another" + " node is required", errors.ECODE_INVAL) for instance_name in instance_list: instance = self.cfg.GetInstanceInfo(instance_name) if node.name in instance.all_nodes: raise errors.OpPrereqError("Instance %s is still running on the node," - " please remove first." % instance_name, + " please remove first" % instance_name, errors.ECODE_INVAL) self.op.node_name = node.name self.node = node @@ -3847,7 +3976,7 @@ class LUNodeQueryvols(NoHooksLU): """Computes the list of nodes and their attributes. """ - nodenames = self.acquired_locks[locking.LEVEL_NODE] + nodenames = self.glm.list_owned(locking.LEVEL_NODE) volumes = self.rpc.call_node_volumes(nodenames) ilist = [self.cfg.GetInstanceInfo(iname) for iname @@ -3925,7 +4054,7 @@ class LUNodeQueryStorage(NoHooksLU): """Computes the list of nodes and their attributes. 
""" - self.nodes = self.acquired_locks[locking.LEVEL_NODE] + self.nodes = self.glm.list_owned(locking.LEVEL_NODE) # Always get name to sort by if constants.SF_NAME in self.op.output_fields: @@ -4037,10 +4166,16 @@ class _InstanceQuery(_QueryBase): bad_nodes.append(name) elif result.payload: for inst in result.payload: - if all_info[inst].primary_node == name: - live_data.update(result.payload) + if inst in all_info: + if all_info[inst].primary_node == name: + live_data.update(result.payload) + else: + wrongnode_inst.add(inst) else: - wrongnode_inst.add(inst) + # orphan instance; we don't list it here as we don't + # handle this case yet in the output of instance listing + logging.warning("Orphan instance '%s' found on node %s", + inst, name) # else no instance is alive else: live_data = {} @@ -4166,6 +4301,11 @@ class LUNodeAdd(LogicalUnit): self.hostname = netutils.GetHostname(name=self.op.node_name, family=self.primary_ip_family) self.op.node_name = self.hostname.name + + if self.op.readd and self.op.node_name == self.cfg.GetMasterNode(): + raise errors.OpPrereqError("Cannot readd the master node", + errors.ECODE_STATE) + if self.op.readd and self.op.group: raise errors.OpPrereqError("Cannot pass a node group when a node is" " being readded", errors.ECODE_INVAL) @@ -4401,7 +4541,7 @@ class LUNodeAdd(LogicalUnit): feedback_fn("ssh/hostname verification failed" " (checking from %s): %s" % (verifier, nl_payload[failed])) - raise errors.OpExecError("ssh/hostname verification failed.") + raise errors.OpExecError("ssh/hostname verification failed") if self.op.readd: _RedistributeAncillaryFiles(self) @@ -4484,21 +4624,22 @@ class LUNodeSetParams(LogicalUnit): # If we have locked all instances, before waiting to lock nodes, release # all the ones living on nodes unrelated to the current operation. if level == locking.LEVEL_NODE and self.lock_instances: - instances_release = [] - instances_keep = [] self.affected_instances = [] if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET: - for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]: + instances_keep = [] + + # Build list of instances to release + for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE): instance = self.context.cfg.GetInstanceInfo(instance_name) - i_mirrored = instance.disk_template in constants.DTS_INT_MIRROR - if i_mirrored and self.op.node_name in instance.all_nodes: + if (instance.disk_template in constants.DTS_INT_MIRROR and + self.op.node_name in instance.all_nodes): instances_keep.append(instance_name) self.affected_instances.append(instance) - else: - instances_release.append(instance_name) - if instances_release: - self.context.glm.release(locking.LEVEL_INSTANCE, instances_release) - self.acquired_locks[locking.LEVEL_INSTANCE] = instances_keep + + _ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep) + + assert (set(self.glm.list_owned(locking.LEVEL_INSTANCE)) == + set(instances_keep)) def BuildHooksEnv(self): """Build hooks env. 
@@ -4564,7 +4705,7 @@ class LUNodeSetParams(LogicalUnit): self.old_flags = old_flags = (node.master_candidate, node.drained, node.offline) - assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags) + assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags) self.old_role = old_role = self._F2R[old_flags] # Check for ineffective changes @@ -4580,12 +4721,12 @@ class LUNodeSetParams(LogicalUnit): if _SupportsOob(self.cfg, node): if self.op.offline is False and not (node.powered or self.op.powered == True): - raise errors.OpPrereqError(("Please power on node %s first before you" - " can reset offline state") % + raise errors.OpPrereqError(("Node %s needs to be turned on before its" + " offline status can be reset") % self.op.node_name) elif self.op.powered is not None: raise errors.OpPrereqError(("Unable to change powered state for node %s" - " which does not support out-of-band" + " as it does not support out-of-band" " handling") % self.op.node_name) # If we're being deofflined/drained, we'll MC ourself if needed @@ -5596,7 +5737,7 @@ class LUInstanceRecreateDisks(LogicalUnit): else: for idx in self.op.disks: if idx >= len(instance.disks): - raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx, + raise errors.OpPrereqError("Invalid disk index '%s'" % idx, errors.ECODE_INVAL) self.instance = instance @@ -5627,7 +5768,7 @@ class LUInstanceRename(LogicalUnit): """ if self.op.ip_check and not self.op.name_check: # TODO: make the ip check more flexible and not depend on the name check - raise errors.OpPrereqError("Cannot do ip check without a name check", + raise errors.OpPrereqError("IP address check requires a name check", errors.ECODE_INVAL) def BuildHooksEnv(self): @@ -5697,9 +5838,11 @@ class LUInstanceRename(LogicalUnit): rename_file_storage = True self.cfg.RenameInstance(inst.name, self.op.new_name) - # Change the instance lock. This is definitely safe while we hold the BGL - self.context.glm.remove(locking.LEVEL_INSTANCE, old_name) - self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name) + # Change the instance lock. This is definitely safe while we hold the BGL. + # Otherwise the new lock would have to be added in acquired mode. + assert self.REQ_BGL + self.glm.remove(locking.LEVEL_INSTANCE, old_name) + self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name) # re-read the instance from the configuration after rename inst = self.cfg.GetInstanceInfo(self.op.new_name) @@ -5864,6 +6007,15 @@ class LUInstanceFailover(LogicalUnit): self.needed_locks[locking.LEVEL_NODE] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + ignore_consistency = self.op.ignore_consistency + shutdown_timeout = self.op.shutdown_timeout + self._migrater = TLMigrateInstance(self, self.op.instance_name, + cleanup=False, + failover=True, + ignore_consistency=ignore_consistency, + shutdown_timeout=shutdown_timeout) + self.tasklets = [self._migrater] + def DeclareLocks(self, level): if level == locking.LEVEL_NODE: instance = self.context.cfg.GetInstanceInfo(self.op.instance_name) @@ -5883,13 +6035,14 @@ class LUInstanceFailover(LogicalUnit): This runs on master, primary and secondary nodes of the instance. 
""" - instance = self.instance + instance = self._migrater.instance source_node = instance.primary_node + target_node = self.op.target_node env = { "IGNORE_CONSISTENCY": self.op.ignore_consistency, "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout, "OLD_PRIMARY": source_node, - "NEW_PRIMARY": self.op.target_node, + "NEW_PRIMARY": target_node, } if instance.disk_template in constants.DTS_INT_MIRROR: @@ -5906,171 +6059,9 @@ class LUInstanceFailover(LogicalUnit): """Build hooks nodes. """ - nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes) - return (nl, nl + [self.instance.primary_node]) - - def CheckPrereq(self): - """Check prerequisites. - - This checks that the instance is in the cluster. - - """ - self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name) - assert self.instance is not None, \ - "Cannot retrieve locked instance %s" % self.op.instance_name - - bep = self.cfg.GetClusterInfo().FillBE(instance) - if instance.disk_template not in constants.DTS_MIRRORED: - raise errors.OpPrereqError("Instance's disk layout is not" - " mirrored, cannot failover.", - errors.ECODE_STATE) - - if instance.disk_template in constants.DTS_EXT_MIRROR: - _CheckIAllocatorOrNode(self, "iallocator", "target_node") - if self.op.iallocator: - self._RunAllocator() - # Release all unnecessary node locks - nodes_keep = [instance.primary_node, self.op.target_node] - nodes_rel = [node for node in self.acquired_locks[locking.LEVEL_NODE] - if node not in nodes_keep] - self.context.glm.release(locking.LEVEL_NODE, nodes_rel) - self.acquired_locks[locking.LEVEL_NODE] = nodes_keep - - # self.op.target_node is already populated, either directly or by the - # iallocator run - target_node = self.op.target_node - - else: - secondary_nodes = instance.secondary_nodes - if not secondary_nodes: - raise errors.ConfigurationError("No secondary node but using" - " %s disk template" % - instance.disk_template) - target_node = secondary_nodes[0] - - if self.op.iallocator or (self.op.target_node and - self.op.target_node != target_node): - raise errors.OpPrereqError("Instances with disk template %s cannot" - " be failed over to arbitrary nodes" - " (neither an iallocator nor a target" - " node can be passed)" % - instance.disk_template, errors.ECODE_INVAL) - _CheckNodeOnline(self, target_node) - _CheckNodeNotDrained(self, target_node) - - # Save target_node so that we can use it in BuildHooksEnv - self.op.target_node = target_node - - if instance.admin_up: - # check memory requirements on the secondary node - _CheckNodeFreeMemory(self, target_node, "failing over instance %s" % - instance.name, bep[constants.BE_MEMORY], - instance.hypervisor) - else: - self.LogInfo("Not checking memory on the secondary node as" - " instance will not be started") - - # check bridge existance - _CheckInstanceBridgesExist(self, instance, node=target_node) - - def Exec(self, feedback_fn): - """Failover an instance. - - The failover is done by shutting it down on its present node and - starting it on the secondary. 
- - """ - instance = self.instance - primary_node = self.cfg.GetNodeInfo(instance.primary_node) - - source_node = instance.primary_node - target_node = self.op.target_node - - if instance.admin_up: - feedback_fn("* checking disk consistency between source and target") - for dev in instance.disks: - # for drbd, these are drbd over lvm - if not _CheckDiskConsistency(self, dev, target_node, False): - if not self.op.ignore_consistency: - raise errors.OpExecError("Disk %s is degraded on target node," - " aborting failover." % dev.iv_name) - else: - feedback_fn("* not checking disk consistency as instance is not running") - - feedback_fn("* shutting down instance on source node") - logging.info("Shutting down instance %s on node %s", - instance.name, source_node) - - result = self.rpc.call_instance_shutdown(source_node, instance, - self.op.shutdown_timeout) - msg = result.fail_msg - if msg: - if self.op.ignore_consistency or primary_node.offline: - self.proc.LogWarning("Could not shutdown instance %s on node %s." - " Proceeding anyway. Please make sure node" - " %s is down. Error details: %s", - instance.name, source_node, source_node, msg) - else: - raise errors.OpExecError("Could not shutdown instance %s on" - " node %s: %s" % - (instance.name, source_node, msg)) - - feedback_fn("* deactivating the instance's disks on source node") - if not _ShutdownInstanceDisks(self, instance, ignore_primary=True): - raise errors.OpExecError("Can't shut down the instance's disks.") - - instance.primary_node = target_node - # distribute new instance config to the other nodes - self.cfg.Update(instance, feedback_fn) - - # Only start the instance if it's marked as up - if instance.admin_up: - feedback_fn("* activating the instance's disks on target node") - logging.info("Starting instance %s on node %s", - instance.name, target_node) - - disks_ok, _ = _AssembleInstanceDisks(self, instance, - ignore_secondaries=True) - if not disks_ok: - _ShutdownInstanceDisks(self, instance) - raise errors.OpExecError("Can't activate the instance's disks") - - feedback_fn("* starting the instance on the target node") - result = self.rpc.call_instance_start(target_node, instance, None, None) - msg = result.fail_msg - if msg: - _ShutdownInstanceDisks(self, instance) - raise errors.OpExecError("Could not start instance %s on node %s: %s" % - (instance.name, target_node, msg)) - - def _RunAllocator(self): - """Run the allocator based on input opcode. 
- - """ - ial = IAllocator(self.cfg, self.rpc, - mode=constants.IALLOCATOR_MODE_RELOC, - name=self.instance.name, - # TODO See why hail breaks with a single node below - relocate_from=[self.instance.primary_node, - self.instance.primary_node], - ) - - ial.Run(self.op.iallocator) - - if not ial.success: - raise errors.OpPrereqError("Can't compute nodes using" - " iallocator '%s': %s" % - (self.op.iallocator, ial.info), - errors.ECODE_NORES) - if len(ial.result) != ial.required_nodes: - raise errors.OpPrereqError("iallocator '%s' returned invalid number" - " of nodes (%s), required %s" % - (self.op.iallocator, len(ial.result), - ial.required_nodes), errors.ECODE_FAULT) - self.op.target_node = ial.result[0] - self.LogInfo("Selected nodes for instance %s via iallocator %s: %s", - self.instance.name, self.op.iallocator, - utils.CommaJoin(ial.result)) + instance = self._migrater.instance + nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes) + return (nl, nl + [instance.primary_node]) class LUInstanceMigrate(LogicalUnit): @@ -6094,8 +6085,9 @@ class LUInstanceMigrate(LogicalUnit): self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE self._migrater = TLMigrateInstance(self, self.op.instance_name, - self.op.cleanup, self.op.iallocator, - self.op.target_node) + cleanup=self.op.cleanup, + failover=False, + fallback=self.op.allow_failover) self.tasklets = [self._migrater] def DeclareLocks(self, level): @@ -6119,7 +6111,7 @@ class LUInstanceMigrate(LogicalUnit): """ instance = self._migrater.instance source_node = instance.primary_node - target_node = self._migrater.target_node + target_node = self.op.target_node env = _BuildInstanceHookEnvByObject(self, instance) env.update({ "MIGRATE_LIVE": self._migrater.live, @@ -6355,8 +6347,7 @@ class LUNodeMigrate(LogicalUnit): logging.debug("Migrating instance %s", inst.name) names.append(inst.name) - tasklets.append(TLMigrateInstance(self, inst.name, False, - self.op.iallocator, None)) + tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False)) if inst.disk_template in constants.DTS_EXT_MIRROR: # We need to lock all nodes, as the iallocator will choose the @@ -6403,10 +6394,28 @@ class TLMigrateInstance(Tasklet): @type live: boolean @ivar live: whether the migration will be done live or non-live; this variable is initalized only after CheckPrereq has run + @type cleanup: boolean + @ivar cleanup: Wheater we cleanup from a failed migration + @type iallocator: string + @ivar iallocator: The iallocator used to determine target_node + @type target_node: string + @ivar target_node: If given, the target_node to reallocate the instance to + @type failover: boolean + @ivar failover: Whether operation results in failover or migration + @type fallback: boolean + @ivar fallback: Whether fallback to failover is allowed if migration not + possible + @type ignore_consistency: boolean + @ivar ignore_consistency: Wheter we should ignore consistency between source + and target node + @type shutdown_timeout: int + @ivar shutdown_timeout: In case of failover timeout of the shutdown """ - def __init__(self, lu, instance_name, cleanup, - iallocator=None, target_node=None): + def __init__(self, lu, instance_name, cleanup=False, + failover=False, fallback=False, + ignore_consistency=False, + shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT): """Initializes this class. 
""" @@ -6416,8 +6425,10 @@ class TLMigrateInstance(Tasklet): self.instance_name = instance_name self.cleanup = cleanup self.live = False # will be overridden later - self.iallocator = iallocator - self.target_node = target_node + self.failover = failover + self.fallback = fallback + self.ignore_consistency = ignore_consistency + self.shutdown_timeout = shutdown_timeout def CheckPrereq(self): """Check prerequisites. @@ -6430,28 +6441,40 @@ class TLMigrateInstance(Tasklet): assert instance is not None self.instance = instance + if (not self.cleanup and not instance.admin_up and not self.failover and + self.fallback): + self.lu.LogInfo("Instance is marked down, fallback allowed, switching" + " to failover") + self.failover = True + if instance.disk_template not in constants.DTS_MIRRORED: + if self.failover: + text = "failovers" + else: + text = "migrations" raise errors.OpPrereqError("Instance's disk layout '%s' does not allow" - " migrations" % instance.disk_template, + " %s" % (instance.disk_template, text), errors.ECODE_STATE) if instance.disk_template in constants.DTS_EXT_MIRROR: _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node") - if self.iallocator: + if self.lu.op.iallocator: self._RunAllocator() + else: + # We set set self.target_node as it is required by + # BuildHooksEnv + self.target_node = self.lu.op.target_node # self.target_node is already populated, either directly or by the # iallocator run target_node = self.target_node if len(self.lu.tasklets) == 1: - # It is safe to remove locks only when we're the only tasklet in the LU - nodes_keep = [instance.primary_node, self.target_node] - nodes_rel = [node for node in self.lu.acquired_locks[locking.LEVEL_NODE] - if node not in nodes_keep] - self.lu.context.glm.release(locking.LEVEL_NODE, nodes_rel) - self.lu.acquired_locks[locking.LEVEL_NODE] = nodes_keep + # It is safe to release locks only when we're the only tasklet + # in the LU + _ReleaseLocks(self.lu, locking.LEVEL_NODE, + keep=[instance.primary_node, self.target_node]) else: secondary_nodes = instance.secondary_nodes @@ -6462,29 +6485,69 @@ class TLMigrateInstance(Tasklet): target_node = secondary_nodes[0] if self.lu.op.iallocator or (self.lu.op.target_node and self.lu.op.target_node != target_node): + if self.failover: + text = "failed over" + else: + text = "migrated" raise errors.OpPrereqError("Instances with disk template %s cannot" - " be migrated over to arbitrary nodes" + " be %s to arbitrary nodes" " (neither an iallocator nor a target" " node can be passed)" % - instance.disk_template, errors.ECODE_INVAL) + (instance.disk_template, text), + errors.ECODE_INVAL) i_be = self.cfg.GetClusterInfo().FillBE(instance) # check memory requirements on the secondary node - _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" % - instance.name, i_be[constants.BE_MEMORY], - instance.hypervisor) + if not self.failover or instance.admin_up: + _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" % + instance.name, i_be[constants.BE_MEMORY], + instance.hypervisor) + else: + self.lu.LogInfo("Not checking memory on the secondary node as" + " instance will not be started") # check bridge existance _CheckInstanceBridgesExist(self.lu, instance, node=target_node) if not self.cleanup: _CheckNodeNotDrained(self.lu, target_node) - result = self.rpc.call_instance_migratable(instance.primary_node, - instance) - result.Raise("Can't migrate, please use failover", - prereq=True, ecode=errors.ECODE_STATE) + if not self.failover: + result = 
self.rpc.call_instance_migratable(instance.primary_node, + instance) + if result.fail_msg and self.fallback: + self.lu.LogInfo("Can't migrate, instance offline, fallback to" + " failover") + self.failover = True + else: + result.Raise("Can't migrate, please use failover", + prereq=True, ecode=errors.ECODE_STATE) + assert not (self.failover and self.cleanup) + + if not self.failover: + if self.lu.op.live is not None and self.lu.op.mode is not None: + raise errors.OpPrereqError("Only one of the 'live' and 'mode'" + " parameters are accepted", + errors.ECODE_INVAL) + if self.lu.op.live is not None: + if self.lu.op.live: + self.lu.op.mode = constants.HT_MIGRATION_LIVE + else: + self.lu.op.mode = constants.HT_MIGRATION_NONLIVE + # reset the 'live' parameter to None so that repeated + # invocations of CheckPrereq do not raise an exception + self.lu.op.live = None + elif self.lu.op.mode is None: + # read the default value from the hypervisor + i_hv = self.cfg.GetClusterInfo().FillHV(self.instance, + skip_globals=False) + self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE] + + self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE + else: + # Failover is never live + self.live = False def _RunAllocator(self): """Run the allocator based on input opcode. @@ -6498,42 +6561,23 @@ class TLMigrateInstance(Tasklet): self.instance.primary_node], ) - ial.Run(self.iallocator) + ial.Run(self.lu.op.iallocator) if not ial.success: raise errors.OpPrereqError("Can't compute nodes using" " iallocator '%s': %s" % - (self.iallocator, ial.info), + (self.lu.op.iallocator, ial.info), errors.ECODE_NORES) if len(ial.result) != ial.required_nodes: raise errors.OpPrereqError("iallocator '%s' returned invalid number" " of nodes (%s), required %s" % - (self.iallocator, len(ial.result), + (self.lu.op.iallocator, len(ial.result), ial.required_nodes), errors.ECODE_FAULT) self.target_node = ial.result[0] self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s", - self.instance_name, self.iallocator, + self.instance_name, self.lu.op.iallocator, utils.CommaJoin(ial.result)) - if self.lu.op.live is not None and self.lu.op.mode is not None: - raise errors.OpPrereqError("Only one of the 'live' and 'mode'" - " parameters are accepted", - errors.ECODE_INVAL) - if self.lu.op.live is not None: - if self.lu.op.live: - self.lu.op.mode = constants.HT_MIGRATION_LIVE - else: - self.lu.op.mode = constants.HT_MIGRATION_NONLIVE - # reset the 'live' parameter to None so that repeated - # invocations of CheckPrereq do not raise an exception - self.lu.op.live = None - elif self.lu.op.mode is None: - # read the default value from the hypervisor - i_hv = self.cfg.GetClusterInfo().FillHV(self.instance, skip_globals=False) - self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE] - - self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE - def _WaitUntilSync(self): """Poll with custom rpc for disk sync. @@ -6627,15 +6671,15 @@ class TLMigrateInstance(Tasklet): if runningon_source and runningon_target: raise errors.OpExecError("Instance seems to be running on two nodes," - " or the hypervisor is confused. You will have" + " or the hypervisor is confused; you will have" " to ensure manually that it runs only on one" - " and restart this operation.") + " and restart this operation") if not (runningon_source or runningon_target): - raise errors.OpExecError("Instance does not seem to be running at all." 
- " In this case, it's safer to repair by" + raise errors.OpExecError("Instance does not seem to be running at all;" + " in this case it's safer to repair by" " running 'gnt-instance stop' to ensure disk" - " shutdown, and then restarting it.") + " shutdown, and then restarting it") if runningon_target: # the migration has actually succeeded, we need to update the config @@ -6677,10 +6721,9 @@ class TLMigrateInstance(Tasklet): self._GoReconnect(False) self._WaitUntilSync() except errors.OpExecError, err: - self.lu.LogWarning("Migration failed and I can't reconnect the" - " drives: error '%s'\n" - "Please look and recover the instance status" % - str(err)) + self.lu.LogWarning("Migration failed and I can't reconnect the drives," + " please try to recover the instance manually;" + " error '%s'" % str(err)) def _AbortMigration(self): """Call the hypervisor code to abort a started migration. @@ -6722,7 +6765,7 @@ class TLMigrateInstance(Tasklet): if not _CheckDiskConsistency(self.lu, dev, target_node, False): raise errors.OpExecError("Disk %s is degraded or not fully" " synchronized on target node," - " aborting migrate." % dev.iv_name) + " aborting migration" % dev.iv_name) # First get the migration information from the remote node result = self.rpc.call_migration_info(source_node, instance) @@ -6759,7 +6802,6 @@ class TLMigrateInstance(Tasklet): (instance.name, msg)) self.feedback_fn("* migrating instance to %s" % target_node) - time.sleep(10) result = self.rpc.call_instance_migrate(source_node, instance, self.nodes_ip[target_node], self.live) @@ -6772,7 +6814,6 @@ class TLMigrateInstance(Tasklet): self._RevertDiskStatus() raise errors.OpExecError("Could not migrate instance %s: %s" % (instance.name, msg)) - time.sleep(10) instance.primary_node = target_node # distribute new instance config to the other nodes @@ -6798,14 +6839,82 @@ class TLMigrateInstance(Tasklet): self.feedback_fn("* done") + def _ExecFailover(self): + """Failover an instance. + + The failover is done by shutting it down on its present node and + starting it on the secondary. 
+ + """ + instance = self.instance + primary_node = self.cfg.GetNodeInfo(instance.primary_node) + + source_node = instance.primary_node + target_node = self.target_node + + if instance.admin_up: + self.feedback_fn("* checking disk consistency between source and target") + for dev in instance.disks: + # for drbd, these are drbd over lvm + if not _CheckDiskConsistency(self, dev, target_node, False): + if not self.ignore_consistency: + raise errors.OpExecError("Disk %s is degraded on target node," + " aborting failover" % dev.iv_name) + else: + self.feedback_fn("* not checking disk consistency as instance is not" + " running") + + self.feedback_fn("* shutting down instance on source node") + logging.info("Shutting down instance %s on node %s", + instance.name, source_node) + + result = self.rpc.call_instance_shutdown(source_node, instance, + self.shutdown_timeout) + msg = result.fail_msg + if msg: + if self.ignore_consistency or primary_node.offline: + self.lu.LogWarning("Could not shutdown instance %s on node %s," + " proceeding anyway; please make sure node" + " %s is down; error details: %s", + instance.name, source_node, source_node, msg) + else: + raise errors.OpExecError("Could not shutdown instance %s on" + " node %s: %s" % + (instance.name, source_node, msg)) + + self.feedback_fn("* deactivating the instance's disks on source node") + if not _ShutdownInstanceDisks(self, instance, ignore_primary=True): + raise errors.OpExecError("Can't shut down the instance's disks.") + + instance.primary_node = target_node + # distribute new instance config to the other nodes + self.cfg.Update(instance, self.feedback_fn) + + # Only start the instance if it's marked as up + if instance.admin_up: + self.feedback_fn("* activating the instance's disks on target node") + logging.info("Starting instance %s on node %s", + instance.name, target_node) + + disks_ok, _ = _AssembleInstanceDisks(self, instance, + ignore_secondaries=True) + if not disks_ok: + _ShutdownInstanceDisks(self, instance) + raise errors.OpExecError("Can't activate the instance's disks") + + self.feedback_fn("* starting the instance on the target node") + result = self.rpc.call_instance_start(target_node, instance, None, None) + msg = result.fail_msg + if msg: + _ShutdownInstanceDisks(self, instance) + raise errors.OpExecError("Could not start instance %s on node %s: %s" % + (instance.name, target_node, msg)) + def Exec(self, feedback_fn): """Perform the migration. """ - feedback_fn("Migrating instance %s" % self.instance.name) - self.feedback_fn = feedback_fn - self.source_node = self.instance.primary_node # FIXME: if we implement migrate-to-any in DRBD, this needs fixing @@ -6820,10 +6929,16 @@ class TLMigrateInstance(Tasklet): self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip, } - if self.cleanup: - return self._ExecCleanup() + if self.failover: + feedback_fn("Failover instance %s" % self.instance.name) + self._ExecFailover() else: - return self._ExecMigration() + feedback_fn("Migrating instance %s" % self.instance.name) + + if self.cleanup: + return self._ExecCleanup() + else: + return self._ExecMigration() def _CreateBlockDev(lu, node, instance, device, force_create, @@ -6911,17 +7026,18 @@ def _GenerateUniqueNames(lu, exts): return results -def _GenerateDRBD8Branch(lu, primary, secondary, size, vgname, names, iv_name, - p_minor, s_minor): +def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names, + iv_name, p_minor, s_minor): """Generate a drbd8 device complete with its children. 
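For illustration (not part of the patch): for migrations, TLMigrateInstance.CheckPrereq above folds the mutually exclusive 'live' and 'mode' opcode parameters into a single migration mode, falling back to the hypervisor default when neither is given. A standalone sketch of that resolution; the mode strings are placeholders for the constants.HT_MIGRATION_* values:

HT_MIGRATION_LIVE = "live"
HT_MIGRATION_NONLIVE = "non-live"

def resolve_migration_mode(live, mode, hv_default):
    """Return the migration mode implied by the 'live'/'mode' parameters."""
    if live is not None and mode is not None:
        raise ValueError("Only one of the 'live' and 'mode' parameters"
                         " are accepted")
    if live is not None:
        return HT_MIGRATION_LIVE if live else HT_MIGRATION_NONLIVE
    if mode is None:
        # neither parameter given: use the hypervisor's configured default
        return hv_default
    return mode

assert resolve_migration_mode(True, None, HT_MIGRATION_NONLIVE) == HT_MIGRATION_LIVE
assert resolve_migration_mode(None, None, HT_MIGRATION_NONLIVE) == HT_MIGRATION_NONLIVE
assert resolve_migration_mode(None, HT_MIGRATION_LIVE, HT_MIGRATION_NONLIVE) == HT_MIGRATION_LIVE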
""" + assert len(vgnames) == len(names) == 2 port = lu.cfg.AllocatePort() shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId()) dev_data = objects.Disk(dev_type=constants.LD_LV, size=size, - logical_id=(vgname, names[0])) + logical_id=(vgnames[0], names[0])) dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128, - logical_id=(vgname, names[1])) + logical_id=(vgnames[1], names[1])) drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size, logical_id=(primary, secondary, port, p_minor, s_minor, @@ -6976,9 +7092,11 @@ def _GenerateDiskTemplate(lu, template_name, names.append(lv_prefix + "_meta") for idx, disk in enumerate(disk_info): disk_index = idx + base_index - vg = disk.get(constants.IDISK_VG, vgname) + data_vg = disk.get(constants.IDISK_VG, vgname) + meta_vg = disk.get(constants.IDISK_METAVG, data_vg) disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node, - disk[constants.IDISK_SIZE], vg, + disk[constants.IDISK_SIZE], + [data_vg, meta_vg], names[idx * 2:idx * 2 + 2], "disk/%d" % disk_index, minors[idx * 2], minors[idx * 2 + 1]) @@ -7080,14 +7198,17 @@ def _WipeDisks(lu, instance): try: for idx, device in enumerate(instance.disks): - lu.LogInfo("* Wiping disk %d", idx) - logging.info("Wiping disk %d for instance %s, node %s", - idx, instance.name, node) - # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but # MAX_WIPE_CHUNK at max wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT) + # we _must_ make this an int, otherwise rounding errors will + # occur + wipe_chunk_size = int(wipe_chunk_size) + + lu.LogInfo("* Wiping disk %d", idx) + logging.info("Wiping disk %d for instance %s, node %s using" + " chunk size %s", idx, instance.name, node, wipe_chunk_size) offset = 0 size = device.size @@ -7096,6 +7217,8 @@ def _WipeDisks(lu, instance): while offset < size: wipe_size = min(wipe_chunk_size, size - offset) + logging.debug("Wiping disk %d, offset %s, chunk %s", + idx, offset, wipe_size) result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size) result.Raise("Could not wipe disk %d at offset %d for size %d" % (idx, offset, wipe_size)) @@ -7113,8 +7236,8 @@ def _WipeDisks(lu, instance): for idx, success in enumerate(result.payload): if not success: - lu.LogWarning("Warning: Resume sync of disk %d failed. 
Please have a" - " look at the status and troubleshoot the issue.", idx) + lu.LogWarning("Resume sync of disk %d failed, please have a" + " look at the status and troubleshoot the issue", idx) logging.warn("resume-sync of instance %s for disks %d failed", instance.name, idx) @@ -7363,8 +7486,8 @@ class LUInstanceCreate(LogicalUnit): if self.op.ip_check and not self.op.name_check: # TODO: make the ip check more flexible and not depend on the name check - raise errors.OpPrereqError("Cannot do ip check without a name check", - errors.ECODE_INVAL) + raise errors.OpPrereqError("Cannot do IP address check without a name" + " check", errors.ECODE_INVAL) # check nics' parameter names for nic in self.op.nics: @@ -7542,7 +7665,7 @@ class LUInstanceCreate(LogicalUnit): self.op.src_node = None if os.path.isabs(src_path): raise errors.OpPrereqError("Importing an instance from an absolute" - " path requires a source node option.", + " path requires a source node option", errors.ECODE_INVAL) else: self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node) @@ -7644,7 +7767,7 @@ class LUInstanceCreate(LogicalUnit): src_path = self.op.src_path if src_node is None: - locked_nodes = self.acquired_locks[locking.LEVEL_NODE] + locked_nodes = self.glm.list_owned(locking.LEVEL_NODE) exp_list = self.rpc.call_export_list(locked_nodes) found = False for node in exp_list: @@ -7894,10 +8017,13 @@ class LUInstanceCreate(LogicalUnit): except (TypeError, ValueError): raise errors.OpPrereqError("Invalid disk size '%s'" % size, errors.ECODE_INVAL) + + data_vg = disk.get(constants.IDISK_VG, default_vg) new_disk = { constants.IDISK_SIZE: size, constants.IDISK_MODE: mode, - constants.IDISK_VG: disk.get(constants.IDISK_VG, default_vg), + constants.IDISK_VG: data_vg, + constants.IDISK_METAVG: disk.get(constants.IDISK_METAVG, data_vg), } if constants.IDISK_ADOPT in disk: new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT] @@ -7988,7 +8114,7 @@ class LUInstanceCreate(LogicalUnit): if self.op.disk_template in constants.DTS_INT_MIRROR: if self.op.snode == pnode.name: raise errors.OpPrereqError("The secondary node cannot be the" - " primary node.", errors.ECODE_INVAL) + " primary node", errors.ECODE_INVAL) _CheckNodeOnline(self, self.op.snode) _CheckNodeNotDrained(self, self.op.snode) _CheckNodeVmCapable(self, self.op.snode) @@ -8166,18 +8292,6 @@ class LUInstanceCreate(LogicalUnit): self.cfg.ReleaseDRBDMinors(instance) raise - if self.cfg.GetClusterInfo().prealloc_wipe_disks: - feedback_fn("* wiping instance disks...") - try: - _WipeDisks(self, iobj) - except errors.OpExecError: - self.LogWarning("Device wiping failed, reverting...") - try: - _RemoveDisks(self, iobj) - finally: - self.cfg.ReleaseDRBDMinors(instance) - raise - feedback_fn("adding instance %s to cluster config" % instance) self.cfg.AddInstance(iobj, self.proc.GetECId()) @@ -8185,18 +8299,28 @@ class LUInstanceCreate(LogicalUnit): # Declare that we don't want to remove the instance lock anymore, as we've # added the instance to the config del self.remove_locks[locking.LEVEL_INSTANCE] - # Unlock all the nodes + if self.op.mode == constants.INSTANCE_IMPORT: - nodes_keep = [self.op.src_node] - nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE] - if node != self.op.src_node] - self.context.glm.release(locking.LEVEL_NODE, nodes_release) - self.acquired_locks[locking.LEVEL_NODE] = nodes_keep + # Release unused nodes + _ReleaseLocks(self, locking.LEVEL_NODE, keep=[self.op.src_node]) else: - self.context.glm.release(locking.LEVEL_NODE) - 
del self.acquired_locks[locking.LEVEL_NODE] + # Release all nodes + _ReleaseLocks(self, locking.LEVEL_NODE) - if self.op.wait_for_sync: + disk_abort = False + if not self.adopt_disks and self.cfg.GetClusterInfo().prealloc_wipe_disks: + feedback_fn("* wiping instance disks...") + try: + _WipeDisks(self, iobj) + except errors.OpExecError, err: + logging.exception("Wiping disks failed") + self.LogWarning("Wiping instance disks failed (%s)", err) + disk_abort = True + + if disk_abort: + # Something is already wrong with the disks, don't do anything else + pass + elif self.op.wait_for_sync: disk_abort = not _WaitForSync(self, iobj) elif iobj.disk_template in constants.DTS_INT_MIRROR: # make sure the disks are not degraded (still sync-ing is ok) @@ -8380,24 +8504,29 @@ class LUInstanceReplaceDisks(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() - if self.op.iallocator is not None: - self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + assert locking.LEVEL_NODE not in self.needed_locks + assert locking.LEVEL_NODEGROUP not in self.needed_locks + + assert self.op.iallocator is None or self.op.remote_node is None, \ + "Conflicting options" - elif self.op.remote_node is not None: - remote_node = _ExpandNodeName(self.cfg, self.op.remote_node) - self.op.remote_node = remote_node + if self.op.remote_node is not None: + self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node) # Warning: do not remove the locking of the new secondary here # unless DRBD8.AddChildren is changed to work in parallel; # currently it doesn't since parallel invocations of # FindUnusedMinor will conflict - self.needed_locks[locking.LEVEL_NODE] = [remote_node] + self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND - else: self.needed_locks[locking.LEVEL_NODE] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + if self.op.iallocator is not None: + # iallocator will select a new node in the same group + self.needed_locks[locking.LEVEL_NODEGROUP] = [] + self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode, self.op.iallocator, self.op.remote_node, self.op.disks, False, self.op.early_release) @@ -8405,11 +8534,26 @@ class LUInstanceReplaceDisks(LogicalUnit): self.tasklets = [self.replacer] def DeclareLocks(self, level): - # If we're not already locking all nodes in the set we have to declare the - # instance's primary/secondary nodes. - if (level == locking.LEVEL_NODE and - self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET): - self._LockInstancesNodes() + if level == locking.LEVEL_NODEGROUP: + assert self.op.remote_node is None + assert self.op.iallocator is not None + assert not self.needed_locks[locking.LEVEL_NODEGROUP] + + self.share_locks[locking.LEVEL_NODEGROUP] = 1 + self.needed_locks[locking.LEVEL_NODEGROUP] = \ + self.cfg.GetInstanceNodeGroups(self.op.instance_name) + + elif level == locking.LEVEL_NODE: + if self.op.iallocator is not None: + assert self.op.remote_node is None + assert not self.needed_locks[locking.LEVEL_NODE] + + # Lock member nodes of all locked groups + self.needed_locks[locking.LEVEL_NODE] = [node_name + for group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP) + for node_name in self.cfg.GetNodeGroup(group_uuid).members] + else: + self._LockInstancesNodes() def BuildHooksEnv(self): """Build hooks env. 
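For illustration (not part of the patch): when an iallocator is used, LUInstanceReplaceDisks above first locks the instance's node groups and then, at LEVEL_NODE, locks every member node of the groups it owns; CheckPrereq later re-reads the groups to detect changes made while waiting for locks. A reduced sketch of the member-node expansion, with a plain dictionary standing in for the configuration:

group_members = {
    "uuid-group-a": ["node1", "node2"],
    "uuid-group-b": ["node3"],
}
owned_groups = ["uuid-group-a", "uuid-group-b"]

# Same double comprehension as in DeclareLocks, over the stand-in dictionary
needed_node_locks = [node_name
                     for group_uuid in owned_groups
                     for node_name in group_members[group_uuid]]
assert needed_node_locks == ["node1", "node2", "node3"]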
@@ -8439,6 +8583,26 @@ class LUInstanceReplaceDisks(LogicalUnit): nl.append(self.op.remote_node) return nl, nl + def CheckPrereq(self): + """Check prerequisites. + + """ + assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or + self.op.iallocator is None) + + owned_groups = self.glm.list_owned(locking.LEVEL_NODEGROUP) + if owned_groups: + groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name) + if owned_groups != groups: + raise errors.OpExecError("Node groups used by instance '%s' changed" + " since lock was acquired, current list is %r," + " used to be '%s'" % + (self.op.instance_name, + utils.CommaJoin(groups), + utils.CommaJoin(owned_groups))) + + return LogicalUnit.CheckPrereq(self) + class TLReplaceDisks(Tasklet): """Replaces disks for an instance. @@ -8550,7 +8714,6 @@ class TLReplaceDisks(Tasklet): return True - def CheckPrereq(self): """Check prerequisites. @@ -8592,20 +8755,23 @@ class TLReplaceDisks(Tasklet): remote_node = self._RunAllocator(self.lu, self.iallocator_name, instance.name, instance.secondary_nodes) - if remote_node is not None: + if remote_node is None: + self.remote_node_info = None + else: + assert remote_node in self.lu.glm.list_owned(locking.LEVEL_NODE), \ + "Remote node '%s' is not locked" % remote_node + self.remote_node_info = self.cfg.GetNodeInfo(remote_node) assert self.remote_node_info is not None, \ "Cannot retrieve locked node %s" % remote_node - else: - self.remote_node_info = None if remote_node == self.instance.primary_node: raise errors.OpPrereqError("The specified node is the primary node of" - " the instance.", errors.ECODE_INVAL) + " the instance", errors.ECODE_INVAL) if remote_node == secondary_node: raise errors.OpPrereqError("The specified node is already the" - " secondary node of the instance.", + " secondary node of the instance", errors.ECODE_INVAL) if self.disks and self.mode in (constants.REPLACE_DISK_AUTO, @@ -8681,18 +8847,26 @@ class TLReplaceDisks(Tasklet): for node in check_nodes: _CheckNodeOnline(self.lu, node) + touched_nodes = frozenset(node_name for node_name in [self.new_node, + self.other_node, + self.target_node] + if node_name is not None) + + # Release unneeded node locks + _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes) + + # Release any owned node group + if self.lu.glm.is_owned(locking.LEVEL_NODEGROUP): + _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP) + # Check whether disks are valid for disk_idx in self.disks: instance.FindDisk(disk_idx) # Get secondary node IP addresses - node_2nd_ip = {} - - for node_name in [self.target_node, self.other_node, self.new_node]: - if node_name is not None: - node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip - - self.node_secondary_ip = node_2nd_ip + self.node_secondary_ip = \ + dict((node_name, self.cfg.GetNodeInfo(node_name).secondary_ip) + for node_name in touched_nodes) def Exec(self, feedback_fn): """Execute disk replacement. 
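# Editor's note: a standalone illustration (not Ganeti code) of the
# lock-narrowing done in the CheckPrereq hunks above: once the target, other
# and new nodes are known, every other node lock is released and per-node
# data (e.g. secondary IPs) is only kept for the nodes that stay locked.
# All names below are hypothetical.

def narrow_locks(owned_nodes, target_node, other_node, new_node):
    """Return (locks to keep, locks to release) for a replace-disks run."""
    touched = frozenset(name for name in (target_node, other_node, new_node)
                        if name is not None)
    keep = [name for name in owned_nodes if name in touched]
    release = [name for name in owned_nodes if name not in touched]
    return keep, release

if __name__ == "__main__":
    owned = ["node1", "node2", "node3", "node4"]
    keep, release = narrow_locks(owned, "node1", "node2", None)
    assert set(keep) == set(["node1", "node2"])
    print("keep: %s  release: %s" % (keep, release))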
@@ -8703,6 +8877,20 @@ class TLReplaceDisks(Tasklet): if self.delay_iallocator: self._CheckPrereq2() + if __debug__: + # Verify owned locks before starting operation + owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE) + assert set(owned_locks) == set(self.node_secondary_ip), \ + ("Incorrect node locks, owning %s, expected %s" % + (owned_locks, self.node_secondary_ip.keys())) + + owned_locks = self.lu.glm.list_owned(locking.LEVEL_INSTANCE) + assert list(owned_locks) == [self.instance_name], \ + "Instance '%s' not locked" % self.instance_name + + assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \ + "Should not own any node group lock at this point" + if not self.disks: feedback_fn("No disks need replacement") return @@ -8723,14 +8911,24 @@ class TLReplaceDisks(Tasklet): else: fn = self._ExecDrbd8DiskOnly - return fn(feedback_fn) - + result = fn(feedback_fn) finally: # Deactivate the instance disks if we're replacing them on a # down instance if activate_disks: _SafeShutdownInstanceDisks(self.lu, self.instance) + if __debug__: + # Verify owned locks + owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE) + nodes = frozenset(self.node_secondary_ip) + assert ((self.early_release and not owned_locks) or + (not self.early_release and not (set(owned_locks) - nodes))), \ + ("Not owning the correct locks, early_release=%s, owned=%r," + " nodes=%r" % (self.early_release, owned_locks, nodes)) + + return result + def _CheckVolumeGroup(self, nodes): self.lu.LogInfo("Checking volume groups") @@ -8782,7 +8980,6 @@ class TLReplaceDisks(Tasklet): (node_name, self.instance.name)) def _CreateNewStorage(self, node_name): - vgname = self.cfg.GetVGName() iv_names = {} for idx, dev in enumerate(self.instance.disks): @@ -8796,10 +8993,12 @@ class TLReplaceDisks(Tasklet): lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]] names = _GenerateUniqueNames(self.lu, lv_names) + vg_data = dev.children[0].logical_id[0] lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size, - logical_id=(vgname, names[0])) + logical_id=(vg_data, names[0])) + vg_meta = dev.children[1].logical_id[0] lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128, - logical_id=(vgname, names[1])) + logical_id=(vg_meta, names[1])) new_lvs = [lv_data, lv_meta] old_lvs = dev.children @@ -8840,10 +9039,6 @@ class TLReplaceDisks(Tasklet): self.lu.LogWarning("Can't remove old LV: %s" % msg, hint="remove unused LVs manually") - def _ReleaseNodeLock(self, node_name): - """Releases the lock for a given node.""" - self.lu.context.glm.release(locking.LEVEL_NODE, node_name) - def _ExecDrbd8DiskOnly(self, feedback_fn): """Replace a disk on the primary or secondary for DRBD 8. 
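# Editor's note: the _CreateNewStorage hunk above now takes the data and
# metadata volume groups from the existing LVs (dev.children[..].logical_id)
# instead of always using the cluster-wide default VG, so per-disk VG choices
# survive a disk replacement.  This standalone sketch (not Ganeti code,
# hypothetical names) shows the idea with plain tuples standing in for the
# Disk objects.

def new_lv_specs(children, names):
    """Build (vg, lv_name) pairs for replacement data/meta LVs.

    children: [(data_vg, old_data_lv), (meta_vg, old_meta_lv)]
    names:    [new_data_lv, new_meta_lv]
    """
    data_vg = children[0][0]   # keep whatever VG the old data LV lives in
    meta_vg = children[1][0]   # same for the metadata LV
    return [(data_vg, names[0]), (meta_vg, names[1])]

if __name__ == "__main__":
    old = [("xenvg", "disk0_data"), ("metavg", "disk0_meta")]
    print(new_lv_specs(old, ["disk0_data.new", "disk0_meta.new"]))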
@@ -8961,7 +9156,8 @@ class TLReplaceDisks(Tasklet): self._RemoveOldStorage(self.target_node, iv_names) # WARNING: we release both node locks here, do not do other RPCs # than WaitForSync to the primary node - self._ReleaseNodeLock([self.target_node, self.other_node]) + _ReleaseLocks(self.lu, locking.LEVEL_NODE, + names=[self.target_node, self.other_node]) # Wait for sync # This can fail as the old devices are degraded and _WaitForSync @@ -9118,9 +9314,10 @@ class TLReplaceDisks(Tasklet): self._RemoveOldStorage(self.target_node, iv_names) # WARNING: we release all node locks here, do not do other RPCs # than WaitForSync to the primary node - self._ReleaseNodeLock([self.instance.primary_node, - self.target_node, - self.new_node]) + _ReleaseLocks(self.lu, locking.LEVEL_NODE, + names=[self.instance.primary_node, + self.target_node, + self.new_node]) # Wait for sync # This can fail as the old devices are degraded and _WaitForSync @@ -9298,7 +9495,7 @@ class LUInstanceGrowDisk(LogicalUnit): if instance.disk_template not in constants.DTS_GROWABLE: raise errors.OpPrereqError("Instance's disk layout does not support" - " growing.", errors.ECODE_INVAL) + " growing", errors.ECODE_INVAL) self.disk = instance.FindDisk(self.op.disk) @@ -9320,9 +9517,17 @@ class LUInstanceGrowDisk(LogicalUnit): if not disks_ok: raise errors.OpExecError("Cannot activate block device to grow") + # First run all grow ops in dry-run mode for node in instance.all_nodes: self.cfg.SetDiskID(disk, node) - result = self.rpc.call_blockdev_grow(node, disk, self.op.amount) + result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True) + result.Raise("Grow request failed to node %s" % node) + + # We know that (as far as we can test) operations across different + # nodes will succeed, time to run it for real + for node in instance.all_nodes: + self.cfg.SetDiskID(disk, node) + result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False) result.Raise("Grow request failed to node %s" % node) # TODO: Rewrite code to work properly @@ -9337,14 +9542,14 @@ class LUInstanceGrowDisk(LogicalUnit): if self.op.wait_for_sync: disk_abort = not _WaitForSync(self, instance, disks=[disk]) if disk_abort: - self.proc.LogWarning("Warning: disk sync-ing has not returned a good" - " status.\nPlease check the instance.") + self.proc.LogWarning("Disk sync-ing has not returned a good" + " status; please check the instance") if not instance.admin_up: _SafeShutdownInstanceDisks(self, instance, disks=[disk]) elif not instance.admin_up: self.proc.LogWarning("Not shutting down the disk even if the instance is" " not supposed to be running because no wait for" - " sync mode was requested.") + " sync mode was requested") class LUInstanceQueryData(NoHooksLU): @@ -9355,23 +9560,33 @@ class LUInstanceQueryData(NoHooksLU): def ExpandNames(self): self.needed_locks = {} - self.share_locks = dict.fromkeys(locking.LEVELS, 1) - if self.op.instances: - self.wanted_names = [] - for name in self.op.instances: - full_name = _ExpandInstanceName(self.cfg, name) - self.wanted_names.append(full_name) - self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names + # Use locking if requested or when non-static information is wanted + if not (self.op.static or self.op.use_locking): + self.LogWarning("Non-static data requested, locks need to be acquired") + self.op.use_locking = True + + if self.op.instances or not self.op.use_locking: + # Expand instance names right here + self.wanted_names = _GetWantedInstances(self, self.op.instances) else: + # Will use 
acquired locks self.wanted_names = None - self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET - self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + if self.op.use_locking: + self.share_locks = dict.fromkeys(locking.LEVELS, 1) + + if self.wanted_names is None: + self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET + else: + self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names + + self.needed_locks[locking.LEVEL_NODE] = [] + self.share_locks = dict.fromkeys(locking.LEVELS, 1) + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE def DeclareLocks(self, level): - if level == locking.LEVEL_NODE: + if self.op.use_locking and level == locking.LEVEL_NODE: self._LockInstancesNodes() def CheckPrereq(self): @@ -9381,10 +9596,11 @@ class LUInstanceQueryData(NoHooksLU): """ if self.wanted_names is None: - self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE] + assert self.op.use_locking, "Locking was not used" + self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE) - self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name - in self.wanted_names] + self.wanted_instances = [self.cfg.GetInstanceInfo(name) + for name in self.wanted_names] def _ComputeBlockdevStatus(self, node, instance_name, dev): """Returns the status of a block device @@ -9430,7 +9646,7 @@ class LUInstanceQueryData(NoHooksLU): else: dev_children = [] - data = { + return { "iv_name": dev.iv_name, "dev_type": dev.dev_type, "logical_id": dev.logical_id, @@ -9442,8 +9658,6 @@ class LUInstanceQueryData(NoHooksLU): "size": dev.size, } - return data - def Exec(self, feedback_fn): """Gather and return data""" result = {} @@ -9471,7 +9685,7 @@ class LUInstanceQueryData(NoHooksLU): disks = [self._ComputeDiskStatus(instance, None, device) for device in instance.disks] - idict = { + result[instance.name] = { "name": instance.name, "config_state": config_state, "run_state": remote_state, @@ -9496,8 +9710,6 @@ class LUInstanceQueryData(NoHooksLU): "uuid": instance.uuid, } - result[instance.name] = idict - return result @@ -9948,7 +10160,8 @@ class LUInstanceSetParams(LogicalUnit): snode = self.op.remote_node # create a fake disk info for _GenerateDiskTemplate - disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode} + disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode, + constants.IDISK_VG: d.logical_id[0]} for d in instance.disks] new_disks = _GenerateDiskTemplate(self, self.op.disk_template, instance.name, pnode, [snode], @@ -10192,7 +10405,7 @@ class LUBackupQuery(NoHooksLU): that node. 
""" - self.nodes = self.acquired_locks[locking.LEVEL_NODE] + self.nodes = self.glm.list_owned(locking.LEVEL_NODE) rpcresult = self.rpc.call_export_list(self.nodes) result = {} for node in rpcresult: @@ -10574,7 +10787,7 @@ class LUBackupRemove(NoHooksLU): fqdn_warn = True instance_name = self.op.instance_name - locked_nodes = self.acquired_locks[locking.LEVEL_NODE] + locked_nodes = self.glm.list_owned(locking.LEVEL_NODE) exportlist = self.rpc.call_export_list(locked_nodes) found = False for node in exportlist: @@ -10712,7 +10925,7 @@ class LUGroupAssignNodes(NoHooksLU): if previous_splits: self.LogWarning("In addition, these already-split instances continue" - " to be spit across groups: %s", + " to be split across groups: %s", utils.CommaJoin(utils.NiceSort(previous_splits))) def Exec(self, feedback_fn): @@ -10801,7 +11014,8 @@ class _GroupQuery(_QueryBase): missing.append(name) if missing: - raise errors.OpPrereqError("Some groups do not exist: %s" % missing, + raise errors.OpPrereqError("Some groups do not exist: %s" % + utils.CommaJoin(missing), errors.ECODE_NOENT) def DeclareLocks(self, lu, level): @@ -11084,8 +11298,8 @@ class TagsLU(NoHooksLU): # pylint: disable-msg=W0223 This is an abstract class which is the parent of all the other tags LUs. """ - def ExpandNames(self): + self.group_uuid = None self.needed_locks = {} if self.op.kind == constants.TAG_NODE: self.op.name = _ExpandNodeName(self.cfg, self.op.name) @@ -11093,6 +11307,8 @@ class TagsLU(NoHooksLU): # pylint: disable-msg=W0223 elif self.op.kind == constants.TAG_INSTANCE: self.op.name = _ExpandInstanceName(self.cfg, self.op.name) self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name + elif self.op.kind == constants.TAG_NODEGROUP: + self.group_uuid = self.cfg.LookupNodeGroup(self.op.name) # FIXME: Acquire BGL for cluster tag operations (as of this writing it's # not possible to acquire the BGL based on opcode parameters) @@ -11107,6 +11323,8 @@ class TagsLU(NoHooksLU): # pylint: disable-msg=W0223 self.target = self.cfg.GetNodeInfo(self.op.name) elif self.op.kind == constants.TAG_INSTANCE: self.target = self.cfg.GetInstanceInfo(self.op.name) + elif self.op.kind == constants.TAG_NODEGROUP: + self.target = self.cfg.GetNodeGroup(self.group_uuid) else: raise errors.OpPrereqError("Wrong tag type requested (%s)" % str(self.op.kind), errors.ECODE_INVAL) @@ -11162,6 +11380,8 @@ class LUTagsSearch(NoHooksLU): tgts.extend([("/instances/%s" % i.name, i) for i in ilist]) nlist = cfg.GetAllNodesInfo().values() tgts.extend([("/nodes/%s" % n.name, n) for n in nlist]) + tgts.extend(("/nodegroup/%s" % n.name, n) + for n in cfg.GetAllNodeGroupsInfo().values()) results = [] for path, target in tgts: for tag in target.GetTags(): @@ -11799,8 +12019,61 @@ class IAllocator(object): if not isinstance(rdict["result"], list): raise errors.OpExecError("Can't parse iallocator results: 'result' key" " is not a list") + + if self.mode == constants.IALLOCATOR_MODE_RELOC: + assert self.relocate_from is not None + assert self.required_nodes == 1 + + node2group = dict((name, ndata["group"]) + for (name, ndata) in self.in_data["nodes"].items()) + + fn = compat.partial(self._NodesToGroups, node2group, + self.in_data["nodegroups"]) + + request_groups = fn(self.relocate_from) + result_groups = fn(rdict["result"]) + + if result_groups != request_groups: + raise errors.OpExecError("Groups of nodes returned by iallocator (%s)" + " differ from original groups (%s)" % + (utils.CommaJoin(result_groups), + utils.CommaJoin(request_groups))) + self.out_data = rdict + 
@staticmethod + def _NodesToGroups(node2group, groups, nodes): + """Returns a list of unique group names for a list of nodes. + + @type node2group: dict + @param node2group: Map from node name to group UUID + @type groups: dict + @param groups: Group information + @type nodes: list + @param nodes: Node names + + """ + result = set() + + for node in nodes: + try: + group_uuid = node2group[node] + except KeyError: + # Ignore unknown node + pass + else: + try: + group = groups[group_uuid] + except KeyError: + # Can't find group, let's use UUID + group_name = group_uuid + else: + group_name = group["name"] + + result.add(group_name) + + return sorted(result) + class LUTestAllocator(NoHooksLU): """Run allocator tests.
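# Editor's note: the relocation check added above maps both the nodes the
# instance is moved away from and the nodes returned by the iallocator to
# their group names (via _NodesToGroups) and fails if the two sets differ.
# Standalone sketch (not Ganeti code, hypothetical data) of that comparison.

def groups_of(node2group, groups, nodes):
    """Map node names to a sorted list of unique group names."""
    names = set()
    for node in nodes:
        uuid = node2group.get(node)
        if uuid is None:
            continue                                   # unknown node: ignore
        names.add(groups.get(uuid, {}).get("name", uuid))
    return sorted(names)

if __name__ == "__main__":
    node2group = {"node1": "uuid-a", "node2": "uuid-a", "node3": "uuid-b"}
    groups = {"uuid-a": {"name": "default"}, "uuid-b": {"name": "other"}}
    request = groups_of(node2group, groups, ["node1"])   # relocating from
    result = groups_of(node2group, groups, ["node2"])    # allocator's pick
    if request != result:
        raise RuntimeError("iallocator crossed node groups: %s vs %s"
                           % (result, request))
    print("relocation stays within groups: %s" % request)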