Statistics
| Branch: | Tag: | Revision:

root / lib / cli.py @ 25a8792c

History | View | Annotate | Download (66.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module dealing with command line parsing"""
23

    
24

    
25
import sys
26
import textwrap
27
import os.path
28
import time
29
import logging
30
from cStringIO import StringIO
31

    
32
from ganeti import utils
33
from ganeti import errors
34
from ganeti import constants
35
from ganeti import opcodes
36
from ganeti import luxi
37
from ganeti import ssconf
38
from ganeti import rpc
39
from ganeti import ssh
40

    
41
from optparse import (OptionParser, TitledHelpFormatter,
42
                      Option, OptionValueError)
43

    
44

    
45
# Public API of this module, consumed via "from ganeti.cli import *" by the
# individual gnt-* client scripts.
__all__ = [
  # Command line options
  "ALLOCATABLE_OPT",
  "ALL_OPT",
  "AUTO_PROMOTE_OPT",
  "AUTO_REPLACE_OPT",
  "BACKEND_OPT",
  "CLEANUP_OPT",
  "CONFIRM_OPT",
  "CP_SIZE_OPT",
  "DEBUG_OPT",
  "DEBUG_SIMERR_OPT",
  "DISKIDX_OPT",
  "DISK_OPT",
  "DISK_TEMPLATE_OPT",
  "DRAINED_OPT",
  "EARLY_RELEASE_OPT",
  "ENABLED_HV_OPT",
  "ERROR_CODES_OPT",
  "FIELDS_OPT",
  "FILESTORE_DIR_OPT",
  "FILESTORE_DRIVER_OPT",
  "FORCE_OPT",
  "FORCE_VARIANT_OPT",
  "GLOBAL_FILEDIR_OPT",
  "HVLIST_OPT",
  "HVOPTS_OPT",
  "HYPERVISOR_OPT",
  "IALLOCATOR_OPT",
  "IGNORE_CONSIST_OPT",
  "IGNORE_FAILURES_OPT",
  "IGNORE_SECONDARIES_OPT",
  "IGNORE_SIZE_OPT",
  "MAC_PREFIX_OPT",
  "MASTER_NETDEV_OPT",
  "MC_OPT",
  "NET_OPT",
  "NEW_CLUSTER_CERT_OPT",
  "NEW_CONFD_HMAC_KEY_OPT",
  "NEW_RAPI_CERT_OPT",
  "NEW_SECONDARY_OPT",
  "NIC_PARAMS_OPT",
  "NODE_LIST_OPT",
  "NODE_PLACEMENT_OPT",
  "NOHDR_OPT",
  "NOIPCHECK_OPT",
  "NO_INSTALL_OPT",
  "NONAMECHECK_OPT",
  "NOLVM_STORAGE_OPT",
  "NOMODIFY_ETCHOSTS_OPT",
  "NOMODIFY_SSH_SETUP_OPT",
  "NONICS_OPT",
  "NONLIVE_OPT",
  "NONPLUS1_OPT",
  "NOSHUTDOWN_OPT",
  "NOSTART_OPT",
  "NOSSH_KEYCHECK_OPT",
  "NOVOTING_OPT",
  "NWSYNC_OPT",
  "ON_PRIMARY_OPT",
  "ON_SECONDARY_OPT",
  "OFFLINE_OPT",
  "OS_OPT",
  "OS_SIZE_OPT",
  "RAPI_CERT_OPT",
  "READD_OPT",
  "REBOOT_TYPE_OPT",
  "SECONDARY_IP_OPT",
  "SELECT_OS_OPT",
  "SEP_OPT",
  "SHOWCMD_OPT",
  "SHUTDOWN_TIMEOUT_OPT",
  "SINGLE_NODE_OPT",
  "SRC_DIR_OPT",
  "SRC_NODE_OPT",
  "SUBMIT_OPT",
  "STATIC_OPT",
  "SYNC_OPT",
  "TAG_SRC_OPT",
  "TIMEOUT_OPT",
  "USEUNITS_OPT",
  "VERBOSE_OPT",
  "VG_NAME_OPT",
  "YES_DOIT_OPT",
  # Generic functions for CLI programs
  "GenericMain",
  "GenericInstanceCreate",
  "GetClient",
  "GetOnlineNodes",
  "JobExecutor",
  "JobSubmittedException",
  "ParseTimespec",
  "RunWhileClusterStopped",
  "SubmitOpCode",
  "SubmitOrSend",
  "UsesRPC",
  # Formatting functions
  "ToStderr", "ToStdout",
  "FormatError",
  "GenerateTable",
  "AskUser",
  "FormatTimestamp",
  # Tags functions
  "ListTags",
  "AddTags",
  "RemoveTags",
  # command line options support infrastructure
  "ARGS_MANY_INSTANCES",
  "ARGS_MANY_NODES",
  "ARGS_NONE",
  "ARGS_ONE_INSTANCE",
  "ARGS_ONE_NODE",
  "ARGS_ONE_OS",
  "ArgChoice",
  "ArgCommand",
  "ArgFile",
  "ArgHost",
  "ArgInstance",
  "ArgJobId",
  "ArgNode",
  "ArgOs",
  "ArgSuggest",
  "ArgUnknown",
  "OPT_COMPL_INST_ADD_NODES",
  "OPT_COMPL_MANY_NODES",
  "OPT_COMPL_ONE_IALLOCATOR",
  "OPT_COMPL_ONE_INSTANCE",
  "OPT_COMPL_ONE_NODE",
  "OPT_COMPL_ONE_OS",
  "cli_option",
  "SplitNodeOption",
  "CalculateOSNames",
  ]
178

    
179
NO_PREFIX = "no_"
180
UN_PREFIX = "-"
181

    
182

    
183
class _Argument:
184
  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
185
    self.min = min
186
    self.max = max
187

    
188
  def __repr__(self):
189
    return ("<%s min=%s max=%s>" %
190
            (self.__class__.__name__, self.min, self.max))
191

    
192

    
193
class ArgSuggest(_Argument):
  """Suggesting argument.

  Value can be any of the ones passed to the constructor.

  """
  # pylint: disable-msg=W0622
  def __init__(self, min=0, max=None, choices=None):
    _Argument.__init__(self, min=min, max=max)
    self.choices = choices

  def __repr__(self):
    fmt = "<%s min=%s max=%s choices=%r>"
    return fmt % (self.__class__.__name__, self.min, self.max, self.choices)
207

    
208

    
209
class ArgChoice(ArgSuggest):
  """Choice argument.

  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
  but value must be one of the choices.

  """
216

    
217

    
218
class ArgUnknown(_Argument):
  """Unknown argument to program (e.g. determined at runtime).

  Marker subclass of L{_Argument} for arguments whose kind cannot be
  known statically.

  """
222

    
223

    
224
class ArgInstance(_Argument):
  """Instances argument.

  Marker subclass of L{_Argument} for instance name arguments.

  """
228

    
229

    
230
class ArgNode(_Argument):
  """Node argument.

  Marker subclass of L{_Argument} for node name arguments.

  """
234

    
235
class ArgJobId(_Argument):
  """Job ID argument.

  Marker subclass of L{_Argument} for job identifier arguments.

  """
239

    
240

    
241
class ArgFile(_Argument):
  """File path argument.

  Marker subclass of L{_Argument} for file path arguments.

  """
245

    
246

    
247
class ArgCommand(_Argument):
  """Command argument.

  Marker subclass of L{_Argument} for command arguments.

  """
251

    
252

    
253
class ArgHost(_Argument):
  """Host argument.

  Marker subclass of L{_Argument} for host name arguments.

  """
257

    
258

    
259
class ArgOs(_Argument):
  """OS argument.

  Marker subclass of L{_Argument} for operating system name arguments.

  """
263

    
264

    
265
ARGS_NONE = []
266
ARGS_MANY_INSTANCES = [ArgInstance()]
267
ARGS_MANY_NODES = [ArgNode()]
268
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
269
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
270
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
271

    
272

    
273
def _ExtractTagsObject(opts, args):
  """Extract the tag type object.

  Note that this function will modify its args parameter.

  @return: a (kind, name) tuple; for cluster tags the name is the kind
      itself, for node/instance tags it is popped from C{args}

  """
  if not hasattr(opts, "tag_type"):
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
  kind = opts.tag_type
  if kind == constants.TAG_CLUSTER:
    # the cluster is a singleton, so no name is needed
    return kind, kind
  if kind in (constants.TAG_NODE, constants.TAG_INSTANCE):
    if not args:
      raise errors.OpPrereqError("no arguments passed to the command")
    # consume the object name from the argument list
    return kind, args.pop(0)
  raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
292

    
293

    
294
def _ExtendTags(opts, args):
295
  """Extend the args if a source file has been given.
296

297
  This function will extend the tags with the contents of the file
298
  passed in the 'tags_source' attribute of the opts parameter. A file
299
  named '-' will be replaced by stdin.
300

301
  """
302
  fname = opts.tags_source
303
  if fname is None:
304
    return
305
  if fname == "-":
306
    new_fh = sys.stdin
307
  else:
308
    new_fh = open(fname, "r")
309
  new_data = []
310
  try:
311
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
312
    # because of python bug 1633941
313
    while True:
314
      line = new_fh.readline()
315
      if not line:
316
        break
317
      new_data.append(line.strip())
318
  finally:
319
    new_fh.close()
320
  args.extend(new_data)
321

    
322

    
323
def ListTags(opts, args):
  """List the tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  cl = GetClient()
  # print the tags in deterministic (sorted) order
  for tag in sorted(cl.QueryTags(kind, name)):
    ToStdout(tag)
339

    
340

    
341
def AddTags(opts, args):
  """Add tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  # optionally pull additional tags from the --from file (or stdin)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be added")
  SubmitOpCode(opcodes.OpAddTags(kind=kind, name=name, tags=args))
356

    
357

    
358
def RemoveTags(opts, args):
  """Remove tags from a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  # optionally pull additional tags from the --from file (or stdin)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be removed")
  SubmitOpCode(opcodes.OpDelTags(kind=kind, name=name, tags=args))
373

    
374

    
375
def check_unit(option, opt, value): # pylint: disable-msg=W0613
  """OptParsers custom converter for units.

  @param option: the optparse Option being processed (unused, required
      by the optparse type-checker signature)
  @param opt: the option string, used only for error messages
  @param value: the user-supplied value, handed to L{utils.ParseUnit}
  @raise OptionValueError: if the value cannot be parsed as a unit

  """
  try:
    return utils.ParseUnit(value)
  except errors.UnitParseError, err:
    raise OptionValueError("option %s: %s" % (opt, err))
383

    
384

    
385
def _SplitKeyVal(opt, data):
  """Convert a KeyVal string into a dict.

  This function will convert a key=val[,...] string into a dict. Empty
  values will be converted specially: keys which have the prefix 'no_'
  will have the value=False and the prefix stripped, the others will
  have value=True.

  @type opt: string
  @param opt: a string holding the option name for which we process the
      data, used in building error messages
  @type data: string
  @param data: a string of the format key=val,key=val,...
  @rtype: dict
  @return: {key=val, key=val}
  @raises errors.ParameterError: if there are duplicate keys

  """
  parsed = {}
  if not data:
    return parsed
  for elem in utils.UnescapeAndSplit(data, sep=","):
    if "=" in elem:
      key, val = elem.split("=", 1)
    elif elem.startswith(NO_PREFIX):
      # "no_key" -> key=False
      key, val = elem[len(NO_PREFIX):], False
    elif elem.startswith(UN_PREFIX):
      # "-key" -> key=None (reset)
      key, val = elem[len(UN_PREFIX):], None
    else:
      # bare "key" -> key=True
      key, val = elem, True
    if key in parsed:
      raise errors.ParameterError("Duplicate key '%s' in option %s" %
                                  (key, opt))
    parsed[key] = val
  return parsed
420

    
421

    
422
def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser for ident:key=val,key=val options.

  This will store the parsed values as a tuple (ident, {key: val}). As such,
  multiple uses of this option via action=append is possible.

  """
  if ":" in value:
    ident, rest = value.split(":", 1)
  else:
    ident, rest = value, ''

  # NO_PREFIX removes the group (value False), UN_PREFIX resets it (None);
  # neither form accepts trailing key=val data
  for prefix, marker in ((NO_PREFIX, False), (UN_PREFIX, None)):
    if ident.startswith(prefix):
      if rest:
        msg = "Cannot pass options when removing parameter groups: %s" % value
        raise errors.ParameterError(msg)
      return (ident[len(prefix):], marker)

  return (ident, _SplitKeyVal(opt, rest))
448

    
449

    
450
def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser for key=val,key=val options.

  This will store the parsed values as a dict {key: val}.

  """
  # all parsing (including the no_/- prefix handling) is delegated to
  # _SplitKeyVal; 'option' is only part of the optparse callback signature
  return _SplitKeyVal(opt, value)
457

    
458

    
459
# completion_suggestion is normally a list. Using numeric values not evaluating
# to False for dynamic completion.
(OPT_COMPL_MANY_NODES,
 OPT_COMPL_ONE_NODE,
 OPT_COMPL_ONE_INSTANCE,
 OPT_COMPL_ONE_OS,
 OPT_COMPL_ONE_IALLOCATOR,
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)

# All dynamic-completion markers, for membership tests.
OPT_COMPL_ALL = frozenset([
  OPT_COMPL_MANY_NODES,
  OPT_COMPL_ONE_NODE,
  OPT_COMPL_ONE_INSTANCE,
  OPT_COMPL_ONE_OS,
  OPT_COMPL_ONE_IALLOCATOR,
  OPT_COMPL_INST_ADD_NODES,
  ])
476

    
477

    
478
class CliOption(Option):
  """Custom option class for optparse.

  Extends the stock optparse Option with the 'completion_suggest'
  attribute and three extra value types: 'identkeyval', 'keyval' and
  'unit', checked by the corresponding check_* functions in this module.

  """
  ATTRS = Option.ATTRS + ["completion_suggest"]
  TYPES = Option.TYPES + ("identkeyval", "keyval", "unit")
  TYPE_CHECKER = dict(Option.TYPE_CHECKER,
                      identkeyval=check_ident_key_val,
                      keyval=check_key_val,
                      unit=check_unit)
494

    
495

    
496
# optparse.py sets make_option, so we do it for our own option class, too
cli_option = CliOption

# Helper constants for yes/no style flags (choices and metavar text).
_YESNO = ("yes", "no")
_YORNO = "yes|no"

# Generic options shared by most client scripts.

DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
                       help="Increase debugging level")

NOHDR_OPT = cli_option("--no-headers", default=False,
                       action="store_true", dest="no_headers",
                       help="Don't display column headers")

SEP_OPT = cli_option("--separator", default=None,
                     action="store", dest="separator",
                     help=("Separator between output fields"
                           " (defaults to one space)"))

USEUNITS_OPT = cli_option("--units", default=None,
                          dest="units", choices=('h', 'm', 'g', 't'),
                          help="Specify units for output (one of hmgt)")

FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
                        type="string", metavar="FIELDS",
                        help="Comma separated list of output fields")

FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
                       default=False, help="Force the operation")

CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
                         default=False, help="Do not require confirmation")

TAG_SRC_OPT = cli_option("--from", dest="tags_source",
                         default=None, help="File with tag names")

SUBMIT_OPT = cli_option("--submit", dest="submit_only",
                        default=False, action="store_true",
                        help=("Submit the job and return the job ID, but"
                              " don't wait for the job to finish"))

SYNC_OPT = cli_option("--sync", dest="do_locking",
                      default=False, action="store_true",
                      help=("Grab locks while doing the queries"
                            " in order to ensure more consistent results"))
541

    
542
# Fixed help-text typo: "verify it it could be" -> "verify if it could be".
_DRY_RUN_OPT = cli_option("--dry-run", default=False,
                          action="store_true",
                          help=("Do not execute the operation, just run the"
                                " check steps and verify if it could be"
                                " executed"))
547

    
548
VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
                         action="store_true",
                         help="Increase the verbosity of the operation")

DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
                              action="store_true", dest="simulate_errors",
                              help="Debugging option that makes the operation"
                              " treat most runtime checks as failed")

NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
                        default=True, action="store_false",
                        help="Don't wait for sync (DANGEROUS!)")

DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
                               help="Custom disk setup (diskless, file,"
                               " plain or drbd)",
                               default=None, metavar="TEMPL",
                               choices=list(constants.DISK_TEMPLATES))

NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
                        help="Do not create any network cards for"
                        " the instance")

FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                               help="Relative path under default cluster-wide"
                               " file storage dir to store file-based disks",
                               default=None, metavar="<DIR>")

FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
                                  help="Driver to use for image files",
                                  default="loop", metavar="<DRIVER>",
                                  choices=list(constants.FILE_DRIVER))

IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
                            help="Select nodes for the instance automatically"
                            " using the <NAME> iallocator plugin",
                            default=None, type="string",
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)

OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
                    metavar="<os>",
                    completion_suggest=OPT_COMPL_ONE_OS)

FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
                               action="store_true", default=False,
                               help="Force an unknown variant")

NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
                            action="store_true", default=False,
                            help="Do not install the OS (will"
                            " enable no-start)")

BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
                         type="keyval", default={},
                         help="Backend parameters")

# NOTE: "-H"/"--hypervisor-parameters" is defined three times below
# (HVOPTS_OPT, HYPERVISOR_OPT, HVLIST_OPT) with different dest/type
# combinations; each client script uses the variant matching its needs.
HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
                         default={}, dest="hvparams",
                         help="Hypervisor parameters")

HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
                            help="Hypervisor and hypervisor options, in the"
                            " format hypervisor:option=value,option=value,...",
                            default=None, type="identkeyval")

HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
                        help="Hypervisor and hypervisor options, in the"
                        " format hypervisor:option=value,option=value,...",
                        default=[], action="append", type="identkeyval")

NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
                           action="store_false",
                           help="Don't check that the instance's IP"
                           " is alive")

NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
                             default=True, action="store_false",
                             help="Don't check that the instance's name"
                             " is resolvable")

NET_OPT = cli_option("--net",
                     help="NIC parameters", default=[],
                     dest="nics", action="append", type="identkeyval")

DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
                      dest="disks", action="append", type="identkeyval")

DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
                         help="Comma-separated list of disks"
                         " indices to act on (e.g. 0,2) (optional,"
                         " defaults to all disks)")

OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
                         help="Enforces a single-disk configuration using the"
                         " given disk size, in MiB unless a suffix is used",
                         default=None, type="unit", metavar="<size>")

IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
                                dest="ignore_consistency",
                                action="store_true", default=False,
                                help="Ignore the consistency of the disks on"
                                " the secondary")

NONLIVE_OPT = cli_option("--non-live", dest="live",
                         default=True, action="store_false",
                         help="Do a non-live migration (this usually means"
                         " freeze the instance, save the state, transfer and"
                         " only then resume running on the secondary node)")

# NOTE: "-n"/"--node" is likewise defined three times (NODE_PLACEMENT_OPT,
# NODE_LIST_OPT, SINGLE_NODE_OPT) with different dest/semantics.
NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
                                help="Target node and optional secondary node",
                                metavar="<pnode>[:<snode>]",
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)

NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
                           action="append", metavar="<node>",
                           help="Use only this node (can be used multiple"
                           " times, if not given defaults to all nodes)",
                           completion_suggest=OPT_COMPL_ONE_NODE)

SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
                             metavar="<node>",
                             completion_suggest=OPT_COMPL_ONE_NODE)

NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
                         action="store_false",
                         help="Don't start the instance after creation")

SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
                         action="store_true", default=False,
                         help="Show command instead of executing it")
679

    
680
# Fixed help text: closed the unbalanced "(like during the migration"
# parenthesis and removed a doubled space before "disrupt".
CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
                         default=False, action="store_true",
                         help="Instead of performing the migration, try to"
                         " recover from a failed cleanup. This is safe"
                         " to run even if the instance is healthy, but it"
                         " will create extra replication traffic and"
                         " disrupt briefly the replication (like during the"
                         " migration)")
688

    
689
STATIC_OPT = cli_option("-s", "--static", dest="static",
                        action="store_true", default=False,
                        help="Only show configuration data, not runtime data")

ALL_OPT = cli_option("--all", dest="show_all",
                     default=False, action="store_true",
                     help="Show info on all instances on the cluster."
                     " This can take a long time to run, use wisely")

SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
                           action="store_true", default=False,
                           help="Interactive OS reinstall, lists available"
                           " OS templates for selection")

IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
                                 action="store_true", default=False,
                                 help="Remove the instance from the cluster"
                                 " configuration even if there are failures"
                                 " during the removal process")

NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
                               help="Specifies the new secondary node",
                               metavar="NODE", default=None,
                               completion_suggest=OPT_COMPL_ONE_NODE)

ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
                            default=False, action="store_true",
                            help="Replace the disk(s) on the primary"
                            " node (only for the drbd template)")

ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
                              default=False, action="store_true",
                              help="Replace the disk(s) on the secondary"
                              " node (only for the drbd template)")

AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
                              default=False, action="store_true",
                              help="Lock all nodes and auto-promote as needed"
                              " to MC status")

AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
                              default=False, action="store_true",
                              help="Automatically replace faulty disks"
                              " (only for the drbd template)")

IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
                             default=False, action="store_true",
                             help="Ignore current recorded size"
                             " (useful for forcing activation when"
                             " the recorded size is wrong)")

SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
                          metavar="<node>",
                          completion_suggest=OPT_COMPL_ONE_NODE)

SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
                         metavar="<dir>")

SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
                              help="Specify the secondary ip for the node",
                              metavar="ADDRESS", default=None)

READD_OPT = cli_option("--readd", dest="readd",
                       default=False, action="store_true",
                       help="Readd old node after replacing it")

NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
                                default=True, action="store_false",
                                help="Disable SSH key fingerprint checking")


# Node flag options use the shared yes/no choices; note "-C" is also used
# by CP_SIZE_OPT below (different commands, so no conflict at runtime).
MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
                    choices=_YESNO, default=None, metavar=_YORNO,
                    help="Set the master_candidate flag on the node")

OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
                         choices=_YESNO, default=None,
                         help="Set the offline flag on the node")

DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
                         choices=_YESNO, default=None,
                         help="Set the drained flag on the node")

ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
                             choices=_YESNO, default=None, metavar=_YORNO,
                             help="Set the allocatable flag on a volume")

NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
                               help="Disable support for lvm based instances"
                               " (cluster-wide)",
                               action="store_false", default=True)

ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
                            dest="enabled_hypervisors",
                            help="Comma-separated list of hypervisors",
                            type="string", default=None)

NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
                            type="keyval", default={},
                            help="NIC parameters")

CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
                         dest="candidate_pool_size", type="int",
                         help="Set the candidate pool size")

VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
                         help="Enables LVM and specifies the volume group"
                         " name (cluster-wide) for disk allocation [xenvg]",
                         metavar="VG", default=None)

YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
                          help="Destroy cluster", action="store_true")

NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
                          help="Skip node agreement check (dangerous)",
                          action="store_true", default=False)

MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
                            help="Specify the mac prefix for the instance IP"
                            " addresses, in the format XX:XX:XX",
                            metavar="PREFIX",
                            default=None)

MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
                               help="Specify the node interface (cluster-wide)"
                               " on which the master IP address will be added "
                               " [%s]" % constants.DEFAULT_BRIDGE,
                               metavar="NETDEV",
                               default=constants.DEFAULT_BRIDGE)


GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                                help="Specify the default directory (cluster-"
                                "wide) for storing the file-based disks [%s]" %
                                constants.DEFAULT_FILE_STORAGE_DIR,
                                metavar="DIR",
                                default=constants.DEFAULT_FILE_STORAGE_DIR)

NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
                                   help="Don't modify /etc/hosts",
                                   action="store_false", default=True)

NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
                                    help="Don't initialize SSH keys",
                                    action="store_false", default=True)

ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
                             help="Enable parseable error messages",
                             action="store_true", default=False)

NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
                          help="Skip N+1 memory redundancy tests",
                          action="store_true", default=False)

REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
                             help="Type of reboot: soft/hard/full",
                             default=constants.INSTANCE_REBOOT_HARD,
                             metavar="<REBOOT>",
                             choices=list(constants.REBOOT_TYPES))

IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
                                    dest="ignore_secondaries",
                                    default=False, action="store_true",
                                    help="Ignore errors from secondaries")
853

    
854
NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
855
                            action="store_false", default=True,
856
                            help="Don't shutdown the instance (unsafe)")
857

    
858
TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
859
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
860
                         help="Maximum time to wait")
861

    
862
SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
863
                         dest="shutdown_timeout", type="int",
864
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
865
                         help="Maximum time to wait for instance shutdown")
866

    
867
EARLY_RELEASE_OPT = cli_option("--early-release",
868
                               dest="early_release", default=False,
869
                               action="store_true",
870
                               help="Release the locks on the secondary"
871
                               " node(s) early")
872

    
873
NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
874
                                  dest="new_cluster_cert",
875
                                  default=False, action="store_true",
876
                                  help="Generate a new cluster certificate")
877

    
878
RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
879
                           default=None,
880
                           help="File containing new RAPI certificate")
881

    
882
NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
883
                               default=None, action="store_true",
884
                               help=("Generate a new self-signed RAPI"
885
                                     " certificate"))
886

    
887
NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
888
                                    dest="new_confd_hmac_key",
889
                                    default=False, action="store_true",
890
                                    help=("Create a new HMAC key for %s" %
891
                                          constants.CONFD))
892

    
893

    
894
def _ParseArgs(argv, commands, aliases):
  """Parser for the command line arguments.

  This function parses the arguments and returns the function which
  must be executed together with its (modified) arguments.

  @param argv: the command line
  @param commands: dictionary with special contents, see the design
      doc for cmdline handling
  @param aliases: dictionary with command aliases {'alias': 'target, ...}
  @return: (function, options, args); all three are None if the command
      is unknown or its arguments fail validation

  """
  if len(argv) == 0:
    binary = "<command>"
  else:
    binary = argv[0].split("/")[-1]

  if len(argv) > 1 and argv[1] == "--version":
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
    # Quit right away. That way we don't have to care about this special
    # argument. optparse.py does it the same.
    sys.exit(0)

  if len(argv) < 2 or not (argv[1] in commands or
                           argv[1] in aliases):
    # let's do a nice thing
    sortedcmds = commands.keys()
    sortedcmds.sort()

    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
    ToStdout("")

    # compute the max line length for cmd + usage
    mlen = max([len(" %s" % cmd) for cmd in commands])
    mlen = min(60, mlen) # should not get here...

    # and format a nice command list
    ToStdout("Commands:")
    for cmd in sortedcmds:
      cmdstr = " %s" % (cmd,)
      help_text = commands[cmd][4]
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
      for line in help_lines:
        ToStdout("%-*s   %s", mlen, "", line)

    ToStdout("")

    return None, None, None

  # get command, unalias it, and look it up in commands
  cmd = argv.pop(1)
  if cmd in aliases:
    if cmd in commands:
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
                                   " command" % cmd)

    if aliases[cmd] not in commands:
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
                                   " command '%s'" % (cmd, aliases[cmd]))

    cmd = aliases[cmd]

  func, args_def, parser_opts, usage, description = commands[cmd]
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
                        description=description,
                        formatter=TitledHelpFormatter(),
                        usage="%%prog %s %s" % (cmd, usage))
  parser.disable_interspersed_args()
  # Parse the remainder of the argument vector we were given explicitly;
  # a bare parse_args() would implicitly re-read sys.argv, which only
  # worked because callers always pass sys.argv itself.
  options, args = parser.parse_args(argv[1:])

  if not _CheckArguments(cmd, args_def, args):
    return None, None, None

  return func, options, args
970

    
971

    
972
def _CheckArguments(cmd, args_def, args):
973
  """Verifies the arguments using the argument definition.
974

975
  Algorithm:
976

977
    1. Abort with error if values specified by user but none expected.
978

979
    1. For each argument in definition
980

981
      1. Keep running count of minimum number of values (min_count)
982
      1. Keep running count of maximum number of values (max_count)
983
      1. If it has an unlimited number of values
984

985
        1. Abort with error if it's not the last argument in the definition
986

987
    1. If last argument has limited number of values
988

989
      1. Abort with error if number of values doesn't match or is too large
990

991
    1. Abort with error if user didn't pass enough values (min_count)
992

993
  """
994
  if args and not args_def:
995
    ToStderr("Error: Command %s expects no arguments", cmd)
996
    return False
997

    
998
  min_count = None
999
  max_count = None
1000
  check_max = None
1001

    
1002
  last_idx = len(args_def) - 1
1003

    
1004
  for idx, arg in enumerate(args_def):
1005
    if min_count is None:
1006
      min_count = arg.min
1007
    elif arg.min is not None:
1008
      min_count += arg.min
1009

    
1010
    if max_count is None:
1011
      max_count = arg.max
1012
    elif arg.max is not None:
1013
      max_count += arg.max
1014

    
1015
    if idx == last_idx:
1016
      check_max = (arg.max is not None)
1017

    
1018
    elif arg.max is None:
1019
      raise errors.ProgrammerError("Only the last argument can have max=None")
1020

    
1021
  if check_max:
1022
    # Command with exact number of arguments
1023
    if (min_count is not None and max_count is not None and
1024
        min_count == max_count and len(args) != min_count):
1025
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1026
      return False
1027

    
1028
    # Command with limited number of arguments
1029
    if max_count is not None and len(args) > max_count:
1030
      ToStderr("Error: Command %s expects only %d argument(s)",
1031
               cmd, max_count)
1032
      return False
1033

    
1034
  # Command with some required arguments
1035
  if min_count is not None and len(args) < min_count:
1036
    ToStderr("Error: Command %s expects at least %d argument(s)",
1037
             cmd, min_count)
1038
    return False
1039

    
1040
  return True
1041

    
1042

    
1043
def SplitNodeOption(value):
  """Splits the value of a --node option.

  "pnode:snode" yields the pair [pnode, snode]; a plain node name (or
  an empty/None value) yields (value, None).

  """
  if not value or ':' not in value:
    return (value, None)
  return value.split(':', 1)
1051

    
1052

    
1053
def CalculateOSNames(os_name, os_variants):
  """Calculates all the names an OS can be called, according to its variants.

  @type os_name: string
  @param os_name: base name of the os
  @type os_variants: list or None
  @param os_variants: list of supported variants
  @rtype: list
  @return: list of valid names

  """
  if not os_variants:
    # no variants: the bare name is the only valid one
    return [os_name]
  names = []
  for variant in os_variants:
    names.append('%s+%s' % (os_name, variant))
  return names
1068

    
1069

    
1070
def UsesRPC(fn):
  """Decorator that brackets a function call with RPC initialization.

  rpc.Init() is called before invoking C{fn} and rpc.Shutdown() is
  always executed afterwards, even when C{fn} raises.

  """
  # local import: this module's top-level import block does not pull in
  # functools, and the decorator is cheap enough to import lazily
  import functools

  @functools.wraps(fn)  # preserve the wrapped function's name/docstring
  def wrapper(*args, **kwargs):
    rpc.Init()
    try:
      return fn(*args, **kwargs)
    finally:
      rpc.Shutdown()
  return wrapper
1078

    
1079

    
1080
def AskUser(text, choices=None):
  """Ask the user a question.

  @param text: the question to ask

  @param choices: list with elements tuples (input_char, return_value,
      description); if not given, it will default to: [('y', True,
      'Perform the operation'), ('n', False, 'Do no do the operation')];
      note that the '?' char is reserved for help

  @return: one of the return values from the choices list; if input is
      not possible (i.e. not running with a tty, we return the last
      entry from the list

  """
  if choices is None:
    choices = [('y', True, 'Perform the operation'),
               ('n', False, 'Do not perform the operation')]
  if not choices or not isinstance(choices, list):
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
  for entry in choices:
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
      raise errors.ProgrammerError("Invalid choices element to AskUser")

  # Default answer: the return value of the last choice; used whenever
  # no terminal is available for prompting.
  answer = choices[-1][1]
  new_text = []
  for line in text.splitlines():
    # wrap long lines but keep the caller's own line breaks
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
  text = "\n".join(new_text)
  try:
    # Talk to the controlling terminal directly so the prompt still works
    # when stdin/stdout are redirected (Python 2 file() builtin).
    f = file("/dev/tty", "a+")
  except IOError:
    # no tty: return the default answer without prompting
    return answer
  try:
    chars = [entry[0] for entry in choices]
    chars[-1] = "[%s]" % chars[-1]  # brackets mark the default choice
    chars.append('?')  # '?' prints the per-choice descriptions
    maps = dict([(entry[0], entry[1]) for entry in choices])
    while True:
      f.write(text)
      f.write('\n')
      f.write("/".join(chars))
      f.write(": ")
      # readline(2): at most one answer character plus the newline
      line = f.readline(2).strip().lower()
      if line in maps:
        answer = maps[line]
        break
      elif line == '?':
        for entry in choices:
          f.write(" %s - %s\n" % (entry[0], entry[2]))
        f.write("\n")
        continue
  finally:
    f.close()
  return answer
1135

    
1136

    
1137
class JobSubmittedException(Exception):
  """Job was submitted, client should exit.

  This exception has one argument, the ID of the job that was
  submitted. The handler should print this ID.

  This is not an error, just a structured way to exit from clients
  (FormatError maps it to exit code 0).

  """
1146

    
1147

    
1148
def SendJob(ops, cl=None):
  """Function to submit an opcode without waiting for the results.

  @type ops: list
  @param ops: list of opcodes
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created
  @return: the job ID assigned by the master

  """
  client = cl
  if client is None:
    client = GetClient()

  return client.SubmitJob(ops)
1164

    
1165

    
1166
def PollJob(job_id, cl=None, feedback_fn=None):
  """Function to poll for the result of a job.

  @type job_id: job identified
  @param job_id: the job to poll for results
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created
  @param feedback_fn: if callable, invoked with each log entry's
      (timestamp, type, message) tuple instead of printing to stdout
  @return: the list of opcode results on success
  @raise errors.JobLost: if the job disappears from the queue
  @raise errors.OpExecError: if the job or one of its opcodes failed

  """
  if cl is None:
    cl = GetClient()

  prev_job_info = None
  prev_logmsg_serial = None

  status = None

  # one-shot notifications while the job sits in the queue
  notified_queued = False
  notified_waitlock = False

  while True:
    result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
                                     prev_logmsg_serial)
    if not result:
      # job not found, go away!
      raise errors.JobLost("Job with id %s lost" % job_id)
    elif result == constants.JOB_NOTCHANGED:
      if status is not None and not callable(feedback_fn):
        if status == constants.JOB_STATUS_QUEUED and not notified_queued:
          ToStderr("Job %s is waiting in queue", job_id)
          notified_queued = True
        elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
          ToStderr("Job %s is trying to acquire all necessary locks", job_id)
          notified_waitlock = True

      # Wait again
      continue

    # Split result, a tuple of (field values, log entries)
    (job_info, log_entries) = result
    (status, ) = job_info

    if log_entries:
      for log_entry in log_entries:
        (serial, timestamp, _, message) = log_entry
        if callable(feedback_fn):
          feedback_fn(log_entry[1:])
        else:
          encoded = utils.SafeEncode(message)
          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
        prev_logmsg_serial = max(prev_logmsg_serial, serial)

    # TODO: Handle canceled and archived jobs
    # This used to be an "elif" chained to the log_entries check above,
    # which meant a response carrying both log messages and a terminal
    # status would not end the loop and we would keep polling a finished
    # job; check the status unconditionally instead.
    if status in (constants.JOB_STATUS_SUCCESS,
                  constants.JOB_STATUS_ERROR,
                  constants.JOB_STATUS_CANCELING,
                  constants.JOB_STATUS_CANCELED):
      break

    prev_job_info = job_info

  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
  if not jobs:
    raise errors.JobLost("Job with id %s lost" % job_id)

  status, opstatus, result = jobs[0]
  if status == constants.JOB_STATUS_SUCCESS:
    return result
  elif status in (constants.JOB_STATUS_CANCELING,
                  constants.JOB_STATUS_CANCELED):
    raise errors.OpExecError("Job was canceled")
  else:
    # report the first failed opcode; mention partial success if some
    # earlier opcodes did succeed
    has_ok = False
    for idx, (status, msg) in enumerate(zip(opstatus, result)):
      if status == constants.OP_STATUS_SUCCESS:
        has_ok = True
      elif status == constants.OP_STATUS_ERROR:
        errors.MaybeRaise(msg)
        if has_ok:
          raise errors.OpExecError("partial failure (opcode %d): %s" %
                                   (idx, msg))
        else:
          raise errors.OpExecError(str(msg))
    # default failure mode
    raise errors.OpExecError(result)
1252

    
1253

    
1254
def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
  """Legacy function to submit an opcode.

  This is just a simple wrapper over the construction of the processor
  instance. It should be extended to better handle feedback and
  interaction functions.

  """
  client = cl
  if client is None:
    client = GetClient()

  SetGenericOpcodeOpts([op], opts)

  # submit as a one-opcode job and wait for its single result
  results = PollJob(SendJob([op], client), cl=client, feedback_fn=feedback_fn)
  return results[0]
1272

    
1273

    
1274
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  This function will decide, based on the 'opts' parameter, whether to
  submit and wait for the result of the opcode (and return it), or
  whether to just send the job and print its identifier. It is used in
  order to simplify the implementation of the '--submit' option.

  It will also process the opcodes if we're sending the via SendJob
  (otherwise SubmitOpCode does it).

  """
  if not (opts and opts.submit_only):
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)

  # --submit: queue the job and report its ID through the structured
  # JobSubmittedException exit path
  job = [op]
  SetGenericOpcodeOpts(job, opts)
  raise JobSubmittedException(SendJob(job, cl=cl))
1293

    
1294

    
1295
def SetGenericOpcodeOpts(opcode_list, options):
  """Processor for generic options.

  This function updates the given opcodes based on generic command
  line options (like debug, dry-run, etc.).

  @param opcode_list: list of opcodes
  @param options: command line options or None
  @return: None (in-place modification)

  """
  if not options:
    return
  for opcode_obj in opcode_list:
    opcode_obj.dry_run = options.dry_run
    opcode_obj.debug_level = options.debug
1311

    
1312

    
1313
def GetClient():
  """Returns a luxi client connected to the master daemon.

  If no master is reachable, tries to produce a more helpful error:
  either the cluster is not initialized, or this node is simply not
  the master.

  @raise errors.OpPrereqError: if the cluster is not initialized or
      this machine is not the master node

  """
  # TODO: Cache object?
  try:
    return luxi.Client()
  except luxi.NoMasterError:
    ss = ssconf.SimpleStore()

    # Try to read ssconf file
    try:
      ss.GetMasterNode()
    except errors.ConfigurationError:
      raise errors.OpPrereqError("Cluster not initialized or this machine is"
                                 " not part of a cluster")

    master, myself = ssconf.GetMasterAndMyself(ss=ss)
    if master != myself:
      raise errors.OpPrereqError("This is not the master node, please connect"
                                 " to node '%s' and rerun the command" %
                                 master)
    # we are the master but it is unreachable: re-raise the original error
    raise
1334

    
1335

    
1336
def FormatError(err):
  """Return a formatted error message for a given error.

  This function takes an exception instance and returns a tuple
  consisting of two values: first, the recommended exit code, and
  second, a string describing the error message (not
  newline-terminated).

  """
  retcode = 1
  obuf = StringIO()
  msg = str(err)
  # NOTE: the isinstance chain below is order-sensitive; more specific
  # classes must be tested before their bases (e.g. the errors.*
  # subclasses before the errors.GenericError catch-all).
  if isinstance(err, errors.ConfigurationError):
    txt = "Corrupt configuration file: %s" % msg
    logging.error(txt)
    obuf.write(txt + "\n")
    obuf.write("Aborting.")
    retcode = 2
  elif isinstance(err, errors.HooksAbort):
    obuf.write("Failure: hooks execution failed:\n")
    # args[0] is a list of (node, script, output) tuples
    for node, script, out in err.args[0]:
      if out:
        obuf.write("  node: %s, script: %s, output: %s\n" %
                   (node, script, out))
      else:
        obuf.write("  node: %s, script: %s (no output)\n" %
                   (node, script))
  elif isinstance(err, errors.HooksFailure):
    obuf.write("Failure: hooks general failure: %s" % msg)
  elif isinstance(err, errors.ResolverError):
    # distinguish "can't resolve myself" from a generic lookup failure
    this_host = utils.HostInfo.SysName()
    if err.args[0] == this_host:
      msg = "Failure: can't resolve my own hostname ('%s')"
    else:
      msg = "Failure: can't resolve hostname '%s'"
    obuf.write(msg % err.args[0])
  elif isinstance(err, errors.OpPrereqError):
    # two-argument form carries a machine-readable error type as args[1]
    if len(err.args) == 2:
      obuf.write("Failure: prerequisites not met for this"
               " operation:\nerror type: %s, error details:\n%s" %
                 (err.args[1], err.args[0]))
    else:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\n%s" % msg)
  elif isinstance(err, errors.OpExecError):
    obuf.write("Failure: command execution error:\n%s" % msg)
  elif isinstance(err, errors.TagError):
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
  elif isinstance(err, errors.JobQueueDrainError):
    obuf.write("Failure: the job queue is marked for drain and doesn't"
               " accept new requests\n")
  elif isinstance(err, errors.JobQueueFull):
    obuf.write("Failure: the job queue is full and doesn't accept new"
               " job submissions until old jobs are archived\n")
  elif isinstance(err, errors.TypeEnforcementError):
    obuf.write("Parameter Error: %s" % msg)
  elif isinstance(err, errors.ParameterError):
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
  elif isinstance(err, errors.GenericError):
    obuf.write("Unhandled Ganeti error: %s" % msg)
  elif isinstance(err, luxi.NoMasterError):
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
               " and listening for connections?")
  elif isinstance(err, luxi.TimeoutError):
    obuf.write("Timeout while talking to the master daemon. Error:\n"
               "%s" % msg)
  elif isinstance(err, luxi.ProtocolError):
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
               "%s" % msg)
  elif isinstance(err, JobSubmittedException):
    # not an error: --submit path, report the job ID and exit cleanly
    obuf.write("JobID: %s\n" % err.args[0])
    retcode = 0
  else:
    obuf.write("Unhandled exception: %s" % msg)
  return retcode, obuf.getvalue().rstrip('\n')
1411

    
1412

    
1413
def GenericMain(commands, override=None, aliases=None):
  """Generic main function for all the gnt-* commands.

  Arguments:
    - commands: a dictionary with a special structure, see the design doc
                for command line handling.
    - override: if not None, we expect a dictionary with keys that will
                override command line options; this can be used to pass
                options from the scripts to generic functions
    - aliases: dictionary with command aliases {'alias': 'target, ...}

  """
  # save the program name and the entire command line for later logging
  if sys.argv:
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
    if len(sys.argv) >= 2:
      # include the sub-command in the logged program name
      binary += " " + sys.argv[1]
      old_cmdline = " ".join(sys.argv[2:])
    else:
      old_cmdline = ""
  else:
    binary = "<unknown program>"
    old_cmdline = ""

  if aliases is None:
    aliases = {}

  try:
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
  except errors.ParameterError, err:
    result, err_msg = FormatError(err)
    ToStderr(err_msg)
    return 1

  if func is None: # parse error
    return 1

  if override is not None:
    # apply caller-provided overrides on top of the parsed options
    for key, val in override.iteritems():
      setattr(options, key, val)

  # logging can only be configured after --debug has been parsed
  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
                     stderr_logging=True, program=binary)

  if old_cmdline:
    logging.info("run with arguments '%s'", old_cmdline)
  else:
    logging.info("run with no arguments")

  try:
    result = func(options, args)
  except (errors.GenericError, luxi.ProtocolError,
          JobSubmittedException), err:
    # known error types: report them nicely instead of a traceback
    result, err_msg = FormatError(err)
    logging.exception("Error during command processing")
    ToStderr(err_msg)

  return result
1471

    
1472

    
1473
def GenericInstanceCreate(mode, opts, args):
  """Add an instance to the cluster via either creation or import.

  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new instance name
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: on invalid disk or NIC specifications
  @raise errors.ProgrammerError: if C{mode} is not one of the two
      supported creation modes

  """
  instance = args[0]

  (pnode, snode) = SplitNodeOption(opts.node)

  hypervisor = None
  hvparams = {}
  if opts.hypervisor:
    # opts.hypervisor is a (name, parameters) pair when given
    hypervisor, hvparams = opts.hypervisor

  if opts.nics:
    # opts.nics is a list of (index, param-dict) pairs; build a dense
    # list covering all given indices
    try:
      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
    except ValueError, err:
      raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
    nics = [{}] * nic_max
    for nidx, ndict in opts.nics:
      nidx = int(nidx)
      if not isinstance(ndict, dict):
        msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
        raise errors.OpPrereqError(msg)
      nics[nidx] = ndict
  elif opts.no_nics:
    # no nics
    nics = []
  else:
    # default of one nic, all auto
    nics = [{}]

  if opts.disk_template == constants.DT_DISKLESS:
    if opts.disks or opts.sd_size is not None:
      raise errors.OpPrereqError("Diskless instance but disk"
                                 " information passed")
    disks = []
  else:
    if not opts.disks and not opts.sd_size:
      raise errors.OpPrereqError("No disk information specified")
    if opts.disks and opts.sd_size is not None:
      raise errors.OpPrereqError("Please use either the '--disk' or"
                                 " '-s' option")
    if opts.sd_size is not None:
      # convert the simple '-s <size>' form into the generic disk list
      opts.disks = [(0, {"size": opts.sd_size})]
    try:
      disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
    except ValueError, err:
      raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
    # dense list of disk dicts, indexed by the user-given disk index
    disks = [{}] * disk_max
    for didx, ddict in opts.disks:
      didx = int(didx)
      if not isinstance(ddict, dict):
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
        raise errors.OpPrereqError(msg)
      elif "size" in ddict:
        if "adopt" in ddict:
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
                                     " (disk %d)" % didx)
        try:
          ddict["size"] = utils.ParseUnit(ddict["size"])
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
                                     (didx, err))
      elif "adopt" in ddict:
        if mode == constants.INSTANCE_IMPORT:
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
                                     " import")
        # adopted volumes get size 0 here; presumably the backend fills
        # in the real size -- confirm against the opcode implementation
        ddict["size"] = 0
      else:
        raise errors.OpPrereqError("Missing size or adoption source for"
                                   " disk %d" % didx)
      disks[didx] = ddict

  # normalize/validate parameter dict value types in place
  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)

  if mode == constants.INSTANCE_CREATE:
    start = opts.start
    os_type = opts.os
    src_node = None
    src_path = None
    no_install = opts.no_install
  elif mode == constants.INSTANCE_IMPORT:
    # imported instances are never auto-started or (re)installed
    start = False
    os_type = None
    src_node = opts.src_node
    src_path = opts.src_dir
    no_install = None
  else:
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)

  op = opcodes.OpCreateInstance(instance_name=instance,
                                disks=disks,
                                disk_template=opts.disk_template,
                                nics=nics,
                                pnode=pnode, snode=snode,
                                ip_check=opts.ip_check,
                                name_check=opts.name_check,
                                wait_for_sync=opts.wait_for_sync,
                                file_storage_dir=opts.file_storage_dir,
                                file_driver=opts.file_driver,
                                iallocator=opts.iallocator,
                                hypervisor=hypervisor,
                                hvparams=hvparams,
                                beparams=opts.beparams,
                                mode=mode,
                                start=start,
                                os_type=os_type,
                                src_node=src_node,
                                src_path=src_path,
                                no_install=no_install)

  SubmitOrSend(op, opts)
  return 0
1595

    
1596

    
1597
class _RunWhileClusterStoppedHelper:
  """Helper class for L{RunWhileClusterStopped} to simplify state management

  """
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes

    # SSH runner used to reach non-master nodes
    self.ssh = ssh.SshRunner(self.cluster_name)

    # All online nodes except the master; the master is handled locally
    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name; if C{None} or equal to the master node
        name, the command is run locally instead of via SSH
    @type cmd: list
    @param cmd: Command

    @raise errors.OpExecError: if the command exits with a non-zero status

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    The function is invoked as C{fn(self, *args)}; whatever it returns is
    returned to the caller.  Daemons are restarted (master node last) even
    if the function raises.

    @type fn: callable
    @param fn: Function to be called

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes
        for node_name in self.online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception, err:
          # Surface the error through the feedback function before
          # re-raising, so the user sees it even during cleanup
          _, errmsg = FormatError(err)
          logging.exception("Caught exception")
          self.feedback_fn(errmsg)
          raise
      finally:
        # Start cluster again, master node last
        for node_name in self.nonmaster_nodes + [self.master_node]:
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
    finally:
      # Resume watcher
      watcher_block.Close()
def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Calls a function while all cluster daemons are stopped.

  Collects the cluster name, master node and list of online nodes from the
  master daemon, then delegates to L{_RunWhileClusterStoppedHelper}, which
  stops all daemons, runs the function and restarts the daemons afterwards.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  feedback_fn("Gathering cluster information")

  # Talking to the master daemon also ensures we're running on the master
  client = GetClient()

  (name_of_cluster, name_of_master) = \
    client.QueryConfigValues(["cluster_name", "master_node"])

  nodes_online = GetOnlineNodes([], cl=client)

  # Drop the client reference; the master daemon is about to be stopped
  del client

  assert name_of_master in nodes_online

  helper = _RunWhileClusterStoppedHelper(feedback_fn, name_of_cluster,
                                         name_of_master, nodes_online)
  return helper.Call(fn, *args)
def GenerateTable(headers, fields, separator, data,
                  numfields=None, unitfields=None,
                  units=None):
  """Prints a table with headers and different fields.

  @type headers: dict
  @param headers: dictionary mapping field names to headers for
      the table
  @type fields: list
  @param fields: the field names corresponding to each row in
      the data field
  @param separator: the separator to be used; if this is None,
      the default 'smart' algorithm is used which computes optimal
      field width, otherwise just the separator is used between
      each field
  @type data: list
  @param data: a list of lists, each sublist being one row to be output
  @type numfields: list
  @param numfields: a list with the fields that hold numeric
      values and thus should be right-aligned
  @type unitfields: list
  @param unitfields: a list with the fields that hold numeric
      values that should be formatted with the units field
  @type units: string or None
  @param units: the units we should use for formatting, or None for
      automatic choice (human-readable for non-separator usage, otherwise
      megabytes); this is a one-letter string

  @rtype: list
  @return: the list of already-formatted output lines

  NOTE: this function modifies its arguments in place: unknown field names
  are added to C{headers}, and every cell of C{data} is replaced by its
  string form (unit fields additionally go through L{utils.FormatUnit}).

  """
  if units is None:
    if separator:
      units = "m"
    else:
      units = "h"

  if numfields is None:
    numfields = []
  if unitfields is None:
    unitfields = []

  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142

  # Build one %-format conversion per column; "%*s"/"%-*s" take their
  # width as an extra argument, supplied at render time from mlens
  format_fields = []
  for field in fields:
    if headers and field not in headers:
      # TODO: handle better unknown fields (either revert to old
      # style of raising exception, or deal more intelligently with
      # variable fields)
      headers[field] = field
    if separator is not None:
      format_fields.append("%s")
    elif numfields.Matches(field):
      # numeric columns are right-aligned
      format_fields.append("%*s")
    else:
      format_fields.append("%-*s")

  if separator is None:
    mlens = [0 for name in fields]
    format = ' '.join(format_fields)
  else:
    # escape "%" in the separator so it survives the later "%" formatting
    format = separator.replace("%", "%%").join(format_fields)

  # First pass: stringify all cells (mutating the rows) and, in "smart"
  # mode, record the maximum width seen in each column
  for row in data:
    if row is None:
      continue
    for idx, val in enumerate(row):
      if unitfields.Matches(fields[idx]):
        try:
          val = int(val)
        except (TypeError, ValueError):
          # non-numeric values in a unit field are passed through as-is
          pass
        else:
          val = row[idx] = utils.FormatUnit(val, units)
      val = row[idx] = str(val)
      if separator is None:
        mlens[idx] = max(mlens[idx], len(val))

  result = []
  if headers:
    args = []
    for idx, name in enumerate(fields):
      hdr = headers[name]
      if separator is None:
        # headers can widen a column too
        mlens[idx] = max(mlens[idx], len(hdr))
        args.append(mlens[idx])
      args.append(hdr)
    result.append(format % tuple(args))

  if separator is None:
    assert len(mlens) == len(fields)

    # zero the width of the last column (unless right-aligned) to avoid
    # trailing whitespace on every line
    if fields and not numfields.Matches(fields[-1]):
      mlens[-1] = 0

  # Second pass: render each (now stringified) row; None rows become a
  # row of dashes
  for line in data:
    args = []
    if line is None:
      line = ['-' for _ in fields]
    for idx in range(len(fields)):
      if separator is None:
        args.append(mlens[idx])
      args.append(line[idx])
    result.append(format % tuple(args))

  return result
def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: a "YYYY-MM-DD HH:MM:SS.uuuuuu" string in local time, or '?' if
      the value does not look like a (seconds, microseconds) pair

  """
  if not isinstance(ts, (tuple, list)) or len(ts) != 2:
    return '?'
  sec, usec = ts
  # "%Y-%m-%d %H:%M:%S" is equivalent to "%F %T", but the latter are
  # C99/glibc shortcuts which not all strftime implementations support
  return (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(sec)) +
          ".%06d" % usec)
def ParseTimespec(value):
  """Parse a time specification into a number of seconds.

  The following suffixes will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: days
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  @raise errors.OpPrereqError: if the specification cannot be parsed

  """
  value = str(value)
  if not value:
    raise errors.OpPrereqError("Empty time specification passed")

  # seconds per unit, keyed by suffix character
  multipliers = {
    's': 1,
    'm': 60,
    'h': 3600,
    'd': 86400,
    'w': 604800,
    }

  suffix = value[-1]
  if suffix in multipliers:
    numpart = value[:-1]
    if not numpart: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)")
    try:
      return int(numpart) * multipliers[suffix]
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % numpart)

  # no recognized suffix: plain number of seconds
  try:
    return int(value)
  except (TypeError, ValueError):
    raise errors.OpPrereqError("Invalid time specification '%s'" % value)
def GetOnlineNodes(nodes, cl=None, nowarn=False):
  """Returns the names of online nodes.

  This function will also log a note on stderr listing the offline nodes
  that were skipped, unless disabled via C{nowarn}.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed

  """
  if cl is None:
    cl = GetClient()

  node_data = cl.QueryNodes(names=nodes, fields=["name", "offline"],
                            use_locking=False)

  skipped = [name for (name, is_offline) in node_data if is_offline]
  if skipped and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(skipped))

  return [name for (name, is_offline) in node_data if not is_offline]
def _ToStream(stream, txt, *args):
1913
  """Write a message to a stream, bypassing the logging system
1914

1915
  @type stream: file object
1916
  @param stream: the file to which we should write
1917
  @type txt: str
1918
  @param txt: the message
1919

1920
  """
1921
  if args:
1922
    args = tuple(args)
1923
    stream.write(txt % args)
1924
  else:
1925
    stream.write(txt)
1926
  stream.write('\n')
1927
  stream.flush()
1928

    
1929

    
1930
def ToStdout(txt, *args):
  """Write a message to stdout only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message; treated as a %-format string when C{args}
      are given

  """
  _ToStream(sys.stdout, txt, *args)
def ToStderr(txt, *args):
  """Write a message to stderr only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message; treated as a %-format string when C{args}
      are given

  """
  _ToStream(sys.stderr, txt, *args)
class JobExecutor(object):
1955
  """Class which manages the submission and execution of multiple jobs.
1956

1957
  Note that instances of this class should not be reused between
1958
  GetResults() calls.
1959

1960
  """
1961
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
1962
    self.queue = []
1963
    if cl is None:
1964
      cl = GetClient()
1965
    self.cl = cl
1966
    self.verbose = verbose
1967
    self.jobs = []
1968
    self.opts = opts
1969
    self.feedback_fn = feedback_fn
1970

    
1971
  def QueueJob(self, name, *ops):
    """Record a job for later submit.

    The job is only queued locally; nothing is sent to the master daemon
    until L{SubmitPending} is called.

    @type name: string
    @param name: a description of the job, will be used in WaitJobSet
    """
    # apply generic options (e.g. dry-run) to all opcodes of this job
    SetGenericOpcodeOpts(ops, self.opts)
    self.queue.append((name, ops))
  def SubmitPending(self):
1981
    """Submit all pending jobs.
1982

1983
    """
1984
    results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
1985
    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
1986
                                                            self.queue)):
1987
      self.jobs.append((idx, status, data, name))
1988

    
1989
  def _ChooseJob(self):
    """Choose a non-waiting/queued job to poll next.

    Queries the current status of all tracked jobs and removes and returns
    the first one that has made progress; if all jobs are still pending,
    the oldest one is returned instead.

    """
    assert self.jobs, "_ChooseJob called with empty job list"

    status_rows = self.cl.QueryJobs([entry[2] for entry in self.jobs],
                                    ["status"])
    assert status_rows

    # statuses in which a job has not started doing real work yet
    still_pending = (constants.JOB_STATUS_QUEUED,
                     constants.JOB_STATUS_WAITLOCK,
                     constants.JOB_STATUS_CANCELING)

    for entry, row in zip(self.jobs, status_rows):
      if row[0] not in still_pending:
        # good candidate found: this job has made progress
        self.jobs.remove(entry)
        return entry

    # no job found; fall back to the first queued one
    return self.jobs.pop(0)
  def GetResults(self):
    """Wait for and return the results of all jobs.

    @rtype: list
    @return: list of tuples (success, job results), in the same order
        as the submitted jobs; if a job has failed, instead of the result
        there will be the error message

    """
    # submit lazily in case the caller never called SubmitPending
    if not self.jobs:
      self.SubmitPending()
    results = []
    if self.verbose:
      # self.jobs entries are (idx, submit status, job id/error, name)
      ok_jobs = [row[2] for row in self.jobs if row[1]]
      if ok_jobs:
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))

    # first, remove any non-submitted jobs
    self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
    for idx, _, jid, name in failures:
      # for failed submissions, "jid" holds the error message
      ToStderr("Failed to submit job for %s: %s", name, jid)
      results.append((idx, False, jid))

    while self.jobs:
      # poll jobs in completion-friendly order, not submission order
      (idx, _, jid, name) = self._ChooseJob()
      ToStdout("Waiting for job %s for %s...", jid, name)
      try:
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
        success = True
      except (errors.GenericError, luxi.ProtocolError), err:
        _, job_result = FormatError(err)
        success = False
        # the error message will always be shown, verbose or not
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)

      results.append((idx, success, job_result))

    # sort based on the index, then drop it
    results.sort()
    results = [i[1:] for i in results]

    return results
  def WaitOrShow(self, wait):
    """Wait for job results or only print the job IDs.

    @type wait: boolean
    @param wait: whether to wait or not

    """
    if wait:
      return self.GetResults()
    else:
      # submit lazily in case the caller never called SubmitPending
      if not self.jobs:
        self.SubmitPending()
      # NOTE(review): entries in self.jobs are appended as 4-tuples
      # (idx, status, data, name) by SubmitPending; this 3-way unpacking
      # looks inconsistent with that — confirm against callers
      for status, result, name in self.jobs:
        if status:
          ToStdout("%s: %s", result, name)
        else:
          ToStderr("Failure for %s: %s", name, result)