X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/2d9005d812c577040f46b937e3e6421f6f724b71..b6c07b791f57777815c2540084c6cb2aedfb7560:/lib/cmdlib.py diff --git a/lib/cmdlib.py b/lib/cmdlib.py index ceb8871..768d67b 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -100,7 +100,7 @@ class LogicalUnit(object): attr_val = getattr(op, attr_name, None) if attr_val is None: raise errors.OpPrereqError("Required parameter '%s' missing" % - attr_name) + attr_name, errors.ECODE_INVAL) self.CheckArguments() @@ -297,7 +297,7 @@ class LogicalUnit(object): expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name) if expanded_name is None: raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) + self.op.instance_name, errors.ECODE_NOENT) self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name self.op.instance_name = expanded_name @@ -417,7 +417,8 @@ def _GetWantedNodes(lu, nodes): """ if not isinstance(nodes, list): - raise errors.OpPrereqError("Invalid argument type 'nodes'") + raise errors.OpPrereqError("Invalid argument type 'nodes'", + errors.ECODE_INVAL) if not nodes: raise errors.ProgrammerError("_GetWantedNodes should only be called with a" @@ -427,7 +428,8 @@ def _GetWantedNodes(lu, nodes): for name in nodes: node = lu.cfg.ExpandNodeName(name) if node is None: - raise errors.OpPrereqError("No such node name '%s'" % name) + raise errors.OpPrereqError("No such node name '%s'" % name, + errors.ECODE_NOENT) wanted.append(node) return utils.NiceSort(wanted) @@ -447,7 +449,8 @@ def _GetWantedInstances(lu, instances): """ if not isinstance(instances, list): - raise errors.OpPrereqError("Invalid argument type 'instances'") + raise errors.OpPrereqError("Invalid argument type 'instances'", + errors.ECODE_INVAL) if instances: wanted = [] @@ -455,7 +458,8 @@ def _GetWantedInstances(lu, instances): for name in instances: instance = lu.cfg.ExpandInstanceName(name) if instance is None: - raise errors.OpPrereqError("No such instance name '%s'" % name) + raise errors.OpPrereqError("No such instance name '%s'" % name, + errors.ECODE_NOENT) wanted.append(instance) else: @@ -479,7 +483,7 @@ def _CheckOutputFields(static, dynamic, selected): delta = f.NonMatching(selected) if delta: raise errors.OpPrereqError("Unknown output fields selected: %s" - % ",".join(delta)) + % ",".join(delta), errors.ECODE_INVAL) def _CheckBooleanOpField(op, name): @@ -492,10 +496,25 @@ def _CheckBooleanOpField(op, name): val = getattr(op, name, None) if not (val is None or isinstance(val, bool)): raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" % - (name, str(val))) + (name, str(val)), errors.ECODE_INVAL) setattr(op, name, val) +def _CheckGlobalHvParams(params): + """Validates that given hypervisor params are not global ones. + + This will ensure that instances don't get customised versions of + global params. + + """ + used_globals = constants.HVC_GLOBALS.intersection(params) + if used_globals: + msg = ("The following hypervisor parameters are global and cannot" + " be customized at instance level, please modify them at" + " cluster level: %s" % ", ".join(used_globals)) + raise errors.OpPrereqError(msg, errors.ECODE_INVAL) + + def _CheckNodeOnline(lu, node): """Ensure that a given node is online. 
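
# Editor's illustrative sketch (not part of the diff): the validation pattern
# introduced by _CheckGlobalHvParams above -- reject any per-instance
# hypervisor parameter that is defined as cluster-global.  GLOBAL_PARAMS and
# the exception class below are simplified stand-ins, not the real
# constants.HVC_GLOBALS or errors.OpPrereqError.

GLOBAL_PARAMS = frozenset(["migration_port", "migration_bandwidth"])  # assumed names


class OpPrereqError(Exception):
  """Stand-in for ganeti's errors.OpPrereqError (message, error-code pair)."""


def check_global_hv_params(params):
  used_globals = GLOBAL_PARAMS.intersection(params)
  if used_globals:
    raise OpPrereqError("The following hypervisor parameters are global and"
                        " cannot be customized at instance level: %s" %
                        ", ".join(sorted(used_globals)), "wrong_input")


check_global_hv_params({"kernel_path": "/boot/vmlinuz"})   # passes
# check_global_hv_params({"migration_port": 8102})         # would raise
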
@@ -505,7 +524,8 @@ def _CheckNodeOnline(lu, node): """ if lu.cfg.GetNodeInfo(node).offline: - raise errors.OpPrereqError("Can't use offline node %s" % node) + raise errors.OpPrereqError("Can't use offline node %s" % node, + errors.ECODE_INVAL) def _CheckNodeNotDrained(lu, node): @@ -517,7 +537,8 @@ def _CheckNodeNotDrained(lu, node): """ if lu.cfg.GetNodeInfo(node).drained: - raise errors.OpPrereqError("Can't use drained node %s" % node) + raise errors.OpPrereqError("Can't use drained node %s" % node, + errors.ECODE_INVAL) def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, @@ -670,22 +691,33 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None): return _BuildInstanceHookEnv(**args) -def _AdjustCandidatePool(lu): +def _AdjustCandidatePool(lu, exceptions): """Adjust the candidate pool after node operations. """ - mod_list = lu.cfg.MaintainCandidatePool() + mod_list = lu.cfg.MaintainCandidatePool(exceptions) if mod_list: lu.LogInfo("Promoted nodes to master candidate role: %s", ", ".join(node.name for node in mod_list)) for name in mod_list: lu.context.ReaddNode(name) - mc_now, mc_max = lu.cfg.GetMasterCandidateStats() + mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions) if mc_now > mc_max: lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" % (mc_now, mc_max)) +def _DecideSelfPromotion(lu, exceptions=None): + """Decide whether I should promote myself as a master candidate. + + """ + cp_size = lu.cfg.GetClusterInfo().candidate_pool_size + mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions) + # the new node will increase mc_max with one, so: + mc_should = min(mc_should + 1, cp_size) + return mc_now < mc_should + + def _CheckNicsBridgesExist(lu, target_nics, target_node, profile=constants.PP_DEFAULT): """Check that the brigdes needed by a list of nics exist. @@ -699,7 +731,7 @@ def _CheckNicsBridgesExist(lu, target_nics, target_node, if brlist: result = lu.rpc.call_bridges_exist(target_node, brlist) result.Raise("Error checking bridges on destination node '%s'" % - target_node, prereq=True) + target_node, prereq=True, ecode=errors.ECODE_ENVIRON) def _CheckInstanceBridgesExist(lu, instance, node=None): @@ -711,30 +743,53 @@ def _CheckInstanceBridgesExist(lu, instance, node=None): _CheckNicsBridgesExist(lu, instance.nics, node) -def _GetNodePrimaryInstances(cfg, node_name): - """Returns primary instances on a node. +def _CheckOSVariant(os_obj, name): + """Check whether an OS name conforms to the os variants specification. + + @type os_obj: L{objects.OS} + @param os_obj: OS object to check + @type name: string + @param name: OS name passed by the user, to check for validity """ - instances = [] + if not os_obj.supported_variants: + return + try: + variant = name.split("+", 1)[1] + except IndexError: + raise errors.OpPrereqError("OS name must include a variant", + errors.ECODE_INVAL) - for (_, inst) in cfg.GetAllInstancesInfo().iteritems(): - if node_name == inst.primary_node: - instances.append(inst) + if variant not in os_obj.supported_variants: + raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL) - return instances +def _GetNodeInstancesInner(cfg, fn): + return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)] -def _GetNodeSecondaryInstances(cfg, node_name): - """Returns secondary instances on a node. + +def _GetNodeInstances(cfg, node_name): + """Returns a list of all primary and secondary instances on a node. 
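
# Editor's sketch of the OS-variant rule added in _CheckOSVariant above,
# assuming a simplified interface: when an OS declares supported variants,
# the user-supplied name must be "<os>+<variant>" and the variant must be one
# of the supported ones.  ValueError stands in for errors.OpPrereqError and
# the variant names are made-up examples.

def check_os_variant(supported_variants, name):
  if not supported_variants:
    return
  try:
    variant = name.split("+", 1)[1]
  except IndexError:
    raise ValueError("OS name must include a variant")
  if variant not in supported_variants:
    raise ValueError("Unsupported OS variant %r" % variant)


check_os_variant([], "debootstrap")                          # variant-less OS: ok
check_os_variant(["lenny", "squeeze"], "debootstrap+lenny")  # ok
# check_os_variant(["lenny"], "debootstrap")                 # would raise
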
""" - instances = [] - for (_, inst) in cfg.GetAllInstancesInfo().iteritems(): - if node_name in inst.secondary_nodes: - instances.append(inst) + return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes) - return instances + +def _GetNodePrimaryInstances(cfg, node_name): + """Returns primary instances on a node. + + """ + return _GetNodeInstancesInner(cfg, + lambda inst: node_name == inst.primary_node) + + +def _GetNodeSecondaryInstances(cfg, node_name): + """Returns secondary instances on a node. + + """ + return _GetNodeInstancesInner(cfg, + lambda inst: node_name in inst.secondary_nodes) def _GetStorageTypeArgs(cfg, storage_type): @@ -757,7 +812,7 @@ def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq): result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks) result.Raise("Failed to get disk status from node %s" % node_name, - prereq=prereq) + prereq=prereq, ecode=errors.ECODE_ENVIRON) for idx, bdev_status in enumerate(result.payload): if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY: @@ -795,12 +850,21 @@ class LUPostInitCluster(LogicalUnit): return True -class LUDestroyCluster(NoHooksLU): +class LUDestroyCluster(LogicalUnit): """Logical unit for destroying the cluster. """ + HPATH = "cluster-destroy" + HTYPE = constants.HTYPE_CLUSTER _OP_REQP = [] + def BuildHooksEnv(self): + """Build hooks env. + + """ + env = {"OP_TARGET": self.cfg.GetClusterName()} + return env, [], [] + def CheckPrereq(self): """Check prerequisites. @@ -814,22 +878,36 @@ class LUDestroyCluster(NoHooksLU): nodelist = self.cfg.GetNodeList() if len(nodelist) != 1 or nodelist[0] != master: raise errors.OpPrereqError("There are still %d node(s) in" - " this cluster." % (len(nodelist) - 1)) + " this cluster." % (len(nodelist) - 1), + errors.ECODE_INVAL) instancelist = self.cfg.GetInstanceList() if instancelist: raise errors.OpPrereqError("There are still %d instance(s) in" - " this cluster." % len(instancelist)) + " this cluster." % len(instancelist), + errors.ECODE_INVAL) def Exec(self, feedback_fn): """Destroys the cluster. 
""" master = self.cfg.GetMasterNode() + modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup + + # Run post hooks on master node before it's removed + hm = self.proc.hmclass(self.rpc.call_hooks_runner, self) + try: + hm.RunPhase(constants.HOOKS_PHASE_POST, [master]) + except: + self.LogWarning("Errors occurred running hooks on %s" % master) + result = self.rpc.call_node_stop_master(master, False) result.Raise("Could not disable the master role") - priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS) - utils.CreateBackup(priv_key) - utils.CreateBackup(pub_key) + + if modify_ssh_setup: + priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS) + utils.CreateBackup(priv_key) + utils.CreateBackup(pub_key) + return master @@ -839,9 +917,38 @@ class LUVerifyCluster(LogicalUnit): """ HPATH = "cluster-verify" HTYPE = constants.HTYPE_CLUSTER - _OP_REQP = ["skip_checks"] + _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"] REQ_BGL = False + TCLUSTER = "cluster" + TNODE = "node" + TINSTANCE = "instance" + + ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG") + EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE") + EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN") + EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT") + EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK") + EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK") + EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE") + ENODEDRBD = (TNODE, "ENODEDRBD") + ENODEFILECHECK = (TNODE, "ENODEFILECHECK") + ENODEHOOKS = (TNODE, "ENODEHOOKS") + ENODEHV = (TNODE, "ENODEHV") + ENODELVM = (TNODE, "ENODELVM") + ENODEN1 = (TNODE, "ENODEN1") + ENODENET = (TNODE, "ENODENET") + ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE") + ENODEORPHANLV = (TNODE, "ENODEORPHANLV") + ENODERPC = (TNODE, "ENODERPC") + ENODESSH = (TNODE, "ENODESSH") + ENODEVERSION = (TNODE, "ENODEVERSION") + ENODESETUP = (TNODE, "ENODESETUP") + + ETYPE_FIELD = "code" + ETYPE_ERROR = "ERROR" + ETYPE_WARNING = "WARNING" + def ExpandNames(self): self.needed_locks = { locking.LEVEL_NODE: locking.ALL_SET, @@ -849,9 +956,45 @@ class LUVerifyCluster(LogicalUnit): } self.share_locks = dict.fromkeys(locking.LEVELS, 1) + def _Error(self, ecode, item, msg, *args, **kwargs): + """Format an error message. + + Based on the opcode's error_codes parameter, either format a + parseable error code, or a simpler error string. + + This must be called only from Exec and functions called from Exec. + + """ + ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) + itype, etxt = ecode + # first complete the msg + if args: + msg = msg % args + # then format the whole message + if self.op.error_codes: + msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg) + else: + if item: + item = " " + item + else: + item = "" + msg = "%s: %s%s: %s" % (ltype, itype, item, msg) + # and finally report it via the feedback_fn + self._feedback_fn(" - %s" % msg) + + def _ErrorIf(self, cond, *args, **kwargs): + """Log an error message if the passed condition is True. + + """ + cond = bool(cond) or self.op.debug_simulate_errors + if cond: + self._Error(*args, **kwargs) + # do not mark the operation as failed for WARN cases only + if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR: + self.bad = self.bad or cond + def _VerifyNode(self, nodeinfo, file_list, local_cksum, - node_result, feedback_fn, master_files, - drbd_map, vg_name): + node_result, master_files, drbd_map, vg_name): """Run multiple tests against a node. 
Test list: @@ -866,7 +1009,6 @@ class LUVerifyCluster(LogicalUnit): @param file_list: required list of files @param local_cksum: dictionary of local files and their checksums @param node_result: the results from the node - @param feedback_fn: function used to accumulate results @param master_files: list of files that only masters should have @param drbd_map: the useddrbd minors for this node, in form of minor: (instance, must_exist) which correspond to instances @@ -875,138 +1017,154 @@ class LUVerifyCluster(LogicalUnit): """ node = nodeinfo.name + _ErrorIf = self._ErrorIf # main result, node_result should be a non-empty dict - if not node_result or not isinstance(node_result, dict): - feedback_fn(" - ERROR: unable to verify node %s." % (node,)) - return True + test = not node_result or not isinstance(node_result, dict) + _ErrorIf(test, self.ENODERPC, node, + "unable to verify node: no data returned") + if test: + return # compares ganeti version local_version = constants.PROTOCOL_VERSION remote_version = node_result.get('version', None) - if not (remote_version and isinstance(remote_version, (list, tuple)) and - len(remote_version) == 2): - feedback_fn(" - ERROR: connection to %s failed" % (node)) - return True + test = not (remote_version and + isinstance(remote_version, (list, tuple)) and + len(remote_version) == 2) + _ErrorIf(test, self.ENODERPC, node, + "connection to node returned invalid data") + if test: + return - if local_version != remote_version[0]: - feedback_fn(" - ERROR: incompatible protocol versions: master %s," - " node %s %s" % (local_version, node, remote_version[0])) - return True + test = local_version != remote_version[0] + _ErrorIf(test, self.ENODEVERSION, node, + "incompatible protocol versions: master %s," + " node %s", local_version, remote_version[0]) + if test: + return # node seems compatible, we can actually try to look into its results - bad = False - # full package version - if constants.RELEASE_VERSION != remote_version[1]: - feedback_fn(" - WARNING: software version mismatch: master %s," - " node %s %s" % - (constants.RELEASE_VERSION, node, remote_version[1])) + self._ErrorIf(constants.RELEASE_VERSION != remote_version[1], + self.ENODEVERSION, node, + "software version mismatch: master %s, node %s", + constants.RELEASE_VERSION, remote_version[1], + code=self.ETYPE_WARNING) # checks vg existence and size > 20G if vg_name is not None: vglist = node_result.get(constants.NV_VGLIST, None) - if not vglist: - feedback_fn(" - ERROR: unable to check volume groups on node %s." 
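
# Editor's sketch of the version handling above: a protocol-version mismatch
# between master and node is a hard error (that node's results are not
# examined further), while a mismatch in the full release version is only a
# warning.  The constant values below are assumptions, not the real
# constants.PROTOCOL_VERSION / constants.RELEASE_VERSION.

PROTOCOL_VERSION = 20
RELEASE_VERSION = "2.1.0"


def check_versions(remote):
  """remote: (protocol_version, release_version) as reported by a node."""
  if remote[0] != PROTOCOL_VERSION:
    return ("ERROR", "incompatible protocol versions: master %s, node %s" %
            (PROTOCOL_VERSION, remote[0]))
  if remote[1] != RELEASE_VERSION:
    return ("WARNING", "software version mismatch: master %s, node %s" %
            (RELEASE_VERSION, remote[1]))
  return ("OK", "")


print(check_versions((20, "2.1.1")))   # warning only
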
% - (node,)) - bad = True - else: + test = not vglist + _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups") + if not test: vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name, constants.MIN_VG_SIZE) - if vgstatus: - feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node)) - bad = True + _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus) # checks config file checksum remote_cksum = node_result.get(constants.NV_FILELIST, None) - if not isinstance(remote_cksum, dict): - bad = True - feedback_fn(" - ERROR: node hasn't returned file checksum data") - else: + test = not isinstance(remote_cksum, dict) + _ErrorIf(test, self.ENODEFILECHECK, node, + "node hasn't returned file checksum data") + if not test: for file_name in file_list: node_is_mc = nodeinfo.master_candidate - must_have_file = file_name not in master_files - if file_name not in remote_cksum: - if node_is_mc or must_have_file: - bad = True - feedback_fn(" - ERROR: file '%s' missing" % file_name) - elif remote_cksum[file_name] != local_cksum[file_name]: - if node_is_mc or must_have_file: - bad = True - feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name) - else: - # not candidate and this is not a must-have file - bad = True - feedback_fn(" - ERROR: file '%s' should not exist on non master" - " candidates (and the file is outdated)" % file_name) - else: - # all good, except non-master/non-must have combination - if not node_is_mc and not must_have_file: - feedback_fn(" - ERROR: file '%s' should not exist on non master" - " candidates" % file_name) + must_have = (file_name not in master_files) or node_is_mc + # missing + test1 = file_name not in remote_cksum + # invalid checksum + test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name] + # existing and good + test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name] + _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node, + "file '%s' missing", file_name) + _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node, + "file '%s' has wrong checksum", file_name) + # not candidate and this is not a must-have file + _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node, + "file '%s' should not exist on non master" + " candidates (and the file is outdated)", file_name) + # all good, except non-master/non-must have combination + _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node, + "file '%s' should not exist" + " on non master candidates", file_name) # checks ssh to any - if constants.NV_NODELIST not in node_result: - bad = True - feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data") - else: + test = constants.NV_NODELIST not in node_result + _ErrorIf(test, self.ENODESSH, node, + "node hasn't returned node ssh connectivity data") + if not test: if node_result[constants.NV_NODELIST]: - bad = True - for node in node_result[constants.NV_NODELIST]: - feedback_fn(" - ERROR: ssh communication with node '%s': %s" % - (node, node_result[constants.NV_NODELIST][node])) - - if constants.NV_NODENETTEST not in node_result: - bad = True - feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data") - else: + for a_node, a_msg in node_result[constants.NV_NODELIST].items(): + _ErrorIf(True, self.ENODESSH, node, + "ssh communication with node '%s': %s", a_node, a_msg) + + test = constants.NV_NODENETTEST not in node_result + _ErrorIf(test, self.ENODENET, node, + "node hasn't returned node tcp connectivity data") + if not test: if node_result[constants.NV_NODENETTEST]: - bad = True nlist = 
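
# Editor's sketch of the reworked config-file check above: whether a node
# must hold a distributed file depends on the node being a master candidate
# or the file not being master-only; a missing file or a bad checksum is an
# error only in that case, while a non-candidate still holding a master-only
# file is flagged with its own message (the "outdated" variant of that
# message is collapsed into one here).

def classify_file(file_name, node_is_mc, master_files, remote_cksum, local_cksum):
  must_have = (file_name not in master_files) or node_is_mc
  missing = file_name not in remote_cksum
  matches = not missing and remote_cksum[file_name] == local_cksum[file_name]
  if must_have:
    if missing:
      return "file missing"
    if not matches:
      return "wrong checksum"
    return "ok"
  if not missing:
    return "should not exist on non master candidates"
  return "ok"


print(classify_file("/var/lib/ganeti/config.data", False,
                    frozenset(["/var/lib/ganeti/config.data"]),
                    {"/var/lib/ganeti/config.data": "abc"},
                    {"/var/lib/ganeti/config.data": "abc"}))
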
utils.NiceSort(node_result[constants.NV_NODENETTEST].keys()) - for node in nlist: - feedback_fn(" - ERROR: tcp communication with node '%s': %s" % - (node, node_result[constants.NV_NODENETTEST][node])) + for anode in nlist: + _ErrorIf(True, self.ENODENET, node, + "tcp communication with node '%s': %s", + anode, node_result[constants.NV_NODENETTEST][anode]) hyp_result = node_result.get(constants.NV_HYPERVISOR, None) if isinstance(hyp_result, dict): for hv_name, hv_result in hyp_result.iteritems(): - if hv_result is not None: - feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" % - (hv_name, hv_result)) + test = hv_result is not None + _ErrorIf(test, self.ENODEHV, node, + "hypervisor %s verify failure: '%s'", hv_name, hv_result) # check used drbd list if vg_name is not None: used_minors = node_result.get(constants.NV_DRBDLIST, []) - if not isinstance(used_minors, (tuple, list)): - feedback_fn(" - ERROR: cannot parse drbd status file: %s" % - str(used_minors)) - else: + test = not isinstance(used_minors, (tuple, list)) + _ErrorIf(test, self.ENODEDRBD, node, + "cannot parse drbd status file: %s", str(used_minors)) + if not test: for minor, (iname, must_exist) in drbd_map.items(): - if minor not in used_minors and must_exist: - feedback_fn(" - ERROR: drbd minor %d of instance %s is" - " not active" % (minor, iname)) - bad = True + test = minor not in used_minors and must_exist + _ErrorIf(test, self.ENODEDRBD, node, + "drbd minor %d of instance %s is not active", + minor, iname) for minor in used_minors: - if minor not in drbd_map: - feedback_fn(" - ERROR: unallocated drbd minor %d is in use" % - minor) - bad = True - - return bad + test = minor not in drbd_map + _ErrorIf(test, self.ENODEDRBD, node, + "unallocated drbd minor %d is in use", minor) + test = node_result.get(constants.NV_NODESETUP, + ["Missing NODESETUP results"]) + _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s", + "; ".join(test)) + + # check pv names + if vg_name is not None: + pvlist = node_result.get(constants.NV_PVLIST, None) + test = pvlist is None + _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node") + if not test: + # check that ':' is not present in PV names, since it's a + # special character for lvcreate (denotes the range of PEs to + # use on the PV) + for size, pvname, owner_vg in pvlist: + test = ":" in pvname + _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV" + " '%s' of VG '%s'", pvname, owner_vg) def _VerifyInstance(self, instance, instanceconfig, node_vol_is, - node_instance, feedback_fn, n_offline): + node_instance, n_offline): """Verify an instance. This function checks to see if the required block devices are available on the instance's node. 
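
# Editor's sketch of the new PV-name check above: lvcreate interprets ':' in
# a physical-volume argument as a range of physical extents, so a PV whose
# name contains ':' cannot be passed to it safely and verification flags it.
# The tuples mirror the (size, pv_name, vg_name) entries used above.

def find_bad_pv_names(pvlist):
  return [(pvname, owner_vg)
          for (_size, pvname, owner_vg) in pvlist
          if ":" in pvname]


pvs = [(102400.0, "/dev/sda2", "xenvg"),
       (102400.0, "/dev/disk/by-path/pci-0000:00:1f.2-part3", "xenvg")]
print(find_bad_pv_names(pvs))   # the second PV would be reported
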
""" - bad = False - + _ErrorIf = self._ErrorIf node_current = instanceconfig.primary_node node_vol_should = {} @@ -1017,69 +1175,57 @@ class LUVerifyCluster(LogicalUnit): # ignore missing volumes on offline nodes continue for volume in node_vol_should[node]: - if node not in node_vol_is or volume not in node_vol_is[node]: - feedback_fn(" - ERROR: volume %s missing on node %s" % - (volume, node)) - bad = True + test = node not in node_vol_is or volume not in node_vol_is[node] + _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance, + "volume %s missing on node %s", volume, node) if instanceconfig.admin_up: - if ((node_current not in node_instance or - not instance in node_instance[node_current]) and - node_current not in n_offline): - feedback_fn(" - ERROR: instance %s not running on node %s" % - (instance, node_current)) - bad = True + test = ((node_current not in node_instance or + not instance in node_instance[node_current]) and + node_current not in n_offline) + _ErrorIf(test, self.EINSTANCEDOWN, instance, + "instance not running on its primary node %s", + node_current) for node in node_instance: if (not node == node_current): - if instance in node_instance[node]: - feedback_fn(" - ERROR: instance %s should not run on node %s" % - (instance, node)) - bad = True + test = instance in node_instance[node] + _ErrorIf(test, self.EINSTANCEWRONGNODE, instance, + "instance should not run on node %s", node) - return bad - - def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn): + def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is): """Verify if there are any unknown volumes in the cluster. The .os, .swap and backup volumes are ignored. All other volumes are reported as unknown. """ - bad = False - for node in node_vol_is: for volume in node_vol_is[node]: - if node not in node_vol_should or volume not in node_vol_should[node]: - feedback_fn(" - ERROR: volume %s on node %s should not exist" % - (volume, node)) - bad = True - return bad + test = (node not in node_vol_should or + volume not in node_vol_should[node]) + self._ErrorIf(test, self.ENODEORPHANLV, node, + "volume %s is unknown", volume) - def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn): + def _VerifyOrphanInstances(self, instancelist, node_instance): """Verify the list of running instances. This checks what instances are running but unknown to the cluster. """ - bad = False for node in node_instance: - for runninginstance in node_instance[node]: - if runninginstance not in instancelist: - feedback_fn(" - ERROR: instance %s on node %s should not exist" % - (runninginstance, node)) - bad = True - return bad - - def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn): + for o_inst in node_instance[node]: + test = o_inst not in instancelist + self._ErrorIf(test, self.ENODEORPHANINSTANCE, node, + "instance %s on node %s should not exist", o_inst, node) + + def _VerifyNPlusOneMemory(self, node_info, instance_cfg): """Verify N+1 Memory Resilience. Check that if one single node dies we can still start all the instances it was primary for. 
""" - bad = False - for node, nodeinfo in node_info.iteritems(): # This code checks that every node which is now listed as secondary has # enough memory to host all instances it is supposed to should a single @@ -1095,11 +1241,10 @@ class LUVerifyCluster(LogicalUnit): bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance]) if bep[constants.BE_AUTO_BALANCE]: needed_mem += bep[constants.BE_MEMORY] - if nodeinfo['mfree'] < needed_mem: - feedback_fn(" - ERROR: not enough memory on node %s to accommodate" - " failovers should node %s fail" % (node, prinode)) - bad = True - return bad + test = nodeinfo['mfree'] < needed_mem + self._ErrorIf(test, self.ENODEN1, node, + "not enough memory on to accommodate" + " failovers should peer node %s fail", prinode) def CheckPrereq(self): """Check prerequisites. @@ -1110,7 +1255,8 @@ class LUVerifyCluster(LogicalUnit): """ self.skip_set = frozenset(self.op.skip_checks) if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set): - raise errors.OpPrereqError("Invalid checks to be skipped specified") + raise errors.OpPrereqError("Invalid checks to be skipped specified", + errors.ECODE_INVAL) def BuildHooksEnv(self): """Build hooks env. @@ -1132,10 +1278,13 @@ class LUVerifyCluster(LogicalUnit): """Verify integrity of cluster, performing various test on nodes. """ - bad = False + self.bad = False + _ErrorIf = self._ErrorIf + verbose = self.op.verbose + self._feedback_fn = feedback_fn feedback_fn("* Verifying global settings") for msg in self.cfg.VerifyConfig(): - feedback_fn(" - ERROR: %s" % msg) + _ErrorIf(True, self.ECLUSTERCFG, None, msg) vg_name = self.cfg.GetVGName() hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors @@ -1176,10 +1325,12 @@ class LUVerifyCluster(LogicalUnit): constants.NV_INSTANCELIST: hypervisors, constants.NV_VERSION: None, constants.NV_HVINFO: self.cfg.GetHypervisorType(), + constants.NV_NODESETUP: None, } if vg_name is not None: node_verify_param[constants.NV_VGLIST] = None node_verify_param[constants.NV_LVLIST] = vg_name + node_verify_param[constants.NV_PVLIST] = [vg_name] node_verify_param[constants.NV_DRBDLIST] = None all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param, self.cfg.GetClusterName()) @@ -1188,11 +1339,13 @@ class LUVerifyCluster(LogicalUnit): master_node = self.cfg.GetMasterNode() all_drbd_map = self.cfg.ComputeDRBDMap() + feedback_fn("* Verifying node status") for node_i in nodeinfo: node = node_i.name if node_i.offline: - feedback_fn("* Skipping offline node %s" % (node,)) + if verbose: + feedback_fn("* Skipping offline node %s" % (node,)) n_offline.append(node) continue @@ -1205,62 +1358,59 @@ class LUVerifyCluster(LogicalUnit): n_drained.append(node) else: ntype = "regular" - feedback_fn("* Verifying node %s (%s)" % (node, ntype)) + if verbose: + feedback_fn("* Verifying node %s (%s)" % (node, ntype)) msg = all_nvinfo[node].fail_msg + _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg) if msg: - feedback_fn(" - ERROR: while contacting node %s: %s" % (node, msg)) - bad = True continue nresult = all_nvinfo[node].payload node_drbd = {} for minor, instance in all_drbd_map[node].items(): - if instance not in instanceinfo: - feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" % - instance) + test = instance not in instanceinfo + _ErrorIf(test, self.ECLUSTERCFG, None, + "ghost instance '%s' in temporary DRBD map", instance) # ghost instance should not be running, but otherwise we # don't give double warnings (both ghost instance and # unallocated minor in 
use) + if test: node_drbd[minor] = (instance, False) else: instance = instanceinfo[instance] node_drbd[minor] = (instance.name, instance.admin_up) - result = self._VerifyNode(node_i, file_names, local_checksums, - nresult, feedback_fn, master_files, - node_drbd, vg_name) - bad = bad or result + self._VerifyNode(node_i, file_names, local_checksums, + nresult, master_files, node_drbd, vg_name) lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data") if vg_name is None: node_volume[node] = {} elif isinstance(lvdata, basestring): - feedback_fn(" - ERROR: LVM problem on node %s: %s" % - (node, utils.SafeEncode(lvdata))) - bad = True + _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s", + utils.SafeEncode(lvdata)) node_volume[node] = {} elif not isinstance(lvdata, dict): - feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,)) - bad = True + _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)") continue else: node_volume[node] = lvdata # node_instance idata = nresult.get(constants.NV_INSTANCELIST, None) - if not isinstance(idata, list): - feedback_fn(" - ERROR: connection to %s failed (instancelist)" % - (node,)) - bad = True + test = not isinstance(idata, list) + _ErrorIf(test, self.ENODEHV, node, + "rpc call to node failed (instancelist)") + if test: continue node_instance[node] = idata # node_info nodeinfo = nresult.get(constants.NV_HVINFO, None) - if not isinstance(nodeinfo, dict): - feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,)) - bad = True + test = not isinstance(nodeinfo, dict) + _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)") + if test: continue try: @@ -1278,28 +1428,28 @@ class LUVerifyCluster(LogicalUnit): } # FIXME: devise a free space model for file based instances as well if vg_name is not None: - if (constants.NV_VGLIST not in nresult or - vg_name not in nresult[constants.NV_VGLIST]): - feedback_fn(" - ERROR: node %s didn't return data for the" - " volume group '%s' - it is either missing or broken" % - (node, vg_name)) - bad = True + test = (constants.NV_VGLIST not in nresult or + vg_name not in nresult[constants.NV_VGLIST]) + _ErrorIf(test, self.ENODELVM, node, + "node didn't return data for the volume group '%s'" + " - it is either missing or broken", vg_name) + if test: continue node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name]) except (ValueError, KeyError): - feedback_fn(" - ERROR: invalid nodeinfo value returned" - " from node %s" % (node,)) - bad = True + _ErrorIf(True, self.ENODERPC, node, + "node returned invalid nodeinfo, check lvm/hypervisor") continue node_vol_should = {} + feedback_fn("* Verifying instance status") for instance in instancelist: - feedback_fn("* Verifying instance %s" % instance) + if verbose: + feedback_fn("* Verifying instance %s" % instance) inst_config = instanceinfo[instance] - result = self._VerifyInstance(instance, inst_config, node_volume, - node_instance, feedback_fn, n_offline) - bad = bad or result + self._VerifyInstance(instance, inst_config, node_volume, + node_instance, n_offline) inst_nodes_offline = [] inst_config.MapLVsByNode(node_vol_should) @@ -1307,12 +1457,11 @@ class LUVerifyCluster(LogicalUnit): instance_cfg[instance] = inst_config pnode = inst_config.primary_node + _ErrorIf(pnode not in node_info and pnode not in n_offline, + self.ENODERPC, pnode, "instance %s, connection to" + " primary node failed", instance) if pnode in node_info: node_info[pnode]['pinst'].append(instance) - elif pnode not in n_offline: - 
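
# Editor's sketch of the "ghost instance" handling above: the configuration's
# DRBD minor map may still reference instances that no longer exist; such
# minors are flagged once and then recorded as not-required, so the later
# per-node minor check does not warn about them a second time.  The admin_up
# flag is simplified to a plain boolean here.

def build_node_drbd(drbd_map_for_node, instance_info):
  """drbd_map_for_node: {minor: instance_name}; returns ({minor: (name, must_exist)}, ghosts)."""
  node_drbd = {}
  ghosts = []
  for minor, iname in drbd_map_for_node.items():
    if iname not in instance_info:
      ghosts.append(iname)
      node_drbd[minor] = (iname, False)
    else:
      node_drbd[minor] = (iname, instance_info[iname])
  return node_drbd, ghosts


print(build_node_drbd({0: "web1", 1: "gone"}, {"web1": True}))
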
feedback_fn(" - ERROR: instance %s, connection to primary node" - " %s failed" % (instance, pnode)) - bad = True if pnode in n_offline: inst_nodes_offline.append(pnode) @@ -1324,46 +1473,42 @@ class LUVerifyCluster(LogicalUnit): # FIXME: does not support file-backed instances if len(inst_config.secondary_nodes) == 0: i_non_redundant.append(instance) - elif len(inst_config.secondary_nodes) > 1: - feedback_fn(" - WARNING: multiple secondaries for instance %s" - % instance) + _ErrorIf(len(inst_config.secondary_nodes) > 1, + self.EINSTANCELAYOUT, instance, + "instance has multiple secondary nodes", code="WARNING") if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]: i_non_a_balanced.append(instance) for snode in inst_config.secondary_nodes: + _ErrorIf(snode not in node_info and snode not in n_offline, + self.ENODERPC, snode, + "instance %s, connection to secondary node" + "failed", instance) + if snode in node_info: node_info[snode]['sinst'].append(instance) if pnode not in node_info[snode]['sinst-by-pnode']: node_info[snode]['sinst-by-pnode'][pnode] = [] node_info[snode]['sinst-by-pnode'][pnode].append(instance) - elif snode not in n_offline: - feedback_fn(" - ERROR: instance %s, connection to secondary node" - " %s failed" % (instance, snode)) - bad = True + if snode in n_offline: inst_nodes_offline.append(snode) - if inst_nodes_offline: - # warn that the instance lives on offline nodes, and set bad=True - feedback_fn(" - ERROR: instance lives on offline node(s) %s" % - ", ".join(inst_nodes_offline)) - bad = True + # warn that the instance lives on offline nodes + _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance, + "instance lives on offline node(s) %s", + ", ".join(inst_nodes_offline)) feedback_fn("* Verifying orphan volumes") - result = self._VerifyOrphanVolumes(node_vol_should, node_volume, - feedback_fn) - bad = bad or result + self._VerifyOrphanVolumes(node_vol_should, node_volume) feedback_fn("* Verifying remaining instances") - result = self._VerifyOrphanInstances(instancelist, node_instance, - feedback_fn) - bad = bad or result + self._VerifyOrphanInstances(instancelist, node_instance) if constants.VERIFY_NPLUSONE_MEM not in self.skip_set: feedback_fn("* Verifying N+1 Memory redundancy") - result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn) - bad = bad or result + self._VerifyNPlusOneMemory(node_info, instance_cfg) feedback_fn("* Other Notes") if i_non_redundant: @@ -1380,7 +1525,7 @@ class LUVerifyCluster(LogicalUnit): if n_drained: feedback_fn(" - NOTICE: %d drained node(s) found." 
% len(n_drained)) - return not bad + return not self.bad def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result): """Analyze the post-hooks' result @@ -1403,33 +1548,28 @@ class LUVerifyCluster(LogicalUnit): # Used to change hooks' output to proper indentation indent_re = re.compile('^', re.M) feedback_fn("* Hooks Results") - if not hooks_results: - feedback_fn(" - ERROR: general communication failure") - lu_result = 1 - else: - for node_name in hooks_results: - show_node_header = True - res = hooks_results[node_name] - msg = res.fail_msg - if msg: - if res.offline: - # no need to warn or set fail return value - continue - feedback_fn(" Communication failure in hooks execution: %s" % - msg) + assert hooks_results, "invalid result from hooks" + + for node_name in hooks_results: + show_node_header = True + res = hooks_results[node_name] + msg = res.fail_msg + test = msg and not res.offline + self._ErrorIf(test, self.ENODEHOOKS, node_name, + "Communication failure in hooks execution: %s", msg) + if test: + # override manually lu_result here as _ErrorIf only + # overrides self.bad + lu_result = 1 + continue + for script, hkr, output in res.payload: + test = hkr == constants.HKR_FAIL + self._ErrorIf(test, self.ENODEHOOKS, node_name, + "Script %s failed, output:", script) + if test: + output = indent_re.sub(' ', output) + feedback_fn("%s" % output) lu_result = 1 - continue - for script, hkr, output in res.payload: - if hkr == constants.HKR_FAIL: - # The node header is only shown once, if there are - # failing hooks on that node - if show_node_header: - feedback_fn(" Node %s:" % node_name) - show_node_header = False - feedback_fn(" ERROR: Script %s failed, output:" % script) - output = indent_re.sub(' ', output) - feedback_fn("%s" % output) - lu_result = 1 return lu_result @@ -1525,18 +1665,18 @@ class LURepairDiskSizes(NoHooksLU): REQ_BGL = False def ExpandNames(self): - if not isinstance(self.op.instances, list): - raise errors.OpPrereqError("Invalid argument type 'instances'") + raise errors.OpPrereqError("Invalid argument type 'instances'", + errors.ECODE_INVAL) if self.op.instances: self.wanted_names = [] for name in self.op.instances: full_name = self.cfg.ExpandInstanceName(name) if full_name is None: - raise errors.OpPrereqError("Instance '%s' not known" % name) + raise errors.OpPrereqError("Instance '%s' not known" % name, + errors.ECODE_NOENT) self.wanted_names.append(full_name) - self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names self.needed_locks = { locking.LEVEL_NODE: [], locking.LEVEL_INSTANCE: self.wanted_names, @@ -1566,6 +1706,29 @@ class LURepairDiskSizes(NoHooksLU): self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name in self.wanted_names] + def _EnsureChildSizes(self, disk): + """Ensure children of the disk have the needed disk size. + + This is valid mainly for DRBD8 and fixes an issue where the + children have smaller disk size. + + @param disk: an L{ganeti.objects.Disk} object + + """ + if disk.dev_type == constants.LD_DRBD8: + assert disk.children, "Empty children for DRBD8?" + fchild = disk.children[0] + mismatch = fchild.size < disk.size + if mismatch: + self.LogInfo("Child disk has size %d, parent %d, fixing", + fchild.size, disk.size) + fchild.size = disk.size + + # and we recurse on this child only, not on the metadev + return self._EnsureChildSizes(fchild) or mismatch + else: + return False + def Exec(self, feedback_fn): """Verify the size of cluster disks. 
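
# Editor's sketch of the new _EnsureChildSizes logic above: for a DRBD8 disk,
# the data child (children[0]) must be at least as large as the parent; if it
# is smaller it is grown to the parent size, and the check recurses on that
# child only, not on the metadata device.  A tiny ad-hoc record replaces
# ganeti's objects.Disk and the "drbd8" string stands in for constants.LD_DRBD8.

class Disk(object):
  def __init__(self, dev_type, size, children=None):
    self.dev_type = dev_type
    self.size = size
    self.children = children or []


def ensure_child_sizes(disk):
  if disk.dev_type != "drbd8":
    return False
  fchild = disk.children[0]
  mismatch = fchild.size < disk.size
  if mismatch:
    fchild.size = disk.size
  return ensure_child_sizes(fchild) or mismatch


d = Disk("drbd8", 1024, [Disk("lvm", 1000), Disk("lvm", 128)])
print(ensure_child_sizes(d), d.children[0].size)   # True 1024
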
@@ -1582,8 +1745,11 @@ class LURepairDiskSizes(NoHooksLU): changed = [] for node, dskl in per_node_disks.items(): - result = self.rpc.call_blockdev_getsizes(node, [v[2] for v in dskl]) - if result.failed: + newl = [v[2].Copy() for v in dskl] + for dsk in newl: + self.cfg.SetDiskID(dsk, node) + result = self.rpc.call_blockdev_getsizes(node, newl) + if result.fail_msg: self.LogWarning("Failure in blockdev_getsizes call to node" " %s, ignoring", node) continue @@ -1606,8 +1772,11 @@ class LURepairDiskSizes(NoHooksLU): " correcting: recorded %d, actual %d", idx, instance.name, disk.size, size) disk.size = size - self.cfg.Update(instance) + self.cfg.Update(instance, feedback_fn) changed.append((instance.name, idx, size)) + if self._EnsureChildSizes(disk): + self.cfg.Update(instance, feedback_fn) + changed.append((instance.name, idx, disk.size)) return changed @@ -1634,7 +1803,7 @@ class LURenameCluster(LogicalUnit): """Verify that the passed name is a valid one. """ - hostname = utils.HostInfo(self.op.name) + hostname = utils.GetHostInfo(self.op.name) new_name = hostname.name self.ip = new_ip = hostname.ip @@ -1642,12 +1811,13 @@ class LURenameCluster(LogicalUnit): old_ip = self.cfg.GetMasterIP() if new_name == old_name and new_ip == old_ip: raise errors.OpPrereqError("Neither the name nor the IP address of the" - " cluster has changed") + " cluster has changed", + errors.ECODE_INVAL) if new_ip != old_ip: if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT): raise errors.OpPrereqError("The given cluster IP address (%s) is" " reachable on the network. Aborting." % - new_ip) + new_ip, errors.ECODE_NOTUNIQUE) self.op.name = new_name @@ -1667,7 +1837,7 @@ class LURenameCluster(LogicalUnit): cluster = self.cfg.GetClusterInfo() cluster.cluster_name = clustername cluster.master_ip = ip - self.cfg.Update(cluster) + self.cfg.Update(cluster, feedback_fn) # update the known hosts file ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE) @@ -1729,9 +1899,10 @@ class LUSetClusterParams(LogicalUnit): self.op.candidate_pool_size = int(self.op.candidate_pool_size) except (ValueError, TypeError), err: raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" % - str(err)) + str(err), errors.ECODE_INVAL) if self.op.candidate_pool_size < 1: - raise errors.OpPrereqError("At least one master candidate needed") + raise errors.OpPrereqError("At least one master candidate needed", + errors.ECODE_INVAL) def ExpandNames(self): # FIXME: in the future maybe other cluster params won't require checking on @@ -1765,7 +1936,8 @@ class LUSetClusterParams(LogicalUnit): for disk in inst.disks: if _RecursiveCheckIfLVMBased(disk): raise errors.OpPrereqError("Cannot disable lvm storage while" - " lvm-based instances exist") + " lvm-based instances exist", + errors.ECODE_INVAL) node_list = self.acquired_locks[locking.LEVEL_NODE] @@ -1784,7 +1956,7 @@ class LUSetClusterParams(LogicalUnit): constants.MIN_VG_SIZE) if vgstatus: raise errors.OpPrereqError("Error on node '%s': %s" % - (node, vgstatus)) + (node, vgstatus), errors.ECODE_ENVIRON) self.cluster = cluster = self.cfg.GetClusterInfo() # validate params changes @@ -1798,12 +1970,36 @@ class LUSetClusterParams(LogicalUnit): self.new_nicparams = objects.FillDict( cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams) objects.NIC.CheckParameterSyntax(self.new_nicparams) + nic_errors = [] + + # check all instances for consistency + for instance in self.cfg.GetAllInstancesInfo().values(): + for nic_idx, nic in enumerate(instance.nics): + params_copy = 
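
# Editor's condensed sketch of the size-repair loop above: ask each node for
# the real size of the block devices it hosts, and when the configuration
# records a different value, overwrite the recorded size and remember the
# fixup.  The RPC is replaced by a plain dict here, and the unit conversion
# assumes the node reports bytes while the configuration stores mebibytes.

def repair_sizes(recorded, actual_sizes):
  """recorded: {disk: MiB}; actual_sizes: {disk: bytes}; returns fixups."""
  changed = []
  for name, rec_size in recorded.items():
    size = actual_sizes.get(name)
    if size is None:
      continue                 # node could not report this device
    size = size >> 20
    if rec_size != size:
      recorded[name] = size
      changed.append((name, size))
  return changed


rec = {"disk/0": 1024}
act = {"disk/0": 2048 * 1024 * 1024}
print(repair_sizes(rec, act))   # [('disk/0', 2048)]
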
copy.deepcopy(nic.nicparams) + params_filled = objects.FillDict(self.new_nicparams, params_copy) + + # check parameter syntax + try: + objects.NIC.CheckParameterSyntax(params_filled) + except errors.ConfigurationError, err: + nic_errors.append("Instance %s, nic/%d: %s" % + (instance.name, nic_idx, err)) + + # if we're moving instances to routed, check that they have an ip + target_mode = params_filled[constants.NIC_MODE] + if target_mode == constants.NIC_MODE_ROUTED and not nic.ip: + nic_errors.append("Instance %s, nic/%d: routed nick with no ip" % + (instance.name, nic_idx)) + if nic_errors: + raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" % + "\n".join(nic_errors)) # hypervisor list/parameters self.new_hvparams = objects.FillDict(cluster.hvparams, {}) if self.op.hvparams: if not isinstance(self.op.hvparams, dict): - raise errors.OpPrereqError("Invalid 'hvparams' parameter on input") + raise errors.OpPrereqError("Invalid 'hvparams' parameter on input", + errors.ECODE_INVAL) for hv_name, hv_dict in self.op.hvparams.items(): if hv_name not in self.new_hvparams: self.new_hvparams[hv_name] = hv_dict @@ -1814,11 +2010,13 @@ class LUSetClusterParams(LogicalUnit): self.hv_list = self.op.enabled_hypervisors if not self.hv_list: raise errors.OpPrereqError("Enabled hypervisors list must contain at" - " least one member") + " least one member", + errors.ECODE_INVAL) invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES if invalid_hvs: raise errors.OpPrereqError("Enabled hypervisors contains invalid" - " entries: %s" % invalid_hvs) + " entries: %s" % " ,".join(invalid_hvs), + errors.ECODE_INVAL) else: self.hv_list = cluster.enabled_hypervisors @@ -1859,9 +2057,9 @@ class LUSetClusterParams(LogicalUnit): if self.op.candidate_pool_size is not None: self.cluster.candidate_pool_size = self.op.candidate_pool_size # we need to update the pool size here, otherwise the save will fail - _AdjustCandidatePool(self) + _AdjustCandidatePool(self, []) - self.cfg.Update(self.cluster) + self.cfg.Update(self.cluster, feedback_fn) def _RedistributeAncillaryFiles(lu, additional_nodes=None): @@ -1882,6 +2080,7 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None): dist_nodes.extend(additional_nodes) if myself.name in dist_nodes: dist_nodes.remove(myself.name) + # 2. Gather files to distribute dist_files = set([constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE, @@ -1931,11 +2130,11 @@ class LURedistributeConfig(NoHooksLU): """Redistribute the configuration. """ - self.cfg.Update(self.cfg.GetClusterInfo()) + self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn) _RedistributeAncillaryFiles(self) -def _WaitForSync(lu, instance, oneshot=False, unlock=False): +def _WaitForSync(lu, instance, oneshot=False): """Sleep and poll for an instance's disk to sync. 
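
# Editor's sketch of the new cluster-wide NIC check above: before changing
# the default nicparams, every existing NIC is re-evaluated with the new
# defaults filled in under its own overrides; problems are collected, and
# switching the effective mode to "routed" is refused for NICs without an IP.
# The syntax check is much simpler than objects.NIC.CheckParameterSyntax.

NIC_MODE_ROUTED = "routed"
NIC_MODE_BRIDGED = "bridged"


def check_nics(new_defaults, nics):
  """nics: list of (instance, idx, overrides_dict, ip_or_None)."""
  problems = []
  for inst, idx, overrides, ip in nics:
    filled = dict(new_defaults, **overrides)
    if filled.get("mode") not in (NIC_MODE_ROUTED, NIC_MODE_BRIDGED):
      problems.append("Instance %s, nic/%d: invalid mode %r" %
                      (inst, idx, filled.get("mode")))
    elif filled["mode"] == NIC_MODE_ROUTED and not ip:
      problems.append("Instance %s, nic/%d: routed NIC with no IP" % (inst, idx))
  return problems


print(check_nics({"mode": NIC_MODE_ROUTED, "link": ""},
                 [("web1", 0, {}, None), ("db1", 0, {"mode": "bridged"}, None)]))
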
""" @@ -1950,6 +2149,8 @@ def _WaitForSync(lu, instance, oneshot=False, unlock=False): for dev in instance.disks: lu.cfg.SetDiskID(dev, node) + # TODO: Convert to utils.Retry + retries = 0 degr_retries = 10 # in seconds, as we sleep 1 second each time while True: @@ -1984,7 +2185,8 @@ def _WaitForSync(lu, instance, oneshot=False, unlock=False): else: rem_time = "no time estimate" lu.proc.LogInfo("- device %s: %5.2f%% done, %s" % - (instance.disks[i].iv_name, mstat.sync_percent, rem_time)) + (instance.disks[i].iv_name, mstat.sync_percent, + rem_time)) # if we're done but degraded, let's do a few small retries, to # make sure we see a stable and not transient situation; therefore @@ -2046,11 +2248,14 @@ class LUDiagnoseOS(NoHooksLU): _OP_REQP = ["output_fields", "names"] REQ_BGL = False _FIELDS_STATIC = utils.FieldSet() - _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status") + _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants") + # Fields that need calculation of global os validity + _FIELDS_NEEDVALID = frozenset(["valid", "variants"]) def ExpandNames(self): if self.op.names: - raise errors.OpPrereqError("Selective OS query not supported") + raise errors.OpPrereqError("Selective OS query not supported", + errors.ECODE_INVAL) _CheckOutputFields(static=self._FIELDS_STATIC, dynamic=self._FIELDS_DYNAMIC, @@ -2094,14 +2299,14 @@ class LUDiagnoseOS(NoHooksLU): for node_name, nr in rlist.items(): if nr.fail_msg or not nr.payload: continue - for name, path, status, diagnose in nr.payload: + for name, path, status, diagnose, variants in nr.payload: if name not in all_os: # build a list of nodes for this os containing empty lists # for each node in node_list all_os[name] = {} for nname in good_nodes: all_os[name][nname] = [] - all_os[name][node_name].append((path, status, diagnose)) + all_os[name][node_name].append((path, status, diagnose, variants)) return all_os def Exec(self, feedback_fn): @@ -2112,18 +2317,38 @@ class LUDiagnoseOS(NoHooksLU): node_data = self.rpc.call_os_diagnose(valid_nodes) pol = self._DiagnoseByOS(valid_nodes, node_data) output = [] + calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields) + calc_variants = "variants" in self.op.output_fields + for os_name, os_data in pol.items(): row = [] + if calc_valid: + valid = True + variants = None + for osl in os_data.values(): + valid = valid and osl and osl[0][1] + if not valid: + variants = None + break + if calc_variants: + node_variants = osl[0][3] + if variants is None: + variants = node_variants + else: + variants = [v for v in variants if v in node_variants] + for field in self.op.output_fields: if field == "name": val = os_name elif field == "valid": - val = utils.all([osl and osl[0][1] for osl in os_data.values()]) + val = valid elif field == "node_status": # this is just a copy of the dict val = {} for node_name, nos_list in os_data.items(): val[node_name] = nos_list + elif field == "variants": + val = variants else: raise errors.ParameterError(field) row.append(val) @@ -2152,7 +2377,8 @@ class LURemoveNode(LogicalUnit): "NODE_NAME": self.op.node_name, } all_nodes = self.cfg.GetNodeList() - all_nodes.remove(self.op.node_name) + if self.op.node_name in all_nodes: + all_nodes.remove(self.op.node_name) return env, all_nodes, all_nodes def CheckPrereq(self): @@ -2168,20 +2394,23 @@ class LURemoveNode(LogicalUnit): """ node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name)) if node is None: - raise errors.OpPrereqError, ("Node '%s' is unknown." 
% self.op.node_name) + raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name, + errors.ECODE_NOENT) instance_list = self.cfg.GetInstanceList() masternode = self.cfg.GetMasterNode() if node.name == masternode: raise errors.OpPrereqError("Node is the master node," - " you need to failover first.") + " you need to failover first.", + errors.ECODE_INVAL) for instance_name in instance_list: instance = self.cfg.GetInstanceInfo(instance_name) if node.name in instance.all_nodes: raise errors.OpPrereqError("Instance %s is still running on the node," - " please remove first." % instance_name) + " please remove first." % instance_name, + errors.ECODE_INVAL) self.op.node_name = node.name self.node = node @@ -2193,17 +2422,25 @@ class LURemoveNode(LogicalUnit): logging.info("Stopping the node daemon and removing configs from node %s", node.name) + modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup + + # Promote nodes to master candidate as needed + _AdjustCandidatePool(self, exceptions=[node.name]) self.context.RemoveNode(node.name) - result = self.rpc.call_node_leave_cluster(node.name) + # Run post hooks on the node before it's removed + hm = self.proc.hmclass(self.rpc.call_hooks_runner, self) + try: + h_results = hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name]) + except: + self.LogWarning("Errors occurred running hooks on %s" % node.name) + + result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup) msg = result.fail_msg if msg: self.LogWarning("Errors encountered on the remote node while leaving" " the cluster: %s", msg) - # Promote nodes to master candidate as needed - _AdjustCandidatePool(self) - class LUQueryNodes(NoHooksLU): """Logical unit for querying nodes. @@ -2211,6 +2448,10 @@ class LUQueryNodes(NoHooksLU): """ _OP_REQP = ["output_fields", "names", "use_locking"] REQ_BGL = False + + _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid", + "master_candidate", "offline", "drained"] + _FIELDS_DYNAMIC = utils.FieldSet( "dtotal", "dfree", "mtotal", "mnode", "mfree", @@ -2218,16 +2459,12 @@ class LUQueryNodes(NoHooksLU): "ctotal", "cnodes", "csockets", ) - _FIELDS_STATIC = utils.FieldSet( - "name", "pinst_cnt", "sinst_cnt", + _FIELDS_STATIC = utils.FieldSet(*[ + "pinst_cnt", "sinst_cnt", "pinst_list", "sinst_list", "pip", "sip", "tags", - "serial_no", - "master_candidate", "master", - "offline", - "drained", - "role", + "role"] + _SIMPLE_FIELDS ) def ExpandNames(self): @@ -2249,7 +2486,6 @@ class LUQueryNodes(NoHooksLU): # if we don't request only static fields, we need to lock the nodes self.needed_locks[locking.LEVEL_NODE] = self.wanted - def CheckPrereq(self): """Check prerequisites. 
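
# Editor's sketch of the prerequisites LURemoveNode checks above: the node
# must not be the master (failover first) and no instance may still use it as
# primary or secondary.  Plain data structures stand in for the cluster
# configuration and ValueError for errors.OpPrereqError.

def check_remove_node(node, master, instances):
  """instances: {name: list of nodes (primary + secondaries)}."""
  if node == master:
    raise ValueError("Node is the master node, you need to failover first.")
  for iname, inodes in instances.items():
    if node in inodes:
      raise ValueError("Instance %s is still running on the node,"
                       " please remove first." % iname)


check_remove_node("node3", "node1", {"web1": ["node1", "node2"]})    # ok
# check_remove_node("node2", "node1", {"web1": ["node1", "node2"]})  # raises
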
@@ -2328,8 +2564,8 @@ class LUQueryNodes(NoHooksLU): for node in nodelist: node_output = [] for field in self.op.output_fields: - if field == "name": - val = node.name + if field in self._SIMPLE_FIELDS: + val = getattr(node, field) elif field == "pinst_list": val = list(node_to_primary[node.name]) elif field == "sinst_list": @@ -2344,16 +2580,8 @@ class LUQueryNodes(NoHooksLU): val = node.secondary_ip elif field == "tags": val = list(node.GetTags()) - elif field == "serial_no": - val = node.serial_no - elif field == "master_candidate": - val = node.master_candidate elif field == "master": val = node.name == master_node - elif field == "offline": - val = node.offline - elif field == "drained": - val = node.drained elif self._FIELDS_DYNAMIC.Matches(field): val = live_data[node.name].get(field, None) elif field == "role": @@ -2467,18 +2695,17 @@ class LUQueryNodeStorage(NoHooksLU): """ _OP_REQP = ["nodes", "storage_type", "output_fields"] REQ_BGL = False - _FIELDS_STATIC = utils.FieldSet("node") + _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE) def ExpandNames(self): storage_type = self.op.storage_type - if storage_type not in constants.VALID_STORAGE_FIELDS: - raise errors.OpPrereqError("Unknown storage type: %s" % storage_type) - - dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type] + if storage_type not in constants.VALID_STORAGE_TYPES: + raise errors.OpPrereqError("Unknown storage type: %s" % storage_type, + errors.ECODE_INVAL) _CheckOutputFields(static=self._FIELDS_STATIC, - dynamic=utils.FieldSet(*dynamic_fields), + dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS), selected=self.op.output_fields) self.needed_locks = {} @@ -2510,9 +2737,10 @@ class LUQueryNodeStorage(NoHooksLU): else: fields = [constants.SF_NAME] + self.op.output_fields - # Never ask for node as it's only known to the LU - while "node" in fields: - fields.remove("node") + # Never ask for node or type as it's only known to the LU + for extra in [constants.SF_NODE, constants.SF_TYPE]: + while extra in fields: + fields.remove(extra) field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)]) name_idx = field_idx[constants.SF_NAME] @@ -2542,8 +2770,10 @@ class LUQueryNodeStorage(NoHooksLU): out = [] for field in self.op.output_fields: - if field == "node": + if field == constants.SF_NODE: val = node + elif field == constants.SF_TYPE: + val = self.op.storage_type elif field in field_idx: val = row[field_idx[field]] else: @@ -2566,13 +2796,15 @@ class LUModifyNodeStorage(NoHooksLU): def CheckArguments(self): node_name = self.cfg.ExpandNodeName(self.op.node_name) if node_name is None: - raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name) + raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name, + errors.ECODE_NOENT) self.op.node_name = node_name storage_type = self.op.storage_type - if storage_type not in constants.VALID_STORAGE_FIELDS: - raise errors.OpPrereqError("Unknown storage type: %s" % storage_type) + if storage_type not in constants.VALID_STORAGE_TYPES: + raise errors.OpPrereqError("Unknown storage type: %s" % storage_type, + errors.ECODE_INVAL) def ExpandNames(self): self.needed_locks = { @@ -2589,13 +2821,15 @@ class LUModifyNodeStorage(NoHooksLU): modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type] except KeyError: raise errors.OpPrereqError("Storage units of type '%s' can not be" - " modified" % storage_type) + " modified" % storage_type, + errors.ECODE_INVAL) diff = set(self.op.changes.keys()) - modifiable if diff: raise 
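
# Editor's sketch of the _SIMPLE_FIELDS simplification above: query fields
# that map one-to-one onto attributes of the node object are no longer
# handled by a chain of elif branches but looked up with getattr(); only
# computed fields (master, role, live data, ...) keep explicit code.  The
# Node class here is a stand-in for ganeti's objects.Node.

class Node(object):
  def __init__(self, name, serial_no, offline, drained):
    self.name, self.serial_no = name, serial_no
    self.offline, self.drained = offline, drained


SIMPLE_FIELDS = frozenset(["name", "serial_no", "offline", "drained"])


def node_row(node, fields, master_name):
  row = []
  for field in fields:
    if field in SIMPLE_FIELDS:
      row.append(getattr(node, field))
    elif field == "master":
      row.append(node.name == master_name)
    else:
      raise ValueError("unknown field %r" % field)
  return row


print(node_row(Node("node1", 7, False, False),
               ["name", "offline", "master"], "node1"))
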
errors.OpPrereqError("The following fields can not be modified for" " storage units of type '%s': %r" % - (storage_type, list(diff))) + (storage_type, list(diff)), + errors.ECODE_INVAL) def Exec(self, feedback_fn): """Computes the list of nodes and their attributes. @@ -2647,7 +2881,7 @@ class LUAddNode(LogicalUnit): node_name = self.op.node_name cfg = self.cfg - dns_data = utils.HostInfo(node_name) + dns_data = utils.GetHostInfo(node_name) node = dns_data.name primary_ip = self.op.primary_ip = dns_data.ip @@ -2655,15 +2889,17 @@ class LUAddNode(LogicalUnit): if secondary_ip is None: secondary_ip = primary_ip if not utils.IsValidIP(secondary_ip): - raise errors.OpPrereqError("Invalid secondary IP given") + raise errors.OpPrereqError("Invalid secondary IP given", + errors.ECODE_INVAL) self.op.secondary_ip = secondary_ip node_list = cfg.GetNodeList() if not self.op.readd and node in node_list: raise errors.OpPrereqError("Node %s is already in the configuration" % - node) + node, errors.ECODE_EXISTS) elif self.op.readd and node not in node_list: - raise errors.OpPrereqError("Node %s is not in the configuration" % node) + raise errors.OpPrereqError("Node %s is not in the configuration" % node, + errors.ECODE_NOENT) for existing_node_name in node_list: existing_node = cfg.GetNodeInfo(existing_node_name) @@ -2672,7 +2908,8 @@ class LUAddNode(LogicalUnit): if (existing_node.primary_ip != primary_ip or existing_node.secondary_ip != secondary_ip): raise errors.OpPrereqError("Readded node doesn't have the same IP" - " address configuration as before") + " address configuration as before", + errors.ECODE_INVAL) continue if (existing_node.primary_ip == primary_ip or @@ -2680,7 +2917,8 @@ class LUAddNode(LogicalUnit): existing_node.primary_ip == secondary_ip or existing_node.secondary_ip == secondary_ip): raise errors.OpPrereqError("New node ip address(es) conflict with" - " existing node %s" % existing_node.name) + " existing node %s" % existing_node.name, + errors.ECODE_NOTUNIQUE) # check that the type of the node (single versus dual homed) is the # same as for the master @@ -2690,31 +2928,32 @@ class LUAddNode(LogicalUnit): if master_singlehomed != newbie_singlehomed: if master_singlehomed: raise errors.OpPrereqError("The master has no private ip but the" - " new node has one") + " new node has one", + errors.ECODE_INVAL) else: raise errors.OpPrereqError("The master has a private ip but the" - " new node doesn't have one") + " new node doesn't have one", + errors.ECODE_INVAL) # checks reachability if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT): - raise errors.OpPrereqError("Node not reachable by ping") + raise errors.OpPrereqError("Node not reachable by ping", + errors.ECODE_ENVIRON) if not newbie_singlehomed: # check reachability from my secondary ip to newbie's secondary ip if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT, source=myself.secondary_ip): raise errors.OpPrereqError("Node secondary ip not reachable by TCP" - " based ping to noded port") + " based ping to noded port", + errors.ECODE_ENVIRON) - cp_size = self.cfg.GetClusterInfo().candidate_pool_size if self.op.readd: exceptions = [node] else: exceptions = [] - mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions) - # the new node will increase mc_max with one, so: - mc_max = min(mc_max + 1, cp_size) - self.master_candidate = mc_now < mc_max + + self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions) if self.op.readd: self.new_node = self.cfg.GetNodeInfo(node) @@ -2759,24 +2998,21 
@@ class LUAddNode(LogicalUnit): (constants.PROTOCOL_VERSION, result.payload)) # setup ssh on node - logging.info("Copy ssh key to node %s", node) - priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS) - keyarray = [] - keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB, - constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB, - priv_key, pub_key] - - for i in keyfiles: - f = open(i, 'r') - try: - keyarray.append(f.read()) - finally: - f.close() - - result = self.rpc.call_node_add(node, keyarray[0], keyarray[1], - keyarray[2], - keyarray[3], keyarray[4], keyarray[5]) - result.Raise("Cannot transfer ssh keys to the new node") + if self.cfg.GetClusterInfo().modify_ssh_setup: + logging.info("Copy ssh key to node %s", node) + priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS) + keyarray = [] + keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB, + constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB, + priv_key, pub_key] + + for i in keyfiles: + keyarray.append(utils.ReadFile(i)) + + result = self.rpc.call_node_add(node, keyarray[0], keyarray[1], + keyarray[2], keyarray[3], keyarray[4], + keyarray[5]) + result.Raise("Cannot transfer ssh keys to the new node") # Add node to our /etc/hosts, and add key to known_hosts if self.cfg.GetClusterInfo().modify_etc_hosts: @@ -2786,7 +3022,7 @@ class LUAddNode(LogicalUnit): result = self.rpc.call_node_has_ip_address(new_node.name, new_node.secondary_ip) result.Raise("Failure checking secondary ip on node %s" % new_node.name, - prereq=True) + prereq=True, ecode=errors.ECODE_ENVIRON) if not result.payload: raise errors.OpExecError("Node claims it doesn't have the secondary ip" " you gave (%s). Please fix and re-run this" @@ -2794,7 +3030,7 @@ class LUAddNode(LogicalUnit): node_verify_list = [self.cfg.GetMasterNode()] node_verify_param = { - 'nodelist': [node], + constants.NV_NODELIST: [node], # TODO: do a node-net-test as well? 
} @@ -2802,10 +3038,11 @@ class LUAddNode(LogicalUnit): self.cfg.GetClusterName()) for verifier in node_verify_list: result[verifier].Raise("Cannot communicate with node %s" % verifier) - nl_payload = result[verifier].payload['nodelist'] + nl_payload = result[verifier].payload[constants.NV_NODELIST] if nl_payload: for failed in nl_payload: - feedback_fn("ssh/hostname verification failed %s -> %s" % + feedback_fn("ssh/hostname verification failed" + " (checking from %s): %s" % (verifier, nl_payload[failed])) raise errors.OpExecError("ssh/hostname verification failed.") @@ -2813,17 +3050,17 @@ class LUAddNode(LogicalUnit): _RedistributeAncillaryFiles(self) self.context.ReaddNode(new_node) # make sure we redistribute the config - self.cfg.Update(new_node) + self.cfg.Update(new_node, feedback_fn) # and make sure the new node will not have old files around if not new_node.master_candidate: result = self.rpc.call_node_demote_from_mc(new_node.name) - msg = result.RemoteFailMsg() + msg = result.fail_msg if msg: self.LogWarning("Node failed to demote itself from master" " candidate status: %s" % msg) else: _RedistributeAncillaryFiles(self, additional_nodes=[node]) - self.context.AddNode(new_node) + self.context.AddNode(new_node, self.proc.GetECId()) class LUSetNodeParams(LogicalUnit): @@ -2838,17 +3075,20 @@ class LUSetNodeParams(LogicalUnit): def CheckArguments(self): node_name = self.cfg.ExpandNodeName(self.op.node_name) if node_name is None: - raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name) + raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name, + errors.ECODE_INVAL) self.op.node_name = node_name _CheckBooleanOpField(self.op, 'master_candidate') _CheckBooleanOpField(self.op, 'offline') _CheckBooleanOpField(self.op, 'drained') all_mods = [self.op.offline, self.op.master_candidate, self.op.drained] if all_mods.count(None) == 3: - raise errors.OpPrereqError("Please pass at least one modification") + raise errors.OpPrereqError("Please pass at least one modification", + errors.ECODE_INVAL) if all_mods.count(True) > 1: raise errors.OpPrereqError("Can't set the node into more than one" - " state at the same time") + " state at the same time", + errors.ECODE_INVAL) def ExpandNames(self): self.needed_locks = {locking.LEVEL_NODE: self.op.node_name} @@ -2877,27 +3117,48 @@ class LUSetNodeParams(LogicalUnit): """ node = self.node = self.cfg.GetNodeInfo(self.op.node_name) - if ((self.op.master_candidate == False or self.op.offline == True or - self.op.drained == True) and node.master_candidate): - # we will demote the node from master_candidate + if (self.op.master_candidate is not None or + self.op.drained is not None or + self.op.offline is not None): + # we can't change the master's node flags if self.op.node_name == self.cfg.GetMasterNode(): - raise errors.OpPrereqError("The master node has to be a" - " master candidate, online and not drained") + raise errors.OpPrereqError("The master role can be changed" + " only via masterfailover", + errors.ECODE_INVAL) + + # Boolean value that tells us whether we're offlining or draining the node + offline_or_drain = self.op.offline == True or self.op.drained == True + deoffline_or_drain = self.op.offline == False or self.op.drained == False + + if (node.master_candidate and + (self.op.master_candidate == False or offline_or_drain)): cp_size = self.cfg.GetClusterInfo().candidate_pool_size - num_candidates, _ = self.cfg.GetMasterCandidateStats() - if num_candidates <= cp_size: + mc_now, mc_should, mc_max = 
self.cfg.GetMasterCandidateStats() + if mc_now <= cp_size: msg = ("Not enough master candidates (desired" - " %d, new value will be %d)" % (cp_size, num_candidates-1)) - if self.op.force: + " %d, new value will be %d)" % (cp_size, mc_now-1)) + # Only allow forcing the operation if it's an offline/drain operation, + # and we could not possibly promote more nodes. + # FIXME: this can still lead to issues if in any way another node which + # could be promoted appears in the meantime. + if self.op.force and offline_or_drain and mc_should == mc_max: self.LogWarning(msg) else: - raise errors.OpPrereqError(msg) + raise errors.OpPrereqError(msg, errors.ECODE_INVAL) if (self.op.master_candidate == True and ((node.offline and not self.op.offline == False) or (node.drained and not self.op.drained == False))): raise errors.OpPrereqError("Node '%s' is offline or drained, can't set" - " to master_candidate" % node.name) + " to master_candidate" % node.name, + errors.ECODE_INVAL) + + # If we're being deofflined/drained, we'll MC ourself if needed + if (deoffline_or_drain and not offline_or_drain and not + self.op.master_candidate == True): + self.op.master_candidate = _DecideSelfPromotion(self) + if self.op.master_candidate: + self.LogInfo("Autopromoting node to master candidate") return @@ -2941,7 +3202,7 @@ class LUSetNodeParams(LogicalUnit): changed_mc = True result.append(("master_candidate", "auto-demotion due to drain")) rrc = self.rpc.call_node_demote_from_mc(node.name) - msg = rrc.RemoteFailMsg() + msg = rrc.fail_msg if msg: self.LogWarning("Node failed to demote itself: %s" % msg) if node.offline: @@ -2949,7 +3210,7 @@ class LUSetNodeParams(LogicalUnit): result.append(("offline", "clear offline status due to drain")) # this will trigger configuration file update, if needed - self.cfg.Update(node) + self.cfg.Update(node, feedback_fn) # this will trigger job queue propagation or cleanup if changed_mc: self.context.ReaddNode(node) @@ -2967,11 +3228,13 @@ class LUPowercycleNode(NoHooksLU): def CheckArguments(self): node_name = self.cfg.ExpandNodeName(self.op.node_name) if node_name is None: - raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name) + raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name, + errors.ECODE_NOENT) self.op.node_name = node_name if node_name == self.cfg.GetMasterNode() and not self.op.force: raise errors.OpPrereqError("The node is the master and the force" - " parameter was not set") + " parameter was not set", + errors.ECODE_INVAL) def ExpandNames(self): """Locking for PowercycleNode. 
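Editor's note: the LUSetNodeParams hunks above only honour the force flag for a demotion when the node is being offlined or drained and the candidate pool could not be refilled anyway (mc_should == mc_max); otherwise the operation is refused. A standalone sketch of that decision, using only the quantities visible in the hunk; the function name is mine and ValueError stands in for errors.OpPrereqError:

def CheckCandidateDemotion(mc_now, mc_should, mc_max, cp_size,
                           force, offline_or_drain):
  """Return True when the demotion may proceed (possibly with a warning)."""
  if mc_now > cp_size:
    return True              # pool is still large enough, nothing to check
  # Forcing is only honoured for an offline/drain operation, and only when no
  # other node could be promoted to refill the pool anyway.
  if force and offline_or_drain and mc_should == mc_max:
    return True              # proceed, the LU merely logs a warning
  raise ValueError("Not enough master candidates (desired %d, new value"
                   " will be %d)" % (cp_size, mc_now - 1))

print(CheckCandidateDemotion(mc_now=3, mc_should=3, mc_max=3, cp_size=10,
                             force=True, offline_or_drain=True))  # -> True

The complementary path in the same hunk auto-promotes a node that is being de-offlined or de-drained when the pool is short, so the two checks together keep the candidate count stable across node state changes.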
@@ -3040,6 +3303,10 @@ class LUQueryClusterInfo(NoHooksLU): "master_netdev": cluster.master_netdev, "volume_group_name": cluster.volume_group_name, "file_storage_dir": cluster.file_storage_dir, + "ctime": cluster.ctime, + "mtime": cluster.mtime, + "uuid": cluster.uuid, + "tags": list(cluster.GetTags()), } return result @@ -3052,7 +3319,8 @@ class LUQueryConfigValues(NoHooksLU): _OP_REQP = [] REQ_BGL = False _FIELDS_DYNAMIC = utils.FieldSet() - _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag") + _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag", + "watcher_pause") def ExpandNames(self): self.needed_locks = {} @@ -3079,6 +3347,8 @@ class LUQueryConfigValues(NoHooksLU): entry = self.cfg.GetMasterNode() elif field == "drain_flag": entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE) + elif field == "watcher_pause": + return utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE) else: raise errors.ParameterError(field) values.append(entry) @@ -3181,6 +3451,8 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False, # 2nd pass, do only the primary node for inst_disk in instance.disks: + dev_path = None + for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node): if node != instance.primary_node: continue @@ -3195,8 +3467,10 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False, " (is_primary=True, pass=2): %s", inst_disk.iv_name, node, msg) disks_ok = False - device_info.append((instance.primary_node, inst_disk.iv_name, - result.payload)) + else: + dev_path = result.payload + + device_info.append((instance.primary_node, inst_disk.iv_name, dev_path)) # leave the disks configured for the primary node # this is a workaround that would be fixed better by @@ -3320,15 +3594,18 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name): """ nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name) - nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True) + nodeinfo[node].Raise("Can't get data from node %s" % node, + prereq=True, ecode=errors.ECODE_ENVIRON) free_mem = nodeinfo[node].payload.get('memory_free', None) if not isinstance(free_mem, int): raise errors.OpPrereqError("Can't compute free memory on node %s, result" - " was '%s'" % (node, free_mem)) + " was '%s'" % (node, free_mem), + errors.ECODE_ENVIRON) if requested > free_mem: raise errors.OpPrereqError("Not enough memory on node %s for %s:" " needed %s MiB, available %s MiB" % - (node, reason, requested, free_mem)) + (node, reason, requested, free_mem), + errors.ECODE_NORES) class LUStartupInstance(LogicalUnit): @@ -3371,7 +3648,8 @@ class LUStartupInstance(LogicalUnit): if self.beparams: if not isinstance(self.beparams, dict): raise errors.OpPrereqError("Invalid beparams passed: %s, expected" - " dict" % (type(self.beparams), )) + " dict" % (type(self.beparams), ), + errors.ECODE_INVAL) # fill the beparams dict utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES) self.op.beparams = self.beparams @@ -3381,7 +3659,8 @@ class LUStartupInstance(LogicalUnit): if self.hvparams: if not isinstance(self.hvparams, dict): raise errors.OpPrereqError("Invalid hvparams passed: %s, expected" - " dict" % (type(self.hvparams), )) + " dict" % (type(self.hvparams), ), + errors.ECODE_INVAL) # check hypervisor parameter syntax (locally) cluster = self.cfg.GetClusterInfo() @@ -3404,7 +3683,7 @@ class LUStartupInstance(LogicalUnit): instance.name, instance.hypervisor) remote_info.Raise("Error checking node 
%s" % instance.primary_node, - prereq=True) + prereq=True, ecode=errors.ECODE_ENVIRON) if not remote_info.payload: # not running already _CheckNodeFreeMemory(self, instance.primary_node, "starting instance %s" % instance.name, @@ -3440,6 +3719,13 @@ class LURebootInstance(LogicalUnit): _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"] REQ_BGL = False + def CheckArguments(self): + """Check the arguments. + + """ + self.shutdown_timeout = getattr(self.op, "shutdown_timeout", + constants.DEFAULT_SHUTDOWN_TIMEOUT) + def ExpandNames(self): if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT, constants.INSTANCE_REBOOT_HARD, @@ -3459,6 +3745,7 @@ class LURebootInstance(LogicalUnit): env = { "IGNORE_SECONDARIES": self.op.ignore_secondaries, "REBOOT_TYPE": self.op.reboot_type, + "SHUTDOWN_TIMEOUT": self.shutdown_timeout, } env.update(_BuildInstanceHookEnvByObject(self, self.instance)) nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) @@ -3494,10 +3781,12 @@ class LURebootInstance(LogicalUnit): for disk in instance.disks: self.cfg.SetDiskID(disk, node_current) result = self.rpc.call_instance_reboot(node_current, instance, - reboot_type) + reboot_type, + self.shutdown_timeout) result.Raise("Could not reboot instance") else: - result = self.rpc.call_instance_shutdown(node_current, instance) + result = self.rpc.call_instance_shutdown(node_current, instance, + self.shutdown_timeout) result.Raise("Could not shutdown instance for full reboot") _ShutdownInstanceDisks(self, instance) _StartInstanceDisks(self, instance, ignore_secondaries) @@ -3520,6 +3809,13 @@ class LUShutdownInstance(LogicalUnit): _OP_REQP = ["instance_name"] REQ_BGL = False + def CheckArguments(self): + """Check the arguments. + + """ + self.timeout = getattr(self.op, "timeout", + constants.DEFAULT_SHUTDOWN_TIMEOUT) + def ExpandNames(self): self._ExpandAndLockInstance() @@ -3530,6 +3826,7 @@ class LUShutdownInstance(LogicalUnit): """ env = _BuildInstanceHookEnvByObject(self, self.instance) + env["TIMEOUT"] = self.timeout nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) return env, nl, nl @@ -3550,8 +3847,9 @@ class LUShutdownInstance(LogicalUnit): """ instance = self.instance node_current = instance.primary_node + timeout = self.timeout self.cfg.MarkInstanceDown(instance.name) - result = self.rpc.call_instance_shutdown(node_current, instance) + result = self.rpc.call_instance_shutdown(node_current, instance, timeout) msg = result.fail_msg if msg: self.proc.LogWarning("Could not shutdown instance: %s" % msg) @@ -3594,31 +3892,38 @@ class LUReinstallInstance(LogicalUnit): if instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Instance '%s' has no disks" % - self.op.instance_name) + self.op.instance_name, + errors.ECODE_INVAL) if instance.admin_up: raise errors.OpPrereqError("Instance '%s' is marked to be up" % - self.op.instance_name) + self.op.instance_name, + errors.ECODE_STATE) remote_info = self.rpc.call_instance_info(instance.primary_node, instance.name, instance.hypervisor) remote_info.Raise("Error checking node %s" % instance.primary_node, - prereq=True) + prereq=True, ecode=errors.ECODE_ENVIRON) if remote_info.payload: raise errors.OpPrereqError("Instance '%s' is running on the node %s" % (self.op.instance_name, - instance.primary_node)) + instance.primary_node), + errors.ECODE_STATE) self.op.os_type = getattr(self.op, "os_type", None) + self.op.force_variant = getattr(self.op, "force_variant", False) if self.op.os_type is not None: # OS verification 
pnode = self.cfg.GetNodeInfo( self.cfg.ExpandNodeName(instance.primary_node)) if pnode is None: raise errors.OpPrereqError("Primary node '%s' is unknown" % - self.op.pnode) + self.op.pnode, errors.ECODE_NOENT) result = self.rpc.call_os_get(pnode.name, self.op.os_type) result.Raise("OS '%s' not in supported OS list for primary node %s" % - (self.op.os_type, pnode.name), prereq=True) + (self.op.os_type, pnode.name), + prereq=True, ecode=errors.ECODE_INVAL) + if not self.op.force_variant: + _CheckOSVariant(result.payload, self.op.os_type) self.instance = instance @@ -3631,7 +3936,7 @@ class LUReinstallInstance(LogicalUnit): if self.op.os_type is not None: feedback_fn("Changing OS to '%s'..." % self.op.os_type) inst.os = self.op.os_type - self.cfg.Update(inst) + self.cfg.Update(inst, feedback_fn) _StartInstanceDisks(self, inst, None) try: @@ -3657,12 +3962,12 @@ class LURecreateInstanceDisks(LogicalUnit): """ if not isinstance(self.op.disks, list): - raise errors.OpPrereqError("Invalid disks parameter") + raise errors.OpPrereqError("Invalid disks parameter", errors.ECODE_INVAL) for item in self.op.disks: if (not isinstance(item, int) or item < 0): raise errors.OpPrereqError("Invalid disk specification '%s'" % - str(item)) + str(item), errors.ECODE_INVAL) def ExpandNames(self): self._ExpandAndLockInstance() @@ -3690,26 +3995,27 @@ class LURecreateInstanceDisks(LogicalUnit): if instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Instance '%s' has no disks" % - self.op.instance_name) + self.op.instance_name, errors.ECODE_INVAL) if instance.admin_up: raise errors.OpPrereqError("Instance '%s' is marked to be up" % - self.op.instance_name) + self.op.instance_name, errors.ECODE_STATE) remote_info = self.rpc.call_instance_info(instance.primary_node, instance.name, instance.hypervisor) remote_info.Raise("Error checking node %s" % instance.primary_node, - prereq=True) + prereq=True, ecode=errors.ECODE_ENVIRON) if remote_info.payload: raise errors.OpPrereqError("Instance '%s' is running on the node %s" % (self.op.instance_name, - instance.primary_node)) + instance.primary_node), errors.ECODE_STATE) if not self.op.disks: self.op.disks = range(len(instance.disks)) else: for idx in self.op.disks: if idx >= len(instance.disks): - raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx) + raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx, + errors.ECODE_INVAL) self.instance = instance @@ -3755,36 +4061,37 @@ class LURenameInstance(LogicalUnit): self.cfg.ExpandInstanceName(self.op.instance_name)) if instance is None: raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) + self.op.instance_name, errors.ECODE_NOENT) _CheckNodeOnline(self, instance.primary_node) if instance.admin_up: raise errors.OpPrereqError("Instance '%s' is marked to be up" % - self.op.instance_name) + self.op.instance_name, errors.ECODE_STATE) remote_info = self.rpc.call_instance_info(instance.primary_node, instance.name, instance.hypervisor) remote_info.Raise("Error checking node %s" % instance.primary_node, - prereq=True) + prereq=True, ecode=errors.ECODE_ENVIRON) if remote_info.payload: raise errors.OpPrereqError("Instance '%s' is running on the node %s" % (self.op.instance_name, - instance.primary_node)) + instance.primary_node), errors.ECODE_STATE) self.instance = instance # new name verification - name_info = utils.HostInfo(self.op.new_name) + name_info = utils.GetHostInfo(self.op.new_name) self.op.new_name = new_name = name_info.name instance_list = 
self.cfg.GetInstanceList() if new_name in instance_list: raise errors.OpPrereqError("Instance '%s' is already in the cluster" % - new_name) + new_name, errors.ECODE_EXISTS) if not getattr(self.op, "ignore_ip", False): if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT): raise errors.OpPrereqError("IP %s of instance %s already in use" % - (name_info.ip, new_name)) + (name_info.ip, new_name), + errors.ECODE_NOTUNIQUE) def Exec(self, feedback_fn): @@ -3838,6 +4145,13 @@ class LURemoveInstance(LogicalUnit): _OP_REQP = ["instance_name", "ignore_failures"] REQ_BGL = False + def CheckArguments(self): + """Check the arguments. + + """ + self.shutdown_timeout = getattr(self.op, "shutdown_timeout", + constants.DEFAULT_SHUTDOWN_TIMEOUT) + def ExpandNames(self): self._ExpandAndLockInstance() self.needed_locks[locking.LEVEL_NODE] = [] @@ -3854,6 +4168,7 @@ class LURemoveInstance(LogicalUnit): """ env = _BuildInstanceHookEnvByObject(self, self.instance) + env["SHUTDOWN_TIMEOUT"] = self.shutdown_timeout nl = [self.cfg.GetMasterNode()] return env, nl, nl @@ -3875,7 +4190,8 @@ class LURemoveInstance(LogicalUnit): logging.info("Shutting down instance %s on node %s", instance.name, instance.primary_node) - result = self.rpc.call_instance_shutdown(instance.primary_node, instance) + result = self.rpc.call_instance_shutdown(instance.primary_node, instance, + self.shutdown_timeout) msg = result.fail_msg if msg: if self.op.ignore_failures: @@ -3905,6 +4221,8 @@ class LUQueryInstances(NoHooksLU): """ _OP_REQP = ["output_fields", "names", "use_locking"] REQ_BGL = False + _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor", + "serial_no", "ctime", "mtime", "uuid"] _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes", "admin_state", "disk_template", "ip", "mac", "bridge", @@ -3917,9 +4235,11 @@ class LUQueryInstances(NoHooksLU): r"(nic)\.(bridge)/([0-9]+)", r"(nic)\.(macs|ips|modes|links|bridges)", r"(disk|nic)\.(count)", - "serial_no", "hypervisor", "hvparams",] + + "hvparams", + ] + _SIMPLE_FIELDS + ["hv/%s" % name - for name in constants.HVS_PARAMETERS] + + for name in constants.HVS_PARAMETERS + if name not in constants.HVC_GLOBALS] + ["be/%s" % name for name in constants.BES_PARAMETERS]) _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status") @@ -3997,7 +4317,7 @@ class LUQueryInstances(NoHooksLU): if result.offline: # offline nodes will be in both lists off_nodes.append(name) - if result.failed or result.fail_msg: + if result.fail_msg: bad_nodes.append(name) else: if result.payload: @@ -4014,16 +4334,14 @@ class LUQueryInstances(NoHooksLU): cluster = self.cfg.GetClusterInfo() for instance in instance_list: iout = [] - i_hv = cluster.FillHV(instance) + i_hv = cluster.FillHV(instance, skip_globals=True) i_be = cluster.FillBE(instance) i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT], nic.nicparams) for nic in instance.nics] for field in self.op.output_fields: st_match = self._FIELDS_STATIC.Matches(field) - if field == "name": - val = instance.name - elif field == "os": - val = instance.os + if field in self._SIMPLE_FIELDS: + val = getattr(instance, field) elif field == "pnode": val = instance.primary_node elif field == "snodes": @@ -4100,16 +4418,11 @@ class LUQueryInstances(NoHooksLU): val = _ComputeDiskSize(instance.disk_template, disk_sizes) elif field == "tags": val = list(instance.GetTags()) - elif field == "serial_no": - val = instance.serial_no - elif field == "network_port": - val = instance.network_port - elif field == "hypervisor": - val = 
instance.hypervisor elif field == "hvparams": val = i_hv elif (field.startswith(HVPREFIX) and - field[len(HVPREFIX):] in constants.HVS_PARAMETERS): + field[len(HVPREFIX):] in constants.HVS_PARAMETERS and + field[len(HVPREFIX):] not in constants.HVC_GLOBALS): val = i_hv.get(field[len(HVPREFIX):], None) elif field == "beparams": val = i_be @@ -4191,6 +4504,13 @@ class LUFailoverInstance(LogicalUnit): _OP_REQP = ["instance_name", "ignore_consistency"] REQ_BGL = False + def CheckArguments(self): + """Check the arguments. + + """ + self.shutdown_timeout = getattr(self.op, "shutdown_timeout", + constants.DEFAULT_SHUTDOWN_TIMEOUT) + def ExpandNames(self): self._ExpandAndLockInstance() self.needed_locks[locking.LEVEL_NODE] = [] @@ -4208,6 +4528,7 @@ class LUFailoverInstance(LogicalUnit): """ env = { "IGNORE_CONSISTENCY": self.op.ignore_consistency, + "SHUTDOWN_TIMEOUT": self.shutdown_timeout, } env.update(_BuildInstanceHookEnvByObject(self, self.instance)) nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes) @@ -4226,7 +4547,8 @@ class LUFailoverInstance(LogicalUnit): bep = self.cfg.GetClusterInfo().FillBE(instance) if instance.disk_template not in constants.DTS_NET_MIRROR: raise errors.OpPrereqError("Instance's disk layout is not" - " network mirrored, cannot failover.") + " network mirrored, cannot failover.", + errors.ECODE_STATE) secondary_nodes = instance.secondary_nodes if not secondary_nodes: @@ -4260,19 +4582,23 @@ class LUFailoverInstance(LogicalUnit): source_node = instance.primary_node target_node = instance.secondary_nodes[0] - feedback_fn("* checking disk consistency between source and target") - for dev in instance.disks: - # for drbd, these are drbd over lvm - if not _CheckDiskConsistency(self, dev, target_node, False): - if instance.admin_up and not self.op.ignore_consistency: - raise errors.OpExecError("Disk %s is degraded on target node," - " aborting failover." % dev.iv_name) + if instance.admin_up: + feedback_fn("* checking disk consistency between source and target") + for dev in instance.disks: + # for drbd, these are drbd over lvm + if not _CheckDiskConsistency(self, dev, target_node, False): + if not self.op.ignore_consistency: + raise errors.OpExecError("Disk %s is degraded on target node," + " aborting failover." % dev.iv_name) + else: + feedback_fn("* not checking disk consistency as instance is not running") feedback_fn("* shutting down instance on source node") logging.info("Shutting down instance %s on node %s", instance.name, source_node) - result = self.rpc.call_instance_shutdown(source_node, instance) + result = self.rpc.call_instance_shutdown(source_node, instance, + self.shutdown_timeout) msg = result.fail_msg if msg: if self.op.ignore_consistency: @@ -4291,7 +4617,7 @@ class LUFailoverInstance(LogicalUnit): instance.primary_node = target_node # distribute new instance config to the other nodes - self.cfg.Update(instance) + self.cfg.Update(instance, feedback_fn) # Only start the instance if it's marked as up if instance.admin_up: @@ -4355,6 +4681,191 @@ class LUMigrateInstance(LogicalUnit): return env, nl, nl +class LUMoveInstance(LogicalUnit): + """Move an instance by data-copying. + + """ + HPATH = "instance-move" + HTYPE = constants.HTYPE_INSTANCE + _OP_REQP = ["instance_name", "target_node"] + REQ_BGL = False + + def CheckArguments(self): + """Check the arguments. 
+ + """ + self.shutdown_timeout = getattr(self.op, "shutdown_timeout", + constants.DEFAULT_SHUTDOWN_TIMEOUT) + + def ExpandNames(self): + self._ExpandAndLockInstance() + target_node = self.cfg.ExpandNodeName(self.op.target_node) + if target_node is None: + raise errors.OpPrereqError("Node '%s' not known" % + self.op.target_node, errors.ECODE_NOENT) + self.op.target_node = target_node + self.needed_locks[locking.LEVEL_NODE] = [target_node] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes(primary_only=True) + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on master, primary and secondary nodes of the instance. + + """ + env = { + "TARGET_NODE": self.op.target_node, + "SHUTDOWN_TIMEOUT": self.shutdown_timeout, + } + env.update(_BuildInstanceHookEnvByObject(self, self.instance)) + nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node, + self.op.target_node] + return env, nl, nl + + def CheckPrereq(self): + """Check prerequisites. + + This checks that the instance is in the cluster. + + """ + self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name + + node = self.cfg.GetNodeInfo(self.op.target_node) + assert node is not None, \ + "Cannot retrieve locked node %s" % self.op.target_node + + self.target_node = target_node = node.name + + if target_node == instance.primary_node: + raise errors.OpPrereqError("Instance %s is already on the node %s" % + (instance.name, target_node), + errors.ECODE_STATE) + + bep = self.cfg.GetClusterInfo().FillBE(instance) + + for idx, dsk in enumerate(instance.disks): + if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE): + raise errors.OpPrereqError("Instance disk %d has a complex layout," + " cannot copy", errors.ECODE_STATE) + + _CheckNodeOnline(self, target_node) + _CheckNodeNotDrained(self, target_node) + + if instance.admin_up: + # check memory requirements on the secondary node + _CheckNodeFreeMemory(self, target_node, "failing over instance %s" % + instance.name, bep[constants.BE_MEMORY], + instance.hypervisor) + else: + self.LogInfo("Not checking memory on the secondary node as" + " instance will not be started") + + # check bridge existance + _CheckInstanceBridgesExist(self, instance, node=target_node) + + def Exec(self, feedback_fn): + """Move an instance. + + The move is done by shutting it down on its present node, copying + the data over (slow) and starting it on the new node. + + """ + instance = self.instance + + source_node = instance.primary_node + target_node = self.target_node + + self.LogInfo("Shutting down instance %s on source node %s", + instance.name, source_node) + + result = self.rpc.call_instance_shutdown(source_node, instance, + self.shutdown_timeout) + msg = result.fail_msg + if msg: + if self.op.ignore_consistency: + self.proc.LogWarning("Could not shutdown instance %s on node %s." + " Proceeding anyway. Please make sure node" + " %s is down. 
Error details: %s", + instance.name, source_node, source_node, msg) + else: + raise errors.OpExecError("Could not shutdown instance %s on" + " node %s: %s" % + (instance.name, source_node, msg)) + + # create the target disks + try: + _CreateDisks(self, instance, target_node=target_node) + except errors.OpExecError: + self.LogWarning("Device creation failed, reverting...") + try: + _RemoveDisks(self, instance, target_node=target_node) + finally: + self.cfg.ReleaseDRBDMinors(instance.name) + raise + + cluster_name = self.cfg.GetClusterInfo().cluster_name + + errs = [] + # activate, get path, copy the data over + for idx, disk in enumerate(instance.disks): + self.LogInfo("Copying data for disk %d", idx) + result = self.rpc.call_blockdev_assemble(target_node, disk, + instance.name, True) + if result.fail_msg: + self.LogWarning("Can't assemble newly created disk %d: %s", + idx, result.fail_msg) + errs.append(result.fail_msg) + break + dev_path = result.payload + result = self.rpc.call_blockdev_export(source_node, disk, + target_node, dev_path, + cluster_name) + if result.fail_msg: + self.LogWarning("Can't copy data over for disk %d: %s", + idx, result.fail_msg) + errs.append(result.fail_msg) + break + + if errs: + self.LogWarning("Some disks failed to copy, aborting") + try: + _RemoveDisks(self, instance, target_node=target_node) + finally: + self.cfg.ReleaseDRBDMinors(instance.name) + raise errors.OpExecError("Errors during disk copy: %s" % + (",".join(errs),)) + + instance.primary_node = target_node + self.cfg.Update(instance, feedback_fn) + + self.LogInfo("Removing the disks on the original node") + _RemoveDisks(self, instance, target_node=source_node) + + # Only start the instance if it's marked as up + if instance.admin_up: + self.LogInfo("Starting instance %s on node %s", + instance.name, target_node) + + disks_ok, _ = _AssembleInstanceDisks(self, instance, + ignore_secondaries=True) + if not disks_ok: + _ShutdownInstanceDisks(self, instance) + raise errors.OpExecError("Can't activate the instance's disks") + + result = self.rpc.call_instance_start(target_node, instance, None, None) + msg = result.fail_msg + if msg: + _ShutdownInstanceDisks(self, instance) + raise errors.OpExecError("Could not start instance %s on node %s: %s" % + (instance.name, target_node, msg)) + + class LUMigrateNode(LogicalUnit): """Migrate all instances from a node. 
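Editor's note: LUMoveInstance.Exec above copies each disk by first assembling it on the target node and then streaming it from the source via blockdev_export, stopping at the first failure. A minimal control-flow sketch of that loop; CopyInstanceDisks, assemble_fn and export_fn are hypothetical stand-ins for the LU method and its RPC calls:

def CopyInstanceDisks(disks, assemble_fn, export_fn, log_fn):
  """Copy disks one by one, returning a list of error messages (empty = ok)."""
  errs = []
  for idx, disk in enumerate(disks):
    log_fn("Copying data for disk %d" % idx)
    ok, payload = assemble_fn(disk)        # activate the disk on the target
    if not ok:
      errs.append(payload)                 # payload is the error message here
      break                                # stop at the first failure
    ok, msg = export_fn(disk, payload)     # payload is the device path here
    if not ok:
      errs.append(msg)
      break
  return errs

errs = CopyInstanceDisks(
    ["disk/0", "disk/1"],
    assemble_fn=lambda disk: (True, "/dev/xenvg/" + disk.replace("/", "_")),
    export_fn=lambda disk, dev_path: (True, None),
    log_fn=lambda msg: None)
print(errs)  # -> [] when both disks copied cleanly

As in the hunk above, a non-empty error list means the caller removes the freshly created target disks and raises OpExecError instead of leaving half-copied storage behind.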
@@ -4367,7 +4878,8 @@ class LUMigrateNode(LogicalUnit): def ExpandNames(self): self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name) if self.op.node_name is None: - raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name) + raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name, + errors.ECODE_NOENT) self.needed_locks = { locking.LEVEL_NODE: [self.op.node_name], @@ -4431,11 +4943,11 @@ class TLMigrateInstance(Tasklet): self.cfg.ExpandInstanceName(self.instance_name)) if instance is None: raise errors.OpPrereqError("Instance '%s' not known" % - self.instance_name) + self.instance_name, errors.ECODE_NOENT) if instance.disk_template != constants.DT_DRBD8: raise errors.OpPrereqError("Instance's disk layout is not" - " drbd8, cannot migrate.") + " drbd8, cannot migrate.", errors.ECODE_STATE) secondary_nodes = instance.secondary_nodes if not secondary_nodes: @@ -4457,7 +4969,8 @@ class TLMigrateInstance(Tasklet): _CheckNodeNotDrained(self, target_node) result = self.rpc.call_instance_migratable(instance.primary_node, instance) - result.Raise("Can't migrate, please use failover", prereq=True) + result.Raise("Can't migrate, please use failover", + prereq=True, ecode=errors.ECODE_STATE) self.instance = instance @@ -4569,7 +5082,7 @@ class TLMigrateInstance(Tasklet): self.feedback_fn("* instance running on secondary node (%s)," " updating config" % target_node) instance.primary_node = target_node - self.cfg.Update(instance) + self.cfg.Update(instance, self.feedback_fn) demoted_node = source_node else: self.feedback_fn("* instance confirmed to be running on its" @@ -4619,8 +5132,8 @@ class TLMigrateInstance(Tasklet): False) abort_msg = abort_result.fail_msg if abort_msg: - logging.error("Aborting migration failed on target node %s: %s" % - (target_node, abort_msg)) + logging.error("Aborting migration failed on target node %s: %s", + target_node, abort_msg) # Don't raise an exception here, as we stil have to try to revert the # disk status, even if this step failed. 
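Editor's note: nearly every hunk in this patch appends an error-classification code to OpPrereqError, and RPC results are now raised with prereq=True plus an ecode (as in the migration prerequisite check just above). A self-contained sketch of that convention; the classes and ECODE_* values below are stand-ins, not the real ganeti.errors/rpc definitions:

ECODE_INVAL = "wrong_input"      # illustrative value only
ECODE_STATE = "wrong_state"      # illustrative value only

class OpExecError(Exception):
  pass

class OpPrereqError(Exception):
  """Prerequisite failure carrying a machine-readable error class."""
  def __init__(self, msg, ecode=None):
    Exception.__init__(self, msg, ecode)
    self.ecode = ecode

class FakeRpcResult(object):
  """Stand-in for an RPC result object with a Raise() helper."""
  def __init__(self, fail_msg=None):
    self.fail_msg = fail_msg

  def Raise(self, msg, prereq=False, ecode=None):
    """Convert an RPC failure into the appropriate opcode exception."""
    if not self.fail_msg:
      return
    full_msg = "%s: %s" % (msg, self.fail_msg)
    if prereq:
      raise OpPrereqError(full_msg, ecode)   # prerequisite phase
    raise OpExecError(full_msg)              # execution phase

try:
  FakeRpcResult("DRBD device busy").Raise("Can't migrate, please use failover",
                                          prereq=True, ecode=ECODE_STATE)
except OpPrereqError as err:
  print("%s (error class: %s)" % (err.args[0], err.ecode))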
@@ -4674,6 +5187,7 @@ class TLMigrateInstance(Tasklet): if msg: logging.error("Instance pre-migration failed, trying to revert" " disk status: %s", msg) + self.feedback_fn("Pre-migration failed, aborting") self._AbortMigration() self._RevertDiskStatus() raise errors.OpExecError("Could not pre-migrate instance %s: %s" % @@ -4688,6 +5202,7 @@ class TLMigrateInstance(Tasklet): if msg: logging.error("Instance migration failed, trying to revert" " disk status: %s", msg) + self.feedback_fn("Migration failed, aborting") self._AbortMigration() self._RevertDiskStatus() raise errors.OpExecError("Could not migrate instance %s: %s" % @@ -4696,7 +5211,7 @@ class TLMigrateInstance(Tasklet): instance.primary_node = target_node # distribute new instance config to the other nodes - self.cfg.Update(instance) + self.cfg.Update(instance, self.feedback_fn) result = self.rpc.call_finalize_migration(target_node, instance, @@ -4705,7 +5220,7 @@ class TLMigrateInstance(Tasklet): msg = result.fail_msg if msg: logging.error("Instance migration succeeded, but finalization failed:" - " %s" % msg) + " %s", msg) raise errors.OpExecError("Could not finalize instance migration: %s" % msg) @@ -4819,7 +5334,7 @@ def _GenerateUniqueNames(lu, exts): """ results = [] for val in exts: - new_id = lu.cfg.GenerateUniqueID() + new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId()) results.append("%s%s" % (new_id, val)) return results @@ -4831,7 +5346,7 @@ def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name, """ port = lu.cfg.AllocatePort() vgname = lu.cfg.GetVGName() - shared_secret = lu.cfg.GenerateDRBDSecret() + shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId()) dev_data = objects.Disk(dev_type=constants.LD_LV, size=size, logical_id=(vgname, names[0])) dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128, @@ -4918,7 +5433,7 @@ def _GetInstanceInfoText(instance): return "originstname+%s" % instance.name -def _CreateDisks(lu, instance, to_skip=None): +def _CreateDisks(lu, instance, to_skip=None, target_node=None): """Create all disks for an instance. This abstracts away some work from AddInstance. @@ -4929,19 +5444,26 @@ def _CreateDisks(lu, instance, to_skip=None): @param instance: the instance whose disks we should create @type to_skip: list @param to_skip: list of indices to skip + @type target_node: string + @param target_node: if passed, overrides the target node for creation @rtype: boolean @return: the success of the creation """ info = _GetInstanceInfoText(instance) - pnode = instance.primary_node + if target_node is None: + pnode = instance.primary_node + all_nodes = instance.all_nodes + else: + pnode = target_node + all_nodes = [pnode] if instance.disk_template == constants.DT_FILE: file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir) result.Raise("Failed to create directory '%s' on" - " node %s: %s" % (file_storage_dir, pnode)) + " node %s" % (file_storage_dir, pnode)) # Note: this needs to be kept in sync with adding of disks in # LUSetInstanceParams @@ -4951,12 +5473,12 @@ def _CreateDisks(lu, instance, to_skip=None): logging.info("Creating volume %s for instance %s", device.iv_name, instance.name) #HARDCODE - for node in instance.all_nodes: + for node in all_nodes: f_create = node == pnode _CreateBlockDev(lu, node, instance, device, f_create, info, f_create) -def _RemoveDisks(lu, instance): +def _RemoveDisks(lu, instance, target_node=None): """Remove all disks for an instance. 
This abstracts away some work from `AddInstance()` and @@ -4968,6 +5490,8 @@ def _RemoveDisks(lu, instance): @param lu: the logical unit on whose behalf we execute @type instance: L{objects.Instance} @param instance: the instance whose disks we should remove + @type target_node: string + @param target_node: used to override the node on which to remove the disks @rtype: boolean @return: the success of the removal @@ -4976,7 +5500,11 @@ def _RemoveDisks(lu, instance): all_result = True for device in instance.disks: - for node, disk in device.ComputeNodeTree(instance.primary_node): + if target_node: + edata = [(target_node, device)] + else: + edata = device.ComputeNodeTree(instance.primary_node) + for node, disk in edata: lu.cfg.SetDiskID(disk, node) msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg if msg: @@ -4986,12 +5514,14 @@ def _RemoveDisks(lu, instance): if instance.disk_template == constants.DT_FILE: file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) - result = lu.rpc.call_file_storage_dir_remove(instance.primary_node, - file_storage_dir) - msg = result.fail_msg - if msg: + if target_node: + tgt = target_node + else: + tgt = instance.primary_node + result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir) + if result.fail_msg: lu.LogWarning("Could not remove directory '%s' on node %s: %s", - file_storage_dir, instance.primary_node, msg) + file_storage_dir, instance.primary_node, result.fail_msg) all_result = False return all_result @@ -5062,7 +5592,7 @@ class LUCreateInstance(LogicalUnit): """ node_full = self.cfg.ExpandNodeName(node) if node_full is None: - raise errors.OpPrereqError("Unknown node %s" % node) + raise errors.OpPrereqError("Unknown node %s" % node, errors.ECODE_NOENT) return node_full def ExpandNames(self): @@ -5084,11 +5614,12 @@ class LUCreateInstance(LogicalUnit): if self.op.mode not in (constants.INSTANCE_CREATE, constants.INSTANCE_IMPORT): raise errors.OpPrereqError("Invalid instance creation mode '%s'" % - self.op.mode) + self.op.mode, errors.ECODE_INVAL) # disk template and mirror node verification if self.op.disk_template not in constants.DISK_TEMPLATES: - raise errors.OpPrereqError("Invalid disk template name") + raise errors.OpPrereqError("Invalid disk template name", + errors.ECODE_INVAL) if self.op.hypervisor is None: self.op.hypervisor = self.cfg.GetHypervisorType() @@ -5098,7 +5629,8 @@ class LUCreateInstance(LogicalUnit): if self.op.hypervisor not in enabled_hvs: raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the" " cluster (%s)" % (self.op.hypervisor, - ",".join(enabled_hvs))) + ",".join(enabled_hvs)), + errors.ECODE_STATE) # check hypervisor parameter syntax (locally) utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES) @@ -5107,6 +5639,8 @@ class LUCreateInstance(LogicalUnit): hv_type = hypervisor.GetHypervisor(self.op.hypervisor) hv_type.CheckParameterSyntax(filled_hvp) self.hv_full = filled_hvp + # check that we don't specify global parameters on an instance + _CheckGlobalHvParams(self.op.hvparams) # fill and remember the beparams dict utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) @@ -5116,14 +5650,14 @@ class LUCreateInstance(LogicalUnit): #### instance parameters check # instance name verification - hostname1 = utils.HostInfo(self.op.instance_name) + hostname1 = utils.GetHostInfo(self.op.instance_name) self.op.instance_name = instance_name = hostname1.name # this is just a preventive check, but someone might still add this # instance in the meantime, and 
creation will fail at lock-add time if instance_name in self.cfg.GetInstanceList(): raise errors.OpPrereqError("Instance '%s' is already in the cluster" % - instance_name) + instance_name, errors.ECODE_EXISTS) self.add_locks[locking.LEVEL_INSTANCE] = instance_name @@ -5150,27 +5684,38 @@ class LUCreateInstance(LogicalUnit): else: if not utils.IsValidIP(ip): raise errors.OpPrereqError("Given IP address '%s' doesn't look" - " like a valid IP" % ip) + " like a valid IP" % ip, + errors.ECODE_INVAL) nic_ip = ip - # TODO: check the ip for uniqueness !! + # TODO: check the ip address for uniqueness if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip: - raise errors.OpPrereqError("Routed nic mode requires an ip address") + raise errors.OpPrereqError("Routed nic mode requires an ip address", + errors.ECODE_INVAL) # MAC address verification mac = nic.get("mac", constants.VALUE_AUTO) if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): if not utils.IsValidMac(mac.lower()): raise errors.OpPrereqError("Invalid MAC address specified: %s" % - mac) + mac, errors.ECODE_INVAL) + else: + try: + self.cfg.ReserveMAC(mac, self.proc.GetECId()) + except errors.ReservationError: + raise errors.OpPrereqError("MAC address %s already in use" + " in cluster" % mac, + errors.ECODE_NOTUNIQUE) + # bridge verification bridge = nic.get("bridge", None) link = nic.get("link", None) if bridge and link: raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" - " at the same time") + " at the same time", errors.ECODE_INVAL) elif bridge and nic_mode == constants.NIC_MODE_ROUTED: - raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic") + raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic", + errors.ECODE_INVAL) elif bridge: link = bridge @@ -5191,14 +5736,15 @@ class LUCreateInstance(LogicalUnit): mode = disk.get("mode", constants.DISK_RDWR) if mode not in constants.DISK_ACCESS_SET: raise errors.OpPrereqError("Invalid disk access mode '%s'" % - mode) + mode, errors.ECODE_INVAL) size = disk.get("size", None) if size is None: - raise errors.OpPrereqError("Missing disk size") + raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL) try: size = int(size) except ValueError: - raise errors.OpPrereqError("Invalid disk size '%s'" % size) + raise errors.OpPrereqError("Invalid disk size '%s'" % size, + errors.ECODE_INVAL) self.disks.append({"size": size, "mode": mode}) # used in CheckPrereq for ip ping check @@ -5208,15 +5754,17 @@ class LUCreateInstance(LogicalUnit): if (self.op.file_driver and not self.op.file_driver in constants.FILE_DRIVER): raise errors.OpPrereqError("Invalid file driver name '%s'" % - self.op.file_driver) + self.op.file_driver, errors.ECODE_INVAL) if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir): - raise errors.OpPrereqError("File storage directory path not absolute") + raise errors.OpPrereqError("File storage directory path not absolute", + errors.ECODE_INVAL) ### Node/iallocator related checks if [self.op.iallocator, self.op.pnode].count(None) != 1: raise errors.OpPrereqError("One and only one of iallocator and primary" - " node must be given") + " node must be given", + errors.ECODE_INVAL) if self.op.iallocator: self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET @@ -5241,7 +5789,8 @@ class LUCreateInstance(LogicalUnit): self.op.src_node = None if os.path.isabs(src_path): raise errors.OpPrereqError("Importing an instance from an absolute" - " path requires a source node option.") + " path requires a source node option.", + 
errors.ECODE_INVAL) else: self.op.src_node = src_node = self._ExpandNode(src_node) if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET: @@ -5250,9 +5799,16 @@ class LUCreateInstance(LogicalUnit): self.op.src_path = src_path = \ os.path.join(constants.EXPORT_DIR, src_path) + # On import force_variant must be True, because if we forced it at + # initial install, our only chance when importing it back is that it + # works again! + self.op.force_variant = True + else: # INSTANCE_CREATE if getattr(self.op, "os_type", None) is None: - raise errors.OpPrereqError("No guest OS specified") + raise errors.OpPrereqError("No guest OS specified", + errors.ECODE_INVAL) + self.op.force_variant = getattr(self.op, "force_variant", False) def _RunAllocator(self): """Run the allocator based on input opcode. @@ -5276,13 +5832,14 @@ class LUCreateInstance(LogicalUnit): if not ial.success: raise errors.OpPrereqError("Can't compute nodes using" - " iallocator '%s': %s" % (self.op.iallocator, - ial.info)) + " iallocator '%s': %s" % + (self.op.iallocator, ial.info), + errors.ECODE_NORES) if len(ial.nodes) != ial.required_nodes: raise errors.OpPrereqError("iallocator '%s' returned invalid number" " of nodes (%s), required %s" % (self.op.iallocator, len(ial.nodes), - ial.required_nodes)) + ial.required_nodes), errors.ECODE_FAULT) self.op.pnode = ial.nodes[0] self.LogInfo("Selected nodes for instance %s via iallocator %s: %s", self.op.instance_name, self.op.iallocator, @@ -5332,7 +5889,7 @@ class LUCreateInstance(LogicalUnit): if (not self.cfg.GetVGName() and self.op.disk_template not in constants.DTS_NOT_LVM): raise errors.OpPrereqError("Cluster does not support lvm-based" - " instances") + " instances", errors.ECODE_STATE) if self.op.mode == constants.INSTANCE_IMPORT: src_node = self.op.src_node @@ -5353,7 +5910,7 @@ class LUCreateInstance(LogicalUnit): break if not found: raise errors.OpPrereqError("No export found for relative path %s" % - src_path) + src_path, errors.ECODE_INVAL) _CheckNodeOnline(self, src_node) result = self.rpc.call_export_info(src_node, src_path) @@ -5361,12 +5918,14 @@ class LUCreateInstance(LogicalUnit): export_info = objects.SerializableConfigParser.Loads(str(result.payload)) if not export_info.has_section(constants.INISECT_EXP): - raise errors.ProgrammerError("Corrupted export config") + raise errors.ProgrammerError("Corrupted export config", + errors.ECODE_ENVIRON) ei_version = export_info.get(constants.INISECT_EXP, 'version') if (int(ei_version) != constants.EXPORT_VERSION): raise errors.OpPrereqError("Wrong export version %s (wanted %d)" % - (ei_version, constants.EXPORT_VERSION)) + (ei_version, constants.EXPORT_VERSION), + errors.ECODE_ENVIRON) # Check that the new instance doesn't have less disks than the export instance_disks = len(self.disks) @@ -5374,7 +5933,8 @@ class LUCreateInstance(LogicalUnit): if instance_disks < export_disks: raise errors.OpPrereqError("Not enough disks to import." 
" (instance: %d, export: %d)" % - (instance_disks, export_disks)) + (instance_disks, export_disks), + errors.ECODE_INVAL) self.op.os_type = export_info.get(constants.INISECT_EXP, 'os') disk_images = [] @@ -5403,12 +5963,14 @@ class LUCreateInstance(LogicalUnit): # ip ping checks (we use the same ip that was resolved in ExpandNames) if self.op.start and not self.op.ip_check: raise errors.OpPrereqError("Cannot ignore IP address conflicts when" - " adding an instance in start mode") + " adding an instance in start mode", + errors.ECODE_INVAL) if self.op.ip_check: if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT): raise errors.OpPrereqError("IP %s of instance %s already in use" % - (self.check_ip, self.op.instance_name)) + (self.check_ip, self.op.instance_name), + errors.ECODE_NOTUNIQUE) #### mac address generation # By generating here the mac address both the allocator and the hooks get @@ -5420,7 +5982,7 @@ class LUCreateInstance(LogicalUnit): # creation job will fail. for nic in self.nics: if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): - nic.mac = self.cfg.GenerateMAC() + nic.mac = self.cfg.GenerateMAC(self.proc.GetECId()) #### allocator run @@ -5435,10 +5997,10 @@ class LUCreateInstance(LogicalUnit): "Cannot retrieve locked node %s" % self.op.pnode if pnode.offline: raise errors.OpPrereqError("Cannot use offline primary node '%s'" % - pnode.name) + pnode.name, errors.ECODE_STATE) if pnode.drained: raise errors.OpPrereqError("Cannot use drained primary node '%s'" % - pnode.name) + pnode.name, errors.ECODE_STATE) self.secondaries = [] @@ -5446,10 +6008,10 @@ class LUCreateInstance(LogicalUnit): if self.op.disk_template in constants.DTS_NET_MIRROR: if self.op.snode is None: raise errors.OpPrereqError("The networked disk templates need" - " a mirror node") + " a mirror node", errors.ECODE_INVAL) if self.op.snode == pnode.name: - raise errors.OpPrereqError("The secondary node cannot be" - " the primary node.") + raise errors.OpPrereqError("The secondary node cannot be the" + " primary node.", errors.ECODE_INVAL) _CheckNodeOnline(self, self.op.snode) _CheckNodeNotDrained(self, self.op.snode) self.secondaries.append(self.op.snode) @@ -5470,18 +6032,22 @@ class LUCreateInstance(LogicalUnit): vg_free = info.get('vg_free', None) if not isinstance(vg_free, int): raise errors.OpPrereqError("Can't compute free disk space on" - " node %s" % node) + " node %s" % node, errors.ECODE_ENVIRON) if req_size > vg_free: raise errors.OpPrereqError("Not enough disk space on target node %s." 
" %d MB available, %d MB required" % - (node, vg_free, req_size)) + (node, vg_free, req_size), + errors.ECODE_NORES) _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams) # os verification result = self.rpc.call_os_get(pnode.name, self.op.os_type) result.Raise("OS '%s' not in supported os list for primary node %s" % - (self.op.os_type, pnode.name), prereq=True) + (self.op.os_type, pnode.name), + prereq=True, ecode=errors.ECODE_INVAL) + if not self.op.force_variant: + _CheckOSVariant(result.payload, self.op.os_type) _CheckNicsBridgesExist(self, self.nics, self.pnode.name) @@ -5555,7 +6121,8 @@ class LUCreateInstance(LogicalUnit): feedback_fn("adding instance %s to cluster config" % instance) - self.cfg.AddInstance(iobj) + self.cfg.AddInstance(iobj, self.proc.GetECId()) + # Declare that we don't want to remove the instance lock anymore, as we've # added the instance to the config del self.remove_locks[locking.LEVEL_INSTANCE] @@ -5617,7 +6184,7 @@ class LUCreateInstance(LogicalUnit): if self.op.start: iobj.admin_up = True - self.cfg.Update(iobj) + self.cfg.Update(iobj, feedback_fn) logging.info("Starting instance %s on node %s", instance, pnode_name) feedback_fn("* starting instance...") result = self.rpc.call_instance_start(pnode_name, iobj, None, None) @@ -5707,7 +6274,7 @@ class LUReplaceDisks(LogicalUnit): remote_node = self.cfg.ExpandNodeName(self.op.remote_node) if remote_node is None: raise errors.OpPrereqError("Node '%s' not known" % - self.op.remote_node) + self.op.remote_node, errors.ECODE_NOENT) self.op.remote_node = remote_node @@ -5779,7 +6346,8 @@ class LUEvacuateNode(LogicalUnit): def ExpandNames(self): self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name) if self.op.node_name is None: - raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name) + raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name, + errors.ECODE_NOENT) self.needed_locks = {} @@ -5791,7 +6359,7 @@ class LUEvacuateNode(LogicalUnit): remote_node = self.cfg.ExpandNodeName(self.op.remote_node) if remote_node is None: raise errors.OpPrereqError("Node '%s' not known" % - self.op.remote_node) + self.op.remote_node, errors.ECODE_NOENT) self.op.remote_node = remote_node @@ -5803,7 +6371,7 @@ class LUEvacuateNode(LogicalUnit): self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND else: - raise errors.OpPrereqError("Invalid parameters") + raise errors.OpPrereqError("Invalid parameters", errors.ECODE_INVAL) # Create tasklets for replacing disks for all secondary instances on this # node @@ -5888,17 +6456,17 @@ class TLReplaceDisks(Tasklet): if remote_node is None and iallocator is None: raise errors.OpPrereqError("When changing the secondary either an" " iallocator script must be used or the" - " new node given") + " new node given", errors.ECODE_INVAL) if remote_node is not None and iallocator is not None: raise errors.OpPrereqError("Give either the iallocator or the new" - " secondary, not both") + " secondary, not both", errors.ECODE_INVAL) elif remote_node is not None or iallocator is not None: # Not replacing the secondary raise errors.OpPrereqError("The iallocator and new node options can" " only be used when changing the" - " secondary node") + " secondary node", errors.ECODE_INVAL) @staticmethod def _RunAllocator(lu, iallocator_name, instance_name, relocate_from): @@ -5914,12 +6482,14 @@ class TLReplaceDisks(Tasklet): if not ial.success: raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':" - " %s" % (iallocator_name, ial.info)) + 
" %s" % (iallocator_name, ial.info), + errors.ECODE_NORES) if len(ial.nodes) != ial.required_nodes: raise errors.OpPrereqError("iallocator '%s' returned invalid number" " of nodes (%s), required %s" % - (len(ial.nodes), ial.required_nodes)) + (len(ial.nodes), ial.required_nodes), + errors.ECODE_FAULT) remote_node_name = ial.nodes[0] @@ -5938,26 +6508,27 @@ class TLReplaceDisks(Tasklet): This checks that the instance is in the cluster. """ - self.instance = self.cfg.GetInstanceInfo(self.instance_name) - assert self.instance is not None, \ + self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name) + assert instance is not None, \ "Cannot retrieve locked instance %s" % self.instance_name - if self.instance.disk_template != constants.DT_DRBD8: + if instance.disk_template != constants.DT_DRBD8: raise errors.OpPrereqError("Can only run replace disks for DRBD8-based" - " instances") + " instances", errors.ECODE_INVAL) - if len(self.instance.secondary_nodes) != 1: + if len(instance.secondary_nodes) != 1: raise errors.OpPrereqError("The instance has a strange layout," " expected one secondary but found %d" % - len(self.instance.secondary_nodes)) + len(instance.secondary_nodes), + errors.ECODE_FAULT) - secondary_node = self.instance.secondary_nodes[0] + secondary_node = instance.secondary_nodes[0] if self.iallocator_name is None: remote_node = self.remote_node else: remote_node = self._RunAllocator(self.lu, self.iallocator_name, - self.instance.name, secondary_node) + instance.name, instance.secondary_nodes) if remote_node is not None: self.remote_node_info = self.cfg.GetNodeInfo(remote_node) @@ -5968,34 +6539,37 @@ class TLReplaceDisks(Tasklet): if remote_node == self.instance.primary_node: raise errors.OpPrereqError("The specified node is the primary node of" - " the instance.") + " the instance.", errors.ECODE_INVAL) if remote_node == secondary_node: raise errors.OpPrereqError("The specified node is already the" - " secondary node of the instance.") + " secondary node of the instance.", + errors.ECODE_INVAL) if self.disks and self.mode in (constants.REPLACE_DISK_AUTO, constants.REPLACE_DISK_CHG): - raise errors.OpPrereqError("Cannot specify disks to be replaced") + raise errors.OpPrereqError("Cannot specify disks to be replaced", + errors.ECODE_INVAL) if self.mode == constants.REPLACE_DISK_AUTO: - faulty_primary = self._FindFaultyDisks(self.instance.primary_node) + faulty_primary = self._FindFaultyDisks(instance.primary_node) faulty_secondary = self._FindFaultyDisks(secondary_node) if faulty_primary and faulty_secondary: raise errors.OpPrereqError("Instance %s has faulty disks on more than" " one node and can not be repaired" - " automatically" % self.instance_name) + " automatically" % self.instance_name, + errors.ECODE_STATE) if faulty_primary: self.disks = faulty_primary - self.target_node = self.instance.primary_node + self.target_node = instance.primary_node self.other_node = secondary_node check_nodes = [self.target_node, self.other_node] elif faulty_secondary: self.disks = faulty_secondary self.target_node = secondary_node - self.other_node = self.instance.primary_node + self.other_node = instance.primary_node check_nodes = [self.target_node, self.other_node] else: self.disks = [] @@ -6004,18 +6578,18 @@ class TLReplaceDisks(Tasklet): else: # Non-automatic modes if self.mode == constants.REPLACE_DISK_PRI: - self.target_node = self.instance.primary_node + self.target_node = instance.primary_node self.other_node = secondary_node check_nodes = [self.target_node, 
self.other_node] elif self.mode == constants.REPLACE_DISK_SEC: self.target_node = secondary_node - self.other_node = self.instance.primary_node + self.other_node = instance.primary_node check_nodes = [self.target_node, self.other_node] elif self.mode == constants.REPLACE_DISK_CHG: self.new_node = remote_node - self.other_node = self.instance.primary_node + self.other_node = instance.primary_node self.target_node = secondary_node check_nodes = [self.new_node, self.other_node] @@ -6034,7 +6608,7 @@ class TLReplaceDisks(Tasklet): # Check whether disks are valid for disk_idx in self.disks: - self.instance.FindDisk(disk_idx) + instance.FindDisk(disk_idx) # Get secondary node IP addresses node_2nd_ip = {} @@ -6067,12 +6641,15 @@ class TLReplaceDisks(Tasklet): try: # Should we replace the secondary node? if self.new_node is not None: - return self._ExecDrbd8Secondary() + fn = self._ExecDrbd8Secondary else: - return self._ExecDrbd8DiskOnly() + fn = self._ExecDrbd8DiskOnly + + return fn(feedback_fn) finally: - # Deactivate the instance disks if we're replacing them on a down instance + # Deactivate the instance disks if we're replacing them on a + # down instance if activate_disks: _SafeShutdownInstanceDisks(self.lu, self.instance) @@ -6185,7 +6762,7 @@ class TLReplaceDisks(Tasklet): self.lu.LogWarning("Can't remove old LV: %s" % msg, hint="remove unused LVs manually") - def _ExecDrbd8DiskOnly(self): + def _ExecDrbd8DiskOnly(self, feedback_fn): """Replace a disk on the primary or secondary for DRBD 8. The algorithm for replace is quite complicated: @@ -6229,7 +6806,8 @@ class TLReplaceDisks(Tasklet): for dev, old_lvs, new_lvs in iv_names.itervalues(): self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name) - result = self.rpc.call_blockdev_removechildren(self.target_node, dev, old_lvs) + result = self.rpc.call_blockdev_removechildren(self.target_node, dev, + old_lvs) result.Raise("Can't detach drbd from local storage on node" " %s for device %s" % (self.target_node, dev.iv_name)) #dev.children = [] @@ -6255,14 +6833,16 @@ class TLReplaceDisks(Tasklet): rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix))) self.lu.LogInfo("Renaming the old LVs on the target node") - result = self.rpc.call_blockdev_rename(self.target_node, rename_old_to_new) + result = self.rpc.call_blockdev_rename(self.target_node, + rename_old_to_new) result.Raise("Can't rename old LVs on node %s" % self.target_node) # Now we rename the new LVs to the old LVs self.lu.LogInfo("Renaming the new LVs on the target node") rename_new_to_old = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)] - result = self.rpc.call_blockdev_rename(self.target_node, rename_new_to_old) + result = self.rpc.call_blockdev_rename(self.target_node, + rename_new_to_old) result.Raise("Can't rename new LVs on node %s" % self.target_node) for old, new in zip(old_lvs, new_lvs): @@ -6275,11 +6855,13 @@ class TLReplaceDisks(Tasklet): # Now that the new lvs have the old name, we can add them to the device self.lu.LogInfo("Adding new mirror component on %s" % self.target_node) - result = self.rpc.call_blockdev_addchildren(self.target_node, dev, new_lvs) + result = self.rpc.call_blockdev_addchildren(self.target_node, dev, + new_lvs) msg = result.fail_msg if msg: for new_lv in new_lvs: - msg2 = self.rpc.call_blockdev_remove(self.target_node, new_lv).fail_msg + msg2 = self.rpc.call_blockdev_remove(self.target_node, + new_lv).fail_msg if msg2: self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2, hint=("cleanup manually the 
unused logical" @@ -6288,13 +6870,13 @@ class TLReplaceDisks(Tasklet): dev.children = new_lvs - self.cfg.Update(self.instance) + self.cfg.Update(self.instance, feedback_fn) # Wait for sync # This can fail as the old devices are degraded and _WaitForSync # does a combined result over all disks, so we don't check its return value self.lu.LogStep(5, steps_total, "Sync devices") - _WaitForSync(self.lu, self.instance, unlock=True) + _WaitForSync(self.lu, self.instance) # Check all devices manually self._CheckDevices(self.instance.primary_node, iv_names) @@ -6303,7 +6885,7 @@ class TLReplaceDisks(Tasklet): self.lu.LogStep(6, steps_total, "Removing old storage") self._RemoveOldStorage(self.target_node, iv_names) - def _ExecDrbd8Secondary(self): + def _ExecDrbd8Secondary(self, feedback_fn): """Replace the secondary node for DRBD 8. The algorithm for replace is quite complicated: @@ -6347,13 +6929,15 @@ class TLReplaceDisks(Tasklet): # after this, we must manually remove the drbd minors on both the # error and the success paths self.lu.LogStep(4, steps_total, "Changing drbd configuration") - minors = self.cfg.AllocateDRBDMinor([self.new_node for dev in self.instance.disks], + minors = self.cfg.AllocateDRBDMinor([self.new_node + for dev in self.instance.disks], self.instance.name) - logging.debug("Allocated minors %r" % (minors,)) + logging.debug("Allocated minors %r", minors) iv_names = {} for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)): - self.lu.LogInfo("activating a new drbd on %s for disk/%d" % (self.new_node, idx)) + self.lu.LogInfo("activating a new drbd on %s for disk/%d" % + (self.new_node, idx)) # create new devices on new_node; note that we create two IDs: # one without port, so the drbd will be activated without # networking information on the new node at this stage, and one @@ -6364,8 +6948,10 @@ class TLReplaceDisks(Tasklet): else: p_minor = o_minor2 - new_alone_id = (self.instance.primary_node, self.new_node, None, p_minor, new_minor, o_secret) - new_net_id = (self.instance.primary_node, self.new_node, o_port, p_minor, new_minor, o_secret) + new_alone_id = (self.instance.primary_node, self.new_node, None, + p_minor, new_minor, o_secret) + new_net_id = (self.instance.primary_node, self.new_node, o_port, + p_minor, new_minor, o_secret) iv_names[idx] = (dev, dev.children, new_net_id) logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor, @@ -6393,8 +6979,10 @@ class TLReplaceDisks(Tasklet): " soon as possible")) self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)") - result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node], self.node_secondary_ip, - self.instance.disks)[self.instance.primary_node] + result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node], + self.node_secondary_ip, + self.instance.disks)\ + [self.instance.primary_node] msg = result.fail_msg if msg: @@ -6410,18 +6998,22 @@ class TLReplaceDisks(Tasklet): dev.logical_id = new_logical_id self.cfg.SetDiskID(dev, self.instance.primary_node) - self.cfg.Update(self.instance) + self.cfg.Update(self.instance, feedback_fn) # and now perform the drbd attach self.lu.LogInfo("Attaching primary drbds to new secondary" " (standalone => connected)") - result = self.rpc.call_drbd_attach_net([self.instance.primary_node, self.new_node], self.node_secondary_ip, - self.instance.disks, self.instance.name, + result = self.rpc.call_drbd_attach_net([self.instance.primary_node, + self.new_node], + self.node_secondary_ip, + self.instance.disks, + 
self.instance.name, False) for to_node, to_result in result.items(): msg = to_result.fail_msg if msg: - self.lu.LogWarning("Can't attach drbd disks on node %s: %s", to_node, msg, + self.lu.LogWarning("Can't attach drbd disks on node %s: %s", + to_node, msg, hint=("please do a gnt-instance info to see the" " status of disks")) @@ -6429,7 +7021,7 @@ class TLReplaceDisks(Tasklet): # This can fail as the old devices are degraded and _WaitForSync # does a combined result over all disks, so we don't check its return value self.lu.LogStep(5, steps_total, "Sync devices") - _WaitForSync(self.lu, self.instance, unlock=True) + _WaitForSync(self.lu, self.instance) # Check all devices manually self._CheckDevices(self.instance.primary_node, iv_names) @@ -6439,6 +7031,74 @@ class TLReplaceDisks(Tasklet): self._RemoveOldStorage(self.target_node, iv_names) +class LURepairNodeStorage(NoHooksLU): + """Repairs the volume group on a node. + + """ + _OP_REQP = ["node_name"] + REQ_BGL = False + + def CheckArguments(self): + node_name = self.cfg.ExpandNodeName(self.op.node_name) + if node_name is None: + raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name, + errors.ECODE_NOENT) + + self.op.node_name = node_name + + def ExpandNames(self): + self.needed_locks = { + locking.LEVEL_NODE: [self.op.node_name], + } + + def _CheckFaultyDisks(self, instance, node_name): + """Ensure faulty disks abort the opcode or at least warn.""" + try: + if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance, + node_name, True): + raise errors.OpPrereqError("Instance '%s' has faulty disks on" + " node '%s'" % (instance.name, node_name), + errors.ECODE_STATE) + except errors.OpPrereqError, err: + if self.op.ignore_consistency: + self.proc.LogWarning(str(err.args[0])) + else: + raise + + def CheckPrereq(self): + """Check prerequisites. + + """ + storage_type = self.op.storage_type + + if (constants.SO_FIX_CONSISTENCY not in + constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])): + raise errors.OpPrereqError("Storage units of type '%s' can not be" + " repaired" % storage_type, + errors.ECODE_INVAL) + + # Check whether any instance on this node has faulty disks + for inst in _GetNodeInstances(self.cfg, self.op.node_name): + if not inst.admin_up: + continue + check_nodes = set(inst.all_nodes) + check_nodes.discard(self.op.node_name) + for inst_node_name in check_nodes: + self._CheckFaultyDisks(inst, inst_node_name) + + def Exec(self, feedback_fn): + feedback_fn("Repairing storage unit '%s' on %s ..." % + (self.op.name, self.op.node_name)) + + st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) + result = self.rpc.call_storage_execute(self.op.node_name, + self.op.storage_type, st_args, + self.op.name, + constants.SO_FIX_CONSISTENCY) + result.Raise("Failed to repair storage unit '%s' on %s" % + (self.op.name, self.op.node_name)) + + class LUGrowDisk(LogicalUnit): """Grow a disk of an instance. 
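
The recurring change in the hunks above (and throughout this patch) is that every OpPrereqError now carries a second argument, an errors.ECODE_* constant, so callers can classify a failed prerequisite without parsing the message text. Below is a minimal, self-contained sketch of that two-argument pattern; the OpPrereqError class and the ECODE_* values here are simplified stand-ins for illustration only, not the real ganeti.errors definitions.

# Standalone sketch, not Ganeti code: illustrates the
# OpPrereqError(message, ecode) pattern applied by this patch.
ECODE_INVAL = "wrong_input"      # illustrative value: malformed request
ECODE_NOENT = "unknown_entity"   # illustrative value: entity not found
ECODE_STATE = "wrong_state"      # illustrative value: state forbids the op

class OpPrereqError(Exception):
  """Prerequisite failure carrying a machine-readable error class."""
  def __init__(self, msg, ecode=None):
    Exception.__init__(self, msg, ecode)
    self.ecode = ecode

def check_disk_template(template):
  # mirrors the shape of the checks in TLReplaceDisks.CheckPrereq
  if template is None:
    raise OpPrereqError("No disk template given", ECODE_INVAL)
  if template != "drbd":
    raise OpPrereqError("Can only run replace disks for DRBD8-based"
                        " instances", ECODE_INVAL)

try:
  check_disk_template("plain")
except OpPrereqError as err:
  # a CLI or RAPI client can now map err.ecode to an exit code or
  # HTTP status instead of matching on the message string
  print("%s (classified as %r)" % (err.args[0], err.ecode))
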
@@ -6492,7 +7152,7 @@ class LUGrowDisk(LogicalUnit): if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8): raise errors.OpPrereqError("Instance's disk layout does not support" - " growing.") + " growing.", errors.ECODE_INVAL) self.disk = instance.FindDisk(self.op.disk) @@ -6504,11 +7164,12 @@ class LUGrowDisk(LogicalUnit): vg_free = info.payload.get('vg_free', None) if not isinstance(vg_free, int): raise errors.OpPrereqError("Can't compute free disk space on" - " node %s" % node) + " node %s" % node, errors.ECODE_ENVIRON) if self.op.amount > vg_free: raise errors.OpPrereqError("Not enough disk space on target node %s:" " %d MiB available, %d MiB required" % - (node, vg_free, self.op.amount)) + (node, vg_free, self.op.amount), + errors.ECODE_NORES) def Exec(self, feedback_fn): """Execute disk grow. @@ -6521,7 +7182,7 @@ class LUGrowDisk(LogicalUnit): result = self.rpc.call_blockdev_grow(node, disk, self.op.amount) result.Raise("Grow request failed to node %s" % node) disk.RecordGrow(self.op.amount) - self.cfg.Update(instance) + self.cfg.Update(instance, feedback_fn) if self.op.wait_for_sync: disk_abort = not _WaitForSync(self, instance) if disk_abort: @@ -6541,14 +7202,16 @@ class LUQueryInstanceData(NoHooksLU): self.share_locks = dict.fromkeys(locking.LEVELS, 1) if not isinstance(self.op.instances, list): - raise errors.OpPrereqError("Invalid argument type 'instances'") + raise errors.OpPrereqError("Invalid argument type 'instances'", + errors.ECODE_INVAL) if self.op.instances: self.wanted_names = [] for name in self.op.instances: full_name = self.cfg.ExpandInstanceName(name) if full_name is None: - raise errors.OpPrereqError("Instance '%s' not known" % name) + raise errors.OpPrereqError("Instance '%s' not known" % name, + errors.ECODE_NOENT) self.wanted_names.append(full_name) self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names else: @@ -6579,7 +7242,7 @@ class LUQueryInstanceData(NoHooksLU): """Returns the status of a block device """ - if self.op.static: + if self.op.static or not node: return None self.cfg.SetDiskID(dev, node) @@ -6673,9 +7336,13 @@ class LUQueryInstanceData(NoHooksLU): "hypervisor": instance.hypervisor, "network_port": instance.network_port, "hv_instance": instance.hvparams, - "hv_actual": cluster.FillHV(instance), + "hv_actual": cluster.FillHV(instance, skip_globals=True), "be_instance": instance.beparams, "be_actual": cluster.FillBE(instance), + "serial_no": instance.serial_no, + "mtime": instance.mtime, + "ctime": instance.ctime, + "uuid": instance.uuid, } result[instance.name] = idict @@ -6704,7 +7371,10 @@ class LUSetInstanceParams(LogicalUnit): self.op.force = getattr(self.op, "force", False) if not (self.op.nics or self.op.disks or self.op.hvparams or self.op.beparams): - raise errors.OpPrereqError("No changes submitted") + raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL) + + if self.op.hvparams: + _CheckGlobalHvParams(self.op.hvparams) # Disk validation disk_addremove = 0 @@ -6716,33 +7386,35 @@ class LUSetInstanceParams(LogicalUnit): disk_addremove += 1 else: if not isinstance(disk_op, int): - raise errors.OpPrereqError("Invalid disk index") + raise errors.OpPrereqError("Invalid disk index", errors.ECODE_INVAL) if not isinstance(disk_dict, dict): msg = "Invalid disk value: expected dict, got '%s'" % disk_dict - raise errors.OpPrereqError(msg) + raise errors.OpPrereqError(msg, errors.ECODE_INVAL) if disk_op == constants.DDM_ADD: mode = disk_dict.setdefault('mode', constants.DISK_RDWR) if mode not in 
constants.DISK_ACCESS_SET: - raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode) + raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode, + errors.ECODE_INVAL) size = disk_dict.get('size', None) if size is None: - raise errors.OpPrereqError("Required disk parameter size missing") + raise errors.OpPrereqError("Required disk parameter size missing", + errors.ECODE_INVAL) try: size = int(size) except ValueError, err: raise errors.OpPrereqError("Invalid disk size parameter: %s" % - str(err)) + str(err), errors.ECODE_INVAL) disk_dict['size'] = size else: # modification of disk if 'size' in disk_dict: raise errors.OpPrereqError("Disk size change not possible, use" - " grow-disk") + " grow-disk", errors.ECODE_INVAL) if disk_addremove > 1: raise errors.OpPrereqError("Only one disk add or remove operation" - " supported at a time") + " supported at a time", errors.ECODE_INVAL) # NIC validation nic_addremove = 0 @@ -6754,10 +7426,10 @@ class LUSetInstanceParams(LogicalUnit): nic_addremove += 1 else: if not isinstance(nic_op, int): - raise errors.OpPrereqError("Invalid nic index") + raise errors.OpPrereqError("Invalid nic index", errors.ECODE_INVAL) if not isinstance(nic_dict, dict): msg = "Invalid nic value: expected dict, got '%s'" % nic_dict - raise errors.OpPrereqError(msg) + raise errors.OpPrereqError(msg, errors.ECODE_INVAL) # nic_dict should be a dict nic_ip = nic_dict.get('ip', None) @@ -6766,13 +7438,14 @@ class LUSetInstanceParams(LogicalUnit): nic_dict['ip'] = None else: if not utils.IsValidIP(nic_ip): - raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip) + raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip, + errors.ECODE_INVAL) nic_bridge = nic_dict.get('bridge', None) nic_link = nic_dict.get('link', None) if nic_bridge and nic_link: raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" - " at the same time") + " at the same time", errors.ECODE_INVAL) elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE: nic_dict['bridge'] = None elif nic_link and nic_link.lower() == constants.VALUE_NONE: @@ -6787,14 +7460,16 @@ class LUSetInstanceParams(LogicalUnit): nic_mac = nic_dict['mac'] if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): if not utils.IsValidMac(nic_mac): - raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac) + raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac, + errors.ECODE_INVAL) if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO: raise errors.OpPrereqError("'auto' is not a valid MAC address when" - " modifying an existing nic") + " modifying an existing nic", + errors.ECODE_INVAL) if nic_addremove > 1: raise errors.OpPrereqError("Only one NIC add or remove operation" - " supported at a time") + " supported at a time", errors.ECODE_INVAL) def ExpandNames(self): self._ExpandAndLockInstance() @@ -6967,7 +7642,8 @@ class LUSetInstanceParams(LogicalUnit): if miss_mem > 0: raise errors.OpPrereqError("This change will prevent the instance" " from starting, due to %d MB of memory" - " missing on its primary node" % miss_mem) + " missing on its primary node" % miss_mem, + errors.ECODE_NORES) if be_new[constants.BE_AUTO_BALANCE]: for node, nres in nodeinfo.items(): @@ -6990,14 +7666,20 @@ class LUSetInstanceParams(LogicalUnit): for nic_op, nic_dict in self.op.nics: if nic_op == constants.DDM_REMOVE: if not instance.nics: - raise errors.OpPrereqError("Instance has no NICs, cannot remove") + raise errors.OpPrereqError("Instance has no NICs, cannot remove", + 
errors.ECODE_INVAL) continue if nic_op != constants.DDM_ADD: # an existing nic + if not instance.nics: + raise errors.OpPrereqError("Invalid NIC index %s, instance has" + " no NICs" % nic_op, + errors.ECODE_INVAL) if nic_op < 0 or nic_op >= len(instance.nics): raise errors.OpPrereqError("Invalid NIC index %s, valid values" " are 0 to %d" % - (nic_op, len(instance.nics))) + (nic_op, len(instance.nics) - 1), + errors.ECODE_INVAL) old_nic_params = instance.nics[nic_op].nicparams old_nic_ip = instance.nics[nic_op].ip else: @@ -7028,7 +7710,7 @@ class LUSetInstanceParams(LogicalUnit): if self.force: self.warn.append(msg) else: - raise errors.OpPrereqError(msg) + raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON) if new_nic_mode == constants.NIC_MODE_ROUTED: if 'ip' in nic_dict: nic_ip = nic_dict['ip'] @@ -7036,49 +7718,57 @@ class LUSetInstanceParams(LogicalUnit): nic_ip = old_nic_ip if nic_ip is None: raise errors.OpPrereqError('Cannot set the nic ip to None' - ' on a routed nic') + ' on a routed nic', errors.ECODE_INVAL) if 'mac' in nic_dict: nic_mac = nic_dict['mac'] if nic_mac is None: - raise errors.OpPrereqError('Cannot set the nic mac to None') + raise errors.OpPrereqError('Cannot set the nic mac to None', + errors.ECODE_INVAL) elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): # otherwise generate the mac - nic_dict['mac'] = self.cfg.GenerateMAC() + nic_dict['mac'] = self.cfg.GenerateMAC(self.proc.GetECId()) else: # or validate/reserve the current one - if self.cfg.IsMacInUse(nic_mac): + try: + self.cfg.ReserveMAC(nic_mac, self.proc.GetECId()) + except errors.ReservationError: raise errors.OpPrereqError("MAC address %s already in use" - " in cluster" % nic_mac) + " in cluster" % nic_mac, + errors.ECODE_NOTUNIQUE) # DISK processing if self.op.disks and instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Disk operations not supported for" - " diskless instances") + " diskless instances", + errors.ECODE_INVAL) for disk_op, disk_dict in self.op.disks: if disk_op == constants.DDM_REMOVE: if len(instance.disks) == 1: raise errors.OpPrereqError("Cannot remove the last disk of" - " an instance") + " an instance", + errors.ECODE_INVAL) ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor]) ins_l = ins_l[pnode] msg = ins_l.fail_msg if msg: raise errors.OpPrereqError("Can't contact node %s: %s" % - (pnode, msg)) + (pnode, msg), errors.ECODE_ENVIRON) if instance.name in ins_l.payload: raise errors.OpPrereqError("Instance is running, can't remove" - " disks.") + " disks.", errors.ECODE_STATE) if (disk_op == constants.DDM_ADD and len(instance.nics) >= constants.MAX_DISKS): raise errors.OpPrereqError("Instance has too many disks (%d), cannot" - " add more" % constants.MAX_DISKS) + " add more" % constants.MAX_DISKS, + errors.ECODE_STATE) if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE): # an existing disk if disk_op < 0 or disk_op >= len(instance.disks): raise errors.OpPrereqError("Invalid disk index %s, valid values" " are 0 to %d" % - (disk_op, len(instance.disks))) + (disk_op, len(instance.disks)), + errors.ECODE_INVAL) return @@ -7170,8 +7860,8 @@ class LUSetInstanceParams(LogicalUnit): for key in 'mac', 'ip': if key in nic_dict: setattr(instance.nics[nic_op], key, nic_dict[key]) - if nic_op in self.nic_pnew: - instance.nics[nic_op].nicparams = self.nic_pnew[nic_op] + if nic_op in self.nic_pinst: + instance.nics[nic_op].nicparams = self.nic_pinst[nic_op] for key, val in nic_dict.iteritems(): result.append(("nic.%s/%d" % (key, 
nic_op), val)) @@ -7187,7 +7877,7 @@ class LUSetInstanceParams(LogicalUnit): for key, val in self.op.beparams.iteritems(): result.append(("be/%s" % key, val)) - self.cfg.Update(instance) + self.cfg.Update(instance, feedback_fn) return result @@ -7243,6 +7933,13 @@ class LUExportInstance(LogicalUnit): _OP_REQP = ["instance_name", "target_node", "shutdown"] REQ_BGL = False + def CheckArguments(self): + """Check the arguments. + + """ + self.shutdown_timeout = getattr(self.op, "shutdown_timeout", + constants.DEFAULT_SHUTDOWN_TIMEOUT) + def ExpandNames(self): self._ExpandAndLockInstance() # FIXME: lock only instance primary and destination node @@ -7268,6 +7965,7 @@ class LUExportInstance(LogicalUnit): env = { "EXPORT_NODE": self.op.target_node, "EXPORT_DO_SHUTDOWN": self.op.shutdown, + "SHUTDOWN_TIMEOUT": self.shutdown_timeout, } env.update(_BuildInstanceHookEnvByObject(self, self.instance)) nl = [self.cfg.GetMasterNode(), self.instance.primary_node, @@ -7291,7 +7989,8 @@ class LUExportInstance(LogicalUnit): if self.dst_node is None: # This is wrong node name, not a non-locked node - raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node) + raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node, + errors.ECODE_NOENT) _CheckNodeOnline(self, self.dst_node.name) _CheckNodeNotDrained(self, self.dst_node.name) @@ -7299,7 +7998,7 @@ class LUExportInstance(LogicalUnit): for disk in self.instance.disks: if disk.dev_type == constants.LD_FILE: raise errors.OpPrereqError("Export not supported for instances with" - " file-based disks") + " file-based disks", errors.ECODE_INVAL) def Exec(self, feedback_fn): """Export an instance to an image in the cluster. @@ -7308,9 +8007,12 @@ class LUExportInstance(LogicalUnit): instance = self.instance dst_node = self.dst_node src_node = instance.primary_node + if self.op.shutdown: # shutdown the instance, but not the disks - result = self.rpc.call_instance_shutdown(src_node, instance) + feedback_fn("Shutting down instance %s" % instance.name) + result = self.rpc.call_instance_shutdown(src_node, instance, + self.shutdown_timeout) result.Raise("Could not shutdown instance %s on" " node %s" % (instance.name, src_node)) @@ -7323,60 +8025,82 @@ class LUExportInstance(LogicalUnit): for disk in instance.disks: self.cfg.SetDiskID(disk, src_node) - # per-disk results - dresults = [] - try: - for idx, disk in enumerate(instance.disks): - # result.payload will be a snapshot of an lvm leaf of the one we passed - result = self.rpc.call_blockdev_snapshot(src_node, disk) - msg = result.fail_msg - if msg: - self.LogWarning("Could not snapshot disk/%s on node %s: %s", - idx, src_node, msg) - snap_disks.append(False) - else: - disk_id = (vgname, result.payload) - new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size, - logical_id=disk_id, physical_id=disk_id, - iv_name=disk.iv_name) - snap_disks.append(new_dev) + activate_disks = (not instance.admin_up) - finally: - if self.op.shutdown and instance.admin_up: - result = self.rpc.call_instance_start(src_node, instance, None, None) - msg = result.fail_msg - if msg: - _ShutdownInstanceDisks(self, instance) - raise errors.OpExecError("Could not start instance: %s" % msg) + if activate_disks: + # Activate the instance disks if we'exporting a stopped instance + feedback_fn("Activating disks for %s" % instance.name) + _StartInstanceDisks(self, instance, None) - # TODO: check for size + try: + # per-disk results + dresults = [] + try: + for idx, disk in enumerate(instance.disks): + feedback_fn("Creating 
a snapshot of disk/%s on node %s" % + (idx, src_node)) + + # result.payload will be a snapshot of an lvm leaf of the one we + # passed + result = self.rpc.call_blockdev_snapshot(src_node, disk) + msg = result.fail_msg + if msg: + self.LogWarning("Could not snapshot disk/%s on node %s: %s", + idx, src_node, msg) + snap_disks.append(False) + else: + disk_id = (vgname, result.payload) + new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size, + logical_id=disk_id, physical_id=disk_id, + iv_name=disk.iv_name) + snap_disks.append(new_dev) - cluster_name = self.cfg.GetClusterName() - for idx, dev in enumerate(snap_disks): - if dev: - result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name, - instance, cluster_name, idx) - msg = result.fail_msg - if msg: - self.LogWarning("Could not export disk/%s from node %s to" - " node %s: %s", idx, src_node, dst_node.name, msg) - dresults.append(False) + finally: + if self.op.shutdown and instance.admin_up: + feedback_fn("Starting instance %s" % instance.name) + result = self.rpc.call_instance_start(src_node, instance, None, None) + msg = result.fail_msg + if msg: + _ShutdownInstanceDisks(self, instance) + raise errors.OpExecError("Could not start instance: %s" % msg) + + # TODO: check for size + + cluster_name = self.cfg.GetClusterName() + for idx, dev in enumerate(snap_disks): + feedback_fn("Exporting snapshot %s from %s to %s" % + (idx, src_node, dst_node.name)) + if dev: + result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name, + instance, cluster_name, idx) + msg = result.fail_msg + if msg: + self.LogWarning("Could not export disk/%s from node %s to" + " node %s: %s", idx, src_node, dst_node.name, msg) + dresults.append(False) + else: + dresults.append(True) + msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg + if msg: + self.LogWarning("Could not remove snapshot for disk/%d from node" + " %s: %s", idx, src_node, msg) else: - dresults.append(True) - msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg - if msg: - self.LogWarning("Could not remove snapshot for disk/%d from node" - " %s: %s", idx, src_node, msg) - else: - dresults.append(False) + dresults.append(False) - result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks) - fin_resu = True - msg = result.fail_msg - if msg: - self.LogWarning("Could not finalize export for instance %s" - " on node %s: %s", instance.name, dst_node.name, msg) - fin_resu = False + feedback_fn("Finalizing export on %s" % dst_node.name) + result = self.rpc.call_finalize_export(dst_node.name, instance, + snap_disks) + fin_resu = True + msg = result.fail_msg + if msg: + self.LogWarning("Could not finalize export for instance %s" + " on node %s: %s", instance.name, dst_node.name, msg) + fin_resu = False + + finally: + if activate_disks: + feedback_fn("Deactivating disks for %s" % instance.name) + _ShutdownInstanceDisks(self, instance) nodelist = self.cfg.GetNodeList() nodelist.remove(dst_node.name) @@ -7386,6 +8110,7 @@ class LUExportInstance(LogicalUnit): # substitutes an empty list with the full cluster node list. 
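
The export hunk above restructures LUExportInstance.Exec into nested try/finally blocks: disks of a stopped instance are activated first, each disk is snapshotted, the instance is restarted as soon as the snapshots exist (if it was only shut down for the export), the snapshots are copied to the destination and removed, the export is finalized, and the outer finally deactivates the disks again. The following is a control-flow sketch only, under the assumption that stub names and the log callback stand in for the real RPC calls (call_blockdev_snapshot, call_snapshot_export, and so on) shown in the diff.

# Ordering sketch of the export path; all names below are placeholders.
def export_instance(instance_is_up, shutdown_requested, disks, log):
  """Mimics the try/finally nesting used by LUExportInstance.Exec."""
  activate_disks = not instance_is_up        # instance.admin_up is False
  if activate_disks:
    log("activating disks of the stopped instance")
  try:
    snapshots = []
    try:
      for idx in disks:
        log("snapshotting disk/%d" % idx)
        snapshots.append(idx)
    finally:
      # the instance only stays down while the snapshots are taken
      if shutdown_requested and instance_is_up:
        log("restarting instance")
    for idx in snapshots:
      log("exporting and removing snapshot of disk/%d" % idx)
    log("finalizing export on the destination node")
  finally:
    if activate_disks:
      log("deactivating disks again")

# example run with two disks of a stopped instance
export_instance(instance_is_up=False, shutdown_requested=False,
                disks=[0, 1], log=lambda msg: None)
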
iname = instance.name if nodelist: + feedback_fn("Removing old exports for instance %s" % iname) exportlist = self.rpc.call_export_list(nodelist) for node in exportlist: if exportlist[node].fail_msg: @@ -7464,14 +8189,14 @@ class TagsLU(NoHooksLU): name = self.cfg.ExpandNodeName(self.op.name) if name is None: raise errors.OpPrereqError("Invalid node name (%s)" % - (self.op.name,)) + (self.op.name,), errors.ECODE_NOENT) self.op.name = name self.needed_locks[locking.LEVEL_NODE] = name elif self.op.kind == constants.TAG_INSTANCE: name = self.cfg.ExpandInstanceName(self.op.name) if name is None: raise errors.OpPrereqError("Invalid instance name (%s)" % - (self.op.name,)) + (self.op.name,), errors.ECODE_NOENT) self.op.name = name self.needed_locks[locking.LEVEL_INSTANCE] = name @@ -7487,7 +8212,7 @@ class TagsLU(NoHooksLU): self.target = self.cfg.GetInstanceInfo(self.op.name) else: raise errors.OpPrereqError("Wrong tag type requested (%s)" % - str(self.op.kind)) + str(self.op.kind), errors.ECODE_INVAL) class LUGetTags(TagsLU): @@ -7524,7 +8249,7 @@ class LUSearchTags(NoHooksLU): self.re = re.compile(self.op.pattern) except re.error, err: raise errors.OpPrereqError("Invalid search pattern '%s': %s" % - (self.op.pattern, err)) + (self.op.pattern, err), errors.ECODE_INVAL) def Exec(self, feedback_fn): """Returns the tag list. @@ -7570,12 +8295,7 @@ class LUAddTags(TagsLU): self.target.AddTag(tag) except errors.TagError, err: raise errors.OpExecError("Error while setting tag: %s" % str(err)) - try: - self.cfg.Update(self.target) - except errors.ConfigurationError: - raise errors.OpRetryError("There has been a modification to the" - " config file and the operation has been" - " aborted. Please retry.") + self.cfg.Update(self.target, feedback_fn) class LUDelTags(TagsLU): @@ -7601,7 +8321,7 @@ class LUDelTags(TagsLU): diff_names = ["'%s'" % tag for tag in diff_tags] diff_names.sort() raise errors.OpPrereqError("Tag(s) %s not found" % - (",".join(diff_names))) + (",".join(diff_names)), errors.ECODE_NOENT) def Exec(self, feedback_fn): """Remove the tag from the object. @@ -7609,12 +8329,7 @@ class LUDelTags(TagsLU): """ for tag in self.op.tags: self.target.RemoveTag(tag) - try: - self.cfg.Update(self.target) - except errors.ConfigurationError: - raise errors.OpRetryError("There has been a modification to the" - " config file and the operation has been" - " aborted. 
Please retry.") + self.cfg.Update(self.target, feedback_fn) class LUTestDelay(NoHooksLU): @@ -7890,10 +8605,12 @@ class IAllocator(object): " IAllocator" % self.name) if instance.disk_template not in constants.DTS_NET_MIRROR: - raise errors.OpPrereqError("Can't relocate non-mirrored instances") + raise errors.OpPrereqError("Can't relocate non-mirrored instances", + errors.ECODE_INVAL) if len(instance.secondary_nodes) != 1: - raise errors.OpPrereqError("Instance has not exactly one secondary node") + raise errors.OpPrereqError("Instance has not exactly one secondary node", + errors.ECODE_STATE) self.required_nodes = 1 disk_sizes = [{'size': disk.size} for disk in instance.disks] @@ -7981,51 +8698,55 @@ class LUTestAllocator(NoHooksLU): "os", "tags", "nics", "vcpus"]: if not hasattr(self.op, attr): raise errors.OpPrereqError("Missing attribute '%s' on opcode input" % - attr) + attr, errors.ECODE_INVAL) iname = self.cfg.ExpandInstanceName(self.op.name) if iname is not None: raise errors.OpPrereqError("Instance '%s' already in the cluster" % - iname) + iname, errors.ECODE_EXISTS) if not isinstance(self.op.nics, list): - raise errors.OpPrereqError("Invalid parameter 'nics'") + raise errors.OpPrereqError("Invalid parameter 'nics'", + errors.ECODE_INVAL) for row in self.op.nics: if (not isinstance(row, dict) or "mac" not in row or "ip" not in row or "bridge" not in row): - raise errors.OpPrereqError("Invalid contents of the" - " 'nics' parameter") + raise errors.OpPrereqError("Invalid contents of the 'nics'" + " parameter", errors.ECODE_INVAL) if not isinstance(self.op.disks, list): - raise errors.OpPrereqError("Invalid parameter 'disks'") + raise errors.OpPrereqError("Invalid parameter 'disks'", + errors.ECODE_INVAL) for row in self.op.disks: if (not isinstance(row, dict) or "size" not in row or not isinstance(row["size"], int) or "mode" not in row or row["mode"] not in ['r', 'w']): - raise errors.OpPrereqError("Invalid contents of the" - " 'disks' parameter") + raise errors.OpPrereqError("Invalid contents of the 'disks'" + " parameter", errors.ECODE_INVAL) if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None: self.op.hypervisor = self.cfg.GetHypervisorType() elif self.op.mode == constants.IALLOCATOR_MODE_RELOC: if not hasattr(self.op, "name"): - raise errors.OpPrereqError("Missing attribute 'name' on opcode input") + raise errors.OpPrereqError("Missing attribute 'name' on opcode input", + errors.ECODE_INVAL) fname = self.cfg.ExpandInstanceName(self.op.name) if fname is None: raise errors.OpPrereqError("Instance '%s' not found for relocation" % - self.op.name) + self.op.name, errors.ECODE_NOENT) self.op.name = fname self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes else: raise errors.OpPrereqError("Invalid test allocator mode '%s'" % - self.op.mode) + self.op.mode, errors.ECODE_INVAL) if self.op.direction == constants.IALLOCATOR_DIR_OUT: if not hasattr(self.op, "allocator") or self.op.allocator is None: - raise errors.OpPrereqError("Missing allocator name") + raise errors.OpPrereqError("Missing allocator name", + errors.ECODE_INVAL) elif self.op.direction != constants.IALLOCATOR_DIR_IN: raise errors.OpPrereqError("Wrong allocator test '%s'" % - self.op.direction) + self.op.direction, errors.ECODE_INVAL) def Exec(self, feedback_fn): """Run the allocator test.