#
#
-# Copyright (C) 2006, 2007, 2008 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
import logging
import copy
import OpenSSL
+import socket
+import tempfile
+import shutil
from ganeti import ssh
from ganeti import utils
#: a required node name (for single-node LUs)
_PNodeName = ("node_name", _NoDefault, _TNonEmptyString)
+#: the migration type (live/non-live)
+_PMigrationMode = ("mode", None, _TOr(_TNone,
+ _TElemOf(constants.HT_MIGRATION_MODES)))
+
+#: the obsolete 'live' mode (boolean)
+_PMigrationLive = ("live", None, _TMaybeBool)
+
# End types
class LogicalUnit(object):
self.recalculate_locks = {}
self.__ssh = None
# logging
+ self.Log = processor.Log # pylint: disable-msg=C0103
self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
self.LogStep = processor.LogStep # pylint: disable-msg=C0103
# Acquire all nodes and one instance
self.needed_locks = {
locking.LEVEL_NODE: locking.ALL_SET,
- locking.LEVEL_INSTANCE: ['instance1.example.tld'],
+ locking.LEVEL_INSTANCE: ['instance1.example.com'],
}
# Acquire just two nodes
self.needed_locks = {
- locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
+ locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
}
# Acquire no locks
self.needed_locks = {} # No, you can't leave it to the default value None
self.bad = self.bad or cond
def _VerifyNode(self, ninfo, nresult):
- """Run multiple tests against a node.
-
- Test list:
+ """Perform some basic validation on data returned from a node.
- - compares ganeti version
- - checks vg existence and size > 20G
- - checks config file checksum
- - checks ssh to other nodes
+ - check the result data structure is well formed and has all the
+ mandatory fields
+ - check ganeti version
@type ninfo: L{objects.Node}
@param ninfo: the node to check
_ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
"instance should not run on node %s", node)
- def _VerifyOrphanVolumes(self, node_vol_should, node_image):
+ def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
"""Verify if there are any unknown volumes in the cluster.
The .os, .swap and backup volumes are ignored. All other volumes are
reported as unknown.
+ @type reserved: L{ganeti.utils.FieldSet}
+ @param reserved: a FieldSet of reserved volume names
+
"""
for node, n_img in node_image.items():
if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
# skip non-healthy nodes
continue
for volume in n_img.volumes:
- test = (node not in node_vol_should or
- volume not in node_vol_should[node])
+ test = ((node not in node_vol_should or
+ volume not in node_vol_should[node]) and
+ not reserved.Matches(volume))
self._ErrorIf(test, self.ENODEORPHANLV, node,
"volume %s is unknown", volume)
"instance lives on ghost node %s", node)
feedback_fn("* Verifying orphan volumes")
- self._VerifyOrphanVolumes(node_vol_should, node_image)
+ reserved = utils.FieldSet(*cluster.reserved_lvs)
+ self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
feedback_fn("* Verifying orphan instances")
self._VerifyOrphanInstances(instancelist, node_image)
self.LogWarning("Could not re-enable the master role on"
" the master, please restart manually: %s", msg)
+ return clustername
+
class LUSetClusterParams(LogicalUnit):
"""Change the parameters of the cluster.
("nicparams", None, _TOr(_TDict, _TNone)),
("drbd_helper", None, _TOr(_TString, _TNone)),
("default_iallocator", None, _TMaybeString),
+ ("reserved_lvs", None, _TOr(_TListOf(_TNonEmptyString), _TNone)),
]
REQ_BGL = False
if self.op.default_iallocator is not None:
self.cluster.default_iallocator = self.op.default_iallocator
+ if self.op.reserved_lvs is not None:
+ self.cluster.reserved_lvs = self.op.reserved_lvs
+
self.cfg.Update(self.cluster, feedback_fn)
# we can't change the master's node flags
if self.op.node_name == self.cfg.GetMasterNode():
raise errors.OpPrereqError("The master role can be changed"
- " only via masterfailover",
+ " only via master-failover",
errors.ECODE_INVAL)
"tags": list(cluster.GetTags()),
"uid_pool": cluster.uid_pool,
"default_iallocator": cluster.default_iallocator,
+ "reserved_lvs": cluster.reserved_lvs,
}
return result
_OP_PARAMS = [
_PInstanceName,
("new_name", _NoDefault, _TNonEmptyString),
- ("ignore_ip", False, _TBool),
- ("check_name", True, _TBool),
+ ("ip_check", False, _TBool),
+ ("name_check", True, _TBool),
]
+ def CheckArguments(self):
+ """Check arguments.
+
+ """
+ if self.op.ip_check and not self.op.name_check:  # reject ip_check without name_check
+ # TODO: make the ip check more flexible and not depend on the name check
+ raise errors.OpPrereqError("Cannot do ip check without a name check",
+ errors.ECODE_INVAL)
+
def BuildHooksEnv(self):
"""Build hooks env.
_CheckInstanceDown(self, instance, "cannot rename")
self.instance = instance
- # new name verification
- if self.op.check_name:
- name_info = netutils.GetHostInfo(self.op.new_name)
- self.op.new_name = name_info.name
-
new_name = self.op.new_name
+ if self.op.name_check:
+ hostinfo = netutils.HostInfo(netutils.HostInfo.NormalizeName(new_name))
+ new_name = hostinfo.name
+ if (self.op.ip_check and
+ netutils.TcpPing(hostinfo.ip, constants.DEFAULT_NODED_PORT)):
+ raise errors.OpPrereqError("IP %s of instance %s already in use" %
+ (hostinfo.ip, new_name),
+ errors.ECODE_NOTUNIQUE)
instance_list = self.cfg.GetInstanceList()
if new_name in instance_list:
raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
new_name, errors.ECODE_EXISTS)
- if not self.op.ignore_ip:
- if netutils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
- raise errors.OpPrereqError("IP %s of instance %s already in use" %
- (name_info.ip, new_name),
- errors.ECODE_NOTUNIQUE)
def Exec(self, feedback_fn):
"""Reinstall the instance.
finally:
_ShutdownInstanceDisks(self, inst)
+ return inst.name
+
class LURemoveInstance(LogicalUnit):
"""Remove an instance.
HTYPE = constants.HTYPE_INSTANCE
_OP_PARAMS = [
_PInstanceName,
- ("live", True, _TBool),
+ _PMigrationMode,
+ _PMigrationLive,
("cleanup", False, _TBool),
]
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
self._migrater = TLMigrateInstance(self, self.op.instance_name,
- self.op.live, self.op.cleanup)
+ self.op.cleanup)
self.tasklets = [self._migrater]
def DeclareLocks(self, level):
source_node = instance.primary_node
target_node = instance.secondary_nodes[0]
env = _BuildInstanceHookEnvByObject(self, instance)
- env["MIGRATE_LIVE"] = self.op.live
+ env["MIGRATE_LIVE"] = self._migrater.live
env["MIGRATE_CLEANUP"] = self.op.cleanup
env.update({
"OLD_PRIMARY": source_node,
HTYPE = constants.HTYPE_NODE
_OP_PARAMS = [
_PNodeName,
- ("live", False, _TBool),
+ _PMigrationMode,
+ _PMigrationLive,
]
REQ_BGL = False
logging.debug("Migrating instance %s", inst.name)
names.append(inst.name)
- tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
+ tasklets.append(TLMigrateInstance(self, inst.name, False))
self.tasklets = tasklets
class TLMigrateInstance(Tasklet):
- def __init__(self, lu, instance_name, live, cleanup):
+ """Tasklet class for instance migration.
+
+ @type live: boolean
+ @ivar live: whether the migration will be done live or non-live;
+ this variable is initialized only after CheckPrereq has run
+
+ """
+ def __init__(self, lu, instance_name, cleanup):
"""Initializes this class.
"""
# Parameters
self.instance_name = instance_name
- self.live = live
self.cleanup = cleanup
+ self.live = False # will be overridden later
def CheckPrereq(self):
"""Check prerequisites.
self.instance = instance
+ if self.lu.op.live is not None and self.lu.op.mode is not None:
+ raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
+ " parameters are accepted",
+ errors.ECODE_INVAL)
+ if self.lu.op.live is not None:
+ if self.lu.op.live:
+ self.lu.op.mode = constants.HT_MIGRATION_LIVE
+ else:
+ self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
+ # reset the 'live' parameter to None so that repeated
+ # invocations of CheckPrereq do not raise an exception
+ self.lu.op.live = None
+ elif self.lu.op.mode is None:
+ # read the default value from the hypervisor
+ i_hv = self.cfg.GetClusterInfo().FillHV(instance, skip_globals=False)
+ self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
+
+ self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
+
def _WaitUntilSync(self):
"""Poll with custom rpc for disk sync.
("os_type", None, _TMaybeString),
("force_variant", False, _TBool),
("source_handshake", None, _TOr(_TList, _TNone)),
- ("source_x509_ca", None, _TOr(_TList, _TNone)),
+ ("source_x509_ca", None, _TMaybeString),
("source_instance_name", None, _TMaybeString),
("src_node", None, _TMaybeString),
("src_path", None, _TMaybeString),
("identify_defaults", False, _TBool),
("file_driver", None, _TOr(_TNone, _TElemOf(constants.FILE_DRIVER))),
("file_storage_dir", None, _TMaybeString),
- ("dry_run", False, _TBool),
]
REQ_BGL = False
if self.op.ip_check and not self.op.name_check:
# TODO: make the ip check more flexible and not depend on the name check
- raise errors.OpPrereqError("Cannot do ip checks without a name check",
+ raise errors.OpPrereqError("Cannot do ip check without a name check",
errors.ECODE_INVAL)
# check nics' parameter names
### Node/iallocator related checks
_CheckIAllocatorOrNode(self, "iallocator", "pnode")
+ if self.op.pnode is not None:
+ if self.op.disk_template in constants.DTS_NET_MIRROR:
+ if self.op.snode is None:
+ raise errors.OpPrereqError("The networked disk templates need"
+ " a mirror node", errors.ECODE_INVAL)
+ elif self.op.snode:
+ self.LogWarning("Secondary node will be ignored on non-mirrored disk"
+ " template")
+ self.op.snode = None
+
self._cds = _GetClusterDomainSecret()
if self.op.mode == constants.INSTANCE_IMPORT:
# mirror node verification
if self.op.disk_template in constants.DTS_NET_MIRROR:
- if self.op.snode is None:
- raise errors.OpPrereqError("The networked disk templates need"
- " a mirror node", errors.ECODE_INVAL)
if self.op.snode == pnode.name:
raise errors.OpPrereqError("The secondary node cannot be the"
" primary node.", errors.ECODE_INVAL)
errors.ECODE_INVAL)
_CheckInstanceDown(self, instance, "cannot change disk template")
if self.op.disk_template in constants.DTS_NET_MIRROR:
+ if self.op.remote_node == pnode:
+ raise errors.OpPrereqError("Given new secondary node %s is the same"
+ " as the primary node of the instance" %
+ self.op.remote_node, errors.ECODE_STATE)
_CheckNodeOnline(self, self.op.remote_node)
_CheckNodeNotDrained(self, self.op.remote_node)
disks = [{"size": d.size} for d in instance.disks]
self._TestDelay()
+class LUTestJobqueue(NoHooksLU):
+ """Utility LU to test some aspects of the job queue.
+
+ """
+ _OP_PARAMS = [
+ ("notify_waitlock", False, _TBool),
+ ("notify_exec", False, _TBool),
+ ("log_messages", _EmptyList, _TListOf(_TString)),
+ ("fail", False, _TBool),
+ ]
+ REQ_BGL = False
+
+ # Must be lower than default timeout for WaitForJobChange to see whether it
+ # notices changed jobs
+ _CLIENT_CONNECT_TIMEOUT = 20.0  # timeout applied to sock.accept()
+ _CLIENT_CONFIRM_TIMEOUT = 60.0  # timeout applied to conn.recv()
+
+ @classmethod
+ def _NotifyUsingSocket(cls, cb, errcls):
+ """Opens a Unix socket and waits for another program to connect.
+
+ @type cb: callable
+ @param cb: Callback to send socket name to client
+ @type errcls: class
+ @param errcls: Exception class to use for errors
+
+ """
+ # Using a temporary directory as there's no easy way to create temporary
+ # sockets without writing a custom loop around tempfile.mktemp and
+ # socket.bind
+ tmpdir = tempfile.mkdtemp()
+ try:
+ tmpsock = utils.PathJoin(tmpdir, "sock")
+
+ logging.debug("Creating temporary socket at %s", tmpsock)
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ try:
+ sock.bind(tmpsock)
+ sock.listen(1)
+
+ # Send details to client
+ cb(tmpsock)
+
+ # Wait for client to connect before continuing
+ sock.settimeout(cls._CLIENT_CONNECT_TIMEOUT)
+ try:
+ (conn, _) = sock.accept()
+ except socket.error, err:
+ raise errcls("Client didn't connect in time (%s)" % err)
+ finally:
+ sock.close()
+ finally:
+ # Remove as soon as client is connected
+ shutil.rmtree(tmpdir)
+
+ # Wait for client to close
+ try:
+ try:
+ # pylint: disable-msg=E1101
+ # Instance of '_socketobject' has no ... member
+ conn.settimeout(cls._CLIENT_CONFIRM_TIMEOUT)
+ conn.recv(1)  # received byte (if any) is discarded
+ except socket.error, err:
+ raise errcls("Client failed to confirm notification (%s)" % err)
+ finally:
+ conn.close()
+
+ def _SendNotification(self, test, arg, sockname):
+ """Sends a notification to the client.
+
+ @type test: string
+ @param test: Test name
+ @param arg: Test argument (depends on test)
+ @type sockname: string
+ @param sockname: Socket path
+
+ """
+ self.Log(constants.ELOG_JQUEUE_TEST, (sockname, test, arg))
+
+ def _Notify(self, prereq, test, arg):
+ """Notifies the client of a test.
+
+ @type prereq: bool
+ @param prereq: Whether this is a prereq-phase test
+ @type test: string
+ @param test: Test name
+ @param arg: Test argument (depends on test)
+
+ """
+ if prereq:
+ errcls = errors.OpPrereqError
+ else:
+ errcls = errors.OpExecError
+
+ return self._NotifyUsingSocket(compat.partial(self._SendNotification,
+ test, arg),
+ errcls)  # the callback is later invoked with the socket path
+
+ def CheckArguments(self):
+ self.checkargs_calls = getattr(self, "checkargs_calls", 0) + 1  # count invocations; checked by ExpandNames
+ self.expandnames_calls = 0  # incremented by ExpandNames, checked by Exec
+
+ def ExpandNames(self):
+ checkargs_calls = getattr(self, "checkargs_calls", 0)
+ if checkargs_calls < 1:
+ raise errors.ProgrammerError("CheckArguments was not called")
+
+ self.expandnames_calls += 1
+
+ if self.op.notify_waitlock:
+ self._Notify(True, constants.JQT_EXPANDNAMES, None)
+
+ self.LogInfo("Expanding names")
+
+ # Get lock on master node (just to get a lock, not for a particular reason)
+ self.needed_locks = {
+ locking.LEVEL_NODE: self.cfg.GetMasterNode(),  # NOTE(review): bare value rather than a list -- confirm the locking layer accepts it
+ }
+
+ def Exec(self, feedback_fn):  # returns True; raises OpExecError if self.op.fail is set
+ if self.expandnames_calls < 1:
+ raise errors.ProgrammerError("ExpandNames was not called")
+
+ if self.op.notify_exec:
+ self._Notify(False, constants.JQT_EXEC, None)
+
+ self.LogInfo("Executing")
+
+ if self.op.log_messages:
+ self._Notify(False, constants.JQT_STARTMSG, len(self.op.log_messages))
+ for idx, msg in enumerate(self.op.log_messages):
+ self.LogInfo("Sending log message %s", idx + 1)
+ feedback_fn(constants.JQT_MSGPREFIX + msg)
+ # Report how many test messages have been sent
+ self._Notify(False, constants.JQT_LOGMSG, idx + 1)
+
+ if self.op.fail:
+ raise errors.OpExecError("Opcode failure was requested")
+
+ return True
+
+
class IAllocator(object):
"""IAllocator framework.