X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/d1b47b168f9729ad0ad555d6ae15d05a25290ab5..86aa9ba32f810956948ff4e578c828351998a82c:/lib/client/gnt_job.py diff --git a/lib/client/gnt_job.py b/lib/client/gnt_job.py index 1049760..81f0bd1 100644 --- a/lib/client/gnt_job.py +++ b/lib/client/gnt_job.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007 Google Inc. +# Copyright (C) 2006, 2007, 2012 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -20,7 +20,7 @@ """Job related commands""" -# pylint: disable-msg=W0401,W0613,W0614,C0103 +# pylint: disable=W0401,W0613,W0614,C0103 # W0401: Wildcard import ganeti.cli # W0613: Unused argument, since all functions follow the same API # W0614: Unused import %s from wildcard import (since we need cli) @@ -31,6 +31,7 @@ from ganeti import constants from ganeti import errors from ganeti import utils from ganeti import cli +from ganeti import qlang #: default list of fields for L{ListJobs} @@ -40,7 +41,7 @@ _LIST_DEF_FIELDS = ["id", "status", "summary"] #: names _USER_JOB_STATUS = { constants.JOB_STATUS_QUEUED: "queued", - constants.JOB_STATUS_WAITLOCK: "waiting", + constants.JOB_STATUS_WAITING: "waiting", constants.JOB_STATUS_CANCELING: "canceling", constants.JOB_STATUS_RUNNING: "running", constants.JOB_STATUS_CANCELED: "canceled", @@ -49,6 +50,41 @@ _USER_JOB_STATUS = { } +def _FormatStatus(value): + """Formats a job status. + + """ + try: + return _USER_JOB_STATUS[value] + except KeyError: + raise errors.ProgrammerError("Unknown job status code '%s'" % value) + + +_JOB_LIST_FORMAT = { + "status": (_FormatStatus, False), + "summary": (lambda value: ",".join(str(item) for item in value), False), + } +_JOB_LIST_FORMAT.update(dict.fromkeys(["opstart", "opexec", "opend"], + (lambda value: map(FormatTimestamp, + value), + None))) + + +def _ParseJobIds(args): + """Parses a list of string job IDs into integers. + + @param args: list of strings + @return: list of integers + @raise OpPrereqError: in case of invalid values + + """ + try: + return [int(a) for a in args] + except (ValueError, TypeError), err: + raise errors.OpPrereqError("Invalid job ID passed: %s" % err, + errors.ECODE_INVAL) + + def ListJobs(opts, args): """List the jobs @@ -61,60 +97,30 @@ def ListJobs(opts, args): """ selected_fields = ParseFields(opts.output, _LIST_DEF_FIELDS) - output = GetClient().QueryJobs(args, selected_fields) - if not opts.no_headers: - # TODO: Implement more fields - headers = { - "id": "ID", - "status": "Status", - "priority": "Prio", - "ops": "OpCodes", - "opresult": "OpCode_result", - "opstatus": "OpCode_status", - "oplog": "OpCode_log", - "summary": "Summary", - "opstart": "OpCode_start", - "opexec": "OpCode_exec", - "opend": "OpCode_end", - "oppriority": "OpCode_prio", - "start_ts": "Start", - "end_ts": "End", - "received_ts": "Received", - } - else: - headers = None + if opts.archived and "archived" not in selected_fields: + selected_fields.append("archived") - numfields = ["priority"] + qfilter = qlang.MakeSimpleFilter("status", opts.status_filter) - # change raw values to nicer strings - for row_id, row in enumerate(output): - if row is None: - ToStderr("No such job: %s" % args[row_id]) - continue + return GenericList(constants.QR_JOB, selected_fields, args, None, + opts.separator, not opts.no_headers, + format_override=_JOB_LIST_FORMAT, verbose=opts.verbose, + force_filter=opts.force_filter, namefield="id", + qfilter=qfilter, isnumeric=True) - for idx, field in enumerate(selected_fields): - val = row[idx] - if field == "status": - if val in _USER_JOB_STATUS: - val = _USER_JOB_STATUS[val] - else: - raise errors.ProgrammerError("Unknown job status code '%s'" % val) - elif field == "summary": - val = ",".join(val) - elif field in ("start_ts", "end_ts", "received_ts"): - val = FormatTimestamp(val) - elif field in ("opstart", "opexec", "opend"): - val = [FormatTimestamp(entry) for entry in val] - - row[idx] = str(val) - - data = GenerateTable(separator=opts.separator, headers=headers, - fields=selected_fields, data=output, - numfields=numfields) - for line in data: - ToStdout(line) - return 0 +def ListJobFields(opts, args): + """List job fields. + + @param opts: the command line options selected by the user + @type args: list + @param args: fields to list, or empty for all + @rtype: int + @return: the desired exit code + + """ + return GenericListFields(constants.QR_JOB, args, opts.separator, + not opts.no_headers) def ArchiveJobs(opts, args): @@ -157,7 +163,7 @@ def AutoArchiveJobs(opts, args): age = args[0] - if age == 'all': + if age == "all": age = -1 else: age = ParseTimespec(age) @@ -168,30 +174,97 @@ def AutoArchiveJobs(opts, args): return 0 -def CancelJobs(opts, args): - """Cancel not-yet-started jobs. +def _MultiJobAction(opts, args, cl, stdout_fn, ask_fn, question, action_fn): + """Applies a function to multipe jobs. - @param opts: the command line options selected by the user + @param opts: Command line options @type args: list - @param args: should contain the job IDs to be cancelled + @param args: Job IDs @rtype: int - @return: the desired exit code + @return: Exit code """ - client = GetClient() + if cl is None: + cl = GetClient() + + if stdout_fn is None: + stdout_fn = ToStdout + + if ask_fn is None: + ask_fn = AskUser + result = constants.EXIT_SUCCESS - for job_id in args: - (success, msg) = client.CancelJob(job_id) + if bool(args) ^ (opts.status_filter is None): + raise errors.OpPrereqError("Either a status filter or job ID(s) must be" + " specified and never both", errors.ECODE_INVAL) + + if opts.status_filter is not None: + response = cl.Query(constants.QR_JOB, ["id", "status", "summary"], + qlang.MakeSimpleFilter("status", opts.status_filter)) + + jobs = [i for ((_, i), _, _) in response.data] + if not jobs: + raise errors.OpPrereqError("No jobs with the requested status have been" + " found", errors.ECODE_STATE) + + if not opts.force: + (_, table) = FormatQueryResult(response, header=True, + format_override=_JOB_LIST_FORMAT) + for line in table: + stdout_fn(line) + + if not ask_fn(question): + return constants.EXIT_CONFIRMATION + else: + jobs = args + + for job_id in jobs: + (success, msg) = action_fn(cl, job_id) if not success: result = constants.EXIT_FAILURE - ToStdout(msg) + stdout_fn(msg) return result +def CancelJobs(opts, args, cl=None, _stdout_fn=ToStdout, _ask_fn=AskUser): + """Cancel not-yet-started jobs. + + @param opts: the command line options selected by the user + @type args: list + @param args: should contain the job IDs to be cancelled + @rtype: int + @return: the desired exit code + + """ + return _MultiJobAction(opts, args, cl, _stdout_fn, _ask_fn, + "Cancel job(s) listed above?", + lambda cl, job_id: cl.CancelJob(job_id)) + + +def ChangePriority(opts, args): + """Change priority of jobs. + + @param opts: Command line options + @type args: list + @param args: Job IDs + @rtype: int + @return: Exit code + + """ + if opts.priority is None: + ToStderr("--priority option must be given.") + return constants.EXIT_FAILURE + + return _MultiJobAction(opts, args, None, None, None, + "Change priority of job(s) listed above?", + lambda cl, job_id: + cl.ChangeJobPriority(job_id, opts.priority)) + + def ShowJobs(opts, args): """Show detailed information about jobs. @@ -218,27 +291,26 @@ def ShowJobs(opts, args): "opstart", "opexec", "opend", "received_ts", "start_ts", "end_ts", ] - result = GetClient().QueryJobs(args, selected_fields) + qfilter = qlang.MakeSimpleFilter("id", _ParseJobIds(args)) + result = GetClient().Query(constants.QR_JOB, selected_fields, qfilter).data first = True - for idx, entry in enumerate(result): + for entry in result: if not first: format_msg(0, "") else: first = False - if entry is None: - if idx <= len(args): - format_msg(0, "Job ID %s not found" % args[idx]) - else: - # this should not happen, when we don't pass args it will be a - # valid job returned - format_msg(0, "Job ID requested as argument %s not found" % (idx + 1)) + ((_, job_id), (rs_status, status), (_, ops), (_, opresult), (_, opstatus), + (_, oplog), (_, opstart), (_, opexec), (_, opend), (_, recv_ts), + (_, start_ts), (_, end_ts)) = entry + + # Detect non-normal results + if rs_status != constants.RS_NORMAL: + format_msg(0, "Job ID %s not found" % job_id) continue - (job_id, status, ops, opresult, opstatus, oplog, - opstart, opexec, opend, recv_ts, start_ts, end_ts) = entry format_msg(0, "Job ID: %s" % job_id) if status in _USER_JOB_STATUS: status = _USER_JOB_STATUS[status] @@ -356,36 +428,106 @@ def WatchJob(opts, args): return retcode +_PENDING_OPT = \ + cli_option("--pending", default=None, + action="store_const", dest="status_filter", + const=constants.JOBS_PENDING, + help="Select jobs pending execution or being cancelled") + +_RUNNING_OPT = \ + cli_option("--running", default=None, + action="store_const", dest="status_filter", + const=frozenset([ + constants.JOB_STATUS_RUNNING, + ]), + help="Show jobs currently running only") + +_ERROR_OPT = \ + cli_option("--error", default=None, + action="store_const", dest="status_filter", + const=frozenset([ + constants.JOB_STATUS_ERROR, + ]), + help="Show failed jobs only") + +_FINISHED_OPT = \ + cli_option("--finished", default=None, + action="store_const", dest="status_filter", + const=constants.JOBS_FINALIZED, + help="Show finished jobs only") + +_ARCHIVED_OPT = \ + cli_option("--archived", default=False, + action="store_true", dest="archived", + help="Include archived jobs in list (slow and expensive)") + +_QUEUED_OPT = \ + cli_option("--queued", default=None, + action="store_const", dest="status_filter", + const=frozenset([ + constants.JOB_STATUS_QUEUED, + ]), + help="Select queued jobs only") + +_WAITING_OPT = \ + cli_option("--waiting", default=None, + action="store_const", dest="status_filter", + const=frozenset([ + constants.JOB_STATUS_WAITING, + ]), + help="Select waiting jobs only") + + commands = { - 'list': ( + "list": ( ListJobs, [ArgJobId()], - [NOHDR_OPT, SEP_OPT, FIELDS_OPT], + [NOHDR_OPT, SEP_OPT, FIELDS_OPT, VERBOSE_OPT, FORCE_FILTER_OPT, + _PENDING_OPT, _RUNNING_OPT, _ERROR_OPT, _FINISHED_OPT, _ARCHIVED_OPT], "[job_id ...]", - "List the jobs and their status. The available fields are" - " (see the man page for details): id, status, op_list," - " op_status, op_result." - " The default field" - " list is (in order): %s." % utils.CommaJoin(_LIST_DEF_FIELDS)), - 'archive': ( + "Lists the jobs and their status. The available fields can be shown" + " using the \"list-fields\" command (see the man page for details)." + " The default field list is (in order): %s." % + utils.CommaJoin(_LIST_DEF_FIELDS)), + "list-fields": ( + ListJobFields, [ArgUnknown()], + [NOHDR_OPT, SEP_OPT], + "[fields...]", + "Lists all available fields for jobs"), + "archive": ( ArchiveJobs, [ArgJobId(min=1)], [], " [ ...]", "Archive specified jobs"), - 'autoarchive': ( + "autoarchive": ( AutoArchiveJobs, [ArgSuggest(min=1, max=1, choices=["1d", "1w", "4w", "all"])], [], "", "Auto archive jobs older than the given age"), - 'cancel': ( - CancelJobs, [ArgJobId(min=1)], [], - " [ ...]", "Cancel specified jobs"), - 'info': ( + "cancel": ( + CancelJobs, [ArgJobId()], + [FORCE_OPT, _PENDING_OPT, _QUEUED_OPT, _WAITING_OPT], + "{[--force] {--pending | --queued | --waiting} |" + " [ ...]}", + "Cancel jobs"), + "info": ( ShowJobs, [ArgJobId(min=1)], [], " [ ...]", "Show detailed information about the specified jobs"), - 'watch': ( + "watch": ( WatchJob, [ArgJobId(min=1, max=1)], [], "", "Follows a job and prints its output as it arrives"), + "change-priority": ( + ChangePriority, [ArgJobId()], + [PRIORITY_OPT, FORCE_OPT, _PENDING_OPT, _QUEUED_OPT, _WAITING_OPT], + "--priority {[--force] {--pending | --queued | --waiting} |" + " [ ...]}", + "Change the priority of jobs"), + } + + +#: dictionary with aliases for commands +aliases = { + "show": "info", } def Main(): - return GenericMain(commands) + return GenericMain(commands, aliases=aliases)