Statistics
| Branch: | Tag: | Revision:

root / lib / cli.py @ 6b7d5878

History | View | Annotate | Download (66.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module dealing with command line parsing"""
23

    
24

    
25
import sys
26
import textwrap
27
import os.path
28
import time
29
import logging
30
from cStringIO import StringIO
31

    
32
from ganeti import utils
33
from ganeti import errors
34
from ganeti import constants
35
from ganeti import opcodes
36
from ganeti import luxi
37
from ganeti import ssconf
38
from ganeti import rpc
39
from ganeti import ssh
40

    
41
from optparse import (OptionParser, TitledHelpFormatter,
42
                      Option, OptionValueError)
43

    
44

    
45
__all__ = [
46
  # Command line options
47
  "ALLOCATABLE_OPT",
48
  "ALL_OPT",
49
  "AUTO_PROMOTE_OPT",
50
  "AUTO_REPLACE_OPT",
51
  "BACKEND_OPT",
52
  "CLEANUP_OPT",
53
  "CONFIRM_OPT",
54
  "CP_SIZE_OPT",
55
  "DEBUG_OPT",
56
  "DEBUG_SIMERR_OPT",
57
  "DISKIDX_OPT",
58
  "DISK_OPT",
59
  "DISK_TEMPLATE_OPT",
60
  "DRAINED_OPT",
61
  "EARLY_RELEASE_OPT",
62
  "ENABLED_HV_OPT",
63
  "ERROR_CODES_OPT",
64
  "FIELDS_OPT",
65
  "FILESTORE_DIR_OPT",
66
  "FILESTORE_DRIVER_OPT",
67
  "FORCE_OPT",
68
  "FORCE_VARIANT_OPT",
69
  "GLOBAL_FILEDIR_OPT",
70
  "HVLIST_OPT",
71
  "HVOPTS_OPT",
72
  "HYPERVISOR_OPT",
73
  "IALLOCATOR_OPT",
74
  "IGNORE_CONSIST_OPT",
75
  "IGNORE_FAILURES_OPT",
76
  "IGNORE_SECONDARIES_OPT",
77
  "IGNORE_SIZE_OPT",
78
  "MAC_PREFIX_OPT",
79
  "MASTER_NETDEV_OPT",
80
  "MC_OPT",
81
  "NET_OPT",
82
  "NEW_CLUSTER_CERT_OPT",
83
  "NEW_CONFD_HMAC_KEY_OPT",
84
  "NEW_RAPI_CERT_OPT",
85
  "NEW_SECONDARY_OPT",
86
  "NIC_PARAMS_OPT",
87
  "NODE_LIST_OPT",
88
  "NODE_PLACEMENT_OPT",
89
  "NOHDR_OPT",
90
  "NOIPCHECK_OPT",
91
  "NONAMECHECK_OPT",
92
  "NOLVM_STORAGE_OPT",
93
  "NOMODIFY_ETCHOSTS_OPT",
94
  "NOMODIFY_SSH_SETUP_OPT",
95
  "NONICS_OPT",
96
  "NONLIVE_OPT",
97
  "NONPLUS1_OPT",
98
  "NOSHUTDOWN_OPT",
99
  "NOSTART_OPT",
100
  "NOSSH_KEYCHECK_OPT",
101
  "NOVOTING_OPT",
102
  "NWSYNC_OPT",
103
  "ON_PRIMARY_OPT",
104
  "ON_SECONDARY_OPT",
105
  "OFFLINE_OPT",
106
  "OS_OPT",
107
  "OS_SIZE_OPT",
108
  "RAPI_CERT_OPT",
109
  "READD_OPT",
110
  "REBOOT_TYPE_OPT",
111
  "SECONDARY_IP_OPT",
112
  "SELECT_OS_OPT",
113
  "SEP_OPT",
114
  "SHOWCMD_OPT",
115
  "SHUTDOWN_TIMEOUT_OPT",
116
  "SINGLE_NODE_OPT",
117
  "SRC_DIR_OPT",
118
  "SRC_NODE_OPT",
119
  "SUBMIT_OPT",
120
  "STATIC_OPT",
121
  "SYNC_OPT",
122
  "TAG_SRC_OPT",
123
  "TIMEOUT_OPT",
124
  "USEUNITS_OPT",
125
  "VERBOSE_OPT",
126
  "VG_NAME_OPT",
127
  "YES_DOIT_OPT",
128
  # Generic functions for CLI programs
129
  "GenericMain",
130
  "GenericInstanceCreate",
131
  "GetClient",
132
  "GetOnlineNodes",
133
  "JobExecutor",
134
  "JobSubmittedException",
135
  "ParseTimespec",
136
  "RunWhileClusterStopped",
137
  "SubmitOpCode",
138
  "SubmitOrSend",
139
  "UsesRPC",
140
  # Formatting functions
141
  "ToStderr", "ToStdout",
142
  "FormatError",
143
  "GenerateTable",
144
  "AskUser",
145
  "FormatTimestamp",
146
  # Tags functions
147
  "ListTags",
148
  "AddTags",
149
  "RemoveTags",
150
  # command line options support infrastructure
151
  "ARGS_MANY_INSTANCES",
152
  "ARGS_MANY_NODES",
153
  "ARGS_NONE",
154
  "ARGS_ONE_INSTANCE",
155
  "ARGS_ONE_NODE",
156
  "ARGS_ONE_OS",
157
  "ArgChoice",
158
  "ArgCommand",
159
  "ArgFile",
160
  "ArgHost",
161
  "ArgInstance",
162
  "ArgJobId",
163
  "ArgNode",
164
  "ArgOs",
165
  "ArgSuggest",
166
  "ArgUnknown",
167
  "OPT_COMPL_INST_ADD_NODES",
168
  "OPT_COMPL_MANY_NODES",
169
  "OPT_COMPL_ONE_IALLOCATOR",
170
  "OPT_COMPL_ONE_INSTANCE",
171
  "OPT_COMPL_ONE_NODE",
172
  "OPT_COMPL_ONE_OS",
173
  "cli_option",
174
  "SplitNodeOption",
175
  "CalculateOSNames",
176
  ]
177

    
178
NO_PREFIX = "no_"
179
UN_PREFIX = "-"
180

    
181

    
182
class _Argument:
183
  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
184
    self.min = min
185
    self.max = max
186

    
187
  def __repr__(self):
188
    return ("<%s min=%s max=%s>" %
189
            (self.__class__.__name__, self.min, self.max))
190

    
191

    
192
class ArgSuggest(_Argument):
193
  """Suggesting argument.
194

195
  Value can be any of the ones passed to the constructor.
196

197
  """
198
  # pylint: disable-msg=W0622
199
  def __init__(self, min=0, max=None, choices=None):
200
    _Argument.__init__(self, min=min, max=max)
201
    self.choices = choices
202

    
203
  def __repr__(self):
204
    return ("<%s min=%s max=%s choices=%r>" %
205
            (self.__class__.__name__, self.min, self.max, self.choices))
206

    
207

    
208
class ArgChoice(ArgSuggest):
209
  """Choice argument.
210

211
  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
212
  but value must be one of the choices.
213

214
  """
215

    
216

    
217
class ArgUnknown(_Argument):
218
  """Unknown argument to program (e.g. determined at runtime).
219

220
  """
221

    
222

    
223
class ArgInstance(_Argument):
224
  """Instances argument.
225

226
  """
227

    
228

    
229
class ArgNode(_Argument):
230
  """Node argument.
231

232
  """
233

    
234
class ArgJobId(_Argument):
235
  """Job ID argument.
236

237
  """
238

    
239

    
240
class ArgFile(_Argument):
241
  """File path argument.
242

243
  """
244

    
245

    
246
class ArgCommand(_Argument):
247
  """Command argument.
248

249
  """
250

    
251

    
252
class ArgHost(_Argument):
253
  """Host argument.
254

255
  """
256

    
257

    
258
class ArgOs(_Argument):
259
  """OS argument.
260

261
  """
262

    
263

    
264
ARGS_NONE = []
265
ARGS_MANY_INSTANCES = [ArgInstance()]
266
ARGS_MANY_NODES = [ArgNode()]
267
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
268
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
269
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
270

    
271

    
272
def _ExtractTagsObject(opts, args):
273
  """Extract the tag type object.
274

275
  Note that this function will modify its args parameter.
276

277
  """
278
  if not hasattr(opts, "tag_type"):
279
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
280
  kind = opts.tag_type
281
  if kind == constants.TAG_CLUSTER:
282
    retval = kind, kind
283
  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
284
    if not args:
285
      raise errors.OpPrereqError("no arguments passed to the command")
286
    name = args.pop(0)
287
    retval = kind, name
288
  else:
289
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
290
  return retval
291

    
292

    
293
def _ExtendTags(opts, args):
294
  """Extend the args if a source file has been given.
295

296
  This function will extend the tags with the contents of the file
297
  passed in the 'tags_source' attribute of the opts parameter. A file
298
  named '-' will be replaced by stdin.
299

300
  """
301
  fname = opts.tags_source
302
  if fname is None:
303
    return
304
  if fname == "-":
305
    new_fh = sys.stdin
306
  else:
307
    new_fh = open(fname, "r")
308
  new_data = []
309
  try:
310
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
311
    # because of python bug 1633941
312
    while True:
313
      line = new_fh.readline()
314
      if not line:
315
        break
316
      new_data.append(line.strip())
317
  finally:
318
    new_fh.close()
319
  args.extend(new_data)
320

    
321

    
322
def ListTags(opts, args):
323
  """List the tags on a given object.
324

325
  This is a generic implementation that knows how to deal with all
326
  three cases of tag objects (cluster, node, instance). The opts
327
  argument is expected to contain a tag_type field denoting what
328
  object type we work on.
329

330
  """
331
  kind, name = _ExtractTagsObject(opts, args)
332
  cl = GetClient()
333
  result = cl.QueryTags(kind, name)
334
  result = list(result)
335
  result.sort()
336
  for tag in result:
337
    ToStdout(tag)
338

    
339

    
340
def AddTags(opts, args):
341
  """Add tags on a given object.
342

343
  This is a generic implementation that knows how to deal with all
344
  three cases of tag objects (cluster, node, instance). The opts
345
  argument is expected to contain a tag_type field denoting what
346
  object type we work on.
347

348
  """
349
  kind, name = _ExtractTagsObject(opts, args)
350
  _ExtendTags(opts, args)
351
  if not args:
352
    raise errors.OpPrereqError("No tags to be added")
353
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
354
  SubmitOpCode(op)
355

    
356

    
357
def RemoveTags(opts, args):
358
  """Remove tags from a given object.
359

360
  This is a generic implementation that knows how to deal with all
361
  three cases of tag objects (cluster, node, instance). The opts
362
  argument is expected to contain a tag_type field denoting what
363
  object type we work on.
364

365
  """
366
  kind, name = _ExtractTagsObject(opts, args)
367
  _ExtendTags(opts, args)
368
  if not args:
369
    raise errors.OpPrereqError("No tags to be removed")
370
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
371
  SubmitOpCode(op)
372

    
373

    
374
def check_unit(option, opt, value): # pylint: disable-msg=W0613
375
  """OptParsers custom converter for units.
376

377
  """
378
  try:
379
    return utils.ParseUnit(value)
380
  except errors.UnitParseError, err:
381
    raise OptionValueError("option %s: %s" % (opt, err))
382

    
383

    
384
def _SplitKeyVal(opt, data):
385
  """Convert a KeyVal string into a dict.
386

387
  This function will convert a key=val[,...] string into a dict. Empty
388
  values will be converted specially: keys which have the prefix 'no_'
389
  will have the value=False and the prefix stripped, the others will
390
  have value=True.
391

392
  @type opt: string
393
  @param opt: a string holding the option name for which we process the
394
      data, used in building error messages
395
  @type data: string
396
  @param data: a string of the format key=val,key=val,...
397
  @rtype: dict
398
  @return: {key=val, key=val}
399
  @raises errors.ParameterError: if there are duplicate keys
400

401
  """
402
  kv_dict = {}
403
  if data:
404
    for elem in utils.UnescapeAndSplit(data, sep=","):
405
      if "=" in elem:
406
        key, val = elem.split("=", 1)
407
      else:
408
        if elem.startswith(NO_PREFIX):
409
          key, val = elem[len(NO_PREFIX):], False
410
        elif elem.startswith(UN_PREFIX):
411
          key, val = elem[len(UN_PREFIX):], None
412
        else:
413
          key, val = elem, True
414
      if key in kv_dict:
415
        raise errors.ParameterError("Duplicate key '%s' in option %s" %
416
                                    (key, opt))
417
      kv_dict[key] = val
418
  return kv_dict
419

    
420

    
421
def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
422
  """Custom parser for ident:key=val,key=val options.
423

424
  This will store the parsed values as a tuple (ident, {key: val}). As such,
425
  multiple uses of this option via action=append is possible.
426

427
  """
428
  if ":" not in value:
429
    ident, rest = value, ''
430
  else:
431
    ident, rest = value.split(":", 1)
432

    
433
  if ident.startswith(NO_PREFIX):
434
    if rest:
435
      msg = "Cannot pass options when removing parameter groups: %s" % value
436
      raise errors.ParameterError(msg)
437
    retval = (ident[len(NO_PREFIX):], False)
438
  elif ident.startswith(UN_PREFIX):
439
    if rest:
440
      msg = "Cannot pass options when removing parameter groups: %s" % value
441
      raise errors.ParameterError(msg)
442
    retval = (ident[len(UN_PREFIX):], None)
443
  else:
444
    kv_dict = _SplitKeyVal(opt, rest)
445
    retval = (ident, kv_dict)
446
  return retval
447

    
448

    
449
def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
450
  """Custom parser class for key=val,key=val options.
451

452
  This will store the parsed values as a dict {key: val}.
453

454
  """
455
  return _SplitKeyVal(opt, value)
456

    
457

    
458
# completion_suggestion is normally a list. Using numeric values not evaluating
459
# to False for dynamic completion.
460
(OPT_COMPL_MANY_NODES,
461
 OPT_COMPL_ONE_NODE,
462
 OPT_COMPL_ONE_INSTANCE,
463
 OPT_COMPL_ONE_OS,
464
 OPT_COMPL_ONE_IALLOCATOR,
465
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)
466

    
467
OPT_COMPL_ALL = frozenset([
468
  OPT_COMPL_MANY_NODES,
469
  OPT_COMPL_ONE_NODE,
470
  OPT_COMPL_ONE_INSTANCE,
471
  OPT_COMPL_ONE_OS,
472
  OPT_COMPL_ONE_IALLOCATOR,
473
  OPT_COMPL_INST_ADD_NODES,
474
  ])
475

    
476

    
477
class CliOption(Option):
478
  """Custom option class for optparse.
479

480
  """
481
  ATTRS = Option.ATTRS + [
482
    "completion_suggest",
483
    ]
484
  TYPES = Option.TYPES + (
485
    "identkeyval",
486
    "keyval",
487
    "unit",
488
    )
489
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
490
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
491
  TYPE_CHECKER["keyval"] = check_key_val
492
  TYPE_CHECKER["unit"] = check_unit
493

    
494

    
495
# optparse.py sets make_option, so we do it for our own option class, too
496
cli_option = CliOption
497

    
498

    
499
_YESNO = ("yes", "no")
500
_YORNO = "yes|no"
501

    
502
DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
503
                       help="Increase debugging level")
504

    
505
NOHDR_OPT = cli_option("--no-headers", default=False,
506
                       action="store_true", dest="no_headers",
507
                       help="Don't display column headers")
508

    
509
SEP_OPT = cli_option("--separator", default=None,
510
                     action="store", dest="separator",
511
                     help=("Separator between output fields"
512
                           " (defaults to one space)"))
513

    
514
USEUNITS_OPT = cli_option("--units", default=None,
515
                          dest="units", choices=('h', 'm', 'g', 't'),
516
                          help="Specify units for output (one of hmgt)")
517

    
518
FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
519
                        type="string", metavar="FIELDS",
520
                        help="Comma separated list of output fields")
521

    
522
FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
523
                       default=False, help="Force the operation")
524

    
525
CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
526
                         default=False, help="Do not require confirmation")
527

    
528
TAG_SRC_OPT = cli_option("--from", dest="tags_source",
529
                         default=None, help="File with tag names")
530

    
531
SUBMIT_OPT = cli_option("--submit", dest="submit_only",
532
                        default=False, action="store_true",
533
                        help=("Submit the job and return the job ID, but"
534
                              " don't wait for the job to finish"))
535

    
536
SYNC_OPT = cli_option("--sync", dest="do_locking",
537
                      default=False, action="store_true",
538
                      help=("Grab locks while doing the queries"
539
                            " in order to ensure more consistent results"))
540

    
541
_DRY_RUN_OPT = cli_option("--dry-run", default=False,
542
                          action="store_true",
543
                          help=("Do not execute the operation, just run the"
544
                                " check steps and verify it it could be"
545
                                " executed"))
546

    
547
VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
548
                         action="store_true",
549
                         help="Increase the verbosity of the operation")
550

    
551
DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
552
                              action="store_true", dest="simulate_errors",
553
                              help="Debugging option that makes the operation"
554
                              " treat most runtime checks as failed")
555

    
556
NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
557
                        default=True, action="store_false",
558
                        help="Don't wait for sync (DANGEROUS!)")
559

    
560
DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
561
                               help="Custom disk setup (diskless, file,"
562
                               " plain or drbd)",
563
                               default=None, metavar="TEMPL",
564
                               choices=list(constants.DISK_TEMPLATES))
565

    
566
NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
567
                        help="Do not create any network cards for"
568
                        " the instance")
569

    
570
FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
571
                               help="Relative path under default cluster-wide"
572
                               " file storage dir to store file-based disks",
573
                               default=None, metavar="<DIR>")
574

    
575
FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
576
                                  help="Driver to use for image files",
577
                                  default="loop", metavar="<DRIVER>",
578
                                  choices=list(constants.FILE_DRIVER))
579

    
580
IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
581
                            help="Select nodes for the instance automatically"
582
                            " using the <NAME> iallocator plugin",
583
                            default=None, type="string",
584
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
585

    
586
OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
587
                    metavar="<os>",
588
                    completion_suggest=OPT_COMPL_ONE_OS)
589

    
590
FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
591
                               action="store_true", default=False,
592
                               help="Force an unknown variant")
593

    
594
BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
595
                         type="keyval", default={},
596
                         help="Backend parameters")
597

    
598
HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
599
                         default={}, dest="hvparams",
600
                         help="Hypervisor parameters")
601

    
602
HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
603
                            help="Hypervisor and hypervisor options, in the"
604
                            " format hypervisor:option=value,option=value,...",
605
                            default=None, type="identkeyval")
606

    
607
HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
608
                        help="Hypervisor and hypervisor options, in the"
609
                        " format hypervisor:option=value,option=value,...",
610
                        default=[], action="append", type="identkeyval")
611

    
612
NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
613
                           action="store_false",
614
                           help="Don't check that the instance's IP"
615
                           " is alive")
616

    
617
NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
618
                             default=True, action="store_false",
619
                             help="Don't check that the instance's name"
620
                             " is resolvable")
621

    
622
NET_OPT = cli_option("--net",
623
                     help="NIC parameters", default=[],
624
                     dest="nics", action="append", type="identkeyval")
625

    
626
DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
627
                      dest="disks", action="append", type="identkeyval")
628

    
629
DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
630
                         help="Comma-separated list of disks"
631
                         " indices to act on (e.g. 0,2) (optional,"
632
                         " defaults to all disks)")
633

    
634
OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
635
                         help="Enforces a single-disk configuration using the"
636
                         " given disk size, in MiB unless a suffix is used",
637
                         default=None, type="unit", metavar="<size>")
638

    
639
IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
640
                                dest="ignore_consistency",
641
                                action="store_true", default=False,
642
                                help="Ignore the consistency of the disks on"
643
                                " the secondary")
644

    
645
NONLIVE_OPT = cli_option("--non-live", dest="live",
646
                         default=True, action="store_false",
647
                         help="Do a non-live migration (this usually means"
648
                         " freeze the instance, save the state, transfer and"
649
                         " only then resume running on the secondary node)")
650

    
651
NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
652
                                help="Target node and optional secondary node",
653
                                metavar="<pnode>[:<snode>]",
654
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)
655

    
656
NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
657
                           action="append", metavar="<node>",
658
                           help="Use only this node (can be used multiple"
659
                           " times, if not given defaults to all nodes)",
660
                           completion_suggest=OPT_COMPL_ONE_NODE)
661

    
662
SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
663
                             metavar="<node>",
664
                             completion_suggest=OPT_COMPL_ONE_NODE)
665

    
666
NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
667
                         action="store_false",
668
                         help="Don't start the instance after creation")
669

    
670
SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
671
                         action="store_true", default=False,
672
                         help="Show command instead of executing it")
673

    
674
CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
675
                         default=False, action="store_true",
676
                         help="Instead of performing the migration, try to"
677
                         " recover from a failed cleanup. This is safe"
678
                         " to run even if the instance is healthy, but it"
679
                         " will create extra replication traffic and "
680
                         " disrupt briefly the replication (like during the"
681
                         " migration")
682

    
683
STATIC_OPT = cli_option("-s", "--static", dest="static",
684
                        action="store_true", default=False,
685
                        help="Only show configuration data, not runtime data")
686

    
687
ALL_OPT = cli_option("--all", dest="show_all",
688
                     default=False, action="store_true",
689
                     help="Show info on all instances on the cluster."
690
                     " This can take a long time to run, use wisely")
691

    
692
SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
693
                           action="store_true", default=False,
694
                           help="Interactive OS reinstall, lists available"
695
                           " OS templates for selection")
696

    
697
IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
698
                                 action="store_true", default=False,
699
                                 help="Remove the instance from the cluster"
700
                                 " configuration even if there are failures"
701
                                 " during the removal process")
702

    
703
NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
704
                               help="Specifies the new secondary node",
705
                               metavar="NODE", default=None,
706
                               completion_suggest=OPT_COMPL_ONE_NODE)
707

    
708
ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
709
                            default=False, action="store_true",
710
                            help="Replace the disk(s) on the primary"
711
                            " node (only for the drbd template)")
712

    
713
ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
714
                              default=False, action="store_true",
715
                              help="Replace the disk(s) on the secondary"
716
                              " node (only for the drbd template)")
717

    
718
AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
719
                              default=False, action="store_true",
720
                              help="Lock all nodes and auto-promote as needed"
721
                              " to MC status")
722

    
723
AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
724
                              default=False, action="store_true",
725
                              help="Automatically replace faulty disks"
726
                              " (only for the drbd template)")
727

    
728
IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
729
                             default=False, action="store_true",
730
                             help="Ignore current recorded size"
731
                             " (useful for forcing activation when"
732
                             " the recorded size is wrong)")
733

    
734
SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
735
                          metavar="<node>",
736
                          completion_suggest=OPT_COMPL_ONE_NODE)
737

    
738
SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
739
                         metavar="<dir>")
740

    
741
SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
742
                              help="Specify the secondary ip for the node",
743
                              metavar="ADDRESS", default=None)
744

    
745
READD_OPT = cli_option("--readd", dest="readd",
746
                       default=False, action="store_true",
747
                       help="Readd old node after replacing it")
748

    
749
NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
750
                                default=True, action="store_false",
751
                                help="Disable SSH key fingerprint checking")
752

    
753

    
754
MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
755
                    choices=_YESNO, default=None, metavar=_YORNO,
756
                    help="Set the master_candidate flag on the node")
757

    
758
OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
759
                         choices=_YESNO, default=None,
760
                         help="Set the offline flag on the node")
761

    
762
DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
763
                         choices=_YESNO, default=None,
764
                         help="Set the drained flag on the node")
765

    
766
ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
767
                             choices=_YESNO, default=None, metavar=_YORNO,
768
                             help="Set the allocatable flag on a volume")
769

    
770
NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
771
                               help="Disable support for lvm based instances"
772
                               " (cluster-wide)",
773
                               action="store_false", default=True)
774

    
775
ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
776
                            dest="enabled_hypervisors",
777
                            help="Comma-separated list of hypervisors",
778
                            type="string", default=None)
779

    
780
NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
781
                            type="keyval", default={},
782
                            help="NIC parameters")
783

    
784
CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
785
                         dest="candidate_pool_size", type="int",
786
                         help="Set the candidate pool size")
787

    
788
VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
789
                         help="Enables LVM and specifies the volume group"
790
                         " name (cluster-wide) for disk allocation [xenvg]",
791
                         metavar="VG", default=None)
792

    
793
YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
794
                          help="Destroy cluster", action="store_true")
795

    
796
NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
797
                          help="Skip node agreement check (dangerous)",
798
                          action="store_true", default=False)
799

    
800
MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
801
                            help="Specify the mac prefix for the instance IP"
802
                            " addresses, in the format XX:XX:XX",
803
                            metavar="PREFIX",
804
                            default=None)
805

    
806
MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
807
                               help="Specify the node interface (cluster-wide)"
808
                               " on which the master IP address will be added "
809
                               " [%s]" % constants.DEFAULT_BRIDGE,
810
                               metavar="NETDEV",
811
                               default=constants.DEFAULT_BRIDGE)
812

    
813

    
814
GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
815
                                help="Specify the default directory (cluster-"
816
                                "wide) for storing the file-based disks [%s]" %
817
                                constants.DEFAULT_FILE_STORAGE_DIR,
818
                                metavar="DIR",
819
                                default=constants.DEFAULT_FILE_STORAGE_DIR)
820

    
821
NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
822
                                   help="Don't modify /etc/hosts",
823
                                   action="store_false", default=True)
824

    
825
NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
826
                                    help="Don't initialize SSH keys",
827
                                    action="store_false", default=True)
828

    
829
ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
830
                             help="Enable parseable error messages",
831
                             action="store_true", default=False)
832

    
833
NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
834
                          help="Skip N+1 memory redundancy tests",
835
                          action="store_true", default=False)
836

    
837
REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
838
                             help="Type of reboot: soft/hard/full",
839
                             default=constants.INSTANCE_REBOOT_HARD,
840
                             metavar="<REBOOT>",
841
                             choices=list(constants.REBOOT_TYPES))
842

    
843
IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
844
                                    dest="ignore_secondaries",
845
                                    default=False, action="store_true",
846
                                    help="Ignore errors from secondaries")
847

    
848
NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
849
                            action="store_false", default=True,
850
                            help="Don't shutdown the instance (unsafe)")
851

    
852
TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
853
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
854
                         help="Maximum time to wait")
855

    
856
SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
857
                         dest="shutdown_timeout", type="int",
858
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
859
                         help="Maximum time to wait for instance shutdown")
860

    
861
EARLY_RELEASE_OPT = cli_option("--early-release",
862
                               dest="early_release", default=False,
863
                               action="store_true",
864
                               help="Release the locks on the secondary"
865
                               " node(s) early")
866

    
867
NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
868
                                  dest="new_cluster_cert",
869
                                  default=False, action="store_true",
870
                                  help="Generate a new cluster certificate")
871

    
872
RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
873
                           default=None,
874
                           help="File containing new RAPI certificate")
875

    
876
NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
877
                               default=None, action="store_true",
878
                               help=("Generate a new self-signed RAPI"
879
                                     " certificate"))
880

    
881
NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
882
                                    dest="new_confd_hmac_key",
883
                                    default=False, action="store_true",
884
                                    help=("Create a new HMAC key for %s" %
885
                                          constants.CONFD))
886

    
887

    
888
def _ParseArgs(argv, commands, aliases):
889
  """Parser for the command line arguments.
890

891
  This function parses the arguments and returns the function which
892
  must be executed together with its (modified) arguments.
893

894
  @param argv: the command line
895
  @param commands: dictionary with special contents, see the design
896
      doc for cmdline handling
897
  @param aliases: dictionary with command aliases {'alias': 'target, ...}
898

899
  """
900
  if len(argv) == 0:
901
    binary = "<command>"
902
  else:
903
    binary = argv[0].split("/")[-1]
904

    
905
  if len(argv) > 1 and argv[1] == "--version":
906
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
907
    # Quit right away. That way we don't have to care about this special
908
    # argument. optparse.py does it the same.
909
    sys.exit(0)
910

    
911
  if len(argv) < 2 or not (argv[1] in commands or
912
                           argv[1] in aliases):
913
    # let's do a nice thing
914
    sortedcmds = commands.keys()
915
    sortedcmds.sort()
916

    
917
    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
918
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
919
    ToStdout("")
920

    
921
    # compute the max line length for cmd + usage
922
    mlen = max([len(" %s" % cmd) for cmd in commands])
923
    mlen = min(60, mlen) # should not get here...
924

    
925
    # and format a nice command list
926
    ToStdout("Commands:")
927
    for cmd in sortedcmds:
928
      cmdstr = " %s" % (cmd,)
929
      help_text = commands[cmd][4]
930
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
931
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
932
      for line in help_lines:
933
        ToStdout("%-*s   %s", mlen, "", line)
934

    
935
    ToStdout("")
936

    
937
    return None, None, None
938

    
939
  # get command, unalias it, and look it up in commands
940
  cmd = argv.pop(1)
941
  if cmd in aliases:
942
    if cmd in commands:
943
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
944
                                   " command" % cmd)
945

    
946
    if aliases[cmd] not in commands:
947
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
948
                                   " command '%s'" % (cmd, aliases[cmd]))
949

    
950
    cmd = aliases[cmd]
951

    
952
  func, args_def, parser_opts, usage, description = commands[cmd]
953
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
954
                        description=description,
955
                        formatter=TitledHelpFormatter(),
956
                        usage="%%prog %s %s" % (cmd, usage))
957
  parser.disable_interspersed_args()
958
  options, args = parser.parse_args()
959

    
960
  if not _CheckArguments(cmd, args_def, args):
961
    return None, None, None
962

    
963
  return func, options, args
964

    
965

    
966
def _CheckArguments(cmd, args_def, args):
967
  """Verifies the arguments using the argument definition.
968

969
  Algorithm:
970

971
    1. Abort with error if values specified by user but none expected.
972

973
    1. For each argument in definition
974

975
      1. Keep running count of minimum number of values (min_count)
976
      1. Keep running count of maximum number of values (max_count)
977
      1. If it has an unlimited number of values
978

979
        1. Abort with error if it's not the last argument in the definition
980

981
    1. If last argument has limited number of values
982

983
      1. Abort with error if number of values doesn't match or is too large
984

985
    1. Abort with error if user didn't pass enough values (min_count)
986

987
  """
988
  if args and not args_def:
989
    ToStderr("Error: Command %s expects no arguments", cmd)
990
    return False
991

    
992
  min_count = None
993
  max_count = None
994
  check_max = None
995

    
996
  last_idx = len(args_def) - 1
997

    
998
  for idx, arg in enumerate(args_def):
999
    if min_count is None:
1000
      min_count = arg.min
1001
    elif arg.min is not None:
1002
      min_count += arg.min
1003

    
1004
    if max_count is None:
1005
      max_count = arg.max
1006
    elif arg.max is not None:
1007
      max_count += arg.max
1008

    
1009
    if idx == last_idx:
1010
      check_max = (arg.max is not None)
1011

    
1012
    elif arg.max is None:
1013
      raise errors.ProgrammerError("Only the last argument can have max=None")
1014

    
1015
  if check_max:
1016
    # Command with exact number of arguments
1017
    if (min_count is not None and max_count is not None and
1018
        min_count == max_count and len(args) != min_count):
1019
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1020
      return False
1021

    
1022
    # Command with limited number of arguments
1023
    if max_count is not None and len(args) > max_count:
1024
      ToStderr("Error: Command %s expects only %d argument(s)",
1025
               cmd, max_count)
1026
      return False
1027

    
1028
  # Command with some required arguments
1029
  if min_count is not None and len(args) < min_count:
1030
    ToStderr("Error: Command %s expects at least %d argument(s)",
1031
             cmd, min_count)
1032
    return False
1033

    
1034
  return True
1035

    
1036

    
1037
def SplitNodeOption(value):
1038
  """Splits the value of a --node option.
1039

1040
  """
1041
  if value and ':' in value:
1042
    return value.split(':', 1)
1043
  else:
1044
    return (value, None)
1045

    
1046

    
1047
def CalculateOSNames(os_name, os_variants):
1048
  """Calculates all the names an OS can be called, according to its variants.
1049

1050
  @type os_name: string
1051
  @param os_name: base name of the os
1052
  @type os_variants: list or None
1053
  @param os_variants: list of supported variants
1054
  @rtype: list
1055
  @return: list of valid names
1056

1057
  """
1058
  if os_variants:
1059
    return ['%s+%s' % (os_name, v) for v in os_variants]
1060
  else:
1061
    return [os_name]
1062

    
1063

    
1064
def UsesRPC(fn):
1065
  def wrapper(*args, **kwargs):
1066
    rpc.Init()
1067
    try:
1068
      return fn(*args, **kwargs)
1069
    finally:
1070
      rpc.Shutdown()
1071
  return wrapper
1072

    
1073

    
1074
def AskUser(text, choices=None):
1075
  """Ask the user a question.
1076

1077
  @param text: the question to ask
1078

1079
  @param choices: list with elements tuples (input_char, return_value,
1080
      description); if not given, it will default to: [('y', True,
1081
      'Perform the operation'), ('n', False, 'Do no do the operation')];
1082
      note that the '?' char is reserved for help
1083

1084
  @return: one of the return values from the choices list; if input is
1085
      not possible (i.e. not running with a tty, we return the last
1086
      entry from the list
1087

1088
  """
1089
  if choices is None:
1090
    choices = [('y', True, 'Perform the operation'),
1091
               ('n', False, 'Do not perform the operation')]
1092
  if not choices or not isinstance(choices, list):
1093
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
1094
  for entry in choices:
1095
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
1096
      raise errors.ProgrammerError("Invalid choices element to AskUser")
1097

    
1098
  answer = choices[-1][1]
1099
  new_text = []
1100
  for line in text.splitlines():
1101
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
1102
  text = "\n".join(new_text)
1103
  try:
1104
    f = file("/dev/tty", "a+")
1105
  except IOError:
1106
    return answer
1107
  try:
1108
    chars = [entry[0] for entry in choices]
1109
    chars[-1] = "[%s]" % chars[-1]
1110
    chars.append('?')
1111
    maps = dict([(entry[0], entry[1]) for entry in choices])
1112
    while True:
1113
      f.write(text)
1114
      f.write('\n')
1115
      f.write("/".join(chars))
1116
      f.write(": ")
1117
      line = f.readline(2).strip().lower()
1118
      if line in maps:
1119
        answer = maps[line]
1120
        break
1121
      elif line == '?':
1122
        for entry in choices:
1123
          f.write(" %s - %s\n" % (entry[0], entry[2]))
1124
        f.write("\n")
1125
        continue
1126
  finally:
1127
    f.close()
1128
  return answer
1129

    
1130

    
1131
class JobSubmittedException(Exception):
1132
  """Job was submitted, client should exit.
1133

1134
  This exception has one argument, the ID of the job that was
1135
  submitted. The handler should print this ID.
1136

1137
  This is not an error, just a structured way to exit from clients.
1138

1139
  """
1140

    
1141

    
1142
def SendJob(ops, cl=None):
1143
  """Function to submit an opcode without waiting for the results.
1144

1145
  @type ops: list
1146
  @param ops: list of opcodes
1147
  @type cl: luxi.Client
1148
  @param cl: the luxi client to use for communicating with the master;
1149
             if None, a new client will be created
1150

1151
  """
1152
  if cl is None:
1153
    cl = GetClient()
1154

    
1155
  job_id = cl.SubmitJob(ops)
1156

    
1157
  return job_id
1158

    
1159

    
1160
def PollJob(job_id, cl=None, feedback_fn=None):
1161
  """Function to poll for the result of a job.
1162

1163
  @type job_id: job identified
1164
  @param job_id: the job to poll for results
1165
  @type cl: luxi.Client
1166
  @param cl: the luxi client to use for communicating with the master;
1167
             if None, a new client will be created
1168

1169
  """
1170
  if cl is None:
1171
    cl = GetClient()
1172

    
1173
  prev_job_info = None
1174
  prev_logmsg_serial = None
1175

    
1176
  status = None
1177

    
1178
  notified_queued = False
1179
  notified_waitlock = False
1180

    
1181
  while True:
1182
    result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
1183
                                     prev_logmsg_serial)
1184
    if not result:
1185
      # job not found, go away!
1186
      raise errors.JobLost("Job with id %s lost" % job_id)
1187
    elif result == constants.JOB_NOTCHANGED:
1188
      if status is not None and not callable(feedback_fn):
1189
        if status == constants.JOB_STATUS_QUEUED and not notified_queued:
1190
          ToStderr("Job %s is waiting in queue", job_id)
1191
          notified_queued = True
1192
        elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
1193
          ToStderr("Job %s is trying to acquire all necessary locks", job_id)
1194
          notified_waitlock = True
1195

    
1196
      # Wait again
1197
      continue
1198

    
1199
    # Split result, a tuple of (field values, log entries)
1200
    (job_info, log_entries) = result
1201
    (status, ) = job_info
1202

    
1203
    if log_entries:
1204
      for log_entry in log_entries:
1205
        (serial, timestamp, _, message) = log_entry
1206
        if callable(feedback_fn):
1207
          feedback_fn(log_entry[1:])
1208
        else:
1209
          encoded = utils.SafeEncode(message)
1210
          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
1211
        prev_logmsg_serial = max(prev_logmsg_serial, serial)
1212

    
1213
    # TODO: Handle canceled and archived jobs
1214
    elif status in (constants.JOB_STATUS_SUCCESS,
1215
                    constants.JOB_STATUS_ERROR,
1216
                    constants.JOB_STATUS_CANCELING,
1217
                    constants.JOB_STATUS_CANCELED):
1218
      break
1219

    
1220
    prev_job_info = job_info
1221

    
1222
  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
1223
  if not jobs:
1224
    raise errors.JobLost("Job with id %s lost" % job_id)
1225

    
1226
  status, opstatus, result = jobs[0]
1227
  if status == constants.JOB_STATUS_SUCCESS:
1228
    return result
1229
  elif status in (constants.JOB_STATUS_CANCELING,
1230
                  constants.JOB_STATUS_CANCELED):
1231
    raise errors.OpExecError("Job was canceled")
1232
  else:
1233
    has_ok = False
1234
    for idx, (status, msg) in enumerate(zip(opstatus, result)):
1235
      if status == constants.OP_STATUS_SUCCESS:
1236
        has_ok = True
1237
      elif status == constants.OP_STATUS_ERROR:
1238
        errors.MaybeRaise(msg)
1239
        if has_ok:
1240
          raise errors.OpExecError("partial failure (opcode %d): %s" %
1241
                                   (idx, msg))
1242
        else:
1243
          raise errors.OpExecError(str(msg))
1244
    # default failure mode
1245
    raise errors.OpExecError(result)
1246

    
1247

    
1248
def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
1249
  """Legacy function to submit an opcode.
1250

1251
  This is just a simple wrapper over the construction of the processor
1252
  instance. It should be extended to better handle feedback and
1253
  interaction functions.
1254

1255
  """
1256
  if cl is None:
1257
    cl = GetClient()
1258

    
1259
  SetGenericOpcodeOpts([op], opts)
1260

    
1261
  job_id = SendJob([op], cl)
1262

    
1263
  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
1264

    
1265
  return op_results[0]
1266

    
1267

    
1268
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
1269
  """Wrapper around SubmitOpCode or SendJob.
1270

1271
  This function will decide, based on the 'opts' parameter, whether to
1272
  submit and wait for the result of the opcode (and return it), or
1273
  whether to just send the job and print its identifier. It is used in
1274
  order to simplify the implementation of the '--submit' option.
1275

1276
  It will also process the opcodes if we're sending the via SendJob
1277
  (otherwise SubmitOpCode does it).
1278

1279
  """
1280
  if opts and opts.submit_only:
1281
    job = [op]
1282
    SetGenericOpcodeOpts(job, opts)
1283
    job_id = SendJob(job, cl=cl)
1284
    raise JobSubmittedException(job_id)
1285
  else:
1286
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
1287

    
1288

    
1289
def SetGenericOpcodeOpts(opcode_list, options):
1290
  """Processor for generic options.
1291

1292
  This function updates the given opcodes based on generic command
1293
  line options (like debug, dry-run, etc.).
1294

1295
  @param opcode_list: list of opcodes
1296
  @param options: command line options or None
1297
  @return: None (in-place modification)
1298

1299
  """
1300
  if not options:
1301
    return
1302
  for op in opcode_list:
1303
    op.dry_run = options.dry_run
1304
    op.debug_level = options.debug
1305

    
1306

    
1307
def GetClient():
1308
  # TODO: Cache object?
1309
  try:
1310
    client = luxi.Client()
1311
  except luxi.NoMasterError:
1312
    ss = ssconf.SimpleStore()
1313

    
1314
    # Try to read ssconf file
1315
    try:
1316
      ss.GetMasterNode()
1317
    except errors.ConfigurationError:
1318
      raise errors.OpPrereqError("Cluster not initialized or this machine is"
1319
                                 " not part of a cluster")
1320

    
1321
    master, myself = ssconf.GetMasterAndMyself(ss=ss)
1322
    if master != myself:
1323
      raise errors.OpPrereqError("This is not the master node, please connect"
1324
                                 " to node '%s' and rerun the command" %
1325
                                 master)
1326
    raise
1327
  return client
1328

    
1329

    
1330
def FormatError(err):
1331
  """Return a formatted error message for a given error.
1332

1333
  This function takes an exception instance and returns a tuple
1334
  consisting of two values: first, the recommended exit code, and
1335
  second, a string describing the error message (not
1336
  newline-terminated).
1337

1338
  """
1339
  retcode = 1
1340
  obuf = StringIO()
1341
  msg = str(err)
1342
  if isinstance(err, errors.ConfigurationError):
1343
    txt = "Corrupt configuration file: %s" % msg
1344
    logging.error(txt)
1345
    obuf.write(txt + "\n")
1346
    obuf.write("Aborting.")
1347
    retcode = 2
1348
  elif isinstance(err, errors.HooksAbort):
1349
    obuf.write("Failure: hooks execution failed:\n")
1350
    for node, script, out in err.args[0]:
1351
      if out:
1352
        obuf.write("  node: %s, script: %s, output: %s\n" %
1353
                   (node, script, out))
1354
      else:
1355
        obuf.write("  node: %s, script: %s (no output)\n" %
1356
                   (node, script))
1357
  elif isinstance(err, errors.HooksFailure):
1358
    obuf.write("Failure: hooks general failure: %s" % msg)
1359
  elif isinstance(err, errors.ResolverError):
1360
    this_host = utils.HostInfo.SysName()
1361
    if err.args[0] == this_host:
1362
      msg = "Failure: can't resolve my own hostname ('%s')"
1363
    else:
1364
      msg = "Failure: can't resolve hostname '%s'"
1365
    obuf.write(msg % err.args[0])
1366
  elif isinstance(err, errors.OpPrereqError):
1367
    if len(err.args) == 2:
1368
      obuf.write("Failure: prerequisites not met for this"
1369
               " operation:\nerror type: %s, error details:\n%s" %
1370
                 (err.args[1], err.args[0]))
1371
    else:
1372
      obuf.write("Failure: prerequisites not met for this"
1373
                 " operation:\n%s" % msg)
1374
  elif isinstance(err, errors.OpExecError):
1375
    obuf.write("Failure: command execution error:\n%s" % msg)
1376
  elif isinstance(err, errors.TagError):
1377
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
1378
  elif isinstance(err, errors.JobQueueDrainError):
1379
    obuf.write("Failure: the job queue is marked for drain and doesn't"
1380
               " accept new requests\n")
1381
  elif isinstance(err, errors.JobQueueFull):
1382
    obuf.write("Failure: the job queue is full and doesn't accept new"
1383
               " job submissions until old jobs are archived\n")
1384
  elif isinstance(err, errors.TypeEnforcementError):
1385
    obuf.write("Parameter Error: %s" % msg)
1386
  elif isinstance(err, errors.ParameterError):
1387
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
1388
  elif isinstance(err, errors.GenericError):
1389
    obuf.write("Unhandled Ganeti error: %s" % msg)
1390
  elif isinstance(err, luxi.NoMasterError):
1391
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
1392
               " and listening for connections?")
1393
  elif isinstance(err, luxi.TimeoutError):
1394
    obuf.write("Timeout while talking to the master daemon. Error:\n"
1395
               "%s" % msg)
1396
  elif isinstance(err, luxi.ProtocolError):
1397
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
1398
               "%s" % msg)
1399
  elif isinstance(err, JobSubmittedException):
1400
    obuf.write("JobID: %s\n" % err.args[0])
1401
    retcode = 0
1402
  else:
1403
    obuf.write("Unhandled exception: %s" % msg)
1404
  return retcode, obuf.getvalue().rstrip('\n')
1405

    
1406

    
1407
def GenericMain(commands, override=None, aliases=None):
1408
  """Generic main function for all the gnt-* commands.
1409

1410
  Arguments:
1411
    - commands: a dictionary with a special structure, see the design doc
1412
                for command line handling.
1413
    - override: if not None, we expect a dictionary with keys that will
1414
                override command line options; this can be used to pass
1415
                options from the scripts to generic functions
1416
    - aliases: dictionary with command aliases {'alias': 'target, ...}
1417

1418
  """
1419
  # save the program name and the entire command line for later logging
1420
  if sys.argv:
1421
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1422
    if len(sys.argv) >= 2:
1423
      binary += " " + sys.argv[1]
1424
      old_cmdline = " ".join(sys.argv[2:])
1425
    else:
1426
      old_cmdline = ""
1427
  else:
1428
    binary = "<unknown program>"
1429
    old_cmdline = ""
1430

    
1431
  if aliases is None:
1432
    aliases = {}
1433

    
1434
  try:
1435
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
1436
  except errors.ParameterError, err:
1437
    result, err_msg = FormatError(err)
1438
    ToStderr(err_msg)
1439
    return 1
1440

    
1441
  if func is None: # parse error
1442
    return 1
1443

    
1444
  if override is not None:
1445
    for key, val in override.iteritems():
1446
      setattr(options, key, val)
1447

    
1448
  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1449
                     stderr_logging=True, program=binary)
1450

    
1451
  if old_cmdline:
1452
    logging.info("run with arguments '%s'", old_cmdline)
1453
  else:
1454
    logging.info("run with no arguments")
1455

    
1456
  try:
1457
    result = func(options, args)
1458
  except (errors.GenericError, luxi.ProtocolError,
1459
          JobSubmittedException), err:
1460
    result, err_msg = FormatError(err)
1461
    logging.exception("Error during command processing")
1462
    ToStderr(err_msg)
1463

    
1464
  return result
1465

    
1466

    
1467
def GenericInstanceCreate(mode, opts, args):
1468
  """Add an instance to the cluster via either creation or import.
1469

1470
  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
1471
  @param opts: the command line options selected by the user
1472
  @type args: list
1473
  @param args: should contain only one element, the new instance name
1474
  @rtype: int
1475
  @return: the desired exit code
1476

1477
  """
1478
  instance = args[0]
1479

    
1480
  (pnode, snode) = SplitNodeOption(opts.node)
1481

    
1482
  hypervisor = None
1483
  hvparams = {}
1484
  if opts.hypervisor:
1485
    hypervisor, hvparams = opts.hypervisor
1486

    
1487
  if opts.nics:
1488
    try:
1489
      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
1490
    except ValueError, err:
1491
      raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
1492
    nics = [{}] * nic_max
1493
    for nidx, ndict in opts.nics:
1494
      nidx = int(nidx)
1495
      if not isinstance(ndict, dict):
1496
        msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
1497
        raise errors.OpPrereqError(msg)
1498
      nics[nidx] = ndict
1499
  elif opts.no_nics:
1500
    # no nics
1501
    nics = []
1502
  else:
1503
    # default of one nic, all auto
1504
    nics = [{}]
1505

    
1506
  if opts.disk_template == constants.DT_DISKLESS:
1507
    if opts.disks or opts.sd_size is not None:
1508
      raise errors.OpPrereqError("Diskless instance but disk"
1509
                                 " information passed")
1510
    disks = []
1511
  else:
1512
    if not opts.disks and not opts.sd_size:
1513
      raise errors.OpPrereqError("No disk information specified")
1514
    if opts.disks and opts.sd_size is not None:
1515
      raise errors.OpPrereqError("Please use either the '--disk' or"
1516
                                 " '-s' option")
1517
    if opts.sd_size is not None:
1518
      opts.disks = [(0, {"size": opts.sd_size})]
1519
    try:
1520
      disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
1521
    except ValueError, err:
1522
      raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
1523
    disks = [{}] * disk_max
1524
    for didx, ddict in opts.disks:
1525
      didx = int(didx)
1526
      if not isinstance(ddict, dict):
1527
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
1528
        raise errors.OpPrereqError(msg)
1529
      elif "size" in ddict:
1530
        if "adopt" in ddict:
1531
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
1532
                                     " (disk %d)" % didx)
1533
        try:
1534
          ddict["size"] = utils.ParseUnit(ddict["size"])
1535
        except ValueError, err:
1536
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
1537
                                     (didx, err))
1538
      elif "adopt" in ddict:
1539
        if mode == constants.INSTANCE_IMPORT:
1540
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
1541
                                     " import")
1542
        ddict["size"] = 0
1543
      else:
1544
        raise errors.OpPrereqError("Missing size or adoption source for"
1545
                                   " disk %d" % didx)
1546
      disks[didx] = ddict
1547

    
1548
  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
1549
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
1550

    
1551
  if mode == constants.INSTANCE_CREATE:
1552
    start = opts.start
1553
    os_type = opts.os
1554
    src_node = None
1555
    src_path = None
1556
  elif mode == constants.INSTANCE_IMPORT:
1557
    start = False
1558
    os_type = None
1559
    src_node = opts.src_node
1560
    src_path = opts.src_dir
1561
  else:
1562
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)
1563

    
1564
  op = opcodes.OpCreateInstance(instance_name=instance,
1565
                                disks=disks,
1566
                                disk_template=opts.disk_template,
1567
                                nics=nics,
1568
                                pnode=pnode, snode=snode,
1569
                                ip_check=opts.ip_check,
1570
                                name_check=opts.name_check,
1571
                                wait_for_sync=opts.wait_for_sync,
1572
                                file_storage_dir=opts.file_storage_dir,
1573
                                file_driver=opts.file_driver,
1574
                                iallocator=opts.iallocator,
1575
                                hypervisor=hypervisor,
1576
                                hvparams=hvparams,
1577
                                beparams=opts.beparams,
1578
                                mode=mode,
1579
                                start=start,
1580
                                os_type=os_type,
1581
                                src_node=src_node,
1582
                                src_path=src_path)
1583

    
1584
  SubmitOrSend(op, opts)
1585
  return 0
1586

    
1587

    
1588
class _RunWhileClusterStoppedHelper:
1589
  """Helper class for L{RunWhileClusterStopped} to simplify state management
1590

1591
  """
1592
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
1593
    """Initializes this class.
1594

1595
    @type feedback_fn: callable
1596
    @param feedback_fn: Feedback function
1597
    @type cluster_name: string
1598
    @param cluster_name: Cluster name
1599
    @type master_node: string
1600
    @param master_node Master node name
1601
    @type online_nodes: list
1602
    @param online_nodes: List of names of online nodes
1603

1604
    """
1605
    self.feedback_fn = feedback_fn
1606
    self.cluster_name = cluster_name
1607
    self.master_node = master_node
1608
    self.online_nodes = online_nodes
1609

    
1610
    self.ssh = ssh.SshRunner(self.cluster_name)
1611

    
1612
    self.nonmaster_nodes = [name for name in online_nodes
1613
                            if name != master_node]
1614

    
1615
    assert self.master_node not in self.nonmaster_nodes
1616

    
1617
  def _RunCmd(self, node_name, cmd):
1618
    """Runs a command on the local or a remote machine.
1619

1620
    @type node_name: string
1621
    @param node_name: Machine name
1622
    @type cmd: list
1623
    @param cmd: Command
1624

1625
    """
1626
    if node_name is None or node_name == self.master_node:
1627
      # No need to use SSH
1628
      result = utils.RunCmd(cmd)
1629
    else:
1630
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
1631

    
1632
    if result.failed:
1633
      errmsg = ["Failed to run command %s" % result.cmd]
1634
      if node_name:
1635
        errmsg.append("on node %s" % node_name)
1636
      errmsg.append(": exitcode %s and error %s" %
1637
                    (result.exit_code, result.output))
1638
      raise errors.OpExecError(" ".join(errmsg))
1639

    
1640
  def Call(self, fn, *args):
1641
    """Call function while all daemons are stopped.
1642

1643
    @type fn: callable
1644
    @param fn: Function to be called
1645

1646
    """
1647
    # Pause watcher by acquiring an exclusive lock on watcher state file
1648
    self.feedback_fn("Blocking watcher")
1649
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
1650
    try:
1651
      # TODO: Currently, this just blocks. There's no timeout.
1652
      # TODO: Should it be a shared lock?
1653
      watcher_block.Exclusive(blocking=True)
1654

    
1655
      # Stop master daemons, so that no new jobs can come in and all running
1656
      # ones are finished
1657
      self.feedback_fn("Stopping master daemons")
1658
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
1659
      try:
1660
        # Stop daemons on all nodes
1661
        for node_name in self.online_nodes:
1662
          self.feedback_fn("Stopping daemons on %s" % node_name)
1663
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
1664

    
1665
        # All daemons are shut down now
1666
        try:
1667
          return fn(self, *args)
1668
        except Exception:
1669
          logging.exception("Caught exception")
1670
          raise
1671
      finally:
1672
        # Start cluster again, master node last
1673
        for node_name in self.nonmaster_nodes + [self.master_node]:
1674
          self.feedback_fn("Starting daemons on %s" % node_name)
1675
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
1676
    finally:
1677
      # Resume watcher
1678
      watcher_block.Close()
1679

    
1680

    
1681
def RunWhileClusterStopped(feedback_fn, fn, *args):
1682
  """Calls a function while all cluster daemons are stopped.
1683

1684
  @type feedback_fn: callable
1685
  @param feedback_fn: Feedback function
1686
  @type fn: callable
1687
  @param fn: Function to be called when daemons are stopped
1688

1689
  """
1690
  feedback_fn("Gathering cluster information")
1691

    
1692
  # This ensures we're running on the master daemon
1693
  cl = GetClient()
1694

    
1695
  (cluster_name, master_node) = \
1696
    cl.QueryConfigValues(["cluster_name", "master_node"])
1697

    
1698
  online_nodes = GetOnlineNodes([], cl=cl)
1699

    
1700
  # Don't keep a reference to the client. The master daemon will go away.
1701
  del cl
1702

    
1703
  assert master_node in online_nodes
1704

    
1705
  return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
1706
                                       online_nodes).Call(fn, *args)
1707

    
1708

    
1709
def GenerateTable(headers, fields, separator, data,
1710
                  numfields=None, unitfields=None,
1711
                  units=None):
1712
  """Prints a table with headers and different fields.
1713

1714
  @type headers: dict
1715
  @param headers: dictionary mapping field names to headers for
1716
      the table
1717
  @type fields: list
1718
  @param fields: the field names corresponding to each row in
1719
      the data field
1720
  @param separator: the separator to be used; if this is None,
1721
      the default 'smart' algorithm is used which computes optimal
1722
      field width, otherwise just the separator is used between
1723
      each field
1724
  @type data: list
1725
  @param data: a list of lists, each sublist being one row to be output
1726
  @type numfields: list
1727
  @param numfields: a list with the fields that hold numeric
1728
      values and thus should be right-aligned
1729
  @type unitfields: list
1730
  @param unitfields: a list with the fields that hold numeric
1731
      values that should be formatted with the units field
1732
  @type units: string or None
1733
  @param units: the units we should use for formatting, or None for
1734
      automatic choice (human-readable for non-separator usage, otherwise
1735
      megabytes); this is a one-letter string
1736

1737
  """
1738
  if units is None:
1739
    if separator:
1740
      units = "m"
1741
    else:
1742
      units = "h"
1743

    
1744
  if numfields is None:
1745
    numfields = []
1746
  if unitfields is None:
1747
    unitfields = []
1748

    
1749
  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
1750
  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142
1751

    
1752
  format_fields = []
1753
  for field in fields:
1754
    if headers and field not in headers:
1755
      # TODO: handle better unknown fields (either revert to old
1756
      # style of raising exception, or deal more intelligently with
1757
      # variable fields)
1758
      headers[field] = field
1759
    if separator is not None:
1760
      format_fields.append("%s")
1761
    elif numfields.Matches(field):
1762
      format_fields.append("%*s")
1763
    else:
1764
      format_fields.append("%-*s")
1765

    
1766
  if separator is None:
1767
    mlens = [0 for name in fields]
1768
    format = ' '.join(format_fields)
1769
  else:
1770
    format = separator.replace("%", "%%").join(format_fields)
1771

    
1772
  for row in data:
1773
    if row is None:
1774
      continue
1775
    for idx, val in enumerate(row):
1776
      if unitfields.Matches(fields[idx]):
1777
        try:
1778
          val = int(val)
1779
        except (TypeError, ValueError):
1780
          pass
1781
        else:
1782
          val = row[idx] = utils.FormatUnit(val, units)
1783
      val = row[idx] = str(val)
1784
      if separator is None:
1785
        mlens[idx] = max(mlens[idx], len(val))
1786

    
1787
  result = []
1788
  if headers:
1789
    args = []
1790
    for idx, name in enumerate(fields):
1791
      hdr = headers[name]
1792
      if separator is None:
1793
        mlens[idx] = max(mlens[idx], len(hdr))
1794
        args.append(mlens[idx])
1795
      args.append(hdr)
1796
    result.append(format % tuple(args))
1797

    
1798
  if separator is None:
1799
    assert len(mlens) == len(fields)
1800

    
1801
    if fields and not numfields.Matches(fields[-1]):
1802
      mlens[-1] = 0
1803

    
1804
  for line in data:
1805
    args = []
1806
    if line is None:
1807
      line = ['-' for _ in fields]
1808
    for idx in range(len(fields)):
1809
      if separator is None:
1810
        args.append(mlens[idx])
1811
      args.append(line[idx])
1812
    result.append(format % tuple(args))
1813

    
1814
  return result
1815

    
1816

    
1817
def FormatTimestamp(ts):
1818
  """Formats a given timestamp.
1819

1820
  @type ts: timestamp
1821
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
1822

1823
  @rtype: string
1824
  @return: a string with the formatted timestamp
1825

1826
  """
1827
  if not isinstance (ts, (tuple, list)) or len(ts) != 2:
1828
    return '?'
1829
  sec, usec = ts
1830
  return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
1831

    
1832

    
1833
def ParseTimespec(value):
1834
  """Parse a time specification.
1835

1836
  The following suffixed will be recognized:
1837

1838
    - s: seconds
1839
    - m: minutes
1840
    - h: hours
1841
    - d: day
1842
    - w: weeks
1843

1844
  Without any suffix, the value will be taken to be in seconds.
1845

1846
  """
1847
  value = str(value)
1848
  if not value:
1849
    raise errors.OpPrereqError("Empty time specification passed")
1850
  suffix_map = {
1851
    's': 1,
1852
    'm': 60,
1853
    'h': 3600,
1854
    'd': 86400,
1855
    'w': 604800,
1856
    }
1857
  if value[-1] not in suffix_map:
1858
    try:
1859
      value = int(value)
1860
    except (TypeError, ValueError):
1861
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
1862
  else:
1863
    multiplier = suffix_map[value[-1]]
1864
    value = value[:-1]
1865
    if not value: # no data left after stripping the suffix
1866
      raise errors.OpPrereqError("Invalid time specification (only"
1867
                                 " suffix passed)")
1868
    try:
1869
      value = int(value) * multiplier
1870
    except (TypeError, ValueError):
1871
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
1872
  return value
1873

    
1874

    
1875
def GetOnlineNodes(nodes, cl=None, nowarn=False):
1876
  """Returns the names of online nodes.
1877

1878
  This function will also log a warning on stderr with the names of
1879
  the online nodes.
1880

1881
  @param nodes: if not empty, use only this subset of nodes (minus the
1882
      offline ones)
1883
  @param cl: if not None, luxi client to use
1884
  @type nowarn: boolean
1885
  @param nowarn: by default, this function will output a note with the
1886
      offline nodes that are skipped; if this parameter is True the
1887
      note is not displayed
1888

1889
  """
1890
  if cl is None:
1891
    cl = GetClient()
1892

    
1893
  result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
1894
                         use_locking=False)
1895
  offline = [row[0] for row in result if row[1]]
1896
  if offline and not nowarn:
1897
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
1898
  return [row[0] for row in result if not row[1]]
1899

    
1900

    
1901
def _ToStream(stream, txt, *args):
1902
  """Write a message to a stream, bypassing the logging system
1903

1904
  @type stream: file object
1905
  @param stream: the file to which we should write
1906
  @type txt: str
1907
  @param txt: the message
1908

1909
  """
1910
  if args:
1911
    args = tuple(args)
1912
    stream.write(txt % args)
1913
  else:
1914
    stream.write(txt)
1915
  stream.write('\n')
1916
  stream.flush()
1917

    
1918

    
1919
def ToStdout(txt, *args):
1920
  """Write a message to stdout only, bypassing the logging system
1921

1922
  This is just a wrapper over _ToStream.
1923

1924
  @type txt: str
1925
  @param txt: the message
1926

1927
  """
1928
  _ToStream(sys.stdout, txt, *args)
1929

    
1930

    
1931
def ToStderr(txt, *args):
1932
  """Write a message to stderr only, bypassing the logging system
1933

1934
  This is just a wrapper over _ToStream.
1935

1936
  @type txt: str
1937
  @param txt: the message
1938

1939
  """
1940
  _ToStream(sys.stderr, txt, *args)
1941

    
1942

    
1943
class JobExecutor(object):
1944
  """Class which manages the submission and execution of multiple jobs.
1945

1946
  Note that instances of this class should not be reused between
1947
  GetResults() calls.
1948

1949
  """
1950
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
1951
    self.queue = []
1952
    if cl is None:
1953
      cl = GetClient()
1954
    self.cl = cl
1955
    self.verbose = verbose
1956
    self.jobs = []
1957
    self.opts = opts
1958
    self.feedback_fn = feedback_fn
1959

    
1960
  def QueueJob(self, name, *ops):
1961
    """Record a job for later submit.
1962

1963
    @type name: string
1964
    @param name: a description of the job, will be used in WaitJobSet
1965
    """
1966
    SetGenericOpcodeOpts(ops, self.opts)
1967
    self.queue.append((name, ops))
1968

    
1969
  def SubmitPending(self):
1970
    """Submit all pending jobs.
1971

1972
    """
1973
    results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
1974
    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
1975
                                                            self.queue)):
1976
      self.jobs.append((idx, status, data, name))
1977

    
1978
  def _ChooseJob(self):
1979
    """Choose a non-waiting/queued job to poll next.
1980

1981
    """
1982
    assert self.jobs, "_ChooseJob called with empty job list"
1983

    
1984
    result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
1985
    assert result
1986

    
1987
    for job_data, status in zip(self.jobs, result):
1988
      if status[0] in (constants.JOB_STATUS_QUEUED,
1989
                    constants.JOB_STATUS_WAITLOCK,
1990
                    constants.JOB_STATUS_CANCELING):
1991
        # job is still waiting
1992
        continue
1993
      # good candidate found
1994
      self.jobs.remove(job_data)
1995
      return job_data
1996

    
1997
    # no job found
1998
    return self.jobs.pop(0)
1999

    
2000
  def GetResults(self):
2001
    """Wait for and return the results of all jobs.
2002

2003
    @rtype: list
2004
    @return: list of tuples (success, job results), in the same order
2005
        as the submitted jobs; if a job has failed, instead of the result
2006
        there will be the error message
2007

2008
    """
2009
    if not self.jobs:
2010
      self.SubmitPending()
2011
    results = []
2012
    if self.verbose:
2013
      ok_jobs = [row[2] for row in self.jobs if row[1]]
2014
      if ok_jobs:
2015
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
2016

    
2017
    # first, remove any non-submitted jobs
2018
    self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
2019
    for idx, _, jid, name in failures:
2020
      ToStderr("Failed to submit job for %s: %s", name, jid)
2021
      results.append((idx, False, jid))
2022

    
2023
    while self.jobs:
2024
      (idx, _, jid, name) = self._ChooseJob()
2025
      ToStdout("Waiting for job %s for %s...", jid, name)
2026
      try:
2027
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
2028
        success = True
2029
      except (errors.GenericError, luxi.ProtocolError), err:
2030
        _, job_result = FormatError(err)
2031
        success = False
2032
        # the error message will always be shown, verbose or not
2033
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
2034

    
2035
      results.append((idx, success, job_result))
2036

    
2037
    # sort based on the index, then drop it
2038
    results.sort()
2039
    results = [i[1:] for i in results]
2040

    
2041
    return results
2042

    
2043
  def WaitOrShow(self, wait):
2044
    """Wait for job results or only print the job IDs.
2045

2046
    @type wait: boolean
2047
    @param wait: whether to wait or not
2048

2049
    """
2050
    if wait:
2051
      return self.GetResults()
2052
    else:
2053
      if not self.jobs:
2054
        self.SubmitPending()
2055
      for status, result, name in self.jobs:
2056
        if status:
2057
          ToStdout("%s: %s", result, name)
2058
        else:
2059
          ToStderr("Failure for %s: %s", name, result)