X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/a604f165385c632dac15ae119cccdb6f264b3824..b59252feb90e3f6c3754c5e464dc28c895ac0c2b:/lib/cli.py diff --git a/lib/cli.py b/lib/cli.py index 73be930..fefd2b7 100644 --- a/lib/cli.py +++ b/lib/cli.py @@ -27,15 +27,16 @@ import textwrap import os.path import copy import time +import logging from cStringIO import StringIO from ganeti import utils -from ganeti import logger from ganeti import errors from ganeti import constants from ganeti import opcodes from ganeti import luxi from ganeti import ssconf +from ganeti import rpc from optparse import (OptionParser, make_option, TitledHelpFormatter, Option, OptionValueError) @@ -49,7 +50,8 @@ __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain", "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT", "FormatError", "SplitNodeOption", "SubmitOrSend", "JobSubmittedException", "FormatTimestamp", "ParseTimespec", - "ValidateBeParams", + "ToStderr", "ToStdout", "UsesRPC", + "GetOnlineNodes", "JobExecutor", "SYNC_OPT", ] @@ -168,9 +170,9 @@ SEP_OPT = make_option("--separator", default=None, help="Separator between output fields" " (defaults to one space)") -USEUNITS_OPT = make_option("--human-readable", default=False, - action="store_true", dest="human_readable", - help="Print sizes in human readable format") +USEUNITS_OPT = make_option("--units", default=None, + dest="units", choices=('h', 'm', 'g', 't'), + help="Specify units for output (one of hmgt)") FIELDS_OPT = make_option("-o", "--output", dest="output", action="store", type="string", help="Comma separated list of" @@ -188,6 +190,11 @@ SUBMIT_OPT = make_option("--submit", dest="submit_only", help="Submit the job and return the job ID, but" " don't wait for the job to finish") +SYNC_OPT = make_option("--sync", dest="do_locking", + default=False, action="store_true", + help="Grab locks while doing the queries" + " in order to ensure more consistent results") + def ARGS_FIXED(val): """Macro-like function denoting a fixed number of arguments""" @@ -311,15 +318,15 @@ keyval_option = KeyValOption def _ParseArgs(argv, commands, aliases): - """Parses the command line and return the function which must be - executed together with its arguments + """Parser for the command line arguments. - Arguments: - argv: the command line + This function parses the arguments and returns the function which + must be executed together with its (modified) arguments. - commands: dictionary with special contents, see the design doc for - cmdline handling - aliases: dictionary with command aliases {'alias': 'target, ...} + @param argv: the command line + @param commands: dictionary with special contents, see the design + doc for cmdline handling + @param aliases: dictionary with command aliases {'alias': 'target, ...} """ if len(argv) == 0: @@ -402,51 +409,39 @@ def SplitNodeOption(value): return (value, None) -def ValidateBeParams(bep): - """Parse and check the given beparams. - - The function will update in-place the given dictionary. - - @type bep: dict - @param bep: input beparams - @raise errors.ParameterError: if the input values are not OK - @raise errors.UnitParseError: if the input values are not OK - - """ - if constants.BE_MEMORY in bep: - bep[constants.BE_MEMORY] = utils.ParseUnit(bep[constants.BE_MEMORY]) - - if constants.BE_VCPUS in bep: +def UsesRPC(fn): + def wrapper(*args, **kwargs): + rpc.Init() try: - bep[constants.BE_VCPUS] = int(bep[constants.BE_VCPUS]) - except ValueError: - raise errors.ParameterError("Invalid number of VCPUs") + return fn(*args, **kwargs) + finally: + rpc.Shutdown() + return wrapper def AskUser(text, choices=None): """Ask the user a question. - Args: - text - the question to ask. + @param text: the question to ask - choices - list with elements tuples (input_char, return_value, - description); if not given, it will default to: [('y', True, - 'Perform the operation'), ('n', False, 'Do no do the operation')]; - note that the '?' char is reserved for help + @param choices: list with elements tuples (input_char, return_value, + description); if not given, it will default to: [('y', True, + 'Perform the operation'), ('n', False, 'Do no do the operation')]; + note that the '?' char is reserved for help - Returns: one of the return values from the choices list; if input is - not possible (i.e. not running with a tty, we return the last entry - from the list + @return: one of the return values from the choices list; if input is + not possible (i.e. not running with a tty, we return the last + entry from the list """ if choices is None: choices = [('y', True, 'Perform the operation'), ('n', False, 'Do not perform the operation')] if not choices or not isinstance(choices, list): - raise errors.ProgrammerError("Invalid choiches argument to AskUser") + raise errors.ProgrammerError("Invalid choices argument to AskUser") for entry in choices: if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?': - raise errors.ProgrammerError("Invalid choiches element to AskUser") + raise errors.ProgrammerError("Invalid choices element to AskUser") answer = choices[-1][1] new_text = [] @@ -543,23 +538,41 @@ def PollJob(job_id, cl=None, feedback_fn=None): if callable(feedback_fn): feedback_fn(log_entry[1:]) else: - print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), message) + encoded = utils.SafeEncode(message) + print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), encoded) prev_logmsg_serial = max(prev_logmsg_serial, serial) # TODO: Handle canceled and archived jobs - elif status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR): + elif status in (constants.JOB_STATUS_SUCCESS, + constants.JOB_STATUS_ERROR, + constants.JOB_STATUS_CANCELING, + constants.JOB_STATUS_CANCELED): break prev_job_info = job_info - jobs = cl.QueryJobs([job_id], ["status", "opresult"]) + jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"]) if not jobs: raise errors.JobLost("Job with id %s lost" % job_id) - status, result = jobs[0] + status, opstatus, result = jobs[0] if status == constants.JOB_STATUS_SUCCESS: return result + elif status in (constants.JOB_STATUS_CANCELING, + constants.JOB_STATUS_CANCELED): + raise errors.OpExecError("Job was canceled") else: + has_ok = False + for idx, (status, msg) in enumerate(zip(opstatus, result)): + if status == constants.OP_STATUS_SUCCESS: + has_ok = True + elif status == constants.OP_STATUS_ERROR: + if has_ok: + raise errors.OpExecError("partial failure (opcode %d): %s" % + (idx, msg)) + else: + raise errors.OpExecError(str(msg)) + # default failure mode raise errors.OpExecError(result) @@ -626,7 +639,7 @@ def FormatError(err): msg = str(err) if isinstance(err, errors.ConfigurationError): txt = "Corrupt configuration file: %s" % msg - logger.Error(txt) + logging.error(txt) obuf.write(txt + "\n") obuf.write("Aborting.") retcode = 2 @@ -655,6 +668,16 @@ def FormatError(err): obuf.write("Failure: command execution error:\n%s" % msg) elif isinstance(err, errors.TagError): obuf.write("Failure: invalid tag(s) given:\n%s" % msg) + elif isinstance(err, errors.JobQueueDrainError): + obuf.write("Failure: the job queue is marked for drain and doesn't" + " accept new requests\n") + elif isinstance(err, errors.JobQueueFull): + obuf.write("Failure: the job queue is full and doesn't accept new" + " job submissions until old jobs are archived\n") + elif isinstance(err, errors.TypeEnforcementError): + obuf.write("Parameter Error: %s" % msg) + elif isinstance(err, errors.ParameterError): + obuf.write("Failure: unknown/wrong parameter name '%s'" % msg) elif isinstance(err, errors.GenericError): obuf.write("Unhandled Ganeti error: %s" % msg) elif isinstance(err, luxi.NoMasterError): @@ -709,51 +732,80 @@ def GenericMain(commands, override=None, aliases=None): for key, val in override.iteritems(): setattr(options, key, val) - logger.SetupLogging(constants.LOG_COMMANDS, debug=options.debug, - stderr_logging=True, program=binary) + utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug, + stderr_logging=True, program=binary) utils.debug = options.debug if old_cmdline: - logger.Info("run with arguments '%s'" % old_cmdline) + logging.info("run with arguments '%s'", old_cmdline) else: - logger.Info("run with no arguments") + logging.info("run with no arguments") try: result = func(options, args) - except (errors.GenericError, luxi.ProtocolError), err: + except (errors.GenericError, luxi.ProtocolError, + JobSubmittedException), err: result, err_msg = FormatError(err) - logger.ToStderr(err_msg) + logging.exception("Error during command processing") + ToStderr(err_msg) return result def GenerateTable(headers, fields, separator, data, - numfields=None, unitfields=None): + numfields=None, unitfields=None, + units=None): """Prints a table with headers and different fields. - Args: - headers: Dict of header titles or None if no headers should be shown - fields: List of fields to show - separator: String used to separate fields or None for spaces - data: Data to be printed - numfields: List of fields to be aligned to right - unitfields: List of fields to be formatted as units + @type headers: dict + @param headers: dictionary mapping field names to headers for + the table + @type fields: list + @param fields: the field names corresponding to each row in + the data field + @param separator: the separator to be used; if this is None, + the default 'smart' algorithm is used which computes optimal + field width, otherwise just the separator is used between + each field + @type data: list + @param data: a list of lists, each sublist being one row to be output + @type numfields: list + @param numfields: a list with the fields that hold numeric + values and thus should be right-aligned + @type unitfields: list + @param unitfields: a list with the fields that hold numeric + values that should be formatted with the units field + @type units: string or None + @param units: the units we should use for formatting, or None for + automatic choice (human-readable for non-separator usage, otherwise + megabytes); this is a one-letter string """ + if units is None: + if separator: + units = "m" + else: + units = "h" + if numfields is None: numfields = [] if unitfields is None: unitfields = [] + numfields = utils.FieldSet(*numfields) + unitfields = utils.FieldSet(*unitfields) + format_fields = [] for field in fields: if headers and field not in headers: - raise errors.ProgrammerError("Missing header description for field '%s'" - % field) + # TODO: handle better unknown fields (either revert to old + # style of raising exception, or deal more intelligently with + # variable fields) + headers[field] = field if separator is not None: format_fields.append("%s") - elif field in numfields: + elif numfields.Matches(field): format_fields.append("%*s") else: format_fields.append("%-*s") @@ -765,14 +817,16 @@ def GenerateTable(headers, fields, separator, data, format = separator.replace("%", "%%").join(format_fields) for row in data: + if row is None: + continue for idx, val in enumerate(row): - if fields[idx] in unitfields: + if unitfields.Matches(fields[idx]): try: val = int(val) except ValueError: pass else: - val = row[idx] = utils.FormatUnit(val) + val = row[idx] = utils.FormatUnit(val, units) val = row[idx] = str(val) if separator is None: mlens[idx] = max(mlens[idx], len(val)) @@ -790,6 +844,8 @@ def GenerateTable(headers, fields, separator, data, for line in data: args = [] + if line is None: + line = ['-' for _ in fields] for idx in xrange(len(fields)): if separator is None: args.append(mlens[idx]) @@ -806,7 +862,7 @@ def FormatTimestamp(ts): @param ts: a timeval-type timestamp, a tuple of seconds and microseconds @rtype: string - @returns: a string with the formatted timestamp + @return: a string with the formatted timestamp """ if not isinstance (ts, (tuple, list)) or len(ts) != 2: @@ -855,3 +911,156 @@ def ParseTimespec(value): except ValueError: raise errors.OpPrereqError("Invalid time specification '%s'" % value) return value + + +def GetOnlineNodes(nodes, cl=None, nowarn=False): + """Returns the names of online nodes. + + This function will also log a warning on stderr with the names of + the online nodes. + + @param nodes: if not empty, use only this subset of nodes (minus the + offline ones) + @param cl: if not None, luxi client to use + @type nowarn: boolean + @param nowarn: by default, this function will output a note with the + offline nodes that are skipped; if this parameter is True the + note is not displayed + + """ + if cl is None: + cl = GetClient() + + result = cl.QueryNodes(names=nodes, fields=["name", "offline"], + use_locking=False) + offline = [row[0] for row in result if row[1]] + if offline and not nowarn: + ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline)) + return [row[0] for row in result if not row[1]] + + +def _ToStream(stream, txt, *args): + """Write a message to a stream, bypassing the logging system + + @type stream: file object + @param stream: the file to which we should write + @type txt: str + @param txt: the message + + """ + if args: + args = tuple(args) + stream.write(txt % args) + else: + stream.write(txt) + stream.write('\n') + stream.flush() + + +def ToStdout(txt, *args): + """Write a message to stdout only, bypassing the logging system + + This is just a wrapper over _ToStream. + + @type txt: str + @param txt: the message + + """ + _ToStream(sys.stdout, txt, *args) + + +def ToStderr(txt, *args): + """Write a message to stderr only, bypassing the logging system + + This is just a wrapper over _ToStream. + + @type txt: str + @param txt: the message + + """ + _ToStream(sys.stderr, txt, *args) + + +class JobExecutor(object): + """Class which manages the submission and execution of multiple jobs. + + Note that instances of this class should not be reused between + GetResults() calls. + + """ + def __init__(self, cl=None, verbose=True): + self.queue = [] + if cl is None: + cl = GetClient() + self.cl = cl + self.verbose = verbose + self.jobs = [] + + def QueueJob(self, name, *ops): + """Record a job for later submit. + + @type name: string + @param name: a description of the job, will be used in WaitJobSet + """ + self.queue.append((name, ops)) + + def SubmitPending(self): + """Submit all pending jobs. + + """ + results = self.cl.SubmitManyJobs([row[1] for row in self.queue]) + for ((status, data), (name, _)) in zip(results, self.queue): + self.jobs.append((status, data, name)) + + def GetResults(self): + """Wait for and return the results of all jobs. + + @rtype: list + @return: list of tuples (success, job results), in the same order + as the submitted jobs; if a job has failed, instead of the result + there will be the error message + + """ + if not self.jobs: + self.SubmitPending() + results = [] + if self.verbose: + ok_jobs = [row[1] for row in self.jobs if row[0]] + if ok_jobs: + ToStdout("Submitted jobs %s", ", ".join(ok_jobs)) + for submit_status, jid, name in self.jobs: + if not submit_status: + ToStderr("Failed to submit job for %s: %s", name, jid) + results.append((False, jid)) + continue + if self.verbose: + ToStdout("Waiting for job %s for %s...", jid, name) + try: + job_result = PollJob(jid, cl=self.cl) + success = True + except (errors.GenericError, luxi.ProtocolError), err: + _, job_result = FormatError(err) + success = False + # the error message will always be shown, verbose or not + ToStderr("Job %s for %s has failed: %s", jid, name, job_result) + + results.append((success, job_result)) + return results + + def WaitOrShow(self, wait): + """Wait for job results or only print the job IDs. + + @type wait: boolean + @param wait: whether to wait or not + + """ + if wait: + return self.GetResults() + else: + if not self.jobs: + self.SubmitPending() + for status, result, name in self.jobs: + if status: + ToStdout("%s: %s", result, name) + else: + ToStderr("Failure for %s: %s", name, result)