Statistics
| Branch: | Tag: | Revision:

root / lib / cli.py @ e6345c35

History | View | Annotate | Download (31.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module dealing with command line parsing"""
23

    
24

    
25
import sys
26
import textwrap
27
import os.path
28
import copy
29
import time
30
import logging
31
from cStringIO import StringIO
32

    
33
from ganeti import utils
34
from ganeti import errors
35
from ganeti import constants
36
from ganeti import opcodes
37
from ganeti import luxi
38
from ganeti import ssconf
39
from ganeti import rpc
40

    
41
from optparse import (OptionParser, make_option, TitledHelpFormatter,
42
                      Option, OptionValueError)
43

    
44

    
45
__all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
46
           "SubmitOpCode", "GetClient",
47
           "cli_option", "ikv_option", "keyval_option",
48
           "GenerateTable", "AskUser",
49
           "ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
50
           "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT", "SUBMIT_OPT",
51
           "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
52
           "FormatError", "SplitNodeOption", "SubmitOrSend",
53
           "JobSubmittedException", "FormatTimestamp", "ParseTimespec",
54
           "ToStderr", "ToStdout", "UsesRPC",
55
           "GetOnlineNodes", "JobExecutor", "SYNC_OPT",
56
           ]
57

    
58

    
59

    
60
def _ExtractTagsObject(opts, args):
61
  """Extract the tag type object.
62

63
  Note that this function will modify its args parameter.
64

65
  """
66
  if not hasattr(opts, "tag_type"):
67
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
68
  kind = opts.tag_type
69
  if kind == constants.TAG_CLUSTER:
70
    retval = kind, kind
71
  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
72
    if not args:
73
      raise errors.OpPrereqError("no arguments passed to the command")
74
    name = args.pop(0)
75
    retval = kind, name
76
  else:
77
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
78
  return retval
79

    
80

    
81
def _ExtendTags(opts, args):
82
  """Extend the args if a source file has been given.
83

84
  This function will extend the tags with the contents of the file
85
  passed in the 'tags_source' attribute of the opts parameter. A file
86
  named '-' will be replaced by stdin.
87

88
  """
89
  fname = opts.tags_source
90
  if fname is None:
91
    return
92
  if fname == "-":
93
    new_fh = sys.stdin
94
  else:
95
    new_fh = open(fname, "r")
96
  new_data = []
97
  try:
98
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
99
    # because of python bug 1633941
100
    while True:
101
      line = new_fh.readline()
102
      if not line:
103
        break
104
      new_data.append(line.strip())
105
  finally:
106
    new_fh.close()
107
  args.extend(new_data)
108

    
109

    
110
def ListTags(opts, args):
111
  """List the tags on a given object.
112

113
  This is a generic implementation that knows how to deal with all
114
  three cases of tag objects (cluster, node, instance). The opts
115
  argument is expected to contain a tag_type field denoting what
116
  object type we work on.
117

118
  """
119
  kind, name = _ExtractTagsObject(opts, args)
120
  op = opcodes.OpGetTags(kind=kind, name=name)
121
  result = SubmitOpCode(op)
122
  result = list(result)
123
  result.sort()
124
  for tag in result:
125
    ToStdout(tag)
126

    
127

    
128
def AddTags(opts, args):
129
  """Add tags on a given object.
130

131
  This is a generic implementation that knows how to deal with all
132
  three cases of tag objects (cluster, node, instance). The opts
133
  argument is expected to contain a tag_type field denoting what
134
  object type we work on.
135

136
  """
137
  kind, name = _ExtractTagsObject(opts, args)
138
  _ExtendTags(opts, args)
139
  if not args:
140
    raise errors.OpPrereqError("No tags to be added")
141
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
142
  SubmitOpCode(op)
143

    
144

    
145
def RemoveTags(opts, args):
146
  """Remove tags from a given object.
147

148
  This is a generic implementation that knows how to deal with all
149
  three cases of tag objects (cluster, node, instance). The opts
150
  argument is expected to contain a tag_type field denoting what
151
  object type we work on.
152

153
  """
154
  kind, name = _ExtractTagsObject(opts, args)
155
  _ExtendTags(opts, args)
156
  if not args:
157
    raise errors.OpPrereqError("No tags to be removed")
158
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
159
  SubmitOpCode(op)
160

    
161

    
162
DEBUG_OPT = make_option("-d", "--debug", default=False,
163
                        action="store_true",
164
                        help="Turn debugging on")
165

    
166
NOHDR_OPT = make_option("--no-headers", default=False,
167
                        action="store_true", dest="no_headers",
168
                        help="Don't display column headers")
169

    
170
SEP_OPT = make_option("--separator", default=None,
171
                      action="store", dest="separator",
172
                      help="Separator between output fields"
173
                      " (defaults to one space)")
174

    
175
USEUNITS_OPT = make_option("--units", default=None,
176
                           dest="units", choices=('h', 'm', 'g', 't'),
177
                           help="Specify units for output (one of hmgt)")
178

    
179
FIELDS_OPT = make_option("-o", "--output", dest="output", action="store",
180
                         type="string", help="Comma separated list of"
181
                         " output fields",
182
                         metavar="FIELDS")
183

    
184
FORCE_OPT = make_option("-f", "--force", dest="force", action="store_true",
185
                        default=False, help="Force the operation")
186

    
187
TAG_SRC_OPT = make_option("--from", dest="tags_source",
188
                          default=None, help="File with tag names")
189

    
190
SUBMIT_OPT = make_option("--submit", dest="submit_only",
191
                         default=False, action="store_true",
192
                         help="Submit the job and return the job ID, but"
193
                         " don't wait for the job to finish")
194

    
195
SYNC_OPT = make_option("--sync", dest="do_locking",
196
                       default=False, action="store_true",
197
                       help="Grab locks while doing the queries"
198
                       " in order to ensure more consistent results")
199

    
200

    
201
def ARGS_FIXED(val):
202
  """Macro-like function denoting a fixed number of arguments"""
203
  return -val
204

    
205

    
206
def ARGS_ATLEAST(val):
207
  """Macro-like function denoting a minimum number of arguments"""
208
  return val
209

    
210

    
211
ARGS_NONE = None
212
ARGS_ONE = ARGS_FIXED(1)
213
ARGS_ANY = ARGS_ATLEAST(0)
214

    
215

    
216
def check_unit(option, opt, value):
217
  """OptParsers custom converter for units.
218

219
  """
220
  try:
221
    return utils.ParseUnit(value)
222
  except errors.UnitParseError, err:
223
    raise OptionValueError("option %s: %s" % (opt, err))
224

    
225

    
226
class CliOption(Option):
227
  """Custom option class for optparse.
228

229
  """
230
  TYPES = Option.TYPES + ("unit",)
231
  TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
232
  TYPE_CHECKER["unit"] = check_unit
233

    
234

    
235
def _SplitKeyVal(opt, data):
236
  """Convert a KeyVal string into a dict.
237

238
  This function will convert a key=val[,...] string into a dict. Empty
239
  values will be converted specially: keys which have the prefix 'no_'
240
  will have the value=False and the prefix stripped, the others will
241
  have value=True.
242

243
  @type opt: string
244
  @param opt: a string holding the option name for which we process the
245
      data, used in building error messages
246
  @type data: string
247
  @param data: a string of the format key=val,key=val,...
248
  @rtype: dict
249
  @return: {key=val, key=val}
250
  @raises errors.ParameterError: if there are duplicate keys
251

252
  """
253
  NO_PREFIX = "no_"
254
  UN_PREFIX = "-"
255
  kv_dict = {}
256
  for elem in data.split(","):
257
    if "=" in elem:
258
      key, val = elem.split("=", 1)
259
    else:
260
      if elem.startswith(NO_PREFIX):
261
        key, val = elem[len(NO_PREFIX):], False
262
      elif elem.startswith(UN_PREFIX):
263
        key, val = elem[len(UN_PREFIX):], None
264
      else:
265
        key, val = elem, True
266
    if key in kv_dict:
267
      raise errors.ParameterError("Duplicate key '%s' in option %s" %
268
                                  (key, opt))
269
    kv_dict[key] = val
270
  return kv_dict
271

    
272

    
273
def check_ident_key_val(option, opt, value):
274
  """Custom parser for the IdentKeyVal option type.
275

276
  """
277
  if ":" not in value:
278
    retval =  (value, {})
279
  else:
280
    ident, rest = value.split(":", 1)
281
    kv_dict = _SplitKeyVal(opt, rest)
282
    retval = (ident, kv_dict)
283
  return retval
284

    
285

    
286
class IdentKeyValOption(Option):
287
  """Custom option class for ident:key=val,key=val options.
288

289
  This will store the parsed values as a tuple (ident, {key: val}). As
290
  such, multiple uses of this option via action=append is possible.
291

292
  """
293
  TYPES = Option.TYPES + ("identkeyval",)
294
  TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
295
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
296

    
297

    
298
def check_key_val(option, opt, value):
299
  """Custom parser for the KeyVal option type.
300

301
  """
302
  return _SplitKeyVal(opt, value)
303

    
304

    
305
class KeyValOption(Option):
306
  """Custom option class for key=val,key=val options.
307

308
  This will store the parsed values as a dict {key: val}.
309

310
  """
311
  TYPES = Option.TYPES + ("keyval",)
312
  TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
313
  TYPE_CHECKER["keyval"] = check_key_val
314

    
315

    
316
# optparse.py sets make_option, so we do it for our own option class, too
317
cli_option = CliOption
318
ikv_option = IdentKeyValOption
319
keyval_option = KeyValOption
320

    
321

    
322
def _ParseArgs(argv, commands, aliases):
323
  """Parser for the command line arguments.
324

325
  This function parses the arguments and returns the function which
326
  must be executed together with its (modified) arguments.
327

328
  @param argv: the command line
329
  @param commands: dictionary with special contents, see the design
330
      doc for cmdline handling
331
  @param aliases: dictionary with command aliases {'alias': 'target, ...}
332

333
  """
334
  if len(argv) == 0:
335
    binary = "<command>"
336
  else:
337
    binary = argv[0].split("/")[-1]
338

    
339
  if len(argv) > 1 and argv[1] == "--version":
340
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
341
    # Quit right away. That way we don't have to care about this special
342
    # argument. optparse.py does it the same.
343
    sys.exit(0)
344

    
345
  if len(argv) < 2 or not (argv[1] in commands or
346
                           argv[1] in aliases):
347
    # let's do a nice thing
348
    sortedcmds = commands.keys()
349
    sortedcmds.sort()
350

    
351
    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
352
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
353
    ToStdout("")
354

    
355
    # compute the max line length for cmd + usage
356
    mlen = max([len(" %s" % cmd) for cmd in commands])
357
    mlen = min(60, mlen) # should not get here...
358

    
359
    # and format a nice command list
360
    ToStdout("Commands:")
361
    for cmd in sortedcmds:
362
      cmdstr = " %s" % (cmd,)
363
      help_text = commands[cmd][4]
364
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
365
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
366
      for line in help_lines:
367
        ToStdout("%-*s   %s", mlen, "", line)
368

    
369
    ToStdout("")
370

    
371
    return None, None, None
372

    
373
  # get command, unalias it, and look it up in commands
374
  cmd = argv.pop(1)
375
  if cmd in aliases:
376
    if cmd in commands:
377
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
378
                                   " command" % cmd)
379

    
380
    if aliases[cmd] not in commands:
381
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
382
                                   " command '%s'" % (cmd, aliases[cmd]))
383

    
384
    cmd = aliases[cmd]
385

    
386
  func, nargs, parser_opts, usage, description = commands[cmd]
387
  parser = OptionParser(option_list=parser_opts,
388
                        description=description,
389
                        formatter=TitledHelpFormatter(),
390
                        usage="%%prog %s %s" % (cmd, usage))
391
  parser.disable_interspersed_args()
392
  options, args = parser.parse_args()
393
  if nargs is None:
394
    if len(args) != 0:
395
      ToStderr("Error: Command %s expects no arguments", cmd)
396
      return None, None, None
397
  elif nargs < 0 and len(args) != -nargs:
398
    ToStderr("Error: Command %s expects %d argument(s)", cmd, -nargs)
399
    return None, None, None
400
  elif nargs >= 0 and len(args) < nargs:
401
    ToStderr("Error: Command %s expects at least %d argument(s)", cmd, nargs)
402
    return None, None, None
403

    
404
  return func, options, args
405

    
406

    
407
def SplitNodeOption(value):
408
  """Splits the value of a --node option.
409

410
  """
411
  if value and ':' in value:
412
    return value.split(':', 1)
413
  else:
414
    return (value, None)
415

    
416

    
417
def UsesRPC(fn):
418
  def wrapper(*args, **kwargs):
419
    rpc.Init()
420
    try:
421
      return fn(*args, **kwargs)
422
    finally:
423
      rpc.Shutdown()
424
  return wrapper
425

    
426

    
427
def AskUser(text, choices=None):
428
  """Ask the user a question.
429

430
  @param text: the question to ask
431

432
  @param choices: list with elements tuples (input_char, return_value,
433
      description); if not given, it will default to: [('y', True,
434
      'Perform the operation'), ('n', False, 'Do no do the operation')];
435
      note that the '?' char is reserved for help
436

437
  @return: one of the return values from the choices list; if input is
438
      not possible (i.e. not running with a tty, we return the last
439
      entry from the list
440

441
  """
442
  if choices is None:
443
    choices = [('y', True, 'Perform the operation'),
444
               ('n', False, 'Do not perform the operation')]
445
  if not choices or not isinstance(choices, list):
446
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
447
  for entry in choices:
448
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
449
      raise errors.ProgrammerError("Invalid choices element to AskUser")
450

    
451
  answer = choices[-1][1]
452
  new_text = []
453
  for line in text.splitlines():
454
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
455
  text = "\n".join(new_text)
456
  try:
457
    f = file("/dev/tty", "a+")
458
  except IOError:
459
    return answer
460
  try:
461
    chars = [entry[0] for entry in choices]
462
    chars[-1] = "[%s]" % chars[-1]
463
    chars.append('?')
464
    maps = dict([(entry[0], entry[1]) for entry in choices])
465
    while True:
466
      f.write(text)
467
      f.write('\n')
468
      f.write("/".join(chars))
469
      f.write(": ")
470
      line = f.readline(2).strip().lower()
471
      if line in maps:
472
        answer = maps[line]
473
        break
474
      elif line == '?':
475
        for entry in choices:
476
          f.write(" %s - %s\n" % (entry[0], entry[2]))
477
        f.write("\n")
478
        continue
479
  finally:
480
    f.close()
481
  return answer
482

    
483

    
484
class JobSubmittedException(Exception):
485
  """Job was submitted, client should exit.
486

487
  This exception has one argument, the ID of the job that was
488
  submitted. The handler should print this ID.
489

490
  This is not an error, just a structured way to exit from clients.
491

492
  """
493

    
494

    
495
def SendJob(ops, cl=None):
496
  """Function to submit an opcode without waiting for the results.
497

498
  @type ops: list
499
  @param ops: list of opcodes
500
  @type cl: luxi.Client
501
  @param cl: the luxi client to use for communicating with the master;
502
             if None, a new client will be created
503

504
  """
505
  if cl is None:
506
    cl = GetClient()
507

    
508
  job_id = cl.SubmitJob(ops)
509

    
510
  return job_id
511

    
512

    
513
def PollJob(job_id, cl=None, feedback_fn=None):
514
  """Function to poll for the result of a job.
515

516
  @type job_id: job identified
517
  @param job_id: the job to poll for results
518
  @type cl: luxi.Client
519
  @param cl: the luxi client to use for communicating with the master;
520
             if None, a new client will be created
521

522
  """
523
  if cl is None:
524
    cl = GetClient()
525

    
526
  prev_job_info = None
527
  prev_logmsg_serial = None
528

    
529
  while True:
530
    result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
531
                                 prev_logmsg_serial)
532
    if not result:
533
      # job not found, go away!
534
      raise errors.JobLost("Job with id %s lost" % job_id)
535

    
536
    # Split result, a tuple of (field values, log entries)
537
    (job_info, log_entries) = result
538
    (status, ) = job_info
539

    
540
    if log_entries:
541
      for log_entry in log_entries:
542
        (serial, timestamp, _, message) = log_entry
543
        if callable(feedback_fn):
544
          feedback_fn(log_entry[1:])
545
        else:
546
          encoded = utils.SafeEncode(message)
547
          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
548
        prev_logmsg_serial = max(prev_logmsg_serial, serial)
549

    
550
    # TODO: Handle canceled and archived jobs
551
    elif status in (constants.JOB_STATUS_SUCCESS,
552
                    constants.JOB_STATUS_ERROR,
553
                    constants.JOB_STATUS_CANCELING,
554
                    constants.JOB_STATUS_CANCELED):
555
      break
556

    
557
    prev_job_info = job_info
558

    
559
  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
560
  if not jobs:
561
    raise errors.JobLost("Job with id %s lost" % job_id)
562

    
563
  status, opstatus, result = jobs[0]
564
  if status == constants.JOB_STATUS_SUCCESS:
565
    return result
566
  elif status in (constants.JOB_STATUS_CANCELING,
567
                  constants.JOB_STATUS_CANCELED):
568
    raise errors.OpExecError("Job was canceled")
569
  else:
570
    has_ok = False
571
    for idx, (status, msg) in enumerate(zip(opstatus, result)):
572
      if status == constants.OP_STATUS_SUCCESS:
573
        has_ok = True
574
      elif status == constants.OP_STATUS_ERROR:
575
        errors.MaybeRaise(msg)
576
        if has_ok:
577
          raise errors.OpExecError("partial failure (opcode %d): %s" %
578
                                   (idx, msg))
579
        else:
580
          raise errors.OpExecError(str(msg))
581
    # default failure mode
582
    raise errors.OpExecError(result)
583

    
584

    
585
def SubmitOpCode(op, cl=None, feedback_fn=None):
586
  """Legacy function to submit an opcode.
587

588
  This is just a simple wrapper over the construction of the processor
589
  instance. It should be extended to better handle feedback and
590
  interaction functions.
591

592
  """
593
  if cl is None:
594
    cl = GetClient()
595

    
596
  job_id = SendJob([op], cl)
597

    
598
  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
599

    
600
  return op_results[0]
601

    
602

    
603
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
604
  """Wrapper around SubmitOpCode or SendJob.
605

606
  This function will decide, based on the 'opts' parameter, whether to
607
  submit and wait for the result of the opcode (and return it), or
608
  whether to just send the job and print its identifier. It is used in
609
  order to simplify the implementation of the '--submit' option.
610

611
  """
612
  if opts and opts.submit_only:
613
    job_id = SendJob([op], cl=cl)
614
    raise JobSubmittedException(job_id)
615
  else:
616
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
617

    
618

    
619
def GetClient():
620
  # TODO: Cache object?
621
  try:
622
    client = luxi.Client()
623
  except luxi.NoMasterError:
624
    master, myself = ssconf.GetMasterAndMyself()
625
    if master != myself:
626
      raise errors.OpPrereqError("This is not the master node, please connect"
627
                                 " to node '%s' and rerun the command" %
628
                                 master)
629
    else:
630
      raise
631
  return client
632

    
633

    
634
def FormatError(err):
635
  """Return a formatted error message for a given error.
636

637
  This function takes an exception instance and returns a tuple
638
  consisting of two values: first, the recommended exit code, and
639
  second, a string describing the error message (not
640
  newline-terminated).
641

642
  """
643
  retcode = 1
644
  obuf = StringIO()
645
  msg = str(err)
646
  if isinstance(err, errors.ConfigurationError):
647
    txt = "Corrupt configuration file: %s" % msg
648
    logging.error(txt)
649
    obuf.write(txt + "\n")
650
    obuf.write("Aborting.")
651
    retcode = 2
652
  elif isinstance(err, errors.HooksAbort):
653
    obuf.write("Failure: hooks execution failed:\n")
654
    for node, script, out in err.args[0]:
655
      if out:
656
        obuf.write("  node: %s, script: %s, output: %s\n" %
657
                   (node, script, out))
658
      else:
659
        obuf.write("  node: %s, script: %s (no output)\n" %
660
                   (node, script))
661
  elif isinstance(err, errors.HooksFailure):
662
    obuf.write("Failure: hooks general failure: %s" % msg)
663
  elif isinstance(err, errors.ResolverError):
664
    this_host = utils.HostInfo.SysName()
665
    if err.args[0] == this_host:
666
      msg = "Failure: can't resolve my own hostname ('%s')"
667
    else:
668
      msg = "Failure: can't resolve hostname '%s'"
669
    obuf.write(msg % err.args[0])
670
  elif isinstance(err, errors.OpPrereqError):
671
    obuf.write("Failure: prerequisites not met for this"
672
               " operation:\n%s" % msg)
673
  elif isinstance(err, errors.OpExecError):
674
    obuf.write("Failure: command execution error:\n%s" % msg)
675
  elif isinstance(err, errors.TagError):
676
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
677
  elif isinstance(err, errors.JobQueueDrainError):
678
    obuf.write("Failure: the job queue is marked for drain and doesn't"
679
               " accept new requests\n")
680
  elif isinstance(err, errors.JobQueueFull):
681
    obuf.write("Failure: the job queue is full and doesn't accept new"
682
               " job submissions until old jobs are archived\n")
683
  elif isinstance(err, errors.TypeEnforcementError):
684
    obuf.write("Parameter Error: %s" % msg)
685
  elif isinstance(err, errors.ParameterError):
686
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
687
  elif isinstance(err, errors.GenericError):
688
    obuf.write("Unhandled Ganeti error: %s" % msg)
689
  elif isinstance(err, luxi.NoMasterError):
690
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
691
               " and listening for connections?")
692
  elif isinstance(err, luxi.TimeoutError):
693
    obuf.write("Timeout while talking to the master daemon. Error:\n"
694
               "%s" % msg)
695
  elif isinstance(err, luxi.ProtocolError):
696
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
697
               "%s" % msg)
698
  elif isinstance(err, JobSubmittedException):
699
    obuf.write("JobID: %s\n" % err.args[0])
700
    retcode = 0
701
  else:
702
    obuf.write("Unhandled exception: %s" % msg)
703
  return retcode, obuf.getvalue().rstrip('\n')
704

    
705

    
706
def GenericMain(commands, override=None, aliases=None):
707
  """Generic main function for all the gnt-* commands.
708

709
  Arguments:
710
    - commands: a dictionary with a special structure, see the design doc
711
                for command line handling.
712
    - override: if not None, we expect a dictionary with keys that will
713
                override command line options; this can be used to pass
714
                options from the scripts to generic functions
715
    - aliases: dictionary with command aliases {'alias': 'target, ...}
716

717
  """
718
  # save the program name and the entire command line for later logging
719
  if sys.argv:
720
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
721
    if len(sys.argv) >= 2:
722
      binary += " " + sys.argv[1]
723
      old_cmdline = " ".join(sys.argv[2:])
724
    else:
725
      old_cmdline = ""
726
  else:
727
    binary = "<unknown program>"
728
    old_cmdline = ""
729

    
730
  if aliases is None:
731
    aliases = {}
732

    
733
  func, options, args = _ParseArgs(sys.argv, commands, aliases)
734
  if func is None: # parse error
735
    return 1
736

    
737
  if override is not None:
738
    for key, val in override.iteritems():
739
      setattr(options, key, val)
740

    
741
  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
742
                     stderr_logging=True, program=binary)
743

    
744
  if old_cmdline:
745
    logging.info("run with arguments '%s'", old_cmdline)
746
  else:
747
    logging.info("run with no arguments")
748

    
749
  try:
750
    result = func(options, args)
751
  except (errors.GenericError, luxi.ProtocolError,
752
          JobSubmittedException), err:
753
    result, err_msg = FormatError(err)
754
    logging.exception("Error during command processing")
755
    ToStderr(err_msg)
756

    
757
  return result
758

    
759

    
760
def GenerateTable(headers, fields, separator, data,
761
                  numfields=None, unitfields=None,
762
                  units=None):
763
  """Prints a table with headers and different fields.
764

765
  @type headers: dict
766
  @param headers: dictionary mapping field names to headers for
767
      the table
768
  @type fields: list
769
  @param fields: the field names corresponding to each row in
770
      the data field
771
  @param separator: the separator to be used; if this is None,
772
      the default 'smart' algorithm is used which computes optimal
773
      field width, otherwise just the separator is used between
774
      each field
775
  @type data: list
776
  @param data: a list of lists, each sublist being one row to be output
777
  @type numfields: list
778
  @param numfields: a list with the fields that hold numeric
779
      values and thus should be right-aligned
780
  @type unitfields: list
781
  @param unitfields: a list with the fields that hold numeric
782
      values that should be formatted with the units field
783
  @type units: string or None
784
  @param units: the units we should use for formatting, or None for
785
      automatic choice (human-readable for non-separator usage, otherwise
786
      megabytes); this is a one-letter string
787

788
  """
789
  if units is None:
790
    if separator:
791
      units = "m"
792
    else:
793
      units = "h"
794

    
795
  if numfields is None:
796
    numfields = []
797
  if unitfields is None:
798
    unitfields = []
799

    
800
  numfields = utils.FieldSet(*numfields)
801
  unitfields = utils.FieldSet(*unitfields)
802

    
803
  format_fields = []
804
  for field in fields:
805
    if headers and field not in headers:
806
      # TODO: handle better unknown fields (either revert to old
807
      # style of raising exception, or deal more intelligently with
808
      # variable fields)
809
      headers[field] = field
810
    if separator is not None:
811
      format_fields.append("%s")
812
    elif numfields.Matches(field):
813
      format_fields.append("%*s")
814
    else:
815
      format_fields.append("%-*s")
816

    
817
  if separator is None:
818
    mlens = [0 for name in fields]
819
    format = ' '.join(format_fields)
820
  else:
821
    format = separator.replace("%", "%%").join(format_fields)
822

    
823
  for row in data:
824
    if row is None:
825
      continue
826
    for idx, val in enumerate(row):
827
      if unitfields.Matches(fields[idx]):
828
        try:
829
          val = int(val)
830
        except ValueError:
831
          pass
832
        else:
833
          val = row[idx] = utils.FormatUnit(val, units)
834
      val = row[idx] = str(val)
835
      if separator is None:
836
        mlens[idx] = max(mlens[idx], len(val))
837

    
838
  result = []
839
  if headers:
840
    args = []
841
    for idx, name in enumerate(fields):
842
      hdr = headers[name]
843
      if separator is None:
844
        mlens[idx] = max(mlens[idx], len(hdr))
845
        args.append(mlens[idx])
846
      args.append(hdr)
847
    result.append(format % tuple(args))
848

    
849
  for line in data:
850
    args = []
851
    if line is None:
852
      line = ['-' for _ in fields]
853
    for idx in xrange(len(fields)):
854
      if separator is None:
855
        args.append(mlens[idx])
856
      args.append(line[idx])
857
    result.append(format % tuple(args))
858

    
859
  return result
860

    
861

    
862
def FormatTimestamp(ts):
863
  """Formats a given timestamp.
864

865
  @type ts: timestamp
866
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
867

868
  @rtype: string
869
  @return: a string with the formatted timestamp
870

871
  """
872
  if not isinstance (ts, (tuple, list)) or len(ts) != 2:
873
    return '?'
874
  sec, usec = ts
875
  return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
876

    
877

    
878
def ParseTimespec(value):
879
  """Parse a time specification.
880

881
  The following suffixed will be recognized:
882

883
    - s: seconds
884
    - m: minutes
885
    - h: hours
886
    - d: day
887
    - w: weeks
888

889
  Without any suffix, the value will be taken to be in seconds.
890

891
  """
892
  value = str(value)
893
  if not value:
894
    raise errors.OpPrereqError("Empty time specification passed")
895
  suffix_map = {
896
    's': 1,
897
    'm': 60,
898
    'h': 3600,
899
    'd': 86400,
900
    'w': 604800,
901
    }
902
  if value[-1] not in suffix_map:
903
    try:
904
      value = int(value)
905
    except ValueError:
906
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
907
  else:
908
    multiplier = suffix_map[value[-1]]
909
    value = value[:-1]
910
    if not value: # no data left after stripping the suffix
911
      raise errors.OpPrereqError("Invalid time specification (only"
912
                                 " suffix passed)")
913
    try:
914
      value = int(value) * multiplier
915
    except ValueError:
916
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
917
  return value
918

    
919

    
920
def GetOnlineNodes(nodes, cl=None, nowarn=False):
921
  """Returns the names of online nodes.
922

923
  This function will also log a warning on stderr with the names of
924
  the online nodes.
925

926
  @param nodes: if not empty, use only this subset of nodes (minus the
927
      offline ones)
928
  @param cl: if not None, luxi client to use
929
  @type nowarn: boolean
930
  @param nowarn: by default, this function will output a note with the
931
      offline nodes that are skipped; if this parameter is True the
932
      note is not displayed
933

934
  """
935
  if cl is None:
936
    cl = GetClient()
937

    
938
  result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
939
                         use_locking=False)
940
  offline = [row[0] for row in result if row[1]]
941
  if offline and not nowarn:
942
    ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline))
943
  return [row[0] for row in result if not row[1]]
944

    
945

    
946
def _ToStream(stream, txt, *args):
947
  """Write a message to a stream, bypassing the logging system
948

949
  @type stream: file object
950
  @param stream: the file to which we should write
951
  @type txt: str
952
  @param txt: the message
953

954
  """
955
  if args:
956
    args = tuple(args)
957
    stream.write(txt % args)
958
  else:
959
    stream.write(txt)
960
  stream.write('\n')
961
  stream.flush()
962

    
963

    
964
def ToStdout(txt, *args):
965
  """Write a message to stdout only, bypassing the logging system
966

967
  This is just a wrapper over _ToStream.
968

969
  @type txt: str
970
  @param txt: the message
971

972
  """
973
  _ToStream(sys.stdout, txt, *args)
974

    
975

    
976
def ToStderr(txt, *args):
977
  """Write a message to stderr only, bypassing the logging system
978

979
  This is just a wrapper over _ToStream.
980

981
  @type txt: str
982
  @param txt: the message
983

984
  """
985
  _ToStream(sys.stderr, txt, *args)
986

    
987

    
988
class JobExecutor(object):
989
  """Class which manages the submission and execution of multiple jobs.
990

991
  Note that instances of this class should not be reused between
992
  GetResults() calls.
993

994
  """
995
  def __init__(self, cl=None, verbose=True):
996
    self.queue = []
997
    if cl is None:
998
      cl = GetClient()
999
    self.cl = cl
1000
    self.verbose = verbose
1001
    self.jobs = []
1002

    
1003
  def QueueJob(self, name, *ops):
1004
    """Record a job for later submit.
1005

1006
    @type name: string
1007
    @param name: a description of the job, will be used in WaitJobSet
1008
    """
1009
    self.queue.append((name, ops))
1010

    
1011
  def SubmitPending(self):
1012
    """Submit all pending jobs.
1013

1014
    """
1015
    results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
1016
    for ((status, data), (name, _)) in zip(results, self.queue):
1017
      self.jobs.append((status, data, name))
1018

    
1019
  def GetResults(self):
1020
    """Wait for and return the results of all jobs.
1021

1022
    @rtype: list
1023
    @return: list of tuples (success, job results), in the same order
1024
        as the submitted jobs; if a job has failed, instead of the result
1025
        there will be the error message
1026

1027
    """
1028
    if not self.jobs:
1029
      self.SubmitPending()
1030
    results = []
1031
    if self.verbose:
1032
      ok_jobs = [row[1] for row in self.jobs if row[0]]
1033
      if ok_jobs:
1034
        ToStdout("Submitted jobs %s", ", ".join(ok_jobs))
1035
    for submit_status, jid, name in self.jobs:
1036
      if not submit_status:
1037
        ToStderr("Failed to submit job for %s: %s", name, jid)
1038
        results.append((False, jid))
1039
        continue
1040
      if self.verbose:
1041
        ToStdout("Waiting for job %s for %s...", jid, name)
1042
      try:
1043
        job_result = PollJob(jid, cl=self.cl)
1044
        success = True
1045
      except (errors.GenericError, luxi.ProtocolError), err:
1046
        _, job_result = FormatError(err)
1047
        success = False
1048
        # the error message will always be shown, verbose or not
1049
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
1050

    
1051
      results.append((success, job_result))
1052
    return results
1053

    
1054
  def WaitOrShow(self, wait):
1055
    """Wait for job results or only print the job IDs.
1056

1057
    @type wait: boolean
1058
    @param wait: whether to wait or not
1059

1060
    """
1061
    if wait:
1062
      return self.GetResults()
1063
    else:
1064
      if not self.jobs:
1065
        self.SubmitPending()
1066
      for status, result, name in self.jobs:
1067
        if status:
1068
          ToStdout("%s: %s", result, name)
1069
        else:
1070
          ToStderr("Failure for %s: %s", name, result)