X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/9a6800e144302846f6f381d246a165bffe7f8e76..7ea7bcf6d21e8ebe6d24f0cc6f7852a04a28e202:/lib/cmdlib.py diff --git a/lib/cmdlib.py b/lib/cmdlib.py index c484343..5cf76ad 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -21,7 +21,10 @@ """Module implementing the master-side code.""" -# pylint: disable-msg=W0613,W0201 +# pylint: disable-msg=W0201 + +# W0201 since most LU attributes are defined in CheckPrereq or similar +# functions import os import os.path @@ -47,8 +50,8 @@ class LogicalUnit(object): Subclasses must follow these rules: - implement ExpandNames - - implement CheckPrereq - - implement Exec + - implement CheckPrereq (except when tasklets are used) + - implement Exec (except when tasklets are used) - implement BuildHooksEnv - redefine HPATH and HTYPE - optionally redefine their run requirements: @@ -87,17 +90,21 @@ class LogicalUnit(object): self.recalculate_locks = {} self.__ssh = None # logging - self.LogWarning = processor.LogWarning - self.LogInfo = processor.LogInfo - self.LogStep = processor.LogStep + self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103 + self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103 + self.LogStep = processor.LogStep # pylint: disable-msg=C0103 # support for dry-run self.dry_run_result = None + # Tasklets + self.tasklets = None + for attr_name in self._OP_REQP: attr_val = getattr(op, attr_name, None) if attr_val is None: raise errors.OpPrereqError("Required parameter '%s' missing" % - attr_name) + attr_name, errors.ECODE_INVAL) + self.CheckArguments() def __GetSSH(self): @@ -149,6 +156,10 @@ class LogicalUnit(object): level you can modify self.share_locks, setting a true value (usually 1) for that level. By default locks are not shared. + This function can also define a list of tasklets, which then will be + executed in order instead of the usual LU-level CheckPrereq and Exec + functions, if those are not defined by the LU. 
+ Examples:: # Acquire all nodes and one instance @@ -205,7 +216,13 @@ class LogicalUnit(object): their canonical form if it hasn't been done by ExpandNames before. """ - raise NotImplementedError + if self.tasklets is not None: + for (idx, tl) in enumerate(self.tasklets): + logging.debug("Checking prerequisites for tasklet %s/%s", + idx + 1, len(self.tasklets)) + tl.CheckPrereq() + else: + raise NotImplementedError def Exec(self, feedback_fn): """Execute the LU. @@ -215,7 +232,12 @@ class LogicalUnit(object): code, or expected. """ - raise NotImplementedError + if self.tasklets is not None: + for (idx, tl) in enumerate(self.tasklets): + logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets)) + tl.Exec(feedback_fn) + else: + raise NotImplementedError def BuildHooksEnv(self): """Build hooks environment for this LU. @@ -258,6 +280,9 @@ class LogicalUnit(object): and hook results """ + # API must be kept, thus we ignore the unused argument and could + # be a function warnings + # pylint: disable-msg=W0613,R0201 return lu_result def _ExpandAndLockInstance(self): @@ -278,7 +303,7 @@ class LogicalUnit(object): expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name) if expanded_name is None: raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) + self.op.instance_name, errors.ECODE_NOENT) self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name self.op.instance_name = expanded_name @@ -328,7 +353,7 @@ class LogicalUnit(object): del self.recalculate_locks[locking.LEVEL_NODE] -class NoHooksLU(LogicalUnit): +class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223 """Simple LU which runs no hooks. This LU is intended as a parent for other LogicalUnits which will @@ -338,6 +363,14 @@ class NoHooksLU(LogicalUnit): HPATH = None HTYPE = None + def BuildHooksEnv(self): + """Empty BuildHooksEnv for NoHooksLu. + + This just raises an error. 
+ + """ + assert False, "BuildHooksEnv called for NoHooksLUs" + class Tasklet: """Tasklet base class. @@ -351,6 +384,13 @@ class Tasklet: - Implement Exec """ + def __init__(self, lu): + self.lu = lu + + # Shortcuts + self.cfg = lu.cfg + self.rpc = lu.rpc + def CheckPrereq(self): """Check prerequisites for this tasklets. @@ -391,7 +431,8 @@ def _GetWantedNodes(lu, nodes): """ if not isinstance(nodes, list): - raise errors.OpPrereqError("Invalid argument type 'nodes'") + raise errors.OpPrereqError("Invalid argument type 'nodes'", + errors.ECODE_INVAL) if not nodes: raise errors.ProgrammerError("_GetWantedNodes should only be called with a" @@ -401,7 +442,8 @@ def _GetWantedNodes(lu, nodes): for name in nodes: node = lu.cfg.ExpandNodeName(name) if node is None: - raise errors.OpPrereqError("No such node name '%s'" % name) + raise errors.OpPrereqError("No such node name '%s'" % name, + errors.ECODE_NOENT) wanted.append(node) return utils.NiceSort(wanted) @@ -421,7 +463,8 @@ def _GetWantedInstances(lu, instances): """ if not isinstance(instances, list): - raise errors.OpPrereqError("Invalid argument type 'instances'") + raise errors.OpPrereqError("Invalid argument type 'instances'", + errors.ECODE_INVAL) if instances: wanted = [] @@ -429,7 +472,8 @@ def _GetWantedInstances(lu, instances): for name in instances: instance = lu.cfg.ExpandInstanceName(name) if instance is None: - raise errors.OpPrereqError("No such instance name '%s'" % name) + raise errors.OpPrereqError("No such instance name '%s'" % name, + errors.ECODE_NOENT) wanted.append(instance) else: @@ -453,7 +497,7 @@ def _CheckOutputFields(static, dynamic, selected): delta = f.NonMatching(selected) if delta: raise errors.OpPrereqError("Unknown output fields selected: %s" - % ",".join(delta)) + % ",".join(delta), errors.ECODE_INVAL) def _CheckBooleanOpField(op, name): @@ -466,10 +510,25 @@ def _CheckBooleanOpField(op, name): val = getattr(op, name, None) if not (val is None or isinstance(val, bool)): raise 
errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" % - (name, str(val))) + (name, str(val)), errors.ECODE_INVAL) setattr(op, name, val) +def _CheckGlobalHvParams(params): + """Validates that given hypervisor params are not global ones. + + This will ensure that instances don't get customised versions of + global params. + + """ + used_globals = constants.HVC_GLOBALS.intersection(params) + if used_globals: + msg = ("The following hypervisor parameters are global and cannot" + " be customized at instance level, please modify them at" + " cluster level: %s" % utils.CommaJoin(used_globals)) + raise errors.OpPrereqError(msg, errors.ECODE_INVAL) + + def _CheckNodeOnline(lu, node): """Ensure that a given node is online. @@ -479,7 +538,8 @@ def _CheckNodeOnline(lu, node): """ if lu.cfg.GetNodeInfo(node).offline: - raise errors.OpPrereqError("Can't use offline node %s" % node) + raise errors.OpPrereqError("Can't use offline node %s" % node, + errors.ECODE_INVAL) def _CheckNodeNotDrained(lu, node): @@ -491,7 +551,8 @@ def _CheckNodeNotDrained(lu, node): """ if lu.cfg.GetNodeInfo(node).drained: - raise errors.OpPrereqError("Can't use drained node %s" % node) + raise errors.OpPrereqError("Can't use drained node %s" % node, + errors.ECODE_INVAL) def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, @@ -581,6 +642,7 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, return env + def _NICListToTuple(lu, nics): """Build a list of nic information tuples. @@ -604,6 +666,7 @@ def _NICListToTuple(lu, nics): hooks_nics.append((ip, mac, mode, link)) return hooks_nics + def _BuildInstanceHookEnvByObject(lu, instance, override=None): """Builds instance related env variables for hooks from an object. 
@@ -639,25 +702,36 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None): } if override: args.update(override) - return _BuildInstanceHookEnv(**args) + return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142 -def _AdjustCandidatePool(lu): +def _AdjustCandidatePool(lu, exceptions): """Adjust the candidate pool after node operations. """ - mod_list = lu.cfg.MaintainCandidatePool() + mod_list = lu.cfg.MaintainCandidatePool(exceptions) if mod_list: lu.LogInfo("Promoted nodes to master candidate role: %s", - ", ".join(node.name for node in mod_list)) + utils.CommaJoin(node.name for node in mod_list)) for name in mod_list: lu.context.ReaddNode(name) - mc_now, mc_max = lu.cfg.GetMasterCandidateStats() + mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions) if mc_now > mc_max: lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" % (mc_now, mc_max)) +def _DecideSelfPromotion(lu, exceptions=None): + """Decide whether I should promote myself as a master candidate. + + """ + cp_size = lu.cfg.GetClusterInfo().candidate_pool_size + mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions) + # the new node will increase mc_max with one, so: + mc_should = min(mc_should + 1, cp_size) + return mc_now < mc_should + + def _CheckNicsBridgesExist(lu, target_nics, target_node, profile=constants.PP_DEFAULT): """Check that the brigdes needed by a list of nics exist. 
@@ -671,7 +745,7 @@ def _CheckNicsBridgesExist(lu, target_nics, target_node, if brlist: result = lu.rpc.call_bridges_exist(target_node, brlist) result.Raise("Error checking bridges on destination node '%s'" % - target_node, prereq=True) + target_node, prereq=True, ecode=errors.ECODE_ENVIRON) def _CheckInstanceBridgesExist(lu, instance, node=None): @@ -683,12 +757,128 @@ def _CheckInstanceBridgesExist(lu, instance, node=None): _CheckNicsBridgesExist(lu, instance.nics, node) -class LUDestroyCluster(NoHooksLU): +def _CheckOSVariant(os_obj, name): + """Check whether an OS name conforms to the os variants specification. + + @type os_obj: L{objects.OS} + @param os_obj: OS object to check + @type name: string + @param name: OS name passed by the user, to check for validity + + """ + if not os_obj.supported_variants: + return + try: + variant = name.split("+", 1)[1] + except IndexError: + raise errors.OpPrereqError("OS name must include a variant", + errors.ECODE_INVAL) + + if variant not in os_obj.supported_variants: + raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL) + + +def _GetNodeInstancesInner(cfg, fn): + return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)] + + +def _GetNodeInstances(cfg, node_name): + """Returns a list of all primary and secondary instances on a node. + + """ + + return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes) + + +def _GetNodePrimaryInstances(cfg, node_name): + """Returns primary instances on a node. + + """ + return _GetNodeInstancesInner(cfg, + lambda inst: node_name == inst.primary_node) + + +def _GetNodeSecondaryInstances(cfg, node_name): + """Returns secondary instances on a node. + + """ + return _GetNodeInstancesInner(cfg, + lambda inst: node_name in inst.secondary_nodes) + + +def _GetStorageTypeArgs(cfg, storage_type): + """Returns the arguments for a storage type. 
+ + """ + # Special case for file storage + if storage_type == constants.ST_FILE: + # storage.FileStorage wants a list of storage directories + return [[cfg.GetFileStorageDir()]] + + return [] + + +def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq): + faulty = [] + + for dev in instance.disks: + cfg.SetDiskID(dev, node_name) + + result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks) + result.Raise("Failed to get disk status from node %s" % node_name, + prereq=prereq, ecode=errors.ECODE_ENVIRON) + + for idx, bdev_status in enumerate(result.payload): + if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY: + faulty.append(idx) + + return faulty + + +class LUPostInitCluster(LogicalUnit): + """Logical unit for running hooks after cluster initialization. + + """ + HPATH = "cluster-init" + HTYPE = constants.HTYPE_CLUSTER + _OP_REQP = [] + + def BuildHooksEnv(self): + """Build hooks env. + + """ + env = {"OP_TARGET": self.cfg.GetClusterName()} + mn = self.cfg.GetMasterNode() + return env, [], [mn] + + def CheckPrereq(self): + """No prerequisites to check. + + """ + return True + + def Exec(self, feedback_fn): + """Nothing to do. + + """ + return True + + +class LUDestroyCluster(LogicalUnit): """Logical unit for destroying the cluster. """ + HPATH = "cluster-destroy" + HTYPE = constants.HTYPE_CLUSTER _OP_REQP = [] + def BuildHooksEnv(self): + """Build hooks env. + + """ + env = {"OP_TARGET": self.cfg.GetClusterName()} + return env, [], [] + def CheckPrereq(self): """Check prerequisites. @@ -702,22 +892,37 @@ class LUDestroyCluster(NoHooksLU): nodelist = self.cfg.GetNodeList() if len(nodelist) != 1 or nodelist[0] != master: raise errors.OpPrereqError("There are still %d node(s) in" - " this cluster." % (len(nodelist) - 1)) + " this cluster." % (len(nodelist) - 1), + errors.ECODE_INVAL) instancelist = self.cfg.GetInstanceList() if instancelist: raise errors.OpPrereqError("There are still %d instance(s) in" - " this cluster." 
% len(instancelist)) + " this cluster." % len(instancelist), + errors.ECODE_INVAL) def Exec(self, feedback_fn): """Destroys the cluster. """ master = self.cfg.GetMasterNode() + modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup + + # Run post hooks on master node before it's removed + hm = self.proc.hmclass(self.rpc.call_hooks_runner, self) + try: + hm.RunPhase(constants.HOOKS_PHASE_POST, [master]) + except: + # pylint: disable-msg=W0702 + self.LogWarning("Errors occurred running hooks on %s" % master) + result = self.rpc.call_node_stop_master(master, False) result.Raise("Could not disable the master role") - priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS) - utils.CreateBackup(priv_key) - utils.CreateBackup(pub_key) + + if modify_ssh_setup: + priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS) + utils.CreateBackup(priv_key) + utils.CreateBackup(pub_key) + return master @@ -727,9 +932,39 @@ class LUVerifyCluster(LogicalUnit): """ HPATH = "cluster-verify" HTYPE = constants.HTYPE_CLUSTER - _OP_REQP = ["skip_checks"] + _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"] REQ_BGL = False + TCLUSTER = "cluster" + TNODE = "node" + TINSTANCE = "instance" + + ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG") + EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE") + EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN") + EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT") + EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK") + EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK") + EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE") + ENODEDRBD = (TNODE, "ENODEDRBD") + ENODEFILECHECK = (TNODE, "ENODEFILECHECK") + ENODEHOOKS = (TNODE, "ENODEHOOKS") + ENODEHV = (TNODE, "ENODEHV") + ENODELVM = (TNODE, "ENODELVM") + ENODEN1 = (TNODE, "ENODEN1") + ENODENET = (TNODE, "ENODENET") + ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE") + ENODEORPHANLV = (TNODE, "ENODEORPHANLV") + ENODERPC = (TNODE, "ENODERPC") + ENODESSH = (TNODE, 
"ENODESSH") + ENODEVERSION = (TNODE, "ENODEVERSION") + ENODESETUP = (TNODE, "ENODESETUP") + ENODETIME = (TNODE, "ENODETIME") + + ETYPE_FIELD = "code" + ETYPE_ERROR = "ERROR" + ETYPE_WARNING = "WARNING" + def ExpandNames(self): self.needed_locks = { locking.LEVEL_NODE: locking.ALL_SET, @@ -737,9 +972,45 @@ class LUVerifyCluster(LogicalUnit): } self.share_locks = dict.fromkeys(locking.LEVELS, 1) + def _Error(self, ecode, item, msg, *args, **kwargs): + """Format an error message. + + Based on the opcode's error_codes parameter, either format a + parseable error code, or a simpler error string. + + This must be called only from Exec and functions called from Exec. + + """ + ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) + itype, etxt = ecode + # first complete the msg + if args: + msg = msg % args + # then format the whole message + if self.op.error_codes: + msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg) + else: + if item: + item = " " + item + else: + item = "" + msg = "%s: %s%s: %s" % (ltype, itype, item, msg) + # and finally report it via the feedback_fn + self._feedback_fn(" - %s" % msg) + + def _ErrorIf(self, cond, *args, **kwargs): + """Log an error message if the passed condition is True. + + """ + cond = bool(cond) or self.op.debug_simulate_errors + if cond: + self._Error(*args, **kwargs) + # do not mark the operation as failed for WARN cases only + if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR: + self.bad = self.bad or cond + def _VerifyNode(self, nodeinfo, file_list, local_cksum, - node_result, feedback_fn, master_files, - drbd_map, vg_name): + node_result, master_files, drbd_map, vg_name): """Run multiple tests against a node. 
Test list: @@ -754,7 +1025,6 @@ class LUVerifyCluster(LogicalUnit): @param file_list: required list of files @param local_cksum: dictionary of local files and their checksums @param node_result: the results from the node - @param feedback_fn: function used to accumulate results @param master_files: list of files that only masters should have @param drbd_map: the useddrbd minors for this node, in form of minor: (instance, must_exist) which correspond to instances @@ -763,138 +1033,154 @@ class LUVerifyCluster(LogicalUnit): """ node = nodeinfo.name + _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 # main result, node_result should be a non-empty dict - if not node_result or not isinstance(node_result, dict): - feedback_fn(" - ERROR: unable to verify node %s." % (node,)) - return True + test = not node_result or not isinstance(node_result, dict) + _ErrorIf(test, self.ENODERPC, node, + "unable to verify node: no data returned") + if test: + return # compares ganeti version local_version = constants.PROTOCOL_VERSION remote_version = node_result.get('version', None) - if not (remote_version and isinstance(remote_version, (list, tuple)) and - len(remote_version) == 2): - feedback_fn(" - ERROR: connection to %s failed" % (node)) - return True - - if local_version != remote_version[0]: - feedback_fn(" - ERROR: incompatible protocol versions: master %s," - " node %s %s" % (local_version, node, remote_version[0])) - return True + test = not (remote_version and + isinstance(remote_version, (list, tuple)) and + len(remote_version) == 2) + _ErrorIf(test, self.ENODERPC, node, + "connection to node returned invalid data") + if test: + return + + test = local_version != remote_version[0] + _ErrorIf(test, self.ENODEVERSION, node, + "incompatible protocol versions: master %s," + " node %s", local_version, remote_version[0]) + if test: + return # node seems compatible, we can actually try to look into its results - bad = False - # full package version - if 
constants.RELEASE_VERSION != remote_version[1]: - feedback_fn(" - WARNING: software version mismatch: master %s," - " node %s %s" % - (constants.RELEASE_VERSION, node, remote_version[1])) + self._ErrorIf(constants.RELEASE_VERSION != remote_version[1], + self.ENODEVERSION, node, + "software version mismatch: master %s, node %s", + constants.RELEASE_VERSION, remote_version[1], + code=self.ETYPE_WARNING) # checks vg existence and size > 20G if vg_name is not None: vglist = node_result.get(constants.NV_VGLIST, None) - if not vglist: - feedback_fn(" - ERROR: unable to check volume groups on node %s." % - (node,)) - bad = True - else: + test = not vglist + _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups") + if not test: vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name, constants.MIN_VG_SIZE) - if vgstatus: - feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node)) - bad = True + _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus) # checks config file checksum remote_cksum = node_result.get(constants.NV_FILELIST, None) - if not isinstance(remote_cksum, dict): - bad = True - feedback_fn(" - ERROR: node hasn't returned file checksum data") - else: + test = not isinstance(remote_cksum, dict) + _ErrorIf(test, self.ENODEFILECHECK, node, + "node hasn't returned file checksum data") + if not test: for file_name in file_list: node_is_mc = nodeinfo.master_candidate - must_have_file = file_name not in master_files - if file_name not in remote_cksum: - if node_is_mc or must_have_file: - bad = True - feedback_fn(" - ERROR: file '%s' missing" % file_name) - elif remote_cksum[file_name] != local_cksum[file_name]: - if node_is_mc or must_have_file: - bad = True - feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name) - else: - # not candidate and this is not a must-have file - bad = True - feedback_fn(" - ERROR: file '%s' should not exist on non master" - " candidates (and the file is outdated)" % file_name) - else: - # all good, except 
non-master/non-must have combination - if not node_is_mc and not must_have_file: - feedback_fn(" - ERROR: file '%s' should not exist on non master" - " candidates" % file_name) + must_have = (file_name not in master_files) or node_is_mc + # missing + test1 = file_name not in remote_cksum + # invalid checksum + test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name] + # existing and good + test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name] + _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node, + "file '%s' missing", file_name) + _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node, + "file '%s' has wrong checksum", file_name) + # not candidate and this is not a must-have file + _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node, + "file '%s' should not exist on non master" + " candidates (and the file is outdated)", file_name) + # all good, except non-master/non-must have combination + _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node, + "file '%s' should not exist" + " on non master candidates", file_name) # checks ssh to any - if constants.NV_NODELIST not in node_result: - bad = True - feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data") - else: + test = constants.NV_NODELIST not in node_result + _ErrorIf(test, self.ENODESSH, node, + "node hasn't returned node ssh connectivity data") + if not test: if node_result[constants.NV_NODELIST]: - bad = True - for node in node_result[constants.NV_NODELIST]: - feedback_fn(" - ERROR: ssh communication with node '%s': %s" % - (node, node_result[constants.NV_NODELIST][node])) - - if constants.NV_NODENETTEST not in node_result: - bad = True - feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data") - else: + for a_node, a_msg in node_result[constants.NV_NODELIST].items(): + _ErrorIf(True, self.ENODESSH, node, + "ssh communication with node '%s': %s", a_node, a_msg) + + test = constants.NV_NODENETTEST not in node_result + 
_ErrorIf(test, self.ENODENET, node, + "node hasn't returned node tcp connectivity data") + if not test: if node_result[constants.NV_NODENETTEST]: - bad = True nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys()) - for node in nlist: - feedback_fn(" - ERROR: tcp communication with node '%s': %s" % - (node, node_result[constants.NV_NODENETTEST][node])) + for anode in nlist: + _ErrorIf(True, self.ENODENET, node, + "tcp communication with node '%s': %s", + anode, node_result[constants.NV_NODENETTEST][anode]) hyp_result = node_result.get(constants.NV_HYPERVISOR, None) if isinstance(hyp_result, dict): for hv_name, hv_result in hyp_result.iteritems(): - if hv_result is not None: - feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" % - (hv_name, hv_result)) + test = hv_result is not None + _ErrorIf(test, self.ENODEHV, node, + "hypervisor %s verify failure: '%s'", hv_name, hv_result) # check used drbd list if vg_name is not None: used_minors = node_result.get(constants.NV_DRBDLIST, []) - if not isinstance(used_minors, (tuple, list)): - feedback_fn(" - ERROR: cannot parse drbd status file: %s" % - str(used_minors)) - else: + test = not isinstance(used_minors, (tuple, list)) + _ErrorIf(test, self.ENODEDRBD, node, + "cannot parse drbd status file: %s", str(used_minors)) + if not test: for minor, (iname, must_exist) in drbd_map.items(): - if minor not in used_minors and must_exist: - feedback_fn(" - ERROR: drbd minor %d of instance %s is" - " not active" % (minor, iname)) - bad = True + test = minor not in used_minors and must_exist + _ErrorIf(test, self.ENODEDRBD, node, + "drbd minor %d of instance %s is not active", + minor, iname) for minor in used_minors: - if minor not in drbd_map: - feedback_fn(" - ERROR: unallocated drbd minor %d is in use" % - minor) - bad = True - - return bad + test = minor not in drbd_map + _ErrorIf(test, self.ENODEDRBD, node, + "unallocated drbd minor %d is in use", minor) + test = node_result.get(constants.NV_NODESETUP, + 
["Missing NODESETUP results"]) + _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s", + "; ".join(test)) + + # check pv names + if vg_name is not None: + pvlist = node_result.get(constants.NV_PVLIST, None) + test = pvlist is None + _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node") + if not test: + # check that ':' is not present in PV names, since it's a + # special character for lvcreate (denotes the range of PEs to + # use on the PV) + for _, pvname, owner_vg in pvlist: + test = ":" in pvname + _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV" + " '%s' of VG '%s'", pvname, owner_vg) def _VerifyInstance(self, instance, instanceconfig, node_vol_is, - node_instance, feedback_fn, n_offline): + node_instance, n_offline): """Verify an instance. This function checks to see if the required block devices are available on the instance's node. """ - bad = False - + _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 node_current = instanceconfig.primary_node node_vol_should = {} @@ -905,69 +1191,57 @@ class LUVerifyCluster(LogicalUnit): # ignore missing volumes on offline nodes continue for volume in node_vol_should[node]: - if node not in node_vol_is or volume not in node_vol_is[node]: - feedback_fn(" - ERROR: volume %s missing on node %s" % - (volume, node)) - bad = True + test = node not in node_vol_is or volume not in node_vol_is[node] + _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance, + "volume %s missing on node %s", volume, node) if instanceconfig.admin_up: - if ((node_current not in node_instance or - not instance in node_instance[node_current]) and - node_current not in n_offline): - feedback_fn(" - ERROR: instance %s not running on node %s" % - (instance, node_current)) - bad = True + test = ((node_current not in node_instance or + not instance in node_instance[node_current]) and + node_current not in n_offline) + _ErrorIf(test, self.EINSTANCEDOWN, instance, + "instance not running on its primary node %s", + 
node_current) for node in node_instance: if (not node == node_current): - if instance in node_instance[node]: - feedback_fn(" - ERROR: instance %s should not run on node %s" % - (instance, node)) - bad = True + test = instance in node_instance[node] + _ErrorIf(test, self.EINSTANCEWRONGNODE, instance, + "instance should not run on node %s", node) - return bad - - def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn): + def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is): """Verify if there are any unknown volumes in the cluster. The .os, .swap and backup volumes are ignored. All other volumes are reported as unknown. """ - bad = False - for node in node_vol_is: for volume in node_vol_is[node]: - if node not in node_vol_should or volume not in node_vol_should[node]: - feedback_fn(" - ERROR: volume %s on node %s should not exist" % - (volume, node)) - bad = True - return bad + test = (node not in node_vol_should or + volume not in node_vol_should[node]) + self._ErrorIf(test, self.ENODEORPHANLV, node, + "volume %s is unknown", volume) - def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn): + def _VerifyOrphanInstances(self, instancelist, node_instance): """Verify the list of running instances. This checks what instances are running but unknown to the cluster. """ - bad = False for node in node_instance: - for runninginstance in node_instance[node]: - if runninginstance not in instancelist: - feedback_fn(" - ERROR: instance %s on node %s should not exist" % - (runninginstance, node)) - bad = True - return bad - - def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn): + for o_inst in node_instance[node]: + test = o_inst not in instancelist + self._ErrorIf(test, self.ENODEORPHANINSTANCE, node, + "instance %s on node %s should not exist", o_inst, node) + + def _VerifyNPlusOneMemory(self, node_info, instance_cfg): """Verify N+1 Memory Resilience. 
Check that if one single node dies we can still start all the instances it was primary for. """ - bad = False - for node, nodeinfo in node_info.iteritems(): # This code checks that every node which is now listed as secondary has # enough memory to host all instances it is supposed to should a single @@ -983,11 +1257,10 @@ class LUVerifyCluster(LogicalUnit): bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance]) if bep[constants.BE_AUTO_BALANCE]: needed_mem += bep[constants.BE_MEMORY] - if nodeinfo['mfree'] < needed_mem: - feedback_fn(" - ERROR: not enough memory on node %s to accommodate" - " failovers should node %s fail" % (node, prinode)) - bad = True - return bad + test = nodeinfo['mfree'] < needed_mem + self._ErrorIf(test, self.ENODEN1, node, + "not enough memory on to accommodate" + " failovers should peer node %s fail", prinode) def CheckPrereq(self): """Check prerequisites. @@ -998,7 +1271,8 @@ class LUVerifyCluster(LogicalUnit): """ self.skip_set = frozenset(self.op.skip_checks) if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set): - raise errors.OpPrereqError("Invalid checks to be skipped specified") + raise errors.OpPrereqError("Invalid checks to be skipped specified", + errors.ECODE_INVAL) def BuildHooksEnv(self): """Build hooks env. @@ -1020,10 +1294,13 @@ class LUVerifyCluster(LogicalUnit): """Verify integrity of cluster, performing various test on nodes. 
""" - bad = False + self.bad = False + _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + verbose = self.op.verbose + self._feedback_fn = feedback_fn feedback_fn("* Verifying global settings") for msg in self.cfg.VerifyConfig(): - feedback_fn(" - ERROR: %s" % msg) + _ErrorIf(True, self.ECLUSTERCFG, None, msg) vg_name = self.cfg.GetVGName() hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors @@ -1064,23 +1341,36 @@ class LUVerifyCluster(LogicalUnit): constants.NV_INSTANCELIST: hypervisors, constants.NV_VERSION: None, constants.NV_HVINFO: self.cfg.GetHypervisorType(), + constants.NV_NODESETUP: None, + constants.NV_TIME: None, } + if vg_name is not None: node_verify_param[constants.NV_VGLIST] = None node_verify_param[constants.NV_LVLIST] = vg_name + node_verify_param[constants.NV_PVLIST] = [vg_name] node_verify_param[constants.NV_DRBDLIST] = None + + # Due to the way our RPC system works, exact response times cannot be + # guaranteed (e.g. a broken node could run into a timeout). By keeping the + # time before and after executing the request, we can at least have a time + # window. 
+ nvinfo_starttime = time.time() all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param, self.cfg.GetClusterName()) + nvinfo_endtime = time.time() cluster = self.cfg.GetClusterInfo() master_node = self.cfg.GetMasterNode() all_drbd_map = self.cfg.ComputeDRBDMap() + feedback_fn("* Verifying node status") for node_i in nodeinfo: node = node_i.name if node_i.offline: - feedback_fn("* Skipping offline node %s" % (node,)) + if verbose: + feedback_fn("* Skipping offline node %s" % (node,)) n_offline.append(node) continue @@ -1093,62 +1383,81 @@ class LUVerifyCluster(LogicalUnit): n_drained.append(node) else: ntype = "regular" - feedback_fn("* Verifying node %s (%s)" % (node, ntype)) + if verbose: + feedback_fn("* Verifying node %s (%s)" % (node, ntype)) msg = all_nvinfo[node].fail_msg + _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg) if msg: - feedback_fn(" - ERROR: while contacting node %s: %s" % (node, msg)) - bad = True continue nresult = all_nvinfo[node].payload node_drbd = {} for minor, instance in all_drbd_map[node].items(): - if instance not in instanceinfo: - feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" % - instance) + test = instance not in instanceinfo + _ErrorIf(test, self.ECLUSTERCFG, None, + "ghost instance '%s' in temporary DRBD map", instance) # ghost instance should not be running, but otherwise we # don't give double warnings (both ghost instance and # unallocated minor in use) + if test: node_drbd[minor] = (instance, False) else: instance = instanceinfo[instance] node_drbd[minor] = (instance.name, instance.admin_up) - result = self._VerifyNode(node_i, file_names, local_checksums, - nresult, feedback_fn, master_files, - node_drbd, vg_name) - bad = bad or result + + self._VerifyNode(node_i, file_names, local_checksums, + nresult, master_files, node_drbd, vg_name) lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data") if vg_name is None: node_volume[node] = {} elif isinstance(lvdata, basestring): - 
feedback_fn(" - ERROR: LVM problem on node %s: %s" % - (node, utils.SafeEncode(lvdata))) - bad = True + _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s", + utils.SafeEncode(lvdata)) node_volume[node] = {} elif not isinstance(lvdata, dict): - feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,)) - bad = True + _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)") continue else: node_volume[node] = lvdata # node_instance idata = nresult.get(constants.NV_INSTANCELIST, None) - if not isinstance(idata, list): - feedback_fn(" - ERROR: connection to %s failed (instancelist)" % - (node,)) - bad = True + test = not isinstance(idata, list) + _ErrorIf(test, self.ENODEHV, node, + "rpc call to node failed (instancelist)") + if test: continue node_instance[node] = idata # node_info nodeinfo = nresult.get(constants.NV_HVINFO, None) - if not isinstance(nodeinfo, dict): - feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,)) - bad = True + test = not isinstance(nodeinfo, dict) + _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)") + if test: + continue + + # Node time + ntime = nresult.get(constants.NV_TIME, None) + try: + ntime_merged = utils.MergeTime(ntime) + except (ValueError, TypeError): + _ErrorIf(test, self.ENODETIME, node, "Node returned invalid time") + + if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW): + ntime_diff = abs(nvinfo_starttime - ntime_merged) + elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW): + ntime_diff = abs(ntime_merged - nvinfo_endtime) + else: + ntime_diff = None + + _ErrorIf(ntime_diff is not None, self.ENODETIME, node, + "Node time diverges by at least %0.1fs from master node time", + ntime_diff) + + if ntime_diff is not None: continue try: @@ -1166,28 +1475,28 @@ class LUVerifyCluster(LogicalUnit): } # FIXME: devise a free space model for file based instances as well if vg_name is not None: - if (constants.NV_VGLIST not in nresult or 
- vg_name not in nresult[constants.NV_VGLIST]): - feedback_fn(" - ERROR: node %s didn't return data for the" - " volume group '%s' - it is either missing or broken" % - (node, vg_name)) - bad = True + test = (constants.NV_VGLIST not in nresult or + vg_name not in nresult[constants.NV_VGLIST]) + _ErrorIf(test, self.ENODELVM, node, + "node didn't return data for the volume group '%s'" + " - it is either missing or broken", vg_name) + if test: continue node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name]) except (ValueError, KeyError): - feedback_fn(" - ERROR: invalid nodeinfo value returned" - " from node %s" % (node,)) - bad = True + _ErrorIf(True, self.ENODERPC, node, + "node returned invalid nodeinfo, check lvm/hypervisor") continue node_vol_should = {} + feedback_fn("* Verifying instance status") for instance in instancelist: - feedback_fn("* Verifying instance %s" % instance) + if verbose: + feedback_fn("* Verifying instance %s" % instance) inst_config = instanceinfo[instance] - result = self._VerifyInstance(instance, inst_config, node_volume, - node_instance, feedback_fn, n_offline) - bad = bad or result + self._VerifyInstance(instance, inst_config, node_volume, + node_instance, n_offline) inst_nodes_offline = [] inst_config.MapLVsByNode(node_vol_should) @@ -1195,12 +1504,11 @@ class LUVerifyCluster(LogicalUnit): instance_cfg[instance] = inst_config pnode = inst_config.primary_node + _ErrorIf(pnode not in node_info and pnode not in n_offline, + self.ENODERPC, pnode, "instance %s, connection to" + " primary node failed", instance) if pnode in node_info: node_info[pnode]['pinst'].append(instance) - elif pnode not in n_offline: - feedback_fn(" - ERROR: instance %s, connection to primary node" - " %s failed" % (instance, pnode)) - bad = True if pnode in n_offline: inst_nodes_offline.append(pnode) @@ -1212,46 +1520,42 @@ class LUVerifyCluster(LogicalUnit): # FIXME: does not support file-backed instances if len(inst_config.secondary_nodes) == 0: 
i_non_redundant.append(instance) - elif len(inst_config.secondary_nodes) > 1: - feedback_fn(" - WARNING: multiple secondaries for instance %s" - % instance) + _ErrorIf(len(inst_config.secondary_nodes) > 1, + self.EINSTANCELAYOUT, instance, + "instance has multiple secondary nodes", code="WARNING") if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]: i_non_a_balanced.append(instance) for snode in inst_config.secondary_nodes: + _ErrorIf(snode not in node_info and snode not in n_offline, + self.ENODERPC, snode, + "instance %s, connection to secondary node" + "failed", instance) + if snode in node_info: node_info[snode]['sinst'].append(instance) if pnode not in node_info[snode]['sinst-by-pnode']: node_info[snode]['sinst-by-pnode'][pnode] = [] node_info[snode]['sinst-by-pnode'][pnode].append(instance) - elif snode not in n_offline: - feedback_fn(" - ERROR: instance %s, connection to secondary node" - " %s failed" % (instance, snode)) - bad = True + if snode in n_offline: inst_nodes_offline.append(snode) - if inst_nodes_offline: - # warn that the instance lives on offline nodes, and set bad=True - feedback_fn(" - ERROR: instance lives on offline node(s) %s" % - ", ".join(inst_nodes_offline)) - bad = True + # warn that the instance lives on offline nodes + _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance, + "instance lives on offline node(s) %s", + utils.CommaJoin(inst_nodes_offline)) feedback_fn("* Verifying orphan volumes") - result = self._VerifyOrphanVolumes(node_vol_should, node_volume, - feedback_fn) - bad = bad or result + self._VerifyOrphanVolumes(node_vol_should, node_volume) feedback_fn("* Verifying remaining instances") - result = self._VerifyOrphanInstances(instancelist, node_instance, - feedback_fn) - bad = bad or result + self._VerifyOrphanInstances(instancelist, node_instance) if constants.VERIFY_NPLUSONE_MEM not in self.skip_set: feedback_fn("* Verifying N+1 Memory redundancy") - result = self._VerifyNPlusOneMemory(node_info, 
instance_cfg, feedback_fn) - bad = bad or result + self._VerifyNPlusOneMemory(node_info, instance_cfg) feedback_fn("* Other Notes") if i_non_redundant: @@ -1268,7 +1572,7 @@ class LUVerifyCluster(LogicalUnit): if n_drained: feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained)) - return not bad + return not self.bad def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result): """Analyze the post-hooks' result @@ -1291,33 +1595,28 @@ class LUVerifyCluster(LogicalUnit): # Used to change hooks' output to proper indentation indent_re = re.compile('^', re.M) feedback_fn("* Hooks Results") - if not hooks_results: - feedback_fn(" - ERROR: general communication failure") - lu_result = 1 - else: - for node_name in hooks_results: - show_node_header = True - res = hooks_results[node_name] - msg = res.fail_msg - if msg: - if res.offline: - # no need to warn or set fail return value - continue - feedback_fn(" Communication failure in hooks execution: %s" % - msg) + assert hooks_results, "invalid result from hooks" + + for node_name in hooks_results: + res = hooks_results[node_name] + msg = res.fail_msg + test = msg and not res.offline + self._ErrorIf(test, self.ENODEHOOKS, node_name, + "Communication failure in hooks execution: %s", msg) + if res.offline or msg: + # No need to investigate payload if node is offline or gave an error. 
+ # override manually lu_result here as _ErrorIf only + # overrides self.bad + lu_result = 1 + continue + for script, hkr, output in res.payload: + test = hkr == constants.HKR_FAIL + self._ErrorIf(test, self.ENODEHOOKS, node_name, + "Script %s failed, output:", script) + if test: + output = indent_re.sub(' ', output) + feedback_fn("%s" % output) lu_result = 1 - continue - for script, hkr, output in res.payload: - if hkr == constants.HKR_FAIL: - # The node header is only shown once, if there are - # failing hooks on that node - if show_node_header: - feedback_fn(" Node %s:" % node_name) - show_node_header = False - feedback_fn(" ERROR: Script %s failed, output:" % script) - output = indent_re.sub(' ', output) - feedback_fn("%s" % output) - lu_result = 1 return lu_result @@ -1389,7 +1688,7 @@ class LUVerifyDisks(NoHooksLU): continue lvs = node_res.payload - for lv_name, (_, lv_inactive, lv_online) in lvs.items(): + for lv_name, (_, _, lv_online) in lvs.items(): inst = nv_dict.pop((node, lv_name), None) if (not lv_online and inst is not None and inst.name not in res_instances): @@ -1405,6 +1704,129 @@ class LUVerifyDisks(NoHooksLU): return result +class LURepairDiskSizes(NoHooksLU): + """Verifies the cluster disks sizes. 
+ + """ + _OP_REQP = ["instances"] + REQ_BGL = False + + def ExpandNames(self): + if not isinstance(self.op.instances, list): + raise errors.OpPrereqError("Invalid argument type 'instances'", + errors.ECODE_INVAL) + + if self.op.instances: + self.wanted_names = [] + for name in self.op.instances: + full_name = self.cfg.ExpandInstanceName(name) + if full_name is None: + raise errors.OpPrereqError("Instance '%s' not known" % name, + errors.ECODE_NOENT) + self.wanted_names.append(full_name) + self.needed_locks = { + locking.LEVEL_NODE: [], + locking.LEVEL_INSTANCE: self.wanted_names, + } + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + else: + self.wanted_names = None + self.needed_locks = { + locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_INSTANCE: locking.ALL_SET, + } + self.share_locks = dict(((i, 1) for i in locking.LEVELS)) + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE and self.wanted_names is not None: + self._LockInstancesNodes(primary_only=True) + + def CheckPrereq(self): + """Check prerequisites. + + This only checks the optional instance list against the existing names. + + """ + if self.wanted_names is None: + self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE] + + self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name + in self.wanted_names] + + def _EnsureChildSizes(self, disk): + """Ensure children of the disk have the needed disk size. + + This is valid mainly for DRBD8 and fixes an issue where the + children have smaller disk size. + + @param disk: an L{ganeti.objects.Disk} object + + """ + if disk.dev_type == constants.LD_DRBD8: + assert disk.children, "Empty children for DRBD8?" 
+ fchild = disk.children[0] + mismatch = fchild.size < disk.size + if mismatch: + self.LogInfo("Child disk has size %d, parent %d, fixing", + fchild.size, disk.size) + fchild.size = disk.size + + # and we recurse on this child only, not on the metadev + return self._EnsureChildSizes(fchild) or mismatch + else: + return False + + def Exec(self, feedback_fn): + """Verify the size of cluster disks. + + """ + # TODO: check child disks too + # TODO: check differences in size between primary/secondary nodes + per_node_disks = {} + for instance in self.wanted_instances: + pnode = instance.primary_node + if pnode not in per_node_disks: + per_node_disks[pnode] = [] + for idx, disk in enumerate(instance.disks): + per_node_disks[pnode].append((instance, idx, disk)) + + changed = [] + for node, dskl in per_node_disks.items(): + newl = [v[2].Copy() for v in dskl] + for dsk in newl: + self.cfg.SetDiskID(dsk, node) + result = self.rpc.call_blockdev_getsizes(node, newl) + if result.fail_msg: + self.LogWarning("Failure in blockdev_getsizes call to node" + " %s, ignoring", node) + continue + if len(result.data) != len(dskl): + self.LogWarning("Invalid result from node %s, ignoring node results", + node) + continue + for ((instance, idx, disk), size) in zip(dskl, result.data): + if size is None: + self.LogWarning("Disk %d of instance %s did not return size" + " information, ignoring", idx, instance.name) + continue + if not isinstance(size, (int, long)): + self.LogWarning("Disk %d of instance %s did not return valid" + " size information, ignoring", idx, instance.name) + continue + size = size >> 20 + if size != disk.size: + self.LogInfo("Disk %d of instance %s has mismatched size," + " correcting: recorded %d, actual %d", idx, + instance.name, disk.size, size) + disk.size = size + self.cfg.Update(instance, feedback_fn) + changed.append((instance.name, idx, size)) + if self._EnsureChildSizes(disk): + self.cfg.Update(instance, feedback_fn) + changed.append((instance.name, idx, 
disk.size)) + return changed + + class LURenameCluster(LogicalUnit): """Rename the cluster. @@ -1422,13 +1844,14 @@ class LURenameCluster(LogicalUnit): "NEW_NAME": self.op.name, } mn = self.cfg.GetMasterNode() - return env, [mn], [mn] + all_nodes = self.cfg.GetNodeList() + return env, [mn], all_nodes def CheckPrereq(self): """Verify that the passed name is a valid one. """ - hostname = utils.HostInfo(self.op.name) + hostname = utils.GetHostInfo(self.op.name) new_name = hostname.name self.ip = new_ip = hostname.ip @@ -1436,12 +1859,13 @@ class LURenameCluster(LogicalUnit): old_ip = self.cfg.GetMasterIP() if new_name == old_name and new_ip == old_ip: raise errors.OpPrereqError("Neither the name nor the IP address of the" - " cluster has changed") + " cluster has changed", + errors.ECODE_INVAL) if new_ip != old_ip: if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT): raise errors.OpPrereqError("The given cluster IP address (%s) is" " reachable on the network. Aborting." % - new_ip) + new_ip, errors.ECODE_NOTUNIQUE) self.op.name = new_name @@ -1461,7 +1885,7 @@ class LURenameCluster(LogicalUnit): cluster = self.cfg.GetClusterInfo() cluster.cluster_name = clustername cluster.master_ip = ip - self.cfg.Update(cluster) + self.cfg.Update(cluster, feedback_fn) # update the known hosts file ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE) @@ -1523,9 +1947,10 @@ class LUSetClusterParams(LogicalUnit): self.op.candidate_pool_size = int(self.op.candidate_pool_size) except (ValueError, TypeError), err: raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" % - str(err)) + str(err), errors.ECODE_INVAL) if self.op.candidate_pool_size < 1: - raise errors.OpPrereqError("At least one master candidate needed") + raise errors.OpPrereqError("At least one master candidate needed", + errors.ECODE_INVAL) def ExpandNames(self): # FIXME: in the future maybe other cluster params won't require checking on @@ -1559,7 +1984,8 @@ class 
LUSetClusterParams(LogicalUnit): for disk in inst.disks: if _RecursiveCheckIfLVMBased(disk): raise errors.OpPrereqError("Cannot disable lvm storage while" - " lvm-based instances exist") + " lvm-based instances exist", + errors.ECODE_INVAL) node_list = self.acquired_locks[locking.LEVEL_NODE] @@ -1578,7 +2004,7 @@ class LUSetClusterParams(LogicalUnit): constants.MIN_VG_SIZE) if vgstatus: raise errors.OpPrereqError("Error on node '%s': %s" % - (node, vgstatus)) + (node, vgstatus), errors.ECODE_ENVIRON) self.cluster = cluster = self.cfg.GetClusterInfo() # validate params changes @@ -1592,12 +2018,36 @@ class LUSetClusterParams(LogicalUnit): self.new_nicparams = objects.FillDict( cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams) objects.NIC.CheckParameterSyntax(self.new_nicparams) + nic_errors = [] + + # check all instances for consistency + for instance in self.cfg.GetAllInstancesInfo().values(): + for nic_idx, nic in enumerate(instance.nics): + params_copy = copy.deepcopy(nic.nicparams) + params_filled = objects.FillDict(self.new_nicparams, params_copy) + + # check parameter syntax + try: + objects.NIC.CheckParameterSyntax(params_filled) + except errors.ConfigurationError, err: + nic_errors.append("Instance %s, nic/%d: %s" % + (instance.name, nic_idx, err)) + + # if we're moving instances to routed, check that they have an ip + target_mode = params_filled[constants.NIC_MODE] + if target_mode == constants.NIC_MODE_ROUTED and not nic.ip: + nic_errors.append("Instance %s, nic/%d: routed nick with no ip" % + (instance.name, nic_idx)) + if nic_errors: + raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" % + "\n".join(nic_errors)) # hypervisor list/parameters self.new_hvparams = objects.FillDict(cluster.hvparams, {}) if self.op.hvparams: if not isinstance(self.op.hvparams, dict): - raise errors.OpPrereqError("Invalid 'hvparams' parameter on input") + raise errors.OpPrereqError("Invalid 'hvparams' parameter on input", + errors.ECODE_INVAL) for 
hv_name, hv_dict in self.op.hvparams.items(): if hv_name not in self.new_hvparams: self.new_hvparams[hv_name] = hv_dict @@ -1608,11 +2058,14 @@ class LUSetClusterParams(LogicalUnit): self.hv_list = self.op.enabled_hypervisors if not self.hv_list: raise errors.OpPrereqError("Enabled hypervisors list must contain at" - " least one member") + " least one member", + errors.ECODE_INVAL) invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES if invalid_hvs: raise errors.OpPrereqError("Enabled hypervisors contains invalid" - " entries: %s" % invalid_hvs) + " entries: %s" % + utils.CommaJoin(invalid_hvs), + errors.ECODE_INVAL) else: self.hv_list = cluster.enabled_hypervisors @@ -1653,9 +2106,9 @@ class LUSetClusterParams(LogicalUnit): if self.op.candidate_pool_size is not None: self.cluster.candidate_pool_size = self.op.candidate_pool_size # we need to update the pool size here, otherwise the save will fail - _AdjustCandidatePool(self) + _AdjustCandidatePool(self, []) - self.cfg.Update(self.cluster) + self.cfg.Update(self.cluster, feedback_fn) def _RedistributeAncillaryFiles(lu, additional_nodes=None): @@ -1676,6 +2129,7 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None): dist_nodes.extend(additional_nodes) if myself.name in dist_nodes: dist_nodes.remove(myself.name) + # 2. Gather files to distribute dist_files = set([constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE, @@ -1725,11 +2179,11 @@ class LURedistributeConfig(NoHooksLU): """Redistribute the configuration. """ - self.cfg.Update(self.cfg.GetClusterInfo()) + self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn) _RedistributeAncillaryFiles(self) -def _WaitForSync(lu, instance, oneshot=False, unlock=False): +def _WaitForSync(lu, instance, oneshot=False): """Sleep and poll for an instance's disk to sync. 
""" @@ -1744,6 +2198,8 @@ def _WaitForSync(lu, instance, oneshot=False, unlock=False): for dev in instance.disks: lu.cfg.SetDiskID(dev, node) + # TODO: Convert to utils.Retry + retries = 0 degr_retries = 10 # in seconds, as we sleep 1 second each time while True: @@ -1767,18 +2223,19 @@ def _WaitForSync(lu, instance, oneshot=False, unlock=False): lu.LogWarning("Can't compute data for node %s/%s", node, instance.disks[i].iv_name) continue - # we ignore the ldisk parameter - perc_done, est_time, is_degraded, _ = mstat - cumul_degraded = cumul_degraded or (is_degraded and perc_done is None) - if perc_done is not None: + + cumul_degraded = (cumul_degraded or + (mstat.is_degraded and mstat.sync_percent is None)) + if mstat.sync_percent is not None: done = False - if est_time is not None: - rem_time = "%d estimated seconds remaining" % est_time - max_time = est_time + if mstat.estimated_time is not None: + rem_time = "%d estimated seconds remaining" % mstat.estimated_time + max_time = mstat.estimated_time else: rem_time = "no time estimate" lu.proc.LogInfo("- device %s: %5.2f%% done, %s" % - (instance.disks[i].iv_name, perc_done, rem_time)) + (instance.disks[i].iv_name, mstat.sync_percent, + rem_time)) # if we're done but degraded, let's do a few small retries, to # make sure we see a stable and not transient situation; therefore @@ -1808,12 +2265,9 @@ def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False): """ lu.cfg.SetDiskID(dev, node) - if ldisk: - idx = 6 - else: - idx = 5 result = True + if on_primary or dev.AssembleOnSecondary(): rstats = lu.rpc.call_blockdev_find(node, dev) msg = rstats.fail_msg @@ -1824,7 +2278,11 @@ def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False): lu.LogWarning("Can't find disk on node %s", node) result = False else: - result = result and (not rstats.payload[idx]) + if ldisk: + result = result and rstats.payload.ldisk_status == constants.LDS_OKAY + else: + result = result and not rstats.payload.is_degraded + if 
dev.children: for child in dev.children: result = result and _CheckDiskConsistency(lu, child, node, on_primary) @@ -1839,11 +2297,14 @@ class LUDiagnoseOS(NoHooksLU): _OP_REQP = ["output_fields", "names"] REQ_BGL = False _FIELDS_STATIC = utils.FieldSet() - _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status") + _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants") + # Fields that need calculation of global os validity + _FIELDS_NEEDVALID = frozenset(["valid", "variants"]) def ExpandNames(self): if self.op.names: - raise errors.OpPrereqError("Selective OS query not supported") + raise errors.OpPrereqError("Selective OS query not supported", + errors.ECODE_INVAL) _CheckOutputFields(static=self._FIELDS_STATIC, dynamic=self._FIELDS_DYNAMIC, @@ -1862,10 +2323,9 @@ class LUDiagnoseOS(NoHooksLU): """ @staticmethod - def _DiagnoseByOS(node_list, rlist): + def _DiagnoseByOS(rlist): """Remaps a per-node return list into an a per-os per-node dictionary - @param node_list: a list with the names of all nodes @param rlist: a map with node names as keys and OS objects as values @rtype: dict @@ -1887,14 +2347,14 @@ class LUDiagnoseOS(NoHooksLU): for node_name, nr in rlist.items(): if nr.fail_msg or not nr.payload: continue - for name, path, status, diagnose in nr.payload: + for name, path, status, diagnose, variants in nr.payload: if name not in all_os: # build a list of nodes for this os containing empty lists # for each node in node_list all_os[name] = {} for nname in good_nodes: all_os[name][nname] = [] - all_os[name][node_name].append((path, status, diagnose)) + all_os[name][node_name].append((path, status, diagnose, variants)) return all_os def Exec(self, feedback_fn): @@ -1903,20 +2363,40 @@ class LUDiagnoseOS(NoHooksLU): """ valid_nodes = [node for node in self.cfg.GetOnlineNodeList()] node_data = self.rpc.call_os_diagnose(valid_nodes) - pol = self._DiagnoseByOS(valid_nodes, node_data) + pol = self._DiagnoseByOS(node_data) output = [] + 
calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields) + calc_variants = "variants" in self.op.output_fields + for os_name, os_data in pol.items(): row = [] + if calc_valid: + valid = True + variants = None + for osl in os_data.values(): + valid = valid and osl and osl[0][1] + if not valid: + variants = None + break + if calc_variants: + node_variants = osl[0][3] + if variants is None: + variants = node_variants + else: + variants = [v for v in variants if v in node_variants] + for field in self.op.output_fields: if field == "name": val = os_name elif field == "valid": - val = utils.all([osl and osl[0][1] for osl in os_data.values()]) + val = valid elif field == "node_status": # this is just a copy of the dict val = {} for node_name, nos_list in os_data.items(): val[node_name] = nos_list + elif field == "variants": + val = variants else: raise errors.ParameterError(field) row.append(val) @@ -1945,7 +2425,11 @@ class LURemoveNode(LogicalUnit): "NODE_NAME": self.op.node_name, } all_nodes = self.cfg.GetNodeList() - all_nodes.remove(self.op.node_name) + try: + all_nodes.remove(self.op.node_name) + except ValueError: + logging.warning("Node %s which is about to be removed not found" + " in the all nodes list", self.op.node_name) return env, all_nodes, all_nodes def CheckPrereq(self): @@ -1961,20 +2445,23 @@ class LURemoveNode(LogicalUnit): """ node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name)) if node is None: - raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name) + raise errors.OpPrereqError("Node '%s' is unknown." 
% self.op.node_name, + errors.ECODE_NOENT) instance_list = self.cfg.GetInstanceList() masternode = self.cfg.GetMasterNode() if node.name == masternode: raise errors.OpPrereqError("Node is the master node," - " you need to failover first.") + " you need to failover first.", + errors.ECODE_INVAL) for instance_name in instance_list: instance = self.cfg.GetInstanceInfo(instance_name) if node.name in instance.all_nodes: raise errors.OpPrereqError("Instance %s is still running on the node," - " please remove first." % instance_name) + " please remove first." % instance_name, + errors.ECODE_INVAL) self.op.node_name = node.name self.node = node @@ -1986,24 +2473,38 @@ class LURemoveNode(LogicalUnit): logging.info("Stopping the node daemon and removing configs from node %s", node.name) + modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup + + # Promote nodes to master candidate as needed + _AdjustCandidatePool(self, exceptions=[node.name]) self.context.RemoveNode(node.name) - result = self.rpc.call_node_leave_cluster(node.name) + # Run post hooks on the node before it's removed + hm = self.proc.hmclass(self.rpc.call_hooks_runner, self) + try: + hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name]) + except: + # pylint: disable-msg=W0702 + self.LogWarning("Errors occurred running hooks on %s" % node.name) + + result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup) msg = result.fail_msg if msg: self.LogWarning("Errors encountered on the remote node while leaving" " the cluster: %s", msg) - # Promote nodes to master candidate as needed - _AdjustCandidatePool(self) - class LUQueryNodes(NoHooksLU): """Logical unit for querying nodes. 
""" + # pylint: disable-msg=W0142 _OP_REQP = ["output_fields", "names", "use_locking"] REQ_BGL = False + + _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid", + "master_candidate", "offline", "drained"] + _FIELDS_DYNAMIC = utils.FieldSet( "dtotal", "dfree", "mtotal", "mnode", "mfree", @@ -2011,16 +2512,12 @@ class LUQueryNodes(NoHooksLU): "ctotal", "cnodes", "csockets", ) - _FIELDS_STATIC = utils.FieldSet( - "name", "pinst_cnt", "sinst_cnt", + _FIELDS_STATIC = utils.FieldSet(*[ + "pinst_cnt", "sinst_cnt", "pinst_list", "sinst_list", "pip", "sip", "tags", - "serial_no", - "master_candidate", "master", - "offline", - "drained", - "role", + "role"] + _SIMPLE_FIELDS ) def ExpandNames(self): @@ -2042,7 +2539,6 @@ class LUQueryNodes(NoHooksLU): # if we don't request only static fields, we need to lock the nodes self.needed_locks[locking.LEVEL_NODE] = self.wanted - def CheckPrereq(self): """Check prerequisites. @@ -2103,10 +2599,9 @@ class LUQueryNodes(NoHooksLU): inst_fields = frozenset(("pinst_cnt", "pinst_list", "sinst_cnt", "sinst_list")) if inst_fields & frozenset(self.op.output_fields): - instancelist = self.cfg.GetInstanceList() + inst_data = self.cfg.GetAllInstancesInfo() - for instance_name in instancelist: - inst = self.cfg.GetInstanceInfo(instance_name) + for inst in inst_data.values(): if inst.primary_node in node_to_primary: node_to_primary[inst.primary_node].add(inst.name) for secnode in inst.secondary_nodes: @@ -2121,8 +2616,8 @@ class LUQueryNodes(NoHooksLU): for node in nodelist: node_output = [] for field in self.op.output_fields: - if field == "name": - val = node.name + if field in self._SIMPLE_FIELDS: + val = getattr(node, field) elif field == "pinst_list": val = list(node_to_primary[node.name]) elif field == "sinst_list": @@ -2137,16 +2632,8 @@ class LUQueryNodes(NoHooksLU): val = node.secondary_ip elif field == "tags": val = list(node.GetTags()) - elif field == "serial_no": - val = node.serial_no - elif field == "master_candidate": - 
val = node.master_candidate elif field == "master": val = node.name == master_node - elif field == "offline": - val = node.offline - elif field == "drained": - val = node.drained elif self._FIELDS_DYNAMIC.Matches(field): val = live_data[node.name].get(field, None) elif field == "role": @@ -2254,34 +2741,188 @@ class LUQueryNodeVolumes(NoHooksLU): return output -class LUAddNode(LogicalUnit): - """Logical unit for adding node to the cluster. +class LUQueryNodeStorage(NoHooksLU): + """Logical unit for getting information on storage units on node(s). """ - HPATH = "node-add" - HTYPE = constants.HTYPE_NODE - _OP_REQP = ["node_name"] + _OP_REQP = ["nodes", "storage_type", "output_fields"] + REQ_BGL = False + _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE) - def BuildHooksEnv(self): - """Build hooks env. + def ExpandNames(self): + storage_type = self.op.storage_type - This will run on all nodes before, and on all nodes + the new node after. + if storage_type not in constants.VALID_STORAGE_TYPES: + raise errors.OpPrereqError("Unknown storage type: %s" % storage_type, + errors.ECODE_INVAL) - """ - env = { - "OP_TARGET": self.op.node_name, - "NODE_NAME": self.op.node_name, - "NODE_PIP": self.op.primary_ip, - "NODE_SIP": self.op.secondary_ip, - } - nodes_0 = self.cfg.GetNodeList() - nodes_1 = nodes_0 + [self.op.node_name, ] - return env, nodes_0, nodes_1 + _CheckOutputFields(static=self._FIELDS_STATIC, + dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS), + selected=self.op.output_fields) + + self.needed_locks = {} + self.share_locks[locking.LEVEL_NODE] = 1 + + if self.op.nodes: + self.needed_locks[locking.LEVEL_NODE] = \ + _GetWantedNodes(self, self.op.nodes) + else: + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET def CheckPrereq(self): """Check prerequisites. - This checks: + This checks that the fields required are valid output fields. 
+ + """ + self.op.name = getattr(self.op, "name", None) + + self.nodes = self.acquired_locks[locking.LEVEL_NODE] + + def Exec(self, feedback_fn): + """Computes the list of nodes and their attributes. + + """ + # Always get name to sort by + if constants.SF_NAME in self.op.output_fields: + fields = self.op.output_fields[:] + else: + fields = [constants.SF_NAME] + self.op.output_fields + + # Never ask for node or type as it's only known to the LU + for extra in [constants.SF_NODE, constants.SF_TYPE]: + while extra in fields: + fields.remove(extra) + + field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)]) + name_idx = field_idx[constants.SF_NAME] + + st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) + data = self.rpc.call_storage_list(self.nodes, + self.op.storage_type, st_args, + self.op.name, fields) + + result = [] + + for node in utils.NiceSort(self.nodes): + nresult = data[node] + if nresult.offline: + continue + + msg = nresult.fail_msg + if msg: + self.LogWarning("Can't get storage data from node %s: %s", node, msg) + continue + + rows = dict([(row[name_idx], row) for row in nresult.payload]) + + for name in utils.NiceSort(rows.keys()): + row = rows[name] + + out = [] + + for field in self.op.output_fields: + if field == constants.SF_NODE: + val = node + elif field == constants.SF_TYPE: + val = self.op.storage_type + elif field in field_idx: + val = row[field_idx[field]] + else: + raise errors.ParameterError(field) + + out.append(val) + + result.append(out) + + return result + + +class LUModifyNodeStorage(NoHooksLU): + """Logical unit for modifying a storage volume on a node. 
+ + """ + _OP_REQP = ["node_name", "storage_type", "name", "changes"] + REQ_BGL = False + + def CheckArguments(self): + node_name = self.cfg.ExpandNodeName(self.op.node_name) + if node_name is None: + raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name, + errors.ECODE_NOENT) + + self.op.node_name = node_name + + storage_type = self.op.storage_type + if storage_type not in constants.VALID_STORAGE_TYPES: + raise errors.OpPrereqError("Unknown storage type: %s" % storage_type, + errors.ECODE_INVAL) + + def ExpandNames(self): + self.needed_locks = { + locking.LEVEL_NODE: self.op.node_name, + } + + def CheckPrereq(self): + """Check prerequisites. + + """ + storage_type = self.op.storage_type + + try: + modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type] + except KeyError: + raise errors.OpPrereqError("Storage units of type '%s' can not be" + " modified" % storage_type, + errors.ECODE_INVAL) + + diff = set(self.op.changes.keys()) - modifiable + if diff: + raise errors.OpPrereqError("The following fields can not be modified for" + " storage units of type '%s': %r" % + (storage_type, list(diff)), + errors.ECODE_INVAL) + + def Exec(self, feedback_fn): + """Computes the list of nodes and their attributes. + + """ + st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) + result = self.rpc.call_storage_modify(self.op.node_name, + self.op.storage_type, st_args, + self.op.name, self.op.changes) + result.Raise("Failed to modify storage unit '%s' on %s" % + (self.op.name, self.op.node_name)) + + +class LUAddNode(LogicalUnit): + """Logical unit for adding node to the cluster. + + """ + HPATH = "node-add" + HTYPE = constants.HTYPE_NODE + _OP_REQP = ["node_name"] + + def BuildHooksEnv(self): + """Build hooks env. + + This will run on all nodes before, and on all nodes + the new node after. 
+ + """ + env = { + "OP_TARGET": self.op.node_name, + "NODE_NAME": self.op.node_name, + "NODE_PIP": self.op.primary_ip, + "NODE_SIP": self.op.secondary_ip, + } + nodes_0 = self.cfg.GetNodeList() + nodes_1 = nodes_0 + [self.op.node_name, ] + return env, nodes_0, nodes_1 + + def CheckPrereq(self): + """Check prerequisites. + + This checks: - the new node is not already in the config - it is resolvable - its parameters (single/dual homed) matches the cluster @@ -2292,7 +2933,7 @@ class LUAddNode(LogicalUnit): node_name = self.op.node_name cfg = self.cfg - dns_data = utils.HostInfo(node_name) + dns_data = utils.GetHostInfo(node_name) node = dns_data.name primary_ip = self.op.primary_ip = dns_data.ip @@ -2300,15 +2941,17 @@ class LUAddNode(LogicalUnit): if secondary_ip is None: secondary_ip = primary_ip if not utils.IsValidIP(secondary_ip): - raise errors.OpPrereqError("Invalid secondary IP given") + raise errors.OpPrereqError("Invalid secondary IP given", + errors.ECODE_INVAL) self.op.secondary_ip = secondary_ip node_list = cfg.GetNodeList() if not self.op.readd and node in node_list: raise errors.OpPrereqError("Node %s is already in the configuration" % - node) + node, errors.ECODE_EXISTS) elif self.op.readd and node not in node_list: - raise errors.OpPrereqError("Node %s is not in the configuration" % node) + raise errors.OpPrereqError("Node %s is not in the configuration" % node, + errors.ECODE_NOENT) for existing_node_name in node_list: existing_node = cfg.GetNodeInfo(existing_node_name) @@ -2317,7 +2960,8 @@ class LUAddNode(LogicalUnit): if (existing_node.primary_ip != primary_ip or existing_node.secondary_ip != secondary_ip): raise errors.OpPrereqError("Readded node doesn't have the same IP" - " address configuration as before") + " address configuration as before", + errors.ECODE_INVAL) continue if (existing_node.primary_ip == primary_ip or @@ -2325,7 +2969,8 @@ class LUAddNode(LogicalUnit): existing_node.primary_ip == secondary_ip or existing_node.secondary_ip 
== secondary_ip): raise errors.OpPrereqError("New node ip address(es) conflict with" - " existing node %s" % existing_node.name) + " existing node %s" % existing_node.name, + errors.ECODE_NOTUNIQUE) # check that the type of the node (single versus dual homed) is the # same as for the master @@ -2335,31 +2980,32 @@ class LUAddNode(LogicalUnit): if master_singlehomed != newbie_singlehomed: if master_singlehomed: raise errors.OpPrereqError("The master has no private ip but the" - " new node has one") + " new node has one", + errors.ECODE_INVAL) else: raise errors.OpPrereqError("The master has a private ip but the" - " new node doesn't have one") + " new node doesn't have one", + errors.ECODE_INVAL) # checks reachability if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT): - raise errors.OpPrereqError("Node not reachable by ping") + raise errors.OpPrereqError("Node not reachable by ping", + errors.ECODE_ENVIRON) if not newbie_singlehomed: # check reachability from my secondary ip to newbie's secondary ip if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT, source=myself.secondary_ip): raise errors.OpPrereqError("Node secondary ip not reachable by TCP" - " based ping to noded port") + " based ping to noded port", + errors.ECODE_ENVIRON) - cp_size = self.cfg.GetClusterInfo().candidate_pool_size if self.op.readd: exceptions = [node] else: exceptions = [] - mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions) - # the new node will increase mc_max with one, so: - mc_max = min(mc_max + 1, cp_size) - self.master_candidate = mc_now < mc_max + + self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions) if self.op.readd: self.new_node = self.cfg.GetNodeInfo(node) @@ -2383,7 +3029,7 @@ class LUAddNode(LogicalUnit): # later in the procedure; this also means that if the re-add # fails, we are left with a non-offlined, broken node if self.op.readd: - new_node.drained = new_node.offline = False + new_node.drained = new_node.offline = 
False # pylint: disable-msg=W0201 self.LogInfo("Readding a node, the offline/drained flags were reset") # if we demote the node, we do cleanup later in the procedure new_node.master_candidate = self.master_candidate @@ -2404,24 +3050,21 @@ class LUAddNode(LogicalUnit): (constants.PROTOCOL_VERSION, result.payload)) # setup ssh on node - logging.info("Copy ssh key to node %s", node) - priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS) - keyarray = [] - keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB, - constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB, - priv_key, pub_key] - - for i in keyfiles: - f = open(i, 'r') - try: - keyarray.append(f.read()) - finally: - f.close() - - result = self.rpc.call_node_add(node, keyarray[0], keyarray[1], - keyarray[2], - keyarray[3], keyarray[4], keyarray[5]) - result.Raise("Cannot transfer ssh keys to the new node") + if self.cfg.GetClusterInfo().modify_ssh_setup: + logging.info("Copy ssh key to node %s", node) + priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS) + keyarray = [] + keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB, + constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB, + priv_key, pub_key] + + for i in keyfiles: + keyarray.append(utils.ReadFile(i)) + + result = self.rpc.call_node_add(node, keyarray[0], keyarray[1], + keyarray[2], keyarray[3], keyarray[4], + keyarray[5]) + result.Raise("Cannot transfer ssh keys to the new node") # Add node to our /etc/hosts, and add key to known_hosts if self.cfg.GetClusterInfo().modify_etc_hosts: @@ -2431,7 +3074,7 @@ class LUAddNode(LogicalUnit): result = self.rpc.call_node_has_ip_address(new_node.name, new_node.secondary_ip) result.Raise("Failure checking secondary ip on node %s" % new_node.name, - prereq=True) + prereq=True, ecode=errors.ECODE_ENVIRON) if not result.payload: raise errors.OpExecError("Node claims it doesn't have the secondary ip" " you gave (%s). 
Please fix and re-run this" @@ -2439,7 +3082,7 @@ class LUAddNode(LogicalUnit): node_verify_list = [self.cfg.GetMasterNode()] node_verify_param = { - 'nodelist': [node], + constants.NV_NODELIST: [node], # TODO: do a node-net-test as well? } @@ -2447,10 +3090,11 @@ class LUAddNode(LogicalUnit): self.cfg.GetClusterName()) for verifier in node_verify_list: result[verifier].Raise("Cannot communicate with node %s" % verifier) - nl_payload = result[verifier].payload['nodelist'] + nl_payload = result[verifier].payload[constants.NV_NODELIST] if nl_payload: for failed in nl_payload: - feedback_fn("ssh/hostname verification failed %s -> %s" % + feedback_fn("ssh/hostname verification failed" + " (checking from %s): %s" % (verifier, nl_payload[failed])) raise errors.OpExecError("ssh/hostname verification failed.") @@ -2458,17 +3102,17 @@ class LUAddNode(LogicalUnit): _RedistributeAncillaryFiles(self) self.context.ReaddNode(new_node) # make sure we redistribute the config - self.cfg.Update(new_node) + self.cfg.Update(new_node, feedback_fn) # and make sure the new node will not have old files around if not new_node.master_candidate: result = self.rpc.call_node_demote_from_mc(new_node.name) - msg = result.RemoteFailMsg() + msg = result.fail_msg if msg: self.LogWarning("Node failed to demote itself from master" " candidate status: %s" % msg) else: _RedistributeAncillaryFiles(self, additional_nodes=[node]) - self.context.AddNode(new_node) + self.context.AddNode(new_node, self.proc.GetECId()) class LUSetNodeParams(LogicalUnit): @@ -2483,17 +3127,20 @@ class LUSetNodeParams(LogicalUnit): def CheckArguments(self): node_name = self.cfg.ExpandNodeName(self.op.node_name) if node_name is None: - raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name) + raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name, + errors.ECODE_INVAL) self.op.node_name = node_name _CheckBooleanOpField(self.op, 'master_candidate') _CheckBooleanOpField(self.op, 'offline') 
_CheckBooleanOpField(self.op, 'drained') all_mods = [self.op.offline, self.op.master_candidate, self.op.drained] if all_mods.count(None) == 3: - raise errors.OpPrereqError("Please pass at least one modification") + raise errors.OpPrereqError("Please pass at least one modification", + errors.ECODE_INVAL) if all_mods.count(True) > 1: raise errors.OpPrereqError("Can't set the node into more than one" - " state at the same time") + " state at the same time", + errors.ECODE_INVAL) def ExpandNames(self): self.needed_locks = {locking.LEVEL_NODE: self.op.node_name} @@ -2522,27 +3169,48 @@ class LUSetNodeParams(LogicalUnit): """ node = self.node = self.cfg.GetNodeInfo(self.op.node_name) - if ((self.op.master_candidate == False or self.op.offline == True or - self.op.drained == True) and node.master_candidate): - # we will demote the node from master_candidate + if (self.op.master_candidate is not None or + self.op.drained is not None or + self.op.offline is not None): + # we can't change the master's node flags if self.op.node_name == self.cfg.GetMasterNode(): - raise errors.OpPrereqError("The master node has to be a" - " master candidate, online and not drained") + raise errors.OpPrereqError("The master role can be changed" + " only via masterfailover", + errors.ECODE_INVAL) + + # Boolean value that tells us whether we're offlining or draining the node + offline_or_drain = self.op.offline == True or self.op.drained == True + deoffline_or_drain = self.op.offline == False or self.op.drained == False + + if (node.master_candidate and + (self.op.master_candidate == False or offline_or_drain)): cp_size = self.cfg.GetClusterInfo().candidate_pool_size - num_candidates, _ = self.cfg.GetMasterCandidateStats() - if num_candidates <= cp_size: + mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats() + if mc_now <= cp_size: msg = ("Not enough master candidates (desired" - " %d, new value will be %d)" % (cp_size, num_candidates-1)) - if self.op.force: + " %d, new value will be 
%d)" % (cp_size, mc_now-1)) + # Only allow forcing the operation if it's an offline/drain operation, + # and we could not possibly promote more nodes. + # FIXME: this can still lead to issues if in any way another node which + # could be promoted appears in the meantime. + if self.op.force and offline_or_drain and mc_should == mc_max: self.LogWarning(msg) else: - raise errors.OpPrereqError(msg) + raise errors.OpPrereqError(msg, errors.ECODE_INVAL) if (self.op.master_candidate == True and ((node.offline and not self.op.offline == False) or (node.drained and not self.op.drained == False))): raise errors.OpPrereqError("Node '%s' is offline or drained, can't set" - " to master_candidate" % node.name) + " to master_candidate" % node.name, + errors.ECODE_INVAL) + + # If we're being deofflined/drained, we'll MC ourself if needed + if (deoffline_or_drain and not offline_or_drain and not + self.op.master_candidate == True and not node.master_candidate): + self.op.master_candidate = _DecideSelfPromotion(self) + if self.op.master_candidate: + self.LogInfo("Autopromoting node to master candidate") return @@ -2586,7 +3254,7 @@ class LUSetNodeParams(LogicalUnit): changed_mc = True result.append(("master_candidate", "auto-demotion due to drain")) rrc = self.rpc.call_node_demote_from_mc(node.name) - msg = rrc.RemoteFailMsg() + msg = rrc.fail_msg if msg: self.LogWarning("Node failed to demote itself: %s" % msg) if node.offline: @@ -2594,7 +3262,7 @@ class LUSetNodeParams(LogicalUnit): result.append(("offline", "clear offline status due to drain")) # this will trigger configuration file update, if needed - self.cfg.Update(node) + self.cfg.Update(node, feedback_fn) # this will trigger job queue propagation or cleanup if changed_mc: self.context.ReaddNode(node) @@ -2612,16 +3280,18 @@ class LUPowercycleNode(NoHooksLU): def CheckArguments(self): node_name = self.cfg.ExpandNodeName(self.op.node_name) if node_name is None: - raise errors.OpPrereqError("Invalid node name '%s'" % 
self.op.node_name) + raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name, + errors.ECODE_NOENT) self.op.node_name = node_name if node_name == self.cfg.GetMasterNode() and not self.op.force: raise errors.OpPrereqError("The node is the master and the force" - " parameter was not set") + " parameter was not set", + errors.ECODE_INVAL) def ExpandNames(self): """Locking for PowercycleNode. - This is a last-resource option and shouldn't block on other + This is a last-resort option and shouldn't block on other jobs. Therefore, we grab no locks. """ @@ -2685,6 +3355,10 @@ class LUQueryClusterInfo(NoHooksLU): "master_netdev": cluster.master_netdev, "volume_group_name": cluster.volume_group_name, "file_storage_dir": cluster.file_storage_dir, + "ctime": cluster.ctime, + "mtime": cluster.mtime, + "uuid": cluster.uuid, + "tags": list(cluster.GetTags()), } return result @@ -2697,7 +3371,8 @@ class LUQueryConfigValues(NoHooksLU): _OP_REQP = [] REQ_BGL = False _FIELDS_DYNAMIC = utils.FieldSet() - _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag") + _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag", + "watcher_pause") def ExpandNames(self): self.needed_locks = {} @@ -2724,6 +3399,8 @@ class LUQueryConfigValues(NoHooksLU): entry = self.cfg.GetMasterNode() elif field == "drain_flag": entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE) + elif field == "watcher_pause": + return utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE) else: raise errors.ParameterError(field) values.append(entry) @@ -2756,19 +3433,24 @@ class LUActivateInstanceDisks(NoHooksLU): assert self.instance is not None, \ "Cannot retrieve locked instance %s" % self.op.instance_name _CheckNodeOnline(self, self.instance.primary_node) + if not hasattr(self.op, "ignore_size"): + self.op.ignore_size = False def Exec(self, feedback_fn): """Activate the disks. 
""" - disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance) + disks_ok, disks_info = \ + _AssembleInstanceDisks(self, self.instance, + ignore_size=self.op.ignore_size) if not disks_ok: raise errors.OpExecError("Cannot activate block devices") return disks_info -def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False): +def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False, + ignore_size=False): """Prepare the block devices for an instance. This sets up the block devices on all nodes. @@ -2780,6 +3462,10 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False): @type ignore_secondaries: boolean @param ignore_secondaries: if true, errors on secondary nodes won't result in an error return from the function + @type ignore_size: boolean + @param ignore_size: if true, the current known size of the disk + will not be used during the disk activation, useful for cases + when the size is wrong @return: False if the operation failed, otherwise a list of (host, instance_visible_name, node_visible_name) with the mapping from node devices to instance devices @@ -2800,6 +3486,9 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False): # 1st pass, assemble on all nodes in secondary mode for inst_disk in instance.disks: for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node): + if ignore_size: + node_disk = node_disk.Copy() + node_disk.UnsetSize() lu.cfg.SetDiskID(node_disk, node) result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False) msg = result.fail_msg @@ -2814,9 +3503,14 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False): # 2nd pass, do only the primary node for inst_disk in instance.disks: + dev_path = None + for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node): if node != instance.primary_node: continue + if ignore_size: + node_disk = node_disk.Copy() + node_disk.UnsetSize() lu.cfg.SetDiskID(node_disk, node) result = 
lu.rpc.call_blockdev_assemble(node, node_disk, iname, True) msg = result.fail_msg @@ -2825,8 +3519,10 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False): " (is_primary=True, pass=2): %s", inst_disk.iv_name, node, msg) disks_ok = False - device_info.append((instance.primary_node, inst_disk.iv_name, - result.payload)) + else: + dev_path = result.payload + + device_info.append((instance.primary_node, inst_disk.iv_name, dev_path)) # leave the disks configured for the primary node # this is a workaround that would be fixed better by @@ -2950,15 +3646,18 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name): """ nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name) - nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True) + nodeinfo[node].Raise("Can't get data from node %s" % node, + prereq=True, ecode=errors.ECODE_ENVIRON) free_mem = nodeinfo[node].payload.get('memory_free', None) if not isinstance(free_mem, int): raise errors.OpPrereqError("Can't compute free memory on node %s, result" - " was '%s'" % (node, free_mem)) + " was '%s'" % (node, free_mem), + errors.ECODE_ENVIRON) if requested > free_mem: raise errors.OpPrereqError("Not enough memory on node %s for %s:" " needed %s MiB, available %s MiB" % - (node, reason, requested, free_mem)) + (node, reason, requested, free_mem), + errors.ECODE_NORES) class LUStartupInstance(LogicalUnit): @@ -3001,7 +3700,8 @@ class LUStartupInstance(LogicalUnit): if self.beparams: if not isinstance(self.beparams, dict): raise errors.OpPrereqError("Invalid beparams passed: %s, expected" - " dict" % (type(self.beparams), )) + " dict" % (type(self.beparams), ), + errors.ECODE_INVAL) # fill the beparams dict utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES) self.op.beparams = self.beparams @@ -3011,7 +3711,8 @@ class LUStartupInstance(LogicalUnit): if self.hvparams: if not isinstance(self.hvparams, dict): raise errors.OpPrereqError("Invalid 
hvparams passed: %s, expected" - " dict" % (type(self.hvparams), )) + " dict" % (type(self.hvparams), ), + errors.ECODE_INVAL) # check hypervisor parameter syntax (locally) cluster = self.cfg.GetClusterInfo() @@ -3034,7 +3735,7 @@ class LUStartupInstance(LogicalUnit): instance.name, instance.hypervisor) remote_info.Raise("Error checking node %s" % instance.primary_node, - prereq=True) + prereq=True, ecode=errors.ECODE_ENVIRON) if not remote_info.payload: # not running already _CheckNodeFreeMemory(self, instance.primary_node, "starting instance %s" % instance.name, @@ -3070,6 +3771,13 @@ class LURebootInstance(LogicalUnit): _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"] REQ_BGL = False + def CheckArguments(self): + """Check the arguments. + + """ + self.shutdown_timeout = getattr(self.op, "shutdown_timeout", + constants.DEFAULT_SHUTDOWN_TIMEOUT) + def ExpandNames(self): if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT, constants.INSTANCE_REBOOT_HARD, @@ -3089,6 +3797,7 @@ class LURebootInstance(LogicalUnit): env = { "IGNORE_SECONDARIES": self.op.ignore_secondaries, "REBOOT_TYPE": self.op.reboot_type, + "SHUTDOWN_TIMEOUT": self.shutdown_timeout, } env.update(_BuildInstanceHookEnvByObject(self, self.instance)) nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) @@ -3124,10 +3833,12 @@ class LURebootInstance(LogicalUnit): for disk in instance.disks: self.cfg.SetDiskID(disk, node_current) result = self.rpc.call_instance_reboot(node_current, instance, - reboot_type) + reboot_type, + self.shutdown_timeout) result.Raise("Could not reboot instance") else: - result = self.rpc.call_instance_shutdown(node_current, instance) + result = self.rpc.call_instance_shutdown(node_current, instance, + self.shutdown_timeout) result.Raise("Could not shutdown instance for full reboot") _ShutdownInstanceDisks(self, instance) _StartInstanceDisks(self, instance, ignore_secondaries) @@ -3150,6 +3861,13 @@ class LUShutdownInstance(LogicalUnit): 
_OP_REQP = ["instance_name"] REQ_BGL = False + def CheckArguments(self): + """Check the arguments. + + """ + self.timeout = getattr(self.op, "timeout", + constants.DEFAULT_SHUTDOWN_TIMEOUT) + def ExpandNames(self): self._ExpandAndLockInstance() @@ -3160,6 +3878,7 @@ class LUShutdownInstance(LogicalUnit): """ env = _BuildInstanceHookEnvByObject(self, self.instance) + env["TIMEOUT"] = self.timeout nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) return env, nl, nl @@ -3180,8 +3899,9 @@ class LUShutdownInstance(LogicalUnit): """ instance = self.instance node_current = instance.primary_node + timeout = self.timeout self.cfg.MarkInstanceDown(instance.name) - result = self.rpc.call_instance_shutdown(node_current, instance) + result = self.rpc.call_instance_shutdown(node_current, instance, timeout) msg = result.fail_msg if msg: self.proc.LogWarning("Could not shutdown instance: %s" % msg) @@ -3224,31 +3944,38 @@ class LUReinstallInstance(LogicalUnit): if instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Instance '%s' has no disks" % - self.op.instance_name) + self.op.instance_name, + errors.ECODE_INVAL) if instance.admin_up: raise errors.OpPrereqError("Instance '%s' is marked to be up" % - self.op.instance_name) + self.op.instance_name, + errors.ECODE_STATE) remote_info = self.rpc.call_instance_info(instance.primary_node, instance.name, instance.hypervisor) remote_info.Raise("Error checking node %s" % instance.primary_node, - prereq=True) + prereq=True, ecode=errors.ECODE_ENVIRON) if remote_info.payload: raise errors.OpPrereqError("Instance '%s' is running on the node %s" % (self.op.instance_name, - instance.primary_node)) + instance.primary_node), + errors.ECODE_STATE) self.op.os_type = getattr(self.op, "os_type", None) + self.op.force_variant = getattr(self.op, "force_variant", False) if self.op.os_type is not None: # OS verification pnode = self.cfg.GetNodeInfo( self.cfg.ExpandNodeName(instance.primary_node)) if pnode is None: 
raise errors.OpPrereqError("Primary node '%s' is unknown" % - self.op.pnode) + self.op.pnode, errors.ECODE_NOENT) result = self.rpc.call_os_get(pnode.name, self.op.os_type) result.Raise("OS '%s' not in supported OS list for primary node %s" % - (self.op.os_type, pnode.name), prereq=True) + (self.op.os_type, pnode.name), + prereq=True, ecode=errors.ECODE_INVAL) + if not self.op.force_variant: + _CheckOSVariant(result.payload, self.op.os_type) self.instance = instance @@ -3261,18 +3988,103 @@ class LUReinstallInstance(LogicalUnit): if self.op.os_type is not None: feedback_fn("Changing OS to '%s'..." % self.op.os_type) inst.os = self.op.os_type - self.cfg.Update(inst) + self.cfg.Update(inst, feedback_fn) _StartInstanceDisks(self, inst, None) try: feedback_fn("Running the instance OS create scripts...") - result = self.rpc.call_instance_os_add(inst.primary_node, inst, True) + # FIXME: pass debug option from opcode to backend + result = self.rpc.call_instance_os_add(inst.primary_node, inst, True, 0) result.Raise("Could not install OS for instance %s on node %s" % (inst.name, inst.primary_node)) finally: _ShutdownInstanceDisks(self, inst) +class LURecreateInstanceDisks(LogicalUnit): + """Recreate an instance's missing disks. + + """ + HPATH = "instance-recreate-disks" + HTYPE = constants.HTYPE_INSTANCE + _OP_REQP = ["instance_name", "disks"] + REQ_BGL = False + + def CheckArguments(self): + """Check the arguments. + + """ + if not isinstance(self.op.disks, list): + raise errors.OpPrereqError("Invalid disks parameter", errors.ECODE_INVAL) + for item in self.op.disks: + if (not isinstance(item, int) or + item < 0): + raise errors.OpPrereqError("Invalid disk specification '%s'" % + str(item), errors.ECODE_INVAL) + + def ExpandNames(self): + self._ExpandAndLockInstance() + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on master, primary and secondary nodes of the instance. 
+ + """ + env = _BuildInstanceHookEnvByObject(self, self.instance) + nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) + return env, nl, nl + + def CheckPrereq(self): + """Check prerequisites. + + This checks that the instance is in the cluster and is not running. + + """ + instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name + _CheckNodeOnline(self, instance.primary_node) + + if instance.disk_template == constants.DT_DISKLESS: + raise errors.OpPrereqError("Instance '%s' has no disks" % + self.op.instance_name, errors.ECODE_INVAL) + if instance.admin_up: + raise errors.OpPrereqError("Instance '%s' is marked to be up" % + self.op.instance_name, errors.ECODE_STATE) + remote_info = self.rpc.call_instance_info(instance.primary_node, + instance.name, + instance.hypervisor) + remote_info.Raise("Error checking node %s" % instance.primary_node, + prereq=True, ecode=errors.ECODE_ENVIRON) + if remote_info.payload: + raise errors.OpPrereqError("Instance '%s' is running on the node %s" % + (self.op.instance_name, + instance.primary_node), errors.ECODE_STATE) + + if not self.op.disks: + self.op.disks = range(len(instance.disks)) + else: + for idx in self.op.disks: + if idx >= len(instance.disks): + raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx, + errors.ECODE_INVAL) + + self.instance = instance + + def Exec(self, feedback_fn): + """Recreate the disks. + + """ + to_skip = [] + for idx, _ in enumerate(self.instance.disks): + if idx not in self.op.disks: # disk idx has not been passed in + to_skip.append(idx) + continue + + _CreateDisks(self, self.instance, to_skip=to_skip) + + class LURenameInstance(LogicalUnit): """Rename an instance. 
@@ -3302,36 +4114,37 @@ class LURenameInstance(LogicalUnit): self.cfg.ExpandInstanceName(self.op.instance_name)) if instance is None: raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) + self.op.instance_name, errors.ECODE_NOENT) _CheckNodeOnline(self, instance.primary_node) if instance.admin_up: raise errors.OpPrereqError("Instance '%s' is marked to be up" % - self.op.instance_name) + self.op.instance_name, errors.ECODE_STATE) remote_info = self.rpc.call_instance_info(instance.primary_node, instance.name, instance.hypervisor) remote_info.Raise("Error checking node %s" % instance.primary_node, - prereq=True) + prereq=True, ecode=errors.ECODE_ENVIRON) if remote_info.payload: raise errors.OpPrereqError("Instance '%s' is running on the node %s" % (self.op.instance_name, - instance.primary_node)) + instance.primary_node), errors.ECODE_STATE) self.instance = instance # new name verification - name_info = utils.HostInfo(self.op.new_name) + name_info = utils.GetHostInfo(self.op.new_name) self.op.new_name = new_name = name_info.name instance_list = self.cfg.GetInstanceList() if new_name in instance_list: raise errors.OpPrereqError("Instance '%s' is already in the cluster" % - new_name) + new_name, errors.ECODE_EXISTS) if not getattr(self.op, "ignore_ip", False): if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT): raise errors.OpPrereqError("IP %s of instance %s already in use" % - (name_info.ip, new_name)) + (name_info.ip, new_name), + errors.ECODE_NOTUNIQUE) def Exec(self, feedback_fn): @@ -3365,7 +4178,7 @@ class LURenameInstance(LogicalUnit): _StartInstanceDisks(self, inst, None) try: result = self.rpc.call_instance_run_rename(inst.primary_node, inst, - old_name) + old_name, 0) msg = result.fail_msg if msg: msg = ("Could not run OS rename script for instance %s on node %s" @@ -3385,6 +4198,13 @@ class LURemoveInstance(LogicalUnit): _OP_REQP = ["instance_name", "ignore_failures"] REQ_BGL = False + def CheckArguments(self): + 
"""Check the arguments. + + """ + self.shutdown_timeout = getattr(self.op, "shutdown_timeout", + constants.DEFAULT_SHUTDOWN_TIMEOUT) + def ExpandNames(self): self._ExpandAndLockInstance() self.needed_locks[locking.LEVEL_NODE] = [] @@ -3401,6 +4221,7 @@ class LURemoveInstance(LogicalUnit): """ env = _BuildInstanceHookEnvByObject(self, self.instance) + env["SHUTDOWN_TIMEOUT"] = self.shutdown_timeout nl = [self.cfg.GetMasterNode()] return env, nl, nl @@ -3422,7 +4243,8 @@ class LURemoveInstance(LogicalUnit): logging.info("Shutting down instance %s on node %s", instance.name, instance.primary_node) - result = self.rpc.call_instance_shutdown(instance.primary_node, instance) + result = self.rpc.call_instance_shutdown(instance.primary_node, instance, + self.shutdown_timeout) msg = result.fail_msg if msg: if self.op.ignore_failures: @@ -3450,8 +4272,11 @@ class LUQueryInstances(NoHooksLU): """Logical unit for querying instances. """ + # pylint: disable-msg=W0142 _OP_REQP = ["output_fields", "names", "use_locking"] REQ_BGL = False + _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor", + "serial_no", "ctime", "mtime", "uuid"] _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes", "admin_state", "disk_template", "ip", "mac", "bridge", @@ -3464,9 +4289,11 @@ class LUQueryInstances(NoHooksLU): r"(nic)\.(bridge)/([0-9]+)", r"(nic)\.(macs|ips|modes|links|bridges)", r"(disk|nic)\.(count)", - "serial_no", "hypervisor", "hvparams",] + + "hvparams", + ] + _SIMPLE_FIELDS + ["hv/%s" % name - for name in constants.HVS_PARAMETERS] + + for name in constants.HVS_PARAMETERS + if name not in constants.HVC_GLOBALS] + ["be/%s" % name for name in constants.BES_PARAMETERS]) _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status") @@ -3507,6 +4334,8 @@ class LUQueryInstances(NoHooksLU): """Computes the list of nodes and their attributes. 
""" + # pylint: disable-msg=R0912 + # way too many branches here all_info = self.cfg.GetAllInstancesInfo() if self.wanted == locking.ALL_SET: # caller didn't specify instance names, so ordering is not important @@ -3544,7 +4373,7 @@ class LUQueryInstances(NoHooksLU): if result.offline: # offline nodes will be in both lists off_nodes.append(name) - if result.failed or result.fail_msg: + if result.fail_msg: bad_nodes.append(name) else: if result.payload: @@ -3561,16 +4390,14 @@ class LUQueryInstances(NoHooksLU): cluster = self.cfg.GetClusterInfo() for instance in instance_list: iout = [] - i_hv = cluster.FillHV(instance) + i_hv = cluster.FillHV(instance, skip_globals=True) i_be = cluster.FillBE(instance) i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT], nic.nicparams) for nic in instance.nics] for field in self.op.output_fields: st_match = self._FIELDS_STATIC.Matches(field) - if field == "name": - val = instance.name - elif field == "os": - val = instance.os + if field in self._SIMPLE_FIELDS: + val = getattr(instance, field) elif field == "pnode": val = instance.primary_node elif field == "snodes": @@ -3647,16 +4474,11 @@ class LUQueryInstances(NoHooksLU): val = _ComputeDiskSize(instance.disk_template, disk_sizes) elif field == "tags": val = list(instance.GetTags()) - elif field == "serial_no": - val = instance.serial_no - elif field == "network_port": - val = instance.network_port - elif field == "hypervisor": - val = instance.hypervisor elif field == "hvparams": val = i_hv elif (field.startswith(HVPREFIX) and - field[len(HVPREFIX):] in constants.HVS_PARAMETERS): + field[len(HVPREFIX):] in constants.HVS_PARAMETERS and + field[len(HVPREFIX):] not in constants.HVC_GLOBALS): val = i_hv.get(field[len(HVPREFIX):], None) elif field == "beparams": val = i_be @@ -3738,6 +4560,13 @@ class LUFailoverInstance(LogicalUnit): _OP_REQP = ["instance_name", "ignore_consistency"] REQ_BGL = False + def CheckArguments(self): + """Check the arguments. 
+ + """ + self.shutdown_timeout = getattr(self.op, "shutdown_timeout", + constants.DEFAULT_SHUTDOWN_TIMEOUT) + def ExpandNames(self): self._ExpandAndLockInstance() self.needed_locks[locking.LEVEL_NODE] = [] @@ -3755,6 +4584,7 @@ class LUFailoverInstance(LogicalUnit): """ env = { "IGNORE_CONSISTENCY": self.op.ignore_consistency, + "SHUTDOWN_TIMEOUT": self.shutdown_timeout, } env.update(_BuildInstanceHookEnvByObject(self, self.instance)) nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes) @@ -3773,7 +4603,8 @@ class LUFailoverInstance(LogicalUnit): bep = self.cfg.GetClusterInfo().FillBE(instance) if instance.disk_template not in constants.DTS_NET_MIRROR: raise errors.OpPrereqError("Instance's disk layout is not" - " network mirrored, cannot failover.") + " network mirrored, cannot failover.", + errors.ECODE_STATE) secondary_nodes = instance.secondary_nodes if not secondary_nodes: @@ -3807,19 +4638,23 @@ class LUFailoverInstance(LogicalUnit): source_node = instance.primary_node target_node = instance.secondary_nodes[0] - feedback_fn("* checking disk consistency between source and target") - for dev in instance.disks: - # for drbd, these are drbd over lvm - if not _CheckDiskConsistency(self, dev, target_node, False): - if instance.admin_up and not self.op.ignore_consistency: - raise errors.OpExecError("Disk %s is degraded on target node," - " aborting failover." % dev.iv_name) + if instance.admin_up: + feedback_fn("* checking disk consistency between source and target") + for dev in instance.disks: + # for drbd, these are drbd over lvm + if not _CheckDiskConsistency(self, dev, target_node, False): + if not self.op.ignore_consistency: + raise errors.OpExecError("Disk %s is degraded on target node," + " aborting failover." 
% dev.iv_name) + else: + feedback_fn("* not checking disk consistency as instance is not running") feedback_fn("* shutting down instance on source node") logging.info("Shutting down instance %s on node %s", instance.name, source_node) - result = self.rpc.call_instance_shutdown(source_node, instance) + result = self.rpc.call_instance_shutdown(source_node, instance, + self.shutdown_timeout) msg = result.fail_msg if msg: if self.op.ignore_consistency: @@ -3838,7 +4673,7 @@ class LUFailoverInstance(LogicalUnit): instance.primary_node = target_node # distribute new instance config to the other nodes - self.cfg.Update(instance) + self.cfg.Update(instance, feedback_fn) # Only start the instance if it's marked as up if instance.admin_up: @@ -3864,20 +4699,264 @@ class LUFailoverInstance(LogicalUnit): class LUMigrateInstance(LogicalUnit): """Migrate an instance. - This is migration without shutting down, compared to the failover, - which is done with shutdown. + This is migration without shutting down, compared to the failover, + which is done with shutdown. + + """ + HPATH = "instance-migrate" + HTYPE = constants.HTYPE_INSTANCE + _OP_REQP = ["instance_name", "live", "cleanup"] + + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + + self.needed_locks[locking.LEVEL_NODE] = [] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + + self._migrater = TLMigrateInstance(self, self.op.instance_name, + self.op.live, self.op.cleanup) + self.tasklets = [self._migrater] + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on master, primary and secondary nodes of the instance. 
+ + """ + instance = self._migrater.instance + env = _BuildInstanceHookEnvByObject(self, instance) + env["MIGRATE_LIVE"] = self.op.live + env["MIGRATE_CLEANUP"] = self.op.cleanup + nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes) + return env, nl, nl + + +class LUMoveInstance(LogicalUnit): + """Move an instance by data-copying. + + """ + HPATH = "instance-move" + HTYPE = constants.HTYPE_INSTANCE + _OP_REQP = ["instance_name", "target_node"] + REQ_BGL = False + + def CheckArguments(self): + """Check the arguments. + + """ + self.shutdown_timeout = getattr(self.op, "shutdown_timeout", + constants.DEFAULT_SHUTDOWN_TIMEOUT) + + def ExpandNames(self): + self._ExpandAndLockInstance() + target_node = self.cfg.ExpandNodeName(self.op.target_node) + if target_node is None: + raise errors.OpPrereqError("Node '%s' not known" % + self.op.target_node, errors.ECODE_NOENT) + self.op.target_node = target_node + self.needed_locks[locking.LEVEL_NODE] = [target_node] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes(primary_only=True) + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on master, primary and secondary nodes of the instance. + + """ + env = { + "TARGET_NODE": self.op.target_node, + "SHUTDOWN_TIMEOUT": self.shutdown_timeout, + } + env.update(_BuildInstanceHookEnvByObject(self, self.instance)) + nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node, + self.op.target_node] + return env, nl, nl + + def CheckPrereq(self): + """Check prerequisites. + + This checks that the instance is in the cluster. 
+ + """ + self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name + + node = self.cfg.GetNodeInfo(self.op.target_node) + assert node is not None, \ + "Cannot retrieve locked node %s" % self.op.target_node + + self.target_node = target_node = node.name + + if target_node == instance.primary_node: + raise errors.OpPrereqError("Instance %s is already on the node %s" % + (instance.name, target_node), + errors.ECODE_STATE) + + bep = self.cfg.GetClusterInfo().FillBE(instance) + + for idx, dsk in enumerate(instance.disks): + if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE): + raise errors.OpPrereqError("Instance disk %d has a complex layout," + " cannot copy" % idx, errors.ECODE_STATE) + + _CheckNodeOnline(self, target_node) + _CheckNodeNotDrained(self, target_node) + + if instance.admin_up: + # check memory requirements on the secondary node + _CheckNodeFreeMemory(self, target_node, "failing over instance %s" % + instance.name, bep[constants.BE_MEMORY], + instance.hypervisor) + else: + self.LogInfo("Not checking memory on the secondary node as" + " instance will not be started") + + # check bridge existance + _CheckInstanceBridgesExist(self, instance, node=target_node) + + def Exec(self, feedback_fn): + """Move an instance. + + The move is done by shutting it down on its present node, copying + the data over (slow) and starting it on the new node. + + """ + instance = self.instance + + source_node = instance.primary_node + target_node = self.target_node + + self.LogInfo("Shutting down instance %s on source node %s", + instance.name, source_node) + + result = self.rpc.call_instance_shutdown(source_node, instance, + self.shutdown_timeout) + msg = result.fail_msg + if msg: + if self.op.ignore_consistency: + self.proc.LogWarning("Could not shutdown instance %s on node %s." + " Proceeding anyway. Please make sure node" + " %s is down. 
Error details: %s", + instance.name, source_node, source_node, msg) + else: + raise errors.OpExecError("Could not shutdown instance %s on" + " node %s: %s" % + (instance.name, source_node, msg)) + + # create the target disks + try: + _CreateDisks(self, instance, target_node=target_node) + except errors.OpExecError: + self.LogWarning("Device creation failed, reverting...") + try: + _RemoveDisks(self, instance, target_node=target_node) + finally: + self.cfg.ReleaseDRBDMinors(instance.name) + raise + + cluster_name = self.cfg.GetClusterInfo().cluster_name + + errs = [] + # activate, get path, copy the data over + for idx, disk in enumerate(instance.disks): + self.LogInfo("Copying data for disk %d", idx) + result = self.rpc.call_blockdev_assemble(target_node, disk, + instance.name, True) + if result.fail_msg: + self.LogWarning("Can't assemble newly created disk %d: %s", + idx, result.fail_msg) + errs.append(result.fail_msg) + break + dev_path = result.payload + result = self.rpc.call_blockdev_export(source_node, disk, + target_node, dev_path, + cluster_name) + if result.fail_msg: + self.LogWarning("Can't copy data over for disk %d: %s", + idx, result.fail_msg) + errs.append(result.fail_msg) + break + + if errs: + self.LogWarning("Some disks failed to copy, aborting") + try: + _RemoveDisks(self, instance, target_node=target_node) + finally: + self.cfg.ReleaseDRBDMinors(instance.name) + raise errors.OpExecError("Errors during disk copy: %s" % + (",".join(errs),)) + + instance.primary_node = target_node + self.cfg.Update(instance, feedback_fn) + + self.LogInfo("Removing the disks on the original node") + _RemoveDisks(self, instance, target_node=source_node) + + # Only start the instance if it's marked as up + if instance.admin_up: + self.LogInfo("Starting instance %s on node %s", + instance.name, target_node) + + disks_ok, _ = _AssembleInstanceDisks(self, instance, + ignore_secondaries=True) + if not disks_ok: + _ShutdownInstanceDisks(self, instance) + raise 
errors.OpExecError("Can't activate the instance's disks") + + result = self.rpc.call_instance_start(target_node, instance, None, None) + msg = result.fail_msg + if msg: + _ShutdownInstanceDisks(self, instance) + raise errors.OpExecError("Could not start instance %s on node %s: %s" % + (instance.name, target_node, msg)) + + +class LUMigrateNode(LogicalUnit): + """Migrate all instances from a node. + + """ + HPATH = "node-migrate" + HTYPE = constants.HTYPE_NODE + _OP_REQP = ["node_name", "live"] + REQ_BGL = False + + def ExpandNames(self): + self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name) + if self.op.node_name is None: + raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name, + errors.ECODE_NOENT) + + self.needed_locks = { + locking.LEVEL_NODE: [self.op.node_name], + } + + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + + # Create tasklets for migrating instances for all instances on this node + names = [] + tasklets = [] + + for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name): + logging.debug("Migrating instance %s", inst.name) + names.append(inst.name) - """ - HPATH = "instance-migrate" - HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name", "live", "cleanup"] + tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False)) - REQ_BGL = False + self.tasklets = tasklets - def ExpandNames(self): - self._ExpandAndLockInstance() - self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + # Declare instance locks + self.needed_locks[locking.LEVEL_INSTANCE] = names def DeclareLocks(self, level): if level == locking.LEVEL_NODE: @@ -3886,14 +4965,29 @@ class LUMigrateInstance(LogicalUnit): def BuildHooksEnv(self): """Build hooks env. - This runs on master, primary and secondary nodes of the instance. + This runs on the master, the primary and all the secondaries. 
""" - env = _BuildInstanceHookEnvByObject(self, self.instance) - env["MIGRATE_LIVE"] = self.op.live - env["MIGRATE_CLEANUP"] = self.op.cleanup - nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes) - return env, nl, nl + env = { + "NODE_NAME": self.op.node_name, + } + + nl = [self.cfg.GetMasterNode()] + + return (env, nl, nl) + + +class TLMigrateInstance(Tasklet): + def __init__(self, lu, instance_name, live, cleanup): + """Initializes this class. + + """ + Tasklet.__init__(self, lu) + + # Parameters + self.instance_name = instance_name + self.live = live + self.cleanup = cleanup def CheckPrereq(self): """Check prerequisites. @@ -3902,14 +4996,14 @@ class LUMigrateInstance(LogicalUnit): """ instance = self.cfg.GetInstanceInfo( - self.cfg.ExpandInstanceName(self.op.instance_name)) + self.cfg.ExpandInstanceName(self.instance_name)) if instance is None: raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) + self.instance_name, errors.ECODE_NOENT) if instance.disk_template != constants.DT_DRBD8: raise errors.OpPrereqError("Instance's disk layout is not" - " drbd8, cannot migrate.") + " drbd8, cannot migrate.", errors.ECODE_STATE) secondary_nodes = instance.secondary_nodes if not secondary_nodes: @@ -3927,11 +5021,12 @@ class LUMigrateInstance(LogicalUnit): # check bridge existance _CheckInstanceBridgesExist(self, instance, node=target_node) - if not self.op.cleanup: + if not self.cleanup: _CheckNodeNotDrained(self, target_node) result = self.rpc.call_instance_migratable(instance.primary_node, instance) - result.Raise("Can't migrate, please use failover", prereq=True) + result.Raise("Can't migrate, please use failover", + prereq=True, ecode=errors.ECODE_STATE) self.instance = instance @@ -4043,7 +5138,7 @@ class LUMigrateInstance(LogicalUnit): self.feedback_fn("* instance running on secondary node (%s)," " updating config" % target_node) instance.primary_node = target_node - self.cfg.Update(instance) + 
self.cfg.Update(instance, self.feedback_fn) demoted_node = source_node else: self.feedback_fn("* instance confirmed to be running on its" @@ -4074,10 +5169,10 @@ class LUMigrateInstance(LogicalUnit): self._GoReconnect(False) self._WaitUntilSync() except errors.OpExecError, err: - self.LogWarning("Migration failed and I can't reconnect the" - " drives: error '%s'\n" - "Please look and recover the instance status" % - str(err)) + self.lu.LogWarning("Migration failed and I can't reconnect the" + " drives: error '%s'\n" + "Please look and recover the instance status" % + str(err)) def _AbortMigration(self): """Call the hypervisor code to abort a started migration. @@ -4093,8 +5188,8 @@ class LUMigrateInstance(LogicalUnit): False) abort_msg = abort_result.fail_msg if abort_msg: - logging.error("Aborting migration failed on target node %s: %s" % - (target_node, abort_msg)) + logging.error("Aborting migration failed on target node %s: %s", + target_node, abort_msg) # Don't raise an exception here, as we stil have to try to revert the # disk status, even if this step failed. 
@@ -4148,6 +5243,7 @@ class LUMigrateInstance(LogicalUnit): if msg: logging.error("Instance pre-migration failed, trying to revert" " disk status: %s", msg) + self.feedback_fn("Pre-migration failed, aborting") self._AbortMigration() self._RevertDiskStatus() raise errors.OpExecError("Could not pre-migrate instance %s: %s" % @@ -4157,11 +5253,12 @@ class LUMigrateInstance(LogicalUnit): time.sleep(10) result = self.rpc.call_instance_migrate(source_node, instance, self.nodes_ip[target_node], - self.op.live) + self.live) msg = result.fail_msg if msg: logging.error("Instance migration failed, trying to revert" " disk status: %s", msg) + self.feedback_fn("Migration failed, aborting") self._AbortMigration() self._RevertDiskStatus() raise errors.OpExecError("Could not migrate instance %s: %s" % @@ -4170,7 +5267,7 @@ class LUMigrateInstance(LogicalUnit): instance.primary_node = target_node # distribute new instance config to the other nodes - self.cfg.Update(instance) + self.cfg.Update(instance, self.feedback_fn) result = self.rpc.call_finalize_migration(target_node, instance, @@ -4179,7 +5276,7 @@ class LUMigrateInstance(LogicalUnit): msg = result.fail_msg if msg: logging.error("Instance migration succeeded, but finalization failed:" - " %s" % msg) + " %s", msg) raise errors.OpExecError("Could not finalize instance migration: %s" % msg) @@ -4195,6 +5292,8 @@ class LUMigrateInstance(LogicalUnit): """Perform the migration. 
""" + feedback_fn("Migrating instance %s" % self.instance.name) + self.feedback_fn = feedback_fn self.source_node = self.instance.primary_node @@ -4204,7 +5303,8 @@ class LUMigrateInstance(LogicalUnit): self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip, self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip, } - if self.op.cleanup: + + if self.cleanup: return self._ExecCleanup() else: return self._ExecMigration() @@ -4290,7 +5390,7 @@ def _GenerateUniqueNames(lu, exts): """ results = [] for val in exts: - new_id = lu.cfg.GenerateUniqueID() + new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) results.append("%s%s" % (new_id, val)) return results @@ -4302,7 +5402,7 @@ def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name, """ port = lu.cfg.AllocatePort() vgname = lu.cfg.GetVGName() - shared_secret = lu.cfg.GenerateDRBDSecret() + shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId()) dev_data = objects.Disk(dev_type=constants.LD_LV, size=size, logical_id=(vgname, names[0])) dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128, @@ -4389,7 +5489,7 @@ def _GetInstanceInfoText(instance): return "originstname+%s" % instance.name -def _CreateDisks(lu, instance): +def _CreateDisks(lu, instance, to_skip=None, target_node=None): """Create all disks for an instance. This abstracts away some work from AddInstance. 
@@ -4398,32 +5498,43 @@ def _CreateDisks(lu, instance): @param lu: the logical unit on whose behalf we execute @type instance: L{objects.Instance} @param instance: the instance whose disks we should create + @type to_skip: list + @param to_skip: list of indices to skip + @type target_node: string + @param target_node: if passed, overrides the target node for creation @rtype: boolean @return: the success of the creation """ info = _GetInstanceInfoText(instance) - pnode = instance.primary_node + if target_node is None: + pnode = instance.primary_node + all_nodes = instance.all_nodes + else: + pnode = target_node + all_nodes = [pnode] if instance.disk_template == constants.DT_FILE: file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir) result.Raise("Failed to create directory '%s' on" - " node %s: %s" % (file_storage_dir, pnode)) + " node %s" % (file_storage_dir, pnode)) # Note: this needs to be kept in sync with adding of disks in # LUSetInstanceParams - for device in instance.disks: + for idx, device in enumerate(instance.disks): + if to_skip and idx in to_skip: + continue logging.info("Creating volume %s for instance %s", device.iv_name, instance.name) #HARDCODE - for node in instance.all_nodes: + for node in all_nodes: f_create = node == pnode _CreateBlockDev(lu, node, instance, device, f_create, info, f_create) -def _RemoveDisks(lu, instance): +def _RemoveDisks(lu, instance, target_node=None): """Remove all disks for an instance. 
This abstracts away some work from `AddInstance()` and @@ -4435,6 +5546,8 @@ def _RemoveDisks(lu, instance): @param lu: the logical unit on whose behalf we execute @type instance: L{objects.Instance} @param instance: the instance whose disks we should remove + @type target_node: string + @param target_node: used to override the node on which to remove the disks @rtype: boolean @return: the success of the removal @@ -4443,7 +5556,11 @@ def _RemoveDisks(lu, instance): all_result = True for device in instance.disks: - for node, disk in device.ComputeNodeTree(instance.primary_node): + if target_node: + edata = [(target_node, device)] + else: + edata = device.ComputeNodeTree(instance.primary_node) + for node, disk in edata: lu.cfg.SetDiskID(disk, node) msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg if msg: @@ -4453,12 +5570,14 @@ def _RemoveDisks(lu, instance): if instance.disk_template == constants.DT_FILE: file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) - result = lu.rpc.call_file_storage_dir_remove(instance.primary_node, - file_storage_dir) - msg = result.fail_msg - if msg: + if target_node: + tgt = target_node + else: + tgt = instance.primary_node + result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir) + if result.fail_msg: lu.LogWarning("Could not remove directory '%s' on node %s: %s", - file_storage_dir, instance.primary_node, msg) + file_storage_dir, instance.primary_node, result.fail_msg) all_result = False return all_result @@ -4523,13 +5642,26 @@ class LUCreateInstance(LogicalUnit): "hvparams", "beparams"] REQ_BGL = False + def CheckArguments(self): + """Check arguments. 
+ + """ + # do not require name_check to ease forward/backward compatibility + # for tools + if not hasattr(self.op, "name_check"): + self.op.name_check = True + if self.op.ip_check and not self.op.name_check: + # TODO: make the ip check more flexible and not depend on the name check + raise errors.OpPrereqError("Cannot do ip checks without a name check", + errors.ECODE_INVAL) + def _ExpandNode(self, node): """Expands and checks one node name. """ node_full = self.cfg.ExpandNodeName(node) if node_full is None: - raise errors.OpPrereqError("Unknown node %s" % node) + raise errors.OpPrereqError("Unknown node %s" % node, errors.ECODE_NOENT) return node_full def ExpandNames(self): @@ -4551,11 +5683,12 @@ class LUCreateInstance(LogicalUnit): if self.op.mode not in (constants.INSTANCE_CREATE, constants.INSTANCE_IMPORT): raise errors.OpPrereqError("Invalid instance creation mode '%s'" % - self.op.mode) + self.op.mode, errors.ECODE_INVAL) # disk template and mirror node verification if self.op.disk_template not in constants.DISK_TEMPLATES: - raise errors.OpPrereqError("Invalid disk template name") + raise errors.OpPrereqError("Invalid disk template name", + errors.ECODE_INVAL) if self.op.hypervisor is None: self.op.hypervisor = self.cfg.GetHypervisorType() @@ -4565,7 +5698,8 @@ class LUCreateInstance(LogicalUnit): if self.op.hypervisor not in enabled_hvs: raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the" " cluster (%s)" % (self.op.hypervisor, - ",".join(enabled_hvs))) + ",".join(enabled_hvs)), + errors.ECODE_STATE) # check hypervisor parameter syntax (locally) utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES) @@ -4574,6 +5708,8 @@ class LUCreateInstance(LogicalUnit): hv_type = hypervisor.GetHypervisor(self.op.hypervisor) hv_type.CheckParameterSyntax(filled_hvp) self.hv_full = filled_hvp + # check that we don't specify global parameters on an instance + _CheckGlobalHvParams(self.op.hvparams) # fill and remember the beparams dict 
utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) @@ -4583,14 +5719,20 @@ class LUCreateInstance(LogicalUnit): #### instance parameters check # instance name verification - hostname1 = utils.HostInfo(self.op.instance_name) - self.op.instance_name = instance_name = hostname1.name + if self.op.name_check: + hostname1 = utils.GetHostInfo(self.op.instance_name) + self.op.instance_name = instance_name = hostname1.name + # used in CheckPrereq for ip ping check + self.check_ip = hostname1.ip + else: + instance_name = self.op.instance_name + self.check_ip = None # this is just a preventive check, but someone might still add this # instance in the meantime, and creation will fail at lock-add time if instance_name in self.cfg.GetInstanceList(): raise errors.OpPrereqError("Instance '%s' is already in the cluster" % - instance_name) + instance_name, errors.ECODE_EXISTS) self.add_locks[locking.LEVEL_INSTANCE] = instance_name @@ -4613,31 +5755,44 @@ class LUCreateInstance(LogicalUnit): if ip is None or ip.lower() == constants.VALUE_NONE: nic_ip = None elif ip.lower() == constants.VALUE_AUTO: + if not self.op.name_check: + raise errors.OpPrereqError("IP address set to auto but name checks" + " have been skipped. Aborting.", + errors.ECODE_INVAL) nic_ip = hostname1.ip else: if not utils.IsValidIP(ip): raise errors.OpPrereqError("Given IP address '%s' doesn't look" - " like a valid IP" % ip) + " like a valid IP" % ip, + errors.ECODE_INVAL) nic_ip = ip - # TODO: check the ip for uniqueness !! 
+ # TODO: check the ip address for uniqueness if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip: - raise errors.OpPrereqError("Routed nic mode requires an ip address") + raise errors.OpPrereqError("Routed nic mode requires an ip address", + errors.ECODE_INVAL) # MAC address verification mac = nic.get("mac", constants.VALUE_AUTO) if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): - if not utils.IsValidMac(mac.lower()): - raise errors.OpPrereqError("Invalid MAC address specified: %s" % - mac) + mac = utils.NormalizeAndValidateMac(mac) + + try: + self.cfg.ReserveMAC(mac, self.proc.GetECId()) + except errors.ReservationError: + raise errors.OpPrereqError("MAC address %s already in use" + " in cluster" % mac, + errors.ECODE_NOTUNIQUE) + # bridge verification bridge = nic.get("bridge", None) link = nic.get("link", None) if bridge and link: raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" - " at the same time") + " at the same time", errors.ECODE_INVAL) elif bridge and nic_mode == constants.NIC_MODE_ROUTED: - raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic") + raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic", + errors.ECODE_INVAL) elif bridge: link = bridge @@ -4658,32 +5813,32 @@ class LUCreateInstance(LogicalUnit): mode = disk.get("mode", constants.DISK_RDWR) if mode not in constants.DISK_ACCESS_SET: raise errors.OpPrereqError("Invalid disk access mode '%s'" % - mode) + mode, errors.ECODE_INVAL) size = disk.get("size", None) if size is None: - raise errors.OpPrereqError("Missing disk size") + raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL) try: size = int(size) - except ValueError: - raise errors.OpPrereqError("Invalid disk size '%s'" % size) + except (TypeError, ValueError): + raise errors.OpPrereqError("Invalid disk size '%s'" % size, + errors.ECODE_INVAL) self.disks.append({"size": size, "mode": mode}) - # used in CheckPrereq for ip ping check - self.check_ip = hostname1.ip - # file 
storage checks if (self.op.file_driver and not self.op.file_driver in constants.FILE_DRIVER): raise errors.OpPrereqError("Invalid file driver name '%s'" % - self.op.file_driver) + self.op.file_driver, errors.ECODE_INVAL) if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir): - raise errors.OpPrereqError("File storage directory path not absolute") + raise errors.OpPrereqError("File storage directory path not absolute", + errors.ECODE_INVAL) ### Node/iallocator related checks if [self.op.iallocator, self.op.pnode].count(None) != 1: raise errors.OpPrereqError("One and only one of iallocator and primary" - " node must be given") + " node must be given", + errors.ECODE_INVAL) if self.op.iallocator: self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET @@ -4708,7 +5863,8 @@ class LUCreateInstance(LogicalUnit): self.op.src_node = None if os.path.isabs(src_path): raise errors.OpPrereqError("Importing an instance from an absolute" - " path requires a source node option.") + " path requires a source node option.", + errors.ECODE_INVAL) else: self.op.src_node = src_node = self._ExpandNode(src_node) if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET: @@ -4717,9 +5873,16 @@ class LUCreateInstance(LogicalUnit): self.op.src_path = src_path = \ os.path.join(constants.EXPORT_DIR, src_path) + # On import force_variant must be True, because if we forced it at + # initial install, our only chance when importing it back is that it + # works again! + self.op.force_variant = True + else: # INSTANCE_CREATE if getattr(self.op, "os_type", None) is None: - raise errors.OpPrereqError("No guest OS specified") + raise errors.OpPrereqError("No guest OS specified", + errors.ECODE_INVAL) + self.op.force_variant = getattr(self.op, "force_variant", False) def _RunAllocator(self): """Run the allocator based on input opcode. 
@@ -4743,17 +5906,18 @@ class LUCreateInstance(LogicalUnit): if not ial.success: raise errors.OpPrereqError("Can't compute nodes using" - " iallocator '%s': %s" % (self.op.iallocator, - ial.info)) + " iallocator '%s': %s" % + (self.op.iallocator, ial.info), + errors.ECODE_NORES) if len(ial.nodes) != ial.required_nodes: raise errors.OpPrereqError("iallocator '%s' returned invalid number" " of nodes (%s), required %s" % (self.op.iallocator, len(ial.nodes), - ial.required_nodes)) + ial.required_nodes), errors.ECODE_FAULT) self.op.pnode = ial.nodes[0] self.LogInfo("Selected nodes for instance %s via iallocator %s: %s", self.op.instance_name, self.op.iallocator, - ", ".join(ial.nodes)) + utils.CommaJoin(ial.nodes)) if ial.required_nodes == 2: self.op.snode = ial.nodes[1] @@ -4799,7 +5963,7 @@ class LUCreateInstance(LogicalUnit): if (not self.cfg.GetVGName() and self.op.disk_template not in constants.DTS_NOT_LVM): raise errors.OpPrereqError("Cluster does not support lvm-based" - " instances") + " instances", errors.ECODE_STATE) if self.op.mode == constants.INSTANCE_IMPORT: src_node = self.op.src_node @@ -4820,7 +5984,7 @@ class LUCreateInstance(LogicalUnit): break if not found: raise errors.OpPrereqError("No export found for relative path %s" % - src_path) + src_path, errors.ECODE_INVAL) _CheckNodeOnline(self, src_node) result = self.rpc.call_export_info(src_node, src_path) @@ -4828,12 +5992,14 @@ class LUCreateInstance(LogicalUnit): export_info = objects.SerializableConfigParser.Loads(str(result.payload)) if not export_info.has_section(constants.INISECT_EXP): - raise errors.ProgrammerError("Corrupted export config") + raise errors.ProgrammerError("Corrupted export config", + errors.ECODE_ENVIRON) ei_version = export_info.get(constants.INISECT_EXP, 'version') if (int(ei_version) != constants.EXPORT_VERSION): raise errors.OpPrereqError("Wrong export version %s (wanted %d)" % - (ei_version, constants.EXPORT_VERSION)) + (ei_version, constants.EXPORT_VERSION), + 
errors.ECODE_ENVIRON) # Check that the new instance doesn't have less disks than the export instance_disks = len(self.disks) @@ -4841,7 +6007,8 @@ class LUCreateInstance(LogicalUnit): if instance_disks < export_disks: raise errors.OpPrereqError("Not enough disks to import." " (instance: %d, export: %d)" % - (instance_disks, export_disks)) + (instance_disks, export_disks), + errors.ECODE_INVAL) self.op.os_type = export_info.get(constants.INISECT_EXP, 'os') disk_images = [] @@ -4867,15 +6034,13 @@ class LUCreateInstance(LogicalUnit): nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini) # ENDIF: self.op.mode == constants.INSTANCE_IMPORT - # ip ping checks (we use the same ip that was resolved in ExpandNames) - if self.op.start and not self.op.ip_check: - raise errors.OpPrereqError("Cannot ignore IP address conflicts when" - " adding an instance in start mode") + # ip ping checks (we use the same ip that was resolved in ExpandNames) if self.op.ip_check: if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT): raise errors.OpPrereqError("IP %s of instance %s already in use" % - (self.check_ip, self.op.instance_name)) + (self.check_ip, self.op.instance_name), + errors.ECODE_NOTUNIQUE) #### mac address generation # By generating here the mac address both the allocator and the hooks get @@ -4887,7 +6052,7 @@ class LUCreateInstance(LogicalUnit): # creation job will fail. 
for nic in self.nics: if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): - nic.mac = self.cfg.GenerateMAC() + nic.mac = self.cfg.GenerateMAC(self.proc.GetECId()) #### allocator run @@ -4902,10 +6067,10 @@ class LUCreateInstance(LogicalUnit): "Cannot retrieve locked node %s" % self.op.pnode if pnode.offline: raise errors.OpPrereqError("Cannot use offline primary node '%s'" % - pnode.name) + pnode.name, errors.ECODE_STATE) if pnode.drained: raise errors.OpPrereqError("Cannot use drained primary node '%s'" % - pnode.name) + pnode.name, errors.ECODE_STATE) self.secondaries = [] @@ -4913,10 +6078,10 @@ class LUCreateInstance(LogicalUnit): if self.op.disk_template in constants.DTS_NET_MIRROR: if self.op.snode is None: raise errors.OpPrereqError("The networked disk templates need" - " a mirror node") + " a mirror node", errors.ECODE_INVAL) if self.op.snode == pnode.name: - raise errors.OpPrereqError("The secondary node cannot be" - " the primary node.") + raise errors.OpPrereqError("The secondary node cannot be the" + " primary node.", errors.ECODE_INVAL) _CheckNodeOnline(self, self.op.snode) _CheckNodeNotDrained(self, self.op.snode) self.secondaries.append(self.op.snode) @@ -4937,18 +6102,22 @@ class LUCreateInstance(LogicalUnit): vg_free = info.get('vg_free', None) if not isinstance(vg_free, int): raise errors.OpPrereqError("Can't compute free disk space on" - " node %s" % node) + " node %s" % node, errors.ECODE_ENVIRON) if req_size > vg_free: raise errors.OpPrereqError("Not enough disk space on target node %s." 
" %d MB available, %d MB required" % - (node, vg_free, req_size)) + (node, vg_free, req_size), + errors.ECODE_NORES) _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams) # os verification result = self.rpc.call_os_get(pnode.name, self.op.os_type) result.Raise("OS '%s' not in supported os list for primary node %s" % - (self.op.os_type, pnode.name), prereq=True) + (self.op.os_type, pnode.name), + prereq=True, ecode=errors.ECODE_INVAL) + if not self.op.force_variant: + _CheckOSVariant(result.payload, self.op.os_type) _CheckNicsBridgesExist(self, self.nics, self.pnode.name) @@ -5022,7 +6191,8 @@ class LUCreateInstance(LogicalUnit): feedback_fn("adding instance %s to cluster config" % instance) - self.cfg.AddInstance(iobj) + self.cfg.AddInstance(iobj, self.proc.GetECId()) + # Declare that we don't want to remove the instance lock anymore, as we've # added the instance to the config del self.remove_locks[locking.LEVEL_INSTANCE] @@ -5061,7 +6231,8 @@ class LUCreateInstance(LogicalUnit): if iobj.disk_template != constants.DT_DISKLESS: if self.op.mode == constants.INSTANCE_CREATE: feedback_fn("* running the instance OS create scripts...") - result = self.rpc.call_instance_os_add(pnode_name, iobj, False) + # FIXME: pass debug option from opcode to backend + result = self.rpc.call_instance_os_add(pnode_name, iobj, False, 0) result.Raise("Could not add os for instance %s" " on node %s" % (instance, pnode_name)) @@ -5070,9 +6241,10 @@ class LUCreateInstance(LogicalUnit): src_node = self.op.src_node src_images = self.src_images cluster_name = self.cfg.GetClusterName() + # FIXME: pass debug option from opcode to backend import_result = self.rpc.call_instance_os_import(pnode_name, iobj, src_node, src_images, - cluster_name) + cluster_name, 0) msg = import_result.fail_msg if msg: self.LogWarning("Error while importing the disk images for instance" @@ -5084,7 +6256,7 @@ class LUCreateInstance(LogicalUnit): if self.op.start: iobj.admin_up = True - 
self.cfg.Update(iobj) + self.cfg.Update(iobj, feedback_fn) logging.info("Starting instance %s on node %s", instance, pnode_name) feedback_fn("* starting instance...") result = self.rpc.call_instance_start(pnode_name, iobj, None, None) @@ -5160,9 +6332,11 @@ class LUReplaceDisks(LogicalUnit): self.op.remote_node = None if not hasattr(self.op, "iallocator"): self.op.iallocator = None + if not hasattr(self.op, "early_release"): + self.op.early_release = False - _DiskReplacer.CheckArguments(self.op.mode, self.op.remote_node, - self.op.iallocator) + TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node, + self.op.iallocator) def ExpandNames(self): self._ExpandAndLockInstance() @@ -5174,7 +6348,7 @@ class LUReplaceDisks(LogicalUnit): remote_node = self.cfg.ExpandNodeName(self.op.remote_node) if remote_node is None: raise errors.OpPrereqError("Node '%s' not known" % - self.op.remote_node) + self.op.remote_node, errors.ECODE_NOENT) self.op.remote_node = remote_node @@ -5189,9 +6363,11 @@ class LUReplaceDisks(LogicalUnit): self.needed_locks[locking.LEVEL_NODE] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE - self.replacer = _DiskReplacer(self, self.op.instance_name, self.op.mode, - self.op.iallocator, self.op.remote_node, - self.op.disks) + self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode, + self.op.iallocator, self.op.remote_node, + self.op.disks, False, self.op.early_release) + + self.tasklets = [self.replacer] def DeclareLocks(self, level): # If we're not already locking all nodes in the set we have to declare the @@ -5221,45 +6397,125 @@ class LUReplaceDisks(LogicalUnit): nl.append(self.op.remote_node) return env, nl, nl - def CheckPrereq(self): - """Check prerequisites. - This checks that the instance is in the cluster. +class LUEvacuateNode(LogicalUnit): + """Relocate the secondary instances from a node. 
- """ - self.replacer.CheckPrereq() + """ + HPATH = "node-evacuate" + HTYPE = constants.HTYPE_NODE + _OP_REQP = ["node_name"] + REQ_BGL = False - def Exec(self, feedback_fn): - """Execute disk replacement. + def CheckArguments(self): + if not hasattr(self.op, "remote_node"): + self.op.remote_node = None + if not hasattr(self.op, "iallocator"): + self.op.iallocator = None + if not hasattr(self.op, "early_release"): + self.op.early_release = False - This dispatches the disk replacement to the appropriate handler. + TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG, + self.op.remote_node, + self.op.iallocator) + + def ExpandNames(self): + self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name) + if self.op.node_name is None: + raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name, + errors.ECODE_NOENT) + + self.needed_locks = {} + + # Declare node locks + if self.op.iallocator is not None: + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + + elif self.op.remote_node is not None: + remote_node = self.cfg.ExpandNodeName(self.op.remote_node) + if remote_node is None: + raise errors.OpPrereqError("Node '%s' not known" % + self.op.remote_node, errors.ECODE_NOENT) + + self.op.remote_node = remote_node + + # Warning: do not remove the locking of the new secondary here + # unless DRBD8.AddChildren is changed to work in parallel; + # currently it doesn't since parallel invocations of + # FindUnusedMinor will conflict + self.needed_locks[locking.LEVEL_NODE] = [remote_node] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + + else: + raise errors.OpPrereqError("Invalid parameters", errors.ECODE_INVAL) + + # Create tasklets for replacing disks for all secondary instances on this + # node + names = [] + tasklets = [] + + for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name): + logging.debug("Replacing disks for instance %s", inst.name) + names.append(inst.name) + + replacer = TLReplaceDisks(self, inst.name, 
constants.REPLACE_DISK_CHG, + self.op.iallocator, self.op.remote_node, [], + True, self.op.early_release) + tasklets.append(replacer) + + self.tasklets = tasklets + self.instance_names = names + + # Declare instance locks + self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names + + def DeclareLocks(self, level): + # If we're not already locking all nodes in the set we have to declare the + # instance's primary/secondary nodes. + if (level == locking.LEVEL_NODE and + self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET): + self._LockInstancesNodes() + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on the master, the primary and all the secondaries. """ - self.replacer.Exec() + env = { + "NODE_NAME": self.op.node_name, + } + + nl = [self.cfg.GetMasterNode()] + + if self.op.remote_node is not None: + env["NEW_SECONDARY"] = self.op.remote_node + nl.append(self.op.remote_node) + return (env, nl, nl) -class _DiskReplacer: + +class TLReplaceDisks(Tasklet): """Replaces disks for an instance. Note: Locking is not within the scope of this class. """ def __init__(self, lu, instance_name, mode, iallocator_name, remote_node, - disks): + disks, delay_iallocator, early_release): """Initializes this class. """ + Tasklet.__init__(self, lu) + # Parameters - self.lu = lu self.instance_name = instance_name self.mode = mode self.iallocator_name = iallocator_name self.remote_node = remote_node self.disks = disks - - # Shortcuts - self.cfg = lu.cfg - self.rpc = lu.rpc + self.delay_iallocator = delay_iallocator + self.early_release = early_release # Runtime data self.instance = None @@ -5271,21 +6527,25 @@ class _DiskReplacer: @staticmethod def CheckArguments(mode, remote_node, iallocator): + """Helper function for users of this class. 
+ + """ # check for valid parameter combination - cnt = [remote_node, iallocator].count(None) if mode == constants.REPLACE_DISK_CHG: - if cnt == 2: + if remote_node is None and iallocator is None: raise errors.OpPrereqError("When changing the secondary either an" " iallocator script must be used or the" - " new node given") - elif cnt == 0: + " new node given", errors.ECODE_INVAL) + + if remote_node is not None and iallocator is not None: raise errors.OpPrereqError("Give either the iallocator or the new" - " secondary, not both") - else: # not replacing the secondary - if cnt != 2: - raise errors.OpPrereqError("The iallocator and new node options can" - " be used only when changing the" - " secondary node") + " secondary, not both", errors.ECODE_INVAL) + + elif remote_node is not None or iallocator is not None: + # Not replacing the secondary + raise errors.OpPrereqError("The iallocator and new node options can" + " only be used when changing the" + " secondary node", errors.ECODE_INVAL) @staticmethod def _RunAllocator(lu, iallocator_name, instance_name, relocate_from): @@ -5301,12 +6561,15 @@ class _DiskReplacer: if not ial.success: raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':" - " %s" % (iallocator_name, ial.info)) + " %s" % (iallocator_name, ial.info), + errors.ECODE_NORES) if len(ial.nodes) != ial.required_nodes: raise errors.OpPrereqError("iallocator '%s' returned invalid number" " of nodes (%s), required %s" % - (len(ial.nodes), ial.required_nodes)) + (iallocator_name, + len(ial.nodes), ial.required_nodes), + errors.ECODE_FAULT) remote_node_name = ial.nodes[0] @@ -5315,32 +6578,50 @@ class _DiskReplacer: return remote_node_name + def _FindFaultyDisks(self, node_name): + return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance, + node_name, True) + def CheckPrereq(self): """Check prerequisites. This checks that the instance is in the cluster. 
""" - self.instance = self.cfg.GetInstanceInfo(self.instance_name) - assert self.instance is not None, \ + self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name) + assert instance is not None, \ "Cannot retrieve locked instance %s" % self.instance_name - if self.instance.disk_template != constants.DT_DRBD8: + if instance.disk_template != constants.DT_DRBD8: raise errors.OpPrereqError("Can only run replace disks for DRBD8-based" - " instances") + " instances", errors.ECODE_INVAL) - if len(self.instance.secondary_nodes) != 1: + if len(instance.secondary_nodes) != 1: raise errors.OpPrereqError("The instance has a strange layout," " expected one secondary but found %d" % - len(self.instance.secondary_nodes)) + len(instance.secondary_nodes), + errors.ECODE_FAULT) + + if not self.delay_iallocator: + self._CheckPrereq2() + + def _CheckPrereq2(self): + """Check prerequisites, second part. - secondary_node = self.instance.secondary_nodes[0] + This function should always be part of CheckPrereq. It was separated and is + now called from Exec because during node evacuation iallocator was only + called with an unmodified cluster model, not taking planned changes into + account. 
+ + """ + instance = self.instance + secondary_node = instance.secondary_nodes[0] if self.iallocator_name is None: remote_node = self.remote_node else: remote_node = self._RunAllocator(self.lu, self.iallocator_name, - self.instance.name, secondary_node) + instance.name, instance.secondary_nodes) if remote_node is not None: self.remote_node_info = self.cfg.GetNodeInfo(remote_node) @@ -5351,44 +6632,76 @@ class _DiskReplacer: if remote_node == self.instance.primary_node: raise errors.OpPrereqError("The specified node is the primary node of" - " the instance.") + " the instance.", errors.ECODE_INVAL) if remote_node == secondary_node: raise errors.OpPrereqError("The specified node is already the" - " secondary node of the instance.") + " secondary node of the instance.", + errors.ECODE_INVAL) + + if self.disks and self.mode in (constants.REPLACE_DISK_AUTO, + constants.REPLACE_DISK_CHG): + raise errors.OpPrereqError("Cannot specify disks to be replaced", + errors.ECODE_INVAL) + + if self.mode == constants.REPLACE_DISK_AUTO: + faulty_primary = self._FindFaultyDisks(instance.primary_node) + faulty_secondary = self._FindFaultyDisks(secondary_node) + + if faulty_primary and faulty_secondary: + raise errors.OpPrereqError("Instance %s has faulty disks on more than" + " one node and can not be repaired" + " automatically" % self.instance_name, + errors.ECODE_STATE) + + if faulty_primary: + self.disks = faulty_primary + self.target_node = instance.primary_node + self.other_node = secondary_node + check_nodes = [self.target_node, self.other_node] + elif faulty_secondary: + self.disks = faulty_secondary + self.target_node = secondary_node + self.other_node = instance.primary_node + check_nodes = [self.target_node, self.other_node] + else: + self.disks = [] + check_nodes = [] - if self.mode == constants.REPLACE_DISK_PRI: - self.target_node = self.instance.primary_node - self.other_node = secondary_node - check_nodes = [self.target_node, self.other_node] + else: + # Non-automatic 
modes + if self.mode == constants.REPLACE_DISK_PRI: + self.target_node = instance.primary_node + self.other_node = secondary_node + check_nodes = [self.target_node, self.other_node] - elif self.mode == constants.REPLACE_DISK_SEC: - self.target_node = secondary_node - self.other_node = self.instance.primary_node - check_nodes = [self.target_node, self.other_node] + elif self.mode == constants.REPLACE_DISK_SEC: + self.target_node = secondary_node + self.other_node = instance.primary_node + check_nodes = [self.target_node, self.other_node] - elif self.mode == constants.REPLACE_DISK_CHG: - self.new_node = remote_node - self.other_node = self.instance.primary_node - self.target_node = secondary_node - check_nodes = [self.new_node, self.other_node] + elif self.mode == constants.REPLACE_DISK_CHG: + self.new_node = remote_node + self.other_node = instance.primary_node + self.target_node = secondary_node + check_nodes = [self.new_node, self.other_node] - _CheckNodeNotDrained(self.lu, remote_node) + _CheckNodeNotDrained(self.lu, remote_node) - else: - raise errors.ProgrammerError("Unhandled disk replace mode (%s)" % - self.mode) + else: + raise errors.ProgrammerError("Unhandled disk replace mode (%s)" % + self.mode) + + # If not specified all disks should be replaced + if not self.disks: + self.disks = range(len(self.instance.disks)) for node in check_nodes: _CheckNodeOnline(self.lu, node) - # If not specified all disks should be replaced - if not self.disks: - self.disks = range(len(self.instance.disks)) - # Check whether disks are valid for disk_idx in self.disks: - self.instance.FindDisk(disk_idx) + instance.FindDisk(disk_idx) # Get secondary node IP addresses node_2nd_ip = {} @@ -5399,12 +6712,22 @@ class _DiskReplacer: self.node_secondary_ip = node_2nd_ip - def Exec(self): + def Exec(self, feedback_fn): """Execute disk replacement. This dispatches the disk replacement to the appropriate handler. 
""" + if self.delay_iallocator: + self._CheckPrereq2() + + if not self.disks: + feedback_fn("No disks need replacement") + return + + feedback_fn("Replacing disk(s) %s for %s" % + (utils.CommaJoin(self.disks), self.instance.name)) + activate_disks = (not self.instance.admin_up) # Activate the instance disks if we're replacing them on a down instance @@ -5412,13 +6735,17 @@ class _DiskReplacer: _StartInstanceDisks(self.lu, self.instance, True) try: - if self.mode == constants.REPLACE_DISK_CHG: - return self._ExecDrbd8Secondary() + # Should we replace the secondary node? + if self.new_node is not None: + fn = self._ExecDrbd8Secondary else: - return self._ExecDrbd8DiskOnly() + fn = self._ExecDrbd8DiskOnly + + return fn(feedback_fn) finally: - # Deactivate the instance disks if we're replacing them on a down instance + # Deactivate the instance disks if we're replacing them on a + # down instance if activate_disks: _SafeShutdownInstanceDisks(self.lu, self.instance) @@ -5504,7 +6831,7 @@ class _DiskReplacer: return iv_names def _CheckDevices(self, node_name, iv_names): - for name, (dev, old_lvs, new_lvs) in iv_names.iteritems(): + for name, (dev, _, _) in iv_names.iteritems(): self.cfg.SetDiskID(dev, node_name) result = self.rpc.call_blockdev_find(node_name, dev) @@ -5516,11 +6843,11 @@ class _DiskReplacer: raise errors.OpExecError("Can't find DRBD device %s: %s" % (name, msg)) - if result.payload[5]: + if result.payload.is_degraded: raise errors.OpExecError("DRBD device %s is degraded!" 
% name) def _RemoveOldStorage(self, node_name, iv_names): - for name, (dev, old_lvs, _) in iv_names.iteritems(): + for name, (_, old_lvs, _) in iv_names.iteritems(): self.lu.LogInfo("Remove logical volumes for %s" % name) for lv in old_lvs: @@ -5531,7 +6858,11 @@ class _DiskReplacer: self.lu.LogWarning("Can't remove old LV: %s" % msg, hint="remove unused LVs manually") - def _ExecDrbd8DiskOnly(self): + def _ReleaseNodeLock(self, node_name): + """Releases the lock for a given node.""" + self.lu.context.glm.release(locking.LEVEL_NODE, node_name) + + def _ExecDrbd8DiskOnly(self, feedback_fn): """Replace a disk on the primary or secondary for DRBD 8. The algorithm for replace is quite complicated: @@ -5575,7 +6906,8 @@ class _DiskReplacer: for dev, old_lvs, new_lvs in iv_names.itervalues(): self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name) - result = self.rpc.call_blockdev_removechildren(self.target_node, dev, old_lvs) + result = self.rpc.call_blockdev_removechildren(self.target_node, dev, + old_lvs) result.Raise("Can't detach drbd from local storage on node" " %s for device %s" % (self.target_node, dev.iv_name)) #dev.children = [] @@ -5601,14 +6933,16 @@ class _DiskReplacer: rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix))) self.lu.LogInfo("Renaming the old LVs on the target node") - result = self.rpc.call_blockdev_rename(self.target_node, rename_old_to_new) + result = self.rpc.call_blockdev_rename(self.target_node, + rename_old_to_new) result.Raise("Can't rename old LVs on node %s" % self.target_node) # Now we rename the new LVs to the old LVs self.lu.LogInfo("Renaming the new LVs on the target node") rename_new_to_old = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)] - result = self.rpc.call_blockdev_rename(self.target_node, rename_new_to_old) + result = self.rpc.call_blockdev_rename(self.target_node, + rename_new_to_old) result.Raise("Can't rename new LVs on node %s" % self.target_node) for old, new in zip(old_lvs, 
new_lvs): @@ -5621,11 +6955,13 @@ class _DiskReplacer: # Now that the new lvs have the old name, we can add them to the device self.lu.LogInfo("Adding new mirror component on %s" % self.target_node) - result = self.rpc.call_blockdev_addchildren(self.target_node, dev, new_lvs) + result = self.rpc.call_blockdev_addchildren(self.target_node, dev, + new_lvs) msg = result.fail_msg if msg: for new_lv in new_lvs: - msg2 = self.rpc.call_blockdev_remove(self.target_node, new_lv).fail_msg + msg2 = self.rpc.call_blockdev_remove(self.target_node, + new_lv).fail_msg if msg2: self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2, hint=("cleanup manually the unused logical" @@ -5634,22 +6970,35 @@ class _DiskReplacer: dev.children = new_lvs - self.cfg.Update(self.instance) + self.cfg.Update(self.instance, feedback_fn) + + cstep = 5 + if self.early_release: + self.lu.LogStep(cstep, steps_total, "Removing old storage") + cstep += 1 + self._RemoveOldStorage(self.target_node, iv_names) + # only release the lock if we're doing secondary replace, since + # we use the primary node later + if self.target_node != self.instance.primary_node: + self._ReleaseNodeLock(self.target_node) # Wait for sync # This can fail as the old devices are degraded and _WaitForSync # does a combined result over all disks, so we don't check its return value - self.lu.LogStep(5, steps_total, "Sync devices") - _WaitForSync(self.lu, self.instance, unlock=True) + self.lu.LogStep(cstep, steps_total, "Sync devices") + cstep += 1 + _WaitForSync(self.lu, self.instance) # Check all devices manually self._CheckDevices(self.instance.primary_node, iv_names) # Step: remove old storage - self.lu.LogStep(6, steps_total, "Removing old storage") - self._RemoveOldStorage(self.target_node, iv_names) + if not self.early_release: + self.lu.LogStep(cstep, steps_total, "Removing old storage") + cstep += 1 + self._RemoveOldStorage(self.target_node, iv_names) - def _ExecDrbd8Secondary(self): + def _ExecDrbd8Secondary(self, 
feedback_fn): """Replace the secondary node for DRBD 8. The algorithm for replace is quite complicated: @@ -5693,13 +7042,15 @@ class _DiskReplacer: # after this, we must manually remove the drbd minors on both the # error and the success paths self.lu.LogStep(4, steps_total, "Changing drbd configuration") - minors = self.cfg.AllocateDRBDMinor([self.new_node for dev in self.instance.disks], + minors = self.cfg.AllocateDRBDMinor([self.new_node + for dev in self.instance.disks], self.instance.name) - logging.debug("Allocated minors %r" % (minors,)) + logging.debug("Allocated minors %r", minors) iv_names = {} for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)): - self.lu.LogInfo("activating a new drbd on %s for disk/%d" % (self.new_node, idx)) + self.lu.LogInfo("activating a new drbd on %s for disk/%d" % + (self.new_node, idx)) # create new devices on new_node; note that we create two IDs: # one without port, so the drbd will be activated without # networking information on the new node at this stage, and one @@ -5708,10 +7059,13 @@ class _DiskReplacer: if self.instance.primary_node == o_node1: p_minor = o_minor1 else: + assert self.instance.primary_node == o_node2, "Three-node instance?" 
p_minor = o_minor2 - new_alone_id = (self.instance.primary_node, self.new_node, None, p_minor, new_minor, o_secret) - new_net_id = (self.instance.primary_node, self.new_node, o_port, p_minor, new_minor, o_secret) + new_alone_id = (self.instance.primary_node, self.new_node, None, + p_minor, new_minor, o_secret) + new_net_id = (self.instance.primary_node, self.new_node, o_port, + p_minor, new_minor, o_secret) iv_names[idx] = (dev, dev.children, new_net_id) logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor, @@ -5739,8 +7093,10 @@ class _DiskReplacer: " soon as possible")) self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)") - result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node], self.node_secondary_ip, - self.instance.disks)[self.instance.primary_node] + result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node], + self.node_secondary_ip, + self.instance.disks)\ + [self.instance.primary_node] msg = result.fail_msg if msg: @@ -5756,33 +7112,113 @@ class _DiskReplacer: dev.logical_id = new_logical_id self.cfg.SetDiskID(dev, self.instance.primary_node) - self.cfg.Update(self.instance) + self.cfg.Update(self.instance, feedback_fn) # and now perform the drbd attach self.lu.LogInfo("Attaching primary drbds to new secondary" " (standalone => connected)") - result = self.rpc.call_drbd_attach_net([self.instance.primary_node, self.new_node], self.node_secondary_ip, - self.instance.disks, self.instance.name, + result = self.rpc.call_drbd_attach_net([self.instance.primary_node, + self.new_node], + self.node_secondary_ip, + self.instance.disks, + self.instance.name, False) for to_node, to_result in result.items(): msg = to_result.fail_msg if msg: - self.lu.LogWarning("Can't attach drbd disks on node %s: %s", to_node, msg, + self.lu.LogWarning("Can't attach drbd disks on node %s: %s", + to_node, msg, hint=("please do a gnt-instance info to see the" " status of disks")) + cstep = 5 + if self.early_release: + 
self.lu.LogStep(cstep, steps_total, "Removing old storage") + cstep += 1 + self._RemoveOldStorage(self.target_node, iv_names) + self._ReleaseNodeLock([self.target_node, self.new_node]) # Wait for sync # This can fail as the old devices are degraded and _WaitForSync # does a combined result over all disks, so we don't check its return value - self.lu.LogStep(5, steps_total, "Sync devices") - _WaitForSync(self.lu, self.instance, unlock=True) + self.lu.LogStep(cstep, steps_total, "Sync devices") + cstep += 1 + _WaitForSync(self.lu, self.instance) # Check all devices manually self._CheckDevices(self.instance.primary_node, iv_names) # Step: remove old storage - self.lu.LogStep(6, steps_total, "Removing old storage") - self._RemoveOldStorage(self.target_node, iv_names) + if not self.early_release: + self.lu.LogStep(cstep, steps_total, "Removing old storage") + self._RemoveOldStorage(self.target_node, iv_names) + + +class LURepairNodeStorage(NoHooksLU): + """Repairs the volume group on a node. + + """ + _OP_REQP = ["node_name"] + REQ_BGL = False + + def CheckArguments(self): + node_name = self.cfg.ExpandNodeName(self.op.node_name) + if node_name is None: + raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name, + errors.ECODE_NOENT) + + self.op.node_name = node_name + + def ExpandNames(self): + self.needed_locks = { + locking.LEVEL_NODE: [self.op.node_name], + } + + def _CheckFaultyDisks(self, instance, node_name): + """Ensure faulty disks abort the opcode or at least warn.""" + try: + if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance, + node_name, True): + raise errors.OpPrereqError("Instance '%s' has faulty disks on" + " node '%s'" % (instance.name, node_name), + errors.ECODE_STATE) + except errors.OpPrereqError, err: + if self.op.ignore_consistency: + self.proc.LogWarning(str(err.args[0])) + else: + raise + + def CheckPrereq(self): + """Check prerequisites. 
+ + """ + storage_type = self.op.storage_type + + if (constants.SO_FIX_CONSISTENCY not in + constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])): + raise errors.OpPrereqError("Storage units of type '%s' can not be" + " repaired" % storage_type, + errors.ECODE_INVAL) + + # Check whether any instance on this node has faulty disks + for inst in _GetNodeInstances(self.cfg, self.op.node_name): + if not inst.admin_up: + continue + check_nodes = set(inst.all_nodes) + check_nodes.discard(self.op.node_name) + for inst_node_name in check_nodes: + self._CheckFaultyDisks(inst, inst_node_name) + + def Exec(self, feedback_fn): + feedback_fn("Repairing storage unit '%s' on %s ..." % + (self.op.name, self.op.node_name)) + + st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) + result = self.rpc.call_storage_execute(self.op.node_name, + self.op.storage_type, st_args, + self.op.name, + constants.SO_FIX_CONSISTENCY) + result.Raise("Failed to repair storage unit '%s' on %s" % + (self.op.name, self.op.node_name)) class LUGrowDisk(LogicalUnit): @@ -5838,7 +7274,7 @@ class LUGrowDisk(LogicalUnit): if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8): raise errors.OpPrereqError("Instance's disk layout does not support" - " growing.") + " growing.", errors.ECODE_INVAL) self.disk = instance.FindDisk(self.op.disk) @@ -5850,11 +7286,12 @@ class LUGrowDisk(LogicalUnit): vg_free = info.payload.get('vg_free', None) if not isinstance(vg_free, int): raise errors.OpPrereqError("Can't compute free disk space on" - " node %s" % node) + " node %s" % node, errors.ECODE_ENVIRON) if self.op.amount > vg_free: raise errors.OpPrereqError("Not enough disk space on target node %s:" " %d MiB available, %d MiB required" % - (node, vg_free, self.op.amount)) + (node, vg_free, self.op.amount), + errors.ECODE_NORES) def Exec(self, feedback_fn): """Execute disk grow. 
@@ -5866,8 +7303,16 @@ class LUGrowDisk(LogicalUnit): self.cfg.SetDiskID(disk, node) result = self.rpc.call_blockdev_grow(node, disk, self.op.amount) result.Raise("Grow request failed to node %s" % node) + + # TODO: Rewrite code to work properly + # DRBD goes into sync mode for a short amount of time after executing the + # "resize" command. DRBD 8.x below version 8.0.13 contains a bug whereby + # calling "resize" in sync mode fails. Sleeping for a short amount of + # time is a work-around. + time.sleep(5) + disk.RecordGrow(self.op.amount) - self.cfg.Update(instance) + self.cfg.Update(instance, feedback_fn) if self.op.wait_for_sync: disk_abort = not _WaitForSync(self, instance) if disk_abort: @@ -5887,14 +7332,16 @@ class LUQueryInstanceData(NoHooksLU): self.share_locks = dict.fromkeys(locking.LEVELS, 1) if not isinstance(self.op.instances, list): - raise errors.OpPrereqError("Invalid argument type 'instances'") + raise errors.OpPrereqError("Invalid argument type 'instances'", + errors.ECODE_INVAL) if self.op.instances: self.wanted_names = [] for name in self.op.instances: full_name = self.cfg.ExpandInstanceName(name) if full_name is None: - raise errors.OpPrereqError("Instance '%s' not known" % name) + raise errors.OpPrereqError("Instance '%s' not known" % name, + errors.ECODE_NOENT) self.wanted_names.append(full_name) self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names else: @@ -5921,22 +7368,33 @@ class LUQueryInstanceData(NoHooksLU): in self.wanted_names] return + def _ComputeBlockdevStatus(self, node, instance_name, dev): + """Returns the status of a block device + + """ + if self.op.static or not node: + return None + + self.cfg.SetDiskID(dev, node) + + result = self.rpc.call_blockdev_find(node, dev) + if result.offline: + return None + + result.Raise("Can't compute disk status for %s" % instance_name) + + status = result.payload + if status is None: + return None + + return (status.dev_path, status.major, status.minor, + status.sync_percent, 
status.estimated_time, + status.is_degraded, status.ldisk_status) + def _ComputeDiskStatus(self, instance, snode, dev): """Compute block device status. """ - static = self.op.static - if not static: - self.cfg.SetDiskID(dev, instance.primary_node) - dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev) - if dev_pstatus.offline: - dev_pstatus = None - else: - dev_pstatus.Raise("Can't compute disk status for %s" % instance.name) - dev_pstatus = dev_pstatus.payload - else: - dev_pstatus = None - if dev.dev_type in constants.LDS_DRBD: # we change the snode then (otherwise we use the one passed in) if dev.logical_id[0] == instance.primary_node: @@ -5944,16 +7402,9 @@ class LUQueryInstanceData(NoHooksLU): else: snode = dev.logical_id[0] - if snode and not static: - self.cfg.SetDiskID(dev, snode) - dev_sstatus = self.rpc.call_blockdev_find(snode, dev) - if dev_sstatus.offline: - dev_sstatus = None - else: - dev_sstatus.Raise("Can't compute disk status for %s" % instance.name) - dev_sstatus = dev_sstatus.payload - else: - dev_sstatus = None + dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node, + instance.name, dev) + dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev) if dev.children: dev_children = [self._ComputeDiskStatus(instance, snode, child) @@ -6015,9 +7466,13 @@ class LUQueryInstanceData(NoHooksLU): "hypervisor": instance.hypervisor, "network_port": instance.network_port, "hv_instance": instance.hvparams, - "hv_actual": cluster.FillHV(instance), + "hv_actual": cluster.FillHV(instance, skip_globals=True), "be_instance": instance.beparams, "be_actual": cluster.FillBE(instance), + "serial_no": instance.serial_no, + "mtime": instance.mtime, + "ctime": instance.ctime, + "uuid": instance.uuid, } result[instance.name] = idict @@ -6046,7 +7501,10 @@ class LUSetInstanceParams(LogicalUnit): self.op.force = getattr(self.op, "force", False) if not (self.op.nics or self.op.disks or self.op.hvparams or self.op.beparams): - raise 
errors.OpPrereqError("No changes submitted") + raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL) + + if self.op.hvparams: + _CheckGlobalHvParams(self.op.hvparams) # Disk validation disk_addremove = 0 @@ -6058,33 +7516,35 @@ class LUSetInstanceParams(LogicalUnit): disk_addremove += 1 else: if not isinstance(disk_op, int): - raise errors.OpPrereqError("Invalid disk index") + raise errors.OpPrereqError("Invalid disk index", errors.ECODE_INVAL) if not isinstance(disk_dict, dict): msg = "Invalid disk value: expected dict, got '%s'" % disk_dict - raise errors.OpPrereqError(msg) + raise errors.OpPrereqError(msg, errors.ECODE_INVAL) if disk_op == constants.DDM_ADD: mode = disk_dict.setdefault('mode', constants.DISK_RDWR) if mode not in constants.DISK_ACCESS_SET: - raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode) + raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode, + errors.ECODE_INVAL) size = disk_dict.get('size', None) if size is None: - raise errors.OpPrereqError("Required disk parameter size missing") + raise errors.OpPrereqError("Required disk parameter size missing", + errors.ECODE_INVAL) try: size = int(size) - except ValueError, err: + except (TypeError, ValueError), err: raise errors.OpPrereqError("Invalid disk size parameter: %s" % - str(err)) + str(err), errors.ECODE_INVAL) disk_dict['size'] = size else: # modification of disk if 'size' in disk_dict: raise errors.OpPrereqError("Disk size change not possible, use" - " grow-disk") + " grow-disk", errors.ECODE_INVAL) if disk_addremove > 1: raise errors.OpPrereqError("Only one disk add or remove operation" - " supported at a time") + " supported at a time", errors.ECODE_INVAL) # NIC validation nic_addremove = 0 @@ -6096,10 +7556,10 @@ class LUSetInstanceParams(LogicalUnit): nic_addremove += 1 else: if not isinstance(nic_op, int): - raise errors.OpPrereqError("Invalid nic index") + raise errors.OpPrereqError("Invalid nic index", errors.ECODE_INVAL) if not 
isinstance(nic_dict, dict): msg = "Invalid nic value: expected dict, got '%s'" % nic_dict - raise errors.OpPrereqError(msg) + raise errors.OpPrereqError(msg, errors.ECODE_INVAL) # nic_dict should be a dict nic_ip = nic_dict.get('ip', None) @@ -6108,13 +7568,14 @@ class LUSetInstanceParams(LogicalUnit): nic_dict['ip'] = None else: if not utils.IsValidIP(nic_ip): - raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip) + raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip, + errors.ECODE_INVAL) nic_bridge = nic_dict.get('bridge', None) nic_link = nic_dict.get('link', None) if nic_bridge and nic_link: raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" - " at the same time") + " at the same time", errors.ECODE_INVAL) elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE: nic_dict['bridge'] = None elif nic_link and nic_link.lower() == constants.VALUE_NONE: @@ -6128,15 +7589,16 @@ class LUSetInstanceParams(LogicalUnit): if 'mac' in nic_dict: nic_mac = nic_dict['mac'] if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): - if not utils.IsValidMac(nic_mac): - raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac) + nic_mac = utils.NormalizeAndValidateMac(nic_mac) + if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO: raise errors.OpPrereqError("'auto' is not a valid MAC address when" - " modifying an existing nic") + " modifying an existing nic", + errors.ECODE_INVAL) if nic_addremove > 1: raise errors.OpPrereqError("Only one NIC add or remove operation" - " supported at a time") + " supported at a time", errors.ECODE_INVAL) def ExpandNames(self): self._ExpandAndLockInstance() @@ -6198,7 +7660,8 @@ class LUSetInstanceParams(LogicalUnit): nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) return env, nl, nl - def _GetUpdatedParams(self, old_params, update_dict, + @staticmethod + def _GetUpdatedParams(old_params, update_dict, default_values, parameter_types): """Return the new params dict 
for the given params. @@ -6309,7 +7772,8 @@ class LUSetInstanceParams(LogicalUnit): if miss_mem > 0: raise errors.OpPrereqError("This change will prevent the instance" " from starting, due to %d MB of memory" - " missing on its primary node" % miss_mem) + " missing on its primary node" % miss_mem, + errors.ECODE_NORES) if be_new[constants.BE_AUTO_BALANCE]: for node, nres in nodeinfo.items(): @@ -6332,14 +7796,20 @@ class LUSetInstanceParams(LogicalUnit): for nic_op, nic_dict in self.op.nics: if nic_op == constants.DDM_REMOVE: if not instance.nics: - raise errors.OpPrereqError("Instance has no NICs, cannot remove") + raise errors.OpPrereqError("Instance has no NICs, cannot remove", + errors.ECODE_INVAL) continue if nic_op != constants.DDM_ADD: # an existing nic + if not instance.nics: + raise errors.OpPrereqError("Invalid NIC index %s, instance has" + " no NICs" % nic_op, + errors.ECODE_INVAL) if nic_op < 0 or nic_op >= len(instance.nics): raise errors.OpPrereqError("Invalid NIC index %s, valid values" " are 0 to %d" % - (nic_op, len(instance.nics))) + (nic_op, len(instance.nics) - 1), + errors.ECODE_INVAL) old_nic_params = instance.nics[nic_op].nicparams old_nic_ip = instance.nics[nic_op].ip else: @@ -6370,7 +7840,7 @@ class LUSetInstanceParams(LogicalUnit): if self.force: self.warn.append(msg) else: - raise errors.OpPrereqError(msg) + raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON) if new_nic_mode == constants.NIC_MODE_ROUTED: if 'ip' in nic_dict: nic_ip = nic_dict['ip'] @@ -6378,49 +7848,57 @@ class LUSetInstanceParams(LogicalUnit): nic_ip = old_nic_ip if nic_ip is None: raise errors.OpPrereqError('Cannot set the nic ip to None' - ' on a routed nic') + ' on a routed nic', errors.ECODE_INVAL) if 'mac' in nic_dict: nic_mac = nic_dict['mac'] if nic_mac is None: - raise errors.OpPrereqError('Cannot set the nic mac to None') + raise errors.OpPrereqError('Cannot set the nic mac to None', + errors.ECODE_INVAL) elif nic_mac in (constants.VALUE_AUTO, 
constants.VALUE_GENERATE): # otherwise generate the mac - nic_dict['mac'] = self.cfg.GenerateMAC() + nic_dict['mac'] = self.cfg.GenerateMAC(self.proc.GetECId()) else: # or validate/reserve the current one - if self.cfg.IsMacInUse(nic_mac): + try: + self.cfg.ReserveMAC(nic_mac, self.proc.GetECId()) + except errors.ReservationError: raise errors.OpPrereqError("MAC address %s already in use" - " in cluster" % nic_mac) + " in cluster" % nic_mac, + errors.ECODE_NOTUNIQUE) # DISK processing if self.op.disks and instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Disk operations not supported for" - " diskless instances") - for disk_op, disk_dict in self.op.disks: + " diskless instances", + errors.ECODE_INVAL) + for disk_op, _ in self.op.disks: if disk_op == constants.DDM_REMOVE: if len(instance.disks) == 1: raise errors.OpPrereqError("Cannot remove the last disk of" - " an instance") + " an instance", + errors.ECODE_INVAL) ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor]) ins_l = ins_l[pnode] msg = ins_l.fail_msg if msg: raise errors.OpPrereqError("Can't contact node %s: %s" % - (pnode, msg)) + (pnode, msg), errors.ECODE_ENVIRON) if instance.name in ins_l.payload: raise errors.OpPrereqError("Instance is running, can't remove" - " disks.") + " disks.", errors.ECODE_STATE) if (disk_op == constants.DDM_ADD and len(instance.nics) >= constants.MAX_DISKS): raise errors.OpPrereqError("Instance has too many disks (%d), cannot" - " add more" % constants.MAX_DISKS) + " add more" % constants.MAX_DISKS, + errors.ECODE_STATE) if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE): # an existing disk if disk_op < 0 or disk_op >= len(instance.disks): raise errors.OpPrereqError("Invalid disk index %s, valid values" " are 0 to %d" % - (disk_op, len(instance.disks))) + (disk_op, len(instance.disks)), + errors.ECODE_INVAL) return @@ -6437,7 +7915,6 @@ class LUSetInstanceParams(LogicalUnit): result = [] instance = self.instance - cluster = 
self.cluster # disk changes for disk_op, disk_dict in self.op.disks: if disk_op == constants.DDM_REMOVE: @@ -6512,8 +7989,8 @@ class LUSetInstanceParams(LogicalUnit): for key in 'mac', 'ip': if key in nic_dict: setattr(instance.nics[nic_op], key, nic_dict[key]) - if nic_op in self.nic_pnew: - instance.nics[nic_op].nicparams = self.nic_pnew[nic_op] + if nic_op in self.nic_pinst: + instance.nics[nic_op].nicparams = self.nic_pinst[nic_op] for key, val in nic_dict.iteritems(): result.append(("nic.%s/%d" % (key, nic_op), val)) @@ -6529,7 +8006,7 @@ class LUSetInstanceParams(LogicalUnit): for key, val in self.op.beparams.iteritems(): result.append(("be/%s" % key, val)) - self.cfg.Update(instance) + self.cfg.Update(instance, feedback_fn) return result @@ -6585,6 +8062,13 @@ class LUExportInstance(LogicalUnit): _OP_REQP = ["instance_name", "target_node", "shutdown"] REQ_BGL = False + def CheckArguments(self): + """Check the arguments. + + """ + self.shutdown_timeout = getattr(self.op, "shutdown_timeout", + constants.DEFAULT_SHUTDOWN_TIMEOUT) + def ExpandNames(self): self._ExpandAndLockInstance() # FIXME: lock only instance primary and destination node @@ -6610,6 +8094,7 @@ class LUExportInstance(LogicalUnit): env = { "EXPORT_NODE": self.op.target_node, "EXPORT_DO_SHUTDOWN": self.op.shutdown, + "SHUTDOWN_TIMEOUT": self.shutdown_timeout, } env.update(_BuildInstanceHookEnvByObject(self, self.instance)) nl = [self.cfg.GetMasterNode(), self.instance.primary_node, @@ -6633,7 +8118,8 @@ class LUExportInstance(LogicalUnit): if self.dst_node is None: # This is wrong node name, not a non-locked node - raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node) + raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node, + errors.ECODE_NOENT) _CheckNodeOnline(self, self.dst_node.name) _CheckNodeNotDrained(self, self.dst_node.name) @@ -6641,7 +8127,7 @@ class LUExportInstance(LogicalUnit): for disk in self.instance.disks: if disk.dev_type == constants.LD_FILE: 
raise errors.OpPrereqError("Export not supported for instances with" - " file-based disks") + " file-based disks", errors.ECODE_INVAL) def Exec(self, feedback_fn): """Export an instance to an image in the cluster. @@ -6650,9 +8136,12 @@ class LUExportInstance(LogicalUnit): instance = self.instance dst_node = self.dst_node src_node = instance.primary_node + if self.op.shutdown: # shutdown the instance, but not the disks - result = self.rpc.call_instance_shutdown(src_node, instance) + feedback_fn("Shutting down instance %s" % instance.name) + result = self.rpc.call_instance_shutdown(src_node, instance, + self.shutdown_timeout) result.Raise("Could not shutdown instance %s on" " node %s" % (instance.name, src_node)) @@ -6665,51 +8154,84 @@ class LUExportInstance(LogicalUnit): for disk in instance.disks: self.cfg.SetDiskID(disk, src_node) - try: - for idx, disk in enumerate(instance.disks): - # result.payload will be a snapshot of an lvm leaf of the one we passed - result = self.rpc.call_blockdev_snapshot(src_node, disk) - msg = result.fail_msg - if msg: - self.LogWarning("Could not snapshot disk/%s on node %s: %s", - idx, src_node, msg) - snap_disks.append(False) - else: - disk_id = (vgname, result.payload) - new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size, - logical_id=disk_id, physical_id=disk_id, - iv_name=disk.iv_name) - snap_disks.append(new_dev) + activate_disks = (not instance.admin_up) - finally: - if self.op.shutdown and instance.admin_up: - result = self.rpc.call_instance_start(src_node, instance, None, None) - msg = result.fail_msg - if msg: - _ShutdownInstanceDisks(self, instance) - raise errors.OpExecError("Could not start instance: %s" % msg) + if activate_disks: + # Activate the instance disks if we're exporting a stopped instance + feedback_fn("Activating disks for %s" % instance.name) + _StartInstanceDisks(self, instance, None) - # TODO: check for size + try: + # per-disk results + dresults = [] + try: + for idx, disk in
enumerate(instance.disks): + feedback_fn("Creating a snapshot of disk/%s on node %s" % + (idx, src_node)) + + # result.payload will be a snapshot of an lvm leaf of the one we + # passed + result = self.rpc.call_blockdev_snapshot(src_node, disk) + msg = result.fail_msg + if msg: + self.LogWarning("Could not snapshot disk/%s on node %s: %s", + idx, src_node, msg) + snap_disks.append(False) + else: + disk_id = (vgname, result.payload) + new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size, + logical_id=disk_id, physical_id=disk_id, + iv_name=disk.iv_name) + snap_disks.append(new_dev) - cluster_name = self.cfg.GetClusterName() - for idx, dev in enumerate(snap_disks): - if dev: - result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name, - instance, cluster_name, idx) - msg = result.fail_msg - if msg: - self.LogWarning("Could not export disk/%s from node %s to" - " node %s: %s", idx, src_node, dst_node.name, msg) - msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg - if msg: - self.LogWarning("Could not remove snapshot for disk/%d from node" - " %s: %s", idx, src_node, msg) + finally: + if self.op.shutdown and instance.admin_up: + feedback_fn("Starting instance %s" % instance.name) + result = self.rpc.call_instance_start(src_node, instance, None, None) + msg = result.fail_msg + if msg: + _ShutdownInstanceDisks(self, instance) + raise errors.OpExecError("Could not start instance: %s" % msg) + + # TODO: check for size + + cluster_name = self.cfg.GetClusterName() + for idx, dev in enumerate(snap_disks): + feedback_fn("Exporting snapshot %s from %s to %s" % + (idx, src_node, dst_node.name)) + if dev: + # FIXME: pass debug from opcode to backend + result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name, + instance, cluster_name, + idx, 0) + msg = result.fail_msg + if msg: + self.LogWarning("Could not export disk/%s from node %s to" + " node %s: %s", idx, src_node, dst_node.name, msg) + dresults.append(False) + else: + 
dresults.append(True) + msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg + if msg: + self.LogWarning("Could not remove snapshot for disk/%d from node" + " %s: %s", idx, src_node, msg) + else: + dresults.append(False) - result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks) - msg = result.fail_msg - if msg: - self.LogWarning("Could not finalize export for instance %s" - " on node %s: %s", instance.name, dst_node.name, msg) + feedback_fn("Finalizing export on %s" % dst_node.name) + result = self.rpc.call_finalize_export(dst_node.name, instance, + snap_disks) + fin_resu = True + msg = result.fail_msg + if msg: + self.LogWarning("Could not finalize export for instance %s" + " on node %s: %s", instance.name, dst_node.name, msg) + fin_resu = False + + finally: + if activate_disks: + feedback_fn("Deactivating disks for %s" % instance.name) + _ShutdownInstanceDisks(self, instance) nodelist = self.cfg.GetNodeList() nodelist.remove(dst_node.name) @@ -6719,6 +8241,7 @@ class LUExportInstance(LogicalUnit): # substitutes an empty list with the full cluster node list. iname = instance.name if nodelist: + feedback_fn("Removing old exports for instance %s" % iname) exportlist = self.rpc.call_export_list(nodelist) for node in exportlist: if exportlist[node].fail_msg: @@ -6728,6 +8251,7 @@ class LUExportInstance(LogicalUnit): if msg: self.LogWarning("Could not remove older export for instance %s" " on node %s: %s", iname, node, msg) + return fin_resu, dresults class LURemoveExport(NoHooksLU): @@ -6783,7 +8307,7 @@ class LURemoveExport(NoHooksLU): " Domain Name.") -class TagsLU(NoHooksLU): +class TagsLU(NoHooksLU): # pylint: disable-msg=W0223 """Generic tags LU. This is an abstract class which is the parent of all the other tags LUs. 
@@ -6796,14 +8320,14 @@ class TagsLU(NoHooksLU): name = self.cfg.ExpandNodeName(self.op.name) if name is None: raise errors.OpPrereqError("Invalid node name (%s)" % - (self.op.name,)) + (self.op.name,), errors.ECODE_NOENT) self.op.name = name self.needed_locks[locking.LEVEL_NODE] = name elif self.op.kind == constants.TAG_INSTANCE: name = self.cfg.ExpandInstanceName(self.op.name) if name is None: raise errors.OpPrereqError("Invalid instance name (%s)" % - (self.op.name,)) + (self.op.name,), errors.ECODE_NOENT) self.op.name = name self.needed_locks[locking.LEVEL_INSTANCE] = name @@ -6819,7 +8343,7 @@ class TagsLU(NoHooksLU): self.target = self.cfg.GetInstanceInfo(self.op.name) else: raise errors.OpPrereqError("Wrong tag type requested (%s)" % - str(self.op.kind)) + str(self.op.kind), errors.ECODE_INVAL) class LUGetTags(TagsLU): @@ -6856,7 +8380,7 @@ class LUSearchTags(NoHooksLU): self.re = re.compile(self.op.pattern) except re.error, err: raise errors.OpPrereqError("Invalid search pattern '%s': %s" % - (self.op.pattern, err)) + (self.op.pattern, err), errors.ECODE_INVAL) def Exec(self, feedback_fn): """Returns the tag list. @@ -6902,12 +8426,7 @@ class LUAddTags(TagsLU): self.target.AddTag(tag) except errors.TagError, err: raise errors.OpExecError("Error while setting tag: %s" % str(err)) - try: - self.cfg.Update(self.target) - except errors.ConfigurationError: - raise errors.OpRetryError("There has been a modification to the" - " config file and the operation has been" - " aborted. Please retry.") + self.cfg.Update(self.target, feedback_fn) class LUDelTags(TagsLU): @@ -6933,7 +8452,7 @@ class LUDelTags(TagsLU): diff_names = ["'%s'" % tag for tag in diff_tags] diff_names.sort() raise errors.OpPrereqError("Tag(s) %s not found" % - (",".join(diff_names))) + (",".join(diff_names)), errors.ECODE_NOENT) def Exec(self, feedback_fn): """Remove the tag from the object. 
@@ -6941,12 +8460,7 @@ class LUDelTags(TagsLU): """ for tag in self.op.tags: self.target.RemoveTag(tag) - try: - self.cfg.Update(self.target) - except errors.ConfigurationError: - raise errors.OpRetryError("There has been a modification to the" - " config file and the operation has been" - " aborted. Please retry.") + self.cfg.Update(self.target, feedback_fn) class LUTestDelay(NoHooksLU): @@ -7004,6 +8518,8 @@ class IAllocator(object): easy usage """ + # pylint: disable-msg=R0902 + # lots of instance attributes _ALLO_KEYS = [ "mem_size", "disks", "disk_template", "os", "tags", "nics", "vcpus", "hypervisor", @@ -7091,11 +8607,12 @@ class IAllocator(object): "master_candidate": ninfo.master_candidate, } - if not ninfo.offline: + if not (ninfo.offline or ninfo.drained): nresult.Raise("Can't get data for node %s" % nname) node_iinfo[nname].Raise("Can't get node instance info from node %s" % nname) remote_info = nresult.payload + for attr in ['memory_total', 'memory_free', 'memory_dom0', 'vg_size', 'vg_free', 'cpu_total']: if attr not in remote_info: @@ -7221,10 +8738,12 @@ class IAllocator(object): " IAllocator" % self.name) if instance.disk_template not in constants.DTS_NET_MIRROR: - raise errors.OpPrereqError("Can't relocate non-mirrored instances") + raise errors.OpPrereqError("Can't relocate non-mirrored instances", + errors.ECODE_INVAL) if len(instance.secondary_nodes) != 1: - raise errors.OpPrereqError("Instance has not exactly one secondary node") + raise errors.OpPrereqError("Instance has not exactly one secondary node", + errors.ECODE_STATE) self.required_nodes = 1 disk_sizes = [{'size': disk.size} for disk in instance.disks] @@ -7312,51 +8831,55 @@ class LUTestAllocator(NoHooksLU): "os", "tags", "nics", "vcpus"]: if not hasattr(self.op, attr): raise errors.OpPrereqError("Missing attribute '%s' on opcode input" % - attr) + attr, errors.ECODE_INVAL) iname = self.cfg.ExpandInstanceName(self.op.name) if iname is not None: raise errors.OpPrereqError("Instance '%s' 
already in the cluster" % - iname) + iname, errors.ECODE_EXISTS) if not isinstance(self.op.nics, list): - raise errors.OpPrereqError("Invalid parameter 'nics'") + raise errors.OpPrereqError("Invalid parameter 'nics'", + errors.ECODE_INVAL) for row in self.op.nics: if (not isinstance(row, dict) or "mac" not in row or "ip" not in row or "bridge" not in row): - raise errors.OpPrereqError("Invalid contents of the" - " 'nics' parameter") + raise errors.OpPrereqError("Invalid contents of the 'nics'" + " parameter", errors.ECODE_INVAL) if not isinstance(self.op.disks, list): - raise errors.OpPrereqError("Invalid parameter 'disks'") + raise errors.OpPrereqError("Invalid parameter 'disks'", + errors.ECODE_INVAL) for row in self.op.disks: if (not isinstance(row, dict) or "size" not in row or not isinstance(row["size"], int) or "mode" not in row or row["mode"] not in ['r', 'w']): - raise errors.OpPrereqError("Invalid contents of the" - " 'disks' parameter") + raise errors.OpPrereqError("Invalid contents of the 'disks'" + " parameter", errors.ECODE_INVAL) if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None: self.op.hypervisor = self.cfg.GetHypervisorType() elif self.op.mode == constants.IALLOCATOR_MODE_RELOC: if not hasattr(self.op, "name"): - raise errors.OpPrereqError("Missing attribute 'name' on opcode input") + raise errors.OpPrereqError("Missing attribute 'name' on opcode input", + errors.ECODE_INVAL) fname = self.cfg.ExpandInstanceName(self.op.name) if fname is None: raise errors.OpPrereqError("Instance '%s' not found for relocation" % - self.op.name) + self.op.name, errors.ECODE_NOENT) self.op.name = fname self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes else: raise errors.OpPrereqError("Invalid test allocator mode '%s'" % - self.op.mode) + self.op.mode, errors.ECODE_INVAL) if self.op.direction == constants.IALLOCATOR_DIR_OUT: if not hasattr(self.op, "allocator") or self.op.allocator is None: - raise errors.OpPrereqError("Missing 
allocator name") + raise errors.OpPrereqError("Missing allocator name", + errors.ECODE_INVAL) elif self.op.direction != constants.IALLOCATOR_DIR_IN: raise errors.OpPrereqError("Wrong allocator test '%s'" % - self.op.direction) + self.op.direction, errors.ECODE_INVAL) def Exec(self, feedback_fn): """Run the allocator test.