X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/63ac47cc02e5a74d763ce91d8144e8f40d228bb3..e8d61457f16974cbf0d77479f9d06f4c6345a02e:/lib/cmdlib.py

diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 7cd5100..1843311 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2008 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -36,6 +36,9 @@ import platform
 import logging
 import copy
 import OpenSSL
+import socket
+import tempfile
+import shutil
 
 from ganeti import ssh
 from ganeti import utils
@@ -49,6 +52,7 @@ from ganeti import ssconf
 from ganeti import uidpool
 from ganeti import compat
 from ganeti import masterd
+from ganeti import netutils
 
 import ganeti.masterd.instance # pylint: disable-msg=W0611
 
@@ -228,6 +232,13 @@ _PInstanceName = ("instance_name", _NoDefault, _TNonEmptyString)
 
 #: a required node name (for single-node LUs)
 _PNodeName = ("node_name", _NoDefault, _TNonEmptyString)
+
+#: the migration type (live/non-live)
+_PMigrationMode = ("mode", None, _TOr(_TNone,
+                                      _TElemOf(constants.HT_MIGRATION_MODES)))
+
+#: the obsolete 'live' mode (boolean)
+_PMigrationLive = ("live", None, _TMaybeBool)
 
 # End types
 class LogicalUnit(object):
@@ -277,6 +288,7 @@ class LogicalUnit(object):
     self.recalculate_locks = {}
     self.__ssh = None
     # logging
+    self.Log = processor.Log # pylint: disable-msg=C0103
     self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
     self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
     self.LogStep = processor.LogStep # pylint: disable-msg=C0103
@@ -377,11 +389,11 @@ class LogicalUnit(object):
       # Acquire all nodes and one instance
       self.needed_locks = {
         locking.LEVEL_NODE: locking.ALL_SET,
-        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
+        locking.LEVEL_INSTANCE: ['instance1.example.com'],
       }
       # Acquire just two nodes
       self.needed_locks = {
-        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
+        locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
       }
       # Acquire no locks
       self.needed_locks = {} # No, you can't leave it to the default value None
@@ -1237,7 +1249,6 @@ class LUDestroyCluster(LogicalUnit):
 
     """
     master = self.cfg.GetMasterNode()
-    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
 
     # Run post hooks on master node before it's removed
     hm = self.proc.hmclass(self.rpc.call_hooks_runner, self)
@@ -1250,11 +1261,6 @@ class LUDestroyCluster(LogicalUnit):
 
     result = self.rpc.call_node_stop_master(master, False)
     result.Raise("Could not disable the master role")
 
-    if modify_ssh_setup:
-      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
-      utils.CreateBackup(priv_key)
-      utils.CreateBackup(pub_key)
-
     return master
 
@@ -1431,14 +1437,11 @@ class LUVerifyCluster(LogicalUnit):
     self.bad = self.bad or cond
 
   def _VerifyNode(self, ninfo, nresult):
-    """Run multiple tests against a node.
+    """Perform some basic validation on data returned from a node.
 
-    Test list:
-
-      - compares ganeti version
-      - checks vg existence and size > 20G
-      - checks config file checksum
-      - checks ssh to other nodes
+      - check the result data structure is well formed and has all the
+        mandatory fields
+      - check ganeti version
 
     @type ninfo: L{objects.Node}
     @param ninfo: the node to check
@@ -1648,20 +1651,24 @@ class LUVerifyCluster(LogicalUnit):
         _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                  "instance should not run on node %s", node)
 
-  def _VerifyOrphanVolumes(self, node_vol_should, node_image):
+  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
     """Verify if there are any unknown volumes in the cluster.
 
    The .os, .swap and backup volumes are ignored. All other
    volumes are reported as unknown.
 
+    @type reserved: L{ganeti.utils.FieldSet}
+    @param reserved: a FieldSet of reserved volume names
+
     """
    for node, n_img in node_image.items():
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
-        test = (node not in node_vol_should or
-                volume not in node_vol_should[node])
+        test = ((node not in node_vol_should or
+                 volume not in node_vol_should[node]) and
+                not reserved.Matches(volume))
        self._ErrorIf(test, self.ENODEORPHANLV, node,
                      "volume %s is unknown", volume)
@@ -2226,7 +2233,8 @@ class LUVerifyCluster(LogicalUnit):
                  "instance lives on ghost node %s", node)
 
     feedback_fn("* Verifying orphan volumes")
-    self._VerifyOrphanVolumes(node_vol_should, node_image)
+    reserved = utils.FieldSet(*cluster.reserved_lvs)
+    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
 
     feedback_fn("* Verifying orphan instances")
     self._VerifyOrphanInstances(instancelist, node_image)
@@ -2513,7 +2521,8 @@ class LURenameCluster(LogicalUnit):
     """Verify that the passed name is a valid one.
 
     """
-    hostname = utils.GetHostInfo(self.op.name)
+    hostname = netutils.GetHostname(name=self.op.name,
+                                    family=self.cfg.GetPrimaryIPFamily())
 
     new_name = hostname.name
     self.ip = new_ip = hostname.ip
@@ -2524,7 +2533,7 @@ class LURenameCluster(LogicalUnit):
                                  " cluster has changed",
                                  errors.ECODE_INVAL)
     if new_ip != old_ip:
-      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
+      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip, errors.ECODE_NOTUNIQUE)
@@ -2572,6 +2581,8 @@ class LURenameCluster(LogicalUnit):
         self.LogWarning("Could not re-enable the master role on"
                         " the master, please restart manually: %s", msg)
 
+    return clustername
+
 
 class LUSetClusterParams(LogicalUnit):
   """Change the parameters of the cluster.
@@ -2595,6 +2606,7 @@ class LUSetClusterParams(LogicalUnit):
     ("nicparams", None, _TOr(_TDict, _TNone)),
     ("drbd_helper", None, _TOr(_TString, _TNone)),
     ("default_iallocator", None, _TMaybeString),
+    ("reserved_lvs", None, _TOr(_TListOf(_TNonEmptyString), _TNone)),
     ]
   REQ_BGL = False
 
@@ -2865,6 +2877,9 @@ class LUSetClusterParams(LogicalUnit):
     if self.op.default_iallocator is not None:
       self.cluster.default_iallocator = self.op.default_iallocator
 
+    if self.op.reserved_lvs is not None:
+      self.cluster.reserved_lvs = self.op.reserved_lvs
+
     self.cfg.Update(self.cluster, feedback_fn)
 
@@ -3654,7 +3669,9 @@ class LUAddNode(LogicalUnit):
 
   def CheckArguments(self):
     # validate/normalize the node name
-    self.op.node_name = utils.HostInfo.NormalizeName(self.op.node_name)
+    self.hostname = netutils.GetHostname(name=self.op.node_name,
+                                         family=self.cfg.GetPrimaryIPFamily())
+    self.op.node_name = self.hostname.name
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -3683,19 +3700,17 @@ class LUAddNode(LogicalUnit):
     Any errors are signaled by raising errors.OpPrereqError.
 
     """
-    node_name = self.op.node_name
     cfg = self.cfg
-
-    dns_data = utils.GetHostInfo(node_name)
-
-    node = dns_data.name
-    primary_ip = self.op.primary_ip = dns_data.ip
+    hostname = self.hostname
+    node = hostname.name
+    primary_ip = self.op.primary_ip = hostname.ip
     if self.op.secondary_ip is None:
       self.op.secondary_ip = primary_ip
 
-    if not utils.IsValidIP4(self.op.secondary_ip):
-      raise errors.OpPrereqError("Invalid secondary IP given",
-                                 errors.ECODE_INVAL)
+    secondary_ip = self.op.secondary_ip
+    if not netutils.IP4Address.IsValid(secondary_ip):
+      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
+                                 " address" % secondary_ip, errors.ECODE_INVAL)
 
     node_list = cfg.GetNodeList()
     if not self.op.readd and node in node_list:
@@ -3744,13 +3759,13 @@ class LUAddNode(LogicalUnit):
                                  errors.ECODE_INVAL)
 
     # checks reachability
-    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
+    if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
       raise errors.OpPrereqError("Node not reachable by ping",
                                  errors.ECODE_ENVIRON)
 
     if not newbie_singlehomed:
       # check reachability from my secondary ip to newbie's secondary ip
-      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
+      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                            source=myself.secondary_ip):
         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                    " based ping to noded port",
@@ -3807,27 +3822,10 @@ class LUAddNode(LogicalUnit):
                                " node version %s" %
                                (constants.PROTOCOL_VERSION, result.payload))
 
-    # setup ssh on node
-    if self.cfg.GetClusterInfo().modify_ssh_setup:
-      logging.info("Copy ssh key to node %s", node)
-      priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
-      keyarray = []
-      keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
-                  constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
-                  priv_key, pub_key]
-
-      for i in keyfiles:
-        keyarray.append(utils.ReadFile(i))
-
-      result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
-                                      keyarray[2], keyarray[3], keyarray[4],
-                                      keyarray[5])
-      result.Raise("Cannot transfer ssh keys to the new node")
-
     # Add node to our /etc/hosts, and add key to known_hosts
     if self.cfg.GetClusterInfo().modify_etc_hosts:
       # FIXME: this should be done via an rpc call to node daemon
-      utils.AddHostToEtcHosts(new_node.name)
+      utils.AddHostToEtcHosts(self.hostname)
 
     if new_node.secondary_ip != new_node.primary_ip:
       result = self.rpc.call_node_has_ip_address(new_node.name,
@@ -3948,7 +3946,7 @@ class LUSetNodeParams(LogicalUnit):
 
     # we can't change the master's node flags
     if self.op.node_name == self.cfg.GetMasterNode():
       raise errors.OpPrereqError("The master role can be changed"
-                                 " only via masterfailover",
+                                 " only via master-failover",
                                  errors.ECODE_INVAL)
 
@@ -4099,6 +4097,11 @@ class LUQueryClusterInfo(NoHooksLU):
         if hv_name in cluster.enabled_hypervisors:
           os_hvp[os_name][hv_name] = hv_params
 
+    # Convert ip_family to ip_version
+    primary_ip_version = constants.IP4_VERSION
+    if cluster.primary_ip_family == netutils.IP6Address.family:
+      primary_ip_version = constants.IP6_VERSION
+
     result = {
       "software_version": constants.RELEASE_VERSION,
       "protocol_version": constants.PROTOCOL_VERSION,
@@ -4128,6 +4131,8 @@ class LUQueryClusterInfo(NoHooksLU):
       "tags": list(cluster.GetTags()),
       "uid_pool": cluster.uid_pool,
       "default_iallocator": cluster.default_iallocator,
+      "reserved_lvs": cluster.reserved_lvs,
+      "primary_ip_version": primary_ip_version,
       }
 
     return result
@@ -4854,10 +4859,19 @@ class LURenameInstance(LogicalUnit):
   _OP_PARAMS = [
     _PInstanceName,
     ("new_name", _NoDefault, _TNonEmptyString),
-    ("ignore_ip", False, _TBool),
-    ("check_name", True, _TBool),
+    ("ip_check", False, _TBool),
+    ("name_check", True, _TBool),
     ]
 
+  def CheckArguments(self):
+    """Check arguments.
+
+    """
+    if self.op.ip_check and not self.op.name_check:
+      # TODO: make the ip check more flexible and not depend on the name check
+      raise errors.OpPrereqError("Cannot do ip check without a name check",
+                                 errors.ECODE_INVAL)
+
   def BuildHooksEnv(self):
     """Build hooks env.
 
@@ -4883,24 +4897,21 @@ class LURenameInstance(LogicalUnit):
     _CheckInstanceDown(self, instance, "cannot rename")
     self.instance = instance
 
-    # new name verification
-    if self.op.check_name:
-      name_info = utils.GetHostInfo(self.op.new_name)
-      self.op.new_name = name_info.name
-
     new_name = self.op.new_name
+    if self.op.name_check:
+      hostname = netutils.GetHostname(name=new_name)
+      new_name = hostname.name
+      if (self.op.ip_check and
+          netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)):
+        raise errors.OpPrereqError("IP %s of instance %s already in use" %
+                                   (hostname.ip, new_name),
+                                   errors.ECODE_NOTUNIQUE)
 
     instance_list = self.cfg.GetInstanceList()
     if new_name in instance_list:
       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                  new_name, errors.ECODE_EXISTS)
 
-    if not self.op.ignore_ip:
-      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
-        raise errors.OpPrereqError("IP %s of instance %s already in use" %
-                                   (name_info.ip, new_name),
-                                   errors.ECODE_NOTUNIQUE)
-
   def Exec(self, feedback_fn):
     """Reinstall the instance.
 
@@ -4942,6 +4953,8 @@ class LURenameInstance(LogicalUnit):
       finally:
         _ShutdownInstanceDisks(self, inst)
 
+    return inst.name
+
 
 class LURemoveInstance(LogicalUnit):
   """Remove an instance.
@@ -5063,7 +5076,10 @@ class LUQueryInstances(NoHooksLU):
                                     if name not in constants.HVC_GLOBALS] +
                                    ["be/%s" % name
                                     for name in constants.BES_PARAMETERS])
-  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
+  _FIELDS_DYNAMIC = utils.FieldSet("oper_state",
+                                   "oper_ram",
+                                   "oper_vcpus",
+                                   "status")
 
   def CheckArguments(self):
 
@@ -5194,6 +5210,13 @@ class LUQueryInstances(NoHooksLU):
             val = live_data[instance.name].get("memory", "?")
           else:
             val = "-"
+        elif field == "oper_vcpus":
+          if instance.primary_node in bad_nodes:
+            val = None
+          elif instance.name in live_data:
+            val = live_data[instance.name].get("vcpus", "?")
+          else:
+            val = "-"
         elif field == "vcpus":
           val = i_be[constants.BE_VCPUS]
         elif field == "disk_template":
@@ -5474,7 +5497,8 @@ class LUMigrateInstance(LogicalUnit):
   HTYPE = constants.HTYPE_INSTANCE
   _OP_PARAMS = [
     _PInstanceName,
-    ("live", True, _TBool),
+    _PMigrationMode,
+    _PMigrationLive,
     ("cleanup", False, _TBool),
     ]
 
@@ -5487,7 +5511,7 @@ class LUMigrateInstance(LogicalUnit):
     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
     self._migrater = TLMigrateInstance(self, self.op.instance_name,
-                                       self.op.live, self.op.cleanup)
+                                       self.op.cleanup)
 
     self.tasklets = [self._migrater]
 
   def DeclareLocks(self, level):
@@ -5504,7 +5528,7 @@ class LUMigrateInstance(LogicalUnit):
     source_node = instance.primary_node
     target_node = instance.secondary_nodes[0]
     env = _BuildInstanceHookEnvByObject(self, instance)
-    env["MIGRATE_LIVE"] = self.op.live
+    env["MIGRATE_LIVE"] = self._migrater.live
     env["MIGRATE_CLEANUP"] = self.op.cleanup
     env.update({
       "OLD_PRIMARY": source_node,
@@ -5705,7 +5729,8 @@ class LUMigrateNode(LogicalUnit):
   HTYPE = constants.HTYPE_NODE
   _OP_PARAMS = [
     _PNodeName,
-    ("live", False, _TBool),
+    _PMigrationMode,
+    _PMigrationLive,
     ]
   REQ_BGL = False
 
@@ -5726,7 +5751,7 @@ class LUMigrateNode(LogicalUnit):
       logging.debug("Migrating instance %s", inst.name)
       names.append(inst.name)
 
-      tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
+      tasklets.append(TLMigrateInstance(self, inst.name, False))
 
     self.tasklets = tasklets
 
@@ -5753,7 +5778,14 @@ class LUMigrateNode(LogicalUnit):
 
 
 class TLMigrateInstance(Tasklet):
-  def __init__(self, lu, instance_name, live, cleanup):
+  """Tasklet class for instance migration.
+
+  @type live: boolean
+  @ivar live: whether the migration will be done live or non-live;
+      this variable is initalized only after CheckPrereq has run
+
+  """
+  def __init__(self, lu, instance_name, cleanup):
     """Initializes this class.
 
     """
@@ -5761,8 +5793,8 @@ class TLMigrateInstance(Tasklet):
 
     # Parameters
     self.instance_name = instance_name
-    self.live = live
     self.cleanup = cleanup
+    self.live = False # will be overridden later
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -5803,6 +5835,25 @@ class TLMigrateInstance(Tasklet):
 
     self.instance = instance
 
+    if self.lu.op.live is not None and self.lu.op.mode is not None:
+      raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
+                                 " parameters are accepted",
+                                 errors.ECODE_INVAL)
+    if self.lu.op.live is not None:
+      if self.lu.op.live:
+        self.lu.op.mode = constants.HT_MIGRATION_LIVE
+      else:
+        self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
+      # reset the 'live' parameter to None so that repeated
+      # invocations of CheckPrereq do not raise an exception
+      self.lu.op.live = None
+    elif self.lu.op.mode is None:
+      # read the default value from the hypervisor
+      i_hv = self.cfg.GetClusterInfo().FillHV(instance, skip_globals=False)
+      self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
+
+    self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
+
   def _WaitUntilSync(self):
     """Poll with custom rpc for disk sync.
 
@@ -6456,7 +6507,7 @@ class LUCreateInstance(LogicalUnit):
     ("os_type", None, _TMaybeString),
     ("force_variant", False, _TBool),
     ("source_handshake", None, _TOr(_TList, _TNone)),
-    ("source_x509_ca", None, _TOr(_TList, _TNone)),
+    ("source_x509_ca", None, _TMaybeString),
    ("source_instance_name", None, _TMaybeString),
    ("src_node", None, _TMaybeString),
    ("src_path", None, _TMaybeString),
@@ -6482,10 +6533,12 @@ class LUCreateInstance(LogicalUnit):
       self.LogInfo("No-installation mode selected, disabling startup")
       self.op.start = False
     # validate/normalize the instance name
-    self.op.instance_name = utils.HostInfo.NormalizeName(self.op.instance_name)
+    self.op.instance_name = \
+      netutils.Hostname.GetNormalizedName(self.op.instance_name)
+
     if self.op.ip_check and not self.op.name_check:
       # TODO: make the ip check more flexible and not depend on the name check
-      raise errors.OpPrereqError("Cannot do ip checks without a name check",
+      raise errors.OpPrereqError("Cannot do ip check without a name check",
                                  errors.ECODE_INVAL)
 
     # check nics' parameter names
@@ -6520,7 +6573,7 @@ class LUCreateInstance(LogicalUnit):
 
     # instance name verification
     if self.op.name_check:
-      self.hostname1 = utils.GetHostInfo(self.op.instance_name)
+      self.hostname1 = netutils.GetHostname(name=self.op.instance_name)
       self.op.instance_name = self.hostname1.name
       # used in CheckPrereq for ip ping check
       self.check_ip = self.hostname1.ip
@@ -6601,7 +6654,7 @@ class LUCreateInstance(LogicalUnit):
                                      errors.ECODE_INVAL)
 
       self.source_instance_name = \
-        utils.GetHostInfo(utils.HostInfo.NormalizeName(src_instance_name)).name
+        netutils.GetHostname(name=src_instance_name).name
 
     else:
       raise errors.OpPrereqError("Invalid instance creation mode %r" %
@@ -6945,7 +6998,7 @@ class LUCreateInstance(LogicalUnit):
                                        errors.ECODE_INVAL)
           nic_ip = self.hostname1.ip
         else:
-          if not utils.IsValidIP4(ip):
+          if not netutils.IP4Address.IsValid(ip):
             raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                        " like a valid IP" % ip,
                                        errors.ECODE_INVAL)
@@ -7051,7 +7104,7 @@ class LUCreateInstance(LogicalUnit):
 
     # ip ping checks (we use the same ip that was resolved in ExpandNames)
     if self.op.ip_check:
-      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
+      if netutils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
         raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                    (self.check_ip, self.op.instance_name),
                                    errors.ECODE_NOTUNIQUE)
@@ -8620,7 +8673,7 @@ class LUSetInstanceParams(LogicalUnit):
         if nic_ip.lower() == constants.VALUE_NONE:
           nic_dict['ip'] = None
         else:
-          if not utils.IsValidIP4(nic_ip):
+          if not netutils.IP4Address.IsValid(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
                                       errors.ECODE_INVAL)
@@ -8753,6 +8806,10 @@ class LUSetInstanceParams(LogicalUnit):
                                  errors.ECODE_INVAL)
       _CheckInstanceDown(self, instance, "cannot change disk template")
       if self.op.disk_template in constants.DTS_NET_MIRROR:
+        if self.op.remote_node == pnode:
+          raise errors.OpPrereqError("Given new secondary node %s is the same"
+                                     " as the primary node of the instance" %
+                                     self.op.remote_node, errors.ECODE_STATE)
         _CheckNodeOnline(self, self.op.remote_node)
         _CheckNodeNotDrained(self, self.op.remote_node)
         disks = [{"size": d.size} for d in instance.disks]
@@ -9837,6 +9894,148 @@ class LUTestDelay(NoHooksLU):
     self._TestDelay()
 
 
+class LUTestJobqueue(NoHooksLU):
+  """Utility LU to test some aspects of the job queue.
+
+  """
+  _OP_PARAMS = [
+    ("notify_waitlock", False, _TBool),
+    ("notify_exec", False, _TBool),
+    ("log_messages", _EmptyList, _TListOf(_TString)),
+    ("fail", False, _TBool),
+    ]
+  REQ_BGL = False
+
+  # Must be lower than default timeout for WaitForJobChange to see whether it
+  # notices changed jobs
+  _CLIENT_CONNECT_TIMEOUT = 20.0
+  _CLIENT_CONFIRM_TIMEOUT = 60.0
+
+  @classmethod
+  def _NotifyUsingSocket(cls, cb, errcls):
+    """Opens a Unix socket and waits for another program to connect.
+
+    @type cb: callable
+    @param cb: Callback to send socket name to client
+    @type errcls: class
+    @param errcls: Exception class to use for errors
+
+    """
+    # Using a temporary directory as there's no easy way to create temporary
+    # sockets without writing a custom loop around tempfile.mktemp and
+    # socket.bind
+    tmpdir = tempfile.mkdtemp()
+    try:
+      tmpsock = utils.PathJoin(tmpdir, "sock")
+
+      logging.debug("Creating temporary socket at %s", tmpsock)
+      sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+      try:
+        sock.bind(tmpsock)
+        sock.listen(1)
+
+        # Send details to client
+        cb(tmpsock)
+
+        # Wait for client to connect before continuing
+        sock.settimeout(cls._CLIENT_CONNECT_TIMEOUT)
+        try:
+          (conn, _) = sock.accept()
+        except socket.error, err:
+          raise errcls("Client didn't connect in time (%s)" % err)
+      finally:
+        sock.close()
+    finally:
+      # Remove as soon as client is connected
+      shutil.rmtree(tmpdir)
+
+    # Wait for client to close
+    try:
+      try:
+        # pylint: disable-msg=E1101
+        # Instance of '_socketobject' has no ... member
+        conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT)
+        conn.recv(1)
+      except socket.error, err:
+        raise errcls("Client failed to confirm notification (%s)" % err)
+    finally:
+      conn.close()
+
+  def _SendNotification(self, test, arg, sockname):
+    """Sends a notification to the client.
+
+    @type test: string
+    @param test: Test name
+    @param arg: Test argument (depends on test)
+    @type sockname: string
+    @param sockname: Socket path
+
+    """
+    self.Log(constants.ELOG_JQUEUE_TEST, (sockname, test, arg))
+
+  def _Notify(self, prereq, test, arg):
+    """Notifies the client of a test.
+
+    @type prereq: bool
+    @param prereq: Whether this is a prereq-phase test
+    @type test: string
+    @param test: Test name
+    @param arg: Test argument (depends on test)
+
+    """
+    if prereq:
+      errcls = errors.OpPrereqError
+    else:
+      errcls = errors.OpExecError
+
+    return self._NotifyUsingSocket(compat.partial(self._SendNotification,
+                                                  test, arg),
+                                   errcls)
+
+  def CheckArguments(self):
+    self.checkargs_calls = getattr(self, "checkargs_calls", 0) + 1
+    self.expandnames_calls = 0
+
+  def ExpandNames(self):
+    checkargs_calls = getattr(self, "checkargs_calls", 0)
+    if checkargs_calls < 1:
+      raise errors.ProgrammerError("CheckArguments was not called")
+
+    self.expandnames_calls += 1
+
+    if self.op.notify_waitlock:
+      self._Notify(True, constants.JQT_EXPANDNAMES, None)
+
+    self.LogInfo("Expanding names")
+
+    # Get lock on master node (just to get a lock, not for a particular reason)
+    self.needed_locks = {
+      locking.LEVEL_NODE: self.cfg.GetMasterNode(),
+      }
+
+  def Exec(self, feedback_fn):
+    if self.expandnames_calls < 1:
+      raise errors.ProgrammerError("ExpandNames was not called")
+
+    if self.op.notify_exec:
+      self._Notify(False, constants.JQT_EXEC, None)
+
+    self.LogInfo("Executing")
+
+    if self.op.log_messages:
+      self._Notify(False, constants.JQT_STARTMSG, len(self.op.log_messages))
+      for idx, msg in enumerate(self.op.log_messages):
+        self.LogInfo("Sending log message %s", idx + 1)
+        feedback_fn(constants.JQT_MSGPREFIX + msg)
+        # Report how many test messages have been sent
+        self._Notify(False, constants.JQT_LOGMSG, idx + 1)
+
+    if self.op.fail:
+      raise errors.OpExecError("Opcode failure was requested")
+
+    return True
+
+
 class IAllocator(object):
   """IAllocator framework.