from ganeti import ssconf
from ganeti import rpc
from ganeti import ssh
+from ganeti import compat
from optparse import (OptionParser, TitledHelpFormatter,
Option, OptionValueError)
__all__ = [
# Command line options
+ "ADD_UIDS_OPT",
"ALLOCATABLE_OPT",
"ALL_OPT",
"AUTO_PROMOTE_OPT",
"AUTO_REPLACE_OPT",
"BACKEND_OPT",
"CLEANUP_OPT",
+ "CLUSTER_DOMAIN_SECRET_OPT",
"CONFIRM_OPT",
"CP_SIZE_OPT",
"DEBUG_OPT",
"IDENTIFY_DEFAULTS_OPT",
"IGNORE_CONSIST_OPT",
"IGNORE_FAILURES_OPT",
+ "IGNORE_REMOVE_FAILURES_OPT",
"IGNORE_SECONDARIES_OPT",
"IGNORE_SIZE_OPT",
"MAC_PREFIX_OPT",
"MC_OPT",
"NET_OPT",
"NEW_CLUSTER_CERT_OPT",
+ "NEW_CLUSTER_DOMAIN_SECRET_OPT",
"NEW_CONFD_HMAC_KEY_OPT",
"NEW_RAPI_CERT_OPT",
"NEW_SECONDARY_OPT",
"RAPI_CERT_OPT",
"READD_OPT",
"REBOOT_TYPE_OPT",
+ "REMOVE_INSTANCE_OPT",
+ "REMOVE_UIDS_OPT",
+ "ROMAN_OPT",
"SECONDARY_IP_OPT",
"SELECT_OS_OPT",
"SEP_OPT",
" configuration even if there are failures"
" during the removal process")
+IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
+ dest="ignore_remove_failures",
+ action="store_true", default=False,
+ help="Remove the instance from the"
+ " cluster configuration even if there"
+ " are failures during the removal"
+ " process")
+
+REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
+ action="store_true", default=False,
+ help="Remove the instance from the cluster")
+
NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
help="Specifies the new secondary node",
metavar="NODE", default=None,
metavar="NETDEV",
default=constants.DEFAULT_BRIDGE)
-
GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
help="Specify the default directory (cluster-"
"wide) for storing the file-based disks [%s]" %
help=("Create a new HMAC key for %s" %
constants.CONFD))
+CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
+ dest="cluster_domain_secret",
+ default=None,
+ help=("Load new cluster domain"
+ " secret from file"))
+
+NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
+ dest="new_cluster_domain_secret",
+ default=False, action="store_true",
+ help=("Create a new cluster domain"
+ " secret"))
+
USE_REPL_NET_OPT = cli_option("--use-replication-network",
dest="use_replication_network",
help="Whether to use the replication network"
help=("A list of user-ids or user-id"
" ranges separated by commas"))
+ADD_UIDS_OPT = cli_option("--add-uids", default=None,
+ action="store", dest="add_uids",
+ help=("A list of user-ids or user-id"
+ " ranges separated by commas, to be"
+ " added to the user-id pool"))
+
+REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
+ action="store", dest="remove_uids",
+ help=("A list of user-ids or user-id"
+ " ranges separated by commas, to be"
+ " removed from the user-id pool"))
+
+ROMAN_OPT = cli_option("--roman",
+ dest="roman_integers", default=False,
+ action="store_true",
+ help="Use roman numbers for positive integers")
+
+
def _ParseArgs(argv, commands, aliases):
"""Parser for the command line arguments.
return job_id
-def PollJob(job_id, cl=None, feedback_fn=None):
- """Function to poll for the result of a job.
+def GenericPollJob(job_id, cbs, report_cbs):
+ """Generic job-polling function.
- @type job_id: job identified
- @param job_id: the job to poll for results
- @type cl: luxi.Client
- @param cl: the luxi client to use for communicating with the master;
- if None, a new client will be created
+ @type job_id: number
+ @param job_id: Job ID
+ @type cbs: Instance of L{JobPollCbBase}
+ @param cbs: Data callbacks
+ @type report_cbs: Instance of L{JobPollReportCbBase}
+ @param report_cbs: Reporting callbacks
"""
- if cl is None:
- cl = GetClient()
-
prev_job_info = None
prev_logmsg_serial = None
status = None
- notified_queued = False
- notified_waitlock = False
-
while True:
- result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
- prev_logmsg_serial)
+ result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
+ prev_logmsg_serial)
if not result:
# job not found, go away!
raise errors.JobLost("Job with id %s lost" % job_id)
- elif result == constants.JOB_NOTCHANGED:
- if status is not None and not callable(feedback_fn):
- if status == constants.JOB_STATUS_QUEUED and not notified_queued:
- ToStderr("Job %s is waiting in queue", job_id)
- notified_queued = True
- elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
- ToStderr("Job %s is trying to acquire all necessary locks", job_id)
- notified_waitlock = True
+
+ if result == constants.JOB_NOTCHANGED:
+ report_cbs.ReportNotChanged(job_id, status)
# Wait again
continue
if log_entries:
for log_entry in log_entries:
- (serial, timestamp, _, message) = log_entry
- if callable(feedback_fn):
- feedback_fn(log_entry[1:])
- else:
- encoded = utils.SafeEncode(message)
- ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
+ (serial, timestamp, log_type, message) = log_entry
+ report_cbs.ReportLogMessage(job_id, serial, timestamp,
+ log_type, message)
prev_logmsg_serial = max(prev_logmsg_serial, serial)
# TODO: Handle canceled and archived jobs
prev_job_info = job_info
- jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
+ jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
if not jobs:
raise errors.JobLost("Job with id %s lost" % job_id)
status, opstatus, result = jobs[0]
+
if status == constants.JOB_STATUS_SUCCESS:
return result
- elif status in (constants.JOB_STATUS_CANCELING,
- constants.JOB_STATUS_CANCELED):
+
+ if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
raise errors.OpExecError("Job was canceled")
+
+ has_ok = False
+ for idx, (status, msg) in enumerate(zip(opstatus, result)):
+ if status == constants.OP_STATUS_SUCCESS:
+ has_ok = True
+ elif status == constants.OP_STATUS_ERROR:
+ errors.MaybeRaise(msg)
+
+ if has_ok:
+ raise errors.OpExecError("partial failure (opcode %d): %s" %
+ (idx, msg))
+
+ raise errors.OpExecError(str(msg))
+
+ # default failure mode
+ raise errors.OpExecError(result)
+
+
+class JobPollCbBase:
+ """Base class for L{GenericPollJob} callbacks.
+
+ """
+ def __init__(self):
+ """Initializes this class.
+
+ """
+
+ def WaitForJobChangeOnce(self, job_id, fields,
+ prev_job_info, prev_log_serial):
+ """Waits for changes on a job.
+
+ """
+ raise NotImplementedError()
+
+ def QueryJobs(self, job_ids, fields):
+ """Returns the selected fields for the selected job IDs.
+
+ @type job_ids: list of numbers
+ @param job_ids: Job IDs
+ @type fields: list of strings
+ @param fields: Fields
+
+ """
+ raise NotImplementedError()
+
+
+class JobPollReportCbBase:
+ """Base class for L{GenericPollJob} reporting callbacks.
+
+ """
+ def __init__(self):
+ """Initializes this class.
+
+ """
+
+ def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+ """Handles a log message.
+
+ """
+ raise NotImplementedError()
+
+ def ReportNotChanged(self, job_id, status):
+ """Called if a job hasn't changed in a while.
+
+ @type job_id: number
+ @param job_id: Job ID
+ @type status: string or None
+ @param status: Job status if available
+
+ """
+ raise NotImplementedError()
+
+
+class _LuxiJobPollCb(JobPollCbBase):
+ def __init__(self, cl):
+ """Initializes this class.
+
+ """
+ JobPollCbBase.__init__(self)
+ self.cl = cl
+
+ def WaitForJobChangeOnce(self, job_id, fields,
+ prev_job_info, prev_log_serial):
+ """Waits for changes on a job.
+
+ """
+ return self.cl.WaitForJobChangeOnce(job_id, fields,
+ prev_job_info, prev_log_serial)
+
+ def QueryJobs(self, job_ids, fields):
+ """Returns the selected fields for the selected job IDs.
+
+ """
+ return self.cl.QueryJobs(job_ids, fields)
+
+
+class FeedbackFnJobPollReportCb(JobPollReportCbBase):
+ def __init__(self, feedback_fn):
+ """Initializes this class.
+
+ """
+ JobPollReportCbBase.__init__(self)
+
+ self.feedback_fn = feedback_fn
+
+ assert callable(feedback_fn)
+
+ def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+ """Handles a log message.
+
+ """
+ self.feedback_fn((timestamp, log_type, log_msg))
+
+ def ReportNotChanged(self, job_id, status):
+ """Called if a job hasn't changed in a while.
+
+ """
+ # Ignore
+
+
+class StdioJobPollReportCb(JobPollReportCbBase):
+ def __init__(self):
+ """Initializes this class.
+
+ """
+ JobPollReportCbBase.__init__(self)
+
+ self.notified_queued = False
+ self.notified_waitlock = False
+
+ def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+ """Handles a log message.
+
+ """
+ ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
+ utils.SafeEncode(log_msg))
+
+ def ReportNotChanged(self, job_id, status):
+ """Called if a job hasn't changed in a while.
+
+ """
+ if status is None:
+ return
+
+ if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
+ ToStderr("Job %s is waiting in queue", job_id)
+ self.notified_queued = True
+
+ elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
+ ToStderr("Job %s is trying to acquire all necessary locks", job_id)
+ self.notified_waitlock = True
+
+
+def PollJob(job_id, cl=None, feedback_fn=None):
+ """Function to poll for the result of a job.
+
+ @type job_id: job identifier
+ @param job_id: the job to poll for results
+ @type cl: luxi.Client
+ @param cl: the luxi client to use for communicating with the master;
+ if None, a new client will be created
+
+ """
+ if cl is None:
+ cl = GetClient()
+
+ if feedback_fn:
+ reporter = FeedbackFnJobPollReportCb(feedback_fn)
else:
- has_ok = False
- for idx, (status, msg) in enumerate(zip(opstatus, result)):
- if status == constants.OP_STATUS_SUCCESS:
- has_ok = True
- elif status == constants.OP_STATUS_ERROR:
- errors.MaybeRaise(msg)
- if has_ok:
- raise errors.OpExecError("partial failure (opcode %d): %s" %
- (idx, msg))
- else:
- raise errors.OpExecError(str(msg))
- # default failure mode
- raise errors.OpExecError(result)
+ reporter = StdioJobPollReportCb()
+
+ return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
obuf.write("Parameter Error: %s" % msg)
elif isinstance(err, errors.ParameterError):
obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
- elif isinstance(err, errors.GenericError):
- obuf.write("Unhandled Ganeti error: %s" % msg)
elif isinstance(err, luxi.NoMasterError):
obuf.write("Cannot communicate with the master daemon.\nIs it running"
" and listening for connections?")
elif isinstance(err, luxi.ProtocolError):
obuf.write("Unhandled protocol error while talking to the master daemon:\n"
"%s" % msg)
+ elif isinstance(err, errors.GenericError):
+ obuf.write("Unhandled Ganeti error: %s" % msg)
elif isinstance(err, JobSubmittedException):
obuf.write("JobID: %s\n" % err.args[0])
retcode = 0
SetGenericOpcodeOpts(ops, self.opts)
self.queue.append((name, ops))
- def SubmitPending(self):
+ def SubmitPending(self, each=False):
"""Submit all pending jobs.
"""
- results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
+ if each:
+ results = []
+ for row in self.queue:
# SubmitJob returns only the job ID (without a success status), but it
# raises an exception if the submission fails, so we'll notice that anyway.
+ results.append([True, self.cl.SubmitJob(row[1])])
+ else:
+ results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
for (idx, ((status, data), (name, _))) in enumerate(zip(results,
self.queue)):
self.jobs.append((idx, status, data, name))
ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
# first, remove any non-submitted jobs
- self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
+ self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
for idx, _, jid, name in failures:
ToStderr("Failed to submit job for %s: %s", name, jid)
results.append((idx, False, jid))