Statistics
| Branch: | Tag: | Revision:

root / lib / cli.py @ 26023ecd

History | View | Annotate | Download (37.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module dealing with command line parsing"""
23

    
24

    
25
import sys
26
import textwrap
27
import os.path
28
import copy
29
import time
30
import logging
31
from cStringIO import StringIO
32

    
33
from ganeti import utils
34
from ganeti import errors
35
from ganeti import constants
36
from ganeti import opcodes
37
from ganeti import luxi
38
from ganeti import ssconf
39
from ganeti import rpc
40

    
41
from optparse import (OptionParser, TitledHelpFormatter,
42
                      Option, OptionValueError)
43

    
44

    
45
__all__ = [
46
  # Command line options
47
  "CONFIRM_OPT",
48
  "DEBUG_OPT",
49
  "DEBUG_SIMERR_OPT",
50
  "DISK_TEMPLATE_OPT",
51
  "FIELDS_OPT",
52
  "FORCE_OPT",
53
  "NOHDR_OPT",
54
  "NONICS_OPT",
55
  "NWSYNC_OPT",
56
  "SEP_OPT",
57
  "SUBMIT_OPT",
58
  "SYNC_OPT",
59
  "TAG_SRC_OPT",
60
  "USEUNITS_OPT",
61
  "VERBOSE_OPT",
62
  # Generic functions for CLI programs
63
  "GenericMain",
64
  "GetClient",
65
  "GetOnlineNodes",
66
  "JobExecutor",
67
  "JobSubmittedException",
68
  "ParseTimespec",
69
  "SubmitOpCode",
70
  "SubmitOrSend",
71
  "UsesRPC",
72
  # Formatting functions
73
  "ToStderr", "ToStdout",
74
  "FormatError",
75
  "GenerateTable",
76
  "AskUser",
77
  "FormatTimestamp",
78
  # Tags functions
79
  "ListTags",
80
  "AddTags",
81
  "RemoveTags",
82
  # command line options support infrastructure
83
  "ARGS_MANY_INSTANCES",
84
  "ARGS_MANY_NODES",
85
  "ARGS_NONE",
86
  "ARGS_ONE_INSTANCE",
87
  "ARGS_ONE_NODE",
88
  "ArgChoice",
89
  "ArgCommand",
90
  "ArgFile",
91
  "ArgHost",
92
  "ArgInstance",
93
  "ArgJobId",
94
  "ArgNode",
95
  "ArgSuggest",
96
  "ArgUnknown",
97
  "OPT_COMPL_INST_ADD_NODES",
98
  "OPT_COMPL_MANY_NODES",
99
  "OPT_COMPL_ONE_IALLOCATOR",
100
  "OPT_COMPL_ONE_INSTANCE",
101
  "OPT_COMPL_ONE_NODE",
102
  "OPT_COMPL_ONE_OS",
103
  "cli_option",
104
  "SplitNodeOption",
105
  ]
106

    
107
NO_PREFIX = "no_"
108
UN_PREFIX = "-"
109

    
110

    
111
class _Argument:
112
  def __init__(self, min=0, max=None):
113
    self.min = min
114
    self.max = max
115

    
116
  def __repr__(self):
117
    return ("<%s min=%s max=%s>" %
118
            (self.__class__.__name__, self.min, self.max))
119

    
120

    
121
class ArgSuggest(_Argument):
122
  """Suggesting argument.
123

124
  Value can be any of the ones passed to the constructor.
125

126
  """
127
  def __init__(self, min=0, max=None, choices=None):
128
    _Argument.__init__(self, min=min, max=max)
129
    self.choices = choices
130

    
131
  def __repr__(self):
132
    return ("<%s min=%s max=%s choices=%r>" %
133
            (self.__class__.__name__, self.min, self.max, self.choices))
134

    
135

    
136
class ArgChoice(ArgSuggest):
137
  """Choice argument.
138

139
  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
140
  but value must be one of the choices.
141

142
  """
143

    
144

    
145
class ArgUnknown(_Argument):
146
  """Unknown argument to program (e.g. determined at runtime).
147

148
  """
149

    
150

    
151
class ArgInstance(_Argument):
152
  """Instances argument.
153

154
  """
155

    
156

    
157
class ArgNode(_Argument):
158
  """Node argument.
159

160
  """
161

    
162
class ArgJobId(_Argument):
163
  """Job ID argument.
164

165
  """
166

    
167

    
168
class ArgFile(_Argument):
169
  """File path argument.
170

171
  """
172

    
173

    
174
class ArgCommand(_Argument):
175
  """Command argument.
176

177
  """
178

    
179

    
180
class ArgHost(_Argument):
181
  """Host argument.
182

183
  """
184

    
185

    
186
ARGS_NONE = []
187
ARGS_MANY_INSTANCES = [ArgInstance()]
188
ARGS_MANY_NODES = [ArgNode()]
189
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
190
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
191

    
192

    
193

    
194
def _ExtractTagsObject(opts, args):
195
  """Extract the tag type object.
196

197
  Note that this function will modify its args parameter.
198

199
  """
200
  if not hasattr(opts, "tag_type"):
201
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
202
  kind = opts.tag_type
203
  if kind == constants.TAG_CLUSTER:
204
    retval = kind, kind
205
  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
206
    if not args:
207
      raise errors.OpPrereqError("no arguments passed to the command")
208
    name = args.pop(0)
209
    retval = kind, name
210
  else:
211
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
212
  return retval
213

    
214

    
215
def _ExtendTags(opts, args):
216
  """Extend the args if a source file has been given.
217

218
  This function will extend the tags with the contents of the file
219
  passed in the 'tags_source' attribute of the opts parameter. A file
220
  named '-' will be replaced by stdin.
221

222
  """
223
  fname = opts.tags_source
224
  if fname is None:
225
    return
226
  if fname == "-":
227
    new_fh = sys.stdin
228
  else:
229
    new_fh = open(fname, "r")
230
  new_data = []
231
  try:
232
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
233
    # because of python bug 1633941
234
    while True:
235
      line = new_fh.readline()
236
      if not line:
237
        break
238
      new_data.append(line.strip())
239
  finally:
240
    new_fh.close()
241
  args.extend(new_data)
242

    
243

    
244
def ListTags(opts, args):
245
  """List the tags on a given object.
246

247
  This is a generic implementation that knows how to deal with all
248
  three cases of tag objects (cluster, node, instance). The opts
249
  argument is expected to contain a tag_type field denoting what
250
  object type we work on.
251

252
  """
253
  kind, name = _ExtractTagsObject(opts, args)
254
  op = opcodes.OpGetTags(kind=kind, name=name)
255
  result = SubmitOpCode(op)
256
  result = list(result)
257
  result.sort()
258
  for tag in result:
259
    ToStdout(tag)
260

    
261

    
262
def AddTags(opts, args):
263
  """Add tags on a given object.
264

265
  This is a generic implementation that knows how to deal with all
266
  three cases of tag objects (cluster, node, instance). The opts
267
  argument is expected to contain a tag_type field denoting what
268
  object type we work on.
269

270
  """
271
  kind, name = _ExtractTagsObject(opts, args)
272
  _ExtendTags(opts, args)
273
  if not args:
274
    raise errors.OpPrereqError("No tags to be added")
275
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
276
  SubmitOpCode(op)
277

    
278

    
279
def RemoveTags(opts, args):
280
  """Remove tags from a given object.
281

282
  This is a generic implementation that knows how to deal with all
283
  three cases of tag objects (cluster, node, instance). The opts
284
  argument is expected to contain a tag_type field denoting what
285
  object type we work on.
286

287
  """
288
  kind, name = _ExtractTagsObject(opts, args)
289
  _ExtendTags(opts, args)
290
  if not args:
291
    raise errors.OpPrereqError("No tags to be removed")
292
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
293
  SubmitOpCode(op)
294

    
295

    
296
def check_unit(option, opt, value):
297
  """OptParsers custom converter for units.
298

299
  """
300
  try:
301
    return utils.ParseUnit(value)
302
  except errors.UnitParseError, err:
303
    raise OptionValueError("option %s: %s" % (opt, err))
304

    
305

    
306
def _SplitKeyVal(opt, data):
307
  """Convert a KeyVal string into a dict.
308

309
  This function will convert a key=val[,...] string into a dict. Empty
310
  values will be converted specially: keys which have the prefix 'no_'
311
  will have the value=False and the prefix stripped, the others will
312
  have value=True.
313

314
  @type opt: string
315
  @param opt: a string holding the option name for which we process the
316
      data, used in building error messages
317
  @type data: string
318
  @param data: a string of the format key=val,key=val,...
319
  @rtype: dict
320
  @return: {key=val, key=val}
321
  @raises errors.ParameterError: if there are duplicate keys
322

323
  """
324
  kv_dict = {}
325
  if data:
326
    for elem in data.split(","):
327
      if "=" in elem:
328
        key, val = elem.split("=", 1)
329
      else:
330
        if elem.startswith(NO_PREFIX):
331
          key, val = elem[len(NO_PREFIX):], False
332
        elif elem.startswith(UN_PREFIX):
333
          key, val = elem[len(UN_PREFIX):], None
334
        else:
335
          key, val = elem, True
336
      if key in kv_dict:
337
        raise errors.ParameterError("Duplicate key '%s' in option %s" %
338
                                    (key, opt))
339
      kv_dict[key] = val
340
  return kv_dict
341

    
342

    
343
def check_ident_key_val(option, opt, value):
344
  """Custom parser for ident:key=val,key=val options.
345

346
  This will store the parsed values as a tuple (ident, {key: val}). As such,
347
  multiple uses of this option via action=append is possible.
348

349
  """
350
  if ":" not in value:
351
    ident, rest = value, ''
352
  else:
353
    ident, rest = value.split(":", 1)
354

    
355
  if ident.startswith(NO_PREFIX):
356
    if rest:
357
      msg = "Cannot pass options when removing parameter groups: %s" % value
358
      raise errors.ParameterError(msg)
359
    retval = (ident[len(NO_PREFIX):], False)
360
  elif ident.startswith(UN_PREFIX):
361
    if rest:
362
      msg = "Cannot pass options when removing parameter groups: %s" % value
363
      raise errors.ParameterError(msg)
364
    retval = (ident[len(UN_PREFIX):], None)
365
  else:
366
    kv_dict = _SplitKeyVal(opt, rest)
367
    retval = (ident, kv_dict)
368
  return retval
369

    
370

    
371
def check_key_val(option, opt, value):
372
  """Custom parser class for key=val,key=val options.
373

374
  This will store the parsed values as a dict {key: val}.
375

376
  """
377
  return _SplitKeyVal(opt, value)
378

    
379

    
380
# completion_suggestion is normally a list. Using numeric values not evaluating
381
# to False for dynamic completion.
382
(OPT_COMPL_MANY_NODES,
383
 OPT_COMPL_ONE_NODE,
384
 OPT_COMPL_ONE_INSTANCE,
385
 OPT_COMPL_ONE_OS,
386
 OPT_COMPL_ONE_IALLOCATOR,
387
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)
388

    
389
OPT_COMPL_ALL = frozenset([
390
  OPT_COMPL_MANY_NODES,
391
  OPT_COMPL_ONE_NODE,
392
  OPT_COMPL_ONE_INSTANCE,
393
  OPT_COMPL_ONE_OS,
394
  OPT_COMPL_ONE_IALLOCATOR,
395
  OPT_COMPL_INST_ADD_NODES,
396
  ])
397

    
398

    
399
class CliOption(Option):
400
  """Custom option class for optparse.
401

402
  """
403
  ATTRS = Option.ATTRS + [
404
    "completion_suggest",
405
    ]
406
  TYPES = Option.TYPES + (
407
    "identkeyval",
408
    "keyval",
409
    "unit",
410
    )
411
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
412
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
413
  TYPE_CHECKER["keyval"] = check_key_val
414
  TYPE_CHECKER["unit"] = check_unit
415

    
416

    
417
# optparse.py sets make_option, so we do it for our own option class, too
418
cli_option = CliOption
419

    
420

    
421
DEBUG_OPT = cli_option("-d", "--debug", default=False,
422
                       action="store_true",
423
                       help="Turn debugging on")
424

    
425
NOHDR_OPT = cli_option("--no-headers", default=False,
426
                       action="store_true", dest="no_headers",
427
                       help="Don't display column headers")
428

    
429
SEP_OPT = cli_option("--separator", default=None,
430
                     action="store", dest="separator",
431
                     help=("Separator between output fields"
432
                           " (defaults to one space)"))
433

    
434
USEUNITS_OPT = cli_option("--units", default=None,
435
                          dest="units", choices=('h', 'm', 'g', 't'),
436
                          help="Specify units for output (one of hmgt)")
437

    
438
FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
439
                        type="string", metavar="FIELDS",
440
                        help="Comma separated list of output fields")
441

    
442
FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
443
                       default=False, help="Force the operation")
444

    
445
CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
446
                         default=False, help="Do not require confirmation")
447

    
448
TAG_SRC_OPT = cli_option("--from", dest="tags_source",
449
                         default=None, help="File with tag names")
450

    
451
SUBMIT_OPT = cli_option("--submit", dest="submit_only",
452
                        default=False, action="store_true",
453
                        help=("Submit the job and return the job ID, but"
454
                              " don't wait for the job to finish"))
455

    
456
SYNC_OPT = cli_option("--sync", dest="do_locking",
457
                      default=False, action="store_true",
458
                      help=("Grab locks while doing the queries"
459
                            " in order to ensure more consistent results"))
460

    
461
_DRY_RUN_OPT = cli_option("--dry-run", default=False,
462
                          action="store_true",
463
                          help=("Do not execute the operation, just run the"
464
                                " check steps and verify it it could be"
465
                                " executed"))
466

    
467
VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
468
                         action="store_true",
469
                         help="Increase the verbosity of the operation")
470

    
471
DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
472
                              action="store_true", dest="simulate_errors",
473
                              help="Debugging option that makes the operation"
474
                              " treat most runtime checks as failed")
475

    
476
NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
477
                        default=True, action="store_false",
478
                        help="Don't wait for sync (DANGEROUS!)")
479

    
480
DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
481
                               help="Custom disk setup (diskless, file,"
482
                               " plain or drbd)",
483
                               default=None, metavar="TEMPL",
484
                               choices=list(constants.DISK_TEMPLATES))
485

    
486
NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
487
                        help="Do not create any network cards for"
488
                        " the instance")
489

    
490

    
491
def _ParseArgs(argv, commands, aliases):
492
  """Parser for the command line arguments.
493

494
  This function parses the arguments and returns the function which
495
  must be executed together with its (modified) arguments.
496

497
  @param argv: the command line
498
  @param commands: dictionary with special contents, see the design
499
      doc for cmdline handling
500
  @param aliases: dictionary with command aliases {'alias': 'target, ...}
501

502
  """
503
  if len(argv) == 0:
504
    binary = "<command>"
505
  else:
506
    binary = argv[0].split("/")[-1]
507

    
508
  if len(argv) > 1 and argv[1] == "--version":
509
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
510
    # Quit right away. That way we don't have to care about this special
511
    # argument. optparse.py does it the same.
512
    sys.exit(0)
513

    
514
  if len(argv) < 2 or not (argv[1] in commands or
515
                           argv[1] in aliases):
516
    # let's do a nice thing
517
    sortedcmds = commands.keys()
518
    sortedcmds.sort()
519

    
520
    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
521
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
522
    ToStdout("")
523

    
524
    # compute the max line length for cmd + usage
525
    mlen = max([len(" %s" % cmd) for cmd in commands])
526
    mlen = min(60, mlen) # should not get here...
527

    
528
    # and format a nice command list
529
    ToStdout("Commands:")
530
    for cmd in sortedcmds:
531
      cmdstr = " %s" % (cmd,)
532
      help_text = commands[cmd][4]
533
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
534
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
535
      for line in help_lines:
536
        ToStdout("%-*s   %s", mlen, "", line)
537

    
538
    ToStdout("")
539

    
540
    return None, None, None
541

    
542
  # get command, unalias it, and look it up in commands
543
  cmd = argv.pop(1)
544
  if cmd in aliases:
545
    if cmd in commands:
546
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
547
                                   " command" % cmd)
548

    
549
    if aliases[cmd] not in commands:
550
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
551
                                   " command '%s'" % (cmd, aliases[cmd]))
552

    
553
    cmd = aliases[cmd]
554

    
555
  func, args_def, parser_opts, usage, description = commands[cmd]
556
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT],
557
                        description=description,
558
                        formatter=TitledHelpFormatter(),
559
                        usage="%%prog %s %s" % (cmd, usage))
560
  parser.disable_interspersed_args()
561
  options, args = parser.parse_args()
562

    
563
  if not _CheckArguments(cmd, args_def, args):
564
    return None, None, None
565

    
566
  return func, options, args
567

    
568

    
569
def _CheckArguments(cmd, args_def, args):
570
  """Verifies the arguments using the argument definition.
571

572
  Algorithm:
573

574
    1. Abort with error if values specified by user but none expected.
575

576
    1. For each argument in definition
577

578
      1. Keep running count of minimum number of values (min_count)
579
      1. Keep running count of maximum number of values (max_count)
580
      1. If it has an unlimited number of values
581

582
        1. Abort with error if it's not the last argument in the definition
583

584
    1. If last argument has limited number of values
585

586
      1. Abort with error if number of values doesn't match or is too large
587

588
    1. Abort with error if user didn't pass enough values (min_count)
589

590
  """
591
  if args and not args_def:
592
    ToStderr("Error: Command %s expects no arguments", cmd)
593
    return False
594

    
595
  min_count = None
596
  max_count = None
597
  check_max = None
598

    
599
  last_idx = len(args_def) - 1
600

    
601
  for idx, arg in enumerate(args_def):
602
    if min_count is None:
603
      min_count = arg.min
604
    elif arg.min is not None:
605
      min_count += arg.min
606

    
607
    if max_count is None:
608
      max_count = arg.max
609
    elif arg.max is not None:
610
      max_count += arg.max
611

    
612
    if idx == last_idx:
613
      check_max = (arg.max is not None)
614

    
615
    elif arg.max is None:
616
      raise errors.ProgrammerError("Only the last argument can have max=None")
617

    
618
  if check_max:
619
    # Command with exact number of arguments
620
    if (min_count is not None and max_count is not None and
621
        min_count == max_count and len(args) != min_count):
622
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
623
      return False
624

    
625
    # Command with limited number of arguments
626
    if max_count is not None and len(args) > max_count:
627
      ToStderr("Error: Command %s expects only %d argument(s)",
628
               cmd, max_count)
629
      return False
630

    
631
  # Command with some required arguments
632
  if min_count is not None and len(args) < min_count:
633
    ToStderr("Error: Command %s expects at least %d argument(s)",
634
             cmd, min_count)
635
    return False
636

    
637
  return True
638

    
639

    
640
def SplitNodeOption(value):
641
  """Splits the value of a --node option.
642

643
  """
644
  if value and ':' in value:
645
    return value.split(':', 1)
646
  else:
647
    return (value, None)
648

    
649

    
650
def UsesRPC(fn):
651
  def wrapper(*args, **kwargs):
652
    rpc.Init()
653
    try:
654
      return fn(*args, **kwargs)
655
    finally:
656
      rpc.Shutdown()
657
  return wrapper
658

    
659

    
660
def AskUser(text, choices=None):
661
  """Ask the user a question.
662

663
  @param text: the question to ask
664

665
  @param choices: list with elements tuples (input_char, return_value,
666
      description); if not given, it will default to: [('y', True,
667
      'Perform the operation'), ('n', False, 'Do no do the operation')];
668
      note that the '?' char is reserved for help
669

670
  @return: one of the return values from the choices list; if input is
671
      not possible (i.e. not running with a tty, we return the last
672
      entry from the list
673

674
  """
675
  if choices is None:
676
    choices = [('y', True, 'Perform the operation'),
677
               ('n', False, 'Do not perform the operation')]
678
  if not choices or not isinstance(choices, list):
679
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
680
  for entry in choices:
681
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
682
      raise errors.ProgrammerError("Invalid choices element to AskUser")
683

    
684
  answer = choices[-1][1]
685
  new_text = []
686
  for line in text.splitlines():
687
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
688
  text = "\n".join(new_text)
689
  try:
690
    f = file("/dev/tty", "a+")
691
  except IOError:
692
    return answer
693
  try:
694
    chars = [entry[0] for entry in choices]
695
    chars[-1] = "[%s]" % chars[-1]
696
    chars.append('?')
697
    maps = dict([(entry[0], entry[1]) for entry in choices])
698
    while True:
699
      f.write(text)
700
      f.write('\n')
701
      f.write("/".join(chars))
702
      f.write(": ")
703
      line = f.readline(2).strip().lower()
704
      if line in maps:
705
        answer = maps[line]
706
        break
707
      elif line == '?':
708
        for entry in choices:
709
          f.write(" %s - %s\n" % (entry[0], entry[2]))
710
        f.write("\n")
711
        continue
712
  finally:
713
    f.close()
714
  return answer
715

    
716

    
717
class JobSubmittedException(Exception):
718
  """Job was submitted, client should exit.
719

720
  This exception has one argument, the ID of the job that was
721
  submitted. The handler should print this ID.
722

723
  This is not an error, just a structured way to exit from clients.
724

725
  """
726

    
727

    
728
def SendJob(ops, cl=None):
729
  """Function to submit an opcode without waiting for the results.
730

731
  @type ops: list
732
  @param ops: list of opcodes
733
  @type cl: luxi.Client
734
  @param cl: the luxi client to use for communicating with the master;
735
             if None, a new client will be created
736

737
  """
738
  if cl is None:
739
    cl = GetClient()
740

    
741
  job_id = cl.SubmitJob(ops)
742

    
743
  return job_id
744

    
745

    
746
def PollJob(job_id, cl=None, feedback_fn=None):
747
  """Function to poll for the result of a job.
748

749
  @type job_id: job identified
750
  @param job_id: the job to poll for results
751
  @type cl: luxi.Client
752
  @param cl: the luxi client to use for communicating with the master;
753
             if None, a new client will be created
754

755
  """
756
  if cl is None:
757
    cl = GetClient()
758

    
759
  prev_job_info = None
760
  prev_logmsg_serial = None
761

    
762
  while True:
763
    result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
764
                                 prev_logmsg_serial)
765
    if not result:
766
      # job not found, go away!
767
      raise errors.JobLost("Job with id %s lost" % job_id)
768

    
769
    # Split result, a tuple of (field values, log entries)
770
    (job_info, log_entries) = result
771
    (status, ) = job_info
772

    
773
    if log_entries:
774
      for log_entry in log_entries:
775
        (serial, timestamp, _, message) = log_entry
776
        if callable(feedback_fn):
777
          feedback_fn(log_entry[1:])
778
        else:
779
          encoded = utils.SafeEncode(message)
780
          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
781
        prev_logmsg_serial = max(prev_logmsg_serial, serial)
782

    
783
    # TODO: Handle canceled and archived jobs
784
    elif status in (constants.JOB_STATUS_SUCCESS,
785
                    constants.JOB_STATUS_ERROR,
786
                    constants.JOB_STATUS_CANCELING,
787
                    constants.JOB_STATUS_CANCELED):
788
      break
789

    
790
    prev_job_info = job_info
791

    
792
  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
793
  if not jobs:
794
    raise errors.JobLost("Job with id %s lost" % job_id)
795

    
796
  status, opstatus, result = jobs[0]
797
  if status == constants.JOB_STATUS_SUCCESS:
798
    return result
799
  elif status in (constants.JOB_STATUS_CANCELING,
800
                  constants.JOB_STATUS_CANCELED):
801
    raise errors.OpExecError("Job was canceled")
802
  else:
803
    has_ok = False
804
    for idx, (status, msg) in enumerate(zip(opstatus, result)):
805
      if status == constants.OP_STATUS_SUCCESS:
806
        has_ok = True
807
      elif status == constants.OP_STATUS_ERROR:
808
        errors.MaybeRaise(msg)
809
        if has_ok:
810
          raise errors.OpExecError("partial failure (opcode %d): %s" %
811
                                   (idx, msg))
812
        else:
813
          raise errors.OpExecError(str(msg))
814
    # default failure mode
815
    raise errors.OpExecError(result)
816

    
817

    
818
def SubmitOpCode(op, cl=None, feedback_fn=None):
819
  """Legacy function to submit an opcode.
820

821
  This is just a simple wrapper over the construction of the processor
822
  instance. It should be extended to better handle feedback and
823
  interaction functions.
824

825
  """
826
  if cl is None:
827
    cl = GetClient()
828

    
829
  job_id = SendJob([op], cl)
830

    
831
  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
832

    
833
  return op_results[0]
834

    
835

    
836
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
837
  """Wrapper around SubmitOpCode or SendJob.
838

839
  This function will decide, based on the 'opts' parameter, whether to
840
  submit and wait for the result of the opcode (and return it), or
841
  whether to just send the job and print its identifier. It is used in
842
  order to simplify the implementation of the '--submit' option.
843

844
  It will also add the dry-run parameter from the options passed, if true.
845

846
  """
847
  if opts and opts.dry_run:
848
    op.dry_run = opts.dry_run
849
  if opts and opts.submit_only:
850
    job_id = SendJob([op], cl=cl)
851
    raise JobSubmittedException(job_id)
852
  else:
853
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
854

    
855

    
856
def GetClient():
857
  # TODO: Cache object?
858
  try:
859
    client = luxi.Client()
860
  except luxi.NoMasterError:
861
    master, myself = ssconf.GetMasterAndMyself()
862
    if master != myself:
863
      raise errors.OpPrereqError("This is not the master node, please connect"
864
                                 " to node '%s' and rerun the command" %
865
                                 master)
866
    else:
867
      raise
868
  return client
869

    
870

    
871
def FormatError(err):
872
  """Return a formatted error message for a given error.
873

874
  This function takes an exception instance and returns a tuple
875
  consisting of two values: first, the recommended exit code, and
876
  second, a string describing the error message (not
877
  newline-terminated).
878

879
  """
880
  retcode = 1
881
  obuf = StringIO()
882
  msg = str(err)
883
  if isinstance(err, errors.ConfigurationError):
884
    txt = "Corrupt configuration file: %s" % msg
885
    logging.error(txt)
886
    obuf.write(txt + "\n")
887
    obuf.write("Aborting.")
888
    retcode = 2
889
  elif isinstance(err, errors.HooksAbort):
890
    obuf.write("Failure: hooks execution failed:\n")
891
    for node, script, out in err.args[0]:
892
      if out:
893
        obuf.write("  node: %s, script: %s, output: %s\n" %
894
                   (node, script, out))
895
      else:
896
        obuf.write("  node: %s, script: %s (no output)\n" %
897
                   (node, script))
898
  elif isinstance(err, errors.HooksFailure):
899
    obuf.write("Failure: hooks general failure: %s" % msg)
900
  elif isinstance(err, errors.ResolverError):
901
    this_host = utils.HostInfo.SysName()
902
    if err.args[0] == this_host:
903
      msg = "Failure: can't resolve my own hostname ('%s')"
904
    else:
905
      msg = "Failure: can't resolve hostname '%s'"
906
    obuf.write(msg % err.args[0])
907
  elif isinstance(err, errors.OpPrereqError):
908
    obuf.write("Failure: prerequisites not met for this"
909
               " operation:\n%s" % msg)
910
  elif isinstance(err, errors.OpExecError):
911
    obuf.write("Failure: command execution error:\n%s" % msg)
912
  elif isinstance(err, errors.TagError):
913
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
914
  elif isinstance(err, errors.JobQueueDrainError):
915
    obuf.write("Failure: the job queue is marked for drain and doesn't"
916
               " accept new requests\n")
917
  elif isinstance(err, errors.JobQueueFull):
918
    obuf.write("Failure: the job queue is full and doesn't accept new"
919
               " job submissions until old jobs are archived\n")
920
  elif isinstance(err, errors.TypeEnforcementError):
921
    obuf.write("Parameter Error: %s" % msg)
922
  elif isinstance(err, errors.ParameterError):
923
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
924
  elif isinstance(err, errors.GenericError):
925
    obuf.write("Unhandled Ganeti error: %s" % msg)
926
  elif isinstance(err, luxi.NoMasterError):
927
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
928
               " and listening for connections?")
929
  elif isinstance(err, luxi.TimeoutError):
930
    obuf.write("Timeout while talking to the master daemon. Error:\n"
931
               "%s" % msg)
932
  elif isinstance(err, luxi.ProtocolError):
933
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
934
               "%s" % msg)
935
  elif isinstance(err, JobSubmittedException):
936
    obuf.write("JobID: %s\n" % err.args[0])
937
    retcode = 0
938
  else:
939
    obuf.write("Unhandled exception: %s" % msg)
940
  return retcode, obuf.getvalue().rstrip('\n')
941

    
942

    
943
def GenericMain(commands, override=None, aliases=None):
944
  """Generic main function for all the gnt-* commands.
945

946
  Arguments:
947
    - commands: a dictionary with a special structure, see the design doc
948
                for command line handling.
949
    - override: if not None, we expect a dictionary with keys that will
950
                override command line options; this can be used to pass
951
                options from the scripts to generic functions
952
    - aliases: dictionary with command aliases {'alias': 'target, ...}
953

954
  """
955
  # save the program name and the entire command line for later logging
956
  if sys.argv:
957
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
958
    if len(sys.argv) >= 2:
959
      binary += " " + sys.argv[1]
960
      old_cmdline = " ".join(sys.argv[2:])
961
    else:
962
      old_cmdline = ""
963
  else:
964
    binary = "<unknown program>"
965
    old_cmdline = ""
966

    
967
  if aliases is None:
968
    aliases = {}
969

    
970
  try:
971
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
972
  except errors.ParameterError, err:
973
    result, err_msg = FormatError(err)
974
    ToStderr(err_msg)
975
    return 1
976

    
977
  if func is None: # parse error
978
    return 1
979

    
980
  if override is not None:
981
    for key, val in override.iteritems():
982
      setattr(options, key, val)
983

    
984
  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
985
                     stderr_logging=True, program=binary)
986

    
987
  if old_cmdline:
988
    logging.info("run with arguments '%s'", old_cmdline)
989
  else:
990
    logging.info("run with no arguments")
991

    
992
  try:
993
    result = func(options, args)
994
  except (errors.GenericError, luxi.ProtocolError,
995
          JobSubmittedException), err:
996
    result, err_msg = FormatError(err)
997
    logging.exception("Error during command processing")
998
    ToStderr(err_msg)
999

    
1000
  return result
1001

    
1002

    
1003
def GenerateTable(headers, fields, separator, data,
1004
                  numfields=None, unitfields=None,
1005
                  units=None):
1006
  """Prints a table with headers and different fields.
1007

1008
  @type headers: dict
1009
  @param headers: dictionary mapping field names to headers for
1010
      the table
1011
  @type fields: list
1012
  @param fields: the field names corresponding to each row in
1013
      the data field
1014
  @param separator: the separator to be used; if this is None,
1015
      the default 'smart' algorithm is used which computes optimal
1016
      field width, otherwise just the separator is used between
1017
      each field
1018
  @type data: list
1019
  @param data: a list of lists, each sublist being one row to be output
1020
  @type numfields: list
1021
  @param numfields: a list with the fields that hold numeric
1022
      values and thus should be right-aligned
1023
  @type unitfields: list
1024
  @param unitfields: a list with the fields that hold numeric
1025
      values that should be formatted with the units field
1026
  @type units: string or None
1027
  @param units: the units we should use for formatting, or None for
1028
      automatic choice (human-readable for non-separator usage, otherwise
1029
      megabytes); this is a one-letter string
1030

1031
  """
1032
  if units is None:
1033
    if separator:
1034
      units = "m"
1035
    else:
1036
      units = "h"
1037

    
1038
  if numfields is None:
1039
    numfields = []
1040
  if unitfields is None:
1041
    unitfields = []
1042

    
1043
  numfields = utils.FieldSet(*numfields)
1044
  unitfields = utils.FieldSet(*unitfields)
1045

    
1046
  format_fields = []
1047
  for field in fields:
1048
    if headers and field not in headers:
1049
      # TODO: handle better unknown fields (either revert to old
1050
      # style of raising exception, or deal more intelligently with
1051
      # variable fields)
1052
      headers[field] = field
1053
    if separator is not None:
1054
      format_fields.append("%s")
1055
    elif numfields.Matches(field):
1056
      format_fields.append("%*s")
1057
    else:
1058
      format_fields.append("%-*s")
1059

    
1060
  if separator is None:
1061
    mlens = [0 for name in fields]
1062
    format = ' '.join(format_fields)
1063
  else:
1064
    format = separator.replace("%", "%%").join(format_fields)
1065

    
1066
  for row in data:
1067
    if row is None:
1068
      continue
1069
    for idx, val in enumerate(row):
1070
      if unitfields.Matches(fields[idx]):
1071
        try:
1072
          val = int(val)
1073
        except ValueError:
1074
          pass
1075
        else:
1076
          val = row[idx] = utils.FormatUnit(val, units)
1077
      val = row[idx] = str(val)
1078
      if separator is None:
1079
        mlens[idx] = max(mlens[idx], len(val))
1080

    
1081
  result = []
1082
  if headers:
1083
    args = []
1084
    for idx, name in enumerate(fields):
1085
      hdr = headers[name]
1086
      if separator is None:
1087
        mlens[idx] = max(mlens[idx], len(hdr))
1088
        args.append(mlens[idx])
1089
      args.append(hdr)
1090
    result.append(format % tuple(args))
1091

    
1092
  for line in data:
1093
    args = []
1094
    if line is None:
1095
      line = ['-' for _ in fields]
1096
    for idx in xrange(len(fields)):
1097
      if separator is None:
1098
        args.append(mlens[idx])
1099
      args.append(line[idx])
1100
    result.append(format % tuple(args))
1101

    
1102
  return result
1103

    
1104

    
1105
def FormatTimestamp(ts):
1106
  """Formats a given timestamp.
1107

1108
  @type ts: timestamp
1109
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
1110

1111
  @rtype: string
1112
  @return: a string with the formatted timestamp
1113

1114
  """
1115
  if not isinstance (ts, (tuple, list)) or len(ts) != 2:
1116
    return '?'
1117
  sec, usec = ts
1118
  return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
1119

    
1120

    
1121
def ParseTimespec(value):
1122
  """Parse a time specification.
1123

1124
  The following suffixed will be recognized:
1125

1126
    - s: seconds
1127
    - m: minutes
1128
    - h: hours
1129
    - d: day
1130
    - w: weeks
1131

1132
  Without any suffix, the value will be taken to be in seconds.
1133

1134
  """
1135
  value = str(value)
1136
  if not value:
1137
    raise errors.OpPrereqError("Empty time specification passed")
1138
  suffix_map = {
1139
    's': 1,
1140
    'm': 60,
1141
    'h': 3600,
1142
    'd': 86400,
1143
    'w': 604800,
1144
    }
1145
  if value[-1] not in suffix_map:
1146
    try:
1147
      value = int(value)
1148
    except ValueError:
1149
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
1150
  else:
1151
    multiplier = suffix_map[value[-1]]
1152
    value = value[:-1]
1153
    if not value: # no data left after stripping the suffix
1154
      raise errors.OpPrereqError("Invalid time specification (only"
1155
                                 " suffix passed)")
1156
    try:
1157
      value = int(value) * multiplier
1158
    except ValueError:
1159
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
1160
  return value
1161

    
1162

    
1163
def GetOnlineNodes(nodes, cl=None, nowarn=False):
1164
  """Returns the names of online nodes.
1165

1166
  This function will also log a warning on stderr with the names of
1167
  the online nodes.
1168

1169
  @param nodes: if not empty, use only this subset of nodes (minus the
1170
      offline ones)
1171
  @param cl: if not None, luxi client to use
1172
  @type nowarn: boolean
1173
  @param nowarn: by default, this function will output a note with the
1174
      offline nodes that are skipped; if this parameter is True the
1175
      note is not displayed
1176

1177
  """
1178
  if cl is None:
1179
    cl = GetClient()
1180

    
1181
  result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
1182
                         use_locking=False)
1183
  offline = [row[0] for row in result if row[1]]
1184
  if offline and not nowarn:
1185
    ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline))
1186
  return [row[0] for row in result if not row[1]]
1187

    
1188

    
1189
def _ToStream(stream, txt, *args):
1190
  """Write a message to a stream, bypassing the logging system
1191

1192
  @type stream: file object
1193
  @param stream: the file to which we should write
1194
  @type txt: str
1195
  @param txt: the message
1196

1197
  """
1198
  if args:
1199
    args = tuple(args)
1200
    stream.write(txt % args)
1201
  else:
1202
    stream.write(txt)
1203
  stream.write('\n')
1204
  stream.flush()
1205

    
1206

    
1207
def ToStdout(txt, *args):
1208
  """Write a message to stdout only, bypassing the logging system
1209

1210
  This is just a wrapper over _ToStream.
1211

1212
  @type txt: str
1213
  @param txt: the message
1214

1215
  """
1216
  _ToStream(sys.stdout, txt, *args)
1217

    
1218

    
1219
def ToStderr(txt, *args):
1220
  """Write a message to stderr only, bypassing the logging system
1221

1222
  This is just a wrapper over _ToStream.
1223

1224
  @type txt: str
1225
  @param txt: the message
1226

1227
  """
1228
  _ToStream(sys.stderr, txt, *args)
1229

    
1230

    
1231
class JobExecutor(object):
1232
  """Class which manages the submission and execution of multiple jobs.
1233

1234
  Note that instances of this class should not be reused between
1235
  GetResults() calls.
1236

1237
  """
1238
  def __init__(self, cl=None, verbose=True):
1239
    self.queue = []
1240
    if cl is None:
1241
      cl = GetClient()
1242
    self.cl = cl
1243
    self.verbose = verbose
1244
    self.jobs = []
1245

    
1246
  def QueueJob(self, name, *ops):
1247
    """Record a job for later submit.
1248

1249
    @type name: string
1250
    @param name: a description of the job, will be used in WaitJobSet
1251
    """
1252
    self.queue.append((name, ops))
1253

    
1254
  def SubmitPending(self):
1255
    """Submit all pending jobs.
1256

1257
    """
1258
    results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
1259
    for ((status, data), (name, _)) in zip(results, self.queue):
1260
      self.jobs.append((status, data, name))
1261

    
1262
  def GetResults(self):
1263
    """Wait for and return the results of all jobs.
1264

1265
    @rtype: list
1266
    @return: list of tuples (success, job results), in the same order
1267
        as the submitted jobs; if a job has failed, instead of the result
1268
        there will be the error message
1269

1270
    """
1271
    if not self.jobs:
1272
      self.SubmitPending()
1273
    results = []
1274
    if self.verbose:
1275
      ok_jobs = [row[1] for row in self.jobs if row[0]]
1276
      if ok_jobs:
1277
        ToStdout("Submitted jobs %s", ", ".join(ok_jobs))
1278
    for submit_status, jid, name in self.jobs:
1279
      if not submit_status:
1280
        ToStderr("Failed to submit job for %s: %s", name, jid)
1281
        results.append((False, jid))
1282
        continue
1283
      if self.verbose:
1284
        ToStdout("Waiting for job %s for %s...", jid, name)
1285
      try:
1286
        job_result = PollJob(jid, cl=self.cl)
1287
        success = True
1288
      except (errors.GenericError, luxi.ProtocolError), err:
1289
        _, job_result = FormatError(err)
1290
        success = False
1291
        # the error message will always be shown, verbose or not
1292
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
1293

    
1294
      results.append((success, job_result))
1295
    return results
1296

    
1297
  def WaitOrShow(self, wait):
1298
    """Wait for job results or only print the job IDs.
1299

1300
    @type wait: boolean
1301
    @param wait: whether to wait or not
1302

1303
    """
1304
    if wait:
1305
      return self.GetResults()
1306
    else:
1307
      if not self.jobs:
1308
        self.SubmitPending()
1309
      for status, result, name in self.jobs:
1310
        if status:
1311
          ToStdout("%s: %s", result, name)
1312
        else:
1313
          ToStderr("Failure for %s: %s", name, result)