# Repository-viewer export header (was raw browser chrome, not valid Python):
# lib/cli.py @ revision 91c622a8

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module dealing with command line parsing"""
23

    
24

    
25
import sys
26
import textwrap
27
import os.path
28
import time
29
import logging
30
from cStringIO import StringIO
31

    
32
from ganeti import utils
33
from ganeti import errors
34
from ganeti import constants
35
from ganeti import opcodes
36
from ganeti import luxi
37
from ganeti import ssconf
38
from ganeti import rpc
39
from ganeti import ssh
40
from ganeti import compat
41
from ganeti import netutils
42

    
43
from optparse import (OptionParser, TitledHelpFormatter,
44
                      Option, OptionValueError)
45

    
46

    
47
# Public names exported by this module (consumed via "from ganeti.cli import *"
# by the individual gnt-* scripts); kept roughly alphabetical inside each
# commented section.
__all__ = [
  # Command line options
  "ADD_UIDS_OPT",
  "ALLOCATABLE_OPT",
  "ALL_OPT",
  "AUTO_PROMOTE_OPT",
  "AUTO_REPLACE_OPT",
  "BACKEND_OPT",
  "CLEANUP_OPT",
  "CLUSTER_DOMAIN_SECRET_OPT",
  "CONFIRM_OPT",
  "CP_SIZE_OPT",
  "DEBUG_OPT",
  "DEBUG_SIMERR_OPT",
  "DISKIDX_OPT",
  "DISK_OPT",
  "DISK_TEMPLATE_OPT",
  "DRAINED_OPT",
  "DRBD_HELPER_OPT",
  "EARLY_RELEASE_OPT",
  "ENABLED_HV_OPT",
  "ERROR_CODES_OPT",
  "FIELDS_OPT",
  "FILESTORE_DIR_OPT",
  "FILESTORE_DRIVER_OPT",
  "FORCE_OPT",
  "FORCE_VARIANT_OPT",
  "GLOBAL_FILEDIR_OPT",
  "HVLIST_OPT",
  "HVOPTS_OPT",
  "HYPERVISOR_OPT",
  "IALLOCATOR_OPT",
  "DEFAULT_IALLOCATOR_OPT",
  "IDENTIFY_DEFAULTS_OPT",
  "IGNORE_CONSIST_OPT",
  "IGNORE_FAILURES_OPT",
  "IGNORE_REMOVE_FAILURES_OPT",
  "IGNORE_SECONDARIES_OPT",
  "IGNORE_SIZE_OPT",
  "MAC_PREFIX_OPT",
  "MAINTAIN_NODE_HEALTH_OPT",
  "MASTER_NETDEV_OPT",
  "MC_OPT",
  "MIGRATION_MODE_OPT",
  "NET_OPT",
  "NEW_CLUSTER_CERT_OPT",
  "NEW_CLUSTER_DOMAIN_SECRET_OPT",
  "NEW_CONFD_HMAC_KEY_OPT",
  "NEW_RAPI_CERT_OPT",
  "NEW_SECONDARY_OPT",
  "NIC_PARAMS_OPT",
  "NODE_LIST_OPT",
  "NODE_PLACEMENT_OPT",
  "NODRBD_STORAGE_OPT",
  "NOHDR_OPT",
  "NOIPCHECK_OPT",
  "NO_INSTALL_OPT",
  "NONAMECHECK_OPT",
  "NOLVM_STORAGE_OPT",
  "NOMODIFY_ETCHOSTS_OPT",
  "NOMODIFY_SSH_SETUP_OPT",
  "NONICS_OPT",
  "NONLIVE_OPT",
  "NONPLUS1_OPT",
  "NOSHUTDOWN_OPT",
  "NOSTART_OPT",
  "NOSSH_KEYCHECK_OPT",
  "NOVOTING_OPT",
  "NWSYNC_OPT",
  "ON_PRIMARY_OPT",
  "ON_SECONDARY_OPT",
  "OFFLINE_OPT",
  "OSPARAMS_OPT",
  "OS_OPT",
  "OS_SIZE_OPT",
  "RAPI_CERT_OPT",
  "READD_OPT",
  "REBOOT_TYPE_OPT",
  "REMOVE_INSTANCE_OPT",
  "REMOVE_UIDS_OPT",
  "RESERVED_LVS_OPT",
  "ROMAN_OPT",
  "SECONDARY_IP_OPT",
  "SELECT_OS_OPT",
  "SEP_OPT",
  "SHOWCMD_OPT",
  "SHUTDOWN_TIMEOUT_OPT",
  "SINGLE_NODE_OPT",
  "SRC_DIR_OPT",
  "SRC_NODE_OPT",
  "SUBMIT_OPT",
  "STATIC_OPT",
  "SYNC_OPT",
  "TAG_SRC_OPT",
  "TIMEOUT_OPT",
  "UIDPOOL_OPT",
  "USEUNITS_OPT",
  "USE_REPL_NET_OPT",
  "VERBOSE_OPT",
  "VG_NAME_OPT",
  "YES_DOIT_OPT",
  # Generic functions for CLI programs
  "GenericMain",
  "GenericInstanceCreate",
  "GetClient",
  "GetOnlineNodes",
  "JobExecutor",
  "JobSubmittedException",
  "ParseTimespec",
  "RunWhileClusterStopped",
  "SubmitOpCode",
  "SubmitOrSend",
  "UsesRPC",
  # Formatting functions
  "ToStderr", "ToStdout",
  "FormatError",
  "GenerateTable",
  "AskUser",
  "FormatTimestamp",
  "FormatLogMessage",
  # Tags functions
  "ListTags",
  "AddTags",
  "RemoveTags",
  # command line options support infrastructure
  "ARGS_MANY_INSTANCES",
  "ARGS_MANY_NODES",
  "ARGS_NONE",
  "ARGS_ONE_INSTANCE",
  "ARGS_ONE_NODE",
  "ARGS_ONE_OS",
  "ArgChoice",
  "ArgCommand",
  "ArgFile",
  "ArgHost",
  "ArgInstance",
  "ArgJobId",
  "ArgNode",
  "ArgOs",
  "ArgSuggest",
  "ArgUnknown",
  "OPT_COMPL_INST_ADD_NODES",
  "OPT_COMPL_MANY_NODES",
  "OPT_COMPL_ONE_IALLOCATOR",
  "OPT_COMPL_ONE_INSTANCE",
  "OPT_COMPL_ONE_NODE",
  "OPT_COMPL_ONE_OS",
  "cli_option",
  "SplitNodeOption",
  "CalculateOSNames",
  ]

# Prefixes recognized by the key=value option parsers (_SplitKeyVal and
# check_ident_key_val below): "no_foo" means foo=False, "-foo" means
# foo=None (i.e. "reset to default").
NO_PREFIX = "no_"
UN_PREFIX = "-"
201

    
202

    
203
class _Argument:
204
  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
205
    self.min = min
206
    self.max = max
207

    
208
  def __repr__(self):
209
    return ("<%s min=%s max=%s>" %
210
            (self.__class__.__name__, self.min, self.max))
211

    
212

    
213
class ArgSuggest(_Argument):
  """Suggesting argument.

  Value can be any of the ones passed to the constructor; the choices
  are only suggestions for shell completion, not enforced.

  """
  # pylint: disable-msg=W0622
  def __init__(self, min=0, max=None, choices=None):
    _Argument.__init__(self, min=min, max=max)
    self.choices = choices

  def __repr__(self):
    fmt = "<%s min=%s max=%s choices=%r>"
    return fmt % (self.__class__.__name__, self.min, self.max, self.choices)
227

    
228

    
229
class ArgChoice(ArgSuggest):
  """Choice argument.

  Like L{ArgSuggest}, but the value must be one of the choices.

  """


class ArgUnknown(_Argument):
  """Unknown argument to program (e.g. determined at runtime).

  """


class ArgInstance(_Argument):
  """Instances argument.

  """


class ArgNode(_Argument):
  """Node argument.

  """

class ArgJobId(_Argument):
  """Job ID argument.

  """


class ArgFile(_Argument):
  """File path argument.

  """


class ArgCommand(_Argument):
  """Command argument.

  """


class ArgHost(_Argument):
  """Host argument.

  """


class ArgOs(_Argument):
  """OS argument.

  """


# Pre-built argument specifications for the common command shapes.
ARGS_NONE = []
ARGS_MANY_INSTANCES = [ArgInstance()]
ARGS_MANY_NODES = [ArgNode()]
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
291

    
292

    
293
def _ExtractTagsObject(opts, args):
  """Extract the tag type object.

  Note that this function will modify its args parameter.

  """
  if not hasattr(opts, "tag_type"):
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
  kind = opts.tag_type
  if kind == constants.TAG_CLUSTER:
    # the cluster has no name, so the kind doubles as the "name"
    return kind, kind
  if kind in (constants.TAG_NODE, constants.TAG_INSTANCE):
    if not args:
      raise errors.OpPrereqError("no arguments passed to the command")
    # consume the object name from the argument list
    return kind, args.pop(0)
  raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
312

    
313

    
314
def _ExtendTags(opts, args):
315
  """Extend the args if a source file has been given.
316

317
  This function will extend the tags with the contents of the file
318
  passed in the 'tags_source' attribute of the opts parameter. A file
319
  named '-' will be replaced by stdin.
320

321
  """
322
  fname = opts.tags_source
323
  if fname is None:
324
    return
325
  if fname == "-":
326
    new_fh = sys.stdin
327
  else:
328
    new_fh = open(fname, "r")
329
  new_data = []
330
  try:
331
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
332
    # because of python bug 1633941
333
    while True:
334
      line = new_fh.readline()
335
      if not line:
336
        break
337
      new_data.append(line.strip())
338
  finally:
339
    new_fh.close()
340
  args.extend(new_data)
341

    
342

    
343
def ListTags(opts, args):
  """List the tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  cl = GetClient()
  # print the tags sorted, one per line
  for tag in sorted(cl.QueryTags(kind, name)):
    ToStdout(tag)
359

    
360

    
361
def AddTags(opts, args):
  """Add tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be added")
  SubmitOpCode(opcodes.OpAddTags(kind=kind, name=name, tags=args))
376

    
377

    
378
def RemoveTags(opts, args):
  """Remove tags from a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be removed")
  SubmitOpCode(opcodes.OpDelTags(kind=kind, name=name, tags=args))
393

    
394

    
395
def check_unit(option, opt, value): # pylint: disable-msg=W0613
396
  """OptParsers custom converter for units.
397

398
  """
399
  try:
400
    return utils.ParseUnit(value)
401
  except errors.UnitParseError, err:
402
    raise OptionValueError("option %s: %s" % (opt, err))
403

    
404

    
405
def _SplitKeyVal(opt, data):
  """Convert a KeyVal string into a dict.

  This function will convert a key=val[,...] string into a dict. Empty
  values will be converted specially: keys which have the prefix 'no_'
  will have the value=False and the prefix stripped, the others will
  have value=True.

  @type opt: string
  @param opt: a string holding the option name for which we process the
      data, used in building error messages
  @type data: string
  @param data: a string of the format key=val,key=val,...
  @rtype: dict
  @return: {key=val, key=val}
  @raises errors.ParameterError: if there are duplicate keys

  """
  parsed = {}
  if not data:
    return parsed
  for elem in utils.UnescapeAndSplit(data, sep=","):
    if "=" in elem:
      key, val = elem.split("=", 1)
    elif elem.startswith(NO_PREFIX):
      # "no_foo" is shorthand for foo=False
      key, val = elem[len(NO_PREFIX):], False
    elif elem.startswith(UN_PREFIX):
      # "-foo" is shorthand for foo=None (reset to default)
      key, val = elem[len(UN_PREFIX):], None
    else:
      key, val = elem, True
    if key in parsed:
      raise errors.ParameterError("Duplicate key '%s' in option %s" %
                                  (key, opt))
    parsed[key] = val
  return parsed
440

    
441

    
442
def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser for ident:key=val,key=val options.

  This will store the parsed values as a tuple (ident, {key: val}). As such,
  multiple uses of this option via action=append is possible.

  """
  if ":" in value:
    ident, rest = value.split(":", 1)
  else:
    ident, rest = value, ''

  # a prefixed ident means removal of the whole parameter group, so no
  # individual options may be passed alongside it
  for prefix, flag in ((NO_PREFIX, False), (UN_PREFIX, None)):
    if ident.startswith(prefix):
      if rest:
        msg = "Cannot pass options when removing parameter groups: %s" % value
        raise errors.ParameterError(msg)
      return (ident[len(prefix):], flag)

  return (ident, _SplitKeyVal(opt, rest))
468

    
469

    
470
def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser class for key=val,key=val options.

  This will store the parsed values as a dict {key: val}, delegating the
  actual splitting to L{_SplitKeyVal}.

  """
  return _SplitKeyVal(opt, value)
477

    
478

    
479
def check_bool(option, opt, value): # pylint: disable-msg=W0613
  """Custom parser for yes/no options.

  This will store the parsed value as either True or False.

  """
  lowered = value.lower()
  if lowered in (constants.VALUE_FALSE, "no"):
    return False
  if lowered in (constants.VALUE_TRUE, "yes"):
    return True
  raise errors.ParameterError("Invalid boolean value '%s'" % lowered)
492

    
493

    
494
# completion_suggestion is normally a list. Using numeric values not evaluating
# to False for dynamic completion.
(OPT_COMPL_MANY_NODES,
 OPT_COMPL_ONE_NODE,
 OPT_COMPL_ONE_INSTANCE,
 OPT_COMPL_ONE_OS,
 OPT_COMPL_ONE_IALLOCATOR,
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)

# The set of all dynamic-completion markers defined above
OPT_COMPL_ALL = frozenset([OPT_COMPL_MANY_NODES, OPT_COMPL_ONE_NODE,
                           OPT_COMPL_ONE_INSTANCE, OPT_COMPL_ONE_OS,
                           OPT_COMPL_ONE_IALLOCATOR,
                           OPT_COMPL_INST_ADD_NODES])
511

    
512

    
513
class CliOption(Option):
  """Custom option class for optparse.

  Extends the stock optparse C{Option} with a completion-suggestion
  attribute and the extra value types used by the Ganeti CLI scripts.

  """
  ATTRS = Option.ATTRS + [
    "completion_suggest",
    ]
  TYPES = Option.TYPES + (
    "identkeyval",
    "keyval",
    "unit",
    "bool",
    )
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
  TYPE_CHECKER.update({
    "identkeyval": check_ident_key_val,
    "keyval": check_key_val,
    "unit": check_unit,
    "bool": check_bool,
    })


# optparse.py sets make_option, so we do it for our own option class, too
cli_option = CliOption
535

    
536

    
537
# Metavar used by the yes/no "bool"-typed options below
_YORNO = "yes|no"

# Generic options shared by most CLI commands

DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
                       help="Increase debugging level")

NOHDR_OPT = cli_option("--no-headers", default=False,
                       action="store_true", dest="no_headers",
                       help="Don't display column headers")

SEP_OPT = cli_option("--separator", default=None,
                     action="store", dest="separator",
                     help=("Separator between output fields"
                           " (defaults to one space)"))

USEUNITS_OPT = cli_option("--units", default=None,
                          dest="units", choices=('h', 'm', 'g', 't'),
                          help="Specify units for output (one of hmgt)")

FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
                        type="string", metavar="FIELDS",
                        help="Comma separated list of output fields")

FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
                       default=False, help="Force the operation")

CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
                         default=False, help="Do not require confirmation")

TAG_SRC_OPT = cli_option("--from", dest="tags_source",
                         default=None, help="File with tag names")

SUBMIT_OPT = cli_option("--submit", dest="submit_only",
                        default=False, action="store_true",
                        help=("Submit the job and return the job ID, but"
                              " don't wait for the job to finish"))

SYNC_OPT = cli_option("--sync", dest="do_locking",
                      default=False, action="store_true",
                      help=("Grab locks while doing the queries"
                            " in order to ensure more consistent results"))

_DRY_RUN_OPT = cli_option("--dry-run", default=False,
                          action="store_true",
                          # fixed help-text typo: was "verify it it could be"
                          help=("Do not execute the operation, just run the"
                                " check steps and verify if it could be"
                                " executed"))
583

    
584
# Options used mainly by instance creation/modification commands

VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
                         action="store_true",
                         help="Increase the verbosity of the operation")

DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
                              action="store_true", dest="simulate_errors",
                              help="Debugging option that makes the operation"
                              " treat most runtime checks as failed")

NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
                        default=True, action="store_false",
                        help="Don't wait for sync (DANGEROUS!)")

DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
                               help="Custom disk setup (diskless, file,"
                               " plain or drbd)",
                               default=None, metavar="TEMPL",
                               choices=list(constants.DISK_TEMPLATES))

NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
                        help="Do not create any network cards for"
                        " the instance")

FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                               help="Relative path under default cluster-wide"
                               " file storage dir to store file-based disks",
                               default=None, metavar="<DIR>")

FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
                                  help="Driver to use for image files",
                                  default="loop", metavar="<DRIVER>",
                                  choices=list(constants.FILE_DRIVER))

IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
                            help="Select nodes for the instance automatically"
                            " using the <NAME> iallocator plugin",
                            default=None, type="string",
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)

# NOTE(review): reuses the "-I" short option of IALLOCATOR_OPT; presumably
# the two are never used by the same command -- verify before combining them
DEFAULT_IALLOCATOR_OPT = cli_option("-I", "--default-iallocator",
                            metavar="<NAME>",
                            help="Set the default instance allocator plugin",
                            default=None, type="string",
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)

OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
                    metavar="<os>",
                    completion_suggest=OPT_COMPL_ONE_OS)

OSPARAMS_OPT = cli_option("-O", "--os-parameters", dest="osparams",
                         type="keyval", default={},
                         help="OS parameters")

FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
                               action="store_true", default=False,
                               help="Force an unknown variant")

NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
                            action="store_true", default=False,
                            help="Do not install the OS (will"
                            " enable no-start)")

BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
                         type="keyval", default={},
                         help="Backend parameters")

HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
                         default={}, dest="hvparams",
                         help="Hypervisor parameters")

HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
                            help="Hypervisor and hypervisor options, in the"
                            " format hypervisor:option=value,option=value,...",
                            default=None, type="identkeyval")

# NOTE(review): default=[] is a shared mutable default; action="append" on
# optparse appends to this very list object -- confirm each parser gets a
# fresh option instance before reusing this constant across parsers
HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
                        help="Hypervisor and hypervisor options, in the"
                        " format hypervisor:option=value,option=value,...",
                        default=[], action="append", type="identkeyval")

NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
                           action="store_false",
                           help="Don't check that the instance's IP"
                           " is alive")

NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
                             default=True, action="store_false",
                             help="Don't check that the instance's name"
                             " is resolvable")

NET_OPT = cli_option("--net",
                     help="NIC parameters", default=[],
                     dest="nics", action="append", type="identkeyval")

DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
                      dest="disks", action="append", type="identkeyval")

DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
                         help="Comma-separated list of disks"
                         " indices to act on (e.g. 0,2) (optional,"
                         " defaults to all disks)")

OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
                         help="Enforces a single-disk configuration using the"
                         " given disk size, in MiB unless a suffix is used",
                         default=None, type="unit", metavar="<size>")

IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
                                dest="ignore_consistency",
                                action="store_true", default=False,
                                help="Ignore the consistency of the disks on"
                                " the secondary")

NONLIVE_OPT = cli_option("--non-live", dest="live",
                         default=True, action="store_false",
                         help="Do a non-live migration (this usually means"
                         " freeze the instance, save the state, transfer and"
                         " only then resume running on the secondary node)")
702

    
703
MIGRATION_MODE_OPT = cli_option("--migration-mode", dest="migration_mode",
                                default=None,
                                choices=list(constants.HT_MIGRATION_MODES),
                                # fixed help text: closing parenthesis was
                                # missing
                                help="Override default migration mode (choose"
                                " either live or non-live)")
708

    
709
# Node-selection options; note that "-n"/"--node" is deliberately declared
# three times with different dest/multiplicity -- each command picks exactly
# one of the variants

NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
                                help="Target node and optional secondary node",
                                metavar="<pnode>[:<snode>]",
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)

NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
                           action="append", metavar="<node>",
                           help="Use only this node (can be used multiple"
                           " times, if not given defaults to all nodes)",
                           completion_suggest=OPT_COMPL_ONE_NODE)

SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
                             metavar="<node>",
                             completion_suggest=OPT_COMPL_ONE_NODE)

NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
                         action="store_false",
                         help="Don't start the instance after creation")

SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
                         action="store_true", default=False,
                         help="Show command instead of executing it")
731

    
732
# Fixed help text: double space after "and" and missing closing parenthesis
CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
                         default=False, action="store_true",
                         help="Instead of performing the migration, try to"
                         " recover from a failed cleanup. This is safe"
                         " to run even if the instance is healthy, but it"
                         " will create extra replication traffic and"
                         " disrupt briefly the replication (like during the"
                         " migration)")
740

    
741
# Options for instance info/removal, disk replacement, node management and
# cluster-wide settings

STATIC_OPT = cli_option("-s", "--static", dest="static",
                        action="store_true", default=False,
                        help="Only show configuration data, not runtime data")

ALL_OPT = cli_option("--all", dest="show_all",
                     default=False, action="store_true",
                     help="Show info on all instances on the cluster."
                     " This can take a long time to run, use wisely")

SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
                           action="store_true", default=False,
                           help="Interactive OS reinstall, lists available"
                           " OS templates for selection")

IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
                                 action="store_true", default=False,
                                 help="Remove the instance from the cluster"
                                 " configuration even if there are failures"
                                 " during the removal process")

IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
                                        dest="ignore_remove_failures",
                                        action="store_true", default=False,
                                        help="Remove the instance from the"
                                        " cluster configuration even if there"
                                        " are failures during the removal"
                                        " process")

REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
                                 action="store_true", default=False,
                                 help="Remove the instance from the cluster")

NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
                               help="Specifies the new secondary node",
                               metavar="NODE", default=None,
                               completion_suggest=OPT_COMPL_ONE_NODE)

ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
                            default=False, action="store_true",
                            help="Replace the disk(s) on the primary"
                            " node (only for the drbd template)")

ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
                              default=False, action="store_true",
                              help="Replace the disk(s) on the secondary"
                              " node (only for the drbd template)")

AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
                              default=False, action="store_true",
                              help="Lock all nodes and auto-promote as needed"
                              " to MC status")

AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
                              default=False, action="store_true",
                              help="Automatically replace faulty disks"
                              " (only for the drbd template)")

IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
                             default=False, action="store_true",
                             help="Ignore current recorded size"
                             " (useful for forcing activation when"
                             " the recorded size is wrong)")

SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
                          metavar="<node>",
                          completion_suggest=OPT_COMPL_ONE_NODE)

SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
                         metavar="<dir>")

SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
                              help="Specify the secondary ip for the node",
                              metavar="ADDRESS", default=None)

READD_OPT = cli_option("--readd", dest="readd",
                       default=False, action="store_true",
                       help="Readd old node after replacing it")

NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
                                default=True, action="store_false",
                                help="Disable SSH key fingerprint checking")


# The following three-state (yes/no/unset) flags use the custom "bool" type
# registered in CliOption; default=None means "leave unchanged"

MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
                    type="bool", default=None, metavar=_YORNO,
                    help="Set the master_candidate flag on the node")

OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
                         type="bool", default=None,
                         help="Set the offline flag on the node")

DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
                         type="bool", default=None,
                         help="Set the drained flag on the node")

ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
                             type="bool", default=None, metavar=_YORNO,
                             help="Set the allocatable flag on a volume")

NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
                               help="Disable support for lvm based instances"
                               " (cluster-wide)",
                               action="store_false", default=True)

ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
                            dest="enabled_hypervisors",
                            help="Comma-separated list of hypervisors",
                            type="string", default=None)

NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
                            type="keyval", default={},
                            help="NIC parameters")

CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
                         dest="candidate_pool_size", type="int",
                         help="Set the candidate pool size")

VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
                         help="Enables LVM and specifies the volume group"
                         " name (cluster-wide) for disk allocation [xenvg]",
                         metavar="VG", default=None)

YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
                          help="Destroy cluster", action="store_true")

NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
                          help="Skip node agreement check (dangerous)",
                          action="store_true", default=False)
869

    
870
MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
871
                            help="Specify the mac prefix for the instance IP"
872
                            " addresses, in the format XX:XX:XX",
873
                            metavar="PREFIX",
874
                            default=None)
875

    
876
MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
877
                               help="Specify the node interface (cluster-wide)"
878
                               " on which the master IP address will be added "
879
                               " [%s]" % constants.DEFAULT_BRIDGE,
880
                               metavar="NETDEV",
881
                               default=constants.DEFAULT_BRIDGE)
882

    
883
GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
884
                                help="Specify the default directory (cluster-"
885
                                "wide) for storing the file-based disks [%s]" %
886
                                constants.DEFAULT_FILE_STORAGE_DIR,
887
                                metavar="DIR",
888
                                default=constants.DEFAULT_FILE_STORAGE_DIR)
889

    
890
NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
891
                                   help="Don't modify /etc/hosts",
892
                                   action="store_false", default=True)
893

    
894
NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
895
                                    help="Don't initialize SSH keys",
896
                                    action="store_false", default=True)
897

    
898
ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
899
                             help="Enable parseable error messages",
900
                             action="store_true", default=False)
901

    
902
NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
903
                          help="Skip N+1 memory redundancy tests",
904
                          action="store_true", default=False)
905

    
906
REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
907
                             help="Type of reboot: soft/hard/full",
908
                             default=constants.INSTANCE_REBOOT_HARD,
909
                             metavar="<REBOOT>",
910
                             choices=list(constants.REBOOT_TYPES))
911

    
912
IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
913
                                    dest="ignore_secondaries",
914
                                    default=False, action="store_true",
915
                                    help="Ignore errors from secondaries")
916

    
917
NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
918
                            action="store_false", default=True,
919
                            help="Don't shutdown the instance (unsafe)")
920

    
921
TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
922
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
923
                         help="Maximum time to wait")
924

    
925
SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
926
                         dest="shutdown_timeout", type="int",
927
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
928
                         help="Maximum time to wait for instance shutdown")
929

    
930
EARLY_RELEASE_OPT = cli_option("--early-release",
931
                               dest="early_release", default=False,
932
                               action="store_true",
933
                               help="Release the locks on the secondary"
934
                               " node(s) early")
935

    
936
NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
937
                                  dest="new_cluster_cert",
938
                                  default=False, action="store_true",
939
                                  help="Generate a new cluster certificate")
940

    
941
RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
942
                           default=None,
943
                           help="File containing new RAPI certificate")
944

    
945
NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
946
                               default=None, action="store_true",
947
                               help=("Generate a new self-signed RAPI"
948
                                     " certificate"))
949

    
950
NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
951
                                    dest="new_confd_hmac_key",
952
                                    default=False, action="store_true",
953
                                    help=("Create a new HMAC key for %s" %
954
                                          constants.CONFD))
955

    
956
CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
957
                                       dest="cluster_domain_secret",
958
                                       default=None,
959
                                       help=("Load new new cluster domain"
960
                                             " secret from file"))
961

    
962
NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
963
                                           dest="new_cluster_domain_secret",
964
                                           default=False, action="store_true",
965
                                           help=("Create a new cluster domain"
966
                                                 " secret"))
967

    
968
USE_REPL_NET_OPT = cli_option("--use-replication-network",
969
                              dest="use_replication_network",
970
                              help="Whether to use the replication network"
971
                              " for talking to the nodes",
972
                              action="store_true", default=False)
973

    
974
MAINTAIN_NODE_HEALTH_OPT = \
975
    cli_option("--maintain-node-health", dest="maintain_node_health",
976
               metavar=_YORNO, default=None, type="bool",
977
               help="Configure the cluster to automatically maintain node"
978
               " health, by shutting down unknown instances, shutting down"
979
               " unknown DRBD devices, etc.")
980

    
981
IDENTIFY_DEFAULTS_OPT = \
982
    cli_option("--identify-defaults", dest="identify_defaults",
983
               default=False, action="store_true",
984
               help="Identify which saved instance parameters are equal to"
985
               " the current cluster defaults and set them as such, instead"
986
               " of marking them as overridden")
987

    
988
UIDPOOL_OPT = cli_option("--uid-pool", default=None,
989
                         action="store", dest="uid_pool",
990
                         help=("A list of user-ids or user-id"
991
                               " ranges separated by commas"))
992

    
993
ADD_UIDS_OPT = cli_option("--add-uids", default=None,
994
                          action="store", dest="add_uids",
995
                          help=("A list of user-ids or user-id"
996
                                " ranges separated by commas, to be"
997
                                " added to the user-id pool"))
998

    
999
REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
1000
                             action="store", dest="remove_uids",
1001
                             help=("A list of user-ids or user-id"
1002
                                   " ranges separated by commas, to be"
1003
                                   " removed from the user-id pool"))
1004

    
1005
RESERVED_LVS_OPT = cli_option("--reserved-lvs", default=None,
1006
                             action="store", dest="reserved_lvs",
1007
                             help=("A comma-separated list of reserved"
1008
                                   " logical volumes names, that will be"
1009
                                   " ignored by cluster verify"))
1010

    
1011
ROMAN_OPT = cli_option("--roman",
1012
                       dest="roman_integers", default=False,
1013
                       action="store_true",
1014
                       help="Use roman numbers for positive integers")
1015

    
1016
DRBD_HELPER_OPT = cli_option("--drbd-usermode-helper", dest="drbd_helper",
1017
                             action="store", default=None,
1018
                             help="Specifies usermode helper for DRBD")
1019

    
1020
NODRBD_STORAGE_OPT = cli_option("--no-drbd-storage", dest="drbd_storage",
1021
                                action="store_false", default=True,
1022
                                help="Disable support for DRBD")
1023

    
1024

    
1025
def _ParseArgs(argv, commands, aliases):
  """Parser for the command line arguments.

  This function parses the arguments and returns the function which
  must be executed together with its (modified) arguments.

  @param argv: the command line
  @param commands: dictionary with special contents, see the design
      doc for cmdline handling
  @param aliases: dictionary with command aliases {'alias': 'target, ...}
  @return: a tuple of (function, options, arguments), or (None, None,
      None) if the arguments were invalid (usage/help was printed)

  """
  if len(argv) == 0:
    binary = "<command>"
  else:
    binary = argv[0].split("/")[-1]

  if len(argv) > 1 and argv[1] == "--version":
    ToStdout("%s (ganeti %s) %s", binary, constants.VCS_VERSION,
             constants.RELEASE_VERSION)
    # Quit right away. That way we don't have to care about this special
    # argument. optparse.py does it the same.
    sys.exit(0)

  if len(argv) < 2 or not (argv[1] in commands or
                           argv[1] in aliases):
    # let's do a nice thing: print usage and the sorted list of commands
    sortedcmds = sorted(commands.keys())

    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
    ToStdout("")

    # compute the max line length for cmd + usage
    mlen = max([len(" %s" % cmd) for cmd in commands])
    mlen = min(60, mlen) # should not get here...

    # and format a nice command list
    ToStdout("Commands:")
    for cmd in sortedcmds:
      cmdstr = " %s" % (cmd,)
      help_text = commands[cmd][4]
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
      for line in help_lines:
        ToStdout("%-*s   %s", mlen, "", line)

    ToStdout("")

    return None, None, None

  # get command, unalias it, and look it up in commands
  cmd = argv.pop(1)
  if cmd in aliases:
    if cmd in commands:
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
                                   " command" % cmd)

    if aliases[cmd] not in commands:
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
                                   " command '%s'" % (cmd, aliases[cmd]))

    cmd = aliases[cmd]

  func, args_def, parser_opts, usage, description = commands[cmd]
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
                        description=description,
                        formatter=TitledHelpFormatter(),
                        usage="%%prog %s %s" % (cmd, usage))
  parser.disable_interspersed_args()
  # NOTE(review): parse_args() with no arguments reads sys.argv[1:];
  # callers are expected to pass sys.argv itself, so the pop(1) above
  # also removed the command name from what optparse sees -- confirm
  # before passing any other list as argv.
  options, args = parser.parse_args()

  if not _CheckArguments(cmd, args_def, args):
    return None, None, None

  return func, options, args

def _CheckArguments(cmd, args_def, args):
1105
  """Verifies the arguments using the argument definition.
1106

1107
  Algorithm:
1108

1109
    1. Abort with error if values specified by user but none expected.
1110

1111
    1. For each argument in definition
1112

1113
      1. Keep running count of minimum number of values (min_count)
1114
      1. Keep running count of maximum number of values (max_count)
1115
      1. If it has an unlimited number of values
1116

1117
        1. Abort with error if it's not the last argument in the definition
1118

1119
    1. If last argument has limited number of values
1120

1121
      1. Abort with error if number of values doesn't match or is too large
1122

1123
    1. Abort with error if user didn't pass enough values (min_count)
1124

1125
  """
1126
  if args and not args_def:
1127
    ToStderr("Error: Command %s expects no arguments", cmd)
1128
    return False
1129

    
1130
  min_count = None
1131
  max_count = None
1132
  check_max = None
1133

    
1134
  last_idx = len(args_def) - 1
1135

    
1136
  for idx, arg in enumerate(args_def):
1137
    if min_count is None:
1138
      min_count = arg.min
1139
    elif arg.min is not None:
1140
      min_count += arg.min
1141

    
1142
    if max_count is None:
1143
      max_count = arg.max
1144
    elif arg.max is not None:
1145
      max_count += arg.max
1146

    
1147
    if idx == last_idx:
1148
      check_max = (arg.max is not None)
1149

    
1150
    elif arg.max is None:
1151
      raise errors.ProgrammerError("Only the last argument can have max=None")
1152

    
1153
  if check_max:
1154
    # Command with exact number of arguments
1155
    if (min_count is not None and max_count is not None and
1156
        min_count == max_count and len(args) != min_count):
1157
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1158
      return False
1159

    
1160
    # Command with limited number of arguments
1161
    if max_count is not None and len(args) > max_count:
1162
      ToStderr("Error: Command %s expects only %d argument(s)",
1163
               cmd, max_count)
1164
      return False
1165

    
1166
  # Command with some required arguments
1167
  if min_count is not None and len(args) < min_count:
1168
    ToStderr("Error: Command %s expects at least %d argument(s)",
1169
             cmd, min_count)
1170
    return False
1171

    
1172
  return True
1173

    
1174

    
1175
def SplitNodeOption(value):
  """Splits the value of a --node option.

  @param value: the option value, either C{None}, a plain node name or
      a C{"node:secondary"} pair
  @return: a two-element sequence; a list C{[primary, secondary]} when
      the value contains a colon, otherwise the tuple C{(value, None)}

  """
  if value and ':' in value:
    # split only on the first colon, the remainder stays intact
    return value.split(':', 1)
  else:
    return (value, None)

def CalculateOSNames(os_name, os_variants):
  """Calculates all the names an OS can be called, according to its variants.

  @type os_name: string
  @param os_name: base name of the os
  @type os_variants: list or None
  @param os_variants: list of supported variants
  @rtype: list
  @return: list of valid names

  """
  if os_variants:
    return ['%s+%s' % (os_name, v) for v in os_variants]
  else:
    # no (or empty) variant list: the base name is the only valid name
    return [os_name]

# Decorator that sets up and tears down the RPC layer around a function;
# kept as an alias for backwards compatibility with existing callers.
UsesRPC = rpc.RunWithRPC

def AskUser(text, choices=None):
  """Ask the user a question.

  @param text: the question to ask

  @param choices: list with elements tuples (input_char, return_value,
      description); if not given, it will default to: [('y', True,
      'Perform the operation'), ('n', False, 'Do no do the operation')];
      note that the '?' char is reserved for help

  @return: one of the return values from the choices list; if input is
      not possible (i.e. not running with a tty, we return the last
      entry from the list

  """
  if choices is None:
    choices = [('y', True, 'Perform the operation'),
               ('n', False, 'Do not perform the operation')]
  if not choices or not isinstance(choices, list):
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
  for entry in choices:
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
      raise errors.ProgrammerError("Invalid choices element to AskUser")

  # the last choice doubles as the default answer (used when no tty)
  answer = choices[-1][1]
  new_text = []
  for line in text.splitlines():
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
  text = "\n".join(new_text)
  try:
    # use open() instead of the deprecated py2-only file() builtin
    f = open("/dev/tty", "a+")
  except IOError:
    # no controlling terminal: fall back to the default answer
    return answer
  try:
    chars = [entry[0] for entry in choices]
    chars[-1] = "[%s]" % chars[-1]
    chars.append('?')
    maps = dict([(entry[0], entry[1]) for entry in choices])
    while True:
      f.write(text)
      f.write('\n')
      f.write("/".join(chars))
      f.write(": ")
      # readline(2) reads at most the answer char plus the newline
      line = f.readline(2).strip().lower()
      if line in maps:
        answer = maps[line]
        break
      elif line == '?':
        for entry in choices:
          f.write(" %s - %s\n" % (entry[0], entry[2]))
        f.write("\n")
        continue
  finally:
    f.close()
  return answer

class JobSubmittedException(Exception):
  """Job was submitted, client should exit.

  This exception has one argument, the ID of the job that was
  submitted. The handler should print this ID.

  This is not an error, just a structured way to exit from clients.

  """

def SendJob(ops, cl=None):
  """Function to submit an opcode without waiting for the results.

  @type ops: list
  @param ops: list of opcodes
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created
  @return: the job ID assigned by the master

  """
  if cl is None:
    cl = GetClient()

  job_id = cl.SubmitJob(ops)

  return job_id

def GenericPollJob(job_id, cbs, report_cbs):
  """Generic job-polling function.

  @type job_id: number
  @param job_id: Job ID
  @type cbs: Instance of L{JobPollCbBase}
  @param cbs: Data callbacks
  @type report_cbs: Instance of L{JobPollReportCbBase}
  @param report_cbs: Reporting callbacks
  @return: the opcode results on success
  @raise errors.JobLost: if the job disappears while polling
  @raise errors.OpExecError: if the job or one of its opcodes failed

  """
  prev_job_info = None
  prev_logmsg_serial = None

  status = None

  while True:
    result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
                                      prev_logmsg_serial)
    if not result:
      # job not found, go away!
      raise errors.JobLost("Job with id %s lost" % job_id)

    if result == constants.JOB_NOTCHANGED:
      report_cbs.ReportNotChanged(job_id, status)

      # Wait again
      continue

    # Split result, a tuple of (field values, log entries)
    (job_info, log_entries) = result
    (status, ) = job_info

    if log_entries:
      for log_entry in log_entries:
        (serial, timestamp, log_type, message) = log_entry
        report_cbs.ReportLogMessage(job_id, serial, timestamp,
                                    log_type, message)
        prev_logmsg_serial = max(prev_logmsg_serial, serial)

    # TODO: Handle canceled and archived jobs
    # NOTE: this "elif" deliberately binds to "if log_entries" above --
    # the loop only exits on an iteration that delivered no new log
    # messages, so all pending log entries are drained before returning
    elif status in (constants.JOB_STATUS_SUCCESS,
                    constants.JOB_STATUS_ERROR,
                    constants.JOB_STATUS_CANCELING,
                    constants.JOB_STATUS_CANCELED):
      break

    prev_job_info = job_info

  jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
  if not jobs:
    raise errors.JobLost("Job with id %s lost" % job_id)

  status, opstatus, result = jobs[0]

  if status == constants.JOB_STATUS_SUCCESS:
    return result

  if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
    raise errors.OpExecError("Job was canceled")

  has_ok = False
  for idx, (status, msg) in enumerate(zip(opstatus, result)):
    if status == constants.OP_STATUS_SUCCESS:
      has_ok = True
    elif status == constants.OP_STATUS_ERROR:
      errors.MaybeRaise(msg)

      if has_ok:
        # some earlier opcodes succeeded, report the partial failure
        raise errors.OpExecError("partial failure (opcode %d): %s" %
                                 (idx, msg))

      raise errors.OpExecError(str(msg))

  # default failure mode
  raise errors.OpExecError(result)

class JobPollCbBase:
  """Base class for L{GenericPollJob} callbacks.

  """
  def __init__(self):
    """Initializes this class.

    """

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial):
    """Waits for changes on a job.

    """
    raise NotImplementedError()

  def QueryJobs(self, job_ids, fields):
    """Returns the selected fields for the selected job IDs.

    @type job_ids: list of numbers
    @param job_ids: Job IDs
    @type fields: list of strings
    @param fields: Fields

    """
    raise NotImplementedError()

class JobPollReportCbBase:
  """Base class for L{GenericPollJob} reporting callbacks.

  """
  def __init__(self):
    """Initializes this class.

    """

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    raise NotImplementedError()

  def ReportNotChanged(self, job_id, status):
    """Called for if a job hasn't changed in a while.

    @type job_id: number
    @param job_id: Job ID
    @type status: string or None
    @param status: Job status if available

    """
    raise NotImplementedError()

class _LuxiJobPollCb(JobPollCbBase):
  """Job poll callbacks that proxy to a luxi client.

  """
  def __init__(self, cl):
    """Initializes this class.

    @param cl: the luxi client used for all queries

    """
    JobPollCbBase.__init__(self)
    self.cl = cl

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial):
    """Waits for changes on a job.

    """
    return self.cl.WaitForJobChangeOnce(job_id, fields,
                                        prev_job_info, prev_log_serial)

  def QueryJobs(self, job_ids, fields):
    """Returns the selected fields for the selected job IDs.

    """
    return self.cl.QueryJobs(job_ids, fields)

class FeedbackFnJobPollReportCb(JobPollReportCbBase):
  """Job poll reporter that forwards log messages to a feedback function.

  """
  def __init__(self, feedback_fn):
    """Initializes this class.

    @param feedback_fn: callable receiving (timestamp, log_type, log_msg)
        tuples for every job log message

    """
    JobPollReportCbBase.__init__(self)

    self.feedback_fn = feedback_fn

    assert callable(feedback_fn)

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    self.feedback_fn((timestamp, log_type, log_msg))

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    """
    # Ignore

class StdioJobPollReportCb(JobPollReportCbBase):
  """Job poll reporter that prints progress to standard output/error.

  """
  def __init__(self):
    """Initializes this class.

    """
    JobPollReportCbBase.__init__(self)

    # each "waiting" notification is printed at most once per job poll
    self.notified_queued = False
    self.notified_waitlock = False

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
             FormatLogMessage(log_type, log_msg))

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    """
    if status is None:
      return

    if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
      ToStderr("Job %s is waiting in queue", job_id)
      self.notified_queued = True

    elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
      ToStderr("Job %s is trying to acquire all necessary locks", job_id)
      self.notified_waitlock = True

def FormatLogMessage(log_type, log_msg):
  """Formats a job message according to its type.

  @param log_type: the message type, non-C{ELOG_MESSAGE} payloads are
      stringified before encoding
  @param log_msg: the message payload
  @return: the safely-encoded message text

  """
  if log_type != constants.ELOG_MESSAGE:
    log_msg = str(log_msg)

  return utils.SafeEncode(log_msg)

def PollJob(job_id, cl=None, feedback_fn=None, reporter=None):
  """Function to poll for the result of a job.

  @type job_id: job identified
  @param job_id: the job to poll for results
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created
  @param feedback_fn: callable receiving job log messages; mutually
      exclusive with C{reporter}
  @param reporter: L{JobPollReportCbBase} instance to report progress to

  """
  if cl is None:
    cl = GetClient()

  if reporter is None:
    # pick a reporter matching the requested feedback style
    if feedback_fn:
      reporter = FeedbackFnJobPollReportCb(feedback_fn)
    else:
      reporter = StdioJobPollReportCb()
  elif feedback_fn:
    raise errors.ProgrammerError("Can't specify reporter and feedback function")

  return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)

def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None, reporter=None):
  """Legacy function to submit an opcode.

  This is just a simple wrapper over the construction of the processor
  instance. It should be extended to better handle feedback and
  interaction functions.

  @param op: the opcode to submit
  @param cl: luxi client, created on demand if None
  @param feedback_fn: callable receiving job log messages
  @param opts: command line options used to fill generic opcode fields
  @param reporter: optional L{JobPollReportCbBase} instance
  @return: the result of the first (and only) opcode of the job

  """
  if cl is None:
    cl = GetClient()

  SetGenericOpcodeOpts([op], opts)

  job_id = SendJob([op], cl)

  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn,
                       reporter=reporter)

  return op_results[0]

def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  This function will decide, based on the 'opts' parameter, whether to
  submit and wait for the result of the opcode (and return it), or
  whether to just send the job and print its identifier. It is used in
  order to simplify the implementation of the '--submit' option.

  It will also process the opcodes if we're sending the via SendJob
  (otherwise SubmitOpCode does it).

  @raise JobSubmittedException: if C{opts.submit_only} is set; carries
      the submitted job ID

  """
  if opts and opts.submit_only:
    job = [op]
    SetGenericOpcodeOpts(job, opts)
    job_id = SendJob(job, cl=cl)
    raise JobSubmittedException(job_id)
  else:
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)

def SetGenericOpcodeOpts(opcode_list, options):
  """Processor for generic options.

  This function updates the given opcodes based on generic command
  line options (like debug, dry-run, etc.).

  @param opcode_list: list of opcodes
  @param options: command line options or None
  @return: None (in-place modification)

  """
  if not options:
    # nothing to propagate
    return
  for op in opcode_list:
    op.dry_run = options.dry_run
    op.debug_level = options.debug

def GetClient():
  """Connects to the master daemon and returns a luxi client.

  @rtype: luxi.Client
  @return: a connected luxi client
  @raise errors.OpPrereqError: if the cluster is not initialized or
      this node is not the master

  """
  # TODO: Cache object?
  try:
    client = luxi.Client()
  except luxi.NoMasterError:
    ss = ssconf.SimpleStore()

    # Try to read ssconf file
    try:
      ss.GetMasterNode()
    except errors.ConfigurationError:
      raise errors.OpPrereqError("Cluster not initialized or this machine is"
                                 " not part of a cluster")

    master, myself = ssconf.GetMasterAndMyself(ss=ss)
    if master != myself:
      raise errors.OpPrereqError("This is not the master node, please connect"
                                 " to node '%s' and rerun the command" %
                                 master)
    # re-raise the original NoMasterError when we are the master but the
    # daemon is unreachable
    raise
  return client

def FormatError(err):
  """Return a formatted error message for a given error.

  This function takes an exception instance and returns a tuple
  consisting of two values: first, the recommended exit code, and
  second, a string describing the error message (not
  newline-terminated).

  @type err: Exception
  @param err: the exception instance to format
  @rtype: tuple
  @return: (exit code, error message string)

  """
  # Default exit code; specific branches below may override it
  retcode = 1
  obuf = StringIO()
  msg = str(err)
  # NOTE: the isinstance chain is checked in order, so more specific
  # exception classes must appear before their base classes (e.g. the
  # errors.GenericError catch-all comes near the end)
  if isinstance(err, errors.ConfigurationError):
    txt = "Corrupt configuration file: %s" % msg
    # Also log it, since a corrupt config is serious
    logging.error(txt)
    obuf.write(txt + "\n")
    obuf.write("Aborting.")
    retcode = 2
  elif isinstance(err, errors.HooksAbort):
    obuf.write("Failure: hooks execution failed:\n")
    # err.args[0] holds a list of (node, script, output) tuples
    for node, script, out in err.args[0]:
      if out:
        obuf.write("  node: %s, script: %s, output: %s\n" %
                   (node, script, out))
      else:
        obuf.write("  node: %s, script: %s (no output)\n" %
                   (node, script))
  elif isinstance(err, errors.HooksFailure):
    obuf.write("Failure: hooks general failure: %s" % msg)
  elif isinstance(err, errors.ResolverError):
    # Distinguish between failing to resolve our own name and some
    # other host's name
    this_host = netutils.HostInfo.SysName()
    if err.args[0] == this_host:
      msg = "Failure: can't resolve my own hostname ('%s')"
    else:
      msg = "Failure: can't resolve hostname '%s'"
    obuf.write(msg % err.args[0])
  elif isinstance(err, errors.OpPrereqError):
    # Two-argument form carries an additional error-type classifier
    if len(err.args) == 2:
      obuf.write("Failure: prerequisites not met for this"
               " operation:\nerror type: %s, error details:\n%s" %
                 (err.args[1], err.args[0]))
    else:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\n%s" % msg)
  elif isinstance(err, errors.OpExecError):
    obuf.write("Failure: command execution error:\n%s" % msg)
  elif isinstance(err, errors.TagError):
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
  elif isinstance(err, errors.JobQueueDrainError):
    obuf.write("Failure: the job queue is marked for drain and doesn't"
               " accept new requests\n")
  elif isinstance(err, errors.JobQueueFull):
    obuf.write("Failure: the job queue is full and doesn't accept new"
               " job submissions until old jobs are archived\n")
  elif isinstance(err, errors.TypeEnforcementError):
    obuf.write("Parameter Error: %s" % msg)
  elif isinstance(err, errors.ParameterError):
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
  elif isinstance(err, luxi.NoMasterError):
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
               " and listening for connections?")
  elif isinstance(err, luxi.TimeoutError):
    obuf.write("Timeout while talking to the master daemon. Error:\n"
               "%s" % msg)
  elif isinstance(err, luxi.PermissionError):
    obuf.write("It seems you don't have permissions to connect to the"
               " master daemon.\nPlease retry as a different user.")
  elif isinstance(err, luxi.ProtocolError):
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
               "%s" % msg)
  elif isinstance(err, errors.JobLost):
    obuf.write("Error checking job status: %s" % msg)
  elif isinstance(err, errors.GenericError):
    # Catch-all for Ganeti errors not handled more specifically above
    obuf.write("Unhandled Ganeti error: %s" % msg)
  elif isinstance(err, JobSubmittedException):
    # Not a real error: the job was submitted with --submit, report the
    # job id and succeed
    obuf.write("JobID: %s\n" % err.args[0])
    retcode = 0
  else:
    obuf.write("Unhandled exception: %s" % msg)
  # Strip trailing newlines so callers can print the message uniformly
  return retcode, obuf.getvalue().rstrip('\n')
1701

    
1702

    
1703
def GenericMain(commands, override=None, aliases=None):
1704
  """Generic main function for all the gnt-* commands.
1705

1706
  Arguments:
1707
    - commands: a dictionary with a special structure, see the design doc
1708
                for command line handling.
1709
    - override: if not None, we expect a dictionary with keys that will
1710
                override command line options; this can be used to pass
1711
                options from the scripts to generic functions
1712
    - aliases: dictionary with command aliases {'alias': 'target, ...}
1713

1714
  """
1715
  # save the program name and the entire command line for later logging
1716
  if sys.argv:
1717
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1718
    if len(sys.argv) >= 2:
1719
      binary += " " + sys.argv[1]
1720
      old_cmdline = " ".join(sys.argv[2:])
1721
    else:
1722
      old_cmdline = ""
1723
  else:
1724
    binary = "<unknown program>"
1725
    old_cmdline = ""
1726

    
1727
  if aliases is None:
1728
    aliases = {}
1729

    
1730
  try:
1731
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
1732
  except errors.ParameterError, err:
1733
    result, err_msg = FormatError(err)
1734
    ToStderr(err_msg)
1735
    return 1
1736

    
1737
  if func is None: # parse error
1738
    return 1
1739

    
1740
  if override is not None:
1741
    for key, val in override.iteritems():
1742
      setattr(options, key, val)
1743

    
1744
  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1745
                     stderr_logging=True, program=binary)
1746

    
1747
  if old_cmdline:
1748
    logging.info("run with arguments '%s'", old_cmdline)
1749
  else:
1750
    logging.info("run with no arguments")
1751

    
1752
  try:
1753
    result = func(options, args)
1754
  except (errors.GenericError, luxi.ProtocolError,
1755
          JobSubmittedException), err:
1756
    result, err_msg = FormatError(err)
1757
    logging.exception("Error during command processing")
1758
    ToStderr(err_msg)
1759

    
1760
  return result
1761

    
1762

    
1763
def GenericInstanceCreate(mode, opts, args):
1764
  """Add an instance to the cluster via either creation or import.
1765

1766
  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
1767
  @param opts: the command line options selected by the user
1768
  @type args: list
1769
  @param args: should contain only one element, the new instance name
1770
  @rtype: int
1771
  @return: the desired exit code
1772

1773
  """
1774
  instance = args[0]
1775

    
1776
  (pnode, snode) = SplitNodeOption(opts.node)
1777

    
1778
  hypervisor = None
1779
  hvparams = {}
1780
  if opts.hypervisor:
1781
    hypervisor, hvparams = opts.hypervisor
1782

    
1783
  if opts.nics:
1784
    try:
1785
      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
1786
    except ValueError, err:
1787
      raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
1788
    nics = [{}] * nic_max
1789
    for nidx, ndict in opts.nics:
1790
      nidx = int(nidx)
1791
      if not isinstance(ndict, dict):
1792
        msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
1793
        raise errors.OpPrereqError(msg)
1794
      nics[nidx] = ndict
1795
  elif opts.no_nics:
1796
    # no nics
1797
    nics = []
1798
  elif mode == constants.INSTANCE_CREATE:
1799
    # default of one nic, all auto
1800
    nics = [{}]
1801
  else:
1802
    # mode == import
1803
    nics = []
1804

    
1805
  if opts.disk_template == constants.DT_DISKLESS:
1806
    if opts.disks or opts.sd_size is not None:
1807
      raise errors.OpPrereqError("Diskless instance but disk"
1808
                                 " information passed")
1809
    disks = []
1810
  else:
1811
    if (not opts.disks and not opts.sd_size
1812
        and mode == constants.INSTANCE_CREATE):
1813
      raise errors.OpPrereqError("No disk information specified")
1814
    if opts.disks and opts.sd_size is not None:
1815
      raise errors.OpPrereqError("Please use either the '--disk' or"
1816
                                 " '-s' option")
1817
    if opts.sd_size is not None:
1818
      opts.disks = [(0, {"size": opts.sd_size})]
1819

    
1820
    if opts.disks:
1821
      try:
1822
        disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
1823
      except ValueError, err:
1824
        raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
1825
      disks = [{}] * disk_max
1826
    else:
1827
      disks = []
1828
    for didx, ddict in opts.disks:
1829
      didx = int(didx)
1830
      if not isinstance(ddict, dict):
1831
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
1832
        raise errors.OpPrereqError(msg)
1833
      elif "size" in ddict:
1834
        if "adopt" in ddict:
1835
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
1836
                                     " (disk %d)" % didx)
1837
        try:
1838
          ddict["size"] = utils.ParseUnit(ddict["size"])
1839
        except ValueError, err:
1840
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
1841
                                     (didx, err))
1842
      elif "adopt" in ddict:
1843
        if mode == constants.INSTANCE_IMPORT:
1844
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
1845
                                     " import")
1846
        ddict["size"] = 0
1847
      else:
1848
        raise errors.OpPrereqError("Missing size or adoption source for"
1849
                                   " disk %d" % didx)
1850
      disks[didx] = ddict
1851

    
1852
  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
1853
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
1854

    
1855
  if mode == constants.INSTANCE_CREATE:
1856
    start = opts.start
1857
    os_type = opts.os
1858
    force_variant = opts.force_variant
1859
    src_node = None
1860
    src_path = None
1861
    no_install = opts.no_install
1862
    identify_defaults = False
1863
  elif mode == constants.INSTANCE_IMPORT:
1864
    start = False
1865
    os_type = None
1866
    force_variant = False
1867
    src_node = opts.src_node
1868
    src_path = opts.src_dir
1869
    no_install = None
1870
    identify_defaults = opts.identify_defaults
1871
  else:
1872
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)
1873

    
1874
  op = opcodes.OpCreateInstance(instance_name=instance,
1875
                                disks=disks,
1876
                                disk_template=opts.disk_template,
1877
                                nics=nics,
1878
                                pnode=pnode, snode=snode,
1879
                                ip_check=opts.ip_check,
1880
                                name_check=opts.name_check,
1881
                                wait_for_sync=opts.wait_for_sync,
1882
                                file_storage_dir=opts.file_storage_dir,
1883
                                file_driver=opts.file_driver,
1884
                                iallocator=opts.iallocator,
1885
                                hypervisor=hypervisor,
1886
                                hvparams=hvparams,
1887
                                beparams=opts.beparams,
1888
                                osparams=opts.osparams,
1889
                                mode=mode,
1890
                                start=start,
1891
                                os_type=os_type,
1892
                                force_variant=force_variant,
1893
                                src_node=src_node,
1894
                                src_path=src_path,
1895
                                no_install=no_install,
1896
                                identify_defaults=identify_defaults)
1897

    
1898
  SubmitOrSend(op, opts)
1899
  return 0
1900

    
1901

    
1902
class _RunWhileClusterStoppedHelper:
  """Helper class for L{RunWhileClusterStopped} to simplify state management

  """
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes

    # SSH runner used for executing commands on remote nodes
    self.ssh = ssh.SshRunner(self.cluster_name)

    # All online nodes except the master; used when restarting daemons
    # (the master must be started last)
    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name; None (or the master node's name) means
        the command runs locally, otherwise it is run via SSH as root
    @type cmd: list
    @param cmd: Command
    @raise errors.OpExecError: if the command exits with a failure

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    Stops the watcher and all cluster daemons, runs C{fn(self, *args)},
    and restarts everything afterwards (daemons in a finally block, the
    watcher lock in an outer finally block) even if C{fn} raises.

    @type fn: callable
    @param fn: Function to be called
    @return: the return value of C{fn}

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes
        for node_name in self.online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception, err:
          # Report the error before re-raising, since the restart steps in
          # the finally blocks below may produce further output
          _, errmsg = FormatError(err)
          logging.exception("Caught exception")
          self.feedback_fn(errmsg)
          raise
      finally:
        # Start cluster again, master node last
        for node_name in self.nonmaster_nodes + [self.master_node]:
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
    finally:
      # Resume watcher
      watcher_block.Close()
1995

    
1996

    
1997
def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Run a function with every cluster daemon shut down.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped
  @return: the return value of C{fn}

  """
  feedback_fn("Gathering cluster information")

  # Talking to the master daemon also proves we are running on the master
  cl = GetClient()

  (cluster_name, master_node) = \
    cl.QueryConfigValues(["cluster_name", "master_node"])
  online_nodes = GetOnlineNodes([], cl=cl)

  # Drop the client reference; the master daemon is about to go away
  del cl

  assert master_node in online_nodes

  helper = _RunWhileClusterStoppedHelper(feedback_fn, cluster_name,
                                         master_node, online_nodes)
  return helper.Call(fn, *args)
2023

    
2024

    
2025
def GenerateTable(headers, fields, separator, data,
                  numfields=None, unitfields=None,
                  units=None):
  """Prints a table with headers and different fields.

  Note: the rows in C{data} are modified in place (cell values are
  converted to strings, and unit fields are reformatted).

  @type headers: dict
  @param headers: dictionary mapping field names to headers for
      the table
  @type fields: list
  @param fields: the field names corresponding to each row in
      the data field
  @param separator: the separator to be used; if this is None,
      the default 'smart' algorithm is used which computes optimal
      field width, otherwise just the separator is used between
      each field
  @type data: list
  @param data: a list of lists, each sublist being one row to be output
  @type numfields: list
  @param numfields: a list with the fields that hold numeric
      values and thus should be right-aligned
  @type unitfields: list
  @param unitfields: a list with the fields that hold numeric
      values that should be formatted with the units field
  @type units: string or None
  @param units: the units we should use for formatting, or None for
      automatic choice (human-readable for non-separator usage, otherwise
      megabytes); this is a one-letter string
  @rtype: list
  @return: list of formatted output lines (header line first, if any)

  """
  if units is None:
    if separator:
      units = "m"
    else:
      units = "h"

  if numfields is None:
    numfields = []
  if unitfields is None:
    unitfields = []

  # FieldSet supports pattern matching on field names (e.g. wildcards)
  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142

  # Build one %-format specifier per column:
  #   "%s"   - plain, when an explicit separator is used
  #   "%*s"  - right-aligned with runtime width, for numeric columns
  #   "%-*s" - left-aligned with runtime width, for everything else
  format_fields = []
  for field in fields:
    if headers and field not in headers:
      # TODO: handle better unknown fields (either revert to old
      # style of raising exception, or deal more intelligently with
      # variable fields)
      headers[field] = field
    if separator is not None:
      format_fields.append("%s")
    elif numfields.Matches(field):
      format_fields.append("%*s")
    else:
      format_fields.append("%-*s")

  if separator is None:
    # mlens[i] tracks the maximum cell width seen in column i
    mlens = [0 for name in fields]
    format_str = ' '.join(format_fields)
  else:
    # Escape "%" in the separator since format_str is used with "%"
    format_str = separator.replace("%", "%%").join(format_fields)

  # First pass: stringify all cells (in place!), apply unit formatting,
  # and record per-column maximum widths
  for row in data:
    if row is None:
      continue
    for idx, val in enumerate(row):
      if unitfields.Matches(fields[idx]):
        try:
          val = int(val)
        except (TypeError, ValueError):
          # non-numeric value in a unit field: leave it unchanged
          pass
        else:
          val = row[idx] = utils.FormatUnit(val, units)
      val = row[idx] = str(val)
      if separator is None:
        mlens[idx] = max(mlens[idx], len(val))

  result = []
  if headers:
    args = []
    for idx, name in enumerate(fields):
      hdr = headers[name]
      if separator is None:
        # headers also contribute to the column width
        mlens[idx] = max(mlens[idx], len(hdr))
        args.append(mlens[idx])
      args.append(hdr)
    result.append(format_str % tuple(args))

  if separator is None:
    assert len(mlens) == len(fields)

    # Width 0 for the last (left-aligned) column avoids trailing spaces
    if fields and not numfields.Matches(fields[-1]):
      mlens[-1] = 0

  # Second pass: render the data rows
  for line in data:
    args = []
    if line is None:
      # None rows are rendered as a dash in each column
      line = ['-' for _ in fields]
    for idx in range(len(fields)):
      if separator is None:
        args.append(mlens[idx])
      args.append(line[idx])
    result.append(format_str % tuple(args))

  return result
2131

    
2132

    
2133
def FormatTimestamp(ts):
  """Format a timeval-style timestamp for human consumption.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: the formatted timestamp, or '?' for malformed input

  """
  if not isinstance(ts, (tuple, list)) or len(ts) != 2:
    return '?'
  (sec, usec) = ts
  return "%s.%06d" % (time.strftime("%F %T", time.localtime(sec)), usec)
2147

    
2148

    
2149
def ParseTimespec(value):
  """Parse a time specification.

  The following suffixed will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: day
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  @return: the number of seconds as an integer
  @raise errors.OpPrereqError: for empty or unparseable specifications

  """
  text = str(value)
  if not text:
    raise errors.OpPrereqError("Empty time specification passed")
  suffix_map = {
    's': 1,
    'm': 60,
    'h': 3600,
    'd': 86400,
    'w': 604800,
    }
  multiplier = suffix_map.get(text[-1])
  if multiplier is None:
    # no recognized suffix: the whole string must be a plain integer
    try:
      return int(text)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % text)
  text = text[:-1]
  if not text: # no data left after stripping the suffix
    raise errors.OpPrereqError("Invalid time specification (only"
                               " suffix passed)")
  try:
    return int(text) * multiplier
  except (TypeError, ValueError):
    raise errors.OpPrereqError("Invalid time specification '%s'" % text)
2189

    
2190

    
2191
def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
                   filter_master=False):
  """Returns the names of online nodes.

  This function will also log a warning on stderr with the names of
  the online nodes.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed
  @type secondary_ips: boolean
  @param secondary_ips: if True, return the secondary IPs instead of the
      names, useful for doing network traffic over the replication interface
      (if any)
  @type filter_master: boolean
  @param filter_master: if True, do not return the master node in the list
      (useful in coordination with secondary_ips where we cannot check our
      node name against the list)
  @rtype: list
  @return: names (or secondary IPs) of the online nodes

  """
  if cl is None:
    cl = GetClient()

  # Column to return: 2 is the secondary IP, 0 the node name
  name_idx = 0
  if secondary_ips:
    name_idx = 2

  if filter_master:
    master_node = cl.QueryConfigValues(["master_node"])[0]
    def filter_fn(name):
      return name != master_node
  else:
    def filter_fn(_):
      return True

  node_data = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
                            use_locking=False)

  offline = []
  for row in node_data:
    if row[1]:
      offline.append(row[0])
  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))

  online = []
  for row in node_data:
    if not row[1] and filter_fn(row[0]):
      online.append(row[name_idx])
  return online
2235

    
2236

    
2237
def _ToStream(stream, txt, *args):
2238
  """Write a message to a stream, bypassing the logging system
2239

2240
  @type stream: file object
2241
  @param stream: the file to which we should write
2242
  @type txt: str
2243
  @param txt: the message
2244

2245
  """
2246
  if args:
2247
    args = tuple(args)
2248
    stream.write(txt % args)
2249
  else:
2250
    stream.write(txt)
2251
  stream.write('\n')
2252
  stream.flush()
2253

    
2254

    
2255
def ToStdout(txt, *args):
  """Print a message on standard output, without involving logging.

  Thin convenience wrapper around L{_ToStream} bound to C{sys.stdout}.

  @type txt: str
  @param txt: the message
  @param args: optional %-interpolation arguments for C{txt}

  """
  _ToStream(sys.stdout, txt, *args)
2265

    
2266

    
2267
def ToStderr(txt, *args):
  """Print a message on standard error, without involving logging.

  Thin convenience wrapper around L{_ToStream} bound to C{sys.stderr}.

  @type txt: str
  @param txt: the message
  @param args: optional %-interpolation arguments for C{txt}

  """
  _ToStream(sys.stderr, txt, *args)
2277

    
2278

    
2279
class JobExecutor(object):
  """Class which manages the submission and execution of multiple jobs.

  Note that instances of this class should not be reused between
  GetResults() calls.

  """
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
    """Initializes this class.

    @param cl: luxi client to use, or None to create one via L{GetClient}
    @type verbose: boolean
    @param verbose: whether to print the IDs of submitted jobs
    @param opts: command line options, passed to L{SetGenericOpcodeOpts}
    @param feedback_fn: feedback function, forwarded to L{PollJob}

    """
    # pending (name, ops) pairs, not yet submitted
    self.queue = []
    if cl is None:
      cl = GetClient()
    self.cl = cl
    self.verbose = verbose
    # submitted jobs as (idx, status, job id or error, name) tuples
    self.jobs = []
    self.opts = opts
    self.feedback_fn = feedback_fn

  def QueueJob(self, name, *ops):
    """Record a job for later submit.

    @type name: string
    @param name: a description of the job, will be used in WaitJobSet
    """
    # apply generic options (debug, dry-run) before queueing
    SetGenericOpcodeOpts(ops, self.opts)
    self.queue.append((name, ops))

  def SubmitPending(self, each=False):
    """Submit all pending jobs.

    @type each: boolean
    @param each: if True, submit jobs one by one instead of via a single
        SubmitManyJobs call

    """
    if each:
      results = []
      for row in self.queue:
        # SubmitJob will remove the success status, but raise an exception if
        # the submission fails, so we'll notice that anyway.
        results.append([True, self.cl.SubmitJob(row[1])])
    else:
      results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
    # pair each submission result with its queued name, remembering the
    # original position (idx) so output can be sorted later
    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
                                                            self.queue)):
      self.jobs.append((idx, status, data, name))

  def _ChooseJob(self):
    """Choose a non-waiting/queued job to poll next.

    Removes the chosen entry from C{self.jobs} and returns it.

    """
    assert self.jobs, "_ChooseJob called with empty job list"

    # query the current status of all tracked jobs; i[2] is the job id
    result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
    assert result

    for job_data, status in zip(self.jobs, result):
      if (isinstance(status, list) and status and
          status[0] in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITLOCK,
                        constants.JOB_STATUS_CANCELING)):
        # job is still present and waiting
        continue
      # good candidate found (either running job or lost job)
      self.jobs.remove(job_data)
      return job_data

    # no job found
    return self.jobs.pop(0)

  def GetResults(self):
    """Wait for and return the results of all jobs.

    @rtype: list
    @return: list of tuples (success, job results), in the same order
        as the submitted jobs; if a job has failed, instead of the result
        there will be the error message

    """
    if not self.jobs:
      self.SubmitPending()
    results = []
    if self.verbose:
      # row[1] is the submission status, row[2] the job id
      ok_jobs = [row[2] for row in self.jobs if row[1]]
      if ok_jobs:
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))

    # first, remove any non-submitted jobs
    self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
    for idx, _, jid, name in failures:
      # for failed submissions, jid holds the error message
      ToStderr("Failed to submit job for %s: %s", name, jid)
      results.append((idx, False, jid))

    while self.jobs:
      (idx, _, jid, name) = self._ChooseJob()
      ToStdout("Waiting for job %s for %s...", jid, name)
      try:
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
        success = True
      except errors.JobLost, err:
        _, job_result = FormatError(err)
        ToStderr("Job %s for %s has been archived, cannot check its result",
                 jid, name)
        success = False
      except (errors.GenericError, luxi.ProtocolError), err:
        _, job_result = FormatError(err)
        success = False
        # the error message will always be shown, verbose or not
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)

      results.append((idx, success, job_result))

    # sort based on the index, then drop it
    results.sort()
    results = [i[1:] for i in results]

    return results

  def WaitOrShow(self, wait):
    """Wait for job results or only print the job IDs.

    @type wait: boolean
    @param wait: whether to wait or not
    @return: the job results (if waiting) or a list of
        (status, job id/error) pairs

    """
    if wait:
      return self.GetResults()
    else:
      if not self.jobs:
        self.SubmitPending()
      for _, status, result, name in self.jobs:
        if status:
          ToStdout("%s: %s", result, name)
        else:
          ToStderr("Failure for %s: %s", name, result)
      return [row[1:3] for row in self.jobs]