X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/fe89794e5fe6b4d45ed968307fd1c7bf0fdd7fc4..4c06a2d089f42c7f9d8eabf821aeb6806b0afe28:/lib/cmdlib.py diff --git a/lib/cmdlib.py b/lib/cmdlib.py index ddbc48d..cb922eb 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -21,7 +21,10 @@ """Module implementing the master-side code.""" -# pylint: disable-msg=W0613,W0201 +# pylint: disable-msg=W0201 + +# W0201 since most LU attributes are defined in CheckPrereq or similar +# functions import os import os.path @@ -87,9 +90,9 @@ class LogicalUnit(object): self.recalculate_locks = {} self.__ssh = None # logging - self.LogWarning = processor.LogWarning - self.LogInfo = processor.LogInfo - self.LogStep = processor.LogStep + self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103 + self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103 + self.LogStep = processor.LogStep # pylint: disable-msg=C0103 # support for dry-run self.dry_run_result = None @@ -100,7 +103,7 @@ class LogicalUnit(object): attr_val = getattr(op, attr_name, None) if attr_val is None: raise errors.OpPrereqError("Required parameter '%s' missing" % - attr_name) + attr_name, errors.ECODE_INVAL) self.CheckArguments() @@ -277,6 +280,9 @@ class LogicalUnit(object): and hook results """ + # API must be kept, thus we ignore the unused argument and could + # be a function warnings + # pylint: disable-msg=W0613,R0201 return lu_result def _ExpandAndLockInstance(self): @@ -297,7 +303,7 @@ class LogicalUnit(object): expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name) if expanded_name is None: raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) + self.op.instance_name, errors.ECODE_NOENT) self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name self.op.instance_name = expanded_name @@ -347,7 +353,7 @@ class LogicalUnit(object): del self.recalculate_locks[locking.LEVEL_NODE] -class NoHooksLU(LogicalUnit): +class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223 """Simple LU which runs no hooks. This LU is intended as a parent for other LogicalUnits which will @@ -357,6 +363,14 @@ class NoHooksLU(LogicalUnit): HPATH = None HTYPE = None + def BuildHooksEnv(self): + """Empty BuildHooksEnv for NoHooksLu. + + This just raises an error. + + """ + assert False, "BuildHooksEnv called for NoHooksLUs" + class Tasklet: """Tasklet base class. @@ -417,7 +431,8 @@ def _GetWantedNodes(lu, nodes): """ if not isinstance(nodes, list): - raise errors.OpPrereqError("Invalid argument type 'nodes'") + raise errors.OpPrereqError("Invalid argument type 'nodes'", + errors.ECODE_INVAL) if not nodes: raise errors.ProgrammerError("_GetWantedNodes should only be called with a" @@ -427,7 +442,8 @@ def _GetWantedNodes(lu, nodes): for name in nodes: node = lu.cfg.ExpandNodeName(name) if node is None: - raise errors.OpPrereqError("No such node name '%s'" % name) + raise errors.OpPrereqError("No such node name '%s'" % name, + errors.ECODE_NOENT) wanted.append(node) return utils.NiceSort(wanted) @@ -447,7 +463,8 @@ def _GetWantedInstances(lu, instances): """ if not isinstance(instances, list): - raise errors.OpPrereqError("Invalid argument type 'instances'") + raise errors.OpPrereqError("Invalid argument type 'instances'", + errors.ECODE_INVAL) if instances: wanted = [] @@ -455,7 +472,8 @@ def _GetWantedInstances(lu, instances): for name in instances: instance = lu.cfg.ExpandInstanceName(name) if instance is None: - raise errors.OpPrereqError("No such instance name '%s'" % name) + raise errors.OpPrereqError("No such instance name '%s'" % name, + errors.ECODE_NOENT) wanted.append(instance) else: @@ -479,7 +497,7 @@ def _CheckOutputFields(static, dynamic, selected): delta = f.NonMatching(selected) if delta: raise errors.OpPrereqError("Unknown output fields selected: %s" - % ",".join(delta)) + % ",".join(delta), errors.ECODE_INVAL) def _CheckBooleanOpField(op, name): @@ -492,10 +510,25 @@ def _CheckBooleanOpField(op, name): val = getattr(op, name, None) if not (val is None or isinstance(val, bool)): raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" % - (name, str(val))) + (name, str(val)), errors.ECODE_INVAL) setattr(op, name, val) +def _CheckGlobalHvParams(params): + """Validates that given hypervisor params are not global ones. + + This will ensure that instances don't get customised versions of + global params. + + """ + used_globals = constants.HVC_GLOBALS.intersection(params) + if used_globals: + msg = ("The following hypervisor parameters are global and cannot" + " be customized at instance level, please modify them at" + " cluster level: %s" % utils.CommaJoin(used_globals)) + raise errors.OpPrereqError(msg, errors.ECODE_INVAL) + + def _CheckNodeOnline(lu, node): """Ensure that a given node is online. @@ -505,7 +538,8 @@ def _CheckNodeOnline(lu, node): """ if lu.cfg.GetNodeInfo(node).offline: - raise errors.OpPrereqError("Can't use offline node %s" % node) + raise errors.OpPrereqError("Can't use offline node %s" % node, + errors.ECODE_INVAL) def _CheckNodeNotDrained(lu, node): @@ -517,7 +551,8 @@ def _CheckNodeNotDrained(lu, node): """ if lu.cfg.GetNodeInfo(node).drained: - raise errors.OpPrereqError("Can't use drained node %s" % node) + raise errors.OpPrereqError("Can't use drained node %s" % node, + errors.ECODE_INVAL) def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, @@ -667,25 +702,36 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None): } if override: args.update(override) - return _BuildInstanceHookEnv(**args) + return _BuildInstanceHookEnv(**args) # pylint: disable-msg=W0142 -def _AdjustCandidatePool(lu): +def _AdjustCandidatePool(lu, exceptions): """Adjust the candidate pool after node operations. """ - mod_list = lu.cfg.MaintainCandidatePool() + mod_list = lu.cfg.MaintainCandidatePool(exceptions) if mod_list: lu.LogInfo("Promoted nodes to master candidate role: %s", - ", ".join(node.name for node in mod_list)) + utils.CommaJoin(node.name for node in mod_list)) for name in mod_list: lu.context.ReaddNode(name) - mc_now, mc_max = lu.cfg.GetMasterCandidateStats() + mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions) if mc_now > mc_max: lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" % (mc_now, mc_max)) +def _DecideSelfPromotion(lu, exceptions=None): + """Decide whether I should promote myself as a master candidate. + + """ + cp_size = lu.cfg.GetClusterInfo().candidate_pool_size + mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions) + # the new node will increase mc_max with one, so: + mc_should = min(mc_should + 1, cp_size) + return mc_now < mc_should + + def _CheckNicsBridgesExist(lu, target_nics, target_node, profile=constants.PP_DEFAULT): """Check that the brigdes needed by a list of nics exist. @@ -699,7 +745,7 @@ def _CheckNicsBridgesExist(lu, target_nics, target_node, if brlist: result = lu.rpc.call_bridges_exist(target_node, brlist) result.Raise("Error checking bridges on destination node '%s'" % - target_node, prereq=True) + target_node, prereq=True, ecode=errors.ECODE_ENVIRON) def _CheckInstanceBridgesExist(lu, instance, node=None): @@ -711,6 +757,27 @@ def _CheckInstanceBridgesExist(lu, instance, node=None): _CheckNicsBridgesExist(lu, instance.nics, node) +def _CheckOSVariant(os_obj, name): + """Check whether an OS name conforms to the os variants specification. + + @type os_obj: L{objects.OS} + @param os_obj: OS object to check + @type name: string + @param name: OS name passed by the user, to check for validity + + """ + if not os_obj.supported_variants: + return + try: + variant = name.split("+", 1)[1] + except IndexError: + raise errors.OpPrereqError("OS name must include a variant", + errors.ECODE_INVAL) + + if variant not in os_obj.supported_variants: + raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL) + + def _GetNodeInstancesInner(cfg, fn): return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)] @@ -759,7 +826,7 @@ def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq): result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks) result.Raise("Failed to get disk status from node %s" % node_name, - prereq=prereq) + prereq=prereq, ecode=errors.ECODE_ENVIRON) for idx, bdev_status in enumerate(result.payload): if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY: @@ -825,30 +892,37 @@ class LUDestroyCluster(LogicalUnit): nodelist = self.cfg.GetNodeList() if len(nodelist) != 1 or nodelist[0] != master: raise errors.OpPrereqError("There are still %d node(s) in" - " this cluster." % (len(nodelist) - 1)) + " this cluster." % (len(nodelist) - 1), + errors.ECODE_INVAL) instancelist = self.cfg.GetInstanceList() if instancelist: raise errors.OpPrereqError("There are still %d instance(s) in" - " this cluster." % len(instancelist)) + " this cluster." % len(instancelist), + errors.ECODE_INVAL) def Exec(self, feedback_fn): """Destroys the cluster. """ master = self.cfg.GetMasterNode() + modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup # Run post hooks on master node before it's removed hm = self.proc.hmclass(self.rpc.call_hooks_runner, self) try: hm.RunPhase(constants.HOOKS_PHASE_POST, [master]) except: + # pylint: disable-msg=W0702 self.LogWarning("Errors occurred running hooks on %s" % master) result = self.rpc.call_node_stop_master(master, False) result.Raise("Could not disable the master role") - priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS) - utils.CreateBackup(priv_key) - utils.CreateBackup(pub_key) + + if modify_ssh_setup: + priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS) + utils.CreateBackup(priv_key) + utils.CreateBackup(pub_key) + return master @@ -858,9 +932,39 @@ class LUVerifyCluster(LogicalUnit): """ HPATH = "cluster-verify" HTYPE = constants.HTYPE_CLUSTER - _OP_REQP = ["skip_checks"] + _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"] REQ_BGL = False + TCLUSTER = "cluster" + TNODE = "node" + TINSTANCE = "instance" + + ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG") + EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE") + EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN") + EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT") + EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK") + EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK") + EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE") + ENODEDRBD = (TNODE, "ENODEDRBD") + ENODEFILECHECK = (TNODE, "ENODEFILECHECK") + ENODEHOOKS = (TNODE, "ENODEHOOKS") + ENODEHV = (TNODE, "ENODEHV") + ENODELVM = (TNODE, "ENODELVM") + ENODEN1 = (TNODE, "ENODEN1") + ENODENET = (TNODE, "ENODENET") + ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE") + ENODEORPHANLV = (TNODE, "ENODEORPHANLV") + ENODERPC = (TNODE, "ENODERPC") + ENODESSH = (TNODE, "ENODESSH") + ENODEVERSION = (TNODE, "ENODEVERSION") + ENODESETUP = (TNODE, "ENODESETUP") + ENODETIME = (TNODE, "ENODETIME") + + ETYPE_FIELD = "code" + ETYPE_ERROR = "ERROR" + ETYPE_WARNING = "WARNING" + def ExpandNames(self): self.needed_locks = { locking.LEVEL_NODE: locking.ALL_SET, @@ -868,9 +972,45 @@ class LUVerifyCluster(LogicalUnit): } self.share_locks = dict.fromkeys(locking.LEVELS, 1) + def _Error(self, ecode, item, msg, *args, **kwargs): + """Format an error message. + + Based on the opcode's error_codes parameter, either format a + parseable error code, or a simpler error string. + + This must be called only from Exec and functions called from Exec. + + """ + ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) + itype, etxt = ecode + # first complete the msg + if args: + msg = msg % args + # then format the whole message + if self.op.error_codes: + msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg) + else: + if item: + item = " " + item + else: + item = "" + msg = "%s: %s%s: %s" % (ltype, itype, item, msg) + # and finally report it via the feedback_fn + self._feedback_fn(" - %s" % msg) + + def _ErrorIf(self, cond, *args, **kwargs): + """Log an error message if the passed condition is True. + + """ + cond = bool(cond) or self.op.debug_simulate_errors + if cond: + self._Error(*args, **kwargs) + # do not mark the operation as failed for WARN cases only + if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR: + self.bad = self.bad or cond + def _VerifyNode(self, nodeinfo, file_list, local_cksum, - node_result, feedback_fn, master_files, - drbd_map, vg_name): + node_result, master_files, drbd_map, vg_name): """Run multiple tests against a node. Test list: @@ -885,7 +1025,6 @@ class LUVerifyCluster(LogicalUnit): @param file_list: required list of files @param local_cksum: dictionary of local files and their checksums @param node_result: the results from the node - @param feedback_fn: function used to accumulate results @param master_files: list of files that only masters should have @param drbd_map: the useddrbd minors for this node, in form of minor: (instance, must_exist) which correspond to instances @@ -894,138 +1033,154 @@ class LUVerifyCluster(LogicalUnit): """ node = nodeinfo.name + _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 # main result, node_result should be a non-empty dict - if not node_result or not isinstance(node_result, dict): - feedback_fn(" - ERROR: unable to verify node %s." % (node,)) - return True + test = not node_result or not isinstance(node_result, dict) + _ErrorIf(test, self.ENODERPC, node, + "unable to verify node: no data returned") + if test: + return # compares ganeti version local_version = constants.PROTOCOL_VERSION remote_version = node_result.get('version', None) - if not (remote_version and isinstance(remote_version, (list, tuple)) and - len(remote_version) == 2): - feedback_fn(" - ERROR: connection to %s failed" % (node)) - return True + test = not (remote_version and + isinstance(remote_version, (list, tuple)) and + len(remote_version) == 2) + _ErrorIf(test, self.ENODERPC, node, + "connection to node returned invalid data") + if test: + return - if local_version != remote_version[0]: - feedback_fn(" - ERROR: incompatible protocol versions: master %s," - " node %s %s" % (local_version, node, remote_version[0])) - return True + test = local_version != remote_version[0] + _ErrorIf(test, self.ENODEVERSION, node, + "incompatible protocol versions: master %s," + " node %s", local_version, remote_version[0]) + if test: + return # node seems compatible, we can actually try to look into its results - bad = False - # full package version - if constants.RELEASE_VERSION != remote_version[1]: - feedback_fn(" - WARNING: software version mismatch: master %s," - " node %s %s" % - (constants.RELEASE_VERSION, node, remote_version[1])) + self._ErrorIf(constants.RELEASE_VERSION != remote_version[1], + self.ENODEVERSION, node, + "software version mismatch: master %s, node %s", + constants.RELEASE_VERSION, remote_version[1], + code=self.ETYPE_WARNING) # checks vg existence and size > 20G if vg_name is not None: vglist = node_result.get(constants.NV_VGLIST, None) - if not vglist: - feedback_fn(" - ERROR: unable to check volume groups on node %s." % - (node,)) - bad = True - else: + test = not vglist + _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups") + if not test: vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name, constants.MIN_VG_SIZE) - if vgstatus: - feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node)) - bad = True + _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus) # checks config file checksum remote_cksum = node_result.get(constants.NV_FILELIST, None) - if not isinstance(remote_cksum, dict): - bad = True - feedback_fn(" - ERROR: node hasn't returned file checksum data") - else: + test = not isinstance(remote_cksum, dict) + _ErrorIf(test, self.ENODEFILECHECK, node, + "node hasn't returned file checksum data") + if not test: for file_name in file_list: node_is_mc = nodeinfo.master_candidate - must_have_file = file_name not in master_files - if file_name not in remote_cksum: - if node_is_mc or must_have_file: - bad = True - feedback_fn(" - ERROR: file '%s' missing" % file_name) - elif remote_cksum[file_name] != local_cksum[file_name]: - if node_is_mc or must_have_file: - bad = True - feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name) - else: - # not candidate and this is not a must-have file - bad = True - feedback_fn(" - ERROR: file '%s' should not exist on non master" - " candidates (and the file is outdated)" % file_name) - else: - # all good, except non-master/non-must have combination - if not node_is_mc and not must_have_file: - feedback_fn(" - ERROR: file '%s' should not exist on non master" - " candidates" % file_name) + must_have = (file_name not in master_files) or node_is_mc + # missing + test1 = file_name not in remote_cksum + # invalid checksum + test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name] + # existing and good + test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name] + _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node, + "file '%s' missing", file_name) + _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node, + "file '%s' has wrong checksum", file_name) + # not candidate and this is not a must-have file + _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node, + "file '%s' should not exist on non master" + " candidates (and the file is outdated)", file_name) + # all good, except non-master/non-must have combination + _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node, + "file '%s' should not exist" + " on non master candidates", file_name) # checks ssh to any - if constants.NV_NODELIST not in node_result: - bad = True - feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data") - else: + test = constants.NV_NODELIST not in node_result + _ErrorIf(test, self.ENODESSH, node, + "node hasn't returned node ssh connectivity data") + if not test: if node_result[constants.NV_NODELIST]: - bad = True - for node in node_result[constants.NV_NODELIST]: - feedback_fn(" - ERROR: ssh communication with node '%s': %s" % - (node, node_result[constants.NV_NODELIST][node])) - - if constants.NV_NODENETTEST not in node_result: - bad = True - feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data") - else: + for a_node, a_msg in node_result[constants.NV_NODELIST].items(): + _ErrorIf(True, self.ENODESSH, node, + "ssh communication with node '%s': %s", a_node, a_msg) + + test = constants.NV_NODENETTEST not in node_result + _ErrorIf(test, self.ENODENET, node, + "node hasn't returned node tcp connectivity data") + if not test: if node_result[constants.NV_NODENETTEST]: - bad = True nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys()) - for node in nlist: - feedback_fn(" - ERROR: tcp communication with node '%s': %s" % - (node, node_result[constants.NV_NODENETTEST][node])) + for anode in nlist: + _ErrorIf(True, self.ENODENET, node, + "tcp communication with node '%s': %s", + anode, node_result[constants.NV_NODENETTEST][anode]) hyp_result = node_result.get(constants.NV_HYPERVISOR, None) if isinstance(hyp_result, dict): for hv_name, hv_result in hyp_result.iteritems(): - if hv_result is not None: - feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" % - (hv_name, hv_result)) + test = hv_result is not None + _ErrorIf(test, self.ENODEHV, node, + "hypervisor %s verify failure: '%s'", hv_name, hv_result) # check used drbd list if vg_name is not None: used_minors = node_result.get(constants.NV_DRBDLIST, []) - if not isinstance(used_minors, (tuple, list)): - feedback_fn(" - ERROR: cannot parse drbd status file: %s" % - str(used_minors)) - else: + test = not isinstance(used_minors, (tuple, list)) + _ErrorIf(test, self.ENODEDRBD, node, + "cannot parse drbd status file: %s", str(used_minors)) + if not test: for minor, (iname, must_exist) in drbd_map.items(): - if minor not in used_minors and must_exist: - feedback_fn(" - ERROR: drbd minor %d of instance %s is" - " not active" % (minor, iname)) - bad = True + test = minor not in used_minors and must_exist + _ErrorIf(test, self.ENODEDRBD, node, + "drbd minor %d of instance %s is not active", + minor, iname) for minor in used_minors: - if minor not in drbd_map: - feedback_fn(" - ERROR: unallocated drbd minor %d is in use" % - minor) - bad = True - - return bad + test = minor not in drbd_map + _ErrorIf(test, self.ENODEDRBD, node, + "unallocated drbd minor %d is in use", minor) + test = node_result.get(constants.NV_NODESETUP, + ["Missing NODESETUP results"]) + _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s", + "; ".join(test)) + + # check pv names + if vg_name is not None: + pvlist = node_result.get(constants.NV_PVLIST, None) + test = pvlist is None + _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node") + if not test: + # check that ':' is not present in PV names, since it's a + # special character for lvcreate (denotes the range of PEs to + # use on the PV) + for _, pvname, owner_vg in pvlist: + test = ":" in pvname + _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV" + " '%s' of VG '%s'", pvname, owner_vg) def _VerifyInstance(self, instance, instanceconfig, node_vol_is, - node_instance, feedback_fn, n_offline): + node_instance, n_offline): """Verify an instance. This function checks to see if the required block devices are available on the instance's node. """ - bad = False - + _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 node_current = instanceconfig.primary_node node_vol_should = {} @@ -1036,69 +1191,57 @@ class LUVerifyCluster(LogicalUnit): # ignore missing volumes on offline nodes continue for volume in node_vol_should[node]: - if node not in node_vol_is or volume not in node_vol_is[node]: - feedback_fn(" - ERROR: volume %s missing on node %s" % - (volume, node)) - bad = True + test = node not in node_vol_is or volume not in node_vol_is[node] + _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance, + "volume %s missing on node %s", volume, node) if instanceconfig.admin_up: - if ((node_current not in node_instance or - not instance in node_instance[node_current]) and - node_current not in n_offline): - feedback_fn(" - ERROR: instance %s not running on node %s" % - (instance, node_current)) - bad = True + test = ((node_current not in node_instance or + not instance in node_instance[node_current]) and + node_current not in n_offline) + _ErrorIf(test, self.EINSTANCEDOWN, instance, + "instance not running on its primary node %s", + node_current) for node in node_instance: if (not node == node_current): - if instance in node_instance[node]: - feedback_fn(" - ERROR: instance %s should not run on node %s" % - (instance, node)) - bad = True + test = instance in node_instance[node] + _ErrorIf(test, self.EINSTANCEWRONGNODE, instance, + "instance should not run on node %s", node) - return bad - - def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn): + def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is): """Verify if there are any unknown volumes in the cluster. The .os, .swap and backup volumes are ignored. All other volumes are reported as unknown. """ - bad = False - for node in node_vol_is: for volume in node_vol_is[node]: - if node not in node_vol_should or volume not in node_vol_should[node]: - feedback_fn(" - ERROR: volume %s on node %s should not exist" % - (volume, node)) - bad = True - return bad + test = (node not in node_vol_should or + volume not in node_vol_should[node]) + self._ErrorIf(test, self.ENODEORPHANLV, node, + "volume %s is unknown", volume) - def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn): + def _VerifyOrphanInstances(self, instancelist, node_instance): """Verify the list of running instances. This checks what instances are running but unknown to the cluster. """ - bad = False for node in node_instance: - for runninginstance in node_instance[node]: - if runninginstance not in instancelist: - feedback_fn(" - ERROR: instance %s on node %s should not exist" % - (runninginstance, node)) - bad = True - return bad - - def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn): + for o_inst in node_instance[node]: + test = o_inst not in instancelist + self._ErrorIf(test, self.ENODEORPHANINSTANCE, node, + "instance %s on node %s should not exist", o_inst, node) + + def _VerifyNPlusOneMemory(self, node_info, instance_cfg): """Verify N+1 Memory Resilience. Check that if one single node dies we can still start all the instances it was primary for. """ - bad = False - for node, nodeinfo in node_info.iteritems(): # This code checks that every node which is now listed as secondary has # enough memory to host all instances it is supposed to should a single @@ -1114,11 +1257,10 @@ class LUVerifyCluster(LogicalUnit): bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance]) if bep[constants.BE_AUTO_BALANCE]: needed_mem += bep[constants.BE_MEMORY] - if nodeinfo['mfree'] < needed_mem: - feedback_fn(" - ERROR: not enough memory on node %s to accommodate" - " failovers should node %s fail" % (node, prinode)) - bad = True - return bad + test = nodeinfo['mfree'] < needed_mem + self._ErrorIf(test, self.ENODEN1, node, + "not enough memory on to accommodate" + " failovers should peer node %s fail", prinode) def CheckPrereq(self): """Check prerequisites. @@ -1129,7 +1271,8 @@ class LUVerifyCluster(LogicalUnit): """ self.skip_set = frozenset(self.op.skip_checks) if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set): - raise errors.OpPrereqError("Invalid checks to be skipped specified") + raise errors.OpPrereqError("Invalid checks to be skipped specified", + errors.ECODE_INVAL) def BuildHooksEnv(self): """Build hooks env. @@ -1151,10 +1294,13 @@ class LUVerifyCluster(LogicalUnit): """Verify integrity of cluster, performing various test on nodes. """ - bad = False + self.bad = False + _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + verbose = self.op.verbose + self._feedback_fn = feedback_fn feedback_fn("* Verifying global settings") for msg in self.cfg.VerifyConfig(): - feedback_fn(" - ERROR: %s" % msg) + _ErrorIf(True, self.ECLUSTERCFG, None, msg) vg_name = self.cfg.GetVGName() hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors @@ -1195,23 +1341,36 @@ class LUVerifyCluster(LogicalUnit): constants.NV_INSTANCELIST: hypervisors, constants.NV_VERSION: None, constants.NV_HVINFO: self.cfg.GetHypervisorType(), + constants.NV_NODESETUP: None, + constants.NV_TIME: None, } + if vg_name is not None: node_verify_param[constants.NV_VGLIST] = None node_verify_param[constants.NV_LVLIST] = vg_name + node_verify_param[constants.NV_PVLIST] = [vg_name] node_verify_param[constants.NV_DRBDLIST] = None + + # Due to the way our RPC system works, exact response times cannot be + # guaranteed (e.g. a broken node could run into a timeout). By keeping the + # time before and after executing the request, we can at least have a time + # window. + nvinfo_starttime = time.time() all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param, self.cfg.GetClusterName()) + nvinfo_endtime = time.time() cluster = self.cfg.GetClusterInfo() master_node = self.cfg.GetMasterNode() all_drbd_map = self.cfg.ComputeDRBDMap() + feedback_fn("* Verifying node status") for node_i in nodeinfo: node = node_i.name if node_i.offline: - feedback_fn("* Skipping offline node %s" % (node,)) + if verbose: + feedback_fn("* Skipping offline node %s" % (node,)) n_offline.append(node) continue @@ -1224,62 +1383,81 @@ class LUVerifyCluster(LogicalUnit): n_drained.append(node) else: ntype = "regular" - feedback_fn("* Verifying node %s (%s)" % (node, ntype)) + if verbose: + feedback_fn("* Verifying node %s (%s)" % (node, ntype)) msg = all_nvinfo[node].fail_msg + _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg) if msg: - feedback_fn(" - ERROR: while contacting node %s: %s" % (node, msg)) - bad = True continue nresult = all_nvinfo[node].payload node_drbd = {} for minor, instance in all_drbd_map[node].items(): - if instance not in instanceinfo: - feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" % - instance) + test = instance not in instanceinfo + _ErrorIf(test, self.ECLUSTERCFG, None, + "ghost instance '%s' in temporary DRBD map", instance) # ghost instance should not be running, but otherwise we # don't give double warnings (both ghost instance and # unallocated minor in use) + if test: node_drbd[minor] = (instance, False) else: instance = instanceinfo[instance] node_drbd[minor] = (instance.name, instance.admin_up) - result = self._VerifyNode(node_i, file_names, local_checksums, - nresult, feedback_fn, master_files, - node_drbd, vg_name) - bad = bad or result + + self._VerifyNode(node_i, file_names, local_checksums, + nresult, master_files, node_drbd, vg_name) lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data") if vg_name is None: node_volume[node] = {} elif isinstance(lvdata, basestring): - feedback_fn(" - ERROR: LVM problem on node %s: %s" % - (node, utils.SafeEncode(lvdata))) - bad = True + _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s", + utils.SafeEncode(lvdata)) node_volume[node] = {} elif not isinstance(lvdata, dict): - feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,)) - bad = True + _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)") continue else: node_volume[node] = lvdata # node_instance idata = nresult.get(constants.NV_INSTANCELIST, None) - if not isinstance(idata, list): - feedback_fn(" - ERROR: connection to %s failed (instancelist)" % - (node,)) - bad = True + test = not isinstance(idata, list) + _ErrorIf(test, self.ENODEHV, node, + "rpc call to node failed (instancelist)") + if test: continue node_instance[node] = idata # node_info nodeinfo = nresult.get(constants.NV_HVINFO, None) - if not isinstance(nodeinfo, dict): - feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,)) - bad = True + test = not isinstance(nodeinfo, dict) + _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)") + if test: + continue + + # Node time + ntime = nresult.get(constants.NV_TIME, None) + try: + ntime_merged = utils.MergeTime(ntime) + except (ValueError, TypeError): + _ErrorIf(test, self.ENODETIME, node, "Node returned invalid time") + + if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW): + ntime_diff = abs(nvinfo_starttime - ntime_merged) + elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW): + ntime_diff = abs(ntime_merged - nvinfo_endtime) + else: + ntime_diff = None + + _ErrorIf(ntime_diff is not None, self.ENODETIME, node, + "Node time diverges by at least %0.1fs from master node time", + ntime_diff) + + if ntime_diff is not None: continue try: @@ -1297,28 +1475,28 @@ class LUVerifyCluster(LogicalUnit): } # FIXME: devise a free space model for file based instances as well if vg_name is not None: - if (constants.NV_VGLIST not in nresult or - vg_name not in nresult[constants.NV_VGLIST]): - feedback_fn(" - ERROR: node %s didn't return data for the" - " volume group '%s' - it is either missing or broken" % - (node, vg_name)) - bad = True + test = (constants.NV_VGLIST not in nresult or + vg_name not in nresult[constants.NV_VGLIST]) + _ErrorIf(test, self.ENODELVM, node, + "node didn't return data for the volume group '%s'" + " - it is either missing or broken", vg_name) + if test: continue node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name]) except (ValueError, KeyError): - feedback_fn(" - ERROR: invalid nodeinfo value returned" - " from node %s" % (node,)) - bad = True + _ErrorIf(True, self.ENODERPC, node, + "node returned invalid nodeinfo, check lvm/hypervisor") continue node_vol_should = {} + feedback_fn("* Verifying instance status") for instance in instancelist: - feedback_fn("* Verifying instance %s" % instance) + if verbose: + feedback_fn("* Verifying instance %s" % instance) inst_config = instanceinfo[instance] - result = self._VerifyInstance(instance, inst_config, node_volume, - node_instance, feedback_fn, n_offline) - bad = bad or result + self._VerifyInstance(instance, inst_config, node_volume, + node_instance, n_offline) inst_nodes_offline = [] inst_config.MapLVsByNode(node_vol_should) @@ -1326,12 +1504,11 @@ class LUVerifyCluster(LogicalUnit): instance_cfg[instance] = inst_config pnode = inst_config.primary_node + _ErrorIf(pnode not in node_info and pnode not in n_offline, + self.ENODERPC, pnode, "instance %s, connection to" + " primary node failed", instance) if pnode in node_info: node_info[pnode]['pinst'].append(instance) - elif pnode not in n_offline: - feedback_fn(" - ERROR: instance %s, connection to primary node" - " %s failed" % (instance, pnode)) - bad = True if pnode in n_offline: inst_nodes_offline.append(pnode) @@ -1343,46 +1520,42 @@ class LUVerifyCluster(LogicalUnit): # FIXME: does not support file-backed instances if len(inst_config.secondary_nodes) == 0: i_non_redundant.append(instance) - elif len(inst_config.secondary_nodes) > 1: - feedback_fn(" - WARNING: multiple secondaries for instance %s" - % instance) + _ErrorIf(len(inst_config.secondary_nodes) > 1, + self.EINSTANCELAYOUT, instance, + "instance has multiple secondary nodes", code="WARNING") if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]: i_non_a_balanced.append(instance) for snode in inst_config.secondary_nodes: + _ErrorIf(snode not in node_info and snode not in n_offline, + self.ENODERPC, snode, + "instance %s, connection to secondary node" + "failed", instance) + if snode in node_info: node_info[snode]['sinst'].append(instance) if pnode not in node_info[snode]['sinst-by-pnode']: node_info[snode]['sinst-by-pnode'][pnode] = [] node_info[snode]['sinst-by-pnode'][pnode].append(instance) - elif snode not in n_offline: - feedback_fn(" - ERROR: instance %s, connection to secondary node" - " %s failed" % (instance, snode)) - bad = True + if snode in n_offline: inst_nodes_offline.append(snode) - if inst_nodes_offline: - # warn that the instance lives on offline nodes, and set bad=True - feedback_fn(" - ERROR: instance lives on offline node(s) %s" % - ", ".join(inst_nodes_offline)) - bad = True + # warn that the instance lives on offline nodes + _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance, + "instance lives on offline node(s) %s", + utils.CommaJoin(inst_nodes_offline)) feedback_fn("* Verifying orphan volumes") - result = self._VerifyOrphanVolumes(node_vol_should, node_volume, - feedback_fn) - bad = bad or result + self._VerifyOrphanVolumes(node_vol_should, node_volume) feedback_fn("* Verifying remaining instances") - result = self._VerifyOrphanInstances(instancelist, node_instance, - feedback_fn) - bad = bad or result + self._VerifyOrphanInstances(instancelist, node_instance) if constants.VERIFY_NPLUSONE_MEM not in self.skip_set: feedback_fn("* Verifying N+1 Memory redundancy") - result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn) - bad = bad or result + self._VerifyNPlusOneMemory(node_info, instance_cfg) feedback_fn("* Other Notes") if i_non_redundant: @@ -1399,7 +1572,7 @@ class LUVerifyCluster(LogicalUnit): if n_drained: feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained)) - return not bad + return not self.bad def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result): """Analyze the post-hooks' result @@ -1422,33 +1595,28 @@ class LUVerifyCluster(LogicalUnit): # Used to change hooks' output to proper indentation indent_re = re.compile('^', re.M) feedback_fn("* Hooks Results") - if not hooks_results: - feedback_fn(" - ERROR: general communication failure") - lu_result = 1 - else: - for node_name in hooks_results: - show_node_header = True - res = hooks_results[node_name] - msg = res.fail_msg - if msg: - if res.offline: - # no need to warn or set fail return value - continue - feedback_fn(" Communication failure in hooks execution: %s" % - msg) + assert hooks_results, "invalid result from hooks" + + for node_name in hooks_results: + res = hooks_results[node_name] + msg = res.fail_msg + test = msg and not res.offline + self._ErrorIf(test, self.ENODEHOOKS, node_name, + "Communication failure in hooks execution: %s", msg) + if res.offline or msg: + # No need to investigate payload if node is offline or gave an error. + # override manually lu_result here as _ErrorIf only + # overrides self.bad + lu_result = 1 + continue + for script, hkr, output in res.payload: + test = hkr == constants.HKR_FAIL + self._ErrorIf(test, self.ENODEHOOKS, node_name, + "Script %s failed, output:", script) + if test: + output = indent_re.sub(' ', output) + feedback_fn("%s" % output) lu_result = 1 - continue - for script, hkr, output in res.payload: - if hkr == constants.HKR_FAIL: - # The node header is only shown once, if there are - # failing hooks on that node - if show_node_header: - feedback_fn(" Node %s:" % node_name) - show_node_header = False - feedback_fn(" ERROR: Script %s failed, output:" % script) - output = indent_re.sub(' ', output) - feedback_fn("%s" % output) - lu_result = 1 return lu_result @@ -1520,7 +1688,7 @@ class LUVerifyDisks(NoHooksLU): continue lvs = node_res.payload - for lv_name, (_, lv_inactive, lv_online) in lvs.items(): + for lv_name, (_, _, lv_online) in lvs.items(): inst = nv_dict.pop((node, lv_name), None) if (not lv_online and inst is not None and inst.name not in res_instances): @@ -1545,16 +1713,17 @@ class LURepairDiskSizes(NoHooksLU): def ExpandNames(self): if not isinstance(self.op.instances, list): - raise errors.OpPrereqError("Invalid argument type 'instances'") + raise errors.OpPrereqError("Invalid argument type 'instances'", + errors.ECODE_INVAL) if self.op.instances: self.wanted_names = [] for name in self.op.instances: full_name = self.cfg.ExpandInstanceName(name) if full_name is None: - raise errors.OpPrereqError("Instance '%s' not known" % name) + raise errors.OpPrereqError("Instance '%s' not known" % name, + errors.ECODE_NOENT) self.wanted_names.append(full_name) - self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names self.needed_locks = { locking.LEVEL_NODE: [], locking.LEVEL_INSTANCE: self.wanted_names, @@ -1584,6 +1753,29 @@ class LURepairDiskSizes(NoHooksLU): self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name in self.wanted_names] + def _EnsureChildSizes(self, disk): + """Ensure children of the disk have the needed disk size. + + This is valid mainly for DRBD8 and fixes an issue where the + children have smaller disk size. + + @param disk: an L{ganeti.objects.Disk} object + + """ + if disk.dev_type == constants.LD_DRBD8: + assert disk.children, "Empty children for DRBD8?" + fchild = disk.children[0] + mismatch = fchild.size < disk.size + if mismatch: + self.LogInfo("Child disk has size %d, parent %d, fixing", + fchild.size, disk.size) + fchild.size = disk.size + + # and we recurse on this child only, not on the metadev + return self._EnsureChildSizes(fchild) or mismatch + else: + return False + def Exec(self, feedback_fn): """Verify the size of cluster disks. @@ -1600,8 +1792,11 @@ class LURepairDiskSizes(NoHooksLU): changed = [] for node, dskl in per_node_disks.items(): - result = self.rpc.call_blockdev_getsizes(node, [v[2] for v in dskl]) - if result.failed or result.fail_msg: + newl = [v[2].Copy() for v in dskl] + for dsk in newl: + self.cfg.SetDiskID(dsk, node) + result = self.rpc.call_blockdev_getsizes(node, newl) + if result.fail_msg: self.LogWarning("Failure in blockdev_getsizes call to node" " %s, ignoring", node) continue @@ -1624,8 +1819,11 @@ class LURepairDiskSizes(NoHooksLU): " correcting: recorded %d, actual %d", idx, instance.name, disk.size, size) disk.size = size - self.cfg.Update(instance) + self.cfg.Update(instance, feedback_fn) changed.append((instance.name, idx, size)) + if self._EnsureChildSizes(disk): + self.cfg.Update(instance, feedback_fn) + changed.append((instance.name, idx, disk.size)) return changed @@ -1646,13 +1844,14 @@ class LURenameCluster(LogicalUnit): "NEW_NAME": self.op.name, } mn = self.cfg.GetMasterNode() - return env, [mn], [mn] + all_nodes = self.cfg.GetNodeList() + return env, [mn], all_nodes def CheckPrereq(self): """Verify that the passed name is a valid one. """ - hostname = utils.HostInfo(self.op.name) + hostname = utils.GetHostInfo(self.op.name) new_name = hostname.name self.ip = new_ip = hostname.ip @@ -1660,12 +1859,13 @@ class LURenameCluster(LogicalUnit): old_ip = self.cfg.GetMasterIP() if new_name == old_name and new_ip == old_ip: raise errors.OpPrereqError("Neither the name nor the IP address of the" - " cluster has changed") + " cluster has changed", + errors.ECODE_INVAL) if new_ip != old_ip: if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT): raise errors.OpPrereqError("The given cluster IP address (%s) is" " reachable on the network. Aborting." % - new_ip) + new_ip, errors.ECODE_NOTUNIQUE) self.op.name = new_name @@ -1685,7 +1885,7 @@ class LURenameCluster(LogicalUnit): cluster = self.cfg.GetClusterInfo() cluster.cluster_name = clustername cluster.master_ip = ip - self.cfg.Update(cluster) + self.cfg.Update(cluster, feedback_fn) # update the known hosts file ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE) @@ -1747,9 +1947,10 @@ class LUSetClusterParams(LogicalUnit): self.op.candidate_pool_size = int(self.op.candidate_pool_size) except (ValueError, TypeError), err: raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" % - str(err)) + str(err), errors.ECODE_INVAL) if self.op.candidate_pool_size < 1: - raise errors.OpPrereqError("At least one master candidate needed") + raise errors.OpPrereqError("At least one master candidate needed", + errors.ECODE_INVAL) def ExpandNames(self): # FIXME: in the future maybe other cluster params won't require checking on @@ -1783,7 +1984,8 @@ class LUSetClusterParams(LogicalUnit): for disk in inst.disks: if _RecursiveCheckIfLVMBased(disk): raise errors.OpPrereqError("Cannot disable lvm storage while" - " lvm-based instances exist") + " lvm-based instances exist", + errors.ECODE_INVAL) node_list = self.acquired_locks[locking.LEVEL_NODE] @@ -1802,7 +2004,7 @@ class LUSetClusterParams(LogicalUnit): constants.MIN_VG_SIZE) if vgstatus: raise errors.OpPrereqError("Error on node '%s': %s" % - (node, vgstatus)) + (node, vgstatus), errors.ECODE_ENVIRON) self.cluster = cluster = self.cfg.GetClusterInfo() # validate params changes @@ -1816,12 +2018,36 @@ class LUSetClusterParams(LogicalUnit): self.new_nicparams = objects.FillDict( cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams) objects.NIC.CheckParameterSyntax(self.new_nicparams) + nic_errors = [] + + # check all instances for consistency + for instance in self.cfg.GetAllInstancesInfo().values(): + for nic_idx, nic in enumerate(instance.nics): + params_copy = copy.deepcopy(nic.nicparams) + params_filled = objects.FillDict(self.new_nicparams, params_copy) + + # check parameter syntax + try: + objects.NIC.CheckParameterSyntax(params_filled) + except errors.ConfigurationError, err: + nic_errors.append("Instance %s, nic/%d: %s" % + (instance.name, nic_idx, err)) + + # if we're moving instances to routed, check that they have an ip + target_mode = params_filled[constants.NIC_MODE] + if target_mode == constants.NIC_MODE_ROUTED and not nic.ip: + nic_errors.append("Instance %s, nic/%d: routed nick with no ip" % + (instance.name, nic_idx)) + if nic_errors: + raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" % + "\n".join(nic_errors)) # hypervisor list/parameters self.new_hvparams = objects.FillDict(cluster.hvparams, {}) if self.op.hvparams: if not isinstance(self.op.hvparams, dict): - raise errors.OpPrereqError("Invalid 'hvparams' parameter on input") + raise errors.OpPrereqError("Invalid 'hvparams' parameter on input", + errors.ECODE_INVAL) for hv_name, hv_dict in self.op.hvparams.items(): if hv_name not in self.new_hvparams: self.new_hvparams[hv_name] = hv_dict @@ -1832,12 +2058,14 @@ class LUSetClusterParams(LogicalUnit): self.hv_list = self.op.enabled_hypervisors if not self.hv_list: raise errors.OpPrereqError("Enabled hypervisors list must contain at" - " least one member") + " least one member", + errors.ECODE_INVAL) invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES if invalid_hvs: raise errors.OpPrereqError("Enabled hypervisors contains invalid" " entries: %s" % - utils.CommaJoin(invalid_hvs)) + utils.CommaJoin(invalid_hvs), + errors.ECODE_INVAL) else: self.hv_list = cluster.enabled_hypervisors @@ -1878,9 +2106,9 @@ class LUSetClusterParams(LogicalUnit): if self.op.candidate_pool_size is not None: self.cluster.candidate_pool_size = self.op.candidate_pool_size # we need to update the pool size here, otherwise the save will fail - _AdjustCandidatePool(self) + _AdjustCandidatePool(self, []) - self.cfg.Update(self.cluster) + self.cfg.Update(self.cluster, feedback_fn) def _RedistributeAncillaryFiles(lu, additional_nodes=None): @@ -1901,6 +2129,7 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None): dist_nodes.extend(additional_nodes) if myself.name in dist_nodes: dist_nodes.remove(myself.name) + # 2. Gather files to distribute dist_files = set([constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE, @@ -1950,11 +2179,11 @@ class LURedistributeConfig(NoHooksLU): """Redistribute the configuration. """ - self.cfg.Update(self.cfg.GetClusterInfo()) + self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn) _RedistributeAncillaryFiles(self) -def _WaitForSync(lu, instance, oneshot=False, unlock=False): +def _WaitForSync(lu, instance, oneshot=False): """Sleep and poll for an instance's disk to sync. """ @@ -1969,6 +2198,8 @@ def _WaitForSync(lu, instance, oneshot=False, unlock=False): for dev in instance.disks: lu.cfg.SetDiskID(dev, node) + # TODO: Convert to utils.Retry + retries = 0 degr_retries = 10 # in seconds, as we sleep 1 second each time while True: @@ -2066,11 +2297,14 @@ class LUDiagnoseOS(NoHooksLU): _OP_REQP = ["output_fields", "names"] REQ_BGL = False _FIELDS_STATIC = utils.FieldSet() - _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status") + _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants") + # Fields that need calculation of global os validity + _FIELDS_NEEDVALID = frozenset(["valid", "variants"]) def ExpandNames(self): if self.op.names: - raise errors.OpPrereqError("Selective OS query not supported") + raise errors.OpPrereqError("Selective OS query not supported", + errors.ECODE_INVAL) _CheckOutputFields(static=self._FIELDS_STATIC, dynamic=self._FIELDS_DYNAMIC, @@ -2089,10 +2323,9 @@ class LUDiagnoseOS(NoHooksLU): """ @staticmethod - def _DiagnoseByOS(node_list, rlist): + def _DiagnoseByOS(rlist): """Remaps a per-node return list into an a per-os per-node dictionary - @param node_list: a list with the names of all nodes @param rlist: a map with node names as keys and OS objects as values @rtype: dict @@ -2114,14 +2347,14 @@ class LUDiagnoseOS(NoHooksLU): for node_name, nr in rlist.items(): if nr.fail_msg or not nr.payload: continue - for name, path, status, diagnose in nr.payload: + for name, path, status, diagnose, variants in nr.payload: if name not in all_os: # build a list of nodes for this os containing empty lists # for each node in node_list all_os[name] = {} for nname in good_nodes: all_os[name][nname] = [] - all_os[name][node_name].append((path, status, diagnose)) + all_os[name][node_name].append((path, status, diagnose, variants)) return all_os def Exec(self, feedback_fn): @@ -2130,20 +2363,40 @@ class LUDiagnoseOS(NoHooksLU): """ valid_nodes = [node for node in self.cfg.GetOnlineNodeList()] node_data = self.rpc.call_os_diagnose(valid_nodes) - pol = self._DiagnoseByOS(valid_nodes, node_data) + pol = self._DiagnoseByOS(node_data) output = [] + calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields) + calc_variants = "variants" in self.op.output_fields + for os_name, os_data in pol.items(): row = [] + if calc_valid: + valid = True + variants = None + for osl in os_data.values(): + valid = valid and osl and osl[0][1] + if not valid: + variants = None + break + if calc_variants: + node_variants = osl[0][3] + if variants is None: + variants = node_variants + else: + variants = [v for v in variants if v in node_variants] + for field in self.op.output_fields: if field == "name": val = os_name elif field == "valid": - val = utils.all([osl and osl[0][1] for osl in os_data.values()]) + val = valid elif field == "node_status": # this is just a copy of the dict val = {} for node_name, nos_list in os_data.items(): val[node_name] = nos_list + elif field == "variants": + val = variants else: raise errors.ParameterError(field) row.append(val) @@ -2172,8 +2425,11 @@ class LURemoveNode(LogicalUnit): "NODE_NAME": self.op.node_name, } all_nodes = self.cfg.GetNodeList() - if self.op.node_name in all_nodes: + try: all_nodes.remove(self.op.node_name) + except ValueError: + logging.warning("Node %s which is about to be removed not found" + " in the all nodes list", self.op.node_name) return env, all_nodes, all_nodes def CheckPrereq(self): @@ -2189,20 +2445,23 @@ class LURemoveNode(LogicalUnit): """ node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name)) if node is None: - raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name) + raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name, + errors.ECODE_NOENT) instance_list = self.cfg.GetInstanceList() masternode = self.cfg.GetMasterNode() if node.name == masternode: raise errors.OpPrereqError("Node is the master node," - " you need to failover first.") + " you need to failover first.", + errors.ECODE_INVAL) for instance_name in instance_list: instance = self.cfg.GetInstanceInfo(instance_name) if node.name in instance.all_nodes: raise errors.OpPrereqError("Instance %s is still running on the node," - " please remove first." % instance_name) + " please remove first." % instance_name, + errors.ECODE_INVAL) self.op.node_name = node.name self.node = node @@ -2214,31 +2473,38 @@ class LURemoveNode(LogicalUnit): logging.info("Stopping the node daemon and removing configs from node %s", node.name) + modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup + + # Promote nodes to master candidate as needed + _AdjustCandidatePool(self, exceptions=[node.name]) self.context.RemoveNode(node.name) # Run post hooks on the node before it's removed hm = self.proc.hmclass(self.rpc.call_hooks_runner, self) try: - h_results = hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name]) + hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name]) except: + # pylint: disable-msg=W0702 self.LogWarning("Errors occurred running hooks on %s" % node.name) - result = self.rpc.call_node_leave_cluster(node.name) + result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup) msg = result.fail_msg if msg: self.LogWarning("Errors encountered on the remote node while leaving" " the cluster: %s", msg) - # Promote nodes to master candidate as needed - _AdjustCandidatePool(self) - class LUQueryNodes(NoHooksLU): """Logical unit for querying nodes. """ + # pylint: disable-msg=W0142 _OP_REQP = ["output_fields", "names", "use_locking"] REQ_BGL = False + + _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid", + "master_candidate", "offline", "drained"] + _FIELDS_DYNAMIC = utils.FieldSet( "dtotal", "dfree", "mtotal", "mnode", "mfree", @@ -2246,16 +2512,12 @@ class LUQueryNodes(NoHooksLU): "ctotal", "cnodes", "csockets", ) - _FIELDS_STATIC = utils.FieldSet( - "name", "pinst_cnt", "sinst_cnt", + _FIELDS_STATIC = utils.FieldSet(*[ + "pinst_cnt", "sinst_cnt", "pinst_list", "sinst_list", "pip", "sip", "tags", - "serial_no", "ctime", "mtime", - "master_candidate", "master", - "offline", - "drained", - "role", + "role"] + _SIMPLE_FIELDS ) def ExpandNames(self): @@ -2277,7 +2539,6 @@ class LUQueryNodes(NoHooksLU): # if we don't request only static fields, we need to lock the nodes self.needed_locks[locking.LEVEL_NODE] = self.wanted - def CheckPrereq(self): """Check prerequisites. @@ -2338,10 +2599,9 @@ class LUQueryNodes(NoHooksLU): inst_fields = frozenset(("pinst_cnt", "pinst_list", "sinst_cnt", "sinst_list")) if inst_fields & frozenset(self.op.output_fields): - instancelist = self.cfg.GetInstanceList() + inst_data = self.cfg.GetAllInstancesInfo() - for instance_name in instancelist: - inst = self.cfg.GetInstanceInfo(instance_name) + for inst in inst_data.values(): if inst.primary_node in node_to_primary: node_to_primary[inst.primary_node].add(inst.name) for secnode in inst.secondary_nodes: @@ -2356,8 +2616,8 @@ class LUQueryNodes(NoHooksLU): for node in nodelist: node_output = [] for field in self.op.output_fields: - if field == "name": - val = node.name + if field in self._SIMPLE_FIELDS: + val = getattr(node, field) elif field == "pinst_list": val = list(node_to_primary[node.name]) elif field == "sinst_list": @@ -2372,20 +2632,8 @@ class LUQueryNodes(NoHooksLU): val = node.secondary_ip elif field == "tags": val = list(node.GetTags()) - elif field == "serial_no": - val = node.serial_no - elif field == "ctime": - val = node.ctime - elif field == "mtime": - val = node.mtime - elif field == "master_candidate": - val = node.master_candidate elif field == "master": val = node.name == master_node - elif field == "offline": - val = node.offline - elif field == "drained": - val = node.drained elif self._FIELDS_DYNAMIC.Matches(field): val = live_data[node.name].get(field, None) elif field == "role": @@ -2499,18 +2747,17 @@ class LUQueryNodeStorage(NoHooksLU): """ _OP_REQP = ["nodes", "storage_type", "output_fields"] REQ_BGL = False - _FIELDS_STATIC = utils.FieldSet("node") + _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE) def ExpandNames(self): storage_type = self.op.storage_type - if storage_type not in constants.VALID_STORAGE_FIELDS: - raise errors.OpPrereqError("Unknown storage type: %s" % storage_type) - - dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type] + if storage_type not in constants.VALID_STORAGE_TYPES: + raise errors.OpPrereqError("Unknown storage type: %s" % storage_type, + errors.ECODE_INVAL) _CheckOutputFields(static=self._FIELDS_STATIC, - dynamic=utils.FieldSet(*dynamic_fields), + dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS), selected=self.op.output_fields) self.needed_locks = {} @@ -2542,9 +2789,10 @@ class LUQueryNodeStorage(NoHooksLU): else: fields = [constants.SF_NAME] + self.op.output_fields - # Never ask for node as it's only known to the LU - while "node" in fields: - fields.remove("node") + # Never ask for node or type as it's only known to the LU + for extra in [constants.SF_NODE, constants.SF_TYPE]: + while extra in fields: + fields.remove(extra) field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)]) name_idx = field_idx[constants.SF_NAME] @@ -2574,8 +2822,10 @@ class LUQueryNodeStorage(NoHooksLU): out = [] for field in self.op.output_fields: - if field == "node": + if field == constants.SF_NODE: val = node + elif field == constants.SF_TYPE: + val = self.op.storage_type elif field in field_idx: val = row[field_idx[field]] else: @@ -2598,13 +2848,15 @@ class LUModifyNodeStorage(NoHooksLU): def CheckArguments(self): node_name = self.cfg.ExpandNodeName(self.op.node_name) if node_name is None: - raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name) + raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name, + errors.ECODE_NOENT) self.op.node_name = node_name storage_type = self.op.storage_type - if storage_type not in constants.VALID_STORAGE_FIELDS: - raise errors.OpPrereqError("Unknown storage type: %s" % storage_type) + if storage_type not in constants.VALID_STORAGE_TYPES: + raise errors.OpPrereqError("Unknown storage type: %s" % storage_type, + errors.ECODE_INVAL) def ExpandNames(self): self.needed_locks = { @@ -2621,13 +2873,15 @@ class LUModifyNodeStorage(NoHooksLU): modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type] except KeyError: raise errors.OpPrereqError("Storage units of type '%s' can not be" - " modified" % storage_type) + " modified" % storage_type, + errors.ECODE_INVAL) diff = set(self.op.changes.keys()) - modifiable if diff: raise errors.OpPrereqError("The following fields can not be modified for" " storage units of type '%s': %r" % - (storage_type, list(diff))) + (storage_type, list(diff)), + errors.ECODE_INVAL) def Exec(self, feedback_fn): """Computes the list of nodes and their attributes. @@ -2679,7 +2933,7 @@ class LUAddNode(LogicalUnit): node_name = self.op.node_name cfg = self.cfg - dns_data = utils.HostInfo(node_name) + dns_data = utils.GetHostInfo(node_name) node = dns_data.name primary_ip = self.op.primary_ip = dns_data.ip @@ -2687,15 +2941,17 @@ class LUAddNode(LogicalUnit): if secondary_ip is None: secondary_ip = primary_ip if not utils.IsValidIP(secondary_ip): - raise errors.OpPrereqError("Invalid secondary IP given") + raise errors.OpPrereqError("Invalid secondary IP given", + errors.ECODE_INVAL) self.op.secondary_ip = secondary_ip node_list = cfg.GetNodeList() if not self.op.readd and node in node_list: raise errors.OpPrereqError("Node %s is already in the configuration" % - node) + node, errors.ECODE_EXISTS) elif self.op.readd and node not in node_list: - raise errors.OpPrereqError("Node %s is not in the configuration" % node) + raise errors.OpPrereqError("Node %s is not in the configuration" % node, + errors.ECODE_NOENT) for existing_node_name in node_list: existing_node = cfg.GetNodeInfo(existing_node_name) @@ -2704,7 +2960,8 @@ class LUAddNode(LogicalUnit): if (existing_node.primary_ip != primary_ip or existing_node.secondary_ip != secondary_ip): raise errors.OpPrereqError("Readded node doesn't have the same IP" - " address configuration as before") + " address configuration as before", + errors.ECODE_INVAL) continue if (existing_node.primary_ip == primary_ip or @@ -2712,7 +2969,8 @@ class LUAddNode(LogicalUnit): existing_node.primary_ip == secondary_ip or existing_node.secondary_ip == secondary_ip): raise errors.OpPrereqError("New node ip address(es) conflict with" - " existing node %s" % existing_node.name) + " existing node %s" % existing_node.name, + errors.ECODE_NOTUNIQUE) # check that the type of the node (single versus dual homed) is the # same as for the master @@ -2722,31 +2980,32 @@ class LUAddNode(LogicalUnit): if master_singlehomed != newbie_singlehomed: if master_singlehomed: raise errors.OpPrereqError("The master has no private ip but the" - " new node has one") + " new node has one", + errors.ECODE_INVAL) else: raise errors.OpPrereqError("The master has a private ip but the" - " new node doesn't have one") + " new node doesn't have one", + errors.ECODE_INVAL) # checks reachability if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT): - raise errors.OpPrereqError("Node not reachable by ping") + raise errors.OpPrereqError("Node not reachable by ping", + errors.ECODE_ENVIRON) if not newbie_singlehomed: # check reachability from my secondary ip to newbie's secondary ip if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT, source=myself.secondary_ip): raise errors.OpPrereqError("Node secondary ip not reachable by TCP" - " based ping to noded port") + " based ping to noded port", + errors.ECODE_ENVIRON) - cp_size = self.cfg.GetClusterInfo().candidate_pool_size if self.op.readd: exceptions = [node] else: exceptions = [] - mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions) - # the new node will increase mc_max with one, so: - mc_max = min(mc_max + 1, cp_size) - self.master_candidate = mc_now < mc_max + + self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions) if self.op.readd: self.new_node = self.cfg.GetNodeInfo(node) @@ -2770,7 +3029,7 @@ class LUAddNode(LogicalUnit): # later in the procedure; this also means that if the re-add # fails, we are left with a non-offlined, broken node if self.op.readd: - new_node.drained = new_node.offline = False + new_node.drained = new_node.offline = False # pylint: disable-msg=W0201 self.LogInfo("Readding a node, the offline/drained flags were reset") # if we demote the node, we do cleanup later in the procedure new_node.master_candidate = self.master_candidate @@ -2791,20 +3050,21 @@ class LUAddNode(LogicalUnit): (constants.PROTOCOL_VERSION, result.payload)) # setup ssh on node - logging.info("Copy ssh key to node %s", node) - priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS) - keyarray = [] - keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB, - constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB, - priv_key, pub_key] - - for i in keyfiles: - keyarray.append(utils.ReadFile(i)) - - result = self.rpc.call_node_add(node, keyarray[0], keyarray[1], - keyarray[2], - keyarray[3], keyarray[4], keyarray[5]) - result.Raise("Cannot transfer ssh keys to the new node") + if self.cfg.GetClusterInfo().modify_ssh_setup: + logging.info("Copy ssh key to node %s", node) + priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS) + keyarray = [] + keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB, + constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB, + priv_key, pub_key] + + for i in keyfiles: + keyarray.append(utils.ReadFile(i)) + + result = self.rpc.call_node_add(node, keyarray[0], keyarray[1], + keyarray[2], keyarray[3], keyarray[4], + keyarray[5]) + result.Raise("Cannot transfer ssh keys to the new node") # Add node to our /etc/hosts, and add key to known_hosts if self.cfg.GetClusterInfo().modify_etc_hosts: @@ -2814,7 +3074,7 @@ class LUAddNode(LogicalUnit): result = self.rpc.call_node_has_ip_address(new_node.name, new_node.secondary_ip) result.Raise("Failure checking secondary ip on node %s" % new_node.name, - prereq=True) + prereq=True, ecode=errors.ECODE_ENVIRON) if not result.payload: raise errors.OpExecError("Node claims it doesn't have the secondary ip" " you gave (%s). Please fix and re-run this" @@ -2822,7 +3082,7 @@ class LUAddNode(LogicalUnit): node_verify_list = [self.cfg.GetMasterNode()] node_verify_param = { - 'nodelist': [node], + constants.NV_NODELIST: [node], # TODO: do a node-net-test as well? } @@ -2830,10 +3090,11 @@ class LUAddNode(LogicalUnit): self.cfg.GetClusterName()) for verifier in node_verify_list: result[verifier].Raise("Cannot communicate with node %s" % verifier) - nl_payload = result[verifier].payload['nodelist'] + nl_payload = result[verifier].payload[constants.NV_NODELIST] if nl_payload: for failed in nl_payload: - feedback_fn("ssh/hostname verification failed %s -> %s" % + feedback_fn("ssh/hostname verification failed" + " (checking from %s): %s" % (verifier, nl_payload[failed])) raise errors.OpExecError("ssh/hostname verification failed.") @@ -2841,17 +3102,17 @@ class LUAddNode(LogicalUnit): _RedistributeAncillaryFiles(self) self.context.ReaddNode(new_node) # make sure we redistribute the config - self.cfg.Update(new_node) + self.cfg.Update(new_node, feedback_fn) # and make sure the new node will not have old files around if not new_node.master_candidate: result = self.rpc.call_node_demote_from_mc(new_node.name) - msg = result.RemoteFailMsg() + msg = result.fail_msg if msg: self.LogWarning("Node failed to demote itself from master" " candidate status: %s" % msg) else: _RedistributeAncillaryFiles(self, additional_nodes=[node]) - self.context.AddNode(new_node) + self.context.AddNode(new_node, self.proc.GetECId()) class LUSetNodeParams(LogicalUnit): @@ -2866,17 +3127,20 @@ class LUSetNodeParams(LogicalUnit): def CheckArguments(self): node_name = self.cfg.ExpandNodeName(self.op.node_name) if node_name is None: - raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name) + raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name, + errors.ECODE_INVAL) self.op.node_name = node_name _CheckBooleanOpField(self.op, 'master_candidate') _CheckBooleanOpField(self.op, 'offline') _CheckBooleanOpField(self.op, 'drained') all_mods = [self.op.offline, self.op.master_candidate, self.op.drained] if all_mods.count(None) == 3: - raise errors.OpPrereqError("Please pass at least one modification") + raise errors.OpPrereqError("Please pass at least one modification", + errors.ECODE_INVAL) if all_mods.count(True) > 1: raise errors.OpPrereqError("Can't set the node into more than one" - " state at the same time") + " state at the same time", + errors.ECODE_INVAL) def ExpandNames(self): self.needed_locks = {locking.LEVEL_NODE: self.op.node_name} @@ -2911,25 +3175,42 @@ class LUSetNodeParams(LogicalUnit): # we can't change the master's node flags if self.op.node_name == self.cfg.GetMasterNode(): raise errors.OpPrereqError("The master role can be changed" - " only via masterfailover") + " only via masterfailover", + errors.ECODE_INVAL) + + # Boolean value that tells us whether we're offlining or draining the node + offline_or_drain = self.op.offline == True or self.op.drained == True + deoffline_or_drain = self.op.offline == False or self.op.drained == False - if ((self.op.master_candidate == False or self.op.offline == True or - self.op.drained == True) and node.master_candidate): + if (node.master_candidate and + (self.op.master_candidate == False or offline_or_drain)): cp_size = self.cfg.GetClusterInfo().candidate_pool_size - num_candidates, _ = self.cfg.GetMasterCandidateStats() - if num_candidates <= cp_size: + mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats() + if mc_now <= cp_size: msg = ("Not enough master candidates (desired" - " %d, new value will be %d)" % (cp_size, num_candidates-1)) - if self.op.force: + " %d, new value will be %d)" % (cp_size, mc_now-1)) + # Only allow forcing the operation if it's an offline/drain operation, + # and we could not possibly promote more nodes. + # FIXME: this can still lead to issues if in any way another node which + # could be promoted appears in the meantime. + if self.op.force and offline_or_drain and mc_should == mc_max: self.LogWarning(msg) else: - raise errors.OpPrereqError(msg) + raise errors.OpPrereqError(msg, errors.ECODE_INVAL) if (self.op.master_candidate == True and ((node.offline and not self.op.offline == False) or (node.drained and not self.op.drained == False))): raise errors.OpPrereqError("Node '%s' is offline or drained, can't set" - " to master_candidate" % node.name) + " to master_candidate" % node.name, + errors.ECODE_INVAL) + + # If we're being deofflined/drained, we'll MC ourself if needed + if (deoffline_or_drain and not offline_or_drain and not + self.op.master_candidate == True and not node.master_candidate): + self.op.master_candidate = _DecideSelfPromotion(self) + if self.op.master_candidate: + self.LogInfo("Autopromoting node to master candidate") return @@ -2973,7 +3254,7 @@ class LUSetNodeParams(LogicalUnit): changed_mc = True result.append(("master_candidate", "auto-demotion due to drain")) rrc = self.rpc.call_node_demote_from_mc(node.name) - msg = rrc.RemoteFailMsg() + msg = rrc.fail_msg if msg: self.LogWarning("Node failed to demote itself: %s" % msg) if node.offline: @@ -2981,7 +3262,7 @@ class LUSetNodeParams(LogicalUnit): result.append(("offline", "clear offline status due to drain")) # this will trigger configuration file update, if needed - self.cfg.Update(node) + self.cfg.Update(node, feedback_fn) # this will trigger job queue propagation or cleanup if changed_mc: self.context.ReaddNode(node) @@ -2999,11 +3280,13 @@ class LUPowercycleNode(NoHooksLU): def CheckArguments(self): node_name = self.cfg.ExpandNodeName(self.op.node_name) if node_name is None: - raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name) + raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name, + errors.ECODE_NOENT) self.op.node_name = node_name if node_name == self.cfg.GetMasterNode() and not self.op.force: raise errors.OpPrereqError("The node is the master and the force" - " parameter was not set") + " parameter was not set", + errors.ECODE_INVAL) def ExpandNames(self): """Locking for PowercycleNode. @@ -3074,6 +3357,7 @@ class LUQueryClusterInfo(NoHooksLU): "file_storage_dir": cluster.file_storage_dir, "ctime": cluster.ctime, "mtime": cluster.mtime, + "uuid": cluster.uuid, "tags": list(cluster.GetTags()), } @@ -3219,6 +3503,8 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False, # 2nd pass, do only the primary node for inst_disk in instance.disks: + dev_path = None + for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node): if node != instance.primary_node: continue @@ -3233,8 +3519,10 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False, " (is_primary=True, pass=2): %s", inst_disk.iv_name, node, msg) disks_ok = False - device_info.append((instance.primary_node, inst_disk.iv_name, - result.payload)) + else: + dev_path = result.payload + + device_info.append((instance.primary_node, inst_disk.iv_name, dev_path)) # leave the disks configured for the primary node # this is a workaround that would be fixed better by @@ -3358,15 +3646,18 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name): """ nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name) - nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True) + nodeinfo[node].Raise("Can't get data from node %s" % node, + prereq=True, ecode=errors.ECODE_ENVIRON) free_mem = nodeinfo[node].payload.get('memory_free', None) if not isinstance(free_mem, int): raise errors.OpPrereqError("Can't compute free memory on node %s, result" - " was '%s'" % (node, free_mem)) + " was '%s'" % (node, free_mem), + errors.ECODE_ENVIRON) if requested > free_mem: raise errors.OpPrereqError("Not enough memory on node %s for %s:" " needed %s MiB, available %s MiB" % - (node, reason, requested, free_mem)) + (node, reason, requested, free_mem), + errors.ECODE_NORES) class LUStartupInstance(LogicalUnit): @@ -3409,7 +3700,8 @@ class LUStartupInstance(LogicalUnit): if self.beparams: if not isinstance(self.beparams, dict): raise errors.OpPrereqError("Invalid beparams passed: %s, expected" - " dict" % (type(self.beparams), )) + " dict" % (type(self.beparams), ), + errors.ECODE_INVAL) # fill the beparams dict utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES) self.op.beparams = self.beparams @@ -3419,7 +3711,8 @@ class LUStartupInstance(LogicalUnit): if self.hvparams: if not isinstance(self.hvparams, dict): raise errors.OpPrereqError("Invalid hvparams passed: %s, expected" - " dict" % (type(self.hvparams), )) + " dict" % (type(self.hvparams), ), + errors.ECODE_INVAL) # check hypervisor parameter syntax (locally) cluster = self.cfg.GetClusterInfo() @@ -3442,7 +3735,7 @@ class LUStartupInstance(LogicalUnit): instance.name, instance.hypervisor) remote_info.Raise("Error checking node %s" % instance.primary_node, - prereq=True) + prereq=True, ecode=errors.ECODE_ENVIRON) if not remote_info.payload: # not running already _CheckNodeFreeMemory(self, instance.primary_node, "starting instance %s" % instance.name, @@ -3478,6 +3771,13 @@ class LURebootInstance(LogicalUnit): _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"] REQ_BGL = False + def CheckArguments(self): + """Check the arguments. + + """ + self.shutdown_timeout = getattr(self.op, "shutdown_timeout", + constants.DEFAULT_SHUTDOWN_TIMEOUT) + def ExpandNames(self): if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT, constants.INSTANCE_REBOOT_HARD, @@ -3497,6 +3797,7 @@ class LURebootInstance(LogicalUnit): env = { "IGNORE_SECONDARIES": self.op.ignore_secondaries, "REBOOT_TYPE": self.op.reboot_type, + "SHUTDOWN_TIMEOUT": self.shutdown_timeout, } env.update(_BuildInstanceHookEnvByObject(self, self.instance)) nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) @@ -3532,10 +3833,12 @@ class LURebootInstance(LogicalUnit): for disk in instance.disks: self.cfg.SetDiskID(disk, node_current) result = self.rpc.call_instance_reboot(node_current, instance, - reboot_type) + reboot_type, + self.shutdown_timeout) result.Raise("Could not reboot instance") else: - result = self.rpc.call_instance_shutdown(node_current, instance) + result = self.rpc.call_instance_shutdown(node_current, instance, + self.shutdown_timeout) result.Raise("Could not shutdown instance for full reboot") _ShutdownInstanceDisks(self, instance) _StartInstanceDisks(self, instance, ignore_secondaries) @@ -3558,6 +3861,13 @@ class LUShutdownInstance(LogicalUnit): _OP_REQP = ["instance_name"] REQ_BGL = False + def CheckArguments(self): + """Check the arguments. + + """ + self.timeout = getattr(self.op, "timeout", + constants.DEFAULT_SHUTDOWN_TIMEOUT) + def ExpandNames(self): self._ExpandAndLockInstance() @@ -3568,6 +3878,7 @@ class LUShutdownInstance(LogicalUnit): """ env = _BuildInstanceHookEnvByObject(self, self.instance) + env["TIMEOUT"] = self.timeout nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) return env, nl, nl @@ -3588,8 +3899,9 @@ class LUShutdownInstance(LogicalUnit): """ instance = self.instance node_current = instance.primary_node + timeout = self.timeout self.cfg.MarkInstanceDown(instance.name) - result = self.rpc.call_instance_shutdown(node_current, instance) + result = self.rpc.call_instance_shutdown(node_current, instance, timeout) msg = result.fail_msg if msg: self.proc.LogWarning("Could not shutdown instance: %s" % msg) @@ -3632,31 +3944,38 @@ class LUReinstallInstance(LogicalUnit): if instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Instance '%s' has no disks" % - self.op.instance_name) + self.op.instance_name, + errors.ECODE_INVAL) if instance.admin_up: raise errors.OpPrereqError("Instance '%s' is marked to be up" % - self.op.instance_name) + self.op.instance_name, + errors.ECODE_STATE) remote_info = self.rpc.call_instance_info(instance.primary_node, instance.name, instance.hypervisor) remote_info.Raise("Error checking node %s" % instance.primary_node, - prereq=True) + prereq=True, ecode=errors.ECODE_ENVIRON) if remote_info.payload: raise errors.OpPrereqError("Instance '%s' is running on the node %s" % (self.op.instance_name, - instance.primary_node)) + instance.primary_node), + errors.ECODE_STATE) self.op.os_type = getattr(self.op, "os_type", None) + self.op.force_variant = getattr(self.op, "force_variant", False) if self.op.os_type is not None: # OS verification pnode = self.cfg.GetNodeInfo( self.cfg.ExpandNodeName(instance.primary_node)) if pnode is None: raise errors.OpPrereqError("Primary node '%s' is unknown" % - self.op.pnode) + self.op.pnode, errors.ECODE_NOENT) result = self.rpc.call_os_get(pnode.name, self.op.os_type) result.Raise("OS '%s' not in supported OS list for primary node %s" % - (self.op.os_type, pnode.name), prereq=True) + (self.op.os_type, pnode.name), + prereq=True, ecode=errors.ECODE_INVAL) + if not self.op.force_variant: + _CheckOSVariant(result.payload, self.op.os_type) self.instance = instance @@ -3669,12 +3988,13 @@ class LUReinstallInstance(LogicalUnit): if self.op.os_type is not None: feedback_fn("Changing OS to '%s'..." % self.op.os_type) inst.os = self.op.os_type - self.cfg.Update(inst) + self.cfg.Update(inst, feedback_fn) _StartInstanceDisks(self, inst, None) try: feedback_fn("Running the instance OS create scripts...") - result = self.rpc.call_instance_os_add(inst.primary_node, inst, True) + # FIXME: pass debug option from opcode to backend + result = self.rpc.call_instance_os_add(inst.primary_node, inst, True, 0) result.Raise("Could not install OS for instance %s on node %s" % (inst.name, inst.primary_node)) finally: @@ -3695,12 +4015,12 @@ class LURecreateInstanceDisks(LogicalUnit): """ if not isinstance(self.op.disks, list): - raise errors.OpPrereqError("Invalid disks parameter") + raise errors.OpPrereqError("Invalid disks parameter", errors.ECODE_INVAL) for item in self.op.disks: if (not isinstance(item, int) or item < 0): raise errors.OpPrereqError("Invalid disk specification '%s'" % - str(item)) + str(item), errors.ECODE_INVAL) def ExpandNames(self): self._ExpandAndLockInstance() @@ -3728,26 +4048,27 @@ class LURecreateInstanceDisks(LogicalUnit): if instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Instance '%s' has no disks" % - self.op.instance_name) + self.op.instance_name, errors.ECODE_INVAL) if instance.admin_up: raise errors.OpPrereqError("Instance '%s' is marked to be up" % - self.op.instance_name) + self.op.instance_name, errors.ECODE_STATE) remote_info = self.rpc.call_instance_info(instance.primary_node, instance.name, instance.hypervisor) remote_info.Raise("Error checking node %s" % instance.primary_node, - prereq=True) + prereq=True, ecode=errors.ECODE_ENVIRON) if remote_info.payload: raise errors.OpPrereqError("Instance '%s' is running on the node %s" % (self.op.instance_name, - instance.primary_node)) + instance.primary_node), errors.ECODE_STATE) if not self.op.disks: self.op.disks = range(len(instance.disks)) else: for idx in self.op.disks: if idx >= len(instance.disks): - raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx) + raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx, + errors.ECODE_INVAL) self.instance = instance @@ -3756,7 +4077,7 @@ class LURecreateInstanceDisks(LogicalUnit): """ to_skip = [] - for idx, disk in enumerate(self.instance.disks): + for idx, _ in enumerate(self.instance.disks): if idx not in self.op.disks: # disk idx has not been passed in to_skip.append(idx) continue @@ -3793,36 +4114,37 @@ class LURenameInstance(LogicalUnit): self.cfg.ExpandInstanceName(self.op.instance_name)) if instance is None: raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) + self.op.instance_name, errors.ECODE_NOENT) _CheckNodeOnline(self, instance.primary_node) if instance.admin_up: raise errors.OpPrereqError("Instance '%s' is marked to be up" % - self.op.instance_name) + self.op.instance_name, errors.ECODE_STATE) remote_info = self.rpc.call_instance_info(instance.primary_node, instance.name, instance.hypervisor) remote_info.Raise("Error checking node %s" % instance.primary_node, - prereq=True) + prereq=True, ecode=errors.ECODE_ENVIRON) if remote_info.payload: raise errors.OpPrereqError("Instance '%s' is running on the node %s" % (self.op.instance_name, - instance.primary_node)) + instance.primary_node), errors.ECODE_STATE) self.instance = instance # new name verification - name_info = utils.HostInfo(self.op.new_name) + name_info = utils.GetHostInfo(self.op.new_name) self.op.new_name = new_name = name_info.name instance_list = self.cfg.GetInstanceList() if new_name in instance_list: raise errors.OpPrereqError("Instance '%s' is already in the cluster" % - new_name) + new_name, errors.ECODE_EXISTS) if not getattr(self.op, "ignore_ip", False): if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT): raise errors.OpPrereqError("IP %s of instance %s already in use" % - (name_info.ip, new_name)) + (name_info.ip, new_name), + errors.ECODE_NOTUNIQUE) def Exec(self, feedback_fn): @@ -3856,7 +4178,7 @@ class LURenameInstance(LogicalUnit): _StartInstanceDisks(self, inst, None) try: result = self.rpc.call_instance_run_rename(inst.primary_node, inst, - old_name) + old_name, 0) msg = result.fail_msg if msg: msg = ("Could not run OS rename script for instance %s on node %s" @@ -3876,6 +4198,13 @@ class LURemoveInstance(LogicalUnit): _OP_REQP = ["instance_name", "ignore_failures"] REQ_BGL = False + def CheckArguments(self): + """Check the arguments. + + """ + self.shutdown_timeout = getattr(self.op, "shutdown_timeout", + constants.DEFAULT_SHUTDOWN_TIMEOUT) + def ExpandNames(self): self._ExpandAndLockInstance() self.needed_locks[locking.LEVEL_NODE] = [] @@ -3892,6 +4221,7 @@ class LURemoveInstance(LogicalUnit): """ env = _BuildInstanceHookEnvByObject(self, self.instance) + env["SHUTDOWN_TIMEOUT"] = self.shutdown_timeout nl = [self.cfg.GetMasterNode()] return env, nl, nl @@ -3913,7 +4243,8 @@ class LURemoveInstance(LogicalUnit): logging.info("Shutting down instance %s on node %s", instance.name, instance.primary_node) - result = self.rpc.call_instance_shutdown(instance.primary_node, instance) + result = self.rpc.call_instance_shutdown(instance.primary_node, instance, + self.shutdown_timeout) msg = result.fail_msg if msg: if self.op.ignore_failures: @@ -3941,8 +4272,11 @@ class LUQueryInstances(NoHooksLU): """Logical unit for querying instances. """ + # pylint: disable-msg=W0142 _OP_REQP = ["output_fields", "names", "use_locking"] REQ_BGL = False + _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor", + "serial_no", "ctime", "mtime", "uuid"] _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes", "admin_state", "disk_template", "ip", "mac", "bridge", @@ -3955,11 +4289,11 @@ class LUQueryInstances(NoHooksLU): r"(nic)\.(bridge)/([0-9]+)", r"(nic)\.(macs|ips|modes|links|bridges)", r"(disk|nic)\.(count)", - "serial_no", "hypervisor", "hvparams", - "ctime", "mtime", - ] + + "hvparams", + ] + _SIMPLE_FIELDS + ["hv/%s" % name - for name in constants.HVS_PARAMETERS] + + for name in constants.HVS_PARAMETERS + if name not in constants.HVC_GLOBALS] + ["be/%s" % name for name in constants.BES_PARAMETERS]) _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status") @@ -4000,6 +4334,8 @@ class LUQueryInstances(NoHooksLU): """Computes the list of nodes and their attributes. """ + # pylint: disable-msg=R0912 + # way too many branches here all_info = self.cfg.GetAllInstancesInfo() if self.wanted == locking.ALL_SET: # caller didn't specify instance names, so ordering is not important @@ -4037,7 +4373,7 @@ class LUQueryInstances(NoHooksLU): if result.offline: # offline nodes will be in both lists off_nodes.append(name) - if result.failed or result.fail_msg: + if result.fail_msg: bad_nodes.append(name) else: if result.payload: @@ -4054,16 +4390,14 @@ class LUQueryInstances(NoHooksLU): cluster = self.cfg.GetClusterInfo() for instance in instance_list: iout = [] - i_hv = cluster.FillHV(instance) + i_hv = cluster.FillHV(instance, skip_globals=True) i_be = cluster.FillBE(instance) i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT], nic.nicparams) for nic in instance.nics] for field in self.op.output_fields: st_match = self._FIELDS_STATIC.Matches(field) - if field == "name": - val = instance.name - elif field == "os": - val = instance.os + if field in self._SIMPLE_FIELDS: + val = getattr(instance, field) elif field == "pnode": val = instance.primary_node elif field == "snodes": @@ -4140,20 +4474,11 @@ class LUQueryInstances(NoHooksLU): val = _ComputeDiskSize(instance.disk_template, disk_sizes) elif field == "tags": val = list(instance.GetTags()) - elif field == "serial_no": - val = instance.serial_no - elif field == "ctime": - val = instance.ctime - elif field == "mtime": - val = instance.mtime - elif field == "network_port": - val = instance.network_port - elif field == "hypervisor": - val = instance.hypervisor elif field == "hvparams": val = i_hv elif (field.startswith(HVPREFIX) and - field[len(HVPREFIX):] in constants.HVS_PARAMETERS): + field[len(HVPREFIX):] in constants.HVS_PARAMETERS and + field[len(HVPREFIX):] not in constants.HVC_GLOBALS): val = i_hv.get(field[len(HVPREFIX):], None) elif field == "beparams": val = i_be @@ -4235,6 +4560,13 @@ class LUFailoverInstance(LogicalUnit): _OP_REQP = ["instance_name", "ignore_consistency"] REQ_BGL = False + def CheckArguments(self): + """Check the arguments. + + """ + self.shutdown_timeout = getattr(self.op, "shutdown_timeout", + constants.DEFAULT_SHUTDOWN_TIMEOUT) + def ExpandNames(self): self._ExpandAndLockInstance() self.needed_locks[locking.LEVEL_NODE] = [] @@ -4252,6 +4584,7 @@ class LUFailoverInstance(LogicalUnit): """ env = { "IGNORE_CONSISTENCY": self.op.ignore_consistency, + "SHUTDOWN_TIMEOUT": self.shutdown_timeout, } env.update(_BuildInstanceHookEnvByObject(self, self.instance)) nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes) @@ -4270,7 +4603,8 @@ class LUFailoverInstance(LogicalUnit): bep = self.cfg.GetClusterInfo().FillBE(instance) if instance.disk_template not in constants.DTS_NET_MIRROR: raise errors.OpPrereqError("Instance's disk layout is not" - " network mirrored, cannot failover.") + " network mirrored, cannot failover.", + errors.ECODE_STATE) secondary_nodes = instance.secondary_nodes if not secondary_nodes: @@ -4304,19 +4638,23 @@ class LUFailoverInstance(LogicalUnit): source_node = instance.primary_node target_node = instance.secondary_nodes[0] - feedback_fn("* checking disk consistency between source and target") - for dev in instance.disks: - # for drbd, these are drbd over lvm - if not _CheckDiskConsistency(self, dev, target_node, False): - if instance.admin_up and not self.op.ignore_consistency: - raise errors.OpExecError("Disk %s is degraded on target node," - " aborting failover." % dev.iv_name) + if instance.admin_up: + feedback_fn("* checking disk consistency between source and target") + for dev in instance.disks: + # for drbd, these are drbd over lvm + if not _CheckDiskConsistency(self, dev, target_node, False): + if not self.op.ignore_consistency: + raise errors.OpExecError("Disk %s is degraded on target node," + " aborting failover." % dev.iv_name) + else: + feedback_fn("* not checking disk consistency as instance is not running") feedback_fn("* shutting down instance on source node") logging.info("Shutting down instance %s on node %s", instance.name, source_node) - result = self.rpc.call_instance_shutdown(source_node, instance) + result = self.rpc.call_instance_shutdown(source_node, instance, + self.shutdown_timeout) msg = result.fail_msg if msg: if self.op.ignore_consistency: @@ -4335,7 +4673,7 @@ class LUFailoverInstance(LogicalUnit): instance.primary_node = target_node # distribute new instance config to the other nodes - self.cfg.Update(instance) + self.cfg.Update(instance, feedback_fn) # Only start the instance if it's marked as up if instance.admin_up: @@ -4408,12 +4746,19 @@ class LUMoveInstance(LogicalUnit): _OP_REQP = ["instance_name", "target_node"] REQ_BGL = False + def CheckArguments(self): + """Check the arguments. + + """ + self.shutdown_timeout = getattr(self.op, "shutdown_timeout", + constants.DEFAULT_SHUTDOWN_TIMEOUT) + def ExpandNames(self): self._ExpandAndLockInstance() target_node = self.cfg.ExpandNodeName(self.op.target_node) if target_node is None: raise errors.OpPrereqError("Node '%s' not known" % - self.op.target_node) + self.op.target_node, errors.ECODE_NOENT) self.op.target_node = target_node self.needed_locks[locking.LEVEL_NODE] = [target_node] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND @@ -4430,6 +4775,7 @@ class LUMoveInstance(LogicalUnit): """ env = { "TARGET_NODE": self.op.target_node, + "SHUTDOWN_TIMEOUT": self.shutdown_timeout, } env.update(_BuildInstanceHookEnvByObject(self, self.instance)) nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node, @@ -4454,14 +4800,15 @@ class LUMoveInstance(LogicalUnit): if target_node == instance.primary_node: raise errors.OpPrereqError("Instance %s is already on the node %s" % - (instance.name, target_node)) + (instance.name, target_node), + errors.ECODE_STATE) bep = self.cfg.GetClusterInfo().FillBE(instance) for idx, dsk in enumerate(instance.disks): if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE): raise errors.OpPrereqError("Instance disk %d has a complex layout," - " cannot copy") + " cannot copy" % idx, errors.ECODE_STATE) _CheckNodeOnline(self, target_node) _CheckNodeNotDrained(self, target_node) @@ -4493,7 +4840,8 @@ class LUMoveInstance(LogicalUnit): self.LogInfo("Shutting down instance %s on source node %s", instance.name, source_node) - result = self.rpc.call_instance_shutdown(source_node, instance) + result = self.rpc.call_instance_shutdown(source_node, instance, + self.shutdown_timeout) msg = result.fail_msg if msg: if self.op.ignore_consistency: @@ -4550,7 +4898,7 @@ class LUMoveInstance(LogicalUnit): (",".join(errs),)) instance.primary_node = target_node - self.cfg.Update(instance) + self.cfg.Update(instance, feedback_fn) self.LogInfo("Removing the disks on the original node") _RemoveDisks(self, instance, target_node=source_node) @@ -4586,7 +4934,8 @@ class LUMigrateNode(LogicalUnit): def ExpandNames(self): self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name) if self.op.node_name is None: - raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name) + raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name, + errors.ECODE_NOENT) self.needed_locks = { locking.LEVEL_NODE: [self.op.node_name], @@ -4650,11 +4999,11 @@ class TLMigrateInstance(Tasklet): self.cfg.ExpandInstanceName(self.instance_name)) if instance is None: raise errors.OpPrereqError("Instance '%s' not known" % - self.instance_name) + self.instance_name, errors.ECODE_NOENT) if instance.disk_template != constants.DT_DRBD8: raise errors.OpPrereqError("Instance's disk layout is not" - " drbd8, cannot migrate.") + " drbd8, cannot migrate.", errors.ECODE_STATE) secondary_nodes = instance.secondary_nodes if not secondary_nodes: @@ -4676,7 +5025,8 @@ class TLMigrateInstance(Tasklet): _CheckNodeNotDrained(self, target_node) result = self.rpc.call_instance_migratable(instance.primary_node, instance) - result.Raise("Can't migrate, please use failover", prereq=True) + result.Raise("Can't migrate, please use failover", + prereq=True, ecode=errors.ECODE_STATE) self.instance = instance @@ -4788,7 +5138,7 @@ class TLMigrateInstance(Tasklet): self.feedback_fn("* instance running on secondary node (%s)," " updating config" % target_node) instance.primary_node = target_node - self.cfg.Update(instance) + self.cfg.Update(instance, self.feedback_fn) demoted_node = source_node else: self.feedback_fn("* instance confirmed to be running on its" @@ -4838,8 +5188,8 @@ class TLMigrateInstance(Tasklet): False) abort_msg = abort_result.fail_msg if abort_msg: - logging.error("Aborting migration failed on target node %s: %s" % - (target_node, abort_msg)) + logging.error("Aborting migration failed on target node %s: %s", + target_node, abort_msg) # Don't raise an exception here, as we stil have to try to revert the # disk status, even if this step failed. @@ -4893,6 +5243,7 @@ class TLMigrateInstance(Tasklet): if msg: logging.error("Instance pre-migration failed, trying to revert" " disk status: %s", msg) + self.feedback_fn("Pre-migration failed, aborting") self._AbortMigration() self._RevertDiskStatus() raise errors.OpExecError("Could not pre-migrate instance %s: %s" % @@ -4907,6 +5258,7 @@ class TLMigrateInstance(Tasklet): if msg: logging.error("Instance migration failed, trying to revert" " disk status: %s", msg) + self.feedback_fn("Migration failed, aborting") self._AbortMigration() self._RevertDiskStatus() raise errors.OpExecError("Could not migrate instance %s: %s" % @@ -4915,7 +5267,7 @@ class TLMigrateInstance(Tasklet): instance.primary_node = target_node # distribute new instance config to the other nodes - self.cfg.Update(instance) + self.cfg.Update(instance, self.feedback_fn) result = self.rpc.call_finalize_migration(target_node, instance, @@ -4924,7 +5276,7 @@ class TLMigrateInstance(Tasklet): msg = result.fail_msg if msg: logging.error("Instance migration succeeded, but finalization failed:" - " %s" % msg) + " %s", msg) raise errors.OpExecError("Could not finalize instance migration: %s" % msg) @@ -5038,7 +5390,7 @@ def _GenerateUniqueNames(lu, exts): """ results = [] for val in exts: - new_id = lu.cfg.GenerateUniqueID() + new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) results.append("%s%s" % (new_id, val)) return results @@ -5050,7 +5402,7 @@ def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name, """ port = lu.cfg.AllocatePort() vgname = lu.cfg.GetVGName() - shared_secret = lu.cfg.GenerateDRBDSecret() + shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId()) dev_data = objects.Disk(dev_type=constants.LD_LV, size=size, logical_id=(vgname, names[0])) dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128, @@ -5167,7 +5519,7 @@ def _CreateDisks(lu, instance, to_skip=None, target_node=None): result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir) result.Raise("Failed to create directory '%s' on" - " node %s: %s" % (file_storage_dir, pnode)) + " node %s" % (file_storage_dir, pnode)) # Note: this needs to be kept in sync with adding of disks in # LUSetInstanceParams @@ -5218,10 +5570,10 @@ def _RemoveDisks(lu, instance, target_node=None): if instance.disk_template == constants.DT_FILE: file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) - if target_node is node: - tgt = instance.primary_node + if target_node: + tgt = target_node else: - tgt = instance.target_node + tgt = instance.primary_node result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir) if result.fail_msg: lu.LogWarning("Could not remove directory '%s' on node %s: %s", @@ -5290,13 +5642,26 @@ class LUCreateInstance(LogicalUnit): "hvparams", "beparams"] REQ_BGL = False + def CheckArguments(self): + """Check arguments. + + """ + # do not require name_check to ease forward/backward compatibility + # for tools + if not hasattr(self.op, "name_check"): + self.op.name_check = True + if self.op.ip_check and not self.op.name_check: + # TODO: make the ip check more flexible and not depend on the name check + raise errors.OpPrereqError("Cannot do ip checks without a name check", + errors.ECODE_INVAL) + def _ExpandNode(self, node): """Expands and checks one node name. """ node_full = self.cfg.ExpandNodeName(node) if node_full is None: - raise errors.OpPrereqError("Unknown node %s" % node) + raise errors.OpPrereqError("Unknown node %s" % node, errors.ECODE_NOENT) return node_full def ExpandNames(self): @@ -5318,11 +5683,12 @@ class LUCreateInstance(LogicalUnit): if self.op.mode not in (constants.INSTANCE_CREATE, constants.INSTANCE_IMPORT): raise errors.OpPrereqError("Invalid instance creation mode '%s'" % - self.op.mode) + self.op.mode, errors.ECODE_INVAL) # disk template and mirror node verification if self.op.disk_template not in constants.DISK_TEMPLATES: - raise errors.OpPrereqError("Invalid disk template name") + raise errors.OpPrereqError("Invalid disk template name", + errors.ECODE_INVAL) if self.op.hypervisor is None: self.op.hypervisor = self.cfg.GetHypervisorType() @@ -5332,7 +5698,8 @@ class LUCreateInstance(LogicalUnit): if self.op.hypervisor not in enabled_hvs: raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the" " cluster (%s)" % (self.op.hypervisor, - ",".join(enabled_hvs))) + ",".join(enabled_hvs)), + errors.ECODE_STATE) # check hypervisor parameter syntax (locally) utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES) @@ -5341,6 +5708,8 @@ class LUCreateInstance(LogicalUnit): hv_type = hypervisor.GetHypervisor(self.op.hypervisor) hv_type.CheckParameterSyntax(filled_hvp) self.hv_full = filled_hvp + # check that we don't specify global parameters on an instance + _CheckGlobalHvParams(self.op.hvparams) # fill and remember the beparams dict utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) @@ -5350,14 +5719,20 @@ class LUCreateInstance(LogicalUnit): #### instance parameters check # instance name verification - hostname1 = utils.HostInfo(self.op.instance_name) - self.op.instance_name = instance_name = hostname1.name + if self.op.name_check: + hostname1 = utils.GetHostInfo(self.op.instance_name) + self.op.instance_name = instance_name = hostname1.name + # used in CheckPrereq for ip ping check + self.check_ip = hostname1.ip + else: + instance_name = self.op.instance_name + self.check_ip = None # this is just a preventive check, but someone might still add this # instance in the meantime, and creation will fail at lock-add time if instance_name in self.cfg.GetInstanceList(): raise errors.OpPrereqError("Instance '%s' is already in the cluster" % - instance_name) + instance_name, errors.ECODE_EXISTS) self.add_locks[locking.LEVEL_INSTANCE] = instance_name @@ -5380,37 +5755,44 @@ class LUCreateInstance(LogicalUnit): if ip is None or ip.lower() == constants.VALUE_NONE: nic_ip = None elif ip.lower() == constants.VALUE_AUTO: + if not self.op.name_check: + raise errors.OpPrereqError("IP address set to auto but name checks" + " have been skipped. Aborting.", + errors.ECODE_INVAL) nic_ip = hostname1.ip else: if not utils.IsValidIP(ip): raise errors.OpPrereqError("Given IP address '%s' doesn't look" - " like a valid IP" % ip) + " like a valid IP" % ip, + errors.ECODE_INVAL) nic_ip = ip - # TODO: check the ip for uniqueness !! + # TODO: check the ip address for uniqueness if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip: - raise errors.OpPrereqError("Routed nic mode requires an ip address") + raise errors.OpPrereqError("Routed nic mode requires an ip address", + errors.ECODE_INVAL) # MAC address verification mac = nic.get("mac", constants.VALUE_AUTO) if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): - if not utils.IsValidMac(mac.lower()): - raise errors.OpPrereqError("Invalid MAC address specified: %s" % - mac) - else: - # or validate/reserve the current one - if self.cfg.IsMacInUse(mac): - raise errors.OpPrereqError("MAC address %s already in use" - " in cluster" % mac) + mac = utils.NormalizeAndValidateMac(mac) + + try: + self.cfg.ReserveMAC(mac, self.proc.GetECId()) + except errors.ReservationError: + raise errors.OpPrereqError("MAC address %s already in use" + " in cluster" % mac, + errors.ECODE_NOTUNIQUE) # bridge verification bridge = nic.get("bridge", None) link = nic.get("link", None) if bridge and link: raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" - " at the same time") + " at the same time", errors.ECODE_INVAL) elif bridge and nic_mode == constants.NIC_MODE_ROUTED: - raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic") + raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic", + errors.ECODE_INVAL) elif bridge: link = bridge @@ -5431,32 +5813,32 @@ class LUCreateInstance(LogicalUnit): mode = disk.get("mode", constants.DISK_RDWR) if mode not in constants.DISK_ACCESS_SET: raise errors.OpPrereqError("Invalid disk access mode '%s'" % - mode) + mode, errors.ECODE_INVAL) size = disk.get("size", None) if size is None: - raise errors.OpPrereqError("Missing disk size") + raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL) try: size = int(size) - except ValueError: - raise errors.OpPrereqError("Invalid disk size '%s'" % size) + except (TypeError, ValueError): + raise errors.OpPrereqError("Invalid disk size '%s'" % size, + errors.ECODE_INVAL) self.disks.append({"size": size, "mode": mode}) - # used in CheckPrereq for ip ping check - self.check_ip = hostname1.ip - # file storage checks if (self.op.file_driver and not self.op.file_driver in constants.FILE_DRIVER): raise errors.OpPrereqError("Invalid file driver name '%s'" % - self.op.file_driver) + self.op.file_driver, errors.ECODE_INVAL) if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir): - raise errors.OpPrereqError("File storage directory path not absolute") + raise errors.OpPrereqError("File storage directory path not absolute", + errors.ECODE_INVAL) ### Node/iallocator related checks if [self.op.iallocator, self.op.pnode].count(None) != 1: raise errors.OpPrereqError("One and only one of iallocator and primary" - " node must be given") + " node must be given", + errors.ECODE_INVAL) if self.op.iallocator: self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET @@ -5481,7 +5863,8 @@ class LUCreateInstance(LogicalUnit): self.op.src_node = None if os.path.isabs(src_path): raise errors.OpPrereqError("Importing an instance from an absolute" - " path requires a source node option.") + " path requires a source node option.", + errors.ECODE_INVAL) else: self.op.src_node = src_node = self._ExpandNode(src_node) if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET: @@ -5490,9 +5873,16 @@ class LUCreateInstance(LogicalUnit): self.op.src_path = src_path = \ os.path.join(constants.EXPORT_DIR, src_path) + # On import force_variant must be True, because if we forced it at + # initial install, our only chance when importing it back is that it + # works again! + self.op.force_variant = True + else: # INSTANCE_CREATE if getattr(self.op, "os_type", None) is None: - raise errors.OpPrereqError("No guest OS specified") + raise errors.OpPrereqError("No guest OS specified", + errors.ECODE_INVAL) + self.op.force_variant = getattr(self.op, "force_variant", False) def _RunAllocator(self): """Run the allocator based on input opcode. @@ -5516,17 +5906,18 @@ class LUCreateInstance(LogicalUnit): if not ial.success: raise errors.OpPrereqError("Can't compute nodes using" - " iallocator '%s': %s" % (self.op.iallocator, - ial.info)) + " iallocator '%s': %s" % + (self.op.iallocator, ial.info), + errors.ECODE_NORES) if len(ial.nodes) != ial.required_nodes: raise errors.OpPrereqError("iallocator '%s' returned invalid number" " of nodes (%s), required %s" % (self.op.iallocator, len(ial.nodes), - ial.required_nodes)) + ial.required_nodes), errors.ECODE_FAULT) self.op.pnode = ial.nodes[0] self.LogInfo("Selected nodes for instance %s via iallocator %s: %s", self.op.instance_name, self.op.iallocator, - ", ".join(ial.nodes)) + utils.CommaJoin(ial.nodes)) if ial.required_nodes == 2: self.op.snode = ial.nodes[1] @@ -5572,7 +5963,7 @@ class LUCreateInstance(LogicalUnit): if (not self.cfg.GetVGName() and self.op.disk_template not in constants.DTS_NOT_LVM): raise errors.OpPrereqError("Cluster does not support lvm-based" - " instances") + " instances", errors.ECODE_STATE) if self.op.mode == constants.INSTANCE_IMPORT: src_node = self.op.src_node @@ -5593,7 +5984,7 @@ class LUCreateInstance(LogicalUnit): break if not found: raise errors.OpPrereqError("No export found for relative path %s" % - src_path) + src_path, errors.ECODE_INVAL) _CheckNodeOnline(self, src_node) result = self.rpc.call_export_info(src_node, src_path) @@ -5601,12 +5992,14 @@ class LUCreateInstance(LogicalUnit): export_info = objects.SerializableConfigParser.Loads(str(result.payload)) if not export_info.has_section(constants.INISECT_EXP): - raise errors.ProgrammerError("Corrupted export config") + raise errors.ProgrammerError("Corrupted export config", + errors.ECODE_ENVIRON) ei_version = export_info.get(constants.INISECT_EXP, 'version') if (int(ei_version) != constants.EXPORT_VERSION): raise errors.OpPrereqError("Wrong export version %s (wanted %d)" % - (ei_version, constants.EXPORT_VERSION)) + (ei_version, constants.EXPORT_VERSION), + errors.ECODE_ENVIRON) # Check that the new instance doesn't have less disks than the export instance_disks = len(self.disks) @@ -5614,7 +6007,8 @@ class LUCreateInstance(LogicalUnit): if instance_disks < export_disks: raise errors.OpPrereqError("Not enough disks to import." " (instance: %d, export: %d)" % - (instance_disks, export_disks)) + (instance_disks, export_disks), + errors.ECODE_INVAL) self.op.os_type = export_info.get(constants.INISECT_EXP, 'os') disk_images = [] @@ -5640,15 +6034,13 @@ class LUCreateInstance(LogicalUnit): nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini) # ENDIF: self.op.mode == constants.INSTANCE_IMPORT - # ip ping checks (we use the same ip that was resolved in ExpandNames) - if self.op.start and not self.op.ip_check: - raise errors.OpPrereqError("Cannot ignore IP address conflicts when" - " adding an instance in start mode") + # ip ping checks (we use the same ip that was resolved in ExpandNames) if self.op.ip_check: if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT): raise errors.OpPrereqError("IP %s of instance %s already in use" % - (self.check_ip, self.op.instance_name)) + (self.check_ip, self.op.instance_name), + errors.ECODE_NOTUNIQUE) #### mac address generation # By generating here the mac address both the allocator and the hooks get @@ -5660,7 +6052,7 @@ class LUCreateInstance(LogicalUnit): # creation job will fail. for nic in self.nics: if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): - nic.mac = self.cfg.GenerateMAC() + nic.mac = self.cfg.GenerateMAC(self.proc.GetECId()) #### allocator run @@ -5675,10 +6067,10 @@ class LUCreateInstance(LogicalUnit): "Cannot retrieve locked node %s" % self.op.pnode if pnode.offline: raise errors.OpPrereqError("Cannot use offline primary node '%s'" % - pnode.name) + pnode.name, errors.ECODE_STATE) if pnode.drained: raise errors.OpPrereqError("Cannot use drained primary node '%s'" % - pnode.name) + pnode.name, errors.ECODE_STATE) self.secondaries = [] @@ -5686,10 +6078,10 @@ class LUCreateInstance(LogicalUnit): if self.op.disk_template in constants.DTS_NET_MIRROR: if self.op.snode is None: raise errors.OpPrereqError("The networked disk templates need" - " a mirror node") + " a mirror node", errors.ECODE_INVAL) if self.op.snode == pnode.name: - raise errors.OpPrereqError("The secondary node cannot be" - " the primary node.") + raise errors.OpPrereqError("The secondary node cannot be the" + " primary node.", errors.ECODE_INVAL) _CheckNodeOnline(self, self.op.snode) _CheckNodeNotDrained(self, self.op.snode) self.secondaries.append(self.op.snode) @@ -5710,18 +6102,22 @@ class LUCreateInstance(LogicalUnit): vg_free = info.get('vg_free', None) if not isinstance(vg_free, int): raise errors.OpPrereqError("Can't compute free disk space on" - " node %s" % node) + " node %s" % node, errors.ECODE_ENVIRON) if req_size > vg_free: raise errors.OpPrereqError("Not enough disk space on target node %s." " %d MB available, %d MB required" % - (node, vg_free, req_size)) + (node, vg_free, req_size), + errors.ECODE_NORES) _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams) # os verification result = self.rpc.call_os_get(pnode.name, self.op.os_type) result.Raise("OS '%s' not in supported os list for primary node %s" % - (self.op.os_type, pnode.name), prereq=True) + (self.op.os_type, pnode.name), + prereq=True, ecode=errors.ECODE_INVAL) + if not self.op.force_variant: + _CheckOSVariant(result.payload, self.op.os_type) _CheckNicsBridgesExist(self, self.nics, self.pnode.name) @@ -5795,7 +6191,8 @@ class LUCreateInstance(LogicalUnit): feedback_fn("adding instance %s to cluster config" % instance) - self.cfg.AddInstance(iobj) + self.cfg.AddInstance(iobj, self.proc.GetECId()) + # Declare that we don't want to remove the instance lock anymore, as we've # added the instance to the config del self.remove_locks[locking.LEVEL_INSTANCE] @@ -5834,7 +6231,8 @@ class LUCreateInstance(LogicalUnit): if iobj.disk_template != constants.DT_DISKLESS: if self.op.mode == constants.INSTANCE_CREATE: feedback_fn("* running the instance OS create scripts...") - result = self.rpc.call_instance_os_add(pnode_name, iobj, False) + # FIXME: pass debug option from opcode to backend + result = self.rpc.call_instance_os_add(pnode_name, iobj, False, 0) result.Raise("Could not add os for instance %s" " on node %s" % (instance, pnode_name)) @@ -5843,9 +6241,10 @@ class LUCreateInstance(LogicalUnit): src_node = self.op.src_node src_images = self.src_images cluster_name = self.cfg.GetClusterName() + # FIXME: pass debug option from opcode to backend import_result = self.rpc.call_instance_os_import(pnode_name, iobj, src_node, src_images, - cluster_name) + cluster_name, 0) msg = import_result.fail_msg if msg: self.LogWarning("Error while importing the disk images for instance" @@ -5857,7 +6256,7 @@ class LUCreateInstance(LogicalUnit): if self.op.start: iobj.admin_up = True - self.cfg.Update(iobj) + self.cfg.Update(iobj, feedback_fn) logging.info("Starting instance %s on node %s", instance, pnode_name) feedback_fn("* starting instance...") result = self.rpc.call_instance_start(pnode_name, iobj, None, None) @@ -5947,7 +6346,7 @@ class LUReplaceDisks(LogicalUnit): remote_node = self.cfg.ExpandNodeName(self.op.remote_node) if remote_node is None: raise errors.OpPrereqError("Node '%s' not known" % - self.op.remote_node) + self.op.remote_node, errors.ECODE_NOENT) self.op.remote_node = remote_node @@ -5964,7 +6363,7 @@ class LUReplaceDisks(LogicalUnit): self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode, self.op.iallocator, self.op.remote_node, - self.op.disks) + self.op.disks, False) self.tasklets = [self.replacer] @@ -6019,7 +6418,8 @@ class LUEvacuateNode(LogicalUnit): def ExpandNames(self): self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name) if self.op.node_name is None: - raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name) + raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name, + errors.ECODE_NOENT) self.needed_locks = {} @@ -6031,7 +6431,7 @@ class LUEvacuateNode(LogicalUnit): remote_node = self.cfg.ExpandNodeName(self.op.remote_node) if remote_node is None: raise errors.OpPrereqError("Node '%s' not known" % - self.op.remote_node) + self.op.remote_node, errors.ECODE_NOENT) self.op.remote_node = remote_node @@ -6043,7 +6443,7 @@ class LUEvacuateNode(LogicalUnit): self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND else: - raise errors.OpPrereqError("Invalid parameters") + raise errors.OpPrereqError("Invalid parameters", errors.ECODE_INVAL) # Create tasklets for replacing disks for all secondary instances on this # node @@ -6055,7 +6455,8 @@ class LUEvacuateNode(LogicalUnit): names.append(inst.name) replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG, - self.op.iallocator, self.op.remote_node, []) + self.op.iallocator, self.op.remote_node, [], + True) tasklets.append(replacer) self.tasklets = tasklets @@ -6097,7 +6498,7 @@ class TLReplaceDisks(Tasklet): """ def __init__(self, lu, instance_name, mode, iallocator_name, remote_node, - disks): + disks, delay_iallocator): """Initializes this class. """ @@ -6109,6 +6510,7 @@ class TLReplaceDisks(Tasklet): self.iallocator_name = iallocator_name self.remote_node = remote_node self.disks = disks + self.delay_iallocator = delay_iallocator # Runtime data self.instance = None @@ -6128,17 +6530,17 @@ class TLReplaceDisks(Tasklet): if remote_node is None and iallocator is None: raise errors.OpPrereqError("When changing the secondary either an" " iallocator script must be used or the" - " new node given") + " new node given", errors.ECODE_INVAL) if remote_node is not None and iallocator is not None: raise errors.OpPrereqError("Give either the iallocator or the new" - " secondary, not both") + " secondary, not both", errors.ECODE_INVAL) elif remote_node is not None or iallocator is not None: # Not replacing the secondary raise errors.OpPrereqError("The iallocator and new node options can" " only be used when changing the" - " secondary node") + " secondary node", errors.ECODE_INVAL) @staticmethod def _RunAllocator(lu, iallocator_name, instance_name, relocate_from): @@ -6154,12 +6556,15 @@ class TLReplaceDisks(Tasklet): if not ial.success: raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':" - " %s" % (iallocator_name, ial.info)) + " %s" % (iallocator_name, ial.info), + errors.ECODE_NORES) if len(ial.nodes) != ial.required_nodes: raise errors.OpPrereqError("iallocator '%s' returned invalid number" " of nodes (%s), required %s" % - (len(ial.nodes), ial.required_nodes)) + (iallocator_name, + len(ial.nodes), ial.required_nodes), + errors.ECODE_FAULT) remote_node_name = ial.nodes[0] @@ -6178,26 +6583,40 @@ class TLReplaceDisks(Tasklet): This checks that the instance is in the cluster. """ - self.instance = self.cfg.GetInstanceInfo(self.instance_name) - assert self.instance is not None, \ + self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name) + assert instance is not None, \ "Cannot retrieve locked instance %s" % self.instance_name - if self.instance.disk_template != constants.DT_DRBD8: + if instance.disk_template != constants.DT_DRBD8: raise errors.OpPrereqError("Can only run replace disks for DRBD8-based" - " instances") + " instances", errors.ECODE_INVAL) - if len(self.instance.secondary_nodes) != 1: + if len(instance.secondary_nodes) != 1: raise errors.OpPrereqError("The instance has a strange layout," " expected one secondary but found %d" % - len(self.instance.secondary_nodes)) + len(instance.secondary_nodes), + errors.ECODE_FAULT) + + if not self.delay_iallocator: + self._CheckPrereq2() + + def _CheckPrereq2(self): + """Check prerequisites, second part. + + This function should always be part of CheckPrereq. It was separated and is + now called from Exec because during node evacuation iallocator was only + called with an unmodified cluster model, not taking planned changes into + account. - secondary_node = self.instance.secondary_nodes[0] + """ + instance = self.instance + secondary_node = instance.secondary_nodes[0] if self.iallocator_name is None: remote_node = self.remote_node else: remote_node = self._RunAllocator(self.lu, self.iallocator_name, - self.instance.name, secondary_node) + instance.name, instance.secondary_nodes) if remote_node is not None: self.remote_node_info = self.cfg.GetNodeInfo(remote_node) @@ -6208,34 +6627,37 @@ class TLReplaceDisks(Tasklet): if remote_node == self.instance.primary_node: raise errors.OpPrereqError("The specified node is the primary node of" - " the instance.") + " the instance.", errors.ECODE_INVAL) if remote_node == secondary_node: raise errors.OpPrereqError("The specified node is already the" - " secondary node of the instance.") + " secondary node of the instance.", + errors.ECODE_INVAL) if self.disks and self.mode in (constants.REPLACE_DISK_AUTO, constants.REPLACE_DISK_CHG): - raise errors.OpPrereqError("Cannot specify disks to be replaced") + raise errors.OpPrereqError("Cannot specify disks to be replaced", + errors.ECODE_INVAL) if self.mode == constants.REPLACE_DISK_AUTO: - faulty_primary = self._FindFaultyDisks(self.instance.primary_node) + faulty_primary = self._FindFaultyDisks(instance.primary_node) faulty_secondary = self._FindFaultyDisks(secondary_node) if faulty_primary and faulty_secondary: raise errors.OpPrereqError("Instance %s has faulty disks on more than" " one node and can not be repaired" - " automatically" % self.instance_name) + " automatically" % self.instance_name, + errors.ECODE_STATE) if faulty_primary: self.disks = faulty_primary - self.target_node = self.instance.primary_node + self.target_node = instance.primary_node self.other_node = secondary_node check_nodes = [self.target_node, self.other_node] elif faulty_secondary: self.disks = faulty_secondary self.target_node = secondary_node - self.other_node = self.instance.primary_node + self.other_node = instance.primary_node check_nodes = [self.target_node, self.other_node] else: self.disks = [] @@ -6244,18 +6666,18 @@ class TLReplaceDisks(Tasklet): else: # Non-automatic modes if self.mode == constants.REPLACE_DISK_PRI: - self.target_node = self.instance.primary_node + self.target_node = instance.primary_node self.other_node = secondary_node check_nodes = [self.target_node, self.other_node] elif self.mode == constants.REPLACE_DISK_SEC: self.target_node = secondary_node - self.other_node = self.instance.primary_node + self.other_node = instance.primary_node check_nodes = [self.target_node, self.other_node] elif self.mode == constants.REPLACE_DISK_CHG: self.new_node = remote_node - self.other_node = self.instance.primary_node + self.other_node = instance.primary_node self.target_node = secondary_node check_nodes = [self.new_node, self.other_node] @@ -6274,7 +6696,7 @@ class TLReplaceDisks(Tasklet): # Check whether disks are valid for disk_idx in self.disks: - self.instance.FindDisk(disk_idx) + instance.FindDisk(disk_idx) # Get secondary node IP addresses node_2nd_ip = {} @@ -6291,12 +6713,15 @@ class TLReplaceDisks(Tasklet): This dispatches the disk replacement to the appropriate handler. """ + if self.delay_iallocator: + self._CheckPrereq2() + if not self.disks: feedback_fn("No disks need replacement") return feedback_fn("Replacing disk(s) %s for %s" % - (", ".join([str(i) for i in self.disks]), self.instance.name)) + (utils.CommaJoin(self.disks), self.instance.name)) activate_disks = (not self.instance.admin_up) @@ -6307,12 +6732,15 @@ class TLReplaceDisks(Tasklet): try: # Should we replace the secondary node? if self.new_node is not None: - return self._ExecDrbd8Secondary() + fn = self._ExecDrbd8Secondary else: - return self._ExecDrbd8DiskOnly() + fn = self._ExecDrbd8DiskOnly + + return fn(feedback_fn) finally: - # Deactivate the instance disks if we're replacing them on a down instance + # Deactivate the instance disks if we're replacing them on a + # down instance if activate_disks: _SafeShutdownInstanceDisks(self.lu, self.instance) @@ -6398,7 +6826,7 @@ class TLReplaceDisks(Tasklet): return iv_names def _CheckDevices(self, node_name, iv_names): - for name, (dev, old_lvs, new_lvs) in iv_names.iteritems(): + for name, (dev, _, _) in iv_names.iteritems(): self.cfg.SetDiskID(dev, node_name) result = self.rpc.call_blockdev_find(node_name, dev) @@ -6414,7 +6842,7 @@ class TLReplaceDisks(Tasklet): raise errors.OpExecError("DRBD device %s is degraded!" % name) def _RemoveOldStorage(self, node_name, iv_names): - for name, (dev, old_lvs, _) in iv_names.iteritems(): + for name, (_, old_lvs, _) in iv_names.iteritems(): self.lu.LogInfo("Remove logical volumes for %s" % name) for lv in old_lvs: @@ -6425,7 +6853,7 @@ class TLReplaceDisks(Tasklet): self.lu.LogWarning("Can't remove old LV: %s" % msg, hint="remove unused LVs manually") - def _ExecDrbd8DiskOnly(self): + def _ExecDrbd8DiskOnly(self, feedback_fn): """Replace a disk on the primary or secondary for DRBD 8. The algorithm for replace is quite complicated: @@ -6533,13 +6961,13 @@ class TLReplaceDisks(Tasklet): dev.children = new_lvs - self.cfg.Update(self.instance) + self.cfg.Update(self.instance, feedback_fn) # Wait for sync # This can fail as the old devices are degraded and _WaitForSync # does a combined result over all disks, so we don't check its return value self.lu.LogStep(5, steps_total, "Sync devices") - _WaitForSync(self.lu, self.instance, unlock=True) + _WaitForSync(self.lu, self.instance) # Check all devices manually self._CheckDevices(self.instance.primary_node, iv_names) @@ -6548,7 +6976,7 @@ class TLReplaceDisks(Tasklet): self.lu.LogStep(6, steps_total, "Removing old storage") self._RemoveOldStorage(self.target_node, iv_names) - def _ExecDrbd8Secondary(self): + def _ExecDrbd8Secondary(self, feedback_fn): """Replace the secondary node for DRBD 8. The algorithm for replace is quite complicated: @@ -6595,7 +7023,7 @@ class TLReplaceDisks(Tasklet): minors = self.cfg.AllocateDRBDMinor([self.new_node for dev in self.instance.disks], self.instance.name) - logging.debug("Allocated minors %r" % (minors,)) + logging.debug("Allocated minors %r", minors) iv_names = {} for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)): @@ -6609,6 +7037,7 @@ class TLReplaceDisks(Tasklet): if self.instance.primary_node == o_node1: p_minor = o_minor1 else: + assert self.instance.primary_node == o_node2, "Three-node instance?" p_minor = o_minor2 new_alone_id = (self.instance.primary_node, self.new_node, None, @@ -6661,7 +7090,7 @@ class TLReplaceDisks(Tasklet): dev.logical_id = new_logical_id self.cfg.SetDiskID(dev, self.instance.primary_node) - self.cfg.Update(self.instance) + self.cfg.Update(self.instance, feedback_fn) # and now perform the drbd attach self.lu.LogInfo("Attaching primary drbds to new secondary" @@ -6684,7 +7113,7 @@ class TLReplaceDisks(Tasklet): # This can fail as the old devices are degraded and _WaitForSync # does a combined result over all disks, so we don't check its return value self.lu.LogStep(5, steps_total, "Sync devices") - _WaitForSync(self.lu, self.instance, unlock=True) + _WaitForSync(self.lu, self.instance) # Check all devices manually self._CheckDevices(self.instance.primary_node, iv_names) @@ -6704,7 +7133,8 @@ class LURepairNodeStorage(NoHooksLU): def CheckArguments(self): node_name = self.cfg.ExpandNodeName(self.op.node_name) if node_name is None: - raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name) + raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name, + errors.ECODE_NOENT) self.op.node_name = node_name @@ -6714,10 +7144,18 @@ class LURepairNodeStorage(NoHooksLU): } def _CheckFaultyDisks(self, instance, node_name): - if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance, - node_name, True): - raise errors.OpPrereqError("Instance '%s' has faulty disks on" - " node '%s'" % (instance.name, node_name)) + """Ensure faulty disks abort the opcode or at least warn.""" + try: + if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance, + node_name, True): + raise errors.OpPrereqError("Instance '%s' has faulty disks on" + " node '%s'" % (instance.name, node_name), + errors.ECODE_STATE) + except errors.OpPrereqError, err: + if self.op.ignore_consistency: + self.proc.LogWarning(str(err.args[0])) + else: + raise def CheckPrereq(self): """Check prerequisites. @@ -6728,10 +7166,13 @@ class LURepairNodeStorage(NoHooksLU): if (constants.SO_FIX_CONSISTENCY not in constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])): raise errors.OpPrereqError("Storage units of type '%s' can not be" - " repaired" % storage_type) + " repaired" % storage_type, + errors.ECODE_INVAL) # Check whether any instance on this node has faulty disks for inst in _GetNodeInstances(self.cfg, self.op.node_name): + if not inst.admin_up: + continue check_nodes = set(inst.all_nodes) check_nodes.discard(self.op.node_name) for inst_node_name in check_nodes: @@ -6803,7 +7244,7 @@ class LUGrowDisk(LogicalUnit): if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8): raise errors.OpPrereqError("Instance's disk layout does not support" - " growing.") + " growing.", errors.ECODE_INVAL) self.disk = instance.FindDisk(self.op.disk) @@ -6815,11 +7256,12 @@ class LUGrowDisk(LogicalUnit): vg_free = info.payload.get('vg_free', None) if not isinstance(vg_free, int): raise errors.OpPrereqError("Can't compute free disk space on" - " node %s" % node) + " node %s" % node, errors.ECODE_ENVIRON) if self.op.amount > vg_free: raise errors.OpPrereqError("Not enough disk space on target node %s:" " %d MiB available, %d MiB required" % - (node, vg_free, self.op.amount)) + (node, vg_free, self.op.amount), + errors.ECODE_NORES) def Exec(self, feedback_fn): """Execute disk grow. @@ -6831,8 +7273,16 @@ class LUGrowDisk(LogicalUnit): self.cfg.SetDiskID(disk, node) result = self.rpc.call_blockdev_grow(node, disk, self.op.amount) result.Raise("Grow request failed to node %s" % node) + + # TODO: Rewrite code to work properly + # DRBD goes into sync mode for a short amount of time after executing the + # "resize" command. DRBD 8.x below version 8.0.13 contains a bug whereby + # calling "resize" in sync mode fails. Sleeping for a short amount of + # time is a work-around. + time.sleep(5) + disk.RecordGrow(self.op.amount) - self.cfg.Update(instance) + self.cfg.Update(instance, feedback_fn) if self.op.wait_for_sync: disk_abort = not _WaitForSync(self, instance) if disk_abort: @@ -6852,14 +7302,16 @@ class LUQueryInstanceData(NoHooksLU): self.share_locks = dict.fromkeys(locking.LEVELS, 1) if not isinstance(self.op.instances, list): - raise errors.OpPrereqError("Invalid argument type 'instances'") + raise errors.OpPrereqError("Invalid argument type 'instances'", + errors.ECODE_INVAL) if self.op.instances: self.wanted_names = [] for name in self.op.instances: full_name = self.cfg.ExpandInstanceName(name) if full_name is None: - raise errors.OpPrereqError("Instance '%s' not known" % name) + raise errors.OpPrereqError("Instance '%s' not known" % name, + errors.ECODE_NOENT) self.wanted_names.append(full_name) self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names else: @@ -6984,12 +7436,13 @@ class LUQueryInstanceData(NoHooksLU): "hypervisor": instance.hypervisor, "network_port": instance.network_port, "hv_instance": instance.hvparams, - "hv_actual": cluster.FillHV(instance), + "hv_actual": cluster.FillHV(instance, skip_globals=True), "be_instance": instance.beparams, "be_actual": cluster.FillBE(instance), "serial_no": instance.serial_no, "mtime": instance.mtime, "ctime": instance.ctime, + "uuid": instance.uuid, } result[instance.name] = idict @@ -7018,7 +7471,10 @@ class LUSetInstanceParams(LogicalUnit): self.op.force = getattr(self.op, "force", False) if not (self.op.nics or self.op.disks or self.op.hvparams or self.op.beparams): - raise errors.OpPrereqError("No changes submitted") + raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL) + + if self.op.hvparams: + _CheckGlobalHvParams(self.op.hvparams) # Disk validation disk_addremove = 0 @@ -7030,33 +7486,35 @@ class LUSetInstanceParams(LogicalUnit): disk_addremove += 1 else: if not isinstance(disk_op, int): - raise errors.OpPrereqError("Invalid disk index") + raise errors.OpPrereqError("Invalid disk index", errors.ECODE_INVAL) if not isinstance(disk_dict, dict): msg = "Invalid disk value: expected dict, got '%s'" % disk_dict - raise errors.OpPrereqError(msg) + raise errors.OpPrereqError(msg, errors.ECODE_INVAL) if disk_op == constants.DDM_ADD: mode = disk_dict.setdefault('mode', constants.DISK_RDWR) if mode not in constants.DISK_ACCESS_SET: - raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode) + raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode, + errors.ECODE_INVAL) size = disk_dict.get('size', None) if size is None: - raise errors.OpPrereqError("Required disk parameter size missing") + raise errors.OpPrereqError("Required disk parameter size missing", + errors.ECODE_INVAL) try: size = int(size) - except ValueError, err: + except (TypeError, ValueError), err: raise errors.OpPrereqError("Invalid disk size parameter: %s" % - str(err)) + str(err), errors.ECODE_INVAL) disk_dict['size'] = size else: # modification of disk if 'size' in disk_dict: raise errors.OpPrereqError("Disk size change not possible, use" - " grow-disk") + " grow-disk", errors.ECODE_INVAL) if disk_addremove > 1: raise errors.OpPrereqError("Only one disk add or remove operation" - " supported at a time") + " supported at a time", errors.ECODE_INVAL) # NIC validation nic_addremove = 0 @@ -7068,10 +7526,10 @@ class LUSetInstanceParams(LogicalUnit): nic_addremove += 1 else: if not isinstance(nic_op, int): - raise errors.OpPrereqError("Invalid nic index") + raise errors.OpPrereqError("Invalid nic index", errors.ECODE_INVAL) if not isinstance(nic_dict, dict): msg = "Invalid nic value: expected dict, got '%s'" % nic_dict - raise errors.OpPrereqError(msg) + raise errors.OpPrereqError(msg, errors.ECODE_INVAL) # nic_dict should be a dict nic_ip = nic_dict.get('ip', None) @@ -7080,13 +7538,14 @@ class LUSetInstanceParams(LogicalUnit): nic_dict['ip'] = None else: if not utils.IsValidIP(nic_ip): - raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip) + raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip, + errors.ECODE_INVAL) nic_bridge = nic_dict.get('bridge', None) nic_link = nic_dict.get('link', None) if nic_bridge and nic_link: raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" - " at the same time") + " at the same time", errors.ECODE_INVAL) elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE: nic_dict['bridge'] = None elif nic_link and nic_link.lower() == constants.VALUE_NONE: @@ -7100,15 +7559,16 @@ class LUSetInstanceParams(LogicalUnit): if 'mac' in nic_dict: nic_mac = nic_dict['mac'] if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): - if not utils.IsValidMac(nic_mac): - raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac) + nic_mac = utils.NormalizeAndValidateMac(nic_mac) + if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO: raise errors.OpPrereqError("'auto' is not a valid MAC address when" - " modifying an existing nic") + " modifying an existing nic", + errors.ECODE_INVAL) if nic_addremove > 1: raise errors.OpPrereqError("Only one NIC add or remove operation" - " supported at a time") + " supported at a time", errors.ECODE_INVAL) def ExpandNames(self): self._ExpandAndLockInstance() @@ -7170,7 +7630,8 @@ class LUSetInstanceParams(LogicalUnit): nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) return env, nl, nl - def _GetUpdatedParams(self, old_params, update_dict, + @staticmethod + def _GetUpdatedParams(old_params, update_dict, default_values, parameter_types): """Return the new params dict for the given params. @@ -7281,7 +7742,8 @@ class LUSetInstanceParams(LogicalUnit): if miss_mem > 0: raise errors.OpPrereqError("This change will prevent the instance" " from starting, due to %d MB of memory" - " missing on its primary node" % miss_mem) + " missing on its primary node" % miss_mem, + errors.ECODE_NORES) if be_new[constants.BE_AUTO_BALANCE]: for node, nres in nodeinfo.items(): @@ -7304,14 +7766,20 @@ class LUSetInstanceParams(LogicalUnit): for nic_op, nic_dict in self.op.nics: if nic_op == constants.DDM_REMOVE: if not instance.nics: - raise errors.OpPrereqError("Instance has no NICs, cannot remove") + raise errors.OpPrereqError("Instance has no NICs, cannot remove", + errors.ECODE_INVAL) continue if nic_op != constants.DDM_ADD: # an existing nic + if not instance.nics: + raise errors.OpPrereqError("Invalid NIC index %s, instance has" + " no NICs" % nic_op, + errors.ECODE_INVAL) if nic_op < 0 or nic_op >= len(instance.nics): raise errors.OpPrereqError("Invalid NIC index %s, valid values" " are 0 to %d" % - (nic_op, len(instance.nics))) + (nic_op, len(instance.nics) - 1), + errors.ECODE_INVAL) old_nic_params = instance.nics[nic_op].nicparams old_nic_ip = instance.nics[nic_op].ip else: @@ -7342,7 +7810,7 @@ class LUSetInstanceParams(LogicalUnit): if self.force: self.warn.append(msg) else: - raise errors.OpPrereqError(msg) + raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON) if new_nic_mode == constants.NIC_MODE_ROUTED: if 'ip' in nic_dict: nic_ip = nic_dict['ip'] @@ -7350,49 +7818,57 @@ class LUSetInstanceParams(LogicalUnit): nic_ip = old_nic_ip if nic_ip is None: raise errors.OpPrereqError('Cannot set the nic ip to None' - ' on a routed nic') + ' on a routed nic', errors.ECODE_INVAL) if 'mac' in nic_dict: nic_mac = nic_dict['mac'] if nic_mac is None: - raise errors.OpPrereqError('Cannot set the nic mac to None') + raise errors.OpPrereqError('Cannot set the nic mac to None', + errors.ECODE_INVAL) elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): # otherwise generate the mac - nic_dict['mac'] = self.cfg.GenerateMAC() + nic_dict['mac'] = self.cfg.GenerateMAC(self.proc.GetECId()) else: # or validate/reserve the current one - if self.cfg.IsMacInUse(nic_mac): + try: + self.cfg.ReserveMAC(nic_mac, self.proc.GetECId()) + except errors.ReservationError: raise errors.OpPrereqError("MAC address %s already in use" - " in cluster" % nic_mac) + " in cluster" % nic_mac, + errors.ECODE_NOTUNIQUE) # DISK processing if self.op.disks and instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Disk operations not supported for" - " diskless instances") - for disk_op, disk_dict in self.op.disks: + " diskless instances", + errors.ECODE_INVAL) + for disk_op, _ in self.op.disks: if disk_op == constants.DDM_REMOVE: if len(instance.disks) == 1: raise errors.OpPrereqError("Cannot remove the last disk of" - " an instance") + " an instance", + errors.ECODE_INVAL) ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor]) ins_l = ins_l[pnode] msg = ins_l.fail_msg if msg: raise errors.OpPrereqError("Can't contact node %s: %s" % - (pnode, msg)) + (pnode, msg), errors.ECODE_ENVIRON) if instance.name in ins_l.payload: raise errors.OpPrereqError("Instance is running, can't remove" - " disks.") + " disks.", errors.ECODE_STATE) if (disk_op == constants.DDM_ADD and len(instance.nics) >= constants.MAX_DISKS): raise errors.OpPrereqError("Instance has too many disks (%d), cannot" - " add more" % constants.MAX_DISKS) + " add more" % constants.MAX_DISKS, + errors.ECODE_STATE) if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE): # an existing disk if disk_op < 0 or disk_op >= len(instance.disks): raise errors.OpPrereqError("Invalid disk index %s, valid values" " are 0 to %d" % - (disk_op, len(instance.disks))) + (disk_op, len(instance.disks)), + errors.ECODE_INVAL) return @@ -7409,7 +7885,6 @@ class LUSetInstanceParams(LogicalUnit): result = [] instance = self.instance - cluster = self.cluster # disk changes for disk_op, disk_dict in self.op.disks: if disk_op == constants.DDM_REMOVE: @@ -7484,8 +7959,8 @@ class LUSetInstanceParams(LogicalUnit): for key in 'mac', 'ip': if key in nic_dict: setattr(instance.nics[nic_op], key, nic_dict[key]) - if nic_op in self.nic_pnew: - instance.nics[nic_op].nicparams = self.nic_pnew[nic_op] + if nic_op in self.nic_pinst: + instance.nics[nic_op].nicparams = self.nic_pinst[nic_op] for key, val in nic_dict.iteritems(): result.append(("nic.%s/%d" % (key, nic_op), val)) @@ -7501,7 +7976,7 @@ class LUSetInstanceParams(LogicalUnit): for key, val in self.op.beparams.iteritems(): result.append(("be/%s" % key, val)) - self.cfg.Update(instance) + self.cfg.Update(instance, feedback_fn) return result @@ -7557,6 +8032,13 @@ class LUExportInstance(LogicalUnit): _OP_REQP = ["instance_name", "target_node", "shutdown"] REQ_BGL = False + def CheckArguments(self): + """Check the arguments. + + """ + self.shutdown_timeout = getattr(self.op, "shutdown_timeout", + constants.DEFAULT_SHUTDOWN_TIMEOUT) + def ExpandNames(self): self._ExpandAndLockInstance() # FIXME: lock only instance primary and destination node @@ -7582,6 +8064,7 @@ class LUExportInstance(LogicalUnit): env = { "EXPORT_NODE": self.op.target_node, "EXPORT_DO_SHUTDOWN": self.op.shutdown, + "SHUTDOWN_TIMEOUT": self.shutdown_timeout, } env.update(_BuildInstanceHookEnvByObject(self, self.instance)) nl = [self.cfg.GetMasterNode(), self.instance.primary_node, @@ -7605,7 +8088,8 @@ class LUExportInstance(LogicalUnit): if self.dst_node is None: # This is wrong node name, not a non-locked node - raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node) + raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node, + errors.ECODE_NOENT) _CheckNodeOnline(self, self.dst_node.name) _CheckNodeNotDrained(self, self.dst_node.name) @@ -7613,7 +8097,7 @@ class LUExportInstance(LogicalUnit): for disk in self.instance.disks: if disk.dev_type == constants.LD_FILE: raise errors.OpPrereqError("Export not supported for instances with" - " file-based disks") + " file-based disks", errors.ECODE_INVAL) def Exec(self, feedback_fn): """Export an instance to an image in the cluster. @@ -7626,7 +8110,8 @@ class LUExportInstance(LogicalUnit): if self.op.shutdown: # shutdown the instance, but not the disks feedback_fn("Shutting down instance %s" % instance.name) - result = self.rpc.call_instance_shutdown(src_node, instance) + result = self.rpc.call_instance_shutdown(src_node, instance, + self.shutdown_timeout) result.Raise("Could not shutdown instance %s on" " node %s" % (instance.name, src_node)) @@ -7639,67 +8124,84 @@ class LUExportInstance(LogicalUnit): for disk in instance.disks: self.cfg.SetDiskID(disk, src_node) - # per-disk results - dresults = [] + activate_disks = (not instance.admin_up) + + if activate_disks: + # Activate the instance disks if we'exporting a stopped instance + feedback_fn("Activating disks for %s" % instance.name) + _StartInstanceDisks(self, instance, None) + try: - for idx, disk in enumerate(instance.disks): - feedback_fn("Creating a snapshot of disk/%s on node %s" % - (idx, src_node)) + # per-disk results + dresults = [] + try: + for idx, disk in enumerate(instance.disks): + feedback_fn("Creating a snapshot of disk/%s on node %s" % + (idx, src_node)) + + # result.payload will be a snapshot of an lvm leaf of the one we + # passed + result = self.rpc.call_blockdev_snapshot(src_node, disk) + msg = result.fail_msg + if msg: + self.LogWarning("Could not snapshot disk/%s on node %s: %s", + idx, src_node, msg) + snap_disks.append(False) + else: + disk_id = (vgname, result.payload) + new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size, + logical_id=disk_id, physical_id=disk_id, + iv_name=disk.iv_name) + snap_disks.append(new_dev) - # result.payload will be a snapshot of an lvm leaf of the one we passed - result = self.rpc.call_blockdev_snapshot(src_node, disk) - msg = result.fail_msg - if msg: - self.LogWarning("Could not snapshot disk/%s on node %s: %s", - idx, src_node, msg) - snap_disks.append(False) + finally: + if self.op.shutdown and instance.admin_up: + feedback_fn("Starting instance %s" % instance.name) + result = self.rpc.call_instance_start(src_node, instance, None, None) + msg = result.fail_msg + if msg: + _ShutdownInstanceDisks(self, instance) + raise errors.OpExecError("Could not start instance: %s" % msg) + + # TODO: check for size + + cluster_name = self.cfg.GetClusterName() + for idx, dev in enumerate(snap_disks): + feedback_fn("Exporting snapshot %s from %s to %s" % + (idx, src_node, dst_node.name)) + if dev: + # FIXME: pass debug from opcode to backend + result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name, + instance, cluster_name, + idx, 0) + msg = result.fail_msg + if msg: + self.LogWarning("Could not export disk/%s from node %s to" + " node %s: %s", idx, src_node, dst_node.name, msg) + dresults.append(False) + else: + dresults.append(True) + msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg + if msg: + self.LogWarning("Could not remove snapshot for disk/%d from node" + " %s: %s", idx, src_node, msg) else: - disk_id = (vgname, result.payload) - new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size, - logical_id=disk_id, physical_id=disk_id, - iv_name=disk.iv_name) - snap_disks.append(new_dev) - - finally: - if self.op.shutdown and instance.admin_up: - feedback_fn("Starting instance %s" % instance.name) - result = self.rpc.call_instance_start(src_node, instance, None, None) - msg = result.fail_msg - if msg: - _ShutdownInstanceDisks(self, instance) - raise errors.OpExecError("Could not start instance: %s" % msg) - - # TODO: check for size - - cluster_name = self.cfg.GetClusterName() - for idx, dev in enumerate(snap_disks): - feedback_fn("Exporting snapshot %s from %s to %s" % - (idx, src_node, dst_node.name)) - if dev: - result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name, - instance, cluster_name, idx) - msg = result.fail_msg - if msg: - self.LogWarning("Could not export disk/%s from node %s to" - " node %s: %s", idx, src_node, dst_node.name, msg) dresults.append(False) - else: - dresults.append(True) - msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg - if msg: - self.LogWarning("Could not remove snapshot for disk/%d from node" - " %s: %s", idx, src_node, msg) - else: - dresults.append(False) - feedback_fn("Finalizing export on %s" % dst_node.name) - result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks) - fin_resu = True - msg = result.fail_msg - if msg: - self.LogWarning("Could not finalize export for instance %s" - " on node %s: %s", instance.name, dst_node.name, msg) - fin_resu = False + feedback_fn("Finalizing export on %s" % dst_node.name) + result = self.rpc.call_finalize_export(dst_node.name, instance, + snap_disks) + fin_resu = True + msg = result.fail_msg + if msg: + self.LogWarning("Could not finalize export for instance %s" + " on node %s: %s", instance.name, dst_node.name, msg) + fin_resu = False + + finally: + if activate_disks: + feedback_fn("Deactivating disks for %s" % instance.name) + _ShutdownInstanceDisks(self, instance) nodelist = self.cfg.GetNodeList() nodelist.remove(dst_node.name) @@ -7775,7 +8277,7 @@ class LURemoveExport(NoHooksLU): " Domain Name.") -class TagsLU(NoHooksLU): +class TagsLU(NoHooksLU): # pylint: disable-msg=W0223 """Generic tags LU. This is an abstract class which is the parent of all the other tags LUs. @@ -7788,14 +8290,14 @@ class TagsLU(NoHooksLU): name = self.cfg.ExpandNodeName(self.op.name) if name is None: raise errors.OpPrereqError("Invalid node name (%s)" % - (self.op.name,)) + (self.op.name,), errors.ECODE_NOENT) self.op.name = name self.needed_locks[locking.LEVEL_NODE] = name elif self.op.kind == constants.TAG_INSTANCE: name = self.cfg.ExpandInstanceName(self.op.name) if name is None: raise errors.OpPrereqError("Invalid instance name (%s)" % - (self.op.name,)) + (self.op.name,), errors.ECODE_NOENT) self.op.name = name self.needed_locks[locking.LEVEL_INSTANCE] = name @@ -7811,7 +8313,7 @@ class TagsLU(NoHooksLU): self.target = self.cfg.GetInstanceInfo(self.op.name) else: raise errors.OpPrereqError("Wrong tag type requested (%s)" % - str(self.op.kind)) + str(self.op.kind), errors.ECODE_INVAL) class LUGetTags(TagsLU): @@ -7848,7 +8350,7 @@ class LUSearchTags(NoHooksLU): self.re = re.compile(self.op.pattern) except re.error, err: raise errors.OpPrereqError("Invalid search pattern '%s': %s" % - (self.op.pattern, err)) + (self.op.pattern, err), errors.ECODE_INVAL) def Exec(self, feedback_fn): """Returns the tag list. @@ -7894,12 +8396,7 @@ class LUAddTags(TagsLU): self.target.AddTag(tag) except errors.TagError, err: raise errors.OpExecError("Error while setting tag: %s" % str(err)) - try: - self.cfg.Update(self.target) - except errors.ConfigurationError: - raise errors.OpRetryError("There has been a modification to the" - " config file and the operation has been" - " aborted. Please retry.") + self.cfg.Update(self.target, feedback_fn) class LUDelTags(TagsLU): @@ -7925,7 +8422,7 @@ class LUDelTags(TagsLU): diff_names = ["'%s'" % tag for tag in diff_tags] diff_names.sort() raise errors.OpPrereqError("Tag(s) %s not found" % - (",".join(diff_names))) + (",".join(diff_names)), errors.ECODE_NOENT) def Exec(self, feedback_fn): """Remove the tag from the object. @@ -7933,12 +8430,7 @@ class LUDelTags(TagsLU): """ for tag in self.op.tags: self.target.RemoveTag(tag) - try: - self.cfg.Update(self.target) - except errors.ConfigurationError: - raise errors.OpRetryError("There has been a modification to the" - " config file and the operation has been" - " aborted. Please retry.") + self.cfg.Update(self.target, feedback_fn) class LUTestDelay(NoHooksLU): @@ -7996,6 +8488,8 @@ class IAllocator(object): easy usage """ + # pylint: disable-msg=R0902 + # lots of instance attributes _ALLO_KEYS = [ "mem_size", "disks", "disk_template", "os", "tags", "nics", "vcpus", "hypervisor", @@ -8214,10 +8708,12 @@ class IAllocator(object): " IAllocator" % self.name) if instance.disk_template not in constants.DTS_NET_MIRROR: - raise errors.OpPrereqError("Can't relocate non-mirrored instances") + raise errors.OpPrereqError("Can't relocate non-mirrored instances", + errors.ECODE_INVAL) if len(instance.secondary_nodes) != 1: - raise errors.OpPrereqError("Instance has not exactly one secondary node") + raise errors.OpPrereqError("Instance has not exactly one secondary node", + errors.ECODE_STATE) self.required_nodes = 1 disk_sizes = [{'size': disk.size} for disk in instance.disks] @@ -8305,51 +8801,55 @@ class LUTestAllocator(NoHooksLU): "os", "tags", "nics", "vcpus"]: if not hasattr(self.op, attr): raise errors.OpPrereqError("Missing attribute '%s' on opcode input" % - attr) + attr, errors.ECODE_INVAL) iname = self.cfg.ExpandInstanceName(self.op.name) if iname is not None: raise errors.OpPrereqError("Instance '%s' already in the cluster" % - iname) + iname, errors.ECODE_EXISTS) if not isinstance(self.op.nics, list): - raise errors.OpPrereqError("Invalid parameter 'nics'") + raise errors.OpPrereqError("Invalid parameter 'nics'", + errors.ECODE_INVAL) for row in self.op.nics: if (not isinstance(row, dict) or "mac" not in row or "ip" not in row or "bridge" not in row): - raise errors.OpPrereqError("Invalid contents of the" - " 'nics' parameter") + raise errors.OpPrereqError("Invalid contents of the 'nics'" + " parameter", errors.ECODE_INVAL) if not isinstance(self.op.disks, list): - raise errors.OpPrereqError("Invalid parameter 'disks'") + raise errors.OpPrereqError("Invalid parameter 'disks'", + errors.ECODE_INVAL) for row in self.op.disks: if (not isinstance(row, dict) or "size" not in row or not isinstance(row["size"], int) or "mode" not in row or row["mode"] not in ['r', 'w']): - raise errors.OpPrereqError("Invalid contents of the" - " 'disks' parameter") + raise errors.OpPrereqError("Invalid contents of the 'disks'" + " parameter", errors.ECODE_INVAL) if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None: self.op.hypervisor = self.cfg.GetHypervisorType() elif self.op.mode == constants.IALLOCATOR_MODE_RELOC: if not hasattr(self.op, "name"): - raise errors.OpPrereqError("Missing attribute 'name' on opcode input") + raise errors.OpPrereqError("Missing attribute 'name' on opcode input", + errors.ECODE_INVAL) fname = self.cfg.ExpandInstanceName(self.op.name) if fname is None: raise errors.OpPrereqError("Instance '%s' not found for relocation" % - self.op.name) + self.op.name, errors.ECODE_NOENT) self.op.name = fname self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes else: raise errors.OpPrereqError("Invalid test allocator mode '%s'" % - self.op.mode) + self.op.mode, errors.ECODE_INVAL) if self.op.direction == constants.IALLOCATOR_DIR_OUT: if not hasattr(self.op, "allocator") or self.op.allocator is None: - raise errors.OpPrereqError("Missing allocator name") + raise errors.OpPrereqError("Missing allocator name", + errors.ECODE_INVAL) elif self.op.direction != constants.IALLOCATOR_DIR_IN: raise errors.OpPrereqError("Wrong allocator test '%s'" % - self.op.direction) + self.op.direction, errors.ECODE_INVAL) def Exec(self, feedback_fn): """Run the allocator test.