1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module dealing with command line parsing"""
23

    
24

    
25
import sys
26
import textwrap
27
import os.path
28
import time
29
import logging
30
from cStringIO import StringIO
31

    
32
from ganeti import utils
33
from ganeti import errors
34
from ganeti import constants
35
from ganeti import opcodes
36
from ganeti import luxi
37
from ganeti import ssconf
38
from ganeti import rpc
39
from ganeti import ssh
40

    
41
from optparse import (OptionParser, TitledHelpFormatter,
42
                      Option, OptionValueError)
43

    
44

    
45
__all__ = [
46
  # Command line options
47
  "ALLOCATABLE_OPT",
48
  "ALL_OPT",
49
  "AUTO_PROMOTE_OPT",
50
  "AUTO_REPLACE_OPT",
51
  "BACKEND_OPT",
52
  "CLEANUP_OPT",
53
  "CONFIRM_OPT",
54
  "CP_SIZE_OPT",
55
  "DEBUG_OPT",
56
  "DEBUG_SIMERR_OPT",
57
  "DISKIDX_OPT",
58
  "DISK_OPT",
59
  "DISK_TEMPLATE_OPT",
60
  "DRAINED_OPT",
61
  "EARLY_RELEASE_OPT",
62
  "ENABLED_HV_OPT",
63
  "ERROR_CODES_OPT",
64
  "FIELDS_OPT",
65
  "FILESTORE_DIR_OPT",
66
  "FILESTORE_DRIVER_OPT",
67
  "FORCE_OPT",
68
  "FORCE_VARIANT_OPT",
69
  "GLOBAL_FILEDIR_OPT",
70
  "HVLIST_OPT",
71
  "HVOPTS_OPT",
72
  "HYPERVISOR_OPT",
73
  "IALLOCATOR_OPT",
74
  "IGNORE_CONSIST_OPT",
75
  "IGNORE_FAILURES_OPT",
76
  "IGNORE_SECONDARIES_OPT",
77
  "IGNORE_SIZE_OPT",
78
  "MAC_PREFIX_OPT",
79
  "MASTER_NETDEV_OPT",
80
  "MC_OPT",
81
  "NET_OPT",
82
  "NEW_CLUSTER_CERT_OPT",
83
  "NEW_HMAC_KEY_OPT",
84
  "NEW_RAPI_CERT_OPT",
85
  "NEW_SECONDARY_OPT",
86
  "NIC_PARAMS_OPT",
87
  "NODE_LIST_OPT",
88
  "NODE_PLACEMENT_OPT",
89
  "NOHDR_OPT",
90
  "NOIPCHECK_OPT",
91
  "NONAMECHECK_OPT",
92
  "NOLVM_STORAGE_OPT",
93
  "NOMODIFY_ETCHOSTS_OPT",
94
  "NOMODIFY_SSH_SETUP_OPT",
95
  "NONICS_OPT",
96
  "NONLIVE_OPT",
97
  "NONPLUS1_OPT",
98
  "NOSHUTDOWN_OPT",
99
  "NOSTART_OPT",
100
  "NOSSH_KEYCHECK_OPT",
101
  "NOVOTING_OPT",
102
  "NWSYNC_OPT",
103
  "ON_PRIMARY_OPT",
104
  "ON_SECONDARY_OPT",
105
  "OFFLINE_OPT",
106
  "OS_OPT",
107
  "OS_SIZE_OPT",
108
  "RAPI_CERT_OPT",
109
  "READD_OPT",
110
  "REBOOT_TYPE_OPT",
111
  "SECONDARY_IP_OPT",
112
  "SELECT_OS_OPT",
113
  "SEP_OPT",
114
  "SHOWCMD_OPT",
115
  "SHUTDOWN_TIMEOUT_OPT",
116
  "SINGLE_NODE_OPT",
117
  "SRC_DIR_OPT",
118
  "SRC_NODE_OPT",
119
  "SUBMIT_OPT",
120
  "STATIC_OPT",
121
  "SYNC_OPT",
122
  "TAG_SRC_OPT",
123
  "TIMEOUT_OPT",
124
  "USEUNITS_OPT",
125
  "VERBOSE_OPT",
126
  "VG_NAME_OPT",
127
  "YES_DOIT_OPT",
128
  # Generic functions for CLI programs
129
  "GenericMain",
130
  "GenericInstanceCreate",
131
  "GetClient",
132
  "GetOnlineNodes",
133
  "JobExecutor",
134
  "JobSubmittedException",
135
  "ParseTimespec",
136
  "RunWhileClusterStopped",
137
  "SubmitOpCode",
138
  "SubmitOrSend",
139
  "UsesRPC",
140
  # Formatting functions
141
  "ToStderr", "ToStdout",
142
  "FormatError",
143
  "GenerateTable",
144
  "AskUser",
145
  "FormatTimestamp",
146
  # Tags functions
147
  "ListTags",
148
  "AddTags",
149
  "RemoveTags",
150
  # command line options support infrastructure
151
  "ARGS_MANY_INSTANCES",
152
  "ARGS_MANY_NODES",
153
  "ARGS_NONE",
154
  "ARGS_ONE_INSTANCE",
155
  "ARGS_ONE_NODE",
156
  "ARGS_ONE_OS",
157
  "ArgChoice",
158
  "ArgCommand",
159
  "ArgFile",
160
  "ArgHost",
161
  "ArgInstance",
162
  "ArgJobId",
163
  "ArgNode",
164
  "ArgOs",
165
  "ArgSuggest",
166
  "ArgUnknown",
167
  "OPT_COMPL_INST_ADD_NODES",
168
  "OPT_COMPL_MANY_NODES",
169
  "OPT_COMPL_ONE_IALLOCATOR",
170
  "OPT_COMPL_ONE_INSTANCE",
171
  "OPT_COMPL_ONE_NODE",
172
  "OPT_COMPL_ONE_OS",
173
  "cli_option",
174
  "SplitNodeOption",
175
  "CalculateOSNames",
176
  ]
177

    
178
NO_PREFIX = "no_"
179
UN_PREFIX = "-"
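# Illustrative note: in the key=value option strings parsed below, a "no_"
# prefix maps the key to False (e.g. "no_acpi" -> acpi=False) and a "-" prefix
# maps it to None (commonly meaning "reset to the default value"); see
# _SplitKeyVal below.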
180

    
181

    
182
class _Argument:
183
  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
184
    self.min = min
185
    self.max = max
186

    
187
  def __repr__(self):
188
    return ("<%s min=%s max=%s>" %
189
            (self.__class__.__name__, self.min, self.max))
190

    
191

    
192
class ArgSuggest(_Argument):
193
  """Suggesting argument.
194

195
  Value can be any of the ones passed to the constructor.
196

197
  """
198
  # pylint: disable-msg=W0622
199
  def __init__(self, min=0, max=None, choices=None):
200
    _Argument.__init__(self, min=min, max=max)
201
    self.choices = choices
202

    
203
  def __repr__(self):
204
    return ("<%s min=%s max=%s choices=%r>" %
205
            (self.__class__.__name__, self.min, self.max, self.choices))
206

    
207

    
208
class ArgChoice(ArgSuggest):
209
  """Choice argument.
210

211
  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
212
  but value must be one of the choices.
213

214
  """
215

    
216

    
217
class ArgUnknown(_Argument):
218
  """Unknown argument to program (e.g. determined at runtime).
219

220
  """
221

    
222

    
223
class ArgInstance(_Argument):
224
  """Instances argument.
225

226
  """
227

    
228

    
229
class ArgNode(_Argument):
230
  """Node argument.
231

232
  """
233

    
234
class ArgJobId(_Argument):
235
  """Job ID argument.
236

237
  """
238

    
239

    
240
class ArgFile(_Argument):
241
  """File path argument.
242

243
  """
244

    
245

    
246
class ArgCommand(_Argument):
247
  """Command argument.
248

249
  """
250

    
251

    
252
class ArgHost(_Argument):
253
  """Host argument.
254

255
  """
256

    
257

    
258
class ArgOs(_Argument):
259
  """OS argument.
260

261
  """
262

    
263

    
264
ARGS_NONE = []
265
ARGS_MANY_INSTANCES = [ArgInstance()]
266
ARGS_MANY_NODES = [ArgNode()]
267
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
268
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
269
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
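# Illustrative example (hypothetical command definition): a command that takes
# exactly one instance name followed by any number of free-form values could
# declare its arguments as
#   [ArgInstance(min=1, max=1), ArgUnknown()]
# _CheckArguments() below enforces the min/max counts at runtime.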
270

    
271

    
272
def _ExtractTagsObject(opts, args):
273
  """Extract the tag type object.
274

275
  Note that this function will modify its args parameter.
276

277
  """
278
  if not hasattr(opts, "tag_type"):
279
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
280
  kind = opts.tag_type
281
  if kind == constants.TAG_CLUSTER:
282
    retval = kind, kind
283
  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
284
    if not args:
285
      raise errors.OpPrereqError("no arguments passed to the command")
286
    name = args.pop(0)
287
    retval = kind, name
288
  else:
289
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
290
  return retval
291

    
292

    
293
def _ExtendTags(opts, args):
294
  """Extend the args if a source file has been given.
295

296
  This function will extend the tags with the contents of the file
297
  passed in the 'tags_source' attribute of the opts parameter. A file
298
  named '-' will be replaced by stdin.
299

300
  """
301
  fname = opts.tags_source
302
  if fname is None:
303
    return
304
  if fname == "-":
305
    new_fh = sys.stdin
306
  else:
307
    new_fh = open(fname, "r")
308
  new_data = []
309
  try:
310
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
311
    # because of python bug 1633941
312
    while True:
313
      line = new_fh.readline()
314
      if not line:
315
        break
316
      new_data.append(line.strip())
317
  finally:
318
    new_fh.close()
319
  args.extend(new_data)
320

    
321

    
322
def ListTags(opts, args):
323
  """List the tags on a given object.
324

325
  This is a generic implementation that knows how to deal with all
326
  three cases of tag objects (cluster, node, instance). The opts
327
  argument is expected to contain a tag_type field denoting what
328
  object type we work on.
329

330
  """
331
  kind, name = _ExtractTagsObject(opts, args)
332
  cl = GetClient()
333
  result = cl.QueryTags(kind, name)
334
  result = list(result)
335
  result.sort()
336
  for tag in result:
337
    ToStdout(tag)
338

    
339

    
340
def AddTags(opts, args):
341
  """Add tags on a given object.
342

343
  This is a generic implementation that knows how to deal with all
344
  three cases of tag objects (cluster, node, instance). The opts
345
  argument is expected to contain a tag_type field denoting what
346
  object type we work on.
347

348
  """
349
  kind, name = _ExtractTagsObject(opts, args)
350
  _ExtendTags(opts, args)
351
  if not args:
352
    raise errors.OpPrereqError("No tags to be added")
353
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
354
  SubmitOpCode(op)
355

    
356

    
357
def RemoveTags(opts, args):
358
  """Remove tags from a given object.
359

360
  This is a generic implementation that knows how to deal with all
361
  three cases of tag objects (cluster, node, instance). The opts
362
  argument is expected to contain a tag_type field denoting what
363
  object type we work on.
364

365
  """
366
  kind, name = _ExtractTagsObject(opts, args)
367
  _ExtendTags(opts, args)
368
  if not args:
369
    raise errors.OpPrereqError("No tags to be removed")
370
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
371
  SubmitOpCode(op)
372

    
373

    
374
def check_unit(option, opt, value): # pylint: disable-msg=W0613
375
  """OptParsers custom converter for units.
376

377
  """
378
  try:
379
    return utils.ParseUnit(value)
380
  except errors.UnitParseError, err:
381
    raise OptionValueError("option %s: %s" % (opt, err))
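# Illustrative behaviour, assuming utils.ParseUnit's MiB-based conventions:
# a "unit" option value such as "512" or "4g" is converted to an integer
# number of mebibytes (512 and 4096 respectively).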
382

    
383

    
384
def _SplitKeyVal(opt, data):
385
  """Convert a KeyVal string into a dict.
386

387
  This function will convert a key=val[,...] string into a dict. Empty
388
  values will be converted specially: keys which have the prefix 'no_'
389
  will have the value=False and the prefix stripped, the others will
390
  have value=True.
391

392
  @type opt: string
393
  @param opt: a string holding the option name for which we process the
394
      data, used in building error messages
395
  @type data: string
396
  @param data: a string of the format key=val,key=val,...
397
  @rtype: dict
398
  @return: {key=val, key=val}
399
  @raises errors.ParameterError: if there are duplicate keys
400

401
  """
402
  kv_dict = {}
403
  if data:
404
    for elem in utils.UnescapeAndSplit(data, sep=","):
405
      if "=" in elem:
406
        key, val = elem.split("=", 1)
407
      else:
408
        if elem.startswith(NO_PREFIX):
409
          key, val = elem[len(NO_PREFIX):], False
410
        elif elem.startswith(UN_PREFIX):
411
          key, val = elem[len(UN_PREFIX):], None
412
        else:
413
          key, val = elem, True
414
      if key in kv_dict:
415
        raise errors.ParameterError("Duplicate key '%s' in option %s" %
416
                                    (key, opt))
417
      kv_dict[key] = val
418
  return kv_dict
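# Illustrative example:
#   _SplitKeyVal("-H", "mem=512,no_acpi,-kernel_path")
# returns {"mem": "512", "acpi": False, "kernel_path": None}.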
419

    
420

    
421
def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
422
  """Custom parser for ident:key=val,key=val options.
423

424
  This will store the parsed values as a tuple (ident, {key: val}). As such,
425
  multiple uses of this option via action=append is possible.
426

427
  """
428
  if ":" not in value:
429
    ident, rest = value, ''
430
  else:
431
    ident, rest = value.split(":", 1)
432

    
433
  if ident.startswith(NO_PREFIX):
434
    if rest:
435
      msg = "Cannot pass options when removing parameter groups: %s" % value
436
      raise errors.ParameterError(msg)
437
    retval = (ident[len(NO_PREFIX):], False)
438
  elif ident.startswith(UN_PREFIX):
439
    if rest:
440
      msg = "Cannot pass options when removing parameter groups: %s" % value
441
      raise errors.ParameterError(msg)
442
    retval = (ident[len(UN_PREFIX):], None)
443
  else:
444
    kv_dict = _SplitKeyVal(opt, rest)
445
    retval = (ident, kv_dict)
446
  return retval
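# Illustrative example:
#   check_ident_key_val(None, "--net", "0:ip=1.2.3.4,mode=bridged")
# returns ("0", {"ip": "1.2.3.4", "mode": "bridged"}), while the values
# "no_0" and "-0" return ("0", False) and ("0", None) respectively.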
447

    
448

    
449
def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
450
  """Custom parser class for key=val,key=val options.
451

452
  This will store the parsed values as a dict {key: val}.
453

454
  """
455
  return _SplitKeyVal(opt, value)
456

    
457

    
458
# completion_suggest is normally a list. Numeric values that do not evaluate
459
# to False are used for dynamic completion.
460
(OPT_COMPL_MANY_NODES,
461
 OPT_COMPL_ONE_NODE,
462
 OPT_COMPL_ONE_INSTANCE,
463
 OPT_COMPL_ONE_OS,
464
 OPT_COMPL_ONE_IALLOCATOR,
465
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)
466

    
467
OPT_COMPL_ALL = frozenset([
468
  OPT_COMPL_MANY_NODES,
469
  OPT_COMPL_ONE_NODE,
470
  OPT_COMPL_ONE_INSTANCE,
471
  OPT_COMPL_ONE_OS,
472
  OPT_COMPL_ONE_IALLOCATOR,
473
  OPT_COMPL_INST_ADD_NODES,
474
  ])
475

    
476

    
477
class CliOption(Option):
478
  """Custom option class for optparse.
479

480
  """
481
  ATTRS = Option.ATTRS + [
482
    "completion_suggest",
483
    ]
484
  TYPES = Option.TYPES + (
485
    "identkeyval",
486
    "keyval",
487
    "unit",
488
    )
489
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
490
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
491
  TYPE_CHECKER["keyval"] = check_key_val
492
  TYPE_CHECKER["unit"] = check_unit
493

    
494

    
495
# optparse.py sets make_option, so we do it for our own option class, too
496
cli_option = CliOption
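# Illustrative use of the custom option types (mirroring the options defined
# below): declaring
#   cli_option("-B", "--backend-parameters", dest="beparams",
#              type="keyval", default={})
# makes optparse run check_key_val, so "-B memory=512,auto_balance" is parsed
# into {"memory": "512", "auto_balance": True}.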
497

    
498

    
499
_YESNO = ("yes", "no")
500
_YORNO = "yes|no"
501

    
502
DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
503
                       help="Increase debugging level")
504

    
505
NOHDR_OPT = cli_option("--no-headers", default=False,
506
                       action="store_true", dest="no_headers",
507
                       help="Don't display column headers")
508

    
509
SEP_OPT = cli_option("--separator", default=None,
510
                     action="store", dest="separator",
511
                     help=("Separator between output fields"
512
                           " (defaults to one space)"))
513

    
514
USEUNITS_OPT = cli_option("--units", default=None,
515
                          dest="units", choices=('h', 'm', 'g', 't'),
516
                          help="Specify units for output (one of hmgt)")
517

    
518
FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
519
                        type="string", metavar="FIELDS",
520
                        help="Comma separated list of output fields")
521

    
522
FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
523
                       default=False, help="Force the operation")
524

    
525
CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
526
                         default=False, help="Do not require confirmation")
527

    
528
TAG_SRC_OPT = cli_option("--from", dest="tags_source",
529
                         default=None, help="File with tag names")
530

    
531
SUBMIT_OPT = cli_option("--submit", dest="submit_only",
532
                        default=False, action="store_true",
533
                        help=("Submit the job and return the job ID, but"
534
                              " don't wait for the job to finish"))
535

    
536
SYNC_OPT = cli_option("--sync", dest="do_locking",
537
                      default=False, action="store_true",
538
                      help=("Grab locks while doing the queries"
539
                            " in order to ensure more consistent results"))
540

    
541
_DRY_RUN_OPT = cli_option("--dry-run", default=False,
542
                          action="store_true",
543
                          help=("Do not execute the operation, just run the"
544
                                " check steps and verify it it could be"
545
                                " executed"))
546

    
547
VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
548
                         action="store_true",
549
                         help="Increase the verbosity of the operation")
550

    
551
DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
552
                              action="store_true", dest="simulate_errors",
553
                              help="Debugging option that makes the operation"
554
                              " treat most runtime checks as failed")
555

    
556
NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
557
                        default=True, action="store_false",
558
                        help="Don't wait for sync (DANGEROUS!)")
559

    
560
DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
561
                               help="Custom disk setup (diskless, file,"
562
                               " plain or drbd)",
563
                               default=None, metavar="TEMPL",
564
                               choices=list(constants.DISK_TEMPLATES))
565

    
566
NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
567
                        help="Do not create any network cards for"
568
                        " the instance")
569

    
570
FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
571
                               help="Relative path under default cluster-wide"
572
                               " file storage dir to store file-based disks",
573
                               default=None, metavar="<DIR>")
574

    
575
FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
576
                                  help="Driver to use for image files",
577
                                  default="loop", metavar="<DRIVER>",
578
                                  choices=list(constants.FILE_DRIVER))
579

    
580
IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
581
                            help="Select nodes for the instance automatically"
582
                            " using the <NAME> iallocator plugin",
583
                            default=None, type="string",
584
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
585

    
586
OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
587
                    metavar="<os>",
588
                    completion_suggest=OPT_COMPL_ONE_OS)
589

    
590
FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
591
                               action="store_true", default=False,
592
                               help="Force an unknown variant")
593

    
594
BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
595
                         type="keyval", default={},
596
                         help="Backend parameters")
597

    
598
HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
599
                         default={}, dest="hvparams",
600
                         help="Hypervisor parameters")
601

    
602
HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
603
                            help="Hypervisor and hypervisor options, in the"
604
                            " format hypervisor:option=value,option=value,...",
605
                            default=None, type="identkeyval")
606

    
607
HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
608
                        help="Hypervisor and hypervisor options, in the"
609
                        " format hypervisor:option=value,option=value,...",
610
                        default=[], action="append", type="identkeyval")
611

    
612
NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
613
                           action="store_false",
614
                           help="Don't check that the instance's IP"
615
                           " is alive")
616

    
617
NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
618
                             default=True, action="store_false",
619
                             help="Don't check that the instance's name"
620
                             " is resolvable")
621

    
622
NET_OPT = cli_option("--net",
623
                     help="NIC parameters", default=[],
624
                     dest="nics", action="append", type="identkeyval")
625

    
626
DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
627
                      dest="disks", action="append", type="identkeyval")
628

    
629
DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
630
                         help="Comma-separated list of disks"
631
                         " indices to act on (e.g. 0,2) (optional,"
632
                         " defaults to all disks)")
633

    
634
OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
635
                         help="Enforces a single-disk configuration using the"
636
                         " given disk size, in MiB unless a suffix is used",
637
                         default=None, type="unit", metavar="<size>")
638

    
639
IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
640
                                dest="ignore_consistency",
641
                                action="store_true", default=False,
642
                                help="Ignore the consistency of the disks on"
643
                                " the secondary")
644

    
645
NONLIVE_OPT = cli_option("--non-live", dest="live",
646
                         default=True, action="store_false",
647
                         help="Do a non-live migration (this usually means"
648
                         " freeze the instance, save the state, transfer and"
649
                         " only then resume running on the secondary node)")
650

    
651
NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
652
                                help="Target node and optional secondary node",
653
                                metavar="<pnode>[:<snode>]",
654
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)
655

    
656
NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
657
                           action="append", metavar="<node>",
658
                           help="Use only this node (can be used multiple"
659
                           " times, if not given defaults to all nodes)",
660
                           completion_suggest=OPT_COMPL_ONE_NODE)
661

    
662
SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
663
                             metavar="<node>",
664
                             completion_suggest=OPT_COMPL_ONE_NODE)
665

    
666
NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
667
                         action="store_false",
668
                         help="Don't start the instance after creation")
669

    
670
SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
671
                         action="store_true", default=False,
672
                         help="Show command instead of executing it")
673

    
674
CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
675
                         default=False, action="store_true",
676
                         help="Instead of performing the migration, try to"
677
                         " recover from a failed cleanup. This is safe"
678
                         " to run even if the instance is healthy, but it"
679
                         " will create extra replication traffic and "
680
                         " disrupt briefly the replication (like during the"
681
                         " migration")
682

    
683
STATIC_OPT = cli_option("-s", "--static", dest="static",
684
                        action="store_true", default=False,
685
                        help="Only show configuration data, not runtime data")
686

    
687
ALL_OPT = cli_option("--all", dest="show_all",
688
                     default=False, action="store_true",
689
                     help="Show info on all instances on the cluster."
690
                     " This can take a long time to run, use wisely")
691

    
692
SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
693
                           action="store_true", default=False,
694
                           help="Interactive OS reinstall, lists available"
695
                           " OS templates for selection")
696

    
697
IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
698
                                 action="store_true", default=False,
699
                                 help="Remove the instance from the cluster"
700
                                 " configuration even if there are failures"
701
                                 " during the removal process")
702

    
703
NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
704
                               help="Specifies the new secondary node",
705
                               metavar="NODE", default=None,
706
                               completion_suggest=OPT_COMPL_ONE_NODE)
707

    
708
ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
709
                            default=False, action="store_true",
710
                            help="Replace the disk(s) on the primary"
711
                            " node (only for the drbd template)")
712

    
713
ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
714
                              default=False, action="store_true",
715
                              help="Replace the disk(s) on the secondary"
716
                              " node (only for the drbd template)")
717

    
718
AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
719
                              default=False, action="store_true",
720
                              help="Lock all nodes and auto-promote as needed"
721
                              " to MC status")
722

    
723
AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
724
                              default=False, action="store_true",
725
                              help="Automatically replace faulty disks"
726
                              " (only for the drbd template)")
727

    
728
IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
729
                             default=False, action="store_true",
730
                             help="Ignore current recorded size"
731
                             " (useful for forcing activation when"
732
                             " the recorded size is wrong)")
733

    
734
SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
735
                          metavar="<node>",
736
                          completion_suggest=OPT_COMPL_ONE_NODE)
737

    
738
SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
739
                         metavar="<dir>")
740

    
741
SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
742
                              help="Specify the secondary ip for the node",
743
                              metavar="ADDRESS", default=None)
744

    
745
READD_OPT = cli_option("--readd", dest="readd",
746
                       default=False, action="store_true",
747
                       help="Readd old node after replacing it")
748

    
749
NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
750
                                default=True, action="store_false",
751
                                help="Disable SSH key fingerprint checking")
752

    
753

    
754
MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
755
                    choices=_YESNO, default=None, metavar=_YORNO,
756
                    help="Set the master_candidate flag on the node")
757

    
758
OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
759
                         choices=_YESNO, default=None,
760
                         help="Set the offline flag on the node")
761

    
762
DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
763
                         choices=_YESNO, default=None,
764
                         help="Set the drained flag on the node")
765

    
766
ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
767
                             choices=_YESNO, default=None, metavar=_YORNO,
768
                             help="Set the allocatable flag on a volume")
769

    
770
NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
771
                               help="Disable support for lvm based instances"
772
                               " (cluster-wide)",
773
                               action="store_false", default=True)
774

    
775
ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
776
                            dest="enabled_hypervisors",
777
                            help="Comma-separated list of hypervisors",
778
                            type="string", default=None)
779

    
780
NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
781
                            type="keyval", default={},
782
                            help="NIC parameters")
783

    
784
CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
785
                         dest="candidate_pool_size", type="int",
786
                         help="Set the candidate pool size")
787

    
788
VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
789
                         help="Enables LVM and specifies the volume group"
790
                         " name (cluster-wide) for disk allocation [xenvg]",
791
                         metavar="VG", default=None)
792

    
793
YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
794
                          help="Destroy cluster", action="store_true")
795

    
796
NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
797
                          help="Skip node agreement check (dangerous)",
798
                          action="store_true", default=False)
799

    
800
MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
801
                            help="Specify the mac prefix for the instance IP"
802
                            " addresses, in the format XX:XX:XX",
803
                            metavar="PREFIX",
804
                            default=None)
805

    
806
MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
807
                               help="Specify the node interface (cluster-wide)"
808
                               " on which the master IP address will be added "
809
                               " [%s]" % constants.DEFAULT_BRIDGE,
810
                               metavar="NETDEV",
811
                               default=constants.DEFAULT_BRIDGE)
812

    
813

    
814
GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
815
                                help="Specify the default directory (cluster-"
816
                                "wide) for storing the file-based disks [%s]" %
817
                                constants.DEFAULT_FILE_STORAGE_DIR,
818
                                metavar="DIR",
819
                                default=constants.DEFAULT_FILE_STORAGE_DIR)
820

    
821
NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
822
                                   help="Don't modify /etc/hosts",
823
                                   action="store_false", default=True)
824

    
825
NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
826
                                    help="Don't initialize SSH keys",
827
                                    action="store_false", default=True)
828

    
829
ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
830
                             help="Enable parseable error messages",
831
                             action="store_true", default=False)
832

    
833
NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
834
                          help="Skip N+1 memory redundancy tests",
835
                          action="store_true", default=False)
836

    
837
REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
838
                             help="Type of reboot: soft/hard/full",
839
                             default=constants.INSTANCE_REBOOT_HARD,
840
                             metavar="<REBOOT>",
841
                             choices=list(constants.REBOOT_TYPES))
842

    
843
IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
844
                                    dest="ignore_secondaries",
845
                                    default=False, action="store_true",
846
                                    help="Ignore errors from secondaries")
847

    
848
NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
849
                            action="store_false", default=True,
850
                            help="Don't shutdown the instance (unsafe)")
851

    
852
TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
853
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
854
                         help="Maximum time to wait")
855

    
856
SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
857
                         dest="shutdown_timeout", type="int",
858
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
859
                         help="Maximum time to wait for instance shutdown")
860

    
861
EARLY_RELEASE_OPT = cli_option("--early-release",
862
                               dest="early_release", default=False,
863
                               action="store_true",
864
                               help="Release the locks on the secondary"
865
                               " node(s) early")
866

    
867
NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
868
                                  dest="new_cluster_cert",
869
                                  default=False, action="store_true",
870
                                  help="Generate a new cluster certificate")
871

    
872
RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
873
                           default=None,
874
                           help="File containing new RAPI certificate")
875

    
876
NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
877
                               default=None, action="store_true",
878
                               help=("Generate a new self-signed RAPI"
879
                                     " certificate"))
880

    
881
NEW_HMAC_KEY_OPT = cli_option("--new-hmac-key", dest="new_hmac_key",
882
                              default=False, action="store_true",
883
                              help="Create a new HMAC key")
884

    
885

    
886
def _ParseArgs(argv, commands, aliases):
887
  """Parser for the command line arguments.
888

889
  This function parses the arguments and returns the function which
890
  must be executed together with its (modified) arguments.
891

892
  @param argv: the command line
893
  @param commands: dictionary with special contents, see the design
894
      doc for cmdline handling
895
  @param aliases: dictionary with command aliases {'alias': 'target', ...}
896

897
  """
898
  if len(argv) == 0:
899
    binary = "<command>"
900
  else:
901
    binary = argv[0].split("/")[-1]
902

    
903
  if len(argv) > 1 and argv[1] == "--version":
904
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
905
    # Quit right away. That way we don't have to care about this special
906
    # argument. optparse.py does the same.
907
    sys.exit(0)
908

    
909
  if len(argv) < 2 or not (argv[1] in commands or
910
                           argv[1] in aliases):
911
    # let's do a nice thing
912
    sortedcmds = commands.keys()
913
    sortedcmds.sort()
914

    
915
    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
916
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
917
    ToStdout("")
918

    
919
    # compute the max line length for cmd + usage
920
    mlen = max([len(" %s" % cmd) for cmd in commands])
921
    mlen = min(60, mlen) # cap the width at 60; should not be reached in practice
922

    
923
    # and format a nice command list
924
    ToStdout("Commands:")
925
    for cmd in sortedcmds:
926
      cmdstr = " %s" % (cmd,)
927
      help_text = commands[cmd][4]
928
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
929
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
930
      for line in help_lines:
931
        ToStdout("%-*s   %s", mlen, "", line)
932

    
933
    ToStdout("")
934

    
935
    return None, None, None
936

    
937
  # get command, unalias it, and look it up in commands
938
  cmd = argv.pop(1)
939
  if cmd in aliases:
940
    if cmd in commands:
941
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
942
                                   " command" % cmd)
943

    
944
    if aliases[cmd] not in commands:
945
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
946
                                   " command '%s'" % (cmd, aliases[cmd]))
947

    
948
    cmd = aliases[cmd]
949

    
950
  func, args_def, parser_opts, usage, description = commands[cmd]
951
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
952
                        description=description,
953
                        formatter=TitledHelpFormatter(),
954
                        usage="%%prog %s %s" % (cmd, usage))
955
  parser.disable_interspersed_args()
956
  options, args = parser.parse_args()
957

    
958
  if not _CheckArguments(cmd, args_def, args):
959
    return None, None, None
960

    
961
  return func, options, args
962

    
963

    
964
def _CheckArguments(cmd, args_def, args):
965
  """Verifies the arguments using the argument definition.
966

967
  Algorithm:
968

969
    1. Abort with error if values specified by user but none expected.
970

971
    1. For each argument in definition
972

973
      1. Keep running count of minimum number of values (min_count)
974
      1. Keep running count of maximum number of values (max_count)
975
      1. If it has an unlimited number of values
976

977
        1. Abort with error if it's not the last argument in the definition
978

979
    1. If last argument has limited number of values
980

981
      1. Abort with error if number of values doesn't match or is too large
982

983
    1. Abort with error if user didn't pass enough values (min_count)
984

985
  """
986
  if args and not args_def:
987
    ToStderr("Error: Command %s expects no arguments", cmd)
988
    return False
989

    
990
  min_count = None
991
  max_count = None
992
  check_max = None
993

    
994
  last_idx = len(args_def) - 1
995

    
996
  for idx, arg in enumerate(args_def):
997
    if min_count is None:
998
      min_count = arg.min
999
    elif arg.min is not None:
1000
      min_count += arg.min
1001

    
1002
    if max_count is None:
1003
      max_count = arg.max
1004
    elif arg.max is not None:
1005
      max_count += arg.max
1006

    
1007
    if idx == last_idx:
1008
      check_max = (arg.max is not None)
1009

    
1010
    elif arg.max is None:
1011
      raise errors.ProgrammerError("Only the last argument can have max=None")
1012

    
1013
  if check_max:
1014
    # Command with exact number of arguments
1015
    if (min_count is not None and max_count is not None and
1016
        min_count == max_count and len(args) != min_count):
1017
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1018
      return False
1019

    
1020
    # Command with limited number of arguments
1021
    if max_count is not None and len(args) > max_count:
1022
      ToStderr("Error: Command %s expects only %d argument(s)",
1023
               cmd, max_count)
1024
      return False
1025

    
1026
  # Command with some required arguments
1027
  if min_count is not None and len(args) < min_count:
1028
    ToStderr("Error: Command %s expects at least %d argument(s)",
1029
             cmd, min_count)
1030
    return False
1031

    
1032
  return True
1033

    
1034

    
1035
def SplitNodeOption(value):
1036
  """Splits the value of a --node option.
1037

1038
  """
1039
  if value and ':' in value:
1040
    return value.split(':', 1)
1041
  else:
1042
    return (value, None)
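# Illustrative examples:
#   SplitNodeOption("node1.example.com:node2.example.com")
#     -> ["node1.example.com", "node2.example.com"]
#   SplitNodeOption("node1.example.com") -> ("node1.example.com", None)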
1043

    
1044

    
1045
def CalculateOSNames(os_name, os_variants):
1046
  """Calculates all the names an OS can be called, according to its variants.
1047

1048
  @type os_name: string
1049
  @param os_name: base name of the os
1050
  @type os_variants: list or None
1051
  @param os_variants: list of supported variants
1052
  @rtype: list
1053
  @return: list of valid names
1054

1055
  """
1056
  if os_variants:
1057
    return ['%s+%s' % (os_name, v) for v in os_variants]
1058
  else:
1059
    return [os_name]
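# Illustrative example:
#   CalculateOSNames("debootstrap", ["etch", "lenny"])
#     -> ["debootstrap+etch", "debootstrap+lenny"]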
1060

    
1061

    
1062
def UsesRPC(fn):
1063
  def wrapper(*args, **kwargs):
1064
    rpc.Init()
1065
    try:
1066
      return fn(*args, **kwargs)
1067
    finally:
1068
      rpc.Shutdown()
1069
  return wrapper
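# Illustrative usage (hypothetical client function): decorating an entry point
# that needs the RPC layer ensures rpc.Init() runs before it and rpc.Shutdown()
# runs afterwards, even if the function raises:
#   @UsesRPC
#   def InitCluster(opts, args):
#     ...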
1070

    
1071

    
1072
def AskUser(text, choices=None):
1073
  """Ask the user a question.
1074

1075
  @param text: the question to ask
1076

1077
  @param choices: list of tuples (input_char, return_value,
1078
      description); if not given, it will default to: [('y', True,
1079
      'Perform the operation'), ('n', False, 'Do not perform the operation')];
1080
      note that the '?' char is reserved for help
1081

1082
  @return: one of the return values from the choices list; if input is
1083
      not possible (i.e. not running with a tty), we return the last
1084
      entry from the list
1085

1086
  """
1087
  if choices is None:
1088
    choices = [('y', True, 'Perform the operation'),
1089
               ('n', False, 'Do not perform the operation')]
1090
  if not choices or not isinstance(choices, list):
1091
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
1092
  for entry in choices:
1093
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
1094
      raise errors.ProgrammerError("Invalid choices element to AskUser")
1095

    
1096
  answer = choices[-1][1]
1097
  new_text = []
1098
  for line in text.splitlines():
1099
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
1100
  text = "\n".join(new_text)
1101
  try:
1102
    f = file("/dev/tty", "a+")
1103
  except IOError:
1104
    return answer
1105
  try:
1106
    chars = [entry[0] for entry in choices]
1107
    chars[-1] = "[%s]" % chars[-1]
1108
    chars.append('?')
1109
    maps = dict([(entry[0], entry[1]) for entry in choices])
1110
    while True:
1111
      f.write(text)
1112
      f.write('\n')
1113
      f.write("/".join(chars))
1114
      f.write(": ")
1115
      line = f.readline(2).strip().lower()
1116
      if line in maps:
1117
        answer = maps[line]
1118
        break
1119
      elif line == '?':
1120
        for entry in choices:
1121
          f.write(" %s - %s\n" % (entry[0], entry[2]))
1122
        f.write("\n")
1123
        continue
1124
  finally:
1125
    f.close()
1126
  return answer
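# Illustrative usage:
#   if not AskUser("Continue with the removal of node %s?" % node):
#     return 1
# With the default choices, 'y' returns True and 'n' (also the fallback when
# no tty is available) returns False.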
1127

    
1128

    
1129
class JobSubmittedException(Exception):
1130
  """Job was submitted, client should exit.
1131

1132
  This exception has one argument, the ID of the job that was
1133
  submitted. The handler should print this ID.
1134

1135
  This is not an error, just a structured way to exit from clients.
1136

1137
  """
1138

    
1139

    
1140
def SendJob(ops, cl=None):
1141
  """Function to submit an opcode without waiting for the results.
1142

1143
  @type ops: list
1144
  @param ops: list of opcodes
1145
  @type cl: luxi.Client
1146
  @param cl: the luxi client to use for communicating with the master;
1147
             if None, a new client will be created
1148

1149
  """
1150
  if cl is None:
1151
    cl = GetClient()
1152

    
1153
  job_id = cl.SubmitJob(ops)
1154

    
1155
  return job_id
1156

    
1157

    
1158
def PollJob(job_id, cl=None, feedback_fn=None):
1159
  """Function to poll for the result of a job.
1160

1161
  @type job_id: job identifier
1162
  @param job_id: the job to poll for results
1163
  @type cl: luxi.Client
1164
  @param cl: the luxi client to use for communicating with the master;
1165
             if None, a new client will be created
1166

1167
  """
1168
  if cl is None:
1169
    cl = GetClient()
1170

    
1171
  prev_job_info = None
1172
  prev_logmsg_serial = None
1173

    
1174
  status = None
1175

    
1176
  notified_queued = False
1177
  notified_waitlock = False
1178

    
1179
  while True:
1180
    result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
1181
                                     prev_logmsg_serial)
1182
    if not result:
1183
      # job not found, go away!
1184
      raise errors.JobLost("Job with id %s lost" % job_id)
1185
    elif result == constants.JOB_NOTCHANGED:
1186
      if status is not None and not callable(feedback_fn):
1187
        if status == constants.JOB_STATUS_QUEUED and not notified_queued:
1188
          ToStderr("Job %s is waiting in queue", job_id)
1189
          notified_queued = True
1190
        elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
1191
          ToStderr("Job %s is trying to acquire all necessary locks", job_id)
1192
          notified_waitlock = True
1193

    
1194
      # Wait again
1195
      continue
1196

    
1197
    # Split result, a tuple of (field values, log entries)
1198
    (job_info, log_entries) = result
1199
    (status, ) = job_info
1200

    
1201
    if log_entries:
1202
      for log_entry in log_entries:
1203
        (serial, timestamp, _, message) = log_entry
1204
        if callable(feedback_fn):
1205
          feedback_fn(log_entry[1:])
1206
        else:
1207
          encoded = utils.SafeEncode(message)
1208
          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
1209
        prev_logmsg_serial = max(prev_logmsg_serial, serial)
1210

    
1211
    # TODO: Handle canceled and archived jobs
1212
    elif status in (constants.JOB_STATUS_SUCCESS,
1213
                    constants.JOB_STATUS_ERROR,
1214
                    constants.JOB_STATUS_CANCELING,
1215
                    constants.JOB_STATUS_CANCELED):
1216
      break
1217

    
1218
    prev_job_info = job_info
1219

    
1220
  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
1221
  if not jobs:
1222
    raise errors.JobLost("Job with id %s lost" % job_id)
1223

    
1224
  status, opstatus, result = jobs[0]
1225
  if status == constants.JOB_STATUS_SUCCESS:
1226
    return result
1227
  elif status in (constants.JOB_STATUS_CANCELING,
1228
                  constants.JOB_STATUS_CANCELED):
1229
    raise errors.OpExecError("Job was canceled")
1230
  else:
1231
    has_ok = False
1232
    for idx, (status, msg) in enumerate(zip(opstatus, result)):
1233
      if status == constants.OP_STATUS_SUCCESS:
1234
        has_ok = True
1235
      elif status == constants.OP_STATUS_ERROR:
1236
        errors.MaybeRaise(msg)
1237
        if has_ok:
1238
          raise errors.OpExecError("partial failure (opcode %d): %s" %
1239
                                   (idx, msg))
1240
        else:
1241
          raise errors.OpExecError(str(msg))
1242
    # default failure mode
1243
    raise errors.OpExecError(result)
1244

    
1245

    
1246
def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
1247
  """Legacy function to submit an opcode.
1248

1249
  This is just a simple wrapper around SendJob and PollJob. It should be
1250
  extended to better handle feedback and
1251
  interaction functions.
1252

1253
  """
1254
  if cl is None:
1255
    cl = GetClient()
1256

    
1257
  SetGenericOpcodeOpts([op], opts)
1258

    
1259
  job_id = SendJob([op], cl)
1260

    
1261
  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
1262

    
1263
  return op_results[0]
1264

    
1265

    
1266
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
1267
  """Wrapper around SubmitOpCode or SendJob.
1268

1269
  This function will decide, based on the 'opts' parameter, whether to
1270
  submit and wait for the result of the opcode (and return it), or
1271
  whether to just send the job and print its identifier. It is used in
1272
  order to simplify the implementation of the '--submit' option.
1273

1274
  It will also process the opcodes if we're sending them via SendJob
1275
  (otherwise SubmitOpCode does it).
1276

1277
  """
1278
  if opts and opts.submit_only:
1279
    job = [op]
1280
    SetGenericOpcodeOpts(job, opts)
1281
    job_id = SendJob(job, cl=cl)
1282
    raise JobSubmittedException(job_id)
1283
  else:
1284
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
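# Illustrative usage from a command implementation (hypothetical opcode
# arguments):
#   op = opcodes.OpRebootInstance(instance_name=args[0],
#                                 reboot_type=opts.reboot_type)
#   SubmitOrSend(op, opts)
# With --submit this raises JobSubmittedException(job_id), which FormatError()
# turns into a "JobID: ..." message and exit code 0.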
1285

    
1286

    
1287
def SetGenericOpcodeOpts(opcode_list, options):
1288
  """Processor for generic options.
1289

1290
  This function updates the given opcodes based on generic command
1291
  line options (like debug, dry-run, etc.).
1292

1293
  @param opcode_list: list of opcodes
1294
  @param options: command line options or None
1295
  @return: None (in-place modification)
1296

1297
  """
1298
  if not options:
1299
    return
1300
  for op in opcode_list:
1301
    op.dry_run = options.dry_run
1302
    op.debug_level = options.debug
1303

    
1304

    
1305
def GetClient():
1306
  # TODO: Cache object?
1307
  try:
1308
    client = luxi.Client()
1309
  except luxi.NoMasterError:
1310
    ss = ssconf.SimpleStore()
1311

    
1312
    # Try to read ssconf file
1313
    try:
1314
      ss.GetMasterNode()
1315
    except errors.ConfigurationError:
1316
      raise errors.OpPrereqError("Cluster not initialized or this machine is"
1317
                                 " not part of a cluster")
1318

    
1319
    master, myself = ssconf.GetMasterAndMyself(ss=ss)
1320
    if master != myself:
1321
      raise errors.OpPrereqError("This is not the master node, please connect"
1322
                                 " to node '%s' and rerun the command" %
1323
                                 master)
1324
    raise
1325
  return client
1326

    
1327

    
1328
def FormatError(err):
1329
  """Return a formatted error message for a given error.
1330

1331
  This function takes an exception instance and returns a tuple
1332
  consisting of two values: first, the recommended exit code, and
1333
  second, a string describing the error message (not
1334
  newline-terminated).
1335

1336
  """
1337
  retcode = 1
1338
  obuf = StringIO()
1339
  msg = str(err)
1340
  if isinstance(err, errors.ConfigurationError):
1341
    txt = "Corrupt configuration file: %s" % msg
1342
    logging.error(txt)
1343
    obuf.write(txt + "\n")
1344
    obuf.write("Aborting.")
1345
    retcode = 2
1346
  elif isinstance(err, errors.HooksAbort):
1347
    obuf.write("Failure: hooks execution failed:\n")
1348
    for node, script, out in err.args[0]:
1349
      if out:
1350
        obuf.write("  node: %s, script: %s, output: %s\n" %
1351
                   (node, script, out))
1352
      else:
1353
        obuf.write("  node: %s, script: %s (no output)\n" %
1354
                   (node, script))
1355
  elif isinstance(err, errors.HooksFailure):
1356
    obuf.write("Failure: hooks general failure: %s" % msg)
1357
  elif isinstance(err, errors.ResolverError):
1358
    this_host = utils.HostInfo.SysName()
1359
    if err.args[0] == this_host:
1360
      msg = "Failure: can't resolve my own hostname ('%s')"
1361
    else:
1362
      msg = "Failure: can't resolve hostname '%s'"
1363
    obuf.write(msg % err.args[0])
1364
  elif isinstance(err, errors.OpPrereqError):
1365
    if len(err.args) == 2:
1366
      obuf.write("Failure: prerequisites not met for this"
1367
               " operation:\nerror type: %s, error details:\n%s" %
1368
                 (err.args[1], err.args[0]))
1369
    else:
1370
      obuf.write("Failure: prerequisites not met for this"
1371
                 " operation:\n%s" % msg)
1372
  elif isinstance(err, errors.OpExecError):
1373
    obuf.write("Failure: command execution error:\n%s" % msg)
1374
  elif isinstance(err, errors.TagError):
1375
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
1376
  elif isinstance(err, errors.JobQueueDrainError):
1377
    obuf.write("Failure: the job queue is marked for drain and doesn't"
1378
               " accept new requests\n")
1379
  elif isinstance(err, errors.JobQueueFull):
1380
    obuf.write("Failure: the job queue is full and doesn't accept new"
1381
               " job submissions until old jobs are archived\n")
1382
  elif isinstance(err, errors.TypeEnforcementError):
1383
    obuf.write("Parameter Error: %s" % msg)
1384
  elif isinstance(err, errors.ParameterError):
1385
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
1386
  elif isinstance(err, errors.GenericError):
1387
    obuf.write("Unhandled Ganeti error: %s" % msg)
1388
  elif isinstance(err, luxi.NoMasterError):
1389
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
1390
               " and listening for connections?")
1391
  elif isinstance(err, luxi.TimeoutError):
1392
    obuf.write("Timeout while talking to the master daemon. Error:\n"
1393
               "%s" % msg)
1394
  elif isinstance(err, luxi.ProtocolError):
1395
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
1396
               "%s" % msg)
1397
  elif isinstance(err, JobSubmittedException):
1398
    obuf.write("JobID: %s\n" % err.args[0])
1399
    retcode = 0
1400
  else:
1401
    obuf.write("Unhandled exception: %s" % msg)
1402
  return retcode, obuf.getvalue().rstrip('\n')
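# Illustrative usage (the same pattern GenericMain uses below):
#   retcode, err_msg = FormatError(err)
#   ToStderr(err_msg)
#   sys.exit(retcode)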
1403

    
1404

    
1405
def GenericMain(commands, override=None, aliases=None):
1406
  """Generic main function for all the gnt-* commands.
1407

1408
  Arguments:
1409
    - commands: a dictionary with a special structure, see the design doc
1410
                for command line handling.
1411
    - override: if not None, we expect a dictionary with keys that will
1412
                override command line options; this can be used to pass
1413
                options from the scripts to generic functions
1414
    - aliases: dictionary with command aliases {'alias': 'target', ...}
1415

1416
  """
1417
  # save the program name and the entire command line for later logging
1418
  if sys.argv:
1419
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1420
    if len(sys.argv) >= 2:
1421
      binary += " " + sys.argv[1]
1422
      old_cmdline = " ".join(sys.argv[2:])
1423
    else:
1424
      old_cmdline = ""
1425
  else:
1426
    binary = "<unknown program>"
1427
    old_cmdline = ""
1428

    
1429
  if aliases is None:
1430
    aliases = {}
1431

    
1432
  try:
1433
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
1434
  except errors.ParameterError, err:
1435
    result, err_msg = FormatError(err)
1436
    ToStderr(err_msg)
1437
    return 1
1438

    
1439
  if func is None: # parse error
1440
    return 1
1441

    
1442
  if override is not None:
1443
    for key, val in override.iteritems():
1444
      setattr(options, key, val)
1445

    
1446
  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1447
                     stderr_logging=True, program=binary)
1448

    
1449
  if old_cmdline:
1450
    logging.info("run with arguments '%s'", old_cmdline)
1451
  else:
1452
    logging.info("run with no arguments")
1453

    
1454
  try:
1455
    result = func(options, args)
1456
  except (errors.GenericError, luxi.ProtocolError,
1457
          JobSubmittedException), err:
1458
    result, err_msg = FormatError(err)
1459
    logging.exception("Error during command processing")
1460
    ToStderr(err_msg)
1461

    
1462
  return result
1463

    
1464

    
1465
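
# Example (illustrative): a gnt-* script typically ends with
#   if __name__ == "__main__":
#     sys.exit(GenericMain(commands, aliases=aliases))
# where "commands" is the per-script command dictionary and "aliases" maps
# alternative command names to entries in that dictionary.

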
def GenericInstanceCreate(mode, opts, args):
  """Add an instance to the cluster via either creation or import.

  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new instance name
  @rtype: int
  @return: the desired exit code

  """
  instance = args[0]

  (pnode, snode) = SplitNodeOption(opts.node)

  hypervisor = None
  hvparams = {}
  if opts.hypervisor:
    hypervisor, hvparams = opts.hypervisor

  if opts.nics:
    try:
      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
    except ValueError, err:
      raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
    nics = [{}] * nic_max
    for nidx, ndict in opts.nics:
      nidx = int(nidx)
      if not isinstance(ndict, dict):
        msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
        raise errors.OpPrereqError(msg)
      nics[nidx] = ndict
  elif opts.no_nics:
    # no nics
    nics = []
  else:
    # default of one nic, all auto
    nics = [{}]

  if opts.disk_template == constants.DT_DISKLESS:
    if opts.disks or opts.sd_size is not None:
      raise errors.OpPrereqError("Diskless instance but disk"
                                 " information passed")
    disks = []
  else:
    if not opts.disks and not opts.sd_size:
      raise errors.OpPrereqError("No disk information specified")
    if opts.disks and opts.sd_size is not None:
      raise errors.OpPrereqError("Please use either the '--disk' or"
                                 " '-s' option")
    if opts.sd_size is not None:
      opts.disks = [(0, {"size": opts.sd_size})]
    try:
      disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
    except ValueError, err:
      raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
    disks = [{}] * disk_max
    for didx, ddict in opts.disks:
      didx = int(didx)
      if not isinstance(ddict, dict):
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
        raise errors.OpPrereqError(msg)
      elif "size" in ddict:
        if "adopt" in ddict:
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
                                     " (disk %d)" % didx)
        try:
          ddict["size"] = utils.ParseUnit(ddict["size"])
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
                                     (didx, err))
      elif "adopt" in ddict:
        if mode == constants.INSTANCE_IMPORT:
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
                                     " import")
        ddict["size"] = 0
      else:
        raise errors.OpPrereqError("Missing size or adoption source for"
                                   " disk %d" % didx)
      disks[didx] = ddict

  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)

  if mode == constants.INSTANCE_CREATE:
    start = opts.start
    os_type = opts.os
    src_node = None
    src_path = None
  elif mode == constants.INSTANCE_IMPORT:
    start = False
    os_type = None
    src_node = opts.src_node
    src_path = opts.src_dir
  else:
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)

  op = opcodes.OpCreateInstance(instance_name=instance,
                                disks=disks,
                                disk_template=opts.disk_template,
                                nics=nics,
                                pnode=pnode, snode=snode,
                                ip_check=opts.ip_check,
                                name_check=opts.name_check,
                                wait_for_sync=opts.wait_for_sync,
                                file_storage_dir=opts.file_storage_dir,
                                file_driver=opts.file_driver,
                                iallocator=opts.iallocator,
                                hypervisor=hypervisor,
                                hvparams=hvparams,
                                beparams=opts.beparams,
                                mode=mode,
                                start=start,
                                os_type=os_type,
                                src_node=src_node,
                                src_path=src_path)

  SubmitOrSend(op, opts)
  return 0
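
# Example (illustrative): a single "-s"-style size option ends up as
#   opts.disks == [(0, {"size": opts.sd_size})]
# while multi-disk options are expected as a list of (index, dict) pairs,
# e.g. [(0, {"size": "10G"}), (1, {"adopt": "some_lv"})] (placeholder values);
# each "size" entry is then normalized via utils.ParseUnit before the opcode
# is built.

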
class _RunWhileClusterStoppedHelper:
  """Helper class for L{RunWhileClusterStopped} to simplify state management

  """
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes

    self.ssh = ssh.SshRunner(self.cluster_name)

    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name
    @type cmd: list
    @param cmd: Command

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    @type fn: callable
    @param fn: Function to be called

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes
        for node_name in self.online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception:
          logging.exception("Caught exception")
          raise
      finally:
        # Start cluster again, master node last
        for node_name in self.nonmaster_nodes + [self.master_node]:
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
    finally:
      # Resume watcher
      watcher_block.Close()


def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Calls a function while all cluster daemons are stopped.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  feedback_fn("Gathering cluster information")

  # This ensures we're running on the master daemon
  cl = GetClient()

  (cluster_name, master_node) = \
    cl.QueryConfigValues(["cluster_name", "master_node"])

  online_nodes = GetOnlineNodes([], cl=cl)

  # Don't keep a reference to the client. The master daemon will go away.
  del cl

  assert master_node in online_nodes

  return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
                                       online_nodes).Call(fn, *args)
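
# Example (illustrative): RunWhileClusterStopped(ToStdout, some_fn, arg1)
# blocks the watcher, stops all daemons cluster-wide, calls
# some_fn(helper, arg1) -- the helper is passed as first argument so the
# callback can run commands on nodes while everything is down -- and finally
# restarts the daemons, master node last. "some_fn" and "arg1" are
# placeholders.

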
def GenerateTable(headers, fields, separator, data,
                  numfields=None, unitfields=None,
                  units=None):
  """Formats a table with headers and different fields.

  @type headers: dict
  @param headers: dictionary mapping field names to headers for
      the table
  @type fields: list
  @param fields: the field names corresponding to each row in
      the data field
  @param separator: the separator to be used; if this is None,
      the default 'smart' algorithm is used which computes optimal
      field width, otherwise just the separator is used between
      each field
  @type data: list
  @param data: a list of lists, each sublist being one row to be output
  @type numfields: list
  @param numfields: a list with the fields that hold numeric
      values and thus should be right-aligned
  @type unitfields: list
  @param unitfields: a list with the fields that hold numeric
      values that should be formatted with the units field
  @type units: string or None
  @param units: the units we should use for formatting, or None for
      automatic choice (human-readable for non-separator usage, otherwise
      megabytes); this is a one-letter string
  @rtype: list
  @return: a list of already-formatted lines (the header line, if any,
      followed by one line per data row)

  """
  if units is None:
    if separator:
      units = "m"
    else:
      units = "h"

  if numfields is None:
    numfields = []
  if unitfields is None:
    unitfields = []

  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142

  format_fields = []
  for field in fields:
    if headers and field not in headers:
      # TODO: handle better unknown fields (either revert to old
      # style of raising exception, or deal more intelligently with
      # variable fields)
      headers[field] = field
    if separator is not None:
      format_fields.append("%s")
    elif numfields.Matches(field):
      format_fields.append("%*s")
    else:
      format_fields.append("%-*s")

  if separator is None:
    mlens = [0 for name in fields]
    format = ' '.join(format_fields)
  else:
    format = separator.replace("%", "%%").join(format_fields)

  for row in data:
    if row is None:
      continue
    for idx, val in enumerate(row):
      if unitfields.Matches(fields[idx]):
        try:
          val = int(val)
        except (TypeError, ValueError):
          pass
        else:
          val = row[idx] = utils.FormatUnit(val, units)
      val = row[idx] = str(val)
      if separator is None:
        mlens[idx] = max(mlens[idx], len(val))

  result = []
  if headers:
    args = []
    for idx, name in enumerate(fields):
      hdr = headers[name]
      if separator is None:
        mlens[idx] = max(mlens[idx], len(hdr))
        args.append(mlens[idx])
      args.append(hdr)
    result.append(format % tuple(args))

  if separator is None:
    assert len(mlens) == len(fields)

    if fields and not numfields.Matches(fields[-1]):
      mlens[-1] = 0

  for line in data:
    args = []
    if line is None:
      line = ['-' for _ in fields]
    for idx in range(len(fields)):
      if separator is None:
        args.append(mlens[idx])
      args.append(line[idx])
    result.append(format % tuple(args))

  return result
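
# Example (illustrative):
#   lines = GenerateTable({"name": "Node", "dfree": "DiskFree"},
#                         ["name", "dfree"], None,
#                         [["node1.example.com", 10240]],
#                         numfields=["dfree"], unitfields=["dfree"])
#   for line in lines:
#     ToStdout(line)
# With separator=None the column widths are computed automatically and the
# "dfree" value is rendered via utils.FormatUnit in human-readable units;
# the node name and value above are placeholders.

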
def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: a string with the formatted timestamp

  """
  if not isinstance(ts, (tuple, list)) or len(ts) != 2:
    return '?'
  sec, usec = ts
  return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
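
# Example (illustrative): FormatTimestamp((1234567890, 5000)) returns
# something like "2009-02-13 23:31:30.005000" (the date/time part depends on
# the local timezone); malformed input simply yields "?".

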
def ParseTimespec(value):
  """Parse a time specification.

  The following suffixes will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: days
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  """
  value = str(value)
  if not value:
    raise errors.OpPrereqError("Empty time specification passed")
  suffix_map = {
    's': 1,
    'm': 60,
    'h': 3600,
    'd': 86400,
    'w': 604800,
    }
  if value[-1] not in suffix_map:
    try:
      value = int(value)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  else:
    multiplier = suffix_map[value[-1]]
    value = value[:-1]
    if not value: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)")
    try:
      value = int(value) * multiplier
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  return value
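
# Example (illustrative): ParseTimespec("2h") == 7200 and
# ParseTimespec("30") == 30, while ParseTimespec("d") raises OpPrereqError
# because only a suffix was given.

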
def GetOnlineNodes(nodes, cl=None, nowarn=False):
  """Returns the names of online nodes.

  This function will also log a note on stderr with the names of
  the offline nodes that have been skipped.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed

  """
  if cl is None:
    cl = GetClient()

  result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
                         use_locking=False)
  offline = [row[0] for row in result if row[1]]
  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
  return [row[0] for row in result if not row[1]]
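
# Example (illustrative): GetOnlineNodes([]) returns the names of all online
# nodes in the cluster, while GetOnlineNodes(["node1", "node2"], nowarn=True)
# filters the given subset without printing a note about skipped offline
# nodes; "node1"/"node2" are placeholder names.

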
def _ToStream(stream, txt, *args):
  """Write a message to a stream, bypassing the logging system

  @type stream: file object
  @param stream: the file to which we should write
  @type txt: str
  @param txt: the message

  """
  if args:
    args = tuple(args)
    stream.write(txt % args)
  else:
    stream.write(txt)
  stream.write('\n')
  stream.flush()


def ToStdout(txt, *args):
  """Write a message to stdout only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stdout, txt, *args)


def ToStderr(txt, *args):
  """Write a message to stderr only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stderr, txt, *args)
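
# Example (illustrative): ToStdout("Submitted job %s", job_id) and
# ToStderr("Failure for %s: %s", name, result) interpolate the extra
# arguments into the message and write it directly to the stream, bypassing
# the logging system.

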
class JobExecutor(object):
  """Class which manages the submission and execution of multiple jobs.

  Note that instances of this class should not be reused between
  GetResults() calls.

  """
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
    self.queue = []
    if cl is None:
      cl = GetClient()
    self.cl = cl
    self.verbose = verbose
    self.jobs = []
    self.opts = opts
    self.feedback_fn = feedback_fn

  def QueueJob(self, name, *ops):
    """Record a job for later submit.

    @type name: string
    @param name: a description of the job, will be used when reporting the
        job's submission and result
    """
    SetGenericOpcodeOpts(ops, self.opts)
    self.queue.append((name, ops))

  def SubmitPending(self):
    """Submit all pending jobs.

    """
    results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
    # each entry in self.jobs is (idx, submit status, job id or error, name)
    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
                                                            self.queue)):
      self.jobs.append((idx, status, data, name))

  def _ChooseJob(self):
    """Choose a non-waiting/queued job to poll next.

    """
    assert self.jobs, "_ChooseJob called with empty job list"

    result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
    assert result

    for job_data, status in zip(self.jobs, result):
      if status[0] in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_WAITLOCK,
                       constants.JOB_STATUS_CANCELING):
        # job is still waiting
        continue
      # good candidate found
      self.jobs.remove(job_data)
      return job_data

    # no job found
    return self.jobs.pop(0)

  def GetResults(self):
    """Wait for and return the results of all jobs.

    @rtype: list
    @return: list of tuples (success, job results), in the same order
        as the submitted jobs; if a job has failed, instead of the result
        there will be the error message

    """
    if not self.jobs:
      self.SubmitPending()
    results = []
    if self.verbose:
      ok_jobs = [row[2] for row in self.jobs if row[1]]
      if ok_jobs:
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))

    # first, remove any non-submitted jobs
    self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
    for idx, _, jid, name in failures:
      ToStderr("Failed to submit job for %s: %s", name, jid)
      results.append((idx, False, jid))

    while self.jobs:
      (idx, _, jid, name) = self._ChooseJob()
      ToStdout("Waiting for job %s for %s...", jid, name)
      try:
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
        success = True
      except (errors.GenericError, luxi.ProtocolError), err:
        _, job_result = FormatError(err)
        success = False
        # the error message will always be shown, verbose or not
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)

      results.append((idx, success, job_result))

    # sort based on the index, then drop it
    results.sort()
    results = [i[1:] for i in results]

    return results

  def WaitOrShow(self, wait):
    """Wait for job results or only print the job IDs.

    @type wait: boolean
    @param wait: whether to wait or not

    """
    if wait:
      return self.GetResults()
    else:
      if not self.jobs:
        self.SubmitPending()
      # each entry is (idx, submit status, job id or error, name)
      for _, status, result, name in self.jobs:
        if status:
          ToStdout("%s: %s", result, name)
        else:
          ToStderr("Failure for %s: %s", name, result)