X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/51cb158170d8a2c51237717a1e7c2257bf4daefb..052ee11766e7981c8905bfda88a91da05429bba6:/lib/cmdlib.py

diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index b554321..baded30 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2008 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -36,6 +36,9 @@ import platform
 import logging
 import copy
 import OpenSSL
+import socket
+import tempfile
+import shutil
 
 from ganeti import ssh
 from ganeti import utils
@@ -49,6 +52,7 @@ from ganeti import ssconf
 from ganeti import uidpool
 from ganeti import compat
 from ganeti import masterd
+from ganeti import netutils
 
 import ganeti.masterd.instance # pylint: disable-msg=W0611
 
@@ -150,6 +154,13 @@ def _TDict(val):
   return isinstance(val, dict)
 
 
+def _TIsLength(size):
+  """Check if the given container is of the given size.
+
+  """
+  return lambda container: len(container) == size
+
+
 # Combinator types
 def _TAnd(*args):
   """Combine multiple functions using an AND operation.
@@ -169,6 +180,13 @@ def _TOr(*args):
   return fn
 
 
+def _TMap(fn, test):
+  """Checks that a modified version of the argument passes the given test.
+
+  """
+  return lambda val: test(fn(val))
+
+
 # Type aliases
 
 #: a non-empty string
@@ -228,6 +246,13 @@ _PInstanceName = ("instance_name", _NoDefault, _TNonEmptyString)
 #: a required node name (for single-node LUs)
 _PNodeName = ("node_name", _NoDefault, _TNonEmptyString)
 
+#: the migration type (live/non-live)
+_PMigrationMode = ("mode", None, _TOr(_TNone,
+                                      _TElemOf(constants.HT_MIGRATION_MODES)))
+
+#: the obsolete 'live' mode (boolean)
+_PMigrationLive = ("live", None, _TMaybeBool)
+
 # End types
 
 class LogicalUnit(object):
@@ -277,6 +302,7 @@ class LogicalUnit(object):
     self.recalculate_locks = {}
     self.__ssh = None
     # logging
+    self.Log = processor.Log # pylint: disable-msg=C0103
    self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
     self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
     self.LogStep = processor.LogStep # pylint: disable-msg=C0103
@@ -377,11 +403,11 @@ class LogicalUnit(object):
       # Acquire all nodes and one instance
       self.needed_locks = {
         locking.LEVEL_NODE: locking.ALL_SET,
-        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
+        locking.LEVEL_INSTANCE: ['instance1.example.com'],
       }
       # Acquire just two nodes
       self.needed_locks = {
-        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
+        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
       }
       # Acquire no locks
       self.needed_locks = {} # No, you can't leave it to the default value None
@@ -1076,9 +1102,8 @@ def _CheckOSVariant(os_obj, name):
   """
   if not os_obj.supported_variants:
     return
-  try:
-    variant = name.split("+", 1)[1]
-  except IndexError:
+  variant = objects.OS.GetVariant(name)
+  if not variant:
     raise errors.OpPrereqError("OS name must include a variant",
                                errors.ECODE_INVAL)
 
@@ -1143,6 +1168,38 @@ def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
   return faulty
 
 
+def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
+  """Check the sanity of iallocator and node arguments and use the
+  cluster-wide iallocator if appropriate.
+
+  Check that at most one of (iallocator, node) is specified. If none is
+  specified, then the LU's opcode's iallocator slot is filled with the
+  cluster-wide default iallocator.
+
+  @type iallocator_slot: string
+  @param iallocator_slot: the name of the opcode iallocator slot
+  @type node_slot: string
+  @param node_slot: the name of the opcode target node slot
+
+  """
+  node = getattr(lu.op, node_slot, None)
+  iallocator = getattr(lu.op, iallocator_slot, None)
+
+  if node is not None and iallocator is not None:
+    raise errors.OpPrereqError("Do not specify both an iallocator and a node",
+                               errors.ECODE_INVAL)
+  elif node is None and iallocator is None:
+    default_iallocator = lu.cfg.GetDefaultIAllocator()
+    if default_iallocator:
+      setattr(lu.op, iallocator_slot, default_iallocator)
+    else:
+      raise errors.OpPrereqError("No iallocator or node given and no"
+                                 " cluster-wide default iallocator found."
+                                 " Please specify either an iallocator or a"
+                                 " node, or set a cluster-wide default"
+                                 " iallocator.", errors.ECODE_INVAL)
+
+
 class LUPostInitCluster(LogicalUnit):
   """Logical unit for running hooks after cluster initialization.
 
@@ -1287,6 +1344,7 @@ class LUVerifyCluster(LogicalUnit):
   EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
   EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
   ENODEDRBD = (TNODE, "ENODEDRBD")
+  ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
   ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
   ENODEHOOKS = (TNODE, "ENODEHOOKS")
   ENODEHV = (TNODE, "ENODEHV")
@@ -1398,14 +1456,11 @@ class LUVerifyCluster(LogicalUnit):
     self.bad = self.bad or cond
 
   def _VerifyNode(self, ninfo, nresult):
-    """Run multiple tests against a node.
+    """Perform some basic validation on data returned from a node.
 
-    Test list:
-
-      - compares ganeti version
-      - checks vg existence and size > 20G
-      - checks config file checksum
-      - checks ssh to other nodes
+      - check the result data structure is well formed and has all the
+        mandatory fields
+      - check ganeti version
 
     @type ninfo: L{objects.Node}
     @param ninfo: the node to check
@@ -1615,20 +1670,24 @@ class LUVerifyCluster(LogicalUnit):
       _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                "instance should not run on node %s", node)
 
-  def _VerifyOrphanVolumes(self, node_vol_should, node_image):
+  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
     """Verify if there are any unknown volumes in the cluster.
 
     The .os, .swap and backup volumes are ignored. All other volumes are
     reported as unknown.
 
+    @type reserved: L{ganeti.utils.FieldSet}
+    @param reserved: a FieldSet of reserved volume names
+
     """
     for node, n_img in node_image.items():
       if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
         # skip non-healthy nodes
         continue
       for volume in n_img.volumes:
-        test = (node not in node_vol_should or
-                volume not in node_vol_should[node])
+        test = ((node not in node_vol_should or
+                 volume not in node_vol_should[node]) and
+                not reserved.Matches(volume))
         self._ErrorIf(test, self.ENODEORPHANLV, node,
                       "volume %s is unknown", volume)
 
@@ -1715,13 +1774,15 @@ class LUVerifyCluster(LogicalUnit):
                  "file '%s' should not exist"
                  " on non master candidates", file_name)
 
-  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_map):
+  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
+                      drbd_map):
     """Verifies the node DRBD status.
 
     @type ninfo: L{objects.Node}
     @param ninfo: the node to check
     @param nresult: the remote results for the node
     @param instanceinfo: the dict of instances
+    @param drbd_helper: the configured DRBD usermode helper
     @param drbd_map: the DRBD map as returned by
         L{ganeti.config.ConfigWriter.ComputeDRBDMap}
 
     """
     node = ninfo.name
     _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
 
+    if drbd_helper:
+      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
+      test = (helper_result == None)
+      _ErrorIf(test, self.ENODEDRBDHELPER, node,
+               "no drbd usermode helper returned")
+      if helper_result:
+        status, payload = helper_result
+        test = not status
+        _ErrorIf(test, self.ENODEDRBDHELPER, node,
+                 "drbd usermode helper check unsuccessful: %s", payload)
+        test = status and (payload != drbd_helper)
+        _ErrorIf(test, self.ENODEDRBDHELPER, node,
+                 "wrong drbd usermode helper: %s", payload)
+
     # compute the DRBD minors
     node_drbd = {}
     for minor, instance in drbd_map[node].items():
@@ -1979,6 +2054,7 @@
       _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
 
     vg_name = self.cfg.GetVGName()
+    drbd_helper = self.cfg.GetDRBDHelper()
     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
     cluster = self.cfg.GetClusterInfo()
     nodelist = utils.NiceSort(self.cfg.GetNodeList())
@@ -2030,6 +2106,9 @@
       node_verify_param[constants.NV_PVLIST] = [vg_name]
       node_verify_param[constants.NV_DRBDLIST] = None
 
+    if drbd_helper:
+      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
+
     # Build our expected cluster state
     node_image = dict((node.name, self.NodeImage(offline=node.offline,
                                                  name=node.name))
@@ -2110,7 +2189,8 @@
       self._VerifyNodeLVM(node_i, nresult, vg_name)
       self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
                             master_files)
-      self._VerifyNodeDrbd(node_i, nresult, instanceinfo, all_drbd_map)
+      self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper,
+                           all_drbd_map)
       self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
 
       self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
@@ -2172,7 +2252,8 @@
                  "instance lives on ghost node %s", node)
 
     feedback_fn("* Verifying orphan volumes")
-    self._VerifyOrphanVolumes(node_vol_should, node_image)
+    reserved = utils.FieldSet(*cluster.reserved_lvs)
+    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
 
     feedback_fn("* Verifying orphan instances")
     self._VerifyOrphanInstances(instancelist, node_image)
@@ -2459,7 +2540,7 @@ class LURenameCluster(LogicalUnit):
     """Verify that the passed name is a valid one.
 
     """
-    hostname = utils.GetHostInfo(self.op.name)
+    hostname = netutils.GetHostInfo(self.op.name)
 
     new_name = hostname.name
     self.ip = new_ip = hostname.ip
@@ -2470,7 +2551,7 @@
                                  " cluster has changed",
                                  errors.ECODE_INVAL)
     if new_ip != old_ip:
-      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
+      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
         raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                    " reachable on the network. Aborting." %
                                    new_ip, errors.ECODE_NOTUNIQUE)
@@ -2518,6 +2599,8 @@
         self.LogWarning("Could not re-enable the master role on"
                         " the master, please restart manually: %s", msg)
 
+    return clustername
+
 
 class LUSetClusterParams(LogicalUnit):
   """Change the parameters of the cluster.
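The "hidden_oss" and "blacklisted_oss" opcode parameters introduced in the next hunk are validated by composing the small checker functions added earlier in this patch (_TOr, _TAnd, _TListOf, _TIsLength, _TMap, _TElemOf). A minimal standalone sketch of how that composed checker behaves, with the combinators re-implemented inline and DDMS_VALUES assumed to be {"add", "remove"} (matching constants.DDM_ADD/DDM_REMOVE):

# Simplified re-implementations of the checkers defined in this patch;
# the real ones live in lib/cmdlib.py.
_TNone = lambda val: val is None
_TList = lambda val: isinstance(val, list)

def _TElemOf(my_set):
  return lambda val: val in my_set

def _TListOf(my_type):
  return lambda val: isinstance(val, list) and all(my_type(v) for v in val)

def _TIsLength(size):
  return lambda container: len(container) == size

def _TMap(fn, test):
  return lambda val: test(fn(val))

def _TAnd(*args):
  return lambda val: all(t(val) for t in args)

def _TOr(*args):
  return lambda val: any(t(val) for t in args)

DDMS_VALUES = frozenset(["add", "remove"])  # assumed value of constants.DDMS_VALUES

# The checker built for "hidden_oss": either None, or a list of
# two-element [action, os_name] pairs whose action is a valid DDM value.
check = _TOr(_TListOf(_TAnd(_TList,
                            _TIsLength(2),
                            _TMap(lambda v: v[0], _TElemOf(DDMS_VALUES)))),
             _TNone)

assert check(None)
assert check([["add", "lenny-image"], ["remove", "etch-image"]])
assert not check([["rename", "lenny-image"]])  # unknown action
assert not check([["add"]])                    # wrong pair length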
@@ -2539,6 +2622,19 @@
     ("remove_uids", None, _NoType),
     ("maintain_node_health", None, _TMaybeBool),
     ("nicparams", None, _TOr(_TDict, _TNone)),
+    ("drbd_helper", None, _TOr(_TString, _TNone)),
+    ("default_iallocator", None, _TMaybeString),
+    ("reserved_lvs", None, _TOr(_TListOf(_TNonEmptyString), _TNone)),
+    ("hidden_oss", None, _TOr(_TListOf(\
+          _TAnd(_TList,
+                _TIsLength(2),
+                _TMap(lambda v: v[0], _TElemOf(constants.DDMS_VALUES)))),
+          _TNone)),
+    ("blacklisted_oss", None, _TOr(_TListOf(\
+          _TAnd(_TList,
+                _TIsLength(2),
+                _TMap(lambda v: v[0], _TElemOf(constants.DDMS_VALUES)))),
+          _TNone)),
     ]
   REQ_BGL = False
@@ -2586,6 +2682,12 @@
         raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                    " instances exist", errors.ECODE_INVAL)
 
+    if self.op.drbd_helper is not None and not self.op.drbd_helper:
+      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
+        raise errors.OpPrereqError("Cannot disable drbd helper while"
+                                   " drbd-based instances exist",
+                                   errors.ECODE_INVAL)
+
     node_list = self.acquired_locks[locking.LEVEL_NODE]
 
     # if vg_name not None, checks given volume group on all nodes
@@ -2605,6 +2707,24 @@
           raise errors.OpPrereqError("Error on node '%s': %s" %
                                      (node, vgstatus), errors.ECODE_ENVIRON)
 
+    if self.op.drbd_helper:
+      # checks given drbd helper on all nodes
+      helpers = self.rpc.call_drbd_helper(node_list)
+      for node in node_list:
+        ninfo = self.cfg.GetNodeInfo(node)
+        if ninfo.offline:
+          self.LogInfo("Not checking drbd helper on offline node %s", node)
+          continue
+        msg = helpers[node].fail_msg
+        if msg:
+          raise errors.OpPrereqError("Error checking drbd helper on node"
+                                     " '%s': %s" % (node, msg),
+                                     errors.ECODE_ENVIRON)
+        node_helper = helpers[node].payload
+        if node_helper != self.op.drbd_helper:
+          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
+                                     (node, node_helper), errors.ECODE_ENVIRON)
+
     self.cluster = cluster = self.cfg.GetClusterInfo()
     # validate params changes
     if self.op.beparams:
@@ -2720,6 +2840,14 @@
           hv_class.CheckParameterSyntax(new_osp)
           _CheckHVParams(self, node_list, hv_name, new_osp)
 
+    if self.op.default_iallocator:
+      alloc_script = utils.FindFile(self.op.default_iallocator,
+                                    constants.IALLOCATOR_SEARCH_PATH,
+                                    os.path.isfile)
+      if alloc_script is None:
+        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
+                                   " specified" % self.op.default_iallocator,
+                                   errors.ECODE_INVAL)
+
   def Exec(self, feedback_fn):
     """Change the parameters of the cluster.
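The CheckPrereq hunk above refuses a new DRBD usermode helper unless every online node reports exactly that helper. A hedged sketch of the same accept/reject logic over plain data, with the RPC result objects (.fail_msg/.payload) modeled as tuples and offline-node handling omitted:

# Hypothetical standalone version of the per-node helper check; real code
# uses self.rpc.call_drbd_helper and RPC result objects.
class OpPrereqError(Exception):
  pass

def check_drbd_helper(results, wanted_helper):
  """results: dict mapping node name -> (fail_msg, payload)."""
  for node, (fail_msg, payload) in sorted(results.items()):
    if fail_msg:
      raise OpPrereqError("Error checking drbd helper on node"
                          " '%s': %s" % (node, fail_msg))
    if payload != wanted_helper:
      raise OpPrereqError("Error on node '%s': drbd helper is %s" %
                          (node, payload))

# Passes: both nodes report the wanted helper.
check_drbd_helper({"node1": (None, "/bin/true"),
                   "node2": (None, "/bin/true")}, "/bin/true")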
@@ -2734,6 +2862,15 @@
       else:
         feedback_fn("Cluster LVM configuration already in desired"
                     " state, not changing")
+    if self.op.drbd_helper is not None:
+      new_helper = self.op.drbd_helper
+      if not new_helper:
+        new_helper = None
+      if new_helper != self.cfg.GetDRBDHelper():
+        self.cfg.SetDRBDHelper(new_helper)
+      else:
+        feedback_fn("Cluster DRBD helper already in desired state,"
+                    " not changing")
     if self.op.hvparams:
       self.cluster.hvparams = self.new_hvparams
     if self.op.os_hvp:
@@ -2765,6 +2902,36 @@
     if self.op.uid_pool is not None:
       self.cluster.uid_pool = self.op.uid_pool
 
+    if self.op.default_iallocator is not None:
+      self.cluster.default_iallocator = self.op.default_iallocator
+
+    if self.op.reserved_lvs is not None:
+      self.cluster.reserved_lvs = self.op.reserved_lvs
+
+    def helper_oss(aname, mods, desc):
+      lst = getattr(self.cluster, aname)
+      for key, val in mods:
+        if key == constants.DDM_ADD:
+          if val in lst:
+            feedback_fn("OS %s already in %s, ignoring", val, desc)
+          else:
+            lst.append(val)
+        elif key == constants.DDM_REMOVE:
+          if val in lst:
+            lst.remove(val)
+          else:
+            feedback_fn("OS %s not found in %s, ignoring", val, desc)
+        else:
+          raise errors.ProgrammerError("Invalid modification '%s'" % key)
+
+    if self.op.hidden_oss:
+      helper_oss("hidden_oss", self.op.hidden_oss,
+                 "hidden OS list")
+
+    if self.op.blacklisted_oss:
+      helper_oss("blacklisted_oss", self.op.blacklisted_oss,
+                 "blacklisted OS list")
+
     self.cfg.Update(self.cluster, feedback_fn)
@@ -2953,9 +3120,12 @@
     ("names", _EmptyList, _TListOf(_TNonEmptyString)),
     ]
   REQ_BGL = False
+  _HID = "hidden"
+  _BLK = "blacklisted"
+  _VLD = "valid"
   _FIELDS_STATIC = utils.FieldSet()
-  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants",
-                                   "parameters", "api_versions")
+  _FIELDS_DYNAMIC = utils.FieldSet("name", _VLD, "node_status", "variants",
+                                   "parameters", "api_versions", _HID, _BLK)
 
   def CheckArguments(self):
@@ -3022,8 +3192,10 @@
     node_data = self.rpc.call_os_diagnose(valid_nodes)
     pol = self._DiagnoseByOS(node_data)
     output = []
+    cluster = self.cfg.GetClusterInfo()
 
-    for os_name, os_data in pol.items():
+    for os_name in utils.NiceSort(pol.keys()):
+      os_data = pol[os_name]
       row = []
       valid = True
       (variants, params, api_versions) = null_state = (set(), set(), set())
@@ -3042,10 +3214,17 @@
         params.intersection_update(node_params)
         api_versions.intersection_update(node_api)
 
+      is_hid = os_name in cluster.hidden_oss
+      is_blk = os_name in cluster.blacklisted_oss
+      if ((self._HID not in self.op.output_fields and is_hid) or
+          (self._BLK not in self.op.output_fields and is_blk) or
+          (self._VLD not in self.op.output_fields and not valid)):
+        continue
+
       for field in self.op.output_fields:
         if field == "name":
           val = os_name
-        elif field == "valid":
+        elif field == self._VLD:
           val = valid
         elif field == "node_status":
           # this is just a copy of the dict
@@ -3053,11 +3232,15 @@
           for node_name, nos_list in os_data.items():
             val[node_name] = nos_list
         elif field == "variants":
-          val = list(variants)
+          val = utils.NiceSort(list(variants))
         elif field == "parameters":
           val = list(params)
         elif field == "api_versions":
          val = list(api_versions)
+        elif field == self._HID:
+          val = is_hid
+        elif field == self._BLK:
+          val = is_blk
         else:
           raise errors.ParameterError(field)
         row.append(val)
@@ -3554,7 +3737,7 @@ class LUAddNode(LogicalUnit):
 
   def CheckArguments(self):
     # validate/normalize the node name
-    self.op.node_name = utils.HostInfo.NormalizeName(self.op.node_name)
+    self.op.node_name = netutils.HostInfo.NormalizeName(self.op.node_name)
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -3586,13 +3769,13 @@
     node_name = self.op.node_name
     cfg = self.cfg
 
-    dns_data = utils.GetHostInfo(node_name)
+    dns_data = netutils.GetHostInfo(node_name)
     node = dns_data.name
     primary_ip = self.op.primary_ip = dns_data.ip
     if self.op.secondary_ip is None:
       self.op.secondary_ip = primary_ip
-    if not utils.IsValidIP4(self.op.secondary_ip):
+    if not netutils.IsValidIP4(self.op.secondary_ip):
       raise errors.OpPrereqError("Invalid secondary IP given",
                                  errors.ECODE_INVAL)
     secondary_ip = self.op.secondary_ip
@@ -3644,13 +3827,13 @@
                                  errors.ECODE_INVAL)
 
     # checks reachability
-    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
+    if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
       raise errors.OpPrereqError("Node not reachable by ping",
                                  errors.ECODE_ENVIRON)
 
     if not newbie_singlehomed:
       # check reachability from my secondary ip to newbie's secondary ip
-      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
+      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                            source=myself.secondary_ip):
         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                    " based ping to noded port",
@@ -3848,7 +4031,7 @@
     # we can't change the master's node flags
     if self.op.node_name == self.cfg.GetMasterNode():
       raise errors.OpPrereqError("The master role can be changed"
-                                 " only via masterfailover",
+                                 " only via master-failover",
                                  errors.ECODE_INVAL)
 
@@ -4019,6 +4202,7 @@
       "candidate_pool_size": cluster.candidate_pool_size,
       "master_netdev": cluster.master_netdev,
       "volume_group_name": cluster.volume_group_name,
+      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
       "maintain_node_health": cluster.maintain_node_health,
       "ctime": cluster.ctime,
@@ -4026,6 +4210,8 @@
       "uuid": cluster.uuid,
       "tags": list(cluster.GetTags()),
       "uid_pool": cluster.uid_pool,
+      "default_iallocator": cluster.default_iallocator,
+      "reserved_lvs": cluster.reserved_lvs,
       }
 
     return result
@@ -4752,10 +4938,19 @@
   _OP_PARAMS = [
     _PInstanceName,
     ("new_name", _NoDefault, _TNonEmptyString),
-    ("ignore_ip", False, _TBool),
-    ("check_name", True, _TBool),
+    ("ip_check", False, _TBool),
+    ("name_check", True, _TBool),
     ]
 
+  def CheckArguments(self):
+    """Check arguments.
+
+    """
+    if self.op.ip_check and not self.op.name_check:
+      # TODO: make the ip check more flexible and not depend on the name check
+      raise errors.OpPrereqError("Cannot do ip check without a name check",
+                                 errors.ECODE_INVAL)
+
   def BuildHooksEnv(self):
     """Build hooks env.
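The CheckArguments hunk above rejects ip_check without name_check because the IP to probe only becomes known once the new name has been resolved; the CheckPrereq hunk that follows then pings the resolved IP on the noded port to make sure it is not already in use. A rough standalone sketch of that ordering, using plain socket calls in place of netutils.GetHostInfo/netutils.TcpPing (1811 is Ganeti's default noded port):

import socket

def check_rename_args(new_name, ip_check, name_check,
                      port=1811, timeout=10.0):
  # Without a name check there is no resolved IP, so an IP check is
  # impossible -- the same constraint the LU enforces.
  if ip_check and not name_check:
    raise ValueError("Cannot do ip check without a name check")
  if not name_check:
    return new_name
  ip = socket.gethostbyname(new_name)        # ~ netutils.GetHostInfo
  if ip_check:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
      sock.connect((ip, port))               # ~ netutils.TcpPing
    except socket.error:
      return new_name                        # nothing answers: IP is free
    finally:
      sock.close()
    raise ValueError("IP %s of instance %s already in use" % (ip, new_name))
  return new_name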
@@ -4781,23 +4976,21 @@
 
     _CheckInstanceDown(self, instance, "cannot rename")
     self.instance = instance
 
-    # new name verification
-    if self.op.check_name:
-      name_info = utils.GetHostInfo(self.op.new_name)
-      self.op.new_name = name_info.name
-
     new_name = self.op.new_name
+    if self.op.name_check:
+      hostinfo = netutils.HostInfo(netutils.HostInfo.NormalizeName(new_name))
+      new_name = self.op.new_name = hostinfo.name
+      if (self.op.ip_check and
+          netutils.TcpPing(hostinfo.ip, constants.DEFAULT_NODED_PORT)):
+        raise errors.OpPrereqError("IP %s of instance %s already in use" %
+                                   (hostinfo.ip, new_name),
+                                   errors.ECODE_NOTUNIQUE)
 
     instance_list = self.cfg.GetInstanceList()
     if new_name in instance_list:
       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                  new_name, errors.ECODE_EXISTS)
 
-    if not self.op.ignore_ip:
-      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
-        raise errors.OpPrereqError("IP %s of instance %s already in use" %
-                                   (name_info.ip, new_name),
-                                   errors.ECODE_NOTUNIQUE)
 
   def Exec(self, feedback_fn):
     """Rename the instance.
@@ -4840,6 +5033,8 @@
     finally:
       _ShutdownInstanceDisks(self, inst)
 
+    return inst.name
+
 
 class LURemoveInstance(LogicalUnit):
   """Remove an instance.
@@ -4961,7 +5156,10 @@
                                     if name not in constants.HVC_GLOBALS] +
                                    ["be/%s" % name
                                     for name in constants.BES_PARAMETERS])
-  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
+  _FIELDS_DYNAMIC = utils.FieldSet("oper_state",
+                                   "oper_ram",
+                                   "oper_vcpus",
+                                   "status")
 
   def CheckArguments(self):
@@ -5092,6 +5290,13 @@
           val = live_data[instance.name].get("memory", "?")
         else:
           val = "-"
+      elif field == "oper_vcpus":
+        if instance.primary_node in bad_nodes:
+          val = None
+        elif instance.name in live_data:
+          val = live_data[instance.name].get("vcpus", "?")
+        else:
+          val = "-"
       elif field == "vcpus":
         val = i_be[constants.BE_VCPUS]
       elif field == "disk_template":
@@ -5372,7 +5577,8 @@
   HTYPE = constants.HTYPE_INSTANCE
   _OP_PARAMS = [
     _PInstanceName,
-    ("live", True, _TBool),
+    _PMigrationMode,
+    _PMigrationLive,
     ("cleanup", False, _TBool),
     ]
@@ -5385,7 +5591,7 @@
     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
     self._migrater = TLMigrateInstance(self, self.op.instance_name,
-                                       self.op.live, self.op.cleanup)
+                                       self.op.cleanup)
     self.tasklets = [self._migrater]
 
   def DeclareLocks(self, level):
@@ -5402,7 +5608,7 @@
     source_node = instance.primary_node
     target_node = instance.secondary_nodes[0]
     env = _BuildInstanceHookEnvByObject(self, instance)
-    env["MIGRATE_LIVE"] = self.op.live
+    env["MIGRATE_LIVE"] = self._migrater.live
     env["MIGRATE_CLEANUP"] = self.op.cleanup
     env.update({
       "OLD_PRIMARY": source_node,
@@ -5603,7 +5809,8 @@
   HTYPE = constants.HTYPE_NODE
   _OP_PARAMS = [
     _PNodeName,
-    ("live", False, _TBool),
+    _PMigrationMode,
+    _PMigrationLive,
     ]
   REQ_BGL = False
@@ -5624,7 +5831,7 @@
       logging.debug("Migrating instance %s", inst.name)
       names.append(inst.name)
 
-      tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
+      tasklets.append(TLMigrateInstance(self, inst.name, False))
 
     self.tasklets = tasklets
@@ -5651,7 +5858,14 @@
 
 class TLMigrateInstance(Tasklet):
-  def __init__(self, lu, instance_name, live, cleanup):
"""Tasklet class for instance migration. + + @type live: boolean + @ivar live: whether the migration will be done live or non-live; + this variable is initalized only after CheckPrereq has run + + """ + def __init__(self, lu, instance_name, cleanup): """Initializes this class. """ @@ -5659,8 +5873,8 @@ class TLMigrateInstance(Tasklet): # Parameters self.instance_name = instance_name - self.live = live self.cleanup = cleanup + self.live = False # will be overridden later def CheckPrereq(self): """Check prerequisites. @@ -5701,6 +5915,25 @@ class TLMigrateInstance(Tasklet): self.instance = instance + if self.lu.op.live is not None and self.lu.op.mode is not None: + raise errors.OpPrereqError("Only one of the 'live' and 'mode'" + " parameters are accepted", + errors.ECODE_INVAL) + if self.lu.op.live is not None: + if self.lu.op.live: + self.lu.op.mode = constants.HT_MIGRATION_LIVE + else: + self.lu.op.mode = constants.HT_MIGRATION_NONLIVE + # reset the 'live' parameter to None so that repeated + # invocations of CheckPrereq do not raise an exception + self.lu.op.live = None + elif self.lu.op.mode is None: + # read the default value from the hypervisor + i_hv = self.cfg.GetClusterInfo().FillHV(instance, skip_globals=False) + self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE] + + self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE + def _WaitUntilSync(self): """Poll with custom rpc for disk sync. @@ -6354,7 +6587,7 @@ class LUCreateInstance(LogicalUnit): ("os_type", None, _TMaybeString), ("force_variant", False, _TBool), ("source_handshake", None, _TOr(_TList, _TNone)), - ("source_x509_ca", None, _TOr(_TList, _TNone)), + ("source_x509_ca", None, _TMaybeString), ("source_instance_name", None, _TMaybeString), ("src_node", None, _TMaybeString), ("src_path", None, _TMaybeString), @@ -6366,7 +6599,6 @@ class LUCreateInstance(LogicalUnit): ("identify_defaults", False, _TBool), ("file_driver", None, _TOr(_TNone, _TElemOf(constants.FILE_DRIVER))), ("file_storage_dir", None, _TMaybeString), - ("dry_run", False, _TBool), ] REQ_BGL = False @@ -6380,10 +6612,12 @@ class LUCreateInstance(LogicalUnit): self.LogInfo("No-installation mode selected, disabling startup") self.op.start = False # validate/normalize the instance name - self.op.instance_name = utils.HostInfo.NormalizeName(self.op.instance_name) + self.op.instance_name = \ + netutils.HostInfo.NormalizeName(self.op.instance_name) + if self.op.ip_check and not self.op.name_check: # TODO: make the ip check more flexible and not depend on the name check - raise errors.OpPrereqError("Cannot do ip checks without a name check", + raise errors.OpPrereqError("Cannot do ip check without a name check", errors.ECODE_INVAL) # check nics' parameter names @@ -6418,7 +6652,7 @@ class LUCreateInstance(LogicalUnit): # instance name verification if self.op.name_check: - self.hostname1 = utils.GetHostInfo(self.op.instance_name) + self.hostname1 = netutils.GetHostInfo(self.op.instance_name) self.op.instance_name = self.hostname1.name # used in CheckPrereq for ip ping check self.check_ip = self.hostname1.ip @@ -6439,10 +6673,17 @@ class LUCreateInstance(LogicalUnit): errors.ECODE_INVAL) ### Node/iallocator related checks - if [self.op.iallocator, self.op.pnode].count(None) != 1: - raise errors.OpPrereqError("One and only one of iallocator and primary" - " node must be given", - errors.ECODE_INVAL) + _CheckIAllocatorOrNode(self, "iallocator", "pnode") + + if self.op.pnode is not None: + if self.op.disk_template in constants.DTS_NET_MIRROR: + if self.op.snode is 
+          raise errors.OpPrereqError("The networked disk templates need"
+                                     " a mirror node", errors.ECODE_INVAL)
+      elif self.op.snode:
+        self.LogWarning("Secondary node will be ignored on non-mirrored disk"
+                        " template")
+        self.op.snode = None
 
     self._cds = _GetClusterDomainSecret()
@@ -6459,6 +6700,10 @@
       if self.op.os_type is None:
         raise errors.OpPrereqError("No guest OS specified",
                                    errors.ECODE_INVAL)
+      if self.op.os_type in self.cfg.GetClusterInfo().blacklisted_oss:
+        raise errors.OpPrereqError("Guest OS '%s' is not allowed for"
+                                   " installation" % self.op.os_type,
+                                   errors.ECODE_STATE)
       if self.op.disk_template is None:
         raise errors.OpPrereqError("No disk template specified",
                                    errors.ECODE_INVAL)
@@ -6501,8 +6746,8 @@
         raise errors.OpPrereqError("Missing source instance name",
                                    errors.ECODE_INVAL)
 
-      self.source_instance_name = \
-        utils.GetHostInfo(utils.HostInfo.NormalizeName(src_instance_name)).name
+      norm_name = netutils.HostInfo.NormalizeName(src_instance_name)
+      self.source_instance_name = netutils.GetHostInfo(norm_name).name
 
     else:
       raise errors.OpPrereqError("Invalid instance creation mode %r" %
@@ -6846,7 +7091,7 @@
                                        errors.ECODE_INVAL)
           nic_ip = self.hostname1.ip
         else:
-          if not utils.IsValidIP4(ip):
+          if not netutils.IsValidIP4(ip):
             raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                        " like a valid IP" % ip,
                                        errors.ECODE_INVAL)
@@ -6952,7 +7197,7 @@
     # ip ping checks (we use the same ip that was resolved in ExpandNames)
     if self.op.ip_check:
-      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
+      if netutils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name),
                                   errors.ECODE_NOTUNIQUE)
@@ -6991,9 +7236,6 @@
     # mirror node verification
     if self.op.disk_template in constants.DTS_NET_MIRROR:
-      if self.op.snode is None:
-        raise errors.OpPrereqError("The networked disk templates need"
-                                   " a mirror node", errors.ECODE_INVAL)
       if self.op.snode == pnode.name:
         raise errors.OpPrereqError("The secondary node cannot be the"
                                    " primary node.", errors.ECODE_INVAL)
@@ -8120,9 +8362,7 @@
   REQ_BGL = False
 
   def CheckArguments(self):
-    if self.op.remote_node is not None and self.op.iallocator is not None:
-      raise errors.OpPrereqError("Give either the iallocator or the new"
-                                 " secondary, not both", errors.ECODE_INVAL)
+    _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
 
   def ExpandNames(self):
     self.op.nodes = _GetWantedNodes(self, self.op.nodes)
@@ -8523,7 +8763,7 @@
         if nic_ip.lower() == constants.VALUE_NONE:
           nic_dict['ip'] = None
         else:
-          if not utils.IsValidIP4(nic_ip):
+          if not netutils.IsValidIP4(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
                                       errors.ECODE_INVAL)
@@ -8656,6 +8896,10 @@
                                    errors.ECODE_INVAL)
       _CheckInstanceDown(self, instance, "cannot change disk template")
       if self.op.disk_template in constants.DTS_NET_MIRROR:
+        if self.op.remote_node == pnode:
+          raise errors.OpPrereqError("Given new secondary node %s is the same"
+                                     " as the primary node of the instance" %
+                                     self.op.remote_node, errors.ECODE_STATE)
         _CheckNodeOnline(self, self.op.remote_node)
         _CheckNodeNotDrained(self, self.op.remote_node)
         disks = [{"size": d.size} for d in instance.disks]
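The next hunk reworks LUExportInstance so that a partially failed export raises a single OpExecError listing everything that went wrong, instead of silently skipping the instance removal. A condensed sketch of that aggregation logic, with utils.CommaJoin approximated by str.join:

class OpExecError(Exception):
  pass

def check_export_results(fin_resu, dresults):
  """fin_resu: export finalization status; dresults: per-disk booleans."""
  if all(dresults) and fin_resu:
    return
  failures = []
  if not fin_resu:
    failures.append("export finalization")
  if not all(dresults):
    fdsk = ", ".join(str(idx) for (idx, dsk) in enumerate(dresults)
                     if not dsk)
    failures.append("disk export: disk(s) %s" % fdsk)
  raise OpExecError("Export failed, errors in %s" % ", ".join(failures))

check_export_results(True, [True, True])       # succeeds silently
# check_export_results(False, [True, False])   # raises, naming both failures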
@@ -9449,15 +9693,25 @@
       feedback_fn("Deactivating disks for %s" % instance.name)
       _ShutdownInstanceDisks(self, instance)
 
+    if not (compat.all(dresults) and fin_resu):
+      failures = []
+      if not fin_resu:
+        failures.append("export finalization")
+      if not compat.all(dresults):
+        fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults)
+                               if not dsk)
+        failures.append("disk export: disk(s) %s" % fdsk)
+
+      raise errors.OpExecError("Export failed, errors in %s" %
+                               utils.CommaJoin(failures))
+
+    # At this point, the export was successful, we can cleanup/finish
+
+    # Remove instance if requested
     if self.op.remove_instance:
-      if not (compat.all(dresults) and fin_resu):
-        feedback_fn("Not removing instance %s as parts of the export failed" %
-                    instance.name)
-      else:
-        feedback_fn("Removing instance %s" % instance.name)
-        _RemoveInstance(self, feedback_fn, instance,
-                        self.op.ignore_remove_failures)
+      feedback_fn("Removing instance %s" % instance.name)
+      _RemoveInstance(self, feedback_fn, instance,
+                      self.op.ignore_remove_failures)
 
     if self.op.mode == constants.EXPORT_MODE_LOCAL:
       self._CleanupExports(feedback_fn)
@@ -9552,7 +9806,8 @@
   """
   _OP_PARAMS = [
     ("kind", _NoDefault, _TElemOf(constants.VALID_TAG_TYPES)),
-    ("name", _NoDefault, _TNonEmptyString),
+    # Name is only meaningful for nodes and instances
+    ("name", _NoDefault, _TMaybeString),
     ]
   REQ_BGL = False
@@ -9611,7 +9866,8 @@
   """
   _OP_PARAMS = [
     ("kind", _NoDefault, _TElemOf(constants.VALID_TAG_TYPES)),
-    ("name", _NoDefault, _TNonEmptyString),
+    # Name is only meaningful for nodes and instances
+    ("name", _NoDefault, _TMaybeString),
     ("tags", _NoDefault, _TListOf(_TNonEmptyString)),
     ]
   REQ_BGL = False
@@ -9644,7 +9900,8 @@
   """
   _OP_PARAMS = [
     ("kind", _NoDefault, _TElemOf(constants.VALID_TAG_TYPES)),
-    ("name", _NoDefault, _TNonEmptyString),
+    # Name is only meaningful for nodes and instances
+    ("name", _NoDefault, _TMaybeString),
     ("tags", _NoDefault, _TListOf(_TNonEmptyString)),
     ]
   REQ_BGL = False
@@ -9730,6 +9987,148 @@
     self._TestDelay()
 
 
+class LUTestJobqueue(NoHooksLU):
+  """Utility LU to test some aspects of the job queue.
+
+  """
+  _OP_PARAMS = [
+    ("notify_waitlock", False, _TBool),
+    ("notify_exec", False, _TBool),
+    ("log_messages", _EmptyList, _TListOf(_TString)),
+    ("fail", False, _TBool),
+    ]
+  REQ_BGL = False
+
+  # Must be lower than default timeout for WaitForJobChange to see whether it
+  # notices changed jobs
+  _CLIENT_CONNECT_TIMEOUT = 20.0
+  _CLIENT_CONFIRM_TIMEOUT = 60.0
+
+  @classmethod
+  def _NotifyUsingSocket(cls, cb, errcls):
+    """Opens a Unix socket and waits for another program to connect.
+
+    @type cb: callable
+    @param cb: Callback to send socket name to client
+    @type errcls: class
+    @param errcls: Exception class to use for errors
+
+    """
+    # Using a temporary directory as there's no easy way to create temporary
+    # sockets without writing a custom loop around tempfile.mktemp and
+    # socket.bind
+    tmpdir = tempfile.mkdtemp()
+    try:
+      tmpsock = utils.PathJoin(tmpdir, "sock")
+
+      logging.debug("Creating temporary socket at %s", tmpsock)
+      sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+      try:
+        sock.bind(tmpsock)
+        sock.listen(1)
+
+        # Send details to client
+        cb(tmpsock)
+
+        # Wait for client to connect before continuing
+        sock.settimeout(cls._CLIENT_CONNECT_TIMEOUT)
+        try:
+          (conn, _) = sock.accept()
+        except socket.error, err:
+          raise errcls("Client didn't connect in time (%s)" % err)
+      finally:
+        sock.close()
+    finally:
+      # Remove as soon as client is connected
+      shutil.rmtree(tmpdir)
+
+    # Wait for client to close
+    try:
+      try:
+        # pylint: disable-msg=E1101
+        # Instance of '_socketobject' has no ... member
+        conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT)
+        conn.recv(1)
+      except socket.error, err:
+        raise errcls("Client failed to confirm notification (%s)" % err)
+    finally:
+      conn.close()
+
+  def _SendNotification(self, test, arg, sockname):
+    """Sends a notification to the client.
+
+    @type test: string
+    @param test: Test name
+    @param arg: Test argument (depends on test)
+    @type sockname: string
+    @param sockname: Socket path
+
+    """
+    self.Log(constants.ELOG_JQUEUE_TEST, (sockname, test, arg))
+
+  def _Notify(self, prereq, test, arg):
+    """Notifies the client of a test.
+
+    @type prereq: bool
+    @param prereq: Whether this is a prereq-phase test
+    @type test: string
+    @param test: Test name
+    @param arg: Test argument (depends on test)
+
+    """
+    if prereq:
+      errcls = errors.OpPrereqError
+    else:
+      errcls = errors.OpExecError
+
+    return self._NotifyUsingSocket(compat.partial(self._SendNotification,
+                                                  test, arg),
+                                   errcls)
+
+  def CheckArguments(self):
+    self.checkargs_calls = getattr(self, "checkargs_calls", 0) + 1
+    self.expandnames_calls = 0
+
+  def ExpandNames(self):
+    checkargs_calls = getattr(self, "checkargs_calls", 0)
+    if checkargs_calls < 1:
+      raise errors.ProgrammerError("CheckArguments was not called")
+
+    self.expandnames_calls += 1
+
+    if self.op.notify_waitlock:
+      self._Notify(True, constants.JQT_EXPANDNAMES, None)
+
+    self.LogInfo("Expanding names")
+
+    # Get lock on master node (just to get a lock, not for a particular reason)
+    self.needed_locks = {
+      locking.LEVEL_NODE: self.cfg.GetMasterNode(),
+      }
+
+  def Exec(self, feedback_fn):
+    if self.expandnames_calls < 1:
+      raise errors.ProgrammerError("ExpandNames was not called")
+
+    if self.op.notify_exec:
+      self._Notify(False, constants.JQT_EXEC, None)
+
+    self.LogInfo("Executing")
+
+    if self.op.log_messages:
+      self._Notify(False, constants.JQT_STARTMSG, len(self.op.log_messages))
+      for idx, msg in enumerate(self.op.log_messages):
+        self.LogInfo("Sending log message %s", idx + 1)
+        feedback_fn(constants.JQT_MSGPREFIX + msg)
+        # Report how many test messages have been sent
+        self._Notify(False, constants.JQT_LOGMSG, idx + 1)
+
+    if self.op.fail:
+      raise errors.OpExecError("Opcode failure was requested")
+
+    return True
+
+
 class IAllocator(object):
   """IAllocator framework.
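For context, _NotifyUsingSocket above only implements the server half of the handshake: it binds a socket, advertises its path through an ELOG_JQUEUE_TEST log entry, waits for a connect, and then blocks in conn.recv(1) until the peer confirms. A hedged sketch of the matching client, which a test harness would run after reading the socket path from the job log:

import socket

def confirm_notification(sockname, timeout=10.0):
  # Connect within the LU's _CLIENT_CONNECT_TIMEOUT window, then send a
  # single byte so the LU's recv(1) returns and the opcode continues.
  sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  sock.settimeout(timeout)
  try:
    sock.connect(sockname)   # unblocks the LU's accept()
    sock.send("x")           # one byte, unblocks the LU's recv(1)
  finally:
    sock.close()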