Statistics
| Branch: | Tag: | Revision:

root / lib / cli.py @ 84a12e40

History | View | Annotate | Download (77.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module dealing with command line parsing"""
23

    
24

    
25
import sys
26
import textwrap
27
import os.path
28
import time
29
import logging
30
from cStringIO import StringIO
31

    
32
from ganeti import utils
33
from ganeti import errors
34
from ganeti import constants
35
from ganeti import opcodes
36
from ganeti import luxi
37
from ganeti import ssconf
38
from ganeti import rpc
39
from ganeti import ssh
40
from ganeti import compat
41
from ganeti import netutils
42

    
43
from optparse import (OptionParser, TitledHelpFormatter,
44
                      Option, OptionValueError)
45

    
46

    
47
__all__ = [
48
  # Command line options
49
  "ADD_UIDS_OPT",
50
  "ALLOCATABLE_OPT",
51
  "ALL_OPT",
52
  "AUTO_PROMOTE_OPT",
53
  "AUTO_REPLACE_OPT",
54
  "BACKEND_OPT",
55
  "CLEANUP_OPT",
56
  "CLUSTER_DOMAIN_SECRET_OPT",
57
  "CONFIRM_OPT",
58
  "CP_SIZE_OPT",
59
  "DEBUG_OPT",
60
  "DEBUG_SIMERR_OPT",
61
  "DISKIDX_OPT",
62
  "DISK_OPT",
63
  "DISK_TEMPLATE_OPT",
64
  "DRAINED_OPT",
65
  "DRBD_HELPER_OPT",
66
  "EARLY_RELEASE_OPT",
67
  "ENABLED_HV_OPT",
68
  "ERROR_CODES_OPT",
69
  "FIELDS_OPT",
70
  "FILESTORE_DIR_OPT",
71
  "FILESTORE_DRIVER_OPT",
72
  "FORCE_OPT",
73
  "FORCE_VARIANT_OPT",
74
  "GLOBAL_FILEDIR_OPT",
75
  "HVLIST_OPT",
76
  "HVOPTS_OPT",
77
  "HYPERVISOR_OPT",
78
  "IALLOCATOR_OPT",
79
  "DEFAULT_IALLOCATOR_OPT",
80
  "IDENTIFY_DEFAULTS_OPT",
81
  "IGNORE_CONSIST_OPT",
82
  "IGNORE_FAILURES_OPT",
83
  "IGNORE_REMOVE_FAILURES_OPT",
84
  "IGNORE_SECONDARIES_OPT",
85
  "IGNORE_SIZE_OPT",
86
  "MAC_PREFIX_OPT",
87
  "MAINTAIN_NODE_HEALTH_OPT",
88
  "MASTER_NETDEV_OPT",
89
  "MC_OPT",
90
  "MIGRATION_TYPE_OPT",
91
  "NET_OPT",
92
  "NEW_CLUSTER_CERT_OPT",
93
  "NEW_CLUSTER_DOMAIN_SECRET_OPT",
94
  "NEW_CONFD_HMAC_KEY_OPT",
95
  "NEW_RAPI_CERT_OPT",
96
  "NEW_SECONDARY_OPT",
97
  "NIC_PARAMS_OPT",
98
  "NODE_LIST_OPT",
99
  "NODE_PLACEMENT_OPT",
100
  "NODRBD_STORAGE_OPT",
101
  "NOHDR_OPT",
102
  "NOIPCHECK_OPT",
103
  "NO_INSTALL_OPT",
104
  "NONAMECHECK_OPT",
105
  "NOLVM_STORAGE_OPT",
106
  "NOMODIFY_ETCHOSTS_OPT",
107
  "NOMODIFY_SSH_SETUP_OPT",
108
  "NONICS_OPT",
109
  "NONLIVE_OPT",
110
  "NONPLUS1_OPT",
111
  "NOSHUTDOWN_OPT",
112
  "NOSTART_OPT",
113
  "NOSSH_KEYCHECK_OPT",
114
  "NOVOTING_OPT",
115
  "NWSYNC_OPT",
116
  "ON_PRIMARY_OPT",
117
  "ON_SECONDARY_OPT",
118
  "OFFLINE_OPT",
119
  "OSPARAMS_OPT",
120
  "OS_OPT",
121
  "OS_SIZE_OPT",
122
  "RAPI_CERT_OPT",
123
  "READD_OPT",
124
  "REBOOT_TYPE_OPT",
125
  "REMOVE_INSTANCE_OPT",
126
  "REMOVE_UIDS_OPT",
127
  "ROMAN_OPT",
128
  "SECONDARY_IP_OPT",
129
  "SELECT_OS_OPT",
130
  "SEP_OPT",
131
  "SHOWCMD_OPT",
132
  "SHUTDOWN_TIMEOUT_OPT",
133
  "SINGLE_NODE_OPT",
134
  "SRC_DIR_OPT",
135
  "SRC_NODE_OPT",
136
  "SUBMIT_OPT",
137
  "STATIC_OPT",
138
  "SYNC_OPT",
139
  "TAG_SRC_OPT",
140
  "TIMEOUT_OPT",
141
  "UIDPOOL_OPT",
142
  "USEUNITS_OPT",
143
  "USE_REPL_NET_OPT",
144
  "VERBOSE_OPT",
145
  "VG_NAME_OPT",
146
  "YES_DOIT_OPT",
147
  # Generic functions for CLI programs
148
  "GenericMain",
149
  "GenericInstanceCreate",
150
  "GetClient",
151
  "GetOnlineNodes",
152
  "JobExecutor",
153
  "JobSubmittedException",
154
  "ParseTimespec",
155
  "RunWhileClusterStopped",
156
  "SubmitOpCode",
157
  "SubmitOrSend",
158
  "UsesRPC",
159
  # Formatting functions
160
  "ToStderr", "ToStdout",
161
  "FormatError",
162
  "GenerateTable",
163
  "AskUser",
164
  "FormatTimestamp",
165
  "FormatLogMessage",
166
  # Tags functions
167
  "ListTags",
168
  "AddTags",
169
  "RemoveTags",
170
  # command line options support infrastructure
171
  "ARGS_MANY_INSTANCES",
172
  "ARGS_MANY_NODES",
173
  "ARGS_NONE",
174
  "ARGS_ONE_INSTANCE",
175
  "ARGS_ONE_NODE",
176
  "ARGS_ONE_OS",
177
  "ArgChoice",
178
  "ArgCommand",
179
  "ArgFile",
180
  "ArgHost",
181
  "ArgInstance",
182
  "ArgJobId",
183
  "ArgNode",
184
  "ArgOs",
185
  "ArgSuggest",
186
  "ArgUnknown",
187
  "OPT_COMPL_INST_ADD_NODES",
188
  "OPT_COMPL_MANY_NODES",
189
  "OPT_COMPL_ONE_IALLOCATOR",
190
  "OPT_COMPL_ONE_INSTANCE",
191
  "OPT_COMPL_ONE_NODE",
192
  "OPT_COMPL_ONE_OS",
193
  "cli_option",
194
  "SplitNodeOption",
195
  "CalculateOSNames",
196
  ]
197

    
198
NO_PREFIX = "no_"
199
UN_PREFIX = "-"
200

    
201

    
202
class _Argument:
203
  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
204
    self.min = min
205
    self.max = max
206

    
207
  def __repr__(self):
208
    return ("<%s min=%s max=%s>" %
209
            (self.__class__.__name__, self.min, self.max))
210

    
211

    
212
class ArgSuggest(_Argument):
213
  """Suggesting argument.
214

215
  Value can be any of the ones passed to the constructor.
216

217
  """
218
  # pylint: disable-msg=W0622
219
  def __init__(self, min=0, max=None, choices=None):
220
    _Argument.__init__(self, min=min, max=max)
221
    self.choices = choices
222

    
223
  def __repr__(self):
224
    return ("<%s min=%s max=%s choices=%r>" %
225
            (self.__class__.__name__, self.min, self.max, self.choices))
226

    
227

    
228
class ArgChoice(ArgSuggest):
229
  """Choice argument.
230

231
  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
232
  but value must be one of the choices.
233

234
  """
235

    
236

    
237
class ArgUnknown(_Argument):
238
  """Unknown argument to program (e.g. determined at runtime).
239

240
  """
241

    
242

    
243
class ArgInstance(_Argument):
244
  """Instances argument.
245

246
  """
247

    
248

    
249
class ArgNode(_Argument):
250
  """Node argument.
251

252
  """
253

    
254
class ArgJobId(_Argument):
255
  """Job ID argument.
256

257
  """
258

    
259

    
260
class ArgFile(_Argument):
261
  """File path argument.
262

263
  """
264

    
265

    
266
class ArgCommand(_Argument):
267
  """Command argument.
268

269
  """
270

    
271

    
272
class ArgHost(_Argument):
273
  """Host argument.
274

275
  """
276

    
277

    
278
class ArgOs(_Argument):
279
  """OS argument.
280

281
  """
282

    
283

    
284
ARGS_NONE = []
285
ARGS_MANY_INSTANCES = [ArgInstance()]
286
ARGS_MANY_NODES = [ArgNode()]
287
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
288
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
289
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
290

    
291

    
292
def _ExtractTagsObject(opts, args):
293
  """Extract the tag type object.
294

295
  Note that this function will modify its args parameter.
296

297
  """
298
  if not hasattr(opts, "tag_type"):
299
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
300
  kind = opts.tag_type
301
  if kind == constants.TAG_CLUSTER:
302
    retval = kind, kind
303
  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
304
    if not args:
305
      raise errors.OpPrereqError("no arguments passed to the command")
306
    name = args.pop(0)
307
    retval = kind, name
308
  else:
309
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
310
  return retval
311

    
312

    
313
def _ExtendTags(opts, args):
314
  """Extend the args if a source file has been given.
315

316
  This function will extend the tags with the contents of the file
317
  passed in the 'tags_source' attribute of the opts parameter. A file
318
  named '-' will be replaced by stdin.
319

320
  """
321
  fname = opts.tags_source
322
  if fname is None:
323
    return
324
  if fname == "-":
325
    new_fh = sys.stdin
326
  else:
327
    new_fh = open(fname, "r")
328
  new_data = []
329
  try:
330
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
331
    # because of python bug 1633941
332
    while True:
333
      line = new_fh.readline()
334
      if not line:
335
        break
336
      new_data.append(line.strip())
337
  finally:
338
    new_fh.close()
339
  args.extend(new_data)
340

    
341

    
342
def ListTags(opts, args):
343
  """List the tags on a given object.
344

345
  This is a generic implementation that knows how to deal with all
346
  three cases of tag objects (cluster, node, instance). The opts
347
  argument is expected to contain a tag_type field denoting what
348
  object type we work on.
349

350
  """
351
  kind, name = _ExtractTagsObject(opts, args)
352
  cl = GetClient()
353
  result = cl.QueryTags(kind, name)
354
  result = list(result)
355
  result.sort()
356
  for tag in result:
357
    ToStdout(tag)
358

    
359

    
360
def AddTags(opts, args):
361
  """Add tags on a given object.
362

363
  This is a generic implementation that knows how to deal with all
364
  three cases of tag objects (cluster, node, instance). The opts
365
  argument is expected to contain a tag_type field denoting what
366
  object type we work on.
367

368
  """
369
  kind, name = _ExtractTagsObject(opts, args)
370
  _ExtendTags(opts, args)
371
  if not args:
372
    raise errors.OpPrereqError("No tags to be added")
373
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
374
  SubmitOpCode(op)
375

    
376

    
377
def RemoveTags(opts, args):
378
  """Remove tags from a given object.
379

380
  This is a generic implementation that knows how to deal with all
381
  three cases of tag objects (cluster, node, instance). The opts
382
  argument is expected to contain a tag_type field denoting what
383
  object type we work on.
384

385
  """
386
  kind, name = _ExtractTagsObject(opts, args)
387
  _ExtendTags(opts, args)
388
  if not args:
389
    raise errors.OpPrereqError("No tags to be removed")
390
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
391
  SubmitOpCode(op)
392

    
393

    
394
def check_unit(option, opt, value): # pylint: disable-msg=W0613
395
  """OptParsers custom converter for units.
396

397
  """
398
  try:
399
    return utils.ParseUnit(value)
400
  except errors.UnitParseError, err:
401
    raise OptionValueError("option %s: %s" % (opt, err))
402

    
403

    
404
def _SplitKeyVal(opt, data):
405
  """Convert a KeyVal string into a dict.
406

407
  This function will convert a key=val[,...] string into a dict. Empty
408
  values will be converted specially: keys which have the prefix 'no_'
409
  will have the value=False and the prefix stripped, the others will
410
  have value=True.
411

412
  @type opt: string
413
  @param opt: a string holding the option name for which we process the
414
      data, used in building error messages
415
  @type data: string
416
  @param data: a string of the format key=val,key=val,...
417
  @rtype: dict
418
  @return: {key=val, key=val}
419
  @raises errors.ParameterError: if there are duplicate keys
420

421
  """
422
  kv_dict = {}
423
  if data:
424
    for elem in utils.UnescapeAndSplit(data, sep=","):
425
      if "=" in elem:
426
        key, val = elem.split("=", 1)
427
      else:
428
        if elem.startswith(NO_PREFIX):
429
          key, val = elem[len(NO_PREFIX):], False
430
        elif elem.startswith(UN_PREFIX):
431
          key, val = elem[len(UN_PREFIX):], None
432
        else:
433
          key, val = elem, True
434
      if key in kv_dict:
435
        raise errors.ParameterError("Duplicate key '%s' in option %s" %
436
                                    (key, opt))
437
      kv_dict[key] = val
438
  return kv_dict
439

    
440

    
441
def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
442
  """Custom parser for ident:key=val,key=val options.
443

444
  This will store the parsed values as a tuple (ident, {key: val}). As such,
445
  multiple uses of this option via action=append is possible.
446

447
  """
448
  if ":" not in value:
449
    ident, rest = value, ''
450
  else:
451
    ident, rest = value.split(":", 1)
452

    
453
  if ident.startswith(NO_PREFIX):
454
    if rest:
455
      msg = "Cannot pass options when removing parameter groups: %s" % value
456
      raise errors.ParameterError(msg)
457
    retval = (ident[len(NO_PREFIX):], False)
458
  elif ident.startswith(UN_PREFIX):
459
    if rest:
460
      msg = "Cannot pass options when removing parameter groups: %s" % value
461
      raise errors.ParameterError(msg)
462
    retval = (ident[len(UN_PREFIX):], None)
463
  else:
464
    kv_dict = _SplitKeyVal(opt, rest)
465
    retval = (ident, kv_dict)
466
  return retval
467

    
468

    
469
def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
470
  """Custom parser class for key=val,key=val options.
471

472
  This will store the parsed values as a dict {key: val}.
473

474
  """
475
  return _SplitKeyVal(opt, value)
476

    
477

    
478
def check_bool(option, opt, value): # pylint: disable-msg=W0613
479
  """Custom parser for yes/no options.
480

481
  This will store the parsed value as either True or False.
482

483
  """
484
  value = value.lower()
485
  if value == constants.VALUE_FALSE or value == "no":
486
    return False
487
  elif value == constants.VALUE_TRUE or value == "yes":
488
    return True
489
  else:
490
    raise errors.ParameterError("Invalid boolean value '%s'" % value)
491

    
492

    
493
# completion_suggestion is normally a list. Using numeric values not evaluating
494
# to False for dynamic completion.
495
(OPT_COMPL_MANY_NODES,
496
 OPT_COMPL_ONE_NODE,
497
 OPT_COMPL_ONE_INSTANCE,
498
 OPT_COMPL_ONE_OS,
499
 OPT_COMPL_ONE_IALLOCATOR,
500
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)
501

    
502
OPT_COMPL_ALL = frozenset([
503
  OPT_COMPL_MANY_NODES,
504
  OPT_COMPL_ONE_NODE,
505
  OPT_COMPL_ONE_INSTANCE,
506
  OPT_COMPL_ONE_OS,
507
  OPT_COMPL_ONE_IALLOCATOR,
508
  OPT_COMPL_INST_ADD_NODES,
509
  ])
510

    
511

    
512
class CliOption(Option):
513
  """Custom option class for optparse.
514

515
  """
516
  ATTRS = Option.ATTRS + [
517
    "completion_suggest",
518
    ]
519
  TYPES = Option.TYPES + (
520
    "identkeyval",
521
    "keyval",
522
    "unit",
523
    "bool",
524
    )
525
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
526
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
527
  TYPE_CHECKER["keyval"] = check_key_val
528
  TYPE_CHECKER["unit"] = check_unit
529
  TYPE_CHECKER["bool"] = check_bool
530

    
531

    
532
# optparse.py sets make_option, so we do it for our own option class, too
533
cli_option = CliOption
534

    
535

    
536
_YORNO = "yes|no"
537

    
538
DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
539
                       help="Increase debugging level")
540

    
541
NOHDR_OPT = cli_option("--no-headers", default=False,
542
                       action="store_true", dest="no_headers",
543
                       help="Don't display column headers")
544

    
545
SEP_OPT = cli_option("--separator", default=None,
546
                     action="store", dest="separator",
547
                     help=("Separator between output fields"
548
                           " (defaults to one space)"))
549

    
550
USEUNITS_OPT = cli_option("--units", default=None,
551
                          dest="units", choices=('h', 'm', 'g', 't'),
552
                          help="Specify units for output (one of hmgt)")
553

    
554
FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
555
                        type="string", metavar="FIELDS",
556
                        help="Comma separated list of output fields")
557

    
558
FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
559
                       default=False, help="Force the operation")
560

    
561
CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
562
                         default=False, help="Do not require confirmation")
563

    
564
TAG_SRC_OPT = cli_option("--from", dest="tags_source",
565
                         default=None, help="File with tag names")
566

    
567
SUBMIT_OPT = cli_option("--submit", dest="submit_only",
568
                        default=False, action="store_true",
569
                        help=("Submit the job and return the job ID, but"
570
                              " don't wait for the job to finish"))
571

    
572
SYNC_OPT = cli_option("--sync", dest="do_locking",
573
                      default=False, action="store_true",
574
                      help=("Grab locks while doing the queries"
575
                            " in order to ensure more consistent results"))
576

    
577
_DRY_RUN_OPT = cli_option("--dry-run", default=False,
578
                          action="store_true",
579
                          help=("Do not execute the operation, just run the"
580
                                " check steps and verify it it could be"
581
                                " executed"))
582

    
583
VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
584
                         action="store_true",
585
                         help="Increase the verbosity of the operation")
586

    
587
DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
588
                              action="store_true", dest="simulate_errors",
589
                              help="Debugging option that makes the operation"
590
                              " treat most runtime checks as failed")
591

    
592
NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
593
                        default=True, action="store_false",
594
                        help="Don't wait for sync (DANGEROUS!)")
595

    
596
DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
597
                               help="Custom disk setup (diskless, file,"
598
                               " plain or drbd)",
599
                               default=None, metavar="TEMPL",
600
                               choices=list(constants.DISK_TEMPLATES))
601

    
602
NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
603
                        help="Do not create any network cards for"
604
                        " the instance")
605

    
606
FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
607
                               help="Relative path under default cluster-wide"
608
                               " file storage dir to store file-based disks",
609
                               default=None, metavar="<DIR>")
610

    
611
FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
612
                                  help="Driver to use for image files",
613
                                  default="loop", metavar="<DRIVER>",
614
                                  choices=list(constants.FILE_DRIVER))
615

    
616
IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
617
                            help="Select nodes for the instance automatically"
618
                            " using the <NAME> iallocator plugin",
619
                            default=None, type="string",
620
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
621

    
622
DEFAULT_IALLOCATOR_OPT = cli_option("-I", "--default-iallocator",
623
                            metavar="<NAME>",
624
                            help="Set the default instance allocator plugin",
625
                            default=None, type="string",
626
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
627

    
628
OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
629
                    metavar="<os>",
630
                    completion_suggest=OPT_COMPL_ONE_OS)
631

    
632
OSPARAMS_OPT = cli_option("-O", "--os-parameters", dest="osparams",
633
                         type="keyval", default={},
634
                         help="OS parameters")
635

    
636
FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
637
                               action="store_true", default=False,
638
                               help="Force an unknown variant")
639

    
640
NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
641
                            action="store_true", default=False,
642
                            help="Do not install the OS (will"
643
                            " enable no-start)")
644

    
645
BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
646
                         type="keyval", default={},
647
                         help="Backend parameters")
648

    
649
HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
650
                         default={}, dest="hvparams",
651
                         help="Hypervisor parameters")
652

    
653
HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
654
                            help="Hypervisor and hypervisor options, in the"
655
                            " format hypervisor:option=value,option=value,...",
656
                            default=None, type="identkeyval")
657

    
658
HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
659
                        help="Hypervisor and hypervisor options, in the"
660
                        " format hypervisor:option=value,option=value,...",
661
                        default=[], action="append", type="identkeyval")
662

    
663
NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
664
                           action="store_false",
665
                           help="Don't check that the instance's IP"
666
                           " is alive")
667

    
668
NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
669
                             default=True, action="store_false",
670
                             help="Don't check that the instance's name"
671
                             " is resolvable")
672

    
673
NET_OPT = cli_option("--net",
674
                     help="NIC parameters", default=[],
675
                     dest="nics", action="append", type="identkeyval")
676

    
677
DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
678
                      dest="disks", action="append", type="identkeyval")
679

    
680
DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
681
                         help="Comma-separated list of disks"
682
                         " indices to act on (e.g. 0,2) (optional,"
683
                         " defaults to all disks)")
684

    
685
OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
686
                         help="Enforces a single-disk configuration using the"
687
                         " given disk size, in MiB unless a suffix is used",
688
                         default=None, type="unit", metavar="<size>")
689

    
690
IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
691
                                dest="ignore_consistency",
692
                                action="store_true", default=False,
693
                                help="Ignore the consistency of the disks on"
694
                                " the secondary")
695

    
696
NONLIVE_OPT = cli_option("--non-live", dest="live",
697
                         default=True, action="store_false",
698
                         help="Do a non-live migration (this usually means"
699
                         " freeze the instance, save the state, transfer and"
700
                         " only then resume running on the secondary node)")
701

    
702
MIGRATION_TYPE_OPT = cli_option("--migration-type", dest="migration_type",
703
                                default=None,
704
                                choices=list(constants.HT_MIGRATION_TYPES),
705
                                help="Override default migration type (choose"
706
                                " either live or non-live")
707

    
708
NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
709
                                help="Target node and optional secondary node",
710
                                metavar="<pnode>[:<snode>]",
711
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)
712

    
713
NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
714
                           action="append", metavar="<node>",
715
                           help="Use only this node (can be used multiple"
716
                           " times, if not given defaults to all nodes)",
717
                           completion_suggest=OPT_COMPL_ONE_NODE)
718

    
719
SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
720
                             metavar="<node>",
721
                             completion_suggest=OPT_COMPL_ONE_NODE)
722

    
723
NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
724
                         action="store_false",
725
                         help="Don't start the instance after creation")
726

    
727
SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
728
                         action="store_true", default=False,
729
                         help="Show command instead of executing it")
730

    
731
CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
732
                         default=False, action="store_true",
733
                         help="Instead of performing the migration, try to"
734
                         " recover from a failed cleanup. This is safe"
735
                         " to run even if the instance is healthy, but it"
736
                         " will create extra replication traffic and "
737
                         " disrupt briefly the replication (like during the"
738
                         " migration")
739

    
740
STATIC_OPT = cli_option("-s", "--static", dest="static",
741
                        action="store_true", default=False,
742
                        help="Only show configuration data, not runtime data")
743

    
744
ALL_OPT = cli_option("--all", dest="show_all",
745
                     default=False, action="store_true",
746
                     help="Show info on all instances on the cluster."
747
                     " This can take a long time to run, use wisely")
748

    
749
SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
750
                           action="store_true", default=False,
751
                           help="Interactive OS reinstall, lists available"
752
                           " OS templates for selection")
753

    
754
IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
755
                                 action="store_true", default=False,
756
                                 help="Remove the instance from the cluster"
757
                                 " configuration even if there are failures"
758
                                 " during the removal process")
759

    
760
IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
761
                                        dest="ignore_remove_failures",
762
                                        action="store_true", default=False,
763
                                        help="Remove the instance from the"
764
                                        " cluster configuration even if there"
765
                                        " are failures during the removal"
766
                                        " process")
767

    
768
REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
769
                                 action="store_true", default=False,
770
                                 help="Remove the instance from the cluster")
771

    
772
NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
773
                               help="Specifies the new secondary node",
774
                               metavar="NODE", default=None,
775
                               completion_suggest=OPT_COMPL_ONE_NODE)
776

    
777
ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
778
                            default=False, action="store_true",
779
                            help="Replace the disk(s) on the primary"
780
                            " node (only for the drbd template)")
781

    
782
ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
783
                              default=False, action="store_true",
784
                              help="Replace the disk(s) on the secondary"
785
                              " node (only for the drbd template)")
786

    
787
AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
788
                              default=False, action="store_true",
789
                              help="Lock all nodes and auto-promote as needed"
790
                              " to MC status")
791

    
792
AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
793
                              default=False, action="store_true",
794
                              help="Automatically replace faulty disks"
795
                              " (only for the drbd template)")
796

    
797
IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
798
                             default=False, action="store_true",
799
                             help="Ignore current recorded size"
800
                             " (useful for forcing activation when"
801
                             " the recorded size is wrong)")
802

    
803
SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
804
                          metavar="<node>",
805
                          completion_suggest=OPT_COMPL_ONE_NODE)
806

    
807
SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
808
                         metavar="<dir>")
809

    
810
SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
811
                              help="Specify the secondary ip for the node",
812
                              metavar="ADDRESS", default=None)
813

    
814
READD_OPT = cli_option("--readd", dest="readd",
815
                       default=False, action="store_true",
816
                       help="Readd old node after replacing it")
817

    
818
NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
819
                                default=True, action="store_false",
820
                                help="Disable SSH key fingerprint checking")
821

    
822

    
823
MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
824
                    type="bool", default=None, metavar=_YORNO,
825
                    help="Set the master_candidate flag on the node")
826

    
827
OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
828
                         type="bool", default=None,
829
                         help="Set the offline flag on the node")
830

    
831
DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
832
                         type="bool", default=None,
833
                         help="Set the drained flag on the node")
834

    
835
ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
836
                             type="bool", default=None, metavar=_YORNO,
837
                             help="Set the allocatable flag on a volume")
838

    
839
NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
840
                               help="Disable support for lvm based instances"
841
                               " (cluster-wide)",
842
                               action="store_false", default=True)
843

    
844
ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
845
                            dest="enabled_hypervisors",
846
                            help="Comma-separated list of hypervisors",
847
                            type="string", default=None)
848

    
849
NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
850
                            type="keyval", default={},
851
                            help="NIC parameters")
852

    
853
CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
854
                         dest="candidate_pool_size", type="int",
855
                         help="Set the candidate pool size")
856

    
857
VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
858
                         help="Enables LVM and specifies the volume group"
859
                         " name (cluster-wide) for disk allocation [xenvg]",
860
                         metavar="VG", default=None)
861

    
862
YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
863
                          help="Destroy cluster", action="store_true")
864

    
865
NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
866
                          help="Skip node agreement check (dangerous)",
867
                          action="store_true", default=False)
868

    
869
MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
870
                            help="Specify the mac prefix for the instance IP"
871
                            " addresses, in the format XX:XX:XX",
872
                            metavar="PREFIX",
873
                            default=None)
874

    
875
MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
876
                               help="Specify the node interface (cluster-wide)"
877
                               " on which the master IP address will be added "
878
                               " [%s]" % constants.DEFAULT_BRIDGE,
879
                               metavar="NETDEV",
880
                               default=constants.DEFAULT_BRIDGE)
881

    
882
GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
883
                                help="Specify the default directory (cluster-"
884
                                "wide) for storing the file-based disks [%s]" %
885
                                constants.DEFAULT_FILE_STORAGE_DIR,
886
                                metavar="DIR",
887
                                default=constants.DEFAULT_FILE_STORAGE_DIR)
888

    
889
NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
890
                                   help="Don't modify /etc/hosts",
891
                                   action="store_false", default=True)
892

    
893
NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
894
                                    help="Don't initialize SSH keys",
895
                                    action="store_false", default=True)
896

    
897
ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
898
                             help="Enable parseable error messages",
899
                             action="store_true", default=False)
900

    
901
NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
902
                          help="Skip N+1 memory redundancy tests",
903
                          action="store_true", default=False)
904

    
905
REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
906
                             help="Type of reboot: soft/hard/full",
907
                             default=constants.INSTANCE_REBOOT_HARD,
908
                             metavar="<REBOOT>",
909
                             choices=list(constants.REBOOT_TYPES))
910

    
911
IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
912
                                    dest="ignore_secondaries",
913
                                    default=False, action="store_true",
914
                                    help="Ignore errors from secondaries")
915

    
916
NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
917
                            action="store_false", default=True,
918
                            help="Don't shutdown the instance (unsafe)")
919

    
920
TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
921
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
922
                         help="Maximum time to wait")
923

    
924
SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
925
                         dest="shutdown_timeout", type="int",
926
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
927
                         help="Maximum time to wait for instance shutdown")
928

    
929
EARLY_RELEASE_OPT = cli_option("--early-release",
930
                               dest="early_release", default=False,
931
                               action="store_true",
932
                               help="Release the locks on the secondary"
933
                               " node(s) early")
934

    
935
NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
936
                                  dest="new_cluster_cert",
937
                                  default=False, action="store_true",
938
                                  help="Generate a new cluster certificate")
939

    
940
RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
941
                           default=None,
942
                           help="File containing new RAPI certificate")
943

    
944
NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
945
                               default=None, action="store_true",
946
                               help=("Generate a new self-signed RAPI"
947
                                     " certificate"))
948

    
949
NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
950
                                    dest="new_confd_hmac_key",
951
                                    default=False, action="store_true",
952
                                    help=("Create a new HMAC key for %s" %
953
                                          constants.CONFD))
954

    
955
CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
956
                                       dest="cluster_domain_secret",
957
                                       default=None,
958
                                       help=("Load new new cluster domain"
959
                                             " secret from file"))
960

    
961
NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
962
                                           dest="new_cluster_domain_secret",
963
                                           default=False, action="store_true",
964
                                           help=("Create a new cluster domain"
965
                                                 " secret"))
966

    
967
USE_REPL_NET_OPT = cli_option("--use-replication-network",
968
                              dest="use_replication_network",
969
                              help="Whether to use the replication network"
970
                              " for talking to the nodes",
971
                              action="store_true", default=False)
972

    
973
MAINTAIN_NODE_HEALTH_OPT = \
974
    cli_option("--maintain-node-health", dest="maintain_node_health",
975
               metavar=_YORNO, default=None, type="bool",
976
               help="Configure the cluster to automatically maintain node"
977
               " health, by shutting down unknown instances, shutting down"
978
               " unknown DRBD devices, etc.")
979

    
980
IDENTIFY_DEFAULTS_OPT = \
981
    cli_option("--identify-defaults", dest="identify_defaults",
982
               default=False, action="store_true",
983
               help="Identify which saved instance parameters are equal to"
984
               " the current cluster defaults and set them as such, instead"
985
               " of marking them as overridden")
986

    
987
UIDPOOL_OPT = cli_option("--uid-pool", default=None,
988
                         action="store", dest="uid_pool",
989
                         help=("A list of user-ids or user-id"
990
                               " ranges separated by commas"))
991

    
992
ADD_UIDS_OPT = cli_option("--add-uids", default=None,
993
                          action="store", dest="add_uids",
994
                          help=("A list of user-ids or user-id"
995
                                " ranges separated by commas, to be"
996
                                " added to the user-id pool"))
997

    
998
REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
999
                             action="store", dest="remove_uids",
1000
                             help=("A list of user-ids or user-id"
1001
                                   " ranges separated by commas, to be"
1002
                                   " removed from the user-id pool"))
1003

    
1004
ROMAN_OPT = cli_option("--roman",
1005
                       dest="roman_integers", default=False,
1006
                       action="store_true",
1007
                       help="Use roman numbers for positive integers")
1008

    
1009
DRBD_HELPER_OPT = cli_option("--drbd-usermode-helper", dest="drbd_helper",
1010
                             action="store", default=None,
1011
                             help="Specifies usermode helper for DRBD")
1012

    
1013
NODRBD_STORAGE_OPT = cli_option("--no-drbd-storage", dest="drbd_storage",
1014
                                action="store_false", default=True,
1015
                                help="Disable support for DRBD")
1016

    
1017

    
1018
def _ParseArgs(argv, commands, aliases):
1019
  """Parser for the command line arguments.
1020

1021
  This function parses the arguments and returns the function which
1022
  must be executed together with its (modified) arguments.
1023

1024
  @param argv: the command line
1025
  @param commands: dictionary with special contents, see the design
1026
      doc for cmdline handling
1027
  @param aliases: dictionary with command aliases {'alias': 'target, ...}
1028

1029
  """
1030
  if len(argv) == 0:
1031
    binary = "<command>"
1032
  else:
1033
    binary = argv[0].split("/")[-1]
1034

    
1035
  if len(argv) > 1 and argv[1] == "--version":
1036
    ToStdout("%s (ganeti %s) %s", binary, constants.VCS_VERSION,
1037
             constants.RELEASE_VERSION)
1038
    # Quit right away. That way we don't have to care about this special
1039
    # argument. optparse.py does it the same.
1040
    sys.exit(0)
1041

    
1042
  if len(argv) < 2 or not (argv[1] in commands or
1043
                           argv[1] in aliases):
1044
    # let's do a nice thing
1045
    sortedcmds = commands.keys()
1046
    sortedcmds.sort()
1047

    
1048
    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
1049
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
1050
    ToStdout("")
1051

    
1052
    # compute the max line length for cmd + usage
1053
    mlen = max([len(" %s" % cmd) for cmd in commands])
1054
    mlen = min(60, mlen) # should not get here...
1055

    
1056
    # and format a nice command list
1057
    ToStdout("Commands:")
1058
    for cmd in sortedcmds:
1059
      cmdstr = " %s" % (cmd,)
1060
      help_text = commands[cmd][4]
1061
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
1062
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
1063
      for line in help_lines:
1064
        ToStdout("%-*s   %s", mlen, "", line)
1065

    
1066
    ToStdout("")
1067

    
1068
    return None, None, None
1069

    
1070
  # get command, unalias it, and look it up in commands
1071
  cmd = argv.pop(1)
1072
  if cmd in aliases:
1073
    if cmd in commands:
1074
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
1075
                                   " command" % cmd)
1076

    
1077
    if aliases[cmd] not in commands:
1078
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
1079
                                   " command '%s'" % (cmd, aliases[cmd]))
1080

    
1081
    cmd = aliases[cmd]
1082

    
1083
  func, args_def, parser_opts, usage, description = commands[cmd]
1084
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
1085
                        description=description,
1086
                        formatter=TitledHelpFormatter(),
1087
                        usage="%%prog %s %s" % (cmd, usage))
1088
  parser.disable_interspersed_args()
1089
  options, args = parser.parse_args()
1090

    
1091
  if not _CheckArguments(cmd, args_def, args):
1092
    return None, None, None
1093

    
1094
  return func, options, args
1095

    
1096

    
1097
def _CheckArguments(cmd, args_def, args):
1098
  """Verifies the arguments using the argument definition.
1099

1100
  Algorithm:
1101

1102
    1. Abort with error if values specified by user but none expected.
1103

1104
    1. For each argument in definition
1105

1106
      1. Keep running count of minimum number of values (min_count)
1107
      1. Keep running count of maximum number of values (max_count)
1108
      1. If it has an unlimited number of values
1109

1110
        1. Abort with error if it's not the last argument in the definition
1111

1112
    1. If last argument has limited number of values
1113

1114
      1. Abort with error if number of values doesn't match or is too large
1115

1116
    1. Abort with error if user didn't pass enough values (min_count)
1117

1118
  """
1119
  if args and not args_def:
1120
    ToStderr("Error: Command %s expects no arguments", cmd)
1121
    return False
1122

    
1123
  min_count = None
1124
  max_count = None
1125
  check_max = None
1126

    
1127
  last_idx = len(args_def) - 1
1128

    
1129
  for idx, arg in enumerate(args_def):
1130
    if min_count is None:
1131
      min_count = arg.min
1132
    elif arg.min is not None:
1133
      min_count += arg.min
1134

    
1135
    if max_count is None:
1136
      max_count = arg.max
1137
    elif arg.max is not None:
1138
      max_count += arg.max
1139

    
1140
    if idx == last_idx:
1141
      check_max = (arg.max is not None)
1142

    
1143
    elif arg.max is None:
1144
      raise errors.ProgrammerError("Only the last argument can have max=None")
1145

    
1146
  if check_max:
1147
    # Command with exact number of arguments
1148
    if (min_count is not None and max_count is not None and
1149
        min_count == max_count and len(args) != min_count):
1150
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1151
      return False
1152

    
1153
    # Command with limited number of arguments
1154
    if max_count is not None and len(args) > max_count:
1155
      ToStderr("Error: Command %s expects only %d argument(s)",
1156
               cmd, max_count)
1157
      return False
1158

    
1159
  # Command with some required arguments
1160
  if min_count is not None and len(args) < min_count:
1161
    ToStderr("Error: Command %s expects at least %d argument(s)",
1162
             cmd, min_count)
1163
    return False
1164

    
1165
  return True
1166

    
1167

    
1168
def SplitNodeOption(value):
1169
  """Splits the value of a --node option.
1170

1171
  """
1172
  if value and ':' in value:
1173
    return value.split(':', 1)
1174
  else:
1175
    return (value, None)
1176

    
1177

    
1178
def CalculateOSNames(os_name, os_variants):
1179
  """Calculates all the names an OS can be called, according to its variants.
1180

1181
  @type os_name: string
1182
  @param os_name: base name of the os
1183
  @type os_variants: list or None
1184
  @param os_variants: list of supported variants
1185
  @rtype: list
1186
  @return: list of valid names
1187

1188
  """
1189
  if os_variants:
1190
    return ['%s+%s' % (os_name, v) for v in os_variants]
1191
  else:
1192
    return [os_name]
1193

    
1194

    
1195
def UsesRPC(fn):
1196
  def wrapper(*args, **kwargs):
1197
    rpc.Init()
1198
    try:
1199
      return fn(*args, **kwargs)
1200
    finally:
1201
      rpc.Shutdown()
1202
  return wrapper
1203

    
1204

    
1205
def AskUser(text, choices=None):
1206
  """Ask the user a question.
1207

1208
  @param text: the question to ask
1209

1210
  @param choices: list with elements tuples (input_char, return_value,
1211
      description); if not given, it will default to: [('y', True,
1212
      'Perform the operation'), ('n', False, 'Do no do the operation')];
1213
      note that the '?' char is reserved for help
1214

1215
  @return: one of the return values from the choices list; if input is
1216
      not possible (i.e. not running with a tty, we return the last
1217
      entry from the list
1218

1219
  """
1220
  if choices is None:
1221
    choices = [('y', True, 'Perform the operation'),
1222
               ('n', False, 'Do not perform the operation')]
1223
  if not choices or not isinstance(choices, list):
1224
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
1225
  for entry in choices:
1226
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
1227
      raise errors.ProgrammerError("Invalid choices element to AskUser")
1228

    
1229
  answer = choices[-1][1]
1230
  new_text = []
1231
  for line in text.splitlines():
1232
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
1233
  text = "\n".join(new_text)
1234
  try:
1235
    f = file("/dev/tty", "a+")
1236
  except IOError:
1237
    return answer
1238
  try:
1239
    chars = [entry[0] for entry in choices]
1240
    chars[-1] = "[%s]" % chars[-1]
1241
    chars.append('?')
1242
    maps = dict([(entry[0], entry[1]) for entry in choices])
1243
    while True:
1244
      f.write(text)
1245
      f.write('\n')
1246
      f.write("/".join(chars))
1247
      f.write(": ")
1248
      line = f.readline(2).strip().lower()
1249
      if line in maps:
1250
        answer = maps[line]
1251
        break
1252
      elif line == '?':
1253
        for entry in choices:
1254
          f.write(" %s - %s\n" % (entry[0], entry[2]))
1255
        f.write("\n")
1256
        continue
1257
  finally:
1258
    f.close()
1259
  return answer
1260

    
1261

    
1262
class JobSubmittedException(Exception):
1263
  """Job was submitted, client should exit.
1264

1265
  This exception has one argument, the ID of the job that was
1266
  submitted. The handler should print this ID.
1267

1268
  This is not an error, just a structured way to exit from clients.
1269

1270
  """
1271

    
1272

    
1273
def SendJob(ops, cl=None):
1274
  """Function to submit an opcode without waiting for the results.
1275

1276
  @type ops: list
1277
  @param ops: list of opcodes
1278
  @type cl: luxi.Client
1279
  @param cl: the luxi client to use for communicating with the master;
1280
             if None, a new client will be created
1281

1282
  """
1283
  if cl is None:
1284
    cl = GetClient()
1285

    
1286
  job_id = cl.SubmitJob(ops)
1287

    
1288
  return job_id
1289

    
1290

    
1291
def GenericPollJob(job_id, cbs, report_cbs):
1292
  """Generic job-polling function.
1293

1294
  @type job_id: number
1295
  @param job_id: Job ID
1296
  @type cbs: Instance of L{JobPollCbBase}
1297
  @param cbs: Data callbacks
1298
  @type report_cbs: Instance of L{JobPollReportCbBase}
1299
  @param report_cbs: Reporting callbacks
1300

1301
  """
1302
  prev_job_info = None
1303
  prev_logmsg_serial = None
1304

    
1305
  status = None
1306

    
1307
  while True:
1308
    result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
1309
                                      prev_logmsg_serial)
1310
    if not result:
1311
      # job not found, go away!
1312
      raise errors.JobLost("Job with id %s lost" % job_id)
1313

    
1314
    if result == constants.JOB_NOTCHANGED:
1315
      report_cbs.ReportNotChanged(job_id, status)
1316

    
1317
      # Wait again
1318
      continue
1319

    
1320
    # Split result, a tuple of (field values, log entries)
1321
    (job_info, log_entries) = result
1322
    (status, ) = job_info
1323

    
1324
    if log_entries:
1325
      for log_entry in log_entries:
1326
        (serial, timestamp, log_type, message) = log_entry
1327
        report_cbs.ReportLogMessage(job_id, serial, timestamp,
1328
                                    log_type, message)
1329
        prev_logmsg_serial = max(prev_logmsg_serial, serial)
1330

    
1331
    # TODO: Handle canceled and archived jobs
1332
    elif status in (constants.JOB_STATUS_SUCCESS,
1333
                    constants.JOB_STATUS_ERROR,
1334
                    constants.JOB_STATUS_CANCELING,
1335
                    constants.JOB_STATUS_CANCELED):
1336
      break
1337

    
1338
    prev_job_info = job_info
1339

    
1340
  jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
1341
  if not jobs:
1342
    raise errors.JobLost("Job with id %s lost" % job_id)
1343

    
1344
  status, opstatus, result = jobs[0]
1345

    
1346
  if status == constants.JOB_STATUS_SUCCESS:
1347
    return result
1348

    
1349
  if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
1350
    raise errors.OpExecError("Job was canceled")
1351

    
1352
  has_ok = False
1353
  for idx, (status, msg) in enumerate(zip(opstatus, result)):
1354
    if status == constants.OP_STATUS_SUCCESS:
1355
      has_ok = True
1356
    elif status == constants.OP_STATUS_ERROR:
1357
      errors.MaybeRaise(msg)
1358

    
1359
      if has_ok:
1360
        raise errors.OpExecError("partial failure (opcode %d): %s" %
1361
                                 (idx, msg))
1362

    
1363
      raise errors.OpExecError(str(msg))
1364

    
1365
  # default failure mode
1366
  raise errors.OpExecError(result)
1367

    
1368

    
1369
class JobPollCbBase:
1370
  """Base class for L{GenericPollJob} callbacks.
1371

1372
  """
1373
  def __init__(self):
1374
    """Initializes this class.
1375

1376
    """
1377

    
1378
  def WaitForJobChangeOnce(self, job_id, fields,
1379
                           prev_job_info, prev_log_serial):
1380
    """Waits for changes on a job.
1381

1382
    """
1383
    raise NotImplementedError()
1384

    
1385
  def QueryJobs(self, job_ids, fields):
1386
    """Returns the selected fields for the selected job IDs.
1387

1388
    @type job_ids: list of numbers
1389
    @param job_ids: Job IDs
1390
    @type fields: list of strings
1391
    @param fields: Fields
1392

1393
    """
1394
    raise NotImplementedError()
1395

    
1396

    
1397
class JobPollReportCbBase:
1398
  """Base class for L{GenericPollJob} reporting callbacks.
1399

1400
  """
1401
  def __init__(self):
1402
    """Initializes this class.
1403

1404
    """
1405

    
1406
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1407
    """Handles a log message.
1408

1409
    """
1410
    raise NotImplementedError()
1411

    
1412
  def ReportNotChanged(self, job_id, status):
1413
    """Called for if a job hasn't changed in a while.
1414

1415
    @type job_id: number
1416
    @param job_id: Job ID
1417
    @type status: string or None
1418
    @param status: Job status if available
1419

1420
    """
1421
    raise NotImplementedError()
1422

    
1423

    
1424
class _LuxiJobPollCb(JobPollCbBase):
1425
  def __init__(self, cl):
1426
    """Initializes this class.
1427

1428
    """
1429
    JobPollCbBase.__init__(self)
1430
    self.cl = cl
1431

    
1432
  def WaitForJobChangeOnce(self, job_id, fields,
1433
                           prev_job_info, prev_log_serial):
1434
    """Waits for changes on a job.
1435

1436
    """
1437
    return self.cl.WaitForJobChangeOnce(job_id, fields,
1438
                                        prev_job_info, prev_log_serial)
1439

    
1440
  def QueryJobs(self, job_ids, fields):
1441
    """Returns the selected fields for the selected job IDs.
1442

1443
    """
1444
    return self.cl.QueryJobs(job_ids, fields)
1445

    
1446

    
1447
class FeedbackFnJobPollReportCb(JobPollReportCbBase):
1448
  def __init__(self, feedback_fn):
1449
    """Initializes this class.
1450

1451
    """
1452
    JobPollReportCbBase.__init__(self)
1453

    
1454
    self.feedback_fn = feedback_fn
1455

    
1456
    assert callable(feedback_fn)
1457

    
1458
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1459
    """Handles a log message.
1460

1461
    """
1462
    self.feedback_fn((timestamp, log_type, log_msg))
1463

    
1464
  def ReportNotChanged(self, job_id, status):
1465
    """Called if a job hasn't changed in a while.
1466

1467
    """
1468
    # Ignore
1469

    
1470

    
1471
class StdioJobPollReportCb(JobPollReportCbBase):
1472
  def __init__(self):
1473
    """Initializes this class.
1474

1475
    """
1476
    JobPollReportCbBase.__init__(self)
1477

    
1478
    self.notified_queued = False
1479
    self.notified_waitlock = False
1480

    
1481
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1482
    """Handles a log message.
1483

1484
    """
1485
    ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
1486
             FormatLogMessage(log_type, log_msg))
1487

    
1488
  def ReportNotChanged(self, job_id, status):
1489
    """Called if a job hasn't changed in a while.
1490

1491
    """
1492
    if status is None:
1493
      return
1494

    
1495
    if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
1496
      ToStderr("Job %s is waiting in queue", job_id)
1497
      self.notified_queued = True
1498

    
1499
    elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
1500
      ToStderr("Job %s is trying to acquire all necessary locks", job_id)
1501
      self.notified_waitlock = True
1502
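
# Illustrative sketch: besides the built-in reporters above, a caller can
# supply its own JobPollReportCbBase implementation to PollJob (defined
# below). The hypothetical class here simply collects log messages in
# memory instead of printing them:
#
#   class _ListReportCb(JobPollReportCbBase):
#     def __init__(self):
#       JobPollReportCbBase.__init__(self)
#       self.messages = []
#     def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
#       self.messages.append(FormatLogMessage(log_type, log_msg))
#     def ReportNotChanged(self, job_id, status):
#       pass  # nothing to report while the job is unchanged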

    
1503

    
1504
def FormatLogMessage(log_type, log_msg):
1505
  """Formats a job message according to its type.
1506

1507
  """
1508
  if log_type != constants.ELOG_MESSAGE:
1509
    log_msg = str(log_msg)
1510

    
1511
  return utils.SafeEncode(log_msg)
1512

    
1513

    
1514
def PollJob(job_id, cl=None, feedback_fn=None, reporter=None):
1515
  """Function to poll for the result of a job.
1516

1517
  @type job_id: job identifier
1518
  @param job_id: the job to poll for results
1519
  @type cl: luxi.Client
1520
  @param cl: the luxi client to use for communicating with the master;
1521
             if None, a new client will be created
1522

1523
  """
1524
  if cl is None:
1525
    cl = GetClient()
1526

    
1527
  if reporter is None:
1528
    if feedback_fn:
1529
      reporter = FeedbackFnJobPollReportCb(feedback_fn)
1530
    else:
1531
      reporter = StdioJobPollReportCb()
1532
  elif feedback_fn:
1533
    raise errors.ProgrammerError("Can't specify reporter and feedback function")
1534

    
1535
  return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
1536
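
# Illustrative sketch (assuming "op" is an already constructed opcode and
# the master daemon is reachable): a typical caller submits a job and then
# polls it until completion.
#
#   cl = GetClient()
#   job_id = SendJob([op], cl)
#   results = PollJob(job_id, cl=cl)   # one result per opcode in the job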

    
1537

    
1538
def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None, reporter=None):
1539
  """Legacy function to submit an opcode.
1540

1541
  This is just a simple wrapper over the construction of the processor
1542
  instance. It should be extended to better handle feedback and
1543
  interaction functions.
1544

1545
  """
1546
  if cl is None:
1547
    cl = GetClient()
1548

    
1549
  SetGenericOpcodeOpts([op], opts)
1550

    
1551
  job_id = SendJob([op], cl)
1552

    
1553
  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn,
1554
                       reporter=reporter)
1555

    
1556
  return op_results[0]
1557

    
1558

    
1559
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
1560
  """Wrapper around SubmitOpCode or SendJob.
1561

1562
  This function will decide, based on the 'opts' parameter, whether to
1563
  submit and wait for the result of the opcode (and return it), or
1564
  whether to just send the job and print its identifier. It is used in
1565
  order to simplify the implementation of the '--submit' option.
1566

1567
  It will also process the opcodes if we're sending them via SendJob
1568
  (otherwise SubmitOpCode does it).
1569

1570
  """
1571
  if opts and opts.submit_only:
1572
    job = [op]
1573
    SetGenericOpcodeOpts(job, opts)
1574
    job_id = SendJob(job, cl=cl)
1575
    raise JobSubmittedException(job_id)
1576
  else:
1577
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
1578
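
# Illustrative sketch (assuming "opts" are the parsed options of a gnt-*
# command): with --submit, SubmitOrSend raises JobSubmittedException, which
# GenericMain formats as "JobID: <id>" and exits with code 0; otherwise the
# call blocks until the job finishes and returns the opcode result.
#
#   def SomeCommand(opts, args):
#     op = _BuildOpcode(opts, args)   # hypothetical helper building an opcode
#     SubmitOrSend(op, opts)
#     return 0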

    
1579

    
1580
def SetGenericOpcodeOpts(opcode_list, options):
1581
  """Processor for generic options.
1582

1583
  This function updates the given opcodes based on generic command
1584
  line options (like debug, dry-run, etc.).
1585

1586
  @param opcode_list: list of opcodes
1587
  @param options: command line options or None
1588
  @return: None (in-place modification)
1589

1590
  """
1591
  if not options:
1592
    return
1593
  for op in opcode_list:
1594
    op.dry_run = options.dry_run
1595
    op.debug_level = options.debug
1596

    
1597

    
1598
def GetClient():
1599
  # TODO: Cache object?
1600
  try:
1601
    client = luxi.Client()
1602
  except luxi.NoMasterError:
1603
    ss = ssconf.SimpleStore()
1604

    
1605
    # Try to read ssconf file
1606
    try:
1607
      ss.GetMasterNode()
1608
    except errors.ConfigurationError:
1609
      raise errors.OpPrereqError("Cluster not initialized or this machine is"
1610
                                 " not part of a cluster")
1611

    
1612
    master, myself = ssconf.GetMasterAndMyself(ss=ss)
1613
    if master != myself:
1614
      raise errors.OpPrereqError("This is not the master node, please connect"
1615
                                 " to node '%s' and rerun the command" %
1616
                                 master)
1617
    raise
1618
  return client
1619

    
1620

    
1621
def FormatError(err):
1622
  """Return a formatted error message for a given error.
1623

1624
  This function takes an exception instance and returns a tuple
1625
  consisting of two values: first, the recommended exit code, and
1626
  second, a string describing the error message (not
1627
  newline-terminated).
1628

1629
  """
1630
  retcode = 1
1631
  obuf = StringIO()
1632
  msg = str(err)
1633
  if isinstance(err, errors.ConfigurationError):
1634
    txt = "Corrupt configuration file: %s" % msg
1635
    logging.error(txt)
1636
    obuf.write(txt + "\n")
1637
    obuf.write("Aborting.")
1638
    retcode = 2
1639
  elif isinstance(err, errors.HooksAbort):
1640
    obuf.write("Failure: hooks execution failed:\n")
1641
    for node, script, out in err.args[0]:
1642
      if out:
1643
        obuf.write("  node: %s, script: %s, output: %s\n" %
1644
                   (node, script, out))
1645
      else:
1646
        obuf.write("  node: %s, script: %s (no output)\n" %
1647
                   (node, script))
1648
  elif isinstance(err, errors.HooksFailure):
1649
    obuf.write("Failure: hooks general failure: %s" % msg)
1650
  elif isinstance(err, errors.ResolverError):
1651
    this_host = netutils.HostInfo.SysName()
1652
    if err.args[0] == this_host:
1653
      msg = "Failure: can't resolve my own hostname ('%s')"
1654
    else:
1655
      msg = "Failure: can't resolve hostname '%s'"
1656
    obuf.write(msg % err.args[0])
1657
  elif isinstance(err, errors.OpPrereqError):
1658
    if len(err.args) == 2:
1659
      obuf.write("Failure: prerequisites not met for this"
1660
                 " operation:\nerror type: %s, error details:\n%s" %
1661
                 (err.args[1], err.args[0]))
1662
    else:
1663
      obuf.write("Failure: prerequisites not met for this"
1664
                 " operation:\n%s" % msg)
1665
  elif isinstance(err, errors.OpExecError):
1666
    obuf.write("Failure: command execution error:\n%s" % msg)
1667
  elif isinstance(err, errors.TagError):
1668
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
1669
  elif isinstance(err, errors.JobQueueDrainError):
1670
    obuf.write("Failure: the job queue is marked for drain and doesn't"
1671
               " accept new requests\n")
1672
  elif isinstance(err, errors.JobQueueFull):
1673
    obuf.write("Failure: the job queue is full and doesn't accept new"
1674
               " job submissions until old jobs are archived\n")
1675
  elif isinstance(err, errors.TypeEnforcementError):
1676
    obuf.write("Parameter Error: %s" % msg)
1677
  elif isinstance(err, errors.ParameterError):
1678
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
1679
  elif isinstance(err, luxi.NoMasterError):
1680
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
1681
               " and listening for connections?")
1682
  elif isinstance(err, luxi.TimeoutError):
1683
    obuf.write("Timeout while talking to the master daemon. Error:\n"
1684
               "%s" % msg)
1685
  elif isinstance(err, luxi.ProtocolError):
1686
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
1687
               "%s" % msg)
1688
  elif isinstance(err, errors.GenericError):
1689
    obuf.write("Unhandled Ganeti error: %s" % msg)
1690
  elif isinstance(err, JobSubmittedException):
1691
    obuf.write("JobID: %s\n" % err.args[0])
1692
    retcode = 0
1693
  else:
1694
    obuf.write("Unhandled exception: %s" % msg)
1695
  return retcode, obuf.getvalue().rstrip('\n')
1696
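
# Illustrative sketch: FormatError is normally driven by GenericMain below,
# but a caller handling errors itself could use it like this (Python 2
# except syntax, as in the rest of this module):
#
#   try:
#     result = SubmitOpCode(op, opts=opts)
#   except errors.GenericError, err:
#     retcode, msg = FormatError(err)
#     ToStderr(msg)
#     return retcode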

    
1697

    
1698
def GenericMain(commands, override=None, aliases=None):
1699
  """Generic main function for all the gnt-* commands.
1700

1701
  Arguments:
1702
    - commands: a dictionary with a special structure, see the design doc
1703
                for command line handling.
1704
    - override: if not None, we expect a dictionary with keys that will
1705
                override command line options; this can be used to pass
1706
                options from the scripts to generic functions
1707
    - aliases: dictionary with command aliases {'alias': 'target', ...}
1708

1709
  """
1710
  # save the program name and the entire command line for later logging
1711
  if sys.argv:
1712
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1713
    if len(sys.argv) >= 2:
1714
      binary += " " + sys.argv[1]
1715
      old_cmdline = " ".join(sys.argv[2:])
1716
    else:
1717
      old_cmdline = ""
1718
  else:
1719
    binary = "<unknown program>"
1720
    old_cmdline = ""
1721

    
1722
  if aliases is None:
1723
    aliases = {}
1724

    
1725
  try:
1726
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
1727
  except errors.ParameterError, err:
1728
    result, err_msg = FormatError(err)
1729
    ToStderr(err_msg)
1730
    return 1
1731

    
1732
  if func is None: # parse error
1733
    return 1
1734

    
1735
  if override is not None:
1736
    for key, val in override.iteritems():
1737
      setattr(options, key, val)
1738

    
1739
  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1740
                     stderr_logging=True, program=binary)
1741

    
1742
  if old_cmdline:
1743
    logging.info("run with arguments '%s'", old_cmdline)
1744
  else:
1745
    logging.info("run with no arguments")
1746

    
1747
  try:
1748
    result = func(options, args)
1749
  except (errors.GenericError, luxi.ProtocolError,
1750
          JobSubmittedException), err:
1751
    result, err_msg = FormatError(err)
1752
    logging.exception("Error during command processing")
1753
    ToStderr(err_msg)
1754

    
1755
  return result
1756

    
1757

    
1758
def GenericInstanceCreate(mode, opts, args):
1759
  """Add an instance to the cluster via either creation or import.
1760

1761
  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
1762
  @param opts: the command line options selected by the user
1763
  @type args: list
1764
  @param args: should contain only one element, the new instance name
1765
  @rtype: int
1766
  @return: the desired exit code
1767

1768
  """
1769
  instance = args[0]
1770

    
1771
  (pnode, snode) = SplitNodeOption(opts.node)
1772

    
1773
  hypervisor = None
1774
  hvparams = {}
1775
  if opts.hypervisor:
1776
    hypervisor, hvparams = opts.hypervisor
1777

    
1778
  if opts.nics:
1779
    try:
1780
      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
1781
    except ValueError, err:
1782
      raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
1783
    nics = [{}] * nic_max
1784
    for nidx, ndict in opts.nics:
1785
      nidx = int(nidx)
1786
      if not isinstance(ndict, dict):
1787
        msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
1788
        raise errors.OpPrereqError(msg)
1789
      nics[nidx] = ndict
1790
  elif opts.no_nics:
1791
    # no nics
1792
    nics = []
1793
  elif mode == constants.INSTANCE_CREATE:
1794
    # default of one nic, all auto
1795
    nics = [{}]
1796
  else:
1797
    # mode == import
1798
    nics = []
1799

    
1800
  if opts.disk_template == constants.DT_DISKLESS:
1801
    if opts.disks or opts.sd_size is not None:
1802
      raise errors.OpPrereqError("Diskless instance but disk"
1803
                                 " information passed")
1804
    disks = []
1805
  else:
1806
    if (not opts.disks and not opts.sd_size
1807
        and mode == constants.INSTANCE_CREATE):
1808
      raise errors.OpPrereqError("No disk information specified")
1809
    if opts.disks and opts.sd_size is not None:
1810
      raise errors.OpPrereqError("Please use either the '--disk' or"
1811
                                 " '-s' option")
1812
    if opts.sd_size is not None:
1813
      opts.disks = [(0, {"size": opts.sd_size})]
1814

    
1815
    if opts.disks:
1816
      try:
1817
        disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
1818
      except ValueError, err:
1819
        raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
1820
      disks = [{}] * disk_max
1821
    else:
1822
      disks = []
1823
    for didx, ddict in opts.disks:
1824
      didx = int(didx)
1825
      if not isinstance(ddict, dict):
1826
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
1827
        raise errors.OpPrereqError(msg)
1828
      elif "size" in ddict:
1829
        if "adopt" in ddict:
1830
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
1831
                                     " (disk %d)" % didx)
1832
        try:
1833
          ddict["size"] = utils.ParseUnit(ddict["size"])
1834
        except ValueError, err:
1835
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
1836
                                     (didx, err))
1837
      elif "adopt" in ddict:
1838
        if mode == constants.INSTANCE_IMPORT:
1839
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
1840
                                     " import")
1841
        ddict["size"] = 0
1842
      else:
1843
        raise errors.OpPrereqError("Missing size or adoption source for"
1844
                                   " disk %d" % didx)
1845
      disks[didx] = ddict
1846

    
1847
  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
1848
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
1849

    
1850
  if mode == constants.INSTANCE_CREATE:
1851
    start = opts.start
1852
    os_type = opts.os
1853
    force_variant = opts.force_variant
1854
    src_node = None
1855
    src_path = None
1856
    no_install = opts.no_install
1857
    identify_defaults = False
1858
  elif mode == constants.INSTANCE_IMPORT:
1859
    start = False
1860
    os_type = None
1861
    force_variant = False
1862
    src_node = opts.src_node
1863
    src_path = opts.src_dir
1864
    no_install = None
1865
    identify_defaults = opts.identify_defaults
1866
  else:
1867
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)
1868

    
1869
  op = opcodes.OpCreateInstance(instance_name=instance,
1870
                                disks=disks,
1871
                                disk_template=opts.disk_template,
1872
                                nics=nics,
1873
                                pnode=pnode, snode=snode,
1874
                                ip_check=opts.ip_check,
1875
                                name_check=opts.name_check,
1876
                                wait_for_sync=opts.wait_for_sync,
1877
                                file_storage_dir=opts.file_storage_dir,
1878
                                file_driver=opts.file_driver,
1879
                                iallocator=opts.iallocator,
1880
                                hypervisor=hypervisor,
1881
                                hvparams=hvparams,
1882
                                beparams=opts.beparams,
1883
                                osparams=opts.osparams,
1884
                                mode=mode,
1885
                                start=start,
1886
                                os_type=os_type,
1887
                                force_variant=force_variant,
1888
                                src_node=src_node,
1889
                                src_path=src_path,
1890
                                no_install=no_install,
1891
                                identify_defaults=identify_defaults)
1892

    
1893
  SubmitOrSend(op, opts)
1894
  return 0
1895

    
1896

    
1897
class _RunWhileClusterStoppedHelper:
1898
  """Helper class for L{RunWhileClusterStopped} to simplify state management
1899

1900
  """
1901
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
1902
    """Initializes this class.
1903

1904
    @type feedback_fn: callable
1905
    @param feedback_fn: Feedback function
1906
    @type cluster_name: string
1907
    @param cluster_name: Cluster name
1908
    @type master_node: string
1909
    @param master_node: Master node name
1910
    @type online_nodes: list
1911
    @param online_nodes: List of names of online nodes
1912

1913
    """
1914
    self.feedback_fn = feedback_fn
1915
    self.cluster_name = cluster_name
1916
    self.master_node = master_node
1917
    self.online_nodes = online_nodes
1918

    
1919
    self.ssh = ssh.SshRunner(self.cluster_name)
1920

    
1921
    self.nonmaster_nodes = [name for name in online_nodes
1922
                            if name != master_node]
1923

    
1924
    assert self.master_node not in self.nonmaster_nodes
1925

    
1926
  def _RunCmd(self, node_name, cmd):
1927
    """Runs a command on the local or a remote machine.
1928

1929
    @type node_name: string
1930
    @param node_name: Machine name
1931
    @type cmd: list
1932
    @param cmd: Command
1933

1934
    """
1935
    if node_name is None or node_name == self.master_node:
1936
      # No need to use SSH
1937
      result = utils.RunCmd(cmd)
1938
    else:
1939
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
1940

    
1941
    if result.failed:
1942
      errmsg = ["Failed to run command %s" % result.cmd]
1943
      if node_name:
1944
        errmsg.append("on node %s" % node_name)
1945
      errmsg.append(": exitcode %s and error %s" %
1946
                    (result.exit_code, result.output))
1947
      raise errors.OpExecError(" ".join(errmsg))
1948

    
1949
  def Call(self, fn, *args):
1950
    """Call function while all daemons are stopped.
1951

1952
    @type fn: callable
1953
    @param fn: Function to be called
1954

1955
    """
1956
    # Pause watcher by acquiring an exclusive lock on watcher state file
1957
    self.feedback_fn("Blocking watcher")
1958
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
1959
    try:
1960
      # TODO: Currently, this just blocks. There's no timeout.
1961
      # TODO: Should it be a shared lock?
1962
      watcher_block.Exclusive(blocking=True)
1963

    
1964
      # Stop master daemons, so that no new jobs can come in and all running
1965
      # ones are finished
1966
      self.feedback_fn("Stopping master daemons")
1967
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
1968
      try:
1969
        # Stop daemons on all nodes
1970
        for node_name in self.online_nodes:
1971
          self.feedback_fn("Stopping daemons on %s" % node_name)
1972
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
1973

    
1974
        # All daemons are shut down now
1975
        try:
1976
          return fn(self, *args)
1977
        except Exception, err:
1978
          _, errmsg = FormatError(err)
1979
          logging.exception("Caught exception")
1980
          self.feedback_fn(errmsg)
1981
          raise
1982
      finally:
1983
        # Start cluster again, master node last
1984
        for node_name in self.nonmaster_nodes + [self.master_node]:
1985
          self.feedback_fn("Starting daemons on %s" % node_name)
1986
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
1987
    finally:
1988
      # Resume watcher
1989
      watcher_block.Close()
1990

    
1991

    
1992
def RunWhileClusterStopped(feedback_fn, fn, *args):
1993
  """Calls a function while all cluster daemons are stopped.
1994

1995
  @type feedback_fn: callable
1996
  @param feedback_fn: Feedback function
1997
  @type fn: callable
1998
  @param fn: Function to be called when daemons are stopped
1999

2000
  """
2001
  feedback_fn("Gathering cluster information")
2002

    
2003
  # This ensures we're running on the master daemon
2004
  cl = GetClient()
2005

    
2006
  (cluster_name, master_node) = \
2007
    cl.QueryConfigValues(["cluster_name", "master_node"])
2008

    
2009
  online_nodes = GetOnlineNodes([], cl=cl)
2010

    
2011
  # Don't keep a reference to the client. The master daemon will go away.
2012
  del cl
2013

    
2014
  assert master_node in online_nodes
2015

    
2016
  return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
2017
                                       online_nodes).Call(fn, *args)
2018
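
# Illustrative sketch (hypothetical callback): the function passed in
# receives the helper object as its first argument and runs while every
# node's daemons are stopped, e.g. for doing offline maintenance work:
#
#   def _OfflineStep(helper, new_name):
#     helper.feedback_fn("Cluster is stopped, doing work for %s" % new_name)
#
#   RunWhileClusterStopped(ToStdout, _OfflineStep, "newname.example.com")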

    
2019

    
2020
def GenerateTable(headers, fields, separator, data,
2021
                  numfields=None, unitfields=None,
2022
                  units=None):
2023
  """Generates a table (as a list of lines) with headers and different fields.
2024

2025
  @type headers: dict
2026
  @param headers: dictionary mapping field names to headers for
2027
      the table
2028
  @type fields: list
2029
  @param fields: the field names corresponding to each row in
2030
      the data field
2031
  @param separator: the separator to be used; if this is None,
2032
      the default 'smart' algorithm is used which computes optimal
2033
      field width, otherwise just the separator is used between
2034
      each field
2035
  @type data: list
2036
  @param data: a list of lists, each sublist being one row to be output
2037
  @type numfields: list
2038
  @param numfields: a list with the fields that hold numeric
2039
      values and thus should be right-aligned
2040
  @type unitfields: list
2041
  @param unitfields: a list with the fields that hold numeric
2042
      values that should be formatted with the units field
2043
  @type units: string or None
2044
  @param units: the units we should use for formatting, or None for
2045
      automatic choice (human-readable for non-separator usage, otherwise
2046
      megabytes); this is a one-letter string
2047

2048
  """
2049
  if units is None:
2050
    if separator:
2051
      units = "m"
2052
    else:
2053
      units = "h"
2054

    
2055
  if numfields is None:
2056
    numfields = []
2057
  if unitfields is None:
2058
    unitfields = []
2059

    
2060
  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
2061
  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142
2062

    
2063
  format_fields = []
2064
  for field in fields:
2065
    if headers and field not in headers:
2066
      # TODO: handle better unknown fields (either revert to old
2067
      # style of raising exception, or deal more intelligently with
2068
      # variable fields)
2069
      headers[field] = field
2070
    if separator is not None:
2071
      format_fields.append("%s")
2072
    elif numfields.Matches(field):
2073
      format_fields.append("%*s")
2074
    else:
2075
      format_fields.append("%-*s")
2076

    
2077
  if separator is None:
2078
    mlens = [0 for name in fields]
2079
    format_str = ' '.join(format_fields)
2080
  else:
2081
    format_str = separator.replace("%", "%%").join(format_fields)
2082

    
2083
  for row in data:
2084
    if row is None:
2085
      continue
2086
    for idx, val in enumerate(row):
2087
      if unitfields.Matches(fields[idx]):
2088
        try:
2089
          val = int(val)
2090
        except (TypeError, ValueError):
2091
          pass
2092
        else:
2093
          val = row[idx] = utils.FormatUnit(val, units)
2094
      val = row[idx] = str(val)
2095
      if separator is None:
2096
        mlens[idx] = max(mlens[idx], len(val))
2097

    
2098
  result = []
2099
  if headers:
2100
    args = []
2101
    for idx, name in enumerate(fields):
2102
      hdr = headers[name]
2103
      if separator is None:
2104
        mlens[idx] = max(mlens[idx], len(hdr))
2105
        args.append(mlens[idx])
2106
      args.append(hdr)
2107
    result.append(format_str % tuple(args))
2108

    
2109
  if separator is None:
2110
    assert len(mlens) == len(fields)
2111

    
2112
    if fields and not numfields.Matches(fields[-1]):
2113
      mlens[-1] = 0
2114

    
2115
  for line in data:
2116
    args = []
2117
    if line is None:
2118
      line = ['-' for _ in fields]
2119
    for idx in range(len(fields)):
2120
      if separator is None:
2121
        args.append(mlens[idx])
2122
      args.append(line[idx])
2123
    result.append(format_str % tuple(args))
2124

    
2125
  return result
2126
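
# Illustrative sketch (made-up data): building a small, human-readable
# table; with separator=None the "smart" layout is used and unit fields
# are rendered via utils.FormatUnit.
#
#   headers = {"name": "Node", "free": "MemFree"}
#   fields = ["name", "free"]
#   data = [["node1.example.com", 2048], ["node2.example.com", 512]]
#   for line in GenerateTable(headers, fields, None, data,
#                             numfields=["free"], unitfields=["free"]):
#     ToStdout(line)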

    
2127

    
2128
def FormatTimestamp(ts):
2129
  """Formats a given timestamp.
2130

2131
  @type ts: timestamp
2132
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
2133

2134
  @rtype: string
2135
  @return: a string with the formatted timestamp
2136

2137
  """
2138
  if not isinstance(ts, (tuple, list)) or len(ts) != 2:
2139
    return '?'
2140
  sec, usec = ts
2141
  return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
2142
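
# Illustrative example: a job timestamp such as (1234567890, 123456) is
# rendered as "2009-02-13 23:31:30.123456" (the date/time part depends on
# the local timezone); anything that isn't a two-element sequence yields "?".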

    
2143

    
2144
def ParseTimespec(value):
2145
  """Parse a time specification.
2146

2147
  The following suffixes will be recognized:
2148

2149
    - s: seconds
2150
    - m: minutes
2151
    - h: hours
2152
    - d: days
2153
    - w: weeks
2154

2155
  Without any suffix, the value will be taken to be in seconds.
2156

2157
  """
2158
  value = str(value)
2159
  if not value:
2160
    raise errors.OpPrereqError("Empty time specification passed")
2161
  suffix_map = {
2162
    's': 1,
2163
    'm': 60,
2164
    'h': 3600,
2165
    'd': 86400,
2166
    'w': 604800,
2167
    }
2168
  if value[-1] not in suffix_map:
2169
    try:
2170
      value = int(value)
2171
    except (TypeError, ValueError):
2172
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2173
  else:
2174
    multiplier = suffix_map[value[-1]]
2175
    value = value[:-1]
2176
    if not value: # no data left after stripping the suffix
2177
      raise errors.OpPrereqError("Invalid time specification (only"
2178
                                 " suffix passed)")
2179
    try:
2180
      value = int(value) * multiplier
2181
    except (TypeError, ValueError):
2182
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2183
  return value
2184
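
# Illustrative examples: ParseTimespec("30") and ParseTimespec(30) both
# return 30 (seconds), ParseTimespec("2h") returns 7200 and
# ParseTimespec("1w") returns 604800; "2x" or a bare "h" raise
# OpPrereqError.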

    
2185

    
2186
def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
2187
                   filter_master=False):
2188
  """Returns the names of online nodes.
2189

2190
  This function will also log a warning on stderr with the names of
2191
  the offline nodes that are skipped.
2192

2193
  @param nodes: if not empty, use only this subset of nodes (minus the
2194
      offline ones)
2195
  @param cl: if not None, luxi client to use
2196
  @type nowarn: boolean
2197
  @param nowarn: by default, this function will output a note with the
2198
      offline nodes that are skipped; if this parameter is True the
2199
      note is not displayed
2200
  @type secondary_ips: boolean
2201
  @param secondary_ips: if True, return the secondary IPs instead of the
2202
      names, useful for doing network traffic over the replication interface
2203
      (if any)
2204
  @type filter_master: boolean
2205
  @param filter_master: if True, do not return the master node in the list
2206
      (useful in coordination with secondary_ips where we cannot check our
2207
      node name against the list)
2208

2209
  """
2210
  if cl is None:
2211
    cl = GetClient()
2212

    
2213
  if secondary_ips:
2214
    name_idx = 2
2215
  else:
2216
    name_idx = 0
2217

    
2218
  if filter_master:
2219
    master_node = cl.QueryConfigValues(["master_node"])[0]
2220
    filter_fn = lambda x: x != master_node
2221
  else:
2222
    filter_fn = lambda _: True
2223

    
2224
  result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
2225
                         use_locking=False)
2226
  offline = [row[0] for row in result if row[1]]
2227
  if offline and not nowarn:
2228
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
2229
  return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
2230
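
# Illustrative sketch: list all online node names, or their secondary IPs
# excluding the master (e.g. for moving data over the replication network):
#
#   nodes = GetOnlineNodes([], cl=cl)
#   sips = GetOnlineNodes([], cl=cl, secondary_ips=True, filter_master=True)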

    
2231

    
2232
def _ToStream(stream, txt, *args):
2233
  """Write a message to a stream, bypassing the logging system
2234

2235
  @type stream: file object
2236
  @param stream: the file to which we should write
2237
  @type txt: str
2238
  @param txt: the message
2239

2240
  """
2241
  if args:
2242
    args = tuple(args)
2243
    stream.write(txt % args)
2244
  else:
2245
    stream.write(txt)
2246
  stream.write('\n')
2247
  stream.flush()
2248

    
2249

    
2250
def ToStdout(txt, *args):
2251
  """Write a message to stdout only, bypassing the logging system
2252

2253
  This is just a wrapper over _ToStream.
2254

2255
  @type txt: str
2256
  @param txt: the message
2257

2258
  """
2259
  _ToStream(sys.stdout, txt, *args)
2260

    
2261

    
2262
def ToStderr(txt, *args):
2263
  """Write a message to stderr only, bypassing the logging system
2264

2265
  This is just a wrapper over _ToStream.
2266

2267
  @type txt: str
2268
  @param txt: the message
2269

2270
  """
2271
  _ToStream(sys.stderr, txt, *args)
2272

    
2273

    
2274
class JobExecutor(object):
2275
  """Class which manages the submission and execution of multiple jobs.
2276

2277
  Note that instances of this class should not be reused between
2278
  GetResults() calls.
2279

2280
  """
2281
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
2282
    self.queue = []
2283
    if cl is None:
2284
      cl = GetClient()
2285
    self.cl = cl
2286
    self.verbose = verbose
2287
    self.jobs = []
2288
    self.opts = opts
2289
    self.feedback_fn = feedback_fn
2290

    
2291
  def QueueJob(self, name, *ops):
2292
    """Record a job for later submit.
2293

2294
    @type name: string
2295
    @param name: a description of the job, will be used when reporting results
2296
    """
2297
    SetGenericOpcodeOpts(ops, self.opts)
2298
    self.queue.append((name, ops))
2299

    
2300
  def SubmitPending(self, each=False):
2301
    """Submit all pending jobs.
2302

2303
    """
2304
    if each:
2305
      results = []
2306
      for row in self.queue:
2307
        # Unlike SubmitManyJobs, SubmitJob doesn't return a success status;
2309
        # it raises an exception if the submission fails, so we'll notice
        # that anyway.
2309
        results.append([True, self.cl.SubmitJob(row[1])])
2310
    else:
2311
      results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
2312
    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
2313
                                                            self.queue)):
2314
      self.jobs.append((idx, status, data, name))
2315

    
2316
  def _ChooseJob(self):
2317
    """Choose a non-waiting/queued job to poll next.
2318

2319
    """
2320
    assert self.jobs, "_ChooseJob called with empty job list"
2321

    
2322
    result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
2323
    assert result
2324

    
2325
    for job_data, status in zip(self.jobs, result):
2326
      if status[0] in (constants.JOB_STATUS_QUEUED,
2327
                       constants.JOB_STATUS_WAITLOCK,
2328
                       constants.JOB_STATUS_CANCELING):
2329
        # job is still waiting
2330
        continue
2331
      # good candidate found
2332
      self.jobs.remove(job_data)
2333
      return job_data
2334

    
2335
    # no job found
2336
    return self.jobs.pop(0)
2337

    
2338
  def GetResults(self):
2339
    """Wait for and return the results of all jobs.
2340

2341
    @rtype: list
2342
    @return: list of tuples (success, job results), in the same order
2343
        as the submitted jobs; if a job has failed, instead of the result
2344
        there will be the error message
2345

2346
    """
2347
    if not self.jobs:
2348
      self.SubmitPending()
2349
    results = []
2350
    if self.verbose:
2351
      ok_jobs = [row[2] for row in self.jobs if row[1]]
2352
      if ok_jobs:
2353
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
2354

    
2355
    # first, remove any non-submitted jobs
2356
    self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
2357
    for idx, _, jid, name in failures:
2358
      ToStderr("Failed to submit job for %s: %s", name, jid)
2359
      results.append((idx, False, jid))
2360

    
2361
    while self.jobs:
2362
      (idx, _, jid, name) = self._ChooseJob()
2363
      ToStdout("Waiting for job %s for %s...", jid, name)
2364
      try:
2365
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
2366
        success = True
2367
      except (errors.GenericError, luxi.ProtocolError), err:
2368
        _, job_result = FormatError(err)
2369
        success = False
2370
        # the error message will always be shown, verbose or not
2371
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
2372

    
2373
      results.append((idx, success, job_result))
2374

    
2375
    # sort based on the index, then drop it
2376
    results.sort()
2377
    results = [i[1:] for i in results]
2378

    
2379
    return results
2380

    
2381
  def WaitOrShow(self, wait):
2382
    """Wait for job results or only print the job IDs.
2383

2384
    @type wait: boolean
2385
    @param wait: whether to wait or not
2386

2387
    """
2388
    if wait:
2389
      return self.GetResults()
2390
    else:
2391
      if not self.jobs:
2392
        self.SubmitPending()
2393
      for _, status, result, name in self.jobs:
2394
        if status:
2395
          ToStdout("%s: %s", result, name)
2396
        else:
2397
          ToStderr("Failure for %s: %s", name, result)
2398
      return [row[1:3] for row in self.jobs]
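
# Illustrative sketch (assuming "names" is a list of instance names, "opts"
# the parsed command line options, and opcodes.OpStartupInstance the startup
# opcode in this codebase): submitting one job per instance and waiting for
# all of them.
#
#   jex = JobExecutor(opts=opts)
#   for name in names:
#     jex.QueueJob(name, opcodes.OpStartupInstance(instance_name=name))
#   results = jex.GetResults()   # [(success, result_or_error), ...]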