doc/news.rst
maintainer-clean-local:
- rm -rf doc/api doc/html
+ rm -rf doc/api doc/html doc/coverage
CLEANFILES = \
autotools/replace_vars.sed \
doc/admin.rst \
doc/design-2.0.rst \
doc/design-2.1.rst \
+ doc/design-2.2.rst \
+ doc/design-cluster-merger.rst \
doc/devnotes.rst \
doc/glossary.rst \
doc/hooks.rst \
tools/burnin \
tools/cfgshell \
tools/cfgupgrade \
+ tools/cluster-merge \
tools/lvmstrap
pkglib_SCRIPTS = \
autotools/check-python-code \
autotools/check-man \
autotools/docbook-wrapper \
+ autotools/gen-coverage \
+ autotools/testrunner \
$(RUN_IN_TEMPDIR) \
daemons/daemon-util.in \
daemons/ganeti-cleaner.in \
test/data/bdev-8.3-both.txt \
test/data/bdev-disk.txt \
test/data/bdev-net.txt \
+ test/data/cert1.pem \
test/data/proc_drbd8.txt \
test/data/proc_drbd80-emptyline.txt \
test/data/proc_drbd83.txt
- dist_TESTS = \
+ python_tests = \
test/ganeti.bdev_unittest.py \
test/ganeti.cli_unittest.py \
test/ganeti.cmdlib_unittest.py \
test/ganeti.confd_client_unittest.py \
test/ganeti.config_unittest.py \
test/ganeti.constants_unittest.py \
+ test/ganeti.errors_unittest.py \
test/ganeti.hooks_unittest.py \
test/ganeti.http_unittest.py \
test/ganeti.locking_unittest.py \
+ test/ganeti.luxi_unittest.py \
test/ganeti.mcpu_unittest.py \
test/ganeti.objects_unittest.py \
+ test/ganeti.opcodes_unittest.py \
test/ganeti.rapi.resources_unittest.py \
test/ganeti.serializer_unittest.py \
test/ganeti.ssh_unittest.py \
test/docs_unittest.py \
test/tempfile_fork_unittest.py
+ dist_TESTS = \
+ test/daemon-util_unittest.bash \
+ $(python_tests)
+
nodist_TESTS =
TESTS = $(dist_TESTS) $(nodist_TESTS)
+ # Environment for all tests
+ PLAIN_TESTS_ENVIRONMENT = \
+ PYTHONPATH=. TOP_SRCDIR=$(abs_top_srcdir) PYTHON=$(PYTHON) $(RUN_IN_TEMPDIR)
+
+ # Environment for tests run by automake
TESTS_ENVIRONMENT = \
- PYTHONPATH=. TOP_SRCDIR=$(abs_top_srcdir) \
- $(RUN_IN_TEMPDIR) $(PYTHON)
+ $(PLAIN_TESTS_ENVIRONMENT) $(abs_top_srcdir)/autotools/testrunner
all_python_code = \
$(dist_sbin_SCRIPTS) \
$(dist_tools_SCRIPTS) \
- $(dist_TESTS) \
+ $(python_tests) \
$(pkgpython_PYTHON) \
$(hypervisor_PYTHON) \
$(rapi_PYTHON) \
srclink_files = \
man/footer.sgml \
+ test/daemon-util_unittest.bash \
$(all_python_code)
check_python_code = \
$(dist_tools_SCRIPTS) \
$(BUILD_BASH_COMPLETION)
+ test/daemon-util_unittest.bash: daemons/daemon-util
+
devel/upload: devel/upload.in $(REPLACE_VARS_SED)
sed -f $(REPLACE_VARS_SED) < $< > $@
chmod u+x $@
echo "VERSION_FULL = '$(VERSION_FULL)'"; \
echo "LOCALSTATEDIR = '$(localstatedir)'"; \
echo "SYSCONFDIR = '$(sysconfdir)'"; \
+ echo "SSH_CONFIG_DIR = '$(SSH_CONFIG_DIR)'"; \
echo "EXPORT_DIR = '$(EXPORT_DIR)'"; \
echo "OS_SEARCH_PATH = [$(OS_SEARCH_PATH)]"; \
echo "XEN_BOOTLOADER = '$(XEN_BOOTLOADER)'"; \
echo "XEN_KERNEL = '$(XEN_KERNEL)'"; \
echo "XEN_INITRD = '$(XEN_INITRD)'"; \
echo "FILE_STORAGE_DIR = '$(FILE_STORAGE_DIR)'"; \
+ echo "ENABLE_FILE_STORAGE = $(ENABLE_FILE_STORAGE)"; \
echo "IALLOCATOR_SEARCH_PATH = [$(IALLOCATOR_SEARCH_PATH)]"; \
echo "KVM_PATH = '$(KVM_PATH)'"; \
echo "SOCAT_PATH = '$(SOCAT)'"; \
-path 'daemons/ganeti-*' -o -path 'tools/*' | \
etags -
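+ # Coverage report for the Python tests; "make coverage" writes a text
+ # report to doc/coverage/report.txt and HTML output under doc/coverage/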
+ .PHONY: coverage
+ coverage: $(BUILT_SOURCES) $(python_tests)
+ set -e; \
+ mkdir -p doc/coverage; \
+ COVERAGE_FILE=$(CURDIR)/doc/coverage/data \
+ TEXT_COVERAGE=$(CURDIR)/doc/coverage/report.txt \
+ HTML_COVERAGE=$(CURDIR)/doc/coverage \
+ $(PLAIN_TESTS_ENVIRONMENT) $(abs_top_srcdir)/autotools/gen-coverage \
+ $(python_tests)
+
# vim: set noet :
News
====
+Version 2.2.0
+-------------
+
+- RAPI now requires a Content-Type header for requests with a body (e.g.
+ ``PUT`` or ``POST``) which must be set to ``application/json`` (see
+ RFC2616 (HTTP/1.1), section 7.2.1)
+
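+  A minimal client sketch using Python's standard library (host name,
+  resource path and payload below are illustrative placeholders)::
+
+    import httplib
+    import json
+
+    body = json.dumps({"field": "value"})  # placeholder payload
+    conn = httplib.HTTPSConnection("cluster.example.com", 5080)
+    # without this header the daemon now answers 415 Unsupported Media Type
+    conn.request("PUT", "/2/some/resource", body,
+                 {"Content-Type": "application/json"})
+    print conn.getresponse().status
+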
+ Version 2.1.1
+ -------------
+
+ During the 2.1.0 long release candidate cycle, a lot of improvements and
+ changes accumulated, which were later released as 2.1.1.
+
+ Major changes
+ ~~~~~~~~~~~~~
+
+ The node evacuate command (``gnt-node evacuate``) was significantly
+ rewritten, and as such the IAllocator protocol was changed: a new
+ request type has been added. This unfortunate change during a stable
+ series is designed to improve the performance of node evacuations; on
+ well-balanced clusters with more than about five nodes, evacuation
+ should proceed in parallel for all instances of the node being
+ evacuated. As such, any existing IAllocator scripts need to be updated,
+ otherwise the above command will fail due to the unknown request. The
+ provided "dumb" allocator has not been updated, but the ganeti-htools
+ package supports the new protocol since version 0.2.4.
+
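+ The new request type, as implemented by the IAllocator framework in this
+ release, is embedded in the usual allocator input alongside the cluster
+ data and looks roughly like this (node names illustrative)::
+
+   {
+     "type": "multi-evacuate",
+     "evac_nodes": ["node3.example.com", "node4.example.com"]
+   }
+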
+ Another important change is increased validation of node and instance
+ names. This might create problems in special cases if invalid host
+ names are being used.
+
+ Also, a new layer of hypervisor parameters has been added, which sits at
+ OS level between the cluster defaults and the instance-level parameters.
+ This allows customisation of virtualization parameters depending on the
+ installed OS: for example, instances with OS 'X' may have a different
+ KVM kernel (or any other parameter) than the cluster defaults. This is
+ intended to help manage multiple OSes on the same cluster, without
+ manual modification of each instance's parameters.
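+
+ Internally the new layer is a nested mapping from OS name to hypervisor
+ name to parameters, roughly like the following sketch (names and values
+ illustrative)::
+
+   os_hvp = {
+     "mydistro": {
+       "kvm": {"kernel_path": "/boot/vmlinuz-mydistro"},
+     },
+   }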
+
+ A tool for merging clusters, ``cluster-merge``, has been added in the
+ tools sub-directory.
+
+ Bug fixes
+ ~~~~~~~~~
+
+ - Improved the int/float conversions that should make the code more
+   robust in the face of errors from the node daemons
+ - Fixed the remove node code in case of internal configuration errors
+ - Fixed the node daemon behaviour in the face of an inconsistent queue
+   directory (e.g. a read-only file-system where we can't open the files
+   read-write, etc.)
+ - Fixed the behaviour of gnt-node modify for master candidate demotion;
+ now it either aborts cleanly or, if given the new “auto_promote”
+ parameter, will automatically promote other nodes as needed
+ - Fixed compatibility with the (yet unreleased) Python 2.6.5, which
+   would completely prevent Ganeti from working
+ - Fixed bug for instance export when not all disks were successfully
+ exported
+ - Fixed behaviour of node add when the new node is slow in starting up
+ the node daemon
+ - Fixed handling of signals in the LUXI client, which should improve the
+   behaviour of command-line scripts
+ - Added checks for invalid node/instance names in the configuration (now
+ flagged during cluster verify)
+ - Fixed watcher behaviour for disk activation errors
+ - Fixed two potentially endless loops in the http library, which led to
+   the RAPI daemon hanging and consuming 100% CPU in some cases
+ - Fixed bug in RAPI daemon related to hashed passwords
+ - Fixed bug for unintended qemu-level bridging of multi-NIC KVM
+ instances
+ - Enhanced compatibility with non-Debian OSes by not using absolute
+   paths in some commands and by allowing customisation of the ssh
+   configuration directory
+ - Fixed a possible future issue with new Python versions by abiding by
+   the proper use of the ``__slots__`` attribute on classes
+ - Added checks that should prevent directory traversal attacks
+ - Many documentation fixes based on feedback from users
+
+ New features
+ ~~~~~~~~~~~~
+
+ - Added an “early_release” mode for instance replace disks and node
+   evacuate, where we release locks earlier and thus allow higher
+   parallelism within the cluster
+ - Added watcher hooks, intended to allow the watcher to restart other
+   daemons (e.g. from the ganeti-nbma project), though of course they can
+   be used for any other purpose
+ - Added a compile-time disable for DRBD barriers, to increase
+ performance if the administrator trusts the power supply or the
+ storage system to not lose writes
+ - Added the option of using syslog for logging instead of, or in
+ addition to, Ganeti's own log files
+ - Removed the boot restriction on paravirtual NICs for KVM; recent
+   versions can indeed boot from a paravirtual NIC
+ - Added a generic debug level for many operations; while this is not
+ used widely yet, it allows one to pass the debug value all the way to
+ the OS scripts
+ - Enhanced the hooks environment for instance moves (failovers,
+ migrations) where the primary/secondary nodes changed during the
+ operation, by adding {NEW,OLD}_{PRIMARY,SECONDARY} vars
+ - Enhanced data validation for many user-supplied values; one important
+   item is the restrictions imposed on instance and node names, which
+   might reject some (invalid) host names
+ - Added a configure-time option to disable file-based storage, if it's
+   not needed; this allows greater security separation between the master
+   node and the other nodes from the point of view of the inter-node RPC
+   protocol
+ - Added user notification in interactive tools if a job is waiting in
+   the job queue or trying to acquire locks
+ - Added log messages when a job is waiting for locks
+ - Added filtering by node tags in instance operations which admit
+ multiple instances (start, stop, reboot, reinstall)
+ - Added a new tool for cluster mergers, ``cluster-merge``
+ - Parameters from command line which are of the form ``a=b,c=d`` can now
+   use backslash escapes to pass in values which contain commas,
+   e.g. ``a=b\,c,d=e`` where the 'a' parameter would get the value
+   ``b,c``
+ - For KVM, the instance name is the first parameter passed to KVM, so
+ that it's more visible in the process list
+
+
Version 2.1.0
-------------
- Improved burnin
+ Version 2.0.6
+ -------------
+
+ - Fix cleaner behaviour on nodes not in a cluster (Debian bug 568105)
+ - Fix a string formatting bug
+ - Improve safety of the code in some error paths
+ - Improve master-side validation of values returned from nodes
+
+
Version 2.0.5
-------------
self.handler = None
self.handler_fn = None
self.handler_access = None
+ self.body_data = None
class JsonErrorRequestExecutor(http.server.HttpServerRequestExecutor):
"""Custom Request Executor class that formats HTTP errors in JSON.
"""
- error_content_type = "application/json"
+ error_content_type = http.HTTP_APP_JSON
def _FormatErrorMessage(self, values):
"""Formats the body of an error message.
if ctx.handler_access is None:
raise AssertionError("Permissions definition missing")
+ # This is only made available in HandleRequest
+ ctx.body_data = None
+
req.private = ctx
+ # Check for expected attributes
+ assert req.private.handler
+ assert req.private.handler_fn
+ assert req.private.handler_access is not None
+
return req.private
- def GetAuthRealm(self, req):
- """Override the auth realm for queries.
+ def AuthenticationRequired(self, req):
+ """Determine whether authentication is required.
"""
- ctx = self._GetRequestContext(req)
- if ctx.handler_access:
- return self.AUTH_REALM
- else:
- return None
+ return bool(self._GetRequestContext(req).handler_access)
def Authenticate(self, req, username, password):
"""Checks whether a user can access a resource.
"""
ctx = self._GetRequestContext(req)
+ # Deserialize request parameters
+ if req.request_body:
+ # RFC2616, 7.2.1: Any HTTP/1.1 message containing an entity-body SHOULD
+ # include a Content-Type header field defining the media type of that
+ # body. [...] If the media type remains unknown, the recipient SHOULD
+ # treat it as type "application/octet-stream".
+ req_content_type = req.request_headers.get(http.HTTP_CONTENT_TYPE,
+ http.HTTP_APP_OCTET_STREAM)
+ if req_content_type.lower() != http.HTTP_APP_JSON.lower():
+ raise http.HttpUnsupportedMediaType()
+
+ try:
+ ctx.body_data = serializer.LoadJson(req.request_body)
+ except Exception:
+ raise http.HttpBadRequest(message="Unable to parse JSON data")
+ else:
+ ctx.body_data = None
+
try:
result = ctx.handler_fn()
- sn = ctx.handler.getSerialNumber()
- if sn:
- req.response_headers[http.HTTP_ETAG] = str(sn)
except luxi.TimeoutError:
raise http.HttpGatewayTimeout()
except luxi.ProtocolError, err:
logging.exception("Error while handling the %s request", method)
raise
- return result
+ req.resp_headers[http.HTTP_CONTENT_TYPE] = http.HTTP_APP_JSON
+
+ return serializer.DumpJson(result)
def CheckRapi(options, args):
dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
dirs.append((constants.LOG_OS_DIR, 0750))
- daemon.GenericMain(constants.RAPI, parser, dirs, CheckRapi, ExecRapi)
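+ # the RAPI certificate file is used for both the SSL certificate and the
+ # private key, which utils.GenerateSelfSignedSslCert writes to one file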
+ daemon.GenericMain(constants.RAPI, parser, dirs, CheckRapi, ExecRapi,
+ default_ssl_cert=constants.RAPI_CERT_FILE,
+ default_ssl_key=constants.RAPI_CERT_FILE)
if __name__ == "__main__":
import os.path
import re
import logging
-import tempfile
import time
from ganeti import rpc
utils.AddAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
-def GenerateSelfSignedSslCert(file_name, validity=(365 * 5)):
- """Generates a self-signed SSL certificate.
-
- @type file_name: str
- @param file_name: Path to output file
- @type validity: int
- @param validity: Validity for certificate in days
-
- """
- (fd, tmp_file_name) = tempfile.mkstemp(dir=os.path.dirname(file_name))
- try:
- try:
- # Set permissions before writing key
- os.chmod(tmp_file_name, 0600)
-
- result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
- "-days", str(validity), "-nodes", "-x509",
- "-keyout", tmp_file_name, "-out", tmp_file_name,
- "-batch"])
- if result.failed:
- raise errors.OpExecError("Could not generate SSL certificate, command"
- " %s had exitcode %s and error message %s" %
- (result.cmd, result.exit_code, result.output))
-
- # Make read-only
- os.chmod(tmp_file_name, 0400)
-
- os.rename(tmp_file_name, file_name)
- finally:
- utils.RemoveFile(tmp_file_name)
- finally:
- os.close(fd)
-
-
def GenerateHmacKey(file_name):
"""Writes a new HMAC key.
@param file_name: Path to output file
"""
- utils.WriteFile(file_name, data="%s\n" % utils.GenerateSecret(), mode=0400)
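+ # backup=True keeps a backup copy of any pre-existing key file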
+ utils.WriteFile(file_name, data="%s\n" % utils.GenerateSecret(), mode=0400,
+ backup=True)
+
+
+ def GenerateClusterCrypto(new_cluster_cert, new_rapi_cert, new_hmac_key,
+ rapi_cert_pem=None):
+ """Updates the cluster certificates, keys and secrets.
+
+ @type new_cluster_cert: bool
+ @param new_cluster_cert: Whether to generate a new cluster certificate
+ @type new_rapi_cert: bool
+ @param new_rapi_cert: Whether to generate a new RAPI certificate
+ @type new_hmac_key: bool
+ @param new_hmac_key: Whether to generate a new HMAC key
+ @type rapi_cert_pem: string
+ @param rapi_cert_pem: New RAPI certificate in PEM format
+
+ """
+ # SSL certificate
+ cluster_cert_exists = os.path.exists(constants.SSL_CERT_FILE)
+ if new_cluster_cert or not cluster_cert_exists:
+ if cluster_cert_exists:
+ utils.CreateBackup(constants.SSL_CERT_FILE)
+
+ logging.debug("Generating new cluster certificate at %s",
+ constants.SSL_CERT_FILE)
- GenerateSelfSignedSslCert(constants.SSL_CERT_FILE)
++ utils.GenerateSelfSignedSslCert(constants.SSL_CERT_FILE)
+
+ # HMAC key
+ if new_hmac_key or not os.path.exists(constants.HMAC_CLUSTER_KEY):
+ logging.debug("Writing new HMAC key to %s", constants.HMAC_CLUSTER_KEY)
+ GenerateHmacKey(constants.HMAC_CLUSTER_KEY)
+
+ # RAPI
+ rapi_cert_exists = os.path.exists(constants.RAPI_CERT_FILE)
+
+ if rapi_cert_pem:
+ # Assume rapi_pem contains a valid PEM-formatted certificate and key
+ logging.debug("Writing RAPI certificate at %s",
+ constants.RAPI_CERT_FILE)
+ utils.WriteFile(constants.RAPI_CERT_FILE, data=rapi_cert_pem, backup=True)
+
+ elif new_rapi_cert or not rapi_cert_exists:
+ if rapi_cert_exists:
+ utils.CreateBackup(constants.RAPI_CERT_FILE)
+
+ logging.debug("Generating new RAPI certificate at %s",
+ constants.RAPI_CERT_FILE)
- GenerateSelfSignedSslCert(constants.RAPI_CERT_FILE)
++ utils.GenerateSelfSignedSslCert(constants.RAPI_CERT_FILE)
def _InitGanetiServerSetup(master_name):
the cluster and also generates the SSL certificate.
"""
- utils.GenerateSelfSignedSslCert(constants.SSL_CERT_FILE)
-
- # Don't overwrite existing file
- if not os.path.exists(constants.RAPI_CERT_FILE):
- utils.GenerateSelfSignedSslCert(constants.RAPI_CERT_FILE)
-
- if not os.path.exists(constants.HMAC_CLUSTER_KEY):
- GenerateHmacKey(constants.HMAC_CLUSTER_KEY)
+ # Generate cluster secrets
+ GenerateClusterCrypto(True, False, False)
result = utils.RunCmd([constants.DAEMON_UTIL, "start", constants.NODED])
if result.failed:
" had exitcode %s and error %s" %
(result.cmd, result.exit_code, result.output))
- # Wait for node daemon to become responsive
+ _WaitForNodeDaemon(master_name)
+
+
+ def _WaitForNodeDaemon(node_name):
+ """Wait for node daemon to become responsive.
+
+ """
def _CheckNodeDaemon():
- result = rpc.RpcRunner.call_version([master_name])[master_name]
+ result = rpc.RpcRunner.call_version([node_name])[node_name]
if result.fail_msg:
raise utils.RetryAgain()
try:
utils.Retry(_CheckNodeDaemon, 1.0, 10.0)
except utils.RetryTimeout:
- raise errors.OpExecError("Node daemon didn't answer queries within"
- " 10 seconds")
+ raise errors.OpExecError("Node daemon on %s didn't answer queries within"
+ " 10 seconds" % node_name)
+
def InitCluster(cluster_name, mac_prefix,
master_netdev, file_storage_dir, candidate_pool_size,
" belong to this host. Aborting." %
hostname.ip, errors.ECODE_ENVIRON)
- clustername = utils.GetHostInfo(cluster_name)
+ clustername = utils.GetHostInfo(utils.HostInfo.NormalizeName(cluster_name))
if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
timeout=5):
" output: %s" %
(node, result.fail_reason, result.output))
+ _WaitForNodeDaemon(node)
+
def MasterFailover(no_voting=False):
"""Failover the master node.
from ganeti import luxi
from ganeti import ssconf
from ganeti import rpc
+ from ganeti import ssh
from optparse import (OptionParser, TitledHelpFormatter,
Option, OptionValueError)
# Command line options
"ALLOCATABLE_OPT",
"ALL_OPT",
+ "AUTO_PROMOTE_OPT",
"AUTO_REPLACE_OPT",
"BACKEND_OPT",
"CLEANUP_OPT",
"IALLOCATOR_OPT",
"IGNORE_CONSIST_OPT",
"IGNORE_FAILURES_OPT",
+ "IGNORE_REMOVE_FAILURES_OPT",
"IGNORE_SECONDARIES_OPT",
"IGNORE_SIZE_OPT",
"MAC_PREFIX_OPT",
"MASTER_NETDEV_OPT",
"MC_OPT",
"NET_OPT",
+ "NEW_CLUSTER_CERT_OPT",
+ "NEW_HMAC_KEY_OPT",
+ "NEW_RAPI_CERT_OPT",
"NEW_SECONDARY_OPT",
"NIC_PARAMS_OPT",
"NODE_LIST_OPT",
"OFFLINE_OPT",
"OS_OPT",
"OS_SIZE_OPT",
+ "RAPI_CERT_OPT",
"READD_OPT",
"REBOOT_TYPE_OPT",
+ "REMOVE_INSTANCE_OPT",
"SECONDARY_IP_OPT",
"SELECT_OS_OPT",
"SEP_OPT",
"JobExecutor",
"JobSubmittedException",
"ParseTimespec",
+ "RunWhileClusterStopped",
"SubmitOpCode",
"SubmitOrSend",
"UsesRPC",
"ARGS_NONE",
"ARGS_ONE_INSTANCE",
"ARGS_ONE_NODE",
+ "ARGS_ONE_OS",
"ArgChoice",
"ArgCommand",
"ArgFile",
"ArgInstance",
"ArgJobId",
"ArgNode",
+ "ArgOs",
"ArgSuggest",
"ArgUnknown",
"OPT_COMPL_INST_ADD_NODES",
"""
+ class ArgOs(_Argument):
+ """OS argument.
+
+ """
+
+
ARGS_NONE = []
ARGS_MANY_INSTANCES = [ArgInstance()]
ARGS_MANY_NODES = [ArgNode()]
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
+ ARGS_ONE_OS = [ArgOs(min=1, max=1)]
def _ExtractTagsObject(opts, args):
" configuration even if there are failures"
" during the removal process")
+IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
+ dest="ignore_remove_failures",
+ action="store_true", default=False,
+ help="Remove the instance from the"
+ " cluster configuration even if there"
+ " are failures during the removal"
+ " process")
+
+REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
+ action="store_true", default=False,
+ help="Remove the instance from the cluster")
+
NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
help="Specifies the new secondary node",
metavar="NODE", default=None,
help="Replace the disk(s) on the secondary"
" node (only for the drbd template)")
+ AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
+ default=False, action="store_true",
+ help="Lock all nodes and auto-promote as needed"
+ " to MC status")
+
AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
default=False, action="store_true",
help="Automatically replace faulty disks"
help="Release the locks on the secondary"
" node(s) early")
+ NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
+ dest="new_cluster_cert",
+ default=False, action="store_true",
+ help="Generate a new cluster certificate")
+
+ RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
+ default=None,
+ help="File containing new RAPI certificate")
+
+ NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
+ default=None, action="store_true",
+ help=("Generate a new self-signed RAPI"
+ " certificate"))
+
+ NEW_HMAC_KEY_OPT = cli_option("--new-hmac-key", dest="new_hmac_key",
+ default=False, action="store_true",
+ help="Create a new HMAC key")
+
def _ParseArgs(argv, commands, aliases):
"""Parser for the command line arguments.
prev_job_info = None
prev_logmsg_serial = None
+ status = None
+
+ notified_queued = False
+ notified_waitlock = False
+
while True:
- result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
- prev_logmsg_serial)
+ result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
+ prev_logmsg_serial)
if not result:
# job not found, go away!
raise errors.JobLost("Job with id %s lost" % job_id)
+ elif result == constants.JOB_NOTCHANGED:
+ if status is not None and not callable(feedback_fn):
+ if status == constants.JOB_STATUS_QUEUED and not notified_queued:
+ ToStderr("Job %s is waiting in queue", job_id)
+ notified_queued = True
+ elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
+ ToStderr("Job %s is trying to acquire all necessary locks", job_id)
+ notified_waitlock = True
+
+ # Wait again
+ continue
# Split result, a tuple of (field values, log entries)
(job_info, log_entries) = result
obuf.write("Parameter Error: %s" % msg)
elif isinstance(err, errors.ParameterError):
obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
- elif isinstance(err, errors.GenericError):
- obuf.write("Unhandled Ganeti error: %s" % msg)
elif isinstance(err, luxi.NoMasterError):
obuf.write("Cannot communicate with the master daemon.\nIs it running"
" and listening for connections?")
elif isinstance(err, luxi.ProtocolError):
obuf.write("Unhandled protocol error while talking to the master daemon:\n"
"%s" % msg)
+ elif isinstance(err, errors.GenericError):
+ obuf.write("Unhandled Ganeti error: %s" % msg)
elif isinstance(err, JobSubmittedException):
obuf.write("JobID: %s\n" % err.args[0])
retcode = 0
return 0
+ class _RunWhileClusterStoppedHelper:
+ """Helper class for L{RunWhileClusterStopped} to simplify state management
+
+ """
+ def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
+ """Initializes this class.
+
+ @type feedback_fn: callable
+ @param feedback_fn: Feedback function
+ @type cluster_name: string
+ @param cluster_name: Cluster name
+ @type master_node: string
+ @param master_node: Master node name
+ @type online_nodes: list
+ @param online_nodes: List of names of online nodes
+
+ """
+ self.feedback_fn = feedback_fn
+ self.cluster_name = cluster_name
+ self.master_node = master_node
+ self.online_nodes = online_nodes
+
+ self.ssh = ssh.SshRunner(self.cluster_name)
+
+ self.nonmaster_nodes = [name for name in online_nodes
+ if name != master_node]
+
+ assert self.master_node not in self.nonmaster_nodes
+
+ def _RunCmd(self, node_name, cmd):
+ """Runs a command on the local or a remote machine.
+
+ @type node_name: string
+ @param node_name: Machine name
+ @type cmd: list
+ @param cmd: Command
+
+ """
+ if node_name is None or node_name == self.master_node:
+ # No need to use SSH
+ result = utils.RunCmd(cmd)
+ else:
+ result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
+
+ if result.failed:
+ errmsg = ["Failed to run command %s" % result.cmd]
+ if node_name:
+ errmsg.append("on node %s" % node_name)
+ errmsg.append(": exitcode %s and error %s" %
+ (result.exit_code, result.output))
+ raise errors.OpExecError(" ".join(errmsg))
+
+ def Call(self, fn, *args):
+ """Call function while all daemons are stopped.
+
+ @type fn: callable
+ @param fn: Function to be called
+
+ """
+ # Pause watcher by acquiring an exclusive lock on watcher state file
+ self.feedback_fn("Blocking watcher")
+ watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
+ try:
+ # TODO: Currently, this just blocks. There's no timeout.
+ # TODO: Should it be a shared lock?
+ watcher_block.Exclusive(blocking=True)
+
+ # Stop master daemons, so that no new jobs can come in and all running
+ # ones are finished
+ self.feedback_fn("Stopping master daemons")
+ self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
+ try:
+ # Stop daemons on all nodes
+ for node_name in self.online_nodes:
+ self.feedback_fn("Stopping daemons on %s" % node_name)
+ self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
+
+ # All daemons are shut down now
+ try:
+ return fn(self, *args)
+ except Exception:
+ logging.exception("Caught exception")
+ raise
+ finally:
+ # Start cluster again, master node last
+ for node_name in self.nonmaster_nodes + [self.master_node]:
+ self.feedback_fn("Starting daemons on %s" % node_name)
+ self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
+ finally:
+ # Resume watcher
+ watcher_block.Close()
+
+
+ def RunWhileClusterStopped(feedback_fn, fn, *args):
+ """Calls a function while all cluster daemons are stopped.
+
+ @type feedback_fn: callable
+ @param feedback_fn: Feedback function
+ @type fn: callable
+ @param fn: Function to be called when daemons are stopped
+
+ """
+ feedback_fn("Gathering cluster information")
+
+ # This ensures we're running on the master daemon
+ cl = GetClient()
+
+ (cluster_name, master_node) = \
+ cl.QueryConfigValues(["cluster_name", "master_node"])
+
+ online_nodes = GetOnlineNodes([], cl=cl)
+
+ # Don't keep a reference to the client. The master daemon will go away.
+ del cl
+
+ assert master_node in online_nodes
+
+ return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
+ online_nodes).Call(fn, *args)
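+
+ # Usage sketch (callback name illustrative): fn receives the helper object
+ # and runs while all cluster daemons are stopped, e.g.
+ #   RunWhileClusterStopped(ToStdout, _RenewCrypto)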
+
+
def GenerateTable(headers, fields, separator, data,
numfields=None, unitfields=None,
units=None):
GetResults() calls.
"""
- def __init__(self, cl=None, verbose=True, opts=None):
+ def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
self.queue = []
if cl is None:
cl = GetClient()
self.verbose = verbose
self.jobs = []
self.opts = opts
+ self.feedback_fn = feedback_fn
def QueueJob(self, name, *ops):
"""Record a job for later submit.
"""
results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
- for ((status, data), (name, _)) in zip(results, self.queue):
- self.jobs.append((status, data, name))
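+ # each entry of self.jobs becomes a tuple of (submit index, submit
+ # success, job id or error message, job name)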
+ for (idx, ((status, data), (name, _))) in enumerate(zip(results,
+ self.queue)):
+ self.jobs.append((idx, status, data, name))
+
+ def _ChooseJob(self):
+ """Choose a non-waiting/queued job to poll next.
+
+ """
+ assert self.jobs, "_ChooseJob called with empty job list"
+
+ result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
+ assert result
+
+ for job_data, status in zip(self.jobs, result):
+ if status[0] in (constants.JOB_STATUS_QUEUED,
+ constants.JOB_STATUS_WAITLOCK,
+ constants.JOB_STATUS_CANCELING):
+ # job is still waiting
+ continue
+ # good candidate found
+ self.jobs.remove(job_data)
+ return job_data
+
+ # no job found
+ return self.jobs.pop(0)
def GetResults(self):
"""Wait for and return the results of all jobs.
self.SubmitPending()
results = []
if self.verbose:
- ok_jobs = [row[1] for row in self.jobs if row[0]]
+ ok_jobs = [row[2] for row in self.jobs if row[1]]
if ok_jobs:
ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
- for submit_status, jid, name in self.jobs:
- if not submit_status:
- ToStderr("Failed to submit job for %s: %s", name, jid)
- results.append((False, jid))
- continue
- if self.verbose:
- ToStdout("Waiting for job %s for %s...", jid, name)
+
+ # first, remove any non-submitted jobs
+ self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
+ for idx, _, jid, name in failures:
+ ToStderr("Failed to submit job for %s: %s", name, jid)
+ results.append((idx, False, jid))
+
+ while self.jobs:
+ (idx, _, jid, name) = self._ChooseJob()
+ ToStdout("Waiting for job %s for %s...", jid, name)
try:
- job_result = PollJob(jid, cl=self.cl)
+ job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
success = True
except (errors.GenericError, luxi.ProtocolError), err:
_, job_result = FormatError(err)
# the error message will always be shown, verbose or not
ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
- results.append((success, job_result))
+ results.append((idx, success, job_result))
+
+ # sort based on the index, then drop it
+ results.sort()
+ results = [i[1:] for i in results]
+
return results
def WaitOrShow(self, wait):
import platform
import logging
import copy
+ import OpenSSL
from ganeti import ssh
from ganeti import utils
return faulty
+ def _FormatTimestamp(secs):
+ """Formats a Unix timestamp with the local timezone.
+
+ """
+ return time.strftime("%F %T %Z", time.gmtime(secs))
+
+
class LUPostInitCluster(LogicalUnit):
"""Logical unit for running hooks after cluster initialization.
return master
+ def _VerifyCertificateInner(filename, expired, not_before, not_after, now,
+ warn_days=constants.SSL_CERT_EXPIRATION_WARN,
+ error_days=constants.SSL_CERT_EXPIRATION_ERROR):
+ """Verifies certificate details for LUVerifyCluster.
+
+ """
+ if expired:
+ msg = "Certificate %s is expired" % filename
+
+ if not_before is not None and not_after is not None:
+ msg += (" (valid from %s to %s)" %
+ (_FormatTimestamp(not_before),
+ _FormatTimestamp(not_after)))
+ elif not_before is not None:
+ msg += " (valid from %s)" % _FormatTimestamp(not_before)
+ elif not_after is not None:
+ msg += " (valid until %s)" % _FormatTimestamp(not_after)
+
+ return (LUVerifyCluster.ETYPE_ERROR, msg)
+
+ elif not_before is not None and not_before > now:
+ return (LUVerifyCluster.ETYPE_WARNING,
+ "Certificate %s not yet valid (valid from %s)" %
+ (filename, _FormatTimestamp(not_before)))
+
+ elif not_after is not None:
+ remaining_days = int((not_after - now) / (24 * 3600))
+
+ msg = ("Certificate %s expires in %d days" % (filename, remaining_days))
+
+ if remaining_days <= error_days:
+ return (LUVerifyCluster.ETYPE_ERROR, msg)
+
+ if remaining_days <= warn_days:
+ return (LUVerifyCluster.ETYPE_WARNING, msg)
+
+ return (None, None)
+
+
+ def _VerifyCertificate(filename):
+ """Verifies a certificate for LUVerifyCluster.
+
+ @type filename: string
+ @param filename: Path to PEM file
+
+ """
+ try:
+ cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+ utils.ReadFile(filename))
+ except Exception, err: # pylint: disable-msg=W0703
+ return (LUVerifyCluster.ETYPE_ERROR,
+ "Failed to load X509 certificate %s: %s" % (filename, err))
+
+ # Depending on the pyOpenSSL version, this can just return (None, None)
+ (not_before, not_after) = utils.GetX509CertValidity(cert)
+
+ return _VerifyCertificateInner(filename, cert.has_expired(),
+ not_before, not_after, time.time())
+
+
class LUVerifyCluster(LogicalUnit):
"""Verifies the cluster status.
TINSTANCE = "instance"
ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
+ ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
for msg in self.cfg.VerifyConfig():
_ErrorIf(True, self.ECLUSTERCFG, None, msg)
+ # Check the cluster certificates
+ for cert_filename in constants.ALL_CERT_FILES:
+ (errcode, msg) = _VerifyCertificate(cert_filename)
+ _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
+
vg_name = self.cfg.GetVGName()
hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
nodelist = utils.NiceSort(self.cfg.GetNodeList())
master_files = [constants.CLUSTER_CONF_FILE]
file_names = ssconf.SimpleStore().GetFileList()
- file_names.append(constants.SSL_CERT_FILE)
- file_names.append(constants.RAPI_CERT_FILE)
+ file_names.extend(constants.ALL_CERT_FILES)
file_names.extend(master_files)
local_checksums = utils.FingerprintFiles(file_names)
idata = nresult.get(constants.NV_INSTANCELIST, None)
test = not isinstance(idata, list)
_ErrorIf(test, self.ENODEHV, node,
- "rpc call to node failed (instancelist)")
+ "rpc call to node failed (instancelist): %s",
+ utils.SafeEncode(str(idata)))
if test:
continue
try:
ntime_merged = utils.MergeTime(ntime)
except (ValueError, TypeError):
- _ErrorIf(test, self.ENODETIME, node, "Node returned invalid time")
+ _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
- ntime_diff = abs(nvinfo_starttime - ntime_merged)
+ ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
- ntime_diff = abs(ntime_merged - nvinfo_endtime)
+ ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
else:
ntime_diff = None
_ErrorIf(ntime_diff is not None, self.ENODETIME, node,
- "Node time diverges by at least %0.1fs from master node time",
+ "Node time diverges by at least %s from master node time",
ntime_diff)
if ntime_diff is not None:
_ErrorIf(snode not in node_info and snode not in n_offline,
self.ENODERPC, snode,
"instance %s, connection to secondary node"
- "failed", instance)
+ " failed", instance)
if snode in node_info:
node_info[snode]['sinst'].append(instance)
if test:
output = indent_re.sub(' ', output)
feedback_fn("%s" % output)
- lu_result = 1
+ lu_result = 0
return lu_result
else:
self.new_hvparams[hv_name].update(hv_dict)
+ # os hypervisor parameters
+ self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
+ if self.op.os_hvp:
+ if not isinstance(self.op.os_hvp, dict):
+ raise errors.OpPrereqError("Invalid 'os_hvp' parameter on input",
+ errors.ECODE_INVAL)
+ for os_name, hvs in self.op.os_hvp.items():
+ if not isinstance(hvs, dict):
+ raise errors.OpPrereqError(("Invalid 'os_hvp' parameter on"
+ " input"), errors.ECODE_INVAL)
+ if os_name not in self.new_os_hvp:
+ self.new_os_hvp[os_name] = hvs
+ else:
+ for hv_name, hv_dict in hvs.items():
+ if hv_name not in self.new_os_hvp[os_name]:
+ self.new_os_hvp[os_name][hv_name] = hv_dict
+ else:
+ self.new_os_hvp[os_name][hv_name].update(hv_dict)
+
if self.op.enabled_hypervisors is not None:
self.hv_list = self.op.enabled_hypervisors
if not self.hv_list:
hv_class.CheckParameterSyntax(hv_params)
_CheckHVParams(self, node_list, hv_name, hv_params)
+ if self.op.os_hvp:
+ # no need to check any newly-enabled hypervisors, since the
+ # defaults have already been checked in the above code-block
+ for os_name, os_hvp in self.new_os_hvp.items():
+ for hv_name, hv_params in os_hvp.items():
+ utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
+ # we need to fill in the new os_hvp on top of the actual hv_p
+ cluster_defaults = self.new_hvparams.get(hv_name, {})
+ new_osp = objects.FillDict(cluster_defaults, hv_params)
+ hv_class = hypervisor.GetHypervisor(hv_name)
+ hv_class.CheckParameterSyntax(new_osp)
+ _CheckHVParams(self, node_list, hv_name, new_osp)
+
+
def Exec(self, feedback_fn):
"""Change the parameters of the cluster.
" state, not changing")
if self.op.hvparams:
self.cluster.hvparams = self.new_hvparams
+ if self.op.os_hvp:
+ self.cluster.os_hvp = self.new_os_hvp
if self.op.enabled_hypervisors is not None:
self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
if self.op.beparams:
"""
# 1. Gather target nodes
myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
- dist_nodes = lu.cfg.GetNodeList()
+ dist_nodes = lu.cfg.GetOnlineNodeList()
if additional_nodes is not None:
dist_nodes.extend(additional_nodes)
if myself.name in dist_nodes:
HTYPE = constants.HTYPE_NODE
_OP_REQP = ["node_name"]
+ def CheckArguments(self):
+ # validate/normalize the node name
+ self.op.node_name = utils.HostInfo.NormalizeName(self.op.node_name)
+
def BuildHooksEnv(self):
"""Build hooks env.
_CheckBooleanOpField(self.op, 'master_candidate')
_CheckBooleanOpField(self.op, 'offline')
_CheckBooleanOpField(self.op, 'drained')
+ _CheckBooleanOpField(self.op, 'auto_promote')
all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
if all_mods.count(None) == 3:
raise errors.OpPrereqError("Please pass at least one modification",
" state at the same time",
errors.ECODE_INVAL)
+ # Boolean value that tells us whether we're offlining or draining the node
+ self.offline_or_drain = (self.op.offline == True or
+ self.op.drained == True)
+ self.deoffline_or_drain = (self.op.offline == False or
+ self.op.drained == False)
+ self.might_demote = (self.op.master_candidate == False or
+ self.offline_or_drain)
+
+ self.lock_all = self.op.auto_promote and self.might_demote
+
+
def ExpandNames(self):
- self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
+ if self.lock_all:
+ self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
+ else:
+ self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
def BuildHooksEnv(self):
"""Build hooks env.
" only via masterfailover",
errors.ECODE_INVAL)
- # Boolean value that tells us whether we're offlining or draining the node
- offline_or_drain = self.op.offline == True or self.op.drained == True
- deoffline_or_drain = self.op.offline == False or self.op.drained == False
-
- if (node.master_candidate and
- (self.op.master_candidate == False or offline_or_drain)):
- cp_size = self.cfg.GetClusterInfo().candidate_pool_size
- mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
- if mc_now <= cp_size:
- msg = ("Not enough master candidates (desired"
- " %d, new value will be %d)" % (cp_size, mc_now-1))
- # Only allow forcing the operation if it's an offline/drain operation,
- # and we could not possibly promote more nodes.
- # FIXME: this can still lead to issues if in any way another node which
- # could be promoted appears in the meantime.
- if self.op.force and offline_or_drain and mc_should == mc_max:
- self.LogWarning(msg)
- else:
- raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
+
+ if node.master_candidate and self.might_demote and not self.lock_all:
+ assert not self.op.auto_promote, "auto-promote set but lock_all not"
+ # check if after removing the current node, we're missing master
+ # candidates
+ (mc_remaining, mc_should, _) = \
+ self.cfg.GetMasterCandidateStats(exceptions=[node.name])
+ if mc_remaining != mc_should:
+ raise errors.OpPrereqError("Not enough master candidates, please"
+ " pass auto_promote to allow promotion",
+ errors.ECODE_INVAL)
if (self.op.master_candidate == True and
((node.offline and not self.op.offline == False) or
errors.ECODE_INVAL)
# If we're being deofflined/drained, we'll MC ourself if needed
- if (deoffline_or_drain and not offline_or_drain and not
+ if (self.deoffline_or_drain and not self.offline_or_drain and not
self.op.master_candidate == True and not node.master_candidate):
self.op.master_candidate = _DecideSelfPromotion(self)
if self.op.master_candidate:
node.offline = False
result.append(("offline", "clear offline status due to drain"))
+ # we locked all nodes, we adjust the CP before updating this node
+ if self.lock_all:
+ _AdjustCandidatePool(self, [node.name])
+
# this will trigger configuration file update, if needed
self.cfg.Update(node, feedback_fn)
+
# this will trigger job queue propagation or cleanup
if changed_mc:
self.context.ReaddNode(node)
"""
cluster = self.cfg.GetClusterInfo()
+ os_hvp = {}
+
+ # Filter just for enabled hypervisors
+ for os_name, hv_dict in cluster.os_hvp.items():
+ os_hvp[os_name] = {}
+ for hv_name, hv_params in hv_dict.items():
+ if hv_name in cluster.enabled_hypervisors:
+ os_hvp[os_name][hv_name] = hv_params
+
result = {
"software_version": constants.RELEASE_VERSION,
"protocol_version": constants.PROTOCOL_VERSION,
"enabled_hypervisors": cluster.enabled_hypervisors,
"hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
for hypervisor_name in cluster.enabled_hypervisors]),
+ "os_hvp": os_hvp,
"beparams": cluster.beparams,
"nicparams": cluster.nicparams,
"candidate_pool_size": cluster.candidate_pool_size,
elif field == "drain_flag":
entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
elif field == "watcher_pause":
- return utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
+ entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
else:
raise errors.ParameterError(field)
values.append(entry)
" node %s: %s" %
(instance.name, instance.primary_node, msg))
- logging.info("Removing block devices for instance %s", instance.name)
+ _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
- if not _RemoveDisks(self, instance):
- if self.op.ignore_failures:
- feedback_fn("Warning: can't remove instance's disks")
- else:
- raise errors.OpExecError("Can't remove instance's disks")
- logging.info("Removing instance %s out of cluster config", instance.name)
+def _RemoveInstance(lu, feedback_fn, instance, ignore_failures):
+ """Utility function to remove an instance.
+
+ """
+ logging.info("Removing block devices for instance %s", instance.name)
+
+ if not _RemoveDisks(lu, instance):
+ if not ignore_failures:
+ raise errors.OpExecError("Can't remove instance's disks")
+ feedback_fn("Warning: can't remove instance's disks")
+
+ logging.info("Removing instance %s out of cluster config", instance.name)
+
+ lu.cfg.RemoveInstance(instance.name)
+
+ assert not lu.remove_locks.get(locking.LEVEL_INSTANCE), \
+ "Instance lock removal conflict"
- self.cfg.RemoveInstance(instance.name)
- self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
+ # Remove lock for the instance
+ lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name
class LUQueryInstances(NoHooksLU):
# for tools
if not hasattr(self.op, "name_check"):
self.op.name_check = True
+ # validate/normalize the instance name
+ self.op.instance_name = utils.HostInfo.NormalizeName(self.op.instance_name)
if self.op.ip_check and not self.op.name_check:
# TODO: make the ip check more flexible and not depend on the name check
raise errors.OpPrereqError("Cannot do ip checks without a name check",
errors.ECODE_INVAL)
+ if (self.op.disk_template == constants.DT_FILE and
+ not constants.ENABLE_FILE_STORAGE):
+ raise errors.OpPrereqError("File storage disabled at configure time",
+ errors.ECODE_INVAL)
def ExpandNames(self):
"""ExpandNames for CreateInstance.
self.needed_locks[locking.LEVEL_NODE].append(src_node)
if not os.path.isabs(src_path):
self.op.src_path = src_path = \
- os.path.join(constants.EXPORT_DIR, src_path)
+ utils.PathJoin(constants.EXPORT_DIR, src_path)
# On import force_variant must be True, because if we forced it at
# initial install, our only chance when importing it back is that it
" iallocator '%s': %s" %
(self.op.iallocator, ial.info),
errors.ECODE_NORES)
- if len(ial.nodes) != ial.required_nodes:
+ if len(ial.result) != ial.required_nodes:
raise errors.OpPrereqError("iallocator '%s' returned invalid number"
" of nodes (%s), required %s" %
- (self.op.iallocator, len(ial.nodes),
+ (self.op.iallocator, len(ial.result),
ial.required_nodes), errors.ECODE_FAULT)
- self.op.pnode = ial.nodes[0]
+ self.op.pnode = ial.result[0]
self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
self.op.instance_name, self.op.iallocator,
- utils.CommaJoin(ial.nodes))
+ utils.CommaJoin(ial.result))
if ial.required_nodes == 2:
- self.op.snode = ial.nodes[1]
+ self.op.snode = ial.result[1]
def BuildHooksEnv(self):
"""Build hooks env.
self.secondaries)
return env, nl, nl
-
def CheckPrereq(self):
"""Check prerequisites.
if src_path in exp_list[node].payload:
found = True
self.op.src_node = src_node = node
- self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
- src_path)
+ self.op.src_path = src_path = utils.PathJoin(constants.EXPORT_DIR,
+ src_path)
break
if not found:
raise errors.OpPrereqError("No export found for relative path %s" %
if export_info.has_option(constants.INISECT_INS, option):
# FIXME: are the old os-es, disk sizes, etc. useful?
export_name = export_info.get(constants.INISECT_INS, option)
- image = os.path.join(src_path, export_name)
+ image = utils.PathJoin(src_path, export_name)
disk_images.append(image)
else:
disk_images.append(False)
else:
network_port = None
- ##if self.op.vnc_bind_address is None:
- ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
-
# this is needed because os.path.join does not accept None arguments
if self.op.file_storage_dir is None:
string_file_storage_dir = ""
string_file_storage_dir = self.op.file_storage_dir
# build the full file storage dir path
- file_storage_dir = os.path.normpath(os.path.join(
- self.cfg.GetFileStorageDir(),
- string_file_storage_dir, instance))
+ file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
+ string_file_storage_dir, instance)
-
disks = _GenerateDiskTemplate(self,
self.op.disk_template,
instance, pnode_name,
" %s" % (iallocator_name, ial.info),
errors.ECODE_NORES)
- if len(ial.nodes) != ial.required_nodes:
+ if len(ial.result) != ial.required_nodes:
raise errors.OpPrereqError("iallocator '%s' returned invalid number"
" of nodes (%s), required %s" %
(iallocator_name,
- len(ial.nodes), ial.required_nodes),
+ len(ial.result), ial.required_nodes),
errors.ECODE_FAULT)
- remote_node_name = ial.nodes[0]
+ remote_node_name = ial.result[0]
lu.LogInfo("Selected new secondary for instance '%s': %s",
instance_name, remote_node_name)
(self.op.name, self.op.node_name))
+ class LUNodeEvacuationStrategy(NoHooksLU):
+ """Computes the node evacuation strategy.
+
+ """
+ _OP_REQP = ["nodes"]
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ if not hasattr(self.op, "remote_node"):
+ self.op.remote_node = None
+ if not hasattr(self.op, "iallocator"):
+ self.op.iallocator = None
+ if self.op.remote_node is not None and self.op.iallocator is not None:
+ raise errors.OpPrereqError("Give either the iallocator or the new"
+ " secondary, not both", errors.ECODE_INVAL)
+
+ def ExpandNames(self):
+ self.op.nodes = _GetWantedNodes(self, self.op.nodes)
+ self.needed_locks = locks = {}
+ if self.op.remote_node is None:
+ locks[locking.LEVEL_NODE] = locking.ALL_SET
+ else:
+ self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
+ locks[locking.LEVEL_NODE] = self.op.nodes + [self.op.remote_node]
+
+ def CheckPrereq(self):
+ pass
+
+ def Exec(self, feedback_fn):
+ if self.op.remote_node is not None:
+ instances = []
+ for node in self.op.nodes:
+ instances.extend(_GetNodeSecondaryInstances(self.cfg, node))
+ result = []
+ for i in instances:
+ if i.primary_node == self.op.remote_node:
+ raise errors.OpPrereqError("Node %s is the primary node of"
+ " instance %s, cannot use it as"
+ " secondary" %
+ (self.op.remote_node, i.name),
+ errors.ECODE_INVAL)
+ result.append([i.name, self.op.remote_node])
+ else:
+ ial = IAllocator(self.cfg, self.rpc,
+ mode=constants.IALLOCATOR_MODE_MEVAC,
+ evac_nodes=self.op.nodes)
+ ial.Run(self.op.iallocator, validate=True)
+ if not ial.success:
+ raise errors.OpExecError("No valid evacuation solution: %s" % ial.info,
+ errors.ECODE_NORES)
+ result = ial.result
+ return result
+
+
class LUGrowDisk(LogicalUnit):
"""Grow a disk of an instance.
"""Check the arguments.
"""
+ _CheckBooleanOpField(self.op, "remove_instance")
+ _CheckBooleanOpField(self.op, "ignore_remove_failures")
+
self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
constants.DEFAULT_SHUTDOWN_TIMEOUT)
+ self.remove_instance = getattr(self.op, "remove_instance", False)
+ self.ignore_remove_failures = getattr(self.op, "ignore_remove_failures",
+ False)
+
+ if self.remove_instance and not self.op.shutdown:
+ raise errors.OpPrereqError("Can not remove instance without shutting it"
+ " down before")
def ExpandNames(self):
self._ExpandAndLockInstance()
+
# FIXME: lock only instance primary and destination node
#
# Sad but true, for now we have do lock all nodes, as we don't know where
"EXPORT_NODE": self.op.target_node,
"EXPORT_DO_SHUTDOWN": self.op.shutdown,
"SHUTDOWN_TIMEOUT": self.shutdown_timeout,
+ # TODO: Generic function for boolean env variables
+ "REMOVE_INSTANCE": str(bool(self.remove_instance)),
}
env.update(_BuildInstanceHookEnvByObject(self, self.instance))
nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
_CheckNodeNotDrained(self, self.dst_node.name)
# instance disk type verification
+ # TODO: Implement export support for file-based disks
for disk in self.instance.disks:
if disk.dev_type == constants.LD_FILE:
raise errors.OpPrereqError("Export not supported for instances with"
feedback_fn("Shutting down instance %s" % instance.name)
result = self.rpc.call_instance_shutdown(src_node, instance,
self.shutdown_timeout)
+ # TODO: Maybe ignore failures if ignore_remove_failures is set
result.Raise("Could not shutdown instance %s on"
" node %s" % (instance.name, src_node))
snap_disks.append(new_dev)
finally:
- if self.op.shutdown and instance.admin_up:
+ if self.op.shutdown and instance.admin_up and not self.remove_instance:
feedback_fn("Starting instance %s" % instance.name)
result = self.rpc.call_instance_start(src_node, instance, None, None)
msg = result.fail_msg
feedback_fn("Deactivating disks for %s" % instance.name)
_ShutdownInstanceDisks(self, instance)
+ # Remove instance if requested
+ if self.remove_instance:
+ feedback_fn("Removing instance %s" % instance.name)
+ _RemoveInstance(self, feedback_fn, instance, self.ignore_remove_failures)
+
nodelist = self.cfg.GetNodeList()
nodelist.remove(dst_node.name)
if msg:
self.LogWarning("Could not remove older export for instance %s"
" on node %s: %s", iname, node, msg)
+
return fin_resu, dresults
# pylint: disable-msg=R0902
# lots of instance attributes
_ALLO_KEYS = [
- "mem_size", "disks", "disk_template",
+ "name", "mem_size", "disks", "disk_template",
"os", "tags", "nics", "vcpus", "hypervisor",
]
_RELO_KEYS = [
- "relocate_from",
+ "name", "relocate_from",
+ ]
+ _EVAC_KEYS = [
+ "evac_nodes",
]
- def __init__(self, cfg, rpc, mode, name, **kwargs):
+ def __init__(self, cfg, rpc, mode, **kwargs):
self.cfg = cfg
self.rpc = rpc
# init buffer variables
self.in_text = self.out_text = self.in_data = self.out_data = None
# init all input fields so that pylint is happy
self.mode = mode
- self.name = name
self.mem_size = self.disks = self.disk_template = None
self.os = self.tags = self.nics = self.vcpus = None
self.hypervisor = None
self.relocate_from = None
+ self.name = None
+ self.evac_nodes = None
# computed fields
self.required_nodes = None
# init result fields
- self.success = self.info = self.nodes = None
+ self.success = self.info = self.result = None
if self.mode == constants.IALLOCATOR_MODE_ALLOC:
keyset = self._ALLO_KEYS
+ fn = self._AddNewInstance
elif self.mode == constants.IALLOCATOR_MODE_RELOC:
keyset = self._RELO_KEYS
+ fn = self._AddRelocateInstance
+ elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
+ keyset = self._EVAC_KEYS
+ fn = self._AddEvacuateNodes
else:
raise errors.ProgrammerError("Unknown mode '%s' passed to the"
" IAllocator" % self.mode)
raise errors.ProgrammerError("Invalid input parameter '%s' to"
" IAllocator" % key)
setattr(self, key, kwargs[key])
+
for key in keyset:
if key not in kwargs:
raise errors.ProgrammerError("Missing input parameter '%s' to"
" IAllocator" % key)
- self._BuildInputData()
+ self._BuildInputData(fn)
def _ComputeClusterData(self):
"""Compute the generic allocator input data.
hypervisor_name = self.hypervisor
elif self.mode == constants.IALLOCATOR_MODE_RELOC:
hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
+ elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
+ hypervisor_name = cluster_info.enabled_hypervisors[0]
node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
hypervisor_name)
done.
"""
- data = self.in_data
-
disk_space = _ComputeDiskSize(self.disk_template, self.disks)
if self.disk_template in constants.DTS_NET_MIRROR:
else:
self.required_nodes = 1
request = {
- "type": "allocate",
"name": self.name,
"disk_template": self.disk_template,
"tags": self.tags,
"nics": self.nics,
"required_nodes": self.required_nodes,
}
- data["request"] = request
+ return request
def _AddRelocateInstance(self):
"""Add relocate instance data to allocator structure.
disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
request = {
- "type": "relocate",
"name": self.name,
"disk_space_total": disk_space,
"required_nodes": self.required_nodes,
"relocate_from": self.relocate_from,
}
- self.in_data["request"] = request
+ return request
- def _BuildInputData(self):
+ def _AddEvacuateNodes(self):
+ """Add evacuate nodes data to allocator structure.
+
+ """
+ request = {
+ "evac_nodes": self.evac_nodes
+ }
+ return request
+
+ def _BuildInputData(self, fn):
"""Build input data structures.
"""
self._ComputeClusterData()
- if self.mode == constants.IALLOCATOR_MODE_ALLOC:
- self._AddNewInstance()
- else:
- self._AddRelocateInstance()
+ request = fn()
+ request["type"] = self.mode
+ self.in_data["request"] = request
self.in_text = serializer.Dump(self.in_data)
if not isinstance(rdict, dict):
raise errors.OpExecError("Can't parse iallocator results: not a dict")
- for key in "success", "info", "nodes":
+ # TODO: remove backwards compatibility in later versions
+ if "nodes" in rdict and "result" not in rdict:
+ rdict["result"] = rdict["nodes"]
+ del rdict["nodes"]
+
+ for key in "success", "info", "result":
if key not in rdict:
raise errors.OpExecError("Can't parse iallocator results:"
" missing key '%s'" % key)
setattr(self, key, rdict[key])
- if not isinstance(rdict["nodes"], list):
- raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
+ if not isinstance(rdict["result"], list):
+ raise errors.OpExecError("Can't parse iallocator results: 'result' key"
" is not a list")
self.out_data = rdict
fname = _ExpandInstanceName(self.cfg, self.op.name)
self.op.name = fname
self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
+ elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC:
+ if not hasattr(self.op, "evac_nodes"):
+ raise errors.OpPrereqError("Missing attribute 'evac_nodes' on"
+ " opcode input", errors.ECODE_INVAL)
else:
raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
self.op.mode, errors.ECODE_INVAL)
vcpus=self.op.vcpus,
hypervisor=self.op.hypervisor,
)
- else:
+ elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
ial = IAllocator(self.cfg, self.rpc,
mode=self.op.mode,
name=self.op.name,
relocate_from=list(self.relocate_from),
)
+ elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC:
+ ial = IAllocator(self.cfg, self.rpc,
+ mode=self.op.mode,
+ evac_nodes=self.op.evac_nodes)
+ else:
+ raise errors.ProgrammerError("Uncatched mode %s in"
+ " LUTestAllocator.Exec", self.op.mode)
if self.op.direction == constants.IALLOCATOR_DIR_IN:
result = ial.in_text
DAEMON_UTIL = _autoconf.PKGLIBDIR + "/daemon-util"
ETC_HOSTS = "/etc/hosts"
DEFAULT_FILE_STORAGE_DIR = _autoconf.FILE_STORAGE_DIR
+ ENABLE_FILE_STORAGE = _autoconf.ENABLE_FILE_STORAGE
SYSCONFDIR = _autoconf.SYSCONFDIR
TOOLSDIR = _autoconf.TOOLSDIR
CONF_DIR = SYSCONFDIR + "/ganeti"
+ ALL_CERT_FILES = frozenset([SSL_CERT_FILE, RAPI_CERT_FILE])
+
MASTER_SOCKET = SOCKET_DIR + "/ganeti-master"
NODED = "ganeti-noded"
CONFD = "ganeti-confd"
RAPI = "ganeti-rapi"
MASTERD = "ganeti-masterd"
+ # used in the ganeti-nbma project
+ NLD = "ganeti-nld"
-MULTITHREADED_DAEMONS = frozenset([MASTERD])
-
-DAEMONS_SSL = {
- # daemon-name: (default-cert-path, default-key-path)
- NODED: (SSL_CERT_FILE, SSL_CERT_FILE),
- RAPI: (RAPI_CERT_FILE, RAPI_CERT_FILE),
-}
-
DAEMONS_PORTS = {
# daemon-name: ("proto", "default-port")
NODED: ("tcp", 1811),
CONFD: ("udp", 1814),
RAPI: ("tcp", 5080),
- }
-
+ # used in the ganeti-nbma project
+ NLD: ("udp", 1816),
+ }
DEFAULT_NODED_PORT = DAEMONS_PORTS[NODED][1]
DEFAULT_CONFD_PORT = DAEMONS_PORTS[CONFD][1]
DEFAULT_RAPI_PORT = DAEMONS_PORTS[RAPI][1]
+ # used in the ganeti-nbma project
+ DEFAULT_NLD_PORT = DAEMONS_PORTS[NLD][1]
FIRST_DRBD_PORT = 11000
LAST_DRBD_PORT = 14999
CONFD: LOG_DIR + "conf-daemon.log",
RAPI: LOG_DIR + "rapi-daemon.log",
MASTERD: LOG_DIR + "master-daemon.log",
+ # used in the ganeti-nbma project
+ NLD: LOG_DIR + "nl-daemon.log",
}
+
LOG_OS_DIR = LOG_DIR + "os"
LOG_WATCHER = LOG_DIR + "watcher.log"
LOG_COMMANDS = LOG_DIR + "commands.log"
SOCAT_USE_ESCAPE = _autoconf.SOCAT_USE_ESCAPE
SOCAT_ESCAPE_CODE = "0x1d"
+# For RSA keys more bits are better, but they also make operations more
+# expensive. NIST SP 800-131 recommends a minimum of 2048 bits from the year
+# 2010 on.
+RSA_KEY_BITS = 2048
+
+# Digest used to sign certificates ("openssl x509" uses SHA1 by default)
+X509_CERT_SIGN_DIGEST = "SHA1"
+
VALUE_DEFAULT = "default"
VALUE_AUTO = "auto"
VALUE_GENERATE = "generate"
HOOKS_PHASE_PRE = "pre"
HOOKS_PHASE_POST = "post"
HOOKS_NAME_CFGUPDATE = "config-update"
+ HOOKS_NAME_WATCHER = "watcher"
HOOKS_VERSION = 2
# hooks subject type (what object type does the LU deal with)
DEFAULT_SHUTDOWN_TIMEOUT = 120
NODE_MAX_CLOCK_SKEW = 150
+ # runparts results
+ (RUNPARTS_SKIP,
+ RUNPARTS_RUN,
+ RUNPARTS_ERR) = range(3)
+
+ RUNPARTS_STATUS = frozenset([RUNPARTS_SKIP, RUNPARTS_RUN, RUNPARTS_ERR])
+
# RPC constants
(RPC_ENCODING_NONE,
RPC_ENCODING_ZLIB_BASE64) = range(2)
OS_VARIANTS_FILE = 'variants.list'
# ssh constants
- SSH_CONFIG_DIR = "/etc/ssh/"
- SSH_HOST_DSA_PRIV = SSH_CONFIG_DIR + "ssh_host_dsa_key"
+ SSH_CONFIG_DIR = _autoconf.SSH_CONFIG_DIR
+ SSH_HOST_DSA_PRIV = SSH_CONFIG_DIR + "/ssh_host_dsa_key"
SSH_HOST_DSA_PUB = SSH_HOST_DSA_PRIV + ".pub"
- SSH_HOST_RSA_PRIV = SSH_CONFIG_DIR + "ssh_host_rsa_key"
+ SSH_HOST_RSA_PRIV = SSH_CONFIG_DIR + "/ssh_host_rsa_key"
SSH_HOST_RSA_PUB = SSH_HOST_RSA_PRIV + ".pub"
SSH = "ssh"
SCP = "scp"
HV_MIGRATION_PORT = "migration_port"
HV_USE_LOCALTIME = "use_localtime"
HV_DISK_CACHE = "disk_cache"
+ HV_SECURITY_MODEL = "security_model"
+ HV_SECURITY_DOMAIN = "security_domain"
HVS_PARAMETER_TYPES = {
HV_BOOT_ORDER: VTYPE_STRING,
HV_MIGRATION_PORT: VTYPE_INT,
HV_USE_LOCALTIME: VTYPE_BOOL,
HV_DISK_CACHE: VTYPE_STRING,
+ HV_SECURITY_MODEL: VTYPE_STRING,
+ HV_SECURITY_DOMAIN: VTYPE_STRING,
}
HVS_PARAMETERS = frozenset(HVS_PARAMETER_TYPES.keys())
HT_KVM_VALID_BO_TYPES = frozenset([HT_BO_CDROM, HT_BO_DISK, HT_BO_NETWORK])
+ # Security models
+ HT_SM_NONE = "none"
+ HT_SM_USER = "user"
+ HT_SM_POOL = "pool"
+
+ HT_KVM_VALID_SM_TYPES = frozenset([HT_SM_NONE, HT_SM_USER, HT_SM_POOL])
+
# Cluster Verify steps
VERIFY_NPLUSONE_MEM = 'nplusone_mem'
VERIFY_OPTIONAL_CHECKS = frozenset([VERIFY_NPLUSONE_MEM])
NV_NODESETUP = "nodesetup"
NV_TIME = "time"
+ # SSL certificate check constants (in days)
+ SSL_CERT_EXPIRATION_WARN = 30
+ SSL_CERT_EXPIRATION_ERROR = 7
+
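These thresholds are plain day counts. A verification step comparing them against a certificate's remaining lifetime might look like the following sketch; the helper and its "not_after" input (an expiry timestamp in seconds since the epoch) are hypothetical, not part of the patch:

import time

def _CertExpiryStatus(not_after):
  # "not_after" is a hypothetical expiry timestamp (seconds since epoch)
  days_left = (not_after - time.time()) / 86400.0
  if days_left < SSL_CERT_EXPIRATION_ERROR:
    return "error"
  elif days_left < SSL_CERT_EXPIRATION_WARN:
    return "warning"
  return "ok"
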
# Allocator framework constants
IALLOCATOR_VERSION = 2
IALLOCATOR_DIR_IN = "in"
IALLOCATOR_DIR_OUT = "out"
IALLOCATOR_MODE_ALLOC = "allocate"
IALLOCATOR_MODE_RELOC = "relocate"
+ IALLOCATOR_MODE_MEVAC = "multi-evacuate"
IALLOCATOR_SEARCH_PATH = _autoconf.IALLOCATOR_SEARCH_PATH
# Job queue
HV_MIGRATION_PORT: 8102,
HV_USE_LOCALTIME: False,
HV_DISK_CACHE: HT_CACHE_DEFAULT,
+ HV_SECURITY_MODEL: HT_SM_NONE,
+ HV_SECURITY_DOMAIN: '',
},
HT_FAKE: {
},
from cStringIO import StringIO
from ganeti import constants
-from ganeti import serializer
from ganeti import utils
HTTP_AUTHENTICATION_INFO = "Authentication-Info"
HTTP_ALLOW = "Allow"
+HTTP_APP_OCTET_STREAM = "application/octet-stream"
+HTTP_APP_JSON = "application/json"
+
_SSL_UNEXPECTED_EOF = "Unexpected EOF"
# Socket operations
code = 405
+class HttpNotAcceptable(HttpException):
+ """406 Not Acceptable
+
+ RFC2616, 10.4.7: The resource identified by the request is only capable of
+ generating response entities which have content characteristics not
+ acceptable according to the accept headers sent in the request.
+
+ """
+ code = 406
+
+
class HttpRequestTimeout(HttpException):
"""408 Request Timeout
code = 412
+class HttpUnsupportedMediaType(HttpException):
+ """415 Unsupported Media Type
+
+ RFC2616, 10.4.16: The server is refusing to service the request because the
+ entity of the request is in a format not supported by the requested resource
+ for the requested method.
+
+ """
+ code = 415
+
+
class HttpInternalServerError(HttpException):
"""500 Internal Server Error
code = 505
-class HttpJsonConverter: # pylint: disable-msg=W0232
- CONTENT_TYPE = "application/json"
-
- @staticmethod
- def Encode(data):
- return serializer.DumpJson(data)
-
- @staticmethod
- def Decode(data):
- return serializer.LoadJson(data)
-
-
def WaitForSocketCondition(sock, event, timeout):
"""Waits for a condition to occur on the socket.
if event is None:
raise HttpSocketTimeout()
- if (op == SOCKOP_RECV and
- event & (select.POLLNVAL | select.POLLHUP | select.POLLERR)):
- return ""
+ if event & (select.POLLNVAL | select.POLLHUP | select.POLLERR):
+ # Let the socket functions handle these
+ break
if not event & wait_for_event:
continue
self.start_line = None
self.headers = None
self.body = None
- self.decoded_body = None
class HttpClientToServerStartLine(object):
buf = self._ContinueParsing(buf, eof)
# Must be done only after the buffer has been evaluated
- # TODO: Connection-length < len(data read) and connection closed
+ # TODO: Content-Length < len(data read) and connection closed
if (eof and
self.parser_status in (self.PS_START_LINE,
self.PS_HEADERS)):
assert self.parser_status == self.PS_COMPLETE
assert not buf, "Parser didn't read full response"
+ # Body is complete
msg.body = self.body_buffer.getvalue()
- # TODO: Content-type, error handling
- if msg.body:
- msg.decoded_body = HttpJsonConverter().Decode(msg.body)
- else:
- msg.decoded_body = None
-
- if msg.decoded_body:
- logging.debug("Message body: %s", msg.decoded_body)
-
def _ContinueParsing(self, buf, eof):
"""Main function for HTTP message state machine.
# the CRLF."
if idx == 0:
# TODO: Limit number of CRLFs/empty lines for safety?
- buf = buf[:2]
+ buf = buf[2:]
continue
if idx > 0:
"""Data structure for HTTP request on server side.
"""
- def __init__(self, request_msg):
+ def __init__(self, method, path, headers, body):
# Request attributes
- self.request_method = request_msg.start_line.method
- self.request_path = request_msg.start_line.path
- self.request_headers = request_msg.headers
- self.request_body = request_msg.body
+ self.request_method = method
+ self.request_path = path
+ self.request_headers = headers
+ self.request_body = body
# Response attributes
self.resp_headers = {}
# authentication)
self.private = None
+ def __repr__(self):
+ status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
+ self.request_method, self.request_path,
+ "headers=%r" % str(self.request_headers),
+ "body=%r" % (self.request_body, )]
+
+ return "<%s at %#x>" % (" ".join(status), id(self))
+
class _HttpServerToClientMessageWriter(http.HttpMessageWriter):
"""Writes an HTTP response to client.
try:
try:
request_msg_reader = self._ReadRequest()
+
+ # RFC2616, 14.23: All Internet-based HTTP/1.1 servers MUST respond
+ # with a 400 (Bad Request) status code to any HTTP/1.1 request
+ # message which lacks a Host header field.
+ if (self.request_msg.start_line.version == http.HTTP_1_1 and
+ http.HTTP_HOST not in self.request_msg.headers):
+ raise http.HttpBadRequest(message="Missing Host header")
+
self._HandleRequest()
# Only wait for client to close if we didn't have any exception.
"""Calls the handler function for the current request.
"""
- handler_context = _HttpServerRequest(self.request_msg)
+ handler_context = _HttpServerRequest(self.request_msg.start_line.method,
+ self.request_msg.start_line.path,
+ self.request_msg.headers,
- self.request_msg.decoded_body)
++ self.request_msg.body)
+
+ logging.debug("Handling request %r", handler_context)
try:
try:
logging.exception("Unknown exception")
raise http.HttpInternalServerError(message="Unknown error")
- # TODO: Content-type
- encoder = http.HttpJsonConverter()
+ if not isinstance(result, basestring):
+ raise http.HttpError("Handler function didn't return string type")
+
self.response_msg.start_line.code = http.HTTP_OK
- self.response_msg.body = encoder.Encode(result)
self.response_msg.headers = handler_context.resp_headers
- self.response_msg.headers[http.HTTP_CONTENT_TYPE] = encoder.CONTENT_TYPE
+ self.response_msg.body = result
finally:
# No reason to keep this any longer, even for exceptions
handler_context.private = None
"""
return self.error_message_format % values
+
class HttpServer(http.HttpBase, asyncore.dispatcher):
"""Generic HTTP server class
import collections
import time
import errno
+import logging
from ganeti import serializer
from ganeti import constants
from ganeti import errors
+ from ganeti import utils
-KEY_METHOD = 'method'
-KEY_ARGS = 'args'
+KEY_METHOD = "method"
+KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"
DEF_RWTO = 60
-class ProtocolError(Exception):
- """Denotes an error in the server communication"""
+class ProtocolError(errors.GenericError):
+ """Denotes an error in the LUXI protocol"""
class ConnectionClosedError(ProtocolError):
"""Operation timeout error"""
-class EncodingError(ProtocolError):
- """Encoding failure on the sending side"""
-
-
-class DecodingError(ProtocolError):
- """Decoding failure on the receiving side"""
-
-
class RequestError(ProtocolError):
"""Error on request
try:
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- self.socket.settimeout(self._ctimeout)
+
+ # Try to connect
try:
- self.socket.connect(address)
- except socket.timeout, err:
- raise TimeoutError("Connect timed out: %s" % str(err))
- except socket.error, err:
- if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
- raise NoMasterError(address)
- raise
+ utils.Retry(self._Connect, 1.0, self._ctimeout,
+ args=(self.socket, address, self._ctimeout))
+ except utils.RetryTimeout:
+ raise TimeoutError("Connect timed out")
+
self.socket.settimeout(self._rwtimeout)
except (socket.error, NoMasterError):
if self.socket is not None:
self.socket = None
raise
+ @staticmethod
+ def _Connect(sock, address, timeout):
+ sock.settimeout(timeout)
+ try:
+ sock.connect(address)
+ except socket.timeout, err:
+ raise TimeoutError("Connect timed out: %s" % str(err))
+ except socket.error, err:
+ if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
+ raise NoMasterError(address)
+ if err.args[0] == errno.EAGAIN:
+ # Server's socket backlog is full at the moment
+ raise utils.RetryAgain()
+ raise
+
def _CheckSocket(self):
"""Make sure we are connected.
"""
if self.eom in msg:
- raise EncodingError("Message terminator found in payload")
+ raise ProtocolError("Message terminator found in payload")
+
self._CheckSocket()
try:
# TODO: sendall is not guaranteed to send everything
self.socket = None
+def ParseRequest(msg):
+ """Parses a LUXI request message.
+
+ """
+ try:
+ request = serializer.LoadJson(msg)
+ except ValueError, err:
+ raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
+
+ logging.debug("LUXI request: %s", request)
+
+ if not isinstance(request, dict):
+ logging.error("LUXI request not a dict: %r", msg)
+ raise ProtocolError("Invalid LUXI request (not a dict)")
+
+ method = request.get(KEY_METHOD, None)
+ args = request.get(KEY_ARGS, None)
+ if method is None or args is None:
+ logging.error("LUXI request missing method or arguments: %r", msg)
+ raise ProtocolError(("Invalid LUXI request (no method or arguments"
+ " in request): %r") % msg)
+
+ return (method, args)
+
+
+def ParseResponse(msg):
+ """Parses a LUXI response message.
+
+ """
+ # Parse the result
+ try:
+ data = serializer.LoadJson(msg)
+ except Exception, err:
+ raise ProtocolError("Error while deserializing response: %s" % str(err))
+
+ # Validate response
+ if not (isinstance(data, dict) and
+ KEY_SUCCESS in data and
+ KEY_RESULT in data):
+ raise ProtocolError("Invalid response from server: %r" % data)
+
+ return (data[KEY_SUCCESS], data[KEY_RESULT])
+
+
+def FormatResponse(success, result):
+ """Formats a LUXI response message.
+
+ """
+ response = {
+ KEY_SUCCESS: success,
+ KEY_RESULT: result,
+ }
+
+ logging.debug("LUXI response: %s", response)
+
+ return serializer.DumpJson(response)
+
+
+def FormatRequest(method, args):
+ """Formats a LUXI request message.
+
+ """
+ # Build request
+ request = {
+ KEY_METHOD: method,
+ KEY_ARGS: args,
+ }
+
+ # Serialize the request
+ return serializer.DumpJson(request, indent=False)
+
+
+def CallLuxiMethod(transport_cb, method, args):
+ """Send a LUXI request via a transport and return the response.
+
+ """
+ assert callable(transport_cb)
+
+ request_msg = FormatRequest(method, args)
+
+ # Send request and wait for response
+ response_msg = transport_cb(request_msg)
+
+ (success, result) = ParseResponse(response_msg)
+
+ if success:
+ return result
+
+ errors.MaybeRaise(result)
+ raise RequestError(result)
+
+
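The new helpers compose as follows; a minimal sketch with a hypothetical in-process transport that parses the request and echoes back a successful response:

def _EchoTransport(request_msg):
  # Hypothetical transport callback: parse the request, answer with success
  (method, args) = ParseRequest(request_msg)
  return FormatResponse(True, [method, args])

result = CallLuxiMethod(_EchoTransport, "QueryClusterInfo", [])
# result == ["QueryClusterInfo", []]
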
class Client(object):
"""High-level client implementation.
except Exception: # pylint: disable-msg=W0703
pass
- def CallMethod(self, method, args):
- """Send a generic request and return the response.
-
- """
- # Build request
- request = {
- KEY_METHOD: method,
- KEY_ARGS: args,
- }
-
- # Serialize the request
- send_data = serializer.DumpJson(request, indent=False)
-
+ def _SendMethodCall(self, data):
# Send request and wait for response
try:
self._InitTransport()
- result = self.transport.Call(send_data)
+ return self.transport.Call(data)
except Exception:
self._CloseTransport()
raise
- # Parse the result
- try:
- data = serializer.LoadJson(result)
- except Exception, err:
- raise ProtocolError("Error while deserializing response: %s" % str(err))
-
- # Validate response
- if (not isinstance(data, dict) or
- KEY_SUCCESS not in data or
- KEY_RESULT not in data):
- raise DecodingError("Invalid response from server: %s" % str(data))
-
- result = data[KEY_RESULT]
-
- if not data[KEY_SUCCESS]:
- errors.MaybeRaise(result)
- raise RequestError(result)
+ def CallMethod(self, method, args):
+ """Send a generic request and return the response.
- return result
+ """
+ return CallLuxiMethod(self._SendMethodCall, method, args)
def SetQueueDrainFlag(self, drain_flag):
return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
timeout = (DEF_RWTO - 1) / 2
return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
- def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
+ def WaitForJobChangeOnce(self, job_id, fields,
+ prev_job_info, prev_log_serial):
timeout = (DEF_RWTO - 1) / 2
+ return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
+ (job_id, fields, prev_job_info,
+ prev_log_serial, timeout))
+
+ def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
while True:
- result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
- (job_id, fields, prev_job_info,
- prev_log_serial, timeout))
+ result = self.WaitForJobChangeOnce(job_id, fields,
+ prev_job_info, prev_log_serial)
if result != constants.JOB_NOTCHANGED:
break
return result
def QueryTags(self, kind, name):
return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
-
-
- # TODO: class Server(object)
_TIMESTAMPS = ["ctime", "mtime"]
_UUID = ["uuid"]
+
def FillDict(defaults_dict, custom_dict, skip_keys=None):
"""Basic function to apply settings on top a default dict.
irrespective of their status. For such devices, we return this
path, for others we return None.
+ @warning: The path returned is not a normalized pathname; callers
+ should check that it is a valid path.
+
"""
if self.dev_type == constants.LD_LV:
return "/dev/%s/%s" % (self.logical_id[0], self.logical_id[1])
"file_storage_dir",
"enabled_hypervisors",
"hvparams",
+ "os_hvp",
"beparams",
"nicparams",
"candidate_pool_size",
self.hvparams[hypervisor] = FillDict(
constants.HVC_DEFAULTS[hypervisor], self.hvparams[hypervisor])
+ # TODO: Figure out if it's better to put this into OS than Cluster
+ if self.os_hvp is None:
+ self.os_hvp = {}
+
self.beparams = UpgradeGroupedParams(self.beparams,
constants.BEC_DEFAULTS)
migrate_default_bridge = not self.nicparams
skip_keys = constants.HVC_GLOBALS
else:
skip_keys = []
- return FillDict(self.hvparams.get(instance.hypervisor, {}),
- instance.hvparams, skip_keys=skip_keys)
+
+ # We fill the list from least to most important override
+ fill_stack = [
+ self.hvparams.get(instance.hypervisor, {}),
+ self.os_hvp.get(instance.os, {}).get(instance.hypervisor, {}),
+ instance.hvparams,
+ ]
+
+ ret_dict = {}
+ for o_dict in fill_stack:
+ ret_dict = FillDict(ret_dict, o_dict, skip_keys=skip_keys)
+
+ return ret_dict
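To make the precedence concrete, here is a small illustration of the fill order (the parameter values are invented for the example):

# Hypothetical parameter sets, least to most important:
cluster_hv = {"kernel_path": "/boot/vmlinuz", "acpi": True}
os_hv = {"kernel_path": "/boot/vmlinuz-os"}   # per-OS override
instance_hv = {"acpi": False}                 # per-instance override

ret = {}
for d in [cluster_hv, os_hv, instance_hv]:
  ret = FillDict(ret, d)
# ret == {"kernel_path": "/boot/vmlinuz-os", "acpi": False}
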
def FillBE(self, instance):
"""Fill an instance's beparams dict.
type(state))
for name in self._all_slots():
- if name not in state:
+ if name not in state and hasattr(self, name):
delattr(self, name)
for name in state:
"vg_name",
"enabled_hypervisors",
"hvparams",
+ "os_hvp",
"beparams",
"nicparams",
"candidate_pool_size",
"master_candidate",
"offline",
"drained",
+ "auto_promote",
]
]
+ class OpNodeEvacuationStrategy(OpCode):
+ """Compute the evacuation strategy for a list of nodes."""
+ OP_ID = "OP_NODE_EVAC_STRATEGY"
+ OP_DSC_FIELD = "nodes"
+ __slots__ = ["nodes", "iallocator", "remote_node"]
+
+
# instance opcodes
class OpCreateInstance(OpCode):
OP_DSC_FIELD = "instance_name"
__slots__ = [
"instance_name", "target_node", "shutdown", "shutdown_timeout",
+ "remove_instance",
+ "ignore_remove_failures",
]
"direction", "mode", "allocator", "name",
"mem_size", "disks", "disk_template",
"os", "tags", "nics", "vcpus", "hypervisor",
+ "evac_nodes",
]
import os
+import sys
import time
import subprocess
import re
import logging
import logging.handlers
import signal
+import OpenSSL
+ import datetime
+ import calendar
from cStringIO import StringIO
output = property(_GetOutput, None, None, "Return full output")
- def _BuildCmdEnvironment(env):
-def RunCmd(cmd, env=None, output=None, cwd='/', reset_env=False):
++def _BuildCmdEnvironment(env, reset):
+ """Builds the environment for an external program.
+
+ """
- cmd_env = os.environ.copy()
- cmd_env["LC_ALL"] = "C"
++ if reset:
++ cmd_env = {}
++ else:
++ cmd_env = os.environ.copy()
++ cmd_env["LC_ALL"] = "C"
++
+ if env is not None:
+ cmd_env.update(env)
++
+ return cmd_env
+
+
- def RunCmd(cmd, env=None, output=None, cwd="/"):
++def RunCmd(cmd, env=None, output=None, cwd="/", reset_env=False):
"""Execute a (shell) command.
The command should not read from its standard input, as it will be
@type cmd: string or list
@param cmd: Command to run
@type env: dict
- @param env: Additional environment
+ @param env: Additional environment variables
@type output: str
@param output: if desired, the output of the command can be
saved in a file instead of the RunResult instance; this
@type cwd: string
@param cwd: if specified, will be used as the working
directory for the command; the default will be /
+ @type reset_env: boolean
+ @param reset_env: whether to reset or keep the default os environment
@rtype: L{RunResult}
@return: RunResult instance
@raise errors.ProgrammerError: if we call this when forks are disabled
if no_fork:
raise errors.ProgrammerError("utils.RunCmd() called with fork() disabled")
- if isinstance(cmd, list):
- cmd = [str(val) for val in cmd]
- strcmd = " ".join(cmd)
- shell = False
- else:
+ if isinstance(cmd, basestring):
strcmd = cmd
shell = True
- logging.debug("RunCmd '%s'", strcmd)
+ else:
+ cmd = [str(val) for val in cmd]
+ strcmd = ShellQuoteArgs(cmd)
+ shell = False
- if not reset_env:
- cmd_env = os.environ.copy()
- cmd_env["LC_ALL"] = "C"
+ if output:
+ logging.debug("RunCmd %s, output file '%s'", strcmd, output)
else:
- cmd_env = {}
+ logging.debug("RunCmd %s", strcmd)
- cmd_env = _BuildCmdEnvironment(env)
- if env is not None:
- cmd_env.update(env)
++ cmd_env = _BuildCmdEnvironment(env, reset_env)
try:
if output is None:
return RunResult(exitcode, signal_, out, err, strcmd)
+def StartDaemon(cmd, env=None, cwd="/", output=None, output_fd=None,
+ pidfile=None):
+ """Start a daemon process after forking twice.
+
+ @type cmd: string or list
+ @param cmd: Command to run
+ @type env: dict
+ @param env: Additional environment variables
+ @type cwd: string
+ @param cwd: Working directory for the program
+ @type output: string
+ @param output: Path to file in which to save the output
+ @type output_fd: int
+ @param output_fd: File descriptor for output
+ @type pidfile: string
+ @param pidfile: Process ID file
+ @rtype: int
+ @return: Daemon process ID
+ @raise errors.ProgrammerError: if we call this when forks are disabled
+
+ """
+ if no_fork:
+ raise errors.ProgrammerError("utils.StartDaemon() called with fork()"
+ " disabled")
+
+ if output and not (bool(output) ^ (output_fd is not None)):
+ raise errors.ProgrammerError("Only one of 'output' and 'output_fd' can be"
+ " specified")
+
+ if isinstance(cmd, basestring):
+ cmd = ["/bin/sh", "-c", cmd]
+
+ strcmd = ShellQuoteArgs(cmd)
+
+ if output:
+ logging.debug("StartDaemon %s, output file '%s'", strcmd, output)
+ else:
+ logging.debug("StartDaemon %s", strcmd)
+
- cmd_env = _BuildCmdEnvironment(env)
++ cmd_env = _BuildCmdEnvironment(env, False)
+
+ # Create pipe for sending PID back
+ (pidpipe_read, pidpipe_write) = os.pipe()
+ try:
+ try:
+ # Create pipe for sending error messages
+ (errpipe_read, errpipe_write) = os.pipe()
+ try:
+ try:
+ # First fork
+ pid = os.fork()
+ if pid == 0:
+ try:
+ # Child process, won't return
+ _StartDaemonChild(errpipe_read, errpipe_write,
+ pidpipe_read, pidpipe_write,
+ cmd, cmd_env, cwd,
+ output, output_fd, pidfile)
+ finally:
+ # Well, maybe child process failed
+ os._exit(1) # pylint: disable-msg=W0212
+ finally:
+ _CloseFDNoErr(errpipe_write)
+
+ # Wait for daemon to be started (or an error message to arrive) and read
+ # up to 100 KB as an error message
+ errormsg = RetryOnSignal(os.read, errpipe_read, 100 * 1024)
+ finally:
+ _CloseFDNoErr(errpipe_read)
+ finally:
+ _CloseFDNoErr(pidpipe_write)
+
+ # Read up to 128 bytes for PID
+ pidtext = RetryOnSignal(os.read, pidpipe_read, 128)
+ finally:
+ _CloseFDNoErr(pidpipe_read)
+
+ # Try to avoid zombies by waiting for child process
+ try:
+ os.waitpid(pid, 0)
+ except OSError:
+ pass
+
+ if errormsg:
+ raise errors.OpExecError("Error when starting daemon process: %r" %
+ errormsg)
+
+ try:
+ return int(pidtext)
+ except (ValueError, TypeError), err:
+ raise errors.OpExecError("Error while trying to parse PID %r: %s" %
+ (pidtext, err))
+
+
+def _StartDaemonChild(errpipe_read, errpipe_write,
+ pidpipe_read, pidpipe_write,
+ args, env, cwd,
+ output, fd_output, pidfile):
+ """Child process for starting daemon.
+
+ """
+ try:
+ # Close parent's side
+ _CloseFDNoErr(errpipe_read)
+ _CloseFDNoErr(pidpipe_read)
+
+ # First child process
+ os.chdir("/")
+ os.umask(077)
+ os.setsid()
+
+ # And fork for the second time
+ pid = os.fork()
+ if pid != 0:
+ # Exit first child process
+ os._exit(0) # pylint: disable-msg=W0212
+
+ # Make sure pipe is closed on execv* (and thereby notifies original process)
+ SetCloseOnExecFlag(errpipe_write, True)
+
+ # List of file descriptors to be left open
+ noclose_fds = [errpipe_write]
+
+ # Open PID file
+ if pidfile:
+ try:
+ # TODO: Atomic replace with another locked file instead of writing into
+ # it after creating
+ fd_pidfile = os.open(pidfile, os.O_WRONLY | os.O_CREAT, 0600)
+
+ # Lock the PID file (and fail if not possible to do so). Any code
+ # wanting to send a signal to the daemon should try to lock the PID
+ # file before reading it. If acquiring the lock succeeds, the daemon is
+ # no longer running and the signal should not be sent.
+ LockFile(fd_pidfile)
+
+ os.write(fd_pidfile, "%d\n" % os.getpid())
+ except Exception, err:
+ raise Exception("Creating and locking PID file failed: %s" % err)
+
+ # Keeping the file open to hold the lock
+ noclose_fds.append(fd_pidfile)
+
+ SetCloseOnExecFlag(fd_pidfile, False)
+ else:
+ fd_pidfile = None
+
+ # Open /dev/null
+ fd_devnull = os.open(os.devnull, os.O_RDWR)
+
+ assert not output or (bool(output) ^ (fd_output is not None))
+
+ if fd_output is not None:
+ pass
+ elif output:
+ # Open output file
+ try:
+ # TODO: Implement flag to set append=yes/no
+ fd_output = os.open(output, os.O_WRONLY | os.O_CREAT, 0600)
+ except EnvironmentError, err:
+ raise Exception("Opening output file failed: %s" % err)
+ else:
+ fd_output = fd_devnull
+
+ # Redirect standard I/O
+ os.dup2(fd_devnull, 0)
+ os.dup2(fd_output, 1)
+ os.dup2(fd_output, 2)
+
+ # Send daemon PID to parent
+ RetryOnSignal(os.write, pidpipe_write, str(os.getpid()))
+
+ # Close all file descriptors except stdio and error message pipe
+ CloseFDs(noclose_fds=noclose_fds)
+
+ # Change working directory
+ os.chdir(cwd)
+
+ if env is None:
+ os.execvp(args[0], args)
+ else:
+ os.execvpe(args[0], args, env)
+ except: # pylint: disable-msg=W0702
+ try:
+ # Report errors to original process
+ buf = str(sys.exc_info()[1])
+
+ RetryOnSignal(os.write, errpipe_write, buf)
+ except: # pylint: disable-msg=W0702
+ # Ignore errors in error handling
+ pass
+
+ os._exit(1) # pylint: disable-msg=W0212
+
+
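A hedged usage sketch for StartDaemon (the output and PID file paths are hypothetical):

# String commands are run through /bin/sh, so shell constructs work
pid = StartDaemon("while :; do sleep 5; done",
                  output="/tmp/test-daemon.log",
                  pidfile="/tmp/test-daemon.pid")
# The returned PID is that of the grandchild, which has been reparented to
# init; the daemon keeps the PID file open and locked for its lifetime.
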
def _RunCmdPipe(cmd, env, via_shell, cwd):
"""Run a command and return its output.
child.stderr.fileno(): (err, child.stderr),
}
for fd in fdmap:
- status = fcntl.fcntl(fd, fcntl.F_GETFL)
- fcntl.fcntl(fd, fcntl.F_SETFL, status | os.O_NONBLOCK)
+ SetNonblockFlag(fd, True)
while fdmap:
- try:
- pollresult = poller.poll()
- except EnvironmentError, eerr:
- if eerr.errno == errno.EINTR:
- continue
- raise
- except select.error, serr:
- if serr[0] == errno.EINTR:
- continue
- raise
+ pollresult = RetryOnSignal(poller.poll)
for fd, event in pollresult:
if event & select.POLLIN or event & select.POLLPRI:
return status
+def SetCloseOnExecFlag(fd, enable):
+ """Sets or unsets the close-on-exec flag on a file descriptor.
+
+ @type fd: int
+ @param fd: File descriptor
+ @type enable: bool
+ @param enable: Whether to set or unset it.
+
+ """
+ flags = fcntl.fcntl(fd, fcntl.F_GETFD)
+
+ if enable:
+ flags |= fcntl.FD_CLOEXEC
+ else:
+ flags &= ~fcntl.FD_CLOEXEC
+
+ fcntl.fcntl(fd, fcntl.F_SETFD, flags)
+
+
+def SetNonblockFlag(fd, enable):
+ """Sets or unsets the O_NONBLOCK flag on on a file descriptor.
+
+ @type fd: int
+ @param fd: File descriptor
+ @type enable: bool
+ @param enable: Whether to set or unset it
+
+ """
+ flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+
+ if enable:
+ flags |= os.O_NONBLOCK
+ else:
+ flags &= ~os.O_NONBLOCK
+
+ fcntl.fcntl(fd, fcntl.F_SETFL, flags)
+
+
+def RetryOnSignal(fn, *args, **kwargs):
+ """Calls a function again if it failed due to EINTR.
+
+ """
+ while True:
+ try:
+ return fn(*args, **kwargs)
+ except EnvironmentError, err:
+ if err.errno != errno.EINTR:
+ raise
+ except select.error, err:
+ if not (err.args and err.args[0] == errno.EINTR):
+ raise
+
+
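Typical use is wrapping an interruptible syscall so that EINTR is retried transparently; a one-line sketch, where "fd" stands in for some open descriptor:

data = RetryOnSignal(os.read, fd, 4096)  # "fd" is a hypothetical descriptor
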
+ def RunParts(dir_name, env=None, reset_env=False):
+ """Run Scripts or programs in a directory
+
+ @type dir_name: string
+ @param dir_name: absolute path to a directory
+ @type env: dict
+ @param env: The environment to use
+ @type reset_env: boolean
+ @param reset_env: whether to reset or keep the default os environment
+ @rtype: list of tuples
+ @return: list of (name, (one of RUNPARTS_STATUS), RunResult)
+
+ """
+ rr = []
+
+ try:
+ dir_contents = ListVisibleFiles(dir_name)
+ except OSError, err:
+ logging.warning("RunParts: skipping %s (cannot list: %s)", dir_name, err)
+ return rr
+
+ for relname in sorted(dir_contents):
+ fname = PathJoin(dir_name, relname)
+ if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
+ constants.EXT_PLUGIN_MASK.match(relname) is not None):
+ rr.append((relname, constants.RUNPARTS_SKIP, None))
+ else:
+ try:
+ result = RunCmd([fname], env=env, reset_env=reset_env)
+ except Exception, err: # pylint: disable-msg=W0703
+ rr.append((relname, constants.RUNPARTS_ERR, str(err)))
+ else:
+ rr.append((relname, constants.RUNPARTS_RUN, result))
+
+ return rr
+
+
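Consuming the result list might look like this sketch; the scripts directory is hypothetical:

for (relname, status, runresult) in RunParts("/etc/ganeti/hooks"):
  if status == constants.RUNPARTS_ERR:
    # runresult holds the error string in this case
    logging.error("%s could not be started: %s", relname, runresult)
  elif status == constants.RUNPARTS_RUN and runresult.failed:
    logging.warning("%s exited with code %s", relname, runresult.exit_code)
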
def RemoveFile(filename):
"""Remove a file ignoring some errors.
"""Class implementing resolver and hostname functionality
"""
+ _VALID_NAME_RE = re.compile("^[a-z0-9._-]{1,255}$")
+
def __init__(self, name=None):
"""Initialize the host name object.
return result
+ @classmethod
+ def NormalizeName(cls, hostname):
+ """Validate and normalize the given hostname.
+
+ @attention: the validation is a bit more relaxed than the standards
+ require; most importantly, we allow underscores in names
+ @raise errors.OpPrereqError: when the name is not valid
+
+ """
+ hostname = hostname.lower()
+ if (not cls._VALID_NAME_RE.match(hostname) or
+ # double-dots, meaning empty label
+ ".." in hostname or
+ # empty initial label
+ hostname.startswith(".")):
+ raise errors.OpPrereqError("Invalid hostname '%s'" % hostname,
+ errors.ECODE_INVAL)
+ if hostname.endswith("."):
+ hostname = hostname.rstrip(".")
+ return hostname
+
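The effect of these rules in short (a behavior sketch, matching the unit tests later in this series):

HostInfo.NormalizeName("Node1.Example.COM")   # -> "node1.example.com"
HostInfo.NormalizeName("node1.example.com.")  # -> "node1.example.com"
HostInfo.NormalizeName(".node1")              # raises errors.OpPrereqError
HostInfo.NormalizeName("a..b")                # raises errors.OpPrereqError
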
def GetHostInfo(name=None):
"""Lookup host name and raise an OpPrereqError for failures"""
RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
+ def TimestampForFilename():
+ """Returns the current time formatted for filenames.
+
+ The format doesn't contain colons, as some shells and applications interpret
+ them as separators.
+
+ """
+ return time.strftime("%Y-%m-%d_%H_%M_%S")
+
+
def CreateBackup(file_name):
"""Creates a backup of a file.
raise errors.ProgrammerError("Can't make a backup of a non-file '%s'" %
file_name)
- prefix = '%s.backup-%d.' % (os.path.basename(file_name), int(time.time()))
+ prefix = ("%s.backup-%s." %
+ (os.path.basename(file_name), TimestampForFilename()))
dir_name = os.path.dirname(file_name)
fsrc = open(file_name, 'rb')
(fd, backup_name) = tempfile.mkstemp(prefix=prefix, dir=dir_name)
fdst = os.fdopen(fd, 'wb')
try:
+ logging.debug("Backing up %s at %s", file_name, backup_name)
shutil.copyfileobj(fsrc, fdst)
finally:
fdst.close()
@param path: the directory to enumerate
@rtype: list
@return: the list of all files not starting with a dot
+ @raise ProgrammerError: if L{path} is not an absolute and normalized path
"""
+ if not IsNormAbsPath(path):
+ raise errors.ProgrammerError("Path passed to ListVisibleFiles is not"
+ " absolute/normalized: '%s'" % path)
files = [i for i in os.listdir(path) if not i.startswith(".")]
files.sort()
return files
return False
+ def partition(seq, pred=bool): # pylint: disable-msg=W0622
+ "Partition a list in two, based on the given predicate"
+ return (list(itertools.ifilter(pred, seq)),
+ list(itertools.ifilterfalse(pred, seq)))
+
+
def UniqueSequence(seq):
"""Returns a list with unique elements.
daemon name
"""
- return os.path.join(constants.RUN_GANETI_DIR, "%s.pid" % name)
+ return PathJoin(constants.RUN_GANETI_DIR, "%s.pid" % name)
+
+
+ def EnsureDaemon(name):
+ """Check for and start daemon if not alive.
+
+ """
+ result = RunCmd([constants.DAEMON_UTIL, "check-and-start", name])
+ if result.failed:
+ logging.error("Can't start daemon '%s', failure %s, output: %s",
+ name, result.fail_reason, result.output)
+ return False
+
+ return True
def WritePidFile(name):
return None
for dir_name in search_path:
+ # FIXME: investigate switch to PathJoin
item_name = os.path.sep.join([dir_name, name])
# check the user test and that we're indeed resolving to the given
# basename
return os.path.normpath(path) == path and os.path.isabs(path)
+ def PathJoin(*args):
+ """Safe-join a list of path components.
+
+ Requirements:
+ - the first argument must be an absolute path
+ - no component in the path must have backtracking (e.g. /../),
+ since we check for normalization at the end
+
+ @param args: the path components to be joined
+ @raise ValueError: for invalid paths
+
+ """
+ # ensure we're having at least one path passed in
+ assert args
+ # ensure the first component is an absolute and normalized path name
+ root = args[0]
+ if not IsNormAbsPath(root):
+ raise ValueError("Invalid parameter to PathJoin: '%s'" % str(args[0]))
+ result = os.path.join(*args)
+ # ensure that the whole path is normalized
+ if not IsNormAbsPath(result):
+ raise ValueError("Invalid parameters to PathJoin: '%s'" % str(args))
+ # check that we're still under the original prefix
+ prefix = os.path.commonprefix([root, result])
+ if prefix != root:
+ raise ValueError("Error: path joining resulted in different prefix"
+ " (%s != %s)" % (prefix, root))
+ return result
+
+
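A behavior sketch for PathJoin:

PathJoin("/var/run", "ganeti", "noded.pid")  # -> "/var/run/ganeti/noded.pid"
PathJoin("relative", "path")                 # ValueError (not absolute)
PathJoin("/var/run", "../etc/passwd")        # ValueError (backtracking)
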
def TailFile(fname, lines=20):
"""Return the last lines from a file.
return rows[-lines:]
+ def _ParseAsn1Generalizedtime(value):
+ """Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL.
+
+ @type value: string
+ @param value: ASN1 GENERALIZEDTIME timestamp
+
+ """
+ m = re.match(r"^(\d+)([-+]\d\d)(\d\d)$", value)
+ if m:
+ # We have an offset
+ asn1time = m.group(1)
+ hours = int(m.group(2))
+ minutes = int(m.group(3))
+ utcoffset = (60 * hours) + minutes
+ else:
+ if not value.endswith("Z"):
+ raise ValueError("Missing timezone")
+ asn1time = value[:-1]
+ utcoffset = 0
+
+ parsed = time.strptime(asn1time, "%Y%m%d%H%M%S")
+
+ tt = datetime.datetime(*(parsed[:7])) - datetime.timedelta(minutes=utcoffset)
+
+ return calendar.timegm(tt.utctimetuple())
+
+
+ def GetX509CertValidity(cert):
+ """Returns the validity period of the certificate.
+
+ @type cert: OpenSSL.crypto.X509
+ @param cert: X509 certificate object
+
+ """
+ # The get_notBefore and get_notAfter functions are only supported in
+ # pyOpenSSL 0.7 and above.
+ try:
+ get_notbefore_fn = cert.get_notBefore
+ except AttributeError:
+ not_before = None
+ else:
+ not_before_asn1 = get_notbefore_fn()
+
+ if not_before_asn1 is None:
+ not_before = None
+ else:
+ not_before = _ParseAsn1Generalizedtime(not_before_asn1)
+
+ try:
+ get_notafter_fn = cert.get_notAfter
+ except AttributeError:
+ not_after = None
+ else:
+ not_after_asn1 = get_notafter_fn()
+
+ if not_after_asn1 is None:
+ not_after = None
+ else:
+ not_after = _ParseAsn1Generalizedtime(not_after_asn1)
+
+ return (not_before, not_after)
+
+
def SafeEncode(text):
"""Return a 'safe' version of a source string.
for (curpath, _, files) in os.walk(path):
for filename in files:
- st = os.lstat(os.path.join(curpath, filename))
+ st = os.lstat(PathJoin(curpath, filename))
size += st.st_size
return BytesToMebibyte(size)
return (tsize, fsize)
- def RunInSeparateProcess(fn):
+ def RunInSeparateProcess(fn, *args):
"""Runs a function in a separate process.
Note: Only boolean return values are supported.
@type fn: callable
@param fn: Function to be called
- @rtype: tuple of (int/None, int/None)
- @return: Exit code and signal number
+ @rtype: bool
+ @return: Function's result
"""
pid = os.fork()
ResetTempfileModule()
# Call function
- result = int(bool(fn()))
+ result = int(bool(fn(*args)))
assert result in (0, 1)
except: # pylint: disable-msg=W0702
logging.exception("Error while calling function in separate process")
wait_fn(current_delay)
+def GetClosedTempfile(*args, **kwargs):
+ """Creates a temporary file and returns its path.
+
+ """
+ (fd, path) = tempfile.mkstemp(*args, **kwargs)
+ _CloseFDNoErr(fd)
+ return path
+
+
+def GenerateSelfSignedX509Cert(common_name, validity):
+ """Generates a self-signed X509 certificate.
+
+ @type common_name: string
+ @param common_name: commonName value
+ @type validity: int
+ @param validity: Validity for certificate in seconds
+
+ """
+ # Create private and public key
+ key = OpenSSL.crypto.PKey()
+ key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)
+
+ # Create self-signed certificate
+ cert = OpenSSL.crypto.X509()
+ if common_name:
+ cert.get_subject().CN = common_name
+ cert.set_serial_number(1)
+ cert.gmtime_adj_notBefore(0)
+ cert.gmtime_adj_notAfter(validity)
+ cert.set_issuer(cert.get_subject())
+ cert.set_pubkey(key)
+ cert.sign(key, constants.X509_CERT_SIGN_DIGEST)
+
+ key_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
+ cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
+
+ return (key_pem, cert_pem)
+
+
+def GenerateSelfSignedSslCert(filename, validity=(5 * 365)):
+ """Legacy function to generate self-signed X509 certificate.
+
+ """
+ (key_pem, cert_pem) = GenerateSelfSignedX509Cert(None,
+ validity * 24 * 60 * 60)
+
+ WriteFile(filename, mode=0400, data=key_pem + cert_pem)
+
+
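Usage sketch for the two generators (the output path is hypothetical):

(key_pem, cert_pem) = GenerateSelfSignedX509Cert("node1.example.com",
                                                 365 * 24 * 60 * 60)
# The legacy wrapper writes key and certificate into a single 0400 file:
GenerateSelfSignedSslCert("/tmp/test.pem", validity=365)
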
class FileLock(object):
"""Utility class for file locks.
"""
- def __init__(self, filename):
+ def __init__(self, fd, filename):
"""Constructor for FileLock.
- This will open the file denoted by the I{filename} argument.
-
+ @type fd: file
+ @param fd: File object
@type filename: str
- @param filename: path to the file to be locked
+ @param filename: Path of the file opened at I{fd}
"""
+ self.fd = fd
self.filename = filename
- self.fd = open(self.filename, "w")
+
+ @classmethod
+ def Open(cls, filename):
+ """Creates and opens a file to be used as a file-based lock.
+
+ @type filename: string
+ @param filename: path to the file to be locked
+
+ """
+ # Using "os.open" is necessary to allow both opening existing file
+ # read/write and creating if not existing. Vanilla "open" will truncate an
+ # existing file -or- allow creating if not existing.
+ return cls(os.fdopen(os.open(filename, os.O_RDWR | os.O_CREAT), "w+"),
+ filename)
def __del__(self):
self.Close()
assert self.fd, "Lock was closed"
assert timeout is None or timeout >= 0, \
"If specified, timeout must be positive"
+ assert not (flag & fcntl.LOCK_NB), "LOCK_NB must not be set"
- if timeout is not None:
+ # When a timeout is used, LOCK_NB must always be set
+ if not (timeout is None and blocking):
flag |= fcntl.LOCK_NB
- timeout_end = time.time() + timeout
- # Blocking doesn't have effect with timeout
- elif not blocking:
- flag |= fcntl.LOCK_NB
- timeout_end = None
+ if timeout is None:
+ self._Lock(self.fd, flag, timeout)
+ else:
+ try:
+ Retry(self._Lock, (0.1, 1.2, 1.0), timeout,
+ args=(self.fd, flag, timeout))
+ except RetryTimeout:
+ raise errors.LockError(errmsg)
- # TODO: Convert to utils.Retry
+ @staticmethod
+ def _Lock(fd, flag, timeout):
+ try:
+ fcntl.flock(fd, flag)
+ except IOError, err:
+ if timeout is not None and err.errno == errno.EAGAIN:
+ raise RetryAgain()
- retry = True
- while retry:
- try:
- fcntl.flock(self.fd, flag)
- retry = False
- except IOError, err:
- if err.errno in (errno.EAGAIN, ):
- if timeout_end is not None and time.time() < timeout_end:
- # Wait before trying again
- time.sleep(max(0.1, min(1.0, timeout)))
- else:
- raise errors.LockError(errmsg)
- else:
- logging.exception("fcntl.flock failed")
- raise
+ logging.exception("fcntl.flock failed")
+ raise
def Exclusive(self, blocking=False, timeout=None):
"""Locks the file in exclusive mode.
@ivar called: tracks whether any of the signals have been raised
"""
- def __init__(self, signum):
+ def __init__(self, signum, handler_fn=None):
"""Constructs a new SignalHandler instance.
@type signum: int or list of ints
@param signum: Single signal number or set of signal numbers
+ @type handler_fn: callable
+ @param handler_fn: Signal handling function
"""
+ assert handler_fn is None or callable(handler_fn)
+
self.signum = set(signum)
self.called = False
+ self._handler_fn = handler_fn
+
self._previous = {}
try:
for signum in self.signum:
"""
self.called = False
- # we don't care about arguments, but we leave them named for the future
- def _HandleSignal(self, signum, frame): # pylint: disable-msg=W0613
+ def _HandleSignal(self, signum, frame):
"""Actual signal handling function.
"""
# solution in Python -- there are no atomic types.
self.called = True
+ if self._handler_fn:
+ self._handler_fn(signum, frame)
+
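A sketch of the new handler_fn callback in use; the main-loop body is hypothetical, and Reset() restores the previously installed handlers:

def _OnSignal(signum, _frame):
  logging.info("Received signal %s", signum)

handler = SignalHandler([signal.SIGTERM, signal.SIGINT],
                        handler_fn=_OnSignal)
try:
  while not handler.called:
    DoSomeWork()  # hypothetical main-loop body
finally:
  handler.Reset()
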
class FieldSet(object):
"""A simple field set.
"""Runs tests related to gnt-cluster.
"""
+ if qa_config.TestEnabled("cluster-renew-crypto"):
+ RunTest(qa_cluster.TestClusterRenewCrypto)
+
if qa_config.TestEnabled('cluster-verify'):
RunTest(qa_cluster.TestClusterVerify)
RunTest(qa_rapi.TestVersion)
RunTest(qa_rapi.TestEmptyCluster)
+
def RunOsTests():
"""Runs all tests related to gnt-os.
RunTest(qa_os.TestOsValid)
RunTest(qa_os.TestOsInvalid)
RunTest(qa_os.TestOsPartiallyValid)
+ RunTest(qa_os.TestOsModifyValid)
+ RunTest(qa_os.TestOsModifyInvalid)
def RunCommonInstanceTests(instance):
finally:
qa_config.ReleaseNode(snode)
+ if (qa_config.TestEnabled('instance-add-plain-disk') and
+ qa_config.TestEnabled("instance-export")):
+ instance = RunTest(qa_instance.TestInstanceAddWithPlainDisk, pnode)
+ expnode = qa_config.AcquireNode(exclude=pnode)
+ try:
+ RunTest(qa_instance.TestInstanceExportWithRemove, instance, expnode)
+ RunTest(qa_instance.TestBackupList, expnode)
+ finally:
+ qa_config.ReleaseNode(expnode)
+ del expnode
+ del instance
+
finally:
qa_config.ReleaseNode(pnode)
import tempfile
+ from ganeti import constants
-from ganeti import bootstrap
from ganeti import utils
import qa_config
import qa_utils
import qa_error
- from qa_utils import AssertEqual, StartSSH
+ from qa_utils import AssertEqual, AssertNotEqual, StartSSH
def _RemoveFileFromAllNodes(filename):
utils.ShellQuoteArgs(cmd)).wait(), 0)
+ def TestClusterRenewCrypto():
+ """gnt-cluster renew-crypto"""
+ master = qa_config.GetMasterNode()
+
+ # Conflicting options
+ cmd = ["gnt-cluster", "renew-crypto", "--force",
+ "--new-cluster-certificate", "--new-hmac-key",
+ "--new-rapi-certificate", "--rapi-certificate=/dev/null"]
+ AssertNotEqual(StartSSH(master["primary"],
+ utils.ShellQuoteArgs(cmd)).wait(), 0)
+
+ # Invalid RAPI certificate
+ cmd = ["gnt-cluster", "renew-crypto", "--force",
+ "--rapi-certificate=/dev/null"]
+ AssertNotEqual(StartSSH(master["primary"],
+ utils.ShellQuoteArgs(cmd)).wait(), 0)
+
+ # Custom RAPI certificate
+ fh = tempfile.NamedTemporaryFile()
+
+ # Ensure certificate doesn't cause "gnt-cluster verify" to complain
+ validity = constants.SSL_CERT_EXPIRATION_WARN * 3
+
- bootstrap.GenerateSelfSignedSslCert(fh.name, validity=validity)
++ utils.GenerateSelfSignedSslCert(fh.name, validity=validity)
+
+ tmpcert = qa_utils.UploadFile(master["primary"], fh.name)
+ try:
+ cmd = ["gnt-cluster", "renew-crypto", "--force",
+ "--rapi-certificate=%s" % tmpcert]
+ AssertEqual(StartSSH(master["primary"],
+ utils.ShellQuoteArgs(cmd)).wait(), 0)
+ finally:
+ cmd = ["rm", "-f", tmpcert]
+ AssertEqual(StartSSH(master["primary"],
+ utils.ShellQuoteArgs(cmd)).wait(), 0)
+
+ # Normal case
+ cmd = ["gnt-cluster", "renew-crypto", "--force",
+ "--new-cluster-certificate", "--new-hmac-key",
+ "--new-rapi-certificate"]
+ AssertEqual(StartSSH(master["primary"],
+ utils.ShellQuoteArgs(cmd)).wait(), 0)
+
+
def TestClusterBurnin():
"""Burnin"""
master = qa_config.GetMasterNode()
import tempfile
import os.path
import os
+ import stat
import md5
import signal
import socket
import re
import select
import string
+import fcntl
import OpenSSL
+ import warnings
+ import distutils.version
+ import glob
import ganeti
import testutils
ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
- UnescapeAndSplit
+ UnescapeAndSplit, RunParts, PathJoin, HostInfo
from ganeti.errors import LockError, UnitParseError, GenericError, \
- ProgrammerError
+ ProgrammerError, OpPrereqError
class TestIsProcessAlive(unittest.TestCase):
cwd = os.getcwd()
self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
+ def testResetEnv(self):
+ """Test environment reset functionality"""
+ self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
+
+
+ class TestRunParts(unittest.TestCase):
+ """Testing case for the RunParts function"""
+
+ def setUp(self):
+ self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
+
+ def tearDown(self):
+ shutil.rmtree(self.rundir)
+
+ def testEmpty(self):
+ """Test on an empty dir"""
+ self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
+
+ def testSkipWrongName(self):
+ """Test that wrong files are skipped"""
+ fname = os.path.join(self.rundir, "00test.dot")
+ utils.WriteFile(fname, data="")
+ os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
+ relname = os.path.basename(fname)
+ self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
+ [(relname, constants.RUNPARTS_SKIP, None)])
+
+ def testSkipNonExec(self):
+ """Test that non executable files are skipped"""
+ fname = os.path.join(self.rundir, "00test")
+ utils.WriteFile(fname, data="")
+ relname = os.path.basename(fname)
+ self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
+ [(relname, constants.RUNPARTS_SKIP, None)])
+
+ def testError(self):
+ """Test error on a broken executable"""
+ fname = os.path.join(self.rundir, "00test")
+ utils.WriteFile(fname, data="")
+ os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
+ (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
+ self.failUnlessEqual(relname, os.path.basename(fname))
+ self.failUnlessEqual(status, constants.RUNPARTS_ERR)
+ self.failUnless(error)
+
+ def testSorted(self):
+ """Test executions are sorted"""
+ files = []
+ files.append(os.path.join(self.rundir, "64test"))
+ files.append(os.path.join(self.rundir, "00test"))
+ files.append(os.path.join(self.rundir, "42test"))
+
+ for fname in files:
+ utils.WriteFile(fname, data="")
+
+ results = RunParts(self.rundir, reset_env=True)
+
+ for fname in sorted(files):
+ self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
+
+ def testOk(self):
+ """Test correct execution"""
+ fname = os.path.join(self.rundir, "00test")
+ utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
+ os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
+ (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
+ self.failUnlessEqual(relname, os.path.basename(fname))
+ self.failUnlessEqual(status, constants.RUNPARTS_RUN)
+ self.failUnlessEqual(runresult.stdout, "ciao")
+
+ def testRunFail(self):
+ """Test correct execution, with run failure"""
+ fname = os.path.join(self.rundir, "00test")
+ utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
+ os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
+ (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
+ self.failUnlessEqual(relname, os.path.basename(fname))
+ self.failUnlessEqual(status, constants.RUNPARTS_RUN)
+ self.failUnlessEqual(runresult.exit_code, 1)
+ self.failUnless(runresult.failed)
+
+ def testRunMix(self):
+ files = []
+ files.append(os.path.join(self.rundir, "00test"))
+ files.append(os.path.join(self.rundir, "42test"))
+ files.append(os.path.join(self.rundir, "64test"))
+ files.append(os.path.join(self.rundir, "99test"))
+
+ files.sort()
+
+ # 1st has errors in execution
+ utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
+ os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
+
+ # 2nd is skipped
+ utils.WriteFile(files[1], data="")
+
+ # 3rd cannot execute properly
+ utils.WriteFile(files[2], data="")
+ os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
+
+ # 4th execs
+ utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
+ os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
+
+ results = RunParts(self.rundir, reset_env=True)
+
+ (relname, status, runresult) = results[0]
+ self.failUnlessEqual(relname, os.path.basename(files[0]))
+ self.failUnlessEqual(status, constants.RUNPARTS_RUN)
+ self.failUnlessEqual(runresult.exit_code, 1)
+ self.failUnless(runresult.failed)
+
+ (relname, status, runresult) = results[1]
+ self.failUnlessEqual(relname, os.path.basename(files[1]))
+ self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
+ self.failUnlessEqual(runresult, None)
+
+ (relname, status, runresult) = results[2]
+ self.failUnlessEqual(relname, os.path.basename(files[2]))
+ self.failUnlessEqual(status, constants.RUNPARTS_ERR)
+ self.failUnless(runresult)
+
+ (relname, status, runresult) = results[3]
+ self.failUnlessEqual(relname, os.path.basename(files[3]))
+ self.failUnlessEqual(status, constants.RUNPARTS_RUN)
+ self.failUnlessEqual(runresult.output, "ciao")
+ self.failUnlessEqual(runresult.exit_code, 0)
+ self.failUnless(not runresult.failed)
+
+class TestStartDaemon(testutils.GanetiTestCase):
+ def setUp(self):
+ self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test")
+ self.tmpfile = os.path.join(self.tmpdir, "test")
+
+ def tearDown(self):
+ shutil.rmtree(self.tmpdir)
+
+ def testShell(self):
+ utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
+ self._wait(self.tmpfile, 60.0, "Hello World")
+
+ def testShellOutput(self):
+ utils.StartDaemon("echo Hello World", output=self.tmpfile)
+ self._wait(self.tmpfile, 60.0, "Hello World")
+
+ def testNoShellNoOutput(self):
+ utils.StartDaemon(["pwd"])
+
+ def testNoShellNoOutputTouch(self):
+ testfile = os.path.join(self.tmpdir, "check")
+ self.failIf(os.path.exists(testfile))
+ utils.StartDaemon(["touch", testfile])
+ self._wait(testfile, 60.0, "")
+
+ def testNoShellOutput(self):
+ utils.StartDaemon(["pwd"], output=self.tmpfile)
+ self._wait(self.tmpfile, 60.0, "/")
+
+ def testNoShellOutputCwd(self):
+ utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=os.getcwd())
+ self._wait(self.tmpfile, 60.0, os.getcwd())
+
+ def testShellEnv(self):
+ utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile,
+ env={ "GNT_TEST_VAR": "Hello World", })
+ self._wait(self.tmpfile, 60.0, "Hello World")
+
+ def testNoShellEnv(self):
+ utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile,
+ env={ "GNT_TEST_VAR": "Hello World", })
+ self._wait(self.tmpfile, 60.0, "Hello World")
+
+ def testOutputFd(self):
+ fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
+ try:
+ utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
+ finally:
+ os.close(fd)
+ self._wait(self.tmpfile, 60.0, os.getcwd())
+
+ def testPid(self):
+ pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
+ self._wait(self.tmpfile, 60.0, str(pid))
+
+ def testPidFile(self):
+ pidfile = os.path.join(self.tmpdir, "pid")
+ checkfile = os.path.join(self.tmpdir, "abort")
+
+ pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
+ output=self.tmpfile)
+ try:
+ fd = os.open(pidfile, os.O_RDONLY)
+ try:
+ # Check file is locked
+ self.assertRaises(errors.LockError, utils.LockFile, fd)
+
+ pidtext = os.read(fd, 100)
+ finally:
+ os.close(fd)
+
+ self.assertEqual(int(pidtext.strip()), pid)
+
+ self.assert_(utils.IsProcessAlive(pid))
+ finally:
+ # No matter what happens, kill daemon
+ utils.KillProcess(pid, timeout=5.0, waitpid=False)
+ self.failIf(utils.IsProcessAlive(pid))
+
+ self.assertEqual(utils.ReadFile(self.tmpfile), "")
+
+ def _wait(self, path, timeout, expected):
+ # Due to the asynchronous nature of daemon processes, polling is necessary.
+ # A timeout makes sure the test doesn't hang forever.
+ def _CheckFile():
+ if not (os.path.isfile(path) and
+ utils.ReadFile(path).strip() == expected):
+ raise utils.RetryAgain()
+
+ try:
+ utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
+ except utils.RetryTimeout:
+ self.fail("Apparently the daemon didn't run in %s seconds and/or"
+ " didn't write the correct output" % timeout)
+
+ def testError(self):
+ self.assertRaises(errors.OpExecError, utils.StartDaemon,
+ ["./does-NOT-EXIST/here/0123456789"])
+ self.assertRaises(errors.OpExecError, utils.StartDaemon,
+ ["./does-NOT-EXIST/here/0123456789"],
+ output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
+ self.assertRaises(errors.OpExecError, utils.StartDaemon,
+ ["./does-NOT-EXIST/here/0123456789"],
+ cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
+ self.assertRaises(errors.OpExecError, utils.StartDaemon,
+ ["./does-NOT-EXIST/here/0123456789"],
+ output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
+
+ fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
+ try:
+ self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
+ ["./does-NOT-EXIST/here/0123456789"],
+ output=self.tmpfile, output_fd=fd)
+ finally:
+ os.close(fd)
+
+
+class TestSetCloseOnExecFlag(unittest.TestCase):
+ """Tests for SetCloseOnExecFlag"""
+
+ def setUp(self):
+ self.tmpfile = tempfile.TemporaryFile()
+
+ def testEnable(self):
+ utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
+ self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
+ fcntl.FD_CLOEXEC)
+
+ def testDisable(self):
+ utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
+ self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
+ fcntl.FD_CLOEXEC)
+
+
+class TestSetNonblockFlag(unittest.TestCase):
+ def setUp(self):
+ self.tmpfile = tempfile.TemporaryFile()
+
+ def testEnable(self):
+ utils.SetNonblockFlag(self.tmpfile.fileno(), True)
+ self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
+ os.O_NONBLOCK)
+
+ def testDisable(self):
+ utils.SetNonblockFlag(self.tmpfile.fileno(), False)
+ self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
+ os.O_NONBLOCK)
+
+
class TestRemoveFile(unittest.TestCase):
"""Test case for the RemoveFile function"""
os.unlink(self.tmpfile)
os.rmdir(self.tmpdir)
-
def testIgnoreDirs(self):
"""Test that RemoveFile() ignores directories"""
self.assertEqual(None, RemoveFile(self.tmpdir))
-
def testIgnoreNotExisting(self):
"""Test that RemoveFile() ignores non-existing files"""
RemoveFile(self.tmpfile)
RemoveFile(self.tmpfile)
-
def testRemoveFile(self):
"""Test that RemoveFile does remove a file"""
RemoveFile(self.tmpfile)
if os.path.exists(self.tmpfile):
self.fail("File '%s' not removed" % self.tmpfile)
-
def testRemoveSymlink(self):
"""Test that RemoveFile does remove symlinks"""
symlink = self.tmpdir + "/symlink"
None)
+ class TestTimestampForFilename(unittest.TestCase):
+ def test(self):
+ self.assert_("." not in utils.TimestampForFilename())
+ self.assert_(":" not in utils.TimestampForFilename())
+
+
+ class TestCreateBackup(testutils.GanetiTestCase):
+ def setUp(self):
+ testutils.GanetiTestCase.setUp(self)
+
+ self.tmpdir = tempfile.mkdtemp()
+
+ def tearDown(self):
+ testutils.GanetiTestCase.tearDown(self)
+
+ shutil.rmtree(self.tmpdir)
+
+ def testEmpty(self):
+ filename = utils.PathJoin(self.tmpdir, "config.data")
+ utils.WriteFile(filename, data="")
+ bname = utils.CreateBackup(filename)
+ self.assertFileContent(bname, "")
+ self.assertEqual(len(glob.glob("%s*" % filename)), 2)
+ utils.CreateBackup(filename)
+ self.assertEqual(len(glob.glob("%s*" % filename)), 3)
+ utils.CreateBackup(filename)
+ self.assertEqual(len(glob.glob("%s*" % filename)), 4)
+
+ fifoname = utils.PathJoin(self.tmpdir, "fifo")
+ os.mkfifo(fifoname)
+ self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
+
+ def testContent(self):
+ bkpcount = 0
+ for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
+ for rep in [1, 2, 10, 127]:
+ testdata = data * rep
+
+ filename = utils.PathJoin(self.tmpdir, "test.data_")
+ utils.WriteFile(filename, data=testdata)
+ self.assertFileContent(filename, testdata)
+
+ for _ in range(3):
+ bname = utils.CreateBackup(filename)
+ bkpcount += 1
+ self.assertFileContent(bname, testdata)
+ self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
+
+
class TestFormatUnit(unittest.TestCase):
"""Test case for the FormatUnit function"""
expected = ["a", "b"]
self._test(files, expected)
+ def testNonAbsolutePath(self):
+ self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles, "abc")
+
+ def testNonNormalizedPath(self):
+ self.failUnlessRaises(errors.ProgrammerError, ListVisibleFiles,
+ "/bin/../tmp")
+
class TestNewUUID(unittest.TestCase):
"""Test case for NewUUID"""
self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
- class TestFileLock(unittest.TestCase):
+ class _BaseFileLockTest:
"""Test case for the FileLock class"""
- def setUp(self):
- self.tmpfile = tempfile.NamedTemporaryFile()
- self.lock = utils.FileLock(self.tmpfile.name)
-
def testSharedNonblocking(self):
self.lock.Shared(blocking=False)
self.lock.Close()
self.lock.Unlock(blocking=False)
self.lock.Close()
+ def testSimpleTimeout(self):
+ # These will succeed on the first attempt, hence a short timeout
+ self.lock.Shared(blocking=True, timeout=10.0)
+ self.lock.Exclusive(blocking=False, timeout=10.0)
+ self.lock.Unlock(blocking=True, timeout=10.0)
+ self.lock.Close()
+
+ @staticmethod
+ def _TryLockInner(filename, shared, blocking):
+ lock = utils.FileLock.Open(filename)
+
+ if shared:
+ fn = lock.Shared
+ else:
+ fn = lock.Exclusive
+
+ try:
+ # The timeout doesn't really matter as the parent process waits for us to
+ # finish anyway.
+ fn(blocking=blocking, timeout=0.01)
+ except errors.LockError:
+ return False
+
+ return True
+
+ def _TryLock(self, *args):
+ return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
+ *args)
+
+ def testTimeout(self):
+ for blocking in [True, False]:
+ self.lock.Exclusive(blocking=True)
+ self.failIf(self._TryLock(False, blocking))
+ self.failIf(self._TryLock(True, blocking))
+
+ self.lock.Shared(blocking=True)
+ self.assert_(self._TryLock(True, blocking))
+ self.failIf(self._TryLock(False, blocking))
+
def testCloseShared(self):
self.lock.Close()
self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
+ class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
+ TESTDATA = "Hello World\n" * 10
+
+ def setUp(self):
+ testutils.GanetiTestCase.setUp(self)
+
+ self.tmpfile = tempfile.NamedTemporaryFile()
+ utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
+ self.lock = utils.FileLock.Open(self.tmpfile.name)
+
+ # Ensure "Open" didn't truncate file
+ self.assertFileContent(self.tmpfile.name, self.TESTDATA)
+
+ def tearDown(self):
+ self.assertFileContent(self.tmpfile.name, self.TESTDATA)
+
+ testutils.GanetiTestCase.tearDown(self)
+
+
+ class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
+ def setUp(self):
+ self.tmpfile = tempfile.NamedTemporaryFile()
+ self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
+
+
class TestTimeFunctions(unittest.TestCase):
"""Test case for time functions"""
class TestIsAbsNormPath(unittest.TestCase):
- """Testing case for IsProcessAlive"""
+ """Testing case for IsNormAbsPath"""
def _pathTestHelper(self, path, result):
if result:
self.assertEqual(exp, utils.RunInSeparateProcess(_child))
+ def testArgs(self):
+ for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
+ def _child(carg1, carg2):
+ return carg1 == "Foo" and carg2 == arg
+
+ self.assert_(utils.RunInSeparateProcess(_child, "Foo", arg))
+
def testPid(self):
parent_pid = os.getpid()
self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
+class TestGenerateSelfSignedX509Cert(unittest.TestCase):
+ def setUp(self):
+ self.tmpdir = tempfile.mkdtemp()
+
+ def tearDown(self):
+ shutil.rmtree(self.tmpdir)
+
+ def _checkRsaPrivateKey(self, key):
+ lines = key.splitlines()
+ return ("-----BEGIN RSA PRIVATE KEY-----" in lines and
+ "-----END RSA PRIVATE KEY-----" in lines)
+
+ def _checkCertificate(self, cert):
+ lines = cert.splitlines()
+ return ("-----BEGIN CERTIFICATE-----" in lines and
+ "-----END CERTIFICATE-----" in lines)
+
+ def test(self):
+ for common_name in [None, ".", "Ganeti", "node1.example.com"]:
+ (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
+ self.assert_(self._checkRsaPrivateKey(key_pem))
+ self.assert_(self._checkCertificate(cert_pem))
+
+ key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
+ key_pem)
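+ # Hard floor independent of the configured constant; anything below
+ # 1024 bits would be insecure.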
+ self.assert_(key.bits() >= 1024)
+ self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
+ self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
+
+ x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+ cert_pem)
+ self.failIf(x509.has_expired())
+ self.assertEqual(x509.get_issuer().CN, common_name)
+ self.assertEqual(x509.get_subject().CN, common_name)
+ self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
+
+ def testLegacy(self):
+ cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
+
+ utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
+
+ cert1 = utils.ReadFile(cert1_filename)
+
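+ # The legacy helper stores the private key and the certificate in a
+ # single PEM file, hence both checks run on the same data.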
+ self.assert_(self._checkRsaPrivateKey(cert1))
+ self.assert_(self._checkCertificate(cert1))
+
+
+ class TestPathJoin(unittest.TestCase):
+ """Testing case for PathJoin"""
+
+ def testBasicItems(self):
+ mlist = ["/a", "b", "c"]
+ self.failUnlessEqual(PathJoin(*mlist), "/".join(mlist))
+
+ def testNonAbsPrefix(self):
+ self.failUnlessRaises(ValueError, PathJoin, "a", "b")
+
+ def testBackTrack(self):
+ self.failUnlessRaises(ValueError, PathJoin, "/a", "b/../c")
+
+ def testMultiAbs(self):
+ self.failUnlessRaises(ValueError, PathJoin, "/a", "/b")
+
+
+ class TestHostInfo(unittest.TestCase):
+ """Testing case for HostInfo"""
+
+ def testUppercase(self):
+ data = "AbC.example.com"
+ self.failUnlessEqual(HostInfo.NormalizeName(data), data.lower())
+
+ def testTooLongName(self):
+ data = "a.b." + "c" * 255
+ self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, data)
+
+ def testTrailingDot(self):
+ data = "a.b.c"
+ self.failUnlessEqual(HostInfo.NormalizeName(data + "."), data)
+
+ def testInvalidName(self):
+ data = [
+ "a b",
+ "a/b",
+ ".a.b",
+ "a..b",
+ ]
+ for value in data:
+ self.failUnlessRaises(OpPrereqError, HostInfo.NormalizeName, value)
+
+ def testValidName(self):
+ data = [
+ "a.b",
+ "a-b",
+ "a_b",
+ "a.b.c",
+ ]
+ for value in data:
+ HostInfo.NormalizeName(value)
+
+
+ class TestParseAsn1Generalizedtime(unittest.TestCase):
+ def test(self):
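+ # The parser accepts YYYYMMDDhhmmss followed by either "Z" (UTC) or a
+ # +-hhmm offset and returns a Unix timestamp.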
+ # UTC
+ self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0)
+ self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"),
+ 1266860512)
+ self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"),
+ (2**31) - 1)
+
+ # With offset
+ self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"),
+ 1266860512)
+ self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"),
+ 1266931012)
+ self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"),
+ 1266931088)
+ self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"),
+ 1266931295)
+ self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"),
+ 3600)
+
+ # Leap seconds are not supported by datetime.datetime
+ self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
+ "19841231235960+0000")
+ self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
+ "19920630235960+0000")
+
+ # Errors
+ self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "")
+ self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid")
+ self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
+ "20100222174152")
+ self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
+ "Mon Feb 22 17:47:02 UTC 2010")
+ self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
+ "2010-02-22 17:42:02")
+
+
+ class TestGetX509CertValidity(testutils.GanetiTestCase):
+ def setUp(self):
+ testutils.GanetiTestCase.setUp(self)
+
+ pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
+
+ # Test whether we have pyOpenSSL 0.7 or above
+ self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
+
+ if not self.pyopenssl0_7:
+ warnings.warn("This test requires pyOpenSSL 0.7 or above to"
+ " function correctly")
+
+ def _LoadCert(self, name):
+ return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+ self._ReadTestData(name))
+
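+ # The validity period can only be extracted with pyOpenSSL 0.7 or
+ # above; older versions are expected to yield (None, None).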
+ def test(self):
+ validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
+ if self.pyopenssl0_7:
+ self.assertEqual(validity, (1266919967, 1267524767))
+ else:
+ self.assertEqual(validity, (None, None))
+
+
if __name__ == '__main__':
testutils.GanetiTestProgram()