X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/982ed68e1491867830ef4254abc758a3702cf82c..cc0dec7b6ef920b8bb77c5c72e5dcd2f8fa09445:/lib/cmdlib.py diff --git a/lib/cmdlib.py b/lib/cmdlib.py index 75b2303..1d3d9a7 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc. +# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -39,6 +39,7 @@ import OpenSSL import socket import tempfile import shutil +import itertools from ganeti import ssh from ganeti import utils @@ -53,40 +54,24 @@ from ganeti import uidpool from ganeti import compat from ganeti import masterd from ganeti import netutils -from ganeti import ht from ganeti import query from ganeti import qlang +from ganeti import opcodes import ganeti.masterd.instance # pylint: disable-msg=W0611 -# Common opcode attributes -#: output fields for a query operation -_POutputFields = ("output_fields", ht.NoDefault, ht.TListOf(ht.TNonEmptyString)) +def _SupportsOob(cfg, node): + """Tells if node supports OOB. + @type cfg: L{config.ConfigWriter} + @param cfg: The cluster configuration + @type node: L{objects.Node} + @param node: The node + @return: The OOB script if supported or an empty string otherwise -#: the shutdown timeout -_PShutdownTimeout = ("shutdown_timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT, - ht.TPositiveInt) - -#: the force parameter -_PForce = ("force", False, ht.TBool) - -#: a required instance name (for single-instance LUs) -_PInstanceName = ("instance_name", ht.NoDefault, ht.TNonEmptyString) - -#: Whether to ignore offline nodes -_PIgnoreOfflineNodes = ("ignore_offline_nodes", False, ht.TBool) - -#: a required node name (for single-node LUs) -_PNodeName = ("node_name", ht.NoDefault, ht.TNonEmptyString) - -#: the migration type (live/non-live) -_PMigrationMode = ("mode", None, - ht.TOr(ht.TNone, ht.TElemOf(constants.HT_MIGRATION_MODES))) - -#: the obsolete 'live' mode (boolean) -_PMigrationLive = ("live", None, ht.TMaybeBool) + """ + return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM] # End types @@ -106,13 +91,10 @@ class LogicalUnit(object): @ivar dry_run_result: the value (if any) that will be returned to the caller in dry-run mode (signalled by opcode dry_run parameter) - @cvar _OP_PARAMS: a list of opcode attributes, their defaults values - they should get if not already defined, and types they must match """ HPATH = None HTYPE = None - _OP_PARAMS = [] REQ_BGL = True def __init__(self, processor, op, context, rpc): @@ -151,32 +133,8 @@ class LogicalUnit(object): # Tasklets self.tasklets = None - # The new kind-of-type-system - op_id = self.op.OP_ID - for attr_name, aval, test in self._OP_PARAMS: - if not hasattr(op, attr_name): - if aval == ht.NoDefault: - raise errors.OpPrereqError("Required parameter '%s.%s' missing" % - (op_id, attr_name), errors.ECODE_INVAL) - else: - if callable(aval): - dval = aval() - else: - dval = aval - setattr(self.op, attr_name, dval) - attr_val = getattr(op, attr_name) - if test == ht.NoType: - # no tests here - continue - if not callable(test): - raise errors.ProgrammerError("Validation for parameter '%s.%s' failed," - " given type is not a proper type (%s)" % - (op_id, attr_name, test)) - if not test(attr_val): - logging.error("OpCode %s, parameter %s, has invalid type %s/value %s", - self.op.OP_ID, attr_name, type(attr_val), attr_val) - raise 
errors.OpPrereqError("Parameter '%s.%s' fails validation" % - (op_id, attr_name), errors.ECODE_INVAL) + # Validate opcode parameters and set defaults + self.op.Validate(True) self.CheckArguments() @@ -505,6 +463,35 @@ class _QueryBase: self.query = query.Query(self.FIELDS, fields) self.requested_data = self.query.RequestedData() + self.do_locking = None + self.wanted = None + + def _GetNames(self, lu, all_names, lock_level): + """Helper function to determine names asked for in the query. + + """ + if self.do_locking: + names = lu.acquired_locks[lock_level] + else: + names = all_names + + if self.wanted == locking.ALL_SET: + assert not self.names + # caller didn't specify names, so ordering is not important + return utils.NiceSort(names) + + # caller specified names and we must keep the same order + assert self.names + assert not self.do_locking or lu.acquired_locks[lock_level] + + missing = set(self.wanted).difference(names) + if missing: + raise errors.OpExecError("Some items were removed before retrieving" + " their data: %s" % missing) + + # Return expanded names + return self.wanted + @classmethod def FieldsQuery(cls, fields): """Returns list of available fields. @@ -512,15 +499,7 @@ class _QueryBase: @return: List of L{objects.QueryFieldDefinition} """ - if fields is None: - # Client requests all fields - fdefs = query.GetAllFields(cls.FIELDS.values()) - else: - fdefs = query.Query(cls.FIELDS, fields).GetFields() - - return { - "fields": [fdef.ToDict() for fdef in fdefs], - } + return query.QueryFields(cls.FIELDS, fields) def ExpandNames(self, lu): """Expand names for this query. @@ -530,7 +509,7 @@ class _QueryBase: """ raise NotImplementedError() - def DeclareLocks(self, level): + def DeclareLocks(self, lu, level): """Declare locks for this query. See L{LogicalUnit.DeclareLocks}. @@ -550,13 +529,7 @@ class _QueryBase: """Collect data and execute query. """ - data = self._GetQueryData(lu) - - return { - "data": self.query.Query(data), - "fields": [fdef.ToDict() - for fdef in self.query.GetFields()], - } + return query.GetQueryResponse(self.query, self._GetQueryData(lu)) def OldStyleQuery(self, lu): """Collect data and execute query. @@ -577,12 +550,10 @@ def _GetWantedNodes(lu, nodes): @raise errors.ProgrammerError: if the nodes parameter is wrong type """ - if not nodes: - raise errors.ProgrammerError("_GetWantedNodes should only be called with a" - " non-empty list of nodes whose name is to be expanded.") + if nodes: + return [_ExpandNodeName(lu.cfg, name) for name in nodes] - wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes] - return utils.NiceSort(wanted) + return utils.NiceSort(lu.cfg.GetNodeList()) def _GetWantedInstances(lu, instances): @@ -758,42 +729,6 @@ def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq): raise errors.OpExecError(msg) -def _RequireFileStorage(): - """Checks that file storage is enabled. - - @raise errors.OpPrereqError: when file storage is disabled - - """ - if not constants.ENABLE_FILE_STORAGE: - raise errors.OpPrereqError("File storage disabled at configure time", - errors.ECODE_INVAL) - - -def _CheckDiskTemplate(template): - """Ensure a given disk template is valid. 
- - """ - if template not in constants.DISK_TEMPLATES: - msg = ("Invalid disk template name '%s', valid templates are: %s" % - (template, utils.CommaJoin(constants.DISK_TEMPLATES))) - raise errors.OpPrereqError(msg, errors.ECODE_INVAL) - if template == constants.DT_FILE: - _RequireFileStorage() - return True - - -def _CheckStorageType(storage_type): - """Ensure a given storage type is valid. - - """ - if storage_type not in constants.VALID_STORAGE_TYPES: - raise errors.OpPrereqError("Unknown storage type: %s" % storage_type, - errors.ECODE_INVAL) - if storage_type == constants.ST_FILE: - _RequireFileStorage() - return True - - def _GetClusterDomainSecret(): """Reads the cluster domain secret. @@ -1154,7 +1089,7 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot): " iallocator.") -class LUPostInitCluster(LogicalUnit): +class LUClusterPostInit(LogicalUnit): """Logical unit for running hooks after cluster initialization. """ @@ -1176,7 +1111,7 @@ class LUPostInitCluster(LogicalUnit): return True -class LUDestroyCluster(LogicalUnit): +class LUClusterDestroy(LogicalUnit): """Logical unit for destroying the cluster. """ @@ -1232,7 +1167,7 @@ class LUDestroyCluster(LogicalUnit): def _VerifyCertificate(filename): - """Verifies a certificate for LUVerifyCluster. + """Verifies a certificate for LUClusterVerify. @type filename: string @param filename: Path to PEM file @@ -1242,7 +1177,7 @@ def _VerifyCertificate(filename): cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, utils.ReadFile(filename)) except Exception, err: # pylint: disable-msg=W0703 - return (LUVerifyCluster.ETYPE_ERROR, + return (LUClusterVerify.ETYPE_ERROR, "Failed to load X509 certificate %s: %s" % (filename, err)) (errcode, msg) = \ @@ -1257,26 +1192,19 @@ def _VerifyCertificate(filename): if errcode is None: return (None, fnamemsg) elif errcode == utils.CERT_WARNING: - return (LUVerifyCluster.ETYPE_WARNING, fnamemsg) + return (LUClusterVerify.ETYPE_WARNING, fnamemsg) elif errcode == utils.CERT_ERROR: - return (LUVerifyCluster.ETYPE_ERROR, fnamemsg) + return (LUClusterVerify.ETYPE_ERROR, fnamemsg) raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode) -class LUVerifyCluster(LogicalUnit): +class LUClusterVerify(LogicalUnit): """Verifies the cluster status. 
""" HPATH = "cluster-verify" HTYPE = constants.HTYPE_CLUSTER - _OP_PARAMS = [ - ("skip_checks", ht.EmptyList, - ht.TListOf(ht.TElemOf(constants.VERIFY_OPTIONAL_CHECKS))), - ("verbose", False, ht.TBool), - ("error_codes", False, ht.TBool), - ("debug_simulate_errors", False, ht.TBool), - ] REQ_BGL = False TCLUSTER = "cluster" @@ -1291,6 +1219,7 @@ class LUVerifyCluster(LogicalUnit): EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK") EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK") EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE") + EINSTANCESPLITGROUPS = (TINSTANCE, "EINSTANCESPLITGROUPS") ENODEDRBD = (TNODE, "ENODEDRBD") ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER") ENODEFILECHECK = (TNODE, "ENODEFILECHECK") @@ -1307,6 +1236,7 @@ class LUVerifyCluster(LogicalUnit): ENODEVERSION = (TNODE, "ENODEVERSION") ENODESETUP = (TNODE, "ENODESETUP") ENODETIME = (TNODE, "ENODETIME") + ENODEOOBPATH = (TNODE, "ENODEOOBPATH") ETYPE_FIELD = "code" ETYPE_ERROR = "ERROR" @@ -1693,8 +1623,8 @@ class LUVerifyCluster(LogicalUnit): needed_mem += bep[constants.BE_MEMORY] test = n_img.mfree < needed_mem self._ErrorIf(test, self.ENODEN1, node, - "not enough memory on to accommodate" - " failovers should peer node %s fail", prinode) + "not enough memory to accomodate instance failovers" + " should node %s fail", prinode) def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum, master_files): @@ -1897,6 +1827,22 @@ class LUVerifyCluster(LogicalUnit): "OSes present on reference node %s but missing on this node: %s", base.name, utils.CommaJoin(missing)) + def _VerifyOob(self, ninfo, nresult): + """Verifies out of band functionality of a node. + + @type ninfo: L{objects.Node} + @param ninfo: the node to check + @param nresult: the remote results for the node + + """ + node = ninfo.name + # We just have to verify the paths on master and/or master candidates + # as the oob helper is invoked on the master + if ((ninfo.master_candidate or ninfo.master_capable) and + constants.NV_OOB_PATHS in nresult): + for path_result in nresult[constants.NV_OOB_PATHS]: + self._ErrorIf(path_result, self.ENODEOOBPATH, node, path_result) + def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name): """Verifies and updates the node volume data. 
@@ -1995,18 +1941,26 @@ class LUVerifyCluster(LogicalUnit):
     @param node_image: Node objects
     @type instanceinfo: dict of (name, L{objects.Instance})
     @param instanceinfo: Instance objects
+    @rtype: {instance: {node: [(success, payload)]}}
+    @return: a dictionary of per-instance dictionaries with nodes as
+        keys and disk information as values; the disk information is a
+        list of tuples (success, payload)
 
     """
     _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
 
     node_disks = {}
     node_disks_devonly = {}
+    diskless_instances = set()
+    diskless = constants.DT_DISKLESS
 
     for nname in nodelist:
+      node_instances = list(itertools.chain(node_image[nname].pinst,
+                                            node_image[nname].sinst))
+      diskless_instances.update(inst for inst in node_instances
+                                if instanceinfo[inst].disk_template == diskless)
       disks = [(inst, disk)
-               for instlist in [node_image[nname].pinst,
-                                node_image[nname].sinst]
-               for inst in instlist
+               for inst in node_instances
                for disk in instanceinfo[inst].disks]
 
       if not disks:
@@ -2035,28 +1989,43 @@ class LUVerifyCluster(LogicalUnit):
     instdisk = {}
 
     for (nname, nres) in result.items():
-      if nres.offline:
-        # Ignore offline node
-        continue
-
       disks = node_disks[nname]
 
-      msg = nres.fail_msg
-      _ErrorIf(msg, self.ENODERPC, nname,
-               "while getting disk information: %s", nres.fail_msg)
-      if msg:
+      if nres.offline:
         # No data from this node
-        data = len(disks) * [None]
+        data = len(disks) * [(False, "node offline")]
       else:
-        data = nres.payload
+        msg = nres.fail_msg
+        _ErrorIf(msg, self.ENODERPC, nname,
+                 "while getting disk information: %s", msg)
+        if msg:
+          # No data from this node
+          data = len(disks) * [(False, msg)]
+        else:
+          data = []
+          for idx, i in enumerate(nres.payload):
+            if isinstance(i, (tuple, list)) and len(i) == 2:
+              data.append(i)
+            else:
+              logging.warning("Invalid result from node %s, entry %d: %s",
+                              nname, idx, i)
+              data.append((False, "Invalid result from the remote node"))
 
       for ((inst, _), status) in zip(disks, data):
         instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
 
+    # Add empty entries for diskless instances.
+    for inst in diskless_instances:
+      assert inst not in instdisk
+      instdisk[inst] = {}
+
     assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
-                      len(nnames) <= len(instanceinfo[inst].all_nodes)
+                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
+                      compat.all(isinstance(s, (tuple, list)) and
+                                 len(s) == 2 for s in statuses)
                       for inst, nnames in instdisk.items()
                       for nname, statuses in nnames.items())
+    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
 
     return instdisk
 
@@ -2080,6 +2049,7 @@ class LUVerifyCluster(LogicalUnit):
     """Verify integrity of cluster, performing various test on nodes.
 
     """
+    # This method has too many local variables. 
pylint: disable-msg=R0914 self.bad = False _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 verbose = self.op.verbose @@ -2099,9 +2069,11 @@ class LUVerifyCluster(LogicalUnit): cluster = self.cfg.GetClusterInfo() nodelist = utils.NiceSort(self.cfg.GetNodeList()) nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist] + nodeinfo_byname = dict(zip(nodelist, nodeinfo)) instancelist = utils.NiceSort(self.cfg.GetInstanceList()) instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname)) for iname in instancelist) + groupinfo = self.cfg.GetAllNodeGroupsInfo() i_non_redundant = [] # Non redundant instances i_non_a_balanced = [] # Non auto-balanced instances n_offline = 0 # Count of offline nodes @@ -2156,6 +2128,16 @@ class LUVerifyCluster(LogicalUnit): vm_capable=node.vm_capable)) for node in nodeinfo) + # Gather OOB paths + oob_paths = [] + for node in nodeinfo: + path = _SupportsOob(self.cfg, node) + if path and path not in oob_paths: + oob_paths.append(path) + + if oob_paths: + node_verify_param[constants.NV_OOB_PATHS] = oob_paths + for instance in instancelist: inst_config = instanceinfo[instance] @@ -2235,6 +2217,8 @@ class LUVerifyCluster(LogicalUnit): self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums, master_files) + self._VerifyOob(node_i, nresult) + if nimg.vm_capable: self._VerifyNodeLVM(node_i, nresult, vg_name) self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper, @@ -2274,11 +2258,33 @@ class LUVerifyCluster(LogicalUnit): # FIXME: does not support file-backed instances if not inst_config.secondary_nodes: i_non_redundant.append(instance) + _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT, instance, "instance has multiple secondary nodes: %s", utils.CommaJoin(inst_config.secondary_nodes), code=self.ETYPE_WARNING) + if inst_config.disk_template in constants.DTS_NET_MIRROR: + pnode = inst_config.primary_node + instance_nodes = utils.NiceSort(inst_config.all_nodes) + instance_groups = {} + + for node in instance_nodes: + instance_groups.setdefault(nodeinfo_byname[node].group, + []).append(node) + + pretty_list = [ + "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name) + # Sort so that we always list the primary node first. + for group, nodes in sorted(instance_groups.items(), + key=lambda (_, nodes): pnode in nodes, + reverse=True)] + + self._ErrorIf(len(instance_groups) > 1, self.EINSTANCESPLITGROUPS, + instance, "instance has primary and secondary nodes in" + " different groups: %s", utils.CommaJoin(pretty_list), + code=self.ETYPE_WARNING) + if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]: i_non_a_balanced.append(instance) @@ -2375,7 +2381,7 @@ class LUVerifyCluster(LogicalUnit): return lu_result -class LUVerifyDisks(NoHooksLU): +class LUClusterVerifyDisks(NoHooksLU): """Verifies the cluster disks status. 
""" @@ -2399,7 +2405,6 @@ class LUVerifyDisks(NoHooksLU): """ result = res_nodes, res_instances, res_missing = {}, [], {} - vg_name = self.cfg.GetVGName() nodes = utils.NiceSort(self.cfg.GetNodeList()) instances = [self.cfg.GetInstanceInfo(name) for name in self.cfg.GetInstanceList()] @@ -2419,11 +2424,13 @@ class LUVerifyDisks(NoHooksLU): if not nv_dict: return result - node_lvs = self.rpc.call_lv_list(nodes, vg_name) + vg_names = self.rpc.call_vg_list(nodes) + vg_names.Raise("Cannot get list of VGs") for node in nodes: # node_volume - node_res = node_lvs[node] + node_res = self.rpc.call_lv_list([node], + vg_names[node].payload.keys())[node] if node_res.offline: continue msg = node_res.fail_msg @@ -2449,11 +2456,10 @@ class LUVerifyDisks(NoHooksLU): return result -class LURepairDiskSizes(NoHooksLU): +class LUClusterRepairDiskSizes(NoHooksLU): """Verifies the cluster disks sizes. """ - _OP_PARAMS = [("instances", ht.EmptyList, ht.TListOf(ht.TNonEmptyString))] REQ_BGL = False def ExpandNames(self): @@ -2565,13 +2571,12 @@ class LURepairDiskSizes(NoHooksLU): return changed -class LURenameCluster(LogicalUnit): +class LUClusterRename(LogicalUnit): """Rename the cluster. """ HPATH = "cluster-rename" HTYPE = constants.HTYPE_CLUSTER - _OP_PARAMS = [("name", ht.NoDefault, ht.TNonEmptyString)] def BuildHooksEnv(self): """Build hooks env. @@ -2644,46 +2649,12 @@ class LURenameCluster(LogicalUnit): return clustername -class LUSetClusterParams(LogicalUnit): +class LUClusterSetParams(LogicalUnit): """Change the parameters of the cluster. """ HPATH = "cluster-modify" HTYPE = constants.HTYPE_CLUSTER - _OP_PARAMS = [ - ("vg_name", None, ht.TMaybeString), - ("enabled_hypervisors", None, - ht.TOr(ht.TAnd(ht.TListOf(ht.TElemOf(constants.HYPER_TYPES)), ht.TTrue), - ht.TNone)), - ("hvparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict), - ht.TNone)), - ("beparams", None, ht.TOr(ht.TDict, ht.TNone)), - ("os_hvp", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict), - ht.TNone)), - ("osparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict), - ht.TNone)), - ("candidate_pool_size", None, ht.TOr(ht.TStrictPositiveInt, ht.TNone)), - ("uid_pool", None, ht.NoType), - ("add_uids", None, ht.NoType), - ("remove_uids", None, ht.NoType), - ("maintain_node_health", None, ht.TMaybeBool), - ("prealloc_wipe_disks", None, ht.TMaybeBool), - ("nicparams", None, ht.TOr(ht.TDict, ht.TNone)), - ("ndparams", None, ht.TOr(ht.TDict, ht.TNone)), - ("drbd_helper", None, ht.TOr(ht.TString, ht.TNone)), - ("default_iallocator", None, ht.TOr(ht.TString, ht.TNone)), - ("reserved_lvs", None, ht.TOr(ht.TListOf(ht.TNonEmptyString), ht.TNone)), - ("hidden_os", None, ht.TOr(ht.TListOf(\ - ht.TAnd(ht.TList, - ht.TIsLength(2), - ht.TMap(lambda v: v[0], ht.TElemOf(constants.DDMS_VALUES)))), - ht.TNone)), - ("blacklisted_os", None, ht.TOr(ht.TListOf(\ - ht.TAnd(ht.TList, - ht.TIsLength(2), - ht.TMap(lambda v: v[0], ht.TElemOf(constants.DDMS_VALUES)))), - ht.TNone)), - ] REQ_BGL = False def CheckArguments(self): @@ -2988,8 +2959,27 @@ class LUSetClusterParams(LogicalUnit): if self.op.blacklisted_os: helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted") + if self.op.master_netdev: + master = self.cfg.GetMasterNode() + feedback_fn("Shutting down master ip on the current netdev (%s)" % + self.cluster.master_netdev) + result = self.rpc.call_node_stop_master(master, False) + result.Raise("Could not disable the master ip") + feedback_fn("Changing master_netdev from %s to %s" % + (self.cluster.master_netdev, 
self.op.master_netdev))
+      self.cluster.master_netdev = self.op.master_netdev
 
     self.cfg.Update(self.cluster, feedback_fn)
 
+    if self.op.master_netdev:
+      feedback_fn("Starting the master ip on the new master netdev (%s)" %
+                  self.op.master_netdev)
+      result = self.rpc.call_node_start_master(master, False, False)
+      if result.fail_msg:
+        self.LogWarning("Could not re-enable the master ip on"
+                        " the master, please restart manually: %s",
+                        result.fail_msg)
+
 
 def _UploadHelper(lu, nodes, fname):
   """Helper for uploading a file and showing warnings.
@@ -3054,7 +3044,7 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
         _UploadHelper(lu, vm_nodes, fname)
 
 
-class LURedistributeConfig(NoHooksLU):
+class LUClusterRedistConf(NoHooksLU):
   """Force the redistribution of cluster configuration.
 
   This is a very simple LU.
@@ -3185,14 +3175,130 @@ def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
   return result
 
 
+class LUOobCommand(NoHooksLU):
+  """Logical unit for OOB handling.
+
+  """
+  REQ_BGL = False
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks:
+     - the node exists in the configuration
+     - OOB is supported
+
+    Any errors are signaled by raising errors.OpPrereqError.
+
+    """
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+    node = self.cfg.GetNodeInfo(self.op.node_name)
+
+    if node is None:
+      raise errors.OpPrereqError("Node %s not found" % self.op.node_name)
+
+    self.oob_program = _SupportsOob(self.cfg, node)
+
+    if not self.oob_program:
+      raise errors.OpPrereqError("OOB is not supported for node %s" %
+                                 self.op.node_name)
+
+    if self.op.command == constants.OOB_POWER_OFF and not node.offline:
+      raise errors.OpPrereqError(("Cannot power off node %s because it is"
+                                  " not marked offline") % self.op.node_name)
+
+    self.node = node
+
+  def ExpandNames(self):
+    """Gather locks we need.
+
+    """
+    node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+    self.needed_locks = {
+      locking.LEVEL_NODE: [node_name],
+      }
+
+  def Exec(self, feedback_fn):
+    """Execute OOB and return result if we expect any.
+
+    """
+    master_node = self.cfg.GetMasterNode()
+    node = self.node
+
+    logging.info("Executing out-of-band command '%s' using '%s' on %s",
+                 self.op.command, self.oob_program, self.op.node_name)
+    result = self.rpc.call_run_oob(master_node, self.oob_program,
+                                   self.op.command, self.op.node_name,
+                                   self.op.timeout)
+
+    result.Raise("An error occurred on execution of OOB helper")
+
+    self._CheckPayload(result)
+
+    if self.op.command == constants.OOB_HEALTH:
+      # For health we should log important events
+      for item, status in result.payload:
+        if status in [constants.OOB_STATUS_WARNING,
+                      constants.OOB_STATUS_CRITICAL]:
+          logging.warning("On node '%s' item '%s' has status '%s'",
+                          self.op.node_name, item, status)
+
+    if self.op.command == constants.OOB_POWER_ON:
+      node.powered = True
+    elif self.op.command == constants.OOB_POWER_OFF:
+      node.powered = False
+    elif self.op.command == constants.OOB_POWER_STATUS:
+      powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
+      if powered != self.node.powered:
+        logging.warning(("Recorded power state (%s) of node '%s' does not match"
+                         " actual power state (%s)"), node.powered,
+                        self.op.node_name, powered)
+
+    self.cfg.Update(node, feedback_fn)
+
+    return result.payload
+
+  def _CheckPayload(self, result):
+    """Checks if the payload is valid. 
+ + @param result: RPC result + @raises errors.OpExecError: If payload is not valid + + """ + errs = [] + if self.op.command == constants.OOB_HEALTH: + if not isinstance(result.payload, list): + errs.append("command 'health' is expected to return a list but got %s" % + type(result.payload)) + for item, status in result.payload: + if status not in constants.OOB_STATUSES: + errs.append("health item '%s' has invalid status '%s'" % + (item, status)) + + if self.op.command == constants.OOB_POWER_STATUS: + if not isinstance(result.payload, dict): + errs.append("power-status is expected to return a dict but got %s" % + type(result.payload)) + + if self.op.command in [ + constants.OOB_POWER_ON, + constants.OOB_POWER_OFF, + constants.OOB_POWER_CYCLE, + ]: + if result.payload is not None: + errs.append("%s is expected to not return payload but got '%s'" % + (self.op.command, result.payload)) + + if errs: + raise errors.OpExecError("Check of out-of-band payload failed due to %s" % + utils.CommaJoin(errs)) + + + class LUDiagnoseOS(NoHooksLU): """Logical unit for OS diagnose/query. """ - _OP_PARAMS = [ - _POutputFields, - ("names", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)), - ] REQ_BGL = False _HID = "hidden" _BLK = "blacklisted" @@ -3329,9 +3435,6 @@ class LURemoveNode(LogicalUnit): """ HPATH = "node-remove" HTYPE = constants.HTYPE_NODE - _OP_PARAMS = [ - _PNodeName, - ] def BuildHooksEnv(self): """Build hooks env. @@ -3441,7 +3544,7 @@ class _NodeQuery(_QueryBase): # if we don't request only static fields, we need to lock the nodes lu.needed_locks[locking.LEVEL_NODE] = self.wanted - def DeclareLocks(self, _): + def DeclareLocks(self, lu, level): pass def _GetQueryData(self, lu): @@ -3450,18 +3553,7 @@ class _NodeQuery(_QueryBase): """ all_info = lu.cfg.GetAllNodesInfo() - if self.do_locking: - nodenames = lu.acquired_locks[locking.LEVEL_NODE] - elif self.wanted != locking.ALL_SET: - nodenames = self.wanted - missing = set(nodenames).difference(all_info.keys()) - if missing: - raise errors.OpExecError("Some nodes were removed before retrieving" - " their data: %s" % missing) - else: - nodenames = all_info.keys() - - nodenames = utils.NiceSort(nodenames) + nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE) # Gather data as requested if query.NQ_LIVE in self.requested_data: @@ -3489,6 +3581,12 @@ class _NodeQuery(_QueryBase): node_to_primary = None node_to_secondary = None + if query.NQ_OOB in self.requested_data: + oob_support = dict((name, bool(_SupportsOob(lu.cfg, node))) + for name, node in all_info.iteritems()) + else: + oob_support = None + if query.NQ_GROUP in self.requested_data: groups = lu.cfg.GetAllNodeGroupsInfo() else: @@ -3496,7 +3594,8 @@ class _NodeQuery(_QueryBase): return query.NodeQueryData([all_info[name] for name in nodenames], live_data, lu.cfg.GetMasterNode(), - node_to_primary, node_to_secondary, groups) + node_to_primary, node_to_secondary, groups, + oob_support, lu.cfg.GetClusterInfo()) class LUQueryNodes(NoHooksLU): @@ -3504,11 +3603,6 @@ class LUQueryNodes(NoHooksLU): """ # pylint: disable-msg=W0142 - _OP_PARAMS = [ - _POutputFields, - ("names", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)), - ("use_locking", False, ht.TBool), - ] REQ_BGL = False def CheckArguments(self): @@ -3526,10 +3620,6 @@ class LUQueryNodeVolumes(NoHooksLU): """Logical unit for getting volumes on node(s). 
""" - _OP_PARAMS = [ - _POutputFields, - ("nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)), - ] REQ_BGL = False _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance") _FIELDS_STATIC = utils.FieldSet("node") @@ -3609,12 +3699,6 @@ class LUQueryNodeStorage(NoHooksLU): """ _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE) - _OP_PARAMS = [ - _POutputFields, - ("nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)), - ("storage_type", ht.NoDefault, _CheckStorageType), - ("name", None, ht.TMaybeString), - ] REQ_BGL = False def CheckArguments(self): @@ -3693,31 +3777,74 @@ class LUQueryNodeStorage(NoHooksLU): return result -def _InstanceQuery(*args): # pylint: disable-msg=W0613 - """Dummy until instance queries have been converted to query2. +class _InstanceQuery(_QueryBase): + FIELDS = query.INSTANCE_FIELDS - """ - raise NotImplementedError + def ExpandNames(self, lu): + lu.needed_locks = {} + lu.share_locks[locking.LEVEL_INSTANCE] = 1 + lu.share_locks[locking.LEVEL_NODE] = 1 + if self.names: + self.wanted = _GetWantedInstances(lu, self.names) + else: + self.wanted = locking.ALL_SET -#: Query type implementations -_QUERY_IMPL = { - constants.QR_INSTANCE: _InstanceQuery, - constants.QR_NODE: _NodeQuery, - } + self.do_locking = (self.use_locking and + query.IQ_LIVE in self.requested_data) + if self.do_locking: + lu.needed_locks[locking.LEVEL_INSTANCE] = self.wanted + lu.needed_locks[locking.LEVEL_NODE] = [] + lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + def DeclareLocks(self, lu, level): + if level == locking.LEVEL_NODE and self.do_locking: + lu._LockInstancesNodes() # pylint: disable-msg=W0212 -def _GetQueryImplementation(name): - """Returns the implemtnation for a query type. + def _GetQueryData(self, lu): + """Computes the list of instances and their attributes. 
- @param name: Query type, must be one of L{constants.QR_OP_QUERY} + """ + all_info = lu.cfg.GetAllInstancesInfo() - """ - try: - return _QUERY_IMPL[name] - except KeyError: - raise errors.OpPrereqError("Unknown query resource '%s'" % name, - errors.ECODE_INVAL) + instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE) + + instance_list = [all_info[name] for name in instance_names] + nodes = frozenset([inst.primary_node for inst in instance_list]) + hv_list = list(set([inst.hypervisor for inst in instance_list])) + bad_nodes = [] + offline_nodes = [] + + # Gather data as requested + if query.IQ_LIVE in self.requested_data: + live_data = {} + node_data = lu.rpc.call_all_instances_info(nodes, hv_list) + for name in nodes: + result = node_data[name] + if result.offline: + # offline nodes will be in both lists + assert result.fail_msg + offline_nodes.append(name) + if result.fail_msg: + bad_nodes.append(name) + elif result.payload: + live_data.update(result.payload) + # else no instance is alive + else: + live_data = {} + + if query.IQ_DISKUSAGE in self.requested_data: + disk_usage = dict((inst.name, + _ComputeDiskSize(inst.disk_template, + [{"size": disk.size} + for disk in inst.disks])) + for inst in instance_list) + else: + disk_usage = None + + return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(), + disk_usage, offline_nodes, bad_nodes, + live_data) class LUQuery(NoHooksLU): @@ -3725,12 +3852,6 @@ class LUQuery(NoHooksLU): """ # pylint: disable-msg=W0142 - _OP_PARAMS = [ - ("what", ht.NoDefault, ht.TElemOf(constants.QR_OP_QUERY)), - ("fields", ht.NoDefault, ht.TListOf(ht.TNonEmptyString)), - ("filter", None, ht.TOr(ht.TNone, - ht.TListOf(ht.TOr(ht.TNonEmptyString, ht.TList)))), - ] REQ_BGL = False def CheckArguments(self): @@ -3754,10 +3875,6 @@ class LUQueryFields(NoHooksLU): """ # pylint: disable-msg=W0142 - _OP_PARAMS = [ - ("what", ht.NoDefault, ht.TElemOf(constants.QR_OP_QUERY)), - ("fields", None, ht.TOr(ht.TNone, ht.TListOf(ht.TNonEmptyString))), - ] REQ_BGL = False def CheckArguments(self): @@ -3774,12 +3891,6 @@ class LUModifyNodeStorage(NoHooksLU): """Logical unit for modifying a storage volume on a node. 
""" - _OP_PARAMS = [ - _PNodeName, - ("storage_type", ht.NoDefault, _CheckStorageType), - ("name", ht.NoDefault, ht.TNonEmptyString), - ("changes", ht.NoDefault, ht.TDict), - ] REQ_BGL = False def CheckArguments(self): @@ -3824,16 +3935,6 @@ class LUAddNode(LogicalUnit): """ HPATH = "node-add" HTYPE = constants.HTYPE_NODE - _OP_PARAMS = [ - _PNodeName, - ("primary_ip", None, ht.NoType), - ("secondary_ip", None, ht.TMaybeString), - ("readd", False, ht.TBool), - ("group", None, ht.TMaybeString), - ("master_capable", None, ht.TMaybeBool), - ("vm_capable", None, ht.TMaybeBool), - ("ndparams", None, ht.TOr(ht.TDict, ht.TNone)), - ] _NFLAGS = ["master_capable", "vm_capable"] def CheckArguments(self): @@ -4002,6 +4103,9 @@ class LUAddNode(LogicalUnit): new_node = self.new_node node = new_node.name + # We adding a new node so we assume it's powered + new_node.powered = True + # for re-adds, reset the offline/drained/master-candidate flags; # we need to reset here, otherwise offline would prevent RPC calls # later in the procedure; this also means that if the re-add @@ -4024,6 +4128,8 @@ class LUAddNode(LogicalUnit): if self.op.ndparams: new_node.ndparams = self.op.ndparams + else: + new_node.ndparams = {} # check connectivity result = self.rpc.call_version([node])[node] @@ -4096,18 +4202,6 @@ class LUSetNodeParams(LogicalUnit): """ HPATH = "node-modify" HTYPE = constants.HTYPE_NODE - _OP_PARAMS = [ - _PNodeName, - ("master_candidate", None, ht.TMaybeBool), - ("offline", None, ht.TMaybeBool), - ("drained", None, ht.TMaybeBool), - ("auto_promote", False, ht.TBool), - ("master_capable", None, ht.TMaybeBool), - ("vm_capable", None, ht.TMaybeBool), - ("secondary_ip", None, ht.TMaybeString), - ("ndparams", None, ht.TOr(ht.TDict, ht.TNone)), - _PForce, - ] REQ_BGL = False (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4) _F2R = { @@ -4248,6 +4342,18 @@ class LUSetNodeParams(LogicalUnit): # Past this point, any flag change to False means a transition # away from the respective state, as only real changes are kept + # TODO: We might query the real power state if it supports OOB + if _SupportsOob(self.cfg, node): + if self.op.offline is False and not (node.powered or + self.op.powered == True): + raise errors.OpPrereqError(("Please power on node %s first before you" + " can reset offline state") % + self.op.node_name) + elif self.op.powered is not None: + raise errors.OpPrereqError(("Unable to change powered state for node %s" + " which does not support out-of-band" + " handling") % self.op.node_name) + # If we're being deofflined/drained, we'll MC ourself if needed if (self.op.drained == False or self.op.offline == False or (self.op.master_capable and not node.master_capable)): @@ -4337,6 +4443,9 @@ class LUSetNodeParams(LogicalUnit): if self.op.ndparams: node.ndparams = self.new_ndparams + if self.op.powered is not None: + node.powered = self.op.powered + for attr in ["master_capable", "vm_capable"]: val = getattr(self.op, attr) if val is not None: @@ -4379,10 +4488,6 @@ class LUPowercycleNode(NoHooksLU): """Powercycles a node. """ - _OP_PARAMS = [ - _PNodeName, - _PForce, - ] REQ_BGL = False def CheckArguments(self): @@ -4411,7 +4516,7 @@ class LUPowercycleNode(NoHooksLU): return result.payload -class LUQueryClusterInfo(NoHooksLU): +class LUClusterQuery(NoHooksLU): """Query cluster configuration. 
""" @@ -4456,6 +4561,7 @@ class LUQueryClusterInfo(NoHooksLU): "beparams": cluster.beparams, "osparams": cluster.osparams, "nicparams": cluster.nicparams, + "ndparams": cluster.ndparams, "candidate_pool_size": cluster.candidate_pool_size, "master_netdev": cluster.master_netdev, "volume_group_name": cluster.volume_group_name, @@ -4476,11 +4582,10 @@ class LUQueryClusterInfo(NoHooksLU): return result -class LUQueryConfigValues(NoHooksLU): +class LUClusterConfigQuery(NoHooksLU): """Return configuration values. """ - _OP_PARAMS = [_POutputFields] REQ_BGL = False _FIELDS_DYNAMIC = utils.FieldSet() _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag", @@ -4516,14 +4621,10 @@ class LUQueryConfigValues(NoHooksLU): return values -class LUActivateInstanceDisks(NoHooksLU): +class LUInstanceActivateDisks(NoHooksLU): """Bring up an instance's disks. """ - _OP_PARAMS = [ - _PInstanceName, - ("ignore_size", False, ht.TBool), - ] REQ_BGL = False def ExpandNames(self): @@ -4666,9 +4767,6 @@ class LUDeactivateInstanceDisks(NoHooksLU): """Shutdown an instance's disks. """ - _OP_PARAMS = [ - _PInstanceName, - ] REQ_BGL = False def ExpandNames(self): @@ -4747,7 +4845,8 @@ def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False): if msg: lu.LogWarning("Could not shutdown block device %s on node %s: %s", disk.iv_name, node, msg) - if not ignore_primary or node != instance.primary_node: + if ((node == instance.primary_node and not ignore_primary) or + (node != instance.primary_node and not result.offline)): all_result = False return all_result @@ -4808,9 +4907,8 @@ def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes): or we cannot check the node """ - if req_sizes is not None: - for vg, req_size in req_sizes.iteritems(): - _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size) + for vg, req_size in req_sizes.items(): + _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size) def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested): @@ -4856,13 +4954,6 @@ class LUStartupInstance(LogicalUnit): """ HPATH = "instance-start" HTYPE = constants.HTYPE_INSTANCE - _OP_PARAMS = [ - _PInstanceName, - _PForce, - _PIgnoreOfflineNodes, - ("hvparams", ht.EmptyDict, ht.TDict), - ("beparams", ht.EmptyDict, ht.TDict), - ] REQ_BGL = False def CheckArguments(self): @@ -4964,12 +5055,6 @@ class LURebootInstance(LogicalUnit): """ HPATH = "instance-reboot" HTYPE = constants.HTYPE_INSTANCE - _OP_PARAMS = [ - _PInstanceName, - ("ignore_secondaries", False, ht.TBool), - ("reboot_type", ht.NoDefault, ht.TElemOf(constants.REBOOT_TYPES)), - _PShutdownTimeout, - ] REQ_BGL = False def ExpandNames(self): @@ -5045,11 +5130,6 @@ class LUShutdownInstance(LogicalUnit): """ HPATH = "instance-stop" HTYPE = constants.HTYPE_INSTANCE - _OP_PARAMS = [ - _PInstanceName, - _PIgnoreOfflineNodes, - ("timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT, ht.TPositiveInt), - ] REQ_BGL = False def ExpandNames(self): @@ -5112,12 +5192,6 @@ class LUReinstallInstance(LogicalUnit): """ HPATH = "instance-reinstall" HTYPE = constants.HTYPE_INSTANCE - _OP_PARAMS = [ - _PInstanceName, - ("os_type", None, ht.TMaybeString), - ("force_variant", False, ht.TBool), - ("osparams", None, ht.TOr(ht.TDict, ht.TNone)), - ] REQ_BGL = False def ExpandNames(self): @@ -5204,10 +5278,6 @@ class LURecreateInstanceDisks(LogicalUnit): """ HPATH = "instance-recreate-disks" HTYPE = constants.HTYPE_INSTANCE - _OP_PARAMS = [ - _PInstanceName, - ("disks", ht.EmptyList, ht.TListOf(ht.TPositiveInt)), - ] REQ_BGL = False def ExpandNames(self): @@ -5268,12 +5338,6 @@ 
class LURenameInstance(LogicalUnit): """ HPATH = "instance-rename" HTYPE = constants.HTYPE_INSTANCE - _OP_PARAMS = [ - _PInstanceName, - ("new_name", ht.NoDefault, ht.TNonEmptyString), - ("ip_check", False, ht.TBool), - ("name_check", True, ht.TBool), - ] def CheckArguments(self): """Check arguments. @@ -5312,6 +5376,8 @@ class LURenameInstance(LogicalUnit): new_name = self.op.new_name if self.op.name_check: hostname = netutils.GetHostname(name=new_name) + self.LogInfo("Resolved given name '%s' to '%s'", new_name, + hostname.name) new_name = self.op.new_name = hostname.name if (self.op.ip_check and netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)): @@ -5320,19 +5386,22 @@ class LURenameInstance(LogicalUnit): errors.ECODE_NOTUNIQUE) instance_list = self.cfg.GetInstanceList() - if new_name in instance_list: + if new_name in instance_list and new_name != instance.name: raise errors.OpPrereqError("Instance '%s' is already in the cluster" % new_name, errors.ECODE_EXISTS) def Exec(self, feedback_fn): - """Reinstall the instance. + """Rename the instance. """ inst = self.instance old_name = inst.name - if inst.disk_template == constants.DT_FILE: + rename_file_storage = False + if (inst.disk_template == constants.DT_FILE and + self.op.new_name != inst.name): old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1]) + rename_file_storage = True self.cfg.RenameInstance(inst.name, self.op.new_name) # Change the instance lock. This is definitely safe while we hold the BGL @@ -5342,7 +5411,7 @@ class LURenameInstance(LogicalUnit): # re-read the instance from the configuration after rename inst = self.cfg.GetInstanceInfo(self.op.new_name) - if inst.disk_template == constants.DT_FILE: + if rename_file_storage: new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1]) result = self.rpc.call_file_storage_dir_rename(inst.primary_node, old_file_storage_dir, @@ -5374,11 +5443,6 @@ class LURemoveInstance(LogicalUnit): """ HPATH = "instance-remove" HTYPE = constants.HTYPE_INSTANCE - _OP_PARAMS = [ - _PInstanceName, - ("ignore_failures", False, ht.TBool), - _PShutdownTimeout, - ] REQ_BGL = False def ExpandNames(self): @@ -5461,323 +5525,41 @@ class LUQueryInstances(NoHooksLU): """ # pylint: disable-msg=W0142 - _OP_PARAMS = [ - _POutputFields, - ("names", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)), - ("use_locking", False, ht.TBool), - ] REQ_BGL = False - _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor", - "serial_no", "ctime", "mtime", "uuid"] - _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes", - "admin_state", - "disk_template", "ip", "mac", "bridge", - "nic_mode", "nic_link", - "sda_size", "sdb_size", "vcpus", "tags", - "network_port", "beparams", - r"(disk)\.(size)/([0-9]+)", - r"(disk)\.(sizes)", "disk_usage", - r"(nic)\.(mac|ip|mode|link)/([0-9]+)", - r"(nic)\.(bridge)/([0-9]+)", - r"(nic)\.(macs|ips|modes|links|bridges)", - r"(disk|nic)\.(count)", - "hvparams", "custom_hvparams", - "custom_beparams", "custom_nicparams", - ] + _SIMPLE_FIELDS + - ["hv/%s" % name - for name in constants.HVS_PARAMETERS - if name not in constants.HVC_GLOBALS] + - ["be/%s" % name - for name in constants.BES_PARAMETERS]) - _FIELDS_DYNAMIC = utils.FieldSet("oper_state", - "oper_ram", - "oper_vcpus", - "status") - def CheckArguments(self): - _CheckOutputFields(static=self._FIELDS_STATIC, - dynamic=self._FIELDS_DYNAMIC, - selected=self.op.output_fields) + self.iq = _InstanceQuery(self.op.names, self.op.output_fields, + self.op.use_locking) def ExpandNames(self): - 
self.needed_locks = {} - self.share_locks[locking.LEVEL_INSTANCE] = 1 - self.share_locks[locking.LEVEL_NODE] = 1 - - if self.op.names: - self.wanted = _GetWantedInstances(self, self.op.names) - else: - self.wanted = locking.ALL_SET - - self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields) - self.do_locking = self.do_node_query and self.op.use_locking - if self.do_locking: - self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted - self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + self.iq.ExpandNames(self) def DeclareLocks(self, level): - if level == locking.LEVEL_NODE and self.do_locking: - self._LockInstancesNodes() + self.iq.DeclareLocks(self, level) def Exec(self, feedback_fn): - """Computes the list of nodes and their attributes. + return self.iq.OldStyleQuery(self) - """ - # pylint: disable-msg=R0912 - # way too many branches here - all_info = self.cfg.GetAllInstancesInfo() - if self.wanted == locking.ALL_SET: - # caller didn't specify instance names, so ordering is not important - if self.do_locking: - instance_names = self.acquired_locks[locking.LEVEL_INSTANCE] - else: - instance_names = all_info.keys() - instance_names = utils.NiceSort(instance_names) - else: - # caller did specify names, so we must keep the ordering - if self.do_locking: - tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE] - else: - tgt_set = all_info.keys() - missing = set(self.wanted).difference(tgt_set) - if missing: - raise errors.OpExecError("Some instances were removed before" - " retrieving their data: %s" % missing) - instance_names = self.wanted - instance_list = [all_info[iname] for iname in instance_names] +class LUFailoverInstance(LogicalUnit): + """Failover an instance. - # begin data gathering + """ + HPATH = "instance-failover" + HTYPE = constants.HTYPE_INSTANCE + REQ_BGL = False - nodes = frozenset([inst.primary_node for inst in instance_list]) - hv_list = list(set([inst.hypervisor for inst in instance_list])) + def ExpandNames(self): + self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE - bad_nodes = [] - off_nodes = [] - if self.do_node_query: - live_data = {} - node_data = self.rpc.call_all_instances_info(nodes, hv_list) - for name in nodes: - result = node_data[name] - if result.offline: - # offline nodes will be in both lists - off_nodes.append(name) - if result.fail_msg: - bad_nodes.append(name) - else: - if result.payload: - live_data.update(result.payload) - # else no instance is alive - else: - live_data = dict([(name, {}) for name in instance_names]) + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() - # end data gathering - - HVPREFIX = "hv/" - BEPREFIX = "be/" - output = [] - cluster = self.cfg.GetClusterInfo() - for instance in instance_list: - iout = [] - i_hv = cluster.FillHV(instance, skip_globals=True) - i_be = cluster.FillBE(instance) - i_nicp = [cluster.SimpleFillNIC(nic.nicparams) for nic in instance.nics] - for field in self.op.output_fields: - st_match = self._FIELDS_STATIC.Matches(field) - if field in self._SIMPLE_FIELDS: - val = getattr(instance, field) - elif field == "pnode": - val = instance.primary_node - elif field == "snodes": - val = list(instance.secondary_nodes) - elif field == "admin_state": - val = instance.admin_up - elif field == "oper_state": - if instance.primary_node in bad_nodes: - val = None - else: - val = 
bool(live_data.get(instance.name)) - elif field == "status": - if instance.primary_node in off_nodes: - val = "ERROR_nodeoffline" - elif instance.primary_node in bad_nodes: - val = "ERROR_nodedown" - else: - running = bool(live_data.get(instance.name)) - if running: - if instance.admin_up: - val = "running" - else: - val = "ERROR_up" - else: - if instance.admin_up: - val = "ERROR_down" - else: - val = "ADMIN_down" - elif field == "oper_ram": - if instance.primary_node in bad_nodes: - val = None - elif instance.name in live_data: - val = live_data[instance.name].get("memory", "?") - else: - val = "-" - elif field == "oper_vcpus": - if instance.primary_node in bad_nodes: - val = None - elif instance.name in live_data: - val = live_data[instance.name].get("vcpus", "?") - else: - val = "-" - elif field == "vcpus": - val = i_be[constants.BE_VCPUS] - elif field == "disk_template": - val = instance.disk_template - elif field == "ip": - if instance.nics: - val = instance.nics[0].ip - else: - val = None - elif field == "nic_mode": - if instance.nics: - val = i_nicp[0][constants.NIC_MODE] - else: - val = None - elif field == "nic_link": - if instance.nics: - val = i_nicp[0][constants.NIC_LINK] - else: - val = None - elif field == "bridge": - if (instance.nics and - i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED): - val = i_nicp[0][constants.NIC_LINK] - else: - val = None - elif field == "mac": - if instance.nics: - val = instance.nics[0].mac - else: - val = None - elif field == "custom_nicparams": - val = [nic.nicparams for nic in instance.nics] - elif field == "sda_size" or field == "sdb_size": - idx = ord(field[2]) - ord('a') - try: - val = instance.FindDisk(idx).size - except errors.OpPrereqError: - val = None - elif field == "disk_usage": # total disk usage per node - disk_sizes = [{'size': disk.size} for disk in instance.disks] - val = _ComputeDiskSize(instance.disk_template, disk_sizes) - elif field == "tags": - val = list(instance.GetTags()) - elif field == "custom_hvparams": - val = instance.hvparams # not filled! 
- elif field == "hvparams": - val = i_hv - elif (field.startswith(HVPREFIX) and - field[len(HVPREFIX):] in constants.HVS_PARAMETERS and - field[len(HVPREFIX):] not in constants.HVC_GLOBALS): - val = i_hv.get(field[len(HVPREFIX):], None) - elif field == "custom_beparams": - val = instance.beparams - elif field == "beparams": - val = i_be - elif (field.startswith(BEPREFIX) and - field[len(BEPREFIX):] in constants.BES_PARAMETERS): - val = i_be.get(field[len(BEPREFIX):], None) - elif st_match and st_match.groups(): - # matches a variable list - st_groups = st_match.groups() - if st_groups and st_groups[0] == "disk": - if st_groups[1] == "count": - val = len(instance.disks) - elif st_groups[1] == "sizes": - val = [disk.size for disk in instance.disks] - elif st_groups[1] == "size": - try: - val = instance.FindDisk(st_groups[2]).size - except errors.OpPrereqError: - val = None - else: - assert False, "Unhandled disk parameter" - elif st_groups[0] == "nic": - if st_groups[1] == "count": - val = len(instance.nics) - elif st_groups[1] == "macs": - val = [nic.mac for nic in instance.nics] - elif st_groups[1] == "ips": - val = [nic.ip for nic in instance.nics] - elif st_groups[1] == "modes": - val = [nicp[constants.NIC_MODE] for nicp in i_nicp] - elif st_groups[1] == "links": - val = [nicp[constants.NIC_LINK] for nicp in i_nicp] - elif st_groups[1] == "bridges": - val = [] - for nicp in i_nicp: - if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: - val.append(nicp[constants.NIC_LINK]) - else: - val.append(None) - else: - # index-based item - nic_idx = int(st_groups[2]) - if nic_idx >= len(instance.nics): - val = None - else: - if st_groups[1] == "mac": - val = instance.nics[nic_idx].mac - elif st_groups[1] == "ip": - val = instance.nics[nic_idx].ip - elif st_groups[1] == "mode": - val = i_nicp[nic_idx][constants.NIC_MODE] - elif st_groups[1] == "link": - val = i_nicp[nic_idx][constants.NIC_LINK] - elif st_groups[1] == "bridge": - nic_mode = i_nicp[nic_idx][constants.NIC_MODE] - if nic_mode == constants.NIC_MODE_BRIDGED: - val = i_nicp[nic_idx][constants.NIC_LINK] - else: - val = None - else: - assert False, "Unhandled NIC parameter" - else: - assert False, ("Declared but unhandled variable parameter '%s'" % - field) - else: - assert False, "Declared but unhandled parameter '%s'" % field - iout.append(val) - output.append(iout) - - return output - - -class LUFailoverInstance(LogicalUnit): - """Failover an instance. - - """ - HPATH = "instance-failover" - HTYPE = constants.HTYPE_INSTANCE - _OP_PARAMS = [ - _PInstanceName, - ("ignore_consistency", False, ht.TBool), - _PShutdownTimeout, - ] - REQ_BGL = False - - def ExpandNames(self): - self._ExpandAndLockInstance() - self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE - - def DeclareLocks(self, level): - if level == locking.LEVEL_NODE: - self._LockInstancesNodes() - - def BuildHooksEnv(self): - """Build hooks env. + def BuildHooksEnv(self): + """Build hooks env. This runs on master, primary and secondary nodes of the instance. 
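
[Editor's note -- illustrative example, not part of the patch.] The LUQueryInstances rewrite above is the pattern worth noting: the several-hundred-line field-by-field Exec() is replaced by a thin wrapper that builds an _InstanceQuery in CheckArguments and forwards ExpandNames/DeclareLocks/Exec to it. A self-contained toy version of that delegation follows; the class names and data below are invented, and the real helper drives query.Query over typed field definitions rather than plain dicts:

# Toy model of the LU -> _QueryBase delegation; not Ganeti code.
class _ToyInstanceQuery(object):
  def __init__(self, names, fields):
    self.names = names
    self.fields = fields

  def ExpandNames(self, lu):
    # The real helper decides locking here; we only record the names.
    lu.needed_locks = {"instance": self.names or "<all>"}

  def OldStyleQuery(self, lu):
    # Emit rows in the legacy list-of-lists format.
    rows = lu.GetInstanceData(self.names)
    return [[row.get(f) for f in self.fields] for row in rows]

class ToyLU(object):
  def __init__(self, names, fields):
    # As in LUQueryInstances.CheckArguments
    self.iq = _ToyInstanceQuery(names, fields)

  def GetInstanceData(self, names):
    data = {"inst1": {"name": "inst1", "os": "debian"},
            "inst2": {"name": "inst2", "os": "centos"}}
    return [data[n] for n in (names or sorted(data))]

  def Exec(self):
    # As in LUQueryInstances: ExpandNames and Exec just forward.
    self.iq.ExpandNames(self)
    return self.iq.OldStyleQuery(self)

print ToyLU([], ["name", "os"]).Exec()
# -> [['inst1', 'debian'], ['inst2', 'centos']]
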
@@ -5915,13 +5697,6 @@ class LUMigrateInstance(LogicalUnit): """ HPATH = "instance-migrate" HTYPE = constants.HTYPE_INSTANCE - _OP_PARAMS = [ - _PInstanceName, - _PMigrationMode, - _PMigrationLive, - ("cleanup", False, ht.TBool), - ] - REQ_BGL = False def ExpandNames(self): @@ -5968,11 +5743,6 @@ class LUMoveInstance(LogicalUnit): """ HPATH = "instance-move" HTYPE = constants.HTYPE_INSTANCE - _OP_PARAMS = [ - _PInstanceName, - ("target_node", ht.NoDefault, ht.TNonEmptyString), - _PShutdownTimeout, - ] REQ_BGL = False def ExpandNames(self): @@ -6148,11 +5918,6 @@ class LUMigrateNode(LogicalUnit): """ HPATH = "node-migrate" HTYPE = constants.HTYPE_NODE - _OP_PARAMS = [ - _PNodeName, - _PMigrationMode, - _PMigrationLive, - ] REQ_BGL = False def ExpandNames(self): @@ -6715,7 +6480,7 @@ def _GenerateDiskTemplate(lu, template_name, if len(secondary_nodes) != 0: raise errors.ProgrammerError("Wrong template configuration") - _RequireFileStorage() + opcodes.RequireFileStorage() for idx, disk in enumerate(disk_info): disk_index = idx + base_index @@ -6762,32 +6527,52 @@ def _WipeDisks(lu, instance): """ node = instance.primary_node - for idx, device in enumerate(instance.disks): - lu.LogInfo("* Wiping disk %d", idx) - logging.info("Wiping disk %d for instance %s", idx, instance.name) - - # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but - # MAX_WIPE_CHUNK at max - wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 * - constants.MIN_WIPE_CHUNK_PERCENT) - - offset = 0 - size = device.size - last_output = 0 - start_time = time.time() - - while offset < size: - wipe_size = min(wipe_chunk_size, size - offset) - result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size) - result.Raise("Could not wipe disk %d at offset %d for size %d" % - (idx, offset, wipe_size)) - now = time.time() - offset += wipe_size - if now - last_output >= 60: - eta = _CalcEta(now - start_time, offset, size) - lu.LogInfo(" - done: %.1f%% ETA: %s" % - (offset / float(size) * 100, utils.FormatSeconds(eta))) - last_output = now + logging.info("Pause sync of instance %s disks", instance.name) + result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, True) + + for idx, success in enumerate(result.payload): + if not success: + logging.warn("pause-sync of instance %s for disks %d failed", + instance.name, idx) + + try: + for idx, device in enumerate(instance.disks): + lu.LogInfo("* Wiping disk %d", idx) + logging.info("Wiping disk %d for instance %s", idx, instance.name) + + # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but + # MAX_WIPE_CHUNK at max + wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 * + constants.MIN_WIPE_CHUNK_PERCENT) + + offset = 0 + size = device.size + last_output = 0 + start_time = time.time() + + while offset < size: + wipe_size = min(wipe_chunk_size, size - offset) + result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size) + result.Raise("Could not wipe disk %d at offset %d for size %d" % + (idx, offset, wipe_size)) + now = time.time() + offset += wipe_size + if now - last_output >= 60: + eta = _CalcEta(now - start_time, offset, size) + lu.LogInfo(" - done: %.1f%% ETA: %s" % + (offset / float(size) * 100, utils.FormatSeconds(eta))) + last_output = now + finally: + logging.info("Resume sync of instance %s disks", instance.name) + + result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, False) + + for idx, success in enumerate(result.payload): + if not success: + lu.LogWarning("Warning: Resume 
sync of disk %d failed. Please have a" + " look at the status and troubleshoot the issue.", idx) + logging.warn("resume-sync of instance %s for disks %d failed", + instance.name, idx) def _CreateDisks(lu, instance, to_skip=None, target_node=None): @@ -6900,11 +6685,11 @@ def _ComputeDiskSizePerVG(disk_template, disks): # Required free disk space as a function of disk and swap space req_size_dict = { - constants.DT_DISKLESS: None, + constants.DT_DISKLESS: {}, constants.DT_PLAIN: _compute(disks, 0), # 128 MB are added for drbd metadata for each disk constants.DT_DRBD8: _compute(disks, 128), - constants.DT_FILE: None, + constants.DT_FILE: {}, } if disk_template not in req_size_dict: @@ -6913,6 +6698,7 @@ def _ComputeDiskSizePerVG(disk_template, disks): return req_size_dict[disk_template] + def _ComputeDiskSize(disk_template, disks): """Compute disk size requirements in the volume group @@ -6995,37 +6781,6 @@ class LUCreateInstance(LogicalUnit): """ HPATH = "instance-add" HTYPE = constants.HTYPE_INSTANCE - _OP_PARAMS = [ - _PInstanceName, - ("mode", ht.NoDefault, ht.TElemOf(constants.INSTANCE_CREATE_MODES)), - ("start", True, ht.TBool), - ("wait_for_sync", True, ht.TBool), - ("ip_check", True, ht.TBool), - ("name_check", True, ht.TBool), - ("disks", ht.NoDefault, ht.TListOf(ht.TDict)), - ("nics", ht.NoDefault, ht.TListOf(ht.TDict)), - ("hvparams", ht.EmptyDict, ht.TDict), - ("beparams", ht.EmptyDict, ht.TDict), - ("osparams", ht.EmptyDict, ht.TDict), - ("no_install", None, ht.TMaybeBool), - ("os_type", None, ht.TMaybeString), - ("force_variant", False, ht.TBool), - ("source_handshake", None, ht.TOr(ht.TList, ht.TNone)), - ("source_x509_ca", None, ht.TMaybeString), - ("source_instance_name", None, ht.TMaybeString), - ("source_shutdown_timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT, - ht.TPositiveInt), - ("src_node", None, ht.TMaybeString), - ("src_path", None, ht.TMaybeString), - ("pnode", None, ht.TMaybeString), - ("snode", None, ht.TMaybeString), - ("iallocator", None, ht.TMaybeString), - ("hypervisor", None, ht.TMaybeString), - ("disk_template", ht.NoDefault, _CheckDiskTemplate), - ("identify_defaults", False, ht.TBool), - ("file_driver", None, ht.TOr(ht.TNone, ht.TElemOf(constants.FILE_DRIVER))), - ("file_storage_dir", None, ht.TMaybeString), - ] REQ_BGL = False def CheckArguments(self): @@ -7449,8 +7204,6 @@ class LUCreateInstance(LogicalUnit): export_info = self._ReadExportInfo() self._ReadExportParams(export_info) - _CheckDiskTemplate(self.op.disk_template) - if (not self.cfg.GetVGName() and self.op.disk_template not in constants.DTS_NOT_LVM): raise errors.OpPrereqError("Cluster does not support lvm-based" @@ -7678,23 +7431,28 @@ class LUCreateInstance(LogicalUnit): _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes) else: # instead, we must check the adoption data - all_lvs = set([i["adopt"] for i in self.disks]) + all_lvs = set([i["vg"] + "/" + i["adopt"] for i in self.disks]) if len(all_lvs) != len(self.disks): raise errors.OpPrereqError("Duplicate volume names given for adoption", errors.ECODE_INVAL) for lv_name in all_lvs: try: - # FIXME: VG must be provided here. Else all LVs with the - # same name will be locked on all VGs. 
+ # FIXME: lv_name here is "vg/lv"; we need to ensure that other calls
+ # to ReserveLV use the same syntax
self.cfg.ReserveLV(lv_name, self.proc.GetECId())
except errors.ReservationError:
raise errors.OpPrereqError("LV named %s used by another instance" %
lv_name, errors.ECODE_NOTUNIQUE)
+
+ vg_names = self.rpc.call_vg_list([pnode.name])
+ vg_names.Raise("Cannot get VG information from node %s" % pnode.name)
+
node_lvs = self.rpc.call_lv_list([pnode.name],
- self.cfg.GetVGName())[pnode.name]
+ vg_names[pnode.name].payload.keys()
+ )[pnode.name]
node_lvs.Raise("Cannot get LV information from node %s" % pnode.name)
node_lvs = node_lvs.payload
+
delta = all_lvs.difference(node_lvs.keys())
if delta:
raise errors.OpPrereqError("Missing logical volume(s): %s" %
@@ -7707,7 +7465,7 @@ class LUCreateInstance(LogicalUnit):
errors.ECODE_STATE)
# update the size of disk based on what is found
for dsk in self.disks:
- dsk["size"] = int(float(node_lvs[dsk["adopt"]][0]))
+ dsk["size"] = int(float(node_lvs[dsk["vg"] + "/" + dsk["adopt"]][0]))

_CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

@@ -7928,7 +7686,7 @@ class LUCreateInstance(LogicalUnit):
return list(iobj.all_nodes)


-class LUConnectConsole(NoHooksLU):
+class LUInstanceConsole(NoHooksLU):
"""Connect to an instance's console.

This is somewhat special in that it returns the command line that
@@ -7936,9 +7694,6 @@ console.

"""
- _OP_PARAMS = [
- _PInstanceName
- ]
REQ_BGL = False

def ExpandNames(self):
@@ -7982,10 +7737,12 @@ class LUConnectConsole(NoHooksLU):
# instance and then saving the defaults in the instance itself.
hvparams = cluster.FillHV(instance)
beparams = cluster.FillBE(instance)
- console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
+ console = hyper.GetInstanceConsole(instance, hvparams, beparams)

- # build ssh cmdline
- return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
+ assert console.instance == instance.name
+ assert console.Validate()
+
+ return console.ToDict()


class LUReplaceDisks(LogicalUnit):
@@ -7994,14 +7751,6 @@ class LUReplaceDisks(LogicalUnit):
"""
HPATH = "mirrors-replace"
HTYPE = constants.HTYPE_INSTANCE
- _OP_PARAMS = [
- _PInstanceName,
- ("mode", ht.NoDefault, ht.TElemOf(constants.REPLACE_MODES)),
- ("disks", ht.EmptyList, ht.TListOf(ht.TPositiveInt)),
- ("remote_node", None, ht.TMaybeString),
- ("iallocator", None, ht.TMaybeString),
- ("early_release", False, ht.TBool),
- ]
REQ_BGL = False

def CheckArguments(self):
@@ -8738,12 +8487,6 @@ class LURepairNodeStorage(NoHooksLU):
"""Repairs the volume group on a node.

"""
- _OP_PARAMS = [
- _PNodeName,
- ("storage_type", ht.NoDefault, _CheckStorageType),
- ("name", ht.NoDefault, ht.TNonEmptyString),
- ("ignore_consistency", False, ht.TBool),
- ]
REQ_BGL = False

def CheckArguments(self):
@@ -8806,11 +8549,6 @@ class LUNodeEvacuationStrategy(NoHooksLU):
"""Computes the node evacuation strategy.
""" - _OP_PARAMS = [ - ("nodes", ht.NoDefault, ht.TListOf(ht.TNonEmptyString)), - ("remote_node", None, ht.TMaybeString), - ("iallocator", None, ht.TMaybeString), - ] REQ_BGL = False def CheckArguments(self): @@ -8857,12 +8595,6 @@ class LUGrowDisk(LogicalUnit): """ HPATH = "disk-grow" HTYPE = constants.HTYPE_INSTANCE - _OP_PARAMS = [ - _PInstanceName, - ("disk", ht.NoDefault, ht.TInt), - ("amount", ht.NoDefault, ht.TInt), - ("wait_for_sync", True, ht.TBool), - ] REQ_BGL = False def ExpandNames(self): @@ -8913,7 +8645,7 @@ class LUGrowDisk(LogicalUnit): # TODO: check the free disk space for file, when that feature # will be supported _CheckNodesFreeDiskPerVG(self, nodenames, - {self.disk.physical_id[0]: self.op.amount}) + self.disk.ComputeGrowth(self.op.amount)) def Exec(self, feedback_fn): """Execute disk grow. @@ -8957,10 +8689,6 @@ class LUQueryInstanceData(NoHooksLU): """Query runtime instance data. """ - _OP_PARAMS = [ - ("instances", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)), - ("static", False, ht.TBool), - ] REQ_BGL = False def ExpandNames(self): @@ -9117,19 +8845,6 @@ class LUSetInstanceParams(LogicalUnit): """ HPATH = "instance-modify" HTYPE = constants.HTYPE_INSTANCE - _OP_PARAMS = [ - _PInstanceName, - ("nics", ht.EmptyList, ht.TList), - ("disks", ht.EmptyList, ht.TList), - ("beparams", ht.EmptyDict, ht.TDict), - ("hvparams", ht.EmptyDict, ht.TDict), - ("disk_template", None, ht.TMaybeString), - ("remote_node", None, ht.TMaybeString), - ("os_name", None, ht.TMaybeString), - ("force_variant", False, ht.TBool), - ("osparams", None, ht.TOr(ht.TDict, ht.TNone)), - _PForce, - ] REQ_BGL = False def CheckArguments(self): @@ -9186,13 +8901,12 @@ class LUSetInstanceParams(LogicalUnit): " changes not supported at the same time", errors.ECODE_INVAL) - if self.op.disk_template: - _CheckDiskTemplate(self.op.disk_template) - if (self.op.disk_template in constants.DTS_NET_MIRROR and - self.op.remote_node is None): - raise errors.OpPrereqError("Changing the disk template to a mirrored" - " one requires specifying a secondary node", - errors.ECODE_INVAL) + if (self.op.disk_template and + self.op.disk_template in constants.DTS_NET_MIRROR and + self.op.remote_node is None): + raise errors.OpPrereqError("Changing the disk template to a mirrored" + " one requires specifying a secondary node", + errors.ECODE_INVAL) # NIC validation nic_addremove = 0 @@ -9710,7 +9424,7 @@ class LUSetInstanceParams(LogicalUnit): if self.op.disk_template: r_shut = _ShutdownInstanceDisks(self, instance) if not r_shut: - raise errors.OpExecError("Cannot shutdow instance disks, unable to" + raise errors.OpExecError("Cannot shutdown instance disks, unable to" " proceed with disk template conversion") mode = (instance.disk_template, self.op.disk_template) try: @@ -9780,14 +9494,10 @@ class LUSetInstanceParams(LogicalUnit): } -class LUQueryExports(NoHooksLU): +class LUBackupQuery(NoHooksLU): """Query the exports list """ - _OP_PARAMS = [ - ("nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)), - ("use_locking", False, ht.TBool), - ] REQ_BGL = False def ExpandNames(self): @@ -9820,14 +9530,10 @@ class LUQueryExports(NoHooksLU): return result -class LUPrepareExport(NoHooksLU): +class LUBackupPrepare(NoHooksLU): """Prepares an instance for an export and returns useful information. 
""" - _OP_PARAMS = [ - _PInstanceName, - ("mode", ht.NoDefault, ht.TElemOf(constants.EXPORT_MODES)), - ] REQ_BGL = False def ExpandNames(self): @@ -9875,23 +9581,12 @@ class LUPrepareExport(NoHooksLU): return None -class LUExportInstance(LogicalUnit): +class LUBackupExport(LogicalUnit): """Export an instance to an image in the cluster. """ HPATH = "instance-export" HTYPE = constants.HTYPE_INSTANCE - _OP_PARAMS = [ - _PInstanceName, - ("target_node", ht.NoDefault, ht.TOr(ht.TNonEmptyString, ht.TList)), - ("shutdown", True, ht.TBool), - _PShutdownTimeout, - ("remove_instance", False, ht.TBool), - ("ignore_remove_failures", False, ht.TBool), - ("mode", constants.EXPORT_MODE_LOCAL, ht.TElemOf(constants.EXPORT_MODES)), - ("x509_key_name", None, ht.TOr(ht.TList, ht.TNone)), - ("destination_x509_ca", None, ht.TMaybeString), - ] REQ_BGL = False def CheckArguments(self): @@ -10058,7 +9753,7 @@ class LUExportInstance(LogicalUnit): nodelist.remove(self.dst_node.name) # on one-node clusters nodelist will be empty after the removal - # if we proceed the backup would be removed because OpQueryExports + # if we proceed the backup would be removed because OpBackupQuery # substitutes an empty list with the full cluster node list. iname = self.instance.name if nodelist: @@ -10174,13 +9869,10 @@ class LUExportInstance(LogicalUnit): return fin_resu, dresults -class LURemoveExport(NoHooksLU): +class LUBackupRemove(NoHooksLU): """Remove exports related to the named instance. """ - _OP_PARAMS = [ - _PInstanceName, - ] REQ_BGL = False def ExpandNames(self): @@ -10224,6 +9916,470 @@ class LURemoveExport(NoHooksLU): " Domain Name.") +class LUGroupAdd(LogicalUnit): + """Logical unit for creating node groups. + + """ + HPATH = "group-add" + HTYPE = constants.HTYPE_GROUP + REQ_BGL = False + + def ExpandNames(self): + # We need the new group's UUID here so that we can create and acquire the + # corresponding lock. Later, in Exec(), we'll indicate to cfg.AddNodeGroup + # that it should not check whether the UUID exists in the configuration. + self.group_uuid = self.cfg.GenerateUniqueID(self.proc.GetECId()) + self.needed_locks = {} + self.add_locks[locking.LEVEL_NODEGROUP] = self.group_uuid + + def CheckPrereq(self): + """Check prerequisites. + + This checks that the given group name is not an existing node group + already. + + """ + try: + existing_uuid = self.cfg.LookupNodeGroup(self.op.group_name) + except errors.OpPrereqError: + pass + else: + raise errors.OpPrereqError("Desired group name '%s' already exists as a" + " node group (UUID: %s)" % + (self.op.group_name, existing_uuid), + errors.ECODE_EXISTS) + + if self.op.ndparams: + utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES) + + def BuildHooksEnv(self): + """Build hooks env. + + """ + env = { + "GROUP_NAME": self.op.group_name, + } + mn = self.cfg.GetMasterNode() + return env, [mn], [mn] + + def Exec(self, feedback_fn): + """Add the node group to the cluster. + + """ + group_obj = objects.NodeGroup(name=self.op.group_name, members=[], + uuid=self.group_uuid, + alloc_policy=self.op.alloc_policy, + ndparams=self.op.ndparams) + + self.cfg.AddNodeGroup(group_obj, self.proc.GetECId(), check_uuid=False) + del self.remove_locks[locking.LEVEL_NODEGROUP] + + +class LUGroupAssignNodes(NoHooksLU): + """Logical unit for assigning nodes to groups. 
+
+ """
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ # These raise errors.OpPrereqError on their own:
+ self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+ self.op.nodes = _GetWantedNodes(self, self.op.nodes)
+
+ # We want to lock all the affected nodes and groups. We have readily
+ # available the list of nodes, and the *destination* group. To gather the
+ # list of "source" groups, we need to fetch node information.
+ self.node_data = self.cfg.GetAllNodesInfo()
+ affected_groups = set(self.node_data[node].group for node in self.op.nodes)
+ affected_groups.add(self.group_uuid)
+
+ self.needed_locks = {
+ locking.LEVEL_NODEGROUP: list(affected_groups),
+ locking.LEVEL_NODE: self.op.nodes,
+ }
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ """
+ self.group = self.cfg.GetNodeGroup(self.group_uuid)
+ instance_data = self.cfg.GetAllInstancesInfo()
+
+ if self.group is None:
+ raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
+ (self.op.group_name, self.group_uuid))
+
+ (new_splits, previous_splits) = \
+ self.CheckAssignmentForSplitInstances([(node, self.group_uuid)
+ for node in self.op.nodes],
+ self.node_data, instance_data)
+
+ if new_splits:
+ fmt_new_splits = utils.CommaJoin(utils.NiceSort(new_splits))
+
+ if not self.op.force:
+ raise errors.OpExecError("The following instances get split by this"
+ " change and --force was not given: %s" %
+ fmt_new_splits)
+ else:
+ self.LogWarning("This operation will split the following instances: %s",
+ fmt_new_splits)
+
+ if previous_splits:
+ self.LogWarning("In addition, these already-split instances continue"
+ " to be split across groups: %s",
+ utils.CommaJoin(utils.NiceSort(previous_splits)))
+
+ def Exec(self, feedback_fn):
+ """Assign nodes to a new group.
+
+ """
+ for node in self.op.nodes:
+ self.node_data[node].group = self.group_uuid
+
+ self.cfg.Update(self.group, feedback_fn) # Saves all modified nodes.
+
+ @staticmethod
+ def CheckAssignmentForSplitInstances(changes, node_data, instance_data):
+ """Check for split instances after a node assignment.
+
+ This method considers a series of node assignments as an atomic operation,
+ and returns information about split instances after applying the set of
+ changes.
+
+ In particular, it returns information about newly split instances, and
+ instances that were already split, and remain so after the change.
+
+ Only instances whose disk template is listed in constants.DTS_NET_MIRROR are
+ considered.
+
+ @type changes: list of (node_name, new_group_uuid) pairs.
+ @param changes: list of node assignments to consider.
+ @param node_data: a dict with data for all nodes
+ @param instance_data: a dict with all instances to consider
+ @rtype: a two-tuple
+ @return: a list of instances that were previously okay and end up split as a
+ consequence of this change, and a list of instances that were previously
+ split and this change does not fix.
+ + """ + changed_nodes = dict((node, group) for node, group in changes + if node_data[node].group != group) + + all_split_instances = set() + previously_split_instances = set() + + def InstanceNodes(instance): + return [instance.primary_node] + list(instance.secondary_nodes) + + for inst in instance_data.values(): + if inst.disk_template not in constants.DTS_NET_MIRROR: + continue + + instance_nodes = InstanceNodes(inst) + + if len(set(node_data[node].group for node in instance_nodes)) > 1: + previously_split_instances.add(inst.name) + + if len(set(changed_nodes.get(node, node_data[node].group) + for node in instance_nodes)) > 1: + all_split_instances.add(inst.name) + + return (list(all_split_instances - previously_split_instances), + list(previously_split_instances & all_split_instances)) + + +class _GroupQuery(_QueryBase): + + FIELDS = query.GROUP_FIELDS + + def ExpandNames(self, lu): + lu.needed_locks = {} + + self._all_groups = lu.cfg.GetAllNodeGroupsInfo() + name_to_uuid = dict((g.name, g.uuid) for g in self._all_groups.values()) + + if not self.names: + self.wanted = [name_to_uuid[name] + for name in utils.NiceSort(name_to_uuid.keys())] + else: + # Accept names to be either names or UUIDs. + missing = [] + self.wanted = [] + all_uuid = frozenset(self._all_groups.keys()) + + for name in self.names: + if name in all_uuid: + self.wanted.append(name) + elif name in name_to_uuid: + self.wanted.append(name_to_uuid[name]) + else: + missing.append(name) + + if missing: + raise errors.OpPrereqError("Some groups do not exist: %s" % missing, + errors.ECODE_NOENT) + + def DeclareLocks(self, lu, level): + pass + + def _GetQueryData(self, lu): + """Computes the list of node groups and their attributes. + + """ + do_nodes = query.GQ_NODE in self.requested_data + do_instances = query.GQ_INST in self.requested_data + + group_to_nodes = None + group_to_instances = None + + # For GQ_NODE, we need to map group->[nodes], and group->[instances] for + # GQ_INST. The former is attainable with just GetAllNodesInfo(), but for the + # latter GetAllInstancesInfo() is not enough, for we have to go through + # instance->node. Hence, we will need to process nodes even if we only need + # instance information. + if do_nodes or do_instances: + all_nodes = lu.cfg.GetAllNodesInfo() + group_to_nodes = dict((uuid, []) for uuid in self.wanted) + node_to_group = {} + + for node in all_nodes.values(): + if node.group in group_to_nodes: + group_to_nodes[node.group].append(node.name) + node_to_group[node.name] = node.group + + if do_instances: + all_instances = lu.cfg.GetAllInstancesInfo() + group_to_instances = dict((uuid, []) for uuid in self.wanted) + + for instance in all_instances.values(): + node = instance.primary_node + if node in node_to_group: + group_to_instances[node_to_group[node]].append(instance.name) + + if not do_nodes: + # Do not pass on node information if it was not requested. + group_to_nodes = None + + return query.GroupQueryData([self._all_groups[uuid] + for uuid in self.wanted], + group_to_nodes, group_to_instances) + + +class LUGroupQuery(NoHooksLU): + """Logical unit for querying node groups. + + """ + REQ_BGL = False + + def CheckArguments(self): + self.gq = _GroupQuery(self.op.names, self.op.output_fields, False) + + def ExpandNames(self): + self.gq.ExpandNames(self) + + def Exec(self, feedback_fn): + return self.gq.OldStyleQuery(self) + + +class LUGroupSetParams(LogicalUnit): + """Modifies the parameters of a node group. 
+
+ """
+ HPATH = "group-modify"
+ HTYPE = constants.HTYPE_GROUP
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ all_changes = [
+ self.op.ndparams,
+ self.op.alloc_policy,
+ ]
+
+ if all_changes.count(None) == len(all_changes):
+ raise errors.OpPrereqError("Please pass at least one modification",
+ errors.ECODE_INVAL)
+
+ def ExpandNames(self):
+ # This raises errors.OpPrereqError on its own:
+ self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+
+ self.needed_locks = {
+ locking.LEVEL_NODEGROUP: [self.group_uuid],
+ }
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ """
+ self.group = self.cfg.GetNodeGroup(self.group_uuid)
+
+ if self.group is None:
+ raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
+ (self.op.group_name, self.group_uuid))
+
+ if self.op.ndparams:
+ new_ndparams = _GetUpdatedParams(self.group.ndparams, self.op.ndparams)
+ utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
+ self.new_ndparams = new_ndparams
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ """
+ env = {
+ "GROUP_NAME": self.op.group_name,
+ "NEW_ALLOC_POLICY": self.op.alloc_policy,
+ }
+ mn = self.cfg.GetMasterNode()
+ return env, [mn], [mn]
+
+ def Exec(self, feedback_fn):
+ """Modifies the node group.
+
+ """
+ result = []
+
+ if self.op.ndparams:
+ self.group.ndparams = self.new_ndparams
+ result.append(("ndparams", str(self.group.ndparams)))
+
+ if self.op.alloc_policy:
+ self.group.alloc_policy = self.op.alloc_policy
+
+ self.cfg.Update(self.group, feedback_fn)
+ return result
+
+
+class LUGroupRemove(LogicalUnit):
+ HPATH = "group-remove"
+ HTYPE = constants.HTYPE_GROUP
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ # This raises errors.OpPrereqError on its own:
+ self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+ self.needed_locks = {
+ locking.LEVEL_NODEGROUP: [self.group_uuid],
+ }
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ This checks that the given group name exists as a node group, that it is
+ empty (i.e., contains no nodes), and that it is not the last group of the
+ cluster.
+
+ """
+ # Verify that the group is empty.
+ group_nodes = [node.name
+ for node in self.cfg.GetAllNodesInfo().values()
+ if node.group == self.group_uuid]
+
+ if group_nodes:
+ raise errors.OpPrereqError("Group '%s' not empty, has the following"
+ " nodes: %s" %
+ (self.op.group_name,
+ utils.CommaJoin(utils.NiceSort(group_nodes))),
+ errors.ECODE_STATE)
+
+ # Verify the cluster would not be left group-less.
+ if len(self.cfg.GetNodeGroupList()) == 1:
+ raise errors.OpPrereqError("Group '%s' is the last group in the cluster,"
+ " which cannot be left without at least one"
+ " group" % self.op.group_name,
+ errors.ECODE_STATE)
+
+ def BuildHooksEnv(self):
+ """Build hooks env.
+
+ """
+ env = {
+ "GROUP_NAME": self.op.group_name,
+ }
+ mn = self.cfg.GetMasterNode()
+ return env, [mn], [mn]
+
+ def Exec(self, feedback_fn):
+ """Remove the node group.
+ + """ + try: + self.cfg.RemoveNodeGroup(self.group_uuid) + except errors.ConfigurationError: + raise errors.OpExecError("Group '%s' with UUID %s disappeared" % + (self.op.group_name, self.group_uuid)) + + self.remove_locks[locking.LEVEL_NODEGROUP] = self.group_uuid + + +class LUGroupRename(LogicalUnit): + HPATH = "group-rename" + HTYPE = constants.HTYPE_GROUP + REQ_BGL = False + + def ExpandNames(self): + # This raises errors.OpPrereqError on its own: + self.group_uuid = self.cfg.LookupNodeGroup(self.op.old_name) + + self.needed_locks = { + locking.LEVEL_NODEGROUP: [self.group_uuid], + } + + def CheckPrereq(self): + """Check prerequisites. + + This checks that the given old_name exists as a node group, and that + new_name doesn't. + + """ + try: + new_name_uuid = self.cfg.LookupNodeGroup(self.op.new_name) + except errors.OpPrereqError: + pass + else: + raise errors.OpPrereqError("Desired new name '%s' clashes with existing" + " node group (UUID: %s)" % + (self.op.new_name, new_name_uuid), + errors.ECODE_EXISTS) + + def BuildHooksEnv(self): + """Build hooks env. + + """ + env = { + "OLD_NAME": self.op.old_name, + "NEW_NAME": self.op.new_name, + } + + mn = self.cfg.GetMasterNode() + all_nodes = self.cfg.GetAllNodesInfo() + run_nodes = [mn] + all_nodes.pop(mn, None) + + for node in all_nodes.values(): + if node.group == self.group_uuid: + run_nodes.append(node.name) + + return env, run_nodes, run_nodes + + def Exec(self, feedback_fn): + """Rename the node group. + + """ + group = self.cfg.GetNodeGroup(self.group_uuid) + + if group is None: + raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" % + (self.op.old_name, self.group_uuid)) + + group.name = self.op.new_name + self.cfg.Update(group, feedback_fn) + + return self.op.new_name + + class TagsLU(NoHooksLU): # pylint: disable-msg=W0223 """Generic tags LU. @@ -10262,11 +10418,6 @@ class LUGetTags(TagsLU): """Returns the tags of a given object. """ - _OP_PARAMS = [ - ("kind", ht.NoDefault, ht.TElemOf(constants.VALID_TAG_TYPES)), - # Name is only meaningful for nodes and instances - ("name", ht.NoDefault, ht.TMaybeString), - ] REQ_BGL = False def ExpandNames(self): @@ -10286,9 +10437,6 @@ class LUSearchTags(NoHooksLU): """Searches the tags for a given pattern. """ - _OP_PARAMS = [ - ("pattern", ht.NoDefault, ht.TNonEmptyString), - ] REQ_BGL = False def ExpandNames(self): @@ -10328,12 +10476,6 @@ class LUAddTags(TagsLU): """Sets a tag on a given object. """ - _OP_PARAMS = [ - ("kind", ht.NoDefault, ht.TElemOf(constants.VALID_TAG_TYPES)), - # Name is only meaningful for nodes and instances - ("name", ht.NoDefault, ht.TMaybeString), - ("tags", ht.NoDefault, ht.TListOf(ht.TNonEmptyString)), - ] REQ_BGL = False def CheckPrereq(self): @@ -10362,12 +10504,6 @@ class LUDelTags(TagsLU): """Delete a list of tags from a given object. """ - _OP_PARAMS = [ - ("kind", ht.NoDefault, ht.TElemOf(constants.VALID_TAG_TYPES)), - # Name is only meaningful for nodes and instances - ("name", ht.NoDefault, ht.TMaybeString), - ("tags", ht.NoDefault, ht.TListOf(ht.TNonEmptyString)), - ] REQ_BGL = False def CheckPrereq(self): @@ -10405,12 +10541,6 @@ class LUTestDelay(NoHooksLU): time. """ - _OP_PARAMS = [ - ("duration", ht.NoDefault, ht.TFloat), - ("on_master", True, ht.TBool), - ("on_nodes", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)), - ("repeat", 0, ht.TPositiveInt) - ] REQ_BGL = False def ExpandNames(self): @@ -10456,12 +10586,6 @@ class LUTestJobqueue(NoHooksLU): """Utility LU to test some aspects of the job queue. 
""" - _OP_PARAMS = [ - ("notify_waitlock", False, ht.TBool), - ("notify_exec", False, ht.TBool), - ("log_messages", ht.EmptyList, ht.TListOf(ht.TString)), - ("fail", False, ht.TBool), - ] REQ_BGL = False # Must be lower than default timeout for WaitForJobChange to see whether it @@ -10677,11 +10801,12 @@ class IAllocator(object): "enabled_hypervisors": list(cluster_info.enabled_hypervisors), # we don't have job IDs } + ninfo = cfg.GetAllNodesInfo() iinfo = cfg.GetAllInstancesInfo().values() i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo] # node data - node_list = cfg.GetNodeList() + node_list = [n.name for n in ninfo.values() if n.vm_capable] if self.mode == constants.IALLOCATOR_MODE_ALLOC: hypervisor_name = self.hypervisor @@ -10698,7 +10823,11 @@ class IAllocator(object): data["nodegroups"] = self._ComputeNodeGroupData(cfg) - data["nodes"] = self._ComputeNodeData(cfg, node_data, node_iinfo, i_list) + config_ndata = self._ComputeBasicNodeData(ninfo) + data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo, + i_list, config_ndata) + assert len(data["nodes"]) == len(ninfo), \ + "Incomplete node data computed" data["instances"] = self._ComputeInstanceData(cluster_info, i_list) @@ -10711,18 +10840,23 @@ class IAllocator(object): """ ng = {} for guuid, gdata in cfg.GetAllNodeGroupsInfo().items(): - ng[guuid] = { "name": gdata.name } + ng[guuid] = { + "name": gdata.name, + "alloc_policy": gdata.alloc_policy, + } return ng @staticmethod - def _ComputeNodeData(cfg, node_data, node_iinfo, i_list): + def _ComputeBasicNodeData(node_cfg): """Compute global node data. + @rtype: dict + @returns: a dict of name: (node dict, node config) + """ node_results = {} - for nname, nresult in node_data.items(): - # first fill in static (config-based) values - ninfo = cfg.GetNodeInfo(nname) + for ninfo in node_cfg.values(): + # fill in static (config-based) values pnr = { "tags": list(ninfo.GetTags()), "primary_ip": ninfo.primary_ip, @@ -10735,6 +10869,24 @@ class IAllocator(object): "vm_capable": ninfo.vm_capable, } + node_results[ninfo.name] = pnr + + return node_results + + @staticmethod + def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list, + node_results): + """Compute global node data. 
+
+ @param node_results: the basic node structures as filled from the config
+
+ """
+ # make a copy of the current dict
+ node_results = dict(node_results)
+ for nname, nresult in node_data.items():
+ assert nname in node_results, "Missing basic data for node %s" % nname
+ ninfo = node_cfg[nname]
+
if not (ninfo.offline or ninfo.drained):
nresult.Raise("Can't get data for node %s" % nname)
node_iinfo[nname].Raise("Can't get node instance info from node %s" %
nname)
@@ -10776,9 +10928,9 @@ class IAllocator(object):
"i_pri_memory": i_p_mem,
"i_pri_up_memory": i_p_up_mem,
}
- pnr.update(pnr_dyn)
+ pnr_dyn.update(node_results[nname])

- node_results[nname] = pnr
+ node_results[nname] = pnr_dyn

return node_results

@@ -10956,25 +11108,6 @@ class LUTestAllocator(NoHooksLU):
This LU runs the allocator tests

"""
- _OP_PARAMS = [
- ("direction", ht.NoDefault,
- ht.TElemOf(constants.VALID_IALLOCATOR_DIRECTIONS)),
- ("mode", ht.NoDefault, ht.TElemOf(constants.VALID_IALLOCATOR_MODES)),
- ("name", ht.NoDefault, ht.TNonEmptyString),
- ("nics", ht.NoDefault, ht.TOr(ht.TNone, ht.TListOf(
- ht.TDictOf(ht.TElemOf(["mac", "ip", "bridge"]),
- ht.TOr(ht.TNone, ht.TNonEmptyString))))),
- ("disks", ht.NoDefault, ht.TOr(ht.TNone, ht.TList)),
- ("hypervisor", None, ht.TMaybeString),
- ("allocator", None, ht.TMaybeString),
- ("tags", ht.EmptyList, ht.TListOf(ht.TNonEmptyString)),
- ("mem_size", None, ht.TOr(ht.TNone, ht.TPositiveInt)),
- ("vcpus", None, ht.TOr(ht.TNone, ht.TPositiveInt)),
- ("os", None, ht.TMaybeString),
- ("disk_template", None, ht.TMaybeString),
- ("evac_nodes", None, ht.TOr(ht.TNone, ht.TListOf(ht.TNonEmptyString))),
- ]
-
def CheckPrereq(self):
"""Check prerequisites.

@@ -11064,3 +11197,24 @@ class LUTestAllocator(NoHooksLU):
ial.Run(self.op.allocator, validate=False)
result = ial.out_text
return result
+
+
+#: Query type implementations
+_QUERY_IMPL = {
+ constants.QR_INSTANCE: _InstanceQuery,
+ constants.QR_NODE: _NodeQuery,
+ constants.QR_GROUP: _GroupQuery,
+ }
+
+
+def _GetQueryImplementation(name):
+ """Returns the implementation for a query type.
+
+ @param name: Query type, must be one of L{constants.QR_OP_QUERY}
+
+ """
+ try:
+ return _QUERY_IMPL[name]
+ except KeyError:
+ raise errors.OpPrereqError("Unknown query resource '%s'" % name,
+ errors.ECODE_INVAL)
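A few illustrative sketches follow; they are editorial additions, not part of the patch above. First, the semantics of LUGroupAssignNodes.CheckAssignmentForSplitInstances() can be exercised with stub objects, e.g. from a unit test living next to this module. The Node/Inst namedtuples and the node/group names are invented for the example, which also assumes constants.DTS_NET_MIRROR contains constants.DT_DRBD8:

import collections

from ganeti import constants

# Hypothetical stand-ins for objects.Node/objects.Instance; only the
# attributes read by CheckAssignmentForSplitInstances are provided
Node = collections.namedtuple("Node", ["group"])
Inst = collections.namedtuple("Inst", ["name", "disk_template",
                                       "primary_node", "secondary_nodes"])

# inst1 is a DRBD instance whose primary and secondary currently live in
# different groups, i.e. it is already split
node_data = {"node1": Node(group="g1"), "node2": Node(group="g2")}
instance_data = {
  "inst1": Inst("inst1", constants.DT_DRBD8, "node1", ["node2"]),
  }

# Moving node2 into g1 heals the split: inst1 shows up in neither the
# "newly split" nor the "still split" list
new, still = LUGroupAssignNodes.CheckAssignmentForSplitInstances(
  [("node2", "g1")], node_data, instance_data)
assert (new, still) == ([], [])

# A no-op assignment leaves inst1 in the "still split" list
new, still = LUGroupAssignNodes.CheckAssignmentForSplitInstances(
  [("node2", "g2")], node_data, instance_data)
assert (new, still) == ([], ["inst1"])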
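Second, a note on _ComputeDynamicNodeData() above: pnr_dyn.update(node_results[nname]) merges the static dict into the dynamic one, so on any key collision the config-derived value wins before the merged dict replaces the node's entry. A toy illustration with made-up keys and values:

static = {"offline": False, "total_memory": 4096}      # from the config
dynamic = {"total_memory": 4090, "free_memory": 2048}  # from the RPC result

# dict.update() lets its argument override existing keys, hence the
# static value for "total_memory" prevails after the merge
dynamic.update(static)
assert dynamic == {"offline": False, "total_memory": 4096,
                   "free_memory": 2048}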
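Finally, a minimal sketch of how the _QUERY_IMPL dispatch table is meant to be consumed by a query LU. The LUQuery class and its opcode fields (what, fields, names) are assumptions for illustration only; the _QueryBase interface (ExpandNames, DeclareLocks, NewStyleQuery) is the one defined earlier in this patch:

class LUQuery(NoHooksLU):
  """Query for resources of a given type (sketch only).

  """
  REQ_BGL = False

  def CheckArguments(self):
    # Resolve the _QueryBase subclass for the requested resource type;
    # unknown types raise errors.OpPrereqError
    qcls = _GetQueryImplementation(self.op.what)
    self.impl = qcls(self.op.names, self.op.fields, False)

  def ExpandNames(self):
    self.impl.ExpandNames(self)

  def DeclareLocks(self, level):
    self.impl.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    # Collects the data and renders it through the query infrastructure
    return self.impl.NewStyleQuery(self)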