from ganeti import ssconf
from ganeti import rpc
from ganeti import ssh
+from ganeti import compat
from optparse import (OptionParser, TitledHelpFormatter,
Option, OptionValueError)
__all__ = [
# Command line options
+ "ADD_UIDS_OPT",
"ALLOCATABLE_OPT",
"ALL_OPT",
"AUTO_PROMOTE_OPT",
"HVOPTS_OPT",
"HYPERVISOR_OPT",
"IALLOCATOR_OPT",
+ "IDENTIFY_DEFAULTS_OPT",
"IGNORE_CONSIST_OPT",
"IGNORE_FAILURES_OPT",
"IGNORE_SECONDARIES_OPT",
"IGNORE_SIZE_OPT",
"MAC_PREFIX_OPT",
+ "MAINTAIN_NODE_HEALTH_OPT",
"MASTER_NETDEV_OPT",
"MC_OPT",
"NET_OPT",
"RAPI_CERT_OPT",
"READD_OPT",
"REBOOT_TYPE_OPT",
+ "REMOVE_UIDS_OPT",
+ "ROMAN_OPT",
"SECONDARY_IP_OPT",
"SELECT_OS_OPT",
"SEP_OPT",
"SYNC_OPT",
"TAG_SRC_OPT",
"TIMEOUT_OPT",
+ "UIDPOOL_OPT",
"USEUNITS_OPT",
+ "USE_REPL_NET_OPT",
"VERBOSE_OPT",
"VG_NAME_OPT",
"YES_DOIT_OPT",
return _SplitKeyVal(opt, value)
def check_bool(option, opt, value): # pylint: disable-msg=W0613
  """Custom parser for yes/no options.

  This will store the parsed value as either True or False.

  @raise errors.ParameterError: if the value is neither a "true" nor a
      "false" spelling

  """
  value = value.lower()
  if value in (constants.VALUE_FALSE, "no"):
    return False
  if value in (constants.VALUE_TRUE, "yes"):
    return True
  raise errors.ParameterError("Invalid boolean value '%s'" % value)
+
+
# completion_suggestion is normally a list. Using numeric values not evaluating
# to False for dynamic completion.
(OPT_COMPL_MANY_NODES,
"identkeyval",
"keyval",
"unit",
+ "bool",
)
TYPE_CHECKER = Option.TYPE_CHECKER.copy()
TYPE_CHECKER["identkeyval"] = check_ident_key_val
TYPE_CHECKER["keyval"] = check_key_val
TYPE_CHECKER["unit"] = check_unit
+ TYPE_CHECKER["bool"] = check_bool
# optparse.py sets make_option, so we do it for our own option class, too
cli_option = CliOption
-_YESNO = ("yes", "no")
_YORNO = "yes|no"
DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
# Tri-state node/volume flags: parsed by the custom "bool" type checker so
# the destination holds True/False, with None meaning "not specified"
MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
                    type="bool", default=None, metavar=_YORNO,
                    help="Set the master_candidate flag on the node")

OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
                         type="bool", default=None,
                         help="Set the offline flag on the node")

DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
                         type="bool", default=None,
                         help="Set the drained flag on the node")

ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
                             type="bool", default=None, metavar=_YORNO,
                             help="Set the allocatable flag on a volume")
NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
help=("Create a new HMAC key for %s" %
constants.CONFD))
USE_REPL_NET_OPT = cli_option("--use-replication-network",
                              dest="use_replication_network",
                              help="Whether to use the replication network"
                              " for talking to the nodes",
                              action="store_true", default=False)

MAINTAIN_NODE_HEALTH_OPT = \
    cli_option("--maintain-node-health", dest="maintain_node_health",
               metavar=_YORNO, default=None, type="bool",
               help="Configure the cluster to automatically maintain node"
               " health, by shutting down unknown instances, shutting down"
               " unknown DRBD devices, etc.")

IDENTIFY_DEFAULTS_OPT = \
    cli_option("--identify-defaults", dest="identify_defaults",
               default=False, action="store_true",
               help="Identify which saved instance parameters are equal to"
               " the current cluster defaults and set them as such, instead"
               " of marking them as overridden")

# User-id pool management (values are parsed elsewhere; stored as strings)
UIDPOOL_OPT = cli_option("--uid-pool", default=None,
                         action="store", dest="uid_pool",
                         help=("A list of user-ids or user-id"
                               " ranges separated by commas"))

ADD_UIDS_OPT = cli_option("--add-uids", default=None,
                          action="store", dest="add_uids",
                          help=("A list of user-ids or user-id"
                                " ranges separated by commas, to be"
                                " added to the user-id pool"))

REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
                             action="store", dest="remove_uids",
                             help=("A list of user-ids or user-id"
                                   " ranges separated by commas, to be"
                                   " removed from the user-id pool"))

ROMAN_OPT = cli_option("--roman",
                       dest="roman_integers", default=False,
                       action="store_true",
                       help="Use roman numbers for positive integers")
+
+
def _ParseArgs(argv, commands, aliases):
"""Parser for the command line arguments.
return job_id
-def PollJob(job_id, cl=None, feedback_fn=None):
- """Function to poll for the result of a job.
+def GenericPollJob(job_id, cbs, report_cbs):
+ """Generic job-polling function.
- @type job_id: job identified
- @param job_id: the job to poll for results
- @type cl: luxi.Client
- @param cl: the luxi client to use for communicating with the master;
- if None, a new client will be created
+ @type job_id: number
+ @param job_id: Job ID
+ @type cbs: Instance of L{JobPollCbBase}
+ @param cbs: Data callbacks
+ @type report_cbs: Instance of L{JobPollReportCbBase}
+ @param report_cbs: Reporting callbacks
"""
- if cl is None:
- cl = GetClient()
-
prev_job_info = None
prev_logmsg_serial = None
status = None
- notified_queued = False
- notified_waitlock = False
-
while True:
- result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
- prev_logmsg_serial)
+ result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
+ prev_logmsg_serial)
if not result:
# job not found, go away!
raise errors.JobLost("Job with id %s lost" % job_id)
- elif result == constants.JOB_NOTCHANGED:
- if status is not None and not callable(feedback_fn):
- if status == constants.JOB_STATUS_QUEUED and not notified_queued:
- ToStderr("Job %s is waiting in queue", job_id)
- notified_queued = True
- elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
- ToStderr("Job %s is trying to acquire all necessary locks", job_id)
- notified_waitlock = True
+
+ if result == constants.JOB_NOTCHANGED:
+ report_cbs.ReportNotChanged(job_id, status)
# Wait again
continue
if log_entries:
for log_entry in log_entries:
- (serial, timestamp, _, message) = log_entry
- if callable(feedback_fn):
- feedback_fn(log_entry[1:])
- else:
- encoded = utils.SafeEncode(message)
- ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
+ (serial, timestamp, log_type, message) = log_entry
+ report_cbs.ReportLogMessage(job_id, serial, timestamp,
+ log_type, message)
prev_logmsg_serial = max(prev_logmsg_serial, serial)
# TODO: Handle canceled and archived jobs
prev_job_info = job_info
- jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
+ jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
if not jobs:
raise errors.JobLost("Job with id %s lost" % job_id)
status, opstatus, result = jobs[0]
+
if status == constants.JOB_STATUS_SUCCESS:
return result
- elif status in (constants.JOB_STATUS_CANCELING,
- constants.JOB_STATUS_CANCELED):
+
+ if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
raise errors.OpExecError("Job was canceled")
+
+ has_ok = False
+ for idx, (status, msg) in enumerate(zip(opstatus, result)):
+ if status == constants.OP_STATUS_SUCCESS:
+ has_ok = True
+ elif status == constants.OP_STATUS_ERROR:
+ errors.MaybeRaise(msg)
+
+ if has_ok:
+ raise errors.OpExecError("partial failure (opcode %d): %s" %
+ (idx, msg))
+
+ raise errors.OpExecError(str(msg))
+
+ # default failure mode
+ raise errors.OpExecError(result)
+
+
class JobPollCbBase:
  """Base class for L{GenericPollJob} data callbacks.

  Concrete implementations provide the actual communication channel with
  the job queue (e.g. a luxi client).

  """
  def __init__(self):
    """Initializes this class.

    """

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial):
    """Waits for changes on a job.

    Must be overridden by subclasses.

    """
    raise NotImplementedError()

  def QueryJobs(self, job_ids, fields):
    """Returns the selected fields for the selected job IDs.

    Must be overridden by subclasses.

    @type job_ids: list of numbers
    @param job_ids: Job IDs
    @type fields: list of strings
    @param fields: Fields

    """
    raise NotImplementedError()
+
+
class JobPollReportCbBase:
  """Base class for L{GenericPollJob} reporting callbacks.

  Concrete implementations decide how job progress is reported to the
  user (e.g. stdio or a feedback function).

  """
  def __init__(self):
    """Initializes this class.

    """

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    Must be overridden by subclasses.

    """
    raise NotImplementedError()

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    Must be overridden by subclasses.

    @type job_id: number
    @param job_id: Job ID
    @type status: string or None
    @param status: Job status if available

    """
    raise NotImplementedError()
+
+
class _LuxiJobPollCb(JobPollCbBase):
  """L{GenericPollJob} data callbacks backed by a luxi client.

  """
  def __init__(self, cl):
    """Initializes this class.

    @param cl: Luxi client used for all job queue queries

    """
    JobPollCbBase.__init__(self)
    self.cl = cl

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial):
    """Waits for changes on a job.

    Delegates directly to the luxi client.

    """
    return self.cl.WaitForJobChangeOnce(job_id, fields,
                                        prev_job_info, prev_log_serial)

  def QueryJobs(self, job_ids, fields):
    """Returns the selected fields for the selected job IDs.

    Delegates directly to the luxi client.

    """
    return self.cl.QueryJobs(job_ids, fields)
+
+
class FeedbackFnJobPollReportCb(JobPollReportCbBase):
  """Reporting callbacks that forward job log messages to a feedback function.

  """
  def __init__(self, feedback_fn):
    """Initializes this class.

    @param feedback_fn: Callable receiving a C{(timestamp, log_type, log_msg)}
        tuple for every job log message

    """
    JobPollReportCbBase.__init__(self)

    self.feedback_fn = feedback_fn

    assert callable(feedback_fn)

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    self.feedback_fn((timestamp, log_type, log_msg))

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    """
    # Deliberately ignored; the feedback function only receives log messages
+
+
class StdioJobPollReportCb(JobPollReportCbBase):
  """Reporting callbacks writing job progress to stdout/stderr.

  """
  def __init__(self):
    """Initializes this class.

    """
    JobPollReportCbBase.__init__(self)

    # Remember which one-off notifications have already been printed
    self.notified_queued = False
    self.notified_waitlock = False

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
             utils.SafeEncode(log_msg))

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    """
    if status is None:
      return

    if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
      ToStderr("Job %s is waiting in queue", job_id)
      self.notified_queued = True

    elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
      ToStderr("Job %s is trying to acquire all necessary locks", job_id)
      self.notified_waitlock = True
+
+
def PollJob(job_id, cl=None, feedback_fn=None):
  """Function to poll for the result of a job.

  @type job_id: job identifier
  @param job_id: the job to poll for results
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
      if None, a new client will be created
  @param feedback_fn: if callable, used to report progress; otherwise
      progress is written to stdout/stderr

  """
  if cl is None:
    cl = GetClient()

  # Select the reporting strategy based on whether a feedback function
  # was supplied by the caller
  if feedback_fn:
    reporter = FeedbackFnJobPollReportCb(feedback_fn)
  else:
    reporter = StdioJobPollReportCb()

  return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
elif opts.no_nics:
# no nics
nics = []
- else:
+ elif mode == constants.INSTANCE_CREATE:
# default of one nic, all auto
nics = [{}]
+ else:
+ # mode == import
+ nics = []
if opts.disk_template == constants.DT_DISKLESS:
if opts.disks or opts.sd_size is not None:
" information passed")
disks = []
else:
- if not opts.disks and not opts.sd_size:
+ if (not opts.disks and not opts.sd_size
+ and mode == constants.INSTANCE_CREATE):
raise errors.OpPrereqError("No disk information specified")
if opts.disks and opts.sd_size is not None:
raise errors.OpPrereqError("Please use either the '--disk' or"
" '-s' option")
if opts.sd_size is not None:
opts.disks = [(0, {"size": opts.sd_size})]
- try:
- disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
- except ValueError, err:
- raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
- disks = [{}] * disk_max
+
+ if opts.disks:
+ try:
+ disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
+ except ValueError, err:
+ raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
+ disks = [{}] * disk_max
+ else:
+ disks = []
for didx, ddict in opts.disks:
didx = int(didx)
if not isinstance(ddict, dict):
src_node = None
src_path = None
no_install = opts.no_install
+ identify_defaults = False
elif mode == constants.INSTANCE_IMPORT:
start = False
os_type = None
src_node = opts.src_node
src_path = opts.src_dir
no_install = None
+ identify_defaults = opts.identify_defaults
else:
raise errors.ProgrammerError("Invalid creation mode %s" % mode)
os_type=os_type,
src_node=src_node,
src_path=src_path,
- no_install=no_install)
+ no_install=no_install,
+ identify_defaults=identify_defaults)
SubmitOrSend(op, opts)
return 0
return value
def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
                   filter_master=False):
  """Returns the names of online nodes.

  This function will also log a warning on stderr with the names of
  the online nodes.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed
  @type secondary_ips: boolean
  @param secondary_ips: if True, return the secondary IPs instead of the
      names, useful for doing network traffic over the replication interface
      (if any)
  @type filter_master: boolean
  @param filter_master: if True, do not return the master node in the list
      (useful in coordination with secondary_ips where we cannot check our
      node name against the list)

  """
  if cl is None:
    cl = GetClient()

  # Column 2 of the query result holds the secondary IP ("sip")
  name_idx = 2 if secondary_ips else 0

  if filter_master:
    master_node = cl.QueryConfigValues(["master_node"])[0]
    def filter_fn(name):
      return name != master_node
  else:
    def filter_fn(_):
      return True

  result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
                         use_locking=False)
  offline = [row[0] for row in result if row[1]]
  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
  return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
def _ToStream(stream, txt, *args):
ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
# first, remove any non-submitted jobs
- self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
+ self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
for idx, _, jid, name in failures:
ToStderr("Failed to submit job for %s: %s", name, jid)
results.append((idx, False, jid))
else:
if not self.jobs:
self.SubmitPending()
- for status, result, name in self.jobs:
+ for _, status, result, name in self.jobs:
if status:
ToStdout("%s: %s", result, name)
else: