Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_job.py @ 91c17910

History | View | Annotate | Download (15.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Job related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-job
28

    
29
from ganeti.cli import *
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import utils
33
from ganeti import cli
34
from ganeti import qlang
35

    
36

    
37
#: default list of fields for L{ListJobs}
38
_LIST_DEF_FIELDS = ["id", "status", "summary"]
39

    
40
#: map converting the job status contants to user-visible
41
#: names
42
_USER_JOB_STATUS = {
43
  constants.JOB_STATUS_QUEUED: "queued",
44
  constants.JOB_STATUS_WAITING: "waiting",
45
  constants.JOB_STATUS_CANCELING: "canceling",
46
  constants.JOB_STATUS_RUNNING: "running",
47
  constants.JOB_STATUS_CANCELED: "canceled",
48
  constants.JOB_STATUS_SUCCESS: "success",
49
  constants.JOB_STATUS_ERROR: "error",
50
  }
51

    
52

    
53
def _FormatStatus(value):
54
  """Formats a job status.
55

56
  """
57
  try:
58
    return _USER_JOB_STATUS[value]
59
  except KeyError:
60
    raise errors.ProgrammerError("Unknown job status code '%s'" % value)
61

    
62

    
63
_JOB_LIST_FORMAT = {
64
  "status": (_FormatStatus, False),
65
  "summary": (lambda value: ",".join(str(item) for item in value), False),
66
  }
67
_JOB_LIST_FORMAT.update(dict.fromkeys(["opstart", "opexec", "opend"],
68
                                      (lambda value: map(FormatTimestamp,
69
                                                         value),
70
                                       None)))
71

    
72

    
73
def _ParseJobIds(args):
74
  """Parses a list of string job IDs into integers.
75

76
  @param args: list of strings
77
  @return: list of integers
78
  @raise OpPrereqError: in case of invalid values
79

80
  """
81
  try:
82
    return [int(a) for a in args]
83
  except (ValueError, TypeError), err:
84
    raise errors.OpPrereqError("Invalid job ID passed: %s" % err,
85
                               errors.ECODE_INVAL)
86

    
87

    
88
def ListJobs(opts, args):
89
  """List the jobs
90

91
  @param opts: the command line options selected by the user
92
  @type args: list
93
  @param args: should be an empty list
94
  @rtype: int
95
  @return: the desired exit code
96

97
  """
98
  selected_fields = ParseFields(opts.output, _LIST_DEF_FIELDS)
99

    
100
  if opts.archived and "archived" not in selected_fields:
101
    selected_fields.append("archived")
102

    
103
  qfilter = qlang.MakeSimpleFilter("status", opts.status_filter)
104

    
105
  cl = GetClient(query=True)
106

    
107
  return GenericList(constants.QR_JOB, selected_fields, args, None,
108
                     opts.separator, not opts.no_headers,
109
                     format_override=_JOB_LIST_FORMAT, verbose=opts.verbose,
110
                     force_filter=opts.force_filter, namefield="id",
111
                     qfilter=qfilter, isnumeric=True, cl=cl)
112

    
113

    
114
def ListJobFields(opts, args):
115
  """List job fields.
116

117
  @param opts: the command line options selected by the user
118
  @type args: list
119
  @param args: fields to list, or empty for all
120
  @rtype: int
121
  @return: the desired exit code
122

123
  """
124
  cl = GetClient(query=True)
125

    
126
  return GenericListFields(constants.QR_JOB, args, opts.separator,
127
                           not opts.no_headers, cl=cl)
128

    
129

    
130
def ArchiveJobs(opts, args):
131
  """Archive jobs.
132

133
  @param opts: the command line options selected by the user
134
  @type args: list
135
  @param args: should contain the job IDs to be archived
136
  @rtype: int
137
  @return: the desired exit code
138

139
  """
140
  client = GetClient()
141

    
142
  rcode = 0
143
  for job_id in args:
144
    if not client.ArchiveJob(job_id):
145
      ToStderr("Failed to archive job with ID '%s'", job_id)
146
      rcode = 1
147

    
148
  return rcode
149

    
150

    
151
def AutoArchiveJobs(opts, args):
152
  """Archive jobs based on age.
153

154
  This will archive jobs based on their age, or all jobs if a 'all' is
155
  passed.
156

157
  @param opts: the command line options selected by the user
158
  @type args: list
159
  @param args: should contain only one element, the age as a time spec
160
      that can be parsed by L{ganeti.cli.ParseTimespec} or the
161
      keyword I{all}, which will cause all jobs to be archived
162
  @rtype: int
163
  @return: the desired exit code
164

165
  """
166
  client = GetClient()
167

    
168
  age = args[0]
169

    
170
  if age == "all":
171
    age = -1
172
  else:
173
    age = ParseTimespec(age)
174

    
175
  (archived_count, jobs_left) = client.AutoArchiveJobs(age)
176
  ToStdout("Archived %s jobs, %s unchecked left", archived_count, jobs_left)
177

    
178
  return 0
179

    
180

    
181
def _MultiJobAction(opts, args, cl, stdout_fn, ask_fn, question, action_fn):
182
  """Applies a function to multipe jobs.
183

184
  @param opts: Command line options
185
  @type args: list
186
  @param args: Job IDs
187
  @rtype: int
188
  @return: Exit code
189

190
  """
191
  if cl is None:
192
    cl = GetClient()
193

    
194
  if stdout_fn is None:
195
    stdout_fn = ToStdout
196

    
197
  if ask_fn is None:
198
    ask_fn = AskUser
199

    
200
  result = constants.EXIT_SUCCESS
201

    
202
  if bool(args) ^ (opts.status_filter is None):
203
    raise errors.OpPrereqError("Either a status filter or job ID(s) must be"
204
                               " specified and never both", errors.ECODE_INVAL)
205

    
206
  if opts.status_filter is not None:
207
    response = cl.Query(constants.QR_JOB, ["id", "status", "summary"],
208
                        qlang.MakeSimpleFilter("status", opts.status_filter))
209

    
210
    jobs = [i for ((_, i), _, _) in response.data]
211
    if not jobs:
212
      raise errors.OpPrereqError("No jobs with the requested status have been"
213
                                 " found", errors.ECODE_STATE)
214

    
215
    if not opts.force:
216
      (_, table) = FormatQueryResult(response, header=True,
217
                                     format_override=_JOB_LIST_FORMAT)
218
      for line in table:
219
        stdout_fn(line)
220

    
221
      if not ask_fn(question):
222
        return constants.EXIT_CONFIRMATION
223
  else:
224
    jobs = args
225

    
226
  for job_id in jobs:
227
    (success, msg) = action_fn(cl, job_id)
228

    
229
    if not success:
230
      result = constants.EXIT_FAILURE
231

    
232
    stdout_fn(msg)
233

    
234
  return result
235

    
236

    
237
def CancelJobs(opts, args, cl=None, _stdout_fn=ToStdout, _ask_fn=AskUser):
238
  """Cancel not-yet-started jobs.
239

240
  @param opts: the command line options selected by the user
241
  @type args: list
242
  @param args: should contain the job IDs to be cancelled
243
  @rtype: int
244
  @return: the desired exit code
245

246
  """
247
  return _MultiJobAction(opts, args, cl, _stdout_fn, _ask_fn,
248
                         "Cancel job(s) listed above?",
249
                         lambda cl, job_id: cl.CancelJob(job_id))
250

    
251

    
252
def ChangePriority(opts, args):
253
  """Change priority of jobs.
254

255
  @param opts: Command line options
256
  @type args: list
257
  @param args: Job IDs
258
  @rtype: int
259
  @return: Exit code
260

261
  """
262
  if opts.priority is None:
263
    ToStderr("--priority option must be given.")
264
    return constants.EXIT_FAILURE
265

    
266
  return _MultiJobAction(opts, args, None, None, None,
267
                         "Change priority of job(s) listed above?",
268
                         lambda cl, job_id:
269
                           cl.ChangeJobPriority(job_id, opts.priority))
270

    
271

    
272
def ShowJobs(opts, args):
273
  """Show detailed information about jobs.
274

275
  @param opts: the command line options selected by the user
276
  @type args: list
277
  @param args: should contain the job IDs to be queried
278
  @rtype: int
279
  @return: the desired exit code
280

281
  """
282
  def format_msg(level, text):
283
    """Display the text indented."""
284
    ToStdout("%s%s", "  " * level, text)
285

    
286
  def result_helper(value):
287
    """Format a result field in a nice way."""
288
    if isinstance(value, (tuple, list)):
289
      return "[%s]" % utils.CommaJoin(value)
290
    else:
291
      return str(value)
292

    
293
  selected_fields = [
294
    "id", "status", "ops", "opresult", "opstatus", "oplog",
295
    "opstart", "opexec", "opend", "received_ts", "start_ts", "end_ts",
296
    ]
297

    
298
  qfilter = qlang.MakeSimpleFilter("id", _ParseJobIds(args))
299
  cl = GetClient(query=True)
300
  result = cl.Query(constants.QR_JOB, selected_fields, qfilter).data
301

    
302
  first = True
303

    
304
  for entry in result:
305
    if not first:
306
      format_msg(0, "")
307
    else:
308
      first = False
309

    
310
    ((_, job_id), (rs_status, status), (_, ops), (_, opresult), (_, opstatus),
311
     (_, oplog), (_, opstart), (_, opexec), (_, opend), (_, recv_ts),
312
     (_, start_ts), (_, end_ts)) = entry
313

    
314
    # Detect non-normal results
315
    if rs_status != constants.RS_NORMAL:
316
      format_msg(0, "Job ID %s not found" % job_id)
317
      continue
318

    
319
    format_msg(0, "Job ID: %s" % job_id)
320
    if status in _USER_JOB_STATUS:
321
      status = _USER_JOB_STATUS[status]
322
    else:
323
      raise errors.ProgrammerError("Unknown job status code '%s'" % status)
324

    
325
    format_msg(1, "Status: %s" % status)
326

    
327
    if recv_ts is not None:
328
      format_msg(1, "Received:         %s" % FormatTimestamp(recv_ts))
329
    else:
330
      format_msg(1, "Missing received timestamp (%s)" % str(recv_ts))
331

    
332
    if start_ts is not None:
333
      if recv_ts is not None:
334
        d1 = start_ts[0] - recv_ts[0] + (start_ts[1] - recv_ts[1]) / 1000000.0
335
        delta = " (delta %.6fs)" % d1
336
      else:
337
        delta = ""
338
      format_msg(1, "Processing start: %s%s" %
339
                 (FormatTimestamp(start_ts), delta))
340
    else:
341
      format_msg(1, "Processing start: unknown (%s)" % str(start_ts))
342

    
343
    if end_ts is not None:
344
      if start_ts is not None:
345
        d2 = end_ts[0] - start_ts[0] + (end_ts[1] - start_ts[1]) / 1000000.0
346
        delta = " (delta %.6fs)" % d2
347
      else:
348
        delta = ""
349
      format_msg(1, "Processing end:   %s%s" %
350
                 (FormatTimestamp(end_ts), delta))
351
    else:
352
      format_msg(1, "Processing end:   unknown (%s)" % str(end_ts))
353

    
354
    if end_ts is not None and recv_ts is not None:
355
      d3 = end_ts[0] - recv_ts[0] + (end_ts[1] - recv_ts[1]) / 1000000.0
356
      format_msg(1, "Total processing time: %.6f seconds" % d3)
357
    else:
358
      format_msg(1, "Total processing time: N/A")
359
    format_msg(1, "Opcodes:")
360
    for (opcode, result, status, log, s_ts, x_ts, e_ts) in \
361
            zip(ops, opresult, opstatus, oplog, opstart, opexec, opend):
362
      format_msg(2, "%s" % opcode["OP_ID"])
363
      format_msg(3, "Status: %s" % status)
364
      if isinstance(s_ts, (tuple, list)):
365
        format_msg(3, "Processing start: %s" % FormatTimestamp(s_ts))
366
      else:
367
        format_msg(3, "No processing start time")
368
      if isinstance(x_ts, (tuple, list)):
369
        format_msg(3, "Execution start:  %s" % FormatTimestamp(x_ts))
370
      else:
371
        format_msg(3, "No execution start time")
372
      if isinstance(e_ts, (tuple, list)):
373
        format_msg(3, "Processing end:   %s" % FormatTimestamp(e_ts))
374
      else:
375
        format_msg(3, "No processing end time")
376
      format_msg(3, "Input fields:")
377
      for key in utils.NiceSort(opcode.keys()):
378
        if key == "OP_ID":
379
          continue
380
        val = opcode[key]
381
        if isinstance(val, (tuple, list)):
382
          val = ",".join([str(item) for item in val])
383
        format_msg(4, "%s: %s" % (key, val))
384
      if result is None:
385
        format_msg(3, "No output data")
386
      elif isinstance(result, (tuple, list)):
387
        if not result:
388
          format_msg(3, "Result: empty sequence")
389
        else:
390
          format_msg(3, "Result:")
391
          for elem in result:
392
            format_msg(4, result_helper(elem))
393
      elif isinstance(result, dict):
394
        if not result:
395
          format_msg(3, "Result: empty dictionary")
396
        else:
397
          format_msg(3, "Result:")
398
          for key, val in result.iteritems():
399
            format_msg(4, "%s: %s" % (key, result_helper(val)))
400
      else:
401
        format_msg(3, "Result: %s" % result)
402
      format_msg(3, "Execution log:")
403
      for serial, log_ts, log_type, log_msg in log:
404
        time_txt = FormatTimestamp(log_ts)
405
        encoded = FormatLogMessage(log_type, log_msg)
406
        format_msg(4, "%s:%s:%s %s" % (serial, time_txt, log_type, encoded))
407
  return 0
408

    
409

    
410
def WatchJob(opts, args):
411
  """Follow a job and print its output as it arrives.
412

413
  @param opts: the command line options selected by the user
414
  @type args: list
415
  @param args: Contains the job ID
416
  @rtype: int
417
  @return: the desired exit code
418

419
  """
420
  job_id = args[0]
421

    
422
  msg = ("Output from job %s follows" % job_id)
423
  ToStdout(msg)
424
  ToStdout("-" * len(msg))
425

    
426
  retcode = 0
427
  try:
428
    cli.PollJob(job_id)
429
  except errors.GenericError, err:
430
    (retcode, job_result) = cli.FormatError(err)
431
    ToStderr("Job %s failed: %s", job_id, job_result)
432

    
433
  return retcode
434

    
435

    
436
_PENDING_OPT = \
437
  cli_option("--pending", default=None,
438
             action="store_const", dest="status_filter",
439
             const=constants.JOBS_PENDING,
440
             help="Select jobs pending execution or being cancelled")
441

    
442
_RUNNING_OPT = \
443
  cli_option("--running", default=None,
444
             action="store_const", dest="status_filter",
445
             const=frozenset([
446
               constants.JOB_STATUS_RUNNING,
447
               ]),
448
             help="Show jobs currently running only")
449

    
450
_ERROR_OPT = \
451
  cli_option("--error", default=None,
452
             action="store_const", dest="status_filter",
453
             const=frozenset([
454
               constants.JOB_STATUS_ERROR,
455
               ]),
456
             help="Show failed jobs only")
457

    
458
_FINISHED_OPT = \
459
  cli_option("--finished", default=None,
460
             action="store_const", dest="status_filter",
461
             const=constants.JOBS_FINALIZED,
462
             help="Show finished jobs only")
463

    
464
_ARCHIVED_OPT = \
465
  cli_option("--archived", default=False,
466
             action="store_true", dest="archived",
467
             help="Include archived jobs in list (slow and expensive)")
468

    
469
_QUEUED_OPT = \
470
  cli_option("--queued", default=None,
471
             action="store_const", dest="status_filter",
472
             const=frozenset([
473
               constants.JOB_STATUS_QUEUED,
474
               ]),
475
             help="Select queued jobs only")
476

    
477
_WAITING_OPT = \
478
  cli_option("--waiting", default=None,
479
             action="store_const", dest="status_filter",
480
             const=frozenset([
481
               constants.JOB_STATUS_WAITING,
482
               ]),
483
             help="Select waiting jobs only")
484

    
485

    
486
commands = {
487
  "list": (
488
    ListJobs, [ArgJobId()],
489
    [NOHDR_OPT, SEP_OPT, FIELDS_OPT, VERBOSE_OPT, FORCE_FILTER_OPT,
490
     _PENDING_OPT, _RUNNING_OPT, _ERROR_OPT, _FINISHED_OPT, _ARCHIVED_OPT],
491
    "[job_id ...]",
492
    "Lists the jobs and their status. The available fields can be shown"
493
    " using the \"list-fields\" command (see the man page for details)."
494
    " The default field list is (in order): %s." %
495
    utils.CommaJoin(_LIST_DEF_FIELDS)),
496
  "list-fields": (
497
    ListJobFields, [ArgUnknown()],
498
    [NOHDR_OPT, SEP_OPT],
499
    "[fields...]",
500
    "Lists all available fields for jobs"),
501
  "archive": (
502
    ArchiveJobs, [ArgJobId(min=1)], [],
503
    "<job-id> [<job-id> ...]", "Archive specified jobs"),
504
  "autoarchive": (
505
    AutoArchiveJobs,
506
    [ArgSuggest(min=1, max=1, choices=["1d", "1w", "4w", "all"])],
507
    [],
508
    "<age>", "Auto archive jobs older than the given age"),
509
  "cancel": (
510
    CancelJobs, [ArgJobId()],
511
    [FORCE_OPT, _PENDING_OPT, _QUEUED_OPT, _WAITING_OPT],
512
    "{[--force] {--pending | --queued | --waiting} |"
513
    " <job-id> [<job-id> ...]}",
514
    "Cancel jobs"),
515
  "info": (
516
    ShowJobs, [ArgJobId(min=1)], [],
517
    "<job-id> [<job-id> ...]",
518
    "Show detailed information about the specified jobs"),
519
  "watch": (
520
    WatchJob, [ArgJobId(min=1, max=1)], [],
521
    "<job-id>", "Follows a job and prints its output as it arrives"),
522
  "change-priority": (
523
    ChangePriority, [ArgJobId()],
524
    [PRIORITY_OPT, FORCE_OPT, _PENDING_OPT, _QUEUED_OPT, _WAITING_OPT],
525
    "--priority <priority> {[--force] {--pending | --queued | --waiting} |"
526
    " <job-id> [<job-id> ...]}",
527
    "Change the priority of jobs"),
528
  }
529

    
530

    
531
#: dictionary with aliases for commands
532
aliases = {
533
  "show": "info",
534
  }
535

    
536

    
537
def Main():
538
  return GenericMain(commands, aliases=aliases)