X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/ef628379568ae9f10b68c0901816328365966b4e..e8d61457f16974cbf0d77479f9d06f4c6345a02e:/lib/cmdlib.py diff --git a/lib/cmdlib.py b/lib/cmdlib.py index eb1f478..1843311 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007, 2008 Google Inc. +# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -21,11 +21,13 @@ """Module implementing the master-side code.""" -# pylint: disable-msg=W0201 +# pylint: disable-msg=W0201,C0302 # W0201 since most LU attributes are defined in CheckPrereq or similar # functions +# C0302: since we have way too many lines in this module + import os import os.path import time @@ -34,6 +36,9 @@ import platform import logging import copy import OpenSSL +import socket +import tempfile +import shutil from ganeti import ssh from ganeti import utils @@ -46,8 +51,196 @@ from ganeti import serializer from ganeti import ssconf from ganeti import uidpool from ganeti import compat +from ganeti import masterd +from ganeti import netutils + +import ganeti.masterd.instance # pylint: disable-msg=W0611 + + +# Modifiable default values; need to define these here before the +# actual LUs + +def _EmptyList(): + """Returns an empty list. + + """ + return [] + + +def _EmptyDict(): + """Returns an empty dict. + + """ + return {} + + +#: The without-default default value +_NoDefault = object() + + +#: The no-type (value too complex to check in the type system) +_NoType = object() + + +# Some basic types +def _TNotNone(val): + """Checks if the given value is not None. + + """ + return val is not None + + +def _TNone(val): + """Checks if the given value is None. + + """ + return val is None + + +def _TBool(val): + """Checks if the given value is a boolean. 
+ + """ + return isinstance(val, bool) + + +def _TInt(val): + """Checks if the given value is an integer. + + """ + return isinstance(val, int) + + +def _TFloat(val): + """Checks if the given value is a float. + + """ + return isinstance(val, float) + + +def _TString(val): + """Checks if the given value is a string. + + """ + return isinstance(val, basestring) + + +def _TTrue(val): + """Checks if a given value evaluates to a boolean True value. + + """ + return bool(val) + + +def _TElemOf(target_list): + """Builds a function that checks if a given value is a member of a list. + + """ + return lambda val: val in target_list + + +# Container types +def _TList(val): + """Checks if the given value is a list. + + """ + return isinstance(val, list) + + +def _TDict(val): + """Checks if the given value is a dictionary. + + """ + return isinstance(val, dict) + + +# Combinator types +def _TAnd(*args): + """Combine multiple functions using an AND operation. + + """ + def fn(val): + return compat.all(t(val) for t in args) + return fn + + +def _TOr(*args): + """Combine multiple functions using an OR operation. + + """ + def fn(val): + return compat.any(t(val) for t in args) + return fn +# Type aliases + +#: a non-empty string +_TNonEmptyString = _TAnd(_TString, _TTrue) + + +#: a maybe non-empty string +_TMaybeString = _TOr(_TNonEmptyString, _TNone) + + +#: a maybe boolean (bool or none) +_TMaybeBool = _TOr(_TBool, _TNone) + + +#: a non-negative integer (zero is accepted) +_TPositiveInt = _TAnd(_TInt, lambda v: v >= 0) + +#: a strictly positive integer +_TStrictPositiveInt = _TAnd(_TInt, lambda v: v > 0) + + +def _TListOf(my_type): + """Checks if a given value is a list with all elements of the same type. + + """ + return _TAnd(_TList, + lambda lst: compat.all(my_type(v) for v in lst)) + + +def _TDictOf(key_type, val_type): + """Checks a dict type for the type of its key/values. 
+ + """ + return _TAnd(_TDict, + lambda my_dict: (compat.all(key_type(v) for v in my_dict.keys()) + and compat.all(val_type(v) + for v in my_dict.values()))) + + +# Common opcode attributes + +#: output fields for a query operation +_POutputFields = ("output_fields", _NoDefault, _TListOf(_TNonEmptyString)) + + +#: the shutdown timeout +_PShutdownTimeout = ("shutdown_timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT, + _TPositiveInt) + +#: the force parameter +_PForce = ("force", False, _TBool) + +#: a required instance name (for single-instance LUs) +_PInstanceName = ("instance_name", _NoDefault, _TNonEmptyString) + + +#: a required node name (for single-node LUs) +_PNodeName = ("node_name", _NoDefault, _TNonEmptyString) + +#: the migration type (live/non-live) +_PMigrationMode = ("mode", None, _TOr(_TNone, + _TElemOf(constants.HT_MIGRATION_MODES))) + +#: the obsolete 'live' mode (boolean) +_PMigrationLive = ("live", None, _TMaybeBool) + + +# End types class LogicalUnit(object): """Logical Unit base class. 
@@ -64,11 +257,13 @@ class LogicalUnit(object): @ivar dry_run_result: the value (if any) that will be returned to the caller in dry-run mode (signalled by opcode dry_run parameter) + @cvar _OP_PARAMS: a list of opcode attributes, the default values + they should get if not already defined, and the types they must match """ HPATH = None HTYPE = None - _OP_REQP = [] + _OP_PARAMS = [] REQ_BGL = True def __init__(self, processor, op, context, rpc): @@ -93,6 +288,7 @@ class LogicalUnit(object): self.recalculate_locks = {} self.__ssh = None # logging + self.Log = processor.Log # pylint: disable-msg=C0103 self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103 self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103 self.LogStep = processor.LogStep # pylint: disable-msg=C0103 @@ -106,11 +302,32 @@ class LogicalUnit(object): # Tasklets self.tasklets = None - for attr_name in self._OP_REQP: - attr_val = getattr(op, attr_name, None) - if attr_val is None: - raise errors.OpPrereqError("Required parameter '%s' missing" % - attr_name, errors.ECODE_INVAL) + # The new kind-of-type-system + op_id = self.op.OP_ID + for attr_name, aval, test in self._OP_PARAMS: + if not hasattr(op, attr_name): + if aval == _NoDefault: + raise errors.OpPrereqError("Required parameter '%s.%s' missing" % + (op_id, attr_name), errors.ECODE_INVAL) + else: + if callable(aval): + dval = aval() + else: + dval = aval + setattr(self.op, attr_name, dval) + attr_val = getattr(op, attr_name) + if test == _NoType: + # no tests here + continue + if not callable(test): + raise errors.ProgrammerError("Validation for parameter '%s.%s' failed," + " given type is not a proper type (%s)" % + (op_id, attr_name, test)) + if not test(attr_val): + logging.error("OpCode %s, parameter %s, has invalid type %s/value %s", + self.op.OP_ID, attr_name, type(attr_val), attr_val) + raise errors.OpPrereqError("Parameter '%s.%s' fails validation" % + (op_id, attr_name), errors.ECODE_INVAL) self.CheckArguments() @@ -172,11 
+389,11 @@ class LogicalUnit(object): # Acquire all nodes and one instance self.needed_locks = { locking.LEVEL_NODE: locking.ALL_SET, - locking.LEVEL_INSTANCE: ['instance1.example.tld'], + locking.LEVEL_INSTANCE: ['instance1.example.com'], } # Acquire just two nodes self.needed_locks = { - locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'], + locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'], } # Acquire no locks self.needed_locks = {} # No, you can't leave it to the default value None @@ -229,7 +446,7 @@ class LogicalUnit(object): idx + 1, len(self.tasklets)) tl.CheckPrereq() else: - raise NotImplementedError + pass def Exec(self, feedback_fn): """Execute the LU. @@ -409,7 +626,7 @@ class Tasklet: hasn't been done before. """ - raise NotImplementedError + pass def Exec(self, feedback_fn): """Execute the tasklet. @@ -434,10 +651,6 @@ def _GetWantedNodes(lu, nodes): @raise errors.ProgrammerError: if the nodes parameter is wrong type """ - if not isinstance(nodes, list): - raise errors.OpPrereqError("Invalid argument type 'nodes'", - errors.ECODE_INVAL) - if not nodes: raise errors.ProgrammerError("_GetWantedNodes should only be called with a" " non-empty list of nodes whose name is to be expanded.") @@ -459,10 +672,6 @@ def _GetWantedInstances(lu, instances): @raise errors.OpPrereqError: if any of the passed instances is not found """ - if not isinstance(instances, list): - raise errors.OpPrereqError("Invalid argument type 'instances'", - errors.ECODE_INVAL) - if instances: wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances] else: @@ -470,6 +679,39 @@ def _GetWantedInstances(lu, instances): return wanted +def _GetUpdatedParams(old_params, update_dict, + use_default=True, use_none=False): + """Return the new version of a parameter dictionary. 
+ + @type old_params: dict + @param old_params: old parameters + @type update_dict: dict + @param update_dict: dict containing new parameter values, or + constants.VALUE_DEFAULT to reset the parameter to its default + value + @type use_default: boolean + @param use_default: whether to recognise L{constants.VALUE_DEFAULT} + values as 'to be deleted' values + @type use_none: boolean + @param use_none: whether to recognise C{None} values as 'to be + deleted' values + @rtype: dict + @return: the new parameter dictionary + + """ + params_copy = copy.deepcopy(old_params) + for key, val in update_dict.iteritems(): + if ((use_default and val == constants.VALUE_DEFAULT) or + (use_none and val is None)): + try: + del params_copy[key] + except KeyError: + pass + else: + params_copy[key] = val + return params_copy + + def _CheckOutputFields(static, dynamic, selected): """Checks whether all selected fields are valid. @@ -489,20 +731,6 @@ def _CheckOutputFields(static, dynamic, selected): % ",".join(delta), errors.ECODE_INVAL) -def _CheckBooleanOpField(op, name): - """Validates boolean opcode parameters. - - This will ensure that an opcode parameter is either a boolean value, - or None (but that it always exists). - - """ - val = getattr(op, name, None) - if not (val is None or isinstance(val, bool)): - raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" % - (name, str(val)), errors.ECODE_INVAL) - setattr(op, name, val) - - def _CheckGlobalHvParams(params): """Validates that given hypervisor params are not global ones. @@ -583,6 +811,7 @@ def _CheckDiskTemplate(template): raise errors.OpPrereqError(msg, errors.ECODE_INVAL) if template == constants.DT_FILE: _RequireFileStorage() + return True def _CheckStorageType(storage_type): @@ -594,7 +823,15 @@ def _CheckStorageType(storage_type): errors.ECODE_INVAL) if storage_type == constants.ST_FILE: _RequireFileStorage() + return True + +def _GetClusterDomainSecret(): + """Reads the cluster domain secret. 
+ + """ + return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE, + strict=True) def _CheckInstanceDown(lu, instance, reason): @@ -741,11 +978,11 @@ def _NICListToTuple(lu, nics): """ hooks_nics = [] - c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT] + cluster = lu.cfg.GetClusterInfo() for nic in nics: ip = nic.ip mac = nic.mac - filled_params = objects.FillDict(c_nicparams, nic.nicparams) + filled_params = cluster.SimpleFillNIC(nic.nicparams) mode = filled_params[constants.NIC_MODE] link = filled_params[constants.NIC_LINK] hooks_nics.append((ip, mac, mode, link)) @@ -817,14 +1054,12 @@ def _DecideSelfPromotion(lu, exceptions=None): return mc_now < mc_should -def _CheckNicsBridgesExist(lu, target_nics, target_node, - profile=constants.PP_DEFAULT): +def _CheckNicsBridgesExist(lu, target_nics, target_node): """Check that the brigdes needed by a list of nics exist. """ - c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile] - paramslist = [objects.FillDict(c_nicparams, nic.nicparams) - for nic in target_nics] + cluster = lu.cfg.GetClusterInfo() + paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics] brlist = [params[constants.NIC_LINK] for params in paramslist if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED] if brlist: @@ -920,11 +1155,36 @@ def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq): return faulty -def _FormatTimestamp(secs): - """Formats a Unix timestamp with the local timezone. +def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot): + """Check the sanity of iallocator and node arguments and use the + cluster-wide iallocator if appropriate. + + Check that at most one of (iallocator, node) is specified. If none is + specified, then the LU's opcode's iallocator slot is filled with the + cluster-wide default iallocator. 
+ + @type iallocator_slot: string + @param iallocator_slot: the name of the opcode iallocator slot + @type node_slot: string + @param node_slot: the name of the opcode target node slot """ - return time.strftime("%F %T %Z", time.gmtime(secs)) + node = getattr(lu.op, node_slot, None) + iallocator = getattr(lu.op, iallocator_slot, None) + + if node is not None and iallocator is not None: + raise errors.OpPrereqError("Do not specify both, iallocator and node.", + errors.ECODE_INVAL) + elif node is None and iallocator is None: + default_iallocator = lu.cfg.GetDefaultIAllocator() + if default_iallocator: + setattr(lu.op, iallocator_slot, default_iallocator) + else: + raise errors.OpPrereqError("No iallocator or node given and no" + " cluster-wide default iallocator found." + " Please specify either an iallocator or a" + " node, or set a cluster-wide default" + " iallocator.") class LUPostInitCluster(LogicalUnit): @@ -933,7 +1193,6 @@ class LUPostInitCluster(LogicalUnit): """ HPATH = "cluster-init" HTYPE = constants.HTYPE_CLUSTER - _OP_REQP = [] def BuildHooksEnv(self): """Build hooks env. @@ -943,12 +1202,6 @@ class LUPostInitCluster(LogicalUnit): mn = self.cfg.GetMasterNode() return env, [], [mn] - def CheckPrereq(self): - """No prerequisites to check. - - """ - return True - def Exec(self, feedback_fn): """Nothing to do. @@ -962,7 +1215,6 @@ class LUDestroyCluster(LogicalUnit): """ HPATH = "cluster-destroy" HTYPE = constants.HTYPE_CLUSTER - _OP_REQP = [] def BuildHooksEnv(self): """Build hooks env. 
@@ -997,7 +1249,6 @@ class LUDestroyCluster(LogicalUnit): """ master = self.cfg.GetMasterNode() - modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup # Run post hooks on master node before it's removed hm = self.proc.hmclass(self.rpc.call_hooks_runner, self) @@ -1010,53 +1261,9 @@ class LUDestroyCluster(LogicalUnit): result = self.rpc.call_node_stop_master(master, False) result.Raise("Could not disable the master role") - if modify_ssh_setup: - priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS) - utils.CreateBackup(priv_key) - utils.CreateBackup(pub_key) - return master -def _VerifyCertificateInner(filename, expired, not_before, not_after, now, - warn_days=constants.SSL_CERT_EXPIRATION_WARN, - error_days=constants.SSL_CERT_EXPIRATION_ERROR): - """Verifies certificate details for LUVerifyCluster. - - """ - if expired: - msg = "Certificate %s is expired" % filename - - if not_before is not None and not_after is not None: - msg += (" (valid from %s to %s)" % - (_FormatTimestamp(not_before), - _FormatTimestamp(not_after))) - elif not_before is not None: - msg += " (valid from %s)" % _FormatTimestamp(not_before) - elif not_after is not None: - msg += " (valid until %s)" % _FormatTimestamp(not_after) - - return (LUVerifyCluster.ETYPE_ERROR, msg) - - elif not_before is not None and not_before > now: - return (LUVerifyCluster.ETYPE_WARNING, - "Certificate %s not yet valid (valid from %s)" % - (filename, _FormatTimestamp(not_before))) - - elif not_after is not None: - remaining_days = int((not_after - now) / (24 * 3600)) - - msg = ("Certificate %s expires in %d days" % (filename, remaining_days)) - - if remaining_days <= error_days: - return (LUVerifyCluster.ETYPE_ERROR, msg) - - if remaining_days <= warn_days: - return (LUVerifyCluster.ETYPE_WARNING, msg) - - return (None, None) - - def _VerifyCertificate(filename): """Verifies a certificate for LUVerifyCluster. 
@@ -1071,11 +1278,23 @@ def _VerifyCertificate(filename): return (LUVerifyCluster.ETYPE_ERROR, "Failed to load X509 certificate %s: %s" % (filename, err)) - # Depending on the pyOpenSSL version, this can just return (None, None) - (not_before, not_after) = utils.GetX509CertValidity(cert) + (errcode, msg) = \ + utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN, + constants.SSL_CERT_EXPIRATION_ERROR) + + if msg: + fnamemsg = "While verifying %s: %s" % (filename, msg) + else: + fnamemsg = None + + if errcode is None: + return (None, fnamemsg) + elif errcode == utils.CERT_WARNING: + return (LUVerifyCluster.ETYPE_WARNING, fnamemsg) + elif errcode == utils.CERT_ERROR: + return (LUVerifyCluster.ETYPE_ERROR, fnamemsg) - return _VerifyCertificateInner(filename, cert.has_expired(), - not_before, not_after, time.time()) + raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode) class LUVerifyCluster(LogicalUnit): @@ -1084,7 +1303,13 @@ class LUVerifyCluster(LogicalUnit): """ HPATH = "cluster-verify" HTYPE = constants.HTYPE_CLUSTER - _OP_REQP = ["skip_checks", "verbose", "error_codes", "debug_simulate_errors"] + _OP_PARAMS = [ + ("skip_checks", _EmptyList, + _TListOf(_TElemOf(constants.VERIFY_OPTIONAL_CHECKS))), + ("verbose", False, _TBool), + ("error_codes", False, _TBool), + ("debug_simulate_errors", False, _TBool), + ] REQ_BGL = False TCLUSTER = "cluster" @@ -1100,12 +1325,14 @@ class LUVerifyCluster(LogicalUnit): EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK") EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE") ENODEDRBD = (TNODE, "ENODEDRBD") + ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER") ENODEFILECHECK = (TNODE, "ENODEFILECHECK") ENODEHOOKS = (TNODE, "ENODEHOOKS") ENODEHV = (TNODE, "ENODEHV") ENODELVM = (TNODE, "ENODELVM") ENODEN1 = (TNODE, "ENODEN1") ENODENET = (TNODE, "ENODENET") + ENODEOS = (TNODE, "ENODEOS") ENODEORPHANINSTANCE = (TNODE, "ENODEORPHANINSTANCE") ENODEORPHANLV = (TNODE, "ENODEORPHANLV") ENODERPC = 
(TNODE, "ENODERPC") @@ -1121,6 +1348,8 @@ class LUVerifyCluster(LogicalUnit): class NodeImage(object): """A class representing the logical and physical status of a node. + @type name: string + @ivar name: the node name to which this object refers @ivar volumes: a structure as returned from L{ganeti.backend.GetVolumeList} (runtime) @ivar instances: a list of running instances (runtime) @@ -1140,9 +1369,14 @@ class LUVerifyCluster(LogicalUnit): @ivar hyp_fail: whether the RPC call didn't return the instance list @type ghost: boolean @ivar ghost: whether this is a known node or not (config) + @type os_fail: boolean + @ivar os_fail: whether the RPC call didn't return valid OS data + @type oslist: list + @ivar oslist: list of OSes as diagnosed by DiagnoseOS """ - def __init__(self, offline=False): + def __init__(self, offline=False, name=None): + self.name = name self.volumes = {} self.instances = [] self.pinst = [] @@ -1155,6 +1389,8 @@ class LUVerifyCluster(LogicalUnit): self.lvm_fail = False self.hyp_fail = False self.ghost = False + self.os_fail = False + self.oslist = {} def ExpandNames(self): self.needed_locks = { @@ -1201,14 +1437,11 @@ class LUVerifyCluster(LogicalUnit): self.bad = self.bad or cond def _VerifyNode(self, ninfo, nresult): - """Run multiple tests against a node. + """Perform some basic validation on data returned from a node. 
- Test list: - - - compares ganeti version - - checks vg existence and size > 20G - - checks config file checksum - - checks ssh to other nodes + - check the result data structure is well formed and has all the + mandatory fields + - check ganeti version @type ninfo: L{objects.Node} @param ninfo: the node to check @@ -1418,20 +1651,24 @@ class LUVerifyCluster(LogicalUnit): _ErrorIf(test, self.EINSTANCEWRONGNODE, instance, "instance should not run on node %s", node) - def _VerifyOrphanVolumes(self, node_vol_should, node_image): + def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved): """Verify if there are any unknown volumes in the cluster. The .os, .swap and backup volumes are ignored. All other volumes are reported as unknown. + @type reserved: L{ganeti.utils.FieldSet} + @param reserved: a FieldSet of reserved volume names + """ for node, n_img in node_image.items(): if n_img.offline or n_img.rpc_fail or n_img.lvm_fail: # skip non-healthy nodes continue for volume in n_img.volumes: - test = (node not in node_vol_should or - volume not in node_vol_should[node]) + test = ((node not in node_vol_should or + volume not in node_vol_should[node]) and + not reserved.Matches(volume)) self._ErrorIf(test, self.ENODEORPHANLV, node, "volume %s is unknown", volume) @@ -1518,13 +1755,15 @@ class LUVerifyCluster(LogicalUnit): "file '%s' should not exist" " on non master candidates", file_name) - def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_map): + def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper, + drbd_map): """Verifies and the node DRBD status. 
@type ninfo: L{objects.Node} @param ninfo: the node to check @param nresult: the remote results for the node @param instanceinfo: the dict of instances + @param drbd_helper: the configured DRBD usermode helper @param drbd_map: the DRBD map as returned by L{ganeti.config.ConfigWriter.ComputeDRBDMap} @@ -1532,6 +1771,20 @@ class LUVerifyCluster(LogicalUnit): node = ninfo.name _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + if drbd_helper: + helper_result = nresult.get(constants.NV_DRBDHELPER, None) + test = (helper_result == None) + _ErrorIf(test, self.ENODEDRBDHELPER, node, + "no drbd usermode helper returned") + if helper_result: + status, payload = helper_result + test = not status + _ErrorIf(test, self.ENODEDRBDHELPER, node, + "drbd usermode helper check unsuccessful: %s", payload) + test = status and (payload != drbd_helper) + _ErrorIf(test, self.ENODEDRBDHELPER, node, + "wrong drbd usermode helper: %s", payload) + # compute the DRBD minors node_drbd = {} for minor, instance in drbd_map[node].items(): @@ -1565,6 +1818,100 @@ class LUVerifyCluster(LogicalUnit): _ErrorIf(test, self.ENODEDRBD, node, "unallocated drbd minor %d is in use", minor) + def _UpdateNodeOS(self, ninfo, nresult, nimg): + """Builds the node OS structures. 
+ + @type ninfo: L{objects.Node} + @param ninfo: the node to check + @param nresult: the remote results for the node + @param nimg: the node image object + + """ + node = ninfo.name + _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + + remote_os = nresult.get(constants.NV_OSLIST, None) + test = (not isinstance(remote_os, list) or + not compat.all(isinstance(v, list) and len(v) == 7 + for v in remote_os)) + + _ErrorIf(test, self.ENODEOS, node, + "node hasn't returned valid OS data") + + nimg.os_fail = test + + if test: + return + + os_dict = {} + + for (name, os_path, status, diagnose, + variants, parameters, api_ver) in nresult[constants.NV_OSLIST]: + + if name not in os_dict: + os_dict[name] = [] + + # parameters is a list of lists instead of list of tuples due to + # JSON lacking a real tuple type, fix it: + parameters = [tuple(v) for v in parameters] + os_dict[name].append((os_path, status, diagnose, + set(variants), set(parameters), set(api_ver))) + + nimg.oslist = os_dict + + def _VerifyNodeOS(self, ninfo, nimg, base): + """Verifies the node OS list. + + @type ninfo: L{objects.Node} + @param ninfo: the node to check + @param nimg: the node image object + @param base: the 'template' node we match against (e.g. from the master) + + """ + node = ninfo.name + _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103 + + assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?" + + for os_name, os_data in nimg.oslist.items(): + assert os_data, "Empty OS status for OS %s?!" 
% os_name + f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0] + _ErrorIf(not f_status, self.ENODEOS, node, + "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag) + _ErrorIf(len(os_data) > 1, self.ENODEOS, node, + "OS '%s' has multiple entries (first one shadows the rest): %s", + os_name, utils.CommaJoin([v[0] for v in os_data])) + # this will be caught in the backend too + _ErrorIf(compat.any(v >= constants.OS_API_V15 for v in f_api) + and not f_var, self.ENODEOS, node, + "OS %s with API at least %d does not declare any variant", + os_name, constants.OS_API_V15) + # comparisons with the 'base' image + test = os_name not in base.oslist + _ErrorIf(test, self.ENODEOS, node, + "Extra OS %s not present on reference node (%s)", + os_name, base.name) + if test: + continue + assert base.oslist[os_name], "Base node has empty OS status?" + _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0] + if not b_status: + # base OS is invalid, skipping + continue + for kind, a, b in [("API version", f_api, b_api), + ("variants list", f_var, b_var), + ("parameters", f_param, b_param)]: + _ErrorIf(a != b, self.ENODEOS, node, + "OS %s %s differs from reference node %s: %s vs. %s", + kind, os_name, base.name, + utils.CommaJoin(a), utils.CommaJoin(b)) + + # check any missing OSes + missing = set(base.oslist.keys()).difference(nimg.oslist.keys()) + _ErrorIf(missing, self.ENODEOS, node, + "OSes present on reference node %s but missing on this node: %s", + base.name, utils.CommaJoin(missing)) + def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name): """Verifies and updates the node volume data. @@ -1654,18 +2001,6 @@ class LUVerifyCluster(LogicalUnit): _ErrorIf(True, self.ENODERPC, node, "node returned invalid LVM info, check LVM status") - def CheckPrereq(self): - """Check prerequisites. - - Transform the list of checks we're going to skip into a set and check that - all its members are valid. 
- - """ - self.skip_set = frozenset(self.op.skip_checks) - if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set): - raise errors.OpPrereqError("Invalid checks to be skipped specified", - errors.ECODE_INVAL) - def BuildHooksEnv(self): """Build hooks env. @@ -1700,6 +2035,7 @@ class LUVerifyCluster(LogicalUnit): _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode) vg_name = self.cfg.GetVGName() + drbd_helper = self.cfg.GetDRBDHelper() hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors cluster = self.cfg.GetClusterInfo() nodelist = utils.NiceSort(self.cfg.GetNodeList()) @@ -1742,6 +2078,7 @@ class LUVerifyCluster(LogicalUnit): constants.NV_NODESETUP: None, constants.NV_TIME: None, constants.NV_MASTERIP: (master_node, master_ip), + constants.NV_OSLIST: None, } if vg_name is not None: @@ -1750,8 +2087,12 @@ class LUVerifyCluster(LogicalUnit): node_verify_param[constants.NV_PVLIST] = [vg_name] node_verify_param[constants.NV_DRBDLIST] = None + if drbd_helper: + node_verify_param[constants.NV_DRBDHELPER] = drbd_helper + # Build our expected cluster state - node_image = dict((node.name, self.NodeImage(offline=node.offline)) + node_image = dict((node.name, self.NodeImage(offline=node.offline, + name=node.name)) for node in nodeinfo) for instance in instancelist: @@ -1760,7 +2101,7 @@ class LUVerifyCluster(LogicalUnit): for nname in inst_config.all_nodes: if nname not in node_image: # ghost node - gnode = self.NodeImage() + gnode = self.NodeImage(name=nname) gnode.ghost = True node_image[nname] = gnode @@ -1791,6 +2132,9 @@ class LUVerifyCluster(LogicalUnit): all_drbd_map = self.cfg.ComputeDRBDMap() feedback_fn("* Verifying node status") + + refos_img = None + for node_i in nodeinfo: node = node_i.name nimg = node_image[node] @@ -1826,12 +2170,18 @@ class LUVerifyCluster(LogicalUnit): self._VerifyNodeLVM(node_i, nresult, vg_name) self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums, master_files) - self._VerifyNodeDrbd(node_i, 
nresult, instanceinfo, all_drbd_map) + self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper, + all_drbd_map) self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime) self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name) self._UpdateNodeInstances(node_i, nresult, nimg) self._UpdateNodeInfo(node_i, nresult, nimg, vg_name) + self._UpdateNodeOS(node_i, nresult, nimg) + if not nimg.os_fail: + if refos_img is None: + refos_img = nimg + self._VerifyNodeOS(node_i, nimg, refos_img) feedback_fn("* Verifying instance status") for instance in instancelist: @@ -1883,12 +2233,13 @@ class LUVerifyCluster(LogicalUnit): "instance lives on ghost node %s", node) feedback_fn("* Verifying orphan volumes") - self._VerifyOrphanVolumes(node_vol_should, node_image) + reserved = utils.FieldSet(*cluster.reserved_lvs) + self._VerifyOrphanVolumes(node_vol_should, node_image, reserved) feedback_fn("* Verifying orphan instances") self._VerifyOrphanInstances(instancelist, node_image) - if constants.VERIFY_NPLUSONE_MEM not in self.skip_set: + if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks: feedback_fn("* Verifying N+1 Memory redundancy") self._VerifyNPlusOneMemory(node_image, instanceinfo) @@ -1960,7 +2311,6 @@ class LUVerifyDisks(NoHooksLU): """Verifies the cluster disks status. """ - _OP_REQP = [] REQ_BGL = False def ExpandNames(self): @@ -1970,14 +2320,6 @@ class LUVerifyDisks(NoHooksLU): } self.share_locks = dict.fromkeys(locking.LEVELS, 1) - def CheckPrereq(self): - """Check prerequisites. - - This has no prerequisites. - - """ - pass - def Exec(self, feedback_fn): """Verify integrity of cluster disks. @@ -2043,14 +2385,10 @@ class LURepairDiskSizes(NoHooksLU): """Verifies the cluster disks sizes. 
""" - _OP_REQP = ["instances"] + _OP_PARAMS = [("instances", _EmptyList, _TListOf(_TNonEmptyString))] REQ_BGL = False def ExpandNames(self): - if not isinstance(self.op.instances, list): - raise errors.OpPrereqError("Invalid argument type 'instances'", - errors.ECODE_INVAL) - if self.op.instances: self.wanted_names = [] for name in self.op.instances: @@ -2165,7 +2503,7 @@ class LURenameCluster(LogicalUnit): """ HPATH = "cluster-rename" HTYPE = constants.HTYPE_CLUSTER - _OP_REQP = ["name"] + _OP_PARAMS = [("name", _NoDefault, _TNonEmptyString)] def BuildHooksEnv(self): """Build hooks env. @@ -2183,7 +2521,8 @@ class LURenameCluster(LogicalUnit): """Verify that the passed name is a valid one. """ - hostname = utils.GetHostInfo(self.op.name) + hostname = netutils.GetHostname(name=self.op.name, + family=self.cfg.GetPrimaryIPFamily()) new_name = hostname.name self.ip = new_ip = hostname.ip @@ -2194,7 +2533,7 @@ class LURenameCluster(LogicalUnit): " cluster has changed", errors.ECODE_INVAL) if new_ip != old_ip: - if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT): + if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT): raise errors.OpPrereqError("The given cluster IP address (%s) is" " reachable on the network. Aborting." % new_ip, errors.ECODE_NOTUNIQUE) @@ -2242,21 +2581,7 @@ class LURenameCluster(LogicalUnit): self.LogWarning("Could not re-enable the master role on" " the master, please restart manually: %s", msg) - -def _RecursiveCheckIfLVMBased(disk): - """Check if the given disk or its children are lvm-based. 
- - @type disk: L{objects.Disk} - @param disk: the disk to check - @rtype: boolean - @return: boolean indicating whether a LD_LV dev_type was found or not - - """ - if disk.children: - for chdisk in disk.children: - if _RecursiveCheckIfLVMBased(chdisk): - return True - return disk.dev_type == constants.LD_LV + return clustername class LUSetClusterParams(LogicalUnit): @@ -2265,30 +2590,30 @@ class LUSetClusterParams(LogicalUnit): """ HPATH = "cluster-modify" HTYPE = constants.HTYPE_CLUSTER - _OP_REQP = [] + _OP_PARAMS = [ + ("vg_name", None, _TMaybeString), + ("enabled_hypervisors", None, + _TOr(_TAnd(_TListOf(_TElemOf(constants.HYPER_TYPES)), _TTrue), _TNone)), + ("hvparams", None, _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)), + ("beparams", None, _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)), + ("os_hvp", None, _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)), + ("osparams", None, _TOr(_TDictOf(_TNonEmptyString, _TDict), _TNone)), + ("candidate_pool_size", None, _TOr(_TStrictPositiveInt, _TNone)), + ("uid_pool", None, _NoType), + ("add_uids", None, _NoType), + ("remove_uids", None, _NoType), + ("maintain_node_health", None, _TMaybeBool), + ("nicparams", None, _TOr(_TDict, _TNone)), + ("drbd_helper", None, _TOr(_TString, _TNone)), + ("default_iallocator", None, _TMaybeString), + ("reserved_lvs", None, _TOr(_TListOf(_TNonEmptyString), _TNone)), + ] REQ_BGL = False def CheckArguments(self): """Check parameters """ - for attr in ["candidate_pool_size", - "uid_pool", "add_uids", "remove_uids"]: - if not hasattr(self.op, attr): - setattr(self.op, attr, None) - - if self.op.candidate_pool_size is not None: - try: - self.op.candidate_pool_size = int(self.op.candidate_pool_size) - except (ValueError, TypeError), err: - raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" % - str(err), errors.ECODE_INVAL) - if self.op.candidate_pool_size < 1: - raise errors.OpPrereqError("At least one master candidate needed", - errors.ECODE_INVAL) - - 
_CheckBooleanOpField(self.op, "maintain_node_health") - if self.op.uid_pool: uidpool.CheckUidPool(self.op.uid_pool) @@ -2325,13 +2650,15 @@ class LUSetClusterParams(LogicalUnit): """ if self.op.vg_name is not None and not self.op.vg_name: - instances = self.cfg.GetAllInstancesInfo().values() - for inst in instances: - for disk in inst.disks: - if _RecursiveCheckIfLVMBased(disk): - raise errors.OpPrereqError("Cannot disable lvm storage while" - " lvm-based instances exist", - errors.ECODE_INVAL) + if self.cfg.HasAnyDiskOfType(constants.LD_LV): + raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based" + " instances exist", errors.ECODE_INVAL) + + if self.op.drbd_helper is not None and not self.op.drbd_helper: + if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8): + raise errors.OpPrereqError("Cannot disable drbd helper while" + " drbd-based instances exist", + errors.ECODE_INVAL) node_list = self.acquired_locks[locking.LEVEL_NODE] @@ -2352,17 +2679,33 @@ class LUSetClusterParams(LogicalUnit): raise errors.OpPrereqError("Error on node '%s': %s" % (node, vgstatus), errors.ECODE_ENVIRON) + if self.op.drbd_helper: + # checks given drbd helper on all nodes + helpers = self.rpc.call_drbd_helper(node_list) + for node in node_list: + ninfo = self.cfg.GetNodeInfo(node) + if ninfo.offline: + self.LogInfo("Not checking drbd helper on offline node %s", node) + continue + msg = helpers[node].fail_msg + if msg: + raise errors.OpPrereqError("Error checking drbd helper on node" + " '%s': %s" % (node, msg), + errors.ECODE_ENVIRON) + node_helper = helpers[node].payload + if node_helper != self.op.drbd_helper: + raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" % + (node, node_helper), errors.ECODE_ENVIRON) + self.cluster = cluster = self.cfg.GetClusterInfo() # validate params changes if self.op.beparams: utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) - self.new_beparams = objects.FillDict( - cluster.beparams[constants.PP_DEFAULT], 
self.op.beparams) + self.new_beparams = cluster.SimpleFillBE(self.op.beparams) if self.op.nicparams: utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES) - self.new_nicparams = objects.FillDict( - cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams) + self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams) objects.NIC.CheckParameterSyntax(self.new_nicparams) nic_errors = [] @@ -2391,9 +2734,6 @@ class LUSetClusterParams(LogicalUnit): # hypervisor list/parameters self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {}) if self.op.hvparams: - if not isinstance(self.op.hvparams, dict): - raise errors.OpPrereqError("Invalid 'hvparams' parameter on input", - errors.ECODE_INVAL) for hv_name, hv_dict in self.op.hvparams.items(): if hv_name not in self.new_hvparams: self.new_hvparams[hv_name] = hv_dict @@ -2403,13 +2743,7 @@ class LUSetClusterParams(LogicalUnit): # os hypervisor parameters self.new_os_hvp = objects.FillDict(cluster.os_hvp, {}) if self.op.os_hvp: - if not isinstance(self.op.os_hvp, dict): - raise errors.OpPrereqError("Invalid 'os_hvp' parameter on input", - errors.ECODE_INVAL) for os_name, hvs in self.op.os_hvp.items(): - if not isinstance(hvs, dict): - raise errors.OpPrereqError(("Invalid 'os_hvp' parameter on" - " input"), errors.ECODE_INVAL) if os_name not in self.new_os_hvp: self.new_os_hvp[os_name] = hvs else: @@ -2419,19 +2753,27 @@ class LUSetClusterParams(LogicalUnit): else: self.new_os_hvp[os_name][hv_name].update(hv_dict) + # os parameters + self.new_osp = objects.FillDict(cluster.osparams, {}) + if self.op.osparams: + for os_name, osp in self.op.osparams.items(): + if os_name not in self.new_osp: + self.new_osp[os_name] = {} + + self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp, + use_none=True) + + if not self.new_osp[os_name]: + # we removed all parameters + del self.new_osp[os_name] + else: + # check the parameter validity (remote check) + _CheckOSParams(self, False, 
[self.cfg.GetMasterNode()], + os_name, self.new_osp[os_name]) + # changes to the hypervisor list if self.op.enabled_hypervisors is not None: self.hv_list = self.op.enabled_hypervisors - if not self.hv_list: - raise errors.OpPrereqError("Enabled hypervisors list must contain at" - " least one member", - errors.ECODE_INVAL) - invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES - if invalid_hvs: - raise errors.OpPrereqError("Enabled hypervisors contains invalid" - " entries: %s" % - utils.CommaJoin(invalid_hvs), - errors.ECODE_INVAL) for hv in self.hv_list: # if the hypervisor doesn't already exist in the cluster # hvparams, we initialize it to empty, and then (in both @@ -2470,6 +2812,14 @@ class LUSetClusterParams(LogicalUnit): hv_class.CheckParameterSyntax(new_osp) _CheckHVParams(self, node_list, hv_name, new_osp) + if self.op.default_iallocator: + alloc_script = utils.FindFile(self.op.default_iallocator, + constants.IALLOCATOR_SEARCH_PATH, + os.path.isfile) + if alloc_script is None: + raise errors.OpPrereqError("Invalid default iallocator script '%s'" + " specified" % self.op.default_iallocator, + errors.ECODE_INVAL) def Exec(self, feedback_fn): """Change the parameters of the cluster. 
@@ -2484,6 +2834,15 @@ class LUSetClusterParams(LogicalUnit): else: feedback_fn("Cluster LVM configuration already in desired" " state, not changing") + if self.op.drbd_helper is not None: + new_helper = self.op.drbd_helper + if not new_helper: + new_helper = None + if new_helper != self.cfg.GetDRBDHelper(): + self.cfg.SetDRBDHelper(new_helper) + else: + feedback_fn("Cluster DRBD helper already in desired state," + " not changing") if self.op.hvparams: self.cluster.hvparams = self.new_hvparams if self.op.os_hvp: @@ -2495,6 +2854,8 @@ class LUSetClusterParams(LogicalUnit): self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams if self.op.nicparams: self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams + if self.op.osparams: + self.cluster.osparams = self.new_osp if self.op.candidate_pool_size is not None: self.cluster.candidate_pool_size = self.op.candidate_pool_size @@ -2513,6 +2874,12 @@ class LUSetClusterParams(LogicalUnit): if self.op.uid_pool is not None: self.cluster.uid_pool = self.op.uid_pool + if self.op.default_iallocator is not None: + self.cluster.default_iallocator = self.op.default_iallocator + + if self.op.reserved_lvs is not None: + self.cluster.reserved_lvs = self.op.reserved_lvs + self.cfg.Update(self.cluster, feedback_fn) @@ -2541,6 +2908,7 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None): constants.RAPI_CERT_FILE, constants.RAPI_USERS_FILE, constants.CONFD_HMAC_KEY, + constants.CLUSTER_DOMAIN_SECRET_FILE, ]) enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors @@ -2566,7 +2934,6 @@ class LURedistributeConfig(NoHooksLU): This is a very simple LU. """ - _OP_REQP = [] REQ_BGL = False def ExpandNames(self): @@ -2575,11 +2942,6 @@ class LURedistributeConfig(NoHooksLU): } self.share_locks[locking.LEVEL_NODE] = 1 - def CheckPrereq(self): - """Check prerequisites. - - """ - def Exec(self, feedback_fn): """Redistribute the configuration. 
@@ -2636,7 +2998,8 @@ def _WaitForSync(lu, instance, disks=None, oneshot=False): if mstat.sync_percent is not None: done = False if mstat.estimated_time is not None: - rem_time = "%d estimated seconds remaining" % mstat.estimated_time + rem_time = ("%s remaining (estimated)" % + utils.FormatSeconds(mstat.estimated_time)) max_time = mstat.estimated_time else: rem_time = "no time estimate" @@ -2700,14 +3063,16 @@ class LUDiagnoseOS(NoHooksLU): """Logical unit for OS diagnose/query. """ - _OP_REQP = ["output_fields", "names"] + _OP_PARAMS = [ + _POutputFields, + ("names", _EmptyList, _TListOf(_TNonEmptyString)), + ] REQ_BGL = False _FIELDS_STATIC = utils.FieldSet() - _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants") - # Fields that need calculation of global os validity - _FIELDS_NEEDVALID = frozenset(["valid", "variants"]) + _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status", "variants", + "parameters", "api_versions") - def ExpandNames(self): + def CheckArguments(self): if self.op.names: raise errors.OpPrereqError("Selective OS query not supported", errors.ECODE_INVAL) @@ -2716,6 +3081,7 @@ class LUDiagnoseOS(NoHooksLU): dynamic=self._FIELDS_DYNAMIC, selected=self.op.output_fields) + def ExpandNames(self): # Lock all nodes, in shared mode # Temporary removal of locks, should be reverted later # TODO: reintroduce locks when they are lighter-weight @@ -2723,11 +3089,6 @@ class LUDiagnoseOS(NoHooksLU): #self.share_locks[locking.LEVEL_NODE] = 1 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET - def CheckPrereq(self): - """Check prerequisites. 
- - """ - @staticmethod def _DiagnoseByOS(rlist): """Remaps a per-node return list into an a per-os per-node dictionary @@ -2735,12 +3096,13 @@ class LUDiagnoseOS(NoHooksLU): @param rlist: a map with node names as keys and OS objects as values @rtype: dict - @return: a dictionary with osnames as keys and as value another map, with - nodes as keys and tuples of (path, status, diagnose) as values, eg:: + @return: a dictionary with osnames as keys and as value another + map, with nodes as keys and tuples of (path, status, diagnose, + variants, parameters, api_versions) as values, eg:: - {"debian-etch": {"node1": [(/usr/lib/..., True, ""), + {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []), (/srv/..., False, "invalid api")], - "node2": [(/srv/..., True, "")]} + "node2": [(/srv/..., True, "", [], [])]} } """ @@ -2753,14 +3115,18 @@ class LUDiagnoseOS(NoHooksLU): for node_name, nr in rlist.items(): if nr.fail_msg or not nr.payload: continue - for name, path, status, diagnose, variants in nr.payload: + for (name, path, status, diagnose, variants, + params, api_versions) in nr.payload: if name not in all_os: # build a list of nodes for this os containing empty lists # for each node in node_list all_os[name] = {} for nname in good_nodes: all_os[name][nname] = [] - all_os[name][node_name].append((path, status, diagnose, variants)) + # convert params from [name, help] to (name, help) + params = [tuple(v) for v in params] + all_os[name][node_name].append((path, status, diagnose, + variants, params, api_versions)) return all_os def Exec(self, feedback_fn): @@ -2771,25 +3137,25 @@ class LUDiagnoseOS(NoHooksLU): node_data = self.rpc.call_os_diagnose(valid_nodes) pol = self._DiagnoseByOS(node_data) output = [] - calc_valid = self._FIELDS_NEEDVALID.intersection(self.op.output_fields) - calc_variants = "variants" in self.op.output_fields for os_name, os_data in pol.items(): row = [] - if calc_valid: - valid = True - variants = None - for osl in os_data.values(): - valid = 
valid and osl and osl[0][1] - if not valid: - variants = None - break - if calc_variants: - node_variants = osl[0][3] - if variants is None: - variants = node_variants - else: - variants = [v for v in variants if v in node_variants] + valid = True + (variants, params, api_versions) = null_state = (set(), set(), set()) + for idx, osl in enumerate(os_data.values()): + valid = bool(valid and osl and osl[0][1]) + if not valid: + (variants, params, api_versions) = null_state + break + node_variants, node_params, node_api = osl[0][3:6] + if idx == 0: # first entry + variants = set(node_variants) + params = set(node_params) + api_versions = set(node_api) + else: # keep consistency + variants.intersection_update(node_variants) + params.intersection_update(node_params) + api_versions.intersection_update(node_api) for field in self.op.output_fields: if field == "name": @@ -2802,7 +3168,11 @@ class LUDiagnoseOS(NoHooksLU): for node_name, nos_list in os_data.items(): val[node_name] = nos_list elif field == "variants": - val = variants + val = list(variants) + elif field == "parameters": + val = list(params) + elif field == "api_versions": + val = list(api_versions) else: raise errors.ParameterError(field) row.append(val) @@ -2817,7 +3187,9 @@ class LURemoveNode(LogicalUnit): """ HPATH = "node-remove" HTYPE = constants.HTYPE_NODE - _OP_REQP = ["node_name"] + _OP_PARAMS = [ + _PNodeName, + ] def BuildHooksEnv(self): """Build hooks env. 
@@ -2910,7 +3282,11 @@ class LUQueryNodes(NoHooksLU): """ # pylint: disable-msg=W0142 - _OP_REQP = ["output_fields", "names", "use_locking"] + _OP_PARAMS = [ + _POutputFields, + ("names", _EmptyList, _TListOf(_TNonEmptyString)), + ("use_locking", False, _TBool), + ] REQ_BGL = False _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid", @@ -2931,11 +3307,12 @@ class LUQueryNodes(NoHooksLU): "role"] + _SIMPLE_FIELDS ) - def ExpandNames(self): + def CheckArguments(self): _CheckOutputFields(static=self._FIELDS_STATIC, dynamic=self._FIELDS_DYNAMIC, selected=self.op.output_fields) + def ExpandNames(self): self.needed_locks = {} self.share_locks[locking.LEVEL_NODE] = 1 @@ -2950,14 +3327,6 @@ class LUQueryNodes(NoHooksLU): # if we don't request only static fields, we need to lock the nodes self.needed_locks[locking.LEVEL_NODE] = self.wanted - def CheckPrereq(self): - """Check prerequisites. - - """ - # The validation of the node list is done in the _GetWantedNodes, - # if non empty, and if empty, there's no validation to do - pass - def Exec(self, feedback_fn): """Computes the list of nodes and their attributes. @@ -3070,16 +3439,20 @@ class LUQueryNodeVolumes(NoHooksLU): """Logical unit for getting volumes on node(s). 
""" - _OP_REQP = ["nodes", "output_fields"] + _OP_PARAMS = [ + ("nodes", _EmptyList, _TListOf(_TNonEmptyString)), + ("output_fields", _NoDefault, _TListOf(_TNonEmptyString)), + ] REQ_BGL = False _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance") _FIELDS_STATIC = utils.FieldSet("node") - def ExpandNames(self): + def CheckArguments(self): _CheckOutputFields(static=self._FIELDS_STATIC, dynamic=self._FIELDS_DYNAMIC, selected=self.op.output_fields) + def ExpandNames(self): self.needed_locks = {} self.share_locks[locking.LEVEL_NODE] = 1 if not self.op.nodes: @@ -3088,19 +3461,11 @@ class LUQueryNodeVolumes(NoHooksLU): self.needed_locks[locking.LEVEL_NODE] = \ _GetWantedNodes(self, self.op.nodes) - def CheckPrereq(self): - """Check prerequisites. - - This checks that the fields required are valid output fields. - - """ - self.nodes = self.acquired_locks[locking.LEVEL_NODE] - def Exec(self, feedback_fn): """Computes the list of nodes and their attributes. """ - nodenames = self.nodes + nodenames = self.acquired_locks[locking.LEVEL_NODE] volumes = self.rpc.call_node_volumes(nodenames) ilist = [self.cfg.GetInstanceInfo(iname) for iname @@ -3156,13 +3521,16 @@ class LUQueryNodeStorage(NoHooksLU): """Logical unit for getting information on storage units on node(s). 
""" - _OP_REQP = ["nodes", "storage_type", "output_fields"] - REQ_BGL = False _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE) + _OP_PARAMS = [ + ("nodes", _EmptyList, _TListOf(_TNonEmptyString)), + ("storage_type", _NoDefault, _CheckStorageType), + ("output_fields", _NoDefault, _TListOf(_TNonEmptyString)), + ("name", None, _TMaybeString), + ] + REQ_BGL = False def CheckArguments(self): - _CheckStorageType(self.op.storage_type) - _CheckOutputFields(static=self._FIELDS_STATIC, dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS), selected=self.op.output_fields) @@ -3177,20 +3545,12 @@ class LUQueryNodeStorage(NoHooksLU): else: self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET - def CheckPrereq(self): - """Check prerequisites. - - This checks that the fields required are valid output fields. - - """ - self.op.name = getattr(self.op, "name", None) - - self.nodes = self.acquired_locks[locking.LEVEL_NODE] - def Exec(self, feedback_fn): """Computes the list of nodes and their attributes. """ + self.nodes = self.acquired_locks[locking.LEVEL_NODE] + # Always get name to sort by if constants.SF_NAME in self.op.output_fields: fields = self.op.output_fields[:] @@ -3250,23 +3610,17 @@ class LUModifyNodeStorage(NoHooksLU): """Logical unit for modifying a storage volume on a node. """ - _OP_REQP = ["node_name", "storage_type", "name", "changes"] + _OP_PARAMS = [ + _PNodeName, + ("storage_type", _NoDefault, _CheckStorageType), + ("name", _NoDefault, _TNonEmptyString), + ("changes", _NoDefault, _TDict), + ] REQ_BGL = False def CheckArguments(self): - self.opnode_name = _ExpandNodeName(self.cfg, self.op.node_name) - - _CheckStorageType(self.op.storage_type) - - def ExpandNames(self): - self.needed_locks = { - locking.LEVEL_NODE: self.op.node_name, - } - - def CheckPrereq(self): - """Check prerequisites. 
+ self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) - """ storage_type = self.op.storage_type try: @@ -3283,6 +3637,11 @@ class LUModifyNodeStorage(NoHooksLU): (storage_type, list(diff)), errors.ECODE_INVAL) + def ExpandNames(self): + self.needed_locks = { + locking.LEVEL_NODE: self.op.node_name, + } + def Exec(self, feedback_fn): """Computes the list of nodes and their attributes. @@ -3301,11 +3660,18 @@ class LUAddNode(LogicalUnit): """ HPATH = "node-add" HTYPE = constants.HTYPE_NODE - _OP_REQP = ["node_name"] + _OP_PARAMS = [ + _PNodeName, + ("primary_ip", None, _NoType), + ("secondary_ip", None, _TMaybeString), + ("readd", False, _TBool), + ] def CheckArguments(self): # validate/normalize the node name - self.op.node_name = utils.HostInfo.NormalizeName(self.op.node_name) + self.hostname = netutils.GetHostname(name=self.op.node_name, + family=self.cfg.GetPrimaryIPFamily()) + self.op.node_name = self.hostname.name def BuildHooksEnv(self): """Build hooks env. @@ -3334,20 +3700,17 @@ class LUAddNode(LogicalUnit): Any errors are signaled by raising errors.OpPrereqError. 
""" - node_name = self.op.node_name cfg = self.cfg + hostname = self.hostname + node = hostname.name + primary_ip = self.op.primary_ip = hostname.ip + if self.op.secondary_ip is None: + self.op.secondary_ip = primary_ip - dns_data = utils.GetHostInfo(node_name) - - node = dns_data.name - primary_ip = self.op.primary_ip = dns_data.ip - secondary_ip = getattr(self.op, "secondary_ip", None) - if secondary_ip is None: - secondary_ip = primary_ip - if not utils.IsValidIP(secondary_ip): - raise errors.OpPrereqError("Invalid secondary IP given", - errors.ECODE_INVAL) - self.op.secondary_ip = secondary_ip + secondary_ip = self.op.secondary_ip + if not netutils.IP4Address.IsValid(secondary_ip): + raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4" + " address" % secondary_ip, errors.ECODE_INVAL) node_list = cfg.GetNodeList() if not self.op.readd and node in node_list: @@ -3396,13 +3759,13 @@ class LUAddNode(LogicalUnit): errors.ECODE_INVAL) # checks reachability - if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT): + if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT): raise errors.OpPrereqError("Node not reachable by ping", errors.ECODE_ENVIRON) if not newbie_singlehomed: # check reachability from my secondary ip to newbie's secondary ip - if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT, + if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT, source=myself.secondary_ip): raise errors.OpPrereqError("Node secondary ip not reachable by TCP" " based ping to noded port", @@ -3459,27 +3822,10 @@ class LUAddNode(LogicalUnit): " node version %s" % (constants.PROTOCOL_VERSION, result.payload)) - # setup ssh on node - if self.cfg.GetClusterInfo().modify_ssh_setup: - logging.info("Copy ssh key to node %s", node) - priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS) - keyarray = [] - keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB, - constants.SSH_HOST_RSA_PRIV, 
constants.SSH_HOST_RSA_PUB, - priv_key, pub_key] - - for i in keyfiles: - keyarray.append(utils.ReadFile(i)) - - result = self.rpc.call_node_add(node, keyarray[0], keyarray[1], - keyarray[2], keyarray[3], keyarray[4], - keyarray[5]) - result.Raise("Cannot transfer ssh keys to the new node") - # Add node to our /etc/hosts, and add key to known_hosts if self.cfg.GetClusterInfo().modify_etc_hosts: # FIXME: this should be done via an rpc call to node daemon - utils.AddHostToEtcHosts(new_node.name) + utils.AddHostToEtcHosts(self.hostname) if new_node.secondary_ip != new_node.primary_ip: result = self.rpc.call_node_has_ip_address(new_node.name, @@ -3532,15 +3878,18 @@ class LUSetNodeParams(LogicalUnit): """ HPATH = "node-modify" HTYPE = constants.HTYPE_NODE - _OP_REQP = ["node_name"] + _OP_PARAMS = [ + _PNodeName, + ("master_candidate", None, _TMaybeBool), + ("offline", None, _TMaybeBool), + ("drained", None, _TMaybeBool), + ("auto_promote", False, _TBool), + _PForce, + ] REQ_BGL = False def CheckArguments(self): self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) - _CheckBooleanOpField(self.op, 'master_candidate') - _CheckBooleanOpField(self.op, 'offline') - _CheckBooleanOpField(self.op, 'drained') - _CheckBooleanOpField(self.op, 'auto_promote') all_mods = [self.op.offline, self.op.master_candidate, self.op.drained] if all_mods.count(None) == 3: raise errors.OpPrereqError("Please pass at least one modification", @@ -3597,7 +3946,7 @@ class LUSetNodeParams(LogicalUnit): # we can't change the master's node flags if self.op.node_name == self.cfg.GetMasterNode(): raise errors.OpPrereqError("The master role can be changed" - " only via masterfailover", + " only via master-failover", errors.ECODE_INVAL) @@ -3693,7 +4042,10 @@ class LUPowercycleNode(NoHooksLU): """Powercycles a node. 
""" - _OP_REQP = ["node_name", "force"] + _OP_PARAMS = [ + _PNodeName, + _PForce, + ] REQ_BGL = False def CheckArguments(self): @@ -3712,14 +4064,6 @@ class LUPowercycleNode(NoHooksLU): """ self.needed_locks = {} - def CheckPrereq(self): - """Check prerequisites. - - This LU has no prereqs. - - """ - pass - def Exec(self, feedback_fn): """Reboots a node. @@ -3734,18 +4078,11 @@ class LUQueryClusterInfo(NoHooksLU): """Query cluster configuration. """ - _OP_REQP = [] REQ_BGL = False def ExpandNames(self): self.needed_locks = {} - def CheckPrereq(self): - """No prerequsites needed for this LU. - - """ - pass - def Exec(self, feedback_fn): """Return cluster config. @@ -3760,6 +4097,11 @@ class LUQueryClusterInfo(NoHooksLU): if hv_name in cluster.enabled_hypervisors: os_hvp[os_name][hv_name] = hv_params + # Convert ip_family to ip_version + primary_ip_version = constants.IP4_VERSION + if cluster.primary_ip_family == netutils.IP6Address.family: + primary_ip_version = constants.IP6_VERSION + result = { "software_version": constants.RELEASE_VERSION, "protocol_version": constants.PROTOCOL_VERSION, @@ -3775,10 +4117,12 @@ class LUQueryClusterInfo(NoHooksLU): for hypervisor_name in cluster.enabled_hypervisors]), "os_hvp": os_hvp, "beparams": cluster.beparams, + "osparams": cluster.osparams, "nicparams": cluster.nicparams, "candidate_pool_size": cluster.candidate_pool_size, "master_netdev": cluster.master_netdev, "volume_group_name": cluster.volume_group_name, + "drbd_usermode_helper": cluster.drbd_usermode_helper, "file_storage_dir": cluster.file_storage_dir, "maintain_node_health": cluster.maintain_node_health, "ctime": cluster.ctime, @@ -3786,6 +4130,9 @@ class LUQueryClusterInfo(NoHooksLU): "uuid": cluster.uuid, "tags": list(cluster.GetTags()), "uid_pool": cluster.uid_pool, + "default_iallocator": cluster.default_iallocator, + "reserved_lvs": cluster.reserved_lvs, + "primary_ip_version": primary_ip_version, } return result @@ -3795,24 +4142,19 @@ class 
LUQueryConfigValues(NoHooksLU): """Return configuration values. """ - _OP_REQP = [] + _OP_PARAMS = [_POutputFields] REQ_BGL = False _FIELDS_DYNAMIC = utils.FieldSet() _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag", "watcher_pause") - def ExpandNames(self): - self.needed_locks = {} - + def CheckArguments(self): _CheckOutputFields(static=self._FIELDS_STATIC, dynamic=self._FIELDS_DYNAMIC, selected=self.op.output_fields) - def CheckPrereq(self): - """No prerequisites. - - """ - pass + def ExpandNames(self): + self.needed_locks = {} def Exec(self, feedback_fn): """Dump a representation of the cluster config to the standard output. @@ -3838,7 +4180,10 @@ class LUActivateInstanceDisks(NoHooksLU): """Bring up an instance's disks. """ - _OP_REQP = ["instance_name"] + _OP_PARAMS = [ + _PInstanceName, + ("ignore_size", False, _TBool), + ] REQ_BGL = False def ExpandNames(self): @@ -3860,8 +4205,6 @@ class LUActivateInstanceDisks(NoHooksLU): assert self.instance is not None, \ "Cannot retrieve locked instance %s" % self.op.instance_name _CheckNodeOnline(self, self.instance.primary_node) - if not hasattr(self.op, "ignore_size"): - self.op.ignore_size = False def Exec(self, feedback_fn): """Activate the disks. @@ -3983,7 +4326,9 @@ class LUDeactivateInstanceDisks(NoHooksLU): """Shutdown an instance's disks. 
""" - _OP_REQP = ["instance_name"] + _OP_PARAMS = [ + _PInstanceName, + ] REQ_BGL = False def ExpandNames(self): @@ -4146,9 +4491,20 @@ class LUStartupInstance(LogicalUnit): """ HPATH = "instance-start" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name", "force"] + _OP_PARAMS = [ + _PInstanceName, + _PForce, + ("hvparams", _EmptyDict, _TDict), + ("beparams", _EmptyDict, _TDict), + ] REQ_BGL = False + def CheckArguments(self): + # extra beparams + if self.op.beparams: + # fill the beparams dict + utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) + def ExpandNames(self): self._ExpandAndLockInstance() @@ -4175,35 +4531,16 @@ class LUStartupInstance(LogicalUnit): assert self.instance is not None, \ "Cannot retrieve locked instance %s" % self.op.instance_name - # extra beparams - self.beparams = getattr(self.op, "beparams", {}) - if self.beparams: - if not isinstance(self.beparams, dict): - raise errors.OpPrereqError("Invalid beparams passed: %s, expected" - " dict" % (type(self.beparams), ), - errors.ECODE_INVAL) - # fill the beparams dict - utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES) - self.op.beparams = self.beparams - # extra hvparams - self.hvparams = getattr(self.op, "hvparams", {}) - if self.hvparams: - if not isinstance(self.hvparams, dict): - raise errors.OpPrereqError("Invalid hvparams passed: %s, expected" - " dict" % (type(self.hvparams), ), - errors.ECODE_INVAL) - + if self.op.hvparams: # check hypervisor parameter syntax (locally) cluster = self.cfg.GetClusterInfo() - utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES) - filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor], - instance.hvparams) - filled_hvp.update(self.hvparams) + utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES) + filled_hvp = cluster.FillHV(instance) + filled_hvp.update(self.op.hvparams) hv_type = hypervisor.GetHypervisor(instance.hypervisor) hv_type.CheckParameterSyntax(filled_hvp) 
_CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp) - self.op.hvparams = self.hvparams _CheckNodeOnline(self, instance.primary_node) @@ -4235,7 +4572,7 @@ class LUStartupInstance(LogicalUnit): _StartInstanceDisks(self, instance, force) result = self.rpc.call_instance_start(node_current, instance, - self.hvparams, self.beparams) + self.op.hvparams, self.op.beparams) msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) @@ -4248,24 +4585,15 @@ class LURebootInstance(LogicalUnit): """ HPATH = "instance-reboot" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"] + _OP_PARAMS = [ + _PInstanceName, + ("ignore_secondaries", False, _TBool), + ("reboot_type", _NoDefault, _TElemOf(constants.REBOOT_TYPES)), + _PShutdownTimeout, + ] REQ_BGL = False - def CheckArguments(self): - """Check the arguments. - - """ - self.shutdown_timeout = getattr(self.op, "shutdown_timeout", - constants.DEFAULT_SHUTDOWN_TIMEOUT) - def ExpandNames(self): - if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT, - constants.INSTANCE_REBOOT_HARD, - constants.INSTANCE_REBOOT_FULL]: - raise errors.ParameterError("reboot type not in [%s, %s, %s]" % - (constants.INSTANCE_REBOOT_SOFT, - constants.INSTANCE_REBOOT_HARD, - constants.INSTANCE_REBOOT_FULL)) self._ExpandAndLockInstance() def BuildHooksEnv(self): @@ -4277,7 +4605,7 @@ class LURebootInstance(LogicalUnit): env = { "IGNORE_SECONDARIES": self.op.ignore_secondaries, "REBOOT_TYPE": self.op.reboot_type, - "SHUTDOWN_TIMEOUT": self.shutdown_timeout, + "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout, } env.update(_BuildInstanceHookEnvByObject(self, self.instance)) nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) @@ -4314,11 +4642,11 @@ class LURebootInstance(LogicalUnit): self.cfg.SetDiskID(disk, node_current) result = self.rpc.call_instance_reboot(node_current, instance, reboot_type, - self.shutdown_timeout) + self.op.shutdown_timeout) 
result.Raise("Could not reboot instance") else: result = self.rpc.call_instance_shutdown(node_current, instance, - self.shutdown_timeout) + self.op.shutdown_timeout) result.Raise("Could not shutdown instance for full reboot") _ShutdownInstanceDisks(self, instance) _StartInstanceDisks(self, instance, ignore_secondaries) @@ -4338,16 +4666,12 @@ class LUShutdownInstance(LogicalUnit): """ HPATH = "instance-stop" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name"] + _OP_PARAMS = [ + _PInstanceName, + ("timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT, _TPositiveInt), + ] REQ_BGL = False - def CheckArguments(self): - """Check the arguments. - - """ - self.timeout = getattr(self.op, "timeout", - constants.DEFAULT_SHUTDOWN_TIMEOUT) - def ExpandNames(self): self._ExpandAndLockInstance() @@ -4358,7 +4682,7 @@ class LUShutdownInstance(LogicalUnit): """ env = _BuildInstanceHookEnvByObject(self, self.instance) - env["TIMEOUT"] = self.timeout + env["TIMEOUT"] = self.op.timeout nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) return env, nl, nl @@ -4379,7 +4703,7 @@ class LUShutdownInstance(LogicalUnit): """ instance = self.instance node_current = instance.primary_node - timeout = self.timeout + timeout = self.op.timeout self.cfg.MarkInstanceDown(instance.name) result = self.rpc.call_instance_shutdown(node_current, instance, timeout) msg = result.fail_msg @@ -4395,7 +4719,11 @@ class LUReinstallInstance(LogicalUnit): """ HPATH = "instance-reinstall" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name"] + _OP_PARAMS = [ + _PInstanceName, + ("os_type", None, _TMaybeString), + ("force_variant", False, _TBool), + ] REQ_BGL = False def ExpandNames(self): @@ -4428,8 +4756,6 @@ class LUReinstallInstance(LogicalUnit): errors.ECODE_INVAL) _CheckInstanceDown(self, instance, "cannot reinstall") - self.op.os_type = getattr(self.op, "os_type", None) - self.op.force_variant = getattr(self.op, "force_variant", False) if self.op.os_type is not None: # OS 
verification pnode = _ExpandNodeName(self.cfg, instance.primary_node) @@ -4466,21 +4792,12 @@ class LURecreateInstanceDisks(LogicalUnit): """ HPATH = "instance-recreate-disks" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name", "disks"] + _OP_PARAMS = [ + _PInstanceName, + ("disks", _EmptyList, _TListOf(_TPositiveInt)), + ] REQ_BGL = False - def CheckArguments(self): - """Check the arguments. - - """ - if not isinstance(self.op.disks, list): - raise errors.OpPrereqError("Invalid disks parameter", errors.ECODE_INVAL) - for item in self.op.disks: - if (not isinstance(item, int) or - item < 0): - raise errors.OpPrereqError("Invalid disk specification '%s'" % - str(item), errors.ECODE_INVAL) - def ExpandNames(self): self._ExpandAndLockInstance() @@ -4539,7 +4856,21 @@ class LURenameInstance(LogicalUnit): """ HPATH = "instance-rename" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name", "new_name"] + _OP_PARAMS = [ + _PInstanceName, + ("new_name", _NoDefault, _TNonEmptyString), + ("ip_check", False, _TBool), + ("name_check", True, _TBool), + ] + + def CheckArguments(self): + """Check arguments. + + """ + if self.op.ip_check and not self.op.name_check: + # TODO: make the ip check more flexible and not depend on the name check + raise errors.OpPrereqError("Cannot do ip check without a name check", + errors.ECODE_INVAL) def BuildHooksEnv(self): """Build hooks env. 
@@ -4566,22 +4897,21 @@ class LURenameInstance(LogicalUnit): _CheckInstanceDown(self, instance, "cannot rename") self.instance = instance - # new name verification - name_info = utils.GetHostInfo(self.op.new_name) + new_name = self.op.new_name + if self.op.name_check: + hostname = netutils.GetHostname(name=new_name) + new_name = hostname.name + if (self.op.ip_check and + netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)): + raise errors.OpPrereqError("IP %s of instance %s already in use" % + (hostname.ip, new_name), + errors.ECODE_NOTUNIQUE) - self.op.new_name = new_name = name_info.name instance_list = self.cfg.GetInstanceList() if new_name in instance_list: raise errors.OpPrereqError("Instance '%s' is already in the cluster" % new_name, errors.ECODE_EXISTS) - if not getattr(self.op, "ignore_ip", False): - if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT): - raise errors.OpPrereqError("IP %s of instance %s already in use" % - (name_info.ip, new_name), - errors.ECODE_NOTUNIQUE) - - def Exec(self, feedback_fn): """Reinstall the instance. @@ -4623,6 +4953,8 @@ class LURenameInstance(LogicalUnit): finally: _ShutdownInstanceDisks(self, inst) + return inst.name + class LURemoveInstance(LogicalUnit): """Remove an instance. @@ -4630,16 +4962,13 @@ class LURemoveInstance(LogicalUnit): """ HPATH = "instance-remove" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name", "ignore_failures"] + _OP_PARAMS = [ + _PInstanceName, + ("ignore_failures", False, _TBool), + _PShutdownTimeout, + ] REQ_BGL = False - def CheckArguments(self): - """Check the arguments. 
- - """ - self.shutdown_timeout = getattr(self.op, "shutdown_timeout", - constants.DEFAULT_SHUTDOWN_TIMEOUT) - def ExpandNames(self): self._ExpandAndLockInstance() self.needed_locks[locking.LEVEL_NODE] = [] @@ -4656,7 +4985,7 @@ class LURemoveInstance(LogicalUnit): """ env = _BuildInstanceHookEnvByObject(self, self.instance) - env["SHUTDOWN_TIMEOUT"] = self.shutdown_timeout + env["SHUTDOWN_TIMEOUT"] = self.op.shutdown_timeout nl = [self.cfg.GetMasterNode()] nl_post = list(self.instance.all_nodes) + nl return env, nl, nl_post @@ -4680,7 +5009,7 @@ class LURemoveInstance(LogicalUnit): instance.name, instance.primary_node) result = self.rpc.call_instance_shutdown(instance.primary_node, instance, - self.shutdown_timeout) + self.op.shutdown_timeout) msg = result.fail_msg if msg: if self.op.ignore_failures: @@ -4690,18 +5019,29 @@ class LURemoveInstance(LogicalUnit): " node %s: %s" % (instance.name, instance.primary_node, msg)) - logging.info("Removing block devices for instance %s", instance.name) + _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures) - if not _RemoveDisks(self, instance): - if self.op.ignore_failures: - feedback_fn("Warning: can't remove instance's disks") - else: - raise errors.OpExecError("Can't remove instance's disks") - logging.info("Removing instance %s out of cluster config", instance.name) +def _RemoveInstance(lu, feedback_fn, instance, ignore_failures): + """Utility function to remove an instance. 
+ + """ + logging.info("Removing block devices for instance %s", instance.name) + + if not _RemoveDisks(lu, instance): + if not ignore_failures: + raise errors.OpExecError("Can't remove instance's disks") + feedback_fn("Warning: can't remove instance's disks") - self.cfg.RemoveInstance(instance.name) - self.remove_locks[locking.LEVEL_INSTANCE] = instance.name + logging.info("Removing instance %s out of cluster config", instance.name) + + lu.cfg.RemoveInstance(instance.name) + + assert not lu.remove_locks.get(locking.LEVEL_INSTANCE), \ + "Instance lock removal conflict" + + # Remove lock for the instance + lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name class LUQueryInstances(NoHooksLU): @@ -4709,7 +5049,11 @@ class LUQueryInstances(NoHooksLU): """ # pylint: disable-msg=W0142 - _OP_REQP = ["output_fields", "names", "use_locking"] + _OP_PARAMS = [ + ("output_fields", _NoDefault, _TListOf(_TNonEmptyString)), + ("names", _EmptyList, _TListOf(_TNonEmptyString)), + ("use_locking", False, _TBool), + ] REQ_BGL = False _SIMPLE_FIELDS = ["name", "os", "network_port", "hypervisor", "serial_no", "ctime", "mtime", "uuid"] @@ -4732,14 +5076,18 @@ class LUQueryInstances(NoHooksLU): if name not in constants.HVC_GLOBALS] + ["be/%s" % name for name in constants.BES_PARAMETERS]) - _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status") + _FIELDS_DYNAMIC = utils.FieldSet("oper_state", + "oper_ram", + "oper_vcpus", + "status") - def ExpandNames(self): + def CheckArguments(self): _CheckOutputFields(static=self._FIELDS_STATIC, dynamic=self._FIELDS_DYNAMIC, selected=self.op.output_fields) + def ExpandNames(self): self.needed_locks = {} self.share_locks[locking.LEVEL_INSTANCE] = 1 self.share_locks[locking.LEVEL_NODE] = 1 @@ -4760,12 +5108,6 @@ class LUQueryInstances(NoHooksLU): if level == locking.LEVEL_NODE and self.do_locking: self._LockInstancesNodes() - def CheckPrereq(self): - """Check prerequisites. 
- - """ - pass - def Exec(self, feedback_fn): """Computes the list of nodes and their attributes. @@ -4828,8 +5170,7 @@ class LUQueryInstances(NoHooksLU): iout = [] i_hv = cluster.FillHV(instance, skip_globals=True) i_be = cluster.FillBE(instance) - i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT], - nic.nicparams) for nic in instance.nics] + i_nicp = [cluster.SimpleFillNIC(nic.nicparams) for nic in instance.nics] for field in self.op.output_fields: st_match = self._FIELDS_STATIC.Matches(field) if field in self._SIMPLE_FIELDS: @@ -4869,6 +5210,13 @@ class LUQueryInstances(NoHooksLU): val = live_data[instance.name].get("memory", "?") else: val = "-" + elif field == "oper_vcpus": + if instance.primary_node in bad_nodes: + val = None + elif instance.name in live_data: + val = live_data[instance.name].get("vcpus", "?") + else: + val = "-" elif field == "vcpus": val = i_be[constants.BE_VCPUS] elif field == "disk_template": @@ -4993,16 +5341,13 @@ class LUFailoverInstance(LogicalUnit): """ HPATH = "instance-failover" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name", "ignore_consistency"] + _OP_PARAMS = [ + _PInstanceName, + ("ignore_consistency", False, _TBool), + _PShutdownTimeout, + ] REQ_BGL = False - def CheckArguments(self): - """Check the arguments. 
- - """ - self.shutdown_timeout = getattr(self.op, "shutdown_timeout", - constants.DEFAULT_SHUTDOWN_TIMEOUT) - def ExpandNames(self): self._ExpandAndLockInstance() self.needed_locks[locking.LEVEL_NODE] = [] @@ -5023,7 +5368,7 @@ class LUFailoverInstance(LogicalUnit): target_node = instance.secondary_nodes[0] env = { "IGNORE_CONSISTENCY": self.op.ignore_consistency, - "SHUTDOWN_TIMEOUT": self.shutdown_timeout, + "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout, "OLD_PRIMARY": source_node, "OLD_SECONDARY": target_node, "NEW_PRIMARY": target_node, @@ -5099,7 +5444,7 @@ class LUFailoverInstance(LogicalUnit): instance.name, source_node) result = self.rpc.call_instance_shutdown(source_node, instance, - self.shutdown_timeout) + self.op.shutdown_timeout) msg = result.fail_msg if msg: if self.op.ignore_consistency: @@ -5127,7 +5472,7 @@ class LUFailoverInstance(LogicalUnit): instance.name, target_node) disks_ok, _ = _AssembleInstanceDisks(self, instance, - ignore_secondaries=True) + ignore_secondaries=True) if not disks_ok: _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Can't activate the instance's disks") @@ -5150,7 +5495,12 @@ class LUMigrateInstance(LogicalUnit): """ HPATH = "instance-migrate" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name", "live", "cleanup"] + _OP_PARAMS = [ + _PInstanceName, + _PMigrationMode, + _PMigrationLive, + ("cleanup", False, _TBool), + ] REQ_BGL = False @@ -5161,7 +5511,7 @@ class LUMigrateInstance(LogicalUnit): self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE self._migrater = TLMigrateInstance(self, self.op.instance_name, - self.op.live, self.op.cleanup) + self.op.cleanup) self.tasklets = [self._migrater] def DeclareLocks(self, level): @@ -5178,7 +5528,7 @@ class LUMigrateInstance(LogicalUnit): source_node = instance.primary_node target_node = instance.secondary_nodes[0] env = _BuildInstanceHookEnvByObject(self, instance) - env["MIGRATE_LIVE"] = self.op.live + env["MIGRATE_LIVE"] = 
self._migrater.live env["MIGRATE_CLEANUP"] = self.op.cleanup env.update({ "OLD_PRIMARY": source_node, @@ -5198,16 +5548,13 @@ class LUMoveInstance(LogicalUnit): """ HPATH = "instance-move" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name", "target_node"] + _OP_PARAMS = [ + _PInstanceName, + ("target_node", _NoDefault, _TNonEmptyString), + _PShutdownTimeout, + ] REQ_BGL = False - def CheckArguments(self): - """Check the arguments. - - """ - self.shutdown_timeout = getattr(self.op, "shutdown_timeout", - constants.DEFAULT_SHUTDOWN_TIMEOUT) - def ExpandNames(self): self._ExpandAndLockInstance() target_node = _ExpandNodeName(self.cfg, self.op.target_node) @@ -5227,7 +5574,7 @@ class LUMoveInstance(LogicalUnit): """ env = { "TARGET_NODE": self.op.target_node, - "SHUTDOWN_TIMEOUT": self.shutdown_timeout, + "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout, } env.update(_BuildInstanceHookEnvByObject(self, self.instance)) nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node, @@ -5293,7 +5640,7 @@ class LUMoveInstance(LogicalUnit): instance.name, source_node) result = self.rpc.call_instance_shutdown(source_node, instance, - self.shutdown_timeout) + self.op.shutdown_timeout) msg = result.fail_msg if msg: if self.op.ignore_consistency: @@ -5380,7 +5727,11 @@ class LUMigrateNode(LogicalUnit): """ HPATH = "node-migrate" HTYPE = constants.HTYPE_NODE - _OP_REQP = ["node_name", "live"] + _OP_PARAMS = [ + _PNodeName, + _PMigrationMode, + _PMigrationLive, + ] REQ_BGL = False def ExpandNames(self): @@ -5400,7 +5751,7 @@ class LUMigrateNode(LogicalUnit): logging.debug("Migrating instance %s", inst.name) names.append(inst.name) - tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False)) + tasklets.append(TLMigrateInstance(self, inst.name, False)) self.tasklets = tasklets @@ -5427,7 +5778,14 @@ class LUMigrateNode(LogicalUnit): class TLMigrateInstance(Tasklet): - def __init__(self, lu, instance_name, live, cleanup): + """Tasklet class for instance 
migration. + + @type live: boolean + @ivar live: whether the migration will be done live or non-live; + this variable is initalized only after CheckPrereq has run + + """ + def __init__(self, lu, instance_name, cleanup): """Initializes this class. """ @@ -5435,8 +5793,8 @@ class TLMigrateInstance(Tasklet): # Parameters self.instance_name = instance_name - self.live = live self.cleanup = cleanup + self.live = False # will be overridden later def CheckPrereq(self): """Check prerequisites. @@ -5461,15 +5819,15 @@ class TLMigrateInstance(Tasklet): target_node = secondary_nodes[0] # check memory requirements on the secondary node - _CheckNodeFreeMemory(self, target_node, "migrating instance %s" % + _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" % instance.name, i_be[constants.BE_MEMORY], instance.hypervisor) # check bridge existance - _CheckInstanceBridgesExist(self, instance, node=target_node) + _CheckInstanceBridgesExist(self.lu, instance, node=target_node) if not self.cleanup: - _CheckNodeNotDrained(self, target_node) + _CheckNodeNotDrained(self.lu, target_node) result = self.rpc.call_instance_migratable(instance.primary_node, instance) result.Raise("Can't migrate, please use failover", @@ -5477,6 +5835,25 @@ class TLMigrateInstance(Tasklet): self.instance = instance + if self.lu.op.live is not None and self.lu.op.mode is not None: + raise errors.OpPrereqError("Only one of the 'live' and 'mode'" + " parameters are accepted", + errors.ECODE_INVAL) + if self.lu.op.live is not None: + if self.lu.op.live: + self.lu.op.mode = constants.HT_MIGRATION_LIVE + else: + self.lu.op.mode = constants.HT_MIGRATION_NONLIVE + # reset the 'live' parameter to None so that repeated + # invocations of CheckPrereq do not raise an exception + self.lu.op.live = None + elif self.lu.op.mode is None: + # read the default value from the hypervisor + i_hv = self.cfg.GetClusterInfo().FillHV(instance, skip_globals=False) + self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE] + + 
self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE + def _WaitUntilSync(self): """Poll with custom rpc for disk sync. @@ -5658,7 +6035,7 @@ class TLMigrateInstance(Tasklet): self.feedback_fn("* checking disk consistency between source and target") for dev in instance.disks: - if not _CheckDiskConsistency(self, dev, target_node, False): + if not _CheckDiskConsistency(self.lu, dev, target_node, False): raise errors.OpExecError("Disk %s is degraded or not fully" " synchronized on target node," " aborting migrate." % dev.iv_name) @@ -6079,46 +6456,99 @@ def _CheckHVParams(lu, nodenames, hvname, hvparams): info.Raise("Hypervisor parameter validation failed on node %s" % node) +def _CheckOSParams(lu, required, nodenames, osname, osparams): + """OS parameters validation. + + @type lu: L{LogicalUnit} + @param lu: the logical unit for which we check + @type required: boolean + @param required: whether the validation should fail if the OS is not + found + @type nodenames: list + @param nodenames: the list of nodes on which we should check + @type osname: string + @param osname: the name of the hypervisor we should use + @type osparams: dict + @param osparams: the parameters which we need to check + @raise errors.OpPrereqError: if the parameters are not valid + + """ + result = lu.rpc.call_os_validate(required, nodenames, osname, + [constants.OS_VALIDATE_PARAMETERS], + osparams) + for node, nres in result.items(): + # we don't check for offline cases since this should be run only + # against the master node and/or an instance's nodes + nres.Raise("OS Parameters validation failed on node %s" % node) + if not nres.payload: + lu.LogInfo("OS %s not found on node %s, validation skipped", + osname, node) + + class LUCreateInstance(LogicalUnit): """Create an instance. 
""" HPATH = "instance-add" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name", "disks", - "mode", "start", - "wait_for_sync", "ip_check", "nics", - "hvparams", "beparams"] + _OP_PARAMS = [ + _PInstanceName, + ("mode", _NoDefault, _TElemOf(constants.INSTANCE_CREATE_MODES)), + ("start", True, _TBool), + ("wait_for_sync", True, _TBool), + ("ip_check", True, _TBool), + ("name_check", True, _TBool), + ("disks", _NoDefault, _TListOf(_TDict)), + ("nics", _NoDefault, _TListOf(_TDict)), + ("hvparams", _EmptyDict, _TDict), + ("beparams", _EmptyDict, _TDict), + ("osparams", _EmptyDict, _TDict), + ("no_install", None, _TMaybeBool), + ("os_type", None, _TMaybeString), + ("force_variant", False, _TBool), + ("source_handshake", None, _TOr(_TList, _TNone)), + ("source_x509_ca", None, _TMaybeString), + ("source_instance_name", None, _TMaybeString), + ("src_node", None, _TMaybeString), + ("src_path", None, _TMaybeString), + ("pnode", None, _TMaybeString), + ("snode", None, _TMaybeString), + ("iallocator", None, _TMaybeString), + ("hypervisor", None, _TMaybeString), + ("disk_template", _NoDefault, _CheckDiskTemplate), + ("identify_defaults", False, _TBool), + ("file_driver", None, _TOr(_TNone, _TElemOf(constants.FILE_DRIVER))), + ("file_storage_dir", None, _TMaybeString), + ("dry_run", False, _TBool), + ] REQ_BGL = False def CheckArguments(self): """Check arguments. 
""" - # set optional parameters to none if they don't exist - for attr in ["pnode", "snode", "iallocator", "hypervisor", - "disk_template", "identify_defaults"]: - if not hasattr(self.op, attr): - setattr(self.op, attr, None) - # do not require name_check to ease forward/backward compatibility # for tools - if not hasattr(self.op, "name_check"): - self.op.name_check = True - if not hasattr(self.op, "no_install"): - self.op.no_install = False if self.op.no_install and self.op.start: self.LogInfo("No-installation mode selected, disabling startup") self.op.start = False # validate/normalize the instance name - self.op.instance_name = utils.HostInfo.NormalizeName(self.op.instance_name) + self.op.instance_name = \ + netutils.Hostname.GetNormalizedName(self.op.instance_name) + if self.op.ip_check and not self.op.name_check: # TODO: make the ip check more flexible and not depend on the name check - raise errors.OpPrereqError("Cannot do ip checks without a name check", + raise errors.OpPrereqError("Cannot do ip check without a name check", errors.ECODE_INVAL) - # check disk information: either all adopt, or no adopt + + # check nics' parameter names + for nic in self.op.nics: + utils.ForceDictType(nic, constants.INIC_PARAMS_TYPES) + + # check disks. 
parameter names and consistent adopt/no-adopt strategy has_adopt = has_no_adopt = False for disk in self.op.disks: + utils.ForceDictType(disk, constants.IDISK_PARAMS_TYPES) if "adopt" in disk: has_adopt = True else: @@ -6127,9 +6557,10 @@ class LUCreateInstance(LogicalUnit): raise errors.OpPrereqError("Either all disks are adopted or none is", errors.ECODE_INVAL) if has_adopt: - if self.op.disk_template != constants.DT_PLAIN: - raise errors.OpPrereqError("Disk adoption is only supported for the" - " 'plain' disk template", + if self.op.disk_template not in constants.DTS_MAY_ADOPT: + raise errors.OpPrereqError("Disk adoption is not supported for the" + " '%s' disk template" % + self.op.disk_template, errors.ECODE_INVAL) if self.op.iallocator is not None: raise errors.OpPrereqError("Disk adoption not allowed with an" @@ -6140,18 +6571,15 @@ class LUCreateInstance(LogicalUnit): self.adopt_disks = has_adopt - # verify creation mode - if self.op.mode not in (constants.INSTANCE_CREATE, - constants.INSTANCE_IMPORT): - raise errors.OpPrereqError("Invalid instance creation mode '%s'" % - self.op.mode, errors.ECODE_INVAL) - # instance name verification if self.op.name_check: - self.hostname1 = utils.GetHostInfo(self.op.instance_name) + self.hostname1 = netutils.GetHostname(name=self.op.instance_name) self.op.instance_name = self.hostname1.name # used in CheckPrereq for ip ping check self.check_ip = self.hostname1.ip + elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT: + raise errors.OpPrereqError("Remote imports require names to be checked" % + errors.ECODE_INVAL) else: self.check_ip = None @@ -6166,10 +6594,9 @@ class LUCreateInstance(LogicalUnit): errors.ECODE_INVAL) ### Node/iallocator related checks - if [self.op.iallocator, self.op.pnode].count(None) != 1: - raise errors.OpPrereqError("One and only one of iallocator and primary" - " node must be given", - errors.ECODE_INVAL) + _CheckIAllocatorOrNode(self, "iallocator", "pnode") + + self._cds = 
_GetClusterDomainSecret() if self.op.mode == constants.INSTANCE_IMPORT: # On import force_variant must be True, because if we forced it at @@ -6180,15 +6607,59 @@ class LUCreateInstance(LogicalUnit): if self.op.no_install: self.LogInfo("No-installation mode has no effect during import") - else: # INSTANCE_CREATE - if getattr(self.op, "os_type", None) is None: + elif self.op.mode == constants.INSTANCE_CREATE: + if self.op.os_type is None: raise errors.OpPrereqError("No guest OS specified", errors.ECODE_INVAL) - self.op.force_variant = getattr(self.op, "force_variant", False) if self.op.disk_template is None: raise errors.OpPrereqError("No disk template specified", errors.ECODE_INVAL) + elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT: + # Check handshake to ensure both clusters have the same domain secret + src_handshake = self.op.source_handshake + if not src_handshake: + raise errors.OpPrereqError("Missing source handshake", + errors.ECODE_INVAL) + + errmsg = masterd.instance.CheckRemoteExportHandshake(self._cds, + src_handshake) + if errmsg: + raise errors.OpPrereqError("Invalid handshake: %s" % errmsg, + errors.ECODE_INVAL) + + # Load and check source CA + self.source_x509_ca_pem = self.op.source_x509_ca + if not self.source_x509_ca_pem: + raise errors.OpPrereqError("Missing source X509 CA", + errors.ECODE_INVAL) + + try: + (cert, _) = utils.LoadSignedX509Certificate(self.source_x509_ca_pem, + self._cds) + except OpenSSL.crypto.Error, err: + raise errors.OpPrereqError("Unable to load source X509 CA (%s)" % + (err, ), errors.ECODE_INVAL) + + (errcode, msg) = utils.VerifyX509Certificate(cert, None, None) + if errcode is not None: + raise errors.OpPrereqError("Invalid source X509 CA (%s)" % (msg, ), + errors.ECODE_INVAL) + + self.source_x509_ca = cert + + src_instance_name = self.op.source_instance_name + if not src_instance_name: + raise errors.OpPrereqError("Missing source instance name", + errors.ECODE_INVAL) + + self.source_instance_name = \ + 
netutils.GetHostname(name=src_instance_name).name + + else: + raise errors.OpPrereqError("Invalid instance creation mode %r" % + self.op.mode, errors.ECODE_INVAL) + def ExpandNames(self): """ExpandNames for CreateInstance. @@ -6218,8 +6689,8 @@ class LUCreateInstance(LogicalUnit): # in case of import lock the source node too if self.op.mode == constants.INSTANCE_IMPORT: - src_node = getattr(self.op, "src_node", None) - src_path = getattr(self.op, "src_path", None) + src_node = self.op.src_node + src_path = self.op.src_path if src_path is None: self.op.src_path = src_path = self.op.instance_name @@ -6422,26 +6893,37 @@ class LUCreateInstance(LogicalUnit): einfo.has_option(constants.INISECT_INS, name)): self.op.beparams[name] = einfo.get(constants.INISECT_INS, name) + if einfo.has_section(constants.INISECT_OSP): + # use the parameters, without overriding + for name, value in einfo.items(constants.INISECT_OSP): + if name not in self.op.osparams: + self.op.osparams[name] = value + def _RevertToDefaults(self, cluster): """Revert the instance parameters to the default values. 
""" # hvparams - hv_defs = cluster.GetHVDefaults(self.op.hypervisor, self.op.os_type) + hv_defs = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type, {}) for name in self.op.hvparams.keys(): if name in hv_defs and hv_defs[name] == self.op.hvparams[name]: del self.op.hvparams[name] # beparams - be_defs = cluster.beparams.get(constants.PP_DEFAULT, {}) + be_defs = cluster.SimpleFillBE({}) for name in self.op.beparams.keys(): if name in be_defs and be_defs[name] == self.op.beparams[name]: del self.op.beparams[name] # nic params - nic_defs = cluster.nicparams.get(constants.PP_DEFAULT, {}) + nic_defs = cluster.SimpleFillNIC({}) for nic in self.op.nics: for name in constants.NICS_PARAMETERS: if name in nic and name in nic_defs and nic[name] == nic_defs[name]: del nic[name] + # osparams + os_defs = cluster.SimpleFillOS(self.op.os_type, {}) + for name in self.op.osparams.keys(): + if name in os_defs and os_defs[name] == self.op.osparams[name]: + del self.op.osparams[name] def CheckPrereq(self): """Check prerequisites. 
@@ -6471,9 +6953,8 @@ class LUCreateInstance(LogicalUnit): # check hypervisor parameter syntax (locally) utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES) - filled_hvp = objects.FillDict(cluster.GetHVDefaults(self.op.hypervisor, - self.op.os_type), - self.op.hvparams) + filled_hvp = cluster.SimpleFillHV(self.op.hypervisor, self.op.os_type, + self.op.hvparams) hv_type = hypervisor.GetHypervisor(self.op.hypervisor) hv_type.CheckParameterSyntax(filled_hvp) self.hv_full = filled_hvp @@ -6482,8 +6963,10 @@ class LUCreateInstance(LogicalUnit): # fill and remember the beparams dict utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) - self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT], - self.op.beparams) + self.be_full = cluster.SimpleFillBE(self.op.beparams) + + # build os parameters + self.os_full = cluster.SimpleFillOS(self.op.os_type, self.op.osparams) # now that hvp/bep are in final format, let's reset to defaults, # if told to do so @@ -6515,7 +6998,7 @@ class LUCreateInstance(LogicalUnit): errors.ECODE_INVAL) nic_ip = self.hostname1.ip else: - if not utils.IsValidIP(ip): + if not netutils.IP4Address.IsValid(ip): raise errors.OpPrereqError("Given IP address '%s' doesn't look" " like a valid IP" % ip, errors.ECODE_INVAL) @@ -6556,8 +7039,7 @@ class LUCreateInstance(LogicalUnit): if link: nicparams[constants.NIC_LINK] = link - check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT], - nicparams) + check_params = cluster.SimpleFillNIC(nicparams) objects.NIC.CheckParameterSyntax(check_params) self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams)) @@ -6622,7 +7104,7 @@ class LUCreateInstance(LogicalUnit): # ip ping checks (we use the same ip that was resolved in ExpandNames) if self.op.ip_check: - if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT): + if netutils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT): raise errors.OpPrereqError("IP %s of instance %s already 
in use" % (self.check_ip, self.op.instance_name), errors.ECODE_NOTUNIQUE) @@ -6713,6 +7195,8 @@ class LUCreateInstance(LogicalUnit): _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams) _CheckNodeHasOS(self, pnode.name, self.op.os_type, self.op.force_variant) + # check OS parameters (remotely) + _CheckOSParams(self, True, nodenames, self.op.os_type, self.os_full) _CheckNicsBridgesExist(self, self.nics, self.pnode.name) @@ -6751,7 +7235,6 @@ class LUCreateInstance(LogicalUnit): else: file_storage_dir = "" - disks = _GenerateDiskTemplate(self, self.op.disk_template, instance, pnode_name, @@ -6770,6 +7253,7 @@ class LUCreateInstance(LogicalUnit): beparams=self.op.beparams, hvparams=self.op.hvparams, hypervisor=self.op.hypervisor, + osparams=self.op.osparams, ) if self.adopt_disks: @@ -6844,18 +7328,54 @@ class LUCreateInstance(LogicalUnit): elif self.op.mode == constants.INSTANCE_IMPORT: feedback_fn("* running the instance OS import scripts...") - src_node = self.op.src_node - src_images = self.src_images - cluster_name = self.cfg.GetClusterName() - # FIXME: pass debug option from opcode to backend - import_result = self.rpc.call_instance_os_import(pnode_name, iobj, - src_node, src_images, - cluster_name, - self.op.debug_level) - msg = import_result.fail_msg - if msg: - self.LogWarning("Error while importing the disk images for instance" - " %s on node %s: %s" % (instance, pnode_name, msg)) + + transfers = [] + + for idx, image in enumerate(self.src_images): + if not image: + continue + + # FIXME: pass debug option from opcode to backend + dt = masterd.instance.DiskTransfer("disk/%s" % idx, + constants.IEIO_FILE, (image, ), + constants.IEIO_SCRIPT, + (iobj.disks[idx], idx), + None) + transfers.append(dt) + + import_result = \ + masterd.instance.TransferInstanceData(self, feedback_fn, + self.op.src_node, pnode_name, + self.pnode.secondary_ip, + iobj, transfers) + if not compat.all(import_result): + self.LogWarning("Some disks for instance %s on node %s 
were not" + " imported successfully" % (instance, pnode_name)) + + elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT: + feedback_fn("* preparing remote import...") + connect_timeout = constants.RIE_CONNECT_TIMEOUT + timeouts = masterd.instance.ImportExportTimeouts(connect_timeout) + + disk_results = masterd.instance.RemoteImport(self, feedback_fn, iobj, + self.source_x509_ca, + self._cds, timeouts) + if not compat.all(disk_results): + # TODO: Should the instance still be started, even if some disks + # failed to import (valid for local imports, too)? + self.LogWarning("Some disks for instance %s on node %s were not" + " imported successfully" % (instance, pnode_name)) + + # Run rename script on newly imported instance + assert iobj.name == instance + feedback_fn("Running rename script for %s" % instance) + result = self.rpc.call_instance_run_rename(pnode_name, iobj, + self.source_instance_name, + self.op.debug_level) + if result.fail_msg: + self.LogWarning("Failed to run rename script for %s on node" + " %s: %s" % (instance, pnode_name, result.fail_msg)) + else: # also checked in the prereq part raise errors.ProgrammerError("Unknown OS initialization mode '%s'" @@ -6880,7 +7400,9 @@ class LUConnectConsole(NoHooksLU): console. 
""" - _OP_REQP = ["instance_name"] + _OP_PARAMS = [ + _PInstanceName + ] REQ_BGL = False def ExpandNames(self): @@ -6931,17 +7453,17 @@ class LUReplaceDisks(LogicalUnit): """ HPATH = "mirrors-replace" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name", "mode", "disks"] + _OP_PARAMS = [ + _PInstanceName, + ("mode", _NoDefault, _TElemOf(constants.REPLACE_MODES)), + ("disks", _EmptyList, _TListOf(_TPositiveInt)), + ("remote_node", None, _TMaybeString), + ("iallocator", None, _TMaybeString), + ("early_release", False, _TBool), + ] REQ_BGL = False def CheckArguments(self): - if not hasattr(self.op, "remote_node"): - self.op.remote_node = None - if not hasattr(self.op, "iallocator"): - self.op.iallocator = None - if not hasattr(self.op, "early_release"): - self.op.early_release = False - TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node, self.op.iallocator) @@ -7001,107 +7523,18 @@ class LUReplaceDisks(LogicalUnit): return env, nl, nl -class LUEvacuateNode(LogicalUnit): - """Relocate the secondary instances from a node. - - """ - HPATH = "node-evacuate" - HTYPE = constants.HTYPE_NODE - _OP_REQP = ["node_name"] - REQ_BGL = False +class TLReplaceDisks(Tasklet): + """Replaces disks for an instance. - def CheckArguments(self): - if not hasattr(self.op, "remote_node"): - self.op.remote_node = None - if not hasattr(self.op, "iallocator"): - self.op.iallocator = None - if not hasattr(self.op, "early_release"): - self.op.early_release = False - - TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG, - self.op.remote_node, - self.op.iallocator) + Note: Locking is not within the scope of this class. - def ExpandNames(self): - self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) + """ + def __init__(self, lu, instance_name, mode, iallocator_name, remote_node, + disks, delay_iallocator, early_release): + """Initializes this class. 
- self.needed_locks = {} - - # Declare node locks - if self.op.iallocator is not None: - self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET - - elif self.op.remote_node is not None: - self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node) - - # Warning: do not remove the locking of the new secondary here - # unless DRBD8.AddChildren is changed to work in parallel; - # currently it doesn't since parallel invocations of - # FindUnusedMinor will conflict - self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node] - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND - - else: - raise errors.OpPrereqError("Invalid parameters", errors.ECODE_INVAL) - - # Create tasklets for replacing disks for all secondary instances on this - # node - names = [] - tasklets = [] - - for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name): - logging.debug("Replacing disks for instance %s", inst.name) - names.append(inst.name) - - replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG, - self.op.iallocator, self.op.remote_node, [], - True, self.op.early_release) - tasklets.append(replacer) - - self.tasklets = tasklets - self.instance_names = names - - # Declare instance locks - self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names - - def DeclareLocks(self, level): - # If we're not already locking all nodes in the set we have to declare the - # instance's primary/secondary nodes. - if (level == locking.LEVEL_NODE and - self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET): - self._LockInstancesNodes() - - def BuildHooksEnv(self): - """Build hooks env. - - This runs on the master, the primary and all the secondaries. 
- - """ - env = { - "NODE_NAME": self.op.node_name, - } - - nl = [self.cfg.GetMasterNode()] - - if self.op.remote_node is not None: - env["NEW_SECONDARY"] = self.op.remote_node - nl.append(self.op.remote_node) - - return (env, nl, nl) - - -class TLReplaceDisks(Tasklet): - """Replaces disks for an instance. - - Note: Locking is not within the scope of this class. - - """ - def __init__(self, lu, instance_name, mode, iallocator_name, remote_node, - disks, delay_iallocator, early_release): - """Initializes this class. - - """ - Tasklet.__init__(self, lu) + """ + Tasklet.__init__(self, lu) # Parameters self.instance_name = instance_name @@ -7763,13 +8196,24 @@ class LURepairNodeStorage(NoHooksLU): """Repairs the volume group on a node. """ - _OP_REQP = ["node_name"] + _OP_PARAMS = [ + _PNodeName, + ("storage_type", _NoDefault, _CheckStorageType), + ("name", _NoDefault, _TNonEmptyString), + ("ignore_consistency", False, _TBool), + ] REQ_BGL = False def CheckArguments(self): self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) - _CheckStorageType(self.op.storage_type) + storage_type = self.op.storage_type + + if (constants.SO_FIX_CONSISTENCY not in + constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])): + raise errors.OpPrereqError("Storage units of type '%s' can not be" + " repaired" % storage_type, + errors.ECODE_INVAL) def ExpandNames(self): self.needed_locks = { @@ -7794,14 +8238,6 @@ class LURepairNodeStorage(NoHooksLU): """Check prerequisites. 
""" - storage_type = self.op.storage_type - - if (constants.SO_FIX_CONSISTENCY not in - constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])): - raise errors.OpPrereqError("Storage units of type '%s' can not be" - " repaired" % storage_type, - errors.ECODE_INVAL) - # Check whether any instance on this node has faulty disks for inst in _GetNodeInstances(self.cfg, self.op.node_name): if not inst.admin_up: @@ -7828,17 +8264,15 @@ class LUNodeEvacuationStrategy(NoHooksLU): """Computes the node evacuation strategy. """ - _OP_REQP = ["nodes"] + _OP_PARAMS = [ + ("nodes", _NoDefault, _TListOf(_TNonEmptyString)), + ("remote_node", None, _TMaybeString), + ("iallocator", None, _TMaybeString), + ] REQ_BGL = False def CheckArguments(self): - if not hasattr(self.op, "remote_node"): - self.op.remote_node = None - if not hasattr(self.op, "iallocator"): - self.op.iallocator = None - if self.op.remote_node is not None and self.op.iallocator is not None: - raise errors.OpPrereqError("Give either the iallocator or the new" - " secondary, not both", errors.ECODE_INVAL) + _CheckIAllocatorOrNode(self, "iallocator", "remote_node") def ExpandNames(self): self.op.nodes = _GetWantedNodes(self, self.op.nodes) @@ -7849,9 +8283,6 @@ class LUNodeEvacuationStrategy(NoHooksLU): self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node) locks[locking.LEVEL_NODE] = self.op.nodes + [self.op.remote_node] - def CheckPrereq(self): - pass - def Exec(self, feedback_fn): if self.op.remote_node is not None: instances = [] @@ -7884,7 +8315,12 @@ class LUGrowDisk(LogicalUnit): """ HPATH = "disk-grow" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"] + _OP_PARAMS = [ + _PInstanceName, + ("disk", _NoDefault, _TInt), + ("amount", _NoDefault, _TInt), + ("wait_for_sync", True, _TBool), + ] REQ_BGL = False def ExpandNames(self): @@ -7923,7 +8359,6 @@ class LUGrowDisk(LogicalUnit): for node in nodenames: _CheckNodeOnline(self, node) - self.instance = 
instance if instance.disk_template not in constants.DTS_GROWABLE: @@ -7943,6 +8378,11 @@ class LUGrowDisk(LogicalUnit): """ instance = self.instance disk = self.disk + + disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk]) + if not disks_ok: + raise errors.OpExecError("Cannot activate block device to grow") + for node in instance.all_nodes: self.cfg.SetDiskID(disk, node) result = self.rpc.call_blockdev_grow(node, disk, self.op.amount) @@ -7958,27 +8398,32 @@ class LUGrowDisk(LogicalUnit): disk.RecordGrow(self.op.amount) self.cfg.Update(instance, feedback_fn) if self.op.wait_for_sync: - disk_abort = not _WaitForSync(self, instance) + disk_abort = not _WaitForSync(self, instance, disks=[disk]) if disk_abort: self.proc.LogWarning("Warning: disk sync-ing has not returned a good" " status.\nPlease check the instance.") + if not instance.admin_up: + _SafeShutdownInstanceDisks(self, instance, disks=[disk]) + elif not instance.admin_up: + self.proc.LogWarning("Not shutting down the disk even if the instance is" + " not supposed to be running because no wait for" + " sync mode was requested.") class LUQueryInstanceData(NoHooksLU): """Query runtime instance data. 
""" - _OP_REQP = ["instances", "static"] + _OP_PARAMS = [ + ("instances", _EmptyList, _TListOf(_TNonEmptyString)), + ("static", False, _TBool), + ] REQ_BGL = False def ExpandNames(self): self.needed_locks = {} self.share_locks = dict.fromkeys(locking.LEVELS, 1) - if not isinstance(self.op.instances, list): - raise errors.OpPrereqError("Invalid argument type 'instances'", - errors.ECODE_INVAL) - if self.op.instances: self.wanted_names = [] for name in self.op.instances: @@ -8007,7 +8452,6 @@ class LUQueryInstanceData(NoHooksLU): self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name in self.wanted_names] - return def _ComputeBlockdevStatus(self, node, instance_name, dev): """Returns the status of a block device @@ -8111,6 +8555,8 @@ class LUQueryInstanceData(NoHooksLU): "hv_actual": cluster.FillHV(instance, skip_globals=True), "be_instance": instance.beparams, "be_actual": cluster.FillBE(instance), + "os_instance": instance.osparams, + "os_actual": cluster.SimpleFillOS(instance.os, instance.osparams), "serial_no": instance.serial_no, "mtime": instance.mtime, "ctime": instance.ctime, @@ -8128,27 +8574,22 @@ class LUSetInstanceParams(LogicalUnit): """ HPATH = "instance-modify" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name"] + _OP_PARAMS = [ + _PInstanceName, + ("nics", _EmptyList, _TList), + ("disks", _EmptyList, _TList), + ("beparams", _EmptyDict, _TDict), + ("hvparams", _EmptyDict, _TDict), + ("disk_template", None, _TMaybeString), + ("remote_node", None, _TMaybeString), + ("os_name", None, _TMaybeString), + ("force_variant", False, _TBool), + ("osparams", None, _TOr(_TDict, _TNone)), + _PForce, + ] REQ_BGL = False def CheckArguments(self): - if not hasattr(self.op, 'nics'): - self.op.nics = [] - if not hasattr(self.op, 'disks'): - self.op.disks = [] - if not hasattr(self.op, 'beparams'): - self.op.beparams = {} - if not hasattr(self.op, 'hvparams'): - self.op.hvparams = {} - if not hasattr(self.op, "disk_template"): - self.op.disk_template 
= None - if not hasattr(self.op, "remote_node"): - self.op.remote_node = None - if not hasattr(self.op, "os_name"): - self.op.os_name = None - if not hasattr(self.op, "force_variant"): - self.op.force_variant = False - self.op.force = getattr(self.op, "force", False) if not (self.op.nics or self.op.disks or self.op.disk_template or self.op.hvparams or self.op.beparams or self.op.os_name): raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL) @@ -8159,6 +8600,7 @@ class LUSetInstanceParams(LogicalUnit): # Disk validation disk_addremove = 0 for disk_op, disk_dict in self.op.disks: + utils.ForceDictType(disk_dict, constants.IDISK_PARAMS_TYPES) if disk_op == constants.DDM_REMOVE: disk_addremove += 1 continue @@ -8212,6 +8654,7 @@ class LUSetInstanceParams(LogicalUnit): # NIC validation nic_addremove = 0 for nic_op, nic_dict in self.op.nics: + utils.ForceDictType(nic_dict, constants.INIC_PARAMS_TYPES) if nic_op == constants.DDM_REMOVE: nic_addremove += 1 continue @@ -8230,7 +8673,7 @@ class LUSetInstanceParams(LogicalUnit): if nic_ip.lower() == constants.VALUE_NONE: nic_dict['ip'] = None else: - if not utils.IsValidIP(nic_ip): + if not netutils.IP4Address.IsValid(nic_ip): raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip, errors.ECODE_INVAL) @@ -8291,7 +8734,6 @@ class LUSetInstanceParams(LogicalUnit): if self.op.nics: args['nics'] = [] nic_override = dict(self.op.nics) - c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT] for idx, nic in enumerate(self.instance.nics): if idx in nic_override: this_nic_override = nic_override[idx] @@ -8308,7 +8750,7 @@ class LUSetInstanceParams(LogicalUnit): if idx in self.nic_pnew: nicparams = self.nic_pnew[idx] else: - nicparams = objects.FillDict(c_nicparams, nic.nicparams) + nicparams = self.cluster.SimpleFillNIC(nic.nicparams) mode = nicparams[constants.NIC_MODE] link = nicparams[constants.NIC_LINK] args['nics'].append((ip, mac, mode, link)) @@ -8328,47 +8770,12 @@ class 
LUSetInstanceParams(LogicalUnit): nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes) return env, nl, nl - @staticmethod - def _GetUpdatedParams(old_params, update_dict, - default_values, parameter_types): - """Return the new params dict for the given params. - - @type old_params: dict - @param old_params: old parameters - @type update_dict: dict - @param update_dict: dict containing new parameter values, - or constants.VALUE_DEFAULT to reset the - parameter to its default value - @type default_values: dict - @param default_values: default values for the filled parameters - @type parameter_types: dict - @param parameter_types: dict mapping target dict keys to types - in constants.ENFORCEABLE_TYPES - @rtype: (dict, dict) - @return: (new_parameters, filled_parameters) - - """ - params_copy = copy.deepcopy(old_params) - for key, val in update_dict.iteritems(): - if val == constants.VALUE_DEFAULT: - try: - del params_copy[key] - except KeyError: - pass - else: - params_copy[key] = val - utils.ForceDictType(params_copy, parameter_types) - params_filled = objects.FillDict(default_values, params_copy) - return (params_copy, params_filled) - def CheckPrereq(self): """Check prerequisites. This only checks the instance list against the existing names. 
""" - self.force = self.op.force - # checking the new params on the primary/secondary nodes instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name) @@ -8378,6 +8785,14 @@ class LUSetInstanceParams(LogicalUnit): pnode = instance.primary_node nodelist = list(instance.all_nodes) + # OS change + if self.op.os_name and not self.op.force: + _CheckNodeHasOS(self, instance.primary_node, self.op.os_name, + self.op.force_variant) + instance_os = self.op.os_name + else: + instance_os = instance.os + if self.op.disk_template: if instance.disk_template == self.op.disk_template: raise errors.OpPrereqError("Instance already has disk template %s" % @@ -8389,23 +8804,27 @@ class LUSetInstanceParams(LogicalUnit): " %s to %s" % (instance.disk_template, self.op.disk_template), errors.ECODE_INVAL) + _CheckInstanceDown(self, instance, "cannot change disk template") if self.op.disk_template in constants.DTS_NET_MIRROR: + if self.op.remote_node == pnode: + raise errors.OpPrereqError("Given new secondary node %s is the same" + " as the primary node of the instance" % + self.op.remote_node, errors.ECODE_STATE) _CheckNodeOnline(self, self.op.remote_node) _CheckNodeNotDrained(self, self.op.remote_node) disks = [{"size": d.size} for d in instance.disks] required = _ComputeDiskSize(self.op.disk_template, disks) _CheckNodesFreeDisk(self, [self.op.remote_node], required) - _CheckInstanceDown(self, instance, "cannot change disk template") # hvparams processing if self.op.hvparams: - i_hvdict, hv_new = self._GetUpdatedParams( - instance.hvparams, self.op.hvparams, - cluster.hvparams[instance.hypervisor], - constants.HVS_PARAMETER_TYPES) + hv_type = instance.hypervisor + i_hvdict = _GetUpdatedParams(instance.hvparams, self.op.hvparams) + utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES) + hv_new = cluster.SimpleFillHV(hv_type, instance.os, i_hvdict) + # local check - hypervisor.GetHypervisor( - instance.hypervisor).CheckParameterSyntax(hv_new) + 
hypervisor.GetHypervisor(hv_type).CheckParameterSyntax(hv_new) _CheckHVParams(self, nodelist, instance.hypervisor, hv_new) self.hv_new = hv_new # the new actual values self.hv_inst = i_hvdict # the new dict (without defaults) @@ -8414,18 +8833,27 @@ class LUSetInstanceParams(LogicalUnit): # beparams processing if self.op.beparams: - i_bedict, be_new = self._GetUpdatedParams( - instance.beparams, self.op.beparams, - cluster.beparams[constants.PP_DEFAULT], - constants.BES_PARAMETER_TYPES) + i_bedict = _GetUpdatedParams(instance.beparams, self.op.beparams, + use_none=True) + utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES) + be_new = cluster.SimpleFillBE(i_bedict) self.be_new = be_new # the new actual values self.be_inst = i_bedict # the new dict (without defaults) else: self.be_new = self.be_inst = {} + # osparams processing + if self.op.osparams: + i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams) + _CheckOSParams(self, True, nodelist, instance_os, i_osdict) + self.os_new = cluster.SimpleFillOS(instance_os, i_osdict) + self.os_inst = i_osdict # the new dict (without defaults) + else: + self.os_new = self.os_inst = {} + self.warn = [] - if constants.BE_MEMORY in self.op.beparams and not self.force: + if constants.BE_MEMORY in self.op.beparams and not self.op.force: mem_check_list = [pnode] if be_new[constants.BE_AUTO_BALANCE]: # either we changed auto_balance to yes or it was from before @@ -8510,10 +8938,10 @@ class LUSetInstanceParams(LogicalUnit): if 'bridge' in nic_dict: update_params_dict[constants.NIC_LINK] = nic_dict['bridge'] - new_nic_params, new_filled_nic_params = \ - self._GetUpdatedParams(old_nic_params, update_params_dict, - cluster.nicparams[constants.PP_DEFAULT], - constants.NICS_PARAMETER_TYPES) + new_nic_params = _GetUpdatedParams(old_nic_params, + update_params_dict) + utils.ForceDictType(new_nic_params, constants.NICS_PARAMETER_TYPES) + new_filled_nic_params = cluster.SimpleFillNIC(new_nic_params) 
objects.NIC.CheckParameterSyntax(new_filled_nic_params) self.nic_pinst[nic_op] = new_nic_params self.nic_pnew[nic_op] = new_filled_nic_params @@ -8524,7 +8952,7 @@ class LUSetInstanceParams(LogicalUnit): msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg if msg: msg = "Error checking bridges on node %s: %s" % (pnode, msg) - if self.force: + if self.op.force: self.warn.append(msg) else: raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON) @@ -8578,11 +9006,6 @@ class LUSetInstanceParams(LogicalUnit): (disk_op, len(instance.disks)), errors.ECODE_INVAL) - # OS change - if self.op.os_name and not self.op.force: - _CheckNodeHasOS(self, instance.primary_node, self.op.os_name, - self.op.force_variant) - return def _ConvertPlainToDrbd(self, feedback_fn): @@ -8797,6 +9220,12 @@ class LUSetInstanceParams(LogicalUnit): if self.op.os_name: instance.os = self.op.os_name + # osparams changes + if self.op.osparams: + instance.osparams = self.os_inst + for key, val in self.op.osparams.iteritems(): + result.append(("os/%s" % key, val)) + self.cfg.Update(instance, feedback_fn) return result @@ -8806,11 +9235,15 @@ class LUSetInstanceParams(LogicalUnit): (constants.DT_DRBD8, constants.DT_PLAIN): _ConvertDrbdToPlain, } + class LUQueryExports(NoHooksLU): """Query the exports list """ - _OP_REQP = ['nodes'] + _OP_PARAMS = [ + ("nodes", _EmptyList, _TListOf(_TNonEmptyString)), + ("use_locking", False, _TBool), + ] REQ_BGL = False def ExpandNames(self): @@ -8822,12 +9255,6 @@ class LUQueryExports(NoHooksLU): self.needed_locks[locking.LEVEL_NODE] = \ _GetWantedNodes(self, self.op.nodes) - def CheckPrereq(self): - """Check prerequisites. - - """ - self.nodes = self.acquired_locks[locking.LEVEL_NODE] - def Exec(self, feedback_fn): """Compute the list of all the exported system images. @@ -8837,6 +9264,7 @@ class LUQueryExports(NoHooksLU): that node. 
""" + self.nodes = self.acquired_locks[locking.LEVEL_NODE] rpcresult = self.rpc.call_export_list(self.nodes) result = {} for node in rpcresult: @@ -8848,33 +9276,114 @@ class LUQueryExports(NoHooksLU): return result +class LUPrepareExport(NoHooksLU): + """Prepares an instance for an export and returns useful information. + + """ + _OP_PARAMS = [ + _PInstanceName, + ("mode", _NoDefault, _TElemOf(constants.EXPORT_MODES)), + ] + REQ_BGL = False + + def ExpandNames(self): + self._ExpandAndLockInstance() + + def CheckPrereq(self): + """Check prerequisites. + + """ + instance_name = self.op.instance_name + + self.instance = self.cfg.GetInstanceInfo(instance_name) + assert self.instance is not None, \ + "Cannot retrieve locked instance %s" % self.op.instance_name + _CheckNodeOnline(self, self.instance.primary_node) + + self._cds = _GetClusterDomainSecret() + + def Exec(self, feedback_fn): + """Prepares an instance for an export. + + """ + instance = self.instance + + if self.op.mode == constants.EXPORT_MODE_REMOTE: + salt = utils.GenerateSecret(8) + + feedback_fn("Generating X509 certificate on %s" % instance.primary_node) + result = self.rpc.call_x509_cert_create(instance.primary_node, + constants.RIE_CERT_VALIDITY) + result.Raise("Can't create X509 key and certificate on %s" % result.node) + + (name, cert_pem) = result.payload + + cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, + cert_pem) + + return { + "handshake": masterd.instance.ComputeRemoteExportHandshake(self._cds), + "x509_key_name": (name, utils.Sha1Hmac(self._cds, name, salt=salt), + salt), + "x509_ca": utils.SignX509Certificate(cert, self._cds, salt), + } + + return None + + class LUExportInstance(LogicalUnit): """Export an instance to an image in the cluster. 
""" HPATH = "instance-export" HTYPE = constants.HTYPE_INSTANCE - _OP_REQP = ["instance_name", "target_node", "shutdown"] + _OP_PARAMS = [ + _PInstanceName, + ("target_node", _NoDefault, _TOr(_TNonEmptyString, _TList)), + ("shutdown", True, _TBool), + _PShutdownTimeout, + ("remove_instance", False, _TBool), + ("ignore_remove_failures", False, _TBool), + ("mode", constants.EXPORT_MODE_LOCAL, _TElemOf(constants.EXPORT_MODES)), + ("x509_key_name", None, _TOr(_TList, _TNone)), + ("destination_x509_ca", None, _TMaybeString), + ] REQ_BGL = False def CheckArguments(self): """Check the arguments. """ - self.shutdown_timeout = getattr(self.op, "shutdown_timeout", - constants.DEFAULT_SHUTDOWN_TIMEOUT) + self.x509_key_name = self.op.x509_key_name + self.dest_x509_ca_pem = self.op.destination_x509_ca + + if self.op.remove_instance and not self.op.shutdown: + raise errors.OpPrereqError("Can not remove instance without shutting it" + " down before") + + if self.op.mode == constants.EXPORT_MODE_REMOTE: + if not self.x509_key_name: + raise errors.OpPrereqError("Missing X509 key name for encryption", + errors.ECODE_INVAL) + + if not self.dest_x509_ca_pem: + raise errors.OpPrereqError("Missing destination X509 CA", + errors.ECODE_INVAL) def ExpandNames(self): self._ExpandAndLockInstance() - # FIXME: lock only instance primary and destination node - # - # Sad but true, for now we have do lock all nodes, as we don't know where - # the previous export might be, and and in this LU we search for it and - # remove it from its current node. 
In the future we could fix this by: - # - making a tasklet to search (share-lock all), then create the new one, - # then one to remove, after - # - removing the removal operation altogether - self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + + # Lock all nodes for local exports + if self.op.mode == constants.EXPORT_MODE_LOCAL: + # FIXME: lock only instance primary and destination node + # + # Sad but true, for now we have do lock all nodes, as we don't know where + # the previous export might be, and in this LU we search for it and + # remove it from its current node. In the future we could fix this by: + # - making a tasklet to search (share-lock all), then create the + # new one, then one to remove, after + # - removing the removal operation altogether + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET def DeclareLocks(self, level): """Last minute lock declaration.""" @@ -8887,13 +9396,21 @@ class LUExportInstance(LogicalUnit): """ env = { + "EXPORT_MODE": self.op.mode, "EXPORT_NODE": self.op.target_node, "EXPORT_DO_SHUTDOWN": self.op.shutdown, - "SHUTDOWN_TIMEOUT": self.shutdown_timeout, + "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout, + # TODO: Generic function for boolean env variables + "REMOVE_INSTANCE": str(bool(self.op.remove_instance)), } + env.update(_BuildInstanceHookEnvByObject(self, self.instance)) - nl = [self.cfg.GetMasterNode(), self.instance.primary_node, - self.op.target_node] + + nl = [self.cfg.GetMasterNode(), self.instance.primary_node] + + if self.op.mode == constants.EXPORT_MODE_LOCAL: + nl.append(self.op.target_node) + return env, nl, nl def CheckPrereq(self): @@ -8903,85 +9420,85 @@ class LUExportInstance(LogicalUnit): """ instance_name = self.op.instance_name + self.instance = self.cfg.GetInstanceInfo(instance_name) assert self.instance is not None, \ "Cannot retrieve locked instance %s" % self.op.instance_name _CheckNodeOnline(self, self.instance.primary_node) - self.op.target_node = _ExpandNodeName(self.cfg, 
self.op.target_node) - self.dst_node = self.cfg.GetNodeInfo(self.op.target_node) - assert self.dst_node is not None - - _CheckNodeOnline(self, self.dst_node.name) - _CheckNodeNotDrained(self, self.dst_node.name) - - # instance disk type verification - for disk in self.instance.disks: - if disk.dev_type == constants.LD_FILE: - raise errors.OpPrereqError("Export not supported for instances with" - " file-based disks", errors.ECODE_INVAL) + if self.op.mode == constants.EXPORT_MODE_LOCAL: + self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node) + self.dst_node = self.cfg.GetNodeInfo(self.op.target_node) + assert self.dst_node is not None - def _CreateSnapshots(self, feedback_fn): - """Creates an LVM snapshot for every disk of the instance. + _CheckNodeOnline(self, self.dst_node.name) + _CheckNodeNotDrained(self, self.dst_node.name) - @return: List of snapshots as L{objects.Disk} instances + self._cds = None + self.dest_disk_info = None + self.dest_x509_ca = None - """ - instance = self.instance - src_node = instance.primary_node + elif self.op.mode == constants.EXPORT_MODE_REMOTE: + self.dst_node = None - vgname = self.cfg.GetVGName() + if len(self.op.target_node) != len(self.instance.disks): + raise errors.OpPrereqError(("Received destination information for %s" + " disks, but instance %s has %s disks") % + (len(self.op.target_node), instance_name, + len(self.instance.disks)), + errors.ECODE_INVAL) - snap_disks = [] + cds = _GetClusterDomainSecret() - for idx, disk in enumerate(instance.disks): - feedback_fn("Creating a snapshot of disk/%s on node %s" % - (idx, src_node)) + # Check X509 key name + try: + (key_name, hmac_digest, hmac_salt) = self.x509_key_name + except (TypeError, ValueError), err: + raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err) - # result.payload will be a snapshot of an lvm leaf of the one we - # passed - result = self.rpc.call_blockdev_snapshot(src_node, disk) - msg = result.fail_msg - if msg: - 
self.LogWarning("Could not snapshot disk/%s on node %s: %s", - idx, src_node, msg) - snap_disks.append(False) - else: - disk_id = (vgname, result.payload) - new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size, - logical_id=disk_id, physical_id=disk_id, - iv_name=disk.iv_name) - snap_disks.append(new_dev) + if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt): + raise errors.OpPrereqError("HMAC for X509 key name is wrong", + errors.ECODE_INVAL) - return snap_disks + # Load and verify CA + try: + (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds) + except OpenSSL.crypto.Error, err: + raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" % + (err, ), errors.ECODE_INVAL) - def _RemoveSnapshot(self, feedback_fn, snap_disks, disk_index): - """Removes an LVM snapshot. + (errcode, msg) = utils.VerifyX509Certificate(cert, None, None) + if errcode is not None: + raise errors.OpPrereqError("Invalid destination X509 CA (%s)" % + (msg, ), errors.ECODE_INVAL) - @type snap_disks: list - @param snap_disks: The list of all snapshots as returned by - L{_CreateSnapshots} - @type disk_index: number - @param disk_index: Index of the snapshot to be removed - @rtype: bool - @return: Whether removal was successful or not + self.dest_x509_ca = cert - """ - disk = snap_disks[disk_index] - if disk: - src_node = self.instance.primary_node + # Verify target information + disk_info = [] + for idx, disk_data in enumerate(self.op.target_node): + try: + (host, port, magic) = \ + masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data) + except errors.GenericError, err: + raise errors.OpPrereqError("Target info for disk %s: %s" % + (idx, err), errors.ECODE_INVAL) - feedback_fn("Removing snapshot of disk/%s on node %s" % - (disk_index, src_node)) + disk_info.append((host, port, magic)) - result = self.rpc.call_blockdev_remove(src_node, disk) - if not result.fail_msg: - return True + assert len(disk_info) == 
len(self.op.target_node) + self.dest_disk_info = disk_info - self.LogWarning("Could not remove snapshot for disk/%d from node" - " %s: %s", disk_index, src_node, result.fail_msg) + else: + raise errors.ProgrammerError("Unhandled export mode %r" % + self.op.mode) - return False + # instance disk type verification + # TODO: Implement export support for file-based disks + for disk in self.instance.disks: + if disk.dev_type == constants.LD_FILE: + raise errors.OpPrereqError("Export not supported for instances with" + " file-based disks", errors.ECODE_INVAL) def _CleanupExports(self, feedback_fn): """Removes exports of current instance from all other nodes. @@ -8990,6 +9507,8 @@ class LUExportInstance(LogicalUnit): exports will be removed from the nodes A, B and D. """ + assert self.op.mode != constants.EXPORT_MODE_REMOTE + nodelist = self.cfg.GetNodeList() nodelist.remove(self.dst_node.name) @@ -9013,15 +9532,17 @@ class LUExportInstance(LogicalUnit): """Export an instance to an image in the cluster. 
""" + assert self.op.mode in constants.EXPORT_MODES + instance = self.instance - dst_node = self.dst_node src_node = instance.primary_node if self.op.shutdown: # shutdown the instance, but not the disks feedback_fn("Shutting down instance %s" % instance.name) result = self.rpc.call_instance_shutdown(src_node, instance, - self.shutdown_timeout) + self.op.shutdown_timeout) + # TODO: Maybe ignore failures if ignore_remove_failures is set result.Raise("Could not shutdown instance %s on" " node %s" % (instance.name, src_node)) @@ -9038,80 +9559,72 @@ class LUExportInstance(LogicalUnit): _StartInstanceDisks(self, instance, None) try: - # per-disk results - dresults = [] - removed_snaps = [False] * len(instance.disks) + helper = masterd.instance.ExportInstanceHelper(self, feedback_fn, + instance) - snap_disks = None + helper.CreateSnapshots() try: - try: - snap_disks = self._CreateSnapshots(feedback_fn) - finally: - if self.op.shutdown and instance.admin_up: - feedback_fn("Starting instance %s" % instance.name) - result = self.rpc.call_instance_start(src_node, instance, - None, None) - msg = result.fail_msg - if msg: - _ShutdownInstanceDisks(self, instance) - raise errors.OpExecError("Could not start instance: %s" % msg) - - assert len(snap_disks) == len(instance.disks) - assert len(removed_snaps) == len(instance.disks) - - # TODO: check for size - - cluster_name = self.cfg.GetClusterName() - for idx, dev in enumerate(snap_disks): - feedback_fn("Exporting snapshot %s from %s to %s" % - (idx, src_node, dst_node.name)) - if dev: - # FIXME: pass debug from opcode to backend - result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name, - instance, cluster_name, - idx, self.op.debug_level) - msg = result.fail_msg - if msg: - self.LogWarning("Could not export disk/%s from node %s to" - " node %s: %s", idx, src_node, dst_node.name, msg) - dresults.append(False) - else: - dresults.append(True) + if (self.op.shutdown and instance.admin_up and + not 
self.op.remove_instance): + assert not activate_disks + feedback_fn("Starting instance %s" % instance.name) + result = self.rpc.call_instance_start(src_node, instance, None, None) + msg = result.fail_msg + if msg: + feedback_fn("Failed to start instance: %s" % msg) + _ShutdownInstanceDisks(self, instance) + raise errors.OpExecError("Could not start instance: %s" % msg) - # Remove snapshot - if self._RemoveSnapshot(feedback_fn, snap_disks, idx): - removed_snaps[idx] = True - else: - dresults.append(False) + if self.op.mode == constants.EXPORT_MODE_LOCAL: + (fin_resu, dresults) = helper.LocalExport(self.dst_node) + elif self.op.mode == constants.EXPORT_MODE_REMOTE: + connect_timeout = constants.RIE_CONNECT_TIMEOUT + timeouts = masterd.instance.ImportExportTimeouts(connect_timeout) - assert len(dresults) == len(instance.disks) + (key_name, _, _) = self.x509_key_name - # Check for backwards compatibility - assert compat.all(isinstance(i, bool) for i in dresults), \ - "Not all results are boolean: %r" % dresults - - feedback_fn("Finalizing export on %s" % dst_node.name) - result = self.rpc.call_finalize_export(dst_node.name, instance, - snap_disks) - msg = result.fail_msg - fin_resu = not msg - if msg: - self.LogWarning("Could not finalize export for instance %s" - " on node %s: %s", instance.name, dst_node.name, msg) + dest_ca_pem = \ + OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, + self.dest_x509_ca) + (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info, + key_name, dest_ca_pem, + timeouts) finally: - # Remove all snapshots - assert len(removed_snaps) == len(instance.disks) - for idx, removed in enumerate(removed_snaps): - if not removed: - self._RemoveSnapshot(feedback_fn, snap_disks, idx) + helper.Cleanup() + + # Check for backwards compatibility + assert len(dresults) == len(instance.disks) + assert compat.all(isinstance(i, bool) for i in dresults), \ + "Not all results are boolean: %r" % dresults finally: if activate_disks: 
feedback_fn("Deactivating disks for %s" % instance.name) _ShutdownInstanceDisks(self, instance) - self._CleanupExports(feedback_fn) + if not (compat.all(dresults) and fin_resu): + failures = [] + if not fin_resu: + failures.append("export finalization") + if not compat.all(dresults): + fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults) + if not dsk) + failures.append("disk export: disk(s) %s" % fdsk) + + raise errors.OpExecError("Export failed, errors in %s" % + utils.CommaJoin(failures)) + + # At this point, the export was successful, we can cleanup/finish + + # Remove instance if requested + if self.op.remove_instance: + feedback_fn("Removing instance %s" % instance.name) + _RemoveInstance(self, feedback_fn, instance, + self.op.ignore_remove_failures) + + if self.op.mode == constants.EXPORT_MODE_LOCAL: + self._CleanupExports(feedback_fn) return fin_resu, dresults @@ -9120,7 +9633,9 @@ class LURemoveExport(NoHooksLU): """Remove exports related to the named instance. """ - _OP_REQP = ["instance_name"] + _OP_PARAMS = [ + _PInstanceName, + ] REQ_BGL = False def ExpandNames(self): @@ -9130,11 +9645,6 @@ class LURemoveExport(NoHooksLU): # we can remove exports also for a removed instance) self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET - def CheckPrereq(self): - """Check prerequisites. - """ - pass - def Exec(self, feedback_fn): """Remove any export. @@ -9204,7 +9714,10 @@ class LUGetTags(TagsLU): """Returns the tags of a given object. """ - _OP_REQP = ["kind", "name"] + _OP_PARAMS = [ + ("kind", _NoDefault, _TElemOf(constants.VALID_TAG_TYPES)), + ("name", _NoDefault, _TNonEmptyString), + ] REQ_BGL = False def Exec(self, feedback_fn): @@ -9218,7 +9731,9 @@ class LUSearchTags(NoHooksLU): """Searches the tags for a given pattern. """ - _OP_REQP = ["pattern"] + _OP_PARAMS = [ + ("pattern", _NoDefault, _TNonEmptyString), + ] REQ_BGL = False def ExpandNames(self): @@ -9258,7 +9773,11 @@ class LUAddTags(TagsLU): """Sets a tag on a given object. 
""" - _OP_REQP = ["kind", "name", "tags"] + _OP_PARAMS = [ + ("kind", _NoDefault, _TElemOf(constants.VALID_TAG_TYPES)), + ("name", _NoDefault, _TNonEmptyString), + ("tags", _NoDefault, _TListOf(_TNonEmptyString)), + ] REQ_BGL = False def CheckPrereq(self): @@ -9287,7 +9806,11 @@ class LUDelTags(TagsLU): """Delete a list of tags from a given object. """ - _OP_REQP = ["kind", "name", "tags"] + _OP_PARAMS = [ + ("kind", _NoDefault, _TElemOf(constants.VALID_TAG_TYPES)), + ("name", _NoDefault, _TNonEmptyString), + ("tags", _NoDefault, _TListOf(_TNonEmptyString)), + ] REQ_BGL = False def CheckPrereq(self): @@ -9324,7 +9847,12 @@ class LUTestDelay(NoHooksLU): time. """ - _OP_REQP = ["duration", "on_master", "on_nodes"] + _OP_PARAMS = [ + ("duration", _NoDefault, _TFloat), + ("on_master", True, _TBool), + ("on_nodes", _EmptyList, _TListOf(_TNonEmptyString)), + ("repeat", 0, _TPositiveInt) + ] REQ_BGL = False def ExpandNames(self): @@ -9341,12 +9869,7 @@ class LUTestDelay(NoHooksLU): self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes) self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes - def CheckPrereq(self): - """Check prerequisites. - - """ - - def Exec(self, feedback_fn): + def _TestDelay(self): """Do the actual sleep. """ @@ -9358,6 +9881,160 @@ class LUTestDelay(NoHooksLU): for node, node_result in result.items(): node_result.Raise("Failure during rpc call to node %s" % node) + def Exec(self, feedback_fn): + """Execute the test delay opcode, with the wanted repetitions. + + """ + if self.op.repeat == 0: + self._TestDelay() + else: + top_value = self.op.repeat - 1 + for i in range(self.op.repeat): + self.LogInfo("Test delay iteration %d/%d" % (i, top_value)) + self._TestDelay() + + +class LUTestJobqueue(NoHooksLU): + """Utility LU to test some aspects of the job queue. 
+ + """ + _OP_PARAMS = [ + ("notify_waitlock", False, _TBool), + ("notify_exec", False, _TBool), + ("log_messages", _EmptyList, _TListOf(_TString)), + ("fail", False, _TBool), + ] + REQ_BGL = False + + # Must be lower than default timeout for WaitForJobChange to see whether it + # notices changed jobs + _CLIENT_CONNECT_TIMEOUT = 20.0 + _CLIENT_CONFIRM_TIMEOUT = 60.0 + + @classmethod + def _NotifyUsingSocket(cls, cb, errcls): + """Opens a Unix socket and waits for another program to connect. + + @type cb: callable + @param cb: Callback to send socket name to client + @type errcls: class + @param errcls: Exception class to use for errors + + """ + # Using a temporary directory as there's no easy way to create temporary + # sockets without writing a custom loop around tempfile.mktemp and + # socket.bind + tmpdir = tempfile.mkdtemp() + try: + tmpsock = utils.PathJoin(tmpdir, "sock") + + logging.debug("Creating temporary socket at %s", tmpsock) + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + sock.bind(tmpsock) + sock.listen(1) + + # Send details to client + cb(tmpsock) + + # Wait for client to connect before continuing + sock.settimeout(cls._CLIENT_CONNECT_TIMEOUT) + try: + (conn, _) = sock.accept() + except socket.error, err: + raise errcls("Client didn't connect in time (%s)" % err) + finally: + sock.close() + finally: + # Remove as soon as client is connected + shutil.rmtree(tmpdir) + + # Wait for client to close + try: + try: + # pylint: disable-msg=E1101 + # Instance of '_socketobject' has no ... member + conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT) + conn.recv(1) + except socket.error, err: + raise errcls("Client failed to confirm notification (%s)" % err) + finally: + conn.close() + + def _SendNotification(self, test, arg, sockname): + """Sends a notification to the client. 
+ + @type test: string + @param test: Test name + @param arg: Test argument (depends on test) + @type sockname: string + @param sockname: Socket path + + """ + self.Log(constants.ELOG_JQUEUE_TEST, (sockname, test, arg)) + + def _Notify(self, prereq, test, arg): + """Notifies the client of a test. + + @type prereq: bool + @param prereq: Whether this is a prereq-phase test + @type test: string + @param test: Test name + @param arg: Test argument (depends on test) + + """ + if prereq: + errcls = errors.OpPrereqError + else: + errcls = errors.OpExecError + + return self._NotifyUsingSocket(compat.partial(self._SendNotification, + test, arg), + errcls) + + def CheckArguments(self): + self.checkargs_calls = getattr(self, "checkargs_calls", 0) + 1 + self.expandnames_calls = 0 + + def ExpandNames(self): + checkargs_calls = getattr(self, "checkargs_calls", 0) + if checkargs_calls < 1: + raise errors.ProgrammerError("CheckArguments was not called") + + self.expandnames_calls += 1 + + if self.op.notify_waitlock: + self._Notify(True, constants.JQT_EXPANDNAMES, None) + + self.LogInfo("Expanding names") + + # Get lock on master node (just to get a lock, not for a particular reason) + self.needed_locks = { + locking.LEVEL_NODE: self.cfg.GetMasterNode(), + } + + def Exec(self, feedback_fn): + if self.expandnames_calls < 1: + raise errors.ProgrammerError("ExpandNames was not called") + + if self.op.notify_exec: + self._Notify(False, constants.JQT_EXEC, None) + + self.LogInfo("Executing") + + if self.op.log_messages: + self._Notify(False, constants.JQT_STARTMSG, len(self.op.log_messages)) + for idx, msg in enumerate(self.op.log_messages): + self.LogInfo("Sending log message %s", idx + 1) + feedback_fn(constants.JQT_MSGPREFIX + msg) + # Report how many test messages have been sent + self._Notify(False, constants.JQT_LOGMSG, idx + 1) + + if self.op.fail: + raise errors.OpExecError("Opcode failure was requested") + + return True + class IAllocator(object): """IAllocator framework. 
@@ -9524,9 +10201,7 @@ class IAllocator(object): for iinfo, beinfo in i_list: nic_data = [] for nic in iinfo.nics: - filled_params = objects.FillDict( - cluster_info.nicparams[constants.PP_DEFAULT], - nic.nicparams) + filled_params = cluster_info.SimpleFillNIC(nic.nicparams) nic_dict = {"mac": nic.mac, "ip": nic.ip, "mode": filled_params[constants.NIC_MODE], @@ -9693,7 +10368,23 @@ class LUTestAllocator(NoHooksLU): This LU runs the allocator tests """ - _OP_REQP = ["direction", "mode", "name"] + _OP_PARAMS = [ + ("direction", _NoDefault, _TElemOf(constants.VALID_IALLOCATOR_DIRECTIONS)), + ("mode", _NoDefault, _TElemOf(constants.VALID_IALLOCATOR_MODES)), + ("name", _NoDefault, _TNonEmptyString), + ("nics", _NoDefault, _TOr(_TNone, _TListOf( + _TDictOf(_TElemOf(["mac", "ip", "bridge"]), + _TOr(_TNone, _TNonEmptyString))))), + ("disks", _NoDefault, _TOr(_TNone, _TList)), + ("hypervisor", None, _TMaybeString), + ("allocator", None, _TMaybeString), + ("tags", _EmptyList, _TListOf(_TNonEmptyString)), + ("mem_size", None, _TOr(_TNone, _TPositiveInt)), + ("vcpus", None, _TOr(_TNone, _TPositiveInt)), + ("os", None, _TMaybeString), + ("disk_template", None, _TMaybeString), + ("evac_nodes", None, _TOr(_TNone, _TListOf(_TNonEmptyString))), + ] def CheckPrereq(self): """Check prerequisites. 
@@ -9702,7 +10393,7 @@ class LUTestAllocator(NoHooksLU): """ if self.op.mode == constants.IALLOCATOR_MODE_ALLOC: - for attr in ["name", "mem_size", "disks", "disk_template", + for attr in ["mem_size", "disks", "disk_template", "os", "tags", "nics", "vcpus"]: if not hasattr(self.op, attr): raise errors.OpPrereqError("Missing attribute '%s' on opcode input" % @@ -9714,13 +10405,6 @@ class LUTestAllocator(NoHooksLU): if not isinstance(self.op.nics, list): raise errors.OpPrereqError("Invalid parameter 'nics'", errors.ECODE_INVAL) - for row in self.op.nics: - if (not isinstance(row, dict) or - "mac" not in row or - "ip" not in row or - "bridge" not in row): - raise errors.OpPrereqError("Invalid contents of the 'nics'" - " parameter", errors.ECODE_INVAL) if not isinstance(self.op.disks, list): raise errors.OpPrereqError("Invalid parameter 'disks'", errors.ECODE_INVAL) @@ -9732,12 +10416,9 @@ class LUTestAllocator(NoHooksLU): row["mode"] not in ['r', 'w']): raise errors.OpPrereqError("Invalid contents of the 'disks'" " parameter", errors.ECODE_INVAL) - if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None: + if self.op.hypervisor is None: self.op.hypervisor = self.cfg.GetHypervisorType() elif self.op.mode == constants.IALLOCATOR_MODE_RELOC: - if not hasattr(self.op, "name"): - raise errors.OpPrereqError("Missing attribute 'name' on opcode input", - errors.ECODE_INVAL) fname = _ExpandInstanceName(self.cfg, self.op.name) self.op.name = fname self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes @@ -9750,7 +10431,7 @@ class LUTestAllocator(NoHooksLU): self.op.mode, errors.ECODE_INVAL) if self.op.direction == constants.IALLOCATOR_DIR_OUT: - if not hasattr(self.op, "allocator") or self.op.allocator is None: + if self.op.allocator is None: raise errors.OpPrereqError("Missing allocator name", errors.ECODE_INVAL) elif self.op.direction != constants.IALLOCATOR_DIR_IN: