Statistics
| Branch: | Tag: | Revision:

root / lib / cli.py @ a72d0a79

History | View | Annotate | Download (44.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module dealing with command line parsing"""
23

    
24

    
25
import sys
26
import textwrap
27
import os.path
28
import copy
29
import time
30
import logging
31
from cStringIO import StringIO
32

    
33
from ganeti import utils
34
from ganeti import errors
35
from ganeti import constants
36
from ganeti import opcodes
37
from ganeti import luxi
38
from ganeti import ssconf
39
from ganeti import rpc
40

    
41
from optparse import (OptionParser, TitledHelpFormatter,
42
                      Option, OptionValueError)
43

    
44

    
45
__all__ = [
46
  # Command line options
47
  "ALL_OPT",
48
  "BACKEND_OPT",
49
  "CLEANUP_OPT",
50
  "CONFIRM_OPT",
51
  "DEBUG_OPT",
52
  "DEBUG_SIMERR_OPT",
53
  "DISKIDX_OPT",
54
  "DISK_OPT",
55
  "DISK_TEMPLATE_OPT",
56
  "FIELDS_OPT",
57
  "FILESTORE_DIR_OPT",
58
  "FILESTORE_DRIVER_OPT",
59
  "HVLIST_OPT",
60
  "HVOPTS_OPT",
61
  "HYPERVISOR_OPT",
62
  "IALLOCATOR_OPT",
63
  "IGNORE_CONSIST_OPT",
64
  "IGNORE_FAILURES_OPT",
65
  "FORCE_OPT",
66
  "NET_OPT",
67
  "NEW_SECONDARY_OPT",
68
  "NODE_LIST_OPT",
69
  "NODE_PLACEMENT_OPT",
70
  "NOHDR_OPT",
71
  "NOIPCHECK_OPT",
72
  "NONICS_OPT",
73
  "NONLIVE_OPT",
74
  "NOSTART_OPT",
75
  "NWSYNC_OPT",
76
  "ON_PRIMARY_OPT",
77
  "OS_OPT",
78
  "OS_SIZE_OPT",
79
  "SELECT_OS_OPT",
80
  "SEP_OPT",
81
  "SHOWCMD_OPT",
82
  "SINGLE_NODE_OPT",
83
  "SUBMIT_OPT",
84
  "STATIC_OPT",
85
  "SYNC_OPT",
86
  "TAG_SRC_OPT",
87
  "USEUNITS_OPT",
88
  "VERBOSE_OPT",
89
  # Generic functions for CLI programs
90
  "GenericMain",
91
  "GetClient",
92
  "GetOnlineNodes",
93
  "JobExecutor",
94
  "JobSubmittedException",
95
  "ParseTimespec",
96
  "SubmitOpCode",
97
  "SubmitOrSend",
98
  "UsesRPC",
99
  # Formatting functions
100
  "ToStderr", "ToStdout",
101
  "FormatError",
102
  "GenerateTable",
103
  "AskUser",
104
  "FormatTimestamp",
105
  # Tags functions
106
  "ListTags",
107
  "AddTags",
108
  "RemoveTags",
109
  # command line options support infrastructure
110
  "ARGS_MANY_INSTANCES",
111
  "ARGS_MANY_NODES",
112
  "ARGS_NONE",
113
  "ARGS_ONE_INSTANCE",
114
  "ARGS_ONE_NODE",
115
  "ArgChoice",
116
  "ArgCommand",
117
  "ArgFile",
118
  "ArgHost",
119
  "ArgInstance",
120
  "ArgJobId",
121
  "ArgNode",
122
  "ArgSuggest",
123
  "ArgUnknown",
124
  "OPT_COMPL_INST_ADD_NODES",
125
  "OPT_COMPL_MANY_NODES",
126
  "OPT_COMPL_ONE_IALLOCATOR",
127
  "OPT_COMPL_ONE_INSTANCE",
128
  "OPT_COMPL_ONE_NODE",
129
  "OPT_COMPL_ONE_OS",
130
  "cli_option",
131
  "SplitNodeOption",
132
  ]
133

    
134
NO_PREFIX = "no_"
135
UN_PREFIX = "-"
136

    
137

    
138
class _Argument:
139
  def __init__(self, min=0, max=None):
140
    self.min = min
141
    self.max = max
142

    
143
  def __repr__(self):
144
    return ("<%s min=%s max=%s>" %
145
            (self.__class__.__name__, self.min, self.max))
146

    
147

    
148
class ArgSuggest(_Argument):
149
  """Suggesting argument.
150

151
  Value can be any of the ones passed to the constructor.
152

153
  """
154
  def __init__(self, min=0, max=None, choices=None):
155
    _Argument.__init__(self, min=min, max=max)
156
    self.choices = choices
157

    
158
  def __repr__(self):
159
    return ("<%s min=%s max=%s choices=%r>" %
160
            (self.__class__.__name__, self.min, self.max, self.choices))
161

    
162

    
163
class ArgChoice(ArgSuggest):
164
  """Choice argument.
165

166
  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
167
  but value must be one of the choices.
168

169
  """
170

    
171

    
172
class ArgUnknown(_Argument):
173
  """Unknown argument to program (e.g. determined at runtime).
174

175
  """
176

    
177

    
178
class ArgInstance(_Argument):
179
  """Instances argument.
180

181
  """
182

    
183

    
184
class ArgNode(_Argument):
185
  """Node argument.
186

187
  """
188

    
189
class ArgJobId(_Argument):
190
  """Job ID argument.
191

192
  """
193

    
194

    
195
class ArgFile(_Argument):
196
  """File path argument.
197

198
  """
199

    
200

    
201
class ArgCommand(_Argument):
202
  """Command argument.
203

204
  """
205

    
206

    
207
class ArgHost(_Argument):
208
  """Host argument.
209

210
  """
211

    
212

    
213
ARGS_NONE = []
214
ARGS_MANY_INSTANCES = [ArgInstance()]
215
ARGS_MANY_NODES = [ArgNode()]
216
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
217
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
218

    
219

    
220

    
221
def _ExtractTagsObject(opts, args):
222
  """Extract the tag type object.
223

224
  Note that this function will modify its args parameter.
225

226
  """
227
  if not hasattr(opts, "tag_type"):
228
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
229
  kind = opts.tag_type
230
  if kind == constants.TAG_CLUSTER:
231
    retval = kind, kind
232
  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
233
    if not args:
234
      raise errors.OpPrereqError("no arguments passed to the command")
235
    name = args.pop(0)
236
    retval = kind, name
237
  else:
238
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
239
  return retval
240

    
241

    
242
def _ExtendTags(opts, args):
243
  """Extend the args if a source file has been given.
244

245
  This function will extend the tags with the contents of the file
246
  passed in the 'tags_source' attribute of the opts parameter. A file
247
  named '-' will be replaced by stdin.
248

249
  """
250
  fname = opts.tags_source
251
  if fname is None:
252
    return
253
  if fname == "-":
254
    new_fh = sys.stdin
255
  else:
256
    new_fh = open(fname, "r")
257
  new_data = []
258
  try:
259
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
260
    # because of python bug 1633941
261
    while True:
262
      line = new_fh.readline()
263
      if not line:
264
        break
265
      new_data.append(line.strip())
266
  finally:
267
    new_fh.close()
268
  args.extend(new_data)
269

    
270

    
271
def ListTags(opts, args):
272
  """List the tags on a given object.
273

274
  This is a generic implementation that knows how to deal with all
275
  three cases of tag objects (cluster, node, instance). The opts
276
  argument is expected to contain a tag_type field denoting what
277
  object type we work on.
278

279
  """
280
  kind, name = _ExtractTagsObject(opts, args)
281
  op = opcodes.OpGetTags(kind=kind, name=name)
282
  result = SubmitOpCode(op)
283
  result = list(result)
284
  result.sort()
285
  for tag in result:
286
    ToStdout(tag)
287

    
288

    
289
def AddTags(opts, args):
290
  """Add tags on a given object.
291

292
  This is a generic implementation that knows how to deal with all
293
  three cases of tag objects (cluster, node, instance). The opts
294
  argument is expected to contain a tag_type field denoting what
295
  object type we work on.
296

297
  """
298
  kind, name = _ExtractTagsObject(opts, args)
299
  _ExtendTags(opts, args)
300
  if not args:
301
    raise errors.OpPrereqError("No tags to be added")
302
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
303
  SubmitOpCode(op)
304

    
305

    
306
def RemoveTags(opts, args):
307
  """Remove tags from a given object.
308

309
  This is a generic implementation that knows how to deal with all
310
  three cases of tag objects (cluster, node, instance). The opts
311
  argument is expected to contain a tag_type field denoting what
312
  object type we work on.
313

314
  """
315
  kind, name = _ExtractTagsObject(opts, args)
316
  _ExtendTags(opts, args)
317
  if not args:
318
    raise errors.OpPrereqError("No tags to be removed")
319
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
320
  SubmitOpCode(op)
321

    
322

    
323
def check_unit(option, opt, value):
324
  """OptParsers custom converter for units.
325

326
  """
327
  try:
328
    return utils.ParseUnit(value)
329
  except errors.UnitParseError, err:
330
    raise OptionValueError("option %s: %s" % (opt, err))
331

    
332

    
333
def _SplitKeyVal(opt, data):
334
  """Convert a KeyVal string into a dict.
335

336
  This function will convert a key=val[,...] string into a dict. Empty
337
  values will be converted specially: keys which have the prefix 'no_'
338
  will have the value=False and the prefix stripped, the others will
339
  have value=True.
340

341
  @type opt: string
342
  @param opt: a string holding the option name for which we process the
343
      data, used in building error messages
344
  @type data: string
345
  @param data: a string of the format key=val,key=val,...
346
  @rtype: dict
347
  @return: {key=val, key=val}
348
  @raises errors.ParameterError: if there are duplicate keys
349

350
  """
351
  kv_dict = {}
352
  if data:
353
    for elem in data.split(","):
354
      if "=" in elem:
355
        key, val = elem.split("=", 1)
356
      else:
357
        if elem.startswith(NO_PREFIX):
358
          key, val = elem[len(NO_PREFIX):], False
359
        elif elem.startswith(UN_PREFIX):
360
          key, val = elem[len(UN_PREFIX):], None
361
        else:
362
          key, val = elem, True
363
      if key in kv_dict:
364
        raise errors.ParameterError("Duplicate key '%s' in option %s" %
365
                                    (key, opt))
366
      kv_dict[key] = val
367
  return kv_dict
368

    
369

    
370
def check_ident_key_val(option, opt, value):
371
  """Custom parser for ident:key=val,key=val options.
372

373
  This will store the parsed values as a tuple (ident, {key: val}). As such,
374
  multiple uses of this option via action=append is possible.
375

376
  """
377
  if ":" not in value:
378
    ident, rest = value, ''
379
  else:
380
    ident, rest = value.split(":", 1)
381

    
382
  if ident.startswith(NO_PREFIX):
383
    if rest:
384
      msg = "Cannot pass options when removing parameter groups: %s" % value
385
      raise errors.ParameterError(msg)
386
    retval = (ident[len(NO_PREFIX):], False)
387
  elif ident.startswith(UN_PREFIX):
388
    if rest:
389
      msg = "Cannot pass options when removing parameter groups: %s" % value
390
      raise errors.ParameterError(msg)
391
    retval = (ident[len(UN_PREFIX):], None)
392
  else:
393
    kv_dict = _SplitKeyVal(opt, rest)
394
    retval = (ident, kv_dict)
395
  return retval
396

    
397

    
398
def check_key_val(option, opt, value):
399
  """Custom parser class for key=val,key=val options.
400

401
  This will store the parsed values as a dict {key: val}.
402

403
  """
404
  return _SplitKeyVal(opt, value)
405

    
406

    
407
# completion_suggestion is normally a list. Using numeric values not evaluating
408
# to False for dynamic completion.
409
(OPT_COMPL_MANY_NODES,
410
 OPT_COMPL_ONE_NODE,
411
 OPT_COMPL_ONE_INSTANCE,
412
 OPT_COMPL_ONE_OS,
413
 OPT_COMPL_ONE_IALLOCATOR,
414
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)
415

    
416
OPT_COMPL_ALL = frozenset([
417
  OPT_COMPL_MANY_NODES,
418
  OPT_COMPL_ONE_NODE,
419
  OPT_COMPL_ONE_INSTANCE,
420
  OPT_COMPL_ONE_OS,
421
  OPT_COMPL_ONE_IALLOCATOR,
422
  OPT_COMPL_INST_ADD_NODES,
423
  ])
424

    
425

    
426
class CliOption(Option):
427
  """Custom option class for optparse.
428

429
  """
430
  ATTRS = Option.ATTRS + [
431
    "completion_suggest",
432
    ]
433
  TYPES = Option.TYPES + (
434
    "identkeyval",
435
    "keyval",
436
    "unit",
437
    )
438
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
439
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
440
  TYPE_CHECKER["keyval"] = check_key_val
441
  TYPE_CHECKER["unit"] = check_unit
442

    
443

    
444
# optparse.py sets make_option, so we do it for our own option class, too
445
cli_option = CliOption
446

    
447

    
448
DEBUG_OPT = cli_option("-d", "--debug", default=False,
449
                       action="store_true",
450
                       help="Turn debugging on")
451

    
452
NOHDR_OPT = cli_option("--no-headers", default=False,
453
                       action="store_true", dest="no_headers",
454
                       help="Don't display column headers")
455

    
456
SEP_OPT = cli_option("--separator", default=None,
457
                     action="store", dest="separator",
458
                     help=("Separator between output fields"
459
                           " (defaults to one space)"))
460

    
461
USEUNITS_OPT = cli_option("--units", default=None,
462
                          dest="units", choices=('h', 'm', 'g', 't'),
463
                          help="Specify units for output (one of hmgt)")
464

    
465
FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
466
                        type="string", metavar="FIELDS",
467
                        help="Comma separated list of output fields")
468

    
469
FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
470
                       default=False, help="Force the operation")
471

    
472
CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
473
                         default=False, help="Do not require confirmation")
474

    
475
TAG_SRC_OPT = cli_option("--from", dest="tags_source",
476
                         default=None, help="File with tag names")
477

    
478
SUBMIT_OPT = cli_option("--submit", dest="submit_only",
479
                        default=False, action="store_true",
480
                        help=("Submit the job and return the job ID, but"
481
                              " don't wait for the job to finish"))
482

    
483
SYNC_OPT = cli_option("--sync", dest="do_locking",
484
                      default=False, action="store_true",
485
                      help=("Grab locks while doing the queries"
486
                            " in order to ensure more consistent results"))
487

    
488
_DRY_RUN_OPT = cli_option("--dry-run", default=False,
489
                          action="store_true",
490
                          help=("Do not execute the operation, just run the"
491
                                " check steps and verify it it could be"
492
                                " executed"))
493

    
494
VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
495
                         action="store_true",
496
                         help="Increase the verbosity of the operation")
497

    
498
DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
499
                              action="store_true", dest="simulate_errors",
500
                              help="Debugging option that makes the operation"
501
                              " treat most runtime checks as failed")
502

    
503
NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
504
                        default=True, action="store_false",
505
                        help="Don't wait for sync (DANGEROUS!)")
506

    
507
DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
508
                               help="Custom disk setup (diskless, file,"
509
                               " plain or drbd)",
510
                               default=None, metavar="TEMPL",
511
                               choices=list(constants.DISK_TEMPLATES))
512

    
513
NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
514
                        help="Do not create any network cards for"
515
                        " the instance")
516

    
517
FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
518
                               help="Relative path under default cluster-wide"
519
                               " file storage dir to store file-based disks",
520
                               default=None, metavar="<DIR>")
521

    
522
FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
523
                                  help="Driver to use for image files",
524
                                  default="loop", metavar="<DRIVER>",
525
                                  choices=list(constants.FILE_DRIVER))
526

    
527
IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
528
                            help="Select nodes for the instance automatically"
529
                            " using the <NAME> iallocator plugin",
530
                            default=None, type="string",
531
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
532

    
533
OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
534
                    metavar="<os>",
535
                    completion_suggest=OPT_COMPL_ONE_OS)
536

    
537
BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
538
                         type="keyval", default={},
539
                         help="Backend parameters")
540

    
541
HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
542
                         default={}, dest="hvparams",
543
                         help="Hypervisor parameters")
544

    
545
HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
546
                            help="Hypervisor and hypervisor options, in the"
547
                            " format hypervisor:option=value,option=value,...",
548
                            default=None, type="identkeyval")
549

    
550
HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
551
                        help="Hypervisor and hypervisor options, in the"
552
                        " format hypervisor:option=value,option=value,...",
553
                        default=[], action="append", type="identkeyval")
554

    
555
NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
556
                           action="store_false",
557
                           help="Don't check that the instance's IP"
558
                           " is alive")
559

    
560
NET_OPT = cli_option("--net",
561
                     help="NIC parameters", default=[],
562
                     dest="nics", action="append", type="identkeyval")
563

    
564
DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
565
                      dest="disks", action="append", type="identkeyval")
566

    
567
DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
568
                         help="Comma-separated list of disks"
569
                         " indices to act on (e.g. 0,2) (optional,"
570
                         " defaults to all disks)")
571

    
572
OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
573
                         help="Enforces a single-disk configuration using the"
574
                         " given disk size, in MiB unless a suffix is used",
575
                         default=None, type="unit", metavar="<size>")
576

    
577
IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
578
                                dest="ignore_consistency",
579
                                action="store_true", default=False,
580
                                help="Ignore the consistency of the disks on"
581
                                " the secondary")
582

    
583
NONLIVE_OPT = cli_option("--non-live", dest="live",
584
                         default=True, action="store_false",
585
                         help="Do a non-live migration (this usually means"
586
                         " freeze the instance, save the state, transfer and"
587
                         " only then resume running on the secondary node)")
588

    
589
NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
590
                                help="Target node and optional secondary node",
591
                                metavar="<pnode>[:<snode>]",
592
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)
593

    
594
NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
595
                           action="append", metavar="<node>",
596
                           help="Use only this node (can be used multiple"
597
                           " times, if not given defaults to all nodes)",
598
                           completion_suggest=OPT_COMPL_ONE_NODE)
599

    
600
SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
601
                             metavar="<node>",
602
                             completion_suggest=OPT_COMPL_ONE_NODE)
603

    
604
NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
605
                         action="store_false",
606
                         help="Don't start the instance after creation")
607

    
608
SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
609
                         action="store_true", default=False,
610
                         help="Show command instead of executing it")
611

    
612
CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
613
                         default=False, action="store_true",
614
                         help="Instead of performing the migration, try to"
615
                         " recover from a failed cleanup. This is safe"
616
                         " to run even if the instance is healthy, but it"
617
                         " will create extra replication traffic and "
618
                         " disrupt briefly the replication (like during the"
619
                         " migration")
620

    
621
STATIC_OPT = cli_option("-s", "--static", dest="static",
622
                        action="store_true", default=False,
623
                        help="Only show configuration data, not runtime data")
624

    
625
ALL_OPT = cli_option("--all", dest="show_all",
626
                     default=False, action="store_true",
627
                     help="Show info on all instances on the cluster."
628
                     " This can take a long time to run, use wisely")
629

    
630
SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
631
                           action="store_true", default=False,
632
                           help="Interactive OS reinstall, lists available"
633
                           " OS templates for selection")
634

    
635
IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
636
                                 action="store_true", default=False,
637
                                 help="Remove the instance from the cluster"
638
                                 " configuration even if there are failures"
639
                                 " during the removal process")
640

    
641
NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
642
                               help="Specifies the new secondary node",
643
                               metavar="NODE", default=None,
644
                               completion_suggest=OPT_COMPL_ONE_NODE)
645

    
646
ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
647
                            default=False, action="store_true",
648
                            help="Replace the disk(s) on the primary"
649
                            " node (only for the drbd template)")
650

    
651

    
652
def _ParseArgs(argv, commands, aliases):
653
  """Parser for the command line arguments.
654

655
  This function parses the arguments and returns the function which
656
  must be executed together with its (modified) arguments.
657

658
  @param argv: the command line
659
  @param commands: dictionary with special contents, see the design
660
      doc for cmdline handling
661
  @param aliases: dictionary with command aliases {'alias': 'target, ...}
662

663
  """
664
  if len(argv) == 0:
665
    binary = "<command>"
666
  else:
667
    binary = argv[0].split("/")[-1]
668

    
669
  if len(argv) > 1 and argv[1] == "--version":
670
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
671
    # Quit right away. That way we don't have to care about this special
672
    # argument. optparse.py does it the same.
673
    sys.exit(0)
674

    
675
  if len(argv) < 2 or not (argv[1] in commands or
676
                           argv[1] in aliases):
677
    # let's do a nice thing
678
    sortedcmds = commands.keys()
679
    sortedcmds.sort()
680

    
681
    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
682
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
683
    ToStdout("")
684

    
685
    # compute the max line length for cmd + usage
686
    mlen = max([len(" %s" % cmd) for cmd in commands])
687
    mlen = min(60, mlen) # should not get here...
688

    
689
    # and format a nice command list
690
    ToStdout("Commands:")
691
    for cmd in sortedcmds:
692
      cmdstr = " %s" % (cmd,)
693
      help_text = commands[cmd][4]
694
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
695
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
696
      for line in help_lines:
697
        ToStdout("%-*s   %s", mlen, "", line)
698

    
699
    ToStdout("")
700

    
701
    return None, None, None
702

    
703
  # get command, unalias it, and look it up in commands
704
  cmd = argv.pop(1)
705
  if cmd in aliases:
706
    if cmd in commands:
707
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
708
                                   " command" % cmd)
709

    
710
    if aliases[cmd] not in commands:
711
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
712
                                   " command '%s'" % (cmd, aliases[cmd]))
713

    
714
    cmd = aliases[cmd]
715

    
716
  func, args_def, parser_opts, usage, description = commands[cmd]
717
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT],
718
                        description=description,
719
                        formatter=TitledHelpFormatter(),
720
                        usage="%%prog %s %s" % (cmd, usage))
721
  parser.disable_interspersed_args()
722
  options, args = parser.parse_args()
723

    
724
  if not _CheckArguments(cmd, args_def, args):
725
    return None, None, None
726

    
727
  return func, options, args
728

    
729

    
730
def _CheckArguments(cmd, args_def, args):
731
  """Verifies the arguments using the argument definition.
732

733
  Algorithm:
734

735
    1. Abort with error if values specified by user but none expected.
736

737
    1. For each argument in definition
738

739
      1. Keep running count of minimum number of values (min_count)
740
      1. Keep running count of maximum number of values (max_count)
741
      1. If it has an unlimited number of values
742

743
        1. Abort with error if it's not the last argument in the definition
744

745
    1. If last argument has limited number of values
746

747
      1. Abort with error if number of values doesn't match or is too large
748

749
    1. Abort with error if user didn't pass enough values (min_count)
750

751
  """
752
  if args and not args_def:
753
    ToStderr("Error: Command %s expects no arguments", cmd)
754
    return False
755

    
756
  min_count = None
757
  max_count = None
758
  check_max = None
759

    
760
  last_idx = len(args_def) - 1
761

    
762
  for idx, arg in enumerate(args_def):
763
    if min_count is None:
764
      min_count = arg.min
765
    elif arg.min is not None:
766
      min_count += arg.min
767

    
768
    if max_count is None:
769
      max_count = arg.max
770
    elif arg.max is not None:
771
      max_count += arg.max
772

    
773
    if idx == last_idx:
774
      check_max = (arg.max is not None)
775

    
776
    elif arg.max is None:
777
      raise errors.ProgrammerError("Only the last argument can have max=None")
778

    
779
  if check_max:
780
    # Command with exact number of arguments
781
    if (min_count is not None and max_count is not None and
782
        min_count == max_count and len(args) != min_count):
783
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
784
      return False
785

    
786
    # Command with limited number of arguments
787
    if max_count is not None and len(args) > max_count:
788
      ToStderr("Error: Command %s expects only %d argument(s)",
789
               cmd, max_count)
790
      return False
791

    
792
  # Command with some required arguments
793
  if min_count is not None and len(args) < min_count:
794
    ToStderr("Error: Command %s expects at least %d argument(s)",
795
             cmd, min_count)
796
    return False
797

    
798
  return True
799

    
800

    
801
def SplitNodeOption(value):
802
  """Splits the value of a --node option.
803

804
  """
805
  if value and ':' in value:
806
    return value.split(':', 1)
807
  else:
808
    return (value, None)
809

    
810

    
811
def UsesRPC(fn):
812
  def wrapper(*args, **kwargs):
813
    rpc.Init()
814
    try:
815
      return fn(*args, **kwargs)
816
    finally:
817
      rpc.Shutdown()
818
  return wrapper
819

    
820

    
821
def AskUser(text, choices=None):
822
  """Ask the user a question.
823

824
  @param text: the question to ask
825

826
  @param choices: list with elements tuples (input_char, return_value,
827
      description); if not given, it will default to: [('y', True,
828
      'Perform the operation'), ('n', False, 'Do no do the operation')];
829
      note that the '?' char is reserved for help
830

831
  @return: one of the return values from the choices list; if input is
832
      not possible (i.e. not running with a tty, we return the last
833
      entry from the list
834

835
  """
836
  if choices is None:
837
    choices = [('y', True, 'Perform the operation'),
838
               ('n', False, 'Do not perform the operation')]
839
  if not choices or not isinstance(choices, list):
840
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
841
  for entry in choices:
842
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
843
      raise errors.ProgrammerError("Invalid choices element to AskUser")
844

    
845
  answer = choices[-1][1]
846
  new_text = []
847
  for line in text.splitlines():
848
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
849
  text = "\n".join(new_text)
850
  try:
851
    f = file("/dev/tty", "a+")
852
  except IOError:
853
    return answer
854
  try:
855
    chars = [entry[0] for entry in choices]
856
    chars[-1] = "[%s]" % chars[-1]
857
    chars.append('?')
858
    maps = dict([(entry[0], entry[1]) for entry in choices])
859
    while True:
860
      f.write(text)
861
      f.write('\n')
862
      f.write("/".join(chars))
863
      f.write(": ")
864
      line = f.readline(2).strip().lower()
865
      if line in maps:
866
        answer = maps[line]
867
        break
868
      elif line == '?':
869
        for entry in choices:
870
          f.write(" %s - %s\n" % (entry[0], entry[2]))
871
        f.write("\n")
872
        continue
873
  finally:
874
    f.close()
875
  return answer
876

    
877

    
878
class JobSubmittedException(Exception):
879
  """Job was submitted, client should exit.
880

881
  This exception has one argument, the ID of the job that was
882
  submitted. The handler should print this ID.
883

884
  This is not an error, just a structured way to exit from clients.
885

886
  """
887

    
888

    
889
def SendJob(ops, cl=None):
890
  """Function to submit an opcode without waiting for the results.
891

892
  @type ops: list
893
  @param ops: list of opcodes
894
  @type cl: luxi.Client
895
  @param cl: the luxi client to use for communicating with the master;
896
             if None, a new client will be created
897

898
  """
899
  if cl is None:
900
    cl = GetClient()
901

    
902
  job_id = cl.SubmitJob(ops)
903

    
904
  return job_id
905

    
906

    
907
def PollJob(job_id, cl=None, feedback_fn=None):
908
  """Function to poll for the result of a job.
909

910
  @type job_id: job identified
911
  @param job_id: the job to poll for results
912
  @type cl: luxi.Client
913
  @param cl: the luxi client to use for communicating with the master;
914
             if None, a new client will be created
915

916
  """
917
  if cl is None:
918
    cl = GetClient()
919

    
920
  prev_job_info = None
921
  prev_logmsg_serial = None
922

    
923
  while True:
924
    result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
925
                                 prev_logmsg_serial)
926
    if not result:
927
      # job not found, go away!
928
      raise errors.JobLost("Job with id %s lost" % job_id)
929

    
930
    # Split result, a tuple of (field values, log entries)
931
    (job_info, log_entries) = result
932
    (status, ) = job_info
933

    
934
    if log_entries:
935
      for log_entry in log_entries:
936
        (serial, timestamp, _, message) = log_entry
937
        if callable(feedback_fn):
938
          feedback_fn(log_entry[1:])
939
        else:
940
          encoded = utils.SafeEncode(message)
941
          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
942
        prev_logmsg_serial = max(prev_logmsg_serial, serial)
943

    
944
    # TODO: Handle canceled and archived jobs
945
    elif status in (constants.JOB_STATUS_SUCCESS,
946
                    constants.JOB_STATUS_ERROR,
947
                    constants.JOB_STATUS_CANCELING,
948
                    constants.JOB_STATUS_CANCELED):
949
      break
950

    
951
    prev_job_info = job_info
952

    
953
  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
954
  if not jobs:
955
    raise errors.JobLost("Job with id %s lost" % job_id)
956

    
957
  status, opstatus, result = jobs[0]
958
  if status == constants.JOB_STATUS_SUCCESS:
959
    return result
960
  elif status in (constants.JOB_STATUS_CANCELING,
961
                  constants.JOB_STATUS_CANCELED):
962
    raise errors.OpExecError("Job was canceled")
963
  else:
964
    has_ok = False
965
    for idx, (status, msg) in enumerate(zip(opstatus, result)):
966
      if status == constants.OP_STATUS_SUCCESS:
967
        has_ok = True
968
      elif status == constants.OP_STATUS_ERROR:
969
        errors.MaybeRaise(msg)
970
        if has_ok:
971
          raise errors.OpExecError("partial failure (opcode %d): %s" %
972
                                   (idx, msg))
973
        else:
974
          raise errors.OpExecError(str(msg))
975
    # default failure mode
976
    raise errors.OpExecError(result)
977

    
978

    
979
def SubmitOpCode(op, cl=None, feedback_fn=None):
980
  """Legacy function to submit an opcode.
981

982
  This is just a simple wrapper over the construction of the processor
983
  instance. It should be extended to better handle feedback and
984
  interaction functions.
985

986
  """
987
  if cl is None:
988
    cl = GetClient()
989

    
990
  job_id = SendJob([op], cl)
991

    
992
  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
993

    
994
  return op_results[0]
995

    
996

    
997
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
998
  """Wrapper around SubmitOpCode or SendJob.
999

1000
  This function will decide, based on the 'opts' parameter, whether to
1001
  submit and wait for the result of the opcode (and return it), or
1002
  whether to just send the job and print its identifier. It is used in
1003
  order to simplify the implementation of the '--submit' option.
1004

1005
  It will also add the dry-run parameter from the options passed, if true.
1006

1007
  """
1008
  if opts and opts.dry_run:
1009
    op.dry_run = opts.dry_run
1010
  if opts and opts.submit_only:
1011
    job_id = SendJob([op], cl=cl)
1012
    raise JobSubmittedException(job_id)
1013
  else:
1014
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
1015

    
1016

    
1017
def GetClient():
1018
  # TODO: Cache object?
1019
  try:
1020
    client = luxi.Client()
1021
  except luxi.NoMasterError:
1022
    master, myself = ssconf.GetMasterAndMyself()
1023
    if master != myself:
1024
      raise errors.OpPrereqError("This is not the master node, please connect"
1025
                                 " to node '%s' and rerun the command" %
1026
                                 master)
1027
    else:
1028
      raise
1029
  return client
1030

    
1031

    
1032
def FormatError(err):
1033
  """Return a formatted error message for a given error.
1034

1035
  This function takes an exception instance and returns a tuple
1036
  consisting of two values: first, the recommended exit code, and
1037
  second, a string describing the error message (not
1038
  newline-terminated).
1039

1040
  """
1041
  retcode = 1
1042
  obuf = StringIO()
1043
  msg = str(err)
1044
  if isinstance(err, errors.ConfigurationError):
1045
    txt = "Corrupt configuration file: %s" % msg
1046
    logging.error(txt)
1047
    obuf.write(txt + "\n")
1048
    obuf.write("Aborting.")
1049
    retcode = 2
1050
  elif isinstance(err, errors.HooksAbort):
1051
    obuf.write("Failure: hooks execution failed:\n")
1052
    for node, script, out in err.args[0]:
1053
      if out:
1054
        obuf.write("  node: %s, script: %s, output: %s\n" %
1055
                   (node, script, out))
1056
      else:
1057
        obuf.write("  node: %s, script: %s (no output)\n" %
1058
                   (node, script))
1059
  elif isinstance(err, errors.HooksFailure):
1060
    obuf.write("Failure: hooks general failure: %s" % msg)
1061
  elif isinstance(err, errors.ResolverError):
1062
    this_host = utils.HostInfo.SysName()
1063
    if err.args[0] == this_host:
1064
      msg = "Failure: can't resolve my own hostname ('%s')"
1065
    else:
1066
      msg = "Failure: can't resolve hostname '%s'"
1067
    obuf.write(msg % err.args[0])
1068
  elif isinstance(err, errors.OpPrereqError):
1069
    obuf.write("Failure: prerequisites not met for this"
1070
               " operation:\n%s" % msg)
1071
  elif isinstance(err, errors.OpExecError):
1072
    obuf.write("Failure: command execution error:\n%s" % msg)
1073
  elif isinstance(err, errors.TagError):
1074
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
1075
  elif isinstance(err, errors.JobQueueDrainError):
1076
    obuf.write("Failure: the job queue is marked for drain and doesn't"
1077
               " accept new requests\n")
1078
  elif isinstance(err, errors.JobQueueFull):
1079
    obuf.write("Failure: the job queue is full and doesn't accept new"
1080
               " job submissions until old jobs are archived\n")
1081
  elif isinstance(err, errors.TypeEnforcementError):
1082
    obuf.write("Parameter Error: %s" % msg)
1083
  elif isinstance(err, errors.ParameterError):
1084
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
1085
  elif isinstance(err, errors.GenericError):
1086
    obuf.write("Unhandled Ganeti error: %s" % msg)
1087
  elif isinstance(err, luxi.NoMasterError):
1088
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
1089
               " and listening for connections?")
1090
  elif isinstance(err, luxi.TimeoutError):
1091
    obuf.write("Timeout while talking to the master daemon. Error:\n"
1092
               "%s" % msg)
1093
  elif isinstance(err, luxi.ProtocolError):
1094
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
1095
               "%s" % msg)
1096
  elif isinstance(err, JobSubmittedException):
1097
    obuf.write("JobID: %s\n" % err.args[0])
1098
    retcode = 0
1099
  else:
1100
    obuf.write("Unhandled exception: %s" % msg)
1101
  return retcode, obuf.getvalue().rstrip('\n')
1102

    
1103

    
1104
def GenericMain(commands, override=None, aliases=None):
1105
  """Generic main function for all the gnt-* commands.
1106

1107
  Arguments:
1108
    - commands: a dictionary with a special structure, see the design doc
1109
                for command line handling.
1110
    - override: if not None, we expect a dictionary with keys that will
1111
                override command line options; this can be used to pass
1112
                options from the scripts to generic functions
1113
    - aliases: dictionary with command aliases {'alias': 'target, ...}
1114

1115
  """
1116
  # save the program name and the entire command line for later logging
1117
  if sys.argv:
1118
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1119
    if len(sys.argv) >= 2:
1120
      binary += " " + sys.argv[1]
1121
      old_cmdline = " ".join(sys.argv[2:])
1122
    else:
1123
      old_cmdline = ""
1124
  else:
1125
    binary = "<unknown program>"
1126
    old_cmdline = ""
1127

    
1128
  if aliases is None:
1129
    aliases = {}
1130

    
1131
  try:
1132
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
1133
  except errors.ParameterError, err:
1134
    result, err_msg = FormatError(err)
1135
    ToStderr(err_msg)
1136
    return 1
1137

    
1138
  if func is None: # parse error
1139
    return 1
1140

    
1141
  if override is not None:
1142
    for key, val in override.iteritems():
1143
      setattr(options, key, val)
1144

    
1145
  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1146
                     stderr_logging=True, program=binary)
1147

    
1148
  if old_cmdline:
1149
    logging.info("run with arguments '%s'", old_cmdline)
1150
  else:
1151
    logging.info("run with no arguments")
1152

    
1153
  try:
1154
    result = func(options, args)
1155
  except (errors.GenericError, luxi.ProtocolError,
1156
          JobSubmittedException), err:
1157
    result, err_msg = FormatError(err)
1158
    logging.exception("Error during command processing")
1159
    ToStderr(err_msg)
1160

    
1161
  return result
1162

    
1163

    
1164
def GenerateTable(headers, fields, separator, data,
1165
                  numfields=None, unitfields=None,
1166
                  units=None):
1167
  """Prints a table with headers and different fields.
1168

1169
  @type headers: dict
1170
  @param headers: dictionary mapping field names to headers for
1171
      the table
1172
  @type fields: list
1173
  @param fields: the field names corresponding to each row in
1174
      the data field
1175
  @param separator: the separator to be used; if this is None,
1176
      the default 'smart' algorithm is used which computes optimal
1177
      field width, otherwise just the separator is used between
1178
      each field
1179
  @type data: list
1180
  @param data: a list of lists, each sublist being one row to be output
1181
  @type numfields: list
1182
  @param numfields: a list with the fields that hold numeric
1183
      values and thus should be right-aligned
1184
  @type unitfields: list
1185
  @param unitfields: a list with the fields that hold numeric
1186
      values that should be formatted with the units field
1187
  @type units: string or None
1188
  @param units: the units we should use for formatting, or None for
1189
      automatic choice (human-readable for non-separator usage, otherwise
1190
      megabytes); this is a one-letter string
1191

1192
  """
1193
  if units is None:
1194
    if separator:
1195
      units = "m"
1196
    else:
1197
      units = "h"
1198

    
1199
  if numfields is None:
1200
    numfields = []
1201
  if unitfields is None:
1202
    unitfields = []
1203

    
1204
  numfields = utils.FieldSet(*numfields)
1205
  unitfields = utils.FieldSet(*unitfields)
1206

    
1207
  format_fields = []
1208
  for field in fields:
1209
    if headers and field not in headers:
1210
      # TODO: handle better unknown fields (either revert to old
1211
      # style of raising exception, or deal more intelligently with
1212
      # variable fields)
1213
      headers[field] = field
1214
    if separator is not None:
1215
      format_fields.append("%s")
1216
    elif numfields.Matches(field):
1217
      format_fields.append("%*s")
1218
    else:
1219
      format_fields.append("%-*s")
1220

    
1221
  if separator is None:
1222
    mlens = [0 for name in fields]
1223
    format = ' '.join(format_fields)
1224
  else:
1225
    format = separator.replace("%", "%%").join(format_fields)
1226

    
1227
  for row in data:
1228
    if row is None:
1229
      continue
1230
    for idx, val in enumerate(row):
1231
      if unitfields.Matches(fields[idx]):
1232
        try:
1233
          val = int(val)
1234
        except ValueError:
1235
          pass
1236
        else:
1237
          val = row[idx] = utils.FormatUnit(val, units)
1238
      val = row[idx] = str(val)
1239
      if separator is None:
1240
        mlens[idx] = max(mlens[idx], len(val))
1241

    
1242
  result = []
1243
  if headers:
1244
    args = []
1245
    for idx, name in enumerate(fields):
1246
      hdr = headers[name]
1247
      if separator is None:
1248
        mlens[idx] = max(mlens[idx], len(hdr))
1249
        args.append(mlens[idx])
1250
      args.append(hdr)
1251
    result.append(format % tuple(args))
1252

    
1253
  for line in data:
1254
    args = []
1255
    if line is None:
1256
      line = ['-' for _ in fields]
1257
    for idx in xrange(len(fields)):
1258
      if separator is None:
1259
        args.append(mlens[idx])
1260
      args.append(line[idx])
1261
    result.append(format % tuple(args))
1262

    
1263
  return result
1264

    
1265

    
1266
def FormatTimestamp(ts):
1267
  """Formats a given timestamp.
1268

1269
  @type ts: timestamp
1270
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
1271

1272
  @rtype: string
1273
  @return: a string with the formatted timestamp
1274

1275
  """
1276
  if not isinstance (ts, (tuple, list)) or len(ts) != 2:
1277
    return '?'
1278
  sec, usec = ts
1279
  return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
1280

    
1281

    
1282
def ParseTimespec(value):
1283
  """Parse a time specification.
1284

1285
  The following suffixed will be recognized:
1286

1287
    - s: seconds
1288
    - m: minutes
1289
    - h: hours
1290
    - d: day
1291
    - w: weeks
1292

1293
  Without any suffix, the value will be taken to be in seconds.
1294

1295
  """
1296
  value = str(value)
1297
  if not value:
1298
    raise errors.OpPrereqError("Empty time specification passed")
1299
  suffix_map = {
1300
    's': 1,
1301
    'm': 60,
1302
    'h': 3600,
1303
    'd': 86400,
1304
    'w': 604800,
1305
    }
1306
  if value[-1] not in suffix_map:
1307
    try:
1308
      value = int(value)
1309
    except ValueError:
1310
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
1311
  else:
1312
    multiplier = suffix_map[value[-1]]
1313
    value = value[:-1]
1314
    if not value: # no data left after stripping the suffix
1315
      raise errors.OpPrereqError("Invalid time specification (only"
1316
                                 " suffix passed)")
1317
    try:
1318
      value = int(value) * multiplier
1319
    except ValueError:
1320
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
1321
  return value
1322

    
1323

    
1324
def GetOnlineNodes(nodes, cl=None, nowarn=False):
1325
  """Returns the names of online nodes.
1326

1327
  This function will also log a warning on stderr with the names of
1328
  the online nodes.
1329

1330
  @param nodes: if not empty, use only this subset of nodes (minus the
1331
      offline ones)
1332
  @param cl: if not None, luxi client to use
1333
  @type nowarn: boolean
1334
  @param nowarn: by default, this function will output a note with the
1335
      offline nodes that are skipped; if this parameter is True the
1336
      note is not displayed
1337

1338
  """
1339
  if cl is None:
1340
    cl = GetClient()
1341

    
1342
  result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
1343
                         use_locking=False)
1344
  offline = [row[0] for row in result if row[1]]
1345
  if offline and not nowarn:
1346
    ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline))
1347
  return [row[0] for row in result if not row[1]]
1348

    
1349

    
1350
def _ToStream(stream, txt, *args):
1351
  """Write a message to a stream, bypassing the logging system
1352

1353
  @type stream: file object
1354
  @param stream: the file to which we should write
1355
  @type txt: str
1356
  @param txt: the message
1357

1358
  """
1359
  if args:
1360
    args = tuple(args)
1361
    stream.write(txt % args)
1362
  else:
1363
    stream.write(txt)
1364
  stream.write('\n')
1365
  stream.flush()
1366

    
1367

    
1368
def ToStdout(txt, *args):
1369
  """Write a message to stdout only, bypassing the logging system
1370

1371
  This is just a wrapper over _ToStream.
1372

1373
  @type txt: str
1374
  @param txt: the message
1375

1376
  """
1377
  _ToStream(sys.stdout, txt, *args)
1378

    
1379

    
1380
def ToStderr(txt, *args):
1381
  """Write a message to stderr only, bypassing the logging system
1382

1383
  This is just a wrapper over _ToStream.
1384

1385
  @type txt: str
1386
  @param txt: the message
1387

1388
  """
1389
  _ToStream(sys.stderr, txt, *args)
1390

    
1391

    
1392
class JobExecutor(object):
1393
  """Class which manages the submission and execution of multiple jobs.
1394

1395
  Note that instances of this class should not be reused between
1396
  GetResults() calls.
1397

1398
  """
1399
  def __init__(self, cl=None, verbose=True):
1400
    self.queue = []
1401
    if cl is None:
1402
      cl = GetClient()
1403
    self.cl = cl
1404
    self.verbose = verbose
1405
    self.jobs = []
1406

    
1407
  def QueueJob(self, name, *ops):
1408
    """Record a job for later submit.
1409

1410
    @type name: string
1411
    @param name: a description of the job, will be used in WaitJobSet
1412
    """
1413
    self.queue.append((name, ops))
1414

    
1415
  def SubmitPending(self):
1416
    """Submit all pending jobs.
1417

1418
    """
1419
    results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
1420
    for ((status, data), (name, _)) in zip(results, self.queue):
1421
      self.jobs.append((status, data, name))
1422

    
1423
  def GetResults(self):
1424
    """Wait for and return the results of all jobs.
1425

1426
    @rtype: list
1427
    @return: list of tuples (success, job results), in the same order
1428
        as the submitted jobs; if a job has failed, instead of the result
1429
        there will be the error message
1430

1431
    """
1432
    if not self.jobs:
1433
      self.SubmitPending()
1434
    results = []
1435
    if self.verbose:
1436
      ok_jobs = [row[1] for row in self.jobs if row[0]]
1437
      if ok_jobs:
1438
        ToStdout("Submitted jobs %s", ", ".join(ok_jobs))
1439
    for submit_status, jid, name in self.jobs:
1440
      if not submit_status:
1441
        ToStderr("Failed to submit job for %s: %s", name, jid)
1442
        results.append((False, jid))
1443
        continue
1444
      if self.verbose:
1445
        ToStdout("Waiting for job %s for %s...", jid, name)
1446
      try:
1447
        job_result = PollJob(jid, cl=self.cl)
1448
        success = True
1449
      except (errors.GenericError, luxi.ProtocolError), err:
1450
        _, job_result = FormatError(err)
1451
        success = False
1452
        # the error message will always be shown, verbose or not
1453
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
1454

    
1455
      results.append((success, job_result))
1456
    return results
1457

    
1458
  def WaitOrShow(self, wait):
1459
    """Wait for job results or only print the job IDs.
1460

1461
    @type wait: boolean
1462
    @param wait: whether to wait or not
1463

1464
    """
1465
    if wait:
1466
      return self.GetResults()
1467
    else:
1468
      if not self.jobs:
1469
        self.SubmitPending()
1470
      for status, result, name in self.jobs:
1471
        if status:
1472
          ToStdout("%s: %s", result, name)
1473
        else:
1474
          ToStderr("Failure for %s: %s", name, result)