X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/4a2c0db015df3ebb345042506371f4ab4f9849eb..3f1e065d5095b2c0cda036a130575458c8f270af:/lib/cmdlib.py?ds=sidebyside diff --git a/lib/cmdlib.py b/lib/cmdlib.py index 9db959e..98a056c 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -57,6 +57,7 @@ from ganeti import netutils from ganeti import query from ganeti import qlang from ganeti import opcodes +from ganeti import ht import ganeti.masterd.instance # pylint: disable-msg=W0611 @@ -74,7 +75,28 @@ def _SupportsOob(cfg, node): return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM] -# End types +class ResultWithJobs: + """Data container for LU results with jobs. + + Instances of this class returned from L{LogicalUnit.Exec} will be recognized + by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs + contained in the C{jobs} attribute and include the job IDs in the opcode + result. + + """ + def __init__(self, jobs, **kwargs): + """Initializes this class. + + Additional return values can be specified as keyword arguments. + + @type jobs: list of lists of L{opcode.OpCode} + @param jobs: A list of lists of opcode objects + + """ + self.jobs = jobs + self.other = kwargs + + class LogicalUnit(object): """Logical Unit base class. @@ -83,6 +105,7 @@ class LogicalUnit(object): - implement CheckPrereq (except when tasklets are used) - implement Exec (except when tasklets are used) - implement BuildHooksEnv + - implement BuildHooksNodes - redefine HPATH and HTYPE - optionally redefine their run requirements: REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively @@ -107,17 +130,16 @@ class LogicalUnit(object): self.proc = processor self.op = op self.cfg = context.cfg + self.glm = context.glm self.context = context self.rpc = rpc # Dicts used to declare locking needs to mcpu self.needed_locks = None - self.acquired_locks = {} self.share_locks = dict.fromkeys(locking.LEVELS, 0) self.add_locks = {} self.remove_locks = {} # Used to force good behavior when calling helper functions self.recalculate_locks = {} - self.__ssh = None # logging self.Log = processor.Log # pylint: disable-msg=C0103 self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103 @@ -138,16 +160,6 @@ class LogicalUnit(object): self.CheckArguments() - def __GetSSH(self): - """Returns the SshRunner object - - """ - if not self.__ssh: - self.__ssh = ssh.SshRunner(self.cfg.GetClusterName()) - return self.__ssh - - ssh = property(fget=__GetSSH) - def CheckArguments(self): """Check syntactic validity for the opcode arguments. @@ -273,21 +285,28 @@ class LogicalUnit(object): def BuildHooksEnv(self): """Build hooks environment for this LU. - This method should return a three-node tuple consisting of: a dict - containing the environment that will be used for running the - specific hook for this LU, a list of node names on which the hook - should run before the execution, and a list of node names on which - the hook should run after the execution. + @rtype: dict + @return: Dictionary containing the environment that will be used for + running the hooks for this LU. The keys of the dict must not be prefixed + with "GANETI_"--that'll be added by the hooks runner. The hooks runner + will extend the environment with additional variables. If no environment + should be defined, an empty dictionary should be returned (not C{None}). + @note: If the C{HPATH} attribute of the LU class is C{None}, this function + will not be called. 
- The keys of the dict must not have 'GANETI_' prefixed as this will - be handled in the hooks runner. Also note additional keys will be - added by the hooks runner. If the LU doesn't define any - environment, an empty dict (and not None) should be returned. + """ + raise NotImplementedError - No nodes should be returned as an empty list (and not None). + def BuildHooksNodes(self): + """Build list of nodes to run LU's hooks. - Note that if the HPATH for a LU class is None, this function will - not be called. + @rtype: tuple; (list, list) + @return: Tuple containing a list of node names on which the hook + should run before the execution and a list of node names on which the + hook should run after the execution. No nodes should be returned as an + empty list (and not None). + @note: If the C{HPATH} attribute of the LU class is C{None}, this function + will not be called. """ raise NotImplementedError @@ -367,7 +386,7 @@ class LogicalUnit(object): # future we might want to have different behaviors depending on the value # of self.recalculate_locks[locking.LEVEL_NODE] wanted_nodes = [] - for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]: + for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE): instance = self.context.cfg.GetInstanceInfo(instance_name) wanted_nodes.append(instance.primary_node) if not primary_only: @@ -397,7 +416,13 @@ class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223 This just raises an error. """ - assert False, "BuildHooksEnv called for NoHooksLUs" + raise AssertionError("BuildHooksEnv called for NoHooksLUs") + + def BuildHooksNodes(self): + """Empty BuildHooksNodes for NoHooksLU. + + """ + raise AssertionError("BuildHooksNodes called for NoHooksLU") class Tasklet: @@ -453,15 +478,19 @@ class _QueryBase: #: Attribute holding field definitions FIELDS = None - def __init__(self, names, fields, use_locking): + def __init__(self, filter_, fields, use_locking): """Initializes this class. """ - self.names = names self.use_locking = use_locking - self.query = query.Query(self.FIELDS, fields) + self.query = query.Query(self.FIELDS, fields, filter_=filter_, + namefield="name") self.requested_data = self.query.RequestedData() + self.names = self.query.RequestedNames() + + # Sort only if no names were requested + self.sort_by_name = not self.names self.do_locking = None self.wanted = None @@ -471,7 +500,7 @@ class _QueryBase: """ if self.do_locking: - names = lu.acquired_locks[lock_level] + names = lu.glm.list_owned(lock_level) else: names = all_names @@ -482,7 +511,7 @@ class _QueryBase: # caller specified names and we must keep the same order assert self.names - assert not self.do_locking or lu.acquired_locks[lock_level] + assert not self.do_locking or lu.glm.is_owned(lock_level) missing = set(self.wanted).difference(names) if missing: @@ -492,15 +521,6 @@ class _QueryBase: # Return expanded names return self.wanted - @classmethod - def FieldsQuery(cls, fields): - """Returns list of available fields. - - @return: List of L{objects.QueryFieldDefinition} - - """ - return query.QueryFields(cls.FIELDS, fields) - def ExpandNames(self, lu): """Expand names for this query. @@ -529,13 +549,15 @@ class _QueryBase: """Collect data and execute query. """ - return query.GetQueryResponse(self.query, self._GetQueryData(lu)) + return query.GetQueryResponse(self.query, self._GetQueryData(lu), + sort_by_name=self.sort_by_name) def OldStyleQuery(self, lu): """Collect data and execute query. 
""" - return self.query.OldStyleQuery(self._GetQueryData(lu)) + return self.query.OldStyleQuery(self._GetQueryData(lu), + sort_by_name=self.sort_by_name) def _GetWantedNodes(lu, nodes): @@ -609,6 +631,63 @@ def _GetUpdatedParams(old_params, update_dict, return params_copy +def _ReleaseLocks(lu, level, names=None, keep=None): + """Releases locks owned by an LU. + + @type lu: L{LogicalUnit} + @param level: Lock level + @type names: list or None + @param names: Names of locks to release + @type keep: list or None + @param keep: Names of locks to retain + + """ + assert not (keep is not None and names is not None), \ + "Only one of the 'names' and the 'keep' parameters can be given" + + if names is not None: + should_release = names.__contains__ + elif keep: + should_release = lambda name: name not in keep + else: + should_release = None + + if should_release: + retain = [] + release = [] + + # Determine which locks to release + for name in lu.glm.list_owned(level): + if should_release(name): + release.append(name) + else: + retain.append(name) + + assert len(lu.glm.list_owned(level)) == (len(retain) + len(release)) + + # Release just some locks + lu.glm.release(level, names=release) + + assert frozenset(lu.glm.list_owned(level)) == frozenset(retain) + else: + # Release everything + lu.glm.release(level) + + assert not lu.glm.is_owned(level), "No locks should be owned" + + +def _RunPostHook(lu, node_name): + """Runs the post-hook for an opcode on a single node. + + """ + hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu) + try: + hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name]) + except: + # pylint: disable-msg=W0702 + lu.LogWarning("Errors occurred running hooks on %s" % node_name) + + def _CheckOutputFields(static, dynamic, selected): """Checks whether all selected fields are valid. @@ -1035,7 +1114,7 @@ def _GetStorageTypeArgs(cfg, storage_type): # Special case for file storage if storage_type == constants.ST_FILE: # storage.FileStorage wants a list of storage directories - return [[cfg.GetFileStorageDir()]] + return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]] return [] @@ -1075,7 +1154,7 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot): iallocator = getattr(lu.op, iallocator_slot, None) if node is not None and iallocator is not None: - raise errors.OpPrereqError("Do not specify both, iallocator and node.", + raise errors.OpPrereqError("Do not specify both, iallocator and node", errors.ECODE_INVAL) elif node is None and iallocator is None: default_iallocator = lu.cfg.GetDefaultIAllocator() @@ -1083,10 +1162,10 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot): setattr(lu.op, iallocator_slot, default_iallocator) else: raise errors.OpPrereqError("No iallocator or node given and no" - " cluster-wide default iallocator found." - " Please specify either an iallocator or a" + " cluster-wide default iallocator found;" + " please specify either an iallocator or a" " node, or set a cluster-wide default" - " iallocator.") + " iallocator") class LUClusterPostInit(LogicalUnit): @@ -1100,9 +1179,15 @@ class LUClusterPostInit(LogicalUnit): """Build hooks env. """ - env = {"OP_TARGET": self.cfg.GetClusterName()} - mn = self.cfg.GetMasterNode() - return env, [], [mn] + return { + "OP_TARGET": self.cfg.GetClusterName(), + } + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ + return ([], [self.cfg.GetMasterNode()]) def Exec(self, feedback_fn): """Nothing to do. @@ -1122,8 +1207,15 @@ class LUClusterDestroy(LogicalUnit): """Build hooks env. 
""" - env = {"OP_TARGET": self.cfg.GetClusterName()} - return env, [], [] + return { + "OP_TARGET": self.cfg.GetClusterName(), + } + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ + return ([], []) def CheckPrereq(self): """Check prerequisites. @@ -1153,12 +1245,7 @@ class LUClusterDestroy(LogicalUnit): master = self.cfg.GetMasterNode() # Run post hooks on master node before it's removed - hm = self.proc.hmclass(self.rpc.call_hooks_runner, self) - try: - hm.RunPhase(constants.HOOKS_PHASE_POST, [master]) - except: - # pylint: disable-msg=W0702 - self.LogWarning("Errors occurred running hooks on %s" % master) + _RunPostHook(self, master) result = self.rpc.call_node_stop_master(master, False) result.Raise("Could not disable the master role") @@ -1167,7 +1254,7 @@ class LUClusterDestroy(LogicalUnit): def _VerifyCertificate(filename): - """Verifies a certificate for LUClusterVerify. + """Verifies a certificate for LUClusterVerifyConfig. @type filename: string @param filename: Path to PEM file @@ -1177,7 +1264,7 @@ def _VerifyCertificate(filename): cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, utils.ReadFile(filename)) except Exception, err: # pylint: disable-msg=W0703 - return (LUClusterVerify.ETYPE_ERROR, + return (LUClusterVerifyConfig.ETYPE_ERROR, "Failed to load X509 certificate %s: %s" % (filename, err)) (errcode, msg) = \ @@ -1192,27 +1279,61 @@ def _VerifyCertificate(filename): if errcode is None: return (None, fnamemsg) elif errcode == utils.CERT_WARNING: - return (LUClusterVerify.ETYPE_WARNING, fnamemsg) + return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg) elif errcode == utils.CERT_ERROR: - return (LUClusterVerify.ETYPE_ERROR, fnamemsg) + return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg) raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode) -class LUClusterVerify(LogicalUnit): - """Verifies the cluster status. +def _GetAllHypervisorParameters(cluster, instances): + """Compute the set of all hypervisor parameters. + + @type cluster: L{objects.Cluster} + @param cluster: the cluster object + @param instances: list of L{objects.Instance} + @param instances: additional instances from which to obtain parameters + @rtype: list of (origin, hypervisor, parameters) + @return: a list with all parameters found, indicating the hypervisor they + apply to, and the origin (can be "cluster", "os X", or "instance Y") """ - HPATH = "cluster-verify" - HTYPE = constants.HTYPE_CLUSTER - REQ_BGL = False + hvp_data = [] + + for hv_name in cluster.enabled_hypervisors: + hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name))) + + for os_name, os_hvp in cluster.os_hvp.items(): + for hv_name, hv_params in os_hvp.items(): + if hv_params: + full_params = cluster.GetHVDefaults(hv_name, os_name=os_name) + hvp_data.append(("os %s" % os_name, hv_name, full_params)) + + # TODO: collapse identical parameter values in a single one + for instance in instances: + if instance.hvparams: + hvp_data.append(("instance %s" % instance.name, instance.hypervisor, + cluster.FillHV(instance))) + + return hvp_data + + +class _VerifyErrors(object): + """Mix-in for cluster/group verify LUs. + It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects + self.op and self._feedback_fn to be available.) 
+ + """ TCLUSTER = "cluster" TNODE = "node" TINSTANCE = "instance" ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG") ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT") + ECLUSTERFILECHECK = (TCLUSTER, "ECLUSTERFILECHECK") + ECLUSTERDANGLINGNODES = (TNODE, "ECLUSTERDANGLINGNODES") + ECLUSTERDANGLINGINST = (TNODE, "ECLUSTERDANGLINGINST") EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE") EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN") EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT") @@ -1242,6 +1363,138 @@ class LUClusterVerify(LogicalUnit): ETYPE_ERROR = "ERROR" ETYPE_WARNING = "WARNING" + def _Error(self, ecode, item, msg, *args, **kwargs): + """Format an error message. + + Based on the opcode's error_codes parameter, either format a + parseable error code, or a simpler error string. + + This must be called only from Exec and functions called from Exec. + + """ + ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) + itype, etxt = ecode + # first complete the msg + if args: + msg = msg % args + # then format the whole message + if self.op.error_codes: # This is a mix-in. pylint: disable-msg=E1101 + msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg) + else: + if item: + item = " " + item + else: + item = "" + msg = "%s: %s%s: %s" % (ltype, itype, item, msg) + # and finally report it via the feedback_fn + self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable-msg=E1101 + + def _ErrorIf(self, cond, *args, **kwargs): + """Log an error message if the passed condition is True. + + """ + cond = (bool(cond) + or self.op.debug_simulate_errors) # pylint: disable-msg=E1101 + if cond: + self._Error(*args, **kwargs) + # do not mark the operation as failed for WARN cases only + if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR: + self.bad = self.bad or cond + + +class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors): + """Verifies the cluster config. + + """ + REQ_BGL = False + + def _VerifyHVP(self, hvp_data): + """Verifies locally the syntax of the hypervisor parameters. + + """ + for item, hv_name, hv_params in hvp_data: + msg = ("hypervisor %s parameters syntax check (source %s): %%s" % + (item, hv_name)) + try: + hv_class = hypervisor.GetHypervisor(hv_name) + utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) + hv_class.CheckParameterSyntax(hv_params) + except errors.GenericError, err: + self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err)) + + def ExpandNames(self): + self.all_group_info = self.cfg.GetAllNodeGroupsInfo() + self.all_node_info = self.cfg.GetAllNodesInfo() + self.all_inst_info = self.cfg.GetAllInstancesInfo() + self.needed_locks = {} + + def Exec(self, feedback_fn): + """Verify integrity of cluster, performing various test on nodes. 
+ + """ + self.bad = False + self._feedback_fn = feedback_fn + + feedback_fn("* Verifying cluster config") + + for msg in self.cfg.VerifyConfig(): + self._ErrorIf(True, self.ECLUSTERCFG, None, msg) + + feedback_fn("* Verifying cluster certificate files") + + for cert_filename in constants.ALL_CERT_FILES: + (errcode, msg) = _VerifyCertificate(cert_filename) + self._ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode) + + feedback_fn("* Verifying hypervisor parameters") + + self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(), + self.all_inst_info.values())) + + feedback_fn("* Verifying all nodes belong to an existing group") + + # We do this verification here because, should this bogus circumstance + # occur, it would never be catched by VerifyGroup, which only acts on + # nodes/instances reachable from existing node groups. + + dangling_nodes = set(node.name for node in self.all_node_info.values() + if node.group not in self.all_group_info) + + dangling_instances = {} + no_node_instances = [] + + for inst in self.all_inst_info.values(): + if inst.primary_node in dangling_nodes: + dangling_instances.setdefault(inst.primary_node, []).append(inst.name) + elif inst.primary_node not in self.all_node_info: + no_node_instances.append(inst.name) + + pretty_dangling = [ + "%s (%s)" % + (node.name, + utils.CommaJoin(dangling_instances.get(node.name, + ["no instances"]))) + for node in dangling_nodes] + + self._ErrorIf(bool(dangling_nodes), self.ECLUSTERDANGLINGNODES, None, + "the following nodes (and their instances) belong to a non" + " existing group: %s", utils.CommaJoin(pretty_dangling)) + + self._ErrorIf(bool(no_node_instances), self.ECLUSTERDANGLINGINST, None, + "the following instances have a non-existing primary-node:" + " %s", utils.CommaJoin(no_node_instances)) + + return (not self.bad, [g.name for g in self.all_group_info.values()]) + + +class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): + """Verifies the status of a node group. + + """ + HPATH = "cluster-verify" + HTYPE = constants.HTYPE_CLUSTER + REQ_BGL = False + _HOOKS_INDENT_RE = re.compile("^", re.M) class NodeImage(object): @@ -1254,8 +1507,8 @@ class LUClusterVerify(LogicalUnit): @ivar instances: a list of running instances (runtime) @ivar pinst: list of configured primary instances (config) @ivar sinst: list of configured secondary instances (config) - @ivar sbp: diction of {secondary-node: list of instances} of all peers - of this node (config) + @ivar sbp: dictionary of {primary-node: list of instances} for all + instances for which this node is secondary (config) @ivar mfree: free memory, as reported by hypervisor (runtime) @ivar dfree: free disk, as reported by the node (runtime) @ivar offline: the offline status (config) @@ -1295,48 +1548,90 @@ class LUClusterVerify(LogicalUnit): self.oslist = {} def ExpandNames(self): + # This raises errors.OpPrereqError on its own: + self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name) + + all_node_info = self.cfg.GetAllNodesInfo() + all_inst_info = self.cfg.GetAllInstancesInfo() + + node_names = set(node.name + for node in all_node_info.values() + if node.group == self.group_uuid) + + inst_names = [inst.name + for inst in all_inst_info.values() + if inst.primary_node in node_names] + + # In Exec(), we warn about mirrored instances that have primary and + # secondary living in separate node groups. To fully verify that + # volumes for these instances are healthy, we will need to do an + # extra call to their secondaries. 
We ensure here those nodes will + # be locked. + for inst in inst_names: + if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR: + node_names.update(all_inst_info[inst].secondary_nodes) + self.needed_locks = { - locking.LEVEL_NODE: locking.ALL_SET, - locking.LEVEL_INSTANCE: locking.ALL_SET, + locking.LEVEL_NODEGROUP: [self.group_uuid], + locking.LEVEL_NODE: list(node_names), + locking.LEVEL_INSTANCE: inst_names, } + self.share_locks = dict.fromkeys(locking.LEVELS, 1) - def _Error(self, ecode, item, msg, *args, **kwargs): - """Format an error message. + def CheckPrereq(self): + self.all_node_info = self.cfg.GetAllNodesInfo() + self.all_inst_info = self.cfg.GetAllInstancesInfo() - Based on the opcode's error_codes parameter, either format a - parseable error code, or a simpler error string. + group_nodes = set(node.name + for node in self.all_node_info.values() + if node.group == self.group_uuid) - This must be called only from Exec and functions called from Exec. + group_instances = set(inst.name + for inst in self.all_inst_info.values() + if inst.primary_node in group_nodes) - """ - ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) - itype, etxt = ecode - # first complete the msg - if args: - msg = msg % args - # then format the whole message - if self.op.error_codes: - msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg) - else: - if item: - item = " " + item - else: - item = "" - msg = "%s: %s%s: %s" % (ltype, itype, item, msg) - # and finally report it via the feedback_fn - self._feedback_fn(" - %s" % msg) + unlocked_nodes = \ + group_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE)) - def _ErrorIf(self, cond, *args, **kwargs): - """Log an error message if the passed condition is True. + unlocked_instances = \ + group_instances.difference(self.glm.list_owned(locking.LEVEL_INSTANCE)) - """ - cond = bool(cond) or self.op.debug_simulate_errors - if cond: - self._Error(*args, **kwargs) - # do not mark the operation as failed for WARN cases only - if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR: - self.bad = self.bad or cond + if unlocked_nodes: + raise errors.OpPrereqError("missing lock for nodes: %s" % + utils.CommaJoin(unlocked_nodes)) + + if unlocked_instances: + raise errors.OpPrereqError("missing lock for instances: %s" % + utils.CommaJoin(unlocked_instances)) + + self.my_node_names = utils.NiceSort(group_nodes) + self.my_inst_names = utils.NiceSort(group_instances) + + self.my_node_info = dict((name, self.all_node_info[name]) + for name in self.my_node_names) + + self.my_inst_info = dict((name, self.all_inst_info[name]) + for name in self.my_inst_names) + + # We detect here the nodes that will need the extra RPC calls for verifying + # split LV volumes; they should be locked. + extra_lv_nodes = set() + + for inst in self.my_inst_info.values(): + if inst.disk_template in constants.DTS_INT_MIRROR: + group = self.my_node_info[inst.primary_node].group + for nname in inst.secondary_nodes: + if self.all_node_info[nname].group != group: + extra_lv_nodes.add(nname) + + unlocked_lv_nodes = \ + extra_lv_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE)) + + if unlocked_lv_nodes: + raise errors.OpPrereqError("these nodes could be locked: %s" % + utils.CommaJoin(unlocked_lv_nodes)) + self.extra_lv_nodes = list(extra_lv_nodes) def _VerifyNode(self, ninfo, nresult): """Perform some basic validation on data returned from a node. 
@@ -1405,7 +1700,7 @@ class LUClusterVerify(LogicalUnit): hv_name, item, hv_result) test = nresult.get(constants.NV_NODESETUP, - ["Missing NODESETUP results"]) + ["Missing NODESETUP results"]) _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s", "; ".join(test)) @@ -1444,7 +1739,7 @@ class LUClusterVerify(LogicalUnit): ntime_diff) def _VerifyNodeLVM(self, ninfo, nresult, vg_name): - """Check the node time. + """Check the node LVM results. @type ninfo: L{objects.Node} @param ninfo: the node to check @@ -1480,8 +1775,31 @@ class LUClusterVerify(LogicalUnit): _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV" " '%s' of VG '%s'", pvname, owner_vg) + def _VerifyNodeBridges(self, ninfo, nresult, bridges): + """Check the node bridges. + + @type ninfo: L{objects.Node} + @param ninfo: the node to check + @param nresult: the remote results for the node + @param bridges: the expected list of bridges + + """ + if not bridges: + return + + node = ninfo.name + _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + + missing = nresult.get(constants.NV_BRIDGES, None) + test = not isinstance(missing, list) + _ErrorIf(test, self.ENODENET, node, + "did not return valid bridge information") + if not test: + _ErrorIf(bool(missing), self.ENODENET, node, "missing bridges: %s" % + utils.CommaJoin(sorted(missing))) + def _VerifyNodeNetwork(self, ninfo, nresult): - """Check the node time. + """Check the node network connectivity results. @type ninfo: L{objects.Node} @param ninfo: the node to check @@ -1553,12 +1871,6 @@ class LUClusterVerify(LogicalUnit): "instance not running on its primary node %s", node_current) - for node, n_img in node_image.items(): - if node != node_current: - test = instance in n_img.instances - _ErrorIf(test, self.EINSTANCEWRONGNODE, instance, - "instance should not run on node %s", node) - diskdata = [(nname, success, status, idx) for (nname, disks) in diskstatus.items() for idx, (success, status) in enumerate(disks)] @@ -1598,18 +1910,6 @@ class LUClusterVerify(LogicalUnit): self._ErrorIf(test, self.ENODEORPHANLV, node, "volume %s is unknown", volume) - def _VerifyOrphanInstances(self, instancelist, node_image): - """Verify the list of running instances. - - This checks what instances are running but unknown to the cluster. - - """ - for node, n_img in node_image.items(): - for o_inst in n_img.instances: - test = o_inst not in instancelist - self._ErrorIf(test, self.ENODEORPHANINSTANCE, node, - "instance %s on node %s should not exist", o_inst, node) - def _VerifyNPlusOneMemory(self, node_image, instance_cfg): """Verify N+1 Memory Resilience. @@ -1617,6 +1917,7 @@ class LUClusterVerify(LogicalUnit): instances it was primary for. 
""" + cluster_info = self.cfg.GetClusterInfo() for node, n_img in node_image.items(): # This code checks that every node which is now listed as # secondary has enough memory to host all instances it is @@ -1635,7 +1936,7 @@ class LUClusterVerify(LogicalUnit): for prinode, instances in n_img.sbp.items(): needed_mem = 0 for instance in instances: - bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance]) + bep = cluster_info.FillBE(instance_cfg[instance]) if bep[constants.BE_AUTO_BALANCE]: needed_mem += bep[constants.BE_MEMORY] test = n_img.mfree < needed_mem @@ -1644,61 +1945,103 @@ class LUClusterVerify(LogicalUnit): " should node %s fail (%dMiB needed, %dMiB available)", prinode, needed_mem, n_img.mfree) - def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum, - master_files): - """Verifies and computes the node required file checksums. + @classmethod + def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo, + (files_all, files_all_opt, files_mc, files_vm)): + """Verifies file checksums collected from all nodes. - @type ninfo: L{objects.Node} - @param ninfo: the node to check - @param nresult: the remote results for the node - @param file_list: required list of files - @param local_cksum: dictionary of local files and their checksums - @param master_files: list of files that only masters should have + @param errorif: Callback for reporting errors + @param nodeinfo: List of L{objects.Node} objects + @param master_node: Name of master node + @param all_nvinfo: RPC results """ - node = ninfo.name - _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + node_names = frozenset(node.name for node in nodeinfo) - remote_cksum = nresult.get(constants.NV_FILELIST, None) - test = not isinstance(remote_cksum, dict) - _ErrorIf(test, self.ENODEFILECHECK, node, - "node hasn't returned file checksum data") - if test: - return + assert master_node in node_names + assert (len(files_all | files_all_opt | files_mc | files_vm) == + sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \ + "Found file listed in more than one file list" - for file_name in file_list: - node_is_mc = ninfo.master_candidate - must_have = (file_name not in master_files) or node_is_mc - # missing - test1 = file_name not in remote_cksum - # invalid checksum - test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name] - # existing and good - test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name] - _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node, - "file '%s' missing", file_name) - _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node, - "file '%s' has wrong checksum", file_name) - # not candidate and this is not a must-have file - _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node, - "file '%s' should not exist on non master" - " candidates (and the file is outdated)", file_name) - # all good, except non-master/non-must have combination - _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node, - "file '%s' should not exist" - " on non master candidates", file_name) + # Define functions determining which nodes to consider for a file + file2nodefn = dict([(filename, fn) + for (files, fn) in [(files_all, None), + (files_all_opt, None), + (files_mc, lambda node: (node.master_candidate or + node.name == master_node)), + (files_vm, lambda node: node.vm_capable)] + for filename in files]) - def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper, - drbd_map): - """Verifies and the node DRBD status. 
+ fileinfo = dict((filename, {}) for filename in file2nodefn.keys()) - @type ninfo: L{objects.Node} - @param ninfo: the node to check - @param nresult: the remote results for the node - @param instanceinfo: the dict of instances - @param drbd_helper: the configured DRBD usermode helper - @param drbd_map: the DRBD map as returned by - L{ganeti.config.ConfigWriter.ComputeDRBDMap} + for node in nodeinfo: + nresult = all_nvinfo[node.name] + + if nresult.fail_msg or not nresult.payload: + node_files = None + else: + node_files = nresult.payload.get(constants.NV_FILELIST, None) + + test = not (node_files and isinstance(node_files, dict)) + errorif(test, cls.ENODEFILECHECK, node.name, + "Node did not return file checksum data") + if test: + continue + + for (filename, checksum) in node_files.items(): + # Check if the file should be considered for a node + fn = file2nodefn[filename] + if fn is None or fn(node): + fileinfo[filename].setdefault(checksum, set()).add(node.name) + + for (filename, checksums) in fileinfo.items(): + assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum" + + # Nodes having the file + with_file = frozenset(node_name + for nodes in fileinfo[filename].values() + for node_name in nodes) + + # Nodes missing file + missing_file = node_names - with_file + + if filename in files_all_opt: + # All or no nodes + errorif(missing_file and missing_file != node_names, + cls.ECLUSTERFILECHECK, None, + "File %s is optional, but it must exist on all or no nodes (not" + " found on %s)", + filename, utils.CommaJoin(utils.NiceSort(missing_file))) + else: + errorif(missing_file, cls.ECLUSTERFILECHECK, None, + "File %s is missing from node(s) %s", filename, + utils.CommaJoin(utils.NiceSort(missing_file))) + + # See if there are multiple versions of the file + test = len(checksums) > 1 + if test: + variants = ["variant %s on %s" % + (idx + 1, utils.CommaJoin(utils.NiceSort(nodes))) + for (idx, (checksum, nodes)) in + enumerate(sorted(checksums.items()))] + else: + variants = [] + + errorif(test, cls.ECLUSTERFILECHECK, None, + "File %s found with %s different checksums (%s)", + filename, len(checksums), "; ".join(variants)) + + def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper, + drbd_map): + """Verifies and the node DRBD status. + + @type ninfo: L{objects.Node} + @param ninfo: the node to check + @param nresult: the remote results for the node + @param instanceinfo: the dict of instances + @param drbd_helper: the configured DRBD usermode helper + @param drbd_map: the DRBD map as returned by + L{ganeti.config.ConfigWriter.ComputeDRBDMap} """ node = ninfo.name @@ -1806,6 +2149,7 @@ class LUClusterVerify(LogicalUnit): assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?" + beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l] for os_name, os_data in nimg.oslist.items(): assert os_data, "Empty OS status for OS %s?!" % os_name f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0] @@ -1833,11 +2177,12 @@ class LUClusterVerify(LogicalUnit): continue for kind, a, b in [("API version", f_api, b_api), ("variants list", f_var, b_var), - ("parameters", f_param, b_param)]: + ("parameters", beautify_params(f_param), + beautify_params(b_param))]: _ErrorIf(a != b, self.ENODEOS, node, - "OS %s %s differs from reference node %s: %s vs. %s", + "OS %s for %s differs from reference node %s: [%s] vs. 
[%s]", kind, os_name, base.name, - utils.CommaJoin(a), utils.CommaJoin(b)) + utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b))) # check any missing OSes missing = set(base.oslist.keys()).difference(nimg.oslist.keys()) @@ -2047,21 +2392,6 @@ class LUClusterVerify(LogicalUnit): return instdisk - def _VerifyHVP(self, hvp_data): - """Verifies locally the syntax of the hypervisor parameters. - - """ - for item, hv_name, hv_params in hvp_data: - msg = ("hypervisor %s parameters syntax check (source %s): %%s" % - (item, hv_name)) - try: - hv_class = hypervisor.GetHypervisor(hv_name) - utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) - hv_class.CheckParameterSyntax(hv_params) - except errors.GenericError, err: - self._ErrorIf(True, self.ECLUSTERCFG, None, msg % str(err)) - - def BuildHooksEnv(self): """Build hooks env. @@ -2069,17 +2399,25 @@ class LUClusterVerify(LogicalUnit): the output be logged in the verify output and the verification to fail. """ - all_nodes = self.cfg.GetNodeList() env = { "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()) } - for node in self.cfg.GetAllNodesInfo().values(): - env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags()) - return env, [], all_nodes + env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags())) + for node in self.my_node_info.values()) + + return env + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ + assert self.my_node_names, ("Node list not gathered," + " has CheckPrereq been executed?") + return ([], self.my_node_names) def Exec(self, feedback_fn): - """Verify integrity of cluster, performing various test on nodes. + """Verify integrity of the node group, performing various test on nodes. """ # This method has too many local variables. pylint: disable-msg=R0914 @@ -2087,26 +2425,14 @@ class LUClusterVerify(LogicalUnit): _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 verbose = self.op.verbose self._feedback_fn = feedback_fn - feedback_fn("* Verifying global settings") - for msg in self.cfg.VerifyConfig(): - _ErrorIf(True, self.ECLUSTERCFG, None, msg) - - # Check the cluster certificates - for cert_filename in constants.ALL_CERT_FILES: - (errcode, msg) = _VerifyCertificate(cert_filename) - _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode) vg_name = self.cfg.GetVGName() drbd_helper = self.cfg.GetDRBDHelper() - hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors cluster = self.cfg.GetClusterInfo() - nodelist = utils.NiceSort(self.cfg.GetNodeList()) - nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist] - nodeinfo_byname = dict(zip(nodelist, nodeinfo)) - instancelist = utils.NiceSort(self.cfg.GetInstanceList()) - instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname)) - for iname in instancelist) groupinfo = self.cfg.GetAllNodeGroupsInfo() + hypervisors = cluster.enabled_hypervisors + node_data_list = [self.my_node_info[name] for name in self.my_node_names] + i_non_redundant = [] # Non redundant instances i_non_a_balanced = [] # Non auto-balanced instances n_offline = 0 # Count of offline nodes @@ -2114,47 +2440,40 @@ class LUClusterVerify(LogicalUnit): node_vol_should = {} # FIXME: verify OS list + + # File verification + filemap = _ComputeAncillaryFiles(cluster, False) + # do local checksums - master_files = [constants.CLUSTER_CONF_FILE] master_node = self.master_node = self.cfg.GetMasterNode() master_ip = self.cfg.GetMasterIP() - file_names = ssconf.SimpleStore().GetFileList() - file_names.extend(constants.ALL_CERT_FILES) - file_names.extend(master_files) 
- if cluster.modify_etc_hosts: - file_names.append(constants.ETC_HOSTS) - - local_checksums = utils.FingerprintFiles(file_names) - - # Compute the set of hypervisor parameters - hvp_data = [] - for hv_name in hypervisors: - hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name))) - for os_name, os_hvp in cluster.os_hvp.items(): - for hv_name, hv_params in os_hvp.items(): - if not hv_params: - continue - full_params = cluster.GetHVDefaults(hv_name, os_name=os_name) - hvp_data.append(("os %s" % os_name, hv_name, full_params)) - # TODO: collapse identical parameter values in a single one - for instance in instanceinfo.values(): - if not instance.hvparams: - continue - hvp_data.append(("instance %s" % instance.name, instance.hypervisor, - cluster.FillHV(instance))) - # and verify them locally - self._VerifyHVP(hvp_data) + feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names)) + + # We will make nodes contact all nodes in their group, and one node from + # every other group. + # TODO: should it be a *random* node, different every time? + online_nodes = [node.name for node in node_data_list if not node.offline] + other_group_nodes = {} + + for name in sorted(self.all_node_info): + node = self.all_node_info[name] + if (node.group not in other_group_nodes + and node.group != self.group_uuid + and not node.offline): + other_group_nodes[node.group] = node.name - feedback_fn("* Gathering data (%d nodes)" % len(nodelist)) node_verify_param = { - constants.NV_FILELIST: file_names, - constants.NV_NODELIST: [node.name for node in nodeinfo - if not node.offline], + constants.NV_FILELIST: + utils.UniqueSequence(filename + for files in filemap + for filename in files), + constants.NV_NODELIST: online_nodes + other_group_nodes.values(), constants.NV_HYPERVISOR: hypervisors, - constants.NV_HVPARAMS: hvp_data, - constants.NV_NODENETTEST: [(node.name, node.primary_ip, - node.secondary_ip) for node in nodeinfo + constants.NV_HVPARAMS: + _GetAllHypervisorParameters(cluster, self.all_inst_info.values()), + constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip) + for node in node_data_list if not node.offline], constants.NV_INSTANCELIST: hypervisors, constants.NV_VERSION: None, @@ -2175,15 +2494,30 @@ class LUClusterVerify(LogicalUnit): if drbd_helper: node_verify_param[constants.NV_DRBDHELPER] = drbd_helper + # bridge checks + # FIXME: this needs to be changed per node-group, not cluster-wide + bridges = set() + default_nicpp = cluster.nicparams[constants.PP_DEFAULT] + if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: + bridges.add(default_nicpp[constants.NIC_LINK]) + for instance in self.my_inst_info.values(): + for nic in instance.nics: + full_nic = cluster.SimpleFillNIC(nic.nicparams) + if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: + bridges.add(full_nic[constants.NIC_LINK]) + + if bridges: + node_verify_param[constants.NV_BRIDGES] = list(bridges) + # Build our expected cluster state node_image = dict((node.name, self.NodeImage(offline=node.offline, name=node.name, vm_capable=node.vm_capable)) - for node in nodeinfo) + for node in node_data_list) # Gather OOB paths oob_paths = [] - for node in nodeinfo: + for node in self.all_node_info.values(): path = _SupportsOob(self.cfg, node) if path and path not in oob_paths: oob_paths.append(path) @@ -2191,14 +2525,13 @@ class LUClusterVerify(LogicalUnit): if oob_paths: node_verify_param[constants.NV_OOB_PATHS] = oob_paths - for instance in instancelist: - inst_config = 
instanceinfo[instance] + for instance in self.my_inst_names: + inst_config = self.my_inst_info[instance] for nname in inst_config.all_nodes: if nname not in node_image: - # ghost node gnode = self.NodeImage(name=nname) - gnode.ghost = True + gnode.ghost = (nname not in self.all_node_info) node_image[nname] = gnode inst_config.MapLVsByNode(node_vol_should) @@ -2221,20 +2554,59 @@ class LUClusterVerify(LogicalUnit): # time before and after executing the request, we can at least have a time # window. nvinfo_starttime = time.time() - all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param, + all_nvinfo = self.rpc.call_node_verify(self.my_node_names, + node_verify_param, self.cfg.GetClusterName()) + if self.extra_lv_nodes and vg_name is not None: + extra_lv_nvinfo = \ + self.rpc.call_node_verify(self.extra_lv_nodes, + {constants.NV_LVLIST: vg_name}, + self.cfg.GetClusterName()) + else: + extra_lv_nvinfo = {} nvinfo_endtime = time.time() all_drbd_map = self.cfg.ComputeDRBDMap() - feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist)) - instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo) + feedback_fn("* Gathering disk information (%s nodes)" % + len(self.my_node_names)) + instdisk = self._CollectDiskInfo(self.my_node_names, node_image, + self.my_inst_info) + + feedback_fn("* Verifying configuration file consistency") + + # If not all nodes are being checked, we need to make sure the master node + # and a non-checked vm_capable node are in the list. + absent_nodes = set(self.all_node_info).difference(self.my_node_info) + if absent_nodes: + vf_nvinfo = all_nvinfo.copy() + vf_node_info = list(self.my_node_info.values()) + additional_nodes = [] + if master_node not in self.my_node_info: + additional_nodes.append(master_node) + vf_node_info.append(self.all_node_info[master_node]) + # Add the first vm_capable node we find which is not included + for node in absent_nodes: + nodeinfo = self.all_node_info[node] + if nodeinfo.vm_capable and not nodeinfo.offline: + additional_nodes.append(node) + vf_node_info.append(self.all_node_info[node]) + break + key = constants.NV_FILELIST + vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes, + {key: node_verify_param[key]}, + self.cfg.GetClusterName())) + else: + vf_nvinfo = all_nvinfo + vf_node_info = self.my_node_info.values() + + self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap) feedback_fn("* Verifying node status") refos_img = None - for node_i in nodeinfo: + for node_i in node_data_list: node = node_i.name nimg = node_image[node] @@ -2267,30 +2639,45 @@ class LUClusterVerify(LogicalUnit): nimg.call_ok = self._VerifyNode(node_i, nresult) self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime) self._VerifyNodeNetwork(node_i, nresult) - self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums, - master_files) - self._VerifyOob(node_i, nresult) if nimg.vm_capable: self._VerifyNodeLVM(node_i, nresult, vg_name) - self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper, + self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper, all_drbd_map) self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name) self._UpdateNodeInstances(node_i, nresult, nimg) self._UpdateNodeInfo(node_i, nresult, nimg, vg_name) self._UpdateNodeOS(node_i, nresult, nimg) + if not nimg.os_fail: if refos_img is None: refos_img = nimg self._VerifyNodeOS(node_i, nimg, refos_img) + self._VerifyNodeBridges(node_i, nresult, bridges) + + # Check whether all running instancies are 
primary for the node. (This + # can no longer be done from _VerifyInstance below, since some of the + # wrong instances could be from other node groups.) + non_primary_inst = set(nimg.instances).difference(nimg.pinst) + + for inst in non_primary_inst: + test = inst in self.all_inst_info + _ErrorIf(test, self.EINSTANCEWRONGNODE, inst, + "instance should not run on node %s", node_i.name) + _ErrorIf(not test, self.ENODEORPHANINSTANCE, node_i.name, + "node is running unknown instance %s", inst) + + for node, result in extra_lv_nvinfo.items(): + self._UpdateNodeVolumes(self.all_node_info[node], result.payload, + node_image[node], vg_name) feedback_fn("* Verifying instance status") - for instance in instancelist: + for instance in self.my_inst_names: if verbose: feedback_fn("* Verifying instance %s" % instance) - inst_config = instanceinfo[instance] + inst_config = self.my_inst_info[instance] self._VerifyInstance(instance, inst_config, node_image, instdisk[instance]) inst_nodes_offline = [] @@ -2301,8 +2688,10 @@ class LUClusterVerify(LogicalUnit): self.ENODERPC, pnode, "instance %s, connection to" " primary node failed", instance) - _ErrorIf(pnode_img.offline, self.EINSTANCEBADNODE, instance, - "instance lives on offline node %s", inst_config.primary_node) + _ErrorIf(inst_config.admin_up and pnode_img.offline, + self.EINSTANCEBADNODE, instance, + "instance is marked as running and lives on offline node %s", + inst_config.primary_node) # If the instance is non-redundant we cannot survive losing its primary # node, so we are not N+1 compliant. On the other hand we have no disk @@ -2317,13 +2706,13 @@ class LUClusterVerify(LogicalUnit): utils.CommaJoin(inst_config.secondary_nodes), code=self.ETYPE_WARNING) - if inst_config.disk_template in constants.DTS_NET_MIRROR: + if inst_config.disk_template in constants.DTS_INT_MIRROR: pnode = inst_config.primary_node instance_nodes = utils.NiceSort(inst_config.all_nodes) instance_groups = {} for node in instance_nodes: - instance_groups.setdefault(nodeinfo_byname[node].group, + instance_groups.setdefault(self.all_node_info[node].group, []).append(node) pretty_list = [ @@ -2362,14 +2751,22 @@ class LUClusterVerify(LogicalUnit): feedback_fn("* Verifying orphan volumes") reserved = utils.FieldSet(*cluster.reserved_lvs) - self._VerifyOrphanVolumes(node_vol_should, node_image, reserved) - feedback_fn("* Verifying orphan instances") - self._VerifyOrphanInstances(instancelist, node_image) + # We will get spurious "unknown volume" warnings if any node of this group + # is secondary for an instance whose primary is in another group. To avoid + # them, we find these instances and add their volumes to node_vol_should. 
+ for inst in self.all_inst_info.values(): + for secondary in inst.secondary_nodes: + if (secondary in self.my_node_info + and inst.name not in self.my_inst_info): + inst.MapLVsByNode(node_vol_should) + break + + self._VerifyOrphanVolumes(node_vol_should, node_image, reserved) if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks: feedback_fn("* Verifying N+1 Memory redundancy") - self._VerifyNPlusOneMemory(node_image, instanceinfo) + self._VerifyNPlusOneMemory(node_image, self.my_inst_info) feedback_fn("* Other Notes") if i_non_redundant: @@ -2510,10 +2907,7 @@ class LUClusterRepairDiskSizes(NoHooksLU): def ExpandNames(self): if self.op.instances: - self.wanted_names = [] - for name in self.op.instances: - full_name = _ExpandInstanceName(self.cfg, name) - self.wanted_names.append(full_name) + self.wanted_names = _GetWantedInstances(self, self.op.instances) self.needed_locks = { locking.LEVEL_NODE: [], locking.LEVEL_INSTANCE: self.wanted_names, @@ -2525,7 +2919,7 @@ class LUClusterRepairDiskSizes(NoHooksLU): locking.LEVEL_NODE: locking.ALL_SET, locking.LEVEL_INSTANCE: locking.ALL_SET, } - self.share_locks = dict(((i, 1) for i in locking.LEVELS)) + self.share_locks = dict.fromkeys(locking.LEVELS, 1) def DeclareLocks(self, level): if level == locking.LEVEL_NODE and self.wanted_names is not None: @@ -2538,7 +2932,7 @@ class LUClusterRepairDiskSizes(NoHooksLU): """ if self.wanted_names is None: - self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE] + self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE) self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name in self.wanted_names] @@ -2630,13 +3024,16 @@ class LUClusterRename(LogicalUnit): """Build hooks env. """ - env = { + return { "OP_TARGET": self.cfg.GetClusterName(), "NEW_NAME": self.op.name, } - mn = self.cfg.GetMasterNode() - all_nodes = self.cfg.GetNodeList() - return env, [mn], all_nodes + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ + return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList()) def CheckPrereq(self): """Verify that the passed name is a valid one. @@ -2730,12 +3127,17 @@ class LUClusterSetParams(LogicalUnit): """Build hooks env. """ - env = { + return { "OP_TARGET": self.cfg.GetClusterName(), "NEW_VG_NAME": self.op.vg_name, } + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ mn = self.cfg.GetMasterNode() - return env, [mn], [mn] + return ([mn], [mn]) def CheckPrereq(self): """Check prerequisites. @@ -2755,7 +3157,7 @@ class LUClusterSetParams(LogicalUnit): " drbd-based instances exist", errors.ECODE_INVAL) - node_list = self.acquired_locks[locking.LEVEL_NODE] + node_list = self.glm.list_owned(locking.LEVEL_NODE) # if vg_name not None, checks given volume group on all nodes if self.op.vg_name: @@ -2830,8 +3232,8 @@ class LUClusterSetParams(LogicalUnit): # if we're moving instances to routed, check that they have an ip target_mode = params_filled[constants.NIC_MODE] if target_mode == constants.NIC_MODE_ROUTED and not nic.ip: - nic_errors.append("Instance %s, nic/%d: routed nick with no ip" % - (instance.name, nic_idx)) + nic_errors.append("Instance %s, nic/%d: routed NIC with no ip" + " address" % (instance.name, nic_idx)) if nic_errors: raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" % "\n".join(nic_errors)) @@ -3049,6 +3451,50 @@ def _UploadHelper(lu, nodes, fname): lu.proc.LogWarning(msg) +def _ComputeAncillaryFiles(cluster, redist): + """Compute files external to Ganeti which need to be consistent. 
+ + @type redist: boolean + @param redist: Whether to include files which need to be redistributed + + """ + # Compute files for all nodes + files_all = set([ + constants.SSH_KNOWN_HOSTS_FILE, + constants.CONFD_HMAC_KEY, + constants.CLUSTER_DOMAIN_SECRET_FILE, + ]) + + if not redist: + files_all.update(constants.ALL_CERT_FILES) + files_all.update(ssconf.SimpleStore().GetFileList()) + + if cluster.modify_etc_hosts: + files_all.add(constants.ETC_HOSTS) + + # Files which must either exist on all nodes or on none + files_all_opt = set([ + constants.RAPI_USERS_FILE, + ]) + + # Files which should only be on master candidates + files_mc = set() + if not redist: + files_mc.add(constants.CLUSTER_CONF_FILE) + + # Files which should only be on VM-capable nodes + files_vm = set(filename + for hv_name in cluster.enabled_hypervisors + for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()) + + # Filenames must be unique + assert (len(files_all | files_all_opt | files_mc | files_vm) == + sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \ + "Found file listed in more than one file list" + + return (files_all, files_all_opt, files_mc, files_vm) + + def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True): """Distribute additional files which are part of the cluster configuration. @@ -3062,40 +3508,42 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True): @param additional_vm: whether the additional nodes are vm-capable or not """ - # 1. Gather target nodes - myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode()) - dist_nodes = lu.cfg.GetOnlineNodeList() - nvm_nodes = lu.cfg.GetNonVmCapableNodeList() - vm_nodes = [name for name in dist_nodes if name not in nvm_nodes] + # Gather target nodes + cluster = lu.cfg.GetClusterInfo() + master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode()) + + online_nodes = lu.cfg.GetOnlineNodeList() + vm_nodes = lu.cfg.GetVmCapableNodeList() + if additional_nodes is not None: - dist_nodes.extend(additional_nodes) + online_nodes.extend(additional_nodes) if additional_vm: vm_nodes.extend(additional_nodes) - if myself.name in dist_nodes: - dist_nodes.remove(myself.name) - if myself.name in vm_nodes: - vm_nodes.remove(myself.name) - - # 2. Gather files to distribute - dist_files = set([constants.ETC_HOSTS, - constants.SSH_KNOWN_HOSTS_FILE, - constants.RAPI_CERT_FILE, - constants.RAPI_USERS_FILE, - constants.CONFD_HMAC_KEY, - constants.CLUSTER_DOMAIN_SECRET_FILE, - ]) - - vm_files = set() - enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors - for hv_name in enabled_hypervisors: - hv_class = hypervisor.GetHypervisor(hv_name) - vm_files.update(hv_class.GetAncillaryFiles()) - - # 3. 
Perform the files upload - for fname in dist_files: - _UploadHelper(lu, dist_nodes, fname) - for fname in vm_files: - _UploadHelper(lu, vm_nodes, fname) + + # Never distribute to master node + for nodelist in [online_nodes, vm_nodes]: + if master_info.name in nodelist: + nodelist.remove(master_info.name) + + # Gather file lists + (files_all, files_all_opt, files_mc, files_vm) = \ + _ComputeAncillaryFiles(cluster, True) + + # Never re-distribute configuration file from here + assert not (constants.CLUSTER_CONF_FILE in files_all or + constants.CLUSTER_CONF_FILE in files_vm) + assert not files_mc, "Master candidates not handled in this function" + + filemap = [ + (online_nodes, files_all), + (online_nodes, files_all_opt), + (vm_nodes, files_vm), + ] + + # Upload the files + for (node_list, files) in filemap: + for fname in files: + _UploadHelper(lu, node_list, fname) class LUClusterRedistConf(NoHooksLU): @@ -3234,6 +3682,21 @@ class LUOobCommand(NoHooksLU): """ REG_BGL = False + _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE) + + def ExpandNames(self): + """Gather locks we need. + + """ + if self.op.node_names: + self.op.node_names = _GetWantedNodes(self, self.op.node_names) + lock_names = self.op.node_names + else: + lock_names = locking.ALL_SET + + self.needed_locks = { + locking.LEVEL_NODE: lock_names, + } def CheckPrereq(self): """Check prerequisites. @@ -3246,6 +3709,36 @@ class LUOobCommand(NoHooksLU): """ self.nodes = [] + self.master_node = self.cfg.GetMasterNode() + + assert self.op.power_delay >= 0.0 + + if self.op.node_names: + if (self.op.command in self._SKIP_MASTER and + self.master_node in self.op.node_names): + master_node_obj = self.cfg.GetNodeInfo(self.master_node) + master_oob_handler = _SupportsOob(self.cfg, master_node_obj) + + if master_oob_handler: + additional_text = ("run '%s %s %s' if you want to operate on the" + " master regardless") % (master_oob_handler, + self.op.command, + self.master_node) + else: + additional_text = "it does not support out-of-band operations" + + raise errors.OpPrereqError(("Operating on the master node %s is not" + " allowed for %s; %s") % + (self.master_node, self.op.command, + additional_text), errors.ECODE_INVAL) + else: + self.op.node_names = self.cfg.GetNodeList() + if self.op.command in self._SKIP_MASTER: + self.op.node_names.remove(self.master_node) + + if self.op.command in self._SKIP_MASTER: + assert self.master_node not in self.op.node_names + for node_name in self.op.node_names: node = self.cfg.GetNodeInfo(node_name) @@ -3255,33 +3748,21 @@ class LUOobCommand(NoHooksLU): else: self.nodes.append(node) - if (self.op.command == constants.OOB_POWER_OFF and not node.offline): + if (not self.op.ignore_status and + (self.op.command == constants.OOB_POWER_OFF and not node.offline)): raise errors.OpPrereqError(("Cannot power off node %s because it is" " not marked offline") % node_name, errors.ECODE_STATE) - def ExpandNames(self): - """Gather locks we need. - - """ - if self.op.node_names: - self.op.node_names = [_ExpandNodeName(self.cfg, name) - for name in self.op.node_names] - else: - self.op.node_names = self.cfg.GetNodeList() - - self.needed_locks = { - locking.LEVEL_NODE: self.op.node_names, - } - def Exec(self, feedback_fn): """Execute OOB and return result if we expect any. 
""" - master_node = self.cfg.GetMasterNode() + master_node = self.master_node ret = [] - for node in self.nodes: + for idx, node in enumerate(utils.NiceSort(self.nodes, + key=lambda node: node.name)): node_entry = [(constants.RS_NORMAL, node.name)] ret.append(node_entry) @@ -3298,14 +3779,14 @@ class LUOobCommand(NoHooksLU): self.op.timeout) if result.fail_msg: - self.LogWarning("On node '%s' out-of-band RPC failed with: %s", + self.LogWarning("Out-of-band RPC failed on node '%s': %s", node.name, result.fail_msg) node_entry.append((constants.RS_NODATA, None)) else: try: self._CheckPayload(result) except errors.OpExecError, err: - self.LogWarning("The payload returned by '%s' is not valid: %s", + self.LogWarning("Payload returned by node '%s' is not valid: %s", node.name, err) node_entry.append((constants.RS_NODATA, None)) else: @@ -3314,8 +3795,8 @@ class LUOobCommand(NoHooksLU): for item, status in result.payload: if status in [constants.OOB_STATUS_WARNING, constants.OOB_STATUS_CRITICAL]: - self.LogWarning("On node '%s' item '%s' has status '%s'", - node.name, item, status) + self.LogWarning("Item '%s' on node '%s' has status '%s'", + item, node.name, status) if self.op.command == constants.OOB_POWER_ON: node.powered = True @@ -3335,6 +3816,10 @@ class LUOobCommand(NoHooksLU): node_entry.append((constants.RS_NORMAL, result.payload)) + if (self.op.command == constants.OOB_POWER_ON and + idx < len(self.nodes) - 1): + time.sleep(self.op.power_delay) + return ret def _CheckPayload(self, result): @@ -3373,37 +3858,28 @@ class LUOobCommand(NoHooksLU): raise errors.OpExecError("Check of out-of-band payload failed due to %s" % utils.CommaJoin(errs)) +class _OsQuery(_QueryBase): + FIELDS = query.OS_FIELDS - -class LUOsDiagnose(NoHooksLU): - """Logical unit for OS diagnose/query. - - """ - REQ_BGL = False - _HID = "hidden" - _BLK = "blacklisted" - _VLD = "valid" - _FIELDS_STATIC = utils.FieldSet() - _FIELDS_DYNAMIC = utils.FieldSet("name", _VLD, "node_status", "variants", - "parameters", "api_versions", _HID, _BLK) - - def CheckArguments(self): - if self.op.names: - raise errors.OpPrereqError("Selective OS query not supported", - errors.ECODE_INVAL) - - _CheckOutputFields(static=self._FIELDS_STATIC, - dynamic=self._FIELDS_DYNAMIC, - selected=self.op.output_fields) - - def ExpandNames(self): - # Lock all nodes, in shared mode + def ExpandNames(self, lu): + # Lock all nodes in shared mode # Temporary removal of locks, should be reverted later # TODO: reintroduce locks when they are lighter-weight - self.needed_locks = {} + lu.needed_locks = {} #self.share_locks[locking.LEVEL_NODE] = 1 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + # The following variables interact with _QueryBase._GetNames + if self.names: + self.wanted = self.names + else: + self.wanted = locking.ALL_SET + + self.do_locking = self.use_locking + + def DeclareLocks(self, lu, level): + pass + @staticmethod def _DiagnoseByOS(rlist): """Remaps a per-node return list into an a per-os per-node dictionary @@ -3444,71 +3920,103 @@ class LUOsDiagnose(NoHooksLU): variants, params, api_versions)) return all_os - def Exec(self, feedback_fn): - """Compute the list of OSes. + def _GetQueryData(self, lu): + """Computes the list of nodes and their attributes. 
""" + # Locking is not used + assert not (compat.any(lu.glm.is_owned(level) + for level in locking.LEVELS + if level != locking.LEVEL_CLUSTER) or + self.do_locking or self.use_locking) + valid_nodes = [node.name - for node in self.cfg.GetAllNodesInfo().values() + for node in lu.cfg.GetAllNodesInfo().values() if not node.offline and node.vm_capable] - node_data = self.rpc.call_os_diagnose(valid_nodes) - pol = self._DiagnoseByOS(node_data) - output = [] - cluster = self.cfg.GetClusterInfo() + pol = self._DiagnoseByOS(lu.rpc.call_os_diagnose(valid_nodes)) + cluster = lu.cfg.GetClusterInfo() + + data = {} + + for (os_name, os_data) in pol.items(): + info = query.OsInfo(name=os_name, valid=True, node_status=os_data, + hidden=(os_name in cluster.hidden_os), + blacklisted=(os_name in cluster.blacklisted_os)) + + variants = set() + parameters = set() + api_versions = set() - for os_name in utils.NiceSort(pol.keys()): - os_data = pol[os_name] - row = [] - valid = True - (variants, params, api_versions) = null_state = (set(), set(), set()) for idx, osl in enumerate(os_data.values()): - valid = bool(valid and osl and osl[0][1]) - if not valid: - (variants, params, api_versions) = null_state + info.valid = bool(info.valid and osl and osl[0][1]) + if not info.valid: break - node_variants, node_params, node_api = osl[0][3:6] - if idx == 0: # first entry - variants = set(node_variants) - params = set(node_params) - api_versions = set(node_api) - else: # keep consistency + + (node_variants, node_params, node_api) = osl[0][3:6] + if idx == 0: + # First entry + variants.update(node_variants) + parameters.update(node_params) + api_versions.update(node_api) + else: + # Filter out inconsistent values variants.intersection_update(node_variants) - params.intersection_update(node_params) + parameters.intersection_update(node_params) api_versions.intersection_update(node_api) - is_hid = os_name in cluster.hidden_os - is_blk = os_name in cluster.blacklisted_os - if ((self._HID not in self.op.output_fields and is_hid) or - (self._BLK not in self.op.output_fields and is_blk) or - (self._VLD not in self.op.output_fields and not valid)): - continue + info.variants = list(variants) + info.parameters = list(parameters) + info.api_versions = list(api_versions) - for field in self.op.output_fields: - if field == "name": - val = os_name - elif field == self._VLD: - val = valid - elif field == "node_status": - # this is just a copy of the dict - val = {} - for node_name, nos_list in os_data.items(): - val[node_name] = nos_list - elif field == "variants": - val = utils.NiceSort(list(variants)) - elif field == "parameters": - val = list(params) - elif field == "api_versions": - val = list(api_versions) - elif field == self._HID: - val = is_hid - elif field == self._BLK: - val = is_blk - else: - raise errors.ParameterError(field) - row.append(val) - output.append(row) + data[os_name] = info - return output + # Prepare data in requested order + return [data[name] for name in self._GetNames(lu, pol.keys(), None) + if name in data] + + +class LUOsDiagnose(NoHooksLU): + """Logical unit for OS diagnose/query. + + """ + REQ_BGL = False + + @staticmethod + def _BuildFilter(fields, names): + """Builds a filter for querying OSes. 
+ + """ + name_filter = qlang.MakeSimpleFilter("name", names) + + # Legacy behaviour: Hide hidden, blacklisted or invalid OSes if the + # respective field is not requested + status_filter = [[qlang.OP_NOT, [qlang.OP_TRUE, fname]] + for fname in ["hidden", "blacklisted"] + if fname not in fields] + if "valid" not in fields: + status_filter.append([qlang.OP_TRUE, "valid"]) + + if status_filter: + status_filter.insert(0, qlang.OP_AND) + else: + status_filter = None + + if name_filter and status_filter: + return [qlang.OP_AND, name_filter, status_filter] + elif name_filter: + return name_filter + else: + return status_filter + + def CheckArguments(self): + self.oq = _OsQuery(self._BuildFilter(self.op.output_fields, self.op.names), + self.op.output_fields, False) + + def ExpandNames(self): + self.oq.ExpandNames(self) + + def Exec(self, feedback_fn): + return self.oq.OldStyleQuery(self) class LUNodeRemove(LogicalUnit): @@ -3525,17 +4033,22 @@ class LUNodeRemove(LogicalUnit): node would then be impossible to remove. """ - env = { + return { "OP_TARGET": self.op.node_name, "NODE_NAME": self.op.node_name, } + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ all_nodes = self.cfg.GetNodeList() try: all_nodes.remove(self.op.node_name) except ValueError: - logging.warning("Node %s which is about to be removed not found" - " in the all nodes list", self.op.node_name) - return env, all_nodes, all_nodes + logging.warning("Node '%s', which is about to be removed, was not found" + " in the list of all nodes", self.op.node_name) + return (all_nodes, all_nodes) def CheckPrereq(self): """Check prerequisites. @@ -3556,15 +4069,14 @@ class LUNodeRemove(LogicalUnit): masternode = self.cfg.GetMasterNode() if node.name == masternode: - raise errors.OpPrereqError("Node is the master node," - " you need to failover first.", - errors.ECODE_INVAL) + raise errors.OpPrereqError("Node is the master node, failover to another" + " node is required", errors.ECODE_INVAL) for instance_name in instance_list: instance = self.cfg.GetInstanceInfo(instance_name) if node.name in instance.all_nodes: raise errors.OpPrereqError("Instance %s is still running on the node," - " please remove first." % instance_name, + " please remove first" % instance_name, errors.ECODE_INVAL) self.op.node_name = node.name self.node = node @@ -3584,12 +4096,7 @@ class LUNodeRemove(LogicalUnit): self.context.RemoveNode(node.name) # Run post hooks on the node before it's removed - hm = self.proc.hmclass(self.rpc.call_hooks_runner, self) - try: - hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name]) - except: - # pylint: disable-msg=W0702 - self.LogWarning("Errors occurred running hooks on %s" % node.name) + _RunPostHook(self, node.name) result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup) msg = result.fail_msg @@ -3691,8 +4198,8 @@ class LUNodeQuery(NoHooksLU): REQ_BGL = False def CheckArguments(self): - self.nq = _NodeQuery(self.op.names, self.op.output_fields, - self.op.use_locking) + self.nq = _NodeQuery(qlang.MakeSimpleFilter("name", self.op.names), + self.op.output_fields, self.op.use_locking) def ExpandNames(self): self.nq.ExpandNames(self) @@ -3727,7 +4234,7 @@ class LUNodeQueryvols(NoHooksLU): """Computes the list of nodes and their attributes. 
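The _BuildFilter helper above combines an optional name filter with a legacy status filter that hides hidden, blacklisted or invalid OSes whenever the corresponding field was not requested. A rough sketch of that idea, using made-up operator tokens and a toy evaluator instead of the real qlang module, applied to plain dictionaries:

OP_AND, OP_OR, OP_NOT, OP_TRUE, OP_EQUAL = "&", "|", "!", "?", "="

def make_name_filter(names):
    if not names:
        return None
    return [OP_OR] + [[OP_EQUAL, "name", name] for name in names]

def make_status_filter(fields):
    parts = [[OP_NOT, [OP_TRUE, flag]] for flag in ("hidden", "blacklisted")
             if flag not in fields]
    if "valid" not in fields:
        parts.append([OP_TRUE, "valid"])
    if not parts:
        return None
    return [OP_AND] + parts

def evaluate(flt, item):
    if flt is None:
        return True
    op = flt[0]
    if op == OP_AND:
        return all(evaluate(sub, item) for sub in flt[1:])
    if op == OP_OR:
        return any(evaluate(sub, item) for sub in flt[1:])
    if op == OP_NOT:
        return not evaluate(flt[1], item)
    if op == OP_TRUE:
        return bool(item.get(flt[1]))
    return item.get(flt[1]) == flt[2]   # OP_EQUAL

oses = [{"name": "debootstrap", "hidden": False, "blacklisted": False, "valid": True},
        {"name": "broken-os", "hidden": False, "blacklisted": False, "valid": False}]
flt = make_status_filter(["name"])   # "valid" was not requested, so invalid OSes are hidden
assert [os_["name"] for os_ in oses if evaluate(flt, os_)] == ["debootstrap"]

combined = [OP_AND, make_name_filter(["debootstrap"]), flt]
assert evaluate(combined, oses[0]) and not evaluate(combined, oses[1])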
""" - nodenames = self.acquired_locks[locking.LEVEL_NODE] + nodenames = self.glm.list_owned(locking.LEVEL_NODE) volumes = self.rpc.call_node_volumes(nodenames) ilist = [self.cfg.GetInstanceInfo(iname) for iname @@ -3805,7 +4312,7 @@ class LUNodeQueryStorage(NoHooksLU): """Computes the list of nodes and their attributes. """ - self.nodes = self.acquired_locks[locking.LEVEL_NODE] + self.nodes = self.glm.list_owned(locking.LEVEL_NODE) # Always get name to sort by if constants.SF_NAME in self.op.output_fields: @@ -3934,7 +4441,7 @@ class _InstanceQuery(_QueryBase): if query.IQ_DISKUSAGE in self.requested_data: disk_usage = dict((inst.name, _ComputeDiskSize(inst.disk_template, - [{"size": disk.size} + [{constants.IDISK_SIZE: disk.size} for disk in inst.disks])) for inst in instance_list) else: @@ -3966,9 +4473,8 @@ class LUQuery(NoHooksLU): def CheckArguments(self): qcls = _GetQueryImplementation(self.op.what) - names = qlang.ReadSimpleFilter("name", self.op.filter) - self.impl = qcls(names, self.op.fields, False) + self.impl = qcls(self.op.filter, self.op.fields, False) def ExpandNames(self): self.impl.ExpandNames(self) @@ -3994,7 +4500,7 @@ class LUQueryFields(NoHooksLU): self.needed_locks = {} def Exec(self, feedback_fn): - return self.qcls.FieldsQuery(self.op.fields) + return query.QueryFields(self.qcls.FIELDS, self.op.fields) class LUNodeModifyStorage(NoHooksLU): @@ -4053,6 +4559,11 @@ class LUNodeAdd(LogicalUnit): self.hostname = netutils.GetHostname(name=self.op.node_name, family=self.primary_ip_family) self.op.node_name = self.hostname.name + + if self.op.readd and self.op.node_name == self.cfg.GetMasterNode(): + raise errors.OpPrereqError("Cannot readd the master node", + errors.ECODE_STATE) + if self.op.readd and self.op.group: raise errors.OpPrereqError("Cannot pass a node group when a node is" " being readded", errors.ECODE_INVAL) @@ -4063,7 +4574,7 @@ class LUNodeAdd(LogicalUnit): This will run on all nodes before, and on all nodes + the new node after. """ - env = { + return { "OP_TARGET": self.op.node_name, "NODE_NAME": self.op.node_name, "NODE_PIP": self.op.primary_ip, @@ -4071,9 +4582,16 @@ class LUNodeAdd(LogicalUnit): "MASTER_CAPABLE": str(self.op.master_capable), "VM_CAPABLE": str(self.op.vm_capable), } - nodes_0 = self.cfg.GetNodeList() - nodes_1 = nodes_0 + [self.op.node_name, ] - return env, nodes_0, nodes_1 + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ + # Exclude added node + pre_nodes = list(set(self.cfg.GetNodeList()) - set([self.op.node_name])) + post_nodes = pre_nodes + [self.op.node_name, ] + + return (pre_nodes, post_nodes) def CheckPrereq(self): """Check prerequisites. @@ -4281,7 +4799,7 @@ class LUNodeAdd(LogicalUnit): feedback_fn("ssh/hostname verification failed" " (checking from %s): %s" % (verifier, nl_payload[failed])) - raise errors.OpExecError("ssh/hostname verification failed.") + raise errors.OpExecError("ssh/hostname verification failed") if self.op.readd: _RedistributeAncillaryFiles(self) @@ -4364,21 +4882,22 @@ class LUNodeSetParams(LogicalUnit): # If we have locked all instances, before waiting to lock nodes, release # all the ones living on nodes unrelated to the current operation. 
if level == locking.LEVEL_NODE and self.lock_instances: - instances_release = [] - instances_keep = [] self.affected_instances = [] if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET: - for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]: + instances_keep = [] + + # Build list of instances to release + for instance_name in self.glm.list_owned(locking.LEVEL_INSTANCE): instance = self.context.cfg.GetInstanceInfo(instance_name) - i_mirrored = instance.disk_template in constants.DTS_NET_MIRROR - if i_mirrored and self.op.node_name in instance.all_nodes: + if (instance.disk_template in constants.DTS_INT_MIRROR and + self.op.node_name in instance.all_nodes): instances_keep.append(instance_name) self.affected_instances.append(instance) - else: - instances_release.append(instance_name) - if instances_release: - self.context.glm.release(locking.LEVEL_INSTANCE, instances_release) - self.acquired_locks[locking.LEVEL_INSTANCE] = instances_keep + + _ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep) + + assert (set(self.glm.list_owned(locking.LEVEL_INSTANCE)) == + set(instances_keep)) def BuildHooksEnv(self): """Build hooks env. @@ -4386,7 +4905,7 @@ class LUNodeSetParams(LogicalUnit): This runs on the master node. """ - env = { + return { "OP_TARGET": self.op.node_name, "MASTER_CANDIDATE": str(self.op.master_candidate), "OFFLINE": str(self.op.offline), @@ -4394,9 +4913,13 @@ class LUNodeSetParams(LogicalUnit): "MASTER_CAPABLE": str(self.op.master_capable), "VM_CAPABLE": str(self.op.vm_capable), } - nl = [self.cfg.GetMasterNode(), - self.op.node_name] - return env, nl, nl + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ + nl = [self.cfg.GetMasterNode(), self.op.node_name] + return (nl, nl) def CheckPrereq(self): """Check prerequisites. @@ -4440,7 +4963,7 @@ class LUNodeSetParams(LogicalUnit): self.old_flags = old_flags = (node.master_candidate, node.drained, node.offline) - assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags) + assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags) self.old_role = old_role = self._F2R[old_flags] # Check for ineffective changes @@ -4456,12 +4979,12 @@ class LUNodeSetParams(LogicalUnit): if _SupportsOob(self.cfg, node): if self.op.offline is False and not (node.powered or self.op.powered == True): - raise errors.OpPrereqError(("Please power on node %s first before you" - " can reset offline state") % + raise errors.OpPrereqError(("Node %s needs to be turned on before its" + " offline status can be reset") % self.op.node_name) elif self.op.powered is not None: raise errors.OpPrereqError(("Unable to change powered state for node %s" - " which does not support out-of-band" + " as it does not support out-of-band" " handling") % self.op.node_name) # If we're being deofflined/drained, we'll MC ourself if needed @@ -4677,6 +5200,7 @@ class LUClusterQuery(NoHooksLU): "volume_group_name": cluster.volume_group_name, "drbd_usermode_helper": cluster.drbd_usermode_helper, "file_storage_dir": cluster.file_storage_dir, + "shared_file_storage_dir": cluster.shared_file_storage_dir, "maintain_node_health": cluster.maintain_node_health, "ctime": cluster.ctime, "mtime": cluster.mtime, @@ -5089,9 +5613,17 @@ class LUInstanceStartup(LogicalUnit): env = { "FORCE": self.op.force, } + env.update(_BuildInstanceHookEnvByObject(self, self.instance)) + + return env + + def BuildHooksNodes(self): + """Build hooks nodes. 
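The DeclareLocks logic for LUNodeSetParams above keeps only the instance locks for internally mirrored instances that actually live on the node being modified and releases all the others. A small illustration of that partitioning with plain data structures (no real lock manager involved; the names and dict shapes are invented for the example):

def partition_instance_locks(owned, instance_nodes, mirrored, node_name):
    keep, release = [], []
    for name in owned:
        # Keep the lock only for mirrored instances using this node
        if mirrored.get(name) and node_name in instance_nodes.get(name, ()):
            keep.append(name)
        else:
            release.append(name)
    return keep, release

keep, release = partition_instance_locks(
    ["inst1", "inst2"],
    {"inst1": ("nodeA", "nodeB"), "inst2": ("nodeC",)},
    {"inst1": True, "inst2": False},
    "nodeA")
assert keep == ["inst1"] and release == ["inst2"]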
+ + """ nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) - return env, nl, nl + return (nl, nl) def CheckPrereq(self): """Check prerequisites. @@ -5146,7 +5678,8 @@ class LUInstanceStartup(LogicalUnit): instance = self.instance force = self.op.force - self.cfg.MarkInstanceUp(instance.name) + if not self.op.no_remember: + self.cfg.MarkInstanceUp(instance.name) if self.primary_offline: assert self.op.ignore_offline_nodes @@ -5186,9 +5719,17 @@ class LUInstanceReboot(LogicalUnit): "REBOOT_TYPE": self.op.reboot_type, "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout, } + env.update(_BuildInstanceHookEnvByObject(self, self.instance)) + + return env + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) - return env, nl, nl + return (nl, nl) def CheckPrereq(self): """Check prerequisites. @@ -5213,10 +5754,16 @@ class LUInstanceReboot(LogicalUnit): ignore_secondaries = self.op.ignore_secondaries reboot_type = self.op.reboot_type + remote_info = self.rpc.call_instance_info(instance.primary_node, + instance.name, + instance.hypervisor) + remote_info.Raise("Error checking node %s" % instance.primary_node) + instance_running = bool(remote_info.payload) + node_current = instance.primary_node - if reboot_type in [constants.INSTANCE_REBOOT_SOFT, - constants.INSTANCE_REBOOT_HARD]: + if instance_running and reboot_type in [constants.INSTANCE_REBOOT_SOFT, + constants.INSTANCE_REBOOT_HARD]: for disk in instance.disks: self.cfg.SetDiskID(disk, node_current) result = self.rpc.call_instance_reboot(node_current, instance, @@ -5224,10 +5771,14 @@ class LUInstanceReboot(LogicalUnit): self.op.shutdown_timeout) result.Raise("Could not reboot instance") else: - result = self.rpc.call_instance_shutdown(node_current, instance, - self.op.shutdown_timeout) - result.Raise("Could not shutdown instance for full reboot") - _ShutdownInstanceDisks(self, instance) + if instance_running: + result = self.rpc.call_instance_shutdown(node_current, instance, + self.op.shutdown_timeout) + result.Raise("Could not shutdown instance for full reboot") + _ShutdownInstanceDisks(self, instance) + else: + self.LogInfo("Instance %s was already stopped, starting now", + instance.name) _StartInstanceDisks(self, instance, ignore_secondaries) result = self.rpc.call_instance_start(node_current, instance, None, None) msg = result.fail_msg @@ -5258,8 +5809,14 @@ class LUInstanceShutdown(LogicalUnit): """ env = _BuildInstanceHookEnvByObject(self, self.instance) env["TIMEOUT"] = self.op.timeout + return env + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) - return env, nl, nl + return (nl, nl) def CheckPrereq(self): """Check prerequisites. @@ -5287,7 +5844,8 @@ class LUInstanceShutdown(LogicalUnit): node_current = instance.primary_node timeout = self.op.timeout - self.cfg.MarkInstanceDown(instance.name) + if not self.op.no_remember: + self.cfg.MarkInstanceDown(instance.name) if self.primary_offline: assert self.op.ignore_offline_nodes @@ -5318,9 +5876,14 @@ class LUInstanceReinstall(LogicalUnit): This runs on master, primary and secondary nodes of the instance. """ - env = _BuildInstanceHookEnvByObject(self, self.instance) + return _BuildInstanceHookEnvByObject(self, self.instance) + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) - return env, nl, nl + return (nl, nl) def CheckPrereq(self): """Check prerequisites. 
@@ -5395,18 +5958,40 @@ class LUInstanceRecreateDisks(LogicalUnit): HTYPE = constants.HTYPE_INSTANCE REQ_BGL = False + def CheckArguments(self): + # normalise the disk list + self.op.disks = sorted(frozenset(self.op.disks)) + def ExpandNames(self): self._ExpandAndLockInstance() - - def BuildHooksEnv(self): + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + if self.op.nodes: + self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes] + self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes) + else: + self.needed_locks[locking.LEVEL_NODE] = [] + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + # if we replace the nodes, we only need to lock the old primary, + # otherwise we need to lock all nodes for disk re-creation + primary_only = bool(self.op.nodes) + self._LockInstancesNodes(primary_only=primary_only) + + def BuildHooksEnv(self): """Build hooks env. This runs on master, primary and secondary nodes of the instance. """ - env = _BuildInstanceHookEnvByObject(self, self.instance) + return _BuildInstanceHookEnvByObject(self, self.instance) + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) - return env, nl, nl + return (nl, nl) def CheckPrereq(self): """Check prerequisites. @@ -5417,32 +6002,72 @@ class LUInstanceRecreateDisks(LogicalUnit): instance = self.cfg.GetInstanceInfo(self.op.instance_name) assert instance is not None, \ "Cannot retrieve locked instance %s" % self.op.instance_name - _CheckNodeOnline(self, instance.primary_node) + if self.op.nodes: + if len(self.op.nodes) != len(instance.all_nodes): + raise errors.OpPrereqError("Instance %s currently has %d nodes, but" + " %d replacement nodes were specified" % + (instance.name, len(instance.all_nodes), + len(self.op.nodes)), + errors.ECODE_INVAL) + assert instance.disk_template != constants.DT_DRBD8 or \ + len(self.op.nodes) == 2 + assert instance.disk_template != constants.DT_PLAIN or \ + len(self.op.nodes) == 1 + primary_node = self.op.nodes[0] + else: + primary_node = instance.primary_node + _CheckNodeOnline(self, primary_node) if instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Instance '%s' has no disks" % self.op.instance_name, errors.ECODE_INVAL) - _CheckInstanceDown(self, instance, "cannot recreate disks") + # if we replace nodes *and* the old primary is offline, we don't + # check + assert instance.primary_node in self.needed_locks[locking.LEVEL_NODE] + old_pnode = self.cfg.GetNodeInfo(instance.primary_node) + if not (self.op.nodes and old_pnode.offline): + _CheckInstanceDown(self, instance, "cannot recreate disks") if not self.op.disks: self.op.disks = range(len(instance.disks)) else: for idx in self.op.disks: if idx >= len(instance.disks): - raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx, + raise errors.OpPrereqError("Invalid disk index '%s'" % idx, errors.ECODE_INVAL) - + if self.op.disks != range(len(instance.disks)) and self.op.nodes: + raise errors.OpPrereqError("Can't recreate disks partially and" + " change the nodes at the same time", + errors.ECODE_INVAL) self.instance = instance def Exec(self, feedback_fn): """Recreate the disks. 
""" + # change primary node, if needed + if self.op.nodes: + self.instance.primary_node = self.op.nodes[0] + self.LogWarning("Changing the instance's nodes, you will have to" + " remove any disks left on the older nodes manually") + to_skip = [] - for idx, _ in enumerate(self.instance.disks): + for idx, disk in enumerate(self.instance.disks): if idx not in self.op.disks: # disk idx has not been passed in to_skip.append(idx) continue + # update secondaries for disks, if needed + if self.op.nodes: + if disk.dev_type == constants.LD_DRBD8: + # need to update the nodes + assert len(self.op.nodes) == 2 + logical_id = list(disk.logical_id) + logical_id[0] = self.op.nodes[0] + logical_id[1] = self.op.nodes[1] + disk.logical_id = tuple(logical_id) + + if self.op.nodes: + self.cfg.Update(self.instance, feedback_fn) _CreateDisks(self, self.instance, to_skip=to_skip) @@ -5460,7 +6085,7 @@ class LUInstanceRename(LogicalUnit): """ if self.op.ip_check and not self.op.name_check: # TODO: make the ip check more flexible and not depend on the name check - raise errors.OpPrereqError("Cannot do ip check without a name check", + raise errors.OpPrereqError("IP address check requires a name check", errors.ECODE_INVAL) def BuildHooksEnv(self): @@ -5471,8 +6096,14 @@ class LUInstanceRename(LogicalUnit): """ env = _BuildInstanceHookEnvByObject(self, self.instance) env["INSTANCE_NEW_NAME"] = self.op.new_name + return env + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) - return env, nl, nl + return (nl, nl) def CheckPrereq(self): """Check prerequisites. @@ -5491,8 +6122,14 @@ class LUInstanceRename(LogicalUnit): new_name = self.op.new_name if self.op.name_check: hostname = netutils.GetHostname(name=new_name) - self.LogInfo("Resolved given name '%s' to '%s'", new_name, - hostname.name) + if hostname != new_name: + self.LogInfo("Resolved given name '%s' to '%s'", new_name, + hostname.name) + if not utils.MatchNameComponent(self.op.new_name, [hostname.name]): + raise errors.OpPrereqError(("Resolved hostname '%s' does not look the" + " same as given hostname '%s'") % + (hostname.name, self.op.new_name), + errors.ECODE_INVAL) new_name = self.op.new_name = hostname.name if (self.op.ip_check and netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)): @@ -5513,15 +6150,17 @@ class LUInstanceRename(LogicalUnit): old_name = inst.name rename_file_storage = False - if (inst.disk_template == constants.DT_FILE and + if (inst.disk_template in (constants.DT_FILE, constants.DT_SHARED_FILE) and self.op.new_name != inst.name): old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1]) rename_file_storage = True self.cfg.RenameInstance(inst.name, self.op.new_name) - # Change the instance lock. This is definitely safe while we hold the BGL - self.context.glm.remove(locking.LEVEL_INSTANCE, old_name) - self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name) + # Change the instance lock. This is definitely safe while we hold the BGL. + # Otherwise the new lock would have to be added in acquired mode. 
+ assert self.REQ_BGL + self.glm.remove(locking.LEVEL_INSTANCE, old_name) + self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name) # re-read the instance from the configuration after rename inst = self.cfg.GetInstanceInfo(self.op.new_name) @@ -5577,9 +6216,15 @@ class LUInstanceRemove(LogicalUnit): """ env = _BuildInstanceHookEnvByObject(self, self.instance) env["SHUTDOWN_TIMEOUT"] = self.op.shutdown_timeout + return env + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ nl = [self.cfg.GetMasterNode()] nl_post = list(self.instance.all_nodes) + nl - return env, nl, nl_post + return (nl, nl_post) def CheckPrereq(self): """Check prerequisites. @@ -5643,8 +6288,8 @@ class LUInstanceQuery(NoHooksLU): REQ_BGL = False def CheckArguments(self): - self.iq = _InstanceQuery(self.op.names, self.op.output_fields, - self.op.use_locking) + self.iq = _InstanceQuery(qlang.MakeSimpleFilter("name", self.op.names), + self.op.output_fields, self.op.use_locking) def ExpandNames(self): self.iq.ExpandNames(self) @@ -5664,14 +6309,43 @@ class LUInstanceFailover(LogicalUnit): HTYPE = constants.HTYPE_INSTANCE REQ_BGL = False + def CheckArguments(self): + """Check the arguments. + + """ + self.iallocator = getattr(self.op, "iallocator", None) + self.target_node = getattr(self.op, "target_node", None) + def ExpandNames(self): self._ExpandAndLockInstance() + + if self.op.target_node is not None: + self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node) + self.needed_locks[locking.LEVEL_NODE] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + ignore_consistency = self.op.ignore_consistency + shutdown_timeout = self.op.shutdown_timeout + self._migrater = TLMigrateInstance(self, self.op.instance_name, + cleanup=False, + failover=True, + ignore_consistency=ignore_consistency, + shutdown_timeout=shutdown_timeout) + self.tasklets = [self._migrater] + def DeclareLocks(self, level): if level == locking.LEVEL_NODE: - self._LockInstancesNodes() + instance = self.context.cfg.GetInstanceInfo(self.op.instance_name) + if instance.disk_template in constants.DTS_EXT_MIRROR: + if self.op.target_node is None: + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + else: + self.needed_locks[locking.LEVEL_NODE] = [instance.primary_node, + self.op.target_node] + del self.recalculate_locks[locking.LEVEL_NODE] + else: + self._LockInstancesNodes() def BuildHooksEnv(self): """Build hooks env. @@ -5679,128 +6353,33 @@ class LUInstanceFailover(LogicalUnit): This runs on master, primary and secondary nodes of the instance. """ - instance = self.instance + instance = self._migrater.instance source_node = instance.primary_node - target_node = instance.secondary_nodes[0] + target_node = self.op.target_node env = { "IGNORE_CONSISTENCY": self.op.ignore_consistency, "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout, "OLD_PRIMARY": source_node, - "OLD_SECONDARY": target_node, "NEW_PRIMARY": target_node, - "NEW_SECONDARY": source_node, } - env.update(_BuildInstanceHookEnvByObject(self, instance)) - nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes) - nl_post = list(nl) - nl_post.append(source_node) - return env, nl, nl_post - def CheckPrereq(self): - """Check prerequisites. - - This checks that the instance is in the cluster. 
- - """ - self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name) - assert self.instance is not None, \ - "Cannot retrieve locked instance %s" % self.op.instance_name - - bep = self.cfg.GetClusterInfo().FillBE(instance) - if instance.disk_template not in constants.DTS_NET_MIRROR: - raise errors.OpPrereqError("Instance's disk layout is not" - " network mirrored, cannot failover.", - errors.ECODE_STATE) - - secondary_nodes = instance.secondary_nodes - if not secondary_nodes: - raise errors.ProgrammerError("no secondary node but using " - "a mirrored disk template") - - target_node = secondary_nodes[0] - _CheckNodeOnline(self, target_node) - _CheckNodeNotDrained(self, target_node) - if instance.admin_up: - # check memory requirements on the secondary node - _CheckNodeFreeMemory(self, target_node, "failing over instance %s" % - instance.name, bep[constants.BE_MEMORY], - instance.hypervisor) + if instance.disk_template in constants.DTS_INT_MIRROR: + env["OLD_SECONDARY"] = instance.secondary_nodes[0] + env["NEW_SECONDARY"] = source_node else: - self.LogInfo("Not checking memory on the secondary node as" - " instance will not be started") + env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = "" - # check bridge existance - _CheckInstanceBridgesExist(self, instance, node=target_node) + env.update(_BuildInstanceHookEnvByObject(self, instance)) - def Exec(self, feedback_fn): - """Failover an instance. + return env - The failover is done by shutting it down on its present node and - starting it on the secondary. + def BuildHooksNodes(self): + """Build hooks nodes. """ - instance = self.instance - primary_node = self.cfg.GetNodeInfo(instance.primary_node) - - source_node = instance.primary_node - target_node = instance.secondary_nodes[0] - - if instance.admin_up: - feedback_fn("* checking disk consistency between source and target") - for dev in instance.disks: - # for drbd, these are drbd over lvm - if not _CheckDiskConsistency(self, dev, target_node, False): - if not self.op.ignore_consistency: - raise errors.OpExecError("Disk %s is degraded on target node," - " aborting failover." % dev.iv_name) - else: - feedback_fn("* not checking disk consistency as instance is not running") - - feedback_fn("* shutting down instance on source node") - logging.info("Shutting down instance %s on node %s", - instance.name, source_node) - - result = self.rpc.call_instance_shutdown(source_node, instance, - self.op.shutdown_timeout) - msg = result.fail_msg - if msg: - if self.op.ignore_consistency or primary_node.offline: - self.proc.LogWarning("Could not shutdown instance %s on node %s." - " Proceeding anyway. Please make sure node" - " %s is down. 
Error details: %s", - instance.name, source_node, source_node, msg) - else: - raise errors.OpExecError("Could not shutdown instance %s on" - " node %s: %s" % - (instance.name, source_node, msg)) - - feedback_fn("* deactivating the instance's disks on source node") - if not _ShutdownInstanceDisks(self, instance, ignore_primary=True): - raise errors.OpExecError("Can't shut down the instance's disks.") - - instance.primary_node = target_node - # distribute new instance config to the other nodes - self.cfg.Update(instance, feedback_fn) - - # Only start the instance if it's marked as up - if instance.admin_up: - feedback_fn("* activating the instance's disks on target node") - logging.info("Starting instance %s on node %s", - instance.name, target_node) - - disks_ok, _ = _AssembleInstanceDisks(self, instance, - ignore_secondaries=True) - if not disks_ok: - _ShutdownInstanceDisks(self, instance) - raise errors.OpExecError("Can't activate the instance's disks") - - feedback_fn("* starting the instance on the target node") - result = self.rpc.call_instance_start(target_node, instance, None, None) - msg = result.fail_msg - if msg: - _ShutdownInstanceDisks(self, instance) - raise errors.OpExecError("Could not start instance %s on node %s: %s" % - (instance.name, target_node, msg)) + instance = self._migrater.instance + nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes) + return (nl, nl + [instance.primary_node]) class LUInstanceMigrate(LogicalUnit): @@ -5817,16 +6396,30 @@ class LUInstanceMigrate(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() + if self.op.target_node is not None: + self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node) + self.needed_locks[locking.LEVEL_NODE] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE self._migrater = TLMigrateInstance(self, self.op.instance_name, - self.op.cleanup) + cleanup=self.op.cleanup, + failover=False, + fallback=self.op.allow_failover) self.tasklets = [self._migrater] def DeclareLocks(self, level): if level == locking.LEVEL_NODE: - self._LockInstancesNodes() + instance = self.context.cfg.GetInstanceInfo(self.op.instance_name) + if instance.disk_template in constants.DTS_EXT_MIRROR: + if self.op.target_node is None: + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + else: + self.needed_locks[locking.LEVEL_NODE] = [instance.primary_node, + self.op.target_node] + del self.recalculate_locks[locking.LEVEL_NODE] + else: + self._LockInstancesNodes() def BuildHooksEnv(self): """Build hooks env. @@ -5836,20 +6429,30 @@ class LUInstanceMigrate(LogicalUnit): """ instance = self._migrater.instance source_node = instance.primary_node - target_node = instance.secondary_nodes[0] + target_node = self.op.target_node env = _BuildInstanceHookEnvByObject(self, instance) - env["MIGRATE_LIVE"] = self._migrater.live - env["MIGRATE_CLEANUP"] = self.op.cleanup env.update({ - "OLD_PRIMARY": source_node, - "OLD_SECONDARY": target_node, - "NEW_PRIMARY": target_node, - "NEW_SECONDARY": source_node, - }) + "MIGRATE_LIVE": self._migrater.live, + "MIGRATE_CLEANUP": self.op.cleanup, + "OLD_PRIMARY": source_node, + "NEW_PRIMARY": target_node, + }) + + if instance.disk_template in constants.DTS_INT_MIRROR: + env["OLD_SECONDARY"] = target_node + env["NEW_SECONDARY"] = source_node + else: + env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None + + return env + + def BuildHooksNodes(self): + """Build hooks nodes. 
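The DeclareLocks branches above choose node locks for a migration: externally mirrored instances need all node locks when an iallocator will pick the destination, or just the primary plus the explicit target, while other disk templates fall back to locking the instance's own nodes. A condensed sketch of that rule with stand-in values rather than Ganeti's locking constants:

ALL_SET = object()

def node_locks_for_migration(disk_template, primary, target, ext_mirror_templates):
    if disk_template in ext_mirror_templates:
        if target is None:
            return ALL_SET               # iallocator chooses, lock everything
        return [primary, target]
    return None                          # lock the instance's own nodes instead

assert node_locks_for_migration("sharedfile", "nodeA", None, {"sharedfile"}) is ALL_SET
assert node_locks_for_migration("sharedfile", "nodeA", "nodeB", {"sharedfile"}) == \
    ["nodeA", "nodeB"]
assert node_locks_for_migration("drbd", "nodeA", None, {"sharedfile"}) is None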
+ + """ + instance = self._migrater.instance nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes) - nl_post = list(nl) - nl_post.append(source_node) - return env, nl, nl_post + return (nl, nl + [instance.primary_node]) class LUInstanceMove(LogicalUnit): @@ -5882,9 +6485,18 @@ class LUInstanceMove(LogicalUnit): "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout, } env.update(_BuildInstanceHookEnvByObject(self, self.instance)) - nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node, - self.op.target_node] - return env, nl, nl + return env + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ + nl = [ + self.cfg.GetMasterNode(), + self.instance.primary_node, + self.op.target_node, + ] + return (nl, nl) def CheckPrereq(self): """Check prerequisites. @@ -6035,32 +6647,45 @@ class LUNodeMigrate(LogicalUnit): HTYPE = constants.HTYPE_NODE REQ_BGL = False + def CheckArguments(self): + _CheckIAllocatorOrNode(self, "iallocator", "remote_node") + def ExpandNames(self): self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) - self.needed_locks = { - locking.LEVEL_NODE: [self.op.node_name], - } - - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + self.needed_locks = {} # Create tasklets for migrating instances for all instances on this node names = [] tasklets = [] + self.lock_all_nodes = False + for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name): logging.debug("Migrating instance %s", inst.name) names.append(inst.name) - tasklets.append(TLMigrateInstance(self, inst.name, False)) + tasklets.append(TLMigrateInstance(self, inst.name, cleanup=False)) + + if inst.disk_template in constants.DTS_EXT_MIRROR: + # We need to lock all nodes, as the iallocator will choose the + # destination nodes afterwards + self.lock_all_nodes = True self.tasklets = tasklets + # Declare node locks + if self.lock_all_nodes: + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + else: + self.needed_locks[locking.LEVEL_NODE] = [self.op.node_name] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + # Declare instance locks self.needed_locks[locking.LEVEL_INSTANCE] = names def DeclareLocks(self, level): - if level == locking.LEVEL_NODE: + if level == locking.LEVEL_NODE and not self.lock_all_nodes: self._LockInstancesNodes() def BuildHooksEnv(self): @@ -6069,13 +6694,16 @@ class LUNodeMigrate(LogicalUnit): This runs on the master, the primary and all the secondaries. """ - env = { + return { "NODE_NAME": self.op.node_name, } - nl = [self.cfg.GetMasterNode()] + def BuildHooksNodes(self): + """Build hooks nodes. 
- return (env, nl, nl) + """ + nl = [self.cfg.GetMasterNode()] + return (nl, nl) class TLMigrateInstance(Tasklet): @@ -6084,9 +6712,28 @@ class TLMigrateInstance(Tasklet): @type live: boolean @ivar live: whether the migration will be done live or non-live; this variable is initalized only after CheckPrereq has run + @type cleanup: boolean + @ivar cleanup: Wheater we cleanup from a failed migration + @type iallocator: string + @ivar iallocator: The iallocator used to determine target_node + @type target_node: string + @ivar target_node: If given, the target_node to reallocate the instance to + @type failover: boolean + @ivar failover: Whether operation results in failover or migration + @type fallback: boolean + @ivar fallback: Whether fallback to failover is allowed if migration not + possible + @type ignore_consistency: boolean + @ivar ignore_consistency: Wheter we should ignore consistency between source + and target node + @type shutdown_timeout: int + @ivar shutdown_timeout: In case of failover timeout of the shutdown """ - def __init__(self, lu, instance_name, cleanup): + def __init__(self, lu, instance_name, cleanup=False, + failover=False, fallback=False, + ignore_consistency=False, + shutdown_timeout=constants.DEFAULT_SHUTDOWN_TIMEOUT): """Initializes this class. """ @@ -6096,6 +6743,10 @@ class TLMigrateInstance(Tasklet): self.instance_name = instance_name self.cleanup = cleanup self.live = False # will be overridden later + self.failover = failover + self.fallback = fallback + self.ignore_consistency = ignore_consistency + self.shutdown_timeout = shutdown_timeout def CheckPrereq(self): """Check prerequisites. @@ -6106,54 +6757,148 @@ class TLMigrateInstance(Tasklet): instance_name = _ExpandInstanceName(self.lu.cfg, self.instance_name) instance = self.cfg.GetInstanceInfo(instance_name) assert instance is not None + self.instance = instance - if instance.disk_template != constants.DT_DRBD8: - raise errors.OpPrereqError("Instance's disk layout is not" - " drbd8, cannot migrate.", errors.ECODE_STATE) + if (not self.cleanup and not instance.admin_up and not self.failover and + self.fallback): + self.lu.LogInfo("Instance is marked down, fallback allowed, switching" + " to failover") + self.failover = True + + if instance.disk_template not in constants.DTS_MIRRORED: + if self.failover: + text = "failovers" + else: + text = "migrations" + raise errors.OpPrereqError("Instance's disk layout '%s' does not allow" + " %s" % (instance.disk_template, text), + errors.ECODE_STATE) + + if instance.disk_template in constants.DTS_EXT_MIRROR: + _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node") + + if self.lu.op.iallocator: + self._RunAllocator() + else: + # We set set self.target_node as it is required by + # BuildHooksEnv + self.target_node = self.lu.op.target_node + + # self.target_node is already populated, either directly or by the + # iallocator run + target_node = self.target_node + if self.target_node == instance.primary_node: + raise errors.OpPrereqError("Cannot migrate instance %s" + " to its primary (%s)" % + (instance.name, instance.primary_node)) + + if len(self.lu.tasklets) == 1: + # It is safe to release locks only when we're the only tasklet + # in the LU + _ReleaseLocks(self.lu, locking.LEVEL_NODE, + keep=[instance.primary_node, self.target_node]) - secondary_nodes = instance.secondary_nodes - if not secondary_nodes: - raise errors.ConfigurationError("No secondary node but using" - " drbd8 disk template") + else: + secondary_nodes = instance.secondary_nodes + if not 
secondary_nodes: + raise errors.ConfigurationError("No secondary node but using" + " %s disk template" % + instance.disk_template) + target_node = secondary_nodes[0] + if self.lu.op.iallocator or (self.lu.op.target_node and + self.lu.op.target_node != target_node): + if self.failover: + text = "failed over" + else: + text = "migrated" + raise errors.OpPrereqError("Instances with disk template %s cannot" + " be %s to arbitrary nodes" + " (neither an iallocator nor a target" + " node can be passed)" % + (instance.disk_template, text), + errors.ECODE_INVAL) i_be = self.cfg.GetClusterInfo().FillBE(instance) - target_node = secondary_nodes[0] # check memory requirements on the secondary node - _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" % - instance.name, i_be[constants.BE_MEMORY], - instance.hypervisor) + if not self.failover or instance.admin_up: + _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" % + instance.name, i_be[constants.BE_MEMORY], + instance.hypervisor) + else: + self.lu.LogInfo("Not checking memory on the secondary node as" + " instance will not be started") # check bridge existance _CheckInstanceBridgesExist(self.lu, instance, node=target_node) if not self.cleanup: _CheckNodeNotDrained(self.lu, target_node) - result = self.rpc.call_instance_migratable(instance.primary_node, - instance) - result.Raise("Can't migrate, please use failover", - prereq=True, ecode=errors.ECODE_STATE) + if not self.failover: + result = self.rpc.call_instance_migratable(instance.primary_node, + instance) + if result.fail_msg and self.fallback: + self.lu.LogInfo("Can't migrate, instance offline, fallback to" + " failover") + self.failover = True + else: + result.Raise("Can't migrate, please use failover", + prereq=True, ecode=errors.ECODE_STATE) - self.instance = instance + assert not (self.failover and self.cleanup) - if self.lu.op.live is not None and self.lu.op.mode is not None: - raise errors.OpPrereqError("Only one of the 'live' and 'mode'" - " parameters are accepted", - errors.ECODE_INVAL) - if self.lu.op.live is not None: - if self.lu.op.live: - self.lu.op.mode = constants.HT_MIGRATION_LIVE - else: - self.lu.op.mode = constants.HT_MIGRATION_NONLIVE - # reset the 'live' parameter to None so that repeated - # invocations of CheckPrereq do not raise an exception - self.lu.op.live = None - elif self.lu.op.mode is None: - # read the default value from the hypervisor - i_hv = self.cfg.GetClusterInfo().FillHV(instance, skip_globals=False) - self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE] + if not self.failover: + if self.lu.op.live is not None and self.lu.op.mode is not None: + raise errors.OpPrereqError("Only one of the 'live' and 'mode'" + " parameters are accepted", + errors.ECODE_INVAL) + if self.lu.op.live is not None: + if self.lu.op.live: + self.lu.op.mode = constants.HT_MIGRATION_LIVE + else: + self.lu.op.mode = constants.HT_MIGRATION_NONLIVE + # reset the 'live' parameter to None so that repeated + # invocations of CheckPrereq do not raise an exception + self.lu.op.live = None + elif self.lu.op.mode is None: + # read the default value from the hypervisor + i_hv = self.cfg.GetClusterInfo().FillHV(self.instance, + skip_globals=False) + self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE] + + self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE + else: + # Failover is never live + self.live = False - self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE + def _RunAllocator(self): + """Run the allocator based on input opcode. 
+ + """ + ial = IAllocator(self.cfg, self.rpc, + mode=constants.IALLOCATOR_MODE_RELOC, + name=self.instance_name, + # TODO See why hail breaks with a single node below + relocate_from=[self.instance.primary_node, + self.instance.primary_node], + ) + + ial.Run(self.lu.op.iallocator) + + if not ial.success: + raise errors.OpPrereqError("Can't compute nodes using" + " iallocator '%s': %s" % + (self.lu.op.iallocator, ial.info), + errors.ECODE_NORES) + if len(ial.result) != ial.required_nodes: + raise errors.OpPrereqError("iallocator '%s' returned invalid number" + " of nodes (%s), required %s" % + (self.lu.op.iallocator, len(ial.result), + ial.required_nodes), errors.ECODE_FAULT) + self.target_node = ial.result[0] + self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s", + self.instance_name, self.lu.op.iallocator, + utils.CommaJoin(ial.result)) def _WaitUntilSync(self): """Poll with custom rpc for disk sync. @@ -6248,15 +6993,15 @@ class TLMigrateInstance(Tasklet): if runningon_source and runningon_target: raise errors.OpExecError("Instance seems to be running on two nodes," - " or the hypervisor is confused. You will have" + " or the hypervisor is confused; you will have" " to ensure manually that it runs only on one" - " and restart this operation.") + " and restart this operation") if not (runningon_source or runningon_target): - raise errors.OpExecError("Instance does not seem to be running at all." - " In this case, it's safer to repair by" + raise errors.OpExecError("Instance does not seem to be running at all;" + " in this case it's safer to repair by" " running 'gnt-instance stop' to ensure disk" - " shutdown, and then restarting it.") + " shutdown, and then restarting it") if runningon_target: # the migration has actually succeeded, we need to update the config @@ -6270,16 +7015,17 @@ class TLMigrateInstance(Tasklet): " primary node (%s)" % source_node) demoted_node = target_node - self._EnsureSecondary(demoted_node) - try: + if instance.disk_template in constants.DTS_INT_MIRROR: + self._EnsureSecondary(demoted_node) + try: + self._WaitUntilSync() + except errors.OpExecError: + # we ignore here errors, since if the device is standalone, it + # won't be able to sync + pass + self._GoStandalone() + self._GoReconnect(False) self._WaitUntilSync() - except errors.OpExecError: - # we ignore here errors, since if the device is standalone, it - # won't be able to sync - pass - self._GoStandalone() - self._GoReconnect(False) - self._WaitUntilSync() self.feedback_fn("* done") @@ -6288,16 +7034,18 @@ class TLMigrateInstance(Tasklet): """ target_node = self.target_node + if self.instance.disk_template in constants.DTS_EXT_MIRROR: + return + try: self._EnsureSecondary(target_node) self._GoStandalone() self._GoReconnect(False) self._WaitUntilSync() except errors.OpExecError, err: - self.lu.LogWarning("Migration failed and I can't reconnect the" - " drives: error '%s'\n" - "Please look and recover the instance status" % - str(err)) + self.lu.LogWarning("Migration failed and I can't reconnect the drives," + " please try to recover the instance manually;" + " error '%s'" % str(err)) def _AbortMigration(self): """Call the hypervisor code to abort a started migration. @@ -6339,7 +7087,7 @@ class TLMigrateInstance(Tasklet): if not _CheckDiskConsistency(self.lu, dev, target_node, False): raise errors.OpExecError("Disk %s is degraded or not fully" " synchronized on target node," - " aborting migrate." 
% dev.iv_name) + " aborting migration" % dev.iv_name) # First get the migration information from the remote node result = self.rpc.call_migration_info(source_node, instance) @@ -6352,11 +7100,12 @@ class TLMigrateInstance(Tasklet): self.migration_info = migration_info = result.payload - # Then switch the disks to master/master mode - self._EnsureSecondary(target_node) - self._GoStandalone() - self._GoReconnect(True) - self._WaitUntilSync() + if self.instance.disk_template not in constants.DTS_EXT_MIRROR: + # Then switch the disks to master/master mode + self._EnsureSecondary(target_node) + self._GoStandalone() + self._GoReconnect(True) + self._WaitUntilSync() self.feedback_fn("* preparing %s to accept the instance" % target_node) result = self.rpc.call_accept_instance(target_node, @@ -6375,64 +7124,143 @@ class TLMigrateInstance(Tasklet): (instance.name, msg)) self.feedback_fn("* migrating instance to %s" % target_node) - time.sleep(10) result = self.rpc.call_instance_migrate(source_node, instance, self.nodes_ip[target_node], self.live) msg = result.fail_msg if msg: - logging.error("Instance migration failed, trying to revert" - " disk status: %s", msg) - self.feedback_fn("Migration failed, aborting") - self._AbortMigration() - self._RevertDiskStatus() - raise errors.OpExecError("Could not migrate instance %s: %s" % - (instance.name, msg)) - time.sleep(10) + logging.error("Instance migration failed, trying to revert" + " disk status: %s", msg) + self.feedback_fn("Migration failed, aborting") + self._AbortMigration() + self._RevertDiskStatus() + raise errors.OpExecError("Could not migrate instance %s: %s" % + (instance.name, msg)) + + instance.primary_node = target_node + # distribute new instance config to the other nodes + self.cfg.Update(instance, self.feedback_fn) + + result = self.rpc.call_finalize_migration(target_node, + instance, + migration_info, + True) + msg = result.fail_msg + if msg: + logging.error("Instance migration succeeded, but finalization failed:" + " %s", msg) + raise errors.OpExecError("Could not finalize instance migration: %s" % + msg) + + if self.instance.disk_template not in constants.DTS_EXT_MIRROR: + self._EnsureSecondary(source_node) + self._WaitUntilSync() + self._GoStandalone() + self._GoReconnect(False) + self._WaitUntilSync() + + self.feedback_fn("* done") + + def _ExecFailover(self): + """Failover an instance. + + The failover is done by shutting it down on its present node and + starting it on the secondary. 
+ + """ + instance = self.instance + primary_node = self.cfg.GetNodeInfo(instance.primary_node) + + source_node = instance.primary_node + target_node = self.target_node + + if instance.admin_up: + self.feedback_fn("* checking disk consistency between source and target") + for dev in instance.disks: + # for drbd, these are drbd over lvm + if not _CheckDiskConsistency(self, dev, target_node, False): + if not self.ignore_consistency: + raise errors.OpExecError("Disk %s is degraded on target node," + " aborting failover" % dev.iv_name) + else: + self.feedback_fn("* not checking disk consistency as instance is not" + " running") + + self.feedback_fn("* shutting down instance on source node") + logging.info("Shutting down instance %s on node %s", + instance.name, source_node) + + result = self.rpc.call_instance_shutdown(source_node, instance, + self.shutdown_timeout) + msg = result.fail_msg + if msg: + if self.ignore_consistency or primary_node.offline: + self.lu.LogWarning("Could not shutdown instance %s on node %s," + " proceeding anyway; please make sure node" + " %s is down; error details: %s", + instance.name, source_node, source_node, msg) + else: + raise errors.OpExecError("Could not shutdown instance %s on" + " node %s: %s" % + (instance.name, source_node, msg)) + + self.feedback_fn("* deactivating the instance's disks on source node") + if not _ShutdownInstanceDisks(self, instance, ignore_primary=True): + raise errors.OpExecError("Can't shut down the instance's disks.") instance.primary_node = target_node # distribute new instance config to the other nodes self.cfg.Update(instance, self.feedback_fn) - result = self.rpc.call_finalize_migration(target_node, - instance, - migration_info, - True) - msg = result.fail_msg - if msg: - logging.error("Instance migration succeeded, but finalization failed:" - " %s", msg) - raise errors.OpExecError("Could not finalize instance migration: %s" % - msg) + # Only start the instance if it's marked as up + if instance.admin_up: + self.feedback_fn("* activating the instance's disks on target node") + logging.info("Starting instance %s on node %s", + instance.name, target_node) - self._EnsureSecondary(source_node) - self._WaitUntilSync() - self._GoStandalone() - self._GoReconnect(False) - self._WaitUntilSync() + disks_ok, _ = _AssembleInstanceDisks(self, instance, + ignore_secondaries=True) + if not disks_ok: + _ShutdownInstanceDisks(self, instance) + raise errors.OpExecError("Can't activate the instance's disks") - self.feedback_fn("* done") + self.feedback_fn("* starting the instance on the target node") + result = self.rpc.call_instance_start(target_node, instance, None, None) + msg = result.fail_msg + if msg: + _ShutdownInstanceDisks(self, instance) + raise errors.OpExecError("Could not start instance %s on node %s: %s" % + (instance.name, target_node, msg)) def Exec(self, feedback_fn): """Perform the migration. """ - feedback_fn("Migrating instance %s" % self.instance.name) - self.feedback_fn = feedback_fn - self.source_node = self.instance.primary_node - self.target_node = self.instance.secondary_nodes[0] + + # FIXME: if we implement migrate-to-any in DRBD, this needs fixing + if self.instance.disk_template in constants.DTS_INT_MIRROR: + self.target_node = self.instance.secondary_nodes[0] + # Otherwise self.target_node has been populated either + # directly, or through an iallocator. 
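The _ExecFailover method above shuts the instance down on its current primary, flips primary_node in the configuration and, if the instance is marked up, starts it on the old secondary. A condensed, self-contained sketch of that sequence with fake callbacks standing in for the RPC and configuration layers:

class FakeInstance(object):
    def __init__(self, name, primary, secondaries, admin_up=True):
        self.name = name
        self.primary_node = primary
        self.secondary_nodes = list(secondaries)
        self.admin_up = admin_up

def failover(instance, shutdown_fn, start_fn, update_cfg_fn):
    source, target = instance.primary_node, instance.secondary_nodes[0]
    shutdown_fn(instance.name, source)        # stop on the old primary
    instance.primary_node = target
    update_cfg_fn(instance)                   # record the new primary
    if instance.admin_up:
        start_fn(instance.name, target)       # bring it up on the target
    return source, target

inst = FakeInstance("web1", "nodeA", ["nodeB"])
calls = []
failover(inst,
         lambda name, node: calls.append(("stop", node)),
         lambda name, node: calls.append(("start", node)),
         lambda i: calls.append(("update", i.primary_node)))
assert calls == [("stop", "nodeA"), ("update", "nodeB"), ("start", "nodeB")]
assert inst.primary_node == "nodeB"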
+ self.all_nodes = [self.source_node, self.target_node] self.nodes_ip = { self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip, self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip, } - if self.cleanup: - return self._ExecCleanup() + if self.failover: + feedback_fn("Failover instance %s" % self.instance.name) + self._ExecFailover() else: - return self._ExecMigration() + feedback_fn("Migrating instance %s" % self.instance.name) + + if self.cleanup: + return self._ExecCleanup() + else: + return self._ExecMigration() def _CreateBlockDev(lu, node, instance, device, force_create, @@ -6520,17 +7348,18 @@ def _GenerateUniqueNames(lu, exts): return results -def _GenerateDRBD8Branch(lu, primary, secondary, size, vgname, names, iv_name, - p_minor, s_minor): +def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names, + iv_name, p_minor, s_minor): """Generate a drbd8 device complete with its children. """ + assert len(vgnames) == len(names) == 2 port = lu.cfg.AllocatePort() shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId()) dev_data = objects.Disk(dev_type=constants.LD_LV, size=size, - logical_id=(vgname, names[0])) + logical_id=(vgnames[0], names[0])) dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128, - logical_id=(vgname, names[1])) + logical_id=(vgnames[1], names[1])) drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size, logical_id=(primary, secondary, port, p_minor, s_minor, @@ -6563,12 +7392,13 @@ def _GenerateDiskTemplate(lu, template_name, for i in range(disk_count)]) for idx, disk in enumerate(disk_info): disk_index = idx + base_index - vg = disk.get("vg", vgname) + vg = disk.get(constants.IDISK_VG, vgname) feedback_fn("* disk %i, vg %s, name %s" % (idx, vg, names[idx])) - disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"], + disk_dev = objects.Disk(dev_type=constants.LD_LV, + size=disk[constants.IDISK_SIZE], logical_id=(vg, names[idx]), iv_name="disk/%d" % disk_index, - mode=disk["mode"]) + mode=disk[constants.IDISK_MODE]) disks.append(disk_dev) elif template_name == constants.DT_DRBD8: if len(secondary_nodes) != 1: @@ -6584,12 +7414,15 @@ def _GenerateDiskTemplate(lu, template_name, names.append(lv_prefix + "_meta") for idx, disk in enumerate(disk_info): disk_index = idx + base_index - vg = disk.get("vg", vgname) + data_vg = disk.get(constants.IDISK_VG, vgname) + meta_vg = disk.get(constants.IDISK_METAVG, data_vg) disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node, - disk["size"], vg, names[idx*2:idx*2+2], + disk[constants.IDISK_SIZE], + [data_vg, meta_vg], + names[idx * 2:idx * 2 + 2], "disk/%d" % disk_index, - minors[idx*2], minors[idx*2+1]) - disk_dev.mode = disk["mode"] + minors[idx * 2], minors[idx * 2 + 1]) + disk_dev.mode = disk[constants.IDISK_MODE] disks.append(disk_dev) elif template_name == constants.DT_FILE: if len(secondary_nodes) != 0: @@ -6599,13 +7432,44 @@ def _GenerateDiskTemplate(lu, template_name, for idx, disk in enumerate(disk_info): disk_index = idx + base_index - disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"], + disk_dev = objects.Disk(dev_type=constants.LD_FILE, + size=disk[constants.IDISK_SIZE], iv_name="disk/%d" % disk_index, logical_id=(file_driver, "%s/disk%d" % (file_storage_dir, disk_index)), - mode=disk["mode"]) + mode=disk[constants.IDISK_MODE]) + disks.append(disk_dev) + elif template_name == constants.DT_SHARED_FILE: + if len(secondary_nodes) != 0: + raise errors.ProgrammerError("Wrong template configuration") + + 
opcodes.RequireSharedFileStorage() + + for idx, disk in enumerate(disk_info): + disk_index = idx + base_index + disk_dev = objects.Disk(dev_type=constants.LD_FILE, + size=disk[constants.IDISK_SIZE], + iv_name="disk/%d" % disk_index, + logical_id=(file_driver, + "%s/disk%d" % (file_storage_dir, + disk_index)), + mode=disk[constants.IDISK_MODE]) + disks.append(disk_dev) + elif template_name == constants.DT_BLOCK: + if len(secondary_nodes) != 0: + raise errors.ProgrammerError("Wrong template configuration") + + for idx, disk in enumerate(disk_info): + disk_index = idx + base_index + disk_dev = objects.Disk(dev_type=constants.LD_BLOCKDEV, + size=disk[constants.IDISK_SIZE], + logical_id=(constants.BLOCKDEV_DRIVER_MANUAL, + disk[constants.IDISK_ADOPT]), + iv_name="disk/%d" % disk_index, + mode=disk[constants.IDISK_MODE]) disks.append(disk_dev) + else: raise errors.ProgrammerError("Invalid disk template '%s'" % template_name) return disks @@ -6656,14 +7520,17 @@ def _WipeDisks(lu, instance): try: for idx, device in enumerate(instance.disks): - lu.LogInfo("* Wiping disk %d", idx) - logging.info("Wiping disk %d for instance %s, node %s", - idx, instance.name, node) - # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but # MAX_WIPE_CHUNK at max wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT) + # we _must_ make this an int, otherwise rounding errors will + # occur + wipe_chunk_size = int(wipe_chunk_size) + + lu.LogInfo("* Wiping disk %d", idx) + logging.info("Wiping disk %d for instance %s, node %s using" + " chunk size %s", idx, instance.name, node, wipe_chunk_size) offset = 0 size = device.size @@ -6672,6 +7539,8 @@ def _WipeDisks(lu, instance): while offset < size: wipe_size = min(wipe_chunk_size, size - offset) + logging.debug("Wiping disk %d, offset %s, chunk %s", + idx, offset, wipe_size) result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size) result.Raise("Could not wipe disk %d at offset %d for size %d" % (idx, offset, wipe_size)) @@ -6689,8 +7558,8 @@ def _WipeDisks(lu, instance): for idx, success in enumerate(result.payload): if not success: - lu.LogWarning("Warning: Resume sync of disk %d failed. Please have a" - " look at the status and troubleshoot the issue.", idx) + lu.LogWarning("Resume sync of disk %d failed, please have a" + " look at the status and troubleshoot the issue", idx) logging.warn("resume-sync of instance %s for disks %d failed", instance.name, idx) @@ -6720,7 +7589,7 @@ def _CreateDisks(lu, instance, to_skip=None, target_node=None): pnode = target_node all_nodes = [pnode] - if instance.disk_template == constants.DT_FILE: + if instance.disk_template in (constants.DT_FILE, constants.DT_SHARED_FILE): file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir) @@ -6794,12 +7663,13 @@ def _ComputeDiskSizePerVG(disk_template, disks): """ def _compute(disks, payload): - """Universal algorithm + """Universal algorithm. 
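The _WipeDisks changes above compute the wipe chunk as a percentage of the disk size, capped at a maximum and truncated to an integer, and then walk the disk clipping the final chunk to the remaining space. A small sketch of just that arithmetic, with illustrative constant values:

MAX_WIPE_CHUNK = 1024          # MiB, illustrative values only
MIN_WIPE_CHUNK_PERCENT = 10

def wipe_offsets(disk_size):
    # Truncate to an integer to avoid rounding errors when advancing
    chunk = int(min(MAX_WIPE_CHUNK, disk_size / 100.0 * MIN_WIPE_CHUNK_PERCENT))
    offset = 0
    while offset < disk_size:
        size = min(chunk, disk_size - offset)
        yield offset, size
        offset += size

assert list(wipe_offsets(2500)) == [
    (0, 250), (250, 250), (500, 250), (750, 250), (1000, 250),
    (1250, 250), (1500, 250), (1750, 250), (2000, 250), (2250, 250)]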
""" vgs = {} for disk in disks: - vgs[disk["vg"]] = vgs.get("vg", 0) + disk["size"] + payload + vgs[disk[constants.IDISK_VG]] = \ + vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload return vgs @@ -6810,6 +7680,7 @@ def _ComputeDiskSizePerVG(disk_template, disks): # 128 MB are added for drbd metadata for each disk constants.DT_DRBD8: _compute(disks, 128), constants.DT_FILE: {}, + constants.DT_SHARED_FILE: {}, } if disk_template not in req_size_dict: @@ -6826,10 +7697,12 @@ def _ComputeDiskSize(disk_template, disks): # Required free disk space as a function of disk and swap space req_size_dict = { constants.DT_DISKLESS: None, - constants.DT_PLAIN: sum(d["size"] for d in disks), + constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks), # 128 MB are added for drbd metadata for each disk - constants.DT_DRBD8: sum(d["size"] + 128 for d in disks), + constants.DT_DRBD8: sum(d[constants.IDISK_SIZE] + 128 for d in disks), constants.DT_FILE: None, + constants.DT_SHARED_FILE: 0, + constants.DT_BLOCK: 0, } if disk_template not in req_size_dict: @@ -6935,8 +7808,8 @@ class LUInstanceCreate(LogicalUnit): if self.op.ip_check and not self.op.name_check: # TODO: make the ip check more flexible and not depend on the name check - raise errors.OpPrereqError("Cannot do ip check without a name check", - errors.ECODE_INVAL) + raise errors.OpPrereqError("Cannot do IP address check without a name" + " check", errors.ECODE_INVAL) # check nics' parameter names for nic in self.op.nics: @@ -6946,7 +7819,7 @@ class LUInstanceCreate(LogicalUnit): has_adopt = has_no_adopt = False for disk in self.op.disks: utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES) - if "adopt" in disk: + if constants.IDISK_ADOPT in disk: has_adopt = True else: has_no_adopt = True @@ -6965,6 +7838,12 @@ class LUInstanceCreate(LogicalUnit): if self.op.mode == constants.INSTANCE_IMPORT: raise errors.OpPrereqError("Disk adoption not allowed for" " instance import", errors.ECODE_INVAL) + else: + if self.op.disk_template in constants.DTS_MUST_ADOPT: + raise errors.OpPrereqError("Disk template %s requires disk adoption," + " but no 'adopt' parameter given" % + self.op.disk_template, + errors.ECODE_INVAL) self.adopt_disks = has_adopt @@ -6991,7 +7870,7 @@ class LUInstanceCreate(LogicalUnit): _CheckIAllocatorOrNode(self, "iallocator", "pnode") if self.op.pnode is not None: - if self.op.disk_template in constants.DTS_NET_MIRROR: + if self.op.disk_template in constants.DTS_INT_MIRROR: if self.op.snode is None: raise errors.OpPrereqError("The networked disk templates need" " a mirror node", errors.ECODE_INVAL) @@ -7108,7 +7987,7 @@ class LUInstanceCreate(LogicalUnit): self.op.src_node = None if os.path.isabs(src_path): raise errors.OpPrereqError("Importing an instance from an absolute" - " path requires a source node option.", + " path requires a source node option", errors.ECODE_INVAL) else: self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node) @@ -7179,15 +8058,21 @@ class LUInstanceCreate(LogicalUnit): vcpus=self.be_full[constants.BE_VCPUS], nics=_NICListToTuple(self, self.nics), disk_template=self.op.disk_template, - disks=[(d["size"], d["mode"]) for d in self.disks], + disks=[(d[constants.IDISK_SIZE], d[constants.IDISK_MODE]) + for d in self.disks], bep=self.be_full, hvp=self.hv_full, hypervisor_name=self.op.hypervisor, )) - nl = ([self.cfg.GetMasterNode(), self.op.pnode] + - self.secondaries) - return env, nl, nl + return env + + def BuildHooksNodes(self): + """Build hooks nodes. 
+ + """ + nl = [self.cfg.GetMasterNode(), self.op.pnode] + self.secondaries + return nl, nl def _ReadExportInfo(self): """Reads the export information from disk. @@ -7204,7 +8089,7 @@ class LUInstanceCreate(LogicalUnit): src_path = self.op.src_path if src_node is None: - locked_nodes = self.acquired_locks[locking.LEVEL_NODE] + locked_nodes = self.glm.list_owned(locking.LEVEL_NODE) exp_list = self.rpc.call_export_list(locked_nodes) found = False for node in exp_list: @@ -7261,7 +8146,7 @@ class LUInstanceCreate(LogicalUnit): # TODO: import the disk iv_name too for idx in range(einfo.getint(constants.INISECT_INS, "disk_count")): disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx) - disks.append({"size": disk_sz}) + disks.append({constants.IDISK_SIZE: disk_sz}) self.op.disks = disks else: raise errors.OpPrereqError("No disk info specified and the export" @@ -7382,7 +8267,7 @@ class LUInstanceCreate(LogicalUnit): # NIC buildup self.nics = [] for idx, nic in enumerate(self.op.nics): - nic_mode_req = nic.get("mode", None) + nic_mode_req = nic.get(constants.INIC_MODE, None) nic_mode = nic_mode_req if nic_mode is None: nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE] @@ -7394,7 +8279,7 @@ class LUInstanceCreate(LogicalUnit): default_ip_mode = constants.VALUE_NONE # ip validity checks - ip = nic.get("ip", default_ip_mode) + ip = nic.get(constants.INIC_IP, default_ip_mode) if ip is None or ip.lower() == constants.VALUE_NONE: nic_ip = None elif ip.lower() == constants.VALUE_AUTO: @@ -7415,7 +8300,7 @@ class LUInstanceCreate(LogicalUnit): errors.ECODE_INVAL) # MAC address verification - mac = nic.get("mac", constants.VALUE_AUTO) + mac = nic.get(constants.INIC_MAC, constants.VALUE_AUTO) if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): mac = utils.NormalizeAndValidateMac(mac) @@ -7426,18 +8311,8 @@ class LUInstanceCreate(LogicalUnit): " in cluster" % mac, errors.ECODE_NOTUNIQUE) - # bridge verification - bridge = nic.get("bridge", None) - link = nic.get("link", None) - if bridge and link: - raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" - " at the same time", errors.ECODE_INVAL) - elif bridge and nic_mode == constants.NIC_MODE_ROUTED: - raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic", - errors.ECODE_INVAL) - elif bridge: - link = bridge - + # Build nic parameters + link = nic.get(constants.INIC_LINK, None) nicparams = {} if nic_mode_req: nicparams[constants.NIC_MODE] = nic_mode_req @@ -7449,13 +8324,14 @@ class LUInstanceCreate(LogicalUnit): self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams)) # disk checks/pre-build + default_vg = self.cfg.GetVGName() self.disks = [] for disk in self.op.disks: - mode = disk.get("mode", constants.DISK_RDWR) + mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR) if mode not in constants.DISK_ACCESS_SET: raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode, errors.ECODE_INVAL) - size = disk.get("size", None) + size = disk.get(constants.IDISK_SIZE, None) if size is None: raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL) try: @@ -7463,10 +8339,16 @@ class LUInstanceCreate(LogicalUnit): except (TypeError, ValueError): raise errors.OpPrereqError("Invalid disk size '%s'" % size, errors.ECODE_INVAL) - vg = disk.get("vg", self.cfg.GetVGName()) - new_disk = {"size": size, "mode": mode, "vg": vg} - if "adopt" in disk: - new_disk["adopt"] = disk["adopt"] + + data_vg = disk.get(constants.IDISK_VG, default_vg) + new_disk = { + 
constants.IDISK_SIZE: size, + constants.IDISK_MODE: mode, + constants.IDISK_VG: data_vg, + constants.IDISK_METAVG: disk.get(constants.IDISK_METAVG, data_vg), + } + if constants.IDISK_ADOPT in disk: + new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT] self.disks.append(new_disk) if self.op.mode == constants.INSTANCE_IMPORT: @@ -7551,10 +8433,10 @@ class LUInstanceCreate(LogicalUnit): self.secondaries = [] # mirror node verification - if self.op.disk_template in constants.DTS_NET_MIRROR: + if self.op.disk_template in constants.DTS_INT_MIRROR: if self.op.snode == pnode.name: raise errors.OpPrereqError("The secondary node cannot be the" - " primary node.", errors.ECODE_INVAL) + " primary node", errors.ECODE_INVAL) _CheckNodeOnline(self, self.op.snode) _CheckNodeNotDrained(self, self.op.snode) _CheckNodeVmCapable(self, self.op.snode) @@ -7567,8 +8449,10 @@ class LUInstanceCreate(LogicalUnit): req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks) _CheckNodesFreeDiskPerVG(self, nodenames, req_sizes) - else: # instead, we must check the adoption data - all_lvs = set([i["vg"] + "/" + i["adopt"] for i in self.disks]) + elif self.op.disk_template == constants.DT_PLAIN: # Check the adoption data + all_lvs = set(["%s/%s" % (disk[constants.IDISK_VG], + disk[constants.IDISK_ADOPT]) + for disk in self.disks]) if len(all_lvs) != len(self.disks): raise errors.OpPrereqError("Duplicate volume names given for adoption", errors.ECODE_INVAL) @@ -7601,7 +8485,39 @@ class LUInstanceCreate(LogicalUnit): errors.ECODE_STATE) # update the size of disk based on what is found for dsk in self.disks: - dsk["size"] = int(float(node_lvs[dsk["vg"] + "/" + dsk["adopt"]][0])) + dsk[constants.IDISK_SIZE] = \ + int(float(node_lvs["%s/%s" % (dsk[constants.IDISK_VG], + dsk[constants.IDISK_ADOPT])][0])) + + elif self.op.disk_template == constants.DT_BLOCK: + # Normalize and de-duplicate device paths + all_disks = set([os.path.abspath(disk[constants.IDISK_ADOPT]) + for disk in self.disks]) + if len(all_disks) != len(self.disks): + raise errors.OpPrereqError("Duplicate disk names given for adoption", + errors.ECODE_INVAL) + baddisks = [d for d in all_disks + if not d.startswith(constants.ADOPTABLE_BLOCKDEV_ROOT)] + if baddisks: + raise errors.OpPrereqError("Device node(s) %s lie outside %s and" + " cannot be adopted" % + (", ".join(baddisks), + constants.ADOPTABLE_BLOCKDEV_ROOT), + errors.ECODE_INVAL) + + node_disks = self.rpc.call_bdev_sizes([pnode.name], + list(all_disks))[pnode.name] + node_disks.Raise("Cannot get block device information from node %s" % + pnode.name) + node_disks = node_disks.payload + delta = all_disks.difference(node_disks.keys()) + if delta: + raise errors.OpPrereqError("Missing block device(s): %s" % + utils.CommaJoin(delta), + errors.ECODE_INVAL) + for dsk in self.disks: + dsk[constants.IDISK_SIZE] = \ + int(float(node_disks[dsk[constants.IDISK_ADOPT]])) _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams) @@ -7633,7 +8549,7 @@ class LUInstanceCreate(LogicalUnit): else: network_port = None - if constants.ENABLE_FILE_STORAGE: + if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE: # this is needed because os.path.join does not accept None arguments if self.op.file_storage_dir is None: string_file_storage_dir = "" @@ -7641,7 +8557,12 @@ class LUInstanceCreate(LogicalUnit): string_file_storage_dir = self.op.file_storage_dir # build the full file storage dir path - file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(), + if 
self.op.disk_template == constants.DT_SHARED_FILE: + get_fsd_fn = self.cfg.GetSharedFileStorageDir + else: + get_fsd_fn = self.cfg.GetFileStorageDir + + file_storage_dir = utils.PathJoin(get_fsd_fn(), string_file_storage_dir, instance) else: file_storage_dir = "" @@ -7669,17 +8590,18 @@ class LUInstanceCreate(LogicalUnit): ) if self.adopt_disks: - # rename LVs to the newly-generated names; we need to construct - # 'fake' LV disks with the old data, plus the new unique_id - tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks] - rename_to = [] - for t_dsk, a_dsk in zip (tmp_disks, self.disks): - rename_to.append(t_dsk.logical_id) - t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk["adopt"]) - self.cfg.SetDiskID(t_dsk, pnode_name) - result = self.rpc.call_blockdev_rename(pnode_name, - zip(tmp_disks, rename_to)) - result.Raise("Failed to rename adoped LVs") + if self.op.disk_template == constants.DT_PLAIN: + # rename LVs to the newly-generated names; we need to construct + # 'fake' LV disks with the old data, plus the new unique_id + tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks] + rename_to = [] + for t_dsk, a_dsk in zip (tmp_disks, self.disks): + rename_to.append(t_dsk.logical_id) + t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk[constants.IDISK_ADOPT]) + self.cfg.SetDiskID(t_dsk, pnode_name) + result = self.rpc.call_blockdev_rename(pnode_name, + zip(tmp_disks, rename_to)) + result.Raise("Failed to rename adoped LVs") else: feedback_fn("* creating instance disks...") try: @@ -7699,16 +8621,13 @@ class LUInstanceCreate(LogicalUnit): # Declare that we don't want to remove the instance lock anymore, as we've # added the instance to the config del self.remove_locks[locking.LEVEL_INSTANCE] - # Unlock all the nodes + if self.op.mode == constants.INSTANCE_IMPORT: - nodes_keep = [self.op.src_node] - nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE] - if node != self.op.src_node] - self.context.glm.release(locking.LEVEL_NODE, nodes_release) - self.acquired_locks[locking.LEVEL_NODE] = nodes_keep + # Release unused nodes + _ReleaseLocks(self, locking.LEVEL_NODE, keep=[self.op.src_node]) else: - self.context.glm.release(locking.LEVEL_NODE) - del self.acquired_locks[locking.LEVEL_NODE] + # Release all nodes + _ReleaseLocks(self, locking.LEVEL_NODE) disk_abort = False if not self.adopt_disks and self.cfg.GetClusterInfo().prealloc_wipe_disks: @@ -7725,7 +8644,7 @@ class LUInstanceCreate(LogicalUnit): pass elif self.op.wait_for_sync: disk_abort = not _WaitForSync(self, iobj) - elif iobj.disk_template in constants.DTS_NET_MIRROR: + elif iobj.disk_template in constants.DTS_INT_MIRROR: # make sure the disks are not degraded (still sync-ing is ok) time.sleep(15) feedback_fn("* checking mirrors status") @@ -7860,9 +8779,9 @@ class LUInstanceConsole(NoHooksLU): if instance.name not in node_insts.payload: if instance.admin_up: - state = "ERROR_down" + state = constants.INSTST_ERRORDOWN else: - state = "ADMIN_down" + state = constants.INSTST_ADMINDOWN raise errors.OpExecError("Instance %s is not running (state %s)" % (instance.name, state)) @@ -7907,24 +8826,29 @@ class LUInstanceReplaceDisks(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() - if self.op.iallocator is not None: - self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + assert locking.LEVEL_NODE not in self.needed_locks + assert locking.LEVEL_NODEGROUP not in self.needed_locks + + assert self.op.iallocator is None or self.op.remote_node is None, \ + "Conflicting options" - elif 
self.op.remote_node is not None: - remote_node = _ExpandNodeName(self.cfg, self.op.remote_node) - self.op.remote_node = remote_node + if self.op.remote_node is not None: + self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node) # Warning: do not remove the locking of the new secondary here # unless DRBD8.AddChildren is changed to work in parallel; # currently it doesn't since parallel invocations of # FindUnusedMinor will conflict - self.needed_locks[locking.LEVEL_NODE] = [remote_node] + self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND - else: self.needed_locks[locking.LEVEL_NODE] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + if self.op.iallocator is not None: + # iallocator will select a new node in the same group + self.needed_locks[locking.LEVEL_NODEGROUP] = [] + self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode, self.op.iallocator, self.op.remote_node, self.op.disks, False, self.op.early_release) @@ -7932,11 +8856,26 @@ class LUInstanceReplaceDisks(LogicalUnit): self.tasklets = [self.replacer] def DeclareLocks(self, level): - # If we're not already locking all nodes in the set we have to declare the - # instance's primary/secondary nodes. - if (level == locking.LEVEL_NODE and - self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET): - self._LockInstancesNodes() + if level == locking.LEVEL_NODEGROUP: + assert self.op.remote_node is None + assert self.op.iallocator is not None + assert not self.needed_locks[locking.LEVEL_NODEGROUP] + + self.share_locks[locking.LEVEL_NODEGROUP] = 1 + self.needed_locks[locking.LEVEL_NODEGROUP] = \ + self.cfg.GetInstanceNodeGroups(self.op.instance_name) + + elif level == locking.LEVEL_NODE: + if self.op.iallocator is not None: + assert self.op.remote_node is None + assert not self.needed_locks[locking.LEVEL_NODE] + + # Lock member nodes of all locked groups + self.needed_locks[locking.LEVEL_NODE] = [node_name + for group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP) + for node_name in self.cfg.GetNodeGroup(group_uuid).members] + else: + self._LockInstancesNodes() def BuildHooksEnv(self): """Build hooks env. @@ -7951,13 +8890,40 @@ class LUInstanceReplaceDisks(LogicalUnit): "OLD_SECONDARY": instance.secondary_nodes[0], } env.update(_BuildInstanceHookEnvByObject(self, instance)) + return env + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ + instance = self.replacer.instance nl = [ self.cfg.GetMasterNode(), instance.primary_node, ] if self.op.remote_node is not None: nl.append(self.op.remote_node) - return env, nl, nl + return nl, nl + + def CheckPrereq(self): + """Check prerequisites. + + """ + assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or + self.op.iallocator is None) + + owned_groups = self.glm.list_owned(locking.LEVEL_NODEGROUP) + if owned_groups: + groups = self.cfg.GetInstanceNodeGroups(self.op.instance_name) + if owned_groups != groups: + raise errors.OpExecError("Node groups used by instance '%s' changed" + " since lock was acquired, current list is %r," + " used to be '%s'" % + (self.op.instance_name, + utils.CommaJoin(groups), + utils.CommaJoin(owned_groups))) + + return LogicalUnit.CheckPrereq(self) class TLReplaceDisks(Tasklet): @@ -8047,6 +9013,29 @@ class TLReplaceDisks(Tasklet): return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance, node_name, True) + def _CheckDisksActivated(self, instance): + """Checks if the instance disks are activated. 
+ + @param instance: The instance to check disks + @return: True if they are activated, False otherwise + + """ + nodes = instance.all_nodes + + for idx, dev in enumerate(instance.disks): + for node in nodes: + self.lu.LogInfo("Checking disk/%d on %s", idx, node) + self.cfg.SetDiskID(dev, node) + + result = self.rpc.call_blockdev_find(node, dev) + + if result.offline: + continue + elif result.fail_msg or not result.payload: + return False + + return True + def CheckPrereq(self): """Check prerequisites. @@ -8088,20 +9077,23 @@ class TLReplaceDisks(Tasklet): remote_node = self._RunAllocator(self.lu, self.iallocator_name, instance.name, instance.secondary_nodes) - if remote_node is not None: + if remote_node is None: + self.remote_node_info = None + else: + assert remote_node in self.lu.glm.list_owned(locking.LEVEL_NODE), \ + "Remote node '%s' is not locked" % remote_node + self.remote_node_info = self.cfg.GetNodeInfo(remote_node) assert self.remote_node_info is not None, \ "Cannot retrieve locked node %s" % remote_node - else: - self.remote_node_info = None if remote_node == self.instance.primary_node: raise errors.OpPrereqError("The specified node is the primary node of" - " the instance.", errors.ECODE_INVAL) + " the instance", errors.ECODE_INVAL) if remote_node == secondary_node: raise errors.OpPrereqError("The specified node is already the" - " secondary node of the instance.", + " secondary node of the instance", errors.ECODE_INVAL) if self.disks and self.mode in (constants.REPLACE_DISK_AUTO, @@ -8110,6 +9102,10 @@ class TLReplaceDisks(Tasklet): errors.ECODE_INVAL) if self.mode == constants.REPLACE_DISK_AUTO: + if not self._CheckDisksActivated(instance): + raise errors.OpPrereqError("Please run activate-disks on instance %s" + " first" % self.instance_name, + errors.ECODE_STATE) faulty_primary = self._FindFaultyDisks(instance.primary_node) faulty_secondary = self._FindFaultyDisks(secondary_node) @@ -8173,18 +9169,26 @@ class TLReplaceDisks(Tasklet): for node in check_nodes: _CheckNodeOnline(self.lu, node) + touched_nodes = frozenset(node_name for node_name in [self.new_node, + self.other_node, + self.target_node] + if node_name is not None) + + # Release unneeded node locks + _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes) + + # Release any owned node group + if self.lu.glm.is_owned(locking.LEVEL_NODEGROUP): + _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP) + # Check whether disks are valid for disk_idx in self.disks: instance.FindDisk(disk_idx) # Get secondary node IP addresses - node_2nd_ip = {} - - for node_name in [self.target_node, self.other_node, self.new_node]: - if node_name is not None: - node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip - - self.node_secondary_ip = node_2nd_ip + self.node_secondary_ip = \ + dict((node_name, self.cfg.GetNodeInfo(node_name).secondary_ip) + for node_name in touched_nodes) def Exec(self, feedback_fn): """Execute disk replacement. 
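[Editor's note — illustrative sketch, not part of the patch.] The CheckPrereq hunk above narrows the held node locks down to just the nodes touched by the disk replacement (_ReleaseLocks(..., keep=touched_nodes)) and drops any node group lock. _ReleaseLocks itself is defined in an earlier part of this patch and is not shown here; the standalone sketch below only illustrates the keep/names filtering semantics it appears to implement. All names (release_locks, manager.list_owned, manager.release) are assumptions for illustration, not Ganeti's real API.

# Illustrative sketch only -- assumes a lock manager exposing
# list_owned(level) and release(level, names), mirroring how the patch
# calls _ReleaseLocks(lu, level, names=...) and _ReleaseLocks(lu, level, keep=...).
def release_locks(manager, level, names=None, keep=None):
    """Release locks at a level: an explicit list, everything, or all but 'keep'."""
    assert not (names is not None and keep is not None), \
        "Only one of 'names' and 'keep' may be given"

    owned = manager.list_owned(level)

    if names is not None:
        release = [name for name in owned if name in frozenset(names)]
    elif keep is not None:
        release = [name for name in owned if name not in frozenset(keep)]
    else:
        release = list(owned)  # no filter given: release everything at this level

    if release:
        manager.release(level, release)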
@@ -8195,6 +9199,20 @@ class TLReplaceDisks(Tasklet): if self.delay_iallocator: self._CheckPrereq2() + if __debug__: + # Verify owned locks before starting operation + owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE) + assert set(owned_locks) == set(self.node_secondary_ip), \ + ("Incorrect node locks, owning %s, expected %s" % + (owned_locks, self.node_secondary_ip.keys())) + + owned_locks = self.lu.glm.list_owned(locking.LEVEL_INSTANCE) + assert list(owned_locks) == [self.instance_name], \ + "Instance '%s' not locked" % self.instance_name + + assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \ + "Should not own any node group lock at this point" + if not self.disks: feedback_fn("No disks need replacement") return @@ -8215,14 +9233,24 @@ class TLReplaceDisks(Tasklet): else: fn = self._ExecDrbd8DiskOnly - return fn(feedback_fn) - + result = fn(feedback_fn) finally: # Deactivate the instance disks if we're replacing them on a # down instance if activate_disks: _SafeShutdownInstanceDisks(self.lu, self.instance) + if __debug__: + # Verify owned locks + owned_locks = self.lu.glm.list_owned(locking.LEVEL_NODE) + nodes = frozenset(self.node_secondary_ip) + assert ((self.early_release and not owned_locks) or + (not self.early_release and not (set(owned_locks) - nodes))), \ + ("Not owning the correct locks, early_release=%s, owned=%r," + " nodes=%r" % (self.early_release, owned_locks, nodes)) + + return result + def _CheckVolumeGroup(self, nodes): self.lu.LogInfo("Checking volume groups") @@ -8274,7 +9302,6 @@ class TLReplaceDisks(Tasklet): (node_name, self.instance.name)) def _CreateNewStorage(self, node_name): - vgname = self.cfg.GetVGName() iv_names = {} for idx, dev in enumerate(self.instance.disks): @@ -8288,10 +9315,12 @@ class TLReplaceDisks(Tasklet): lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]] names = _GenerateUniqueNames(self.lu, lv_names) + vg_data = dev.children[0].logical_id[0] lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size, - logical_id=(vgname, names[0])) + logical_id=(vg_data, names[0])) + vg_meta = dev.children[1].logical_id[0] lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128, - logical_id=(vgname, names[1])) + logical_id=(vg_meta, names[1])) new_lvs = [lv_data, lv_meta] old_lvs = dev.children @@ -8332,10 +9361,6 @@ class TLReplaceDisks(Tasklet): self.lu.LogWarning("Can't remove old LV: %s" % msg, hint="remove unused LVs manually") - def _ReleaseNodeLock(self, node_name): - """Releases the lock for a given node.""" - self.lu.context.glm.release(locking.LEVEL_NODE, node_name) - def _ExecDrbd8DiskOnly(self, feedback_fn): """Replace a disk on the primary or secondary for DRBD 8. 
@@ -8453,7 +9478,8 @@ class TLReplaceDisks(Tasklet): self._RemoveOldStorage(self.target_node, iv_names) # WARNING: we release both node locks here, do not do other RPCs # than WaitForSync to the primary node - self._ReleaseNodeLock([self.target_node, self.other_node]) + _ReleaseLocks(self.lu, locking.LEVEL_NODE, + names=[self.target_node, self.other_node]) # Wait for sync # This can fail as the old devices are degraded and _WaitForSync @@ -8610,9 +9636,10 @@ class TLReplaceDisks(Tasklet): self._RemoveOldStorage(self.target_node, iv_names) # WARNING: we release all node locks here, do not do other RPCs # than WaitForSync to the primary node - self._ReleaseNodeLock([self.instance.primary_node, - self.target_node, - self.new_node]) + _ReleaseLocks(self.lu, locking.LEVEL_NODE, + names=[self.instance.primary_node, + self.target_node, + self.new_node]) # Wait for sync # This can fail as the old devices are degraded and _WaitForSync @@ -8764,8 +9791,14 @@ class LUInstanceGrowDisk(LogicalUnit): "AMOUNT": self.op.amount, } env.update(_BuildInstanceHookEnvByObject(self, self.instance)) + return env + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) - return env, nl, nl + return (nl, nl) def CheckPrereq(self): """Check prerequisites. @@ -8784,13 +9817,14 @@ class LUInstanceGrowDisk(LogicalUnit): if instance.disk_template not in constants.DTS_GROWABLE: raise errors.OpPrereqError("Instance's disk layout does not support" - " growing.", errors.ECODE_INVAL) + " growing", errors.ECODE_INVAL) self.disk = instance.FindDisk(self.op.disk) - if instance.disk_template != constants.DT_FILE: - # TODO: check the free disk space for file, when that feature - # will be supported + if instance.disk_template not in (constants.DT_FILE, + constants.DT_SHARED_FILE): + # TODO: check the free disk space for file, when that feature will be + # supported _CheckNodesFreeDiskPerVG(self, nodenames, self.disk.ComputeGrowth(self.op.amount)) @@ -8805,9 +9839,17 @@ class LUInstanceGrowDisk(LogicalUnit): if not disks_ok: raise errors.OpExecError("Cannot activate block device to grow") + # First run all grow ops in dry-run mode + for node in instance.all_nodes: + self.cfg.SetDiskID(disk, node) + result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, True) + result.Raise("Grow request failed to node %s" % node) + + # We know that (as far as we can test) operations across different + # nodes will succeed, time to run it for real for node in instance.all_nodes: self.cfg.SetDiskID(disk, node) - result = self.rpc.call_blockdev_grow(node, disk, self.op.amount) + result = self.rpc.call_blockdev_grow(node, disk, self.op.amount, False) result.Raise("Grow request failed to node %s" % node) # TODO: Rewrite code to work properly @@ -8822,14 +9864,14 @@ class LUInstanceGrowDisk(LogicalUnit): if self.op.wait_for_sync: disk_abort = not _WaitForSync(self, instance, disks=[disk]) if disk_abort: - self.proc.LogWarning("Warning: disk sync-ing has not returned a good" - " status.\nPlease check the instance.") + self.proc.LogWarning("Disk sync-ing has not returned a good" + " status; please check the instance") if not instance.admin_up: _SafeShutdownInstanceDisks(self, instance, disks=[disk]) elif not instance.admin_up: self.proc.LogWarning("Not shutting down the disk even if the instance is" " not supposed to be running because no wait for" - " sync mode was requested.") + " sync mode was requested") class LUInstanceQueryData(NoHooksLU): @@ -8877,7 +9919,7 @@ class 
LUInstanceQueryData(NoHooksLU): """ if self.wanted_names is None: assert self.op.use_locking, "Locking was not used" - self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE] + self.wanted_names = self.glm.list_owned(locking.LEVEL_INSTANCE) self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name in self.wanted_names] @@ -9026,11 +10068,11 @@ class LUInstanceSetParams(LogicalUnit): raise errors.OpPrereqError(msg, errors.ECODE_INVAL) if disk_op == constants.DDM_ADD: - mode = disk_dict.setdefault('mode', constants.DISK_RDWR) + mode = disk_dict.setdefault(constants.IDISK_MODE, constants.DISK_RDWR) if mode not in constants.DISK_ACCESS_SET: raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode, errors.ECODE_INVAL) - size = disk_dict.get('size', None) + size = disk_dict.get(constants.IDISK_SIZE, None) if size is None: raise errors.OpPrereqError("Required disk parameter size missing", errors.ECODE_INVAL) @@ -9039,10 +10081,10 @@ class LUInstanceSetParams(LogicalUnit): except (TypeError, ValueError), err: raise errors.OpPrereqError("Invalid disk size parameter: %s" % str(err), errors.ECODE_INVAL) - disk_dict['size'] = size + disk_dict[constants.IDISK_SIZE] = size else: # modification of disk - if 'size' in disk_dict: + if constants.IDISK_SIZE in disk_dict: raise errors.OpPrereqError("Disk size change not possible, use" " grow-disk", errors.ECODE_INVAL) @@ -9056,7 +10098,7 @@ class LUInstanceSetParams(LogicalUnit): errors.ECODE_INVAL) if (self.op.disk_template and - self.op.disk_template in constants.DTS_NET_MIRROR and + self.op.disk_template in constants.DTS_INT_MIRROR and self.op.remote_node is None): raise errors.OpPrereqError("Changing the disk template to a mirrored" " one requires specifying a secondary node", @@ -9079,32 +10121,32 @@ class LUInstanceSetParams(LogicalUnit): raise errors.OpPrereqError(msg, errors.ECODE_INVAL) # nic_dict should be a dict - nic_ip = nic_dict.get('ip', None) + nic_ip = nic_dict.get(constants.INIC_IP, None) if nic_ip is not None: if nic_ip.lower() == constants.VALUE_NONE: - nic_dict['ip'] = None + nic_dict[constants.INIC_IP] = None else: if not netutils.IPAddress.IsValid(nic_ip): raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip, errors.ECODE_INVAL) nic_bridge = nic_dict.get('bridge', None) - nic_link = nic_dict.get('link', None) + nic_link = nic_dict.get(constants.INIC_LINK, None) if nic_bridge and nic_link: raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" " at the same time", errors.ECODE_INVAL) elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE: nic_dict['bridge'] = None elif nic_link and nic_link.lower() == constants.VALUE_NONE: - nic_dict['link'] = None + nic_dict[constants.INIC_LINK] = None if nic_op == constants.DDM_ADD: - nic_mac = nic_dict.get('mac', None) + nic_mac = nic_dict.get(constants.INIC_MAC, None) if nic_mac is None: - nic_dict['mac'] = constants.VALUE_AUTO + nic_dict[constants.INIC_MAC] = constants.VALUE_AUTO - if 'mac' in nic_dict: - nic_mac = nic_dict['mac'] + if constants.INIC_MAC in nic_dict: + nic_mac = nic_dict[constants.INIC_MAC] if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): nic_mac = utils.NormalizeAndValidateMac(nic_mac) @@ -9150,12 +10192,12 @@ class LUInstanceSetParams(LogicalUnit): this_nic_override = nic_override[idx] else: this_nic_override = {} - if 'ip' in this_nic_override: - ip = this_nic_override['ip'] + if constants.INIC_IP in this_nic_override: + ip = this_nic_override[constants.INIC_IP] else: ip = nic.ip - if 'mac' in this_nic_override: - 
mac = this_nic_override['mac'] + if constants.INIC_MAC in this_nic_override: + mac = this_nic_override[constants.INIC_MAC] else: mac = nic.mac if idx in self.nic_pnew: @@ -9166,8 +10208,8 @@ class LUInstanceSetParams(LogicalUnit): link = nicparams[constants.NIC_LINK] args['nics'].append((ip, mac, mode, link)) if constants.DDM_ADD in nic_override: - ip = nic_override[constants.DDM_ADD].get('ip', None) - mac = nic_override[constants.DDM_ADD]['mac'] + ip = nic_override[constants.DDM_ADD].get(constants.INIC_IP, None) + mac = nic_override[constants.DDM_ADD][constants.INIC_MAC] nicparams = self.nic_pnew[constants.DDM_ADD] mode = nicparams[constants.NIC_MODE] link = nicparams[constants.NIC_LINK] @@ -9178,8 +10220,15 @@ class LUInstanceSetParams(LogicalUnit): env = _BuildInstanceHookEnvByObject(self, self.instance, override=args) if self.op.disk_template: env["NEW_DISK_TEMPLATE"] = self.op.disk_template + + return env + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) - return env, nl, nl + return (nl, nl) def CheckPrereq(self): """Check prerequisites. @@ -9216,7 +10265,7 @@ class LUInstanceSetParams(LogicalUnit): self.op.disk_template), errors.ECODE_INVAL) _CheckInstanceDown(self, instance, "cannot change disk template") - if self.op.disk_template in constants.DTS_NET_MIRROR: + if self.op.disk_template in constants.DTS_INT_MIRROR: if self.op.remote_node == pnode: raise errors.OpPrereqError("Given new secondary node %s is the same" " as the primary node of the instance" % @@ -9225,7 +10274,8 @@ class LUInstanceSetParams(LogicalUnit): _CheckNodeNotDrained(self, self.op.remote_node) # FIXME: here we assume that the old instance type is DT_PLAIN assert instance.disk_template == constants.DT_PLAIN - disks = [{"size": d.size, "vg": d.logical_id[0]} + disks = [{constants.IDISK_SIZE: d.size, + constants.IDISK_VG: d.logical_id[0]} for d in instance.disks] required = _ComputeDiskSizePerVG(self.op.disk_template, disks) _CheckNodesFreeDiskPerVG(self, [self.op.remote_node], required) @@ -9255,6 +10305,7 @@ class LUInstanceSetParams(LogicalUnit): self.be_inst = i_bedict # the new dict (without defaults) else: self.be_new = self.be_inst = {} + be_old = cluster.FillBE(instance) # osparams processing if self.op.osparams: @@ -9266,7 +10317,8 @@ class LUInstanceSetParams(LogicalUnit): self.warn = [] - if constants.BE_MEMORY in self.op.beparams and not self.op.force: + if (constants.BE_MEMORY in self.op.beparams and not self.op.force and + be_new[constants.BE_MEMORY] > be_old[constants.BE_MEMORY]): mem_check_list = [pnode] if be_new[constants.BE_AUTO_BALANCE]: # either we changed auto_balance to yes or it was from before @@ -9307,16 +10359,17 @@ class LUInstanceSetParams(LogicalUnit): for node, nres in nodeinfo.items(): if node not in instance.secondary_nodes: continue - msg = nres.fail_msg - if msg: - self.warn.append("Can't get info from secondary node %s: %s" % - (node, msg)) - elif not isinstance(nres.payload.get('memory_free', None), int): - self.warn.append("Secondary node %s didn't return free" - " memory information" % node) + nres.Raise("Can't get info from secondary node %s" % node, + prereq=True, ecode=errors.ECODE_STATE) + if not isinstance(nres.payload.get('memory_free', None), int): + raise errors.OpPrereqError("Secondary node %s didn't return free" + " memory information" % node, + errors.ECODE_STATE) elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']: - self.warn.append("Not enough memory to failover instance to" 
- " secondary node %s" % node) + raise errors.OpPrereqError("This change will prevent the instance" + " from failover to its secondary node" + " %s, due to not enough memory" % node, + errors.ECODE_STATE) # NIC processing self.nic_pnew = {} @@ -9370,21 +10423,22 @@ class LUInstanceSetParams(LogicalUnit): else: raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON) if new_nic_mode == constants.NIC_MODE_ROUTED: - if 'ip' in nic_dict: - nic_ip = nic_dict['ip'] + if constants.INIC_IP in nic_dict: + nic_ip = nic_dict[constants.INIC_IP] else: nic_ip = old_nic_ip if nic_ip is None: raise errors.OpPrereqError('Cannot set the nic ip to None' ' on a routed nic', errors.ECODE_INVAL) - if 'mac' in nic_dict: - nic_mac = nic_dict['mac'] + if constants.INIC_MAC in nic_dict: + nic_mac = nic_dict[constants.INIC_MAC] if nic_mac is None: raise errors.OpPrereqError('Cannot set the nic mac to None', errors.ECODE_INVAL) elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): # otherwise generate the mac - nic_dict['mac'] = self.cfg.GenerateMAC(self.proc.GetECId()) + nic_dict[constants.INIC_MAC] = \ + self.cfg.GenerateMAC(self.proc.GetECId()) else: # or validate/reserve the current one try: @@ -9431,7 +10485,9 @@ class LUInstanceSetParams(LogicalUnit): snode = self.op.remote_node # create a fake disk info for _GenerateDiskTemplate - disk_info = [{"size": d.size, "mode": d.mode} for d in instance.disks] + disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode, + constants.IDISK_VG: d.logical_id[0]} + for d in instance.disks] new_disks = _GenerateDiskTemplate(self, self.op.disk_template, instance.name, pnode, [snode], disk_info, None, None, 0, feedback_fn) @@ -9465,7 +10521,8 @@ class LUInstanceSetParams(LogicalUnit): self.cfg.Update(instance, feedback_fn) # disks are created, waiting for sync - disk_abort = not _WaitForSync(self, instance) + disk_abort = not _WaitForSync(self, instance, + oneshot=not self.op.wait_for_sync) if disk_abort: raise errors.OpExecError("There are some degraded disks for" " this instance, please cleanup manually") @@ -9538,7 +10595,8 @@ class LUInstanceSetParams(LogicalUnit): result.append(("disk/%d" % device_idx, "remove")) elif disk_op == constants.DDM_ADD: # add a new disk - if instance.disk_template == constants.DT_FILE: + if instance.disk_template in (constants.DT_FILE, + constants.DT_SHARED_FILE): file_driver, file_path = instance.disks[0].logical_id file_path = os.path.dirname(file_path) else: @@ -9572,8 +10630,9 @@ class LUInstanceSetParams(LogicalUnit): (new_disk.size, new_disk.mode))) else: # change a given disk - instance.disks[disk_op].mode = disk_dict['mode'] - result.append(("disk.mode/%d" % disk_op, disk_dict['mode'])) + instance.disks[disk_op].mode = disk_dict[constants.IDISK_MODE] + result.append(("disk.mode/%d" % disk_op, + disk_dict[constants.IDISK_MODE])) if self.op.disk_template: r_shut = _ShutdownInstanceDisks(self, instance) @@ -9596,8 +10655,8 @@ class LUInstanceSetParams(LogicalUnit): result.append(("nic.%d" % len(instance.nics), "remove")) elif nic_op == constants.DDM_ADD: # mac and bridge should be set, by now - mac = nic_dict['mac'] - ip = nic_dict.get('ip', None) + mac = nic_dict[constants.INIC_MAC] + ip = nic_dict.get(constants.INIC_IP, None) nicparams = self.nic_pinst[constants.DDM_ADD] new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams) instance.nics.append(new_nic) @@ -9608,7 +10667,7 @@ class LUInstanceSetParams(LogicalUnit): self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK] ))) else: - for key in 'mac', 'ip': + for 
key in (constants.INIC_MAC, constants.INIC_IP): if key in nic_dict: setattr(instance.nics[nic_op], key, nic_dict[key]) if nic_op in self.nic_pinst: @@ -9672,7 +10731,7 @@ class LUBackupQuery(NoHooksLU): that node. """ - self.nodes = self.acquired_locks[locking.LEVEL_NODE] + self.nodes = self.glm.list_owned(locking.LEVEL_NODE) rpcresult = self.rpc.call_export_list(self.nodes) result = {} for node in rpcresult: @@ -9795,12 +10854,18 @@ class LUBackupExport(LogicalUnit): env.update(_BuildInstanceHookEnvByObject(self, self.instance)) + return env + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ nl = [self.cfg.GetMasterNode(), self.instance.primary_node] if self.op.mode == constants.EXPORT_MODE_LOCAL: nl.append(self.op.target_node) - return env, nl, nl + return (nl, nl) def CheckPrereq(self): """Check prerequisites. @@ -10048,7 +11113,7 @@ class LUBackupRemove(NoHooksLU): fqdn_warn = True instance_name = self.op.instance_name - locked_nodes = self.acquired_locks[locking.LEVEL_NODE] + locked_nodes = self.glm.list_owned(locking.LEVEL_NODE) exportlist = self.rpc.call_export_list(locked_nodes) found = False for node in exportlist: @@ -10110,11 +11175,16 @@ class LUGroupAdd(LogicalUnit): """Build hooks env. """ - env = { + return { "GROUP_NAME": self.op.group_name, } + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ mn = self.cfg.GetMasterNode() - return env, [mn], [mn] + return ([mn], [mn]) def Exec(self, feedback_fn): """Add the node group to the cluster. @@ -10142,20 +11212,40 @@ class LUGroupAssignNodes(NoHooksLU): # We want to lock all the affected nodes and groups. We have readily # available the list of nodes, and the *destination* group. To gather the - # list of "source" groups, we need to fetch node information. - self.node_data = self.cfg.GetAllNodesInfo() - affected_groups = set(self.node_data[node].group for node in self.op.nodes) - affected_groups.add(self.group_uuid) - + # list of "source" groups, we need to fetch node information later on. self.needed_locks = { - locking.LEVEL_NODEGROUP: list(affected_groups), + locking.LEVEL_NODEGROUP: set([self.group_uuid]), locking.LEVEL_NODE: self.op.nodes, } + def DeclareLocks(self, level): + if level == locking.LEVEL_NODEGROUP: + assert len(self.needed_locks[locking.LEVEL_NODEGROUP]) == 1 + + # Try to get all affected nodes' groups without having the group or node + # lock yet. Needs verification later in the code flow. + groups = self.cfg.GetNodeGroupsFromNodes(self.op.nodes) + + self.needed_locks[locking.LEVEL_NODEGROUP].update(groups) + def CheckPrereq(self): """Check prerequisites. 
""" + assert self.needed_locks[locking.LEVEL_NODEGROUP] + assert (frozenset(self.glm.list_owned(locking.LEVEL_NODE)) == + frozenset(self.op.nodes)) + + expected_locks = (set([self.group_uuid]) | + self.cfg.GetNodeGroupsFromNodes(self.op.nodes)) + actual_locks = self.glm.list_owned(locking.LEVEL_NODEGROUP) + if actual_locks != expected_locks: + raise errors.OpExecError("Nodes changed groups since locks were acquired," + " current groups are '%s', used to be '%s'" % + (utils.CommaJoin(expected_locks), + utils.CommaJoin(actual_locks))) + + self.node_data = self.cfg.GetAllNodesInfo() self.group = self.cfg.GetNodeGroup(self.group_uuid) instance_data = self.cfg.GetAllInstancesInfo() @@ -10181,7 +11271,7 @@ class LUGroupAssignNodes(NoHooksLU): if previous_splits: self.LogWarning("In addition, these already-split instances continue" - " to be spit across groups: %s", + " to be split across groups: %s", utils.CommaJoin(utils.NiceSort(previous_splits))) def Exec(self, feedback_fn): @@ -10191,6 +11281,9 @@ class LUGroupAssignNodes(NoHooksLU): for node in self.op.nodes: self.node_data[node].group = self.group_uuid + # FIXME: Depends on side-effects of modifying the result of + # C{cfg.GetAllNodesInfo} + self.cfg.Update(self.group, feedback_fn) # Saves all modified nodes. @staticmethod @@ -10204,7 +11297,7 @@ class LUGroupAssignNodes(NoHooksLU): In particular, it returns information about newly split instances, and instances that were already split, and remain so after the change. - Only instances whose disk template is listed in constants.DTS_NET_MIRROR are + Only instances whose disk template is listed in constants.DTS_INT_MIRROR are considered. @type changes: list of (node_name, new_group_uuid) pairs. @@ -10227,7 +11320,7 @@ class LUGroupAssignNodes(NoHooksLU): return [instance.primary_node] + list(instance.secondary_nodes) for inst in instance_data.values(): - if inst.disk_template not in constants.DTS_NET_MIRROR: + if inst.disk_template not in constants.DTS_INT_MIRROR: continue instance_nodes = InstanceNodes(inst) @@ -10244,7 +11337,6 @@ class LUGroupAssignNodes(NoHooksLU): class _GroupQuery(_QueryBase): - FIELDS = query.GROUP_FIELDS def ExpandNames(self, lu): @@ -10328,7 +11420,8 @@ class LUGroupQuery(NoHooksLU): REQ_BGL = False def CheckArguments(self): - self.gq = _GroupQuery(self.op.names, self.op.output_fields, False) + self.gq = _GroupQuery(qlang.MakeSimpleFilter("name", self.op.names), + self.op.output_fields, False) def ExpandNames(self): self.gq.ExpandNames(self) @@ -10382,12 +11475,17 @@ class LUGroupSetParams(LogicalUnit): """Build hooks env. """ - env = { + return { "GROUP_NAME": self.op.group_name, "NEW_ALLOC_POLICY": self.op.alloc_policy, } + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ mn = self.cfg.GetMasterNode() - return env, [mn], [mn] + return ([mn], [mn]) def Exec(self, feedback_fn): """Modifies the node group. @@ -10450,11 +11548,16 @@ class LUGroupRemove(LogicalUnit): """Build hooks env. """ - env = { + return { "GROUP_NAME": self.op.group_name, } + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ mn = self.cfg.GetMasterNode() - return env, [mn], [mn] + return ([mn], [mn]) def Exec(self, feedback_fn): """Remove the node group. 
@@ -10476,7 +11579,7 @@ class LUGroupRename(LogicalUnit): def ExpandNames(self): # This raises errors.OpPrereqError on its own: - self.group_uuid = self.cfg.LookupNodeGroup(self.op.old_name) + self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name) self.needed_locks = { locking.LEVEL_NODEGROUP: [self.group_uuid], @@ -10485,8 +11588,7 @@ class LUGroupRename(LogicalUnit): def CheckPrereq(self): """Check prerequisites. - This checks that the given old_name exists as a node group, and that - new_name doesn't. + Ensures requested new name is not yet used. """ try: @@ -10503,21 +11605,25 @@ class LUGroupRename(LogicalUnit): """Build hooks env. """ - env = { - "OLD_NAME": self.op.old_name, + return { + "OLD_NAME": self.op.group_name, "NEW_NAME": self.op.new_name, } + def BuildHooksNodes(self): + """Build hooks nodes. + + """ mn = self.cfg.GetMasterNode() + all_nodes = self.cfg.GetAllNodesInfo() - run_nodes = [mn] all_nodes.pop(mn, None) - for node in all_nodes.values(): - if node.group == self.group_uuid: - run_nodes.append(node.name) + run_nodes = [mn] + run_nodes.extend(node.name for node in all_nodes.values() + if node.group == self.group_uuid) - return env, run_nodes, run_nodes + return (run_nodes, run_nodes) def Exec(self, feedback_fn): """Rename the node group. @@ -10527,7 +11633,7 @@ class LUGroupRename(LogicalUnit): if group is None: raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" % - (self.op.old_name, self.group_uuid)) + (self.op.group_name, self.group_uuid)) group.name = self.op.new_name self.cfg.Update(group, feedback_fn) @@ -10541,8 +11647,8 @@ class TagsLU(NoHooksLU): # pylint: disable-msg=W0223 This is an abstract class which is the parent of all the other tags LUs. """ - def ExpandNames(self): + self.group_uuid = None self.needed_locks = {} if self.op.kind == constants.TAG_NODE: self.op.name = _ExpandNodeName(self.cfg, self.op.name) @@ -10550,6 +11656,8 @@ class TagsLU(NoHooksLU): # pylint: disable-msg=W0223 elif self.op.kind == constants.TAG_INSTANCE: self.op.name = _ExpandInstanceName(self.cfg, self.op.name) self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name + elif self.op.kind == constants.TAG_NODEGROUP: + self.group_uuid = self.cfg.LookupNodeGroup(self.op.name) # FIXME: Acquire BGL for cluster tag operations (as of this writing it's # not possible to acquire the BGL based on opcode parameters) @@ -10564,6 +11672,8 @@ class TagsLU(NoHooksLU): # pylint: disable-msg=W0223 self.target = self.cfg.GetNodeInfo(self.op.name) elif self.op.kind == constants.TAG_INSTANCE: self.target = self.cfg.GetInstanceInfo(self.op.name) + elif self.op.kind == constants.TAG_NODEGROUP: + self.target = self.cfg.GetNodeGroup(self.group_uuid) else: raise errors.OpPrereqError("Wrong tag type requested (%s)" % str(self.op.kind), errors.ECODE_INVAL) @@ -10619,6 +11729,8 @@ class LUTagsSearch(NoHooksLU): tgts.extend([("/instances/%s" % i.name, i) for i in ilist]) nlist = cfg.GetAllNodesInfo().values() tgts.extend([("/nodes/%s" % n.name, n) for n in nlist]) + tgts.extend(("/nodegroup/%s" % n.name, n) + for n in cfg.GetAllNodeGroupsInfo().values()) results = [] for path, target in tgts: for tag in target.GetTags(): @@ -10888,16 +12000,6 @@ class IAllocator(object): """ # pylint: disable-msg=R0902 # lots of instance attributes - _ALLO_KEYS = [ - "name", "mem_size", "disks", "disk_template", - "os", "tags", "nics", "vcpus", "hypervisor", - ] - _RELO_KEYS = [ - "name", "relocate_from", - ] - _EVAC_KEYS = [ - "evac_nodes", - ] def __init__(self, cfg, rpc, mode, **kwargs): self.cfg = 
cfg @@ -10912,22 +12014,20 @@ class IAllocator(object): self.relocate_from = None self.name = None self.evac_nodes = None + self.instances = None + self.reloc_mode = None + self.target_groups = None # computed fields self.required_nodes = None # init result fields self.success = self.info = self.result = None - if self.mode == constants.IALLOCATOR_MODE_ALLOC: - keyset = self._ALLO_KEYS - fn = self._AddNewInstance - elif self.mode == constants.IALLOCATOR_MODE_RELOC: - keyset = self._RELO_KEYS - fn = self._AddRelocateInstance - elif self.mode == constants.IALLOCATOR_MODE_MEVAC: - keyset = self._EVAC_KEYS - fn = self._AddEvacuateNodes - else: + + try: + (fn, keyset, self._result_check) = self._MODE_DATA[self.mode] + except KeyError: raise errors.ProgrammerError("Unknown mode '%s' passed to the" " IAllocator" % self.mode) + for key in kwargs: if key not in keyset: raise errors.ProgrammerError("Invalid input parameter '%s' to" @@ -10938,7 +12038,7 @@ class IAllocator(object): if key not in kwargs: raise errors.ProgrammerError("Missing input parameter '%s' to" " IAllocator" % key) - self._BuildInputData(fn) + self._BuildInputData(compat.partial(fn, self)) def _ComputeClusterData(self): """Compute the generic allocator input data. @@ -10967,7 +12067,8 @@ class IAllocator(object): hypervisor_name = self.hypervisor elif self.mode == constants.IALLOCATOR_MODE_RELOC: hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor - elif self.mode == constants.IALLOCATOR_MODE_MEVAC: + elif self.mode in (constants.IALLOCATOR_MODE_MEVAC, + constants.IALLOCATOR_MODE_MRELOC): hypervisor_name = cluster_info.enabled_hypervisors[0] node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(), @@ -10993,12 +12094,12 @@ class IAllocator(object): """Compute node groups data. 
""" - ng = {} - for guuid, gdata in cfg.GetAllNodeGroupsInfo().items(): - ng[guuid] = { - "name": gdata.name, - "alloc_policy": gdata.alloc_policy, - } + ng = dict((guuid, { + "name": gdata.name, + "alloc_policy": gdata.alloc_policy, + }) + for guuid, gdata in cfg.GetAllNodeGroupsInfo().items()) + return ng @staticmethod @@ -11009,22 +12110,19 @@ class IAllocator(object): @returns: a dict of name: (node dict, node config) """ - node_results = {} - for ninfo in node_cfg.values(): - # fill in static (config-based) values - pnr = { - "tags": list(ninfo.GetTags()), - "primary_ip": ninfo.primary_ip, - "secondary_ip": ninfo.secondary_ip, - "offline": ninfo.offline, - "drained": ninfo.drained, - "master_candidate": ninfo.master_candidate, - "group": ninfo.group, - "master_capable": ninfo.master_capable, - "vm_capable": ninfo.vm_capable, - } - - node_results[ninfo.name] = pnr + # fill in static (config-based) values + node_results = dict((ninfo.name, { + "tags": list(ninfo.GetTags()), + "primary_ip": ninfo.primary_ip, + "secondary_ip": ninfo.secondary_ip, + "offline": ninfo.offline, + "drained": ninfo.drained, + "master_candidate": ninfo.master_candidate, + "group": ninfo.group, + "master_capable": ninfo.master_capable, + "vm_capable": ninfo.vm_capable, + }) + for ninfo in node_cfg.values()) return node_results @@ -11098,11 +12196,12 @@ class IAllocator(object): nic_data = [] for nic in iinfo.nics: filled_params = cluster_info.SimpleFillNIC(nic.nicparams) - nic_dict = {"mac": nic.mac, - "ip": nic.ip, - "mode": filled_params[constants.NIC_MODE], - "link": filled_params[constants.NIC_LINK], - } + nic_dict = { + "mac": nic.mac, + "ip": nic.ip, + "mode": filled_params[constants.NIC_MODE], + "link": filled_params[constants.NIC_LINK], + } if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: nic_dict["bridge"] = filled_params[constants.NIC_LINK] nic_data.append(nic_dict) @@ -11114,7 +12213,9 @@ class IAllocator(object): "os": iinfo.os, "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes), "nics": nic_data, - "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks], + "disks": [{constants.IDISK_SIZE: dsk.size, + constants.IDISK_MODE: dsk.mode} + for dsk in iinfo.disks], "disk_template": iinfo.disk_template, "hypervisor": iinfo.hypervisor, } @@ -11136,10 +12237,11 @@ class IAllocator(object): """ disk_space = _ComputeDiskSize(self.disk_template, self.disks) - if self.disk_template in constants.DTS_NET_MIRROR: + if self.disk_template in constants.DTS_INT_MIRROR: self.required_nodes = 2 else: self.required_nodes = 1 + request = { "name": self.name, "disk_template": self.disk_template, @@ -11152,6 +12254,7 @@ class IAllocator(object): "nics": self.nics, "required_nodes": self.required_nodes, } + return request def _AddRelocateInstance(self): @@ -11169,16 +12272,17 @@ class IAllocator(object): raise errors.ProgrammerError("Unknown instance '%s' passed to" " IAllocator" % self.name) - if instance.disk_template not in constants.DTS_NET_MIRROR: + if instance.disk_template not in constants.DTS_MIRRORED: raise errors.OpPrereqError("Can't relocate non-mirrored instances", errors.ECODE_INVAL) - if len(instance.secondary_nodes) != 1: + if instance.disk_template in constants.DTS_INT_MIRROR and \ + len(instance.secondary_nodes) != 1: raise errors.OpPrereqError("Instance has not exactly one secondary node", errors.ECODE_STATE) self.required_nodes = 1 - disk_sizes = [{'size': disk.size} for disk in instance.disks] + disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in 
instance.disks] disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes) request = { @@ -11198,6 +12302,16 @@ class IAllocator(object): } return request + def _AddMultiRelocate(self): + """Get data for multi-relocate requests. + + """ + return { + "instances": self.instances, + "reloc_mode": self.reloc_mode, + "target_groups": self.target_groups, + } + def _BuildInputData(self, fn): """Build input data structures. @@ -11210,6 +12324,28 @@ class IAllocator(object): self.in_text = serializer.Dump(self.in_data) + _MODE_DATA = { + constants.IALLOCATOR_MODE_ALLOC: + (_AddNewInstance, + ["name", "mem_size", "disks", "disk_template", "os", "tags", "nics", + "vcpus", "hypervisor"], ht.TList), + constants.IALLOCATOR_MODE_RELOC: + (_AddRelocateInstance, ["name", "relocate_from"], ht.TList), + constants.IALLOCATOR_MODE_MEVAC: + (_AddEvacuateNodes, ["evac_nodes"], + ht.TListOf(ht.TAnd(ht.TIsLength(2), + ht.TListOf(ht.TString)))), + constants.IALLOCATOR_MODE_MRELOC: + (_AddMultiRelocate, ["instances", "reloc_mode", "target_groups"], + ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, { + # pylint: disable-msg=E1101 + # Class '...' has no 'OP_ID' member + "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID, + opcodes.OpInstanceMigrate.OP_ID, + opcodes.OpInstanceReplaceDisks.OP_ID]) + })))), + } + def Run(self, name, validate=True, call_fn=None): """Run an instance allocator and return the results. @@ -11250,11 +12386,81 @@ class IAllocator(object): " missing key '%s'" % key) setattr(self, key, rdict[key]) - if not isinstance(rdict["result"], list): - raise errors.OpExecError("Can't parse iallocator results: 'result' key" - " is not a list") + if not self._result_check(self.result): + raise errors.OpExecError("Iallocator returned invalid result," + " expected %s, got %s" % + (self._result_check, self.result), + errors.ECODE_INVAL) + + if self.mode in (constants.IALLOCATOR_MODE_RELOC, + constants.IALLOCATOR_MODE_MEVAC): + node2group = dict((name, ndata["group"]) + for (name, ndata) in self.in_data["nodes"].items()) + + fn = compat.partial(self._NodesToGroups, node2group, + self.in_data["nodegroups"]) + + if self.mode == constants.IALLOCATOR_MODE_RELOC: + assert self.relocate_from is not None + assert self.required_nodes == 1 + + request_groups = fn(self.relocate_from) + result_groups = fn(rdict["result"]) + + if result_groups != request_groups: + raise errors.OpExecError("Groups of nodes returned by iallocator (%s)" + " differ from original groups (%s)" % + (utils.CommaJoin(result_groups), + utils.CommaJoin(request_groups))) + elif self.mode == constants.IALLOCATOR_MODE_MEVAC: + request_groups = fn(self.evac_nodes) + for (instance_name, secnode) in self.result: + result_groups = fn([secnode]) + if result_groups != request_groups: + raise errors.OpExecError("Iallocator returned new secondary node" + " '%s' (group '%s') for instance '%s'" + " which is not in original group '%s'" % + (secnode, utils.CommaJoin(result_groups), + instance_name, + utils.CommaJoin(request_groups))) + else: + raise errors.ProgrammerError("Unhandled mode '%s'" % self.mode) + self.out_data = rdict + @staticmethod + def _NodesToGroups(node2group, groups, nodes): + """Returns a list of unique group names for a list of nodes. 
+ + @type node2group: dict + @param node2group: Map from node name to group UUID + @type groups: dict + @param groups: Group information + @type nodes: list + @param nodes: Node names + + """ + result = set() + + for node in nodes: + try: + group_uuid = node2group[node] + except KeyError: + # Ignore unknown node + pass + else: + try: + group = groups[group_uuid] + except KeyError: + # Can't find group, let's use UUID + group_name = group_uuid + else: + group_name = group["name"] + + result.add(group_name) + + return sorted(result) + class LUTestAllocator(NoHooksLU): """Run allocator tests. @@ -11302,6 +12508,12 @@ class LUTestAllocator(NoHooksLU): if not hasattr(self.op, "evac_nodes"): raise errors.OpPrereqError("Missing attribute 'evac_nodes' on" " opcode input", errors.ECODE_INVAL) + elif self.op.mode == constants.IALLOCATOR_MODE_MRELOC: + if self.op.instances: + self.op.instances = _GetWantedInstances(self, self.op.instances) + else: + raise errors.OpPrereqError("Missing instances to relocate", + errors.ECODE_INVAL) else: raise errors.OpPrereqError("Invalid test allocator mode '%s'" % self.op.mode, errors.ECODE_INVAL) @@ -11341,6 +12553,12 @@ class LUTestAllocator(NoHooksLU): ial = IAllocator(self.cfg, self.rpc, mode=self.op.mode, evac_nodes=self.op.evac_nodes) + elif self.op.mode == constants.IALLOCATOR_MODE_MRELOC: + ial = IAllocator(self.cfg, self.rpc, + mode=self.op.mode, + instances=self.op.instances, + reloc_mode=self.op.reloc_mode, + target_groups=self.op.target_groups) else: raise errors.ProgrammerError("Uncatched mode %s in" " LUTestAllocator.Exec", self.op.mode) @@ -11358,13 +12576,16 @@ _QUERY_IMPL = { constants.QR_INSTANCE: _InstanceQuery, constants.QR_NODE: _NodeQuery, constants.QR_GROUP: _GroupQuery, + constants.QR_OS: _OsQuery, } +assert set(_QUERY_IMPL.keys()) == constants.QR_VIA_OP + def _GetQueryImplementation(name): """Returns the implemtnation for a query type. - @param name: Query type, must be one of L{constants.QR_OP_QUERY} + @param name: Query type, must be one of L{constants.QR_VIA_OP} """ try: