X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/6d2e83d5cf81ef128ab5e2ad2fdc406b980f2487..0889602611398a76c6d72f1196e51ff77402cad8:/lib/cmdlib.py diff --git a/lib/cmdlib.py b/lib/cmdlib.py index 01ad1fd..03660c1 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -25,14 +25,11 @@ import os import os.path -import sha import time -import tempfile import re import platform import logging import copy -import random from ganeti import ssh from ganeti import utils @@ -41,7 +38,6 @@ from ganeti import hypervisor from ganeti import locking from ganeti import constants from ganeti import objects -from ganeti import opcodes from ganeti import serializer from ganeti import ssconf @@ -60,6 +56,9 @@ class LogicalUnit(object): Note that all commands require root permissions. + @ivar dry_run_result: the value (if any) that will be returned to the caller + in dry-run mode (signalled by opcode dry_run parameter) + """ HPATH = None HTYPE = None @@ -90,6 +89,8 @@ class LogicalUnit(object): # logging self.LogWarning = processor.LogWarning self.LogInfo = processor.LogInfo + # support for dry-run + self.dry_run_result = None for attr_name in self._OP_REQP: attr_val = getattr(op, attr_name, None) @@ -392,8 +393,8 @@ def _GetWantedInstances(lu, instances): wanted.append(instance) else: - wanted = lu.cfg.GetInstanceList() - return utils.NiceSort(wanted) + wanted = utils.NiceSort(lu.cfg.GetInstanceList()) + return wanted def _CheckOutputFields(static, dynamic, selected): @@ -434,15 +435,28 @@ def _CheckNodeOnline(lu, node): @param lu: the LU on behalf of which we make the check @param node: the node to check - @raise errors.OpPrereqError: if the nodes is offline + @raise errors.OpPrereqError: if the node is offline """ if lu.cfg.GetNodeInfo(node).offline: raise errors.OpPrereqError("Can't use offline node %s" % node) +def _CheckNodeNotDrained(lu, node): + """Ensure that a given node is not drained. + + @param lu: the LU on behalf of which we make the check + @param node: the node to check + @raise errors.OpPrereqError: if the node is drained + + """ + if lu.cfg.GetNodeInfo(node).drained: + raise errors.OpPrereqError("Can't use drained node %s" % node) + + def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, - memory, vcpus, nics): + memory, vcpus, nics, disk_template, disks, + bep, hvp, hypervisor): """Builds instance related env variables for hooks This builds the hook environment from individual variables. 
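As an illustrative sketch only (it is not part of the diff), a logical unit in lib/cmdlib.py could use the dry_run_result attribute documented above roughly as follows; the LUDryRunExample class and its return values are invented, and the block assumes the module's existing NoHooksLU base class::

  class LUDryRunExample(NoHooksLU):
    """Hypothetical LU demonstrating the new dry-run support."""
    _OP_REQP = []
    REQ_BGL = False

    def ExpandNames(self):
      self.needed_locks = {}

    def CheckPrereq(self):
      # The value stored here is what the caller receives when the opcode
      # is submitted with its dry_run parameter set, per the @ivar above.
      self.dry_run_result = "no changes would be made"

    def Exec(self, feedback_fn):
      return "changes applied"
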
@@ -455,45 +469,100 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, @param secondary_nodes: list of secondary nodes as strings @type os_type: string @param os_type: the name of the instance's OS - @type status: string - @param status: the desired status of the instances + @type status: boolean + @param status: the should_run status of the instance @type memory: string @param memory: the memory size of the instance @type vcpus: string @param vcpus: the count of VCPUs the instance has @type nics: list - @param nics: list of tuples (ip, bridge, mac) representing - the NICs the instance has + @param nics: list of tuples (ip, mac, mode, link) representing + the NICs the instance has + @type disk_template: string + @param disk_template: the distk template of the instance + @type disks: list + @param disks: the list of (size, mode) pairs + @type bep: dict + @param bep: the backend parameters for the instance + @type hvp: dict + @param hvp: the hypervisor parameters for the instance + @type hypervisor: string + @param hypervisor: the hypervisor for the instance @rtype: dict @return: the hook environment for this instance """ + if status: + str_status = "up" + else: + str_status = "down" env = { "OP_TARGET": name, "INSTANCE_NAME": name, "INSTANCE_PRIMARY": primary_node, "INSTANCE_SECONDARIES": " ".join(secondary_nodes), "INSTANCE_OS_TYPE": os_type, - "INSTANCE_STATUS": status, + "INSTANCE_STATUS": str_status, "INSTANCE_MEMORY": memory, "INSTANCE_VCPUS": vcpus, + "INSTANCE_DISK_TEMPLATE": disk_template, + "INSTANCE_HYPERVISOR": hypervisor, } if nics: nic_count = len(nics) - for idx, (ip, bridge, mac) in enumerate(nics): + for idx, (ip, mac, mode, link) in enumerate(nics): if ip is None: ip = "" env["INSTANCE_NIC%d_IP" % idx] = ip - env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge - env["INSTANCE_NIC%d_HWADDR" % idx] = mac + env["INSTANCE_NIC%d_MAC" % idx] = mac + env["INSTANCE_NIC%d_MODE" % idx] = mode + env["INSTANCE_NIC%d_LINK" % idx] = link + if mode == constants.NIC_MODE_BRIDGED: + env["INSTANCE_NIC%d_BRIDGE" % idx] = link else: nic_count = 0 env["INSTANCE_NIC_COUNT"] = nic_count + if disks: + disk_count = len(disks) + for idx, (size, mode) in enumerate(disks): + env["INSTANCE_DISK%d_SIZE" % idx] = size + env["INSTANCE_DISK%d_MODE" % idx] = mode + else: + disk_count = 0 + + env["INSTANCE_DISK_COUNT"] = disk_count + + for source, kind in [(bep, "BE"), (hvp, "HV")]: + for key, value in source.items(): + env["INSTANCE_%s_%s" % (kind, key)] = value + return env +def _NICListToTuple(lu, nics): + """Build a list of nic information tuples. + + This list is suitable to be passed to _BuildInstanceHookEnv or as a return + value in LUQueryInstanceData. + + @type lu: L{LogicalUnit} + @param lu: the logical unit on whose behalf we execute + @type nics: list of L{objects.NIC} + @param nics: list of nics to convert to hooks tuples + + """ + hooks_nics = [] + c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT] + for nic in nics: + ip = nic.ip + mac = nic.mac + filled_params = objects.FillDict(c_nicparams, nic.nicparams) + mode = filled_params[constants.NIC_MODE] + link = filled_params[constants.NIC_LINK] + hooks_nics.append((ip, mac, mode, link)) + return hooks_nics def _BuildInstanceHookEnvByObject(lu, instance, override=None): """Builds instance related env variables for hooks from an object. 
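To make the extended signature concrete, here is an invented call of _BuildInstanceHookEnv using the argument shapes documented above; all values are placeholders and the constants are the ones already imported by this module::

  env = _BuildInstanceHookEnv(
    "instance1.example.com",            # name
    "node1.example.com",                # primary_node
    ["node2.example.com"],              # secondary_nodes
    "debian-etch",                      # os_type
    True,                               # status -> INSTANCE_STATUS=up
    "512",                              # memory
    "1",                                # vcpus
    [(None, "aa:00:00:12:34:56",        # nics: (ip, mac, mode, link)
      constants.NIC_MODE_BRIDGED, "xen-br0")],
    constants.DT_DRBD8,                 # disk_template
    [(10240, "rw")],                    # disks: (size, mode)
    {constants.BE_MEMORY: 512},         # bep
    {},                                 # hvp
    constants.HT_XEN_PVM,               # hypervisor
  )
  # Besides the long-standing keys, env now carries INSTANCE_NIC0_MAC,
  # INSTANCE_NIC0_MODE, INSTANCE_NIC0_LINK (plus INSTANCE_NIC0_BRIDGE for a
  # bridged NIC), INSTANCE_DISK0_SIZE/MODE, INSTANCE_DISK_TEMPLATE,
  # INSTANCE_HYPERVISOR and one INSTANCE_BE_*/INSTANCE_HV_* entry per
  # backend/hypervisor parameter.
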
@@ -510,16 +579,23 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None): @return: the hook environment dictionary """ - bep = lu.cfg.GetClusterInfo().FillBE(instance) + cluster = lu.cfg.GetClusterInfo() + bep = cluster.FillBE(instance) + hvp = cluster.FillHV(instance) args = { 'name': instance.name, 'primary_node': instance.primary_node, 'secondary_nodes': instance.secondary_nodes, 'os_type': instance.os, - 'status': instance.os, + 'status': instance.admin_up, 'memory': bep[constants.BE_MEMORY], 'vcpus': bep[constants.BE_VCPUS], - 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics], + 'nics': _NICListToTuple(lu, instance.nics), + 'disk_template': instance.disk_template, + 'disks': [(disk.size, disk.mode) for disk in instance.disks], + 'bep': bep, + 'hvp': hvp, + 'hypervisor': instance.hypervisor, } if override: args.update(override) @@ -542,18 +618,29 @@ def _AdjustCandidatePool(lu): (mc_now, mc_max)) -def _CheckInstanceBridgesExist(lu, instance): +def _CheckNicsBridgesExist(lu, target_nics, target_node, + profile=constants.PP_DEFAULT): + """Check that the brigdes needed by a list of nics exist. + + """ + c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile] + paramslist = [objects.FillDict(c_nicparams, nic.nicparams) + for nic in target_nics] + brlist = [params[constants.NIC_LINK] for params in paramslist + if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED] + if brlist: + result = lu.rpc.call_bridges_exist(target_node, brlist) + result.Raise("Error checking bridges on destination node '%s'" % + target_node, prereq=True) + + +def _CheckInstanceBridgesExist(lu, instance, node=None): """Check that the brigdes needed by an instance exist. """ - # check bridges existance - brlist = [nic.bridge for nic in instance.nics] - result = lu.rpc.call_bridges_exist(instance.primary_node, brlist) - result.Raise() - if not result.data: - raise errors.OpPrereqError("One or more target bridges %s does not" - " exist on destination node '%s'" % - (brlist, instance.primary_node)) + if node is None: + node = instance.primary_node + _CheckNicsBridgesExist(lu, instance.nics, node) class LUDestroyCluster(NoHooksLU): @@ -587,9 +674,7 @@ class LUDestroyCluster(NoHooksLU): """ master = self.cfg.GetMasterNode() result = self.rpc.call_node_stop_master(master, False) - result.Raise() - if not result.data: - raise errors.OpExecError("Could not disable the master role") + result.Raise("Could not disable the master role") priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS) utils.CreateBackup(priv_key) utils.CreateBackup(pub_key) @@ -614,7 +699,7 @@ class LUVerifyCluster(LogicalUnit): def _VerifyNode(self, nodeinfo, file_list, local_cksum, node_result, feedback_fn, master_files, - drbd_map): + drbd_map, vg_name): """Run multiple tests against a node. 
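A recurring theme in the hunks above and below is that RPC results are now consumed through fail_msg, payload and Raise() instead of the old failed/data attributes. A hedged sketch of that calling pattern, assuming an lu, node, instance and feedback_fn in scope as elsewhere in this file::

  remote_info = lu.rpc.call_instance_info(node, instance.name,
                                          instance.hypervisor)
  # Raise() converts a failed RPC into OpExecError (or OpPrereqError when
  # prereq=True is given), embedding the remote error text.
  remote_info.Raise("Error checking node %s" % node, prereq=True)
  # On success fail_msg is empty and the useful return value lives in
  # payload (formerly result.data).
  if not remote_info.payload:
    feedback_fn("instance %s is not running on %s" % (instance.name, node))
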
Test list: @@ -634,6 +719,7 @@ class LUVerifyCluster(LogicalUnit): @param drbd_map: the useddrbd minors for this node, in form of minor: (instance, must_exist) which correspond to instances and their running status + @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName()) """ node = nodeinfo.name @@ -646,29 +732,39 @@ class LUVerifyCluster(LogicalUnit): # compares ganeti version local_version = constants.PROTOCOL_VERSION remote_version = node_result.get('version', None) - if not remote_version: + if not (remote_version and isinstance(remote_version, (list, tuple)) and + len(remote_version) == 2): feedback_fn(" - ERROR: connection to %s failed" % (node)) return True - if local_version != remote_version: - feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" % - (local_version, node, remote_version)) + if local_version != remote_version[0]: + feedback_fn(" - ERROR: incompatible protocol versions: master %s," + " node %s %s" % (local_version, node, remote_version[0])) return True - # checks vg existance and size > 20G + # node seems compatible, we can actually try to look into its results bad = False - vglist = node_result.get(constants.NV_VGLIST, None) - if not vglist: - feedback_fn(" - ERROR: unable to check volume groups on node %s." % - (node,)) - bad = True - else: - vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(), - constants.MIN_VG_SIZE) - if vgstatus: - feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node)) + + # full package version + if constants.RELEASE_VERSION != remote_version[1]: + feedback_fn(" - WARNING: software version mismatch: master %s," + " node %s %s" % + (constants.RELEASE_VERSION, node, remote_version[1])) + + # checks vg existence and size > 20G + if vg_name is not None: + vglist = node_result.get(constants.NV_VGLIST, None) + if not vglist: + feedback_fn(" - ERROR: unable to check volume groups on node %s." 
% + (node,)) bad = True + else: + vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name, + constants.MIN_VG_SIZE) + if vgstatus: + feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node)) + bad = True # checks config file checksum @@ -730,16 +826,22 @@ class LUVerifyCluster(LogicalUnit): (hv_name, hv_result)) # check used drbd list - used_minors = node_result.get(constants.NV_DRBDLIST, []) - for minor, (iname, must_exist) in drbd_map.items(): - if minor not in used_minors and must_exist: - feedback_fn(" - ERROR: drbd minor %d of instance %s is not active" % - (minor, iname)) - bad = True - for minor in used_minors: - if minor not in drbd_map: - feedback_fn(" - ERROR: unallocated drbd minor %d is in use" % minor) - bad = True + if vg_name is not None: + used_minors = node_result.get(constants.NV_DRBDLIST, []) + if not isinstance(used_minors, (tuple, list)): + feedback_fn(" - ERROR: cannot parse drbd status file: %s" % + str(used_minors)) + else: + for minor, (iname, must_exist) in drbd_map.items(): + if minor not in used_minors and must_exist: + feedback_fn(" - ERROR: drbd minor %d of instance %s is" + " not active" % (minor, iname)) + bad = True + for minor in used_minors: + if minor not in drbd_map: + feedback_fn(" - ERROR: unallocated drbd minor %d is in use" % + minor) + bad = True return bad @@ -768,7 +870,7 @@ class LUVerifyCluster(LogicalUnit): (volume, node)) bad = True - if not instanceconfig.status == 'down': + if instanceconfig.admin_up: if ((node_current not in node_instance or not instance in node_instance[node_current]) and node_current not in n_offline): @@ -866,8 +968,12 @@ class LUVerifyCluster(LogicalUnit): """ all_nodes = self.cfg.GetNodeList() - # TODO: populate the environment with useful information for verify hooks - env = {} + env = { + "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()) + } + for node in self.cfg.GetAllNodesInfo().values(): + env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags()) + return env, [], all_nodes def Exec(self, feedback_fn): @@ -889,6 +995,7 @@ class LUVerifyCluster(LogicalUnit): i_non_redundant = [] # Non redundant instances i_non_a_balanced = [] # Non auto-balanced instances n_offline = [] # List of offline nodes + n_drained = [] # List of nodes being drained node_volume = {} node_instance = {} node_info = {} @@ -914,13 +1021,14 @@ class LUVerifyCluster(LogicalUnit): constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip) for node in nodeinfo if not node.offline], - constants.NV_LVLIST: vg_name, constants.NV_INSTANCELIST: hypervisors, - constants.NV_VGLIST: None, constants.NV_VERSION: None, constants.NV_HVINFO: self.cfg.GetHypervisorType(), - constants.NV_DRBDLIST: None, } + if vg_name is not None: + node_verify_param[constants.NV_VGLIST] = None + node_verify_param[constants.NV_LVLIST] = vg_name + node_verify_param[constants.NV_DRBDLIST] = None all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param, self.cfg.GetClusterName()) @@ -930,7 +1038,6 @@ class LUVerifyCluster(LogicalUnit): for node_i in nodeinfo: node = node_i.name - nresult = all_nvinfo[node].data if node_i.offline: feedback_fn("* Skipping offline node %s" % (node,)) @@ -941,28 +1048,43 @@ class LUVerifyCluster(LogicalUnit): ntype = "master" elif node_i.master_candidate: ntype = "master candidate" + elif node_i.drained: + ntype = "drained" + n_drained.append(node) else: ntype = "regular" feedback_fn("* Verifying node %s (%s)" % (node, ntype)) - if all_nvinfo[node].failed or not isinstance(nresult, dict): - feedback_fn(" - ERROR: 
connection to %s failed" % (node,)) + msg = all_nvinfo[node].fail_msg + if msg: + feedback_fn(" - ERROR: while contacting node %s: %s" % (node, msg)) bad = True continue + nresult = all_nvinfo[node].payload node_drbd = {} for minor, instance in all_drbd_map[node].items(): - instance = instanceinfo[instance] - node_drbd[minor] = (instance.name, instance.status == "up") + if instance not in instanceinfo: + feedback_fn(" - ERROR: ghost instance '%s' in temporary DRBD map" % + instance) + # ghost instance should not be running, but otherwise we + # don't give double warnings (both ghost instance and + # unallocated minor in use) + node_drbd[minor] = (instance, False) + else: + instance = instanceinfo[instance] + node_drbd[minor] = (instance.name, instance.admin_up) result = self._VerifyNode(node_i, file_names, local_checksums, nresult, feedback_fn, master_files, - node_drbd) + node_drbd, vg_name) bad = bad or result lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data") - if isinstance(lvdata, basestring): + if vg_name is None: + node_volume[node] = {} + elif isinstance(lvdata, basestring): feedback_fn(" - ERROR: LVM problem on node %s: %s" % - (node, lvdata.encode('string_escape'))) + (node, utils.SafeEncode(lvdata))) bad = True node_volume[node] = {} elif not isinstance(lvdata, dict): @@ -992,7 +1114,6 @@ class LUVerifyCluster(LogicalUnit): try: node_info[node] = { "mfree": int(nodeinfo['memory_free']), - "dfree": int(nresult[constants.NV_VGLIST][vg_name]), "pinst": [], "sinst": [], # dictionary holding all instances this node is secondary for, @@ -1003,8 +1124,19 @@ class LUVerifyCluster(LogicalUnit): # secondary. "sinst-by-pnode": {}, } - except ValueError: - feedback_fn(" - ERROR: invalid value returned from node %s" % (node,)) + # FIXME: devise a free space model for file based instances as well + if vg_name is not None: + if (constants.NV_VGLIST not in nresult or + vg_name not in nresult[constants.NV_VGLIST]): + feedback_fn(" - ERROR: node %s didn't return data for the" + " volume group '%s' - it is either missing or broken" % + (node, vg_name)) + bad = True + continue + node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name]) + except (ValueError, KeyError): + feedback_fn(" - ERROR: invalid nodeinfo value returned" + " from node %s" % (node,)) bad = True continue @@ -1093,6 +1225,9 @@ class LUVerifyCluster(LogicalUnit): if n_offline: feedback_fn(" - NOTICE: %d offline node(s) found." % len(n_offline)) + if n_drained: + feedback_fn(" - NOTICE: %d drained node(s) found." % len(n_drained)) + return not bad def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result): @@ -1123,14 +1258,16 @@ class LUVerifyCluster(LogicalUnit): for node_name in hooks_results: show_node_header = True res = hooks_results[node_name] - if res.failed or res.data is False or not isinstance(res.data, list): + msg = res.fail_msg + if msg: if res.offline: # no need to warn or set fail return value continue - feedback_fn(" Communication failure in hooks execution") + feedback_fn(" Communication failure in hooks execution: %s" % + msg) lu_result = 1 continue - for script, hkr, output in res.data: + for script, hkr, output in res.payload: if hkr == constants.HKR_FAIL: # The node header is only shown once, if there are # failing hooks on that node @@ -1170,8 +1307,13 @@ class LUVerifyDisks(NoHooksLU): def Exec(self, feedback_fn): """Verify integrity of cluster disks. 
+ @rtype: tuple of three items + @return: a tuple of (dict of node-to-node_error, list of instances + which need activate-disks, dict of instance: (node, volume) for + missing volumes + """ - result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {} + result = res_nodes, res_instances, res_missing = {}, [], {} vg_name = self.cfg.GetVGName() nodes = utils.NiceSort(self.cfg.GetNodeList()) @@ -1181,7 +1323,7 @@ class LUVerifyDisks(NoHooksLU): nv_dict = {} for inst in instances: inst_lvs = {} - if (inst.status != "up" or + if (not inst.admin_up or inst.disk_template not in constants.DTS_NET_MIRROR): continue inst.MapLVsByNode(inst_lvs) @@ -1198,23 +1340,17 @@ class LUVerifyDisks(NoHooksLU): to_act = set() for node in nodes: # node_volume - lvs = node_lvs[node] - if lvs.failed: - if not lvs.offline: - self.LogWarning("Connection to node %s failed: %s" % - (node, lvs.data)) + node_res = node_lvs[node] + if node_res.offline: continue - lvs = lvs.data - if isinstance(lvs, basestring): - logging.warning("Error enumerating LVs on node %s: %s", node, lvs) - res_nlvm[node] = lvs - elif not isinstance(lvs, dict): - logging.warning("Connection to node %s failed or invalid data" - " returned", node) - res_nodes.append(node) + msg = node_res.fail_msg + if msg: + logging.warning("Error enumerating LVs on node %s: %s", node, msg) + res_nodes[node] = msg continue - for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems(): + lvs = node_res.payload + for lv_name, (_, lv_inactive, lv_online) in lvs.items(): inst = nv_dict.pop((node, lv_name), None) if (not lv_online and inst is not None and inst.name not in res_instances): @@ -1280,8 +1416,7 @@ class LURenameCluster(LogicalUnit): # shutdown the master IP master = self.cfg.GetMasterNode() result = self.rpc.call_node_stop_master(master, False) - if result.failed or not result.data: - raise errors.OpExecError("Could not disable the master role") + result.Raise("Could not disable the master role") try: cluster = self.cfg.GetClusterInfo() @@ -1299,15 +1434,18 @@ class LURenameCluster(LogicalUnit): result = self.rpc.call_upload_file(node_list, constants.SSH_KNOWN_HOSTS_FILE) for to_node, to_result in result.iteritems(): - if to_result.failed or not to_result.data: - logging.error("Copy of file %s to node %s failed", - constants.SSH_KNOWN_HOSTS_FILE, to_node) + msg = to_result.fail_msg + if msg: + msg = ("Copy of file %s to node %s failed: %s" % + (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg)) + self.proc.LogWarning(msg) finally: result = self.rpc.call_node_start_master(master, False) - if result.failed or not result.data: + msg = result.fail_msg + if msg: self.LogWarning("Could not re-enable the master role on" - " the master, please restart manually.") + " the master, please restart manually: %s", msg) def _RecursiveCheckIfLVMBased(disk): @@ -1335,7 +1473,7 @@ class LUSetClusterParams(LogicalUnit): _OP_REQP = [] REQ_BGL = False - def CheckParameters(self): + def CheckArguments(self): """Check parameters """ @@ -1344,7 +1482,7 @@ class LUSetClusterParams(LogicalUnit): if self.op.candidate_pool_size is not None: try: self.op.candidate_pool_size = int(self.op.candidate_pool_size) - except ValueError, err: + except (ValueError, TypeError), err: raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" % str(err)) if self.op.candidate_pool_size < 1: @@ -1376,8 +1514,6 @@ class LUSetClusterParams(LogicalUnit): if the given volume group is valid. 
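A short sketch of how a caller might unpack the reworked LUVerifyDisks result documented above, assuming verify_result holds the tuple returned by Exec; the variable names are invented and the shape of the missing-volume values follows the docstring::

  nodes_in_error, instances_to_activate, missing_volumes = verify_result
  for node, err in nodes_in_error.items():
    print("node %s could not be queried: %s" % (node, err))
  for iname in instances_to_activate:
    print("instance %s needs its disks activated" % iname)
  for iname, missing in missing_volumes.items():
    print("instance %s is missing volume data: %s" % (iname, missing))
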
""" - # FIXME: This only works because there is only one parameter that can be - # changed or removed. if self.op.vg_name is not None and not self.op.vg_name: instances = self.cfg.GetAllInstancesInfo().values() for inst in instances: @@ -1392,11 +1528,13 @@ class LUSetClusterParams(LogicalUnit): if self.op.vg_name: vglist = self.rpc.call_vg_list(node_list) for node in node_list: - if vglist[node].failed: + msg = vglist[node].fail_msg + if msg: # ignoring down node - self.LogWarning("Node %s unreachable/error, ignoring" % node) + self.LogWarning("Error while gathering data on node %s" + " (ignoring node): %s", node, msg) continue - vgstatus = utils.CheckVolumeGroupSize(vglist[node].data, + vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload, self.op.vg_name, constants.MIN_VG_SIZE) if vgstatus: @@ -1404,14 +1542,20 @@ class LUSetClusterParams(LogicalUnit): (node, vgstatus)) self.cluster = cluster = self.cfg.GetClusterInfo() - # validate beparams changes + # validate params changes if self.op.beparams: - utils.CheckBEParams(self.op.beparams) - self.new_beparams = cluster.FillDict( - cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams) + utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) + self.new_beparams = objects.FillDict( + cluster.beparams[constants.PP_DEFAULT], self.op.beparams) + + if self.op.nicparams: + utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES) + self.new_nicparams = objects.FillDict( + cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams) + objects.NIC.CheckParameterSyntax(self.new_nicparams) # hypervisor list/parameters - self.new_hvparams = cluster.FillDict(cluster.hvparams, {}) + self.new_hvparams = objects.FillDict(cluster.hvparams, {}) if self.op.hvparams: if not isinstance(self.op.hvparams, dict): raise errors.OpPrereqError("Invalid 'hvparams' parameter on input") @@ -1434,6 +1578,7 @@ class LUSetClusterParams(LogicalUnit): hv_name in self.op.enabled_hypervisors)): # either this is a new hypervisor, or its parameters have changed hv_class = hypervisor.GetHypervisor(hv_name) + utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES) hv_class.CheckParameterSyntax(hv_params) _CheckHVParams(self, node_list, hv_name, hv_params) @@ -1442,8 +1587,11 @@ class LUSetClusterParams(LogicalUnit): """ if self.op.vg_name is not None: - if self.op.vg_name != self.cfg.GetVGName(): - self.cfg.SetVGName(self.op.vg_name) + new_volume = self.op.vg_name + if not new_volume: + new_volume = None + if new_volume != self.cfg.GetVGName(): + self.cfg.SetVGName(new_volume) else: feedback_fn("Cluster LVM configuration already in desired" " state, not changing") @@ -1452,7 +1600,10 @@ class LUSetClusterParams(LogicalUnit): if self.op.enabled_hypervisors is not None: self.cluster.enabled_hypervisors = self.op.enabled_hypervisors if self.op.beparams: - self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams + self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams + if self.op.nicparams: + self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams + if self.op.candidate_pool_size is not None: self.cluster.candidate_pool_size = self.op.candidate_pool_size @@ -1464,6 +1615,48 @@ class LUSetClusterParams(LogicalUnit): _AdjustCandidatePool(self) +def _RedistributeAncillaryFiles(lu, additional_nodes=None): + """Distribute additional files which are part of the cluster configuration. 
+ + ConfigWriter takes care of distributing the config and ssconf files, but + there are more files which should be distributed to all nodes. This function + makes sure those are copied. + + @param lu: calling logical unit + @param additional_nodes: list of nodes not in the config to distribute to + + """ + # 1. Gather target nodes + myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode()) + dist_nodes = lu.cfg.GetNodeList() + if additional_nodes is not None: + dist_nodes.extend(additional_nodes) + if myself.name in dist_nodes: + dist_nodes.remove(myself.name) + # 2. Gather files to distribute + dist_files = set([constants.ETC_HOSTS, + constants.SSH_KNOWN_HOSTS_FILE, + constants.RAPI_CERT_FILE, + constants.RAPI_USERS_FILE, + ]) + + enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors + for hv_name in enabled_hypervisors: + hv_class = hypervisor.GetHypervisor(hv_name) + dist_files.update(hv_class.GetAncillaryFiles()) + + # 3. Perform the files upload + for fname in dist_files: + if os.path.exists(fname): + result = lu.rpc.call_upload_file(dist_nodes, fname) + for to_node, to_result in result.items(): + msg = to_result.fail_msg + if msg: + msg = ("Copy of file %s to node %s failed: %s" % + (fname, to_node, msg)) + lu.proc.LogWarning(msg) + + class LURedistributeConfig(NoHooksLU): """Force the redistribution of cluster configuration. @@ -1489,6 +1682,7 @@ class LURedistributeConfig(NoHooksLU): """ self.cfg.Update(self.cfg.GetClusterInfo()) + _RedistributeAncillaryFiles(self) def _WaitForSync(lu, instance, oneshot=False, unlock=False): @@ -1507,20 +1701,22 @@ def _WaitForSync(lu, instance, oneshot=False, unlock=False): lu.cfg.SetDiskID(dev, node) retries = 0 + degr_retries = 10 # in seconds, as we sleep 1 second each time while True: max_time = 0 done = True cumul_degraded = False rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks) - if rstats.failed or not rstats.data: - lu.LogWarning("Can't get any data from node %s", node) + msg = rstats.fail_msg + if msg: + lu.LogWarning("Can't get any data from node %s: %s", node, msg) retries += 1 if retries >= 10: raise errors.RemoteError("Can't contact node %s for mirror data," " aborting." 
% node) time.sleep(6) continue - rstats = rstats.data + rstats = rstats.payload retries = 0 for i, mstat in enumerate(rstats): if mstat is None: @@ -1539,6 +1735,16 @@ def _WaitForSync(lu, instance, oneshot=False, unlock=False): rem_time = "no time estimate" lu.proc.LogInfo("- device %s: %5.2f%% done, %s" % (instance.disks[i].iv_name, perc_done, rem_time)) + + # if we're done but degraded, let's do a few small retries, to + # make sure we see a stable and not transient situation; therefore + # we force restart of the loop + if (done or oneshot) and cumul_degraded and degr_retries > 0: + logging.info("Degraded disks found, %d retries left", degr_retries) + degr_retries -= 1 + time.sleep(1) + continue + if done or oneshot: break @@ -1566,11 +1772,15 @@ def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False): result = True if on_primary or dev.AssembleOnSecondary(): rstats = lu.rpc.call_blockdev_find(node, dev) - if rstats.failed or not rstats.data: - logging.warning("Node %s: disk degraded, not found or node down", node) + msg = rstats.fail_msg + if msg: + lu.LogWarning("Can't find disk on node %s: %s", node, msg) + result = False + elif not rstats.payload: + lu.LogWarning("Can't find disk on node %s", node) result = False else: - result = result and (not rstats.data[idx]) + result = result and (not rstats.payload[idx]) if dev.children: for child in dev.children: result = result and _CheckDiskConsistency(lu, child, node, on_primary) @@ -1596,9 +1806,11 @@ class LUDiagnoseOS(NoHooksLU): selected=self.op.output_fields) # Lock all nodes, in shared mode + # Temporary removal of locks, should be reverted later + # TODO: reintroduce locks when they are lighter-weight self.needed_locks = {} - self.share_locks[locking.LEVEL_NODE] = 1 - self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + #self.share_locks[locking.LEVEL_NODE] = 1 + #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET def CheckPrereq(self): """Check prerequisites. @@ -1613,51 +1825,54 @@ class LUDiagnoseOS(NoHooksLU): @param rlist: a map with node names as keys and OS objects as values @rtype: dict - @returns: a dictionary with osnames as keys and as value another map, with - nodes as keys and list of OS objects as values, eg:: + @return: a dictionary with osnames as keys and as value another map, with + nodes as keys and tuples of (path, status, diagnose) as values, eg:: - {"debian-etch": {"node1": [,...], - "node2": [,]} + {"debian-etch": {"node1": [(/usr/lib/..., True, ""), + (/srv/..., False, "invalid api")], + "node2": [(/srv/..., True, "")]} } """ all_os = {} - for node_name, nr in rlist.iteritems(): - if nr.failed or not nr.data: + # we build here the list of nodes that didn't fail the RPC (at RPC + # level), so that nodes with a non-responding node daemon don't + # make all OSes invalid + good_nodes = [node_name for node_name in rlist + if not rlist[node_name].fail_msg] + for node_name, nr in rlist.items(): + if nr.fail_msg or not nr.payload: continue - for os_obj in nr.data: - if os_obj.name not in all_os: + for name, path, status, diagnose in nr.payload: + if name not in all_os: # build a list of nodes for this os containing empty lists # for each node in node_list - all_os[os_obj.name] = {} - for nname in node_list: - all_os[os_obj.name][nname] = [] - all_os[os_obj.name][node_name].append(os_obj) + all_os[name] = {} + for nname in good_nodes: + all_os[name][nname] = [] + all_os[name][node_name].append((path, status, diagnose)) return all_os def Exec(self, feedback_fn): """Compute the list of OSes. 
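To make the new per-OS structure concrete, an invented example of what _DiagnoseByOS returns and how the "valid" output field is then derived from it in Exec; paths and node names are placeholders::

  pol = {
    "debian-etch": {
      "node1": [("/srv/ganeti/os/debian-etch", True, "")],
      "node2": [("/srv/ganeti/os/debian-etch", False, "invalid api version")],
    },
  }
  os_data = pol["debian-etch"]
  # An OS counts as valid only if the first (path, status, diagnose) entry
  # on every node reports status True, exactly as checked in Exec():
  valid = utils.all([osl and osl[0][1] for osl in os_data.values()])
  # valid is False here because node2 carries a broken copy of the OS.
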
""" - node_list = self.acquired_locks[locking.LEVEL_NODE] - valid_nodes = [node for node in self.cfg.GetOnlineNodeList() - if node in node_list] + valid_nodes = [node for node in self.cfg.GetOnlineNodeList()] node_data = self.rpc.call_os_diagnose(valid_nodes) - if node_data == False: - raise errors.OpExecError("Can't gather the list of OSes") pol = self._DiagnoseByOS(valid_nodes, node_data) output = [] - for os_name, os_data in pol.iteritems(): + for os_name, os_data in pol.items(): row = [] for field in self.op.output_fields: if field == "name": val = os_name elif field == "valid": - val = utils.all([osl and osl[0] for osl in os_data.values()]) + val = utils.all([osl and osl[0][1] for osl in os_data.values()]) elif field == "node_status": + # this is just a copy of the dict val = {} - for node_name, nos_list in os_data.iteritems(): - val[node_name] = [(v.status, v.path) for v in nos_list] + for node_name, nos_list in os_data.items(): + val[node_name] = nos_list else: raise errors.ParameterError(field) row.append(val) @@ -1729,7 +1944,11 @@ class LURemoveNode(LogicalUnit): self.context.RemoveNode(node.name) - self.rpc.call_node_leave_cluster(node.name) + result = self.rpc.call_node_leave_cluster(node.name) + msg = result.fail_msg + if msg: + self.LogWarning("Errors encountered on the remote node while leaving" + " the cluster: %s", msg) # Promote nodes to master candidate as needed _AdjustCandidatePool(self) @@ -1739,13 +1958,13 @@ class LUQueryNodes(NoHooksLU): """Logical unit for querying nodes. """ - _OP_REQP = ["output_fields", "names"] + _OP_REQP = ["output_fields", "names", "use_locking"] REQ_BGL = False _FIELDS_DYNAMIC = utils.FieldSet( "dtotal", "dfree", "mtotal", "mnode", "mfree", "bootid", - "ctotal", + "ctotal", "cnodes", "csockets", ) _FIELDS_STATIC = utils.FieldSet( @@ -1756,6 +1975,7 @@ class LUQueryNodes(NoHooksLU): "master_candidate", "master", "offline", + "drained", ) def ExpandNames(self): @@ -1771,7 +1991,8 @@ class LUQueryNodes(NoHooksLU): else: self.wanted = locking.ALL_SET - self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields) + self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields) + self.do_locking = self.do_node_query and self.op.use_locking if self.do_locking: # if we don't request only static fields, we need to lock the nodes self.needed_locks[locking.LEVEL_NODE] = self.wanted @@ -1806,14 +2027,14 @@ class LUQueryNodes(NoHooksLU): # begin data gathering - if self.do_locking: + if self.do_node_query: live_data = {} node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(), self.cfg.GetHypervisorType()) for name in nodenames: nodeinfo = node_data[name] - if not nodeinfo.failed and nodeinfo.data: - nodeinfo = nodeinfo.data + if not nodeinfo.fail_msg and nodeinfo.payload: + nodeinfo = nodeinfo.payload fn = utils.TryConvert live_data[name] = { "mtotal": fn(int, nodeinfo.get('memory_total', None)), @@ -1823,6 +2044,8 @@ class LUQueryNodes(NoHooksLU): "dfree": fn(int, nodeinfo.get('vg_free', None)), "ctotal": fn(int, nodeinfo.get('cpu_total', None)), "bootid": nodeinfo.get('bootid', None), + "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)), + "csockets": fn(int, nodeinfo.get('cpu_sockets', None)), } else: live_data[name] = {} @@ -1877,6 +2100,8 @@ class LUQueryNodes(NoHooksLU): val = node.name == master_node elif field == "offline": val = node.offline + elif field == "drained": + val = node.drained elif self._FIELDS_DYNAMIC.Matches(field): val = live_data[node.name].get(field, None) else: @@ -1931,10 +2156,15 @@ 
class LUQueryNodeVolumes(NoHooksLU): output = [] for node in nodenames: - if node not in volumes or volumes[node].failed or not volumes[node].data: + nresult = volumes[node] + if nresult.offline: + continue + msg = nresult.fail_msg + if msg: + self.LogWarning("Can't compute volume data on node %s: %s", node, msg) continue - node_vols = volumes[node].data[:] + node_vols = nresult.payload[:] node_vols.sort(key=lambda vol: vol['dev']) for vol in node_vols: @@ -2073,7 +2303,7 @@ class LUAddNode(LogicalUnit): primary_ip=primary_ip, secondary_ip=secondary_ip, master_candidate=master_candidate, - offline=False) + offline=False, drained=False) def Exec(self, feedback_fn): """Adds the new node to the cluster. @@ -2084,17 +2314,14 @@ class LUAddNode(LogicalUnit): # check connectivity result = self.rpc.call_version([node])[node] - result.Raise() - if result.data: - if constants.PROTOCOL_VERSION == result.data: - logging.info("Communication to node %s fine, sw version %s match", - node, result.data) - else: - raise errors.OpExecError("Version mismatch master version %s," - " node version %s" % - (constants.PROTOCOL_VERSION, result.data)) + result.Raise("Can't get version information from node %s" % node) + if constants.PROTOCOL_VERSION == result.payload: + logging.info("Communication to node %s fine, sw version %s match", + node, result.payload) else: - raise errors.OpExecError("Cannot get version from the new node") + raise errors.OpExecError("Version mismatch master version %s," + " node version %s" % + (constants.PROTOCOL_VERSION, result.payload)) # setup ssh on node logging.info("Copy ssh key to node %s", node) @@ -2114,17 +2341,18 @@ class LUAddNode(LogicalUnit): result = self.rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2], keyarray[3], keyarray[4], keyarray[5]) - - if result.failed or not result.data: - raise errors.OpExecError("Cannot transfer ssh keys to the new node") + result.Raise("Cannot transfer ssh keys to the new node") # Add node to our /etc/hosts, and add key to known_hosts - utils.AddHostToEtcHosts(new_node.name) + if self.cfg.GetClusterInfo().modify_etc_hosts: + utils.AddHostToEtcHosts(new_node.name) if new_node.secondary_ip != new_node.primary_ip: result = self.rpc.call_node_has_ip_address(new_node.name, new_node.secondary_ip) - if result.failed or not result.data: + result.Raise("Failure checking secondary ip on node %s" % new_node.name, + prereq=True) + if not result.payload: raise errors.OpExecError("Node claims it doesn't have the secondary ip" " you gave (%s). Please fix and re-run this" " command." 
% new_node.secondary_ip) @@ -2138,42 +2366,19 @@ class LUAddNode(LogicalUnit): result = self.rpc.call_node_verify(node_verify_list, node_verify_param, self.cfg.GetClusterName()) for verifier in node_verify_list: - if result[verifier].failed or not result[verifier].data: - raise errors.OpExecError("Cannot communicate with %s's node daemon" - " for remote verification" % verifier) - if result[verifier].data['nodelist']: - for failed in result[verifier].data['nodelist']: + result[verifier].Raise("Cannot communicate with node %s" % verifier) + nl_payload = result[verifier].payload['nodelist'] + if nl_payload: + for failed in nl_payload: feedback_fn("ssh/hostname verification failed %s -> %s" % - (verifier, result[verifier]['nodelist'][failed])) + (verifier, nl_payload[failed])) raise errors.OpExecError("ssh/hostname verification failed.") - # Distribute updated /etc/hosts and known_hosts to all nodes, - # including the node just added - myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode()) - dist_nodes = self.cfg.GetNodeList() - if not self.op.readd: - dist_nodes.append(node) - if myself.name in dist_nodes: - dist_nodes.remove(myself.name) - - logging.debug("Copying hosts and known_hosts to all nodes") - for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE): - result = self.rpc.call_upload_file(dist_nodes, fname) - for to_node, to_result in result.iteritems(): - if to_result.failed or not to_result.data: - logging.error("Copy of file %s to node %s failed", fname, to_node) - - to_copy = [] - if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors: - to_copy.append(constants.VNC_PASSWORD_FILE) - for fname in to_copy: - result = self.rpc.call_upload_file([node], fname) - if result[node].failed or not result[node]: - logging.error("Could not copy file %s to node %s", fname, node) - if self.op.readd: + _RedistributeAncillaryFiles(self) self.context.ReaddNode(new_node) else: + _RedistributeAncillaryFiles(self, additional_nodes=[node]) self.context.AddNode(new_node) @@ -2193,11 +2398,13 @@ class LUSetNodeParams(LogicalUnit): self.op.node_name = node_name _CheckBooleanOpField(self.op, 'master_candidate') _CheckBooleanOpField(self.op, 'offline') - if self.op.master_candidate is None and self.op.offline is None: + _CheckBooleanOpField(self.op, 'drained') + all_mods = [self.op.offline, self.op.master_candidate, self.op.drained] + if all_mods.count(None) == 3: raise errors.OpPrereqError("Please pass at least one modification") - if self.op.offline == True and self.op.master_candidate == True: - raise errors.OpPrereqError("Can't set the node into offline and" - " master_candidate at the same time") + if all_mods.count(True) > 1: + raise errors.OpPrereqError("Can't set the node into more than one" + " state at the same time") def ExpandNames(self): self.needed_locks = {locking.LEVEL_NODE: self.op.node_name} @@ -2212,6 +2419,7 @@ class LUSetNodeParams(LogicalUnit): "OP_TARGET": self.op.node_name, "MASTER_CANDIDATE": str(self.op.master_candidate), "OFFLINE": str(self.op.offline), + "DRAINED": str(self.op.drained), } nl = [self.cfg.GetMasterNode(), self.op.node_name] @@ -2225,12 +2433,12 @@ class LUSetNodeParams(LogicalUnit): """ node = self.node = self.cfg.GetNodeInfo(self.op.node_name) - if ((self.op.master_candidate == False or self.op.offline == True) - and node.master_candidate): + if ((self.op.master_candidate == False or self.op.offline == True or + self.op.drained == True) and node.master_candidate): # we will demote the node from master_candidate if self.op.node_name == 
self.cfg.GetMasterNode(): raise errors.OpPrereqError("The master node has to be a" - " master candidate and online") + " master candidate, online and not drained") cp_size = self.cfg.GetClusterInfo().candidate_pool_size num_candidates, _ = self.cfg.GetMasterCandidateStats() if num_candidates <= cp_size: @@ -2241,10 +2449,11 @@ class LUSetNodeParams(LogicalUnit): else: raise errors.OpPrereqError(msg) - if (self.op.master_candidate == True and node.offline and - not self.op.offline == False): - raise errors.OpPrereqError("Can't set an offline node to" - " master_candidate") + if (self.op.master_candidate == True and + ((node.offline and not self.op.offline == False) or + (node.drained and not self.op.drained == False))): + raise errors.OpPrereqError("Node '%s' is offline or drained, can't set" + " to master_candidate" % node.name) return @@ -2255,34 +2464,94 @@ class LUSetNodeParams(LogicalUnit): node = self.node result = [] + changed_mc = False if self.op.offline is not None: node.offline = self.op.offline result.append(("offline", str(self.op.offline))) - if self.op.offline == True and node.master_candidate: - node.master_candidate = False - result.append(("master_candidate", "auto-demotion due to offline")) + if self.op.offline == True: + if node.master_candidate: + node.master_candidate = False + changed_mc = True + result.append(("master_candidate", "auto-demotion due to offline")) + if node.drained: + node.drained = False + result.append(("drained", "clear drained status due to offline")) if self.op.master_candidate is not None: node.master_candidate = self.op.master_candidate + changed_mc = True result.append(("master_candidate", str(self.op.master_candidate))) if self.op.master_candidate == False: rrc = self.rpc.call_node_demote_from_mc(node.name) - if (rrc.failed or not isinstance(rrc.data, (tuple, list)) - or len(rrc.data) != 2): - self.LogWarning("Node rpc error: %s" % rrc.error) - elif not rrc.data[0]: - self.LogWarning("Node failed to demote itself: %s" % rrc.data[1]) + msg = rrc.fail_msg + if msg: + self.LogWarning("Node failed to demote itself: %s" % msg) + + if self.op.drained is not None: + node.drained = self.op.drained + result.append(("drained", str(self.op.drained))) + if self.op.drained == True: + if node.master_candidate: + node.master_candidate = False + changed_mc = True + result.append(("master_candidate", "auto-demotion due to drain")) + if node.offline: + node.offline = False + result.append(("offline", "clear offline status due to drain")) # this will trigger configuration file update, if needed self.cfg.Update(node) # this will trigger job queue propagation or cleanup - if self.op.node_name != self.cfg.GetMasterNode(): + if changed_mc: self.context.ReaddNode(node) return result +class LUPowercycleNode(NoHooksLU): + """Powercycles a node. + + """ + _OP_REQP = ["node_name", "force"] + REQ_BGL = False + + def CheckArguments(self): + node_name = self.cfg.ExpandNodeName(self.op.node_name) + if node_name is None: + raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name) + self.op.node_name = node_name + if node_name == self.cfg.GetMasterNode() and not self.op.force: + raise errors.OpPrereqError("The node is the master and the force" + " parameter was not set") + + def ExpandNames(self): + """Locking for PowercycleNode. + + This is a last-resource option and shouldn't block on other + jobs. Therefore, we grab no locks. + + """ + self.needed_locks = {} + + def CheckPrereq(self): + """Check prerequisites. + + This LU has no prereqs. 
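The offline, drained and master_candidate flags handled above are mutually exclusive. A standalone restatement of the CheckArguments rule, using ValueError instead of errors.OpPrereqError so the sketch has no dependencies (the function name is invented)::

  def _CheckNodeFlagCombination(offline, master_candidate, drained):
    all_mods = [offline, master_candidate, drained]
    if all_mods.count(None) == 3:
      raise ValueError("Please pass at least one modification")
    if all_mods.count(True) > 1:
      raise ValueError("Can't set the node into more than one state"
                       " at the same time")

  _CheckNodeFlagCombination(None, None, True)   # ok: just drain the node
  # _CheckNodeFlagCombination(True, True, None) would raise, and Exec() later
  # auto-demotes a master candidate that is being set offline or drained.
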
+ + """ + pass + + def Exec(self, feedback_fn): + """Reboots a node. + + """ + result = self.rpc.call_node_powercycle(self.op.node_name, + self.cfg.GetHypervisorType()) + result.Raise("Failed to schedule the reboot") + return result.payload + + class LUQueryClusterInfo(NoHooksLU): """Query cluster configuration. @@ -2315,9 +2584,14 @@ class LUQueryClusterInfo(NoHooksLU): "master": cluster.master_node, "default_hypervisor": cluster.default_hypervisor, "enabled_hypervisors": cluster.enabled_hypervisors, - "hvparams": cluster.hvparams, + "hvparams": dict([(hvname, cluster.hvparams[hvname]) + for hvname in cluster.enabled_hypervisors]), "beparams": cluster.beparams, + "nicparams": cluster.nicparams, "candidate_pool_size": cluster.candidate_pool_size, + "master_netdev": cluster.master_netdev, + "volume_group_name": cluster.volume_group_name, + "file_storage_dir": cluster.file_storage_dir, } return result @@ -2435,10 +2709,11 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False): for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node): lu.cfg.SetDiskID(node_disk, node) result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False) - if result.failed or not result: + msg = result.fail_msg + if msg: lu.proc.LogWarning("Could not prepare block device %s on node %s" - " (is_primary=False, pass=1)", - inst_disk.iv_name, node) + " (is_primary=False, pass=1): %s", + inst_disk.iv_name, node, msg) if not ignore_secondaries: disks_ok = False @@ -2451,12 +2726,14 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False): continue lu.cfg.SetDiskID(node_disk, node) result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True) - if result.failed or not result: + msg = result.fail_msg + if msg: lu.proc.LogWarning("Could not prepare block device %s on node %s" - " (is_primary=True, pass=2)", - inst_disk.iv_name, node) + " (is_primary=True, pass=2): %s", + inst_disk.iv_name, node, msg) disks_ok = False - device_info.append((instance.primary_node, inst_disk.iv_name, result.data)) + device_info.append((instance.primary_node, inst_disk.iv_name, + result.payload)) # leave the disks configured for the primary node # this is a workaround that would be fixed better by @@ -2523,14 +2800,11 @@ def _SafeShutdownInstanceDisks(lu, instance): _ShutdownInstanceDisks. """ - ins_l = lu.rpc.call_instance_list([instance.primary_node], - [instance.hypervisor]) - ins_l = ins_l[instance.primary_node] - if ins_l.failed or not isinstance(ins_l.data, list): - raise errors.OpExecError("Can't contact node '%s'" % - instance.primary_node) - - if instance.name in ins_l.data: + pnode = instance.primary_node + ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode] + ins_l.Raise("Can't contact node %s" % pnode) + + if instance.name in ins_l.payload: raise errors.OpExecError("Instance is running, can't shutdown" " block devices.") @@ -2546,17 +2820,18 @@ def _ShutdownInstanceDisks(lu, instance, ignore_primary=False): ignored. 
""" - result = True + all_result = True for disk in instance.disks: for node, top_disk in disk.ComputeNodeTree(instance.primary_node): lu.cfg.SetDiskID(top_disk, node) result = lu.rpc.call_blockdev_shutdown(node, top_disk) - if result.failed or not result.data: - logging.error("Could not shutdown block device %s on node %s", - disk.iv_name, node) + msg = result.fail_msg + if msg: + lu.LogWarning("Could not shutdown block device %s on node %s: %s", + disk.iv_name, node, msg) if not ignore_primary or node != instance.primary_node: - result = False - return result + all_result = False + return all_result def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name): @@ -2582,15 +2857,15 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name): """ nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name) - nodeinfo[node].Raise() - free_mem = nodeinfo[node].data.get('memory_free') + nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True) + free_mem = nodeinfo[node].payload.get('memory_free', None) if not isinstance(free_mem, int): raise errors.OpPrereqError("Can't compute free memory on node %s, result" - " was '%s'" % (node, free_mem)) + " was '%s'" % (node, free_mem)) if requested > free_mem: raise errors.OpPrereqError("Not enough memory on node %s for %s:" - " needed %s MiB, available %s MiB" % - (node, reason, requested, free_mem)) + " needed %s MiB, available %s MiB" % + (node, reason, requested, free_mem)) class LUStartupInstance(LogicalUnit): @@ -2628,15 +2903,49 @@ class LUStartupInstance(LogicalUnit): assert self.instance is not None, \ "Cannot retrieve locked instance %s" % self.op.instance_name + # extra beparams + self.beparams = getattr(self.op, "beparams", {}) + if self.beparams: + if not isinstance(self.beparams, dict): + raise errors.OpPrereqError("Invalid beparams passed: %s, expected" + " dict" % (type(self.beparams), )) + # fill the beparams dict + utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES) + self.op.beparams = self.beparams + + # extra hvparams + self.hvparams = getattr(self.op, "hvparams", {}) + if self.hvparams: + if not isinstance(self.hvparams, dict): + raise errors.OpPrereqError("Invalid hvparams passed: %s, expected" + " dict" % (type(self.hvparams), )) + + # check hypervisor parameter syntax (locally) + cluster = self.cfg.GetClusterInfo() + utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES) + filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor], + instance.hvparams) + filled_hvp.update(self.hvparams) + hv_type = hypervisor.GetHypervisor(instance.hypervisor) + hv_type.CheckParameterSyntax(filled_hvp) + _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp) + self.op.hvparams = self.hvparams + _CheckNodeOnline(self, instance.primary_node) bep = self.cfg.GetClusterInfo().FillBE(instance) # check bridges existance _CheckInstanceBridgesExist(self, instance) - _CheckNodeFreeMemory(self, instance.primary_node, - "starting instance %s" % instance.name, - bep[constants.BE_MEMORY], instance.hypervisor) + remote_info = self.rpc.call_instance_info(instance.primary_node, + instance.name, + instance.hypervisor) + remote_info.Raise("Error checking node %s" % instance.primary_node, + prereq=True) + if not remote_info.payload: # not running already + _CheckNodeFreeMemory(self, instance.primary_node, + "starting instance %s" % instance.name, + bep[constants.BE_MEMORY], instance.hypervisor) def Exec(self, feedback_fn): """Start the instance. 
@@ -2644,7 +2953,6 @@ class LUStartupInstance(LogicalUnit): """ instance = self.instance force = self.op.force - extra_args = getattr(self.op, "extra_args", "") self.cfg.MarkInstanceUp(instance.name) @@ -2652,8 +2960,9 @@ class LUStartupInstance(LogicalUnit): _StartInstanceDisks(self, instance, force) - result = self.rpc.call_instance_start(node_current, instance, extra_args) - msg = result.RemoteFailMsg() + result = self.rpc.call_instance_start(node_current, instance, + self.hvparams, self.beparams) + msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Could not start instance: %s" % msg) @@ -2686,6 +2995,7 @@ class LURebootInstance(LogicalUnit): """ env = { "IGNORE_SECONDARIES": self.op.ignore_secondaries, + "REBOOT_TYPE": self.op.reboot_type, } env.update(_BuildInstanceHookEnvByObject(self, self.instance)) nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) @@ -2713,23 +3023,23 @@ class LURebootInstance(LogicalUnit): instance = self.instance ignore_secondaries = self.op.ignore_secondaries reboot_type = self.op.reboot_type - extra_args = getattr(self.op, "extra_args", "") node_current = instance.primary_node if reboot_type in [constants.INSTANCE_REBOOT_SOFT, constants.INSTANCE_REBOOT_HARD]: + for disk in instance.disks: + self.cfg.SetDiskID(disk, node_current) result = self.rpc.call_instance_reboot(node_current, instance, - reboot_type, extra_args) - if result.failed or not result.data: - raise errors.OpExecError("Could not reboot instance") + reboot_type) + result.Raise("Could not reboot instance") else: - if not self.rpc.call_instance_shutdown(node_current, instance): - raise errors.OpExecError("could not shutdown instance for full reboot") + result = self.rpc.call_instance_shutdown(node_current, instance) + result.Raise("Could not shutdown instance for full reboot") _ShutdownInstanceDisks(self, instance) _StartInstanceDisks(self, instance, ignore_secondaries) - result = self.rpc.call_instance_start(node_current, instance, extra_args) - msg = result.RemoteFailMsg() + result = self.rpc.call_instance_start(node_current, instance, None, None) + msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Could not start instance for" @@ -2779,8 +3089,9 @@ class LUShutdownInstance(LogicalUnit): node_current = instance.primary_node self.cfg.MarkInstanceDown(instance.name) result = self.rpc.call_instance_shutdown(node_current, instance) - if result.failed or not result.data: - self.proc.LogWarning("Could not shutdown instance") + msg = result.fail_msg + if msg: + self.proc.LogWarning("Could not shutdown instance: %s" % msg) _ShutdownInstanceDisks(self, instance) @@ -2821,13 +3132,15 @@ class LUReinstallInstance(LogicalUnit): if instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Instance '%s' has no disks" % self.op.instance_name) - if instance.status != "down": + if instance.admin_up: raise errors.OpPrereqError("Instance '%s' is marked to be up" % self.op.instance_name) remote_info = self.rpc.call_instance_info(instance.primary_node, instance.name, instance.hypervisor) - if remote_info.failed or remote_info.data: + remote_info.Raise("Error checking node %s" % instance.primary_node, + prereq=True) + if remote_info.payload: raise errors.OpPrereqError("Instance '%s' is running on the node %s" % (self.op.instance_name, instance.primary_node)) @@ -2841,10 +3154,8 @@ class LUReinstallInstance(LogicalUnit): raise errors.OpPrereqError("Primary node '%s' is unknown" % self.op.pnode) 
result = self.rpc.call_os_get(pnode.name, self.op.os_type) - result.Raise() - if not isinstance(result.data, objects.OS): - raise errors.OpPrereqError("OS '%s' not in supported OS list for" - " primary node" % self.op.os_type) + result.Raise("OS '%s' not in supported OS list for primary node %s" % + (self.op.os_type, pnode.name), prereq=True) self.instance = instance @@ -2862,12 +3173,9 @@ class LUReinstallInstance(LogicalUnit): _StartInstanceDisks(self, inst, None) try: feedback_fn("Running the instance OS create scripts...") - result = self.rpc.call_instance_os_add(inst.primary_node, inst) - msg = result.RemoteFailMsg() - if msg: - raise errors.OpExecError("Could not install OS for instance %s" - " on node %s: %s" % - (inst.name, inst.primary_node, msg)) + result = self.rpc.call_instance_os_add(inst.primary_node, inst, True) + result.Raise("Could not install OS for instance %s on node %s" % + (inst.name, inst.primary_node)) finally: _ShutdownInstanceDisks(self, inst) @@ -2904,14 +3212,15 @@ class LURenameInstance(LogicalUnit): self.op.instance_name) _CheckNodeOnline(self, instance.primary_node) - if instance.status != "down": + if instance.admin_up: raise errors.OpPrereqError("Instance '%s' is marked to be up" % self.op.instance_name) remote_info = self.rpc.call_instance_info(instance.primary_node, instance.name, instance.hypervisor) - remote_info.Raise() - if remote_info.data: + remote_info.Raise("Error checking node %s" % instance.primary_node, + prereq=True) + if remote_info.payload: raise errors.OpPrereqError("Instance '%s' is running on the node %s" % (self.op.instance_name, instance.primary_node)) @@ -2955,28 +3264,20 @@ class LURenameInstance(LogicalUnit): result = self.rpc.call_file_storage_dir_rename(inst.primary_node, old_file_storage_dir, new_file_storage_dir) - result.Raise() - if not result.data: - raise errors.OpExecError("Could not connect to node '%s' to rename" - " directory '%s' to '%s' (but the instance" - " has been renamed in Ganeti)" % ( - inst.primary_node, old_file_storage_dir, - new_file_storage_dir)) - - if not result.data[0]: - raise errors.OpExecError("Could not rename directory '%s' to '%s'" - " (but the instance has been renamed in" - " Ganeti)" % (old_file_storage_dir, - new_file_storage_dir)) + result.Raise("Could not rename on node %s directory '%s' to '%s'" + " (but the instance has been renamed in Ganeti)" % + (inst.primary_node, old_file_storage_dir, + new_file_storage_dir)) _StartInstanceDisks(self, inst, None) try: result = self.rpc.call_instance_run_rename(inst.primary_node, inst, old_name) - if result.failed or not result.data: + msg = result.fail_msg + if msg: msg = ("Could not run OS rename script for instance %s on node %s" - " (but the instance has been renamed in Ganeti)" % - (inst.name, inst.primary_node)) + " (but the instance has been renamed in Ganeti): %s" % + (inst.name, inst.primary_node, msg)) self.proc.LogWarning(msg) finally: _ShutdownInstanceDisks(self, inst) @@ -3029,12 +3330,14 @@ class LURemoveInstance(LogicalUnit): instance.name, instance.primary_node) result = self.rpc.call_instance_shutdown(instance.primary_node, instance) - if result.failed or not result.data: + msg = result.fail_msg + if msg: if self.op.ignore_failures: - feedback_fn("Warning: can't shutdown instance") + feedback_fn("Warning: can't shutdown instance: %s" % msg) else: - raise errors.OpExecError("Could not shutdown instance %s on node %s" % - (instance.name, instance.primary_node)) + raise errors.OpExecError("Could not shutdown instance %s on" + " node %s: %s" 
% + (instance.name, instance.primary_node, msg)) logging.info("Removing block devices for instance %s", instance.name) @@ -3054,18 +3357,20 @@ class LUQueryInstances(NoHooksLU): """Logical unit for querying instances. """ - _OP_REQP = ["output_fields", "names"] + _OP_REQP = ["output_fields", "names", "use_locking"] REQ_BGL = False _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes", - "admin_state", "admin_ram", + "admin_state", "disk_template", "ip", "mac", "bridge", + "nic_mode", "nic_link", "sda_size", "sdb_size", "vcpus", "tags", "network_port", "beparams", - "(disk).(size)/([0-9]+)", - "(disk).(sizes)", - "(nic).(mac|ip|bridge)/([0-9]+)", - "(nic).(macs|ips|bridges)", - "(disk|nic).(count)", + r"(disk)\.(size)/([0-9]+)", + r"(disk)\.(sizes)", "disk_usage", + r"(nic)\.(mac|ip|mode|link)/([0-9]+)", + r"(nic)\.(bridge)/([0-9]+)", + r"(nic)\.(macs|ips|modes|links|bridges)", + r"(disk|nic)\.(count)", "serial_no", "hypervisor", "hvparams",] + ["hv/%s" % name for name in constants.HVS_PARAMETERS] + @@ -3088,7 +3393,8 @@ class LUQueryInstances(NoHooksLU): else: self.wanted = locking.ALL_SET - self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields) + self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields) + self.do_locking = self.do_node_query and self.op.use_locking if self.do_locking: self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted self.needed_locks[locking.LEVEL_NODE] = [] @@ -3109,19 +3415,25 @@ class LUQueryInstances(NoHooksLU): """ all_info = self.cfg.GetAllInstancesInfo() - if self.do_locking: - instance_names = self.acquired_locks[locking.LEVEL_INSTANCE] - elif self.wanted != locking.ALL_SET: - instance_names = self.wanted - missing = set(instance_names).difference(all_info.keys()) - if missing: - raise errors.OpExecError( - "Some instances were removed before retrieving their data: %s" - % missing) + if self.wanted == locking.ALL_SET: + # caller didn't specify instance names, so ordering is not important + if self.do_locking: + instance_names = self.acquired_locks[locking.LEVEL_INSTANCE] + else: + instance_names = all_info.keys() + instance_names = utils.NiceSort(instance_names) else: - instance_names = all_info.keys() + # caller did specify names, so we must keep the ordering + if self.do_locking: + tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE] + else: + tgt_set = all_info.keys() + missing = set(self.wanted).difference(tgt_set) + if missing: + raise errors.OpExecError("Some instances were removed before" + " retrieving their data: %s" % missing) + instance_names = self.wanted - instance_names = utils.NiceSort(instance_names) instance_list = [all_info[iname] for iname in instance_names] # begin data gathering @@ -3131,7 +3443,7 @@ class LUQueryInstances(NoHooksLU): bad_nodes = [] off_nodes = [] - if self.do_locking: + if self.do_node_query: live_data = {} node_data = self.rpc.call_all_instances_info(nodes, hv_list) for name in nodes: @@ -3139,12 +3451,12 @@ class LUQueryInstances(NoHooksLU): if result.offline: # offline nodes will be in both lists off_nodes.append(name) - if result.failed: + if result.failed or result.fail_msg: bad_nodes.append(name) else: - if result.data: - live_data.update(result.data) - # else no instance is alive + if result.payload: + live_data.update(result.payload) + # else no instance is alive else: live_data = dict([(name, {}) for name in instance_names]) @@ -3153,10 +3465,13 @@ class LUQueryInstances(NoHooksLU): HVPREFIX = "hv/" BEPREFIX = "be/" output = [] + cluster = 
self.cfg.GetClusterInfo() for instance in instance_list: iout = [] - i_hv = self.cfg.GetClusterInfo().FillHV(instance) - i_be = self.cfg.GetClusterInfo().FillBE(instance) + i_hv = cluster.FillHV(instance) + i_be = cluster.FillBE(instance) + i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT], + nic.nicparams) for nic in instance.nics] for field in self.op.output_fields: st_match = self._FIELDS_STATIC.Matches(field) if field == "name": @@ -3168,7 +3483,7 @@ class LUQueryInstances(NoHooksLU): elif field == "snodes": val = list(instance.secondary_nodes) elif field == "admin_state": - val = (instance.status != "down") + val = instance.admin_up elif field == "oper_state": if instance.primary_node in bad_nodes: val = None @@ -3182,12 +3497,12 @@ class LUQueryInstances(NoHooksLU): else: running = bool(live_data.get(instance.name)) if running: - if instance.status != "down": + if instance.admin_up: val = "running" else: val = "ERROR_up" else: - if instance.status != "down": + if instance.admin_up: val = "ERROR_down" else: val = "ADMIN_down" @@ -3201,17 +3516,40 @@ class LUQueryInstances(NoHooksLU): elif field == "disk_template": val = instance.disk_template elif field == "ip": - val = instance.nics[0].ip + if instance.nics: + val = instance.nics[0].ip + else: + val = None + elif field == "nic_mode": + if instance.nics: + val = i_nicp[0][constants.NIC_MODE] + else: + val = None + elif field == "nic_link": + if instance.nics: + val = i_nicp[0][constants.NIC_LINK] + else: + val = None elif field == "bridge": - val = instance.nics[0].bridge + if (instance.nics and + i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED): + val = i_nicp[0][constants.NIC_LINK] + else: + val = None elif field == "mac": - val = instance.nics[0].mac + if instance.nics: + val = instance.nics[0].mac + else: + val = None elif field == "sda_size" or field == "sdb_size": idx = ord(field[2]) - ord('a') try: val = instance.FindDisk(idx).size except errors.OpPrereqError: val = None + elif field == "disk_usage": # total disk usage per node + disk_sizes = [{'size': disk.size} for disk in instance.disks] + val = _ComputeDiskSize(instance.disk_template, disk_sizes) elif field == "tags": val = list(instance.GetTags()) elif field == "serial_no": @@ -3252,8 +3590,17 @@ class LUQueryInstances(NoHooksLU): val = [nic.mac for nic in instance.nics] elif st_groups[1] == "ips": val = [nic.ip for nic in instance.nics] + elif st_groups[1] == "modes": + val = [nicp[constants.NIC_MODE] for nicp in i_nicp] + elif st_groups[1] == "links": + val = [nicp[constants.NIC_LINK] for nicp in i_nicp] elif st_groups[1] == "bridges": - val = [nic.bridge for nic in instance.nics] + val = [] + for nicp in i_nicp: + if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: + val.append(nicp[constants.NIC_LINK]) + else: + val.append(None) else: # index-based item nic_idx = int(st_groups[2]) @@ -3264,8 +3611,16 @@ class LUQueryInstances(NoHooksLU): val = instance.nics[nic_idx].mac elif st_groups[1] == "ip": val = instance.nics[nic_idx].ip + elif st_groups[1] == "mode": + val = i_nicp[nic_idx][constants.NIC_MODE] + elif st_groups[1] == "link": + val = i_nicp[nic_idx][constants.NIC_LINK] elif st_groups[1] == "bridge": - val = instance.nics[nic_idx].bridge + nic_mode = i_nicp[nic_idx][constants.NIC_MODE] + if nic_mode == constants.NIC_MODE_BRIDGED: + val = i_nicp[nic_idx][constants.NIC_LINK] + else: + val = None else: assert False, "Unhandled NIC parameter" else: @@ -3331,19 +3686,18 @@ class LUFailoverInstance(LogicalUnit): target_node = 
secondary_nodes[0] _CheckNodeOnline(self, target_node) - # check memory requirements on the secondary node - _CheckNodeFreeMemory(self, target_node, "failing over instance %s" % - instance.name, bep[constants.BE_MEMORY], - instance.hypervisor) + _CheckNodeNotDrained(self, target_node) + if instance.admin_up: + # check memory requirements on the secondary node + _CheckNodeFreeMemory(self, target_node, "failing over instance %s" % + instance.name, bep[constants.BE_MEMORY], + instance.hypervisor) + else: + self.LogInfo("Not checking memory on the secondary node as" + " instance will not be started") # check bridge existance - brlist = [nic.bridge for nic in instance.nics] - result = self.rpc.call_bridges_exist(target_node, brlist) - result.Raise() - if not result.data: - raise errors.OpPrereqError("One or more target bridges %s does not" - " exist on destination node '%s'" % - (brlist, target_node)) + _CheckInstanceBridgesExist(self, instance, node=target_node) def Exec(self, feedback_fn): """Failover an instance. @@ -3361,7 +3715,7 @@ class LUFailoverInstance(LogicalUnit): for dev in instance.disks: # for drbd, these are drbd over lvm if not _CheckDiskConsistency(self, dev, target_node, False): - if instance.status == "up" and not self.op.ignore_consistency: + if instance.admin_up and not self.op.ignore_consistency: raise errors.OpExecError("Disk %s is degraded on target node," " aborting failover." % dev.iv_name) @@ -3370,15 +3724,17 @@ class LUFailoverInstance(LogicalUnit): instance.name, source_node) result = self.rpc.call_instance_shutdown(source_node, instance) - if result.failed or not result.data: + msg = result.fail_msg + if msg: if self.op.ignore_consistency: self.proc.LogWarning("Could not shutdown instance %s on node %s." - " Proceeding" - " anyway. Please make sure node %s is down", - instance.name, source_node, source_node) + " Proceeding anyway. Please make sure node" + " %s is down. 
Error details: %s", + instance.name, source_node, source_node, msg) else: - raise errors.OpExecError("Could not shutdown instance %s on node %s" % - (instance.name, source_node)) + raise errors.OpExecError("Could not shutdown instance %s on" + " node %s: %s" % + (instance.name, source_node, msg)) feedback_fn("* deactivating the instance's disks on source node") if not _ShutdownInstanceDisks(self, instance, ignore_primary=True): @@ -3389,7 +3745,7 @@ class LUFailoverInstance(LogicalUnit): self.cfg.Update(instance) # Only start the instance if it's marked as up - if instance.status == "up": + if instance.admin_up: feedback_fn("* activating the instance's disks on target node") logging.info("Starting instance %s on node %s", instance.name, target_node) @@ -3401,8 +3757,8 @@ class LUFailoverInstance(LogicalUnit): raise errors.OpExecError("Can't activate the instance's disks") feedback_fn("* starting the instance on the target node") - result = self.rpc.call_instance_start(target_node, instance, None) - msg = result.RemoteFailMsg() + result = self.rpc.call_instance_start(target_node, instance, None, None) + msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Could not start instance %s on node %s: %s" % @@ -3438,6 +3794,8 @@ class LUMigrateInstance(LogicalUnit): """ env = _BuildInstanceHookEnvByObject(self, self.instance) + env["MIGRATE_LIVE"] = self.op.live + env["MIGRATE_CLEANUP"] = self.op.cleanup nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes) return env, nl, nl @@ -3459,8 +3817,8 @@ class LUMigrateInstance(LogicalUnit): secondary_nodes = instance.secondary_nodes if not secondary_nodes: - raise errors.ProgrammerError("no secondary node but using " - "drbd8 disk template") + raise errors.ConfigurationError("No secondary node but using" + " drbd8 disk template") i_be = self.cfg.GetClusterInfo().FillBE(instance) @@ -3471,20 +3829,13 @@ class LUMigrateInstance(LogicalUnit): instance.hypervisor) # check bridge existance - brlist = [nic.bridge for nic in instance.nics] - result = self.rpc.call_bridges_exist(target_node, brlist) - if result.failed or not result.data: - raise errors.OpPrereqError("One or more target bridges %s does not" - " exist on destination node '%s'" % - (brlist, target_node)) + _CheckInstanceBridgesExist(self, instance, node=target_node) if not self.op.cleanup: + _CheckNodeNotDrained(self, target_node) result = self.rpc.call_instance_migratable(instance.primary_node, instance) - msg = result.RemoteFailMsg() - if msg: - raise errors.OpPrereqError("Can't migrate: %s - please use failover" % - msg) + result.Raise("Can't migrate, please use failover", prereq=True) self.instance = instance @@ -3503,11 +3854,8 @@ class LUMigrateInstance(LogicalUnit): self.instance.disks) min_percent = 100 for node, nres in result.items(): - msg = nres.RemoteFailMsg() - if msg: - raise errors.OpExecError("Cannot resync disks on node %s: %s" % - (node, msg)) - node_done, node_percent = nres.data[1] + nres.Raise("Cannot resync disks on node %s" % node) + node_done, node_percent = nres.payload all_done = all_done and node_done if node_percent is not None: min_percent = min(min_percent, node_percent) @@ -3527,10 +3875,7 @@ class LUMigrateInstance(LogicalUnit): result = self.rpc.call_blockdev_close(node, self.instance.name, self.instance.disks) - msg = result.RemoteFailMsg() - if msg: - raise errors.OpExecError("Cannot change disk to secondary on node %s," - " error %s" % (node, msg)) + result.Raise("Cannot change disk to secondary on node 
%s" % node) def _GoStandalone(self): """Disconnect from the network. @@ -3540,10 +3885,7 @@ class LUMigrateInstance(LogicalUnit): result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip, self.instance.disks) for node, nres in result.items(): - msg = nres.RemoteFailMsg() - if msg: - raise errors.OpExecError("Cannot disconnect disks node %s," - " error %s" % (node, msg)) + nres.Raise("Cannot disconnect disks node %s" % node) def _GoReconnect(self, multimaster): """Reconnect to the network. @@ -3558,10 +3900,7 @@ class LUMigrateInstance(LogicalUnit): self.instance.disks, self.instance.name, multimaster) for node, nres in result.items(): - msg = nres.RemoteFailMsg() - if msg: - raise errors.OpExecError("Cannot change disks config on node %s," - " error: %s" % (node, msg)) + nres.Raise("Cannot change disks config on node %s" % node) def _ExecCleanup(self): """Try to cleanup after a failed migration. @@ -3586,12 +3925,10 @@ class LUMigrateInstance(LogicalUnit): " a bad state)") ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor]) for node, result in ins_l.items(): - result.Raise() - if not isinstance(result.data, list): - raise errors.OpExecError("Can't contact node '%s'" % node) + result.Raise("Can't contact node %s" % node) - runningon_source = instance.name in ins_l[source_node].data - runningon_target = instance.name in ins_l[target_node].data + runningon_source = instance.name in ins_l[source_node].payload + runningon_target = instance.name in ins_l[target_node].payload if runningon_source and runningon_target: raise errors.OpExecError("Instance seems to be running on two nodes," @@ -3630,6 +3967,41 @@ class LUMigrateInstance(LogicalUnit): self.feedback_fn("* done") + def _RevertDiskStatus(self): + """Try to revert the disk status after a failed migration. + + """ + target_node = self.target_node + try: + self._EnsureSecondary(target_node) + self._GoStandalone() + self._GoReconnect(False) + self._WaitUntilSync() + except errors.OpExecError, err: + self.LogWarning("Migration failed and I can't reconnect the" + " drives: error '%s'\n" + "Please look and recover the instance status" % + str(err)) + + def _AbortMigration(self): + """Call the hypervisor code to abort a started migration. + + """ + instance = self.instance + target_node = self.target_node + migration_info = self.migration_info + + abort_result = self.rpc.call_finalize_migration(target_node, + instance, + migration_info, + False) + abort_msg = abort_result.fail_msg + if abort_msg: + logging.error("Aborting migration failed on target node %s: %s" % + (target_node, abort_msg)) + # Don't raise an exception here, as we stil have to try to revert the + # disk status, even if this step failed. + def _ExecMigration(self): """Migrate an instance. @@ -3653,31 +4025,49 @@ class LUMigrateInstance(LogicalUnit): " synchronized on target node," " aborting migrate." 
% dev.iv_name) + # First get the migration information from the remote node + result = self.rpc.call_migration_info(source_node, instance) + msg = result.fail_msg + if msg: + log_err = ("Failed fetching source migration information from %s: %s" % + (source_node, msg)) + logging.error(log_err) + raise errors.OpExecError(log_err) + + self.migration_info = migration_info = result.payload + + # Then switch the disks to master/master mode self._EnsureSecondary(target_node) self._GoStandalone() self._GoReconnect(True) self._WaitUntilSync() + self.feedback_fn("* preparing %s to accept the instance" % target_node) + result = self.rpc.call_accept_instance(target_node, + instance, + migration_info, + self.nodes_ip[target_node]) + + msg = result.fail_msg + if msg: + logging.error("Instance pre-migration failed, trying to revert" + " disk status: %s", msg) + self._AbortMigration() + self._RevertDiskStatus() + raise errors.OpExecError("Could not pre-migrate instance %s: %s" % + (instance.name, msg)) + self.feedback_fn("* migrating instance to %s" % target_node) time.sleep(10) result = self.rpc.call_instance_migrate(source_node, instance, self.nodes_ip[target_node], self.op.live) - msg = result.RemoteFailMsg() + msg = result.fail_msg if msg: logging.error("Instance migration failed, trying to revert" " disk status: %s", msg) - try: - self._EnsureSecondary(target_node) - self._GoStandalone() - self._GoReconnect(False) - self._WaitUntilSync() - except errors.OpExecError, err: - self.LogWarning("Migration failed and I can't reconnect the" - " drives: error '%s'\n" - "Please look and recover the instance status" % - str(err)) - + self._AbortMigration() + self._RevertDiskStatus() raise errors.OpExecError("Could not migrate instance %s: %s" % (instance.name, msg)) time.sleep(10) @@ -3686,6 +4076,17 @@ class LUMigrateInstance(LogicalUnit): # distribute new instance config to the other nodes self.cfg.Update(instance) + result = self.rpc.call_finalize_migration(target_node, + instance, + migration_info, + True) + msg = result.fail_msg + if msg: + logging.error("Instance migration succeeded, but finalization failed:" + " %s" % msg) + raise errors.OpExecError("Could not finalize instance migration: %s" % + msg) + self._EnsureSecondary(source_node) self._WaitUntilSync() self._GoStandalone() @@ -3736,7 +4137,7 @@ def _CreateBlockDev(lu, node, instance, device, force_create, (this will be represented as a LVM tag) @type force_open: boolean @param force_open: this parameter will be passes to the - L{backend.CreateBlockDevice} function where it specifies + L{backend.BlockdevCreate} function where it specifies whether we run on primary or not, and it affects both the child assembly and the device own Open() execution @@ -3771,7 +4172,7 @@ def _CreateSingleBlockDev(lu, node, instance, device, info, force_open): (this will be represented as a LVM tag) @type force_open: boolean @param force_open: this parameter will be passes to the - L{backend.CreateBlockDevice} function where it specifies + L{backend.BlockdevCreate} function where it specifies whether we run on primary or not, and it affects both the child assembly and the device own Open() execution @@ -3779,13 +4180,10 @@ def _CreateSingleBlockDev(lu, node, instance, device, info, force_open): lu.cfg.SetDiskID(device, node) result = lu.rpc.call_blockdev_create(node, device, device.size, instance.name, force_open, info) - msg = result.RemoteFailMsg() - if msg: - raise errors.OpExecError("Can't create block device %s on" - " node %s for instance %s: %s" % - (device, 
node, instance.name, msg)) + result.Raise("Can't create block device %s on" + " node %s for instance %s" % (device, node, instance.name)) if device.physical_id is None: - device.physical_id = result.data[1] + device.physical_id = result.payload def _GenerateUniqueNames(lu, exts): @@ -3847,7 +4245,8 @@ def _GenerateDiskTemplate(lu, template_name, disk_index = idx + base_index disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"], logical_id=(vgname, names[idx]), - iv_name="disk/%d" % disk_index) + iv_name="disk/%d" % disk_index, + mode=disk["mode"]) disks.append(disk_dev) elif template_name == constants.DT_DRBD8: if len(secondary_nodes) != 1: @@ -3867,6 +4266,7 @@ def _GenerateDiskTemplate(lu, template_name, disk["size"], names[idx*2:idx*2+2], "disk/%d" % disk_index, minors[idx*2], minors[idx*2+1]) + disk_dev.mode = disk["mode"] disks.append(disk_dev) elif template_name == constants.DT_FILE: if len(secondary_nodes) != 0: @@ -3878,7 +4278,8 @@ def _GenerateDiskTemplate(lu, template_name, iv_name="disk/%d" % disk_index, logical_id=(file_driver, "%s/disk%d" % (file_storage_dir, - idx))) + disk_index)), + mode=disk["mode"]) disks.append(disk_dev) else: raise errors.ProgrammerError("Invalid disk template '%s'" % template_name) @@ -3912,12 +4313,8 @@ def _CreateDisks(lu, instance): file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir) - if result.failed or not result.data: - raise errors.OpExecError("Could not connect to node '%s'" % pnode) - - if not result.data[0]: - raise errors.OpExecError("Failed to create directory '%s'" % - file_storage_dir) + result.Raise("Failed to create directory '%s' on" + " node %s: %s" % (file_storage_dir, pnode)) # Note: this needs to be kept in sync with adding of disks in # LUSetInstanceParams @@ -3948,25 +4345,27 @@ def _RemoveDisks(lu, instance): """ logging.info("Removing block devices for instance %s", instance.name) - result = True + all_result = True for device in instance.disks: for node, disk in device.ComputeNodeTree(instance.primary_node): lu.cfg.SetDiskID(disk, node) - result = lu.rpc.call_blockdev_remove(node, disk) - if result.failed or not result.data: - lu.proc.LogWarning("Could not remove block device %s on node %s," - " continuing anyway", device.iv_name, node) - result = False + msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg + if msg: + lu.LogWarning("Could not remove block device %s on node %s," + " continuing anyway: %s", device.iv_name, node, msg) + all_result = False if instance.disk_template == constants.DT_FILE: file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) result = lu.rpc.call_file_storage_dir_remove(instance.primary_node, file_storage_dir) - if result.failed or not result.data: - logging.error("Could not remove directory '%s'", file_storage_dir) - result = False + msg = result.fail_msg + if msg: + lu.LogWarning("Could not remove directory '%s' on node %s: %s", + file_storage_dir, instance.primary_node, msg) + all_result = False - return result + return all_result def _ComputeDiskSize(disk_template, disks): @@ -4011,13 +4410,9 @@ def _CheckHVParams(lu, nodenames, hvname, hvparams): hvparams) for node in nodenames: info = hvinfo[node] - info.Raise() - if not info.data or not isinstance(info.data, (tuple, list)): - raise errors.OpPrereqError("Cannot get current information" - " from node '%s' (%s)" % (node, info.data)) - if not info.data[0]: - raise errors.OpPrereqError("Hypervisor parameter validation failed:" - " 
%s" % info.data[1]) + if info.offline: + continue + info.Raise("Hypervisor parameter validation failed on node %s" % node) class LUCreateInstance(LogicalUnit): @@ -4077,15 +4472,16 @@ class LUCreateInstance(LogicalUnit): ",".join(enabled_hvs))) # check hypervisor parameter syntax (locally) - - filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor], + utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES) + filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor], self.op.hvparams) hv_type = hypervisor.GetHypervisor(self.op.hypervisor) hv_type.CheckParameterSyntax(filled_hvp) + self.hv_full = filled_hvp # fill and remember the beparams dict - utils.CheckBEParams(self.op.beparams) - self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT], + utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) + self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT], self.op.beparams) #### instance parameters check @@ -4104,10 +4500,21 @@ class LUCreateInstance(LogicalUnit): # NIC buildup self.nics = [] - for nic in self.op.nics: + for idx, nic in enumerate(self.op.nics): + nic_mode_req = nic.get("mode", None) + nic_mode = nic_mode_req + if nic_mode is None: + nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE] + + # in routed mode, for the first nic, the default ip is 'auto' + if nic_mode == constants.NIC_MODE_ROUTED and idx == 0: + default_ip_mode = constants.VALUE_AUTO + else: + default_ip_mode = constants.VALUE_NONE + # ip validity checks - ip = nic.get("ip", None) - if ip is None or ip.lower() == "none": + ip = nic.get("ip", default_ip_mode) + if ip is None or ip.lower() == constants.VALUE_NONE: nic_ip = None elif ip.lower() == constants.VALUE_AUTO: nic_ip = hostname1.ip @@ -4117,6 +4524,10 @@ class LUCreateInstance(LogicalUnit): " like a valid IP" % ip) nic_ip = ip + # TODO: check the ip for uniqueness !! 
+ if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip: + raise errors.OpPrereqError("Routed nic mode requires an ip address") + # MAC address verification mac = nic.get("mac", constants.VALUE_AUTO) if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): @@ -4124,8 +4535,26 @@ class LUCreateInstance(LogicalUnit): raise errors.OpPrereqError("Invalid MAC address specified: %s" % mac) # bridge verification - bridge = nic.get("bridge", self.cfg.GetDefBridge()) - self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge)) + bridge = nic.get("bridge", None) + link = nic.get("link", None) + if bridge and link: + raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" + " at the same time") + elif bridge and nic_mode == constants.NIC_MODE_ROUTED: + raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic") + elif bridge: + link = bridge + + nicparams = {} + if nic_mode_req: + nicparams[constants.NIC_MODE] = nic_mode_req + if link: + nicparams[constants.NIC_LINK] = link + + check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT], + nicparams) + objects.NIC.CheckParameterSyntax(check_params) + self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams)) # disk checks/pre-build self.disks = [] @@ -4239,23 +4668,27 @@ class LUCreateInstance(LogicalUnit): """ env = { - "INSTANCE_DISK_TEMPLATE": self.op.disk_template, - "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks), - "INSTANCE_ADD_MODE": self.op.mode, + "ADD_MODE": self.op.mode, } if self.op.mode == constants.INSTANCE_IMPORT: - env["INSTANCE_SRC_NODE"] = self.op.src_node - env["INSTANCE_SRC_PATH"] = self.op.src_path - env["INSTANCE_SRC_IMAGES"] = self.src_images + env["SRC_NODE"] = self.op.src_node + env["SRC_PATH"] = self.op.src_path + env["SRC_IMAGES"] = self.src_images - env.update(_BuildInstanceHookEnv(name=self.op.instance_name, + env.update(_BuildInstanceHookEnv( + name=self.op.instance_name, primary_node=self.op.pnode, secondary_nodes=self.secondaries, - status=self.instance_status, + status=self.op.start, os_type=self.op.os_type, memory=self.be_full[constants.BE_MEMORY], vcpus=self.be_full[constants.BE_VCPUS], - nics=[(n.ip, n.bridge, n.mac) for n in self.nics], + nics=_NICListToTuple(self, self.nics), + disk_template=self.op.disk_template, + disks=[(d["size"], d["mode"]) for d in self.disks], + bep=self.be_full, + hvp=self.hv_full, + hypervisor=self.op.hypervisor, )) nl = ([self.cfg.GetMasterNode(), self.op.pnode] + @@ -4272,17 +4705,18 @@ class LUCreateInstance(LogicalUnit): raise errors.OpPrereqError("Cluster does not support lvm-based" " instances") - if self.op.mode == constants.INSTANCE_IMPORT: src_node = self.op.src_node src_path = self.op.src_path if src_node is None: - exp_list = self.rpc.call_export_list( - self.acquired_locks[locking.LEVEL_NODE]) + locked_nodes = self.acquired_locks[locking.LEVEL_NODE] + exp_list = self.rpc.call_export_list(locked_nodes) found = False for node in exp_list: - if not exp_list[node].failed and src_path in exp_list[node].data: + if exp_list[node].fail_msg: + continue + if src_path in exp_list[node].payload: found = True self.op.src_node = src_node = node self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR, @@ -4294,11 +4728,9 @@ class LUCreateInstance(LogicalUnit): _CheckNodeOnline(self, src_node) result = self.rpc.call_export_info(src_node, src_path) - result.Raise() - if not result.data: - raise errors.OpPrereqError("No export found in dir %s" % src_path) + result.Raise("No export or invalid export found in dir %s" 
% src_path) - export_info = result.data + export_info = objects.SerializableConfigParser.Loads(str(result.payload)) if not export_info.has_section(constants.INISECT_EXP): raise errors.ProgrammerError("Corrupted export config") @@ -4338,6 +4770,7 @@ class LUCreateInstance(LogicalUnit): nic_mac_ini = 'nic%d_mac' % idx nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini) + # ENDIF: self.op.mode == constants.INSTANCE_IMPORT # ip ping checks (we use the same ip that was resolved in ExpandNames) if self.op.start and not self.op.ip_check: raise errors.OpPrereqError("Cannot ignore IP address conflicts when" @@ -4348,6 +4781,18 @@ class LUCreateInstance(LogicalUnit): raise errors.OpPrereqError("IP %s of instance %s already in use" % (self.check_ip, self.op.instance_name)) + #### mac address generation + # By generating here the mac address both the allocator and the hooks get + # the real final mac address rather than the 'auto' or 'generate' value. + # There is a race condition between the generation and the instance object + # creation, which means that we know the mac is valid now, but we're not + # sure it will be when we actually add the instance. If things go bad + # adding the instance will abort because of a duplicate mac, and the + # creation job will fail. + for nic in self.nics: + if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): + nic.mac = self.cfg.GenerateMAC() + #### allocator run if self.op.iallocator is not None: @@ -4362,6 +4807,9 @@ class LUCreateInstance(LogicalUnit): if pnode.offline: raise errors.OpPrereqError("Cannot use offline primary node '%s'" % pnode.name) + if pnode.drained: + raise errors.OpPrereqError("Cannot use drained primary node '%s'" % + pnode.name) self.secondaries = [] @@ -4373,8 +4821,9 @@ class LUCreateInstance(LogicalUnit): if self.op.snode == pnode.name: raise errors.OpPrereqError("The secondary node cannot be" " the primary node.") - self.secondaries.append(self.op.snode) _CheckNodeOnline(self, self.op.snode) + _CheckNodeNotDrained(self, self.op.snode) + self.secondaries.append(self.op.snode) nodenames = [pnode.name] + self.secondaries @@ -4387,37 +4836,25 @@ class LUCreateInstance(LogicalUnit): self.op.hypervisor) for node in nodenames: info = nodeinfo[node] - info.Raise() - info = info.data - if not info: - raise errors.OpPrereqError("Cannot get current information" - " from node '%s'" % node) + info.Raise("Cannot get current information from node %s" % node) + info = info.payload vg_free = info.get('vg_free', None) if not isinstance(vg_free, int): raise errors.OpPrereqError("Can't compute free disk space on" " node %s" % node) - if req_size > info['vg_free']: + if req_size > vg_free: raise errors.OpPrereqError("Not enough disk space on target node %s." 
" %d MB available, %d MB required" % - (node, info['vg_free'], req_size)) + (node, vg_free, req_size)) _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams) # os verification result = self.rpc.call_os_get(pnode.name, self.op.os_type) - result.Raise() - if not isinstance(result.data, objects.OS): - raise errors.OpPrereqError("OS '%s' not in supported os list for" - " primary node" % self.op.os_type) - - # bridge check on primary node - bridges = [n.bridge for n in self.nics] - result = self.rpc.call_bridges_exist(self.pnode.name, bridges) - result.Raise() - if not result.data: - raise errors.OpPrereqError("One of the target bridges '%s' does not" - " exist on destination node '%s'" % - (",".join(bridges), pnode.name)) + result.Raise("OS '%s' not in supported os list for primary node %s" % + (self.op.os_type, pnode.name), prereq=True) + + _CheckNicsBridgesExist(self, self.nics, self.pnode.name) # memory check on primary node if self.op.start: @@ -4426,10 +4863,7 @@ class LUCreateInstance(LogicalUnit): self.be_full[constants.BE_MEMORY], self.op.hypervisor) - if self.op.start: - self.instance_status = 'up' - else: - self.instance_status = 'down' + self.dry_run_result = list(nodenames) def Exec(self, feedback_fn): """Create and add the instance to the cluster. @@ -4438,10 +4872,6 @@ class LUCreateInstance(LogicalUnit): instance = self.op.instance_name pnode_name = self.pnode.name - for nic in self.nics: - if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): - nic.mac = self.cfg.GenerateMAC() - ht_kind = self.op.hypervisor if ht_kind in constants.HTS_REQ_PORT: network_port = self.cfg.AllocatePort() @@ -4476,7 +4906,7 @@ class LUCreateInstance(LogicalUnit): primary_node=pnode_name, nics=self.nics, disks=disks, disk_template=self.op.disk_template, - status=self.instance_status, + admin_up=False, network_port=network_port, beparams=self.op.beparams, hvparams=self.op.hvparams, @@ -4500,8 +4930,6 @@ class LUCreateInstance(LogicalUnit): # Declare that we don't want to remove the instance lock anymore, as we've # added the instance to the config del self.remove_locks[locking.LEVEL_INSTANCE] - # Remove the temp. 
assignements for the instance's drbds - self.cfg.ReleaseDRBDMinors(instance) # Unlock all the nodes if self.op.mode == constants.INSTANCE_IMPORT: nodes_keep = [self.op.src_node] @@ -4537,12 +4965,9 @@ class LUCreateInstance(LogicalUnit): if iobj.disk_template != constants.DT_DISKLESS: if self.op.mode == constants.INSTANCE_CREATE: feedback_fn("* running the instance OS create scripts...") - result = self.rpc.call_instance_os_add(pnode_name, iobj) - msg = result.RemoteFailMsg() - if msg: - raise errors.OpExecError("Could not add os for instance %s" - " on node %s: %s" % - (instance, pnode_name, msg)) + result = self.rpc.call_instance_os_add(pnode_name, iobj, False) + result.Raise("Could not add os for instance %s" + " on node %s" % (instance, pnode_name)) elif self.op.mode == constants.INSTANCE_IMPORT: feedback_fn("* running the instance OS import scripts...") @@ -4552,24 +4977,24 @@ class LUCreateInstance(LogicalUnit): import_result = self.rpc.call_instance_os_import(pnode_name, iobj, src_node, src_images, cluster_name) - import_result.Raise() - for idx, result in enumerate(import_result.data): - if not result: - self.LogWarning("Could not import the image %s for instance" - " %s, disk %d, on node %s" % - (src_images[idx], instance, idx, pnode_name)) + msg = import_result.fail_msg + if msg: + self.LogWarning("Error while importing the disk images for instance" + " %s on node %s: %s" % (instance, pnode_name, msg)) else: # also checked in the prereq part raise errors.ProgrammerError("Unknown OS initialization mode '%s'" % self.op.mode) if self.op.start: + iobj.admin_up = True + self.cfg.Update(iobj) logging.info("Starting instance %s on node %s", instance, pnode_name) feedback_fn("* starting instance...") - result = self.rpc.call_instance_start(pnode_name, iobj, None) - msg = result.RemoteFailMsg() - if msg: - raise errors.OpExecError("Could not start instance: %s" % msg) + result = self.rpc.call_instance_start(pnode_name, iobj, None, None) + result.Raise("Could not start instance") + + return list(iobj.all_nodes) class LUConnectConsole(NoHooksLU): @@ -4606,15 +5031,20 @@ class LUConnectConsole(NoHooksLU): node_insts = self.rpc.call_instance_list([node], [instance.hypervisor])[node] - node_insts.Raise() + node_insts.Raise("Can't get node information from %s" % node) - if instance.name not in node_insts.data: + if instance.name not in node_insts.payload: raise errors.OpExecError("Instance %s is not running." % instance.name) logging.debug("Connecting to console of %s on %s", instance.name, node) hyper = hypervisor.GetHypervisor(instance.hypervisor) - console_cmd = hyper.GetShellCommandForConsole(instance) + cluster = self.cfg.GetClusterInfo() + # beparams and hvparams are passed separately, to avoid editing the + # instance and then saving the defaults in the instance itself. 
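
The console path above uses the same fill-at-use-time pattern as the hook and query code: the instance stores only its explicit hvparams/beparams overrides, and the effective values are merged from the cluster defaults right before they are needed, so nothing is written back to the instance. A rough self-contained illustration; FakeInstance, fill and the literal default values are invented for the example and are not ganeti data structures:

CLUSTER_HVPARAMS = {"xen-pvm": {"kernel_path": "/boot/vmlinuz-2.6-xenU",
                                "root_path": "/dev/sda1"}}
CLUSTER_BEPARAMS = {"default": {"memory": 128, "vcpus": 1}}

class FakeInstance(object):
    hypervisor = "xen-pvm"
    hvparams = {"root_path": "/dev/xvda1"}  # explicit overrides only
    beparams = {"memory": 512}

def fill(defaults, overrides):
    merged = dict(defaults)
    merged.update(overrides)
    return merged

inst = FakeInstance()
hv = fill(CLUSTER_HVPARAMS[inst.hypervisor], inst.hvparams)
be = fill(CLUSTER_BEPARAMS["default"], inst.beparams)
# nothing is saved back to the instance, so a later change of the
# cluster-wide defaults is still picked up by the next console request
assert hv == {"kernel_path": "/boot/vmlinuz-2.6-xenU",
              "root_path": "/dev/xvda1"}
assert be == {"memory": 512, "vcpus": 1}
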
+ hvparams = cluster.FillHV(instance) + beparams = cluster.FillBE(instance) + console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams) # build ssh cmdline return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True) @@ -4662,6 +5092,10 @@ class LUReplaceDisks(LogicalUnit): raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node) self.op.remote_node = remote_node + # Warning: do not remove the locking of the new secondary here + # unless DRBD8.AddChildren is changed to work in parallel; + # currently it doesn't since parallel invocations of + # FindUnusedMinor will conflict self.needed_locks[locking.LEVEL_NODE] = [remote_node] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND else: @@ -4767,6 +5201,7 @@ class LUReplaceDisks(LogicalUnit): n1 = self.new_node = remote_node n2 = self.oth_node = instance.primary_node self.tgt_node = self.sec_node + _CheckNodeNotDrained(self, remote_node) else: raise errors.ProgrammerError("Unhandled disk replace mode") @@ -4820,7 +5255,8 @@ class LUReplaceDisks(LogicalUnit): raise errors.OpExecError("Can't list volume groups on the nodes") for node in oth_node, tgt_node: res = results[node] - if res.failed or not res.data or my_vg not in res.data: + res.Raise("Error checking node %s" % node) + if my_vg not in res.payload: raise errors.OpExecError("Volume group '%s' not found on %s" % (my_vg, node)) for idx, dev in enumerate(instance.disks): @@ -4829,9 +5265,13 @@ class LUReplaceDisks(LogicalUnit): for node in tgt_node, oth_node: info("checking disk/%d on %s" % (idx, node)) cfg.SetDiskID(dev, node) - if not self.rpc.call_blockdev_find(node, dev): - raise errors.OpExecError("Can't find disk/%d on node %s" % - (idx, node)) + result = self.rpc.call_blockdev_find(node, dev) + msg = result.fail_msg + if not msg and not result.payload: + msg = "disk not found" + if msg: + raise errors.OpExecError("Can't find disk/%d on node %s: %s" % + (idx, node, msg)) # Step: check other node consistency self.proc.LogStep(2, steps_total, "check peer consistency") @@ -4874,10 +5314,8 @@ class LUReplaceDisks(LogicalUnit): for dev, old_lvs, new_lvs in iv_names.itervalues(): info("detaching %s drbd from local storage" % dev.iv_name) result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs) - result.Raise() - if not result.data: - raise errors.OpExecError("Can't detach drbd from local storage on node" - " %s for device %s" % (tgt_node, dev.iv_name)) + result.Raise("Can't detach drbd from local storage on node" + " %s for device %s" % (tgt_node, dev.iv_name)) #dev.children = [] #cfg.Update(instance) @@ -4894,22 +5332,19 @@ class LUReplaceDisks(LogicalUnit): # build the rename list based on what LVs exist on the node rlist = [] for to_ren in old_lvs: - find_res = self.rpc.call_blockdev_find(tgt_node, to_ren) - if not find_res.failed and find_res.data is not None: # device exists + result = self.rpc.call_blockdev_find(tgt_node, to_ren) + if not result.fail_msg and result.payload: + # device exists rlist.append((to_ren, ren_fn(to_ren, temp_suffix))) info("renaming the old LVs on the target node") result = self.rpc.call_blockdev_rename(tgt_node, rlist) - result.Raise() - if not result.data: - raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node) + result.Raise("Can't rename old LVs on node %s" % tgt_node) # now we rename the new LVs to the old LVs info("renaming the new LVs on the target node") rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)] result = 
self.rpc.call_blockdev_rename(tgt_node, rlist) - result.Raise() - if not result.data: - raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node) + result.Raise("Can't rename new LVs on node %s" % tgt_node) for old, new in zip(old_lvs, new_lvs): new.logical_id = old.logical_id @@ -4922,13 +5357,14 @@ class LUReplaceDisks(LogicalUnit): # now that the new lvs have the old name, we can add them to the device info("adding new mirror component on %s" % tgt_node) result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs) - if result.failed or not result.data: + msg = result.fail_msg + if msg: for new_lv in new_lvs: - result = self.rpc.call_blockdev_remove(tgt_node, new_lv) - if result.failed or not result.data: - warning("Can't rollback device %s", hint="manually cleanup unused" - " logical volumes") - raise errors.OpExecError("Can't add local storage to drbd") + msg2 = self.rpc.call_blockdev_remove(tgt_node, new_lv).fail_msg + if msg2: + warning("Can't rollback device %s: %s", dev, msg2, + hint="cleanup manually the unused logical volumes") + raise errors.OpExecError("Can't add local storage to drbd: %s" % msg) dev.children = new_lvs cfg.Update(instance) @@ -4945,7 +5381,13 @@ class LUReplaceDisks(LogicalUnit): for name, (dev, old_lvs, new_lvs) in iv_names.iteritems(): cfg.SetDiskID(dev, instance.primary_node) result = self.rpc.call_blockdev_find(instance.primary_node, dev) - if result.failed or result.data[5]: + msg = result.fail_msg + if not msg and not result.payload: + msg = "disk not found" + if msg: + raise errors.OpExecError("Can't find DRBD device %s: %s" % + (name, msg)) + if result.payload[5]: raise errors.OpExecError("DRBD device %s is degraded!" % name) # Step: remove old storage @@ -4954,9 +5396,10 @@ class LUReplaceDisks(LogicalUnit): info("remove logical volumes for %s" % name) for lv in old_lvs: cfg.SetDiskID(lv, tgt_node) - result = self.rpc.call_blockdev_remove(tgt_node, lv) - if result.failed or not result.data: - warning("Can't remove old LV", hint="manually remove unused LVs") + msg = self.rpc.call_blockdev_remove(tgt_node, lv).fail_msg + if msg: + warning("Can't remove old LV: %s" % msg, + hint="manually remove unused LVs") continue def _ExecD8Secondary(self, feedback_fn): @@ -5000,7 +5443,8 @@ class LUReplaceDisks(LogicalUnit): results = self.rpc.call_vg_list([pri_node, new_node]) for node in pri_node, new_node: res = results[node] - if res.failed or not res.data or my_vg not in res.data: + res.Raise("Error checking node %s" % node) + if my_vg not in res.payload: raise errors.OpExecError("Volume group '%s' not found on %s" % (my_vg, node)) for idx, dev in enumerate(instance.disks): @@ -5009,10 +5453,12 @@ class LUReplaceDisks(LogicalUnit): info("checking disk/%d on %s" % (idx, pri_node)) cfg.SetDiskID(dev, pri_node) result = self.rpc.call_blockdev_find(pri_node, dev) - result.Raise() - if not result.data: - raise errors.OpExecError("Can't find disk/%d on node %s" % - (idx, pri_node)) + msg = result.fail_msg + if not msg and not result.payload: + msg = "disk not found" + if msg: + raise errors.OpExecError("Can't find disk/%d on node %s: %s" % + (idx, pri_node, msg)) # Step: check other node consistency self.proc.LogStep(2, steps_total, "check peer consistency") @@ -5063,11 +5509,12 @@ class LUReplaceDisks(LogicalUnit): new_net_id) new_drbd = objects.Disk(dev_type=constants.LD_DRBD8, logical_id=new_alone_id, - children=dev.children) + children=dev.children, + size=dev.size) try: _CreateSingleBlockDev(self, new_node, instance, new_drbd, 
_GetInstanceInfoText(instance), False) - except errors.BlockDeviceError: + except errors.GenericError: self.cfg.ReleaseDRBDMinors(instance.name) raise @@ -5075,16 +5522,17 @@ class LUReplaceDisks(LogicalUnit): # we have new devices, shutdown the drbd on the old secondary info("shutting down drbd for disk/%d on old node" % idx) cfg.SetDiskID(dev, old_node) - result = self.rpc.call_blockdev_shutdown(old_node, dev) - if result.failed or not result.data: - warning("Failed to shutdown drbd for disk/%d on old node" % idx, + msg = self.rpc.call_blockdev_shutdown(old_node, dev).fail_msg + if msg: + warning("Failed to shutdown drbd for disk/%d on old node: %s" % + (idx, msg), hint="Please cleanup this device manually as soon as possible") info("detaching primary drbds from the network (=> standalone)") result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip, instance.disks)[pri_node] - msg = result.RemoteFailMsg() + msg = result.fail_msg if msg: # detaches didn't succeed (unlikely) self.cfg.ReleaseDRBDMinors(instance.name) @@ -5098,9 +5546,6 @@ class LUReplaceDisks(LogicalUnit): dev.logical_id = new_logical_id cfg.SetDiskID(dev, pri_node) cfg.Update(instance) - # we can remove now the temp minors as now the new values are - # written to the config file (and therefore stable) - self.cfg.ReleaseDRBDMinors(instance.name) # and now perform the drbd attach info("attaching primary drbds to new secondary (standalone => connected)") @@ -5108,7 +5553,7 @@ class LUReplaceDisks(LogicalUnit): instance.disks, instance.name, False) for to_node, to_result in result.items(): - msg = to_result.RemoteFailMsg() + msg = to_result.fail_msg if msg: warning("can't attach drbd disks on node %s: %s", to_node, msg, hint="please do a gnt-instance info to see the" @@ -5124,8 +5569,13 @@ class LUReplaceDisks(LogicalUnit): for idx, (dev, old_lvs, _) in iv_names.iteritems(): cfg.SetDiskID(dev, pri_node) result = self.rpc.call_blockdev_find(pri_node, dev) - result.Raise() - if result.data[5]: + msg = result.fail_msg + if not msg and not result.payload: + msg = "disk not found" + if msg: + raise errors.OpExecError("Can't find DRBD device disk/%d: %s" % + (idx, msg)) + if result.payload[5]: raise errors.OpExecError("DRBD device disk/%d is degraded!" 
% idx) self.proc.LogStep(6, steps_total, "removing old storage") @@ -5133,9 +5583,9 @@ class LUReplaceDisks(LogicalUnit): info("remove logical volumes for disk/%d" % idx) for lv in old_lvs: cfg.SetDiskID(lv, old_node) - result = self.rpc.call_blockdev_remove(old_node, lv) - if result.failed or not result.data: - warning("Can't remove LV on old secondary", + msg = self.rpc.call_blockdev_remove(old_node, lv).fail_msg + if msg: + warning("Can't remove LV on old secondary: %s", msg, hint="Cleanup stale volumes by hand") def Exec(self, feedback_fn): @@ -5147,7 +5597,7 @@ class LUReplaceDisks(LogicalUnit): instance = self.instance # Activate the instance disks if we're replacing them on a down instance - if instance.status == "down": + if not instance.admin_up: _StartInstanceDisks(self, instance, True) if self.op.mode == constants.REPLACE_DISK_CHG: @@ -5158,7 +5608,7 @@ class LUReplaceDisks(LogicalUnit): ret = fn(feedback_fn) # Deactivate the instance disks if we're replacing them on a down instance - if instance.status == "down": + if not instance.admin_up: _SafeShutdownInstanceDisks(self, instance) return ret @@ -5225,10 +5675,8 @@ class LUGrowDisk(LogicalUnit): instance.hypervisor) for node in nodenames: info = nodeinfo[node] - if info.failed or not info.data: - raise errors.OpPrereqError("Cannot get current information" - " from node '%s'" % node) - vg_free = info.data.get('vg_free', None) + info.Raise("Cannot get current information from node %s" % node) + vg_free = info.payload.get('vg_free', None) if not isinstance(vg_free, int): raise errors.OpPrereqError("Can't compute free disk space on" " node %s" % node) @@ -5246,13 +5694,7 @@ class LUGrowDisk(LogicalUnit): for node in instance.all_nodes: self.cfg.SetDiskID(disk, node) result = self.rpc.call_blockdev_grow(node, disk, self.op.amount) - result.Raise() - if (not result.data or not isinstance(result.data, (list, tuple)) or - len(result.data) != 2): - raise errors.OpExecError("Grow request failed to node %s" % node) - elif not result.data[0]: - raise errors.OpExecError("Grow request failed to node %s: %s" % - (node, result.data[1])) + result.Raise("Grow request failed to node %s" % node) disk.RecordGrow(self.op.amount) self.cfg.Update(instance) if self.op.wait_for_sync: @@ -5316,8 +5758,11 @@ class LUQueryInstanceData(NoHooksLU): if not static: self.cfg.SetDiskID(dev, instance.primary_node) dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev) - dev_pstatus.Raise() - dev_pstatus = dev_pstatus.data + if dev_pstatus.offline: + dev_pstatus = None + else: + dev_pstatus.Raise("Can't compute disk status for %s" % instance.name) + dev_pstatus = dev_pstatus.payload else: dev_pstatus = None @@ -5331,8 +5776,11 @@ class LUQueryInstanceData(NoHooksLU): if snode and not static: self.cfg.SetDiskID(dev, snode) dev_sstatus = self.rpc.call_blockdev_find(snode, dev) - dev_sstatus.Raise() - dev_sstatus = dev_sstatus.data + if dev_sstatus.offline: + dev_sstatus = None + else: + dev_sstatus.Raise("Can't compute disk status for %s" % instance.name) + dev_sstatus = dev_sstatus.payload else: dev_sstatus = None @@ -5366,18 +5814,18 @@ class LUQueryInstanceData(NoHooksLU): remote_info = self.rpc.call_instance_info(instance.primary_node, instance.name, instance.hypervisor) - remote_info.Raise() - remote_info = remote_info.data + remote_info.Raise("Error checking node %s" % instance.primary_node) + remote_info = remote_info.payload if remote_info and "state" in remote_info: remote_state = "up" else: remote_state = "down" else: remote_state = None - 
if instance.status == "down": - config_state = "down" - else: + if instance.admin_up: config_state = "up" + else: + config_state = "down" disks = [self._ComputeDiskStatus(instance, None, device) for device in instance.disks] @@ -5389,7 +5837,8 @@ class LUQueryInstanceData(NoHooksLU): "pnode": instance.primary_node, "snodes": instance.secondary_nodes, "os": instance.os, - "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics], + # this happens to be the same format used for hooks + "nics": _NICListToTuple(self, instance.nics), "disks": disks, "hypervisor": instance.hypervisor, "network_port": instance.network_port, @@ -5427,8 +5876,6 @@ class LUSetInstanceParams(LogicalUnit): self.op.hvparams or self.op.beparams): raise errors.OpPrereqError("No changes submitted") - utils.CheckBEParams(self.op.beparams) - # Disk validation disk_addremove = 0 for disk_op, disk_dict in self.op.disks: @@ -5442,7 +5889,7 @@ class LUSetInstanceParams(LogicalUnit): raise errors.OpPrereqError("Invalid disk index") if disk_op == constants.DDM_ADD: mode = disk_dict.setdefault('mode', constants.DISK_RDWR) - if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR): + if mode not in constants.DISK_ACCESS_SET: raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode) size = disk_dict.get('size', None) if size is None: @@ -5478,24 +5925,36 @@ class LUSetInstanceParams(LogicalUnit): # nic_dict should be a dict nic_ip = nic_dict.get('ip', None) if nic_ip is not None: - if nic_ip.lower() == "none": + if nic_ip.lower() == constants.VALUE_NONE: nic_dict['ip'] = None else: if not utils.IsValidIP(nic_ip): raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip) - # we can only check None bridges and assign the default one + nic_bridge = nic_dict.get('bridge', None) - if nic_bridge is None: - nic_dict['bridge'] = self.cfg.GetDefBridge() - # but we can validate MACs - nic_mac = nic_dict.get('mac', None) - if nic_mac is not None: - if self.cfg.IsMacInUse(nic_mac): - raise errors.OpPrereqError("MAC address %s already in use" - " in cluster" % nic_mac) + nic_link = nic_dict.get('link', None) + if nic_bridge and nic_link: + raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" + " at the same time") + elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE: + nic_dict['bridge'] = None + elif nic_link and nic_link.lower() == constants.VALUE_NONE: + nic_dict['link'] = None + + if nic_op == constants.DDM_ADD: + nic_mac = nic_dict.get('mac', None) + if nic_mac is None: + nic_dict['mac'] = constants.VALUE_AUTO + + if 'mac' in nic_dict: + nic_mac = nic_dict['mac'] if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): if not utils.IsValidMac(nic_mac): raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac) + if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO: + raise errors.OpPrereqError("'auto' is not a valid MAC address when" + " modifying an existing nic") + if nic_addremove > 1: raise errors.OpPrereqError("Only one NIC add or remove operation" " supported at a time") @@ -5520,11 +5979,78 @@ class LUSetInstanceParams(LogicalUnit): args['memory'] = self.be_new[constants.BE_MEMORY] if constants.BE_VCPUS in self.be_new: args['vcpus'] = self.be_new[constants.BE_VCPUS] - # FIXME: readd disk/nic changes + # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk + # information at all. 
+ if self.op.nics: + args['nics'] = [] + nic_override = dict(self.op.nics) + c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT] + for idx, nic in enumerate(self.instance.nics): + if idx in nic_override: + this_nic_override = nic_override[idx] + else: + this_nic_override = {} + if 'ip' in this_nic_override: + ip = this_nic_override['ip'] + else: + ip = nic.ip + if 'mac' in this_nic_override: + mac = this_nic_override['mac'] + else: + mac = nic.mac + if idx in self.nic_pnew: + nicparams = self.nic_pnew[idx] + else: + nicparams = objects.FillDict(c_nicparams, nic.nicparams) + mode = nicparams[constants.NIC_MODE] + link = nicparams[constants.NIC_LINK] + args['nics'].append((ip, mac, mode, link)) + if constants.DDM_ADD in nic_override: + ip = nic_override[constants.DDM_ADD].get('ip', None) + mac = nic_override[constants.DDM_ADD]['mac'] + nicparams = self.nic_pnew[constants.DDM_ADD] + mode = nicparams[constants.NIC_MODE] + link = nicparams[constants.NIC_LINK] + args['nics'].append((ip, mac, mode, link)) + elif constants.DDM_REMOVE in nic_override: + del args['nics'][-1] + env = _BuildInstanceHookEnvByObject(self, self.instance, override=args) nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) return env, nl, nl + def _GetUpdatedParams(self, old_params, update_dict, + default_values, parameter_types): + """Return the new params dict for the given params. + + @type old_params: dict + @param old_params: old parameters + @type update_dict: dict + @param update_dict: dict containing new parameter values, + or constants.VALUE_DEFAULT to reset the + parameter to its default value + @type default_values: dict + @param default_values: default values for the filled parameters + @type parameter_types: dict + @param parameter_types: dict mapping target dict keys to types + in constants.ENFORCEABLE_TYPES + @rtype: (dict, dict) + @return: (new_parameters, filled_parameters) + + """ + params_copy = copy.deepcopy(old_params) + for key, val in update_dict.iteritems(): + if val == constants.VALUE_DEFAULT: + try: + del params_copy[key] + except KeyError: + pass + else: + params_copy[key] = val + utils.ForceDictType(params_copy, parameter_types) + params_filled = objects.FillDict(default_values, params_copy) + return (params_copy, params_filled) + def CheckPrereq(self): """Check prerequisites. 
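
The next hunk replaces the hand-rolled hvparams/beparams merging with the _GetUpdatedParams helper defined above. Its contract, restated as a runnable sketch; get_updated_params and VALUE_DEFAULT are stand-ins here, and the real helper additionally enforces value types via utils.ForceDictType:

import copy

VALUE_DEFAULT = "default"

def get_updated_params(old_params, update_dict, default_values):
    """Return (new overrides, overrides filled with the defaults)."""
    params_copy = copy.deepcopy(old_params)
    for key, val in update_dict.items():
        if val == VALUE_DEFAULT:
            params_copy.pop(key, None)  # drop the override, expose the default
        else:
            params_copy[key] = val
    filled = dict(default_values)
    filled.update(params_copy)
    return params_copy, filled

new, filled = get_updated_params({"memory": 512},
                                 {"memory": VALUE_DEFAULT, "vcpus": 2},
                                 {"memory": 128, "vcpus": 1})
assert new == {"vcpus": 2}                    # only explicit overrides are kept
assert filled == {"memory": 128, "vcpus": 2}  # the default shows through again
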
@@ -5536,6 +6062,7 @@ class LUSetInstanceParams(LogicalUnit): # checking the new params on the primary/secondary nodes instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name) + cluster = self.cluster = self.cfg.GetClusterInfo() assert self.instance is not None, \ "Cannot retrieve locked instance %s" % self.op.instance_name pnode = instance.primary_node @@ -5543,20 +6070,10 @@ class LUSetInstanceParams(LogicalUnit): # hvparams processing if self.op.hvparams: - i_hvdict = copy.deepcopy(instance.hvparams) - for key, val in self.op.hvparams.iteritems(): - if val == constants.VALUE_DEFAULT: - try: - del i_hvdict[key] - except KeyError: - pass - elif val == constants.VALUE_NONE: - i_hvdict[key] = None - else: - i_hvdict[key] = val - cluster = self.cfg.GetClusterInfo() - hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor], - i_hvdict) + i_hvdict, hv_new = self._GetUpdatedParams( + instance.hvparams, self.op.hvparams, + cluster.hvparams[instance.hypervisor], + constants.HVS_PARAMETER_TYPES) # local check hypervisor.GetHypervisor( instance.hypervisor).CheckParameterSyntax(hv_new) @@ -5568,18 +6085,10 @@ class LUSetInstanceParams(LogicalUnit): # beparams processing if self.op.beparams: - i_bedict = copy.deepcopy(instance.beparams) - for key, val in self.op.beparams.iteritems(): - if val == constants.VALUE_DEFAULT: - try: - del i_bedict[key] - except KeyError: - pass - else: - i_bedict[key] = val - cluster = self.cfg.GetClusterInfo() - be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT], - i_bedict) + i_bedict, be_new = self._GetUpdatedParams( + instance.beparams, self.op.beparams, + cluster.beparams[constants.PP_DEFAULT], + constants.BES_PARAMETER_TYPES) self.be_new = be_new # the new actual values self.be_inst = i_bedict # the new dict (without defaults) else: @@ -5596,35 +6105,51 @@ class LUSetInstanceParams(LogicalUnit): instance.hypervisor) nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(), instance.hypervisor) - if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict): + pninfo = nodeinfo[pnode] + msg = pninfo.fail_msg + if msg: # Assume the primary node is unreachable and go ahead - self.warn.append("Can't get info from primary node %s" % pnode) + self.warn.append("Can't get info from primary node %s: %s" % + (pnode, msg)) + elif not isinstance(pninfo.payload.get('memory_free', None), int): + self.warn.append("Node data from primary node %s doesn't contain" + " free memory information" % pnode) + elif instance_info.fail_msg: + self.warn.append("Can't get instance runtime information: %s" % + instance_info.fail_msg) else: - if not instance_info.failed and instance_info.data: - current_mem = instance_info.data['memory'] + if instance_info.payload: + current_mem = int(instance_info.payload['memory']) else: # Assume instance not running # (there is a slight race condition here, but it's not very probable, # and we have no other way to check) current_mem = 0 miss_mem = (be_new[constants.BE_MEMORY] - current_mem - - nodeinfo[pnode].data['memory_free']) + pninfo.payload['memory_free']) if miss_mem > 0: raise errors.OpPrereqError("This change will prevent the instance" " from starting, due to %d MB of memory" " missing on its primary node" % miss_mem) if be_new[constants.BE_AUTO_BALANCE]: - for node, nres in nodeinfo.iteritems(): + for node, nres in nodeinfo.items(): if node not in instance.secondary_nodes: continue - if nres.failed or not isinstance(nres.data, dict): - self.warn.append("Can't get info from secondary 
node %s" % node) - elif be_new[constants.BE_MEMORY] > nres.data['memory_free']: + msg = nres.fail_msg + if msg: + self.warn.append("Can't get info from secondary node %s: %s" % + (node, msg)) + elif not isinstance(nres.payload.get('memory_free', None), int): + self.warn.append("Secondary node %s didn't return free" + " memory information" % node) + elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']: self.warn.append("Not enough memory to failover instance to" " secondary node %s" % node) # NIC processing + self.nic_pnew = {} + self.nic_pinst = {} for nic_op, nic_dict in self.op.nics: if nic_op == constants.DDM_REMOVE: if not instance.nics: @@ -5636,15 +6161,57 @@ class LUSetInstanceParams(LogicalUnit): raise errors.OpPrereqError("Invalid NIC index %s, valid values" " are 0 to %d" % (nic_op, len(instance.nics))) - nic_bridge = nic_dict.get('bridge', None) - if nic_bridge is not None: - if not self.rpc.call_bridges_exist(pnode, [nic_bridge]): - msg = ("Bridge '%s' doesn't exist on one of" - " the instance nodes" % nic_bridge) + old_nic_params = instance.nics[nic_op].nicparams + old_nic_ip = instance.nics[nic_op].ip + else: + old_nic_params = {} + old_nic_ip = None + + update_params_dict = dict([(key, nic_dict[key]) + for key in constants.NICS_PARAMETERS + if key in nic_dict]) + + if 'bridge' in nic_dict: + update_params_dict[constants.NIC_LINK] = nic_dict['bridge'] + + new_nic_params, new_filled_nic_params = \ + self._GetUpdatedParams(old_nic_params, update_params_dict, + cluster.nicparams[constants.PP_DEFAULT], + constants.NICS_PARAMETER_TYPES) + objects.NIC.CheckParameterSyntax(new_filled_nic_params) + self.nic_pinst[nic_op] = new_nic_params + self.nic_pnew[nic_op] = new_filled_nic_params + new_nic_mode = new_filled_nic_params[constants.NIC_MODE] + + if new_nic_mode == constants.NIC_MODE_BRIDGED: + nic_bridge = new_filled_nic_params[constants.NIC_LINK] + msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg + if msg: + msg = "Error checking bridges on node %s: %s" % (pnode, msg) if self.force: self.warn.append(msg) else: raise errors.OpPrereqError(msg) + if new_nic_mode == constants.NIC_MODE_ROUTED: + if 'ip' in nic_dict: + nic_ip = nic_dict['ip'] + else: + nic_ip = old_nic_ip + if nic_ip is None: + raise errors.OpPrereqError('Cannot set the nic ip to None' + ' on a routed nic') + if 'mac' in nic_dict: + nic_mac = nic_dict['mac'] + if nic_mac is None: + raise errors.OpPrereqError('Cannot set the nic mac to None') + elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): + # otherwise generate the mac + nic_dict['mac'] = self.cfg.GenerateMAC() + else: + # or validate/reserve the current one + if self.cfg.IsMacInUse(nic_mac): + raise errors.OpPrereqError("MAC address %s already in use" + " in cluster" % nic_mac) # DISK processing if self.op.disks and instance.disk_template == constants.DT_DISKLESS: @@ -5657,9 +6224,11 @@ class LUSetInstanceParams(LogicalUnit): " an instance") ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor]) ins_l = ins_l[pnode] - if ins_l.failed or not isinstance(ins_l.data, list): - raise errors.OpPrereqError("Can't contact node '%s'" % pnode) - if instance.name in ins_l.data: + msg = ins_l.fail_msg + if msg: + raise errors.OpPrereqError("Can't contact node %s: %s" % + (pnode, msg)) + if instance.name in ins_l.payload: raise errors.OpPrereqError("Instance is running, can't remove" " disks.") @@ -5689,6 +6258,7 @@ class LUSetInstanceParams(LogicalUnit): result = [] instance = self.instance + cluster = self.cluster # disk 
changes for disk_op, disk_dict in self.op.disks: if disk_op == constants.DDM_REMOVE: @@ -5697,10 +6267,10 @@ class LUSetInstanceParams(LogicalUnit): device_idx = len(instance.disks) for node, disk in device.ComputeNodeTree(instance.primary_node): self.cfg.SetDiskID(disk, node) - rpc_result = self.rpc.call_blockdev_remove(node, disk) - if rpc_result.failed or not rpc_result.data: - self.proc.LogWarning("Could not remove disk/%d on node %s," - " continuing anyway", device_idx, node) + msg = self.rpc.call_blockdev_remove(node, disk).fail_msg + if msg: + self.LogWarning("Could not remove disk/%d on node %s: %s," + " continuing anyway", device_idx, node, msg) result.append(("disk/%d" % device_idx, "remove")) elif disk_op == constants.DDM_ADD: # add a new disk @@ -5712,13 +6282,12 @@ class LUSetInstanceParams(LogicalUnit): disk_idx_base = len(instance.disks) new_disk = _GenerateDiskTemplate(self, instance.disk_template, - instance, instance.primary_node, + instance.name, instance.primary_node, instance.secondary_nodes, [disk_dict], file_path, file_driver, disk_idx_base)[0] - new_disk.mode = disk_dict['mode'] instance.disks.append(new_disk) info = _GetInstanceInfoText(instance) @@ -5748,29 +6317,30 @@ class LUSetInstanceParams(LogicalUnit): del instance.nics[-1] result.append(("nic.%d" % len(instance.nics), "remove")) elif nic_op == constants.DDM_ADD: - # add a new nic - if 'mac' not in nic_dict: - mac = constants.VALUE_GENERATE - else: - mac = nic_dict['mac'] - if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): - mac = self.cfg.GenerateMAC() - new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None), - bridge=nic_dict.get('bridge', None)) + # mac and bridge should be set, by now + mac = nic_dict['mac'] + ip = nic_dict.get('ip', None) + nicparams = self.nic_pinst[constants.DDM_ADD] + new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams) instance.nics.append(new_nic) result.append(("nic.%d" % (len(instance.nics) - 1), - "add:mac=%s,ip=%s,bridge=%s" % - (new_nic.mac, new_nic.ip, new_nic.bridge))) + "add:mac=%s,ip=%s,mode=%s,link=%s" % + (new_nic.mac, new_nic.ip, + self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE], + self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK] + ))) else: - # change a given nic - for key in 'mac', 'ip', 'bridge': + for key in 'mac', 'ip': if key in nic_dict: setattr(instance.nics[nic_op], key, nic_dict[key]) - result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key])) + if nic_op in self.nic_pnew: + instance.nics[nic_op].nicparams = self.nic_pnew[nic_op] + for key, val in nic_dict.iteritems(): + result.append(("nic.%s/%d" % (key, nic_op), val)) # hvparams changes if self.op.hvparams: - instance.hvparams = self.hv_new + instance.hvparams = self.hv_inst for key, val in self.op.hvparams.iteritems(): result.append(("hv/%s" % key, val)) @@ -5819,10 +6389,10 @@ class LUQueryExports(NoHooksLU): rpcresult = self.rpc.call_export_list(self.nodes) result = {} for node in rpcresult: - if rpcresult[node].failed: + if rpcresult[node].fail_msg: result[node] = False else: - result[node] = rpcresult[node].data + result[node] = rpcresult[node].payload return result @@ -5886,6 +6456,7 @@ class LUExportInstance(LogicalUnit): # This is wrong node name, not a non-locked node raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node) _CheckNodeOnline(self, self.dst_node.name) + _CheckNodeNotDrained(self, self.dst_node.name) # instance disk type verification for disk in self.instance.disks: @@ -5903,10 +6474,8 @@ class LUExportInstance(LogicalUnit): if 
self.op.shutdown: # shutdown the instance, but not the disks result = self.rpc.call_instance_shutdown(src_node, instance) - result.Raise() - if not result.data: - raise errors.OpExecError("Could not shutdown instance %s on node %s" % - (instance.name, src_node)) + result.Raise("Could not shutdown instance %s on" + " node %s" % (instance.name, src_node)) vgname = self.cfg.GetVGName() @@ -5918,24 +6487,25 @@ class LUExportInstance(LogicalUnit): self.cfg.SetDiskID(disk, src_node) try: - for disk in instance.disks: - # new_dev_name will be a snapshot of an lvm leaf of the one we passed - new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk) - if new_dev_name.failed or not new_dev_name.data: - self.LogWarning("Could not snapshot block device %s on node %s", - disk.logical_id[1], src_node) + for idx, disk in enumerate(instance.disks): + # result.payload will be a snapshot of an lvm leaf of the one we passed + result = self.rpc.call_blockdev_snapshot(src_node, disk) + msg = result.fail_msg + if msg: + self.LogWarning("Could not snapshot disk/%s on node %s: %s", + idx, src_node, msg) snap_disks.append(False) else: + disk_id = (vgname, result.payload) new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size, - logical_id=(vgname, new_dev_name.data), - physical_id=(vgname, new_dev_name.data), + logical_id=disk_id, physical_id=disk_id, iv_name=disk.iv_name) snap_disks.append(new_dev) finally: - if self.op.shutdown and instance.status == "up": - result = self.rpc.call_instance_start(src_node, instance, None) - msg = result.RemoteFailMsg() + if self.op.shutdown and instance.admin_up: + result = self.rpc.call_instance_start(src_node, instance, None, None) + msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Could not start instance: %s" % msg) @@ -5947,19 +6517,20 @@ class LUExportInstance(LogicalUnit): if dev: result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name, instance, cluster_name, idx) - if result.failed or not result.data: - self.LogWarning("Could not export block device %s from node %s to" - " node %s", dev.logical_id[1], src_node, - dst_node.name) - result = self.rpc.call_blockdev_remove(src_node, dev) - if result.failed or not result.data: - self.LogWarning("Could not remove snapshot block device %s from node" - " %s", dev.logical_id[1], src_node) + msg = result.fail_msg + if msg: + self.LogWarning("Could not export disk/%s from node %s to" + " node %s: %s", idx, src_node, dst_node.name, msg) + msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg + if msg: + self.LogWarning("Could not remove snapshot for disk/%d from node" + " %s: %s", idx, src_node, msg) result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks) - if result.failed or not result.data: - self.LogWarning("Could not finalize export for instance %s on node %s", - instance.name, dst_node.name) + msg = result.fail_msg + if msg: + self.LogWarning("Could not finalize export for instance %s" + " on node %s: %s", instance.name, dst_node.name, msg) nodelist = self.cfg.GetNodeList() nodelist.remove(dst_node.name) @@ -5967,15 +6538,17 @@ class LUExportInstance(LogicalUnit): # on one-node clusters nodelist will be empty after the removal # if we proceed the backup would be removed because OpQueryExports # substitutes an empty list with the full cluster node list. 
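# The hunks in this area all follow the same conversion: the old
# result.failed / result.data checks become result.fail_msg / result.payload,
# or result.Raise("context message") when the caller wants to abort on error.
# A rough, self-contained sketch of that calling convention follows;
# FakeRpcResult is a hypothetical stand-in, not the real result class from
# Ganeti's rpc module.
class FakeRpcResult(object):
  def __init__(self, payload=None, fail_msg=None):
    self.payload = payload    # decoded remote data on success
    self.fail_msg = fail_msg  # None on success, error text on failure

  def Raise(self, msg):
    # abort-style usage: turn a failed call into an exception with context
    if self.fail_msg:
      raise RuntimeError("%s: %s" % (msg, self.fail_msg))

# abort-style call site (no-op on success):
result = FakeRpcResult(payload=["instance1.example.com"])
result.Raise("Could not list exports on node1")
exports = result.payload          # instead of the old result.data

# warn-and-continue call site:
failed = FakeRpcResult(fail_msg="connection refused")
if failed.fail_msg:
  print("Can't contact node1: %s" % failed.fail_msg)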
+ iname = instance.name if nodelist: exportlist = self.rpc.call_export_list(nodelist) for node in exportlist: - if exportlist[node].failed: + if exportlist[node].fail_msg: continue - if instance.name in exportlist[node].data: - if not self.rpc.call_export_remove(node, instance.name): + if iname in exportlist[node].payload: + msg = self.rpc.call_export_remove(node, iname).fail_msg + if msg: self.LogWarning("Could not remove older export for instance %s" - " on node %s", instance.name, node) + " on node %s: %s", iname, node, msg) class LURemoveExport(NoHooksLU): @@ -6009,19 +6582,21 @@ class LURemoveExport(NoHooksLU): fqdn_warn = True instance_name = self.op.instance_name - exportlist = self.rpc.call_export_list(self.acquired_locks[ - locking.LEVEL_NODE]) + locked_nodes = self.acquired_locks[locking.LEVEL_NODE] + exportlist = self.rpc.call_export_list(locked_nodes) found = False for node in exportlist: - if exportlist[node].failed: - self.LogWarning("Failed to query node %s, continuing" % node) + msg = exportlist[node].fail_msg + if msg: + self.LogWarning("Failed to query node %s (continuing): %s", node, msg) continue - if instance_name in exportlist[node].data: + if instance_name in exportlist[node].payload: found = True result = self.rpc.call_export_remove(node, instance_name) - if result.failed or not result.data: + msg = result.fail_msg + if msg: logging.error("Could not remove export for instance %s" - " on node %s", instance_name, node) + " on node %s: %s", instance_name, node, msg) if fqdn_warn and not found: feedback_fn("Export not found. If trying to remove an export belonging" @@ -6233,13 +6808,8 @@ class LUTestDelay(NoHooksLU): raise errors.OpExecError("Error during master delay test") if self.op.on_nodes: result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration) - if not result: - raise errors.OpExecError("Complete failure from rpc call") for node, node_result in result.items(): - node_result.Raise() - if not node_result.data: - raise errors.OpExecError("Failure during rpc call to node %s," - " result: %s" % (node, node_result.data)) + node_result.Raise("Failure during rpc call to node %s" % node) class IAllocator(object): @@ -6306,10 +6876,10 @@ class IAllocator(object): cluster_info = cfg.GetClusterInfo() # cluster data data = { - "version": 1, + "version": constants.IALLOCATOR_VERSION, "cluster_name": cfg.GetClusterName(), "cluster_tags": list(cluster_info.GetTags()), - "enable_hypervisors": list(cluster_info.enabled_hypervisors), + "enabled_hypervisors": list(cluster_info.enabled_hypervisors), # we don't have job IDs } iinfo = cfg.GetAllInstancesInfo().values() @@ -6328,72 +6898,93 @@ class IAllocator(object): hypervisor_name) node_iinfo = self.lu.rpc.call_all_instances_info(node_list, cluster_info.enabled_hypervisors) - for nname in node_list: + for nname, nresult in node_data.items(): + # first fill in static (config-based) values ninfo = cfg.GetNodeInfo(nname) - node_data[nname].Raise() - if not isinstance(node_data[nname].data, dict): - raise errors.OpExecError("Can't get data for node %s" % nname) - remote_info = node_data[nname].data - for attr in ['memory_total', 'memory_free', 'memory_dom0', - 'vg_size', 'vg_free', 'cpu_total']: - if attr not in remote_info: - raise errors.OpExecError("Node '%s' didn't return attribute '%s'" % - (nname, attr)) - try: - remote_info[attr] = int(remote_info[attr]) - except ValueError, err: - raise errors.OpExecError("Node '%s' returned invalid value for '%s':" - " %s" % (nname, attr, str(err))) - # compute memory used by 
primary instances - i_p_mem = i_p_up_mem = 0 - for iinfo, beinfo in i_list: - if iinfo.primary_node == nname: - i_p_mem += beinfo[constants.BE_MEMORY] - if iinfo.name not in node_iinfo[nname]: - i_used_mem = 0 - else: - i_used_mem = int(node_iinfo[nname][iinfo.name]['memory']) - i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem - remote_info['memory_free'] -= max(0, i_mem_diff) - - if iinfo.status == "up": - i_p_up_mem += beinfo[constants.BE_MEMORY] - - # compute memory used by instances pnr = { "tags": list(ninfo.GetTags()), - "total_memory": remote_info['memory_total'], - "reserved_memory": remote_info['memory_dom0'], - "free_memory": remote_info['memory_free'], - "i_pri_memory": i_p_mem, - "i_pri_up_memory": i_p_up_mem, - "total_disk": remote_info['vg_size'], - "free_disk": remote_info['vg_free'], "primary_ip": ninfo.primary_ip, "secondary_ip": ninfo.secondary_ip, - "total_cpus": remote_info['cpu_total'], "offline": ninfo.offline, + "drained": ninfo.drained, + "master_candidate": ninfo.master_candidate, } + + if not ninfo.offline: + nresult.Raise("Can't get data for node %s" % nname) + node_iinfo[nname].Raise("Can't get node instance info from node %s" % + nname) + remote_info = nresult.payload + for attr in ['memory_total', 'memory_free', 'memory_dom0', + 'vg_size', 'vg_free', 'cpu_total']: + if attr not in remote_info: + raise errors.OpExecError("Node '%s' didn't return attribute" + " '%s'" % (nname, attr)) + if not isinstance(remote_info[attr], int): + raise errors.OpExecError("Node '%s' returned invalid value" + " for '%s': %s" % + (nname, attr, remote_info[attr])) + # compute memory used by primary instances + i_p_mem = i_p_up_mem = 0 + for iinfo, beinfo in i_list: + if iinfo.primary_node == nname: + i_p_mem += beinfo[constants.BE_MEMORY] + if iinfo.name not in node_iinfo[nname].payload: + i_used_mem = 0 + else: + i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory']) + i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem + remote_info['memory_free'] -= max(0, i_mem_diff) + + if iinfo.admin_up: + i_p_up_mem += beinfo[constants.BE_MEMORY] + + # compute memory used by instances + pnr_dyn = { + "total_memory": remote_info['memory_total'], + "reserved_memory": remote_info['memory_dom0'], + "free_memory": remote_info['memory_free'], + "total_disk": remote_info['vg_size'], + "free_disk": remote_info['vg_free'], + "total_cpus": remote_info['cpu_total'], + "i_pri_memory": i_p_mem, + "i_pri_up_memory": i_p_up_mem, + } + pnr.update(pnr_dyn) + node_results[nname] = pnr data["nodes"] = node_results # instance data instance_data = {} for iinfo, beinfo in i_list: - nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge} - for n in iinfo.nics] + nic_data = [] + for nic in iinfo.nics: + filled_params = objects.FillDict( + cluster_info.nicparams[constants.PP_DEFAULT], + nic.nicparams) + nic_dict = {"mac": nic.mac, + "ip": nic.ip, + "mode": filled_params[constants.NIC_MODE], + "link": filled_params[constants.NIC_LINK], + } + if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED: + nic_dict["bridge"] = filled_params[constants.NIC_LINK] + nic_data.append(nic_dict) pir = { "tags": list(iinfo.GetTags()), - "should_run": iinfo.status == "up", + "admin_up": iinfo.admin_up, "vcpus": beinfo[constants.BE_VCPUS], "memory": beinfo[constants.BE_MEMORY], "os": iinfo.os, - "nodes": list(iinfo.all_nodes), + "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes), "nics": nic_data, - "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks], + "disks": [{"size": dsk.size, 
"mode": dsk.mode} for dsk in iinfo.disks], "disk_template": iinfo.disk_template, "hypervisor": iinfo.hypervisor, } + pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template, + pir["disks"]) instance_data[iinfo.name] = pir data["instances"] = instance_data @@ -6411,8 +7002,6 @@ class IAllocator(object): """ data = self.in_data - if len(self.disks) != 2: - raise errors.OpExecError("Only two-disk configurations supported") disk_space = _ComputeDiskSize(self.disk_template, self.disks) @@ -6491,19 +7080,9 @@ class IAllocator(object): data = self.in_text result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text) - result.Raise() - - if not isinstance(result.data, (list, tuple)) or len(result.data) != 4: - raise errors.OpExecError("Invalid result from master iallocator runner") - - rcode, stdout, stderr, fail = result.data + result.Raise("Failure while running the iallocator script") - if rcode == constants.IARUN_NOTFOUND: - raise errors.OpExecError("Can't find allocator '%s'" % name) - elif rcode == constants.IARUN_FAILURE: - raise errors.OpExecError("Instance allocator call failed: %s," - " output: %s" % (fail, stdout+stderr)) - self.out_text = stdout + self.out_text = result.payload if validate: self._ValidateResult() @@ -6569,8 +7148,6 @@ class LUTestAllocator(NoHooksLU): " 'nics' parameter") if not isinstance(self.op.disks, list): raise errors.OpPrereqError("Invalid parameter 'disks'") - if len(self.op.disks) != 2: - raise errors.OpPrereqError("Only two-disk configurations supported") for row in self.op.disks: if (not isinstance(row, dict) or "size" not in row or @@ -6579,7 +7156,7 @@ class LUTestAllocator(NoHooksLU): row["mode"] not in ['r', 'w']): raise errors.OpPrereqError("Invalid contents of the" " 'disks' parameter") - if self.op.hypervisor is None: + if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None: self.op.hypervisor = self.cfg.GetHypervisorType() elif self.op.mode == constants.IALLOCATOR_MODE_RELOC: if not hasattr(self.op, "name"):