Better specify what packages to install
[ganeti-local] / lib / client / gnt_job.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Job related commands"""
22
23 # pylint: disable=W0401,W0613,W0614,C0103
24 # W0401: Wildcard import ganeti.cli
25 # W0613: Unused argument, since all functions follow the same API
26 # W0614: Unused import %s from wildcard import (since we need cli)
27 # C0103: Invalid name gnt-job
28
29 from ganeti.cli import *
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import utils
33 from ganeti import cli
34 from ganeti import qlang
35
36
37 #: default list of fields for L{ListJobs}
38 _LIST_DEF_FIELDS = ["id", "status", "summary"]
39
40 #: map converting the job status contants to user-visible
41 #: names
42 _USER_JOB_STATUS = {
43   constants.JOB_STATUS_QUEUED: "queued",
44   constants.JOB_STATUS_WAITING: "waiting",
45   constants.JOB_STATUS_CANCELING: "canceling",
46   constants.JOB_STATUS_RUNNING: "running",
47   constants.JOB_STATUS_CANCELED: "canceled",
48   constants.JOB_STATUS_SUCCESS: "success",
49   constants.JOB_STATUS_ERROR: "error",
50   }
51
52
53 def _FormatStatus(value):
54   """Formats a job status.
55
56   """
57   try:
58     return _USER_JOB_STATUS[value]
59   except KeyError:
60     raise errors.ProgrammerError("Unknown job status code '%s'" % value)
61
62
63 def _FormatSummary(value):
64   """Formats a job's summary. Takes possible non-ascii encoding into account.
65
66   """
67   return ','.encode('utf-8').join(item.encode('utf-8') for item in value)
68
69
70 _JOB_LIST_FORMAT = {
71   "status": (_FormatStatus, False),
72   "summary": (_FormatSummary, False),
73   }
74 _JOB_LIST_FORMAT.update(dict.fromkeys(["opstart", "opexec", "opend"],
75                                       (lambda value: map(FormatTimestamp,
76                                                          value),
77                                        None)))
78
79
80 def _ParseJobIds(args):
81   """Parses a list of string job IDs into integers.
82
83   @param args: list of strings
84   @return: list of integers
85   @raise OpPrereqError: in case of invalid values
86
87   """
88   try:
89     return [int(a) for a in args]
90   except (ValueError, TypeError), err:
91     raise errors.OpPrereqError("Invalid job ID passed: %s" % err,
92                                errors.ECODE_INVAL)
93
94
95 def ListJobs(opts, args):
96   """List the jobs
97
98   @param opts: the command line options selected by the user
99   @type args: list
100   @param args: should be an empty list
101   @rtype: int
102   @return: the desired exit code
103
104   """
105   selected_fields = ParseFields(opts.output, _LIST_DEF_FIELDS)
106
107   if opts.archived and "archived" not in selected_fields:
108     selected_fields.append("archived")
109
110   qfilter = qlang.MakeSimpleFilter("status", opts.status_filter)
111
112   cl = GetClient(query=True)
113
114   return GenericList(constants.QR_JOB, selected_fields, args, None,
115                      opts.separator, not opts.no_headers,
116                      format_override=_JOB_LIST_FORMAT, verbose=opts.verbose,
117                      force_filter=opts.force_filter, namefield="id",
118                      qfilter=qfilter, isnumeric=True, cl=cl)
119
120
121 def ListJobFields(opts, args):
122   """List job fields.
123
124   @param opts: the command line options selected by the user
125   @type args: list
126   @param args: fields to list, or empty for all
127   @rtype: int
128   @return: the desired exit code
129
130   """
131   cl = GetClient(query=True)
132
133   return GenericListFields(constants.QR_JOB, args, opts.separator,
134                            not opts.no_headers, cl=cl)
135
136
137 def ArchiveJobs(opts, args):
138   """Archive jobs.
139
140   @param opts: the command line options selected by the user
141   @type args: list
142   @param args: should contain the job IDs to be archived
143   @rtype: int
144   @return: the desired exit code
145
146   """
147   client = GetClient()
148
149   rcode = 0
150   for job_id in args:
151     if not client.ArchiveJob(job_id):
152       ToStderr("Failed to archive job with ID '%s'", job_id)
153       rcode = 1
154
155   return rcode
156
157
158 def AutoArchiveJobs(opts, args):
159   """Archive jobs based on age.
160
161   This will archive jobs based on their age, or all jobs if a 'all' is
162   passed.
163
164   @param opts: the command line options selected by the user
165   @type args: list
166   @param args: should contain only one element, the age as a time spec
167       that can be parsed by L{ganeti.cli.ParseTimespec} or the
168       keyword I{all}, which will cause all jobs to be archived
169   @rtype: int
170   @return: the desired exit code
171
172   """
173   client = GetClient()
174
175   age = args[0]
176
177   if age == "all":
178     age = -1
179   else:
180     age = ParseTimespec(age)
181
182   (archived_count, jobs_left) = client.AutoArchiveJobs(age)
183   ToStdout("Archived %s jobs, %s unchecked left", archived_count, jobs_left)
184
185   return 0
186
187
188 def _MultiJobAction(opts, args, cl, stdout_fn, ask_fn, question, action_fn):
189   """Applies a function to multipe jobs.
190
191   @param opts: Command line options
192   @type args: list
193   @param args: Job IDs
194   @rtype: int
195   @return: Exit code
196
197   """
198   if cl is None:
199     cl = GetClient()
200
201   if stdout_fn is None:
202     stdout_fn = ToStdout
203
204   if ask_fn is None:
205     ask_fn = AskUser
206
207   result = constants.EXIT_SUCCESS
208
209   if bool(args) ^ (opts.status_filter is None):
210     raise errors.OpPrereqError("Either a status filter or job ID(s) must be"
211                                " specified and never both", errors.ECODE_INVAL)
212
213   if opts.status_filter is not None:
214     response = cl.Query(constants.QR_JOB, ["id", "status", "summary"],
215                         qlang.MakeSimpleFilter("status", opts.status_filter))
216
217     jobs = [i for ((_, i), _, _) in response.data]
218     if not jobs:
219       raise errors.OpPrereqError("No jobs with the requested status have been"
220                                  " found", errors.ECODE_STATE)
221
222     if not opts.force:
223       (_, table) = FormatQueryResult(response, header=True,
224                                      format_override=_JOB_LIST_FORMAT)
225       for line in table:
226         stdout_fn(line)
227
228       if not ask_fn(question):
229         return constants.EXIT_CONFIRMATION
230   else:
231     jobs = args
232
233   for job_id in jobs:
234     (success, msg) = action_fn(cl, job_id)
235
236     if not success:
237       result = constants.EXIT_FAILURE
238
239     stdout_fn(msg)
240
241   return result
242
243
244 def CancelJobs(opts, args, cl=None, _stdout_fn=ToStdout, _ask_fn=AskUser):
245   """Cancel not-yet-started jobs.
246
247   @param opts: the command line options selected by the user
248   @type args: list
249   @param args: should contain the job IDs to be cancelled
250   @rtype: int
251   @return: the desired exit code
252
253   """
254   return _MultiJobAction(opts, args, cl, _stdout_fn, _ask_fn,
255                          "Cancel job(s) listed above?",
256                          lambda cl, job_id: cl.CancelJob(job_id))
257
258
259 def ChangePriority(opts, args):
260   """Change priority of jobs.
261
262   @param opts: Command line options
263   @type args: list
264   @param args: Job IDs
265   @rtype: int
266   @return: Exit code
267
268   """
269   if opts.priority is None:
270     ToStderr("--priority option must be given.")
271     return constants.EXIT_FAILURE
272
273   return _MultiJobAction(opts, args, None, None, None,
274                          "Change priority of job(s) listed above?",
275                          lambda cl, job_id:
276                            cl.ChangeJobPriority(job_id, opts.priority))
277
278
279 def ShowJobs(opts, args):
280   """Show detailed information about jobs.
281
282   @param opts: the command line options selected by the user
283   @type args: list
284   @param args: should contain the job IDs to be queried
285   @rtype: int
286   @return: the desired exit code
287
288   """
289   def format_msg(level, text):
290     """Display the text indented."""
291     ToStdout("%s%s", "  " * level, text)
292
293   def result_helper(value):
294     """Format a result field in a nice way."""
295     if isinstance(value, (tuple, list)):
296       return "[%s]" % utils.CommaJoin(value)
297     else:
298       return str(value)
299
300   selected_fields = [
301     "id", "status", "ops", "opresult", "opstatus", "oplog",
302     "opstart", "opexec", "opend", "received_ts", "start_ts", "end_ts",
303     ]
304
305   qfilter = qlang.MakeSimpleFilter("id", _ParseJobIds(args))
306   cl = GetClient(query=True)
307   result = cl.Query(constants.QR_JOB, selected_fields, qfilter).data
308
309   first = True
310
311   for entry in result:
312     if not first:
313       format_msg(0, "")
314     else:
315       first = False
316
317     ((_, job_id), (rs_status, status), (_, ops), (_, opresult), (_, opstatus),
318      (_, oplog), (_, opstart), (_, opexec), (_, opend), (_, recv_ts),
319      (_, start_ts), (_, end_ts)) = entry
320
321     # Detect non-normal results
322     if rs_status != constants.RS_NORMAL:
323       format_msg(0, "Job ID %s not found" % job_id)
324       continue
325
326     format_msg(0, "Job ID: %s" % job_id)
327     if status in _USER_JOB_STATUS:
328       status = _USER_JOB_STATUS[status]
329     else:
330       raise errors.ProgrammerError("Unknown job status code '%s'" % status)
331
332     format_msg(1, "Status: %s" % status)
333
334     if recv_ts is not None:
335       format_msg(1, "Received:         %s" % FormatTimestamp(recv_ts))
336     else:
337       format_msg(1, "Missing received timestamp (%s)" % str(recv_ts))
338
339     if start_ts is not None:
340       if recv_ts is not None:
341         d1 = start_ts[0] - recv_ts[0] + (start_ts[1] - recv_ts[1]) / 1000000.0
342         delta = " (delta %.6fs)" % d1
343       else:
344         delta = ""
345       format_msg(1, "Processing start: %s%s" %
346                  (FormatTimestamp(start_ts), delta))
347     else:
348       format_msg(1, "Processing start: unknown (%s)" % str(start_ts))
349
350     if end_ts is not None:
351       if start_ts is not None:
352         d2 = end_ts[0] - start_ts[0] + (end_ts[1] - start_ts[1]) / 1000000.0
353         delta = " (delta %.6fs)" % d2
354       else:
355         delta = ""
356       format_msg(1, "Processing end:   %s%s" %
357                  (FormatTimestamp(end_ts), delta))
358     else:
359       format_msg(1, "Processing end:   unknown (%s)" % str(end_ts))
360
361     if end_ts is not None and recv_ts is not None:
362       d3 = end_ts[0] - recv_ts[0] + (end_ts[1] - recv_ts[1]) / 1000000.0
363       format_msg(1, "Total processing time: %.6f seconds" % d3)
364     else:
365       format_msg(1, "Total processing time: N/A")
366     format_msg(1, "Opcodes:")
367     for (opcode, result, status, log, s_ts, x_ts, e_ts) in \
368             zip(ops, opresult, opstatus, oplog, opstart, opexec, opend):
369       format_msg(2, "%s" % opcode["OP_ID"])
370       format_msg(3, "Status: %s" % status)
371       if isinstance(s_ts, (tuple, list)):
372         format_msg(3, "Processing start: %s" % FormatTimestamp(s_ts))
373       else:
374         format_msg(3, "No processing start time")
375       if isinstance(x_ts, (tuple, list)):
376         format_msg(3, "Execution start:  %s" % FormatTimestamp(x_ts))
377       else:
378         format_msg(3, "No execution start time")
379       if isinstance(e_ts, (tuple, list)):
380         format_msg(3, "Processing end:   %s" % FormatTimestamp(e_ts))
381       else:
382         format_msg(3, "No processing end time")
383       format_msg(3, "Input fields:")
384       for key in utils.NiceSort(opcode.keys()):
385         if key == "OP_ID":
386           continue
387         val = opcode[key]
388         if isinstance(val, (tuple, list)):
389           val = ",".join([str(item) for item in val])
390         format_msg(4, "%s: %s" % (key, val))
391       if result is None:
392         format_msg(3, "No output data")
393       elif isinstance(result, (tuple, list)):
394         if not result:
395           format_msg(3, "Result: empty sequence")
396         else:
397           format_msg(3, "Result:")
398           for elem in result:
399             format_msg(4, result_helper(elem))
400       elif isinstance(result, dict):
401         if not result:
402           format_msg(3, "Result: empty dictionary")
403         else:
404           format_msg(3, "Result:")
405           for key, val in result.iteritems():
406             format_msg(4, "%s: %s" % (key, result_helper(val)))
407       else:
408         format_msg(3, "Result: %s" % result)
409       format_msg(3, "Execution log:")
410       for serial, log_ts, log_type, log_msg in log:
411         time_txt = FormatTimestamp(log_ts)
412         encoded = FormatLogMessage(log_type, log_msg)
413         format_msg(4, "%s:%s:%s %s" % (serial, time_txt, log_type, encoded))
414   return 0
415
416
417 def WatchJob(opts, args):
418   """Follow a job and print its output as it arrives.
419
420   @param opts: the command line options selected by the user
421   @type args: list
422   @param args: Contains the job ID
423   @rtype: int
424   @return: the desired exit code
425
426   """
427   job_id = args[0]
428
429   msg = ("Output from job %s follows" % job_id)
430   ToStdout(msg)
431   ToStdout("-" * len(msg))
432
433   retcode = 0
434   try:
435     cli.PollJob(job_id)
436   except errors.GenericError, err:
437     (retcode, job_result) = cli.FormatError(err)
438     ToStderr("Job %s failed: %s", job_id, job_result)
439
440   return retcode
441
442
443 _PENDING_OPT = \
444   cli_option("--pending", default=None,
445              action="store_const", dest="status_filter",
446              const=constants.JOBS_PENDING,
447              help="Select jobs pending execution or being cancelled")
448
449 _RUNNING_OPT = \
450   cli_option("--running", default=None,
451              action="store_const", dest="status_filter",
452              const=frozenset([
453                constants.JOB_STATUS_RUNNING,
454                ]),
455              help="Show jobs currently running only")
456
457 _ERROR_OPT = \
458   cli_option("--error", default=None,
459              action="store_const", dest="status_filter",
460              const=frozenset([
461                constants.JOB_STATUS_ERROR,
462                ]),
463              help="Show failed jobs only")
464
465 _FINISHED_OPT = \
466   cli_option("--finished", default=None,
467              action="store_const", dest="status_filter",
468              const=constants.JOBS_FINALIZED,
469              help="Show finished jobs only")
470
471 _ARCHIVED_OPT = \
472   cli_option("--archived", default=False,
473              action="store_true", dest="archived",
474              help="Include archived jobs in list (slow and expensive)")
475
476 _QUEUED_OPT = \
477   cli_option("--queued", default=None,
478              action="store_const", dest="status_filter",
479              const=frozenset([
480                constants.JOB_STATUS_QUEUED,
481                ]),
482              help="Select queued jobs only")
483
484 _WAITING_OPT = \
485   cli_option("--waiting", default=None,
486              action="store_const", dest="status_filter",
487              const=frozenset([
488                constants.JOB_STATUS_WAITING,
489                ]),
490              help="Select waiting jobs only")
491
492
493 commands = {
494   "list": (
495     ListJobs, [ArgJobId()],
496     [NOHDR_OPT, SEP_OPT, FIELDS_OPT, VERBOSE_OPT, FORCE_FILTER_OPT,
497      _PENDING_OPT, _RUNNING_OPT, _ERROR_OPT, _FINISHED_OPT, _ARCHIVED_OPT],
498     "[job_id ...]",
499     "Lists the jobs and their status. The available fields can be shown"
500     " using the \"list-fields\" command (see the man page for details)."
501     " The default field list is (in order): %s." %
502     utils.CommaJoin(_LIST_DEF_FIELDS)),
503   "list-fields": (
504     ListJobFields, [ArgUnknown()],
505     [NOHDR_OPT, SEP_OPT],
506     "[fields...]",
507     "Lists all available fields for jobs"),
508   "archive": (
509     ArchiveJobs, [ArgJobId(min=1)], [],
510     "<job-id> [<job-id> ...]", "Archive specified jobs"),
511   "autoarchive": (
512     AutoArchiveJobs,
513     [ArgSuggest(min=1, max=1, choices=["1d", "1w", "4w", "all"])],
514     [],
515     "<age>", "Auto archive jobs older than the given age"),
516   "cancel": (
517     CancelJobs, [ArgJobId()],
518     [FORCE_OPT, _PENDING_OPT, _QUEUED_OPT, _WAITING_OPT],
519     "{[--force] {--pending | --queued | --waiting} |"
520     " <job-id> [<job-id> ...]}",
521     "Cancel jobs"),
522   "info": (
523     ShowJobs, [ArgJobId(min=1)], [],
524     "<job-id> [<job-id> ...]",
525     "Show detailed information about the specified jobs"),
526   "watch": (
527     WatchJob, [ArgJobId(min=1, max=1)], [],
528     "<job-id>", "Follows a job and prints its output as it arrives"),
529   "change-priority": (
530     ChangePriority, [ArgJobId()],
531     [PRIORITY_OPT, FORCE_OPT, _PENDING_OPT, _QUEUED_OPT, _WAITING_OPT],
532     "--priority <priority> {[--force] {--pending | --queued | --waiting} |"
533     " <job-id> [<job-id> ...]}",
534     "Change the priority of jobs"),
535   }
536
537
538 #: dictionary with aliases for commands
539 aliases = {
540   "show": "info",
541   }
542
543
544 def Main():
545   return GenericMain(commands, aliases=aliases)