X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/b4ec07f86bf3e9cc58aa8593a0b9dbad8ba0a54f..88cd08aaaba8ad68c59228551c75424fb86c1904:/lib/cmdlib.py diff --git a/lib/cmdlib.py b/lib/cmdlib.py index e57f00a..0023cac 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -26,12 +26,10 @@ import os import os.path import time -import tempfile import re import platform import logging import copy -import random from ganeti import ssh from ganeti import utils @@ -40,7 +38,6 @@ from ganeti import hypervisor from ganeti import locking from ganeti import constants from ganeti import objects -from ganeti import opcodes from ganeti import serializer from ganeti import ssconf @@ -50,8 +47,8 @@ class LogicalUnit(object): Subclasses must follow these rules: - implement ExpandNames - - implement CheckPrereq - - implement Exec + - implement CheckPrereq (except when tasklets are used) + - implement Exec (except when tasklets are used) - implement BuildHooksEnv - redefine HPATH and HTYPE - optionally redefine their run requirements: @@ -59,6 +56,9 @@ class LogicalUnit(object): Note that all commands require root permissions. + @ivar dry_run_result: the value (if any) that will be returned to the caller + in dry-run mode (signalled by opcode dry_run parameter) + """ HPATH = None HTYPE = None @@ -68,7 +68,7 @@ class LogicalUnit(object): def __init__(self, processor, op, context, rpc): """Constructor for LogicalUnit. - This needs to be overriden in derived classes in order to check op + This needs to be overridden in derived classes in order to check op validity. """ @@ -80,7 +80,7 @@ class LogicalUnit(object): # Dicts used to declare locking needs to mcpu self.needed_locks = None self.acquired_locks = {} - self.share_locks = dict(((i, 0) for i in locking.LEVELS)) + self.share_locks = dict.fromkeys(locking.LEVELS, 0) self.add_locks = {} self.remove_locks = {} # Used to force good behavior when calling helper functions @@ -89,12 +89,19 @@ class LogicalUnit(object): # logging self.LogWarning = processor.LogWarning self.LogInfo = processor.LogInfo + self.LogStep = processor.LogStep + # support for dry-run + self.dry_run_result = None + + # Tasklets + self.tasklets = None for attr_name in self._OP_REQP: attr_val = getattr(op, attr_name, None) if attr_val is None: raise errors.OpPrereqError("Required parameter '%s' missing" % attr_name) + self.CheckArguments() def __GetSSH(self): @@ -116,7 +123,7 @@ class LogicalUnit(object): CheckPrereq, doing these separate is better because: - ExpandNames is left as as purely a lock-related function - - CheckPrereq is run after we have aquired locks (and possible + - CheckPrereq is run after we have acquired locks (and possible waited for them) The function is allowed to change the self.op attribute so that @@ -146,6 +153,10 @@ class LogicalUnit(object): level you can modify self.share_locks, setting a true value (usually 1) for that level. By default locks are not shared. + This function can also define a list of tasklets, which then will be + executed in order instead of the usual LU-level CheckPrereq and Exec + functions, if those are not defined by the LU. + Examples:: # Acquire all nodes and one instance @@ -202,7 +213,13 @@ class LogicalUnit(object): their canonical form if it hasn't been done by ExpandNames before. 
""" - raise NotImplementedError + if self.tasklets is not None: + for (idx, tl) in enumerate(self.tasklets): + logging.debug("Checking prerequisites for tasklet %s/%s", + idx + 1, len(self.tasklets)) + tl.CheckPrereq() + else: + raise NotImplementedError def Exec(self, feedback_fn): """Execute the LU. @@ -212,7 +229,12 @@ class LogicalUnit(object): code, or expected. """ - raise NotImplementedError + if self.tasklets is not None: + for (idx, tl) in enumerate(self.tasklets): + logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets)) + tl.Exec(feedback_fn) + else: + raise NotImplementedError def BuildHooksEnv(self): """Build hooks environment for this LU. @@ -336,6 +358,52 @@ class NoHooksLU(LogicalUnit): HTYPE = None +class Tasklet: + """Tasklet base class. + + Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or + they can mix legacy code with tasklets. Locking needs to be done in the LU, + tasklets know nothing about locks. + + Subclasses must follow these rules: + - Implement CheckPrereq + - Implement Exec + + """ + def __init__(self, lu): + self.lu = lu + + # Shortcuts + self.cfg = lu.cfg + self.rpc = lu.rpc + + def CheckPrereq(self): + """Check prerequisites for this tasklets. + + This method should check whether the prerequisites for the execution of + this tasklet are fulfilled. It can do internode communication, but it + should be idempotent - no cluster or system changes are allowed. + + The method should raise errors.OpPrereqError in case something is not + fulfilled. Its return value is ignored. + + This method should also update all parameters to their canonical form if it + hasn't been done before. + + """ + raise NotImplementedError + + def Exec(self, feedback_fn): + """Execute the tasklet. + + This method should implement the actual work. It should raise + errors.OpExecError for failures that are somewhat dealt with in code, or + expected. + + """ + raise NotImplementedError + + def _GetWantedNodes(lu, nodes): """Returns list of checked and expanded node names. @@ -454,7 +522,7 @@ def _CheckNodeNotDrained(lu, node): def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, memory, vcpus, nics, disk_template, disks, - bep, hvp, hypervisor): + bep, hvp, hypervisor_name): """Builds instance related env variables for hooks This builds the hook environment from individual variables. 
@@ -474,18 +542,18 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, @type vcpus: string @param vcpus: the count of VCPUs the instance has @type nics: list - @param nics: list of tuples (ip, bridge, mac) representing - the NICs the instance has + @param nics: list of tuples (ip, mac, mode, link) representing + the NICs the instance has @type disk_template: string - @param disk_template: the distk template of the instance + @param disk_template: the disk template of the instance @type disks: list @param disks: the list of (size, mode) pairs @type bep: dict @param bep: the backend parameters for the instance @type hvp: dict @param hvp: the hypervisor parameters for the instance - @type hypervisor: string - @param hypervisor: the hypervisor for the instance + @type hypervisor_name: string + @param hypervisor_name: the hypervisor for the instance @rtype: dict @return: the hook environment for this instance @@ -504,17 +572,20 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, "INSTANCE_MEMORY": memory, "INSTANCE_VCPUS": vcpus, "INSTANCE_DISK_TEMPLATE": disk_template, - "INSTANCE_HYPERVISOR": hypervisor, + "INSTANCE_HYPERVISOR": hypervisor_name, } if nics: nic_count = len(nics) - for idx, (ip, bridge, mac) in enumerate(nics): + for idx, (ip, mac, mode, link) in enumerate(nics): if ip is None: ip = "" env["INSTANCE_NIC%d_IP" % idx] = ip - env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge env["INSTANCE_NIC%d_MAC" % idx] = mac + env["INSTANCE_NIC%d_MODE" % idx] = mode + env["INSTANCE_NIC%d_LINK" % idx] = link + if mode == constants.NIC_MODE_BRIDGED: + env["INSTANCE_NIC%d_BRIDGE" % idx] = link else: nic_count = 0 @@ -537,6 +608,30 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, return env +def _NICListToTuple(lu, nics): + """Build a list of nic information tuples. + + This list is suitable to be passed to _BuildInstanceHookEnv or as a return + value in LUQueryInstanceData. + + @type lu: L{LogicalUnit} + @param lu: the logical unit on whose behalf we execute + @type nics: list of L{objects.NIC} + @param nics: list of nics to convert to hooks tuples + + """ + hooks_nics = [] + c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT] + for nic in nics: + ip = nic.ip + mac = nic.mac + filled_params = objects.FillDict(c_nicparams, nic.nicparams) + mode = filled_params[constants.NIC_MODE] + link = filled_params[constants.NIC_LINK] + hooks_nics.append((ip, mac, mode, link)) + return hooks_nics + + def _BuildInstanceHookEnvByObject(lu, instance, override=None): """Builds instance related env variables for hooks from an object. @@ -563,60 +658,197 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None): 'status': instance.admin_up, 'memory': bep[constants.BE_MEMORY], 'vcpus': bep[constants.BE_VCPUS], - 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics], + 'nics': _NICListToTuple(lu, instance.nics), 'disk_template': instance.disk_template, 'disks': [(disk.size, disk.mode) for disk in instance.disks], 'bep': bep, 'hvp': hvp, - 'hypervisor': instance.hypervisor, + 'hypervisor_name': instance.hypervisor, } if override: args.update(override) return _BuildInstanceHookEnv(**args) -def _AdjustCandidatePool(lu): +def _AdjustCandidatePool(lu, exceptions): """Adjust the candidate pool after node operations. 
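# Illustrative example, not part of the diff: with the new tuple layout above,
# a bridged NIC passed to _BuildInstanceHookEnv as
#   ("192.0.2.10", "aa:00:00:35:ac:15", constants.NIC_MODE_BRIDGED, "xen-br0")
# (IP, MAC and link are made-up sample values) yields hook environment entries
# of the form
#   INSTANCE_NIC0_IP     = "192.0.2.10"
#   INSTANCE_NIC0_MAC    = "aa:00:00:35:ac:15"
#   INSTANCE_NIC0_MODE   = <value of constants.NIC_MODE_BRIDGED>
#   INSTANCE_NIC0_LINK   = "xen-br0"
#   INSTANCE_NIC0_BRIDGE = "xen-br0"   # kept only because the mode is bridged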
""" - mod_list = lu.cfg.MaintainCandidatePool() + mod_list = lu.cfg.MaintainCandidatePool(exceptions) if mod_list: lu.LogInfo("Promoted nodes to master candidate role: %s", ", ".join(node.name for node in mod_list)) for name in mod_list: lu.context.ReaddNode(name) - mc_now, mc_max = lu.cfg.GetMasterCandidateStats() + mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions) if mc_now > mc_max: lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" % (mc_now, mc_max)) -def _CheckInstanceBridgesExist(lu, instance): +def _DecideSelfPromotion(lu, exceptions=None): + """Decide whether I should promote myself as a master candidate. + + """ + cp_size = lu.cfg.GetClusterInfo().candidate_pool_size + mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions) + # the new node will increase mc_max with one, so: + mc_should = min(mc_should + 1, cp_size) + return mc_now < mc_should + + +def _CheckNicsBridgesExist(lu, target_nics, target_node, + profile=constants.PP_DEFAULT): + """Check that the brigdes needed by a list of nics exist. + + """ + c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile] + paramslist = [objects.FillDict(c_nicparams, nic.nicparams) + for nic in target_nics] + brlist = [params[constants.NIC_LINK] for params in paramslist + if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED] + if brlist: + result = lu.rpc.call_bridges_exist(target_node, brlist) + result.Raise("Error checking bridges on destination node '%s'" % + target_node, prereq=True) + + +def _CheckInstanceBridgesExist(lu, instance, node=None): """Check that the brigdes needed by an instance exist. """ - # check bridges existance - brlist = [nic.bridge for nic in instance.nics] - result = lu.rpc.call_bridges_exist(instance.primary_node, brlist) - result.Raise() - if not result.data: - raise errors.OpPrereqError("One or more target bridges %s does not" - " exist on destination node '%s'" % - (brlist, instance.primary_node)) + if node is None: + node = instance.primary_node + _CheckNicsBridgesExist(lu, instance.nics, node) + + +def _CheckOSVariant(os, name): + """Check whether an OS name conforms to the os variants specification. + + @type os: L{objects.OS} + @param os: OS object to check + @type name: string + @param name: OS name passed by the user, to check for validity + + """ + if not os.supported_variants: + return + try: + variant = name.split("+", 1)[1] + except IndexError: + raise errors.OpPrereqError("OS name must include a variant") + + if variant not in os.supported_variants: + raise errors.OpPrereqError("Unsupported OS variant") + + +def _GetNodeInstancesInner(cfg, fn): + return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)] + + +def _GetNodeInstances(cfg, node_name): + """Returns a list of all primary and secondary instances on a node. + + """ + + return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes) + + +def _GetNodePrimaryInstances(cfg, node_name): + """Returns primary instances on a node. + + """ + return _GetNodeInstancesInner(cfg, + lambda inst: node_name == inst.primary_node) + + +def _GetNodeSecondaryInstances(cfg, node_name): + """Returns secondary instances on a node. + + """ + return _GetNodeInstancesInner(cfg, + lambda inst: node_name in inst.secondary_nodes) + + +def _GetStorageTypeArgs(cfg, storage_type): + """Returns the arguments for a storage type. 
+ + """ + # Special case for file storage + if storage_type == constants.ST_FILE: + # storage.FileStorage wants a list of storage directories + return [[cfg.GetFileStorageDir()]] + + return [] + + +def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq): + faulty = [] + + for dev in instance.disks: + cfg.SetDiskID(dev, node_name) + + result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks) + result.Raise("Failed to get disk status from node %s" % node_name, + prereq=prereq) + + for idx, bdev_status in enumerate(result.payload): + if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY: + faulty.append(idx) + + return faulty + + +class LUPostInitCluster(LogicalUnit): + """Logical unit for running hooks after cluster initialization. + + """ + HPATH = "cluster-init" + HTYPE = constants.HTYPE_CLUSTER + _OP_REQP = [] + + def BuildHooksEnv(self): + """Build hooks env. + + """ + env = {"OP_TARGET": self.cfg.GetClusterName()} + mn = self.cfg.GetMasterNode() + return env, [], [mn] + + def CheckPrereq(self): + """No prerequisites to check. + + """ + return True + + def Exec(self, feedback_fn): + """Nothing to do. + + """ + return True -class LUDestroyCluster(NoHooksLU): +class LUDestroyCluster(LogicalUnit): """Logical unit for destroying the cluster. """ + HPATH = "cluster-destroy" + HTYPE = constants.HTYPE_CLUSTER _OP_REQP = [] + def BuildHooksEnv(self): + """Build hooks env. + + """ + env = {"OP_TARGET": self.cfg.GetClusterName()} + return env, [], [] + def CheckPrereq(self): """Check prerequisites. This checks whether the cluster is empty. - Any errors are signalled by raising errors.OpPrereqError. + Any errors are signaled by raising errors.OpPrereqError. """ master = self.cfg.GetMasterNode() @@ -635,10 +867,16 @@ class LUDestroyCluster(NoHooksLU): """ master = self.cfg.GetMasterNode() + + # Run post hooks on master node before it's removed + hm = self.proc.hmclass(self.rpc.call_hooks_runner, self) + try: + hm.RunPhase(constants.HOOKS_PHASE_POST, [master]) + except: + self.LogWarning("Errors occurred running hooks on %s" % master) + result = self.rpc.call_node_stop_master(master, False) - result.Raise() - if not result.data: - raise errors.OpExecError("Could not disable the master role") + result.Raise("Could not disable the master role") priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS) utils.CreateBackup(priv_key) utils.CreateBackup(pub_key) @@ -651,25 +889,89 @@ class LUVerifyCluster(LogicalUnit): """ HPATH = "cluster-verify" HTYPE = constants.HTYPE_CLUSTER - _OP_REQP = ["skip_checks"] + _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"] REQ_BGL = False + TCLUSTER = "cluster" + TNODE = "node" + TINSTANCE = "instance" + + ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG") + EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE") + EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN") + EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT") + EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK") + EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK") + EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE") + ENODEDRBD = (TNODE, "ENODEDRBD") + ENODEFILECHECK = (TNODE, "ENODEFILECHECK") + ENODEHOOKS = (TNODE, "ENODEHOOKS") + ENODEHV = (TNODE, "ENODEHV") + ENODELVM = (TNODE, "ENODELVM") + ENODEN1 = (TNODE, "ENODEN1") + ENODENET = (TNODE, "ENODENET") + ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE") + ENODEORPHANLV = (TNODE, "ENODEORPHANLV") + ENODERPC = (TNODE, "ENODERPC") + ENODESSH = (TNODE, "ENODESSH") + ENODEVERSION = (TNODE, 
"ENODEVERSION") + + ETYPE_FIELD = "code" + ETYPE_ERROR = "ERROR" + ETYPE_WARNING = "WARNING" + def ExpandNames(self): self.needed_locks = { locking.LEVEL_NODE: locking.ALL_SET, locking.LEVEL_INSTANCE: locking.ALL_SET, } - self.share_locks = dict(((i, 1) for i in locking.LEVELS)) + self.share_locks = dict.fromkeys(locking.LEVELS, 1) + + def _Error(self, ecode, item, msg, *args, **kwargs): + """Format an error message. + + Based on the opcode's error_codes parameter, either format a + parseable error code, or a simpler error string. + + This must be called only from Exec and functions called from Exec. + + """ + ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) + itype, etxt = ecode + # first complete the msg + if args: + msg = msg % args + # then format the whole message + if self.op.error_codes: + msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg) + else: + if item: + item = " " + item + else: + item = "" + msg = "%s: %s%s: %s" % (ltype, itype, item, msg) + # and finally report it via the feedback_fn + self._feedback_fn(" - %s" % msg) + + def _ErrorIf(self, cond, *args, **kwargs): + """Log an error message if the passed condition is True. + + """ + cond = bool(cond) or self.op.debug_simulate_errors + if cond: + self._Error(*args, **kwargs) + # do not mark the operation as failed for WARN cases only + if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR: + self.bad = self.bad or cond def _VerifyNode(self, nodeinfo, file_list, local_cksum, - node_result, feedback_fn, master_files, - drbd_map, vg_name): + node_result, master_files, drbd_map, vg_name): """Run multiple tests against a node. Test list: - compares ganeti version - - checks vg existance and size > 20G + - checks vg existence and size > 20G - checks config file checksum - checks ssh to other nodes @@ -678,7 +980,6 @@ class LUVerifyCluster(LogicalUnit): @param file_list: required list of files @param local_cksum: dictionary of local files and their checksums @param node_result: the results from the node - @param feedback_fn: function used to accumulate results @param master_files: list of files that only masters should have @param drbd_map: the useddrbd minors for this node, in form of minor: (instance, must_exist) which correspond to instances @@ -687,138 +988,136 @@ class LUVerifyCluster(LogicalUnit): """ node = nodeinfo.name + _ErrorIf = self._ErrorIf # main result, node_result should be a non-empty dict - if not node_result or not isinstance(node_result, dict): - feedback_fn(" - ERROR: unable to verify node %s." 
% (node,)) - return True + test = not node_result or not isinstance(node_result, dict) + _ErrorIf(test, self.ENODERPC, node, + "unable to verify node: no data returned") + if test: + return # compares ganeti version local_version = constants.PROTOCOL_VERSION remote_version = node_result.get('version', None) - if not (remote_version and isinstance(remote_version, (list, tuple)) and - len(remote_version) == 2): - feedback_fn(" - ERROR: connection to %s failed" % (node)) - return True - - if local_version != remote_version[0]: - feedback_fn(" - ERROR: incompatible protocol versions: master %s," - " node %s %s" % (local_version, node, remote_version[0])) - return True + test = not (remote_version and + isinstance(remote_version, (list, tuple)) and + len(remote_version) == 2) + _ErrorIf(test, self.ENODERPC, node, + "connection to node returned invalid data") + if test: + return + + test = local_version != remote_version[0] + _ErrorIf(test, self.ENODEVERSION, node, + "incompatible protocol versions: master %s," + " node %s", local_version, remote_version[0]) + if test: + return # node seems compatible, we can actually try to look into its results - bad = False - # full package version - if constants.RELEASE_VERSION != remote_version[1]: - feedback_fn(" - WARNING: software version mismatch: master %s," - " node %s %s" % - (constants.RELEASE_VERSION, node, remote_version[1])) + self._ErrorIf(constants.RELEASE_VERSION != remote_version[1], + self.ENODEVERSION, node, + "software version mismatch: master %s, node %s", + constants.RELEASE_VERSION, remote_version[1], + code=self.ETYPE_WARNING) # checks vg existence and size > 20G if vg_name is not None: vglist = node_result.get(constants.NV_VGLIST, None) - if not vglist: - feedback_fn(" - ERROR: unable to check volume groups on node %s." 
% - (node,)) - bad = True - else: + test = not vglist + _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups") + if not test: vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name, constants.MIN_VG_SIZE) - if vgstatus: - feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node)) - bad = True + _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus) # checks config file checksum remote_cksum = node_result.get(constants.NV_FILELIST, None) - if not isinstance(remote_cksum, dict): - bad = True - feedback_fn(" - ERROR: node hasn't returned file checksum data") - else: + test = not isinstance(remote_cksum, dict) + _ErrorIf(test, self.ENODEFILECHECK, node, + "node hasn't returned file checksum data") + if not test: for file_name in file_list: node_is_mc = nodeinfo.master_candidate - must_have_file = file_name not in master_files - if file_name not in remote_cksum: - if node_is_mc or must_have_file: - bad = True - feedback_fn(" - ERROR: file '%s' missing" % file_name) - elif remote_cksum[file_name] != local_cksum[file_name]: - if node_is_mc or must_have_file: - bad = True - feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name) - else: - # not candidate and this is not a must-have file - bad = True - feedback_fn(" - ERROR: file '%s' should not exist on non master" - " candidates (and the file is outdated)" % file_name) - else: - # all good, except non-master/non-must have combination - if not node_is_mc and not must_have_file: - feedback_fn(" - ERROR: file '%s' should not exist on non master" - " candidates" % file_name) + must_have = (file_name not in master_files) or node_is_mc + # missing + test1 = file_name not in remote_cksum + # invalid checksum + test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name] + # existing and good + test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name] + _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node, + "file '%s' missing", file_name) + _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node, + "file '%s' has wrong checksum", file_name) + # not candidate and this is not a must-have file + _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node, + "file '%s' should not exist on non master" + " candidates (and the file is outdated)", file_name) + # all good, except non-master/non-must have combination + _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node, + "file '%s' should not exist" + " on non master candidates", file_name) # checks ssh to any - if constants.NV_NODELIST not in node_result: - bad = True - feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data") - else: + test = constants.NV_NODELIST not in node_result + _ErrorIf(test, self.ENODESSH, node, + "node hasn't returned node ssh connectivity data") + if not test: if node_result[constants.NV_NODELIST]: - bad = True - for node in node_result[constants.NV_NODELIST]: - feedback_fn(" - ERROR: ssh communication with node '%s': %s" % - (node, node_result[constants.NV_NODELIST][node])) - - if constants.NV_NODENETTEST not in node_result: - bad = True - feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data") - else: + for a_node, a_msg in node_result[constants.NV_NODELIST].items(): + _ErrorIf(True, self.ENODESSH, node, + "ssh communication with node '%s': %s", a_node, a_msg) + + test = constants.NV_NODENETTEST not in node_result + _ErrorIf(test, self.ENODENET, node, + "node hasn't returned node tcp connectivity data") + if not test: if node_result[constants.NV_NODENETTEST]: - bad = True nlist = 
utils.NiceSort(node_result[constants.NV_NODENETTEST].keys()) - for node in nlist: - feedback_fn(" - ERROR: tcp communication with node '%s': %s" % - (node, node_result[constants.NV_NODENETTEST][node])) + for anode in nlist: + _ErrorIf(True, self.ENODENET, node, + "tcp communication with node '%s': %s", + anode, node_result[constants.NV_NODENETTEST][anode]) hyp_result = node_result.get(constants.NV_HYPERVISOR, None) if isinstance(hyp_result, dict): for hv_name, hv_result in hyp_result.iteritems(): - if hv_result is not None: - feedback_fn(" - ERROR: hypervisor %s verify failure: '%s'" % - (hv_name, hv_result)) + test = hv_result is not None + _ErrorIf(test, self.ENODEHV, node, + "hypervisor %s verify failure: '%s'", hv_name, hv_result) # check used drbd list if vg_name is not None: used_minors = node_result.get(constants.NV_DRBDLIST, []) - if not isinstance(used_minors, (tuple, list)): - feedback_fn(" - ERROR: cannot parse drbd status file: %s" % - str(used_minors)) - else: + test = not isinstance(used_minors, (tuple, list)) + _ErrorIf(test, self.ENODEDRBD, node, + "cannot parse drbd status file: %s", str(used_minors)) + if not test: for minor, (iname, must_exist) in drbd_map.items(): - if minor not in used_minors and must_exist: - feedback_fn(" - ERROR: drbd minor %d of instance %s is" - " not active" % (minor, iname)) - bad = True + test = minor not in used_minors and must_exist + _ErrorIf(test, self.ENODEDRBD, node, + "drbd minor %d of instance %s is not active", + minor, iname) for minor in used_minors: - if minor not in drbd_map: - feedback_fn(" - ERROR: unallocated drbd minor %d is in use" % - minor) - bad = True - - return bad + test = minor not in drbd_map + _ErrorIf(test, self.ENODEDRBD, node, + "unallocated drbd minor %d is in use", minor) def _VerifyInstance(self, instance, instanceconfig, node_vol_is, - node_instance, feedback_fn, n_offline): + node_instance, n_offline): """Verify an instance. This function checks to see if the required block devices are available on the instance's node. 
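# Illustrative example, not part of the diff, for the _Error/_ErrorIf helpers
# introduced above.  A call such as
#   self._Error(self.ENODELVM, "node1.example.com",
#               "unable to check volume groups")
# (the node name is a made-up sample) is reported as
#   " - ERROR:ENODELVM:node:node1.example.com:unable to check volume groups"
# when the opcode requests error_codes, and as
#   " - ERROR: node node1.example.com: unable to check volume groups"
# otherwise; passing code=self.ETYPE_WARNING swaps the ERROR prefix for
# WARNING and leaves self.bad untouched.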
""" - bad = False - + _ErrorIf = self._ErrorIf node_current = instanceconfig.primary_node node_vol_should = {} @@ -829,69 +1128,57 @@ class LUVerifyCluster(LogicalUnit): # ignore missing volumes on offline nodes continue for volume in node_vol_should[node]: - if node not in node_vol_is or volume not in node_vol_is[node]: - feedback_fn(" - ERROR: volume %s missing on node %s" % - (volume, node)) - bad = True + test = node not in node_vol_is or volume not in node_vol_is[node] + _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance, + "volume %s missing on node %s", volume, node) if instanceconfig.admin_up: - if ((node_current not in node_instance or - not instance in node_instance[node_current]) and - node_current not in n_offline): - feedback_fn(" - ERROR: instance %s not running on node %s" % - (instance, node_current)) - bad = True + test = ((node_current not in node_instance or + not instance in node_instance[node_current]) and + node_current not in n_offline) + _ErrorIf(test, self.EINSTANCEDOWN, instance, + "instance not running on its primary node %s", + node_current) for node in node_instance: if (not node == node_current): - if instance in node_instance[node]: - feedback_fn(" - ERROR: instance %s should not run on node %s" % - (instance, node)) - bad = True + test = instance in node_instance[node] + _ErrorIf(test, self.EINSTANCEWRONGNODE, instance, + "instance should not run on node %s", node) - return bad - - def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn): + def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is): """Verify if there are any unknown volumes in the cluster. The .os, .swap and backup volumes are ignored. All other volumes are reported as unknown. """ - bad = False - for node in node_vol_is: for volume in node_vol_is[node]: - if node not in node_vol_should or volume not in node_vol_should[node]: - feedback_fn(" - ERROR: volume %s on node %s should not exist" % - (volume, node)) - bad = True - return bad + test = (node not in node_vol_should or + volume not in node_vol_should[node]) + self._ErrorIf(test, self.ENODEORPHANLV, node, + "volume %s is unknown", volume) - def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn): + def _VerifyOrphanInstances(self, instancelist, node_instance): """Verify the list of running instances. This checks what instances are running but unknown to the cluster. """ - bad = False for node in node_instance: - for runninginstance in node_instance[node]: - if runninginstance not in instancelist: - feedback_fn(" - ERROR: instance %s on node %s should not exist" % - (runninginstance, node)) - bad = True - return bad - - def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn): + for o_inst in node_instance[node]: + test = o_inst not in instancelist + self._ErrorIf(test, self.ENODEORPHANINSTANCE, node, + "instance %s on node %s should not exist", o_inst, node) + + def _VerifyNPlusOneMemory(self, node_info, instance_cfg): """Verify N+1 Memory Resilience. Check that if one single node dies we can still start all the instances it was primary for. 
""" - bad = False - for node, nodeinfo in node_info.iteritems(): # This code checks that every node which is now listed as secondary has # enough memory to host all instances it is supposed to should a single @@ -907,11 +1194,10 @@ class LUVerifyCluster(LogicalUnit): bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance]) if bep[constants.BE_AUTO_BALANCE]: needed_mem += bep[constants.BE_MEMORY] - if nodeinfo['mfree'] < needed_mem: - feedback_fn(" - ERROR: not enough memory on node %s to accomodate" - " failovers should node %s fail" % (node, prinode)) - bad = True - return bad + test = nodeinfo['mfree'] < needed_mem + self._ErrorIf(test, self.ENODEN1, node, + "not enough memory on to accommodate" + " failovers should peer node %s fail", prinode) def CheckPrereq(self): """Check prerequisites. @@ -927,7 +1213,7 @@ class LUVerifyCluster(LogicalUnit): def BuildHooksEnv(self): """Build hooks env. - Cluster-Verify hooks just rone in the post phase and their failure makes + Cluster-Verify hooks just ran in the post phase and their failure makes the output be logged in the verify output and the verification to fail. """ @@ -944,10 +1230,13 @@ class LUVerifyCluster(LogicalUnit): """Verify integrity of cluster, performing various test on nodes. """ - bad = False + self.bad = False + _ErrorIf = self._ErrorIf + verbose = self.op.verbose + self._feedback_fn = feedback_fn feedback_fn("* Verifying global settings") for msg in self.cfg.VerifyConfig(): - feedback_fn(" - ERROR: %s" % msg) + _ErrorIf(True, self.ECLUSTERCFG, None, msg) vg_name = self.cfg.GetVGName() hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors @@ -1000,12 +1289,13 @@ class LUVerifyCluster(LogicalUnit): master_node = self.cfg.GetMasterNode() all_drbd_map = self.cfg.ComputeDRBDMap() + feedback_fn("* Verifying node status") for node_i in nodeinfo: node = node_i.name - nresult = all_nvinfo[node].data if node_i.offline: - feedback_fn("* Skipping offline node %s" % (node,)) + if verbose: + feedback_fn("* Skipping offline node %s" % (node,)) n_offline.append(node) continue @@ -1018,60 +1308,59 @@ class LUVerifyCluster(LogicalUnit): n_drained.append(node) else: ntype = "regular" - feedback_fn("* Verifying node %s (%s)" % (node, ntype)) + if verbose: + feedback_fn("* Verifying node %s (%s)" % (node, ntype)) - if all_nvinfo[node].failed or not isinstance(nresult, dict): - feedback_fn(" - ERROR: connection to %s failed" % (node,)) - bad = True + msg = all_nvinfo[node].fail_msg + _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg) + if msg: continue + nresult = all_nvinfo[node].payload node_drbd = {} for minor, instance in all_drbd_map[node].items(): - if instance not in instanceinfo: - feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" % - instance) + test = instance not in instanceinfo + _ErrorIf(test, self.ECLUSTERCFG, None, + "ghost instance '%s' in temporary DRBD map", instance) # ghost instance should not be running, but otherwise we # don't give double warnings (both ghost instance and # unallocated minor in use) + if test: node_drbd[minor] = (instance, False) else: instance = instanceinfo[instance] node_drbd[minor] = (instance.name, instance.admin_up) - result = self._VerifyNode(node_i, file_names, local_checksums, - nresult, feedback_fn, master_files, - node_drbd, vg_name) - bad = bad or result + self._VerifyNode(node_i, file_names, local_checksums, + nresult, master_files, node_drbd, vg_name) lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data") if vg_name is None: node_volume[node] = 
{} elif isinstance(lvdata, basestring): - feedback_fn(" - ERROR: LVM problem on node %s: %s" % - (node, utils.SafeEncode(lvdata))) - bad = True + _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s", + utils.SafeEncode(lvdata)) node_volume[node] = {} elif not isinstance(lvdata, dict): - feedback_fn(" - ERROR: connection to %s failed (lvlist)" % (node,)) - bad = True + _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)") continue else: node_volume[node] = lvdata # node_instance idata = nresult.get(constants.NV_INSTANCELIST, None) - if not isinstance(idata, list): - feedback_fn(" - ERROR: connection to %s failed (instancelist)" % - (node,)) - bad = True + test = not isinstance(idata, list) + _ErrorIf(test, self.ENODEHV, node, + "rpc call to node failed (instancelist)") + if test: continue node_instance[node] = idata # node_info nodeinfo = nresult.get(constants.NV_HVINFO, None) - if not isinstance(nodeinfo, dict): - feedback_fn(" - ERROR: connection to %s failed (hvinfo)" % (node,)) - bad = True + test = not isinstance(nodeinfo, dict) + _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)") + if test: continue try: @@ -1089,28 +1378,28 @@ class LUVerifyCluster(LogicalUnit): } # FIXME: devise a free space model for file based instances as well if vg_name is not None: - if (constants.NV_VGLIST not in nresult or - vg_name not in nresult[constants.NV_VGLIST]): - feedback_fn(" - ERROR: node %s didn't return data for the" - " volume group '%s' - it is either missing or broken" % - (node, vg_name)) - bad = True + test = (constants.NV_VGLIST not in nresult or + vg_name not in nresult[constants.NV_VGLIST]) + _ErrorIf(test, self.ENODELVM, node, + "node didn't return data for the volume group '%s'" + " - it is either missing or broken", vg_name) + if test: continue node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name]) except (ValueError, KeyError): - feedback_fn(" - ERROR: invalid nodeinfo value returned" - " from node %s" % (node,)) - bad = True + _ErrorIf(True, self.ENODERPC, node, + "node returned invalid nodeinfo, check lvm/hypervisor") continue node_vol_should = {} + feedback_fn("* Verifying instance status") for instance in instancelist: - feedback_fn("* Verifying instance %s" % instance) + if verbose: + feedback_fn("* Verifying instance %s" % instance) inst_config = instanceinfo[instance] - result = self._VerifyInstance(instance, inst_config, node_volume, - node_instance, feedback_fn, n_offline) - bad = bad or result + self._VerifyInstance(instance, inst_config, node_volume, + node_instance, n_offline) inst_nodes_offline = [] inst_config.MapLVsByNode(node_vol_should) @@ -1118,12 +1407,11 @@ class LUVerifyCluster(LogicalUnit): instance_cfg[instance] = inst_config pnode = inst_config.primary_node + _ErrorIf(pnode not in node_info and pnode not in n_offline, + self.ENODERPC, pnode, "instance %s, connection to" + " primary node failed", instance) if pnode in node_info: node_info[pnode]['pinst'].append(instance) - elif pnode not in n_offline: - feedback_fn(" - ERROR: instance %s, connection to primary node" - " %s failed" % (instance, pnode)) - bad = True if pnode in n_offline: inst_nodes_offline.append(pnode) @@ -1135,46 +1423,42 @@ class LUVerifyCluster(LogicalUnit): # FIXME: does not support file-backed instances if len(inst_config.secondary_nodes) == 0: i_non_redundant.append(instance) - elif len(inst_config.secondary_nodes) > 1: - feedback_fn(" - WARNING: multiple secondaries for instance %s" - % instance) + 
_ErrorIf(len(inst_config.secondary_nodes) > 1, + self.EINSTANCELAYOUT, instance, + "instance has multiple secondary nodes", code="WARNING") if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]: i_non_a_balanced.append(instance) for snode in inst_config.secondary_nodes: + _ErrorIf(snode not in node_info and snode not in n_offline, + self.ENODERPC, snode, + "instance %s, connection to secondary node" + "failed", instance) + if snode in node_info: node_info[snode]['sinst'].append(instance) if pnode not in node_info[snode]['sinst-by-pnode']: node_info[snode]['sinst-by-pnode'][pnode] = [] node_info[snode]['sinst-by-pnode'][pnode].append(instance) - elif snode not in n_offline: - feedback_fn(" - ERROR: instance %s, connection to secondary node" - " %s failed" % (instance, snode)) - bad = True + if snode in n_offline: inst_nodes_offline.append(snode) - if inst_nodes_offline: - # warn that the instance lives on offline nodes, and set bad=True - feedback_fn(" - ERROR: instance lives on offline node(s) %s" % - ", ".join(inst_nodes_offline)) - bad = True + # warn that the instance lives on offline nodes + _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance, + "instance lives on offline node(s) %s", + ", ".join(inst_nodes_offline)) feedback_fn("* Verifying orphan volumes") - result = self._VerifyOrphanVolumes(node_vol_should, node_volume, - feedback_fn) - bad = bad or result + self._VerifyOrphanVolumes(node_vol_should, node_volume) feedback_fn("* Verifying remaining instances") - result = self._VerifyOrphanInstances(instancelist, node_instance, - feedback_fn) - bad = bad or result + self._VerifyOrphanInstances(instancelist, node_instance) if constants.VERIFY_NPLUSONE_MEM not in self.skip_set: feedback_fn("* Verifying N+1 Memory redundancy") - result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn) - bad = bad or result + self._VerifyNPlusOneMemory(node_info, instance_cfg) feedback_fn("* Other Notes") if i_non_redundant: @@ -1191,10 +1475,10 @@ class LUVerifyCluster(LogicalUnit): if n_drained: feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained)) - return not bad + return not self.bad def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result): - """Analize the post-hooks' result + """Analyze the post-hooks' result This method analyses the hook result, handles it, and sends some nicely-formatted feedback back to the user. 
@@ -1214,31 +1498,28 @@ class LUVerifyCluster(LogicalUnit): # Used to change hooks' output to proper indentation indent_re = re.compile('^', re.M) feedback_fn("* Hooks Results") - if not hooks_results: - feedback_fn(" - ERROR: general communication failure") - lu_result = 1 - else: - for node_name in hooks_results: - show_node_header = True - res = hooks_results[node_name] - if res.failed or res.data is False or not isinstance(res.data, list): - if res.offline: - # no need to warn or set fail return value - continue - feedback_fn(" Communication failure in hooks execution") + assert hooks_results, "invalid result from hooks" + + for node_name in hooks_results: + show_node_header = True + res = hooks_results[node_name] + msg = res.fail_msg + test = msg and not res.offline + self._ErrorIf(test, self.ENODEHOOKS, node_name, + "Communication failure in hooks execution: %s", msg) + if test: + # override manually lu_result here as _ErrorIf only + # overrides self.bad + lu_result = 1 + continue + for script, hkr, output in res.payload: + test = hkr == constants.HKR_FAIL + self._ErrorIf(test, self.ENODEHOOKS, node_name, + "Script %s failed, output:", script) + if test: + output = indent_re.sub(' ', output) + feedback_fn("%s" % output) lu_result = 1 - continue - for script, hkr, output in res.data: - if hkr == constants.HKR_FAIL: - # The node header is only shown once, if there are - # failing hooks on that node - if show_node_header: - feedback_fn(" Node %s:" % node_name) - show_node_header = False - feedback_fn(" ERROR: Script %s failed, output:" % script) - output = indent_re.sub(' ', output) - feedback_fn("%s" % output) - lu_result = 1 return lu_result @@ -1255,7 +1536,7 @@ class LUVerifyDisks(NoHooksLU): locking.LEVEL_NODE: locking.ALL_SET, locking.LEVEL_INSTANCE: locking.ALL_SET, } - self.share_locks = dict(((i, 1) for i in locking.LEVELS)) + self.share_locks = dict.fromkeys(locking.LEVELS, 1) def CheckPrereq(self): """Check prerequisites. @@ -1268,8 +1549,13 @@ class LUVerifyDisks(NoHooksLU): def Exec(self, feedback_fn): """Verify integrity of cluster disks. 
+ @rtype: tuple of three items + @return: a tuple of (dict of node-to-node_error, list of instances + which need activate-disks, dict of instance: (node, volume) for + missing volumes + """ - result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {} + result = res_nodes, res_instances, res_missing = {}, [], {} vg_name = self.cfg.GetVGName() nodes = utils.NiceSort(self.cfg.GetNodeList()) @@ -1291,29 +1577,21 @@ class LUVerifyDisks(NoHooksLU): if not nv_dict: return result - node_lvs = self.rpc.call_volume_list(nodes, vg_name) + node_lvs = self.rpc.call_lv_list(nodes, vg_name) - to_act = set() for node in nodes: # node_volume - lvs = node_lvs[node] - if lvs.failed: - if not lvs.offline: - self.LogWarning("Connection to node %s failed: %s" % - (node, lvs.data)) + node_res = node_lvs[node] + if node_res.offline: continue - lvs = lvs.data - if isinstance(lvs, basestring): - logging.warning("Error enumerating LVs on node %s: %s", node, lvs) - res_nlvm[node] = lvs - continue - elif not isinstance(lvs, dict): - logging.warning("Connection to node %s failed or invalid data" - " returned", node) - res_nodes.append(node) + msg = node_res.fail_msg + if msg: + logging.warning("Error enumerating LVs on node %s: %s", node, msg) + res_nodes[node] = msg continue - for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems(): + lvs = node_res.payload + for lv_name, (_, lv_inactive, lv_online) in lvs.items(): inst = nv_dict.pop((node, lv_name), None) if (not lv_online and inst is not None and inst.name not in res_instances): @@ -1329,6 +1607,127 @@ class LUVerifyDisks(NoHooksLU): return result +class LURepairDiskSizes(NoHooksLU): + """Verifies the cluster disks sizes. + + """ + _OP_REQP = ["instances"] + REQ_BGL = False + + def ExpandNames(self): + if not isinstance(self.op.instances, list): + raise errors.OpPrereqError("Invalid argument type 'instances'") + + if self.op.instances: + self.wanted_names = [] + for name in self.op.instances: + full_name = self.cfg.ExpandInstanceName(name) + if full_name is None: + raise errors.OpPrereqError("Instance '%s' not known" % name) + self.wanted_names.append(full_name) + self.needed_locks = { + locking.LEVEL_NODE: [], + locking.LEVEL_INSTANCE: self.wanted_names, + } + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + else: + self.wanted_names = None + self.needed_locks = { + locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_INSTANCE: locking.ALL_SET, + } + self.share_locks = dict(((i, 1) for i in locking.LEVELS)) + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE and self.wanted_names is not None: + self._LockInstancesNodes(primary_only=True) + + def CheckPrereq(self): + """Check prerequisites. + + This only checks the optional instance list against the existing names. + + """ + if self.wanted_names is None: + self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE] + + self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name + in self.wanted_names] + + def _EnsureChildSizes(self, disk): + """Ensure children of the disk have the needed disk size. + + This is valid mainly for DRBD8 and fixes an issue where the + children have smaller disk size. + + @param disk: an L{ganeti.objects.Disk} object + + """ + if disk.dev_type == constants.LD_DRBD8: + assert disk.children, "Empty children for DRBD8?" 
+ fchild = disk.children[0] + mismatch = fchild.size < disk.size + if mismatch: + self.LogInfo("Child disk has size %d, parent %d, fixing", + fchild.size, disk.size) + fchild.size = disk.size + + # and we recurse on this child only, not on the metadev + return self._EnsureChildSizes(fchild) or mismatch + else: + return False + + def Exec(self, feedback_fn): + """Verify the size of cluster disks. + + """ + # TODO: check child disks too + # TODO: check differences in size between primary/secondary nodes + per_node_disks = {} + for instance in self.wanted_instances: + pnode = instance.primary_node + if pnode not in per_node_disks: + per_node_disks[pnode] = [] + for idx, disk in enumerate(instance.disks): + per_node_disks[pnode].append((instance, idx, disk)) + + changed = [] + for node, dskl in per_node_disks.items(): + newl = [v[2].Copy() for v in dskl] + for dsk in newl: + self.cfg.SetDiskID(dsk, node) + result = self.rpc.call_blockdev_getsizes(node, newl) + if result.fail_msg: + self.LogWarning("Failure in blockdev_getsizes call to node" + " %s, ignoring", node) + continue + if len(result.data) != len(dskl): + self.LogWarning("Invalid result from node %s, ignoring node results", + node) + continue + for ((instance, idx, disk), size) in zip(dskl, result.data): + if size is None: + self.LogWarning("Disk %d of instance %s did not return size" + " information, ignoring", idx, instance.name) + continue + if not isinstance(size, (int, long)): + self.LogWarning("Disk %d of instance %s did not return valid" + " size information, ignoring", idx, instance.name) + continue + size = size >> 20 + if size != disk.size: + self.LogInfo("Disk %d of instance %s has mismatched size," + " correcting: recorded %d, actual %d", idx, + instance.name, disk.size, size) + disk.size = size + self.cfg.Update(instance) + changed.append((instance.name, idx, size)) + if self._EnsureChildSizes(disk): + self.cfg.Update(instance) + changed.append((instance.name, idx, disk.size)) + return changed + + class LURenameCluster(LogicalUnit): """Rename the cluster. 
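# Note, not part of the diff, on the size handling in LURepairDiskSizes.Exec
# above: call_blockdev_getsizes appears to report sizes in bytes, and the
# right-shift by 20 converts them to MiB so they can be compared with
# disk.size, e.g. for a 20 GiB volume:
#   21474836480 >> 20  ==  20480   # MiB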
@@ -1379,8 +1778,7 @@ class LURenameCluster(LogicalUnit): # shutdown the master IP master = self.cfg.GetMasterNode() result = self.rpc.call_node_stop_master(master, False) - if result.failed or not result.data: - raise errors.OpExecError("Could not disable the master role") + result.Raise("Could not disable the master role") try: cluster = self.cfg.GetClusterInfo() @@ -1398,15 +1796,18 @@ class LURenameCluster(LogicalUnit): result = self.rpc.call_upload_file(node_list, constants.SSH_KNOWN_HOSTS_FILE) for to_node, to_result in result.iteritems(): - if to_result.failed or not to_result.data: - logging.error("Copy of file %s to node %s failed", - constants.SSH_KNOWN_HOSTS_FILE, to_node) + msg = to_result.fail_msg + if msg: + msg = ("Copy of file %s to node %s failed: %s" % + (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg)) + self.proc.LogWarning(msg) finally: result = self.rpc.call_node_start_master(master, False, False) - if result.failed or not result.data: + msg = result.fail_msg + if msg: self.LogWarning("Could not re-enable the master role on" - " the master, please restart manually.") + " the master, please restart manually: %s", msg) def _RecursiveCheckIfLVMBased(disk): @@ -1414,7 +1815,7 @@ def _RecursiveCheckIfLVMBased(disk): @type disk: L{objects.Disk} @param disk: the disk to check - @rtype: booleean + @rtype: boolean @return: boolean indicating whether a LD_LV dev_type was found or not """ @@ -1489,11 +1890,13 @@ class LUSetClusterParams(LogicalUnit): if self.op.vg_name: vglist = self.rpc.call_vg_list(node_list) for node in node_list: - if vglist[node].failed: + msg = vglist[node].fail_msg + if msg: # ignoring down node - self.LogWarning("Node %s unreachable/error, ignoring" % node) + self.LogWarning("Error while gathering data on node %s" + " (ignoring node): %s", node, msg) continue - vgstatus = utils.CheckVolumeGroupSize(vglist[node].data, + vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload, self.op.vg_name, constants.MIN_VG_SIZE) if vgstatus: @@ -1501,14 +1904,20 @@ class LUSetClusterParams(LogicalUnit): (node, vgstatus)) self.cluster = cluster = self.cfg.GetClusterInfo() - # validate beparams changes + # validate params changes if self.op.beparams: utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) - self.new_beparams = cluster.FillDict( - cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams) + self.new_beparams = objects.FillDict( + cluster.beparams[constants.PP_DEFAULT], self.op.beparams) + + if self.op.nicparams: + utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES) + self.new_nicparams = objects.FillDict( + cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams) + objects.NIC.CheckParameterSyntax(self.new_nicparams) # hypervisor list/parameters - self.new_hvparams = cluster.FillDict(cluster.hvparams, {}) + self.new_hvparams = objects.FillDict(cluster.hvparams, {}) if self.op.hvparams: if not isinstance(self.op.hvparams, dict): raise errors.OpPrereqError("Invalid 'hvparams' parameter on input") @@ -1520,6 +1929,13 @@ class LUSetClusterParams(LogicalUnit): if self.op.enabled_hypervisors is not None: self.hv_list = self.op.enabled_hypervisors + if not self.hv_list: + raise errors.OpPrereqError("Enabled hypervisors list must contain at" + " least one member") + invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES + if invalid_hvs: + raise errors.OpPrereqError("Enabled hypervisors contains invalid" + " entries: %s" % " ,".join(invalid_hvs)) else: self.hv_list = cluster.enabled_hypervisors @@ -1553,15 +1969,61 @@ class 
LUSetClusterParams(LogicalUnit): if self.op.enabled_hypervisors is not None: self.cluster.enabled_hypervisors = self.op.enabled_hypervisors if self.op.beparams: - self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams + self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams + if self.op.nicparams: + self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams + if self.op.candidate_pool_size is not None: self.cluster.candidate_pool_size = self.op.candidate_pool_size # we need to update the pool size here, otherwise the save will fail - _AdjustCandidatePool(self) + _AdjustCandidatePool(self, []) self.cfg.Update(self.cluster) +def _RedistributeAncillaryFiles(lu, additional_nodes=None): + """Distribute additional files which are part of the cluster configuration. + + ConfigWriter takes care of distributing the config and ssconf files, but + there are more files which should be distributed to all nodes. This function + makes sure those are copied. + + @param lu: calling logical unit + @param additional_nodes: list of nodes not in the config to distribute to + + """ + # 1. Gather target nodes + myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode()) + dist_nodes = lu.cfg.GetNodeList() + if additional_nodes is not None: + dist_nodes.extend(additional_nodes) + if myself.name in dist_nodes: + dist_nodes.remove(myself.name) + # 2. Gather files to distribute + dist_files = set([constants.ETC_HOSTS, + constants.SSH_KNOWN_HOSTS_FILE, + constants.RAPI_CERT_FILE, + constants.RAPI_USERS_FILE, + constants.HMAC_CLUSTER_KEY, + ]) + + enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors + for hv_name in enabled_hypervisors: + hv_class = hypervisor.GetHypervisor(hv_name) + dist_files.update(hv_class.GetAncillaryFiles()) + + # 3. Perform the files upload + for fname in dist_files: + if os.path.exists(fname): + result = lu.rpc.call_upload_file(dist_nodes, fname) + for to_node, to_result in result.items(): + msg = to_result.fail_msg + if msg: + msg = ("Copy of file %s to node %s failed: %s" % + (fname, to_node, msg)) + lu.proc.LogWarning(msg) + + class LURedistributeConfig(NoHooksLU): """Force the redistribution of cluster configuration. @@ -1587,6 +2049,7 @@ class LURedistributeConfig(NoHooksLU): """ self.cfg.Update(self.cfg.GetClusterInfo()) + _RedistributeAncillaryFiles(self) def _WaitForSync(lu, instance, oneshot=False, unlock=False): @@ -1611,33 +2074,35 @@ def _WaitForSync(lu, instance, oneshot=False, unlock=False): done = True cumul_degraded = False rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks) - if rstats.failed or not rstats.data: - lu.LogWarning("Can't get any data from node %s", node) + msg = rstats.fail_msg + if msg: + lu.LogWarning("Can't get any data from node %s: %s", node, msg) retries += 1 if retries >= 10: raise errors.RemoteError("Can't contact node %s for mirror data," " aborting." 
% node) time.sleep(6) continue - rstats = rstats.data + rstats = rstats.payload retries = 0 for i, mstat in enumerate(rstats): if mstat is None: lu.LogWarning("Can't compute data for node %s/%s", node, instance.disks[i].iv_name) continue - # we ignore the ldisk parameter - perc_done, est_time, is_degraded, _ = mstat - cumul_degraded = cumul_degraded or (is_degraded and perc_done is None) - if perc_done is not None: + + cumul_degraded = (cumul_degraded or + (mstat.is_degraded and mstat.sync_percent is None)) + if mstat.sync_percent is not None: done = False - if est_time is not None: - rem_time = "%d estimated seconds remaining" % est_time - max_time = est_time + if mstat.estimated_time is not None: + rem_time = "%d estimated seconds remaining" % mstat.estimated_time + max_time = mstat.estimated_time else: rem_time = "no time estimate" lu.proc.LogInfo("- device %s: %5.2f%% done, %s" % - (instance.disks[i].iv_name, perc_done, rem_time)) + (instance.disks[i].iv_name, mstat.sync_percent, + rem_time)) # if we're done but degraded, let's do a few small retries, to # make sure we see a stable and not transient situation; therefore @@ -1667,15 +2132,12 @@ def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False): """ lu.cfg.SetDiskID(dev, node) - if ldisk: - idx = 6 - else: - idx = 5 result = True + if on_primary or dev.AssembleOnSecondary(): rstats = lu.rpc.call_blockdev_find(node, dev) - msg = rstats.RemoteFailMsg() + msg = rstats.fail_msg if msg: lu.LogWarning("Can't find disk on node %s: %s", node, msg) result = False @@ -1683,7 +2145,11 @@ def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False): lu.LogWarning("Can't find disk on node %s", node) result = False else: - result = result and (not rstats.payload[idx]) + if ldisk: + result = result and rstats.payload.ldisk_status == constants.LDS_OKAY + else: + result = result and not rstats.payload.is_degraded + if dev.children: for child in dev.children: result = result and _CheckDiskConsistency(lu, child, node, on_primary) @@ -1698,7 +2164,9 @@ class LUDiagnoseOS(NoHooksLU): _OP_REQP = ["output_fields", "names"] REQ_BGL = False _FIELDS_STATIC = utils.FieldSet() - _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status") + _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants") + # Fields that need calculation of global os validity + _FIELDS_NEEDVALID = frozenset(["valid", "variants"]) def ExpandNames(self): if self.op.names: @@ -1729,10 +2197,11 @@ class LUDiagnoseOS(NoHooksLU): @rtype: dict @return: a dictionary with osnames as keys and as value another map, with - nodes as keys and list of OS objects as values, eg:: + nodes as keys and tuples of (path, status, diagnose) as values, eg:: - {"debian-etch": {"node1": [,...], - "node2": [,]} + {"debian-etch": {"node1": [(/usr/lib/..., True, ""), + (/srv/..., False, "invalid api")], + "node2": [(/srv/..., True, "")]} } """ @@ -1741,18 +2210,18 @@ class LUDiagnoseOS(NoHooksLU): # level), so that nodes with a non-responding node daemon don't # make all OSes invalid good_nodes = [node_name for node_name in rlist - if not rlist[node_name].failed] - for node_name, nr in rlist.iteritems(): - if nr.failed or not nr.data: + if not rlist[node_name].fail_msg] + for node_name, nr in rlist.items(): + if nr.fail_msg or not nr.payload: continue - for os_obj in nr.data: - if os_obj.name not in all_os: + for name, path, status, diagnose, variants in nr.payload: + if name not in all_os: # build a list of nodes for this os containing empty lists # for each node in 
node_list - all_os[os_obj.name] = {} + all_os[name] = {} for nname in good_nodes: - all_os[os_obj.name][nname] = [] - all_os[os_obj.name][node_name].append(os_obj) + all_os[name][nname] = [] + all_os[name][node_name].append((path, status, diagnose, variants)) return all_os def Exec(self, feedback_fn): @@ -1761,21 +2230,40 @@ class LUDiagnoseOS(NoHooksLU): """ valid_nodes = [node for node in self.cfg.GetOnlineNodeList()] node_data = self.rpc.call_os_diagnose(valid_nodes) - if node_data == False: - raise errors.OpExecError("Can't gather the list of OSes") pol = self._DiagnoseByOS(valid_nodes, node_data) output = [] - for os_name, os_data in pol.iteritems(): + calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields) + calc_variants = "variants" in self.op.output_fields + + for os_name, os_data in pol.items(): row = [] + if calc_valid: + valid = True + variants = None + for osl in os_data.values(): + valid = valid and osl and osl[0][1] + if not valid: + variants = None + break + if calc_variants: + node_variants = osl[0][3] + if variants is None: + variants = node_variants + else: + variants = [v for v in variants if v in node_variants] + for field in self.op.output_fields: if field == "name": val = os_name elif field == "valid": - val = utils.all([osl and osl[0] for osl in os_data.values()]) + val = valid elif field == "node_status": + # this is just a copy of the dict val = {} - for node_name, nos_list in os_data.iteritems(): - val[node_name] = [(v.status, v.path) for v in nos_list] + for node_name, nos_list in os_data.items(): + val[node_name] = nos_list + elif field == "variants": + val = variants else: raise errors.ParameterError(field) row.append(val) @@ -1804,7 +2292,8 @@ class LURemoveNode(LogicalUnit): "NODE_NAME": self.op.node_name, } all_nodes = self.cfg.GetNodeList() - all_nodes.remove(self.op.node_name) + if self.op.node_name in all_nodes: + all_nodes.remove(self.op.node_name) return env, all_nodes, all_nodes def CheckPrereq(self): @@ -1815,7 +2304,7 @@ class LURemoveNode(LogicalUnit): - it does not have primary or secondary instances - it's not the master - Any errors are signalled by raising errors.OpPrereqError. + Any errors are signaled by raising errors.OpPrereqError. 
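# Illustrative example, not part of the diff, for the variant handling in
# LUDiagnoseOS.Exec above: the per-node variant lists are intersected, so if
# one node reports ["lenny", "squeeze"] and another ["squeeze", "sid"] (sample
# values), the OS is listed with variants ["squeeze"]; as soon as any node
# reports the OS as invalid, valid becomes False and variants is reset to
# None.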
""" node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name)) @@ -1845,12 +2334,22 @@ class LURemoveNode(LogicalUnit): logging.info("Stopping the node daemon and removing configs from node %s", node.name) + # Promote nodes to master candidate as needed + _AdjustCandidatePool(self, exceptions=[node.name]) self.context.RemoveNode(node.name) - self.rpc.call_node_leave_cluster(node.name) + # Run post hooks on the node before it's removed + hm = self.proc.hmclass(self.rpc.call_hooks_runner, self) + try: + h_results = hm.RunPhase(constants.HOOKS_PHASE_POST, [node.name]) + except: + self.LogWarning("Errors occurred running hooks on %s" % node.name) - # Promote nodes to master candidate as needed - _AdjustCandidatePool(self) + result = self.rpc.call_node_leave_cluster(node.name) + msg = result.fail_msg + if msg: + self.LogWarning("Errors encountered on the remote node while leaving" + " the cluster: %s", msg) class LUQueryNodes(NoHooksLU): @@ -1859,6 +2358,10 @@ class LUQueryNodes(NoHooksLU): """ _OP_REQP = ["output_fields", "names", "use_locking"] REQ_BGL = False + + _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid", + "master_candidate", "offline", "drained"] + _FIELDS_DYNAMIC = utils.FieldSet( "dtotal", "dfree", "mtotal", "mnode", "mfree", @@ -1866,16 +2369,12 @@ class LUQueryNodes(NoHooksLU): "ctotal", "cnodes", "csockets", ) - _FIELDS_STATIC = utils.FieldSet( - "name", "pinst_cnt", "sinst_cnt", + _FIELDS_STATIC = utils.FieldSet(*[ + "pinst_cnt", "sinst_cnt", "pinst_list", "sinst_list", "pip", "sip", "tags", - "serial_no", - "master_candidate", "master", - "offline", - "drained", - "role", + "role"] + _SIMPLE_FIELDS ) def ExpandNames(self): @@ -1933,8 +2432,8 @@ class LUQueryNodes(NoHooksLU): self.cfg.GetHypervisorType()) for name in nodenames: nodeinfo = node_data[name] - if not nodeinfo.failed and nodeinfo.data: - nodeinfo = nodeinfo.data + if not nodeinfo.fail_msg and nodeinfo.payload: + nodeinfo = nodeinfo.payload fn = utils.TryConvert live_data[name] = { "mtotal": fn(int, nodeinfo.get('memory_total', None)), @@ -1976,8 +2475,8 @@ class LUQueryNodes(NoHooksLU): for node in nodelist: node_output = [] for field in self.op.output_fields: - if field == "name": - val = node.name + if field in self._SIMPLE_FIELDS: + val = getattr(node, field) elif field == "pinst_list": val = list(node_to_primary[node.name]) elif field == "sinst_list": @@ -1992,16 +2491,8 @@ class LUQueryNodes(NoHooksLU): val = node.secondary_ip elif field == "tags": val = list(node.GetTags()) - elif field == "serial_no": - val = node.serial_no - elif field == "master_candidate": - val = node.master_candidate elif field == "master": val = node.name == master_node - elif field == "offline": - val = node.offline - elif field == "drained": - val = node.drained elif self._FIELDS_DYNAMIC.Matches(field): val = live_data[node.name].get(field, None) elif field == "role": @@ -2067,10 +2558,15 @@ class LUQueryNodeVolumes(NoHooksLU): output = [] for node in nodenames: - if node not in volumes or volumes[node].failed or not volumes[node].data: + nresult = volumes[node] + if nresult.offline: + continue + msg = nresult.fail_msg + if msg: + self.LogWarning("Can't compute volume data on node %s: %s", node, msg) continue - node_vols = volumes[node].data[:] + node_vols = nresult.payload[:] node_vols.sort(key=lambda vol: vol['dev']) for vol in node_vols: @@ -2104,22 +2600,170 @@ class LUQueryNodeVolumes(NoHooksLU): return output -class LUAddNode(LogicalUnit): - """Logical unit for adding node to the cluster. 
+class LUQueryNodeStorage(NoHooksLU): + """Logical unit for getting information on storage units on node(s). """ - HPATH = "node-add" - HTYPE = constants.HTYPE_NODE - _OP_REQP = ["node_name"] + _OP_REQP = ["nodes", "storage_type", "output_fields"] + REQ_BGL = False + _FIELDS_STATIC = utils.FieldSet("node") - def BuildHooksEnv(self): - """Build hooks env. + def ExpandNames(self): + storage_type = self.op.storage_type - This will run on all nodes before, and on all nodes + the new node after. + if storage_type not in constants.VALID_STORAGE_FIELDS: + raise errors.OpPrereqError("Unknown storage type: %s" % storage_type) - """ - env = { - "OP_TARGET": self.op.node_name, + dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type] + + _CheckOutputFields(static=self._FIELDS_STATIC, + dynamic=utils.FieldSet(*dynamic_fields), + selected=self.op.output_fields) + + self.needed_locks = {} + self.share_locks[locking.LEVEL_NODE] = 1 + + if self.op.nodes: + self.needed_locks[locking.LEVEL_NODE] = \ + _GetWantedNodes(self, self.op.nodes) + else: + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + + def CheckPrereq(self): + """Check prerequisites. + + This checks that the fields required are valid output fields. + + """ + self.op.name = getattr(self.op, "name", None) + + self.nodes = self.acquired_locks[locking.LEVEL_NODE] + + def Exec(self, feedback_fn): + """Computes the list of nodes and their attributes. + + """ + # Always get name to sort by + if constants.SF_NAME in self.op.output_fields: + fields = self.op.output_fields[:] + else: + fields = [constants.SF_NAME] + self.op.output_fields + + # Never ask for node as it's only known to the LU + while "node" in fields: + fields.remove("node") + + field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)]) + name_idx = field_idx[constants.SF_NAME] + + st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) + data = self.rpc.call_storage_list(self.nodes, + self.op.storage_type, st_args, + self.op.name, fields) + + result = [] + + for node in utils.NiceSort(self.nodes): + nresult = data[node] + if nresult.offline: + continue + + msg = nresult.fail_msg + if msg: + self.LogWarning("Can't get storage data from node %s: %s", node, msg) + continue + + rows = dict([(row[name_idx], row) for row in nresult.payload]) + + for name in utils.NiceSort(rows.keys()): + row = rows[name] + + out = [] + + for field in self.op.output_fields: + if field == "node": + val = node + elif field in field_idx: + val = row[field_idx[field]] + else: + raise errors.ParameterError(field) + + out.append(val) + + result.append(out) + + return result + + +class LUModifyNodeStorage(NoHooksLU): + """Logical unit for modifying a storage volume on a node. + + """ + _OP_REQP = ["node_name", "storage_type", "name", "changes"] + REQ_BGL = False + + def CheckArguments(self): + node_name = self.cfg.ExpandNodeName(self.op.node_name) + if node_name is None: + raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name) + + self.op.node_name = node_name + + storage_type = self.op.storage_type + if storage_type not in constants.VALID_STORAGE_FIELDS: + raise errors.OpPrereqError("Unknown storage type: %s" % storage_type) + + def ExpandNames(self): + self.needed_locks = { + locking.LEVEL_NODE: self.op.node_name, + } + + def CheckPrereq(self): + """Check prerequisites. 
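    The heart of this check is a set difference between the requested changes
    and the fields that are modifiable for the given storage type. A minimal,
    self-contained sketch (added for clarity, not part of this patch; the
    example map stands in for constants.MODIFIABLE_STORAGE_FIELDS)::

      MODIFIABLE = {"lvm-pv": frozenset(["allocatable"])}

      def check_changes(storage_type, changes):
        modifiable = MODIFIABLE.get(storage_type)
        if modifiable is None:
          raise ValueError("storage type %r cannot be modified" % storage_type)
        extra = set(changes) - modifiable
        if extra:
          raise ValueError("cannot modify %r for storage type %r" %
                           (sorted(extra), storage_type))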
+ + """ + storage_type = self.op.storage_type + + try: + modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type] + except KeyError: + raise errors.OpPrereqError("Storage units of type '%s' can not be" + " modified" % storage_type) + + diff = set(self.op.changes.keys()) - modifiable + if diff: + raise errors.OpPrereqError("The following fields can not be modified for" + " storage units of type '%s': %r" % + (storage_type, list(diff))) + + def Exec(self, feedback_fn): + """Computes the list of nodes and their attributes. + + """ + st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) + result = self.rpc.call_storage_modify(self.op.node_name, + self.op.storage_type, st_args, + self.op.name, self.op.changes) + result.Raise("Failed to modify storage unit '%s' on %s" % + (self.op.name, self.op.node_name)) + + +class LUAddNode(LogicalUnit): + """Logical unit for adding node to the cluster. + + """ + HPATH = "node-add" + HTYPE = constants.HTYPE_NODE + _OP_REQP = ["node_name"] + + def BuildHooksEnv(self): + """Build hooks env. + + This will run on all nodes before, and on all nodes + the new node after. + + """ + env = { + "OP_TARGET": self.op.node_name, "NODE_NAME": self.op.node_name, "NODE_PIP": self.op.primary_ip, "NODE_SIP": self.op.secondary_ip, @@ -2136,7 +2780,7 @@ class LUAddNode(LogicalUnit): - it is resolvable - its parameters (single/dual homed) matches the cluster - Any errors are signalled by raising errors.OpPrereqError. + Any errors are signaled by raising errors.OpPrereqError. """ node_name = self.op.node_name @@ -2190,7 +2834,7 @@ class LUAddNode(LogicalUnit): raise errors.OpPrereqError("The master has a private ip but the" " new node doesn't have one") - # checks reachablity + # checks reachability if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT): raise errors.OpPrereqError("Node not reachable by ping") @@ -2201,15 +2845,12 @@ class LUAddNode(LogicalUnit): raise errors.OpPrereqError("Node secondary ip not reachable by TCP" " based ping to noded port") - cp_size = self.cfg.GetClusterInfo().candidate_pool_size if self.op.readd: exceptions = [node] else: exceptions = [] - mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions) - # the new node will increase mc_max with one, so: - mc_max = min(mc_max + 1, cp_size) - self.master_candidate = mc_now < mc_max + + self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions) if self.op.readd: self.new_node = self.cfg.GetNodeInfo(node) @@ -2244,17 +2885,14 @@ class LUAddNode(LogicalUnit): # check connectivity result = self.rpc.call_version([node])[node] - result.Raise() - if result.data: - if constants.PROTOCOL_VERSION == result.data: - logging.info("Communication to node %s fine, sw version %s match", - node, result.data) - else: - raise errors.OpExecError("Version mismatch master version %s," - " node version %s" % - (constants.PROTOCOL_VERSION, result.data)) + result.Raise("Can't get version information from node %s" % node) + if constants.PROTOCOL_VERSION == result.payload: + logging.info("Communication to node %s fine, sw version %s match", + node, result.payload) else: - raise errors.OpExecError("Cannot get version from the new node") + raise errors.OpExecError("Version mismatch master version %s," + " node version %s" % + (constants.PROTOCOL_VERSION, result.payload)) # setup ssh on node logging.info("Copy ssh key to node %s", node) @@ -2265,88 +2903,59 @@ class LUAddNode(LogicalUnit): priv_key, pub_key] for i in keyfiles: - f = open(i, 'r') - try: - keyarray.append(f.read()) - finally: - 
f.close() + keyarray.append(utils.ReadFile(i)) result = self.rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2], keyarray[3], keyarray[4], keyarray[5]) - - msg = result.RemoteFailMsg() - if msg: - raise errors.OpExecError("Cannot transfer ssh keys to the" - " new node: %s" % msg) + result.Raise("Cannot transfer ssh keys to the new node") # Add node to our /etc/hosts, and add key to known_hosts - utils.AddHostToEtcHosts(new_node.name) + if self.cfg.GetClusterInfo().modify_etc_hosts: + utils.AddHostToEtcHosts(new_node.name) if new_node.secondary_ip != new_node.primary_ip: result = self.rpc.call_node_has_ip_address(new_node.name, new_node.secondary_ip) - if result.failed or not result.data: + result.Raise("Failure checking secondary ip on node %s" % new_node.name, + prereq=True) + if not result.payload: raise errors.OpExecError("Node claims it doesn't have the secondary ip" " you gave (%s). Please fix and re-run this" " command." % new_node.secondary_ip) node_verify_list = [self.cfg.GetMasterNode()] node_verify_param = { - 'nodelist': [node], + constants.NV_NODELIST: [node], # TODO: do a node-net-test as well? } result = self.rpc.call_node_verify(node_verify_list, node_verify_param, self.cfg.GetClusterName()) for verifier in node_verify_list: - if result[verifier].failed or not result[verifier].data: - raise errors.OpExecError("Cannot communicate with %s's node daemon" - " for remote verification" % verifier) - if result[verifier].data['nodelist']: - for failed in result[verifier].data['nodelist']: - feedback_fn("ssh/hostname verification failed %s -> %s" % - (verifier, result[verifier].data['nodelist'][failed])) + result[verifier].Raise("Cannot communicate with node %s" % verifier) + nl_payload = result[verifier].payload[constants.NV_NODELIST] + if nl_payload: + for failed in nl_payload: + feedback_fn("ssh/hostname verification failed" + " (checking from %s): %s" % + (verifier, nl_payload[failed])) raise errors.OpExecError("ssh/hostname verification failed.") - # Distribute updated /etc/hosts and known_hosts to all nodes, - # including the node just added - myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode()) - dist_nodes = self.cfg.GetNodeList() - if not self.op.readd: - dist_nodes.append(node) - if myself.name in dist_nodes: - dist_nodes.remove(myself.name) - - logging.debug("Copying hosts and known_hosts to all nodes") - for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE): - result = self.rpc.call_upload_file(dist_nodes, fname) - for to_node, to_result in result.iteritems(): - if to_result.failed or not to_result.data: - logging.error("Copy of file %s to node %s failed", fname, to_node) - - to_copy = [] - enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors - if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors): - to_copy.append(constants.VNC_PASSWORD_FILE) - - for fname in to_copy: - result = self.rpc.call_upload_file([node], fname) - if result[node].failed or not result[node]: - logging.error("Could not copy file %s to node %s", fname, node) - if self.op.readd: + _RedistributeAncillaryFiles(self) self.context.ReaddNode(new_node) # make sure we redistribute the config self.cfg.Update(new_node) # and make sure the new node will not have old files around if not new_node.master_candidate: result = self.rpc.call_node_demote_from_mc(new_node.name) - msg = result.RemoteFailMsg() + msg = result.fail_msg if msg: self.LogWarning("Node failed to demote itself from master" " candidate status: %s" % msg) else: + 
_RedistributeAncillaryFiles(self, additional_nodes=[node]) self.context.AddNode(new_node) @@ -2401,18 +3010,30 @@ class LUSetNodeParams(LogicalUnit): """ node = self.node = self.cfg.GetNodeInfo(self.op.node_name) - if ((self.op.master_candidate == False or self.op.offline == True or - self.op.drained == True) and node.master_candidate): - # we will demote the node from master_candidate + if (self.op.master_candidate is not None or + self.op.drained is not None or + self.op.offline is not None): + # we can't change the master's node flags if self.op.node_name == self.cfg.GetMasterNode(): - raise errors.OpPrereqError("The master node has to be a" - " master candidate, online and not drained") + raise errors.OpPrereqError("The master role can be changed" + " only via masterfailover") + + # Boolean value that tells us whether we're offlining or draining the node + offline_or_drain = self.op.offline == True or self.op.drained == True + deoffline_or_drain = self.op.offline == False or self.op.drained == False + + if (node.master_candidate and + (self.op.master_candidate == False or offline_or_drain)): cp_size = self.cfg.GetClusterInfo().candidate_pool_size - num_candidates, _ = self.cfg.GetMasterCandidateStats() - if num_candidates <= cp_size: + mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats() + if mc_now <= cp_size: msg = ("Not enough master candidates (desired" - " %d, new value will be %d)" % (cp_size, num_candidates-1)) - if self.op.force: + " %d, new value will be %d)" % (cp_size, mc_now-1)) + # Only allow forcing the operation if it's an offline/drain operation, + # and we could not possibly promote more nodes. + # FIXME: this can still lead to issues if in any way another node which + # could be promoted appears in the meantime. + if self.op.force and offline_or_drain and mc_should == mc_max: self.LogWarning(msg) else: raise errors.OpPrereqError(msg) @@ -2423,6 +3044,13 @@ class LUSetNodeParams(LogicalUnit): raise errors.OpPrereqError("Node '%s' is offline or drained, can't set" " to master_candidate" % node.name) + # If we're being deofflined/drained, we'll MC ourself if needed + if (deoffline_or_drain and not offline_or_drain and not + self.op.master_candidate == True): + self.op.master_candidate = _DecideSelfPromotion(self) + if self.op.master_candidate: + self.LogInfo("Autopromoting node to master candidate") + return def Exec(self, feedback_fn): @@ -2452,7 +3080,7 @@ class LUSetNodeParams(LogicalUnit): result.append(("master_candidate", str(self.op.master_candidate))) if self.op.master_candidate == False: rrc = self.rpc.call_node_demote_from_mc(node.name) - msg = rrc.RemoteFailMsg() + msg = rrc.fail_msg if msg: self.LogWarning("Node failed to demote itself: %s" % msg) @@ -2465,7 +3093,7 @@ class LUSetNodeParams(LogicalUnit): changed_mc = True result.append(("master_candidate", "auto-demotion due to drain")) rrc = self.rpc.call_node_demote_from_mc(node.name) - msg = rrc.RemoteFailMsg() + msg = rrc.fail_msg if msg: self.LogWarning("Node failed to demote itself: %s" % msg) if node.offline: @@ -2481,6 +3109,49 @@ class LUSetNodeParams(LogicalUnit): return result +class LUPowercycleNode(NoHooksLU): + """Powercycles a node. 
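  The LU deliberately grabs no locks (see ExpandNames below) and the only
  argument check is a guard around the master node. A hypothetical sketch of
  that guard (added for clarity, not part of this patch; the real check raises
  errors.OpPrereqError)::

    def check_powercycle_target(node, master, force):
      if node == master and not force:
        raise ValueError("refusing to powercycle the master without force")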
+ + """ + _OP_REQP = ["node_name", "force"] + REQ_BGL = False + + def CheckArguments(self): + node_name = self.cfg.ExpandNodeName(self.op.node_name) + if node_name is None: + raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name) + self.op.node_name = node_name + if node_name == self.cfg.GetMasterNode() and not self.op.force: + raise errors.OpPrereqError("The node is the master and the force" + " parameter was not set") + + def ExpandNames(self): + """Locking for PowercycleNode. + + This is a last-resort option and shouldn't block on other + jobs. Therefore, we grab no locks. + + """ + self.needed_locks = {} + + def CheckPrereq(self): + """Check prerequisites. + + This LU has no prereqs. + + """ + pass + + def Exec(self, feedback_fn): + """Reboots a node. + + """ + result = self.rpc.call_node_powercycle(self.op.node_name, + self.cfg.GetHypervisorType()) + result.Raise("Failed to schedule the reboot") + return result.payload + + class LUQueryClusterInfo(NoHooksLU): """Query cluster configuration. @@ -2506,21 +3177,25 @@ class LUQueryClusterInfo(NoHooksLU): "software_version": constants.RELEASE_VERSION, "protocol_version": constants.PROTOCOL_VERSION, "config_version": constants.CONFIG_VERSION, - "os_api_version": constants.OS_API_VERSION, + "os_api_version": max(constants.OS_API_VERSIONS), "export_version": constants.EXPORT_VERSION, "architecture": (platform.architecture()[0], platform.machine()), "name": cluster.cluster_name, "master": cluster.master_node, - "default_hypervisor": cluster.default_hypervisor, + "default_hypervisor": cluster.enabled_hypervisors[0], "enabled_hypervisors": cluster.enabled_hypervisors, - "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor]) - for hypervisor in cluster.enabled_hypervisors]), + "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name]) + for hypervisor_name in cluster.enabled_hypervisors]), "beparams": cluster.beparams, + "nicparams": cluster.nicparams, "candidate_pool_size": cluster.candidate_pool_size, - "default_bridge": cluster.default_bridge, "master_netdev": cluster.master_netdev, "volume_group_name": cluster.volume_group_name, "file_storage_dir": cluster.file_storage_dir, + "ctime": cluster.ctime, + "mtime": cluster.mtime, + "uuid": cluster.uuid, + "tags": list(cluster.GetTags()), } return result @@ -2533,7 +3208,8 @@ class LUQueryConfigValues(NoHooksLU): _OP_REQP = [] REQ_BGL = False _FIELDS_DYNAMIC = utils.FieldSet() - _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag") + _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag", + "watcher_pause") def ExpandNames(self): self.needed_locks = {} @@ -2560,6 +3236,8 @@ class LUQueryConfigValues(NoHooksLU): entry = self.cfg.GetMasterNode() elif field == "drain_flag": entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE) + elif field == "watcher_pause": + return utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE) else: raise errors.ParameterError(field) values.append(entry) @@ -2650,7 +3328,7 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False, node_disk.UnsetSize() lu.cfg.SetDiskID(node_disk, node) result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False) - msg = result.RemoteFailMsg() + msg = result.fail_msg if msg: lu.proc.LogWarning("Could not prepare block device %s on node %s" " (is_primary=False, pass=1): %s", @@ -2670,7 +3348,7 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False, node_disk.UnsetSize() lu.cfg.SetDiskID(node_disk, node) result = 
lu.rpc.call_blockdev_assemble(node, node_disk, iname, True) - msg = result.RemoteFailMsg() + msg = result.fail_msg if msg: lu.proc.LogWarning("Could not prepare block device %s on node %s" " (is_primary=True, pass=2): %s", @@ -2692,7 +3370,7 @@ def _StartInstanceDisks(lu, instance, force): """Start the disks of an instance. """ - disks_ok, dummy = _AssembleInstanceDisks(lu, instance, + disks_ok, _ = _AssembleInstanceDisks(lu, instance, ignore_secondaries=force) if not disks_ok: _ShutdownInstanceDisks(lu, instance) @@ -2744,14 +3422,11 @@ def _SafeShutdownInstanceDisks(lu, instance): _ShutdownInstanceDisks. """ - ins_l = lu.rpc.call_instance_list([instance.primary_node], - [instance.hypervisor]) - ins_l = ins_l[instance.primary_node] - if ins_l.failed or not isinstance(ins_l.data, list): - raise errors.OpExecError("Can't contact node '%s'" % - instance.primary_node) - - if instance.name in ins_l.data: + pnode = instance.primary_node + ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode] + ins_l.Raise("Can't contact node %s" % pnode) + + if instance.name in ins_l.payload: raise errors.OpExecError("Instance is running, can't shutdown" " block devices.") @@ -2772,7 +3447,7 @@ def _ShutdownInstanceDisks(lu, instance, ignore_primary=False): for node, top_disk in disk.ComputeNodeTree(instance.primary_node): lu.cfg.SetDiskID(top_disk, node) result = lu.rpc.call_blockdev_shutdown(node, top_disk) - msg = result.RemoteFailMsg() + msg = result.fail_msg if msg: lu.LogWarning("Could not shutdown block device %s on node %s: %s", disk.iv_name, node, msg) @@ -2804,15 +3479,15 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name): """ nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name) - nodeinfo[node].Raise() - free_mem = nodeinfo[node].data.get('memory_free') + nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True) + free_mem = nodeinfo[node].payload.get('memory_free', None) if not isinstance(free_mem, int): raise errors.OpPrereqError("Can't compute free memory on node %s, result" - " was '%s'" % (node, free_mem)) + " was '%s'" % (node, free_mem)) if requested > free_mem: raise errors.OpPrereqError("Not enough memory on node %s for %s:" - " needed %s MiB, available %s MiB" % - (node, reason, requested, free_mem)) + " needed %s MiB, available %s MiB" % + (node, reason, requested, free_mem)) class LUStartupInstance(LogicalUnit): @@ -2870,7 +3545,7 @@ class LUStartupInstance(LogicalUnit): # check hypervisor parameter syntax (locally) cluster = self.cfg.GetClusterInfo() utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES) - filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor], + filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor], instance.hvparams) filled_hvp.update(self.hvparams) hv_type = hypervisor.GetHypervisor(instance.hypervisor) @@ -2881,14 +3556,15 @@ class LUStartupInstance(LogicalUnit): _CheckNodeOnline(self, instance.primary_node) bep = self.cfg.GetClusterInfo().FillBE(instance) - # check bridges existance + # check bridges existence _CheckInstanceBridgesExist(self, instance) remote_info = self.rpc.call_instance_info(instance.primary_node, instance.name, instance.hypervisor) - remote_info.Raise() - if not remote_info.data: + remote_info.Raise("Error checking node %s" % instance.primary_node, + prereq=True) + if not remote_info.payload: # not running already _CheckNodeFreeMemory(self, instance.primary_node, "starting instance %s" % instance.name, bep[constants.BE_MEMORY], 
instance.hypervisor) @@ -2908,7 +3584,7 @@ class LUStartupInstance(LogicalUnit): result = self.rpc.call_instance_start(node_current, instance, self.hvparams, self.beparams) - msg = result.RemoteFailMsg() + msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Could not start instance: %s" % msg) @@ -2959,7 +3635,7 @@ class LURebootInstance(LogicalUnit): _CheckNodeOnline(self, instance.primary_node) - # check bridges existance + # check bridges existence _CheckInstanceBridgesExist(self, instance) def Exec(self, feedback_fn): @@ -2978,19 +3654,14 @@ class LURebootInstance(LogicalUnit): self.cfg.SetDiskID(disk, node_current) result = self.rpc.call_instance_reboot(node_current, instance, reboot_type) - msg = result.RemoteFailMsg() - if msg: - raise errors.OpExecError("Could not reboot instance: %s" % msg) + result.Raise("Could not reboot instance") else: result = self.rpc.call_instance_shutdown(node_current, instance) - msg = result.RemoteFailMsg() - if msg: - raise errors.OpExecError("Could not shutdown instance for" - " full reboot: %s" % msg) + result.Raise("Could not shutdown instance for full reboot") _ShutdownInstanceDisks(self, instance) _StartInstanceDisks(self, instance, ignore_secondaries) result = self.rpc.call_instance_start(node_current, instance, None, None) - msg = result.RemoteFailMsg() + msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Could not start instance for" @@ -3040,7 +3711,7 @@ class LUShutdownInstance(LogicalUnit): node_current = instance.primary_node self.cfg.MarkInstanceDown(instance.name) result = self.rpc.call_instance_shutdown(node_current, instance) - msg = result.RemoteFailMsg() + msg = result.fail_msg if msg: self.proc.LogWarning("Could not shutdown instance: %s" % msg) @@ -3089,13 +3760,15 @@ class LUReinstallInstance(LogicalUnit): remote_info = self.rpc.call_instance_info(instance.primary_node, instance.name, instance.hypervisor) - remote_info.Raise() - if remote_info.data: + remote_info.Raise("Error checking node %s" % instance.primary_node, + prereq=True) + if remote_info.payload: raise errors.OpPrereqError("Instance '%s' is running on the node %s" % (self.op.instance_name, instance.primary_node)) self.op.os_type = getattr(self.op, "os_type", None) + self.op.force_variant = getattr(self.op, "force_variant", False) if self.op.os_type is not None: # OS verification pnode = self.cfg.GetNodeInfo( @@ -3104,10 +3777,10 @@ class LUReinstallInstance(LogicalUnit): raise errors.OpPrereqError("Primary node '%s' is unknown" % self.op.pnode) result = self.rpc.call_os_get(pnode.name, self.op.os_type) - result.Raise() - if not isinstance(result.data, objects.OS): - raise errors.OpPrereqError("OS '%s' not in supported OS list for" - " primary node" % self.op.os_type) + result.Raise("OS '%s' not in supported OS list for primary node %s" % + (self.op.os_type, pnode.name), prereq=True) + if not self.op.force_variant: + _CheckOSVariant(result.payload, self.op.os_type) self.instance = instance @@ -3125,16 +3798,96 @@ class LUReinstallInstance(LogicalUnit): _StartInstanceDisks(self, inst, None) try: feedback_fn("Running the instance OS create scripts...") - result = self.rpc.call_instance_os_add(inst.primary_node, inst) - msg = result.RemoteFailMsg() - if msg: - raise errors.OpExecError("Could not install OS for instance %s" - " on node %s: %s" % - (inst.name, inst.primary_node, msg)) + result = self.rpc.call_instance_os_add(inst.primary_node, inst, True) + result.Raise("Could not install OS 
for instance %s on node %s" % + (inst.name, inst.primary_node)) finally: _ShutdownInstanceDisks(self, inst) +class LURecreateInstanceDisks(LogicalUnit): + """Recreate an instance's missing disks. + + """ + HPATH = "instance-recreate-disks" + HTYPE = constants.HTYPE_INSTANCE + _OP_REQP = ["instance_name", "disks"] + REQ_BGL = False + + def CheckArguments(self): + """Check the arguments. + + """ + if not isinstance(self.op.disks, list): + raise errors.OpPrereqError("Invalid disks parameter") + for item in self.op.disks: + if (not isinstance(item, int) or + item < 0): + raise errors.OpPrereqError("Invalid disk specification '%s'" % + str(item)) + + def ExpandNames(self): + self._ExpandAndLockInstance() + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on master, primary and secondary nodes of the instance. + + """ + env = _BuildInstanceHookEnvByObject(self, self.instance) + nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) + return env, nl, nl + + def CheckPrereq(self): + """Check prerequisites. + + This checks that the instance is in the cluster and is not running. + + """ + instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name + _CheckNodeOnline(self, instance.primary_node) + + if instance.disk_template == constants.DT_DISKLESS: + raise errors.OpPrereqError("Instance '%s' has no disks" % + self.op.instance_name) + if instance.admin_up: + raise errors.OpPrereqError("Instance '%s' is marked to be up" % + self.op.instance_name) + remote_info = self.rpc.call_instance_info(instance.primary_node, + instance.name, + instance.hypervisor) + remote_info.Raise("Error checking node %s" % instance.primary_node, + prereq=True) + if remote_info.payload: + raise errors.OpPrereqError("Instance '%s' is running on the node %s" % + (self.op.instance_name, + instance.primary_node)) + + if not self.op.disks: + self.op.disks = range(len(instance.disks)) + else: + for idx in self.op.disks: + if idx >= len(instance.disks): + raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx) + + self.instance = instance + + def Exec(self, feedback_fn): + """Recreate the disks. + + """ + to_skip = [] + for idx, disk in enumerate(self.instance.disks): + if idx not in self.op.disks: # disk idx has not been passed in + to_skip.append(idx) + continue + + _CreateDisks(self, self.instance, to_skip=to_skip) + + class LURenameInstance(LogicalUnit): """Rename an instance. 
@@ -3173,8 +3926,9 @@ class LURenameInstance(LogicalUnit): remote_info = self.rpc.call_instance_info(instance.primary_node, instance.name, instance.hypervisor) - remote_info.Raise() - if remote_info.data: + remote_info.Raise("Error checking node %s" % instance.primary_node, + prereq=True) + if remote_info.payload: raise errors.OpPrereqError("Instance '%s' is running on the node %s" % (self.op.instance_name, instance.primary_node)) @@ -3218,25 +3972,16 @@ class LURenameInstance(LogicalUnit): result = self.rpc.call_file_storage_dir_rename(inst.primary_node, old_file_storage_dir, new_file_storage_dir) - result.Raise() - if not result.data: - raise errors.OpExecError("Could not connect to node '%s' to rename" - " directory '%s' to '%s' (but the instance" - " has been renamed in Ganeti)" % ( - inst.primary_node, old_file_storage_dir, - new_file_storage_dir)) - - if not result.data[0]: - raise errors.OpExecError("Could not rename directory '%s' to '%s'" - " (but the instance has been renamed in" - " Ganeti)" % (old_file_storage_dir, - new_file_storage_dir)) + result.Raise("Could not rename on node %s directory '%s' to '%s'" + " (but the instance has been renamed in Ganeti)" % + (inst.primary_node, old_file_storage_dir, + new_file_storage_dir)) _StartInstanceDisks(self, inst, None) try: result = self.rpc.call_instance_run_rename(inst.primary_node, inst, old_name) - msg = result.RemoteFailMsg() + msg = result.fail_msg if msg: msg = ("Could not run OS rename script for instance %s on node %s" " (but the instance has been renamed in Ganeti): %s" % @@ -3293,7 +4038,7 @@ class LURemoveInstance(LogicalUnit): instance.name, instance.primary_node) result = self.rpc.call_instance_shutdown(instance.primary_node, instance) - msg = result.RemoteFailMsg() + msg = result.fail_msg if msg: if self.op.ignore_failures: feedback_fn("Warning: can't shutdown instance: %s" % msg) @@ -3322,17 +4067,22 @@ class LUQueryInstances(NoHooksLU): """ _OP_REQP = ["output_fields", "names", "use_locking"] REQ_BGL = False + _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor", + "serial_no", "ctime", "mtime", "uuid"] _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes", "admin_state", "disk_template", "ip", "mac", "bridge", + "nic_mode", "nic_link", "sda_size", "sdb_size", "vcpus", "tags", "network_port", "beparams", r"(disk)\.(size)/([0-9]+)", r"(disk)\.(sizes)", "disk_usage", - r"(nic)\.(mac|ip|bridge)/([0-9]+)", - r"(nic)\.(macs|ips|bridges)", + r"(nic)\.(mac|ip|mode|link)/([0-9]+)", + r"(nic)\.(bridge)/([0-9]+)", + r"(nic)\.(macs|ips|modes|links|bridges)", r"(disk|nic)\.(count)", - "serial_no", "hypervisor", "hvparams",] + + "hvparams", + ] + _SIMPLE_FIELDS + ["hv/%s" % name for name in constants.HVS_PARAMETERS] + ["be/%s" % name @@ -3412,12 +4162,12 @@ class LUQueryInstances(NoHooksLU): if result.offline: # offline nodes will be in both lists off_nodes.append(name) - if result.failed: + if result.fail_msg: bad_nodes.append(name) else: - if result.data: - live_data.update(result.data) - # else no instance is alive + if result.payload: + live_data.update(result.payload) + # else no instance is alive else: live_data = dict([(name, {}) for name in instance_names]) @@ -3426,16 +4176,17 @@ class LUQueryInstances(NoHooksLU): HVPREFIX = "hv/" BEPREFIX = "be/" output = [] + cluster = self.cfg.GetClusterInfo() for instance in instance_list: iout = [] - i_hv = self.cfg.GetClusterInfo().FillHV(instance) - i_be = self.cfg.GetClusterInfo().FillBE(instance) + i_hv = cluster.FillHV(instance) + i_be = 
cluster.FillBE(instance) + i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT], + nic.nicparams) for nic in instance.nics] for field in self.op.output_fields: st_match = self._FIELDS_STATIC.Matches(field) - if field == "name": - val = instance.name - elif field == "os": - val = instance.os + if field in self._SIMPLE_FIELDS: + val = getattr(instance, field) elif field == "pnode": val = instance.primary_node elif field == "snodes": @@ -3480,9 +4231,20 @@ class LUQueryInstances(NoHooksLU): val = instance.nics[0].ip else: val = None - elif field == "bridge": + elif field == "nic_mode": + if instance.nics: + val = i_nicp[0][constants.NIC_MODE] + else: + val = None + elif field == "nic_link": if instance.nics: - val = instance.nics[0].bridge + val = i_nicp[0][constants.NIC_LINK] + else: + val = None + elif field == "bridge": + if (instance.nics and + i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED): + val = i_nicp[0][constants.NIC_LINK] else: val = None elif field == "mac": @@ -3501,12 +4263,6 @@ class LUQueryInstances(NoHooksLU): val = _ComputeDiskSize(instance.disk_template, disk_sizes) elif field == "tags": val = list(instance.GetTags()) - elif field == "serial_no": - val = instance.serial_no - elif field == "network_port": - val = instance.network_port - elif field == "hypervisor": - val = instance.hypervisor elif field == "hvparams": val = i_hv elif (field.startswith(HVPREFIX) and @@ -3539,8 +4295,17 @@ class LUQueryInstances(NoHooksLU): val = [nic.mac for nic in instance.nics] elif st_groups[1] == "ips": val = [nic.ip for nic in instance.nics] + elif st_groups[1] == "modes": + val = [nicp[constants.NIC_MODE] for nicp in i_nicp] + elif st_groups[1] == "links": + val = [nicp[constants.NIC_LINK] for nicp in i_nicp] elif st_groups[1] == "bridges": - val = [nic.bridge for nic in instance.nics] + val = [] + for nicp in i_nicp: + if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: + val.append(nicp[constants.NIC_LINK]) + else: + val.append(None) else: # index-based item nic_idx = int(st_groups[2]) @@ -3551,8 +4316,16 @@ class LUQueryInstances(NoHooksLU): val = instance.nics[nic_idx].mac elif st_groups[1] == "ip": val = instance.nics[nic_idx].ip + elif st_groups[1] == "mode": + val = i_nicp[nic_idx][constants.NIC_MODE] + elif st_groups[1] == "link": + val = i_nicp[nic_idx][constants.NIC_LINK] elif st_groups[1] == "bridge": - val = instance.nics[nic_idx].bridge + nic_mode = i_nicp[nic_idx][constants.NIC_MODE] + if nic_mode == constants.NIC_MODE_BRIDGED: + val = i_nicp[nic_idx][constants.NIC_LINK] + else: + val = None else: assert False, "Unhandled NIC parameter" else: @@ -3620,7 +4393,6 @@ class LUFailoverInstance(LogicalUnit): target_node = secondary_nodes[0] _CheckNodeOnline(self, target_node) _CheckNodeNotDrained(self, target_node) - if instance.admin_up: # check memory requirements on the secondary node _CheckNodeFreeMemory(self, target_node, "failing over instance %s" % @@ -3631,13 +4403,7 @@ class LUFailoverInstance(LogicalUnit): " instance will not be started") # check bridge existance - brlist = [nic.bridge for nic in instance.nics] - result = self.rpc.call_bridges_exist(target_node, brlist) - result.Raise() - if not result.data: - raise errors.OpPrereqError("One or more target bridges %s does not" - " exist on destination node '%s'" % - (brlist, target_node)) + _CheckInstanceBridgesExist(self, instance, node=target_node) def Exec(self, feedback_fn): """Failover an instance. 
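    Overall sequence, as an illustrative sketch added here for clarity (not
    part of this patch; the callables are hypothetical stand-ins for the RPC
    helpers used in the body below)::

      def failover(inst, was_up, shutdown, swap_primary, assemble_disks, start,
                   ignore_consistency=False):
        err = shutdown(inst)            # best effort on the old primary
        if err and not ignore_consistency:
          raise RuntimeError("cannot shut down %s: %s" % (inst, err))
        swap_primary(inst)              # the old secondary becomes the primary
        if was_up:
          if not assemble_disks(inst):  # activate disks on the new primary
            raise RuntimeError("cannot activate disks for %s" % inst)
          start(inst)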
@@ -3664,7 +4430,7 @@ class LUFailoverInstance(LogicalUnit): instance.name, source_node) result = self.rpc.call_instance_shutdown(source_node, instance) - msg = result.RemoteFailMsg() + msg = result.fail_msg if msg: if self.op.ignore_consistency: self.proc.LogWarning("Could not shutdown instance %s on node %s." @@ -3690,7 +4456,7 @@ class LUFailoverInstance(LogicalUnit): logging.info("Starting instance %s on node %s", instance.name, target_node) - disks_ok, dummy = _AssembleInstanceDisks(self, instance, + disks_ok, _ = _AssembleInstanceDisks(self, instance, ignore_secondaries=True) if not disks_ok: _ShutdownInstanceDisks(self, instance) @@ -3698,7 +4464,7 @@ class LUFailoverInstance(LogicalUnit): feedback_fn("* starting the instance on the target node") result = self.rpc.call_instance_start(target_node, instance, None, None) - msg = result.RemoteFailMsg() + msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Could not start instance %s on node %s: %s" % @@ -3720,9 +4486,14 @@ class LUMigrateInstance(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() + self.needed_locks[locking.LEVEL_NODE] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + self._migrater = TLMigrateInstance(self, self.op.instance_name, + self.op.live, self.op.cleanup) + self.tasklets = [self._migrater] + def DeclareLocks(self, level): if level == locking.LEVEL_NODE: self._LockInstancesNodes() @@ -3733,10 +4504,49 @@ class LUMigrateInstance(LogicalUnit): This runs on master, primary and secondary nodes of the instance. """ - env = _BuildInstanceHookEnvByObject(self, self.instance) + instance = self._migrater.instance + env = _BuildInstanceHookEnvByObject(self, instance) env["MIGRATE_LIVE"] = self.op.live env["MIGRATE_CLEANUP"] = self.op.cleanup - nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes) + nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes) + return env, nl, nl + + +class LUMoveInstance(LogicalUnit): + """Move an instance by data-copying. + + """ + HPATH = "instance-move" + HTYPE = constants.HTYPE_INSTANCE + _OP_REQP = ["instance_name", "target_node"] + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + target_node = self.cfg.ExpandNodeName(self.op.target_node) + if target_node is None: + raise errors.OpPrereqError("Node '%s' not known" % + self.op.target_node) + self.op.target_node = target_node + self.needed_locks[locking.LEVEL_NODE] = [target_node] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes(primary_only=True) + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on master, primary and secondary nodes of the instance. + + """ + env = { + "TARGET_NODE": self.op.target_node, + } + env.update(_BuildInstanceHookEnvByObject(self, self.instance)) + nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node, + self.op.target_node] return env, nl, nl def CheckPrereq(self): @@ -3745,20 +4555,224 @@ class LUMigrateInstance(LogicalUnit): This checks that the instance is in the cluster. 
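    It also checks that the target node is online, not drained and different
    from the current primary, that every disk uses a copyable (LV- or
    file-based) layout, and, when the instance is marked up, that the target
    has enough free memory. A minimal sketch of the per-disk layout check
    (added for clarity, not part of this patch; the real constants are
    constants.LD_LV and constants.LD_FILE)::

      COPYABLE_DISK_TYPES = frozenset(["lvm", "file"])

      def disks_copyable(dev_types):
        return all(t in COPYABLE_DISK_TYPES for t in dev_types)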
""" - instance = self.cfg.GetInstanceInfo( - self.cfg.ExpandInstanceName(self.op.instance_name)) - if instance is None: - raise errors.OpPrereqError("Instance '%s' not known" % - self.op.instance_name) + self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name - if instance.disk_template != constants.DT_DRBD8: - raise errors.OpPrereqError("Instance's disk layout is not" - " drbd8, cannot migrate.") + node = self.cfg.GetNodeInfo(self.op.target_node) + assert node is not None, \ + "Cannot retrieve locked node %s" % self.op.target_node - secondary_nodes = instance.secondary_nodes - if not secondary_nodes: - raise errors.ConfigurationError("No secondary node but using" - " drbd8 disk template") + self.target_node = target_node = node.name + + if target_node == instance.primary_node: + raise errors.OpPrereqError("Instance %s is already on the node %s" % + (instance.name, target_node)) + + bep = self.cfg.GetClusterInfo().FillBE(instance) + + for idx, dsk in enumerate(instance.disks): + if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE): + raise errors.OpPrereqError("Instance disk %d has a complex layout," + " cannot copy") + + _CheckNodeOnline(self, target_node) + _CheckNodeNotDrained(self, target_node) + + if instance.admin_up: + # check memory requirements on the secondary node + _CheckNodeFreeMemory(self, target_node, "failing over instance %s" % + instance.name, bep[constants.BE_MEMORY], + instance.hypervisor) + else: + self.LogInfo("Not checking memory on the secondary node as" + " instance will not be started") + + # check bridge existance + _CheckInstanceBridgesExist(self, instance, node=target_node) + + def Exec(self, feedback_fn): + """Move an instance. + + The move is done by shutting it down on its present node, copying + the data over (slow) and starting it on the new node. + + """ + instance = self.instance + + source_node = instance.primary_node + target_node = self.target_node + + self.LogInfo("Shutting down instance %s on source node %s", + instance.name, source_node) + + result = self.rpc.call_instance_shutdown(source_node, instance) + msg = result.fail_msg + if msg: + if self.op.ignore_consistency: + self.proc.LogWarning("Could not shutdown instance %s on node %s." + " Proceeding anyway. Please make sure node" + " %s is down. 
Error details: %s", + instance.name, source_node, source_node, msg) + else: + raise errors.OpExecError("Could not shutdown instance %s on" + " node %s: %s" % + (instance.name, source_node, msg)) + + # create the target disks + try: + _CreateDisks(self, instance, target_node=target_node) + except errors.OpExecError: + self.LogWarning("Device creation failed, reverting...") + try: + _RemoveDisks(self, instance, target_node=target_node) + finally: + self.cfg.ReleaseDRBDMinors(instance.name) + raise + + cluster_name = self.cfg.GetClusterInfo().cluster_name + + errs = [] + # activate, get path, copy the data over + for idx, disk in enumerate(instance.disks): + self.LogInfo("Copying data for disk %d", idx) + result = self.rpc.call_blockdev_assemble(target_node, disk, + instance.name, True) + if result.fail_msg: + self.LogWarning("Can't assemble newly created disk %d: %s", + idx, result.fail_msg) + errs.append(result.fail_msg) + break + dev_path = result.payload + result = self.rpc.call_blockdev_export(source_node, disk, + target_node, dev_path, + cluster_name) + if result.fail_msg: + self.LogWarning("Can't copy data over for disk %d: %s", + idx, result.fail_msg) + errs.append(result.fail_msg) + break + + if errs: + self.LogWarning("Some disks failed to copy, aborting") + try: + _RemoveDisks(self, instance, target_node=target_node) + finally: + self.cfg.ReleaseDRBDMinors(instance.name) + raise errors.OpExecError("Errors during disk copy: %s" % + (",".join(errs),)) + + instance.primary_node = target_node + self.cfg.Update(instance) + + self.LogInfo("Removing the disks on the original node") + _RemoveDisks(self, instance, target_node=source_node) + + # Only start the instance if it's marked as up + if instance.admin_up: + self.LogInfo("Starting instance %s on node %s", + instance.name, target_node) + + disks_ok, _ = _AssembleInstanceDisks(self, instance, + ignore_secondaries=True) + if not disks_ok: + _ShutdownInstanceDisks(self, instance) + raise errors.OpExecError("Can't activate the instance's disks") + + result = self.rpc.call_instance_start(target_node, instance, None, None) + msg = result.fail_msg + if msg: + _ShutdownInstanceDisks(self, instance) + raise errors.OpExecError("Could not start instance %s on node %s: %s" % + (instance.name, target_node, msg)) + + +class LUMigrateNode(LogicalUnit): + """Migrate all instances from a node. + + """ + HPATH = "node-migrate" + HTYPE = constants.HTYPE_NODE + _OP_REQP = ["node_name", "live"] + REQ_BGL = False + + def ExpandNames(self): + self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name) + if self.op.node_name is None: + raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name) + + self.needed_locks = { + locking.LEVEL_NODE: [self.op.node_name], + } + + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + + # Create tasklets for migrating instances for all instances on this node + names = [] + tasklets = [] + + for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name): + logging.debug("Migrating instance %s", inst.name) + names.append(inst.name) + + tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False)) + + self.tasklets = tasklets + + # Declare instance locks + self.needed_locks[locking.LEVEL_INSTANCE] = names + + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + self._LockInstancesNodes() + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on the master, the primary and all the secondaries. 
+ + """ + env = { + "NODE_NAME": self.op.node_name, + } + + nl = [self.cfg.GetMasterNode()] + + return (env, nl, nl) + + +class TLMigrateInstance(Tasklet): + def __init__(self, lu, instance_name, live, cleanup): + """Initializes this class. + + """ + Tasklet.__init__(self, lu) + + # Parameters + self.instance_name = instance_name + self.live = live + self.cleanup = cleanup + + def CheckPrereq(self): + """Check prerequisites. + + This checks that the instance is in the cluster. + + """ + instance = self.cfg.GetInstanceInfo( + self.cfg.ExpandInstanceName(self.instance_name)) + if instance is None: + raise errors.OpPrereqError("Instance '%s' not known" % + self.instance_name) + + if instance.disk_template != constants.DT_DRBD8: + raise errors.OpPrereqError("Instance's disk layout is not" + " drbd8, cannot migrate.") + + secondary_nodes = instance.secondary_nodes + if not secondary_nodes: + raise errors.ConfigurationError("No secondary node but using" + " drbd8 disk template") i_be = self.cfg.GetClusterInfo().FillBE(instance) @@ -3769,21 +4783,13 @@ class LUMigrateInstance(LogicalUnit): instance.hypervisor) # check bridge existance - brlist = [nic.bridge for nic in instance.nics] - result = self.rpc.call_bridges_exist(target_node, brlist) - if result.failed or not result.data: - raise errors.OpPrereqError("One or more target bridges %s does not" - " exist on destination node '%s'" % - (brlist, target_node)) - - if not self.op.cleanup: + _CheckInstanceBridgesExist(self, instance, node=target_node) + + if not self.cleanup: _CheckNodeNotDrained(self, target_node) result = self.rpc.call_instance_migratable(instance.primary_node, instance) - msg = result.RemoteFailMsg() - if msg: - raise errors.OpPrereqError("Can't migrate: %s - please use failover" % - msg) + result.Raise("Can't migrate, please use failover", prereq=True) self.instance = instance @@ -3802,10 +4808,7 @@ class LUMigrateInstance(LogicalUnit): self.instance.disks) min_percent = 100 for node, nres in result.items(): - msg = nres.RemoteFailMsg() - if msg: - raise errors.OpExecError("Cannot resync disks on node %s: %s" % - (node, msg)) + nres.Raise("Cannot resync disks on node %s" % node) node_done, node_percent = nres.payload all_done = all_done and node_done if node_percent is not None: @@ -3826,10 +4829,7 @@ class LUMigrateInstance(LogicalUnit): result = self.rpc.call_blockdev_close(node, self.instance.name, self.instance.disks) - msg = result.RemoteFailMsg() - if msg: - raise errors.OpExecError("Cannot change disk to secondary on node %s," - " error %s" % (node, msg)) + result.Raise("Cannot change disk to secondary on node %s" % node) def _GoStandalone(self): """Disconnect from the network. @@ -3839,10 +4839,7 @@ class LUMigrateInstance(LogicalUnit): result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip, self.instance.disks) for node, nres in result.items(): - msg = nres.RemoteFailMsg() - if msg: - raise errors.OpExecError("Cannot disconnect disks node %s," - " error %s" % (node, msg)) + nres.Raise("Cannot disconnect disks node %s" % node) def _GoReconnect(self, multimaster): """Reconnect to the network. @@ -3857,10 +4854,7 @@ class LUMigrateInstance(LogicalUnit): self.instance.disks, self.instance.name, multimaster) for node, nres in result.items(): - msg = nres.RemoteFailMsg() - if msg: - raise errors.OpExecError("Cannot change disks config on node %s," - " error: %s" % (node, msg)) + nres.Raise("Cannot change disks config on node %s" % node) def _ExecCleanup(self): """Try to cleanup after a failed migration. 
@@ -3885,12 +4879,10 @@ class LUMigrateInstance(LogicalUnit): " a bad state)") ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor]) for node, result in ins_l.items(): - result.Raise() - if not isinstance(result.data, list): - raise errors.OpExecError("Can't contact node '%s'" % node) + result.Raise("Can't contact node %s" % node) - runningon_source = instance.name in ins_l[source_node].data - runningon_target = instance.name in ins_l[target_node].data + runningon_source = instance.name in ins_l[source_node].payload + runningon_target = instance.name in ins_l[target_node].payload if runningon_source and runningon_target: raise errors.OpExecError("Instance seems to be running on two nodes," @@ -3940,10 +4932,10 @@ class LUMigrateInstance(LogicalUnit): self._GoReconnect(False) self._WaitUntilSync() except errors.OpExecError, err: - self.LogWarning("Migration failed and I can't reconnect the" - " drives: error '%s'\n" - "Please look and recover the instance status" % - str(err)) + self.lu.LogWarning("Migration failed and I can't reconnect the" + " drives: error '%s'\n" + "Please look and recover the instance status" % + str(err)) def _AbortMigration(self): """Call the hypervisor code to abort a started migration. @@ -3957,7 +4949,7 @@ class LUMigrateInstance(LogicalUnit): instance, migration_info, False) - abort_msg = abort_result.RemoteFailMsg() + abort_msg = abort_result.fail_msg if abort_msg: logging.error("Aborting migration failed on target node %s: %s" % (target_node, abort_msg)) @@ -3989,7 +4981,7 @@ class LUMigrateInstance(LogicalUnit): # First get the migration information from the remote node result = self.rpc.call_migration_info(source_node, instance) - msg = result.RemoteFailMsg() + msg = result.fail_msg if msg: log_err = ("Failed fetching source migration information from %s: %s" % (source_node, msg)) @@ -4010,7 +5002,7 @@ class LUMigrateInstance(LogicalUnit): migration_info, self.nodes_ip[target_node]) - msg = result.RemoteFailMsg() + msg = result.fail_msg if msg: logging.error("Instance pre-migration failed, trying to revert" " disk status: %s", msg) @@ -4023,8 +5015,8 @@ class LUMigrateInstance(LogicalUnit): time.sleep(10) result = self.rpc.call_instance_migrate(source_node, instance, self.nodes_ip[target_node], - self.op.live) - msg = result.RemoteFailMsg() + self.live) + msg = result.fail_msg if msg: logging.error("Instance migration failed, trying to revert" " disk status: %s", msg) @@ -4042,7 +5034,7 @@ class LUMigrateInstance(LogicalUnit): instance, migration_info, True) - msg = result.RemoteFailMsg() + msg = result.fail_msg if msg: logging.error("Instance migration succeeded, but finalization failed:" " %s" % msg) @@ -4061,6 +5053,8 @@ class LUMigrateInstance(LogicalUnit): """Perform the migration. 
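    When not cleaning up, the migration path boils down to the following
    order, aborting and reverting the disks on any failure. Illustrative
    sketch added for clarity (not part of this patch; the callables are
    hypothetical stand-ins for call_migration_info, call_accept_instance,
    call_instance_migrate and call_finalize_migration)::

      def migrate(get_info, accept_on_target, do_migrate, finalize, abort):
        info = get_info()               # the source exports migration info
        if not accept_on_target(info):  # the target prepares to receive
          abort()
          raise RuntimeError("target node refused the instance")
        if not do_migrate(info):        # hypervisor-level (possibly live) move
          abort()
          raise RuntimeError("migration failed")
        finalize(info)                  # the target finalizes after success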
""" + feedback_fn("Migrating instance %s" % self.instance.name) + self.feedback_fn = feedback_fn self.source_node = self.instance.primary_node @@ -4070,7 +5064,8 @@ class LUMigrateInstance(LogicalUnit): self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip, self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip, } - if self.op.cleanup: + + if self.cleanup: return self._ExecCleanup() else: return self._ExecMigration() @@ -4142,11 +5137,8 @@ def _CreateSingleBlockDev(lu, node, instance, device, info, force_open): lu.cfg.SetDiskID(device, node) result = lu.rpc.call_blockdev_create(node, device, device.size, instance.name, force_open, info) - msg = result.RemoteFailMsg() - if msg: - raise errors.OpExecError("Can't create block device %s on" - " node %s for instance %s: %s" % - (device, node, instance.name, msg)) + result.Raise("Can't create block device %s on" + " node %s for instance %s" % (device, node, instance.name)) if device.physical_id is None: device.physical_id = result.payload @@ -4204,7 +5196,7 @@ def _GenerateDiskTemplate(lu, template_name, if len(secondary_nodes) != 0: raise errors.ProgrammerError("Wrong template configuration") - names = _GenerateUniqueNames(lu, [".disk%d" % i + names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i) for i in range(disk_count)]) for idx, disk in enumerate(disk_info): disk_index = idx + base_index @@ -4221,7 +5213,7 @@ def _GenerateDiskTemplate(lu, template_name, [primary_node, remote_node] * len(disk_info), instance_name) names = [] - for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i + for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i) for i in range(disk_count)]): names.append(lv_prefix + "_data") names.append(lv_prefix + "_meta") @@ -4258,7 +5250,7 @@ def _GetInstanceInfoText(instance): return "originstname+%s" % instance.name -def _CreateDisks(lu, instance): +def _CreateDisks(lu, instance, to_skip=None, target_node=None): """Create all disks for an instance. This abstracts away some work from AddInstance. 
@@ -4267,36 +5259,43 @@ def _CreateDisks(lu, instance): @param lu: the logical unit on whose behalf we execute @type instance: L{objects.Instance} @param instance: the instance whose disks we should create + @type to_skip: list + @param to_skip: list of indices to skip + @type target_node: string + @param target_node: if passed, overrides the target node for creation @rtype: boolean @return: the success of the creation """ info = _GetInstanceInfoText(instance) - pnode = instance.primary_node + if target_node is None: + pnode = instance.primary_node + all_nodes = instance.all_nodes + else: + pnode = target_node + all_nodes = [pnode] if instance.disk_template == constants.DT_FILE: file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir) - if result.failed or not result.data: - raise errors.OpExecError("Could not connect to node '%s'" % pnode) - - if not result.data[0]: - raise errors.OpExecError("Failed to create directory '%s'" % - file_storage_dir) + result.Raise("Failed to create directory '%s' on" + " node %s" % (file_storage_dir, pnode)) # Note: this needs to be kept in sync with adding of disks in # LUSetInstanceParams - for device in instance.disks: + for idx, device in enumerate(instance.disks): + if to_skip and idx in to_skip: + continue logging.info("Creating volume %s for instance %s", device.iv_name, instance.name) #HARDCODE - for node in instance.all_nodes: + for node in all_nodes: f_create = node == pnode _CreateBlockDev(lu, node, instance, device, f_create, info, f_create) -def _RemoveDisks(lu, instance): +def _RemoveDisks(lu, instance, target_node=None): """Remove all disks for an instance. This abstracts away some work from `AddInstance()` and @@ -4308,6 +5307,8 @@ def _RemoveDisks(lu, instance): @param lu: the logical unit on whose behalf we execute @type instance: L{objects.Instance} @param instance: the instance whose disks we should remove + @type target_node: string + @param target_node: used to override the node on which to remove the disks @rtype: boolean @return: the success of the removal @@ -4316,9 +5317,13 @@ def _RemoveDisks(lu, instance): all_result = True for device in instance.disks: - for node, disk in device.ComputeNodeTree(instance.primary_node): + if target_node: + edata = [(target_node, device)] + else: + edata = device.ComputeNodeTree(instance.primary_node) + for node, disk in edata: lu.cfg.SetDiskID(disk, node) - msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg() + msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg if msg: lu.LogWarning("Could not remove block device %s on node %s," " continuing anyway: %s", device.iv_name, node, msg) @@ -4326,10 +5331,14 @@ def _RemoveDisks(lu, instance): if instance.disk_template == constants.DT_FILE: file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) - result = lu.rpc.call_file_storage_dir_remove(instance.primary_node, - file_storage_dir) - if result.failed or not result.data: - logging.error("Could not remove directory '%s'", file_storage_dir) + if target_node: + tgt = target_node + else: + tgt = instance.primary_node + result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir) + if result.fail_msg: + lu.LogWarning("Could not remove directory '%s' on node %s: %s", + file_storage_dir, instance.primary_node, result.fail_msg) all_result = False return all_result @@ -4379,10 +5388,7 @@ def _CheckHVParams(lu, nodenames, hvname, hvparams): info = hvinfo[node] if info.offline: continue - msg = 
info.RemoteFailMsg() - if msg: - raise errors.OpPrereqError("Hypervisor parameter validation" - " failed on node %s: %s" % (node, msg)) + info.Raise("Hypervisor parameter validation failed on node %s" % node) class LUCreateInstance(LogicalUnit): @@ -4443,7 +5449,7 @@ class LUCreateInstance(LogicalUnit): # check hypervisor parameter syntax (locally) utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES) - filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor], + filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor], self.op.hvparams) hv_type = hypervisor.GetHypervisor(self.op.hypervisor) hv_type.CheckParameterSyntax(filled_hvp) @@ -4451,7 +5457,7 @@ class LUCreateInstance(LogicalUnit): # fill and remember the beparams dict utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) - self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT], + self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT], self.op.beparams) #### instance parameters check @@ -4470,10 +5476,21 @@ class LUCreateInstance(LogicalUnit): # NIC buildup self.nics = [] - for nic in self.op.nics: + for idx, nic in enumerate(self.op.nics): + nic_mode_req = nic.get("mode", None) + nic_mode = nic_mode_req + if nic_mode is None: + nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE] + + # in routed mode, for the first nic, the default ip is 'auto' + if nic_mode == constants.NIC_MODE_ROUTED and idx == 0: + default_ip_mode = constants.VALUE_AUTO + else: + default_ip_mode = constants.VALUE_NONE + # ip validity checks - ip = nic.get("ip", None) - if ip is None or ip.lower() == "none": + ip = nic.get("ip", default_ip_mode) + if ip is None or ip.lower() == constants.VALUE_NONE: nic_ip = None elif ip.lower() == constants.VALUE_AUTO: nic_ip = hostname1.ip @@ -4483,17 +5500,43 @@ class LUCreateInstance(LogicalUnit): " like a valid IP" % ip) nic_ip = ip + # TODO: check the ip for uniqueness !! 
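      # Illustrative aside, not part of the upstream patch: the parameter
      # filling used in this LU - objects.FillDict(cluster-level defaults,
      # per-object overrides) for hvparams, beparams and (below) nicparams -
      # is assumed here to behave like a plain merge in which the override
      # wins, roughly:
      #
      #   defaults  = {"mode": "bridged", "link": "xen-br0"}
      #   overrides = {"mode": "routed"}
      #   filled = dict(defaults)
      #   filled.update(overrides)   # -> {"mode": "routed", "link": "xen-br0"}
      #
      # so a NIC that only requests a mode still inherits the cluster-wide link.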
+ if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip: + raise errors.OpPrereqError("Routed nic mode requires an ip address") + # MAC address verification mac = nic.get("mac", constants.VALUE_AUTO) if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): if not utils.IsValidMac(mac.lower()): raise errors.OpPrereqError("Invalid MAC address specified: %s" % mac) + else: + # or validate/reserve the current one + if self.cfg.IsMacInUse(mac): + raise errors.OpPrereqError("MAC address %s already in use" + " in cluster" % mac) + # bridge verification bridge = nic.get("bridge", None) - if bridge is None: - bridge = self.cfg.GetDefBridge() - self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge)) + link = nic.get("link", None) + if bridge and link: + raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" + " at the same time") + elif bridge and nic_mode == constants.NIC_MODE_ROUTED: + raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic") + elif bridge: + link = bridge + + nicparams = {} + if nic_mode_req: + nicparams[constants.NIC_MODE] = nic_mode_req + if link: + nicparams[constants.NIC_LINK] = link + + check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT], + nicparams) + objects.NIC.CheckParameterSyntax(check_params) + self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams)) # disk checks/pre-build self.disks = [] @@ -4560,16 +5603,22 @@ class LUCreateInstance(LogicalUnit): self.op.src_path = src_path = \ os.path.join(constants.EXPORT_DIR, src_path) + # On import force_variant must be True, because if we forced it at + # initial install, our only chance when importing it back is that it + # works again! + self.op.force_variant = True + else: # INSTANCE_CREATE if getattr(self.op, "os_type", None) is None: raise errors.OpPrereqError("No guest OS specified") + self.op.force_variant = getattr(self.op, "force_variant", False) def _RunAllocator(self): """Run the allocator based on input opcode. 
""" nics = [n.ToDict() for n in self.nics] - ial = IAllocator(self, + ial = IAllocator(self.cfg, self.rpc, mode=constants.IALLOCATOR_MODE_ALLOC, name=self.op.instance_name, disk_template=self.op.disk_template, @@ -4622,12 +5671,12 @@ class LUCreateInstance(LogicalUnit): os_type=self.op.os_type, memory=self.be_full[constants.BE_MEMORY], vcpus=self.be_full[constants.BE_VCPUS], - nics=[(n.ip, n.bridge, n.mac) for n in self.nics], + nics=_NICListToTuple(self, self.nics), disk_template=self.op.disk_template, disks=[(d["size"], d["mode"]) for d in self.disks], bep=self.be_full, hvp=self.hv_full, - hypervisor=self.op.hypervisor, + hypervisor_name=self.op.hypervisor, )) nl = ([self.cfg.GetMasterNode(), self.op.pnode] + @@ -4649,11 +5698,13 @@ class LUCreateInstance(LogicalUnit): src_path = self.op.src_path if src_node is None: - exp_list = self.rpc.call_export_list( - self.acquired_locks[locking.LEVEL_NODE]) + locked_nodes = self.acquired_locks[locking.LEVEL_NODE] + exp_list = self.rpc.call_export_list(locked_nodes) found = False for node in exp_list: - if not exp_list[node].failed and src_path in exp_list[node].data: + if exp_list[node].fail_msg: + continue + if src_path in exp_list[node].payload: found = True self.op.src_node = src_node = node self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR, @@ -4665,11 +5716,9 @@ class LUCreateInstance(LogicalUnit): _CheckNodeOnline(self, src_node) result = self.rpc.call_export_info(src_node, src_path) - result.Raise() - if not result.data: - raise errors.OpPrereqError("No export found in dir %s" % src_path) + result.Raise("No export or invalid export found in dir %s" % src_path) - export_info = result.data + export_info = objects.SerializableConfigParser.Loads(str(result.payload)) if not export_info.has_section(constants.INISECT_EXP): raise errors.ProgrammerError("Corrupted export config") @@ -4775,37 +5824,27 @@ class LUCreateInstance(LogicalUnit): self.op.hypervisor) for node in nodenames: info = nodeinfo[node] - info.Raise() - info = info.data - if not info: - raise errors.OpPrereqError("Cannot get current information" - " from node '%s'" % node) + info.Raise("Cannot get current information from node %s" % node) + info = info.payload vg_free = info.get('vg_free', None) if not isinstance(vg_free, int): raise errors.OpPrereqError("Can't compute free disk space on" " node %s" % node) - if req_size > info['vg_free']: + if req_size > vg_free: raise errors.OpPrereqError("Not enough disk space on target node %s." 
" %d MB available, %d MB required" % - (node, info['vg_free'], req_size)) + (node, vg_free, req_size)) _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams) # os verification result = self.rpc.call_os_get(pnode.name, self.op.os_type) - result.Raise() - if not isinstance(result.data, objects.OS) or not result.data: - raise errors.OpPrereqError("OS '%s' not in supported os list for" - " primary node" % self.op.os_type) - - # bridge check on primary node - bridges = [n.bridge for n in self.nics] - result = self.rpc.call_bridges_exist(self.pnode.name, bridges) - result.Raise() - if not result.data: - raise errors.OpPrereqError("One of the target bridges '%s' does not" - " exist on destination node '%s'" % - (",".join(bridges), pnode.name)) + result.Raise("OS '%s' not in supported os list for primary node %s" % + (self.op.os_type, pnode.name), prereq=True) + if not self.op.force_variant: + _CheckOSVariant(result.payload, self.op.os_type) + + _CheckNicsBridgesExist(self, self.nics, self.pnode.name) # memory check on primary node if self.op.start: @@ -4814,6 +5853,8 @@ class LUCreateInstance(LogicalUnit): self.be_full[constants.BE_MEMORY], self.op.hypervisor) + self.dry_run_result = list(nodenames) + def Exec(self, feedback_fn): """Create and add the instance to the cluster. @@ -4914,12 +5955,9 @@ class LUCreateInstance(LogicalUnit): if iobj.disk_template != constants.DT_DISKLESS: if self.op.mode == constants.INSTANCE_CREATE: feedback_fn("* running the instance OS create scripts...") - result = self.rpc.call_instance_os_add(pnode_name, iobj) - msg = result.RemoteFailMsg() - if msg: - raise errors.OpExecError("Could not add os for instance %s" - " on node %s: %s" % - (instance, pnode_name, msg)) + result = self.rpc.call_instance_os_add(pnode_name, iobj, False) + result.Raise("Could not add os for instance %s" + " on node %s" % (instance, pnode_name)) elif self.op.mode == constants.INSTANCE_IMPORT: feedback_fn("* running the instance OS import scripts...") @@ -4929,12 +5967,10 @@ class LUCreateInstance(LogicalUnit): import_result = self.rpc.call_instance_os_import(pnode_name, iobj, src_node, src_images, cluster_name) - import_result.Raise() - for idx, result in enumerate(import_result.data): - if not result: - self.LogWarning("Could not import the image %s for instance" - " %s, disk %d, on node %s" % - (src_images[idx], instance, idx, pnode_name)) + msg = import_result.fail_msg + if msg: + self.LogWarning("Error while importing the disk images for instance" + " %s on node %s: %s" % (instance, pnode_name, msg)) else: # also checked in the prereq part raise errors.ProgrammerError("Unknown OS initialization mode '%s'" @@ -4946,9 +5982,9 @@ class LUCreateInstance(LogicalUnit): logging.info("Starting instance %s on node %s", instance, pnode_name) feedback_fn("* starting instance...") result = self.rpc.call_instance_start(pnode_name, iobj, None, None) - msg = result.RemoteFailMsg() - if msg: - raise errors.OpExecError("Could not start instance: %s" % msg) + result.Raise("Could not start instance") + + return list(iobj.all_nodes) class LUConnectConsole(NoHooksLU): @@ -4985,9 +6021,9 @@ class LUConnectConsole(NoHooksLU): node_insts = self.rpc.call_instance_list([node], [instance.hypervisor])[node] - node_insts.Raise() + node_insts.Raise("Can't get node information from %s" % node) - if instance.name not in node_insts.data: + if instance.name not in node_insts.payload: raise errors.OpExecError("Instance %s is not running." 
% instance.name) logging.debug("Connecting to console of %s on %s", instance.name, node) @@ -5019,43 +6055,40 @@ class LUReplaceDisks(LogicalUnit): if not hasattr(self.op, "iallocator"): self.op.iallocator = None - # check for valid parameter combination - cnt = [self.op.remote_node, self.op.iallocator].count(None) - if self.op.mode == constants.REPLACE_DISK_CHG: - if cnt == 2: - raise errors.OpPrereqError("When changing the secondary either an" - " iallocator script must be used or the" - " new node given") - elif cnt == 0: - raise errors.OpPrereqError("Give either the iallocator or the new" - " secondary, not both") - else: # not replacing the secondary - if cnt != 2: - raise errors.OpPrereqError("The iallocator and new node options can" - " be used only when changing the" - " secondary node") + TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node, + self.op.iallocator) def ExpandNames(self): self._ExpandAndLockInstance() if self.op.iallocator is not None: self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + elif self.op.remote_node is not None: remote_node = self.cfg.ExpandNodeName(self.op.remote_node) if remote_node is None: raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node) + self.op.remote_node = remote_node + # Warning: do not remove the locking of the new secondary here # unless DRBD8.AddChildren is changed to work in parallel; # currently it doesn't since parallel invocations of # FindUnusedMinor will conflict self.needed_locks[locking.LEVEL_NODE] = [remote_node] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + else: self.needed_locks[locking.LEVEL_NODE] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode, + self.op.iallocator, self.op.remote_node, + self.op.disks) + + self.tasklets = [self.replacer] + def DeclareLocks(self, level): # If we're not already locking all nodes in the set we have to declare the # instance's primary/secondary nodes. @@ -5063,48 +6096,202 @@ class LUReplaceDisks(LogicalUnit): self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET): self._LockInstancesNodes() - def _RunAllocator(self): + def BuildHooksEnv(self): + """Build hooks env. + + This runs on the master, the primary and all the secondaries. + + """ + instance = self.replacer.instance + env = { + "MODE": self.op.mode, + "NEW_SECONDARY": self.op.remote_node, + "OLD_SECONDARY": instance.secondary_nodes[0], + } + env.update(_BuildInstanceHookEnvByObject(self, instance)) + nl = [ + self.cfg.GetMasterNode(), + instance.primary_node, + ] + if self.op.remote_node is not None: + nl.append(self.op.remote_node) + return env, nl, nl + + +class LUEvacuateNode(LogicalUnit): + """Relocate the secondary instances from a node. 
+ + """ + HPATH = "node-evacuate" + HTYPE = constants.HTYPE_NODE + _OP_REQP = ["node_name"] + REQ_BGL = False + + def CheckArguments(self): + if not hasattr(self.op, "remote_node"): + self.op.remote_node = None + if not hasattr(self.op, "iallocator"): + self.op.iallocator = None + + TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG, + self.op.remote_node, + self.op.iallocator) + + def ExpandNames(self): + self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name) + if self.op.node_name is None: + raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name) + + self.needed_locks = {} + + # Declare node locks + if self.op.iallocator is not None: + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + + elif self.op.remote_node is not None: + remote_node = self.cfg.ExpandNodeName(self.op.remote_node) + if remote_node is None: + raise errors.OpPrereqError("Node '%s' not known" % + self.op.remote_node) + + self.op.remote_node = remote_node + + # Warning: do not remove the locking of the new secondary here + # unless DRBD8.AddChildren is changed to work in parallel; + # currently it doesn't since parallel invocations of + # FindUnusedMinor will conflict + self.needed_locks[locking.LEVEL_NODE] = [remote_node] + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + + else: + raise errors.OpPrereqError("Invalid parameters") + + # Create tasklets for replacing disks for all secondary instances on this + # node + names = [] + tasklets = [] + + for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name): + logging.debug("Replacing disks for instance %s", inst.name) + names.append(inst.name) + + replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG, + self.op.iallocator, self.op.remote_node, []) + tasklets.append(replacer) + + self.tasklets = tasklets + self.instance_names = names + + # Declare instance locks + self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names + + def DeclareLocks(self, level): + # If we're not already locking all nodes in the set we have to declare the + # instance's primary/secondary nodes. + if (level == locking.LEVEL_NODE and + self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET): + self._LockInstancesNodes() + + def BuildHooksEnv(self): + """Build hooks env. + + This runs on the master, the primary and all the secondaries. + + """ + env = { + "NODE_NAME": self.op.node_name, + } + + nl = [self.cfg.GetMasterNode()] + + if self.op.remote_node is not None: + env["NEW_SECONDARY"] = self.op.remote_node + nl.append(self.op.remote_node) + + return (env, nl, nl) + + +class TLReplaceDisks(Tasklet): + """Replaces disks for an instance. + + Note: Locking is not within the scope of this class. + + """ + def __init__(self, lu, instance_name, mode, iallocator_name, remote_node, + disks): + """Initializes this class. + + """ + Tasklet.__init__(self, lu) + + # Parameters + self.instance_name = instance_name + self.mode = mode + self.iallocator_name = iallocator_name + self.remote_node = remote_node + self.disks = disks + + # Runtime data + self.instance = None + self.new_node = None + self.target_node = None + self.other_node = None + self.remote_node_info = None + self.node_secondary_ip = None + + @staticmethod + def CheckArguments(mode, remote_node, iallocator): + """Helper function for users of this class. 
+ + """ + # check for valid parameter combination + if mode == constants.REPLACE_DISK_CHG: + if remote_node is None and iallocator is None: + raise errors.OpPrereqError("When changing the secondary either an" + " iallocator script must be used or the" + " new node given") + + if remote_node is not None and iallocator is not None: + raise errors.OpPrereqError("Give either the iallocator or the new" + " secondary, not both") + + elif remote_node is not None or iallocator is not None: + # Not replacing the secondary + raise errors.OpPrereqError("The iallocator and new node options can" + " only be used when changing the" + " secondary node") + + @staticmethod + def _RunAllocator(lu, iallocator_name, instance_name, relocate_from): """Compute a new secondary node using an IAllocator. """ - ial = IAllocator(self, + ial = IAllocator(lu.cfg, lu.rpc, mode=constants.IALLOCATOR_MODE_RELOC, - name=self.op.instance_name, - relocate_from=[self.sec_node]) + name=instance_name, + relocate_from=relocate_from) - ial.Run(self.op.iallocator) + ial.Run(iallocator_name) if not ial.success: - raise errors.OpPrereqError("Can't compute nodes using" - " iallocator '%s': %s" % (self.op.iallocator, - ial.info)) + raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':" + " %s" % (iallocator_name, ial.info)) + if len(ial.nodes) != ial.required_nodes: raise errors.OpPrereqError("iallocator '%s' returned invalid number" " of nodes (%s), required %s" % (len(ial.nodes), ial.required_nodes)) - self.op.remote_node = ial.nodes[0] - self.LogInfo("Selected new secondary for the instance: %s", - self.op.remote_node) - def BuildHooksEnv(self): - """Build hooks env. + remote_node_name = ial.nodes[0] - This runs on the master, the primary and all the secondaries. + lu.LogInfo("Selected new secondary for instance '%s': %s", + instance_name, remote_node_name) - """ - env = { - "MODE": self.op.mode, - "NEW_SECONDARY": self.op.remote_node, - "OLD_SECONDARY": self.instance.secondary_nodes[0], - } - env.update(_BuildInstanceHookEnvByObject(self, self.instance)) - nl = [ - self.cfg.GetMasterNode(), - self.instance.primary_node, - ] - if self.op.remote_node is not None: - nl.append(self.op.remote_node) - return env, nl, nl + return remote_node_name + + def _FindFaultyDisks(self, node_name): + return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance, + node_name, True) def CheckPrereq(self): """Check prerequisites. @@ -5112,165 +6299,301 @@ class LUReplaceDisks(LogicalUnit): This checks that the instance is in the cluster. 
""" - instance = self.cfg.GetInstanceInfo(self.op.instance_name) - assert instance is not None, \ - "Cannot retrieve locked instance %s" % self.op.instance_name - self.instance = instance + self.instance = self.cfg.GetInstanceInfo(self.instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.instance_name - if instance.disk_template != constants.DT_DRBD8: + if self.instance.disk_template != constants.DT_DRBD8: raise errors.OpPrereqError("Can only run replace disks for DRBD8-based" " instances") - if len(instance.secondary_nodes) != 1: + if len(self.instance.secondary_nodes) != 1: raise errors.OpPrereqError("The instance has a strange layout," " expected one secondary but found %d" % - len(instance.secondary_nodes)) + len(self.instance.secondary_nodes)) - self.sec_node = instance.secondary_nodes[0] + secondary_node = self.instance.secondary_nodes[0] - if self.op.iallocator is not None: - self._RunAllocator() + if self.iallocator_name is None: + remote_node = self.remote_node + else: + remote_node = self._RunAllocator(self.lu, self.iallocator_name, + self.instance.name, secondary_node) - remote_node = self.op.remote_node if remote_node is not None: self.remote_node_info = self.cfg.GetNodeInfo(remote_node) assert self.remote_node_info is not None, \ "Cannot retrieve locked node %s" % remote_node else: self.remote_node_info = None - if remote_node == instance.primary_node: + + if remote_node == self.instance.primary_node: raise errors.OpPrereqError("The specified node is the primary node of" " the instance.") - elif remote_node == self.sec_node: + + if remote_node == secondary_node: raise errors.OpPrereqError("The specified node is already the" " secondary node of the instance.") - if self.op.mode == constants.REPLACE_DISK_PRI: - n1 = self.tgt_node = instance.primary_node - n2 = self.oth_node = self.sec_node - elif self.op.mode == constants.REPLACE_DISK_SEC: - n1 = self.tgt_node = self.sec_node - n2 = self.oth_node = instance.primary_node - elif self.op.mode == constants.REPLACE_DISK_CHG: - n1 = self.new_node = remote_node - n2 = self.oth_node = instance.primary_node - self.tgt_node = self.sec_node - _CheckNodeNotDrained(self, remote_node) + if self.disks and self.mode in (constants.REPLACE_DISK_AUTO, + constants.REPLACE_DISK_CHG): + raise errors.OpPrereqError("Cannot specify disks to be replaced") + + if self.mode == constants.REPLACE_DISK_AUTO: + faulty_primary = self._FindFaultyDisks(self.instance.primary_node) + faulty_secondary = self._FindFaultyDisks(secondary_node) + + if faulty_primary and faulty_secondary: + raise errors.OpPrereqError("Instance %s has faulty disks on more than" + " one node and can not be repaired" + " automatically" % self.instance_name) + + if faulty_primary: + self.disks = faulty_primary + self.target_node = self.instance.primary_node + self.other_node = secondary_node + check_nodes = [self.target_node, self.other_node] + elif faulty_secondary: + self.disks = faulty_secondary + self.target_node = secondary_node + self.other_node = self.instance.primary_node + check_nodes = [self.target_node, self.other_node] + else: + self.disks = [] + check_nodes = [] + else: - raise errors.ProgrammerError("Unhandled disk replace mode") + # Non-automatic modes + if self.mode == constants.REPLACE_DISK_PRI: + self.target_node = self.instance.primary_node + self.other_node = secondary_node + check_nodes = [self.target_node, self.other_node] - _CheckNodeOnline(self, n1) - _CheckNodeOnline(self, n2) + elif self.mode == 
constants.REPLACE_DISK_SEC: + self.target_node = secondary_node + self.other_node = self.instance.primary_node + check_nodes = [self.target_node, self.other_node] - if not self.op.disks: - self.op.disks = range(len(instance.disks)) + elif self.mode == constants.REPLACE_DISK_CHG: + self.new_node = remote_node + self.other_node = self.instance.primary_node + self.target_node = secondary_node + check_nodes = [self.new_node, self.other_node] - for disk_idx in self.op.disks: - instance.FindDisk(disk_idx) + _CheckNodeNotDrained(self.lu, remote_node) - def _ExecD8DiskOnly(self, feedback_fn): - """Replace a disk on the primary or secondary for dbrd8. + else: + raise errors.ProgrammerError("Unhandled disk replace mode (%s)" % + self.mode) - The algorithm for replace is quite complicated: + # If not specified all disks should be replaced + if not self.disks: + self.disks = range(len(self.instance.disks)) - 1. for each disk to be replaced: + for node in check_nodes: + _CheckNodeOnline(self.lu, node) - 1. create new LVs on the target node with unique names - 1. detach old LVs from the drbd device - 1. rename old LVs to name_replaced. - 1. rename new LVs to old LVs - 1. attach the new LVs (with the old names now) to the drbd device + # Check whether disks are valid + for disk_idx in self.disks: + self.instance.FindDisk(disk_idx) - 1. wait for sync across all devices + # Get secondary node IP addresses + node_2nd_ip = {} - 1. for each modified disk: + for node_name in [self.target_node, self.other_node, self.new_node]: + if node_name is not None: + node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip - 1. remove old LVs (which have the name name_replaces.) + self.node_secondary_ip = node_2nd_ip - Failures are not very well handled. + def Exec(self, feedback_fn): + """Execute disk replacement. + + This dispatches the disk replacement to the appropriate handler. """ - steps_total = 6 - warning, info = (self.proc.LogWarning, self.proc.LogInfo) - instance = self.instance - iv_names = {} + if not self.disks: + feedback_fn("No disks need replacement") + return + + feedback_fn("Replacing disk(s) %s for %s" % + (", ".join([str(i) for i in self.disks]), self.instance.name)) + + activate_disks = (not self.instance.admin_up) + + # Activate the instance disks if we're replacing them on a down instance + if activate_disks: + _StartInstanceDisks(self.lu, self.instance, True) + + try: + # Should we replace the secondary node? 
+ if self.new_node is not None: + return self._ExecDrbd8Secondary() + else: + return self._ExecDrbd8DiskOnly() + + finally: + # Deactivate the instance disks if we're replacing them on a down instance + if activate_disks: + _SafeShutdownInstanceDisks(self.lu, self.instance) + + def _CheckVolumeGroup(self, nodes): + self.lu.LogInfo("Checking volume groups") + vgname = self.cfg.GetVGName() - # start of work - cfg = self.cfg - tgt_node = self.tgt_node - oth_node = self.oth_node - # Step: check device activation - self.proc.LogStep(1, steps_total, "check device existence") - info("checking volume groups") - my_vg = cfg.GetVGName() - results = self.rpc.call_vg_list([oth_node, tgt_node]) + # Make sure volume group exists on all involved nodes + results = self.rpc.call_vg_list(nodes) if not results: raise errors.OpExecError("Can't list volume groups on the nodes") - for node in oth_node, tgt_node: + + for node in nodes: res = results[node] - if res.failed or not res.data or my_vg not in res.data: - raise errors.OpExecError("Volume group '%s' not found on %s" % - (my_vg, node)) - for idx, dev in enumerate(instance.disks): - if idx not in self.op.disks: + res.Raise("Error checking node %s" % node) + if vgname not in res.payload: + raise errors.OpExecError("Volume group '%s' not found on node %s" % + (vgname, node)) + + def _CheckDisksExistence(self, nodes): + # Check disk existence + for idx, dev in enumerate(self.instance.disks): + if idx not in self.disks: continue - for node in tgt_node, oth_node: - info("checking disk/%d on %s" % (idx, node)) - cfg.SetDiskID(dev, node) + + for node in nodes: + self.lu.LogInfo("Checking disk/%d on %s" % (idx, node)) + self.cfg.SetDiskID(dev, node) + result = self.rpc.call_blockdev_find(node, dev) - msg = result.RemoteFailMsg() - if not msg and not result.payload: - msg = "disk not found" - if msg: + + msg = result.fail_msg + if msg or not result.payload: + if not msg: + msg = "disk not found" raise errors.OpExecError("Can't find disk/%d on node %s: %s" % (idx, node, msg)) - # Step: check other node consistency - self.proc.LogStep(2, steps_total, "check peer consistency") - for idx, dev in enumerate(instance.disks): - if idx not in self.op.disks: + def _CheckDisksConsistency(self, node_name, on_primary, ldisk): + for idx, dev in enumerate(self.instance.disks): + if idx not in self.disks: continue - info("checking disk/%d consistency on %s" % (idx, oth_node)) - if not _CheckDiskConsistency(self, dev, oth_node, - oth_node==instance.primary_node): - raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe" - " to replace disks on this node (%s)" % - (oth_node, tgt_node)) - # Step: create new storage - self.proc.LogStep(3, steps_total, "allocate new storage") - for idx, dev in enumerate(instance.disks): - if idx not in self.op.disks: + self.lu.LogInfo("Checking disk/%d consistency on node %s" % + (idx, node_name)) + + if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary, + ldisk=ldisk): + raise errors.OpExecError("Node %s has degraded storage, unsafe to" + " replace disks for instance %s" % + (node_name, self.instance.name)) + + def _CreateNewStorage(self, node_name): + vgname = self.cfg.GetVGName() + iv_names = {} + + for idx, dev in enumerate(self.instance.disks): + if idx not in self.disks: continue - size = dev.size - cfg.SetDiskID(dev, tgt_node) - lv_names = [".disk%d_%s" % (idx, suf) - for suf in ["data", "meta"]] - names = _GenerateUniqueNames(self, lv_names) - lv_data = objects.Disk(dev_type=constants.LD_LV, size=size, + + 
self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx)) + + self.cfg.SetDiskID(dev, node_name) + + lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]] + names = _GenerateUniqueNames(self.lu, lv_names) + + lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size, logical_id=(vgname, names[0])) lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128, logical_id=(vgname, names[1])) + new_lvs = [lv_data, lv_meta] old_lvs = dev.children iv_names[dev.iv_name] = (dev, old_lvs, new_lvs) - info("creating new local storage on %s for %s" % - (tgt_node, dev.iv_name)) + # we pass force_create=True to force the LVM creation for new_lv in new_lvs: - _CreateBlockDev(self, tgt_node, instance, new_lv, True, - _GetInstanceInfoText(instance), False) + _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True, + _GetInstanceInfoText(self.instance), False) + + return iv_names + + def _CheckDevices(self, node_name, iv_names): + for name, (dev, old_lvs, new_lvs) in iv_names.iteritems(): + self.cfg.SetDiskID(dev, node_name) + + result = self.rpc.call_blockdev_find(node_name, dev) + + msg = result.fail_msg + if msg or not result.payload: + if not msg: + msg = "disk not found" + raise errors.OpExecError("Can't find DRBD device %s: %s" % + (name, msg)) + + if result.payload.is_degraded: + raise errors.OpExecError("DRBD device %s is degraded!" % name) + + def _RemoveOldStorage(self, node_name, iv_names): + for name, (dev, old_lvs, _) in iv_names.iteritems(): + self.lu.LogInfo("Remove logical volumes for %s" % name) + + for lv in old_lvs: + self.cfg.SetDiskID(lv, node_name) + + msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg + if msg: + self.lu.LogWarning("Can't remove old LV: %s" % msg, + hint="remove unused LVs manually") + + def _ExecDrbd8DiskOnly(self): + """Replace a disk on the primary or secondary for DRBD 8. + + The algorithm for replace is quite complicated: + + 1. for each disk to be replaced: + + 1. create new LVs on the target node with unique names + 1. detach old LVs from the drbd device + 1. rename old LVs to name_replaced. + 1. rename new LVs to old LVs + 1. attach the new LVs (with the old names now) to the drbd device + + 1. wait for sync across all devices + + 1. for each modified disk: + + 1. remove old LVs (which have the name name_replaces.) + + Failures are not very well handled. 
+ + """ + steps_total = 6 + + # Step: check device activation + self.lu.LogStep(1, steps_total, "Check device existence") + self._CheckDisksExistence([self.other_node, self.target_node]) + self._CheckVolumeGroup([self.target_node, self.other_node]) + + # Step: check other node consistency + self.lu.LogStep(2, steps_total, "Check peer consistency") + self._CheckDisksConsistency(self.other_node, + self.other_node == self.instance.primary_node, + False) + + # Step: create new storage + self.lu.LogStep(3, steps_total, "Allocate new storage") + iv_names = self._CreateNewStorage(self.target_node) # Step: for each lv, detach+rename*2+attach - self.proc.LogStep(4, steps_total, "change drbd configuration") + self.lu.LogStep(4, steps_total, "Changing drbd configuration") for dev, old_lvs, new_lvs in iv_names.itervalues(): - info("detaching %s drbd from local storage" % dev.iv_name) - result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs) - result.Raise() - if not result.data: - raise errors.OpExecError("Can't detach drbd from local storage on node" - " %s for device %s" % (tgt_node, dev.iv_name)) + self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name) + + result = self.rpc.call_blockdev_removechildren(self.target_node, dev, + old_lvs) + result.Raise("Can't detach drbd from local storage on node" + " %s for device %s" % (self.target_node, dev.iv_name)) #dev.children = [] #cfg.Update(instance) @@ -5284,84 +6607,70 @@ class LUReplaceDisks(LogicalUnit): temp_suffix = int(time.time()) ren_fn = lambda d, suff: (d.physical_id[0], d.physical_id[1] + "_replaced-%s" % suff) - # build the rename list based on what LVs exist on the node - rlist = [] + + # Build the rename list based on what LVs exist on the node + rename_old_to_new = [] for to_ren in old_lvs: - result = self.rpc.call_blockdev_find(tgt_node, to_ren) - if not result.RemoteFailMsg() and result.payload: + result = self.rpc.call_blockdev_find(self.target_node, to_ren) + if not result.fail_msg and result.payload: # device exists - rlist.append((to_ren, ren_fn(to_ren, temp_suffix))) - - info("renaming the old LVs on the target node") - result = self.rpc.call_blockdev_rename(tgt_node, rlist) - result.Raise() - if not result.data: - raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node) - # now we rename the new LVs to the old LVs - info("renaming the new LVs on the target node") - rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)] - result = self.rpc.call_blockdev_rename(tgt_node, rlist) - result.Raise() - if not result.data: - raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node) + rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix))) + + self.lu.LogInfo("Renaming the old LVs on the target node") + result = self.rpc.call_blockdev_rename(self.target_node, + rename_old_to_new) + result.Raise("Can't rename old LVs on node %s" % self.target_node) + + # Now we rename the new LVs to the old LVs + self.lu.LogInfo("Renaming the new LVs on the target node") + rename_new_to_old = [(new, old.physical_id) + for old, new in zip(old_lvs, new_lvs)] + result = self.rpc.call_blockdev_rename(self.target_node, + rename_new_to_old) + result.Raise("Can't rename new LVs on node %s" % self.target_node) for old, new in zip(old_lvs, new_lvs): new.logical_id = old.logical_id - cfg.SetDiskID(new, tgt_node) + self.cfg.SetDiskID(new, self.target_node) for disk in old_lvs: disk.logical_id = ren_fn(disk, temp_suffix) - cfg.SetDiskID(disk, tgt_node) + self.cfg.SetDiskID(disk, 
self.target_node) - # now that the new lvs have the old name, we can add them to the device - info("adding new mirror component on %s" % tgt_node) - result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs) - if result.failed or not result.data: + # Now that the new lvs have the old name, we can add them to the device + self.lu.LogInfo("Adding new mirror component on %s" % self.target_node) + result = self.rpc.call_blockdev_addchildren(self.target_node, dev, + new_lvs) + msg = result.fail_msg + if msg: for new_lv in new_lvs: - msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg() - if msg: - warning("Can't rollback device %s: %s", dev, msg, - hint="cleanup manually the unused logical volumes") - raise errors.OpExecError("Can't add local storage to drbd") + msg2 = self.rpc.call_blockdev_remove(self.target_node, + new_lv).fail_msg + if msg2: + self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2, + hint=("cleanup manually the unused logical" + "volumes")) + raise errors.OpExecError("Can't add local storage to drbd: %s" % msg) dev.children = new_lvs - cfg.Update(instance) - # Step: wait for sync + self.cfg.Update(self.instance) - # this can fail as the old devices are degraded and _WaitForSync - # does a combined result over all disks, so we don't check its - # return value - self.proc.LogStep(5, steps_total, "sync devices") - _WaitForSync(self, instance, unlock=True) + # Wait for sync + # This can fail as the old devices are degraded and _WaitForSync + # does a combined result over all disks, so we don't check its return value + self.lu.LogStep(5, steps_total, "Sync devices") + _WaitForSync(self.lu, self.instance, unlock=True) - # so check manually all the devices - for name, (dev, old_lvs, new_lvs) in iv_names.iteritems(): - cfg.SetDiskID(dev, instance.primary_node) - result = self.rpc.call_blockdev_find(instance.primary_node, dev) - msg = result.RemoteFailMsg() - if not msg and not result.payload: - msg = "disk not found" - if msg: - raise errors.OpExecError("Can't find DRBD device %s: %s" % - (name, msg)) - if result.payload[5]: - raise errors.OpExecError("DRBD device %s is degraded!" % name) + # Check all devices manually + self._CheckDevices(self.instance.primary_node, iv_names) # Step: remove old storage - self.proc.LogStep(6, steps_total, "removing old storage") - for name, (dev, old_lvs, new_lvs) in iv_names.iteritems(): - info("remove logical volumes for %s" % name) - for lv in old_lvs: - cfg.SetDiskID(lv, tgt_node) - msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg() - if msg: - warning("Can't remove old LV: %s" % msg, - hint="manually remove unused LVs") - continue + self.lu.LogStep(6, steps_total, "Removing old storage") + self._RemoveOldStorage(self.target_node, iv_names) - def _ExecD8Secondary(self, feedback_fn): - """Replace the secondary node for drbd8. + def _ExecDrbd8Secondary(self): + """Replace the secondary node for DRBD 8. 
The algorithm for replace is quite complicated: - for all disks of the instance: @@ -5380,86 +6689,53 @@ class LUReplaceDisks(LogicalUnit): """ steps_total = 6 - warning, info = (self.proc.LogWarning, self.proc.LogInfo) - instance = self.instance - iv_names = {} - # start of work - cfg = self.cfg - old_node = self.tgt_node - new_node = self.new_node - pri_node = instance.primary_node - nodes_ip = { - old_node: self.cfg.GetNodeInfo(old_node).secondary_ip, - new_node: self.cfg.GetNodeInfo(new_node).secondary_ip, - pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip, - } # Step: check device activation - self.proc.LogStep(1, steps_total, "check device existence") - info("checking volume groups") - my_vg = cfg.GetVGName() - results = self.rpc.call_vg_list([pri_node, new_node]) - for node in pri_node, new_node: - res = results[node] - if res.failed or not res.data or my_vg not in res.data: - raise errors.OpExecError("Volume group '%s' not found on %s" % - (my_vg, node)) - for idx, dev in enumerate(instance.disks): - if idx not in self.op.disks: - continue - info("checking disk/%d on %s" % (idx, pri_node)) - cfg.SetDiskID(dev, pri_node) - result = self.rpc.call_blockdev_find(pri_node, dev) - msg = result.RemoteFailMsg() - if not msg and not result.payload: - msg = "disk not found" - if msg: - raise errors.OpExecError("Can't find disk/%d on node %s: %s" % - (idx, pri_node, msg)) + self.lu.LogStep(1, steps_total, "Check device existence") + self._CheckDisksExistence([self.instance.primary_node]) + self._CheckVolumeGroup([self.instance.primary_node]) # Step: check other node consistency - self.proc.LogStep(2, steps_total, "check peer consistency") - for idx, dev in enumerate(instance.disks): - if idx not in self.op.disks: - continue - info("checking disk/%d consistency on %s" % (idx, pri_node)) - if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True): - raise errors.OpExecError("Primary node (%s) has degraded storage," - " unsafe to replace the secondary" % - pri_node) + self.lu.LogStep(2, steps_total, "Check peer consistency") + self._CheckDisksConsistency(self.instance.primary_node, True, True) # Step: create new storage - self.proc.LogStep(3, steps_total, "allocate new storage") - for idx, dev in enumerate(instance.disks): - info("adding new local storage on %s for disk/%d" % - (new_node, idx)) + self.lu.LogStep(3, steps_total, "Allocate new storage") + for idx, dev in enumerate(self.instance.disks): + self.lu.LogInfo("Adding new local storage on %s for disk/%d" % + (self.new_node, idx)) # we pass force_create=True to force LVM creation for new_lv in dev.children: - _CreateBlockDev(self, new_node, instance, new_lv, True, - _GetInstanceInfoText(instance), False) + _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True, + _GetInstanceInfoText(self.instance), False) # Step 4: dbrd minors and drbd setups changes # after this, we must manually remove the drbd minors on both the # error and the success paths - minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks], - instance.name) - logging.debug("Allocated minors %s" % (minors,)) - self.proc.LogStep(4, steps_total, "changing drbd configuration") - for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)): - size = dev.size - info("activating a new drbd on %s for disk/%d" % (new_node, idx)) + self.lu.LogStep(4, steps_total, "Changing drbd configuration") + minors = self.cfg.AllocateDRBDMinor([self.new_node + for dev in self.instance.disks], + self.instance.name) + logging.debug("Allocated minors %r" 
% (minors,)) + + iv_names = {} + for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)): + self.lu.LogInfo("activating a new drbd on %s for disk/%d" % + (self.new_node, idx)) # create new devices on new_node; note that we create two IDs: # one without port, so the drbd will be activated without # networking information on the new node at this stage, and one # with network, for the latter activation in step 4 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id - if pri_node == o_node1: + if self.instance.primary_node == o_node1: p_minor = o_minor1 else: p_minor = o_minor2 - new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret) - new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret) + new_alone_id = (self.instance.primary_node, self.new_node, None, + p_minor, new_minor, o_secret) + new_net_id = (self.instance.primary_node, self.new_node, o_port, + p_minor, new_minor, o_secret) iv_names[idx] = (dev, dev.children, new_net_id) logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor, @@ -5469,106 +6745,130 @@ class LUReplaceDisks(LogicalUnit): children=dev.children, size=dev.size) try: - _CreateSingleBlockDev(self, new_node, instance, new_drbd, - _GetInstanceInfoText(instance), False) + _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd, + _GetInstanceInfoText(self.instance), False) except errors.GenericError: - self.cfg.ReleaseDRBDMinors(instance.name) + self.cfg.ReleaseDRBDMinors(self.instance.name) raise - for idx, dev in enumerate(instance.disks): - # we have new devices, shutdown the drbd on the old secondary - info("shutting down drbd for disk/%d on old node" % idx) - cfg.SetDiskID(dev, old_node) - msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg() + # We have new devices, shutdown the drbd on the old secondary + for idx, dev in enumerate(self.instance.disks): + self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx) + self.cfg.SetDiskID(dev, self.target_node) + msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg if msg: - warning("Failed to shutdown drbd for disk/%d on old node: %s" % - (idx, msg), - hint="Please cleanup this device manually as soon as possible") - - info("detaching primary drbds from the network (=> standalone)") - result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip, - instance.disks)[pri_node] - - msg = result.RemoteFailMsg() + self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old" + "node: %s" % (idx, msg), + hint=("Please cleanup this device manually as" + " soon as possible")) + + self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)") + result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node], + self.node_secondary_ip, + self.instance.disks)\ + [self.instance.primary_node] + + msg = result.fail_msg if msg: # detaches didn't succeed (unlikely) - self.cfg.ReleaseDRBDMinors(instance.name) + self.cfg.ReleaseDRBDMinors(self.instance.name) raise errors.OpExecError("Can't detach the disks from the network on" " old node: %s" % (msg,)) # if we managed to detach at least one, we update all the disks of # the instance to point to the new secondary - info("updating instance configuration") + self.lu.LogInfo("Updating instance configuration") for dev, _, new_logical_id in iv_names.itervalues(): dev.logical_id = new_logical_id - cfg.SetDiskID(dev, pri_node) - cfg.Update(instance) + self.cfg.SetDiskID(dev, self.instance.primary_node) + + self.cfg.Update(self.instance) # 
and now perform the drbd attach - info("attaching primary drbds to new secondary (standalone => connected)") - result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip, - instance.disks, instance.name, + self.lu.LogInfo("Attaching primary drbds to new secondary" + " (standalone => connected)") + result = self.rpc.call_drbd_attach_net([self.instance.primary_node, + self.new_node], + self.node_secondary_ip, + self.instance.disks, + self.instance.name, False) for to_node, to_result in result.items(): - msg = to_result.RemoteFailMsg() + msg = to_result.fail_msg if msg: - warning("can't attach drbd disks on node %s: %s", to_node, msg, - hint="please do a gnt-instance info to see the" - " status of disks") - - # this can fail as the old devices are degraded and _WaitForSync - # does a combined result over all disks, so we don't check its - # return value - self.proc.LogStep(5, steps_total, "sync devices") - _WaitForSync(self, instance, unlock=True) - - # so check manually all the devices - for idx, (dev, old_lvs, _) in iv_names.iteritems(): - cfg.SetDiskID(dev, pri_node) - result = self.rpc.call_blockdev_find(pri_node, dev) - msg = result.RemoteFailMsg() - if not msg and not result.payload: - msg = "disk not found" - if msg: - raise errors.OpExecError("Can't find DRBD device disk/%d: %s" % - (idx, msg)) - if result.payload[5]: - raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx) - - self.proc.LogStep(6, steps_total, "removing old storage") - for idx, (dev, old_lvs, _) in iv_names.iteritems(): - info("remove logical volumes for disk/%d" % idx) - for lv in old_lvs: - cfg.SetDiskID(lv, old_node) - msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg() - if msg: - warning("Can't remove LV on old secondary: %s", msg, - hint="Cleanup stale volumes by hand") + self.lu.LogWarning("Can't attach drbd disks on node %s: %s", + to_node, msg, + hint=("please do a gnt-instance info to see the" + " status of disks")) - def Exec(self, feedback_fn): - """Execute disk replacement. + # Wait for sync + # This can fail as the old devices are degraded and _WaitForSync + # does a combined result over all disks, so we don't check its return value + self.lu.LogStep(5, steps_total, "Sync devices") + _WaitForSync(self.lu, self.instance, unlock=True) - This dispatches the disk replacement to the appropriate handler. + # Check all devices manually + self._CheckDevices(self.instance.primary_node, iv_names) - """ - instance = self.instance + # Step: remove old storage + self.lu.LogStep(6, steps_total, "Removing old storage") + self._RemoveOldStorage(self.target_node, iv_names) - # Activate the instance disks if we're replacing them on a down instance - if not instance.admin_up: - _StartInstanceDisks(self, instance, True) - if self.op.mode == constants.REPLACE_DISK_CHG: - fn = self._ExecD8Secondary - else: - fn = self._ExecD8DiskOnly +class LURepairNodeStorage(NoHooksLU): + """Repairs the volume group on a node. 
+ + """ + _OP_REQP = ["node_name"] + REQ_BGL = False + + def CheckArguments(self): + node_name = self.cfg.ExpandNodeName(self.op.node_name) + if node_name is None: + raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name) + + self.op.node_name = node_name + + def ExpandNames(self): + self.needed_locks = { + locking.LEVEL_NODE: [self.op.node_name], + } + + def _CheckFaultyDisks(self, instance, node_name): + if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance, + node_name, True): + raise errors.OpPrereqError("Instance '%s' has faulty disks on" + " node '%s'" % (instance.name, node_name)) + + def CheckPrereq(self): + """Check prerequisites. + + """ + storage_type = self.op.storage_type - ret = fn(feedback_fn) + if (constants.SO_FIX_CONSISTENCY not in + constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])): + raise errors.OpPrereqError("Storage units of type '%s' can not be" + " repaired" % storage_type) - # Deactivate the instance disks if we're replacing them on a down instance - if not instance.admin_up: - _SafeShutdownInstanceDisks(self, instance) + # Check whether any instance on this node has faulty disks + for inst in _GetNodeInstances(self.cfg, self.op.node_name): + check_nodes = set(inst.all_nodes) + check_nodes.discard(self.op.node_name) + for inst_node_name in check_nodes: + self._CheckFaultyDisks(inst, inst_node_name) - return ret + def Exec(self, feedback_fn): + feedback_fn("Repairing storage unit '%s' on %s ..." % + (self.op.name, self.op.node_name)) + + st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type) + result = self.rpc.call_storage_execute(self.op.node_name, + self.op.storage_type, st_args, + self.op.name, + constants.SO_FIX_CONSISTENCY) + result.Raise("Failed to repair storage unit '%s' on %s" % + (self.op.name, self.op.node_name)) class LUGrowDisk(LogicalUnit): @@ -5632,10 +6932,8 @@ class LUGrowDisk(LogicalUnit): instance.hypervisor) for node in nodenames: info = nodeinfo[node] - if info.failed or not info.data: - raise errors.OpPrereqError("Cannot get current information" - " from node '%s'" % node) - vg_free = info.data.get('vg_free', None) + info.Raise("Cannot get current information from node %s" % node) + vg_free = info.payload.get('vg_free', None) if not isinstance(vg_free, int): raise errors.OpPrereqError("Can't compute free disk space on" " node %s" % node) @@ -5653,10 +6951,7 @@ class LUGrowDisk(LogicalUnit): for node in instance.all_nodes: self.cfg.SetDiskID(disk, node) result = self.rpc.call_blockdev_grow(node, disk, self.op.amount) - msg = result.RemoteFailMsg() - if msg: - raise errors.OpExecError("Grow request failed to node %s: %s" % - (node, msg)) + result.Raise("Grow request failed to node %s" % node) disk.RecordGrow(self.op.amount) self.cfg.Update(instance) if self.op.wait_for_sync: @@ -5675,7 +6970,7 @@ class LUQueryInstanceData(NoHooksLU): def ExpandNames(self): self.needed_locks = {} - self.share_locks = dict(((i, 1) for i in locking.LEVELS)) + self.share_locks = dict.fromkeys(locking.LEVELS, 1) if not isinstance(self.op.instances, list): raise errors.OpPrereqError("Invalid argument type 'instances'") @@ -5712,25 +7007,33 @@ class LUQueryInstanceData(NoHooksLU): in self.wanted_names] return + def _ComputeBlockdevStatus(self, node, instance_name, dev): + """Returns the status of a block device + + """ + if self.op.static or not node: + return None + + self.cfg.SetDiskID(dev, node) + + result = self.rpc.call_blockdev_find(node, dev) + if result.offline: + return None + + result.Raise("Can't compute disk status for 
%s" % instance_name) + + status = result.payload + if status is None: + return None + + return (status.dev_path, status.major, status.minor, + status.sync_percent, status.estimated_time, + status.is_degraded, status.ldisk_status) + def _ComputeDiskStatus(self, instance, snode, dev): """Compute block device status. """ - static = self.op.static - if not static: - self.cfg.SetDiskID(dev, instance.primary_node) - dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev) - if dev_pstatus.offline: - dev_pstatus = None - else: - msg = dev_pstatus.RemoteFailMsg() - if msg: - raise errors.OpExecError("Can't compute disk status for %s: %s" % - (instance.name, msg)) - dev_pstatus = dev_pstatus.payload - else: - dev_pstatus = None - if dev.dev_type in constants.LDS_DRBD: # we change the snode then (otherwise we use the one passed in) if dev.logical_id[0] == instance.primary_node: @@ -5738,19 +7041,9 @@ class LUQueryInstanceData(NoHooksLU): else: snode = dev.logical_id[0] - if snode and not static: - self.cfg.SetDiskID(dev, snode) - dev_sstatus = self.rpc.call_blockdev_find(snode, dev) - if dev_sstatus.offline: - dev_sstatus = None - else: - msg = dev_sstatus.RemoteFailMsg() - if msg: - raise errors.OpExecError("Can't compute disk status for %s: %s" % - (instance.name, msg)) - dev_sstatus = dev_sstatus.payload - else: - dev_sstatus = None + dev_pstatus = self._ComputeBlockdevStatus(instance.primary_node, + instance.name, dev) + dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev) if dev.children: dev_children = [self._ComputeDiskStatus(instance, snode, child) @@ -5783,8 +7076,8 @@ class LUQueryInstanceData(NoHooksLU): remote_info = self.rpc.call_instance_info(instance.primary_node, instance.name, instance.hypervisor) - remote_info.Raise() - remote_info = remote_info.data + remote_info.Raise("Error checking node %s" % instance.primary_node) + remote_info = remote_info.payload if remote_info and "state" in remote_info: remote_state = "up" else: @@ -5806,7 +7099,8 @@ class LUQueryInstanceData(NoHooksLU): "pnode": instance.primary_node, "snodes": instance.secondary_nodes, "os": instance.os, - "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics], + # this happens to be the same format used for hooks + "nics": _NICListToTuple(self, instance.nics), "disks": disks, "hypervisor": instance.hypervisor, "network_port": instance.network_port, @@ -5814,6 +7108,10 @@ class LUQueryInstanceData(NoHooksLU): "hv_actual": cluster.FillHV(instance), "be_instance": instance.beparams, "be_actual": cluster.FillBE(instance), + "serial_no": instance.serial_no, + "mtime": instance.mtime, + "ctime": instance.ctime, + "uuid": instance.uuid, } result[instance.name] = idict @@ -5855,6 +7153,10 @@ class LUSetInstanceParams(LogicalUnit): else: if not isinstance(disk_op, int): raise errors.OpPrereqError("Invalid disk index") + if not isinstance(disk_dict, dict): + msg = "Invalid disk value: expected dict, got '%s'" % disk_dict + raise errors.OpPrereqError(msg) + if disk_op == constants.DDM_ADD: mode = disk_dict.setdefault('mode', constants.DISK_RDWR) if mode not in constants.DISK_ACCESS_SET: @@ -5889,6 +7191,9 @@ class LUSetInstanceParams(LogicalUnit): else: if not isinstance(nic_op, int): raise errors.OpPrereqError("Invalid nic index") + if not isinstance(nic_dict, dict): + msg = "Invalid nic value: expected dict, got '%s'" % nic_dict + raise errors.OpPrereqError(msg) # nic_dict should be a dict nic_ip = nic_dict.get('ip', None) @@ -5899,10 +7204,17 @@ class LUSetInstanceParams(LogicalUnit): if 
not utils.IsValidIP(nic_ip): raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip) + nic_bridge = nic_dict.get('bridge', None) + nic_link = nic_dict.get('link', None) + if nic_bridge and nic_link: + raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" + " at the same time") + elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE: + nic_dict['bridge'] = None + elif nic_link and nic_link.lower() == constants.VALUE_NONE: + nic_dict['link'] = None + if nic_op == constants.DDM_ADD: - nic_bridge = nic_dict.get('bridge', None) - if nic_bridge is None: - nic_dict['bridge'] = self.cfg.GetDefBridge() nic_mac = nic_dict.get('mac', None) if nic_mac is None: nic_dict['mac'] = constants.VALUE_AUTO @@ -5945,6 +7257,7 @@ class LUSetInstanceParams(LogicalUnit): if self.op.nics: args['nics'] = [] nic_override = dict(self.op.nics) + c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT] for idx, nic in enumerate(self.instance.nics): if idx in nic_override: this_nic_override = nic_override[idx] @@ -5954,20 +7267,24 @@ class LUSetInstanceParams(LogicalUnit): ip = this_nic_override['ip'] else: ip = nic.ip - if 'bridge' in this_nic_override: - bridge = this_nic_override['bridge'] - else: - bridge = nic.bridge if 'mac' in this_nic_override: mac = this_nic_override['mac'] else: mac = nic.mac - args['nics'].append((ip, bridge, mac)) + if idx in self.nic_pnew: + nicparams = self.nic_pnew[idx] + else: + nicparams = objects.FillDict(c_nicparams, nic.nicparams) + mode = nicparams[constants.NIC_MODE] + link = nicparams[constants.NIC_LINK] + args['nics'].append((ip, mac, mode, link)) if constants.DDM_ADD in nic_override: ip = nic_override[constants.DDM_ADD].get('ip', None) - bridge = nic_override[constants.DDM_ADD]['bridge'] mac = nic_override[constants.DDM_ADD]['mac'] - args['nics'].append((ip, bridge, mac)) + nicparams = self.nic_pnew[constants.DDM_ADD] + mode = nicparams[constants.NIC_MODE] + link = nicparams[constants.NIC_LINK] + args['nics'].append((ip, mac, mode, link)) elif constants.DDM_REMOVE in nic_override: del args['nics'][-1] @@ -5975,17 +7292,50 @@ class LUSetInstanceParams(LogicalUnit): nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) return env, nl, nl + def _GetUpdatedParams(self, old_params, update_dict, + default_values, parameter_types): + """Return the new params dict for the given params. + + @type old_params: dict + @param old_params: old parameters + @type update_dict: dict + @param update_dict: dict containing new parameter values, + or constants.VALUE_DEFAULT to reset the + parameter to its default value + @type default_values: dict + @param default_values: default values for the filled parameters + @type parameter_types: dict + @param parameter_types: dict mapping target dict keys to types + in constants.ENFORCEABLE_TYPES + @rtype: (dict, dict) + @return: (new_parameters, filled_parameters) + + """ + params_copy = copy.deepcopy(old_params) + for key, val in update_dict.iteritems(): + if val == constants.VALUE_DEFAULT: + try: + del params_copy[key] + except KeyError: + pass + else: + params_copy[key] = val + utils.ForceDictType(params_copy, parameter_types) + params_filled = objects.FillDict(default_values, params_copy) + return (params_copy, params_filled) + def CheckPrereq(self): """Check prerequisites. This only checks the instance list against the existing names. 
""" - force = self.force = self.op.force + self.force = self.op.force # checking the new params on the primary/secondary nodes instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name) + cluster = self.cluster = self.cfg.GetClusterInfo() assert self.instance is not None, \ "Cannot retrieve locked instance %s" % self.op.instance_name pnode = instance.primary_node @@ -5993,19 +7343,10 @@ class LUSetInstanceParams(LogicalUnit): # hvparams processing if self.op.hvparams: - i_hvdict = copy.deepcopy(instance.hvparams) - for key, val in self.op.hvparams.iteritems(): - if val == constants.VALUE_DEFAULT: - try: - del i_hvdict[key] - except KeyError: - pass - else: - i_hvdict[key] = val - cluster = self.cfg.GetClusterInfo() - utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES) - hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor], - i_hvdict) + i_hvdict, hv_new = self._GetUpdatedParams( + instance.hvparams, self.op.hvparams, + cluster.hvparams[instance.hypervisor], + constants.HVS_PARAMETER_TYPES) # local check hypervisor.GetHypervisor( instance.hypervisor).CheckParameterSyntax(hv_new) @@ -6017,19 +7358,10 @@ class LUSetInstanceParams(LogicalUnit): # beparams processing if self.op.beparams: - i_bedict = copy.deepcopy(instance.beparams) - for key, val in self.op.beparams.iteritems(): - if val == constants.VALUE_DEFAULT: - try: - del i_bedict[key] - except KeyError: - pass - else: - i_bedict[key] = val - cluster = self.cfg.GetClusterInfo() - utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES) - be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT], - i_bedict) + i_bedict, be_new = self._GetUpdatedParams( + instance.beparams, self.op.beparams, + cluster.beparams[constants.PP_DEFAULT], + constants.BES_PARAMETER_TYPES) self.be_new = be_new # the new actual values self.be_inst = i_bedict # the new dict (without defaults) else: @@ -6046,35 +7378,51 @@ class LUSetInstanceParams(LogicalUnit): instance.hypervisor) nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(), instance.hypervisor) - if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict): + pninfo = nodeinfo[pnode] + msg = pninfo.fail_msg + if msg: # Assume the primary node is unreachable and go ahead - self.warn.append("Can't get info from primary node %s" % pnode) + self.warn.append("Can't get info from primary node %s: %s" % + (pnode, msg)) + elif not isinstance(pninfo.payload.get('memory_free', None), int): + self.warn.append("Node data from primary node %s doesn't contain" + " free memory information" % pnode) + elif instance_info.fail_msg: + self.warn.append("Can't get instance runtime information: %s" % + instance_info.fail_msg) else: - if not instance_info.failed and instance_info.data: - current_mem = int(instance_info.data['memory']) + if instance_info.payload: + current_mem = int(instance_info.payload['memory']) else: # Assume instance not running # (there is a slight race condition here, but it's not very probable, # and we have no other way to check) current_mem = 0 miss_mem = (be_new[constants.BE_MEMORY] - current_mem - - nodeinfo[pnode].data['memory_free']) + pninfo.payload['memory_free']) if miss_mem > 0: raise errors.OpPrereqError("This change will prevent the instance" " from starting, due to %d MB of memory" " missing on its primary node" % miss_mem) if be_new[constants.BE_AUTO_BALANCE]: - for node, nres in nodeinfo.iteritems(): + for node, nres in nodeinfo.items(): if node not in instance.secondary_nodes: continue - if nres.failed or not 
isinstance(nres.data, dict): - self.warn.append("Can't get info from secondary node %s" % node) - elif be_new[constants.BE_MEMORY] > nres.data['memory_free']: + msg = nres.fail_msg + if msg: + self.warn.append("Can't get info from secondary node %s: %s" % + (node, msg)) + elif not isinstance(nres.payload.get('memory_free', None), int): + self.warn.append("Secondary node %s didn't return free" + " memory information" % node) + elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']: self.warn.append("Not enough memory to failover instance to" " secondary node %s" % node) # NIC processing + self.nic_pnew = {} + self.nic_pinst = {} for nic_op, nic_dict in self.op.nics: if nic_op == constants.DDM_REMOVE: if not instance.nics: @@ -6086,17 +7434,45 @@ class LUSetInstanceParams(LogicalUnit): raise errors.OpPrereqError("Invalid NIC index %s, valid values" " are 0 to %d" % (nic_op, len(instance.nics))) + old_nic_params = instance.nics[nic_op].nicparams + old_nic_ip = instance.nics[nic_op].ip + else: + old_nic_params = {} + old_nic_ip = None + + update_params_dict = dict([(key, nic_dict[key]) + for key in constants.NICS_PARAMETERS + if key in nic_dict]) + if 'bridge' in nic_dict: - nic_bridge = nic_dict['bridge'] - if nic_bridge is None: - raise errors.OpPrereqError('Cannot set the nic bridge to None') - if not self.rpc.call_bridges_exist(pnode, [nic_bridge]): - msg = ("Bridge '%s' doesn't exist on one of" - " the instance nodes" % nic_bridge) + update_params_dict[constants.NIC_LINK] = nic_dict['bridge'] + + new_nic_params, new_filled_nic_params = \ + self._GetUpdatedParams(old_nic_params, update_params_dict, + cluster.nicparams[constants.PP_DEFAULT], + constants.NICS_PARAMETER_TYPES) + objects.NIC.CheckParameterSyntax(new_filled_nic_params) + self.nic_pinst[nic_op] = new_nic_params + self.nic_pnew[nic_op] = new_filled_nic_params + new_nic_mode = new_filled_nic_params[constants.NIC_MODE] + + if new_nic_mode == constants.NIC_MODE_BRIDGED: + nic_bridge = new_filled_nic_params[constants.NIC_LINK] + msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg + if msg: + msg = "Error checking bridges on node %s: %s" % (pnode, msg) if self.force: self.warn.append(msg) else: raise errors.OpPrereqError(msg) + if new_nic_mode == constants.NIC_MODE_ROUTED: + if 'ip' in nic_dict: + nic_ip = nic_dict['ip'] + else: + nic_ip = old_nic_ip + if nic_ip is None: + raise errors.OpPrereqError('Cannot set the nic ip to None' + ' on a routed nic') if 'mac' in nic_dict: nic_mac = nic_dict['mac'] if nic_mac is None: @@ -6121,9 +7497,11 @@ class LUSetInstanceParams(LogicalUnit): " an instance") ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor]) ins_l = ins_l[pnode] - if ins_l.failed or not isinstance(ins_l.data, list): - raise errors.OpPrereqError("Can't contact node '%s'" % pnode) - if instance.name in ins_l.data: + msg = ins_l.fail_msg + if msg: + raise errors.OpPrereqError("Can't contact node %s: %s" % + (pnode, msg)) + if instance.name in ins_l.payload: raise errors.OpPrereqError("Instance is running, can't remove" " disks.") @@ -6153,6 +7531,7 @@ class LUSetInstanceParams(LogicalUnit): result = [] instance = self.instance + cluster = self.cluster # disk changes for disk_op, disk_dict in self.op.disks: if disk_op == constants.DDM_REMOVE: @@ -6161,7 +7540,7 @@ class LUSetInstanceParams(LogicalUnit): device_idx = len(instance.disks) for node, disk in device.ComputeNodeTree(instance.primary_node): self.cfg.SetDiskID(disk, node) - msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg() + 
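
The NIC checks above translate the legacy 'bridge' key into the generic link parameter, fill the result against the cluster defaults, and then validate per mode: bridged NICs need an existing bridge (downgraded to a warning under force), routed NICs need an IP. A rough sketch of that logic with plain dicts (check_nic_update and the bridge_exists callback are hypothetical, standing in for the real nicparams handling and rpc.call_bridges_exist)::

  MODE_BRIDGED, MODE_ROUTED = "bridged", "routed"

  def check_nic_update(old_params, old_ip, update, cluster_defaults,
                       bridge_exists, force=False, warn=None):
    update = dict(update)
    if "bridge" in update:            # legacy key becomes the generic link
      update["link"] = update.pop("bridge")
    new_params = dict(old_params)
    new_params.update(update)
    filled = dict(cluster_defaults)   # stand-in for objects.FillDict
    filled.update(new_params)
    if filled["mode"] == MODE_BRIDGED and not bridge_exists(filled["link"]):
      msg = "Bridge '%s' is missing on the primary node" % filled["link"]
      if force and warn is not None:
        warn.append(msg)              # force downgrades this to a warning
      else:
        raise ValueError(msg)
    if filled["mode"] == MODE_ROUTED and update.get("ip", old_ip) is None:
      raise ValueError("Cannot set the nic ip to None on a routed nic")
    return new_params, filled

  defaults = {"mode": MODE_BRIDGED, "link": "xen-br0"}
  print(check_nic_update({}, None, {"bridge": "br1"}, defaults,
                         lambda link: link == "br1"))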
msg = self.rpc.call_blockdev_remove(node, disk).fail_msg if msg: self.LogWarning("Could not remove disk/%d on node %s: %s," " continuing anyway", device_idx, node, msg) @@ -6213,19 +7592,24 @@ class LUSetInstanceParams(LogicalUnit): elif nic_op == constants.DDM_ADD: # mac and bridge should be set, by now mac = nic_dict['mac'] - bridge = nic_dict['bridge'] - new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None), - bridge=bridge) + ip = nic_dict.get('ip', None) + nicparams = self.nic_pinst[constants.DDM_ADD] + new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams) instance.nics.append(new_nic) result.append(("nic.%d" % (len(instance.nics) - 1), - "add:mac=%s,ip=%s,bridge=%s" % - (new_nic.mac, new_nic.ip, new_nic.bridge))) + "add:mac=%s,ip=%s,mode=%s,link=%s" % + (new_nic.mac, new_nic.ip, + self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE], + self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK] + ))) else: - # change a given nic - for key in 'mac', 'ip', 'bridge': + for key in 'mac', 'ip': if key in nic_dict: setattr(instance.nics[nic_op], key, nic_dict[key]) - result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key])) + if nic_op in self.nic_pnew: + instance.nics[nic_op].nicparams = self.nic_pnew[nic_op] + for key, val in nic_dict.iteritems(): + result.append(("nic.%s/%d" % (key, nic_op), val)) # hvparams changes if self.op.hvparams: @@ -6278,10 +7662,10 @@ class LUQueryExports(NoHooksLU): rpcresult = self.rpc.call_export_list(self.nodes) result = {} for node in rpcresult: - if rpcresult[node].failed: + if rpcresult[node].fail_msg: result[node] = False else: - result[node] = rpcresult[node].data + result[node] = rpcresult[node].payload return result @@ -6304,7 +7688,7 @@ class LUExportInstance(LogicalUnit): # remove it from its current node. 
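
For an existing NIC the Exec code above sets the plain attributes (mac, ip) directly, replaces nicparams wholesale with the pre-computed filled dict, and records every change as a (field, value) pair for reporting. A small sketch of that branch (FakeNIC and apply_nic_change are illustrative stand-ins)::

  class FakeNIC(object):
    def __init__(self, mac, ip, nicparams):
      self.mac, self.ip, self.nicparams = mac, ip, nicparams

  def apply_nic_change(nic, idx, nic_dict, new_filled_params=None):
    result = []
    for key in ("mac", "ip"):
      if key in nic_dict:
        setattr(nic, key, nic_dict[key])
    if new_filled_params is not None:
      nic.nicparams = new_filled_params
    for key, val in nic_dict.items():
      result.append(("nic.%s/%d" % (key, idx), val))
    return result

  nic = FakeNIC("aa:00:00:35:d6:7e", None,
                {"mode": "bridged", "link": "xen-br0"})
  print(apply_nic_change(nic, 0, {"ip": "192.0.2.10", "link": "br1"},
                         {"mode": "bridged", "link": "br1"}))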
In the future we could fix this by: # - making a tasklet to search (share-lock all), then create the new one, # then one to remove, after - # - removing the removal operation altoghether + # - removing the removal operation altogether self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET def DeclareLocks(self, level): @@ -6360,14 +7744,13 @@ class LUExportInstance(LogicalUnit): instance = self.instance dst_node = self.dst_node src_node = instance.primary_node + if self.op.shutdown: # shutdown the instance, but not the disks + feedback_fn("Shutting down instance %s" % instance.name) result = self.rpc.call_instance_shutdown(src_node, instance) - msg = result.RemoteFailMsg() - if msg: - raise errors.OpExecError("Could not shutdown instance %s on" - " node %s: %s" % - (instance.name, src_node, msg)) + result.Raise("Could not shutdown instance %s on" + " node %s" % (instance.name, src_node)) vgname = self.cfg.GetVGName() @@ -6378,25 +7761,32 @@ class LUExportInstance(LogicalUnit): for disk in instance.disks: self.cfg.SetDiskID(disk, src_node) + # per-disk results + dresults = [] try: for idx, disk in enumerate(instance.disks): - # new_dev_name will be a snapshot of an lvm leaf of the one we passed - new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk) - if new_dev_name.failed or not new_dev_name.data: - self.LogWarning("Could not snapshot disk/%d on node %s", - idx, src_node) + feedback_fn("Creating a snapshot of disk/%s on node %s" % + (idx, src_node)) + + # result.payload will be a snapshot of an lvm leaf of the one we passed + result = self.rpc.call_blockdev_snapshot(src_node, disk) + msg = result.fail_msg + if msg: + self.LogWarning("Could not snapshot disk/%s on node %s: %s", + idx, src_node, msg) snap_disks.append(False) else: + disk_id = (vgname, result.payload) new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size, - logical_id=(vgname, new_dev_name.data), - physical_id=(vgname, new_dev_name.data), + logical_id=disk_id, physical_id=disk_id, iv_name=disk.iv_name) snap_disks.append(new_dev) finally: if self.op.shutdown and instance.admin_up: + feedback_fn("Starting instance %s" % instance.name) result = self.rpc.call_instance_start(src_node, instance, None, None) - msg = result.RemoteFailMsg() + msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Could not start instance: %s" % msg) @@ -6405,21 +7795,33 @@ class LUExportInstance(LogicalUnit): cluster_name = self.cfg.GetClusterName() for idx, dev in enumerate(snap_disks): + feedback_fn("Exporting snapshot %s from %s to %s" % + (idx, src_node, dst_node.name)) if dev: result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name, instance, cluster_name, idx) - if result.failed or not result.data: - self.LogWarning("Could not export disk/%d from node %s to" - " node %s", idx, src_node, dst_node.name) - msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg() + msg = result.fail_msg + if msg: + self.LogWarning("Could not export disk/%s from node %s to" + " node %s: %s", idx, src_node, dst_node.name, msg) + dresults.append(False) + else: + dresults.append(True) + msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg if msg: self.LogWarning("Could not remove snapshot for disk/%d from node" " %s: %s", idx, src_node, msg) + else: + dresults.append(False) + feedback_fn("Finalizing export on %s" % dst_node.name) result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks) - if result.failed or not result.data: - self.LogWarning("Could not finalize 
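
The export loop above now keeps one boolean per disk in dresults, so a caller can distinguish a partially successful export from a complete failure. A condensed sketch of that bookkeeping (export_disks and its snapshot/export callbacks are hypothetical; the real code splits this over the snapshot and transfer loops)::

  def export_disks(disks, snapshot, export, warn):
    dresults = []
    for idx, disk in enumerate(disks):
      snap = snapshot(disk)
      if snap is None:
        warn.append("Could not snapshot disk/%s" % idx)
        dresults.append(False)
        continue
      if export(snap):
        dresults.append(True)
      else:
        warn.append("Could not export disk/%s" % idx)
        dresults.append(False)
    return dresults

  warnings = []
  print(export_disks(["sda", "sdb"],
                     lambda d: "snap-" + d,
                     lambda s: s.endswith("sda"),
                     warnings))   # -> [True, False]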
export for instance %s on node %s", - instance.name, dst_node.name) + fin_resu = True + msg = result.fail_msg + if msg: + self.LogWarning("Could not finalize export for instance %s" + " on node %s: %s", instance.name, dst_node.name, msg) + fin_resu = False nodelist = self.cfg.GetNodeList() nodelist.remove(dst_node.name) @@ -6427,15 +7829,19 @@ class LUExportInstance(LogicalUnit): # on one-node clusters nodelist will be empty after the removal # if we proceed the backup would be removed because OpQueryExports # substitutes an empty list with the full cluster node list. + iname = instance.name if nodelist: + feedback_fn("Removing old exports for instance %s" % iname) exportlist = self.rpc.call_export_list(nodelist) for node in exportlist: - if exportlist[node].failed: + if exportlist[node].fail_msg: continue - if instance.name in exportlist[node].data: - if not self.rpc.call_export_remove(node, instance.name): + if iname in exportlist[node].payload: + msg = self.rpc.call_export_remove(node, iname).fail_msg + if msg: self.LogWarning("Could not remove older export for instance %s" - " on node %s", instance.name, node) + " on node %s: %s", iname, node, msg) + return fin_resu, dresults class LURemoveExport(NoHooksLU): @@ -6469,19 +7875,21 @@ class LURemoveExport(NoHooksLU): fqdn_warn = True instance_name = self.op.instance_name - exportlist = self.rpc.call_export_list(self.acquired_locks[ - locking.LEVEL_NODE]) + locked_nodes = self.acquired_locks[locking.LEVEL_NODE] + exportlist = self.rpc.call_export_list(locked_nodes) found = False for node in exportlist: - if exportlist[node].failed: - self.LogWarning("Failed to query node %s, continuing" % node) + msg = exportlist[node].fail_msg + if msg: + self.LogWarning("Failed to query node %s (continuing): %s", node, msg) continue - if instance_name in exportlist[node].data: + if instance_name in exportlist[node].payload: found = True result = self.rpc.call_export_remove(node, instance_name) - if result.failed or not result.data: + msg = result.fail_msg + if msg: logging.error("Could not remove export for instance %s" - " on node %s", instance_name, node) + " on node %s: %s", instance_name, node, msg) if fqdn_warn and not found: feedback_fn("Export not found. If trying to remove an export belonging" @@ -6693,13 +8101,8 @@ class LUTestDelay(NoHooksLU): raise errors.OpExecError("Error during master delay test") if self.op.on_nodes: result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration) - if not result: - raise errors.OpExecError("Complete failure from rpc call") for node, node_result in result.items(): - node_result.Raise() - if not node_result.data: - raise errors.OpExecError("Failure during rpc call to node %s," - " result: %s" % (node, node_result.data)) + node_result.Raise("Failure during rpc call to node %s" % node) class IAllocator(object): @@ -6723,8 +8126,9 @@ class IAllocator(object): "relocate_from", ] - def __init__(self, lu, mode, name, **kwargs): - self.lu = lu + def __init__(self, cfg, rpc, mode, name, **kwargs): + self.cfg = cfg + self.rpc = rpc # init buffer variables self.in_text = self.out_text = self.in_data = self.out_data = None # init all input fields so that pylint is happy @@ -6762,7 +8166,7 @@ class IAllocator(object): This is the data that is independent of the actual operation. 
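
The export-removal scan above queries the export list on every locked node, skips nodes whose RPC failed, removes matching exports and warns when nothing was found (for example a non-FQDN name). A minimal sketch of that scan (FakeResult, remove_export and the remove_fn callback are stand-ins for the real RPC plumbing)::

  class FakeResult(object):
    def __init__(self, payload=None, fail_msg=None):
      self.payload = payload or []
      self.fail_msg = fail_msg

  def remove_export(instance_name, exportlist, remove_fn, log):
    found = False
    for node, result in exportlist.items():
      if result.fail_msg:
        log("Failed to query node %s (continuing): %s" %
            (node, result.fail_msg))
        continue
      if instance_name in result.payload:
        found = True
        msg = remove_fn(node, instance_name)
        if msg:
          log("Could not remove export for instance %s on node %s: %s" %
              (instance_name, node, msg))
    if not found:
      log("Export not found; if it belongs to a deleted instance,"
          " its full FQDN must be used")
    return found

  messages = []
  exports = {"node1": FakeResult(["inst1.example.com"]),
             "node2": FakeResult(fail_msg="connection refused")}
  remove_export("inst1.example.com", exports, lambda n, i: None,
                messages.append)
  print(messages)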
""" - cfg = self.lu.cfg + cfg = self.cfg cluster_info = cfg.GetClusterInfo() # cluster data data = { @@ -6784,10 +8188,11 @@ class IAllocator(object): elif self.mode == constants.IALLOCATOR_MODE_RELOC: hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor - node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(), - hypervisor_name) - node_iinfo = self.lu.rpc.call_all_instances_info(node_list, - cluster_info.enabled_hypervisors) + node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(), + hypervisor_name) + node_iinfo = \ + self.rpc.call_all_instances_info(node_list, + cluster_info.enabled_hypervisors) for nname, nresult in node_data.items(): # first fill in static (config-based) values ninfo = cfg.GetNodeInfo(nname) @@ -6800,30 +8205,30 @@ class IAllocator(object): "master_candidate": ninfo.master_candidate, } - if not ninfo.offline: - nresult.Raise() - if not isinstance(nresult.data, dict): - raise errors.OpExecError("Can't get data for node %s" % nname) - remote_info = nresult.data + if not (ninfo.offline or ninfo.drained): + nresult.Raise("Can't get data for node %s" % nname) + node_iinfo[nname].Raise("Can't get node instance info from node %s" % + nname) + remote_info = nresult.payload + for attr in ['memory_total', 'memory_free', 'memory_dom0', 'vg_size', 'vg_free', 'cpu_total']: if attr not in remote_info: raise errors.OpExecError("Node '%s' didn't return attribute" " '%s'" % (nname, attr)) - try: - remote_info[attr] = int(remote_info[attr]) - except ValueError, err: + if not isinstance(remote_info[attr], int): raise errors.OpExecError("Node '%s' returned invalid value" - " for '%s': %s" % (nname, attr, err)) + " for '%s': %s" % + (nname, attr, remote_info[attr])) # compute memory used by primary instances i_p_mem = i_p_up_mem = 0 for iinfo, beinfo in i_list: if iinfo.primary_node == nname: i_p_mem += beinfo[constants.BE_MEMORY] - if iinfo.name not in node_iinfo[nname].data: + if iinfo.name not in node_iinfo[nname].payload: i_used_mem = 0 else: - i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory']) + i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory']) i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem remote_info['memory_free'] -= max(0, i_mem_diff) @@ -6849,8 +8254,19 @@ class IAllocator(object): # instance data instance_data = {} for iinfo, beinfo in i_list: - nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge} - for n in iinfo.nics] + nic_data = [] + for nic in iinfo.nics: + filled_params = objects.FillDict( + cluster_info.nicparams[constants.PP_DEFAULT], + nic.nicparams) + nic_dict = {"mac": nic.mac, + "ip": nic.ip, + "mode": filled_params[constants.NIC_MODE], + "link": filled_params[constants.NIC_LINK], + } + if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: + nic_dict["bridge"] = filled_params[constants.NIC_LINK] + nic_data.append(nic_dict) pir = { "tags": list(iinfo.GetTags()), "admin_up": iinfo.admin_up, @@ -6914,7 +8330,7 @@ class IAllocator(object): done. 
""" - instance = self.lu.cfg.GetInstanceInfo(self.name) + instance = self.cfg.GetInstanceInfo(self.name) if instance is None: raise errors.ProgrammerError("Unknown instance '%s' passed to" " IAllocator" % self.name) @@ -6956,23 +8372,12 @@ class IAllocator(object): """ if call_fn is None: - call_fn = self.lu.rpc.call_iallocator_runner - data = self.in_text - - result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text) - result.Raise() - - if not isinstance(result.data, (list, tuple)) or len(result.data) != 4: - raise errors.OpExecError("Invalid result from master iallocator runner") + call_fn = self.rpc.call_iallocator_runner - rcode, stdout, stderr, fail = result.data + result = call_fn(self.cfg.GetMasterNode(), name, self.in_text) + result.Raise("Failure while running the iallocator script") - if rcode == constants.IARUN_NOTFOUND: - raise errors.OpExecError("Can't find allocator '%s'" % name) - elif rcode == constants.IARUN_FAILURE: - raise errors.OpExecError("Instance allocator call failed: %s," - " output: %s" % (fail, stdout+stderr)) - self.out_text = stdout + self.out_text = result.payload if validate: self._ValidateResult() @@ -7073,7 +8478,7 @@ class LUTestAllocator(NoHooksLU): """ if self.op.mode == constants.IALLOCATOR_MODE_ALLOC: - ial = IAllocator(self, + ial = IAllocator(self.cfg, self.rpc, mode=self.op.mode, name=self.op.name, mem_size=self.op.mem_size, @@ -7086,7 +8491,7 @@ class LUTestAllocator(NoHooksLU): hypervisor=self.op.hypervisor, ) else: - ial = IAllocator(self, + ial = IAllocator(self.cfg, self.rpc, mode=self.op.mode, name=self.op.name, relocate_from=list(self.relocate_from),