#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module dealing with command line parsing"""


import sys
import textwrap
import os.path
import time
import logging
from cStringIO import StringIO

from ganeti import utils
from ganeti import errors
from ganeti import constants
from ganeti import opcodes
from ganeti import luxi
from ganeti import ssconf
from ganeti import rpc
from ganeti import ssh
from ganeti import compat

from optparse import (OptionParser, TitledHelpFormatter,
                      Option, OptionValueError)


__all__ = [
  # Command line options
  "ADD_UIDS_OPT",
  "ALLOCATABLE_OPT",
  "ALL_OPT",
  "AUTO_PROMOTE_OPT",
  "AUTO_REPLACE_OPT",
  "BACKEND_OPT",
  "CLEANUP_OPT",
  "CONFIRM_OPT",
  "CP_SIZE_OPT",
  "DEBUG_OPT",
  "DEBUG_SIMERR_OPT",
  "DISKIDX_OPT",
  "DISK_OPT",
  "DISK_TEMPLATE_OPT",
  "DRAINED_OPT",
  "EARLY_RELEASE_OPT",
  "ENABLED_HV_OPT",
  "ERROR_CODES_OPT",
  "FIELDS_OPT",
  "FILESTORE_DIR_OPT",
  "FILESTORE_DRIVER_OPT",
  "FORCE_OPT",
  "FORCE_VARIANT_OPT",
  "GLOBAL_FILEDIR_OPT",
  "HVLIST_OPT",
  "HVOPTS_OPT",
  "HYPERVISOR_OPT",
  "IALLOCATOR_OPT",
  "IDENTIFY_DEFAULTS_OPT",
  "IGNORE_CONSIST_OPT",
  "IGNORE_FAILURES_OPT",
  "IGNORE_SECONDARIES_OPT",
  "IGNORE_SIZE_OPT",
  "MAC_PREFIX_OPT",
  "MAINTAIN_NODE_HEALTH_OPT",
  "MASTER_NETDEV_OPT",
  "MC_OPT",
  "NET_OPT",
  "NEW_CLUSTER_CERT_OPT",
  "NEW_CONFD_HMAC_KEY_OPT",
  "NEW_RAPI_CERT_OPT",
  "NEW_SECONDARY_OPT",
  "NIC_PARAMS_OPT",
  "NODE_LIST_OPT",
  "NODE_PLACEMENT_OPT",
  "NOHDR_OPT",
  "NOIPCHECK_OPT",
  "NO_INSTALL_OPT",
  "NONAMECHECK_OPT",
  "NOLVM_STORAGE_OPT",
  "NOMODIFY_ETCHOSTS_OPT",
  "NOMODIFY_SSH_SETUP_OPT",
  "NONICS_OPT",
  "NONLIVE_OPT",
  "NONPLUS1_OPT",
  "NOSHUTDOWN_OPT",
  "NOSTART_OPT",
  "NOSSH_KEYCHECK_OPT",
  "NOVOTING_OPT",
  "NWSYNC_OPT",
  "ON_PRIMARY_OPT",
  "ON_SECONDARY_OPT",
  "OFFLINE_OPT",
  "OS_OPT",
  "OS_SIZE_OPT",
  "RAPI_CERT_OPT",
  "READD_OPT",
  "REBOOT_TYPE_OPT",
  "REMOVE_UIDS_OPT",
  "SECONDARY_IP_OPT",
  "SELECT_OS_OPT",
  "SEP_OPT",
  "SHOWCMD_OPT",
  "SHUTDOWN_TIMEOUT_OPT",
  "SINGLE_NODE_OPT",
  "SRC_DIR_OPT",
  "SRC_NODE_OPT",
  "SUBMIT_OPT",
  "STATIC_OPT",
  "SYNC_OPT",
  "TAG_SRC_OPT",
  "TIMEOUT_OPT",
  "UIDPOOL_OPT",
  "USEUNITS_OPT",
  "USE_REPL_NET_OPT",
  "VERBOSE_OPT",
  "VG_NAME_OPT",
  "YES_DOIT_OPT",
  # Generic functions for CLI programs
  "GenericMain",
  "GenericInstanceCreate",
  "GetClient",
  "GetOnlineNodes",
  "JobExecutor",
  "JobSubmittedException",
  "ParseTimespec",
  "RunWhileClusterStopped",
  "SubmitOpCode",
  "SubmitOrSend",
  "UsesRPC",
  # Formatting functions
  "ToStderr", "ToStdout",
  "FormatError",
  "GenerateTable",
  "AskUser",
  "FormatTimestamp",
  # Tags functions
  "ListTags",
  "AddTags",
  "RemoveTags",
  # command line options support infrastructure
  "ARGS_MANY_INSTANCES",
  "ARGS_MANY_NODES",
  "ARGS_NONE",
  "ARGS_ONE_INSTANCE",
  "ARGS_ONE_NODE",
  "ARGS_ONE_OS",
  "ArgChoice",
  "ArgCommand",
  "ArgFile",
  "ArgHost",
  "ArgInstance",
  "ArgJobId",
  "ArgNode",
  "ArgOs",
  "ArgSuggest",
  "ArgUnknown",
  "OPT_COMPL_INST_ADD_NODES",
  "OPT_COMPL_MANY_NODES",
  "OPT_COMPL_ONE_IALLOCATOR",
  "OPT_COMPL_ONE_INSTANCE",
  "OPT_COMPL_ONE_NODE",
  "OPT_COMPL_ONE_OS",
  "cli_option",
  "SplitNodeOption",
  "CalculateOSNames",
  ]

NO_PREFIX = "no_"
UN_PREFIX = "-"


class _Argument:
  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
    self.min = min
    self.max = max

  def __repr__(self):
    return ("<%s min=%s max=%s>" %
            (self.__class__.__name__, self.min, self.max))


class ArgSuggest(_Argument):
  """Suggesting argument.

  Value can be any of the ones passed to the constructor.

  """
  # pylint: disable-msg=W0622
  def __init__(self, min=0, max=None, choices=None):
    _Argument.__init__(self, min=min, max=max)
    self.choices = choices

  def __repr__(self):
    return ("<%s min=%s max=%s choices=%r>" %
            (self.__class__.__name__, self.min, self.max, self.choices))


class ArgChoice(ArgSuggest):
  """Choice argument.

  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
  but value must be one of the choices.

  """


class ArgUnknown(_Argument):
  """Unknown argument to program (e.g. determined at runtime).

  """


class ArgInstance(_Argument):
  """Instances argument.

  """


class ArgNode(_Argument):
  """Node argument.

  """

class ArgJobId(_Argument):
  """Job ID argument.

  """


class ArgFile(_Argument):
  """File path argument.

  """


class ArgCommand(_Argument):
  """Command argument.

  """


class ArgHost(_Argument):
  """Host argument.

  """


class ArgOs(_Argument):
  """OS argument.

  """


ARGS_NONE = []
ARGS_MANY_INSTANCES = [ArgInstance()]
ARGS_MANY_NODES = [ArgNode()]
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
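
# Illustrative sketch (not from the original module): the ARGS_* specifiers
# above are meant to be used as the second element of a command definition,
# which _ParseArgs below unpacks as a 5-tuple. A hypothetical client script
# could therefore define:
#
#   commands = {
#     "info": (ShowInfo, ARGS_MANY_INSTANCES, [STATIC_OPT],
#              "[instance...]", "Show information about instances"),
#     }
#
# where ShowInfo is a hypothetical function taking (opts, args).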


def _ExtractTagsObject(opts, args):
  """Extract the tag type object.

  Note that this function will modify its args parameter.

  """
  if not hasattr(opts, "tag_type"):
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
  kind = opts.tag_type
  if kind == constants.TAG_CLUSTER:
    retval = kind, kind
  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
    if not args:
      raise errors.OpPrereqError("no arguments passed to the command")
    name = args.pop(0)
    retval = kind, name
  else:
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
  return retval


def _ExtendTags(opts, args):
  """Extend the args if a source file has been given.

  This function will extend the tags with the contents of the file
  passed in the 'tags_source' attribute of the opts parameter. A file
  named '-' will be replaced by stdin.

  """
  fname = opts.tags_source
  if fname is None:
    return
  if fname == "-":
    new_fh = sys.stdin
  else:
    new_fh = open(fname, "r")
  new_data = []
  try:
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
    # because of python bug 1633941
    while True:
      line = new_fh.readline()
      if not line:
        break
      new_data.append(line.strip())
  finally:
    new_fh.close()
  args.extend(new_data)
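
# Illustrative example (assumption, not from the original module): for a tags
# command whose definition sets opts.tag_type = constants.TAG_INSTANCE and
# which is invoked with args = ["instance1.example.com", "web", "db"],
# _ExtractTagsObject pops the instance name and returns
# (constants.TAG_INSTANCE, "instance1.example.com"), leaving ["web", "db"] as
# the tags; _ExtendTags then appends any tags read from the file given via
# --from (or from stdin when that file name is "-").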


def ListTags(opts, args):
  """List the tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  cl = GetClient()
  result = cl.QueryTags(kind, name)
  result = list(result)
  result.sort()
  for tag in result:
    ToStdout(tag)


def AddTags(opts, args):
  """Add tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be added")
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
  SubmitOpCode(op)


def RemoveTags(opts, args):
  """Remove tags from a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be removed")
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
  SubmitOpCode(op)


def check_unit(option, opt, value): # pylint: disable-msg=W0613
  """OptParser's custom converter for units.

  """
  try:
    return utils.ParseUnit(value)
  except errors.UnitParseError, err:
    raise OptionValueError("option %s: %s" % (opt, err))
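
# Illustrative note (assuming utils.ParseUnit's usual MiB-based semantics): an
# option declared with type="unit" would turn "512" into 512 and "4G" into
# 4096, while an unparseable value such as "12x" raises OptionValueError.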


def _SplitKeyVal(opt, data):
  """Convert a KeyVal string into a dict.

  This function will convert a key=val[,...] string into a dict. Empty
  values (keys given without an '=' sign) are converted specially: keys
  with the prefix 'no_' get the value False and the prefix stripped,
  keys with the prefix '-' get the value None and the prefix stripped,
  and all remaining such keys get the value True.

  @type opt: string
  @param opt: a string holding the option name for which we process the
      data, used in building error messages
  @type data: string
  @param data: a string of the format key=val,key=val,...
  @rtype: dict
  @return: {key: val, key: val}
  @raises errors.ParameterError: if there are duplicate keys

  """
  kv_dict = {}
  if data:
    for elem in utils.UnescapeAndSplit(data, sep=","):
      if "=" in elem:
        key, val = elem.split("=", 1)
      else:
        if elem.startswith(NO_PREFIX):
          key, val = elem[len(NO_PREFIX):], False
        elif elem.startswith(UN_PREFIX):
          key, val = elem[len(UN_PREFIX):], None
        else:
          key, val = elem, True
      if key in kv_dict:
        raise errors.ParameterError("Duplicate key '%s' in option %s" %
                                    (key, opt))
      kv_dict[key] = val
  return kv_dict
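
# Illustrative example (not from the original module): per the code above,
#   _SplitKeyVal("-H", "kernel_path=/vmlinuz,no_acpi,-initrd_path,pae")
# returns
#   {"kernel_path": "/vmlinuz", "acpi": False, "initrd_path": None, "pae": True}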


def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser for ident:key=val,key=val options.

  This will store the parsed values as a tuple (ident, {key: val}). As such,
  multiple uses of this option via action=append are possible.

  """
  if ":" not in value:
    ident, rest = value, ''
  else:
    ident, rest = value.split(":", 1)

  if ident.startswith(NO_PREFIX):
    if rest:
      msg = "Cannot pass options when removing parameter groups: %s" % value
      raise errors.ParameterError(msg)
    retval = (ident[len(NO_PREFIX):], False)
  elif ident.startswith(UN_PREFIX):
    if rest:
      msg = "Cannot pass options when removing parameter groups: %s" % value
      raise errors.ParameterError(msg)
    retval = (ident[len(UN_PREFIX):], None)
  else:
    kv_dict = _SplitKeyVal(opt, rest)
    retval = (ident, kv_dict)
  return retval
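
# Illustrative example (not from the original module): per the code above,
#   check_ident_key_val(None, "--net", "0:ip=192.0.2.10,no_link")
# returns ("0", {"ip": "192.0.2.10", "link": False}); a bare "no_0" yields
# ("0", False) and "-0" yields ("0", None), the forms reserved for removing
# or resetting a whole parameter group.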


def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser for key=val,key=val options.

  This will store the parsed values as a dict {key: val}.

  """
  return _SplitKeyVal(opt, value)


def check_bool(option, opt, value): # pylint: disable-msg=W0613
  """Custom parser for yes/no options.

  This will store the parsed value as either True or False.

  """
  value = value.lower()
  if value == constants.VALUE_FALSE or value == "no":
    return False
  elif value == constants.VALUE_TRUE or value == "yes":
    return True
  else:
    raise errors.ParameterError("Invalid boolean value '%s'" % value)


# completion_suggest is normally a list. For dynamic completion we use numeric
# values that do not evaluate to False.
(OPT_COMPL_MANY_NODES,
 OPT_COMPL_ONE_NODE,
 OPT_COMPL_ONE_INSTANCE,
 OPT_COMPL_ONE_OS,
 OPT_COMPL_ONE_IALLOCATOR,
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)

OPT_COMPL_ALL = frozenset([
  OPT_COMPL_MANY_NODES,
  OPT_COMPL_ONE_NODE,
  OPT_COMPL_ONE_INSTANCE,
  OPT_COMPL_ONE_OS,
  OPT_COMPL_ONE_IALLOCATOR,
  OPT_COMPL_INST_ADD_NODES,
  ])


class CliOption(Option):
  """Custom option class for optparse.

  """
  ATTRS = Option.ATTRS + [
    "completion_suggest",
    ]
  TYPES = Option.TYPES + (
    "identkeyval",
    "keyval",
    "unit",
    "bool",
    )
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
  TYPE_CHECKER["keyval"] = check_key_val
  TYPE_CHECKER["unit"] = check_unit
  TYPE_CHECKER["bool"] = check_bool


# optparse.py sets make_option, so we do it for our own option class, too
cli_option = CliOption
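
# Illustrative sketch (not from the original module): a hypothetical option
# using the custom types registered above could be declared as
#
#   MYSIZE_OPT = cli_option("--my-size", dest="my_size", type="unit",
#                           default=None, metavar="<size>",
#                           help="A size, in MiB unless a suffix is used")
#
# optparse then runs check_unit on the value before storing it.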


_YORNO = "yes|no"

DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
                       help="Increase debugging level")

NOHDR_OPT = cli_option("--no-headers", default=False,
                       action="store_true", dest="no_headers",
                       help="Don't display column headers")

SEP_OPT = cli_option("--separator", default=None,
                     action="store", dest="separator",
                     help=("Separator between output fields"
                           " (defaults to one space)"))

USEUNITS_OPT = cli_option("--units", default=None,
                          dest="units", choices=('h', 'm', 'g', 't'),
                          help="Specify units for output (one of hmgt)")

FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
                        type="string", metavar="FIELDS",
                        help="Comma-separated list of output fields")

FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
                       default=False, help="Force the operation")

CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
                         default=False, help="Do not require confirmation")

TAG_SRC_OPT = cli_option("--from", dest="tags_source",
                         default=None, help="File with tag names")

SUBMIT_OPT = cli_option("--submit", dest="submit_only",
                        default=False, action="store_true",
                        help=("Submit the job and return the job ID, but"
                              " don't wait for the job to finish"))

SYNC_OPT = cli_option("--sync", dest="do_locking",
                      default=False, action="store_true",
                      help=("Grab locks while doing the queries"
                            " in order to ensure more consistent results"))

_DRY_RUN_OPT = cli_option("--dry-run", default=False,
                          action="store_true",
                          help=("Do not execute the operation, just run the"
                                " check steps and verify if it could be"
                                " executed"))

VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
                         action="store_true",
                         help="Increase the verbosity of the operation")

DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
                              action="store_true", dest="simulate_errors",
                              help="Debugging option that makes the operation"
                              " treat most runtime checks as failed")

NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
                        default=True, action="store_false",
                        help="Don't wait for sync (DANGEROUS!)")

DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
                               help="Custom disk setup (diskless, file,"
                               " plain or drbd)",
                               default=None, metavar="TEMPL",
                               choices=list(constants.DISK_TEMPLATES))

NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
                        help="Do not create any network cards for"
                        " the instance")

FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                               help="Relative path under default cluster-wide"
                               " file storage dir to store file-based disks",
                               default=None, metavar="<DIR>")

FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
                                  help="Driver to use for image files",
                                  default="loop", metavar="<DRIVER>",
                                  choices=list(constants.FILE_DRIVER))

IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
                            help="Select nodes for the instance automatically"
                            " using the <NAME> iallocator plugin",
                            default=None, type="string",
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)

OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
                    metavar="<os>",
                    completion_suggest=OPT_COMPL_ONE_OS)

FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
                               action="store_true", default=False,
                               help="Force an unknown variant")

NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
                            action="store_true", default=False,
                            help="Do not install the OS (will"
                            " enable no-start)")

BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
                         type="keyval", default={},
                         help="Backend parameters")

HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
                         default={}, dest="hvparams",
                         help="Hypervisor parameters")

HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
                            help="Hypervisor and hypervisor options, in the"
                            " format hypervisor:option=value,option=value,...",
                            default=None, type="identkeyval")

HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
                        help="Hypervisor and hypervisor options, in the"
                        " format hypervisor:option=value,option=value,...",
                        default=[], action="append", type="identkeyval")

NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
                           action="store_false",
                           help="Don't check that the instance's IP"
                           " is alive")

NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
                             default=True, action="store_false",
                             help="Don't check that the instance's name"
                             " is resolvable")

NET_OPT = cli_option("--net",
                     help="NIC parameters", default=[],
                     dest="nics", action="append", type="identkeyval")

DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
                      dest="disks", action="append", type="identkeyval")

DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
                         help="Comma-separated list of disk"
                         " indices to act on (e.g. 0,2) (optional,"
                         " defaults to all disks)")

OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
                         help="Enforces a single-disk configuration using the"
                         " given disk size, in MiB unless a suffix is used",
                         default=None, type="unit", metavar="<size>")

IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
                                dest="ignore_consistency",
                                action="store_true", default=False,
                                help="Ignore the consistency of the disks on"
                                " the secondary")

NONLIVE_OPT = cli_option("--non-live", dest="live",
                         default=True, action="store_false",
                         help="Do a non-live migration (this usually means"
                         " freeze the instance, save the state, transfer and"
                         " only then resume running on the secondary node)")

NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
                                help="Target node and optional secondary node",
                                metavar="<pnode>[:<snode>]",
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)

NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
                           action="append", metavar="<node>",
                           help="Use only this node (can be used multiple"
                           " times, if not given defaults to all nodes)",
                           completion_suggest=OPT_COMPL_ONE_NODE)

SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
                             metavar="<node>",
                             completion_suggest=OPT_COMPL_ONE_NODE)

NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
                         action="store_false",
                         help="Don't start the instance after creation")

SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
                         action="store_true", default=False,
                         help="Show command instead of executing it")

CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
                         default=False, action="store_true",
                         help="Instead of performing the migration, try to"
                         " recover from a failed cleanup. This is safe"
                         " to run even if the instance is healthy, but it"
                         " will create extra replication traffic and"
                         " briefly disrupt the replication (like during the"
                         " migration)")

STATIC_OPT = cli_option("-s", "--static", dest="static",
                        action="store_true", default=False,
                        help="Only show configuration data, not runtime data")

ALL_OPT = cli_option("--all", dest="show_all",
                     default=False, action="store_true",
                     help="Show info on all instances on the cluster."
                     " This can take a long time to run, use wisely")

SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
                           action="store_true", default=False,
                           help="Interactive OS reinstall, lists available"
                           " OS templates for selection")

IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
                                 action="store_true", default=False,
                                 help="Remove the instance from the cluster"
                                 " configuration even if there are failures"
                                 " during the removal process")

NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
                               help="Specifies the new secondary node",
                               metavar="NODE", default=None,
                               completion_suggest=OPT_COMPL_ONE_NODE)

ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
                            default=False, action="store_true",
                            help="Replace the disk(s) on the primary"
                            " node (only for the drbd template)")

ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
                              default=False, action="store_true",
                              help="Replace the disk(s) on the secondary"
                              " node (only for the drbd template)")

AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
                              default=False, action="store_true",
                              help="Lock all nodes and auto-promote as needed"
                              " to MC status")

AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
                              default=False, action="store_true",
                              help="Automatically replace faulty disks"
                              " (only for the drbd template)")

IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
                             default=False, action="store_true",
                             help="Ignore current recorded size"
                             " (useful for forcing activation when"
                             " the recorded size is wrong)")

SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
                          metavar="<node>",
                          completion_suggest=OPT_COMPL_ONE_NODE)

SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
                         metavar="<dir>")

SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
                              help="Specify the secondary ip for the node",
                              metavar="ADDRESS", default=None)

READD_OPT = cli_option("--readd", dest="readd",
                       default=False, action="store_true",
                       help="Readd old node after replacing it")

NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
                                default=True, action="store_false",
                                help="Disable SSH key fingerprint checking")


MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
                    type="bool", default=None, metavar=_YORNO,
                    help="Set the master_candidate flag on the node")

OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
                         type="bool", default=None,
                         help="Set the offline flag on the node")

DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
                         type="bool", default=None,
                         help="Set the drained flag on the node")

ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
                             type="bool", default=None, metavar=_YORNO,
                             help="Set the allocatable flag on a volume")

NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
                               help="Disable support for lvm based instances"
                               " (cluster-wide)",
                               action="store_false", default=True)

ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
                            dest="enabled_hypervisors",
                            help="Comma-separated list of hypervisors",
                            type="string", default=None)

NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
                            type="keyval", default={},
                            help="NIC parameters")

CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
                         dest="candidate_pool_size", type="int",
                         help="Set the candidate pool size")

VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
                         help="Enables LVM and specifies the volume group"
                         " name (cluster-wide) for disk allocation [xenvg]",
                         metavar="VG", default=None)

YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
                          help="Destroy cluster", action="store_true")

NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
                          help="Skip node agreement check (dangerous)",
                          action="store_true", default=False)

MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
                            help="Specify the MAC prefix for the instance"
                            " NICs, in the format XX:XX:XX",
                            metavar="PREFIX",
                            default=None)

MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
                               help="Specify the node interface (cluster-wide)"
                               " on which the master IP address will be added"
                               " [%s]" % constants.DEFAULT_BRIDGE,
                               metavar="NETDEV",
                               default=constants.DEFAULT_BRIDGE)


GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                                help="Specify the default directory (cluster-"
                                "wide) for storing the file-based disks [%s]" %
                                constants.DEFAULT_FILE_STORAGE_DIR,
                                metavar="DIR",
                                default=constants.DEFAULT_FILE_STORAGE_DIR)

NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
                                   help="Don't modify /etc/hosts",
                                   action="store_false", default=True)

NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
                                    help="Don't initialize SSH keys",
                                    action="store_false", default=True)

ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
                             help="Enable parseable error messages",
                             action="store_true", default=False)

NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
                          help="Skip N+1 memory redundancy tests",
                          action="store_true", default=False)

REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
                             help="Type of reboot: soft/hard/full",
                             default=constants.INSTANCE_REBOOT_HARD,
                             metavar="<REBOOT>",
                             choices=list(constants.REBOOT_TYPES))

IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
                                    dest="ignore_secondaries",
                                    default=False, action="store_true",
                                    help="Ignore errors from secondaries")

NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
                            action="store_false", default=True,
                            help="Don't shutdown the instance (unsafe)")

TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                         help="Maximum time to wait")

SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
                         dest="shutdown_timeout", type="int",
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                         help="Maximum time to wait for instance shutdown")

EARLY_RELEASE_OPT = cli_option("--early-release",
                               dest="early_release", default=False,
                               action="store_true",
                               help="Release the locks on the secondary"
                               " node(s) early")

NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
                                  dest="new_cluster_cert",
                                  default=False, action="store_true",
                                  help="Generate a new cluster certificate")

RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
                           default=None,
                           help="File containing new RAPI certificate")

NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
                               default=None, action="store_true",
                               help=("Generate a new self-signed RAPI"
                                     " certificate"))

NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
                                    dest="new_confd_hmac_key",
                                    default=False, action="store_true",
                                    help=("Create a new HMAC key for %s" %
                                          constants.CONFD))

USE_REPL_NET_OPT = cli_option("--use-replication-network",
                              dest="use_replication_network",
                              help="Whether to use the replication network"
                              " for talking to the nodes",
                              action="store_true", default=False)

MAINTAIN_NODE_HEALTH_OPT = \
    cli_option("--maintain-node-health", dest="maintain_node_health",
               metavar=_YORNO, default=None, type="bool",
               help="Configure the cluster to automatically maintain node"
               " health, by shutting down unknown instances, shutting down"
               " unknown DRBD devices, etc.")

IDENTIFY_DEFAULTS_OPT = \
    cli_option("--identify-defaults", dest="identify_defaults",
               default=False, action="store_true",
               help="Identify which saved instance parameters are equal to"
               " the current cluster defaults and set them as such, instead"
               " of marking them as overridden")

UIDPOOL_OPT = cli_option("--uid-pool", default=None,
                         action="store", dest="uid_pool",
                         help=("A list of user-ids or user-id"
                               " ranges separated by commas"))

ADD_UIDS_OPT = cli_option("--add-uids", default=None,
                          action="store", dest="add_uids",
                          help=("A list of user-ids or user-id"
                                " ranges separated by commas, to be"
                                " added to the user-id pool"))

REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
                             action="store", dest="remove_uids",
                             help=("A list of user-ids or user-id"
                                   " ranges separated by commas, to be"
                                   " removed from the user-id pool"))


def _ParseArgs(argv, commands, aliases):
  """Parser for the command line arguments.

  This function parses the arguments and returns the function which
  must be executed together with its (modified) arguments.

  @param argv: the command line
  @param commands: dictionary with special contents, see the design
      doc for cmdline handling
  @param aliases: dictionary with command aliases {'alias': 'target', ...}

  """
  if len(argv) == 0:
    binary = "<command>"
  else:
    binary = argv[0].split("/")[-1]

  if len(argv) > 1 and argv[1] == "--version":
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
    # Quit right away. That way we don't have to care about this special
    # argument. optparse.py does it the same.
    sys.exit(0)

  if len(argv) < 2 or not (argv[1] in commands or
                           argv[1] in aliases):
    # let's do a nice thing
    sortedcmds = commands.keys()
    sortedcmds.sort()

    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
    ToStdout("")

    # compute the max line length for cmd + usage
    mlen = max([len(" %s" % cmd) for cmd in commands])
    mlen = min(60, mlen) # should not get here...

    # and format a nice command list
    ToStdout("Commands:")
    for cmd in sortedcmds:
      cmdstr = " %s" % (cmd,)
      help_text = commands[cmd][4]
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
      for line in help_lines:
        ToStdout("%-*s   %s", mlen, "", line)

    ToStdout("")

    return None, None, None

  # get command, unalias it, and look it up in commands
  cmd = argv.pop(1)
  if cmd in aliases:
    if cmd in commands:
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
                                   " command" % cmd)

    if aliases[cmd] not in commands:
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
                                   " command '%s'" % (cmd, aliases[cmd]))

    cmd = aliases[cmd]

  func, args_def, parser_opts, usage, description = commands[cmd]
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
                        description=description,
                        formatter=TitledHelpFormatter(),
                        usage="%%prog %s %s" % (cmd, usage))
  parser.disable_interspersed_args()
  options, args = parser.parse_args()

  if not _CheckArguments(cmd, args_def, args):
    return None, None, None

  return func, options, args


def _CheckArguments(cmd, args_def, args):
  """Verifies the arguments using the argument definition.

  Algorithm:

    1. Abort with error if values specified by user but none expected.

    1. For each argument in definition

      1. Keep running count of minimum number of values (min_count)
      1. Keep running count of maximum number of values (max_count)
      1. If it has an unlimited number of values

        1. Abort with error if it's not the last argument in the definition

    1. If last argument has limited number of values

      1. Abort with error if number of values doesn't match or is too large

    1. Abort with error if user didn't pass enough values (min_count)

  """
  if args and not args_def:
    ToStderr("Error: Command %s expects no arguments", cmd)
    return False

  min_count = None
  max_count = None
  check_max = None

  last_idx = len(args_def) - 1

  for idx, arg in enumerate(args_def):
    if min_count is None:
      min_count = arg.min
    elif arg.min is not None:
      min_count += arg.min

    if max_count is None:
      max_count = arg.max
    elif arg.max is not None:
      max_count += arg.max

    if idx == last_idx:
      check_max = (arg.max is not None)

    elif arg.max is None:
      raise errors.ProgrammerError("Only the last argument can have max=None")

  if check_max:
    # Command with exact number of arguments
    if (min_count is not None and max_count is not None and
        min_count == max_count and len(args) != min_count):
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
      return False

    # Command with limited number of arguments
    if max_count is not None and len(args) > max_count:
      ToStderr("Error: Command %s expects only %d argument(s)",
               cmd, max_count)
      return False

  # Command with some required arguments
  if min_count is not None and len(args) < min_count:
    ToStderr("Error: Command %s expects at least %d argument(s)",
             cmd, min_count)
    return False

  return True
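
# Illustrative examples (not from the original module): with
# args_def=ARGS_ONE_INSTANCE ([ArgInstance(min=1, max=1)]) exactly one
# positional argument is accepted; with args_def=ARGS_MANY_NODES
# ([ArgNode()], i.e. min=0, max=None) any number of arguments passes; with
# args_def=ARGS_NONE ([]) any argument triggers the "expects no arguments"
# error above.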


def SplitNodeOption(value):
  """Splits the value of a --node option.

  """
  if value and ':' in value:
    return value.split(':', 1)
  else:
    return (value, None)
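
# Illustrative example (not from the original module): per the code above,
#   SplitNodeOption("node1.example.com:node2.example.com")
# returns ["node1.example.com", "node2.example.com"], while
#   SplitNodeOption("node1.example.com")
# returns ("node1.example.com", None); callers typically unpack the result as
#   pnode, snode = SplitNodeOption(opts.node)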


def CalculateOSNames(os_name, os_variants):
  """Calculates all the names an OS can be called, according to its variants.

  @type os_name: string
  @param os_name: base name of the os
  @type os_variants: list or None
  @param os_variants: list of supported variants
  @rtype: list
  @return: list of valid names

  """
  if os_variants:
    return ['%s+%s' % (os_name, v) for v in os_variants]
  else:
    return [os_name]
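
# Illustrative example (not from the original module):
#   CalculateOSNames("debootstrap", ["wheezy", "squeeze"])
# returns ['debootstrap+wheezy', 'debootstrap+squeeze'], and
#   CalculateOSNames("debootstrap", None)
# returns ['debootstrap'].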


def UsesRPC(fn):
  def wrapper(*args, **kwargs):
    rpc.Init()
    try:
      return fn(*args, **kwargs)
    finally:
      rpc.Shutdown()
  return wrapper
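
# Illustrative usage (not from the original module): a client function that
# needs the RPC layer outside of a job can be wrapped so rpc.Init() runs
# before the call and rpc.Shutdown() always runs afterwards, e.g. for a
# hypothetical command implementation:
#
#   @UsesRPC
#   def InitCluster(opts, args):
#     ...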


def AskUser(text, choices=None):
  """Ask the user a question.

  @param text: the question to ask

  @param choices: list of tuples (input_char, return_value,
      description); if not given, it will default to: [('y', True,
      'Perform the operation'), ('n', False, 'Do not perform the operation')];
      note that the '?' char is reserved for help

  @return: one of the return values from the choices list; if input is
      not possible (i.e. not running with a tty), we return the last
      entry from the list

  """
  if choices is None:
    choices = [('y', True, 'Perform the operation'),
               ('n', False, 'Do not perform the operation')]
  if not choices or not isinstance(choices, list):
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
  for entry in choices:
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
      raise errors.ProgrammerError("Invalid choices element to AskUser")

  answer = choices[-1][1]
  new_text = []
  for line in text.splitlines():
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
  text = "\n".join(new_text)
  try:
    f = file("/dev/tty", "a+")
  except IOError:
    return answer
  try:
    chars = [entry[0] for entry in choices]
    chars[-1] = "[%s]" % chars[-1]
    chars.append('?')
    maps = dict([(entry[0], entry[1]) for entry in choices])
    while True:
      f.write(text)
      f.write('\n')
      f.write("/".join(chars))
      f.write(": ")
      line = f.readline(2).strip().lower()
      if line in maps:
        answer = maps[line]
        break
      elif line == '?':
        for entry in choices:
          f.write(" %s - %s\n" % (entry[0], entry[2]))
        f.write("\n")
        continue
  finally:
    f.close()
  return answer
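
# Illustrative usage (not from the original module): with the default choices
# the last entry ('n', False) is returned when no tty is available, so a
# destructive command can guard itself with
#
#   if not opts.confirm and not AskUser("Continue with node removal?"):
#     return 1
#
# Custom choices follow the (input_char, return_value, description) layout,
# e.g. [('y', True, "Proceed"), ('n', False, "Abort"), ('a', "all", "Do all")].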


class JobSubmittedException(Exception):
  """Job was submitted, client should exit.

  This exception has one argument, the ID of the job that was
  submitted. The handler should print this ID.

  This is not an error, just a structured way to exit from clients.

  """


def SendJob(ops, cl=None):
  """Function to submit an opcode without waiting for the results.

  @type ops: list
  @param ops: list of opcodes
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created

  """
  if cl is None:
    cl = GetClient()

  job_id = cl.SubmitJob(ops)

  return job_id


def GenericPollJob(job_id, cbs, report_cbs):
  """Generic job-polling function.

  @type job_id: number
  @param job_id: Job ID
  @type cbs: Instance of L{JobPollCbBase}
  @param cbs: Data callbacks
  @type report_cbs: Instance of L{JobPollReportCbBase}
  @param report_cbs: Reporting callbacks

  """
  prev_job_info = None
  prev_logmsg_serial = None

  status = None

  while True:
    result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
                                      prev_logmsg_serial)
    if not result:
      # job not found, go away!
      raise errors.JobLost("Job with id %s lost" % job_id)

    if result == constants.JOB_NOTCHANGED:
      report_cbs.ReportNotChanged(job_id, status)

      # Wait again
      continue

    # Split result, a tuple of (field values, log entries)
    (job_info, log_entries) = result
    (status, ) = job_info

    if log_entries:
      for log_entry in log_entries:
        (serial, timestamp, log_type, message) = log_entry
        report_cbs.ReportLogMessage(job_id, serial, timestamp,
                                    log_type, message)
        prev_logmsg_serial = max(prev_logmsg_serial, serial)

    # TODO: Handle canceled and archived jobs
    elif status in (constants.JOB_STATUS_SUCCESS,
                    constants.JOB_STATUS_ERROR,
                    constants.JOB_STATUS_CANCELING,
                    constants.JOB_STATUS_CANCELED):
      break

    prev_job_info = job_info

  jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
  if not jobs:
    raise errors.JobLost("Job with id %s lost" % job_id)

  status, opstatus, result = jobs[0]

  if status == constants.JOB_STATUS_SUCCESS:
    return result

  if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
    raise errors.OpExecError("Job was canceled")

  has_ok = False
  for idx, (status, msg) in enumerate(zip(opstatus, result)):
    if status == constants.OP_STATUS_SUCCESS:
      has_ok = True
    elif status == constants.OP_STATUS_ERROR:
      errors.MaybeRaise(msg)

      if has_ok:
        raise errors.OpExecError("partial failure (opcode %d): %s" %
                                 (idx, msg))

      raise errors.OpExecError(str(msg))

  # default failure mode
  raise errors.OpExecError(result)


class JobPollCbBase:
  """Base class for L{GenericPollJob} callbacks.

  """
  def __init__(self):
    """Initializes this class.

    """

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial):
    """Waits for changes on a job.

    """
    raise NotImplementedError()

  def QueryJobs(self, job_ids, fields):
    """Returns the selected fields for the selected job IDs.

    @type job_ids: list of numbers
    @param job_ids: Job IDs
    @type fields: list of strings
    @param fields: Fields

    """
    raise NotImplementedError()


class JobPollReportCbBase:
  """Base class for L{GenericPollJob} reporting callbacks.

  """
  def __init__(self):
    """Initializes this class.

    """

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    raise NotImplementedError()

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    @type job_id: number
    @param job_id: Job ID
    @type status: string or None
    @param status: Job status if available

    """
    raise NotImplementedError()
    raise NotImplementedError()
1357

    
1358

    
1359
class _LuxiJobPollCb(JobPollCbBase):
1360
  def __init__(self, cl):
1361
    """Initializes this class.
1362

1363
    """
1364
    JobPollCbBase.__init__(self)
1365
    self.cl = cl
1366

    
1367
  def WaitForJobChangeOnce(self, job_id, fields,
1368
                           prev_job_info, prev_log_serial):
1369
    """Waits for changes on a job.
1370

1371
    """
1372
    return self.cl.WaitForJobChangeOnce(job_id, fields,
1373
                                        prev_job_info, prev_log_serial)
1374

    
1375
  def QueryJobs(self, job_ids, fields):
1376
    """Returns the selected fields for the selected job IDs.
1377

1378
    """
1379
    return self.cl.QueryJobs(job_ids, fields)
1380

    
1381

    
1382
class FeedbackFnJobPollReportCb(JobPollReportCbBase):
1383
  def __init__(self, feedback_fn):
1384
    """Initializes this class.
1385

1386
    """
1387
    JobPollReportCbBase.__init__(self)
1388

    
1389
    self.feedback_fn = feedback_fn
1390

    
1391
    assert callable(feedback_fn)
1392

    
1393
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1394
    """Handles a log message.
1395

1396
    """
1397
    self.feedback_fn((timestamp, log_type, log_msg))
1398

    
1399
  def ReportNotChanged(self, job_id, status):
1400
    """Called if a job hasn't changed in a while.
1401

1402
    """
1403
    # Ignore
1404

    
1405

    
1406
class StdioJobPollReportCb(JobPollReportCbBase):
1407
  def __init__(self):
1408
    """Initializes this class.
1409

1410
    """
1411
    JobPollReportCbBase.__init__(self)
1412

    
1413
    self.notified_queued = False
1414
    self.notified_waitlock = False
1415

    
1416
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1417
    """Handles a log message.
1418

1419
    """
1420
    ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
1421
             utils.SafeEncode(log_msg))
1422

    
1423
  def ReportNotChanged(self, job_id, status):
1424
    """Called if a job hasn't changed in a while.
1425

1426
    """
1427
    if status is None:
1428
      return
1429

    
1430
    if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
1431
      ToStderr("Job %s is waiting in queue", job_id)
1432
      self.notified_queued = True
1433

    
1434
    elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
1435
      ToStderr("Job %s is trying to acquire all necessary locks", job_id)
1436
      self.notified_waitlock = True
1437

    
1438

    
1439
def PollJob(job_id, cl=None, feedback_fn=None):
1440
  """Function to poll for the result of a job.
1441

1442
  @type job_id: job identified
1443
  @param job_id: the job to poll for results
1444
  @type cl: luxi.Client
1445
  @param cl: the luxi client to use for communicating with the master;
1446
             if None, a new client will be created
1447

1448
  """
1449
  if cl is None:
1450
    cl = GetClient()
1451

    
1452
  if feedback_fn:
1453
    reporter = FeedbackFnJobPollReportCb(feedback_fn)
1454
  else:
1455
    reporter = StdioJobPollReportCb()
1456

    
1457
  return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)


def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
  """Legacy function to submit an opcode.

  This is just a simple wrapper over the construction of the processor
  instance. It should be extended to better handle feedback and
  interaction functions.

  """
  if cl is None:
    cl = GetClient()

  SetGenericOpcodeOpts([op], opts)

  job_id = SendJob([op], cl)

  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)

  return op_results[0]


def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  This function will decide, based on the 'opts' parameter, whether to
  submit and wait for the result of the opcode (and return it), or
  whether to just send the job and print its identifier. It is used in
  order to simplify the implementation of the '--submit' option.

  It will also process the opcodes if we're sending them via SendJob
  (otherwise SubmitOpCode does it).

  """
  if opts and opts.submit_only:
    job = [op]
    SetGenericOpcodeOpts(job, opts)
    job_id = SendJob(job, cl=cl)
    raise JobSubmittedException(job_id)
  else:
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
1499

    
1500

    
1501
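# Illustrative sketch (not part of the module API): command functions simply
# call SubmitOrSend with the parsed options; when --submit was given, the
# resulting JobSubmittedException propagates up to GenericMain, which prints
# "JobID: <id>" via FormatError and exits with code 0:
#
#   result = SubmitOrSend(op, opts)   # "op" is an opcode built by the caller
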
def SetGenericOpcodeOpts(opcode_list, options):
  """Processor for generic options.

  This function updates the given opcodes based on generic command
  line options (like debug, dry-run, etc.).

  @param opcode_list: list of opcodes
  @param options: command line options or None
  @return: None (in-place modification)

  """
  if not options:
    return
  for op in opcode_list:
    op.dry_run = options.dry_run
    op.debug_level = options.debug


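# Illustrative sketch (not part of the module API): the generic options are
# copied in place onto every opcode in the list; "opts" stands for the parsed
# command line options object and op1/op2 for caller-built opcodes:
#
#   ops = [op1, op2]
#   SetGenericOpcodeOpts(ops, opts)
#   assert ops[0].dry_run == opts.dry_run
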
def GetClient():
  # TODO: Cache object?
  try:
    client = luxi.Client()
  except luxi.NoMasterError:
    ss = ssconf.SimpleStore()

    # Try to read ssconf file
    try:
      ss.GetMasterNode()
    except errors.ConfigurationError:
      raise errors.OpPrereqError("Cluster not initialized or this machine is"
                                 " not part of a cluster")

    master, myself = ssconf.GetMasterAndMyself(ss=ss)
    if master != myself:
      raise errors.OpPrereqError("This is not the master node, please connect"
                                 " to node '%s' and rerun the command" %
                                 master)
    raise
  return client


def FormatError(err):
  """Return a formatted error message for a given error.

  This function takes an exception instance and returns a tuple
  consisting of two values: first, the recommended exit code, and
  second, a string describing the error message (not
  newline-terminated).

  """
  retcode = 1
  obuf = StringIO()
  msg = str(err)
  if isinstance(err, errors.ConfigurationError):
    txt = "Corrupt configuration file: %s" % msg
    logging.error(txt)
    obuf.write(txt + "\n")
    obuf.write("Aborting.")
    retcode = 2
  elif isinstance(err, errors.HooksAbort):
    obuf.write("Failure: hooks execution failed:\n")
    for node, script, out in err.args[0]:
      if out:
        obuf.write("  node: %s, script: %s, output: %s\n" %
                   (node, script, out))
      else:
        obuf.write("  node: %s, script: %s (no output)\n" %
                   (node, script))
  elif isinstance(err, errors.HooksFailure):
    obuf.write("Failure: hooks general failure: %s" % msg)
  elif isinstance(err, errors.ResolverError):
    this_host = utils.HostInfo.SysName()
    if err.args[0] == this_host:
      msg = "Failure: can't resolve my own hostname ('%s')"
    else:
      msg = "Failure: can't resolve hostname '%s'"
    obuf.write(msg % err.args[0])
  elif isinstance(err, errors.OpPrereqError):
    if len(err.args) == 2:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\nerror type: %s, error details:\n%s" %
                 (err.args[1], err.args[0]))
    else:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\n%s" % msg)
  elif isinstance(err, errors.OpExecError):
    obuf.write("Failure: command execution error:\n%s" % msg)
  elif isinstance(err, errors.TagError):
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
  elif isinstance(err, errors.JobQueueDrainError):
    obuf.write("Failure: the job queue is marked for drain and doesn't"
               " accept new requests\n")
  elif isinstance(err, errors.JobQueueFull):
    obuf.write("Failure: the job queue is full and doesn't accept new"
               " job submissions until old jobs are archived\n")
  elif isinstance(err, errors.TypeEnforcementError):
    obuf.write("Parameter Error: %s" % msg)
  elif isinstance(err, errors.ParameterError):
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
  elif isinstance(err, errors.GenericError):
    obuf.write("Unhandled Ganeti error: %s" % msg)
  elif isinstance(err, luxi.NoMasterError):
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
               " and listening for connections?")
  elif isinstance(err, luxi.TimeoutError):
    obuf.write("Timeout while talking to the master daemon. Error:\n"
               "%s" % msg)
  elif isinstance(err, luxi.ProtocolError):
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
               "%s" % msg)
  elif isinstance(err, JobSubmittedException):
    obuf.write("JobID: %s\n" % err.args[0])
    retcode = 0
  else:
    obuf.write("Unhandled exception: %s" % msg)
  return retcode, obuf.getvalue().rstrip('\n')


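# Illustrative sketch (not part of the module API): FormatError maps an
# exception to an (exit code, message) pair, which callers typically print to
# stderr before exiting:
#
#   try:
#     cl = GetClient()
#   except errors.OpPrereqError, err:
#     retcode, msg = FormatError(err)
#     ToStderr(msg)
#     sys.exit(retcode)
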
def GenericMain(commands, override=None, aliases=None):
  """Generic main function for all the gnt-* commands.

  Arguments:
    - commands: a dictionary with a special structure, see the design doc
                for command line handling.
    - override: if not None, we expect a dictionary with keys that will
                override command line options; this can be used to pass
                options from the scripts to generic functions
    - aliases: dictionary with command aliases {'alias': 'target', ...}

  """
  # save the program name and the entire command line for later logging
  if sys.argv:
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
    if len(sys.argv) >= 2:
      binary += " " + sys.argv[1]
      old_cmdline = " ".join(sys.argv[2:])
    else:
      old_cmdline = ""
  else:
    binary = "<unknown program>"
    old_cmdline = ""

  if aliases is None:
    aliases = {}

  try:
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
  except errors.ParameterError, err:
    result, err_msg = FormatError(err)
    ToStderr(err_msg)
    return 1

  if func is None: # parse error
    return 1

  if override is not None:
    for key, val in override.iteritems():
      setattr(options, key, val)

  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
                     stderr_logging=True, program=binary)

  if old_cmdline:
    logging.info("run with arguments '%s'", old_cmdline)
  else:
    logging.info("run with no arguments")

  try:
    result = func(options, args)
  except (errors.GenericError, luxi.ProtocolError,
          JobSubmittedException), err:
    result, err_msg = FormatError(err)
    logging.exception("Error during command processing")
    ToStderr(err_msg)

  return result


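# Illustrative sketch of how a gnt-* script hands its command table to
# GenericMain; the "info" entry and ShowClusterInfo are hypothetical, and the
# exact tuple layout is defined by the design doc for command line handling:
#
#   commands = {
#     'info': (ShowClusterInfo, ARGS_NONE, [],
#              "", "Show cluster information"),
#     }
#
#   if __name__ == '__main__':
#     sys.exit(GenericMain(commands))
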
def GenericInstanceCreate(mode, opts, args):
  """Add an instance to the cluster via either creation or import.

  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new instance name
  @rtype: int
  @return: the desired exit code

  """
  instance = args[0]

  (pnode, snode) = SplitNodeOption(opts.node)

  hypervisor = None
  hvparams = {}
  if opts.hypervisor:
    hypervisor, hvparams = opts.hypervisor

  if opts.nics:
    try:
      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
    except ValueError, err:
      raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
    nics = [{}] * nic_max
    for nidx, ndict in opts.nics:
      nidx = int(nidx)
      if not isinstance(ndict, dict):
        msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
        raise errors.OpPrereqError(msg)
      nics[nidx] = ndict
  elif opts.no_nics:
    # no nics
    nics = []
  elif mode == constants.INSTANCE_CREATE:
    # default of one nic, all auto
    nics = [{}]
  else:
    # mode == import
    nics = []

  if opts.disk_template == constants.DT_DISKLESS:
    if opts.disks or opts.sd_size is not None:
      raise errors.OpPrereqError("Diskless instance but disk"
                                 " information passed")
    disks = []
  else:
    if (not opts.disks and not opts.sd_size
        and mode == constants.INSTANCE_CREATE):
      raise errors.OpPrereqError("No disk information specified")
    if opts.disks and opts.sd_size is not None:
      raise errors.OpPrereqError("Please use either the '--disk' or"
                                 " '-s' option")
    if opts.sd_size is not None:
      opts.disks = [(0, {"size": opts.sd_size})]

    if opts.disks:
      try:
        disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
      disks = [{}] * disk_max
    else:
      disks = []
    for didx, ddict in opts.disks:
      didx = int(didx)
      if not isinstance(ddict, dict):
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
        raise errors.OpPrereqError(msg)
      elif "size" in ddict:
        if "adopt" in ddict:
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
                                     " (disk %d)" % didx)
        try:
          ddict["size"] = utils.ParseUnit(ddict["size"])
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
                                     (didx, err))
      elif "adopt" in ddict:
        if mode == constants.INSTANCE_IMPORT:
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
                                     " import")
        ddict["size"] = 0
      else:
        raise errors.OpPrereqError("Missing size or adoption source for"
                                   " disk %d" % didx)
      disks[didx] = ddict

  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)

  if mode == constants.INSTANCE_CREATE:
    start = opts.start
    os_type = opts.os
    src_node = None
    src_path = None
    no_install = opts.no_install
    identify_defaults = False
  elif mode == constants.INSTANCE_IMPORT:
    start = False
    os_type = None
    src_node = opts.src_node
    src_path = opts.src_dir
    no_install = None
    identify_defaults = opts.identify_defaults
  else:
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)

  op = opcodes.OpCreateInstance(instance_name=instance,
                                disks=disks,
                                disk_template=opts.disk_template,
                                nics=nics,
                                pnode=pnode, snode=snode,
                                ip_check=opts.ip_check,
                                name_check=opts.name_check,
                                wait_for_sync=opts.wait_for_sync,
                                file_storage_dir=opts.file_storage_dir,
                                file_driver=opts.file_driver,
                                iallocator=opts.iallocator,
                                hypervisor=hypervisor,
                                hvparams=hvparams,
                                beparams=opts.beparams,
                                mode=mode,
                                start=start,
                                os_type=os_type,
                                src_node=src_node,
                                src_path=src_path,
                                no_install=no_install,
                                identify_defaults=identify_defaults)

  SubmitOrSend(op, opts)
  return 0


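# Illustrative sketch (not part of the module API): gnt-instance style scripts
# are expected to delegate to GenericInstanceCreate from their command
# functions; "AddInstance" and "ImportInstance" are hypothetical wrappers:
#
#   def AddInstance(opts, args):
#     return GenericInstanceCreate(constants.INSTANCE_CREATE, opts, args)
#
#   def ImportInstance(opts, args):
#     return GenericInstanceCreate(constants.INSTANCE_IMPORT, opts, args)
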
class _RunWhileClusterStoppedHelper:
  """Helper class for L{RunWhileClusterStopped} to simplify state management

  """
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes

    self.ssh = ssh.SshRunner(self.cluster_name)

    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name
    @type cmd: list
    @param cmd: Command

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    @type fn: callable
    @param fn: Function to be called

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes
        for node_name in self.online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception, err:
          _, errmsg = FormatError(err)
          logging.exception("Caught exception")
          self.feedback_fn(errmsg)
          raise
      finally:
        # Start cluster again, master node last
        for node_name in self.nonmaster_nodes + [self.master_node]:
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
    finally:
      # Resume watcher
      watcher_block.Close()


def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Calls a function while all cluster daemons are stopped.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  feedback_fn("Gathering cluster information")

  # This ensures we're running on the master daemon
  cl = GetClient()

  (cluster_name, master_node) = \
    cl.QueryConfigValues(["cluster_name", "master_node"])

  online_nodes = GetOnlineNodes([], cl=cl)

  # Don't keep a reference to the client. The master daemon will go away.
  del cl

  assert master_node in online_nodes

  return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
                                       online_nodes).Call(fn, *args)


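# Illustrative sketch (not part of the module API): the callback receives the
# helper instance as its first argument and runs while every daemon is down;
# "_ExampleFn" is a hypothetical callback used only to show the call shape:
#
#   def _ExampleFn(helper, message):
#     # "helper" is the _RunWhileClusterStoppedHelper instance
#     helper.feedback_fn(message)
#
#   RunWhileClusterStopped(ToStdout, _ExampleFn, "daemons are stopped")
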
def GenerateTable(headers, fields, separator, data,
                  numfields=None, unitfields=None,
                  units=None):
  """Generates a table with headers and different fields.

  @type headers: dict
  @param headers: dictionary mapping field names to headers for
      the table
  @type fields: list
  @param fields: the field names corresponding to each row in
      the data field
  @param separator: the separator to be used; if this is None,
      the default 'smart' algorithm is used which computes optimal
      field width, otherwise just the separator is used between
      each field
  @type data: list
  @param data: a list of lists, each sublist being one row to be output
  @type numfields: list
  @param numfields: a list with the fields that hold numeric
      values and thus should be right-aligned
  @type unitfields: list
  @param unitfields: a list with the fields that hold numeric
      values that should be formatted with the units field
  @type units: string or None
  @param units: the units we should use for formatting, or None for
      automatic choice (human-readable for non-separator usage, otherwise
      megabytes); this is a one-letter string

  """
  if units is None:
    if separator:
      units = "m"
    else:
      units = "h"

  if numfields is None:
    numfields = []
  if unitfields is None:
    unitfields = []

  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142

  format_fields = []
  for field in fields:
    if headers and field not in headers:
      # TODO: handle better unknown fields (either revert to old
      # style of raising exception, or deal more intelligently with
      # variable fields)
      headers[field] = field
    if separator is not None:
      format_fields.append("%s")
    elif numfields.Matches(field):
      format_fields.append("%*s")
    else:
      format_fields.append("%-*s")

  if separator is None:
    mlens = [0 for name in fields]
    format = ' '.join(format_fields)
  else:
    format = separator.replace("%", "%%").join(format_fields)

  for row in data:
    if row is None:
      continue
    for idx, val in enumerate(row):
      if unitfields.Matches(fields[idx]):
        try:
          val = int(val)
        except (TypeError, ValueError):
          pass
        else:
          val = row[idx] = utils.FormatUnit(val, units)
      val = row[idx] = str(val)
      if separator is None:
        mlens[idx] = max(mlens[idx], len(val))

  result = []
  if headers:
    args = []
    for idx, name in enumerate(fields):
      hdr = headers[name]
      if separator is None:
        mlens[idx] = max(mlens[idx], len(hdr))
        args.append(mlens[idx])
      args.append(hdr)
    result.append(format % tuple(args))

  if separator is None:
    assert len(mlens) == len(fields)

    if fields and not numfields.Matches(fields[-1]):
      mlens[-1] = 0

  for line in data:
    args = []
    if line is None:
      line = ['-' for _ in fields]
    for idx in range(len(fields)):
      if separator is None:
        args.append(mlens[idx])
      args.append(line[idx])
    result.append(format % tuple(args))

  return result


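# Illustrative sketch (not part of the module API): with separator=None the
# column widths are computed automatically; the field names below are made up:
#
#   headers = {"name": "Node", "dtotal": "DTotal"}
#   data = [["node1.example.com", 102400], ["node2.example.com", 204800]]
#   for line in GenerateTable(headers, ["name", "dtotal"], None, data,
#                             numfields=["dtotal"], unitfields=["dtotal"]):
#     ToStdout(line)
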
def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: a string with the formatted timestamp

  """
  if not isinstance(ts, (tuple, list)) or len(ts) != 2:
    return '?'
  sec, usec = ts
  return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec


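# Illustrative sketch (not part of the module API): job timestamps are
# (seconds, microseconds) pairs; anything else yields "?":
#
#   FormatTimestamp((1234567890, 42))   # -> "YYYY-MM-DD HH:MM:SS.000042",
#                                       #    date/time in the local timezone
#   FormatTimestamp("bogus")            # -> "?"
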
def ParseTimespec(value):
  """Parse a time specification.

  The following suffixes will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: days
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  """
  value = str(value)
  if not value:
    raise errors.OpPrereqError("Empty time specification passed")
  suffix_map = {
    's': 1,
    'm': 60,
    'h': 3600,
    'd': 86400,
    'w': 604800,
    }
  if value[-1] not in suffix_map:
    try:
      value = int(value)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  else:
    multiplier = suffix_map[value[-1]]
    value = value[:-1]
    if not value: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)")
    try:
      value = int(value) * multiplier
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  return value


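# Illustrative sketch (not part of the module API): the return value is always
# a number of seconds:
#
#   ParseTimespec("30")    # -> 30
#   ParseTimespec("2h")    # -> 7200
#   ParseTimespec("1w")    # -> 604800
#   ParseTimespec("h")     # raises errors.OpPrereqError (suffix only)
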
def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
                   filter_master=False):
  """Returns the names of online nodes.

  This function will also log a note on stderr with the names of
  the offline nodes that have been skipped.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed
  @type secondary_ips: boolean
  @param secondary_ips: if True, return the secondary IPs instead of the
      names, useful for doing network traffic over the replication interface
      (if any)
  @type filter_master: boolean
  @param filter_master: if True, do not return the master node in the list
      (useful in coordination with secondary_ips where we cannot check our
      node name against the list)

  """
  if cl is None:
    cl = GetClient()

  if secondary_ips:
    name_idx = 2
  else:
    name_idx = 0

  if filter_master:
    master_node = cl.QueryConfigValues(["master_node"])[0]
    filter_fn = lambda x: x != master_node
  else:
    filter_fn = lambda _: True

  result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
                         use_locking=False)
  offline = [row[0] for row in result if row[1]]
  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
  return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]


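# Illustrative sketch (not part of the module API): an empty node list means
# "all nodes"; offline nodes are skipped and, unless nowarn=True, reported on
# stderr:
#
#   online = GetOnlineNodes([], cl=GetClient())
#   repl_ips = GetOnlineNodes([], secondary_ips=True, filter_master=True)
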
def _ToStream(stream, txt, *args):
  """Write a message to a stream, bypassing the logging system

  @type stream: file object
  @param stream: the file to which we should write
  @type txt: str
  @param txt: the message

  """
  if args:
    args = tuple(args)
    stream.write(txt % args)
  else:
    stream.write(txt)
  stream.write('\n')
  stream.flush()


def ToStdout(txt, *args):
  """Write a message to stdout only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stdout, txt, *args)


def ToStderr(txt, *args):
  """Write a message to stderr only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stderr, txt, *args)


class JobExecutor(object):
  """Class which manages the submission and execution of multiple jobs.

  Note that instances of this class should not be reused between
  GetResults() calls.

  """
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
    self.queue = []
    if cl is None:
      cl = GetClient()
    self.cl = cl
    self.verbose = verbose
    self.jobs = []
    self.opts = opts
    self.feedback_fn = feedback_fn

  def QueueJob(self, name, *ops):
    """Record a job for later submit.

    @type name: string
    @param name: a description of the job, will be used in WaitJobSet
    """
    SetGenericOpcodeOpts(ops, self.opts)
    self.queue.append((name, ops))

  def SubmitPending(self):
    """Submit all pending jobs.

    """
    results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
                                                            self.queue)):
      self.jobs.append((idx, status, data, name))

  def _ChooseJob(self):
    """Choose a non-waiting/queued job to poll next.

    """
    assert self.jobs, "_ChooseJob called with empty job list"

    result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
    assert result

    for job_data, status in zip(self.jobs, result):
      if status[0] in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_WAITLOCK,
                       constants.JOB_STATUS_CANCELING):
        # job is still waiting
        continue
      # good candidate found
      self.jobs.remove(job_data)
      return job_data

    # no job found
    return self.jobs.pop(0)

  def GetResults(self):
    """Wait for and return the results of all jobs.

    @rtype: list
    @return: list of tuples (success, job results), in the same order
        as the submitted jobs; if a job has failed, instead of the result
        there will be the error message

    """
    if not self.jobs:
      self.SubmitPending()
    results = []
    if self.verbose:
      ok_jobs = [row[2] for row in self.jobs if row[1]]
      if ok_jobs:
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))

    # first, remove any non-submitted jobs
    self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
    for idx, _, jid, name in failures:
      ToStderr("Failed to submit job for %s: %s", name, jid)
      results.append((idx, False, jid))

    while self.jobs:
      (idx, _, jid, name) = self._ChooseJob()
      ToStdout("Waiting for job %s for %s...", jid, name)
      try:
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
        success = True
      except (errors.GenericError, luxi.ProtocolError), err:
        _, job_result = FormatError(err)
        success = False
        # the error message will always be shown, verbose or not
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)

      results.append((idx, success, job_result))

    # sort based on the index, then drop it
    results.sort()
    results = [i[1:] for i in results]

    return results

  def WaitOrShow(self, wait):
    """Wait for job results or only print the job IDs.

    @type wait: boolean
    @param wait: whether to wait or not

    """
    if wait:
      return self.GetResults()
    else:
      if not self.jobs:
        self.SubmitPending()
      for _, status, result, name in self.jobs:
        if status:
          ToStdout("%s: %s", result, name)
        else:
          ToStderr("Failure for %s: %s", name, result)