import logging
import copy
import OpenSSL
+import socket
+import tempfile
+import shutil
from ganeti import ssh
from ganeti import utils
from ganeti import uidpool
from ganeti import compat
from ganeti import masterd
+from ganeti import netutils
import ganeti.masterd.instance # pylint: disable-msg=W0611
#: a required node name (for single-node LUs)
_PNodeName = ("node_name", _NoDefault, _TNonEmptyString)
+#: the migration type (live/non-live): None or one of
+#: constants.HT_MIGRATION_TYPES; None means "use the hypervisor's default
+#: migration type" (resolved by TLMigrateInstance.CheckPrereq)
+_PMigrationLive = ("live", None, _TOr(_TNone,
+ _TElemOf(constants.HT_MIGRATION_TYPES)))
+
# End types
class LogicalUnit(object):
self.recalculate_locks = {}
self.__ssh = None
# logging
+ self.Log = processor.Log # pylint: disable-msg=C0103
self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
self.LogStep = processor.LogStep # pylint: disable-msg=C0103
# Acquire all nodes and one instance
self.needed_locks = {
locking.LEVEL_NODE: locking.ALL_SET,
- locking.LEVEL_INSTANCE: ['instance1.example.tld'],
+ locking.LEVEL_INSTANCE: ['instance1.example.com'],
}
# Acquire just two nodes
self.needed_locks = {
- locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
+ locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
}
# Acquire no locks
self.needed_locks = {} # No, you can't leave it to the default value None
return faulty
+def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
+ """Check the sanity of iallocator and node arguments and use the
+ cluster-wide iallocator if appropriate.
+
+ Check that at most one of (iallocator, node) is specified. If none is
+ specified, then the LU's opcode's iallocator slot is filled with the
+ cluster-wide default iallocator.
+
+ @type iallocator_slot: string
+ @param iallocator_slot: the name of the opcode iallocator slot
+ @type node_slot: string
+ @param node_slot: the name of the opcode target node slot
+
+ """
+ node = getattr(lu.op, node_slot, None)
+ iallocator = getattr(lu.op, iallocator_slot, None)
+
+ if node is not None and iallocator is not None:
+ raise errors.OpPrereqError("Do not specify both, iallocator and node.",
+ errors.ECODE_INVAL)
+ elif node is None and iallocator is None:
+ default_iallocator = lu.cfg.GetDefaultIAllocator()
+ if default_iallocator:
+ setattr(lu.op, iallocator_slot, default_iallocator)
+ else:
+ raise errors.OpPrereqError("No iallocator or node given and no"
+ " cluster-wide default iallocator found."
+ " Please specify either an iallocator or a"
+ " node, or set a cluster-wide default"
+ " iallocator.")
+
+
class LUPostInitCluster(LogicalUnit):
"""Logical unit for running hooks after cluster initialization.
self.bad = self.bad or cond
def _VerifyNode(self, ninfo, nresult):
- """Run multiple tests against a node.
+ """Perform some basic validation on data returned from a node.
- Test list:
-
- - compares ganeti version
- - checks vg existence and size > 20G
- - checks config file checksum
- - checks ssh to other nodes
+ - check the result data structure is well formed and has all the mandatory
+ fields
+ - check ganeti version
@type ninfo: L{objects.Node}
@param ninfo: the node to check
"""Verify that the passed name is a valid one.
"""
- hostname = utils.GetHostInfo(self.op.name)
+ hostname = netutils.GetHostInfo(self.op.name)
new_name = hostname.name
self.ip = new_ip = hostname.ip
" cluster has changed",
errors.ECODE_INVAL)
if new_ip != old_ip:
- if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
+ if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
raise errors.OpPrereqError("The given cluster IP address (%s) is"
" reachable on the network. Aborting." %
new_ip, errors.ECODE_NOTUNIQUE)
("maintain_node_health", None, _TMaybeBool),
("nicparams", None, _TOr(_TDict, _TNone)),
("drbd_helper", None, _TOr(_TString, _TNone)),
+ ("default_iallocator", None, _TMaybeString),
]
REQ_BGL = False
hv_class.CheckParameterSyntax(new_osp)
_CheckHVParams(self, node_list, hv_name, new_osp)
+ if self.op.default_iallocator:
+ alloc_script = utils.FindFile(self.op.default_iallocator,
+ constants.IALLOCATOR_SEARCH_PATH,
+ os.path.isfile)
+ if alloc_script is None:
+ raise errors.OpPrereqError("Invalid default iallocator script '%s'"
+ " specified" % self.op.default_iallocator,
+ errors.ECODE_INVAL)
def Exec(self, feedback_fn):
"""Change the parameters of the cluster.
if self.op.uid_pool is not None:
self.cluster.uid_pool = self.op.uid_pool
+ if self.op.default_iallocator is not None:
+ self.cluster.default_iallocator = self.op.default_iallocator
+
self.cfg.Update(self.cluster, feedback_fn)
def CheckArguments(self):
# validate/normalize the node name
- self.op.node_name = utils.HostInfo.NormalizeName(self.op.node_name)
+ self.op.node_name = netutils.HostInfo.NormalizeName(self.op.node_name)
def BuildHooksEnv(self):
"""Build hooks env.
node_name = self.op.node_name
cfg = self.cfg
- dns_data = utils.GetHostInfo(node_name)
+ dns_data = netutils.GetHostInfo(node_name)
node = dns_data.name
primary_ip = self.op.primary_ip = dns_data.ip
if self.op.secondary_ip is None:
self.op.secondary_ip = primary_ip
- if not utils.IsValidIP4(self.op.secondary_ip):
+ if not netutils.IsValidIP4(self.op.secondary_ip):
raise errors.OpPrereqError("Invalid secondary IP given",
errors.ECODE_INVAL)
secondary_ip = self.op.secondary_ip
errors.ECODE_INVAL)
# checks reachability
- if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
+ if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
raise errors.OpPrereqError("Node not reachable by ping",
errors.ECODE_ENVIRON)
if not newbie_singlehomed:
# check reachability from my secondary ip to newbie's secondary ip
- if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
+ if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
source=myself.secondary_ip):
raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
" based ping to noded port",
"uuid": cluster.uuid,
"tags": list(cluster.GetTags()),
"uid_pool": cluster.uid_pool,
+ "default_iallocator": cluster.default_iallocator,
}
return result
# new name verification
if self.op.check_name:
- name_info = utils.GetHostInfo(self.op.new_name)
+ name_info = netutils.GetHostInfo(self.op.new_name)
self.op.new_name = name_info.name
new_name = self.op.new_name
new_name, errors.ECODE_EXISTS)
if not self.op.ignore_ip:
- if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
+ if netutils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
raise errors.OpPrereqError("IP %s of instance %s already in use" %
(name_info.ip, new_name),
errors.ECODE_NOTUNIQUE)
if name not in constants.HVC_GLOBALS] +
["be/%s" % name
for name in constants.BES_PARAMETERS])
- _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
+ _FIELDS_DYNAMIC = utils.FieldSet("oper_state",
+ "oper_ram",
+ "oper_vcpus",
+ "status")
def CheckArguments(self):
val = live_data[instance.name].get("memory", "?")
else:
val = "-"
+ elif field == "oper_vcpus":
+ if instance.primary_node in bad_nodes:
+ val = None
+ elif instance.name in live_data:
+ val = live_data[instance.name].get("vcpus", "?")
+ else:
+ val = "-"
elif field == "vcpus":
val = i_be[constants.BE_VCPUS]
elif field == "disk_template":
HTYPE = constants.HTYPE_INSTANCE
_OP_PARAMS = [
_PInstanceName,
- ("live", True, _TBool),
+ _PMigrationLive,
("cleanup", False, _TBool),
]
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
self._migrater = TLMigrateInstance(self, self.op.instance_name,
- self.op.live, self.op.cleanup)
+ self.op.cleanup)
self.tasklets = [self._migrater]
def DeclareLocks(self, level):
HTYPE = constants.HTYPE_NODE
_OP_PARAMS = [
_PNodeName,
- ("live", False, _TBool),
+ _PMigrationLive,
]
REQ_BGL = False
logging.debug("Migrating instance %s", inst.name)
names.append(inst.name)
- tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
+ tasklets.append(TLMigrateInstance(self, inst.name, False))
self.tasklets = tasklets
class TLMigrateInstance(Tasklet):
- def __init__(self, lu, instance_name, live, cleanup):
+ def __init__(self, lu, instance_name, cleanup):
"""Initializes this class.
"""
# Parameters
self.instance_name = instance_name
- self.live = live
self.cleanup = cleanup
+ self.live = False # will be overridden later
def CheckPrereq(self):
"""Check prerequisites.
self.instance = instance
+ if self.lu.op.live is None:
+ # read the default value from the hypervisor
+ i_hv = self.cfg.GetClusterInfo().FillHV(instance, skip_globals=False)
+ self.lu.op.live = i_hv[constants.HV_MIGRATION_TYPE]
+
+ self.live = self.lu.op.live == constants.HT_MIGRATION_LIVE
+
def _WaitUntilSync(self):
"""Poll with custom rpc for disk sync.
self.LogInfo("No-installation mode selected, disabling startup")
self.op.start = False
# validate/normalize the instance name
- self.op.instance_name = utils.HostInfo.NormalizeName(self.op.instance_name)
+ self.op.instance_name = \
+ netutils.HostInfo.NormalizeName(self.op.instance_name)
+
if self.op.ip_check and not self.op.name_check:
# TODO: make the ip check more flexible and not depend on the name check
raise errors.OpPrereqError("Cannot do ip checks without a name check",
# instance name verification
if self.op.name_check:
- self.hostname1 = utils.GetHostInfo(self.op.instance_name)
+ self.hostname1 = netutils.GetHostInfo(self.op.instance_name)
self.op.instance_name = self.hostname1.name
# used in CheckPrereq for ip ping check
self.check_ip = self.hostname1.ip
errors.ECODE_INVAL)
### Node/iallocator related checks
- if [self.op.iallocator, self.op.pnode].count(None) != 1:
- raise errors.OpPrereqError("One and only one of iallocator and primary"
- " node must be given",
- errors.ECODE_INVAL)
+ _CheckIAllocatorOrNode(self, "iallocator", "pnode")
self._cds = _GetClusterDomainSecret()
raise errors.OpPrereqError("Missing source instance name",
errors.ECODE_INVAL)
- self.source_instance_name = \
- utils.GetHostInfo(utils.HostInfo.NormalizeName(src_instance_name)).name
+ norm_name = netutils.HostInfo.NormalizeName(src_instance_name)
+ self.source_instance_name = netutils.GetHostInfo(norm_name).name
else:
raise errors.OpPrereqError("Invalid instance creation mode %r" %
errors.ECODE_INVAL)
nic_ip = self.hostname1.ip
else:
- if not utils.IsValidIP4(ip):
+ if not netutils.IsValidIP4(ip):
raise errors.OpPrereqError("Given IP address '%s' doesn't look"
" like a valid IP" % ip,
errors.ECODE_INVAL)
# ip ping checks (we use the same ip that was resolved in ExpandNames)
if self.op.ip_check:
- if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
+ if netutils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
raise errors.OpPrereqError("IP %s of instance %s already in use" %
(self.check_ip, self.op.instance_name),
errors.ECODE_NOTUNIQUE)
REQ_BGL = False
def CheckArguments(self):
- if self.op.remote_node is not None and self.op.iallocator is not None:
- raise errors.OpPrereqError("Give either the iallocator or the new"
- " secondary, not both", errors.ECODE_INVAL)
+ _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
def ExpandNames(self):
self.op.nodes = _GetWantedNodes(self, self.op.nodes)
if nic_ip.lower() == constants.VALUE_NONE:
nic_dict['ip'] = None
else:
- if not utils.IsValidIP4(nic_ip):
+ if not netutils.IsValidIP4(nic_ip):
raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip,
errors.ECODE_INVAL)
feedback_fn("Deactivating disks for %s" % instance.name)
_ShutdownInstanceDisks(self, instance)
+ if not (compat.all(dresults) and fin_resu):
+ failures = []
+ if not fin_resu:
+ failures.append("export finalization")
+ if not compat.all(dresults):
+ fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults)
+ if not dsk)
+ failures.append("disk export: disk(s) %s" % fdsk)
+
+ raise errors.OpExecError("Export failed, errors in %s" %
+ utils.CommaJoin(failures))
+
+ # At this point, the export was successful, we can cleanup/finish
+
# Remove instance if requested
if self.op.remove_instance:
- if not (compat.all(dresults) and fin_resu):
- feedback_fn("Not removing instance %s as parts of the export failed" %
- instance.name)
- else:
- feedback_fn("Removing instance %s" % instance.name)
- _RemoveInstance(self, feedback_fn, instance,
- self.op.ignore_remove_failures)
+ feedback_fn("Removing instance %s" % instance.name)
+ _RemoveInstance(self, feedback_fn, instance,
+ self.op.ignore_remove_failures)
if self.op.mode == constants.EXPORT_MODE_LOCAL:
self._CleanupExports(feedback_fn)
self._TestDelay()
+class LUTestJobqueue(NoHooksLU):
+ """Utility LU to test some aspects of the job queue.
+
+ """
+ _OP_PARAMS = [
+ ("notify_waitlock", False, _TBool),
+ ("notify_exec", False, _TBool),
+ ("log_messages", _EmptyList, _TListOf(_TString)),
+ ("fail", False, _TBool),
+ ]
+ REQ_BGL = False
+
+ # Must be lower than default timeout for WaitForJobChange to see whether it
+ # notices changed jobs
+ _CLIENT_CONNECT_TIMEOUT = 20.0
+ _CLIENT_CONFIRM_TIMEOUT = 60.0
+
+ @classmethod
+ def _NotifyUsingSocket(cls, cb, errcls):
+ """Opens a Unix socket and waits for another program to connect.
+
+ @type cb: callable
+ @param cb: Callback to send socket name to client
+ @type errcls: class
+ @param errcls: Exception class to use for errors
+
+ """
+ # Using a temporary directory as there's no easy way to create temporary
+ # sockets without writing a custom loop around tempfile.mktemp and
+ # socket.bind
+ tmpdir = tempfile.mkdtemp()
+ try:
+ tmpsock = utils.PathJoin(tmpdir, "sock")
+
+ logging.debug("Creating temporary socket at %s", tmpsock)
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ try:
+ sock.bind(tmpsock)
+ sock.listen(1)
+
+ # Send details to client
+ cb(tmpsock)
+
+ # Wait for client to connect before continuing
+ sock.settimeout(cls._CLIENT_CONNECT_TIMEOUT)
+ try:
+ (conn, _) = sock.accept()
+ except socket.error, err:
+ raise errcls("Client didn't connect in time (%s)" % err)
+ finally:
+ sock.close()
+ finally:
+ # Remove as soon as client is connected
+ shutil.rmtree(tmpdir)
+
+ # Wait for client to close
+ try:
+ try:
+ conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT)
+ conn.recv(1)
+ except socket.error, err:
+ raise errcls("Client failed to confirm notification (%s)" % err)
+ finally:
+ conn.close()
+
+ def _SendNotification(self, test, arg, sockname):
+ """Sends a notification to the client.
+
+ @type test: string
+ @param test: Test name
+ @param arg: Test argument (depends on test)
+ @type sockname: string
+ @param sockname: Socket path
+
+ """
+ self.Log(constants.ELOG_JQUEUE_TEST, (sockname, test, arg))
+
+ def _Notify(self, prereq, test, arg):
+ """Notifies the client of a test.
+
+ @type prereq: bool
+ @param prereq: Whether this is a prereq-phase test
+ @type test: string
+ @param test: Test name
+ @param arg: Test argument (depends on test)
+
+ """
+ if prereq:
+ errcls = errors.OpPrereqError
+ else:
+ errcls = errors.OpExecError
+
+ return self._NotifyUsingSocket(compat.partial(self._SendNotification,
+ test, arg),
+ errcls)
+
+ def CheckArguments(self):
+ self.checkargs_calls = getattr(self, "checkargs_calls", 0) + 1
+ self.expandnames_calls = 0
+
+ def ExpandNames(self):
+ checkargs_calls = getattr(self, "checkargs_calls", 0)
+ if checkargs_calls < 1:
+ raise errors.ProgrammerError("CheckArguments was not called")
+
+ self.expandnames_calls += 1
+
+ if self.op.notify_waitlock:
+ self._Notify(True, constants.JQT_EXPANDNAMES, None)
+
+ self.LogInfo("Expanding names")
+
+ # Get lock on master node (just to get a lock, not for a particular reason)
+ self.needed_locks = {
+ locking.LEVEL_NODE: self.cfg.GetMasterNode(),
+ }
+
+ def Exec(self, feedback_fn):
+ if self.expandnames_calls < 1:
+ raise errors.ProgrammerError("ExpandNames was not called")
+
+ if self.op.notify_exec:
+ self._Notify(False, constants.JQT_EXEC, None)
+
+ self.LogInfo("Executing")
+
+ if self.op.log_messages:
+ for idx, msg in enumerate(self.op.log_messages):
+ self.LogInfo("Sending log message %s", idx + 1)
+ feedback_fn(constants.JQT_MSGPREFIX + msg)
+ # Report how many test messages have been sent
+ self._Notify(False, constants.JQT_LOGMSG, idx + 1)
+
+ if self.op.fail:
+ raise errors.OpExecError("Opcode failure was requested")
+
+ return True
+
+
class IAllocator(object):
"""IAllocator framework.