Statistics
| Branch: | Tag: | Revision:

root / lib / cli.py @ 73b90123

History | View | Annotate | Download (31.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module dealing with command line parsing"""
23

    
24

    
25
import sys
26
import textwrap
27
import os.path
28
import copy
29
import time
30
import logging
31
from cStringIO import StringIO
32

    
33
from ganeti import utils
34
from ganeti import errors
35
from ganeti import constants
36
from ganeti import opcodes
37
from ganeti import luxi
38
from ganeti import ssconf
39
from ganeti import rpc
40

    
41
from optparse import (OptionParser, make_option, TitledHelpFormatter,
42
                      Option, OptionValueError)
43

    
44

    
45
__all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
46
           "SubmitOpCode", "GetClient",
47
           "cli_option", "ikv_option", "keyval_option",
48
           "GenerateTable", "AskUser",
49
           "ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
50
           "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT", "SUBMIT_OPT",
51
           "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
52
           "FormatError", "SplitNodeOption", "SubmitOrSend",
53
           "JobSubmittedException", "FormatTimestamp", "ParseTimespec",
54
           "ToStderr", "ToStdout", "UsesRPC",
55
           "GetOnlineNodes", "JobExecutor", "SYNC_OPT",
56
           ]
57

    
58

    
59

    
60
def _ExtractTagsObject(opts, args):
61
  """Extract the tag type object.
62

63
  Note that this function will modify its args parameter.
64

65
  """
66
  if not hasattr(opts, "tag_type"):
67
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
68
  kind = opts.tag_type
69
  if kind == constants.TAG_CLUSTER:
70
    retval = kind, kind
71
  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
72
    if not args:
73
      raise errors.OpPrereqError("no arguments passed to the command")
74
    name = args.pop(0)
75
    retval = kind, name
76
  else:
77
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
78
  return retval
79

    
80

    
81
def _ExtendTags(opts, args):
82
  """Extend the args if a source file has been given.
83

84
  This function will extend the tags with the contents of the file
85
  passed in the 'tags_source' attribute of the opts parameter. A file
86
  named '-' will be replaced by stdin.
87

88
  """
89
  fname = opts.tags_source
90
  if fname is None:
91
    return
92
  if fname == "-":
93
    new_fh = sys.stdin
94
  else:
95
    new_fh = open(fname, "r")
96
  new_data = []
97
  try:
98
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
99
    # because of python bug 1633941
100
    while True:
101
      line = new_fh.readline()
102
      if not line:
103
        break
104
      new_data.append(line.strip())
105
  finally:
106
    new_fh.close()
107
  args.extend(new_data)
108

    
109

    
110
def ListTags(opts, args):
111
  """List the tags on a given object.
112

113
  This is a generic implementation that knows how to deal with all
114
  three cases of tag objects (cluster, node, instance). The opts
115
  argument is expected to contain a tag_type field denoting what
116
  object type we work on.
117

118
  """
119
  kind, name = _ExtractTagsObject(opts, args)
120
  op = opcodes.OpGetTags(kind=kind, name=name)
121
  result = SubmitOpCode(op)
122
  result = list(result)
123
  result.sort()
124
  for tag in result:
125
    ToStdout(tag)
126

    
127

    
128
def AddTags(opts, args):
129
  """Add tags on a given object.
130

131
  This is a generic implementation that knows how to deal with all
132
  three cases of tag objects (cluster, node, instance). The opts
133
  argument is expected to contain a tag_type field denoting what
134
  object type we work on.
135

136
  """
137
  kind, name = _ExtractTagsObject(opts, args)
138
  _ExtendTags(opts, args)
139
  if not args:
140
    raise errors.OpPrereqError("No tags to be added")
141
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
142
  SubmitOpCode(op)
143

    
144

    
145
def RemoveTags(opts, args):
146
  """Remove tags from a given object.
147

148
  This is a generic implementation that knows how to deal with all
149
  three cases of tag objects (cluster, node, instance). The opts
150
  argument is expected to contain a tag_type field denoting what
151
  object type we work on.
152

153
  """
154
  kind, name = _ExtractTagsObject(opts, args)
155
  _ExtendTags(opts, args)
156
  if not args:
157
    raise errors.OpPrereqError("No tags to be removed")
158
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
159
  SubmitOpCode(op)
160

    
161

    
162
DEBUG_OPT = make_option("-d", "--debug", default=False,
163
                        action="store_true",
164
                        help="Turn debugging on")
165

    
166
NOHDR_OPT = make_option("--no-headers", default=False,
167
                        action="store_true", dest="no_headers",
168
                        help="Don't display column headers")
169

    
170
SEP_OPT = make_option("--separator", default=None,
171
                      action="store", dest="separator",
172
                      help="Separator between output fields"
173
                      " (defaults to one space)")
174

    
175
USEUNITS_OPT = make_option("--units", default=None,
176
                           dest="units", choices=('h', 'm', 'g', 't'),
177
                           help="Specify units for output (one of hmgt)")
178

    
179
FIELDS_OPT = make_option("-o", "--output", dest="output", action="store",
180
                         type="string", help="Comma separated list of"
181
                         " output fields",
182
                         metavar="FIELDS")
183

    
184
FORCE_OPT = make_option("-f", "--force", dest="force", action="store_true",
185
                        default=False, help="Force the operation")
186

    
187
TAG_SRC_OPT = make_option("--from", dest="tags_source",
188
                          default=None, help="File with tag names")
189

    
190
SUBMIT_OPT = make_option("--submit", dest="submit_only",
191
                         default=False, action="store_true",
192
                         help="Submit the job and return the job ID, but"
193
                         " don't wait for the job to finish")
194

    
195
SYNC_OPT = make_option("--sync", dest="do_locking",
196
                       default=False, action="store_true",
197
                       help="Grab locks while doing the queries"
198
                       " in order to ensure more consistent results")
199

    
200

    
201
def ARGS_FIXED(val):
202
  """Macro-like function denoting a fixed number of arguments"""
203
  return -val
204

    
205

    
206
def ARGS_ATLEAST(val):
207
  """Macro-like function denoting a minimum number of arguments"""
208
  return val
209

    
210

    
211
ARGS_NONE = None
212
ARGS_ONE = ARGS_FIXED(1)
213
ARGS_ANY = ARGS_ATLEAST(0)
214

    
215

    
216
def check_unit(option, opt, value):
217
  """OptParsers custom converter for units.
218

219
  """
220
  try:
221
    return utils.ParseUnit(value)
222
  except errors.UnitParseError, err:
223
    raise OptionValueError("option %s: %s" % (opt, err))
224

    
225

    
226
class CliOption(Option):
227
  """Custom option class for optparse.
228

229
  """
230
  TYPES = Option.TYPES + ("unit",)
231
  TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
232
  TYPE_CHECKER["unit"] = check_unit
233

    
234

    
235
def _SplitKeyVal(opt, data):
236
  """Convert a KeyVal string into a dict.
237

238
  This function will convert a key=val[,...] string into a dict. Empty
239
  values will be converted specially: keys which have the prefix 'no_'
240
  will have the value=False and the prefix stripped, the others will
241
  have value=True.
242

243
  @type opt: string
244
  @param opt: a string holding the option name for which we process the
245
      data, used in building error messages
246
  @type data: string
247
  @param data: a string of the format key=val,key=val,...
248
  @rtype: dict
249
  @return: {key=val, key=val}
250
  @raises errors.ParameterError: if there are duplicate keys
251

252
  """
253
  NO_PREFIX = "no_"
254
  UN_PREFIX = "-"
255
  kv_dict = {}
256
  for elem in data.split(","):
257
    if "=" in elem:
258
      key, val = elem.split("=", 1)
259
    else:
260
      if elem.startswith(NO_PREFIX):
261
        key, val = elem[len(NO_PREFIX):], False
262
      elif elem.startswith(UN_PREFIX):
263
        key, val = elem[len(UN_PREFIX):], None
264
      else:
265
        key, val = elem, True
266
    if key in kv_dict:
267
      raise errors.ParameterError("Duplicate key '%s' in option %s" %
268
                                  (key, opt))
269
    kv_dict[key] = val
270
  return kv_dict
271

    
272

    
273
def check_ident_key_val(option, opt, value):
274
  """Custom parser for the IdentKeyVal option type.
275

276
  """
277
  if ":" not in value:
278
    retval =  (value, {})
279
  else:
280
    ident, rest = value.split(":", 1)
281
    kv_dict = _SplitKeyVal(opt, rest)
282
    retval = (ident, kv_dict)
283
  return retval
284

    
285

    
286
class IdentKeyValOption(Option):
287
  """Custom option class for ident:key=val,key=val options.
288

289
  This will store the parsed values as a tuple (ident, {key: val}). As
290
  such, multiple uses of this option via action=append is possible.
291

292
  """
293
  TYPES = Option.TYPES + ("identkeyval",)
294
  TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
295
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
296

    
297

    
298
def check_key_val(option, opt, value):
299
  """Custom parser for the KeyVal option type.
300

301
  """
302
  return _SplitKeyVal(opt, value)
303

    
304

    
305
class KeyValOption(Option):
306
  """Custom option class for key=val,key=val options.
307

308
  This will store the parsed values as a dict {key: val}.
309

310
  """
311
  TYPES = Option.TYPES + ("keyval",)
312
  TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
313
  TYPE_CHECKER["keyval"] = check_key_val
314

    
315

    
316
# optparse.py sets make_option, so we do it for our own option class, too
317
cli_option = CliOption
318
ikv_option = IdentKeyValOption
319
keyval_option = KeyValOption
320

    
321

    
322
def _ParseArgs(argv, commands, aliases):
323
  """Parser for the command line arguments.
324

325
  This function parses the arguments and returns the function which
326
  must be executed together with its (modified) arguments.
327

328
  @param argv: the command line
329
  @param commands: dictionary with special contents, see the design
330
      doc for cmdline handling
331
  @param aliases: dictionary with command aliases {'alias': 'target, ...}
332

333
  """
334
  if len(argv) == 0:
335
    binary = "<command>"
336
  else:
337
    binary = argv[0].split("/")[-1]
338

    
339
  if len(argv) > 1 and argv[1] == "--version":
340
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
341
    # Quit right away. That way we don't have to care about this special
342
    # argument. optparse.py does it the same.
343
    sys.exit(0)
344

    
345
  if len(argv) < 2 or not (argv[1] in commands or
346
                           argv[1] in aliases):
347
    # let's do a nice thing
348
    sortedcmds = commands.keys()
349
    sortedcmds.sort()
350

    
351
    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
352
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
353
    ToStdout("")
354

    
355
    # compute the max line length for cmd + usage
356
    mlen = max([len(" %s" % cmd) for cmd in commands])
357
    mlen = min(60, mlen) # should not get here...
358

    
359
    # and format a nice command list
360
    ToStdout("Commands:")
361
    for cmd in sortedcmds:
362
      cmdstr = " %s" % (cmd,)
363
      help_text = commands[cmd][4]
364
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
365
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
366
      for line in help_lines:
367
        ToStdout("%-*s   %s", mlen, "", line)
368

    
369
    ToStdout("")
370

    
371
    return None, None, None
372

    
373
  # get command, unalias it, and look it up in commands
374
  cmd = argv.pop(1)
375
  if cmd in aliases:
376
    if cmd in commands:
377
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
378
                                   " command" % cmd)
379

    
380
    if aliases[cmd] not in commands:
381
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
382
                                   " command '%s'" % (cmd, aliases[cmd]))
383

    
384
    cmd = aliases[cmd]
385

    
386
  func, nargs, parser_opts, usage, description = commands[cmd]
387
  parser = OptionParser(option_list=parser_opts,
388
                        description=description,
389
                        formatter=TitledHelpFormatter(),
390
                        usage="%%prog %s %s" % (cmd, usage))
391
  parser.disable_interspersed_args()
392
  options, args = parser.parse_args()
393
  if nargs is None:
394
    if len(args) != 0:
395
      ToStderr("Error: Command %s expects no arguments", cmd)
396
      return None, None, None
397
  elif nargs < 0 and len(args) != -nargs:
398
    ToStderr("Error: Command %s expects %d argument(s)", cmd, -nargs)
399
    return None, None, None
400
  elif nargs >= 0 and len(args) < nargs:
401
    ToStderr("Error: Command %s expects at least %d argument(s)", cmd, nargs)
402
    return None, None, None
403

    
404
  return func, options, args
405

    
406

    
407
def SplitNodeOption(value):
408
  """Splits the value of a --node option.
409

410
  """
411
  if value and ':' in value:
412
    return value.split(':', 1)
413
  else:
414
    return (value, None)
415

    
416

    
417
def UsesRPC(fn):
418
  def wrapper(*args, **kwargs):
419
    rpc.Init()
420
    try:
421
      return fn(*args, **kwargs)
422
    finally:
423
      rpc.Shutdown()
424
  return wrapper
425

    
426

    
427
def AskUser(text, choices=None):
428
  """Ask the user a question.
429

430
  @param text: the question to ask
431

432
  @param choices: list with elements tuples (input_char, return_value,
433
      description); if not given, it will default to: [('y', True,
434
      'Perform the operation'), ('n', False, 'Do no do the operation')];
435
      note that the '?' char is reserved for help
436

437
  @return: one of the return values from the choices list; if input is
438
      not possible (i.e. not running with a tty, we return the last
439
      entry from the list
440

441
  """
442
  if choices is None:
443
    choices = [('y', True, 'Perform the operation'),
444
               ('n', False, 'Do not perform the operation')]
445
  if not choices or not isinstance(choices, list):
446
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
447
  for entry in choices:
448
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
449
      raise errors.ProgrammerError("Invalid choices element to AskUser")
450

    
451
  answer = choices[-1][1]
452
  new_text = []
453
  for line in text.splitlines():
454
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
455
  text = "\n".join(new_text)
456
  try:
457
    f = file("/dev/tty", "a+")
458
  except IOError:
459
    return answer
460
  try:
461
    chars = [entry[0] for entry in choices]
462
    chars[-1] = "[%s]" % chars[-1]
463
    chars.append('?')
464
    maps = dict([(entry[0], entry[1]) for entry in choices])
465
    while True:
466
      f.write(text)
467
      f.write('\n')
468
      f.write("/".join(chars))
469
      f.write(": ")
470
      line = f.readline(2).strip().lower()
471
      if line in maps:
472
        answer = maps[line]
473
        break
474
      elif line == '?':
475
        for entry in choices:
476
          f.write(" %s - %s\n" % (entry[0], entry[2]))
477
        f.write("\n")
478
        continue
479
  finally:
480
    f.close()
481
  return answer
482

    
483

    
484
class JobSubmittedException(Exception):
485
  """Job was submitted, client should exit.
486

487
  This exception has one argument, the ID of the job that was
488
  submitted. The handler should print this ID.
489

490
  This is not an error, just a structured way to exit from clients.
491

492
  """
493

    
494

    
495
def SendJob(ops, cl=None):
496
  """Function to submit an opcode without waiting for the results.
497

498
  @type ops: list
499
  @param ops: list of opcodes
500
  @type cl: luxi.Client
501
  @param cl: the luxi client to use for communicating with the master;
502
             if None, a new client will be created
503

504
  """
505
  if cl is None:
506
    cl = GetClient()
507

    
508
  job_id = cl.SubmitJob(ops)
509

    
510
  return job_id
511

    
512

    
513
def PollJob(job_id, cl=None, feedback_fn=None):
514
  """Function to poll for the result of a job.
515

516
  @type job_id: job identified
517
  @param job_id: the job to poll for results
518
  @type cl: luxi.Client
519
  @param cl: the luxi client to use for communicating with the master;
520
             if None, a new client will be created
521

522
  """
523
  if cl is None:
524
    cl = GetClient()
525

    
526
  prev_job_info = None
527
  prev_logmsg_serial = None
528

    
529
  while True:
530
    result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
531
                                 prev_logmsg_serial)
532
    if not result:
533
      # job not found, go away!
534
      raise errors.JobLost("Job with id %s lost" % job_id)
535

    
536
    # Split result, a tuple of (field values, log entries)
537
    (job_info, log_entries) = result
538
    (status, ) = job_info
539

    
540
    if log_entries:
541
      for log_entry in log_entries:
542
        (serial, timestamp, _, message) = log_entry
543
        if callable(feedback_fn):
544
          feedback_fn(log_entry[1:])
545
        else:
546
          encoded = utils.SafeEncode(message)
547
          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
548
        prev_logmsg_serial = max(prev_logmsg_serial, serial)
549

    
550
    # TODO: Handle canceled and archived jobs
551
    elif status in (constants.JOB_STATUS_SUCCESS,
552
                    constants.JOB_STATUS_ERROR,
553
                    constants.JOB_STATUS_CANCELING,
554
                    constants.JOB_STATUS_CANCELED):
555
      break
556

    
557
    prev_job_info = job_info
558

    
559
  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
560
  if not jobs:
561
    raise errors.JobLost("Job with id %s lost" % job_id)
562

    
563
  status, opstatus, result = jobs[0]
564
  if status == constants.JOB_STATUS_SUCCESS:
565
    return result
566
  elif status in (constants.JOB_STATUS_CANCELING,
567
                  constants.JOB_STATUS_CANCELED):
568
    raise errors.OpExecError("Job was canceled")
569
  else:
570
    has_ok = False
571
    for idx, (status, msg) in enumerate(zip(opstatus, result)):
572
      if status == constants.OP_STATUS_SUCCESS:
573
        has_ok = True
574
      elif status == constants.OP_STATUS_ERROR:
575
        if has_ok:
576
          raise errors.OpExecError("partial failure (opcode %d): %s" %
577
                                   (idx, msg))
578
        else:
579
          raise errors.OpExecError(str(msg))
580
    # default failure mode
581
    raise errors.OpExecError(result)
582

    
583

    
584
def SubmitOpCode(op, cl=None, feedback_fn=None):
585
  """Legacy function to submit an opcode.
586

587
  This is just a simple wrapper over the construction of the processor
588
  instance. It should be extended to better handle feedback and
589
  interaction functions.
590

591
  """
592
  if cl is None:
593
    cl = GetClient()
594

    
595
  job_id = SendJob([op], cl)
596

    
597
  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
598

    
599
  return op_results[0]
600

    
601

    
602
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
603
  """Wrapper around SubmitOpCode or SendJob.
604

605
  This function will decide, based on the 'opts' parameter, whether to
606
  submit and wait for the result of the opcode (and return it), or
607
  whether to just send the job and print its identifier. It is used in
608
  order to simplify the implementation of the '--submit' option.
609

610
  """
611
  if opts and opts.submit_only:
612
    job_id = SendJob([op], cl=cl)
613
    raise JobSubmittedException(job_id)
614
  else:
615
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
616

    
617

    
618
def GetClient():
619
  # TODO: Cache object?
620
  try:
621
    client = luxi.Client()
622
  except luxi.NoMasterError:
623
    master, myself = ssconf.GetMasterAndMyself()
624
    if master != myself:
625
      raise errors.OpPrereqError("This is not the master node, please connect"
626
                                 " to node '%s' and rerun the command" %
627
                                 master)
628
    else:
629
      raise
630
  return client
631

    
632

    
633
def FormatError(err):
634
  """Return a formatted error message for a given error.
635

636
  This function takes an exception instance and returns a tuple
637
  consisting of two values: first, the recommended exit code, and
638
  second, a string describing the error message (not
639
  newline-terminated).
640

641
  """
642
  retcode = 1
643
  obuf = StringIO()
644
  msg = str(err)
645
  if isinstance(err, errors.ConfigurationError):
646
    txt = "Corrupt configuration file: %s" % msg
647
    logging.error(txt)
648
    obuf.write(txt + "\n")
649
    obuf.write("Aborting.")
650
    retcode = 2
651
  elif isinstance(err, errors.HooksAbort):
652
    obuf.write("Failure: hooks execution failed:\n")
653
    for node, script, out in err.args[0]:
654
      if out:
655
        obuf.write("  node: %s, script: %s, output: %s\n" %
656
                   (node, script, out))
657
      else:
658
        obuf.write("  node: %s, script: %s (no output)\n" %
659
                   (node, script))
660
  elif isinstance(err, errors.HooksFailure):
661
    obuf.write("Failure: hooks general failure: %s" % msg)
662
  elif isinstance(err, errors.ResolverError):
663
    this_host = utils.HostInfo.SysName()
664
    if err.args[0] == this_host:
665
      msg = "Failure: can't resolve my own hostname ('%s')"
666
    else:
667
      msg = "Failure: can't resolve hostname '%s'"
668
    obuf.write(msg % err.args[0])
669
  elif isinstance(err, errors.OpPrereqError):
670
    obuf.write("Failure: prerequisites not met for this"
671
               " operation:\n%s" % msg)
672
  elif isinstance(err, errors.OpExecError):
673
    obuf.write("Failure: command execution error:\n%s" % msg)
674
  elif isinstance(err, errors.TagError):
675
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
676
  elif isinstance(err, errors.JobQueueDrainError):
677
    obuf.write("Failure: the job queue is marked for drain and doesn't"
678
               " accept new requests\n")
679
  elif isinstance(err, errors.JobQueueFull):
680
    obuf.write("Failure: the job queue is full and doesn't accept new"
681
               " job submissions until old jobs are archived\n")
682
  elif isinstance(err, errors.TypeEnforcementError):
683
    obuf.write("Parameter Error: %s" % msg)
684
  elif isinstance(err, errors.ParameterError):
685
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
686
  elif isinstance(err, errors.GenericError):
687
    obuf.write("Unhandled Ganeti error: %s" % msg)
688
  elif isinstance(err, luxi.NoMasterError):
689
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
690
               " and listening for connections?")
691
  elif isinstance(err, luxi.TimeoutError):
692
    obuf.write("Timeout while talking to the master daemon. Error:\n"
693
               "%s" % msg)
694
  elif isinstance(err, luxi.ProtocolError):
695
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
696
               "%s" % msg)
697
  elif isinstance(err, JobSubmittedException):
698
    obuf.write("JobID: %s\n" % err.args[0])
699
    retcode = 0
700
  else:
701
    obuf.write("Unhandled exception: %s" % msg)
702
  return retcode, obuf.getvalue().rstrip('\n')
703

    
704

    
705
def GenericMain(commands, override=None, aliases=None):
706
  """Generic main function for all the gnt-* commands.
707

708
  Arguments:
709
    - commands: a dictionary with a special structure, see the design doc
710
                for command line handling.
711
    - override: if not None, we expect a dictionary with keys that will
712
                override command line options; this can be used to pass
713
                options from the scripts to generic functions
714
    - aliases: dictionary with command aliases {'alias': 'target, ...}
715

716
  """
717
  # save the program name and the entire command line for later logging
718
  if sys.argv:
719
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
720
    if len(sys.argv) >= 2:
721
      binary += " " + sys.argv[1]
722
      old_cmdline = " ".join(sys.argv[2:])
723
    else:
724
      old_cmdline = ""
725
  else:
726
    binary = "<unknown program>"
727
    old_cmdline = ""
728

    
729
  if aliases is None:
730
    aliases = {}
731

    
732
  func, options, args = _ParseArgs(sys.argv, commands, aliases)
733
  if func is None: # parse error
734
    return 1
735

    
736
  if override is not None:
737
    for key, val in override.iteritems():
738
      setattr(options, key, val)
739

    
740
  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
741
                     stderr_logging=True, program=binary)
742

    
743
  if old_cmdline:
744
    logging.info("run with arguments '%s'", old_cmdline)
745
  else:
746
    logging.info("run with no arguments")
747

    
748
  try:
749
    result = func(options, args)
750
  except (errors.GenericError, luxi.ProtocolError,
751
          JobSubmittedException), err:
752
    result, err_msg = FormatError(err)
753
    logging.exception("Error during command processing")
754
    ToStderr(err_msg)
755

    
756
  return result
757

    
758

    
759
def GenerateTable(headers, fields, separator, data,
760
                  numfields=None, unitfields=None,
761
                  units=None):
762
  """Prints a table with headers and different fields.
763

764
  @type headers: dict
765
  @param headers: dictionary mapping field names to headers for
766
      the table
767
  @type fields: list
768
  @param fields: the field names corresponding to each row in
769
      the data field
770
  @param separator: the separator to be used; if this is None,
771
      the default 'smart' algorithm is used which computes optimal
772
      field width, otherwise just the separator is used between
773
      each field
774
  @type data: list
775
  @param data: a list of lists, each sublist being one row to be output
776
  @type numfields: list
777
  @param numfields: a list with the fields that hold numeric
778
      values and thus should be right-aligned
779
  @type unitfields: list
780
  @param unitfields: a list with the fields that hold numeric
781
      values that should be formatted with the units field
782
  @type units: string or None
783
  @param units: the units we should use for formatting, or None for
784
      automatic choice (human-readable for non-separator usage, otherwise
785
      megabytes); this is a one-letter string
786

787
  """
788
  if units is None:
789
    if separator:
790
      units = "m"
791
    else:
792
      units = "h"
793

    
794
  if numfields is None:
795
    numfields = []
796
  if unitfields is None:
797
    unitfields = []
798

    
799
  numfields = utils.FieldSet(*numfields)
800
  unitfields = utils.FieldSet(*unitfields)
801

    
802
  format_fields = []
803
  for field in fields:
804
    if headers and field not in headers:
805
      # TODO: handle better unknown fields (either revert to old
806
      # style of raising exception, or deal more intelligently with
807
      # variable fields)
808
      headers[field] = field
809
    if separator is not None:
810
      format_fields.append("%s")
811
    elif numfields.Matches(field):
812
      format_fields.append("%*s")
813
    else:
814
      format_fields.append("%-*s")
815

    
816
  if separator is None:
817
    mlens = [0 for name in fields]
818
    format = ' '.join(format_fields)
819
  else:
820
    format = separator.replace("%", "%%").join(format_fields)
821

    
822
  for row in data:
823
    if row is None:
824
      continue
825
    for idx, val in enumerate(row):
826
      if unitfields.Matches(fields[idx]):
827
        try:
828
          val = int(val)
829
        except ValueError:
830
          pass
831
        else:
832
          val = row[idx] = utils.FormatUnit(val, units)
833
      val = row[idx] = str(val)
834
      if separator is None:
835
        mlens[idx] = max(mlens[idx], len(val))
836

    
837
  result = []
838
  if headers:
839
    args = []
840
    for idx, name in enumerate(fields):
841
      hdr = headers[name]
842
      if separator is None:
843
        mlens[idx] = max(mlens[idx], len(hdr))
844
        args.append(mlens[idx])
845
      args.append(hdr)
846
    result.append(format % tuple(args))
847

    
848
  for line in data:
849
    args = []
850
    if line is None:
851
      line = ['-' for _ in fields]
852
    for idx in xrange(len(fields)):
853
      if separator is None:
854
        args.append(mlens[idx])
855
      args.append(line[idx])
856
    result.append(format % tuple(args))
857

    
858
  return result
859

    
860

    
861
def FormatTimestamp(ts):
862
  """Formats a given timestamp.
863

864
  @type ts: timestamp
865
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
866

867
  @rtype: string
868
  @return: a string with the formatted timestamp
869

870
  """
871
  if not isinstance (ts, (tuple, list)) or len(ts) != 2:
872
    return '?'
873
  sec, usec = ts
874
  return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
875

    
876

    
877
def ParseTimespec(value):
878
  """Parse a time specification.
879

880
  The following suffixed will be recognized:
881

882
    - s: seconds
883
    - m: minutes
884
    - h: hours
885
    - d: day
886
    - w: weeks
887

888
  Without any suffix, the value will be taken to be in seconds.
889

890
  """
891
  value = str(value)
892
  if not value:
893
    raise errors.OpPrereqError("Empty time specification passed")
894
  suffix_map = {
895
    's': 1,
896
    'm': 60,
897
    'h': 3600,
898
    'd': 86400,
899
    'w': 604800,
900
    }
901
  if value[-1] not in suffix_map:
902
    try:
903
      value = int(value)
904
    except ValueError:
905
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
906
  else:
907
    multiplier = suffix_map[value[-1]]
908
    value = value[:-1]
909
    if not value: # no data left after stripping the suffix
910
      raise errors.OpPrereqError("Invalid time specification (only"
911
                                 " suffix passed)")
912
    try:
913
      value = int(value) * multiplier
914
    except ValueError:
915
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
916
  return value
917

    
918

    
919
def GetOnlineNodes(nodes, cl=None, nowarn=False):
920
  """Returns the names of online nodes.
921

922
  This function will also log a warning on stderr with the names of
923
  the online nodes.
924

925
  @param nodes: if not empty, use only this subset of nodes (minus the
926
      offline ones)
927
  @param cl: if not None, luxi client to use
928
  @type nowarn: boolean
929
  @param nowarn: by default, this function will output a note with the
930
      offline nodes that are skipped; if this parameter is True the
931
      note is not displayed
932

933
  """
934
  if cl is None:
935
    cl = GetClient()
936

    
937
  result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
938
                         use_locking=False)
939
  offline = [row[0] for row in result if row[1]]
940
  if offline and not nowarn:
941
    ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline))
942
  return [row[0] for row in result if not row[1]]
943

    
944

    
945
def _ToStream(stream, txt, *args):
946
  """Write a message to a stream, bypassing the logging system
947

948
  @type stream: file object
949
  @param stream: the file to which we should write
950
  @type txt: str
951
  @param txt: the message
952

953
  """
954
  if args:
955
    args = tuple(args)
956
    stream.write(txt % args)
957
  else:
958
    stream.write(txt)
959
  stream.write('\n')
960
  stream.flush()
961

    
962

    
963
def ToStdout(txt, *args):
964
  """Write a message to stdout only, bypassing the logging system
965

966
  This is just a wrapper over _ToStream.
967

968
  @type txt: str
969
  @param txt: the message
970

971
  """
972
  _ToStream(sys.stdout, txt, *args)
973

    
974

    
975
def ToStderr(txt, *args):
976
  """Write a message to stderr only, bypassing the logging system
977

978
  This is just a wrapper over _ToStream.
979

980
  @type txt: str
981
  @param txt: the message
982

983
  """
984
  _ToStream(sys.stderr, txt, *args)
985

    
986

    
987
class JobExecutor(object):
988
  """Class which manages the submission and execution of multiple jobs.
989

990
  Note that instances of this class should not be reused between
991
  GetResults() calls.
992

993
  """
994
  def __init__(self, cl=None, verbose=True):
995
    self.queue = []
996
    if cl is None:
997
      cl = GetClient()
998
    self.cl = cl
999
    self.verbose = verbose
1000
    self.jobs = []
1001

    
1002
  def QueueJob(self, name, *ops):
1003
    """Record a job for later submit.
1004

1005
    @type name: string
1006
    @param name: a description of the job, will be used in WaitJobSet
1007
    """
1008
    self.queue.append((name, ops))
1009

    
1010
  def SubmitPending(self):
1011
    """Submit all pending jobs.
1012

1013
    """
1014
    results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
1015
    for ((status, data), (name, _)) in zip(results, self.queue):
1016
      self.jobs.append((status, data, name))
1017

    
1018
  def GetResults(self):
1019
    """Wait for and return the results of all jobs.
1020

1021
    @rtype: list
1022
    @return: list of tuples (success, job results), in the same order
1023
        as the submitted jobs; if a job has failed, instead of the result
1024
        there will be the error message
1025

1026
    """
1027
    if not self.jobs:
1028
      self.SubmitPending()
1029
    results = []
1030
    if self.verbose:
1031
      ok_jobs = [row[1] for row in self.jobs if row[0]]
1032
      if ok_jobs:
1033
        ToStdout("Submitted jobs %s", ", ".join(ok_jobs))
1034
    for submit_status, jid, name in self.jobs:
1035
      if not submit_status:
1036
        ToStderr("Failed to submit job for %s: %s", name, jid)
1037
        results.append((False, jid))
1038
        continue
1039
      if self.verbose:
1040
        ToStdout("Waiting for job %s for %s...", jid, name)
1041
      try:
1042
        job_result = PollJob(jid, cl=self.cl)
1043
        success = True
1044
      except (errors.GenericError, luxi.ProtocolError), err:
1045
        _, job_result = FormatError(err)
1046
        success = False
1047
        # the error message will always be shown, verbose or not
1048
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
1049

    
1050
      results.append((success, job_result))
1051
    return results
1052

    
1053
  def WaitOrShow(self, wait):
1054
    """Wait for job results or only print the job IDs.
1055

1056
    @type wait: boolean
1057
    @param wait: whether to wait or not
1058

1059
    """
1060
    if wait:
1061
      return self.GetResults()
1062
    else:
1063
      if not self.jobs:
1064
        self.SubmitPending()
1065
      for status, result, name in self.jobs:
1066
        if status:
1067
          ToStdout("%s: %s", result, name)
1068
        else:
1069
          ToStderr("Failure for %s: %s", name, result)