#
#
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
from ganeti import luxi
from ganeti import ssconf
from ganeti import rpc
+from ganeti import ssh
+from ganeti import compat
+from ganeti import netutils
from optparse import (OptionParser, TitledHelpFormatter,
Option, OptionValueError)
__all__ = [
# Command line options
+ "ADD_UIDS_OPT",
"ALLOCATABLE_OPT",
"ALL_OPT",
"AUTO_PROMOTE_OPT",
"AUTO_REPLACE_OPT",
"BACKEND_OPT",
"CLEANUP_OPT",
+ "CLUSTER_DOMAIN_SECRET_OPT",
"CONFIRM_OPT",
"CP_SIZE_OPT",
"DEBUG_OPT",
"DISK_OPT",
"DISK_TEMPLATE_OPT",
"DRAINED_OPT",
+ "DRBD_HELPER_OPT",
"EARLY_RELEASE_OPT",
"ENABLED_HV_OPT",
"ERROR_CODES_OPT",
"HVOPTS_OPT",
"HYPERVISOR_OPT",
"IALLOCATOR_OPT",
+ "DEFAULT_IALLOCATOR_OPT",
+ "IDENTIFY_DEFAULTS_OPT",
"IGNORE_CONSIST_OPT",
"IGNORE_FAILURES_OPT",
+ "IGNORE_REMOVE_FAILURES_OPT",
"IGNORE_SECONDARIES_OPT",
"IGNORE_SIZE_OPT",
"MAC_PREFIX_OPT",
+ "MAINTAIN_NODE_HEALTH_OPT",
"MASTER_NETDEV_OPT",
"MC_OPT",
+ "MIGRATION_MODE_OPT",
"NET_OPT",
+ "NEW_CLUSTER_CERT_OPT",
+ "NEW_CLUSTER_DOMAIN_SECRET_OPT",
+ "NEW_CONFD_HMAC_KEY_OPT",
+ "NEW_RAPI_CERT_OPT",
"NEW_SECONDARY_OPT",
"NIC_PARAMS_OPT",
"NODE_LIST_OPT",
"NODE_PLACEMENT_OPT",
+ "NODRBD_STORAGE_OPT",
"NOHDR_OPT",
"NOIPCHECK_OPT",
+ "NO_INSTALL_OPT",
"NONAMECHECK_OPT",
"NOLVM_STORAGE_OPT",
"NOMODIFY_ETCHOSTS_OPT",
"ON_PRIMARY_OPT",
"ON_SECONDARY_OPT",
"OFFLINE_OPT",
+ "OSPARAMS_OPT",
"OS_OPT",
"OS_SIZE_OPT",
+ "PRIMARY_IP_VERSION_OPT",
+ "RAPI_CERT_OPT",
"READD_OPT",
"REBOOT_TYPE_OPT",
+ "REMOVE_INSTANCE_OPT",
+ "REMOVE_UIDS_OPT",
+ "RESERVED_LVS_OPT",
+ "ROMAN_OPT",
"SECONDARY_IP_OPT",
"SELECT_OS_OPT",
"SEP_OPT",
"SYNC_OPT",
"TAG_SRC_OPT",
"TIMEOUT_OPT",
+ "UIDPOOL_OPT",
"USEUNITS_OPT",
+ "USE_REPL_NET_OPT",
"VERBOSE_OPT",
"VG_NAME_OPT",
"YES_DOIT_OPT",
"JobExecutor",
"JobSubmittedException",
"ParseTimespec",
+ "RunWhileClusterStopped",
"SubmitOpCode",
"SubmitOrSend",
"UsesRPC",
"GenerateTable",
"AskUser",
"FormatTimestamp",
+ "FormatLogMessage",
# Tags functions
"ListTags",
"AddTags",
return _SplitKeyVal(opt, value)
+def check_bool(option, opt, value): # pylint: disable-msg=W0613
+ """Custom parser for yes/no options.
+
+ This will store the parsed value as either True or False.
+
+ """
+ value = value.lower()
+ if value == constants.VALUE_FALSE or value == "no":
+ return False
+ elif value == constants.VALUE_TRUE or value == "yes":
+ return True
+ else:
+ raise errors.ParameterError("Invalid boolean value '%s'" % value)
+
+
# completion_suggestion is normally a list. Using numeric values not evaluating
# to False for dynamic completion.
(OPT_COMPL_MANY_NODES,
"identkeyval",
"keyval",
"unit",
+ "bool",
)
TYPE_CHECKER = Option.TYPE_CHECKER.copy()
TYPE_CHECKER["identkeyval"] = check_ident_key_val
TYPE_CHECKER["keyval"] = check_key_val
TYPE_CHECKER["unit"] = check_unit
+ TYPE_CHECKER["bool"] = check_bool
# optparse.py sets make_option, so we do it for our own option class, too
cli_option = CliOption
-_YESNO = ("yes", "no")
_YORNO = "yes|no"
DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
default=None, type="string",
completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
+DEFAULT_IALLOCATOR_OPT = cli_option("-I", "--default-iallocator",
+ metavar="<NAME>",
+ help="Set the default instance allocator plugin",
+ default=None, type="string",
+ completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
+
OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
metavar="<os>",
completion_suggest=OPT_COMPL_ONE_OS)
+OSPARAMS_OPT = cli_option("-O", "--os-parameters", dest="osparams",
+ type="keyval", default={},
+ help="OS parameters")
+
FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
action="store_true", default=False,
help="Force an unknown variant")
+NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
+ action="store_true", default=False,
+ help="Do not install the OS (will"
+ " enable no-start)")
+
BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
type="keyval", default={},
help="Backend parameters")
" freeze the instance, save the state, transfer and"
" only then resume running on the secondary node)")
+MIGRATION_MODE_OPT = cli_option("--migration-mode", dest="migration_mode",
+ default=None,
+ choices=list(constants.HT_MIGRATION_MODES),
+                                help="Override default migration mode (choose"
+                                " either live or non-live)")
+
NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
help="Target node and optional secondary node",
metavar="<pnode>[:<snode>]",
" configuration even if there are failures"
" during the removal process")
+IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
+ dest="ignore_remove_failures",
+ action="store_true", default=False,
+ help="Remove the instance from the"
+ " cluster configuration even if there"
+ " are failures during the removal"
+ " process")
+
+REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
+ action="store_true", default=False,
+ help="Remove the instance from the cluster")
+
NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
help="Specifies the new secondary node",
metavar="NODE", default=None,
MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
- choices=_YESNO, default=None, metavar=_YORNO,
+ type="bool", default=None, metavar=_YORNO,
help="Set the master_candidate flag on the node")
OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
- choices=_YESNO, default=None,
+ type="bool", default=None,
help="Set the offline flag on the node")
DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
- choices=_YESNO, default=None,
+ type="bool", default=None,
help="Set the drained flag on the node")
ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
- choices=_YESNO, default=None, metavar=_YORNO,
+ type="bool", default=None, metavar=_YORNO,
help="Set the allocatable flag on a volume")
NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
metavar="NETDEV",
default=constants.DEFAULT_BRIDGE)
-
GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
help="Specify the default directory (cluster-"
"wide) for storing the file-based disks [%s]" %
help="Release the locks on the secondary"
" node(s) early")
+NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
+ dest="new_cluster_cert",
+ default=False, action="store_true",
+ help="Generate a new cluster certificate")
+
+RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
+ default=None,
+ help="File containing new RAPI certificate")
+
+NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
+ default=None, action="store_true",
+ help=("Generate a new self-signed RAPI"
+ " certificate"))
+
+NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
+ dest="new_confd_hmac_key",
+ default=False, action="store_true",
+ help=("Create a new HMAC key for %s" %
+ constants.CONFD))
+
+CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
+ dest="cluster_domain_secret",
+ default=None,
+                                       help=("Load new cluster domain"
+                                             " secret from file"))
+
+NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
+ dest="new_cluster_domain_secret",
+ default=False, action="store_true",
+ help=("Create a new cluster domain"
+ " secret"))
+
+USE_REPL_NET_OPT = cli_option("--use-replication-network",
+ dest="use_replication_network",
+ help="Whether to use the replication network"
+ " for talking to the nodes",
+ action="store_true", default=False)
+
+MAINTAIN_NODE_HEALTH_OPT = \
+ cli_option("--maintain-node-health", dest="maintain_node_health",
+ metavar=_YORNO, default=None, type="bool",
+ help="Configure the cluster to automatically maintain node"
+ " health, by shutting down unknown instances, shutting down"
+ " unknown DRBD devices, etc.")
+
+IDENTIFY_DEFAULTS_OPT = \
+ cli_option("--identify-defaults", dest="identify_defaults",
+ default=False, action="store_true",
+ help="Identify which saved instance parameters are equal to"
+ " the current cluster defaults and set them as such, instead"
+ " of marking them as overridden")
+
+UIDPOOL_OPT = cli_option("--uid-pool", default=None,
+ action="store", dest="uid_pool",
+ help=("A list of user-ids or user-id"
+ " ranges separated by commas"))
+
+ADD_UIDS_OPT = cli_option("--add-uids", default=None,
+ action="store", dest="add_uids",
+ help=("A list of user-ids or user-id"
+ " ranges separated by commas, to be"
+ " added to the user-id pool"))
+
+REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
+ action="store", dest="remove_uids",
+ help=("A list of user-ids or user-id"
+ " ranges separated by commas, to be"
+ " removed from the user-id pool"))
+
+RESERVED_LVS_OPT = cli_option("--reserved-lvs", default=None,
+ action="store", dest="reserved_lvs",
+ help=("A comma-separated list of reserved"
+ " logical volumes names, that will be"
+ " ignored by cluster verify"))
+
+ROMAN_OPT = cli_option("--roman",
+ dest="roman_integers", default=False,
+ action="store_true",
+ help="Use roman numbers for positive integers")
+
+DRBD_HELPER_OPT = cli_option("--drbd-usermode-helper", dest="drbd_helper",
+ action="store", default=None,
+ help="Specifies usermode helper for DRBD")
+
+NODRBD_STORAGE_OPT = cli_option("--no-drbd-storage", dest="drbd_storage",
+ action="store_false", default=True,
+ help="Disable support for DRBD")
+
+PRIMARY_IP_VERSION_OPT = \
+ cli_option("--primary-ip-version", default=constants.IP4_VERSION,
+ action="store", dest="primary_ip_version",
+ metavar="%d|%d" % (constants.IP4_VERSION,
+ constants.IP6_VERSION),
+ help="Cluster-wide IP version for primary IP")
+
def _ParseArgs(argv, commands, aliases):
"""Parser for the command line arguments.
binary = argv[0].split("/")[-1]
if len(argv) > 1 and argv[1] == "--version":
- ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
+ ToStdout("%s (ganeti %s) %s", binary, constants.VCS_VERSION,
+ constants.RELEASE_VERSION)
# Quit right away. That way we don't have to care about this special
# argument. optparse.py does it the same.
sys.exit(0)
return [os_name]
-def UsesRPC(fn):
- def wrapper(*args, **kwargs):
- rpc.Init()
- try:
- return fn(*args, **kwargs)
- finally:
- rpc.Shutdown()
- return wrapper
+UsesRPC = rpc.RunWithRPC
def AskUser(text, choices=None):
return job_id
-def PollJob(job_id, cl=None, feedback_fn=None):
- """Function to poll for the result of a job.
+def GenericPollJob(job_id, cbs, report_cbs):
+ """Generic job-polling function.
- @type job_id: job identified
- @param job_id: the job to poll for results
- @type cl: luxi.Client
- @param cl: the luxi client to use for communicating with the master;
- if None, a new client will be created
+ @type job_id: number
+ @param job_id: Job ID
+ @type cbs: Instance of L{JobPollCbBase}
+ @param cbs: Data callbacks
+ @type report_cbs: Instance of L{JobPollReportCbBase}
+ @param report_cbs: Reporting callbacks
"""
- if cl is None:
- cl = GetClient()
-
prev_job_info = None
prev_logmsg_serial = None
status = None
- notified_queued = False
- notified_waitlock = False
-
while True:
- result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
- prev_logmsg_serial)
+ result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
+ prev_logmsg_serial)
if not result:
# job not found, go away!
raise errors.JobLost("Job with id %s lost" % job_id)
- elif result == constants.JOB_NOTCHANGED:
- if status is not None and not callable(feedback_fn):
- if status == constants.JOB_STATUS_QUEUED and not notified_queued:
- ToStderr("Job %s is waiting in queue", job_id)
- notified_queued = True
- elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
- ToStderr("Job %s is trying to acquire all necessary locks", job_id)
- notified_waitlock = True
+
+ if result == constants.JOB_NOTCHANGED:
+ report_cbs.ReportNotChanged(job_id, status)
# Wait again
continue
if log_entries:
for log_entry in log_entries:
- (serial, timestamp, _, message) = log_entry
- if callable(feedback_fn):
- feedback_fn(log_entry[1:])
- else:
- encoded = utils.SafeEncode(message)
- ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
+ (serial, timestamp, log_type, message) = log_entry
+ report_cbs.ReportLogMessage(job_id, serial, timestamp,
+ log_type, message)
prev_logmsg_serial = max(prev_logmsg_serial, serial)
# TODO: Handle canceled and archived jobs
prev_job_info = job_info
- jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
+ jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
if not jobs:
raise errors.JobLost("Job with id %s lost" % job_id)
status, opstatus, result = jobs[0]
+
if status == constants.JOB_STATUS_SUCCESS:
return result
- elif status in (constants.JOB_STATUS_CANCELING,
- constants.JOB_STATUS_CANCELED):
+
+ if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
raise errors.OpExecError("Job was canceled")
- else:
- has_ok = False
- for idx, (status, msg) in enumerate(zip(opstatus, result)):
- if status == constants.OP_STATUS_SUCCESS:
- has_ok = True
- elif status == constants.OP_STATUS_ERROR:
- errors.MaybeRaise(msg)
- if has_ok:
- raise errors.OpExecError("partial failure (opcode %d): %s" %
- (idx, msg))
- else:
- raise errors.OpExecError(str(msg))
- # default failure mode
- raise errors.OpExecError(result)
+
+ has_ok = False
+ for idx, (status, msg) in enumerate(zip(opstatus, result)):
+ if status == constants.OP_STATUS_SUCCESS:
+ has_ok = True
+ elif status == constants.OP_STATUS_ERROR:
+ errors.MaybeRaise(msg)
+
+ if has_ok:
+ raise errors.OpExecError("partial failure (opcode %d): %s" %
+ (idx, msg))
+
+ raise errors.OpExecError(str(msg))
+
+ # default failure mode
+ raise errors.OpExecError(result)
+
+
+class JobPollCbBase:
+ """Base class for L{GenericPollJob} callbacks.
+
+ """
+ def __init__(self):
+ """Initializes this class.
+
+ """
+
+ def WaitForJobChangeOnce(self, job_id, fields,
+ prev_job_info, prev_log_serial):
+ """Waits for changes on a job.
+
+ """
+ raise NotImplementedError()
+
+ def QueryJobs(self, job_ids, fields):
+ """Returns the selected fields for the selected job IDs.
+
+ @type job_ids: list of numbers
+ @param job_ids: Job IDs
+ @type fields: list of strings
+ @param fields: Fields
+
+ """
+ raise NotImplementedError()
+
+
+class JobPollReportCbBase:
+ """Base class for L{GenericPollJob} reporting callbacks.
+
+ """
+ def __init__(self):
+ """Initializes this class.
+
+ """
+
+ def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+ """Handles a log message.
+
+ """
+ raise NotImplementedError()
+
+ def ReportNotChanged(self, job_id, status):
+    """Called if a job hasn't changed in a while.
+
+ @type job_id: number
+ @param job_id: Job ID
+ @type status: string or None
+ @param status: Job status if available
+
+ """
+ raise NotImplementedError()
+
+
+class _LuxiJobPollCb(JobPollCbBase):
+ def __init__(self, cl):
+ """Initializes this class.
+
+ """
+ JobPollCbBase.__init__(self)
+ self.cl = cl
+
+ def WaitForJobChangeOnce(self, job_id, fields,
+ prev_job_info, prev_log_serial):
+ """Waits for changes on a job.
+
+ """
+ return self.cl.WaitForJobChangeOnce(job_id, fields,
+ prev_job_info, prev_log_serial)
+
+ def QueryJobs(self, job_ids, fields):
+ """Returns the selected fields for the selected job IDs.
+
+ """
+ return self.cl.QueryJobs(job_ids, fields)
+
+
+class FeedbackFnJobPollReportCb(JobPollReportCbBase):
+ def __init__(self, feedback_fn):
+ """Initializes this class.
+
+ """
+ JobPollReportCbBase.__init__(self)
+
+ self.feedback_fn = feedback_fn
+
+ assert callable(feedback_fn)
+
+ def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+ """Handles a log message.
+
+ """
+ self.feedback_fn((timestamp, log_type, log_msg))
+
+ def ReportNotChanged(self, job_id, status):
+ """Called if a job hasn't changed in a while.
+
+ """
+ # Ignore
-def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
+class StdioJobPollReportCb(JobPollReportCbBase):
+ def __init__(self):
+ """Initializes this class.
+
+ """
+ JobPollReportCbBase.__init__(self)
+
+ self.notified_queued = False
+ self.notified_waitlock = False
+
+ def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+ """Handles a log message.
+
+ """
+ ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
+ FormatLogMessage(log_type, log_msg))
+
+ def ReportNotChanged(self, job_id, status):
+ """Called if a job hasn't changed in a while.
+
+ """
+ if status is None:
+ return
+
+ if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
+ ToStderr("Job %s is waiting in queue", job_id)
+ self.notified_queued = True
+
+ elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
+ ToStderr("Job %s is trying to acquire all necessary locks", job_id)
+ self.notified_waitlock = True
+
+
+def FormatLogMessage(log_type, log_msg):
+ """Formats a job message according to its type.
+
+ """
+ if log_type != constants.ELOG_MESSAGE:
+ log_msg = str(log_msg)
+
+ return utils.SafeEncode(log_msg)
+
+
+def PollJob(job_id, cl=None, feedback_fn=None, reporter=None):
+ """Function to poll for the result of a job.
+
+ @type job_id: job identified
+ @param job_id: the job to poll for results
+ @type cl: luxi.Client
+ @param cl: the luxi client to use for communicating with the master;
+ if None, a new client will be created
+
+ """
+ if cl is None:
+ cl = GetClient()
+
+ if reporter is None:
+ if feedback_fn:
+ reporter = FeedbackFnJobPollReportCb(feedback_fn)
+ else:
+ reporter = StdioJobPollReportCb()
+ elif feedback_fn:
+ raise errors.ProgrammerError("Can't specify reporter and feedback function")
+
+ return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
+
+
+def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None, reporter=None):
"""Legacy function to submit an opcode.
This is just a simple wrapper over the construction of the processor
SetGenericOpcodeOpts([op], opts)
- job_id = SendJob([op], cl)
+ job_id = SendJob([op], cl=cl)
- op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
+ op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn,
+ reporter=reporter)
return op_results[0]
elif isinstance(err, errors.HooksFailure):
obuf.write("Failure: hooks general failure: %s" % msg)
elif isinstance(err, errors.ResolverError):
- this_host = utils.HostInfo.SysName()
+ this_host = netutils.Hostname.GetSysName()
if err.args[0] == this_host:
msg = "Failure: can't resolve my own hostname ('%s')"
else:
obuf.write("Parameter Error: %s" % msg)
elif isinstance(err, errors.ParameterError):
obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
- elif isinstance(err, errors.GenericError):
- obuf.write("Unhandled Ganeti error: %s" % msg)
elif isinstance(err, luxi.NoMasterError):
obuf.write("Cannot communicate with the master daemon.\nIs it running"
" and listening for connections?")
elif isinstance(err, luxi.TimeoutError):
obuf.write("Timeout while talking to the master daemon. Error:\n"
"%s" % msg)
+ elif isinstance(err, luxi.PermissionError):
+ obuf.write("It seems you don't have permissions to connect to the"
+ " master daemon.\nPlease retry as a different user.")
elif isinstance(err, luxi.ProtocolError):
obuf.write("Unhandled protocol error while talking to the master daemon:\n"
"%s" % msg)
+ elif isinstance(err, errors.JobLost):
+ obuf.write("Error checking job status: %s" % msg)
+ elif isinstance(err, errors.GenericError):
+ obuf.write("Unhandled Ganeti error: %s" % msg)
elif isinstance(err, JobSubmittedException):
obuf.write("JobID: %s\n" % err.args[0])
retcode = 0
elif opts.no_nics:
# no nics
nics = []
- else:
+ elif mode == constants.INSTANCE_CREATE:
# default of one nic, all auto
nics = [{}]
+ else:
+ # mode == import
+ nics = []
if opts.disk_template == constants.DT_DISKLESS:
if opts.disks or opts.sd_size is not None:
" information passed")
disks = []
else:
- if not opts.disks and not opts.sd_size:
+ if (not opts.disks and not opts.sd_size
+ and mode == constants.INSTANCE_CREATE):
raise errors.OpPrereqError("No disk information specified")
if opts.disks and opts.sd_size is not None:
raise errors.OpPrereqError("Please use either the '--disk' or"
" '-s' option")
if opts.sd_size is not None:
opts.disks = [(0, {"size": opts.sd_size})]
- try:
- disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
- except ValueError, err:
- raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
- disks = [{}] * disk_max
+
+ if opts.disks:
+ try:
+ disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
+ except ValueError, err:
+ raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
+ disks = [{}] * disk_max
+ else:
+ disks = []
for didx, ddict in opts.disks:
didx = int(didx)
if not isinstance(ddict, dict):
msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
raise errors.OpPrereqError(msg)
- elif "size" not in ddict:
- raise errors.OpPrereqError("Missing size for disk %d" % didx)
- try:
- ddict["size"] = utils.ParseUnit(ddict["size"])
- except ValueError, err:
- raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
- (didx, err))
+ elif "size" in ddict:
+ if "adopt" in ddict:
+ raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
+ " (disk %d)" % didx)
+ try:
+ ddict["size"] = utils.ParseUnit(ddict["size"])
+ except ValueError, err:
+ raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
+ (didx, err))
+ elif "adopt" in ddict:
+ if mode == constants.INSTANCE_IMPORT:
+ raise errors.OpPrereqError("Disk adoption not allowed for instance"
+ " import")
+ ddict["size"] = 0
+ else:
+ raise errors.OpPrereqError("Missing size or adoption source for"
+ " disk %d" % didx)
disks[didx] = ddict
utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
if mode == constants.INSTANCE_CREATE:
start = opts.start
os_type = opts.os
+ force_variant = opts.force_variant
src_node = None
src_path = None
+ no_install = opts.no_install
+ identify_defaults = False
elif mode == constants.INSTANCE_IMPORT:
start = False
os_type = None
+ force_variant = False
src_node = opts.src_node
src_path = opts.src_dir
+ no_install = None
+ identify_defaults = opts.identify_defaults
else:
raise errors.ProgrammerError("Invalid creation mode %s" % mode)
hypervisor=hypervisor,
hvparams=hvparams,
beparams=opts.beparams,
+ osparams=opts.osparams,
mode=mode,
start=start,
os_type=os_type,
+ force_variant=force_variant,
src_node=src_node,
- src_path=src_path)
+ src_path=src_path,
+ no_install=no_install,
+ identify_defaults=identify_defaults)
SubmitOrSend(op, opts)
return 0
+class _RunWhileClusterStoppedHelper:
+ """Helper class for L{RunWhileClusterStopped} to simplify state management
+
+ """
+ def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
+ """Initializes this class.
+
+ @type feedback_fn: callable
+ @param feedback_fn: Feedback function
+ @type cluster_name: string
+ @param cluster_name: Cluster name
+ @type master_node: string
+    @param master_node: Master node name
+ @type online_nodes: list
+ @param online_nodes: List of names of online nodes
+
+ """
+ self.feedback_fn = feedback_fn
+ self.cluster_name = cluster_name
+ self.master_node = master_node
+ self.online_nodes = online_nodes
+
+ self.ssh = ssh.SshRunner(self.cluster_name)
+
+ self.nonmaster_nodes = [name for name in online_nodes
+ if name != master_node]
+
+ assert self.master_node not in self.nonmaster_nodes
+
+ def _RunCmd(self, node_name, cmd):
+ """Runs a command on the local or a remote machine.
+
+ @type node_name: string
+ @param node_name: Machine name
+ @type cmd: list
+ @param cmd: Command
+
+ """
+ if node_name is None or node_name == self.master_node:
+ # No need to use SSH
+ result = utils.RunCmd(cmd)
+ else:
+ result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
+
+ if result.failed:
+ errmsg = ["Failed to run command %s" % result.cmd]
+ if node_name:
+ errmsg.append("on node %s" % node_name)
+ errmsg.append(": exitcode %s and error %s" %
+ (result.exit_code, result.output))
+ raise errors.OpExecError(" ".join(errmsg))
+
+ def Call(self, fn, *args):
+ """Call function while all daemons are stopped.
+
+ @type fn: callable
+ @param fn: Function to be called
+
+ """
+ # Pause watcher by acquiring an exclusive lock on watcher state file
+ self.feedback_fn("Blocking watcher")
+ watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
+ try:
+ # TODO: Currently, this just blocks. There's no timeout.
+ # TODO: Should it be a shared lock?
+ watcher_block.Exclusive(blocking=True)
+
+ # Stop master daemons, so that no new jobs can come in and all running
+ # ones are finished
+ self.feedback_fn("Stopping master daemons")
+ self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
+ try:
+ # Stop daemons on all nodes
+ for node_name in self.online_nodes:
+ self.feedback_fn("Stopping daemons on %s" % node_name)
+ self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
+
+ # All daemons are shut down now
+ try:
+ return fn(self, *args)
+ except Exception, err:
+ _, errmsg = FormatError(err)
+ logging.exception("Caught exception")
+ self.feedback_fn(errmsg)
+ raise
+ finally:
+ # Start cluster again, master node last
+ for node_name in self.nonmaster_nodes + [self.master_node]:
+ self.feedback_fn("Starting daemons on %s" % node_name)
+ self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
+ finally:
+ # Resume watcher
+ watcher_block.Close()
+
+
+def RunWhileClusterStopped(feedback_fn, fn, *args):
+ """Calls a function while all cluster daemons are stopped.
+
+ @type feedback_fn: callable
+ @param feedback_fn: Feedback function
+ @type fn: callable
+ @param fn: Function to be called when daemons are stopped
+
+ """
+ feedback_fn("Gathering cluster information")
+
+ # This ensures we're running on the master daemon
+ cl = GetClient()
+
+ (cluster_name, master_node) = \
+ cl.QueryConfigValues(["cluster_name", "master_node"])
+
+ online_nodes = GetOnlineNodes([], cl=cl)
+
+ # Don't keep a reference to the client. The master daemon will go away.
+ del cl
+
+ assert master_node in online_nodes
+
+ return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
+ online_nodes).Call(fn, *args)
+
+
def GenerateTable(headers, fields, separator, data,
numfields=None, unitfields=None,
units=None):
if separator is None:
mlens = [0 for name in fields]
- format = ' '.join(format_fields)
+ format_str = ' '.join(format_fields)
else:
- format = separator.replace("%", "%%").join(format_fields)
+ format_str = separator.replace("%", "%%").join(format_fields)
for row in data:
if row is None:
mlens[idx] = max(mlens[idx], len(hdr))
args.append(mlens[idx])
args.append(hdr)
- result.append(format % tuple(args))
+ result.append(format_str % tuple(args))
if separator is None:
assert len(mlens) == len(fields)
if separator is None:
args.append(mlens[idx])
args.append(line[idx])
- result.append(format % tuple(args))
+ result.append(format_str % tuple(args))
return result
return value
-def GetOnlineNodes(nodes, cl=None, nowarn=False):
+def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
+ filter_master=False):
"""Returns the names of online nodes.
This function will also log a warning on stderr with the names of
@param nowarn: by default, this function will output a note with the
offline nodes that are skipped; if this parameter is True the
note is not displayed
+ @type secondary_ips: boolean
+ @param secondary_ips: if True, return the secondary IPs instead of the
+ names, useful for doing network traffic over the replication interface
+ (if any)
+ @type filter_master: boolean
+ @param filter_master: if True, do not return the master node in the list
+ (useful in coordination with secondary_ips where we cannot check our
+ node name against the list)
"""
if cl is None:
cl = GetClient()
- result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
+ if secondary_ips:
+ name_idx = 2
+ else:
+ name_idx = 0
+
+ if filter_master:
+ master_node = cl.QueryConfigValues(["master_node"])[0]
+ filter_fn = lambda x: x != master_node
+ else:
+ filter_fn = lambda _: True
+
+ result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
use_locking=False)
offline = [row[0] for row in result if row[1]]
if offline and not nowarn:
ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
- return [row[0] for row in result if not row[1]]
+ return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
def _ToStream(stream, txt, *args):
SetGenericOpcodeOpts(ops, self.opts)
self.queue.append((name, ops))
- def SubmitPending(self):
+ def SubmitPending(self, each=False):
"""Submit all pending jobs.
"""
- results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
+ if each:
+ results = []
+ for row in self.queue:
+ # SubmitJob will remove the success status, but raise an exception if
+ # the submission fails, so we'll notice that anyway.
+ results.append([True, self.cl.SubmitJob(row[1])])
+ else:
+ results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
for (idx, ((status, data), (name, _))) in enumerate(zip(results,
self.queue)):
self.jobs.append((idx, status, data, name))
assert result
for job_data, status in zip(self.jobs, result):
- if status[0] in (constants.JOB_STATUS_QUEUED,
- constants.JOB_STATUS_WAITLOCK,
- constants.JOB_STATUS_CANCELING):
- # job is still waiting
+ if (isinstance(status, list) and status and
+ status[0] in (constants.JOB_STATUS_QUEUED,
+ constants.JOB_STATUS_WAITLOCK,
+ constants.JOB_STATUS_CANCELING)):
+ # job is still present and waiting
continue
- # good candidate found
+ # good candidate found (either running job or lost job)
self.jobs.remove(job_data)
return job_data
ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
# first, remove any non-submitted jobs
- self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
+ self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
for idx, _, jid, name in failures:
ToStderr("Failed to submit job for %s: %s", name, jid)
results.append((idx, False, jid))
try:
job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
success = True
+ except errors.JobLost, err:
+ _, job_result = FormatError(err)
+ ToStderr("Job %s for %s has been archived, cannot check its result",
+ jid, name)
+ success = False
except (errors.GenericError, luxi.ProtocolError), err:
_, job_result = FormatError(err)
success = False
else:
if not self.jobs:
self.SubmitPending()
- for status, result, name in self.jobs:
+ for _, status, result, name in self.jobs:
if status:
ToStdout("%s: %s", result, name)
else:
ToStderr("Failure for %s: %s", name, result)
+ return [row[1:3] for row in self.jobs]