X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/c5312a10fae803c1b5fba0cbe9494776ce800fef..f39b695a38d891e17403c1c604d0246a5b2837b1:/lib/cmdlib.py diff --git a/lib/cmdlib.py b/lib/cmdlib.py index 798d80c..41ba0e4 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -40,6 +40,7 @@ import socket import tempfile import shutil import itertools +import operator from ganeti import ssh from ganeti import utils @@ -62,19 +63,6 @@ from ganeti import ht import ganeti.masterd.instance # pylint: disable-msg=W0611 -def _SupportsOob(cfg, node): - """Tells if node supports OOB. - - @type cfg: L{config.ConfigWriter} - @param cfg: The cluster configuration - @type node: L{objects.Node} - @param node: The node - @return: The OOB script if supported or an empty string otherwise - - """ - return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM] - - class ResultWithJobs: """Data container for LU results with jobs. @@ -560,6 +548,26 @@ class _QueryBase: sort_by_name=self.sort_by_name) +def _ShareAll(): + """Returns a dict declaring all lock levels shared. + + """ + return dict.fromkeys(locking.LEVELS, 1) + + +def _SupportsOob(cfg, node): + """Tells if node supports OOB. + + @type cfg: L{config.ConfigWriter} + @param cfg: The cluster configuration + @type node: L{objects.Node} + @param node: The node + @return: The OOB script if supported or an empty string otherwise + + """ + return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM] + + def _GetWantedNodes(lu, nodes): """Returns list of checked and expanded node names. @@ -676,6 +684,19 @@ def _ReleaseLocks(lu, level, names=None, keep=None): assert not lu.glm.is_owned(level), "No locks should be owned" +def _MapInstanceDisksToNodes(instances): + """Creates a map from (node, volume) to instance name. + + @type instances: list of L{objects.Instance} + @rtype: dict; tuple of (node name, volume name) as key, instance name as value + + """ + return dict(((node, vol), inst.name) + for inst in instances + for (node, vols) in inst.MapLVsByNode().items() + for vol in vols) + + def _RunPostHook(lu, node_name): """Runs the post-hook for an opcode on a single node. 
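For illustration only, a minimal standalone sketch of the new _MapInstanceDisksToNodes() helper added above. The helper body is taken verbatim from the patch; FakeInstance and the instance/node/volume names are hypothetical stand-ins for objects.Instance, assuming only that MapLVsByNode() returns a {node name: [volume name, ...]} dict.

class FakeInstance(object):
  """Hypothetical stand-in for objects.Instance (illustration only)."""
  def __init__(self, name, lvs_by_node):
    self.name = name
    self._lvs_by_node = lvs_by_node

  def MapLVsByNode(self):
    # {node name: [volume name, ...]}
    return self._lvs_by_node


def _MapInstanceDisksToNodes(instances):
  # Same comprehension as in the patch: flatten the per-node volume lists
  # into a {(node name, volume name): instance name} mapping
  return dict(((node, vol), inst.name)
              for inst in instances
              for (node, vols) in inst.MapLVsByNode().items()
              for vol in vols)


inst1 = FakeInstance("inst1.example.com",
                     {"node1": ["xenvg/disk0", "xenvg/disk0_meta"]})
inst2 = FakeInstance("inst2.example.com",
                     {"node1": ["xenvg/disk1"], "node2": ["xenvg/disk1"]})

vol2inst = _MapInstanceDisksToNodes([inst1, inst2])
assert vol2inst[("node1", "xenvg/disk0")] == "inst1.example.com"
assert vol2inst[("node2", "xenvg/disk1")] == "inst2.example.com"

Later hunks in this patch use the same mapping to resolve a (node, LV) pair back to its owning instance, in LUGroupVerifyDisks and LUNodeQueryvols.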
@@ -997,20 +1018,20 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None): bep = cluster.FillBE(instance) hvp = cluster.FillHV(instance) args = { - 'name': instance.name, - 'primary_node': instance.primary_node, - 'secondary_nodes': instance.secondary_nodes, - 'os_type': instance.os, - 'status': instance.admin_up, - 'memory': bep[constants.BE_MEMORY], - 'vcpus': bep[constants.BE_VCPUS], - 'nics': _NICListToTuple(lu, instance.nics), - 'disk_template': instance.disk_template, - 'disks': [(disk.size, disk.mode) for disk in instance.disks], - 'bep': bep, - 'hvp': hvp, - 'hypervisor_name': instance.hypervisor, - 'tags': instance.tags, + "name": instance.name, + "primary_node": instance.primary_node, + "secondary_nodes": instance.secondary_nodes, + "os_type": instance.os, + "status": instance.admin_up, + "memory": bep[constants.BE_MEMORY], + "vcpus": bep[constants.BE_VCPUS], + "nics": _NICListToTuple(lu, instance.nics), + "disk_template": instance.disk_template, + "disks": [(disk.size, disk.mode) for disk in instance.disks], + "bep": bep, + "hvp": hvp, + "hypervisor_name": instance.hypervisor, + "tags": instance.tags, } if override: args.update(override) @@ -1076,9 +1097,13 @@ def _CheckOSVariant(os_obj, name): @param name: OS name passed by the user, to check for validity """ + variant = objects.OS.GetVariant(name) if not os_obj.supported_variants: + if variant: + raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'" + " passed)" % (os_obj.name, variant), + errors.ECODE_INVAL) return - variant = objects.OS.GetVariant(name) if not variant: raise errors.OpPrereqError("OS name must include a variant", errors.ECODE_INVAL) @@ -1561,45 +1586,39 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): # This raises errors.OpPrereqError on its own: self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name) - all_node_info = self.cfg.GetAllNodesInfo() - all_inst_info = self.cfg.GetAllInstancesInfo() - - node_names = set(node.name - for node in all_node_info.values() - if node.group == self.group_uuid) - - inst_names = [inst.name - for inst in all_inst_info.values() - if inst.primary_node in node_names] - - # In Exec(), we warn about mirrored instances that have primary and - # secondary living in separate node groups. To fully verify that - # volumes for these instances are healthy, we will need to do an - # extra call to their secondaries. We ensure here those nodes will - # be locked. 
- for inst in inst_names: - if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR: - node_names.update(all_inst_info[inst].secondary_nodes) + # Get instances in node group; this is unsafe and needs verification later + inst_names = self.cfg.GetNodeGroupInstances(self.group_uuid) self.needed_locks = { - locking.LEVEL_NODEGROUP: [self.group_uuid], - locking.LEVEL_NODE: list(node_names), locking.LEVEL_INSTANCE: inst_names, - } + locking.LEVEL_NODEGROUP: [self.group_uuid], + locking.LEVEL_NODE: [], + } - self.share_locks = dict.fromkeys(locking.LEVELS, 1) + self.share_locks = _ShareAll() - def CheckPrereq(self): - self.all_node_info = self.cfg.GetAllNodesInfo() - self.all_inst_info = self.cfg.GetAllInstancesInfo() + def DeclareLocks(self, level): + if level == locking.LEVEL_NODE: + # Get members of node group; this is unsafe and needs verification later + nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members) - group_nodes = set(node.name - for node in self.all_node_info.values() - if node.group == self.group_uuid) + all_inst_info = self.cfg.GetAllInstancesInfo() - group_instances = set(inst.name - for inst in self.all_inst_info.values() - if inst.primary_node in group_nodes) + # In Exec(), we warn about mirrored instances that have primary and + # secondary living in separate node groups. To fully verify that + # volumes for these instances are healthy, we will need to do an + # extra call to their secondaries. We ensure here those nodes will + # be locked. + for inst in self.glm.list_owned(locking.LEVEL_INSTANCE): + # Important: access only the instances whose lock is owned + if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR: + nodes.update(all_inst_info[inst].secondary_nodes) + + self.needed_locks[locking.LEVEL_NODE] = nodes + + def CheckPrereq(self): + group_nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members) + group_instances = self.cfg.GetNodeGroupInstances(self.group_uuid) unlocked_nodes = \ group_nodes.difference(self.glm.list_owned(locking.LEVEL_NODE)) @@ -1608,13 +1627,16 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): group_instances.difference(self.glm.list_owned(locking.LEVEL_INSTANCE)) if unlocked_nodes: - raise errors.OpPrereqError("missing lock for nodes: %s" % + raise errors.OpPrereqError("Missing lock for nodes: %s" % utils.CommaJoin(unlocked_nodes)) if unlocked_instances: - raise errors.OpPrereqError("missing lock for instances: %s" % + raise errors.OpPrereqError("Missing lock for instances: %s" % utils.CommaJoin(unlocked_instances)) + self.all_node_info = self.cfg.GetAllNodesInfo() + self.all_inst_info = self.cfg.GetAllInstancesInfo() + self.my_node_names = utils.NiceSort(group_nodes) self.my_inst_names = utils.NiceSort(group_instances) @@ -1966,7 +1988,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): @param all_nvinfo: RPC results """ - node_names = frozenset(node.name for node in nodeinfo) + node_names = frozenset(node.name for node in nodeinfo if not node.offline) assert master_node in node_names assert (len(files_all | files_all_opt | files_mc | files_vm) == @@ -1985,6 +2007,9 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): fileinfo = dict((filename, {}) for filename in file2nodefn.keys()) for node in nodeinfo: + if node.offline: + continue + nresult = all_nvinfo[node.name] if nresult.fail_msg or not nresult.payload: @@ -2019,8 +2044,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): # All or no nodes errorif(missing_file and missing_file != node_names, cls.ECLUSTERFILECHECK, None, - 
"File %s is optional, but it must exist on all or no nodes (not" - " found on %s)", + "File %s is optional, but it must exist on all or no" + " nodes (not found on %s)", filename, utils.CommaJoin(utils.NiceSort(missing_file))) else: errorif(missing_file, cls.ECLUSTERFILECHECK, None, @@ -2422,8 +2447,6 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): """Build hooks nodes. """ - assert self.my_node_names, ("Node list not gathered," - " has CheckPrereq been executed?") return ([], self.my_node_names) def Exec(self, feedback_fn): @@ -2431,6 +2454,12 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): """ # This method has too many local variables. pylint: disable-msg=R0914 + + if not self.my_node_names: + # empty node group + feedback_fn("* Empty node group, skipping verification") + return True + self.bad = False _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 verbose = self.op.verbose @@ -2567,6 +2596,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): all_nvinfo = self.rpc.call_node_verify(self.my_node_names, node_verify_param, self.cfg.GetClusterName()) + nvinfo_endtime = time.time() + if self.extra_lv_nodes and vg_name is not None: extra_lv_nvinfo = \ self.rpc.call_node_verify(self.extra_lv_nodes, @@ -2574,7 +2605,6 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): self.cfg.GetClusterName()) else: extra_lv_nvinfo = {} - nvinfo_endtime = time.time() all_drbd_map = self.cfg.ComputeDRBDMap() @@ -2810,9 +2840,12 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): and hook results """ - # We only really run POST phase hooks, and are only interested in - # their results - if phase == constants.HOOKS_PHASE_POST: + # We only really run POST phase hooks, only for non-empty groups, + # and are only interested in their results + if not self.my_node_names: + # empty node group + pass + elif phase == constants.HOOKS_PHASE_POST: # Used to change hooks' output to proper indentation feedback_fn("* Hooks Results") assert hooks_results, "invalid result from hooks" @@ -2834,11 +2867,11 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): self._ErrorIf(test, self.ENODEHOOKS, node_name, "Script %s failed, output:", script) if test: - output = self._HOOKS_INDENT_RE.sub(' ', output) + output = self._HOOKS_INDENT_RE.sub(" ", output) feedback_fn("%s" % output) lu_result = 0 - return lu_result + return lu_result class LUClusterVerifyDisks(NoHooksLU): @@ -2848,11 +2881,109 @@ class LUClusterVerifyDisks(NoHooksLU): REQ_BGL = False def ExpandNames(self): + self.share_locks = _ShareAll() self.needed_locks = { - locking.LEVEL_NODE: locking.ALL_SET, - locking.LEVEL_INSTANCE: locking.ALL_SET, - } - self.share_locks = dict.fromkeys(locking.LEVELS, 1) + locking.LEVEL_NODEGROUP: locking.ALL_SET, + } + + def Exec(self, feedback_fn): + group_names = self.glm.list_owned(locking.LEVEL_NODEGROUP) + + # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group + return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)] + for group in group_names]) + + +class LUGroupVerifyDisks(NoHooksLU): + """Verifies the status of all disks in a node group. 
+ + """ + REQ_BGL = False + + def ExpandNames(self): + # Raises errors.OpPrereqError on its own if group can't be found + self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name) + + self.share_locks = _ShareAll() + self.needed_locks = { + locking.LEVEL_INSTANCE: [], + locking.LEVEL_NODEGROUP: [], + locking.LEVEL_NODE: [], + } + + def DeclareLocks(self, level): + if level == locking.LEVEL_INSTANCE: + assert not self.needed_locks[locking.LEVEL_INSTANCE] + + # Lock instances optimistically, needs verification once node and group + # locks have been acquired + self.needed_locks[locking.LEVEL_INSTANCE] = \ + self.cfg.GetNodeGroupInstances(self.group_uuid) + + elif level == locking.LEVEL_NODEGROUP: + assert not self.needed_locks[locking.LEVEL_NODEGROUP] + + self.needed_locks[locking.LEVEL_NODEGROUP] = \ + set([self.group_uuid] + + # Lock all groups used by instances optimistically; this requires + # going via the node before it's locked, requiring verification + # later on + [group_uuid + for instance_name in + self.glm.list_owned(locking.LEVEL_INSTANCE) + for group_uuid in + self.cfg.GetInstanceNodeGroups(instance_name)]) + + elif level == locking.LEVEL_NODE: + # This will only lock the nodes in the group to be verified which contain + # actual instances + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + self._LockInstancesNodes() + + # Lock all nodes in group to be verified + assert self.group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP) + member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members + self.needed_locks[locking.LEVEL_NODE].extend(member_nodes) + + def CheckPrereq(self): + owned_instances = frozenset(self.glm.list_owned(locking.LEVEL_INSTANCE)) + owned_groups = frozenset(self.glm.list_owned(locking.LEVEL_NODEGROUP)) + owned_nodes = frozenset(self.glm.list_owned(locking.LEVEL_NODE)) + + assert self.group_uuid in owned_groups + + # Check if locked instances are still correct + wanted_instances = self.cfg.GetNodeGroupInstances(self.group_uuid) + if owned_instances != wanted_instances: + raise errors.OpPrereqError("Instances in node group %s changed since" + " locks were acquired, wanted %s, have %s;" + " retry the operation" % + (self.op.group_name, + utils.CommaJoin(wanted_instances), + utils.CommaJoin(owned_instances)), + errors.ECODE_STATE) + + # Get instance information + self.instances = dict((name, self.cfg.GetInstanceInfo(name)) + for name in owned_instances) + + # Check if node groups for locked instances are still correct + for (instance_name, inst) in self.instances.items(): + assert self.group_uuid in self.cfg.GetInstanceNodeGroups(instance_name), \ + "Instance %s has no node in group %s" % (instance_name, self.group_uuid) + assert owned_nodes.issuperset(inst.all_nodes), \ + "Instance %s's nodes changed while we kept the lock" % instance_name + + inst_groups = self.cfg.GetInstanceNodeGroups(instance_name) + if not owned_groups.issuperset(inst_groups): + raise errors.OpPrereqError("Instance %s's node groups changed since" + " locks were acquired, current groups are" + " are '%s', owning groups '%s'; retry the" + " operation" % + (instance_name, + utils.CommaJoin(inst_groups), + utils.CommaJoin(owned_groups)), + errors.ECODE_STATE) def Exec(self, feedback_fn): """Verify integrity of cluster disks. 
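The DeclareLocks/CheckPrereq pair above follows the optimistic-locking pattern used throughout this patch: the instance list is computed before the instance locks are actually held, so CheckPrereq recomputes it under the locks and aborts if it went stale. A condensed, self-contained sketch of that re-check; _VerifyLockedInstances and the RuntimeError are illustrative only, the LU raises errors.OpPrereqError with errors.ECODE_STATE.

def _VerifyLockedInstances(owned_instances, wanted_instances, group_name):
  # owned_instances: instance names whose locks are actually held
  # wanted_instances: instance names in the group, recomputed under the locks
  if frozenset(owned_instances) != frozenset(wanted_instances):
    raise RuntimeError("Instances in node group %s changed since locks were"
                       " acquired, wanted %s, have %s; retry the operation" %
                       (group_name,
                        ", ".join(sorted(wanted_instances)),
                        ", ".join(sorted(owned_instances))))

_VerifyLockedInstances(["inst1"], ["inst1"], "group1")   # consistent, passes
try:
  _VerifyLockedInstances(["inst1"], ["inst1", "inst2"], "group1")
except RuntimeError:
  pass  # stale lock set; the caller is asked to retry the opcode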
@@ -2863,50 +2994,41 @@ class LUClusterVerifyDisks(NoHooksLU): missing volumes """ - result = res_nodes, res_instances, res_missing = {}, [], {} + res_nodes = {} + res_instances = set() + res_missing = {} - nodes = utils.NiceSort(self.cfg.GetVmCapableNodeList()) - instances = self.cfg.GetAllInstancesInfo().values() + nv_dict = _MapInstanceDisksToNodes([inst + for inst in self.instances.values() + if inst.admin_up]) - nv_dict = {} - for inst in instances: - inst_lvs = {} - if not inst.admin_up: - continue - inst.MapLVsByNode(inst_lvs) - # transform { iname: {node: [vol,],},} to {(node, vol): iname} - for node, vol_list in inst_lvs.iteritems(): - for vol in vol_list: - nv_dict[(node, vol)] = inst - - if not nv_dict: - return result - - node_lvs = self.rpc.call_lv_list(nodes, []) - for node, node_res in node_lvs.items(): - if node_res.offline: - continue - msg = node_res.fail_msg - if msg: - logging.warning("Error enumerating LVs on node %s: %s", node, msg) - res_nodes[node] = msg - continue + if nv_dict: + nodes = utils.NiceSort(set(self.glm.list_owned(locking.LEVEL_NODE)) & + set(self.cfg.GetVmCapableNodeList())) - lvs = node_res.payload - for lv_name, (_, _, lv_online) in lvs.items(): - inst = nv_dict.pop((node, lv_name), None) - if (not lv_online and inst is not None - and inst.name not in res_instances): - res_instances.append(inst.name) + node_lvs = self.rpc.call_lv_list(nodes, []) - # any leftover items in nv_dict are missing LVs, let's arrange the - # data better - for key, inst in nv_dict.iteritems(): - if inst.name not in res_missing: - res_missing[inst.name] = [] - res_missing[inst.name].append(key) + for (node, node_res) in node_lvs.items(): + if node_res.offline: + continue - return result + msg = node_res.fail_msg + if msg: + logging.warning("Error enumerating LVs on node %s: %s", node, msg) + res_nodes[node] = msg + continue + + for lv_name, (_, _, lv_online) in node_res.payload.items(): + inst = nv_dict.pop((node, lv_name), None) + if not (lv_online or inst is None): + res_instances.add(inst) + + # any leftover items in nv_dict are missing LVs, let's arrange the data + # better + for key, inst in nv_dict.iteritems(): + res_missing.setdefault(inst, []).append(key) + + return (res_nodes, list(res_instances), res_missing) class LUClusterRepairDiskSizes(NoHooksLU): @@ -2929,7 +3051,7 @@ class LUClusterRepairDiskSizes(NoHooksLU): locking.LEVEL_NODE: locking.ALL_SET, locking.LEVEL_INSTANCE: locking.ALL_SET, } - self.share_locks = dict.fromkeys(locking.LEVELS, 1) + self.share_locks = _ShareAll() def DeclareLocks(self, level): if level == locking.LEVEL_NODE and self.wanted_names is not None: @@ -4247,10 +4369,8 @@ class LUNodeQueryvols(NoHooksLU): nodenames = self.glm.list_owned(locking.LEVEL_NODE) volumes = self.rpc.call_node_volumes(nodenames) - ilist = [self.cfg.GetInstanceInfo(iname) for iname - in self.cfg.GetInstanceList()] - - lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist]) + ilist = self.cfg.GetAllInstancesInfo() + vol2inst = _MapInstanceDisksToNodes(ilist.values()) output = [] for node in nodenames: @@ -4262,8 +4382,8 @@ class LUNodeQueryvols(NoHooksLU): self.LogWarning("Can't compute volume data on node %s: %s", node, msg) continue - node_vols = nresult.payload[:] - node_vols.sort(key=lambda vol: vol['dev']) + node_vols = sorted(nresult.payload, + key=operator.itemgetter("dev")) for vol in node_vols: node_output = [] @@ -4271,22 +4391,15 @@ class LUNodeQueryvols(NoHooksLU): if field == "node": val = node elif field == "phys": - val = vol['dev'] + val = 
vol["dev"] elif field == "vg": - val = vol['vg'] + val = vol["vg"] elif field == "name": - val = vol['name'] + val = vol["name"] elif field == "size": - val = int(float(vol['size'])) + val = int(float(vol["size"])) elif field == "instance": - for inst in ilist: - if node not in lv_by_node[inst]: - continue - if vol['name'] in lv_by_node[inst][node]: - val = inst.name - break - else: - val = '-' + val = vol2inst.get((node, vol["vg"] + "/" + vol["name"]), "-") else: raise errors.ParameterError(field) node_output.append(str(val)) @@ -5525,7 +5638,7 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name): nodeinfo = lu.rpc.call_node_info([node], None, hypervisor_name) nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True, ecode=errors.ECODE_ENVIRON) - free_mem = nodeinfo[node].payload.get('memory_free', None) + free_mem = nodeinfo[node].payload.get("memory_free", None) if not isinstance(free_mem, int): raise errors.OpPrereqError("Can't compute free memory on node %s, result" " was '%s'" % (node, free_mem), @@ -5700,7 +5813,8 @@ class LUInstanceStartup(LogicalUnit): _StartInstanceDisks(self, instance, force) result = self.rpc.call_instance_start(node_current, instance, - self.op.hvparams, self.op.beparams) + self.op.hvparams, self.op.beparams, + self.op.startup_paused) msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) @@ -5790,7 +5904,8 @@ class LUInstanceReboot(LogicalUnit): self.LogInfo("Instance %s was already stopped, starting now", instance.name) _StartInstanceDisks(self, instance, ignore_secondaries) - result = self.rpc.call_instance_start(node_current, instance, None, None) + result = self.rpc.call_instance_start(node_current, instance, + None, None, False) msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) @@ -6055,31 +6170,44 @@ class LUInstanceRecreateDisks(LogicalUnit): """Recreate the disks. 
""" - # change primary node, if needed - if self.op.nodes: - self.instance.primary_node = self.op.nodes[0] - self.LogWarning("Changing the instance's nodes, you will have to" - " remove any disks left on the older nodes manually") + instance = self.instance to_skip = [] - for idx, disk in enumerate(self.instance.disks): + mods = [] # keeps track of needed logical_id changes + + for idx, disk in enumerate(instance.disks): if idx not in self.op.disks: # disk idx has not been passed in to_skip.append(idx) continue # update secondaries for disks, if needed if self.op.nodes: if disk.dev_type == constants.LD_DRBD8: - # need to update the nodes + # need to update the nodes and minors assert len(self.op.nodes) == 2 - logical_id = list(disk.logical_id) - logical_id[0] = self.op.nodes[0] - logical_id[1] = self.op.nodes[1] - disk.logical_id = tuple(logical_id) + assert len(disk.logical_id) == 6 # otherwise disk internals + # have changed + (_, _, old_port, _, _, old_secret) = disk.logical_id + new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name) + new_id = (self.op.nodes[0], self.op.nodes[1], old_port, + new_minors[0], new_minors[1], old_secret) + assert len(disk.logical_id) == len(new_id) + mods.append((idx, new_id)) + + # now that we have passed all asserts above, we can apply the mods + # in a single run (to avoid partial changes) + for idx, new_id in mods: + instance.disks[idx].logical_id = new_id + + # change primary node, if needed + if self.op.nodes: + instance.primary_node = self.op.nodes[0] + self.LogWarning("Changing the instance's nodes, you will have to" + " remove any disks left on the older nodes manually") if self.op.nodes: - self.cfg.Update(self.instance, feedback_fn) + self.cfg.Update(instance, feedback_fn) - _CreateDisks(self, self.instance, to_skip=to_skip) + _CreateDisks(self, instance, to_skip=to_skip) class LUInstanceRename(LogicalUnit): @@ -6160,7 +6288,7 @@ class LUInstanceRename(LogicalUnit): old_name = inst.name rename_file_storage = False - if (inst.disk_template in (constants.DT_FILE, constants.DT_SHARED_FILE) and + if (inst.disk_template in constants.DTS_FILEBASED and self.op.new_name != inst.name): old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1]) rename_file_storage = True @@ -6641,7 +6769,8 @@ class LUInstanceMove(LogicalUnit): _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Can't activate the instance's disks") - result = self.rpc.call_instance_start(target_node, instance, None, None) + result = self.rpc.call_instance_start(target_node, instance, + None, None, False) msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) @@ -6663,7 +6792,7 @@ class LUNodeMigrate(LogicalUnit): def ExpandNames(self): self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) - self.share_locks = dict.fromkeys(locking.LEVELS, 1) + self.share_locks = _ShareAll() self.needed_locks = { locking.LEVEL_NODE: [self.op.node_name], } @@ -7182,8 +7311,12 @@ class TLMigrateInstance(Tasklet): self.feedback_fn("* checking disk consistency between source and target") for dev in instance.disks: # for drbd, these are drbd over lvm - if not _CheckDiskConsistency(self, dev, target_node, False): - if not self.ignore_consistency: + if not _CheckDiskConsistency(self.lu, dev, target_node, False): + if primary_node.offline: + self.feedback_fn("Node %s is offline, ignoring degraded disk %s on" + " target node %s" % + (primary_node.name, dev.iv_name, target_node)) + elif not self.ignore_consistency: raise errors.OpExecError("Disk %s is 
degraded on target node," " aborting failover" % dev.iv_name) else: @@ -7209,8 +7342,8 @@ class TLMigrateInstance(Tasklet): (instance.name, source_node, msg)) self.feedback_fn("* deactivating the instance's disks on source node") - if not _ShutdownInstanceDisks(self, instance, ignore_primary=True): - raise errors.OpExecError("Can't shut down the instance's disks.") + if not _ShutdownInstanceDisks(self.lu, instance, ignore_primary=True): + raise errors.OpExecError("Can't shut down the instance's disks") instance.primary_node = target_node # distribute new instance config to the other nodes @@ -7218,21 +7351,24 @@ class TLMigrateInstance(Tasklet): # Only start the instance if it's marked as up if instance.admin_up: - self.feedback_fn("* activating the instance's disks on target node") + self.feedback_fn("* activating the instance's disks on target node %s" % + target_node) logging.info("Starting instance %s on node %s", instance.name, target_node) - disks_ok, _ = _AssembleInstanceDisks(self, instance, + disks_ok, _ = _AssembleInstanceDisks(self.lu, instance, ignore_secondaries=True) if not disks_ok: - _ShutdownInstanceDisks(self, instance) + _ShutdownInstanceDisks(self.lu, instance) raise errors.OpExecError("Can't activate the instance's disks") - self.feedback_fn("* starting the instance on the target node") - result = self.rpc.call_instance_start(target_node, instance, None, None) + self.feedback_fn("* starting the instance on the target node %s" % + target_node) + result = self.rpc.call_instance_start(target_node, instance, None, None, + False) msg = result.fail_msg if msg: - _ShutdownInstanceDisks(self, instance) + _ShutdownInstanceDisks(self.lu, instance) raise errors.OpExecError("Could not start instance %s on node %s: %s" % (instance.name, target_node, msg)) @@ -7593,7 +7729,7 @@ def _CreateDisks(lu, instance, to_skip=None, target_node=None): pnode = target_node all_nodes = [pnode] - if instance.disk_template in (constants.DT_FILE, constants.DT_SHARED_FILE): + if instance.disk_template in constants.DTS_FILEBASED: file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir) @@ -7866,9 +8002,10 @@ class LUInstanceCreate(LogicalUnit): raise errors.OpPrereqError("Invalid file driver name '%s'" % self.op.file_driver, errors.ECODE_INVAL) - if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir): - raise errors.OpPrereqError("File storage directory path not absolute", - errors.ECODE_INVAL) + if self.op.disk_template == constants.DT_FILE: + opcodes.RequireFileStorage() + elif self.op.disk_template == constants.DT_SHARED_FILE: + opcodes.RequireSharedFileStorage() ### Node/iallocator related checks _CheckIAllocatorOrNode(self, "iallocator", "pnode") @@ -8227,10 +8364,40 @@ class LUInstanceCreate(LogicalUnit): if name in os_defs and os_defs[name] == self.op.osparams[name]: del self.op.osparams[name] + def _CalculateFileStorageDir(self): + """Calculate final instance file storage dir. 
+ + """ + # file storage dir calculation/check + self.instance_file_storage_dir = None + if self.op.disk_template in constants.DTS_FILEBASED: + # build the full file storage dir path + joinargs = [] + + if self.op.disk_template == constants.DT_SHARED_FILE: + get_fsd_fn = self.cfg.GetSharedFileStorageDir + else: + get_fsd_fn = self.cfg.GetFileStorageDir + + cfg_storagedir = get_fsd_fn() + if not cfg_storagedir: + raise errors.OpPrereqError("Cluster file storage dir not defined") + joinargs.append(cfg_storagedir) + + if self.op.file_storage_dir is not None: + joinargs.append(self.op.file_storage_dir) + + joinargs.append(self.op.instance_name) + + # pylint: disable-msg=W0142 + self.instance_file_storage_dir = utils.PathJoin(*joinargs) + def CheckPrereq(self): """Check prerequisites. """ + self._CalculateFileStorageDir() + if self.op.mode == constants.INSTANCE_IMPORT: export_info = self._ReadExportInfo() self._ReadExportParams(export_info) @@ -8377,7 +8544,7 @@ class LUInstanceCreate(LogicalUnit): disk_images = [] for idx in range(export_disks): - option = 'disk%d_dump' % idx + option = "disk%d_dump" % idx if export_info.has_option(constants.INISECT_INS, option): # FIXME: are the old os-es, disk sizes, etc. useful? export_name = export_info.get(constants.INISECT_INS, option) @@ -8388,9 +8555,9 @@ class LUInstanceCreate(LogicalUnit): self.src_images = disk_images - old_name = export_info.get(constants.INISECT_INS, 'name') + old_name = export_info.get(constants.INISECT_INS, "name") try: - exp_nic_count = export_info.getint(constants.INISECT_INS, 'nic_count') + exp_nic_count = export_info.getint(constants.INISECT_INS, "nic_count") except (TypeError, ValueError), err: raise errors.OpPrereqError("Invalid export file, nic_count is not" " an integer: %s" % str(err), @@ -8398,7 +8565,7 @@ class LUInstanceCreate(LogicalUnit): if self.op.instance_name == old_name: for idx, nic in enumerate(self.nics): if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx: - nic_mac_ini = 'nic%d_mac' % idx + nic_mac_ini = "nic%d_mac" % idx nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini) # ENDIF: self.op.mode == constants.INSTANCE_IMPORT @@ -8562,30 +8729,12 @@ class LUInstanceCreate(LogicalUnit): else: network_port = None - if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE: - # this is needed because os.path.join does not accept None arguments - if self.op.file_storage_dir is None: - string_file_storage_dir = "" - else: - string_file_storage_dir = self.op.file_storage_dir - - # build the full file storage dir path - if self.op.disk_template == constants.DT_SHARED_FILE: - get_fsd_fn = self.cfg.GetSharedFileStorageDir - else: - get_fsd_fn = self.cfg.GetFileStorageDir - - file_storage_dir = utils.PathJoin(get_fsd_fn(), - string_file_storage_dir, instance) - else: - file_storage_dir = "" - disks = _GenerateDiskTemplate(self, self.op.disk_template, instance, pnode_name, self.secondaries, self.disks, - file_storage_dir, + self.instance_file_storage_dir, self.op.file_driver, 0, feedback_fn) @@ -8753,7 +8902,8 @@ class LUInstanceCreate(LogicalUnit): self.cfg.Update(iobj, feedback_fn) logging.info("Starting instance %s on node %s", instance, pnode_name) feedback_fn("* starting instance...") - result = self.rpc.call_instance_start(pnode_name, iobj, None, None) + result = self.rpc.call_instance_start(pnode_name, iobj, + None, None, False) result.Raise("Could not start instance") return list(iobj.all_nodes) @@ -9319,6 +9469,12 @@ class TLReplaceDisks(Tasklet): (node_name, 
self.instance.name)) def _CreateNewStorage(self, node_name): + """Create new storage on the primary or secondary node. + + This is only used for same-node replaces, not for changing the + secondary node, hence we don't want to modify the existing disk. + + """ iv_names = {} for idx, dev in enumerate(self.instance.disks): @@ -9340,7 +9496,7 @@ class TLReplaceDisks(Tasklet): logical_id=(vg_meta, names[1])) new_lvs = [lv_data, lv_meta] - old_lvs = dev.children + old_lvs = [child.Copy() for child in dev.children] iv_names[dev.iv_name] = (dev, old_lvs, new_lvs) # we pass force_create=True to force the LVM creation @@ -9378,7 +9534,7 @@ class TLReplaceDisks(Tasklet): self.lu.LogWarning("Can't remove old LV: %s" % msg, hint="remove unused LVs manually") - def _ExecDrbd8DiskOnly(self, feedback_fn): + def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable-msg=W0613 """Replace a disk on the primary or secondary for DRBD 8. The algorithm for replace is quite complicated: @@ -9461,10 +9617,14 @@ class TLReplaceDisks(Tasklet): rename_new_to_old) result.Raise("Can't rename new LVs on node %s" % self.target_node) + # Intermediate steps of in memory modifications for old, new in zip(old_lvs, new_lvs): new.logical_id = old.logical_id self.cfg.SetDiskID(new, self.target_node) + # We need to modify old_lvs so that removal later removes the + # right LVs, not the newly added ones; note that old_lvs is a + # copy here for disk in old_lvs: disk.logical_id = ren_fn(disk, temp_suffix) self.cfg.SetDiskID(disk, self.target_node) @@ -9484,10 +9644,6 @@ class TLReplaceDisks(Tasklet): "volumes")) raise errors.OpExecError("Can't add local storage to drbd: %s" % msg) - dev.children = new_lvs - - self.cfg.Update(self.instance, feedback_fn) - cstep = 5 if self.early_release: self.lu.LogStep(cstep, steps_total, "Removing old storage") @@ -9736,8 +9892,8 @@ class LURepairNodeStorage(NoHooksLU): (self.op.name, self.op.node_name)) -class LUNodeEvacStrategy(NoHooksLU): - """Computes the node evacuation strategy. +class LUNodeEvacuate(NoHooksLU): + """Evacuates instances off a list of nodes. 
""" REQ_BGL = False @@ -9746,41 +9902,213 @@ class LUNodeEvacStrategy(NoHooksLU): _CheckIAllocatorOrNode(self, "iallocator", "remote_node") def ExpandNames(self): - self.op.nodes = _GetWantedNodes(self, self.op.nodes) - self.needed_locks = locks = {} + self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) + + if self.op.remote_node is not None: + self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node) + assert self.op.remote_node + + if self.op.remote_node == self.op.node_name: + raise errors.OpPrereqError("Can not use evacuated node as a new" + " secondary node", errors.ECODE_INVAL) + + if self.op.mode != constants.IALLOCATOR_NEVAC_SEC: + raise errors.OpPrereqError("Without the use of an iallocator only" + " secondary instances can be evacuated", + errors.ECODE_INVAL) + + # Declare locks + self.share_locks = _ShareAll() + self.needed_locks = { + locking.LEVEL_INSTANCE: [], + locking.LEVEL_NODEGROUP: [], + locking.LEVEL_NODE: [], + } + if self.op.remote_node is None: - locks[locking.LEVEL_NODE] = locking.ALL_SET + # Iallocator will choose any node(s) in the same group + group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_name]) else: - self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node) - locks[locking.LEVEL_NODE] = self.op.nodes + [self.op.remote_node] + group_nodes = frozenset([self.op.remote_node]) - def Exec(self, feedback_fn): - instances = [] - for node in self.op.nodes: - instances.extend(_GetNodeSecondaryInstances(self.cfg, node)) - if not instances: - return [] + # Determine nodes to be locked + self.lock_nodes = set([self.op.node_name]) | group_nodes + + def _DetermineInstances(self): + """Builds list of instances to operate on. + + """ + assert self.op.mode in constants.IALLOCATOR_NEVAC_MODES + + if self.op.mode == constants.IALLOCATOR_NEVAC_PRI: + # Primary instances only + inst_fn = _GetNodePrimaryInstances + assert self.op.remote_node is None, \ + "Evacuating primary instances requires iallocator" + elif self.op.mode == constants.IALLOCATOR_NEVAC_SEC: + # Secondary instances only + inst_fn = _GetNodeSecondaryInstances + else: + # All instances + assert self.op.mode == constants.IALLOCATOR_NEVAC_ALL + inst_fn = _GetNodeInstances + + return inst_fn(self.cfg, self.op.node_name) + + def DeclareLocks(self, level): + if level == locking.LEVEL_INSTANCE: + # Lock instances optimistically, needs verification once node and group + # locks have been acquired + self.needed_locks[locking.LEVEL_INSTANCE] = \ + set(i.name for i in self._DetermineInstances()) + + elif level == locking.LEVEL_NODEGROUP: + # Lock node groups optimistically, needs verification once nodes have + # been acquired + self.needed_locks[locking.LEVEL_NODEGROUP] = \ + self.cfg.GetNodeGroupsFromNodes(self.lock_nodes) + + elif level == locking.LEVEL_NODE: + self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes + + def CheckPrereq(self): + # Verify locks + owned_instances = self.glm.list_owned(locking.LEVEL_INSTANCE) + owned_nodes = self.glm.list_owned(locking.LEVEL_NODE) + owned_groups = self.glm.list_owned(locking.LEVEL_NODEGROUP) + + assert owned_nodes == self.lock_nodes + + wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes) + if owned_groups != wanted_groups: + raise errors.OpExecError("Node groups changed since locks were acquired," + " current groups are '%s', used to be '%s'" % + (utils.CommaJoin(wanted_groups), + utils.CommaJoin(owned_groups))) + + # Determine affected instances + self.instances = self._DetermineInstances() + self.instance_names = 
[i.name for i in self.instances] + + if set(self.instance_names) != owned_instances: + raise errors.OpExecError("Instances on node '%s' changed since locks" + " were acquired, current instances are '%s'," + " used to be '%s'" % + (self.op.node_name, + utils.CommaJoin(self.instance_names), + utils.CommaJoin(owned_instances))) + + if self.instance_names: + self.LogInfo("Evacuating instances from node '%s': %s", + self.op.node_name, + utils.CommaJoin(utils.NiceSort(self.instance_names))) + else: + self.LogInfo("No instances to evacuate from node '%s'", + self.op.node_name) if self.op.remote_node is not None: - result = [] - for i in instances: + for i in self.instances: if i.primary_node == self.op.remote_node: raise errors.OpPrereqError("Node %s is the primary node of" " instance %s, cannot use it as" " secondary" % (self.op.remote_node, i.name), errors.ECODE_INVAL) - result.append([i.name, self.op.remote_node]) - else: - ial = IAllocator(self.cfg, self.rpc, - mode=constants.IALLOCATOR_MODE_MEVAC, - evac_nodes=self.op.nodes) - ial.Run(self.op.iallocator, validate=True) + + def Exec(self, feedback_fn): + assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None) + + if not self.instance_names: + # No instances to evacuate + jobs = [] + + elif self.op.iallocator is not None: + # TODO: Implement relocation to other group + ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_NODE_EVAC, + evac_mode=self.op.mode, + instances=list(self.instance_names)) + + ial.Run(self.op.iallocator) + if not ial.success: - raise errors.OpExecError("No valid evacuation solution: %s" % ial.info, - errors.ECODE_NORES) - result = ial.result - return result + raise errors.OpPrereqError("Can't compute node evacuation using" + " iallocator '%s': %s" % + (self.op.iallocator, ial.info), + errors.ECODE_NORES) + + jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, True) + + elif self.op.remote_node is not None: + assert self.op.mode == constants.IALLOCATOR_NEVAC_SEC + jobs = [ + [opcodes.OpInstanceReplaceDisks(instance_name=instance_name, + remote_node=self.op.remote_node, + disks=[], + mode=constants.REPLACE_DISK_CHG, + early_release=self.op.early_release)] + for instance_name in self.instance_names + ] + + else: + raise errors.ProgrammerError("No iallocator or remote node") + + return ResultWithJobs(jobs) + + +def _SetOpEarlyRelease(early_release, op): + """Sets C{early_release} flag on opcodes if available. + + """ + try: + op.early_release = early_release + except AttributeError: + assert not isinstance(op, opcodes.OpInstanceReplaceDisks) + + return op + + +def _NodeEvacDest(use_nodes, group, nodes): + """Returns group or nodes depending on caller's choice. + + """ + if use_nodes: + return utils.CommaJoin(nodes) + else: + return group + + +def _LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes): + """Unpacks the result of change-group and node-evacuate iallocator requests. + + Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and + L{constants.IALLOCATOR_MODE_CHG_GROUP}. 
+ + @type lu: L{LogicalUnit} + @param lu: Logical unit instance + @type alloc_result: tuple/list + @param alloc_result: Result from iallocator + @type early_release: bool + @param early_release: Whether to release locks early if possible + @type use_nodes: bool + @param use_nodes: Whether to display node names instead of groups + + """ + (moved, failed, jobs) = alloc_result + + if failed: + lu.LogWarning("Unable to evacuate instances %s", + utils.CommaJoin("%s (%s)" % (name, reason) + for (name, reason) in failed)) + + if moved: + lu.LogInfo("Instances to be moved: %s", + utils.CommaJoin("%s (to %s)" % + (name, _NodeEvacDest(use_nodes, group, nodes)) + for (name, group, nodes) in moved)) + + return [map(compat.partial(_SetOpEarlyRelease, early_release), + map(opcodes.OpCode.LoadOpCode, ops)) + for ops in jobs] class LUInstanceGrowDisk(LogicalUnit): @@ -9916,7 +10244,7 @@ class LUInstanceQueryData(NoHooksLU): self.wanted_names = None if self.op.use_locking: - self.share_locks = dict.fromkeys(locking.LEVELS, 1) + self.share_locks = _ShareAll() if self.wanted_names is None: self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET @@ -9924,7 +10252,6 @@ class LUInstanceQueryData(NoHooksLU): self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names self.needed_locks[locking.LEVEL_NODE] = [] - self.share_locks = dict.fromkeys(locking.LEVELS, 1) self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE def DeclareLocks(self, level): @@ -9983,8 +10310,9 @@ class LUInstanceQueryData(NoHooksLU): dev_sstatus = self._ComputeBlockdevStatus(snode, instance.name, dev) if dev.children: - dev_children = [self._ComputeDiskStatus(instance, snode, child) - for child in dev.children] + dev_children = map(compat.partial(self._ComputeDiskStatus, + instance, snode), + dev.children) else: dev_children = [] @@ -10007,7 +10335,15 @@ class LUInstanceQueryData(NoHooksLU): cluster = self.cfg.GetClusterInfo() for instance in self.wanted_instances: - if not self.op.static: + pnode = self.cfg.GetNodeInfo(instance.primary_node) + + if self.op.static or pnode.offline: + remote_state = None + if pnode.offline: + self.LogWarning("Primary node %s is marked offline, returning static" + " information only for instance %s" % + (pnode.name, instance.name)) + else: remote_info = self.rpc.call_instance_info(instance.primary_node, instance.name, instance.hypervisor) @@ -10017,15 +10353,14 @@ class LUInstanceQueryData(NoHooksLU): remote_state = "up" else: remote_state = "down" - else: - remote_state = None + if instance.admin_up: config_state = "up" else: config_state = "down" - disks = [self._ComputeDiskStatus(instance, None, device) - for device in instance.disks] + disks = map(compat.partial(self._ComputeDiskStatus, instance, None), + instance.disks) result[instance.name] = { "name": instance.name, @@ -10150,13 +10485,13 @@ class LUInstanceSetParams(LogicalUnit): raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip, errors.ECODE_INVAL) - nic_bridge = nic_dict.get('bridge', None) + nic_bridge = nic_dict.get("bridge", None) nic_link = nic_dict.get(constants.INIC_LINK, None) if nic_bridge and nic_link: raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" " at the same time", errors.ECODE_INVAL) elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE: - nic_dict['bridge'] = None + nic_dict["bridge"] = None elif nic_link and nic_link.lower() == constants.VALUE_NONE: nic_dict[constants.INIC_LINK] = None @@ -10199,13 +10534,13 @@ class LUInstanceSetParams(LogicalUnit): """ args = dict() if 
constants.BE_MEMORY in self.be_new: - args['memory'] = self.be_new[constants.BE_MEMORY] + args["memory"] = self.be_new[constants.BE_MEMORY] if constants.BE_VCPUS in self.be_new: - args['vcpus'] = self.be_new[constants.BE_VCPUS] + args["vcpus"] = self.be_new[constants.BE_VCPUS] # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk # information at all. if self.op.nics: - args['nics'] = [] + args["nics"] = [] nic_override = dict(self.op.nics) for idx, nic in enumerate(self.instance.nics): if idx in nic_override: @@ -10226,16 +10561,16 @@ class LUInstanceSetParams(LogicalUnit): nicparams = self.cluster.SimpleFillNIC(nic.nicparams) mode = nicparams[constants.NIC_MODE] link = nicparams[constants.NIC_LINK] - args['nics'].append((ip, mac, mode, link)) + args["nics"].append((ip, mac, mode, link)) if constants.DDM_ADD in nic_override: ip = nic_override[constants.DDM_ADD].get(constants.INIC_IP, None) mac = nic_override[constants.DDM_ADD][constants.INIC_MAC] nicparams = self.nic_pnew[constants.DDM_ADD] mode = nicparams[constants.NIC_MODE] link = nicparams[constants.NIC_LINK] - args['nics'].append((ip, mac, mode, link)) + args["nics"].append((ip, mac, mode, link)) elif constants.DDM_REMOVE in nic_override: - del args['nics'][-1] + del args["nics"][-1] env = _BuildInstanceHookEnvByObject(self, self.instance, override=args) if self.op.disk_template: @@ -10353,7 +10688,7 @@ class LUInstanceSetParams(LogicalUnit): # Assume the primary node is unreachable and go ahead self.warn.append("Can't get info from primary node %s: %s" % (pnode, msg)) - elif not isinstance(pninfo.payload.get('memory_free', None), int): + elif not isinstance(pninfo.payload.get("memory_free", None), int): self.warn.append("Node data from primary node %s doesn't contain" " free memory information" % pnode) elif instance_info.fail_msg: @@ -10361,14 +10696,14 @@ class LUInstanceSetParams(LogicalUnit): instance_info.fail_msg) else: if instance_info.payload: - current_mem = int(instance_info.payload['memory']) + current_mem = int(instance_info.payload["memory"]) else: # Assume instance not running # (there is a slight race condition here, but it's not very probable, # and we have no other way to check) current_mem = 0 miss_mem = (be_new[constants.BE_MEMORY] - current_mem - - pninfo.payload['memory_free']) + pninfo.payload["memory_free"]) if miss_mem > 0: raise errors.OpPrereqError("This change will prevent the instance" " from starting, due to %d MB of memory" @@ -10381,11 +10716,11 @@ class LUInstanceSetParams(LogicalUnit): continue nres.Raise("Can't get info from secondary node %s" % node, prereq=True, ecode=errors.ECODE_STATE) - if not isinstance(nres.payload.get('memory_free', None), int): + if not isinstance(nres.payload.get("memory_free", None), int): raise errors.OpPrereqError("Secondary node %s didn't return free" " memory information" % node, errors.ECODE_STATE) - elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']: + elif be_new[constants.BE_MEMORY] > nres.payload["memory_free"]: raise errors.OpPrereqError("This change will prevent the instance" " from failover to its secondary node" " %s, due to not enough memory" % node, @@ -10421,8 +10756,8 @@ class LUInstanceSetParams(LogicalUnit): for key in constants.NICS_PARAMETERS if key in nic_dict]) - if 'bridge' in nic_dict: - update_params_dict[constants.NIC_LINK] = nic_dict['bridge'] + if "bridge" in nic_dict: + update_params_dict[constants.NIC_LINK] = nic_dict["bridge"] new_nic_params = _GetUpdatedParams(old_nic_params, update_params_dict) @@ 
-10448,12 +10783,12 @@ class LUInstanceSetParams(LogicalUnit): else: nic_ip = old_nic_ip if nic_ip is None: - raise errors.OpPrereqError('Cannot set the nic ip to None' - ' on a routed nic', errors.ECODE_INVAL) + raise errors.OpPrereqError("Cannot set the nic ip to None" + " on a routed nic", errors.ECODE_INVAL) if constants.INIC_MAC in nic_dict: nic_mac = nic_dict[constants.INIC_MAC] if nic_mac is None: - raise errors.OpPrereqError('Cannot set the nic mac to None', + raise errors.OpPrereqError("Cannot set the nic mac to None", errors.ECODE_INVAL) elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE): # otherwise generate the mac @@ -11047,7 +11382,8 @@ class LUBackupExport(LogicalUnit): not self.op.remove_instance): assert not activate_disks feedback_fn("Starting instance %s" % instance.name) - result = self.rpc.call_instance_start(src_node, instance, None, None) + result = self.rpc.call_instance_start(src_node, instance, + None, None, False) msg = result.fail_msg if msg: feedback_fn("Failed to start instance: %s" % msg) @@ -11661,6 +11997,181 @@ class LUGroupRename(LogicalUnit): return self.op.new_name +class LUGroupEvacuate(LogicalUnit): + HPATH = "group-evacuate" + HTYPE = constants.HTYPE_GROUP + REQ_BGL = False + + def ExpandNames(self): + # This raises errors.OpPrereqError on its own: + self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name) + + if self.op.target_groups: + self.req_target_uuids = map(self.cfg.LookupNodeGroup, + self.op.target_groups) + else: + self.req_target_uuids = [] + + if self.group_uuid in self.req_target_uuids: + raise errors.OpPrereqError("Group to be evacuated (%s) can not be used" + " as a target group (targets are %s)" % + (self.group_uuid, + utils.CommaJoin(self.req_target_uuids)), + errors.ECODE_INVAL) + + if not self.op.iallocator: + # Use default iallocator + self.op.iallocator = self.cfg.GetDefaultIAllocator() + + if not self.op.iallocator: + raise errors.OpPrereqError("No iallocator was specified, neither in the" + " opcode nor as a cluster-wide default", + errors.ECODE_INVAL) + + self.share_locks = _ShareAll() + self.needed_locks = { + locking.LEVEL_INSTANCE: [], + locking.LEVEL_NODEGROUP: [], + locking.LEVEL_NODE: [], + } + + def DeclareLocks(self, level): + if level == locking.LEVEL_INSTANCE: + assert not self.needed_locks[locking.LEVEL_INSTANCE] + + # Lock instances optimistically, needs verification once node and group + # locks have been acquired + self.needed_locks[locking.LEVEL_INSTANCE] = \ + self.cfg.GetNodeGroupInstances(self.group_uuid) + + elif level == locking.LEVEL_NODEGROUP: + assert not self.needed_locks[locking.LEVEL_NODEGROUP] + + if self.req_target_uuids: + lock_groups = set([self.group_uuid] + self.req_target_uuids) + + # Lock all groups used by instances optimistically; this requires going + # via the node before it's locked, requiring verification later on + lock_groups.update(group_uuid + for instance_name in + self.glm.list_owned(locking.LEVEL_INSTANCE) + for group_uuid in + self.cfg.GetInstanceNodeGroups(instance_name)) + else: + # No target groups, need to lock all of them + lock_groups = locking.ALL_SET + + self.needed_locks[locking.LEVEL_NODEGROUP] = lock_groups + + elif level == locking.LEVEL_NODE: + # This will only lock the nodes in the group to be evacuated which + # contain actual instances + self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND + self._LockInstancesNodes() + + # Lock all nodes in group to be evacuated + assert self.group_uuid in 
self.glm.list_owned(locking.LEVEL_NODEGROUP) + member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members + self.needed_locks[locking.LEVEL_NODE].extend(member_nodes) + + def CheckPrereq(self): + owned_instances = frozenset(self.glm.list_owned(locking.LEVEL_INSTANCE)) + owned_groups = frozenset(self.glm.list_owned(locking.LEVEL_NODEGROUP)) + owned_nodes = frozenset(self.glm.list_owned(locking.LEVEL_NODE)) + + assert owned_groups.issuperset(self.req_target_uuids) + assert self.group_uuid in owned_groups + + # Check if locked instances are still correct + wanted_instances = self.cfg.GetNodeGroupInstances(self.group_uuid) + if owned_instances != wanted_instances: + raise errors.OpPrereqError("Instances in node group to be evacuated (%s)" + " changed since locks were acquired, wanted" + " %s, have %s; retry the operation" % + (self.group_uuid, + utils.CommaJoin(wanted_instances), + utils.CommaJoin(owned_instances)), + errors.ECODE_STATE) + + # Get instance information + self.instances = dict((name, self.cfg.GetInstanceInfo(name)) + for name in owned_instances) + + # Check if node groups for locked instances are still correct + for instance_name in owned_instances: + inst = self.instances[instance_name] + assert self.group_uuid in self.cfg.GetInstanceNodeGroups(instance_name), \ + "Instance %s has no node in group %s" % (instance_name, self.group_uuid) + assert owned_nodes.issuperset(inst.all_nodes), \ + "Instance %s's nodes changed while we kept the lock" % instance_name + + inst_groups = self.cfg.GetInstanceNodeGroups(instance_name) + if not owned_groups.issuperset(inst_groups): + raise errors.OpPrereqError("Instance %s's node groups changed since" + " locks were acquired, current groups" + " are '%s', owning groups '%s'; retry the" + " operation" % + (instance_name, + utils.CommaJoin(inst_groups), + utils.CommaJoin(owned_groups)), + errors.ECODE_STATE) + + if self.req_target_uuids: + # User requested specific target groups + self.target_uuids = self.req_target_uuids + else: + # All groups except the one to be evacuated are potential targets + self.target_uuids = [group_uuid for group_uuid in owned_groups + if group_uuid != self.group_uuid] + + if not self.target_uuids: + raise errors.OpExecError("There are no possible target groups") + + def BuildHooksEnv(self): + """Build hooks env. + + """ + return { + "GROUP_NAME": self.op.group_name, + "TARGET_GROUPS": " ".join(self.target_uuids), + } + + def BuildHooksNodes(self): + """Build hooks nodes. + + """ + mn = self.cfg.GetMasterNode() + + assert self.group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP) + + run_nodes = [mn] + self.cfg.GetNodeGroup(self.group_uuid).members + + return (run_nodes, run_nodes) + + def Exec(self, feedback_fn): + instances = list(self.glm.list_owned(locking.LEVEL_INSTANCE)) + + assert self.group_uuid not in self.target_uuids + + ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_CHG_GROUP, + instances=instances, target_groups=self.target_uuids) + + ial.Run(self.op.iallocator) + + if not ial.success: + raise errors.OpPrereqError("Can't compute group evacuation using" + " iallocator '%s': %s" % + (self.op.iallocator, ial.info), + errors.ECODE_NORES) + + jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, False) + + self.LogInfo("Iallocator returned %s job(s) for evacuating node group %s", + len(jobs), self.op.group_name) + + return ResultWithJobs(jobs) + + class TagsLU(NoHooksLU): # pylint: disable-msg=W0223 """Generic tags LU. 
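Both LUNodeEvacuate and LUGroupEvacuate above hand the iallocator result to _LoadNodeEvacResult(), which turns the (moved, failed, jobs) triple into job definitions. A simplified standalone sketch of that step; _LoadEvacJobs, FakeReplaceDisksOp and the sample names are hypothetical, functools.partial stands in for compat.partial, and the load_fn argument stands in for opcodes.OpCode.LoadOpCode.

import functools

def _SetOpEarlyRelease(early_release, op):
  # Same idea as in the patch: set the flag only on opcodes that support it
  try:
    op.early_release = early_release
  except AttributeError:
    pass
  return op


class FakeReplaceDisksOp(object):
  """Hypothetical stand-in for opcodes.OpInstanceReplaceDisks."""
  def __init__(self, **kwargs):
    self.__dict__.update(kwargs)


def _LoadEvacJobs(alloc_result, early_release, load_fn):
  # alloc_result is the (moved, failed, jobs) triple returned by the
  # iallocator in node-evacuate/change-group mode
  (moved, failed, jobs) = alloc_result
  for (name, reason) in failed:
    print("Unable to evacuate instance %s (%s)" % (name, reason))
  for (name, group, nodes) in moved:
    print("Instance %s moves to group %s (nodes %s)" %
          (name, group, ", ".join(nodes)))
  set_flag = functools.partial(_SetOpEarlyRelease, early_release)
  return [[set_flag(load_fn(op_data)) for op_data in ops] for ops in jobs]


sample_result = ([("inst1.example.com", "group2", ["node3.example.com"])],
                 [],
                 [[{"instance_name": "inst1.example.com", "disks": []}]])
jobs = _LoadEvacJobs(sample_result, True,
                     lambda data: FakeReplaceDisksOp(**data))
assert jobs[0][0].early_release is True

In the patch, these job lists are what the LUs wrap in ResultWithJobs so they are submitted as separate jobs.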
@@ -11709,7 +12220,7 @@ class LUTagsGet(TagsLU): TagsLU.ExpandNames(self) # Share locks as this is only a read operation - self.share_locks = dict.fromkeys(locking.LEVELS, 1) + self.share_locks = _ShareAll() def Exec(self, feedback_fn): """Returns the tag list. @@ -12035,7 +12546,7 @@ class IAllocator(object): self.name = None self.evac_nodes = None self.instances = None - self.reloc_mode = None + self.evac_mode = None self.target_groups = [] # computed fields self.required_nodes = None @@ -12089,8 +12600,7 @@ class IAllocator(object): hypervisor_name = self.hypervisor elif self.mode == constants.IALLOCATOR_MODE_RELOC: hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor - elif self.mode in (constants.IALLOCATOR_MODE_MEVAC, - constants.IALLOCATOR_MODE_MRELOC): + else: hypervisor_name = cluster_info.enabled_hypervisors[0] node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(), @@ -12168,8 +12678,8 @@ class IAllocator(object): nname) remote_info = nresult.payload - for attr in ['memory_total', 'memory_free', 'memory_dom0', - 'vg_size', 'vg_free', 'cpu_total']: + for attr in ["memory_total", "memory_free", "memory_dom0", + "vg_size", "vg_free", "cpu_total"]: if attr not in remote_info: raise errors.OpExecError("Node '%s' didn't return attribute" " '%s'" % (nname, attr)) @@ -12185,21 +12695,21 @@ class IAllocator(object): if iinfo.name not in node_iinfo[nname].payload: i_used_mem = 0 else: - i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory']) + i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"]) i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem - remote_info['memory_free'] -= max(0, i_mem_diff) + remote_info["memory_free"] -= max(0, i_mem_diff) if iinfo.admin_up: i_p_up_mem += beinfo[constants.BE_MEMORY] # compute memory used by instances pnr_dyn = { - "total_memory": remote_info['memory_total'], - "reserved_memory": remote_info['memory_dom0'], - "free_memory": remote_info['memory_free'], - "total_disk": remote_info['vg_size'], - "free_disk": remote_info['vg_free'], - "total_cpus": remote_info['cpu_total'], + "total_memory": remote_info["memory_total"], + "reserved_memory": remote_info["memory_dom0"], + "free_memory": remote_info["memory_free"], + "total_disk": remote_info["vg_size"], + "free_disk": remote_info["vg_free"], + "total_cpus": remote_info["cpu_total"], "i_pri_memory": i_p_mem, "i_pri_up_memory": i_p_up_mem, } @@ -12325,13 +12835,21 @@ class IAllocator(object): } return request - def _AddMultiRelocate(self): - """Get data for multi-relocate requests. + def _AddNodeEvacuate(self): + """Get data for node-evacuate requests. + + """ + return { + "instances": self.instances, + "evac_mode": self.evac_mode, + } + + def _AddChangeGroup(self): + """Get data for node-evacuate requests. """ return { "instances": self.instances, - "reloc_mode": self.reloc_mode, "target_groups": self.target_groups, } @@ -12357,6 +12875,28 @@ class IAllocator(object): self.in_text = serializer.Dump(self.in_data) _STRING_LIST = ht.TListOf(ht.TString) + _JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, { + # pylint: disable-msg=E1101 + # Class '...' 
has no 'OP_ID' member + "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID, + opcodes.OpInstanceMigrate.OP_ID, + opcodes.OpInstanceReplaceDisks.OP_ID]) + }))) + + _NEVAC_MOVED = \ + ht.TListOf(ht.TAnd(ht.TIsLength(3), + ht.TItems([ht.TNonEmptyString, + ht.TNonEmptyString, + ht.TListOf(ht.TNonEmptyString), + ]))) + _NEVAC_FAILED = \ + ht.TListOf(ht.TAnd(ht.TIsLength(2), + ht.TItems([ht.TNonEmptyString, + ht.TMaybeString, + ]))) + _NEVAC_RESULT = ht.TAnd(ht.TIsLength(3), + ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST])) + _MODE_DATA = { constants.IALLOCATOR_MODE_ALLOC: (_AddNewInstance, @@ -12378,19 +12918,16 @@ class IAllocator(object): constants.IALLOCATOR_MODE_MEVAC: (_AddEvacuateNodes, [("evac_nodes", _STRING_LIST)], ht.TListOf(ht.TAnd(ht.TIsLength(2), _STRING_LIST))), - constants.IALLOCATOR_MODE_MRELOC: - (_AddMultiRelocate, [ + constants.IALLOCATOR_MODE_NODE_EVAC: + (_AddNodeEvacuate, [ + ("instances", _STRING_LIST), + ("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)), + ], _NEVAC_RESULT), + constants.IALLOCATOR_MODE_CHG_GROUP: + (_AddChangeGroup, [ ("instances", _STRING_LIST), - ("reloc_mode", ht.TElemOf(constants.IALLOCATOR_MRELOC_MODES)), ("target_groups", _STRING_LIST), - ], - ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, { - # pylint: disable-msg=E1101 - # Class '...' has no 'OP_ID' member - "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID, - opcodes.OpInstanceMigrate.OP_ID, - opcodes.OpInstanceReplaceDisks.OP_ID]) - })))), + ], _NEVAC_RESULT), } def Run(self, name, validate=True, call_fn=None): @@ -12473,6 +13010,9 @@ class IAllocator(object): else: raise errors.ProgrammerError("Unhandled mode '%s'" % self.mode) + elif self.mode == constants.IALLOCATOR_MODE_NODE_EVAC: + assert self.evac_mode in constants.IALLOCATOR_NEVAC_MODES + self.out_data = rdict @staticmethod @@ -12555,12 +13095,11 @@ class LUTestAllocator(NoHooksLU): if not hasattr(self.op, "evac_nodes"): raise errors.OpPrereqError("Missing attribute 'evac_nodes' on" " opcode input", errors.ECODE_INVAL) - elif self.op.mode == constants.IALLOCATOR_MODE_MRELOC: - if self.op.instances: - self.op.instances = _GetWantedInstances(self, self.op.instances) - else: - raise errors.OpPrereqError("Missing instances to relocate", - errors.ECODE_INVAL) + elif self.op.mode in (constants.IALLOCATOR_MODE_CHG_GROUP, + constants.IALLOCATOR_MODE_NODE_EVAC): + if not self.op.instances: + raise errors.OpPrereqError("Missing instances", errors.ECODE_INVAL) + self.op.instances = _GetWantedInstances(self, self.op.instances) else: raise errors.OpPrereqError("Invalid test allocator mode '%s'" % self.op.mode, errors.ECODE_INVAL) @@ -12600,12 +13139,16 @@ class LUTestAllocator(NoHooksLU): ial = IAllocator(self.cfg, self.rpc, mode=self.op.mode, evac_nodes=self.op.evac_nodes) - elif self.op.mode == constants.IALLOCATOR_MODE_MRELOC: + elif self.op.mode == constants.IALLOCATOR_MODE_CHG_GROUP: ial = IAllocator(self.cfg, self.rpc, mode=self.op.mode, instances=self.op.instances, - reloc_mode=self.op.reloc_mode, target_groups=self.op.target_groups) + elif self.op.mode == constants.IALLOCATOR_MODE_NODE_EVAC: + ial = IAllocator(self.cfg, self.rpc, + mode=self.op.mode, + instances=self.op.instances, + evac_mode=self.op.evac_mode) else: raise errors.ProgrammerError("Uncatched mode %s in" " LUTestAllocator.Exec", self.op.mode)
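For reference, the new node-evacuate iallocator mode introduced above exchanges data of roughly the following shape; all instance, node and group names, the evac_mode string and the extra opcode fields are illustrative values only, chosen to match the _AddNodeEvacuate request builder and the _NEVAC_RESULT result check.

# Request body built by _AddNodeEvacuate() (illustrative values)
sample_nevac_request = {
  "instances": ["inst1.example.com", "inst2.example.com"],
  "evac_mode": "secondary-only",  # one of constants.IALLOCATOR_NEVAC_MODES
}

# Result accepted by _NEVAC_RESULT: (moved, failed, jobs)
sample_nevac_result = (
  # moved: (instance name, target group, list of new nodes)
  [("inst1.example.com", "group2",
    ["node3.example.com", "node4.example.com"])],
  # failed: (instance name, reason); the reason may be None
  [("inst2.example.com", "instance has no secondary node")],
  # jobs: one list of serialized opcodes per job; per _JOB_LIST the OP_ID
  # must belong to OpInstanceFailover, OpInstanceMigrate or
  # OpInstanceReplaceDisks
  [[{"OP_ID": "OP_INSTANCE_REPLACE_DISKS",
     "instance_name": "inst1.example.com",
     "remote_node": "node3.example.com",
     "disks": []}]],
)

assert len(sample_nevac_result) == 3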