Statistics
| Branch: | Tag: | Revision:

root / lib / cli.py @ 4a25828c

History | View | Annotate | Download (37.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module dealing with command line parsing"""
23

    
24

    
25
import sys
26
import textwrap
27
import os.path
28
import copy
29
import time
30
import logging
31
from cStringIO import StringIO
32

    
33
from ganeti import utils
34
from ganeti import errors
35
from ganeti import constants
36
from ganeti import opcodes
37
from ganeti import luxi
38
from ganeti import ssconf
39
from ganeti import rpc
40

    
41
from optparse import (OptionParser, TitledHelpFormatter,
42
                      Option, OptionValueError)
43

    
44

    
45
__all__ = [
46
  # Command line options
47
  "CONFIRM_OPT",
48
  "DEBUG_OPT",
49
  "DEBUG_SIMERR_OPT",
50
  "DISK_TEMPLATE_OPT",
51
  "FIELDS_OPT",
52
  "FILESTORE_DIR_OPT",
53
  "FORCE_OPT",
54
  "NOHDR_OPT",
55
  "NONICS_OPT",
56
  "NWSYNC_OPT",
57
  "SEP_OPT",
58
  "SUBMIT_OPT",
59
  "SYNC_OPT",
60
  "TAG_SRC_OPT",
61
  "USEUNITS_OPT",
62
  "VERBOSE_OPT",
63
  # Generic functions for CLI programs
64
  "GenericMain",
65
  "GetClient",
66
  "GetOnlineNodes",
67
  "JobExecutor",
68
  "JobSubmittedException",
69
  "ParseTimespec",
70
  "SubmitOpCode",
71
  "SubmitOrSend",
72
  "UsesRPC",
73
  # Formatting functions
74
  "ToStderr", "ToStdout",
75
  "FormatError",
76
  "GenerateTable",
77
  "AskUser",
78
  "FormatTimestamp",
79
  # Tags functions
80
  "ListTags",
81
  "AddTags",
82
  "RemoveTags",
83
  # command line options support infrastructure
84
  "ARGS_MANY_INSTANCES",
85
  "ARGS_MANY_NODES",
86
  "ARGS_NONE",
87
  "ARGS_ONE_INSTANCE",
88
  "ARGS_ONE_NODE",
89
  "ArgChoice",
90
  "ArgCommand",
91
  "ArgFile",
92
  "ArgHost",
93
  "ArgInstance",
94
  "ArgJobId",
95
  "ArgNode",
96
  "ArgSuggest",
97
  "ArgUnknown",
98
  "OPT_COMPL_INST_ADD_NODES",
99
  "OPT_COMPL_MANY_NODES",
100
  "OPT_COMPL_ONE_IALLOCATOR",
101
  "OPT_COMPL_ONE_INSTANCE",
102
  "OPT_COMPL_ONE_NODE",
103
  "OPT_COMPL_ONE_OS",
104
  "cli_option",
105
  "SplitNodeOption",
106
  ]
107

    
108
NO_PREFIX = "no_"
109
UN_PREFIX = "-"
110

    
111

    
112
class _Argument:
113
  def __init__(self, min=0, max=None):
114
    self.min = min
115
    self.max = max
116

    
117
  def __repr__(self):
118
    return ("<%s min=%s max=%s>" %
119
            (self.__class__.__name__, self.min, self.max))
120

    
121

    
122
class ArgSuggest(_Argument):
123
  """Suggesting argument.
124

125
  Value can be any of the ones passed to the constructor.
126

127
  """
128
  def __init__(self, min=0, max=None, choices=None):
129
    _Argument.__init__(self, min=min, max=max)
130
    self.choices = choices
131

    
132
  def __repr__(self):
133
    return ("<%s min=%s max=%s choices=%r>" %
134
            (self.__class__.__name__, self.min, self.max, self.choices))
135

    
136

    
137
class ArgChoice(ArgSuggest):
138
  """Choice argument.
139

140
  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
141
  but value must be one of the choices.
142

143
  """
144

    
145

    
146
class ArgUnknown(_Argument):
147
  """Unknown argument to program (e.g. determined at runtime).
148

149
  """
150

    
151

    
152
class ArgInstance(_Argument):
153
  """Instances argument.
154

155
  """
156

    
157

    
158
class ArgNode(_Argument):
159
  """Node argument.
160

161
  """
162

    
163
class ArgJobId(_Argument):
164
  """Job ID argument.
165

166
  """
167

    
168

    
169
class ArgFile(_Argument):
170
  """File path argument.
171

172
  """
173

    
174

    
175
class ArgCommand(_Argument):
176
  """Command argument.
177

178
  """
179

    
180

    
181
class ArgHost(_Argument):
182
  """Host argument.
183

184
  """
185

    
186

    
187
ARGS_NONE = []
188
ARGS_MANY_INSTANCES = [ArgInstance()]
189
ARGS_MANY_NODES = [ArgNode()]
190
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
191
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
192

    
193

    
194

    
195
def _ExtractTagsObject(opts, args):
196
  """Extract the tag type object.
197

198
  Note that this function will modify its args parameter.
199

200
  """
201
  if not hasattr(opts, "tag_type"):
202
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
203
  kind = opts.tag_type
204
  if kind == constants.TAG_CLUSTER:
205
    retval = kind, kind
206
  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
207
    if not args:
208
      raise errors.OpPrereqError("no arguments passed to the command")
209
    name = args.pop(0)
210
    retval = kind, name
211
  else:
212
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
213
  return retval
214

    
215

    
216
def _ExtendTags(opts, args):
217
  """Extend the args if a source file has been given.
218

219
  This function will extend the tags with the contents of the file
220
  passed in the 'tags_source' attribute of the opts parameter. A file
221
  named '-' will be replaced by stdin.
222

223
  """
224
  fname = opts.tags_source
225
  if fname is None:
226
    return
227
  if fname == "-":
228
    new_fh = sys.stdin
229
  else:
230
    new_fh = open(fname, "r")
231
  new_data = []
232
  try:
233
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
234
    # because of python bug 1633941
235
    while True:
236
      line = new_fh.readline()
237
      if not line:
238
        break
239
      new_data.append(line.strip())
240
  finally:
241
    new_fh.close()
242
  args.extend(new_data)
243

    
244

    
245
def ListTags(opts, args):
246
  """List the tags on a given object.
247

248
  This is a generic implementation that knows how to deal with all
249
  three cases of tag objects (cluster, node, instance). The opts
250
  argument is expected to contain a tag_type field denoting what
251
  object type we work on.
252

253
  """
254
  kind, name = _ExtractTagsObject(opts, args)
255
  op = opcodes.OpGetTags(kind=kind, name=name)
256
  result = SubmitOpCode(op)
257
  result = list(result)
258
  result.sort()
259
  for tag in result:
260
    ToStdout(tag)
261

    
262

    
263
def AddTags(opts, args):
264
  """Add tags on a given object.
265

266
  This is a generic implementation that knows how to deal with all
267
  three cases of tag objects (cluster, node, instance). The opts
268
  argument is expected to contain a tag_type field denoting what
269
  object type we work on.
270

271
  """
272
  kind, name = _ExtractTagsObject(opts, args)
273
  _ExtendTags(opts, args)
274
  if not args:
275
    raise errors.OpPrereqError("No tags to be added")
276
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
277
  SubmitOpCode(op)
278

    
279

    
280
def RemoveTags(opts, args):
281
  """Remove tags from a given object.
282

283
  This is a generic implementation that knows how to deal with all
284
  three cases of tag objects (cluster, node, instance). The opts
285
  argument is expected to contain a tag_type field denoting what
286
  object type we work on.
287

288
  """
289
  kind, name = _ExtractTagsObject(opts, args)
290
  _ExtendTags(opts, args)
291
  if not args:
292
    raise errors.OpPrereqError("No tags to be removed")
293
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
294
  SubmitOpCode(op)
295

    
296

    
297
def check_unit(option, opt, value):
298
  """OptParsers custom converter for units.
299

300
  """
301
  try:
302
    return utils.ParseUnit(value)
303
  except errors.UnitParseError, err:
304
    raise OptionValueError("option %s: %s" % (opt, err))
305

    
306

    
307
def _SplitKeyVal(opt, data):
308
  """Convert a KeyVal string into a dict.
309

310
  This function will convert a key=val[,...] string into a dict. Empty
311
  values will be converted specially: keys which have the prefix 'no_'
312
  will have the value=False and the prefix stripped, the others will
313
  have value=True.
314

315
  @type opt: string
316
  @param opt: a string holding the option name for which we process the
317
      data, used in building error messages
318
  @type data: string
319
  @param data: a string of the format key=val,key=val,...
320
  @rtype: dict
321
  @return: {key=val, key=val}
322
  @raises errors.ParameterError: if there are duplicate keys
323

324
  """
325
  kv_dict = {}
326
  if data:
327
    for elem in data.split(","):
328
      if "=" in elem:
329
        key, val = elem.split("=", 1)
330
      else:
331
        if elem.startswith(NO_PREFIX):
332
          key, val = elem[len(NO_PREFIX):], False
333
        elif elem.startswith(UN_PREFIX):
334
          key, val = elem[len(UN_PREFIX):], None
335
        else:
336
          key, val = elem, True
337
      if key in kv_dict:
338
        raise errors.ParameterError("Duplicate key '%s' in option %s" %
339
                                    (key, opt))
340
      kv_dict[key] = val
341
  return kv_dict
342

    
343

    
344
def check_ident_key_val(option, opt, value):
345
  """Custom parser for ident:key=val,key=val options.
346

347
  This will store the parsed values as a tuple (ident, {key: val}). As such,
348
  multiple uses of this option via action=append is possible.
349

350
  """
351
  if ":" not in value:
352
    ident, rest = value, ''
353
  else:
354
    ident, rest = value.split(":", 1)
355

    
356
  if ident.startswith(NO_PREFIX):
357
    if rest:
358
      msg = "Cannot pass options when removing parameter groups: %s" % value
359
      raise errors.ParameterError(msg)
360
    retval = (ident[len(NO_PREFIX):], False)
361
  elif ident.startswith(UN_PREFIX):
362
    if rest:
363
      msg = "Cannot pass options when removing parameter groups: %s" % value
364
      raise errors.ParameterError(msg)
365
    retval = (ident[len(UN_PREFIX):], None)
366
  else:
367
    kv_dict = _SplitKeyVal(opt, rest)
368
    retval = (ident, kv_dict)
369
  return retval
370

    
371

    
372
def check_key_val(option, opt, value):
373
  """Custom parser class for key=val,key=val options.
374

375
  This will store the parsed values as a dict {key: val}.
376

377
  """
378
  return _SplitKeyVal(opt, value)
379

    
380

    
381
# completion_suggestion is normally a list. Using numeric values not evaluating
382
# to False for dynamic completion.
383
(OPT_COMPL_MANY_NODES,
384
 OPT_COMPL_ONE_NODE,
385
 OPT_COMPL_ONE_INSTANCE,
386
 OPT_COMPL_ONE_OS,
387
 OPT_COMPL_ONE_IALLOCATOR,
388
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)
389

    
390
OPT_COMPL_ALL = frozenset([
391
  OPT_COMPL_MANY_NODES,
392
  OPT_COMPL_ONE_NODE,
393
  OPT_COMPL_ONE_INSTANCE,
394
  OPT_COMPL_ONE_OS,
395
  OPT_COMPL_ONE_IALLOCATOR,
396
  OPT_COMPL_INST_ADD_NODES,
397
  ])
398

    
399

    
400
class CliOption(Option):
401
  """Custom option class for optparse.
402

403
  """
404
  ATTRS = Option.ATTRS + [
405
    "completion_suggest",
406
    ]
407
  TYPES = Option.TYPES + (
408
    "identkeyval",
409
    "keyval",
410
    "unit",
411
    )
412
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
413
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
414
  TYPE_CHECKER["keyval"] = check_key_val
415
  TYPE_CHECKER["unit"] = check_unit
416

    
417

    
418
# optparse.py sets make_option, so we do it for our own option class, too
419
cli_option = CliOption
420

    
421

    
422
DEBUG_OPT = cli_option("-d", "--debug", default=False,
423
                       action="store_true",
424
                       help="Turn debugging on")
425

    
426
NOHDR_OPT = cli_option("--no-headers", default=False,
427
                       action="store_true", dest="no_headers",
428
                       help="Don't display column headers")
429

    
430
SEP_OPT = cli_option("--separator", default=None,
431
                     action="store", dest="separator",
432
                     help=("Separator between output fields"
433
                           " (defaults to one space)"))
434

    
435
USEUNITS_OPT = cli_option("--units", default=None,
436
                          dest="units", choices=('h', 'm', 'g', 't'),
437
                          help="Specify units for output (one of hmgt)")
438

    
439
FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
440
                        type="string", metavar="FIELDS",
441
                        help="Comma separated list of output fields")
442

    
443
FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
444
                       default=False, help="Force the operation")
445

    
446
CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
447
                         default=False, help="Do not require confirmation")
448

    
449
TAG_SRC_OPT = cli_option("--from", dest="tags_source",
450
                         default=None, help="File with tag names")
451

    
452
SUBMIT_OPT = cli_option("--submit", dest="submit_only",
453
                        default=False, action="store_true",
454
                        help=("Submit the job and return the job ID, but"
455
                              " don't wait for the job to finish"))
456

    
457
SYNC_OPT = cli_option("--sync", dest="do_locking",
458
                      default=False, action="store_true",
459
                      help=("Grab locks while doing the queries"
460
                            " in order to ensure more consistent results"))
461

    
462
_DRY_RUN_OPT = cli_option("--dry-run", default=False,
463
                          action="store_true",
464
                          help=("Do not execute the operation, just run the"
465
                                " check steps and verify it it could be"
466
                                " executed"))
467

    
468
VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
469
                         action="store_true",
470
                         help="Increase the verbosity of the operation")
471

    
472
DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
473
                              action="store_true", dest="simulate_errors",
474
                              help="Debugging option that makes the operation"
475
                              " treat most runtime checks as failed")
476

    
477
NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
478
                        default=True, action="store_false",
479
                        help="Don't wait for sync (DANGEROUS!)")
480

    
481
DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
482
                               help="Custom disk setup (diskless, file,"
483
                               " plain or drbd)",
484
                               default=None, metavar="TEMPL",
485
                               choices=list(constants.DISK_TEMPLATES))
486

    
487
NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
488
                        help="Do not create any network cards for"
489
                        " the instance")
490

    
491
FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
492
                               help="Relative path under default cluster-wide"
493
                               " file storage dir to store file-based disks",
494
                               default=None, metavar="<DIR>")
495

    
496

    
497
def _ParseArgs(argv, commands, aliases):
498
  """Parser for the command line arguments.
499

500
  This function parses the arguments and returns the function which
501
  must be executed together with its (modified) arguments.
502

503
  @param argv: the command line
504
  @param commands: dictionary with special contents, see the design
505
      doc for cmdline handling
506
  @param aliases: dictionary with command aliases {'alias': 'target, ...}
507

508
  """
509
  if len(argv) == 0:
510
    binary = "<command>"
511
  else:
512
    binary = argv[0].split("/")[-1]
513

    
514
  if len(argv) > 1 and argv[1] == "--version":
515
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
516
    # Quit right away. That way we don't have to care about this special
517
    # argument. optparse.py does it the same.
518
    sys.exit(0)
519

    
520
  if len(argv) < 2 or not (argv[1] in commands or
521
                           argv[1] in aliases):
522
    # let's do a nice thing
523
    sortedcmds = commands.keys()
524
    sortedcmds.sort()
525

    
526
    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
527
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
528
    ToStdout("")
529

    
530
    # compute the max line length for cmd + usage
531
    mlen = max([len(" %s" % cmd) for cmd in commands])
532
    mlen = min(60, mlen) # should not get here...
533

    
534
    # and format a nice command list
535
    ToStdout("Commands:")
536
    for cmd in sortedcmds:
537
      cmdstr = " %s" % (cmd,)
538
      help_text = commands[cmd][4]
539
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
540
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
541
      for line in help_lines:
542
        ToStdout("%-*s   %s", mlen, "", line)
543

    
544
    ToStdout("")
545

    
546
    return None, None, None
547

    
548
  # get command, unalias it, and look it up in commands
549
  cmd = argv.pop(1)
550
  if cmd in aliases:
551
    if cmd in commands:
552
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
553
                                   " command" % cmd)
554

    
555
    if aliases[cmd] not in commands:
556
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
557
                                   " command '%s'" % (cmd, aliases[cmd]))
558

    
559
    cmd = aliases[cmd]
560

    
561
  func, args_def, parser_opts, usage, description = commands[cmd]
562
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT],
563
                        description=description,
564
                        formatter=TitledHelpFormatter(),
565
                        usage="%%prog %s %s" % (cmd, usage))
566
  parser.disable_interspersed_args()
567
  options, args = parser.parse_args()
568

    
569
  if not _CheckArguments(cmd, args_def, args):
570
    return None, None, None
571

    
572
  return func, options, args
573

    
574

    
575
def _CheckArguments(cmd, args_def, args):
576
  """Verifies the arguments using the argument definition.
577

578
  Algorithm:
579

580
    1. Abort with error if values specified by user but none expected.
581

582
    1. For each argument in definition
583

584
      1. Keep running count of minimum number of values (min_count)
585
      1. Keep running count of maximum number of values (max_count)
586
      1. If it has an unlimited number of values
587

588
        1. Abort with error if it's not the last argument in the definition
589

590
    1. If last argument has limited number of values
591

592
      1. Abort with error if number of values doesn't match or is too large
593

594
    1. Abort with error if user didn't pass enough values (min_count)
595

596
  """
597
  if args and not args_def:
598
    ToStderr("Error: Command %s expects no arguments", cmd)
599
    return False
600

    
601
  min_count = None
602
  max_count = None
603
  check_max = None
604

    
605
  last_idx = len(args_def) - 1
606

    
607
  for idx, arg in enumerate(args_def):
608
    if min_count is None:
609
      min_count = arg.min
610
    elif arg.min is not None:
611
      min_count += arg.min
612

    
613
    if max_count is None:
614
      max_count = arg.max
615
    elif arg.max is not None:
616
      max_count += arg.max
617

    
618
    if idx == last_idx:
619
      check_max = (arg.max is not None)
620

    
621
    elif arg.max is None:
622
      raise errors.ProgrammerError("Only the last argument can have max=None")
623

    
624
  if check_max:
625
    # Command with exact number of arguments
626
    if (min_count is not None and max_count is not None and
627
        min_count == max_count and len(args) != min_count):
628
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
629
      return False
630

    
631
    # Command with limited number of arguments
632
    if max_count is not None and len(args) > max_count:
633
      ToStderr("Error: Command %s expects only %d argument(s)",
634
               cmd, max_count)
635
      return False
636

    
637
  # Command with some required arguments
638
  if min_count is not None and len(args) < min_count:
639
    ToStderr("Error: Command %s expects at least %d argument(s)",
640
             cmd, min_count)
641
    return False
642

    
643
  return True
644

    
645

    
646
def SplitNodeOption(value):
647
  """Splits the value of a --node option.
648

649
  """
650
  if value and ':' in value:
651
    return value.split(':', 1)
652
  else:
653
    return (value, None)
654

    
655

    
656
def UsesRPC(fn):
657
  def wrapper(*args, **kwargs):
658
    rpc.Init()
659
    try:
660
      return fn(*args, **kwargs)
661
    finally:
662
      rpc.Shutdown()
663
  return wrapper
664

    
665

    
666
def AskUser(text, choices=None):
667
  """Ask the user a question.
668

669
  @param text: the question to ask
670

671
  @param choices: list with elements tuples (input_char, return_value,
672
      description); if not given, it will default to: [('y', True,
673
      'Perform the operation'), ('n', False, 'Do no do the operation')];
674
      note that the '?' char is reserved for help
675

676
  @return: one of the return values from the choices list; if input is
677
      not possible (i.e. not running with a tty, we return the last
678
      entry from the list
679

680
  """
681
  if choices is None:
682
    choices = [('y', True, 'Perform the operation'),
683
               ('n', False, 'Do not perform the operation')]
684
  if not choices or not isinstance(choices, list):
685
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
686
  for entry in choices:
687
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
688
      raise errors.ProgrammerError("Invalid choices element to AskUser")
689

    
690
  answer = choices[-1][1]
691
  new_text = []
692
  for line in text.splitlines():
693
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
694
  text = "\n".join(new_text)
695
  try:
696
    f = file("/dev/tty", "a+")
697
  except IOError:
698
    return answer
699
  try:
700
    chars = [entry[0] for entry in choices]
701
    chars[-1] = "[%s]" % chars[-1]
702
    chars.append('?')
703
    maps = dict([(entry[0], entry[1]) for entry in choices])
704
    while True:
705
      f.write(text)
706
      f.write('\n')
707
      f.write("/".join(chars))
708
      f.write(": ")
709
      line = f.readline(2).strip().lower()
710
      if line in maps:
711
        answer = maps[line]
712
        break
713
      elif line == '?':
714
        for entry in choices:
715
          f.write(" %s - %s\n" % (entry[0], entry[2]))
716
        f.write("\n")
717
        continue
718
  finally:
719
    f.close()
720
  return answer
721

    
722

    
723
class JobSubmittedException(Exception):
724
  """Job was submitted, client should exit.
725

726
  This exception has one argument, the ID of the job that was
727
  submitted. The handler should print this ID.
728

729
  This is not an error, just a structured way to exit from clients.
730

731
  """
732

    
733

    
734
def SendJob(ops, cl=None):
735
  """Function to submit an opcode without waiting for the results.
736

737
  @type ops: list
738
  @param ops: list of opcodes
739
  @type cl: luxi.Client
740
  @param cl: the luxi client to use for communicating with the master;
741
             if None, a new client will be created
742

743
  """
744
  if cl is None:
745
    cl = GetClient()
746

    
747
  job_id = cl.SubmitJob(ops)
748

    
749
  return job_id
750

    
751

    
752
def PollJob(job_id, cl=None, feedback_fn=None):
753
  """Function to poll for the result of a job.
754

755
  @type job_id: job identified
756
  @param job_id: the job to poll for results
757
  @type cl: luxi.Client
758
  @param cl: the luxi client to use for communicating with the master;
759
             if None, a new client will be created
760

761
  """
762
  if cl is None:
763
    cl = GetClient()
764

    
765
  prev_job_info = None
766
  prev_logmsg_serial = None
767

    
768
  while True:
769
    result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
770
                                 prev_logmsg_serial)
771
    if not result:
772
      # job not found, go away!
773
      raise errors.JobLost("Job with id %s lost" % job_id)
774

    
775
    # Split result, a tuple of (field values, log entries)
776
    (job_info, log_entries) = result
777
    (status, ) = job_info
778

    
779
    if log_entries:
780
      for log_entry in log_entries:
781
        (serial, timestamp, _, message) = log_entry
782
        if callable(feedback_fn):
783
          feedback_fn(log_entry[1:])
784
        else:
785
          encoded = utils.SafeEncode(message)
786
          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
787
        prev_logmsg_serial = max(prev_logmsg_serial, serial)
788

    
789
    # TODO: Handle canceled and archived jobs
790
    elif status in (constants.JOB_STATUS_SUCCESS,
791
                    constants.JOB_STATUS_ERROR,
792
                    constants.JOB_STATUS_CANCELING,
793
                    constants.JOB_STATUS_CANCELED):
794
      break
795

    
796
    prev_job_info = job_info
797

    
798
  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
799
  if not jobs:
800
    raise errors.JobLost("Job with id %s lost" % job_id)
801

    
802
  status, opstatus, result = jobs[0]
803
  if status == constants.JOB_STATUS_SUCCESS:
804
    return result
805
  elif status in (constants.JOB_STATUS_CANCELING,
806
                  constants.JOB_STATUS_CANCELED):
807
    raise errors.OpExecError("Job was canceled")
808
  else:
809
    has_ok = False
810
    for idx, (status, msg) in enumerate(zip(opstatus, result)):
811
      if status == constants.OP_STATUS_SUCCESS:
812
        has_ok = True
813
      elif status == constants.OP_STATUS_ERROR:
814
        errors.MaybeRaise(msg)
815
        if has_ok:
816
          raise errors.OpExecError("partial failure (opcode %d): %s" %
817
                                   (idx, msg))
818
        else:
819
          raise errors.OpExecError(str(msg))
820
    # default failure mode
821
    raise errors.OpExecError(result)
822

    
823

    
824
def SubmitOpCode(op, cl=None, feedback_fn=None):
825
  """Legacy function to submit an opcode.
826

827
  This is just a simple wrapper over the construction of the processor
828
  instance. It should be extended to better handle feedback and
829
  interaction functions.
830

831
  """
832
  if cl is None:
833
    cl = GetClient()
834

    
835
  job_id = SendJob([op], cl)
836

    
837
  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
838

    
839
  return op_results[0]
840

    
841

    
842
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
843
  """Wrapper around SubmitOpCode or SendJob.
844

845
  This function will decide, based on the 'opts' parameter, whether to
846
  submit and wait for the result of the opcode (and return it), or
847
  whether to just send the job and print its identifier. It is used in
848
  order to simplify the implementation of the '--submit' option.
849

850
  It will also add the dry-run parameter from the options passed, if true.
851

852
  """
853
  if opts and opts.dry_run:
854
    op.dry_run = opts.dry_run
855
  if opts and opts.submit_only:
856
    job_id = SendJob([op], cl=cl)
857
    raise JobSubmittedException(job_id)
858
  else:
859
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
860

    
861

    
862
def GetClient():
863
  # TODO: Cache object?
864
  try:
865
    client = luxi.Client()
866
  except luxi.NoMasterError:
867
    master, myself = ssconf.GetMasterAndMyself()
868
    if master != myself:
869
      raise errors.OpPrereqError("This is not the master node, please connect"
870
                                 " to node '%s' and rerun the command" %
871
                                 master)
872
    else:
873
      raise
874
  return client
875

    
876

    
877
def FormatError(err):
878
  """Return a formatted error message for a given error.
879

880
  This function takes an exception instance and returns a tuple
881
  consisting of two values: first, the recommended exit code, and
882
  second, a string describing the error message (not
883
  newline-terminated).
884

885
  """
886
  retcode = 1
887
  obuf = StringIO()
888
  msg = str(err)
889
  if isinstance(err, errors.ConfigurationError):
890
    txt = "Corrupt configuration file: %s" % msg
891
    logging.error(txt)
892
    obuf.write(txt + "\n")
893
    obuf.write("Aborting.")
894
    retcode = 2
895
  elif isinstance(err, errors.HooksAbort):
896
    obuf.write("Failure: hooks execution failed:\n")
897
    for node, script, out in err.args[0]:
898
      if out:
899
        obuf.write("  node: %s, script: %s, output: %s\n" %
900
                   (node, script, out))
901
      else:
902
        obuf.write("  node: %s, script: %s (no output)\n" %
903
                   (node, script))
904
  elif isinstance(err, errors.HooksFailure):
905
    obuf.write("Failure: hooks general failure: %s" % msg)
906
  elif isinstance(err, errors.ResolverError):
907
    this_host = utils.HostInfo.SysName()
908
    if err.args[0] == this_host:
909
      msg = "Failure: can't resolve my own hostname ('%s')"
910
    else:
911
      msg = "Failure: can't resolve hostname '%s'"
912
    obuf.write(msg % err.args[0])
913
  elif isinstance(err, errors.OpPrereqError):
914
    obuf.write("Failure: prerequisites not met for this"
915
               " operation:\n%s" % msg)
916
  elif isinstance(err, errors.OpExecError):
917
    obuf.write("Failure: command execution error:\n%s" % msg)
918
  elif isinstance(err, errors.TagError):
919
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
920
  elif isinstance(err, errors.JobQueueDrainError):
921
    obuf.write("Failure: the job queue is marked for drain and doesn't"
922
               " accept new requests\n")
923
  elif isinstance(err, errors.JobQueueFull):
924
    obuf.write("Failure: the job queue is full and doesn't accept new"
925
               " job submissions until old jobs are archived\n")
926
  elif isinstance(err, errors.TypeEnforcementError):
927
    obuf.write("Parameter Error: %s" % msg)
928
  elif isinstance(err, errors.ParameterError):
929
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
930
  elif isinstance(err, errors.GenericError):
931
    obuf.write("Unhandled Ganeti error: %s" % msg)
932
  elif isinstance(err, luxi.NoMasterError):
933
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
934
               " and listening for connections?")
935
  elif isinstance(err, luxi.TimeoutError):
936
    obuf.write("Timeout while talking to the master daemon. Error:\n"
937
               "%s" % msg)
938
  elif isinstance(err, luxi.ProtocolError):
939
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
940
               "%s" % msg)
941
  elif isinstance(err, JobSubmittedException):
942
    obuf.write("JobID: %s\n" % err.args[0])
943
    retcode = 0
944
  else:
945
    obuf.write("Unhandled exception: %s" % msg)
946
  return retcode, obuf.getvalue().rstrip('\n')
947

    
948

    
949
def GenericMain(commands, override=None, aliases=None):
950
  """Generic main function for all the gnt-* commands.
951

952
  Arguments:
953
    - commands: a dictionary with a special structure, see the design doc
954
                for command line handling.
955
    - override: if not None, we expect a dictionary with keys that will
956
                override command line options; this can be used to pass
957
                options from the scripts to generic functions
958
    - aliases: dictionary with command aliases {'alias': 'target, ...}
959

960
  """
961
  # save the program name and the entire command line for later logging
962
  if sys.argv:
963
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
964
    if len(sys.argv) >= 2:
965
      binary += " " + sys.argv[1]
966
      old_cmdline = " ".join(sys.argv[2:])
967
    else:
968
      old_cmdline = ""
969
  else:
970
    binary = "<unknown program>"
971
    old_cmdline = ""
972

    
973
  if aliases is None:
974
    aliases = {}
975

    
976
  try:
977
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
978
  except errors.ParameterError, err:
979
    result, err_msg = FormatError(err)
980
    ToStderr(err_msg)
981
    return 1
982

    
983
  if func is None: # parse error
984
    return 1
985

    
986
  if override is not None:
987
    for key, val in override.iteritems():
988
      setattr(options, key, val)
989

    
990
  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
991
                     stderr_logging=True, program=binary)
992

    
993
  if old_cmdline:
994
    logging.info("run with arguments '%s'", old_cmdline)
995
  else:
996
    logging.info("run with no arguments")
997

    
998
  try:
999
    result = func(options, args)
1000
  except (errors.GenericError, luxi.ProtocolError,
1001
          JobSubmittedException), err:
1002
    result, err_msg = FormatError(err)
1003
    logging.exception("Error during command processing")
1004
    ToStderr(err_msg)
1005

    
1006
  return result
1007

    
1008

    
1009
def GenerateTable(headers, fields, separator, data,
1010
                  numfields=None, unitfields=None,
1011
                  units=None):
1012
  """Prints a table with headers and different fields.
1013

1014
  @type headers: dict
1015
  @param headers: dictionary mapping field names to headers for
1016
      the table
1017
  @type fields: list
1018
  @param fields: the field names corresponding to each row in
1019
      the data field
1020
  @param separator: the separator to be used; if this is None,
1021
      the default 'smart' algorithm is used which computes optimal
1022
      field width, otherwise just the separator is used between
1023
      each field
1024
  @type data: list
1025
  @param data: a list of lists, each sublist being one row to be output
1026
  @type numfields: list
1027
  @param numfields: a list with the fields that hold numeric
1028
      values and thus should be right-aligned
1029
  @type unitfields: list
1030
  @param unitfields: a list with the fields that hold numeric
1031
      values that should be formatted with the units field
1032
  @type units: string or None
1033
  @param units: the units we should use for formatting, or None for
1034
      automatic choice (human-readable for non-separator usage, otherwise
1035
      megabytes); this is a one-letter string
1036

1037
  """
1038
  if units is None:
1039
    if separator:
1040
      units = "m"
1041
    else:
1042
      units = "h"
1043

    
1044
  if numfields is None:
1045
    numfields = []
1046
  if unitfields is None:
1047
    unitfields = []
1048

    
1049
  numfields = utils.FieldSet(*numfields)
1050
  unitfields = utils.FieldSet(*unitfields)
1051

    
1052
  format_fields = []
1053
  for field in fields:
1054
    if headers and field not in headers:
1055
      # TODO: handle better unknown fields (either revert to old
1056
      # style of raising exception, or deal more intelligently with
1057
      # variable fields)
1058
      headers[field] = field
1059
    if separator is not None:
1060
      format_fields.append("%s")
1061
    elif numfields.Matches(field):
1062
      format_fields.append("%*s")
1063
    else:
1064
      format_fields.append("%-*s")
1065

    
1066
  if separator is None:
1067
    mlens = [0 for name in fields]
1068
    format = ' '.join(format_fields)
1069
  else:
1070
    format = separator.replace("%", "%%").join(format_fields)
1071

    
1072
  for row in data:
1073
    if row is None:
1074
      continue
1075
    for idx, val in enumerate(row):
1076
      if unitfields.Matches(fields[idx]):
1077
        try:
1078
          val = int(val)
1079
        except ValueError:
1080
          pass
1081
        else:
1082
          val = row[idx] = utils.FormatUnit(val, units)
1083
      val = row[idx] = str(val)
1084
      if separator is None:
1085
        mlens[idx] = max(mlens[idx], len(val))
1086

    
1087
  result = []
1088
  if headers:
1089
    args = []
1090
    for idx, name in enumerate(fields):
1091
      hdr = headers[name]
1092
      if separator is None:
1093
        mlens[idx] = max(mlens[idx], len(hdr))
1094
        args.append(mlens[idx])
1095
      args.append(hdr)
1096
    result.append(format % tuple(args))
1097

    
1098
  for line in data:
1099
    args = []
1100
    if line is None:
1101
      line = ['-' for _ in fields]
1102
    for idx in xrange(len(fields)):
1103
      if separator is None:
1104
        args.append(mlens[idx])
1105
      args.append(line[idx])
1106
    result.append(format % tuple(args))
1107

    
1108
  return result
1109

    
1110

    
1111
def FormatTimestamp(ts):
1112
  """Formats a given timestamp.
1113

1114
  @type ts: timestamp
1115
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
1116

1117
  @rtype: string
1118
  @return: a string with the formatted timestamp
1119

1120
  """
1121
  if not isinstance (ts, (tuple, list)) or len(ts) != 2:
1122
    return '?'
1123
  sec, usec = ts
1124
  return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
1125

    
1126

    
1127
def ParseTimespec(value):
1128
  """Parse a time specification.
1129

1130
  The following suffixed will be recognized:
1131

1132
    - s: seconds
1133
    - m: minutes
1134
    - h: hours
1135
    - d: day
1136
    - w: weeks
1137

1138
  Without any suffix, the value will be taken to be in seconds.
1139

1140
  """
1141
  value = str(value)
1142
  if not value:
1143
    raise errors.OpPrereqError("Empty time specification passed")
1144
  suffix_map = {
1145
    's': 1,
1146
    'm': 60,
1147
    'h': 3600,
1148
    'd': 86400,
1149
    'w': 604800,
1150
    }
1151
  if value[-1] not in suffix_map:
1152
    try:
1153
      value = int(value)
1154
    except ValueError:
1155
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
1156
  else:
1157
    multiplier = suffix_map[value[-1]]
1158
    value = value[:-1]
1159
    if not value: # no data left after stripping the suffix
1160
      raise errors.OpPrereqError("Invalid time specification (only"
1161
                                 " suffix passed)")
1162
    try:
1163
      value = int(value) * multiplier
1164
    except ValueError:
1165
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
1166
  return value
1167

    
1168

    
1169
def GetOnlineNodes(nodes, cl=None, nowarn=False):
1170
  """Returns the names of online nodes.
1171

1172
  This function will also log a warning on stderr with the names of
1173
  the online nodes.
1174

1175
  @param nodes: if not empty, use only this subset of nodes (minus the
1176
      offline ones)
1177
  @param cl: if not None, luxi client to use
1178
  @type nowarn: boolean
1179
  @param nowarn: by default, this function will output a note with the
1180
      offline nodes that are skipped; if this parameter is True the
1181
      note is not displayed
1182

1183
  """
1184
  if cl is None:
1185
    cl = GetClient()
1186

    
1187
  result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
1188
                         use_locking=False)
1189
  offline = [row[0] for row in result if row[1]]
1190
  if offline and not nowarn:
1191
    ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline))
1192
  return [row[0] for row in result if not row[1]]
1193

    
1194

    
1195
def _ToStream(stream, txt, *args):
1196
  """Write a message to a stream, bypassing the logging system
1197

1198
  @type stream: file object
1199
  @param stream: the file to which we should write
1200
  @type txt: str
1201
  @param txt: the message
1202

1203
  """
1204
  if args:
1205
    args = tuple(args)
1206
    stream.write(txt % args)
1207
  else:
1208
    stream.write(txt)
1209
  stream.write('\n')
1210
  stream.flush()
1211

    
1212

    
1213
def ToStdout(txt, *args):
1214
  """Write a message to stdout only, bypassing the logging system
1215

1216
  This is just a wrapper over _ToStream.
1217

1218
  @type txt: str
1219
  @param txt: the message
1220

1221
  """
1222
  _ToStream(sys.stdout, txt, *args)
1223

    
1224

    
1225
def ToStderr(txt, *args):
1226
  """Write a message to stderr only, bypassing the logging system
1227

1228
  This is just a wrapper over _ToStream.
1229

1230
  @type txt: str
1231
  @param txt: the message
1232

1233
  """
1234
  _ToStream(sys.stderr, txt, *args)
1235

    
1236

    
1237
class JobExecutor(object):
1238
  """Class which manages the submission and execution of multiple jobs.
1239

1240
  Note that instances of this class should not be reused between
1241
  GetResults() calls.
1242

1243
  """
1244
  def __init__(self, cl=None, verbose=True):
1245
    self.queue = []
1246
    if cl is None:
1247
      cl = GetClient()
1248
    self.cl = cl
1249
    self.verbose = verbose
1250
    self.jobs = []
1251

    
1252
  def QueueJob(self, name, *ops):
1253
    """Record a job for later submit.
1254

1255
    @type name: string
1256
    @param name: a description of the job, will be used in WaitJobSet
1257
    """
1258
    self.queue.append((name, ops))
1259

    
1260
  def SubmitPending(self):
1261
    """Submit all pending jobs.
1262

1263
    """
1264
    results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
1265
    for ((status, data), (name, _)) in zip(results, self.queue):
1266
      self.jobs.append((status, data, name))
1267

    
1268
  def GetResults(self):
1269
    """Wait for and return the results of all jobs.
1270

1271
    @rtype: list
1272
    @return: list of tuples (success, job results), in the same order
1273
        as the submitted jobs; if a job has failed, instead of the result
1274
        there will be the error message
1275

1276
    """
1277
    if not self.jobs:
1278
      self.SubmitPending()
1279
    results = []
1280
    if self.verbose:
1281
      ok_jobs = [row[1] for row in self.jobs if row[0]]
1282
      if ok_jobs:
1283
        ToStdout("Submitted jobs %s", ", ".join(ok_jobs))
1284
    for submit_status, jid, name in self.jobs:
1285
      if not submit_status:
1286
        ToStderr("Failed to submit job for %s: %s", name, jid)
1287
        results.append((False, jid))
1288
        continue
1289
      if self.verbose:
1290
        ToStdout("Waiting for job %s for %s...", jid, name)
1291
      try:
1292
        job_result = PollJob(jid, cl=self.cl)
1293
        success = True
1294
      except (errors.GenericError, luxi.ProtocolError), err:
1295
        _, job_result = FormatError(err)
1296
        success = False
1297
        # the error message will always be shown, verbose or not
1298
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
1299

    
1300
      results.append((success, job_result))
1301
    return results
1302

    
1303
  def WaitOrShow(self, wait):
1304
    """Wait for job results or only print the job IDs.
1305

1306
    @type wait: boolean
1307
    @param wait: whether to wait or not
1308

1309
    """
1310
    if wait:
1311
      return self.GetResults()
1312
    else:
1313
      if not self.jobs:
1314
        self.SubmitPending()
1315
      for status, result, name in self.jobs:
1316
        if status:
1317
          ToStdout("%s: %s", result, name)
1318
        else:
1319
          ToStderr("Failure for %s: %s", name, result)