X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/f87b405e8ce72544084472079939e987a9ad9bec..2ee88aeb76a2430ec0c7f86629bf66cfd0b6f564:/lib/cli.py diff --git a/lib/cli.py b/lib/cli.py index dea7078..281fb5c 100644 --- a/lib/cli.py +++ b/lib/cli.py @@ -50,9 +50,8 @@ __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain", "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT", "FormatError", "SplitNodeOption", "SubmitOrSend", "JobSubmittedException", "FormatTimestamp", "ParseTimespec", - "ValidateBeParams", - "ToStderr", "ToStdout", - "UsesRPC", + "ToStderr", "ToStdout", "UsesRPC", + "GetOnlineNodes", "JobExecutor", "SYNC_OPT", "CONFIRM_OPT", ] @@ -183,6 +182,9 @@ FIELDS_OPT = make_option("-o", "--output", dest="output", action="store", FORCE_OPT = make_option("-f", "--force", dest="force", action="store_true", default=False, help="Force the operation") +CONFIRM_OPT = make_option("--yes", dest="confirm", action="store_true", + default=False, help="Do not require confirmation") + TAG_SRC_OPT = make_option("--from", dest="tags_source", default=None, help="File with tag names") @@ -191,6 +193,11 @@ SUBMIT_OPT = make_option("--submit", dest="submit_only", help="Submit the job and return the job ID, but" " don't wait for the job to finish") +SYNC_OPT = make_option("--sync", dest="do_locking", + default=False, action="store_true", + help="Grab locks while doing the queries" + " in order to ensure more consistent results") + def ARGS_FIXED(val): """Macro-like function denoting a fixed number of arguments""" @@ -405,27 +412,6 @@ def SplitNodeOption(value): return (value, None) -def ValidateBeParams(bep): - """Parse and check the given beparams. - - The function will update in-place the given dictionary. - - @type bep: dict - @param bep: input beparams - @raise errors.ParameterError: if the input values are not OK - @raise errors.UnitParseError: if the input values are not OK - - """ - if constants.BE_MEMORY in bep: - bep[constants.BE_MEMORY] = utils.ParseUnit(bep[constants.BE_MEMORY]) - - if constants.BE_VCPUS in bep: - try: - bep[constants.BE_VCPUS] = int(bep[constants.BE_VCPUS]) - except ValueError: - raise errors.ParameterError("Invalid number of VCPUs") - - def UsesRPC(fn): def wrapper(*args, **kwargs): rpc.Init() @@ -555,7 +541,8 @@ def PollJob(job_id, cl=None, feedback_fn=None): if callable(feedback_fn): feedback_fn(log_entry[1:]) else: - print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), message) + encoded = utils.SafeEncode(message) + print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), encoded) prev_logmsg_serial = max(prev_logmsg_serial, serial) # TODO: Handle canceled and archived jobs @@ -690,6 +677,8 @@ def FormatError(err): elif isinstance(err, errors.JobQueueFull): obuf.write("Failure: the job queue is full and doesn't accept new" " job submissions until old jobs are archived\n") + elif isinstance(err, errors.TypeEnforcementError): + obuf.write("Parameter Error: %s" % msg) elif isinstance(err, errors.GenericError): obuf.write("Unhandled Ganeti error: %s" % msg) elif isinstance(err, luxi.NoMasterError): @@ -811,7 +800,7 @@ def GenerateTable(headers, fields, separator, data, format_fields = [] for field in fields: if headers and field not in headers: - # FIXME: handle better unknown fields (either revert to old + # TODO: handle better unknown fields (either revert to old # style of raising exception, or deal more intelligently with # variable fields) headers[field] = field @@ -829,6 +818,8 @@ def GenerateTable(headers, fields, separator, data, format = separator.replace("%", "%%").join(format_fields) for row in data: + if row is None: + continue for idx, val in enumerate(row): if unitfields.Matches(fields[idx]): try: @@ -854,6 +845,8 @@ def GenerateTable(headers, fields, separator, data, for line in data: args = [] + if line is None: + line = ['-' for _ in fields] for idx in xrange(len(fields)): if separator is None: args.append(mlens[idx]) @@ -870,7 +863,7 @@ def FormatTimestamp(ts): @param ts: a timeval-type timestamp, a tuple of seconds and microseconds @rtype: string - @returns: a string with the formatted timestamp + @return: a string with the formatted timestamp """ if not isinstance (ts, (tuple, list)) or len(ts) != 2: @@ -921,6 +914,32 @@ def ParseTimespec(value): return value +def GetOnlineNodes(nodes, cl=None, nowarn=False): + """Returns the names of online nodes. + + This function will also log a warning on stderr with the names of + the online nodes. + + @param nodes: if not empty, use only this subset of nodes (minus the + offline ones) + @param cl: if not None, luxi client to use + @type nowarn: boolean + @param nowarn: by default, this function will output a note with the + offline nodes that are skipped; if this parameter is True the + note is not displayed + + """ + if cl is None: + cl = GetClient() + + result = cl.QueryNodes(names=nodes, fields=["name", "offline"], + use_locking=False) + offline = [row[0] for row in result if row[1]] + if offline and not nowarn: + ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline)) + return [row[0] for row in result if not row[1]] + + def _ToStream(stream, txt, *args): """Write a message to a stream, bypassing the logging system @@ -961,3 +980,89 @@ def ToStderr(txt, *args): """ _ToStream(sys.stderr, txt, *args) + + +class JobExecutor(object): + """Class which manages the submission and execution of multiple jobs. + + Note that instances of this class should not be reused between + GetResults() calls. + + """ + def __init__(self, cl=None, verbose=True): + self.queue = [] + if cl is None: + cl = GetClient() + self.cl = cl + self.verbose = verbose + self.jobs = [] + + def QueueJob(self, name, *ops): + """Record a job for later submit. + + @type name: string + @param name: a description of the job, will be used in WaitJobSet + """ + self.queue.append((name, ops)) + + + def SubmitPending(self): + """Submit all pending jobs. + + """ + results = self.cl.SubmitManyJobs([row[1] for row in self.queue]) + for ((status, data), (name, _)) in zip(results, self.queue): + self.jobs.append((status, data, name)) + + def GetResults(self): + """Wait for and return the results of all jobs. + + @rtype: list + @return: list of tuples (success, job results), in the same order + as the submitted jobs; if a job has failed, instead of the result + there will be the error message + + """ + if not self.jobs: + self.SubmitPending() + results = [] + if self.verbose: + ok_jobs = [row[1] for row in self.jobs if row[0]] + if ok_jobs: + ToStdout("Submitted jobs %s", ", ".join(ok_jobs)) + for submit_status, jid, name in self.jobs: + if not submit_status: + ToStderr("Failed to submit job for %s: %s", name, jid) + results.append((False, jid)) + continue + if self.verbose: + ToStdout("Waiting for job %s for %s...", jid, name) + try: + job_result = PollJob(jid, cl=self.cl) + success = True + except (errors.GenericError, luxi.ProtocolError), err: + _, job_result = FormatError(err) + success = False + # the error message will always be shown, verbose or not + ToStderr("Job %s for %s has failed: %s", jid, name, job_result) + + results.append((success, job_result)) + return results + + def WaitOrShow(self, wait): + """Wait for job results or only print the job IDs. + + @type wait: boolean + @param wait: whether to wait or not + + """ + if wait: + return self.GetResults() + else: + if not self.jobs: + self.SubmitPending() + for status, result, name in self.jobs: + if status: + ToStdout("%s: %s", result, name) + else: + ToStderr("Failure for %s: %s", name, result)