Statistics
| Branch: | Tag: | Revision:

root / lib / cli.py @ 535b49cb

History | View | Annotate | Download (75.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module dealing with command line parsing"""
23

    
24

    
25
import sys
26
import textwrap
27
import os.path
28
import time
29
import logging
30
from cStringIO import StringIO
31

    
32
from ganeti import utils
33
from ganeti import errors
34
from ganeti import constants
35
from ganeti import opcodes
36
from ganeti import luxi
37
from ganeti import ssconf
38
from ganeti import rpc
39
from ganeti import ssh
40
from ganeti import compat
41

    
42
from optparse import (OptionParser, TitledHelpFormatter,
43
                      Option, OptionValueError)
44

    
45

    
46
__all__ = [
47
  # Command line options
48
  "ADD_UIDS_OPT",
49
  "ALLOCATABLE_OPT",
50
  "ALL_OPT",
51
  "AUTO_PROMOTE_OPT",
52
  "AUTO_REPLACE_OPT",
53
  "BACKEND_OPT",
54
  "CLEANUP_OPT",
55
  "CLUSTER_DOMAIN_SECRET_OPT",
56
  "CONFIRM_OPT",
57
  "CP_SIZE_OPT",
58
  "DEBUG_OPT",
59
  "DEBUG_SIMERR_OPT",
60
  "DISKIDX_OPT",
61
  "DISK_OPT",
62
  "DISK_TEMPLATE_OPT",
63
  "DRAINED_OPT",
64
  "EARLY_RELEASE_OPT",
65
  "ENABLED_HV_OPT",
66
  "ERROR_CODES_OPT",
67
  "FIELDS_OPT",
68
  "FILESTORE_DIR_OPT",
69
  "FILESTORE_DRIVER_OPT",
70
  "FORCE_OPT",
71
  "FORCE_VARIANT_OPT",
72
  "GLOBAL_FILEDIR_OPT",
73
  "HVLIST_OPT",
74
  "HVOPTS_OPT",
75
  "HYPERVISOR_OPT",
76
  "IALLOCATOR_OPT",
77
  "IDENTIFY_DEFAULTS_OPT",
78
  "IGNORE_CONSIST_OPT",
79
  "IGNORE_FAILURES_OPT",
80
  "IGNORE_REMOVE_FAILURES_OPT",
81
  "IGNORE_SECONDARIES_OPT",
82
  "IGNORE_SIZE_OPT",
83
  "MAC_PREFIX_OPT",
84
  "MAINTAIN_NODE_HEALTH_OPT",
85
  "MASTER_NETDEV_OPT",
86
  "MC_OPT",
87
  "NET_OPT",
88
  "NEW_CLUSTER_CERT_OPT",
89
  "NEW_CLUSTER_DOMAIN_SECRET_OPT",
90
  "NEW_CONFD_HMAC_KEY_OPT",
91
  "NEW_RAPI_CERT_OPT",
92
  "NEW_SECONDARY_OPT",
93
  "NIC_PARAMS_OPT",
94
  "NODE_LIST_OPT",
95
  "NODE_PLACEMENT_OPT",
96
  "NOHDR_OPT",
97
  "NOIPCHECK_OPT",
98
  "NO_INSTALL_OPT",
99
  "NONAMECHECK_OPT",
100
  "NOLVM_STORAGE_OPT",
101
  "NOMODIFY_ETCHOSTS_OPT",
102
  "NOMODIFY_SSH_SETUP_OPT",
103
  "NONICS_OPT",
104
  "NONLIVE_OPT",
105
  "NONPLUS1_OPT",
106
  "NOSHUTDOWN_OPT",
107
  "NOSTART_OPT",
108
  "NOSSH_KEYCHECK_OPT",
109
  "NOVOTING_OPT",
110
  "NWSYNC_OPT",
111
  "ON_PRIMARY_OPT",
112
  "ON_SECONDARY_OPT",
113
  "OFFLINE_OPT",
114
  "OSPARAMS_OPT",
115
  "OS_OPT",
116
  "OS_SIZE_OPT",
117
  "RAPI_CERT_OPT",
118
  "READD_OPT",
119
  "REBOOT_TYPE_OPT",
120
  "REMOVE_INSTANCE_OPT",
121
  "REMOVE_UIDS_OPT",
122
  "ROMAN_OPT",
123
  "SECONDARY_IP_OPT",
124
  "SELECT_OS_OPT",
125
  "SEP_OPT",
126
  "SHOWCMD_OPT",
127
  "SHUTDOWN_TIMEOUT_OPT",
128
  "SINGLE_NODE_OPT",
129
  "SRC_DIR_OPT",
130
  "SRC_NODE_OPT",
131
  "SUBMIT_OPT",
132
  "STATIC_OPT",
133
  "SYNC_OPT",
134
  "TAG_SRC_OPT",
135
  "TIMEOUT_OPT",
136
  "UIDPOOL_OPT",
137
  "USEUNITS_OPT",
138
  "USE_REPL_NET_OPT",
139
  "VERBOSE_OPT",
140
  "VG_NAME_OPT",
141
  "YES_DOIT_OPT",
142
  # Generic functions for CLI programs
143
  "GenericMain",
144
  "GenericInstanceCreate",
145
  "GetClient",
146
  "GetOnlineNodes",
147
  "JobExecutor",
148
  "JobSubmittedException",
149
  "ParseTimespec",
150
  "RunWhileClusterStopped",
151
  "SubmitOpCode",
152
  "SubmitOrSend",
153
  "UsesRPC",
154
  # Formatting functions
155
  "ToStderr", "ToStdout",
156
  "FormatError",
157
  "GenerateTable",
158
  "AskUser",
159
  "FormatTimestamp",
160
  # Tags functions
161
  "ListTags",
162
  "AddTags",
163
  "RemoveTags",
164
  # command line options support infrastructure
165
  "ARGS_MANY_INSTANCES",
166
  "ARGS_MANY_NODES",
167
  "ARGS_NONE",
168
  "ARGS_ONE_INSTANCE",
169
  "ARGS_ONE_NODE",
170
  "ARGS_ONE_OS",
171
  "ArgChoice",
172
  "ArgCommand",
173
  "ArgFile",
174
  "ArgHost",
175
  "ArgInstance",
176
  "ArgJobId",
177
  "ArgNode",
178
  "ArgOs",
179
  "ArgSuggest",
180
  "ArgUnknown",
181
  "OPT_COMPL_INST_ADD_NODES",
182
  "OPT_COMPL_MANY_NODES",
183
  "OPT_COMPL_ONE_IALLOCATOR",
184
  "OPT_COMPL_ONE_INSTANCE",
185
  "OPT_COMPL_ONE_NODE",
186
  "OPT_COMPL_ONE_OS",
187
  "cli_option",
188
  "SplitNodeOption",
189
  "CalculateOSNames",
190
  ]
191

    
192
NO_PREFIX = "no_"
193
UN_PREFIX = "-"
194

    
195

    
196
class _Argument:
197
  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
198
    self.min = min
199
    self.max = max
200

    
201
  def __repr__(self):
202
    return ("<%s min=%s max=%s>" %
203
            (self.__class__.__name__, self.min, self.max))
204

    
205

    
206
class ArgSuggest(_Argument):
207
  """Suggesting argument.
208

209
  Value can be any of the ones passed to the constructor.
210

211
  """
212
  # pylint: disable-msg=W0622
213
  def __init__(self, min=0, max=None, choices=None):
214
    _Argument.__init__(self, min=min, max=max)
215
    self.choices = choices
216

    
217
  def __repr__(self):
218
    return ("<%s min=%s max=%s choices=%r>" %
219
            (self.__class__.__name__, self.min, self.max, self.choices))
220

    
221

    
222
class ArgChoice(ArgSuggest):
223
  """Choice argument.
224

225
  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
226
  but value must be one of the choices.
227

228
  """
229

    
230

    
231
class ArgUnknown(_Argument):
232
  """Unknown argument to program (e.g. determined at runtime).
233

234
  """
235

    
236

    
237
class ArgInstance(_Argument):
238
  """Instances argument.
239

240
  """
241

    
242

    
243
class ArgNode(_Argument):
244
  """Node argument.
245

246
  """
247

    
248
class ArgJobId(_Argument):
249
  """Job ID argument.
250

251
  """
252

    
253

    
254
class ArgFile(_Argument):
255
  """File path argument.
256

257
  """
258

    
259

    
260
class ArgCommand(_Argument):
261
  """Command argument.
262

263
  """
264

    
265

    
266
class ArgHost(_Argument):
267
  """Host argument.
268

269
  """
270

    
271

    
272
class ArgOs(_Argument):
273
  """OS argument.
274

275
  """
276

    
277

    
278
ARGS_NONE = []
279
ARGS_MANY_INSTANCES = [ArgInstance()]
280
ARGS_MANY_NODES = [ArgNode()]
281
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
282
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
283
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
284

    
285

    
286
def _ExtractTagsObject(opts, args):
287
  """Extract the tag type object.
288

289
  Note that this function will modify its args parameter.
290

291
  """
292
  if not hasattr(opts, "tag_type"):
293
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
294
  kind = opts.tag_type
295
  if kind == constants.TAG_CLUSTER:
296
    retval = kind, kind
297
  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
298
    if not args:
299
      raise errors.OpPrereqError("no arguments passed to the command")
300
    name = args.pop(0)
301
    retval = kind, name
302
  else:
303
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
304
  return retval
305

    
306

    
307
def _ExtendTags(opts, args):
308
  """Extend the args if a source file has been given.
309

310
  This function will extend the tags with the contents of the file
311
  passed in the 'tags_source' attribute of the opts parameter. A file
312
  named '-' will be replaced by stdin.
313

314
  """
315
  fname = opts.tags_source
316
  if fname is None:
317
    return
318
  if fname == "-":
319
    new_fh = sys.stdin
320
  else:
321
    new_fh = open(fname, "r")
322
  new_data = []
323
  try:
324
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
325
    # because of python bug 1633941
326
    while True:
327
      line = new_fh.readline()
328
      if not line:
329
        break
330
      new_data.append(line.strip())
331
  finally:
332
    new_fh.close()
333
  args.extend(new_data)
334

    
335

    
336
def ListTags(opts, args):
337
  """List the tags on a given object.
338

339
  This is a generic implementation that knows how to deal with all
340
  three cases of tag objects (cluster, node, instance). The opts
341
  argument is expected to contain a tag_type field denoting what
342
  object type we work on.
343

344
  """
345
  kind, name = _ExtractTagsObject(opts, args)
346
  cl = GetClient()
347
  result = cl.QueryTags(kind, name)
348
  result = list(result)
349
  result.sort()
350
  for tag in result:
351
    ToStdout(tag)
352

    
353

    
354
def AddTags(opts, args):
355
  """Add tags on a given object.
356

357
  This is a generic implementation that knows how to deal with all
358
  three cases of tag objects (cluster, node, instance). The opts
359
  argument is expected to contain a tag_type field denoting what
360
  object type we work on.
361

362
  """
363
  kind, name = _ExtractTagsObject(opts, args)
364
  _ExtendTags(opts, args)
365
  if not args:
366
    raise errors.OpPrereqError("No tags to be added")
367
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
368
  SubmitOpCode(op)
369

    
370

    
371
def RemoveTags(opts, args):
372
  """Remove tags from a given object.
373

374
  This is a generic implementation that knows how to deal with all
375
  three cases of tag objects (cluster, node, instance). The opts
376
  argument is expected to contain a tag_type field denoting what
377
  object type we work on.
378

379
  """
380
  kind, name = _ExtractTagsObject(opts, args)
381
  _ExtendTags(opts, args)
382
  if not args:
383
    raise errors.OpPrereqError("No tags to be removed")
384
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
385
  SubmitOpCode(op)
386

    
387

    
388
def check_unit(option, opt, value): # pylint: disable-msg=W0613
389
  """OptParsers custom converter for units.
390

391
  """
392
  try:
393
    return utils.ParseUnit(value)
394
  except errors.UnitParseError, err:
395
    raise OptionValueError("option %s: %s" % (opt, err))
396

    
397

    
398
def _SplitKeyVal(opt, data):
399
  """Convert a KeyVal string into a dict.
400

401
  This function will convert a key=val[,...] string into a dict. Empty
402
  values will be converted specially: keys which have the prefix 'no_'
403
  will have the value=False and the prefix stripped, the others will
404
  have value=True.
405

406
  @type opt: string
407
  @param opt: a string holding the option name for which we process the
408
      data, used in building error messages
409
  @type data: string
410
  @param data: a string of the format key=val,key=val,...
411
  @rtype: dict
412
  @return: {key=val, key=val}
413
  @raises errors.ParameterError: if there are duplicate keys
414

415
  """
416
  kv_dict = {}
417
  if data:
418
    for elem in utils.UnescapeAndSplit(data, sep=","):
419
      if "=" in elem:
420
        key, val = elem.split("=", 1)
421
      else:
422
        if elem.startswith(NO_PREFIX):
423
          key, val = elem[len(NO_PREFIX):], False
424
        elif elem.startswith(UN_PREFIX):
425
          key, val = elem[len(UN_PREFIX):], None
426
        else:
427
          key, val = elem, True
428
      if key in kv_dict:
429
        raise errors.ParameterError("Duplicate key '%s' in option %s" %
430
                                    (key, opt))
431
      kv_dict[key] = val
432
  return kv_dict
433

    
434

    
435
def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
436
  """Custom parser for ident:key=val,key=val options.
437

438
  This will store the parsed values as a tuple (ident, {key: val}). As such,
439
  multiple uses of this option via action=append is possible.
440

441
  """
442
  if ":" not in value:
443
    ident, rest = value, ''
444
  else:
445
    ident, rest = value.split(":", 1)
446

    
447
  if ident.startswith(NO_PREFIX):
448
    if rest:
449
      msg = "Cannot pass options when removing parameter groups: %s" % value
450
      raise errors.ParameterError(msg)
451
    retval = (ident[len(NO_PREFIX):], False)
452
  elif ident.startswith(UN_PREFIX):
453
    if rest:
454
      msg = "Cannot pass options when removing parameter groups: %s" % value
455
      raise errors.ParameterError(msg)
456
    retval = (ident[len(UN_PREFIX):], None)
457
  else:
458
    kv_dict = _SplitKeyVal(opt, rest)
459
    retval = (ident, kv_dict)
460
  return retval
461

    
462

    
463
def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
464
  """Custom parser class for key=val,key=val options.
465

466
  This will store the parsed values as a dict {key: val}.
467

468
  """
469
  return _SplitKeyVal(opt, value)
470

    
471

    
472
def check_bool(option, opt, value): # pylint: disable-msg=W0613
473
  """Custom parser for yes/no options.
474

475
  This will store the parsed value as either True or False.
476

477
  """
478
  value = value.lower()
479
  if value == constants.VALUE_FALSE or value == "no":
480
    return False
481
  elif value == constants.VALUE_TRUE or value == "yes":
482
    return True
483
  else:
484
    raise errors.ParameterError("Invalid boolean value '%s'" % value)
485

    
486

    
487
# completion_suggestion is normally a list. Using numeric values not evaluating
488
# to False for dynamic completion.
489
(OPT_COMPL_MANY_NODES,
490
 OPT_COMPL_ONE_NODE,
491
 OPT_COMPL_ONE_INSTANCE,
492
 OPT_COMPL_ONE_OS,
493
 OPT_COMPL_ONE_IALLOCATOR,
494
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)
495

    
496
OPT_COMPL_ALL = frozenset([
497
  OPT_COMPL_MANY_NODES,
498
  OPT_COMPL_ONE_NODE,
499
  OPT_COMPL_ONE_INSTANCE,
500
  OPT_COMPL_ONE_OS,
501
  OPT_COMPL_ONE_IALLOCATOR,
502
  OPT_COMPL_INST_ADD_NODES,
503
  ])
504

    
505

    
506
class CliOption(Option):
507
  """Custom option class for optparse.
508

509
  """
510
  ATTRS = Option.ATTRS + [
511
    "completion_suggest",
512
    ]
513
  TYPES = Option.TYPES + (
514
    "identkeyval",
515
    "keyval",
516
    "unit",
517
    "bool",
518
    )
519
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
520
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
521
  TYPE_CHECKER["keyval"] = check_key_val
522
  TYPE_CHECKER["unit"] = check_unit
523
  TYPE_CHECKER["bool"] = check_bool
524

    
525

    
526
# optparse.py sets make_option, so we do it for our own option class, too
527
cli_option = CliOption
528

    
529

    
530
_YORNO = "yes|no"
531

    
532
DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
533
                       help="Increase debugging level")
534

    
535
NOHDR_OPT = cli_option("--no-headers", default=False,
536
                       action="store_true", dest="no_headers",
537
                       help="Don't display column headers")
538

    
539
SEP_OPT = cli_option("--separator", default=None,
540
                     action="store", dest="separator",
541
                     help=("Separator between output fields"
542
                           " (defaults to one space)"))
543

    
544
USEUNITS_OPT = cli_option("--units", default=None,
545
                          dest="units", choices=('h', 'm', 'g', 't'),
546
                          help="Specify units for output (one of hmgt)")
547

    
548
FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
549
                        type="string", metavar="FIELDS",
550
                        help="Comma separated list of output fields")
551

    
552
FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
553
                       default=False, help="Force the operation")
554

    
555
CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
556
                         default=False, help="Do not require confirmation")
557

    
558
TAG_SRC_OPT = cli_option("--from", dest="tags_source",
559
                         default=None, help="File with tag names")
560

    
561
SUBMIT_OPT = cli_option("--submit", dest="submit_only",
562
                        default=False, action="store_true",
563
                        help=("Submit the job and return the job ID, but"
564
                              " don't wait for the job to finish"))
565

    
566
SYNC_OPT = cli_option("--sync", dest="do_locking",
567
                      default=False, action="store_true",
568
                      help=("Grab locks while doing the queries"
569
                            " in order to ensure more consistent results"))
570

    
571
_DRY_RUN_OPT = cli_option("--dry-run", default=False,
572
                          action="store_true",
573
                          help=("Do not execute the operation, just run the"
574
                                " check steps and verify it it could be"
575
                                " executed"))
576

    
577
VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
578
                         action="store_true",
579
                         help="Increase the verbosity of the operation")
580

    
581
DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
582
                              action="store_true", dest="simulate_errors",
583
                              help="Debugging option that makes the operation"
584
                              " treat most runtime checks as failed")
585

    
586
NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
587
                        default=True, action="store_false",
588
                        help="Don't wait for sync (DANGEROUS!)")
589

    
590
DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
591
                               help="Custom disk setup (diskless, file,"
592
                               " plain or drbd)",
593
                               default=None, metavar="TEMPL",
594
                               choices=list(constants.DISK_TEMPLATES))
595

    
596
NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
597
                        help="Do not create any network cards for"
598
                        " the instance")
599

    
600
FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
601
                               help="Relative path under default cluster-wide"
602
                               " file storage dir to store file-based disks",
603
                               default=None, metavar="<DIR>")
604

    
605
FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
606
                                  help="Driver to use for image files",
607
                                  default="loop", metavar="<DRIVER>",
608
                                  choices=list(constants.FILE_DRIVER))
609

    
610
IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
611
                            help="Select nodes for the instance automatically"
612
                            " using the <NAME> iallocator plugin",
613
                            default=None, type="string",
614
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
615

    
616
OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
617
                    metavar="<os>",
618
                    completion_suggest=OPT_COMPL_ONE_OS)
619

    
620
OSPARAMS_OPT = cli_option("-O", "--os-parameters", dest="osparams",
621
                         type="keyval", default={},
622
                         help="OS parameters")
623

    
624
FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
625
                               action="store_true", default=False,
626
                               help="Force an unknown variant")
627

    
628
NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
629
                            action="store_true", default=False,
630
                            help="Do not install the OS (will"
631
                            " enable no-start)")
632

    
633
BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
634
                         type="keyval", default={},
635
                         help="Backend parameters")
636

    
637
HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
638
                         default={}, dest="hvparams",
639
                         help="Hypervisor parameters")
640

    
641
HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
642
                            help="Hypervisor and hypervisor options, in the"
643
                            " format hypervisor:option=value,option=value,...",
644
                            default=None, type="identkeyval")
645

    
646
HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
647
                        help="Hypervisor and hypervisor options, in the"
648
                        " format hypervisor:option=value,option=value,...",
649
                        default=[], action="append", type="identkeyval")
650

    
651
NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
652
                           action="store_false",
653
                           help="Don't check that the instance's IP"
654
                           " is alive")
655

    
656
NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
657
                             default=True, action="store_false",
658
                             help="Don't check that the instance's name"
659
                             " is resolvable")
660

    
661
NET_OPT = cli_option("--net",
662
                     help="NIC parameters", default=[],
663
                     dest="nics", action="append", type="identkeyval")
664

    
665
DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
666
                      dest="disks", action="append", type="identkeyval")
667

    
668
DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
669
                         help="Comma-separated list of disks"
670
                         " indices to act on (e.g. 0,2) (optional,"
671
                         " defaults to all disks)")
672

    
673
OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
674
                         help="Enforces a single-disk configuration using the"
675
                         " given disk size, in MiB unless a suffix is used",
676
                         default=None, type="unit", metavar="<size>")
677

    
678
IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
679
                                dest="ignore_consistency",
680
                                action="store_true", default=False,
681
                                help="Ignore the consistency of the disks on"
682
                                " the secondary")
683

    
684
NONLIVE_OPT = cli_option("--non-live", dest="live",
685
                         default=True, action="store_false",
686
                         help="Do a non-live migration (this usually means"
687
                         " freeze the instance, save the state, transfer and"
688
                         " only then resume running on the secondary node)")
689

    
690
NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
691
                                help="Target node and optional secondary node",
692
                                metavar="<pnode>[:<snode>]",
693
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)
694

    
695
NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
696
                           action="append", metavar="<node>",
697
                           help="Use only this node (can be used multiple"
698
                           " times, if not given defaults to all nodes)",
699
                           completion_suggest=OPT_COMPL_ONE_NODE)
700

    
701
SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
702
                             metavar="<node>",
703
                             completion_suggest=OPT_COMPL_ONE_NODE)
704

    
705
NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
706
                         action="store_false",
707
                         help="Don't start the instance after creation")
708

    
709
SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
710
                         action="store_true", default=False,
711
                         help="Show command instead of executing it")
712

    
713
CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
714
                         default=False, action="store_true",
715
                         help="Instead of performing the migration, try to"
716
                         " recover from a failed cleanup. This is safe"
717
                         " to run even if the instance is healthy, but it"
718
                         " will create extra replication traffic and "
719
                         " disrupt briefly the replication (like during the"
720
                         " migration")
721

    
722
STATIC_OPT = cli_option("-s", "--static", dest="static",
723
                        action="store_true", default=False,
724
                        help="Only show configuration data, not runtime data")
725

    
726
ALL_OPT = cli_option("--all", dest="show_all",
727
                     default=False, action="store_true",
728
                     help="Show info on all instances on the cluster."
729
                     " This can take a long time to run, use wisely")
730

    
731
SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
732
                           action="store_true", default=False,
733
                           help="Interactive OS reinstall, lists available"
734
                           " OS templates for selection")
735

    
736
IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
737
                                 action="store_true", default=False,
738
                                 help="Remove the instance from the cluster"
739
                                 " configuration even if there are failures"
740
                                 " during the removal process")
741

    
742
IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
743
                                        dest="ignore_remove_failures",
744
                                        action="store_true", default=False,
745
                                        help="Remove the instance from the"
746
                                        " cluster configuration even if there"
747
                                        " are failures during the removal"
748
                                        " process")
749

    
750
REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
751
                                 action="store_true", default=False,
752
                                 help="Remove the instance from the cluster")
753

    
754
NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
755
                               help="Specifies the new secondary node",
756
                               metavar="NODE", default=None,
757
                               completion_suggest=OPT_COMPL_ONE_NODE)
758

    
759
ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
760
                            default=False, action="store_true",
761
                            help="Replace the disk(s) on the primary"
762
                            " node (only for the drbd template)")
763

    
764
ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
765
                              default=False, action="store_true",
766
                              help="Replace the disk(s) on the secondary"
767
                              " node (only for the drbd template)")
768

    
769
AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
770
                              default=False, action="store_true",
771
                              help="Lock all nodes and auto-promote as needed"
772
                              " to MC status")
773

    
774
AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
775
                              default=False, action="store_true",
776
                              help="Automatically replace faulty disks"
777
                              " (only for the drbd template)")
778

    
779
IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
780
                             default=False, action="store_true",
781
                             help="Ignore current recorded size"
782
                             " (useful for forcing activation when"
783
                             " the recorded size is wrong)")
784

    
785
SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
786
                          metavar="<node>",
787
                          completion_suggest=OPT_COMPL_ONE_NODE)
788

    
789
SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
790
                         metavar="<dir>")
791

    
792
SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
793
                              help="Specify the secondary ip for the node",
794
                              metavar="ADDRESS", default=None)
795

    
796
READD_OPT = cli_option("--readd", dest="readd",
797
                       default=False, action="store_true",
798
                       help="Readd old node after replacing it")
799

    
800
NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
801
                                default=True, action="store_false",
802
                                help="Disable SSH key fingerprint checking")
803

    
804

    
805
MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
806
                    type="bool", default=None, metavar=_YORNO,
807
                    help="Set the master_candidate flag on the node")
808

    
809
OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
810
                         type="bool", default=None,
811
                         help="Set the offline flag on the node")
812

    
813
DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
814
                         type="bool", default=None,
815
                         help="Set the drained flag on the node")
816

    
817
ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
818
                             type="bool", default=None, metavar=_YORNO,
819
                             help="Set the allocatable flag on a volume")
820

    
821
NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
822
                               help="Disable support for lvm based instances"
823
                               " (cluster-wide)",
824
                               action="store_false", default=True)
825

    
826
ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
827
                            dest="enabled_hypervisors",
828
                            help="Comma-separated list of hypervisors",
829
                            type="string", default=None)
830

    
831
NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
832
                            type="keyval", default={},
833
                            help="NIC parameters")
834

    
835
CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
836
                         dest="candidate_pool_size", type="int",
837
                         help="Set the candidate pool size")
838

    
839
VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
840
                         help="Enables LVM and specifies the volume group"
841
                         " name (cluster-wide) for disk allocation [xenvg]",
842
                         metavar="VG", default=None)
843

    
844
YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
845
                          help="Destroy cluster", action="store_true")
846

    
847
NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
848
                          help="Skip node agreement check (dangerous)",
849
                          action="store_true", default=False)
850

    
851
MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
852
                            help="Specify the mac prefix for the instance IP"
853
                            " addresses, in the format XX:XX:XX",
854
                            metavar="PREFIX",
855
                            default=None)
856

    
857
MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
858
                               help="Specify the node interface (cluster-wide)"
859
                               " on which the master IP address will be added "
860
                               " [%s]" % constants.DEFAULT_BRIDGE,
861
                               metavar="NETDEV",
862
                               default=constants.DEFAULT_BRIDGE)
863

    
864
GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
865
                                help="Specify the default directory (cluster-"
866
                                "wide) for storing the file-based disks [%s]" %
867
                                constants.DEFAULT_FILE_STORAGE_DIR,
868
                                metavar="DIR",
869
                                default=constants.DEFAULT_FILE_STORAGE_DIR)
870

    
871
NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
872
                                   help="Don't modify /etc/hosts",
873
                                   action="store_false", default=True)
874

    
875
NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
876
                                    help="Don't initialize SSH keys",
877
                                    action="store_false", default=True)
878

    
879
ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
880
                             help="Enable parseable error messages",
881
                             action="store_true", default=False)
882

    
883
NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
884
                          help="Skip N+1 memory redundancy tests",
885
                          action="store_true", default=False)
886

    
887
REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
888
                             help="Type of reboot: soft/hard/full",
889
                             default=constants.INSTANCE_REBOOT_HARD,
890
                             metavar="<REBOOT>",
891
                             choices=list(constants.REBOOT_TYPES))
892

    
893
IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
894
                                    dest="ignore_secondaries",
895
                                    default=False, action="store_true",
896
                                    help="Ignore errors from secondaries")
897

    
898
NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
899
                            action="store_false", default=True,
900
                            help="Don't shutdown the instance (unsafe)")
901

    
902
TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
903
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
904
                         help="Maximum time to wait")
905

    
906
SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
907
                         dest="shutdown_timeout", type="int",
908
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
909
                         help="Maximum time to wait for instance shutdown")
910

    
911
EARLY_RELEASE_OPT = cli_option("--early-release",
912
                               dest="early_release", default=False,
913
                               action="store_true",
914
                               help="Release the locks on the secondary"
915
                               " node(s) early")
916

    
917
NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
918
                                  dest="new_cluster_cert",
919
                                  default=False, action="store_true",
920
                                  help="Generate a new cluster certificate")
921

    
922
RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
923
                           default=None,
924
                           help="File containing new RAPI certificate")
925

    
926
NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
927
                               default=None, action="store_true",
928
                               help=("Generate a new self-signed RAPI"
929
                                     " certificate"))
930

    
931
NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
932
                                    dest="new_confd_hmac_key",
933
                                    default=False, action="store_true",
934
                                    help=("Create a new HMAC key for %s" %
935
                                          constants.CONFD))
936

    
937
CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
938
                                       dest="cluster_domain_secret",
939
                                       default=None,
940
                                       help=("Load new new cluster domain"
941
                                             " secret from file"))
942

    
943
NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
944
                                           dest="new_cluster_domain_secret",
945
                                           default=False, action="store_true",
946
                                           help=("Create a new cluster domain"
947
                                                 " secret"))
948

    
949
USE_REPL_NET_OPT = cli_option("--use-replication-network",
950
                              dest="use_replication_network",
951
                              help="Whether to use the replication network"
952
                              " for talking to the nodes",
953
                              action="store_true", default=False)
954

    
955
MAINTAIN_NODE_HEALTH_OPT = \
956
    cli_option("--maintain-node-health", dest="maintain_node_health",
957
               metavar=_YORNO, default=None, type="bool",
958
               help="Configure the cluster to automatically maintain node"
959
               " health, by shutting down unknown instances, shutting down"
960
               " unknown DRBD devices, etc.")
961

    
962
IDENTIFY_DEFAULTS_OPT = \
963
    cli_option("--identify-defaults", dest="identify_defaults",
964
               default=False, action="store_true",
965
               help="Identify which saved instance parameters are equal to"
966
               " the current cluster defaults and set them as such, instead"
967
               " of marking them as overridden")
968

    
969
UIDPOOL_OPT = cli_option("--uid-pool", default=None,
970
                         action="store", dest="uid_pool",
971
                         help=("A list of user-ids or user-id"
972
                               " ranges separated by commas"))
973

    
974
ADD_UIDS_OPT = cli_option("--add-uids", default=None,
975
                          action="store", dest="add_uids",
976
                          help=("A list of user-ids or user-id"
977
                                " ranges separated by commas, to be"
978
                                " added to the user-id pool"))
979

    
980
REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
981
                             action="store", dest="remove_uids",
982
                             help=("A list of user-ids or user-id"
983
                                   " ranges separated by commas, to be"
984
                                   " removed from the user-id pool"))
985

    
986
ROMAN_OPT = cli_option("--roman",
987
                       dest="roman_integers", default=False,
988
                       action="store_true",
989
                       help="Use roman numbers for positive integers")
990

    
991

    
992

    
993
def _ParseArgs(argv, commands, aliases):
994
  """Parser for the command line arguments.
995

996
  This function parses the arguments and returns the function which
997
  must be executed together with its (modified) arguments.
998

999
  @param argv: the command line
1000
  @param commands: dictionary with special contents, see the design
1001
      doc for cmdline handling
1002
  @param aliases: dictionary with command aliases {'alias': 'target, ...}
1003

1004
  """
1005
  if len(argv) == 0:
1006
    binary = "<command>"
1007
  else:
1008
    binary = argv[0].split("/")[-1]
1009

    
1010
  if len(argv) > 1 and argv[1] == "--version":
1011
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
1012
    # Quit right away. That way we don't have to care about this special
1013
    # argument. optparse.py does it the same.
1014
    sys.exit(0)
1015

    
1016
  if len(argv) < 2 or not (argv[1] in commands or
1017
                           argv[1] in aliases):
1018
    # let's do a nice thing
1019
    sortedcmds = commands.keys()
1020
    sortedcmds.sort()
1021

    
1022
    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
1023
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
1024
    ToStdout("")
1025

    
1026
    # compute the max line length for cmd + usage
1027
    mlen = max([len(" %s" % cmd) for cmd in commands])
1028
    mlen = min(60, mlen) # should not get here...
1029

    
1030
    # and format a nice command list
1031
    ToStdout("Commands:")
1032
    for cmd in sortedcmds:
1033
      cmdstr = " %s" % (cmd,)
1034
      help_text = commands[cmd][4]
1035
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
1036
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
1037
      for line in help_lines:
1038
        ToStdout("%-*s   %s", mlen, "", line)
1039

    
1040
    ToStdout("")
1041

    
1042
    return None, None, None
1043

    
1044
  # get command, unalias it, and look it up in commands
1045
  cmd = argv.pop(1)
1046
  if cmd in aliases:
1047
    if cmd in commands:
1048
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
1049
                                   " command" % cmd)
1050

    
1051
    if aliases[cmd] not in commands:
1052
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
1053
                                   " command '%s'" % (cmd, aliases[cmd]))
1054

    
1055
    cmd = aliases[cmd]
1056

    
1057
  func, args_def, parser_opts, usage, description = commands[cmd]
1058
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
1059
                        description=description,
1060
                        formatter=TitledHelpFormatter(),
1061
                        usage="%%prog %s %s" % (cmd, usage))
1062
  parser.disable_interspersed_args()
1063
  options, args = parser.parse_args()
1064

    
1065
  if not _CheckArguments(cmd, args_def, args):
1066
    return None, None, None
1067

    
1068
  return func, options, args
1069

    
1070

    
1071
def _CheckArguments(cmd, args_def, args):
1072
  """Verifies the arguments using the argument definition.
1073

1074
  Algorithm:
1075

1076
    1. Abort with error if values specified by user but none expected.
1077

1078
    1. For each argument in definition
1079

1080
      1. Keep running count of minimum number of values (min_count)
1081
      1. Keep running count of maximum number of values (max_count)
1082
      1. If it has an unlimited number of values
1083

1084
        1. Abort with error if it's not the last argument in the definition
1085

1086
    1. If last argument has limited number of values
1087

1088
      1. Abort with error if number of values doesn't match or is too large
1089

1090
    1. Abort with error if user didn't pass enough values (min_count)
1091

1092
  """
1093
  if args and not args_def:
1094
    ToStderr("Error: Command %s expects no arguments", cmd)
1095
    return False
1096

    
1097
  min_count = None
1098
  max_count = None
1099
  check_max = None
1100

    
1101
  last_idx = len(args_def) - 1
1102

    
1103
  for idx, arg in enumerate(args_def):
1104
    if min_count is None:
1105
      min_count = arg.min
1106
    elif arg.min is not None:
1107
      min_count += arg.min
1108

    
1109
    if max_count is None:
1110
      max_count = arg.max
1111
    elif arg.max is not None:
1112
      max_count += arg.max
1113

    
1114
    if idx == last_idx:
1115
      check_max = (arg.max is not None)
1116

    
1117
    elif arg.max is None:
1118
      raise errors.ProgrammerError("Only the last argument can have max=None")
1119

    
1120
  if check_max:
1121
    # Command with exact number of arguments
1122
    if (min_count is not None and max_count is not None and
1123
        min_count == max_count and len(args) != min_count):
1124
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1125
      return False
1126

    
1127
    # Command with limited number of arguments
1128
    if max_count is not None and len(args) > max_count:
1129
      ToStderr("Error: Command %s expects only %d argument(s)",
1130
               cmd, max_count)
1131
      return False
1132

    
1133
  # Command with some required arguments
1134
  if min_count is not None and len(args) < min_count:
1135
    ToStderr("Error: Command %s expects at least %d argument(s)",
1136
             cmd, min_count)
1137
    return False
1138

    
1139
  return True
1140

    
1141

    
1142
def SplitNodeOption(value):
1143
  """Splits the value of a --node option.
1144

1145
  """
1146
  if value and ':' in value:
1147
    return value.split(':', 1)
1148
  else:
1149
    return (value, None)
1150

    
1151

    
1152
def CalculateOSNames(os_name, os_variants):
1153
  """Calculates all the names an OS can be called, according to its variants.
1154

1155
  @type os_name: string
1156
  @param os_name: base name of the os
1157
  @type os_variants: list or None
1158
  @param os_variants: list of supported variants
1159
  @rtype: list
1160
  @return: list of valid names
1161

1162
  """
1163
  if os_variants:
1164
    return ['%s+%s' % (os_name, v) for v in os_variants]
1165
  else:
1166
    return [os_name]
1167

    
1168

    
1169
def UsesRPC(fn):
1170
  def wrapper(*args, **kwargs):
1171
    rpc.Init()
1172
    try:
1173
      return fn(*args, **kwargs)
1174
    finally:
1175
      rpc.Shutdown()
1176
  return wrapper
1177

    
1178

    
1179
def AskUser(text, choices=None):
1180
  """Ask the user a question.
1181

1182
  @param text: the question to ask
1183

1184
  @param choices: list with elements tuples (input_char, return_value,
1185
      description); if not given, it will default to: [('y', True,
1186
      'Perform the operation'), ('n', False, 'Do no do the operation')];
1187
      note that the '?' char is reserved for help
1188

1189
  @return: one of the return values from the choices list; if input is
1190
      not possible (i.e. not running with a tty, we return the last
1191
      entry from the list
1192

1193
  """
1194
  if choices is None:
1195
    choices = [('y', True, 'Perform the operation'),
1196
               ('n', False, 'Do not perform the operation')]
1197
  if not choices or not isinstance(choices, list):
1198
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
1199
  for entry in choices:
1200
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
1201
      raise errors.ProgrammerError("Invalid choices element to AskUser")
1202

    
1203
  answer = choices[-1][1]
1204
  new_text = []
1205
  for line in text.splitlines():
1206
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
1207
  text = "\n".join(new_text)
1208
  try:
1209
    f = file("/dev/tty", "a+")
1210
  except IOError:
1211
    return answer
1212
  try:
1213
    chars = [entry[0] for entry in choices]
1214
    chars[-1] = "[%s]" % chars[-1]
1215
    chars.append('?')
1216
    maps = dict([(entry[0], entry[1]) for entry in choices])
1217
    while True:
1218
      f.write(text)
1219
      f.write('\n')
1220
      f.write("/".join(chars))
1221
      f.write(": ")
1222
      line = f.readline(2).strip().lower()
1223
      if line in maps:
1224
        answer = maps[line]
1225
        break
1226
      elif line == '?':
1227
        for entry in choices:
1228
          f.write(" %s - %s\n" % (entry[0], entry[2]))
1229
        f.write("\n")
1230
        continue
1231
  finally:
1232
    f.close()
1233
  return answer
1234

    
1235

    
1236
class JobSubmittedException(Exception):
1237
  """Job was submitted, client should exit.
1238

1239
  This exception has one argument, the ID of the job that was
1240
  submitted. The handler should print this ID.
1241

1242
  This is not an error, just a structured way to exit from clients.
1243

1244
  """
1245

    
1246

    
1247
def SendJob(ops, cl=None):
1248
  """Function to submit an opcode without waiting for the results.
1249

1250
  @type ops: list
1251
  @param ops: list of opcodes
1252
  @type cl: luxi.Client
1253
  @param cl: the luxi client to use for communicating with the master;
1254
             if None, a new client will be created
1255

1256
  """
1257
  if cl is None:
1258
    cl = GetClient()
1259

    
1260
  job_id = cl.SubmitJob(ops)
1261

    
1262
  return job_id
1263

    
1264

    
1265
def GenericPollJob(job_id, cbs, report_cbs):
1266
  """Generic job-polling function.
1267

1268
  @type job_id: number
1269
  @param job_id: Job ID
1270
  @type cbs: Instance of L{JobPollCbBase}
1271
  @param cbs: Data callbacks
1272
  @type report_cbs: Instance of L{JobPollReportCbBase}
1273
  @param report_cbs: Reporting callbacks
1274

1275
  """
1276
  prev_job_info = None
1277
  prev_logmsg_serial = None
1278

    
1279
  status = None
1280

    
1281
  while True:
1282
    result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
1283
                                      prev_logmsg_serial)
1284
    if not result:
1285
      # job not found, go away!
1286
      raise errors.JobLost("Job with id %s lost" % job_id)
1287

    
1288
    if result == constants.JOB_NOTCHANGED:
1289
      report_cbs.ReportNotChanged(job_id, status)
1290

    
1291
      # Wait again
1292
      continue
1293

    
1294
    # Split result, a tuple of (field values, log entries)
1295
    (job_info, log_entries) = result
1296
    (status, ) = job_info
1297

    
1298
    if log_entries:
1299
      for log_entry in log_entries:
1300
        (serial, timestamp, log_type, message) = log_entry
1301
        report_cbs.ReportLogMessage(job_id, serial, timestamp,
1302
                                    log_type, message)
1303
        prev_logmsg_serial = max(prev_logmsg_serial, serial)
1304

    
1305
    # TODO: Handle canceled and archived jobs
1306
    elif status in (constants.JOB_STATUS_SUCCESS,
1307
                    constants.JOB_STATUS_ERROR,
1308
                    constants.JOB_STATUS_CANCELING,
1309
                    constants.JOB_STATUS_CANCELED):
1310
      break
1311

    
1312
    prev_job_info = job_info
1313

    
1314
  jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
1315
  if not jobs:
1316
    raise errors.JobLost("Job with id %s lost" % job_id)
1317

    
1318
  status, opstatus, result = jobs[0]
1319

    
1320
  if status == constants.JOB_STATUS_SUCCESS:
1321
    return result
1322

    
1323
  if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
1324
    raise errors.OpExecError("Job was canceled")
1325

    
1326
  has_ok = False
1327
  for idx, (status, msg) in enumerate(zip(opstatus, result)):
1328
    if status == constants.OP_STATUS_SUCCESS:
1329
      has_ok = True
1330
    elif status == constants.OP_STATUS_ERROR:
1331
      errors.MaybeRaise(msg)
1332

    
1333
      if has_ok:
1334
        raise errors.OpExecError("partial failure (opcode %d): %s" %
1335
                                 (idx, msg))
1336

    
1337
      raise errors.OpExecError(str(msg))
1338

    
1339
  # default failure mode
1340
  raise errors.OpExecError(result)
1341

    
1342

    
1343
class JobPollCbBase:
1344
  """Base class for L{GenericPollJob} callbacks.
1345

1346
  """
1347
  def __init__(self):
1348
    """Initializes this class.
1349

1350
    """
1351

    
1352
  def WaitForJobChangeOnce(self, job_id, fields,
1353
                           prev_job_info, prev_log_serial):
1354
    """Waits for changes on a job.
1355

1356
    """
1357
    raise NotImplementedError()
1358

    
1359
  def QueryJobs(self, job_ids, fields):
1360
    """Returns the selected fields for the selected job IDs.
1361

1362
    @type job_ids: list of numbers
1363
    @param job_ids: Job IDs
1364
    @type fields: list of strings
1365
    @param fields: Fields
1366

1367
    """
1368
    raise NotImplementedError()
1369

    
1370

    
1371
class JobPollReportCbBase:
1372
  """Base class for L{GenericPollJob} reporting callbacks.
1373

1374
  """
1375
  def __init__(self):
1376
    """Initializes this class.
1377

1378
    """
1379

    
1380
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1381
    """Handles a log message.
1382

1383
    """
1384
    raise NotImplementedError()
1385

    
1386
  def ReportNotChanged(self, job_id, status):
1387
    """Called for if a job hasn't changed in a while.
1388

1389
    @type job_id: number
1390
    @param job_id: Job ID
1391
    @type status: string or None
1392
    @param status: Job status if available
1393

1394
    """
1395
    raise NotImplementedError()
1396

    
1397

    
1398
class _LuxiJobPollCb(JobPollCbBase):
1399
  def __init__(self, cl):
1400
    """Initializes this class.
1401

1402
    """
1403
    JobPollCbBase.__init__(self)
1404
    self.cl = cl
1405

    
1406
  def WaitForJobChangeOnce(self, job_id, fields,
1407
                           prev_job_info, prev_log_serial):
1408
    """Waits for changes on a job.
1409

1410
    """
1411
    return self.cl.WaitForJobChangeOnce(job_id, fields,
1412
                                        prev_job_info, prev_log_serial)
1413

    
1414
  def QueryJobs(self, job_ids, fields):
1415
    """Returns the selected fields for the selected job IDs.
1416

1417
    """
1418
    return self.cl.QueryJobs(job_ids, fields)
1419

    
1420

    
1421
class FeedbackFnJobPollReportCb(JobPollReportCbBase):
1422
  def __init__(self, feedback_fn):
1423
    """Initializes this class.
1424

1425
    """
1426
    JobPollReportCbBase.__init__(self)
1427

    
1428
    self.feedback_fn = feedback_fn
1429

    
1430
    assert callable(feedback_fn)
1431

    
1432
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1433
    """Handles a log message.
1434

1435
    """
1436
    self.feedback_fn((timestamp, log_type, log_msg))
1437

    
1438
  def ReportNotChanged(self, job_id, status):
1439
    """Called if a job hasn't changed in a while.
1440

1441
    """
1442
    # Ignore
1443

    
1444

    
1445
class StdioJobPollReportCb(JobPollReportCbBase):
1446
  def __init__(self):
1447
    """Initializes this class.
1448

1449
    """
1450
    JobPollReportCbBase.__init__(self)
1451

    
1452
    self.notified_queued = False
1453
    self.notified_waitlock = False
1454

    
1455
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1456
    """Handles a log message.
1457

1458
    """
1459
    ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
1460
             utils.SafeEncode(log_msg))
1461

    
1462
  def ReportNotChanged(self, job_id, status):
1463
    """Called if a job hasn't changed in a while.
1464

1465
    """
1466
    if status is None:
1467
      return
1468

    
1469
    if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
1470
      ToStderr("Job %s is waiting in queue", job_id)
1471
      self.notified_queued = True
1472

    
1473
    elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
1474
      ToStderr("Job %s is trying to acquire all necessary locks", job_id)
1475
      self.notified_waitlock = True
1476

    
1477

    
1478
def PollJob(job_id, cl=None, feedback_fn=None):
1479
  """Function to poll for the result of a job.
1480

1481
  @type job_id: job identifier
1482
  @param job_id: the job to poll for results
1483
  @type cl: luxi.Client
1484
  @param cl: the luxi client to use for communicating with the master;
1485
             if None, a new client will be created
1486

1487
  """
1488
  if cl is None:
1489
    cl = GetClient()
1490

    
1491
  if feedback_fn:
1492
    reporter = FeedbackFnJobPollReportCb(feedback_fn)
1493
  else:
1494
    reporter = StdioJobPollReportCb()
1495

    
1496
  return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
1497
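# Illustrative usage sketch (not part of this module; the job ID below is a
# made-up placeholder): polling a previously submitted job with a custom
# feedback function. The callback receives (timestamp, log_type, log_msg)
# tuples, as emitted by FeedbackFnJobPollReportCb above.
#
#   def _ShowLog(msg):
#     (_, _, log_msg) = msg
#     ToStdout("job log: %s", log_msg)
#
#   op_results = PollJob(1234, feedback_fn=_ShowLog)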

    
1498

    
1499
def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
1500
  """Legacy function to submit an opcode.
1501

1502
  This is just a simple wrapper over the construction of the processor
1503
  instance. It should be extended to better handle feedback and
1504
  interaction functions.
1505

1506
  """
1507
  if cl is None:
1508
    cl = GetClient()
1509

    
1510
  SetGenericOpcodeOpts([op], opts)
1511

    
1512
  job_id = SendJob([op], cl)
1513

    
1514
  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
1515

    
1516
  return op_results[0]
1517

    
1518

    
1519
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
1520
  """Wrapper around SubmitOpCode or SendJob.
1521

1522
  This function will decide, based on the 'opts' parameter, whether to
1523
  submit and wait for the result of the opcode (and return it), or
1524
  whether to just send the job and print its identifier. It is used in
1525
  order to simplify the implementation of the '--submit' option.
1526

1527
  It will also process the opcodes if we're sending them via SendJob
1528
  (otherwise SubmitOpCode does it).
1529

1530
  """
1531
  if opts and opts.submit_only:
1532
    job = [op]
1533
    SetGenericOpcodeOpts(job, opts)
1534
    job_id = SendJob(job, cl=cl)
1535
    raise JobSubmittedException(job_id)
1536
  else:
1537
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
1538
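# Illustrative sketch (hypothetical command function and opcode, not taken
# from this module): command implementations simply pass their options
# through; SubmitOrSend then decides, based on opts.submit_only (the
# '--submit' flag), whether to wait for the job or to raise
# JobSubmittedException, which GenericMain/FormatError turn into a
# "JobID: <id>" message with exit code 0.
#
#   def SomeCommand(opts, args):
#     op = opcodes.OpSomeOperation()  # hypothetical opcode
#     result = SubmitOrSend(op, opts)
#     return 0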

    
1539

    
1540
def SetGenericOpcodeOpts(opcode_list, options):
1541
  """Processor for generic options.
1542

1543
  This function updates the given opcodes based on generic command
1544
  line options (like debug, dry-run, etc.).
1545

1546
  @param opcode_list: list of opcodes
1547
  @param options: command line options or None
1548
  @return: None (in-place modification)
1549

1550
  """
1551
  if not options:
1552
    return
1553
  for op in opcode_list:
1554
    op.dry_run = options.dry_run
1555
    op.debug_level = options.debug
1556

    
1557

    
1558
def GetClient():
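  """Returns a luxi.Client instance connected to the master daemon.

  If the master daemon cannot be reached, ssconf is consulted in order to
  report a more helpful error (cluster not initialized, or command run on a
  non-master node) before re-raising the original error.

  """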
1559
  # TODO: Cache object?
1560
  try:
1561
    client = luxi.Client()
1562
  except luxi.NoMasterError:
1563
    ss = ssconf.SimpleStore()
1564

    
1565
    # Try to read ssconf file
1566
    try:
1567
      ss.GetMasterNode()
1568
    except errors.ConfigurationError:
1569
      raise errors.OpPrereqError("Cluster not initialized or this machine is"
1570
                                 " not part of a cluster")
1571

    
1572
    master, myself = ssconf.GetMasterAndMyself(ss=ss)
1573
    if master != myself:
1574
      raise errors.OpPrereqError("This is not the master node, please connect"
1575
                                 " to node '%s' and rerun the command" %
1576
                                 master)
1577
    raise
1578
  return client
1579

    
1580

    
1581
def FormatError(err):
1582
  """Return a formatted error message for a given error.
1583

1584
  This function takes an exception instance and returns a tuple
1585
  consisting of two values: first, the recommended exit code, and
1586
  second, a string describing the error message (not
1587
  newline-terminated).
1588

1589
  """
1590
  retcode = 1
1591
  obuf = StringIO()
1592
  msg = str(err)
1593
  if isinstance(err, errors.ConfigurationError):
1594
    txt = "Corrupt configuration file: %s" % msg
1595
    logging.error(txt)
1596
    obuf.write(txt + "\n")
1597
    obuf.write("Aborting.")
1598
    retcode = 2
1599
  elif isinstance(err, errors.HooksAbort):
1600
    obuf.write("Failure: hooks execution failed:\n")
1601
    for node, script, out in err.args[0]:
1602
      if out:
1603
        obuf.write("  node: %s, script: %s, output: %s\n" %
1604
                   (node, script, out))
1605
      else:
1606
        obuf.write("  node: %s, script: %s (no output)\n" %
1607
                   (node, script))
1608
  elif isinstance(err, errors.HooksFailure):
1609
    obuf.write("Failure: hooks general failure: %s" % msg)
1610
  elif isinstance(err, errors.ResolverError):
1611
    this_host = utils.HostInfo.SysName()
1612
    if err.args[0] == this_host:
1613
      msg = "Failure: can't resolve my own hostname ('%s')"
1614
    else:
1615
      msg = "Failure: can't resolve hostname '%s'"
1616
    obuf.write(msg % err.args[0])
1617
  elif isinstance(err, errors.OpPrereqError):
1618
    if len(err.args) == 2:
1619
      obuf.write("Failure: prerequisites not met for this"
1620
               " operation:\nerror type: %s, error details:\n%s" %
1621
                 (err.args[1], err.args[0]))
1622
    else:
1623
      obuf.write("Failure: prerequisites not met for this"
1624
                 " operation:\n%s" % msg)
1625
  elif isinstance(err, errors.OpExecError):
1626
    obuf.write("Failure: command execution error:\n%s" % msg)
1627
  elif isinstance(err, errors.TagError):
1628
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
1629
  elif isinstance(err, errors.JobQueueDrainError):
1630
    obuf.write("Failure: the job queue is marked for drain and doesn't"
1631
               " accept new requests\n")
1632
  elif isinstance(err, errors.JobQueueFull):
1633
    obuf.write("Failure: the job queue is full and doesn't accept new"
1634
               " job submissions until old jobs are archived\n")
1635
  elif isinstance(err, errors.TypeEnforcementError):
1636
    obuf.write("Parameter Error: %s" % msg)
1637
  elif isinstance(err, errors.ParameterError):
1638
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
1639
  elif isinstance(err, luxi.NoMasterError):
1640
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
1641
               " and listening for connections?")
1642
  elif isinstance(err, luxi.TimeoutError):
1643
    obuf.write("Timeout while talking to the master daemon. Error:\n"
1644
               "%s" % msg)
1645
  elif isinstance(err, luxi.ProtocolError):
1646
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
1647
               "%s" % msg)
1648
  elif isinstance(err, errors.GenericError):
1649
    obuf.write("Unhandled Ganeti error: %s" % msg)
1650
  elif isinstance(err, JobSubmittedException):
1651
    obuf.write("JobID: %s\n" % err.args[0])
1652
    retcode = 0
1653
  else:
1654
    obuf.write("Unhandled exception: %s" % msg)
1655
  return retcode, obuf.getvalue().rstrip('\n')
1656
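# Illustrative sketch (mirrors the use in GenericMain below): callers are
# expected to unpack the (exit code, message) pair and print the message
# themselves.
#
#   try:
#     result = func(options, args)
#   except errors.GenericError, err:
#     result, err_msg = FormatError(err)
#     ToStderr(err_msg)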

    
1657

    
1658
def GenericMain(commands, override=None, aliases=None):
1659
  """Generic main function for all the gnt-* commands.
1660

1661
  Arguments:
1662
    - commands: a dictionary with a special structure, see the design doc
1663
                for command line handling.
1664
    - override: if not None, we expect a dictionary with keys that will
1665
                override command line options; this can be used to pass
1666
                options from the scripts to generic functions
1667
    - aliases: dictionary with command aliases {'alias': 'target', ...}
1668

1669
  """
1670
  # save the program name and the entire command line for later logging
1671
  if sys.argv:
1672
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1673
    if len(sys.argv) >= 2:
1674
      binary += " " + sys.argv[1]
1675
      old_cmdline = " ".join(sys.argv[2:])
1676
    else:
1677
      old_cmdline = ""
1678
  else:
1679
    binary = "<unknown program>"
1680
    old_cmdline = ""
1681

    
1682
  if aliases is None:
1683
    aliases = {}
1684

    
1685
  try:
1686
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
1687
  except errors.ParameterError, err:
1688
    result, err_msg = FormatError(err)
1689
    ToStderr(err_msg)
1690
    return 1
1691

    
1692
  if func is None: # parse error
1693
    return 1
1694

    
1695
  if override is not None:
1696
    for key, val in override.iteritems():
1697
      setattr(options, key, val)
1698

    
1699
  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1700
                     stderr_logging=True, program=binary)
1701

    
1702
  if old_cmdline:
1703
    logging.info("run with arguments '%s'", old_cmdline)
1704
  else:
1705
    logging.info("run with no arguments")
1706

    
1707
  try:
1708
    result = func(options, args)
1709
  except (errors.GenericError, luxi.ProtocolError,
1710
          JobSubmittedException), err:
1711
    result, err_msg = FormatError(err)
1712
    logging.exception("Error during command processing")
1713
    ToStderr(err_msg)
1714

    
1715
  return result
1716
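# Illustrative sketch (assumed gnt-* script structure; the exact layout of
# each "commands" entry follows the command line handling design doc and is
# only shown schematically here, with made-up names):
#
#   commands = {
#     "list": (ListFunc, ARGS_NONE, [DEBUG_OPT], "", "Lists the things"),
#     }
#
#   if __name__ == "__main__":
#     sys.exit(GenericMain(commands))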

    
1717

    
1718
def GenericInstanceCreate(mode, opts, args):
1719
  """Add an instance to the cluster via either creation or import.
1720

1721
  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
1722
  @param opts: the command line options selected by the user
1723
  @type args: list
1724
  @param args: should contain only one element, the new instance name
1725
  @rtype: int
1726
  @return: the desired exit code
1727

1728
  """
1729
  instance = args[0]
1730

    
1731
  (pnode, snode) = SplitNodeOption(opts.node)
1732

    
1733
  hypervisor = None
1734
  hvparams = {}
1735
  if opts.hypervisor:
1736
    hypervisor, hvparams = opts.hypervisor
1737

    
1738
  if opts.nics:
1739
    try:
1740
      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
1741
    except ValueError, err:
1742
      raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
1743
    nics = [{}] * nic_max
1744
    for nidx, ndict in opts.nics:
1745
      nidx = int(nidx)
1746
      if not isinstance(ndict, dict):
1747
        msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
1748
        raise errors.OpPrereqError(msg)
1749
      nics[nidx] = ndict
1750
  elif opts.no_nics:
1751
    # no nics
1752
    nics = []
1753
  elif mode == constants.INSTANCE_CREATE:
1754
    # default of one nic, all auto
1755
    nics = [{}]
1756
  else:
1757
    # mode == import
1758
    nics = []
1759

    
1760
  if opts.disk_template == constants.DT_DISKLESS:
1761
    if opts.disks or opts.sd_size is not None:
1762
      raise errors.OpPrereqError("Diskless instance but disk"
1763
                                 " information passed")
1764
    disks = []
1765
  else:
1766
    if (not opts.disks and not opts.sd_size
1767
        and mode == constants.INSTANCE_CREATE):
1768
      raise errors.OpPrereqError("No disk information specified")
1769
    if opts.disks and opts.sd_size is not None:
1770
      raise errors.OpPrereqError("Please use either the '--disk' or"
1771
                                 " '-s' option")
1772
    if opts.sd_size is not None:
1773
      opts.disks = [(0, {"size": opts.sd_size})]
1774

    
1775
    if opts.disks:
1776
      try:
1777
        disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
1778
      except ValueError, err:
1779
        raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
1780
      disks = [{}] * disk_max
1781
    else:
1782
      disks = []
1783
    for didx, ddict in opts.disks:
1784
      didx = int(didx)
1785
      if not isinstance(ddict, dict):
1786
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
1787
        raise errors.OpPrereqError(msg)
1788
      elif "size" in ddict:
1789
        if "adopt" in ddict:
1790
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
1791
                                     " (disk %d)" % didx)
1792
        try:
1793
          ddict["size"] = utils.ParseUnit(ddict["size"])
1794
        except ValueError, err:
1795
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
1796
                                     (didx, err))
1797
      elif "adopt" in ddict:
1798
        if mode == constants.INSTANCE_IMPORT:
1799
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
1800
                                     " import")
1801
        ddict["size"] = 0
1802
      else:
1803
        raise errors.OpPrereqError("Missing size or adoption source for"
1804
                                   " disk %d" % didx)
1805
      disks[didx] = ddict
1806

    
1807
  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
1808
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
1809

    
1810
  if mode == constants.INSTANCE_CREATE:
1811
    start = opts.start
1812
    os_type = opts.os
1813
    src_node = None
1814
    src_path = None
1815
    no_install = opts.no_install
1816
    identify_defaults = False
1817
  elif mode == constants.INSTANCE_IMPORT:
1818
    start = False
1819
    os_type = None
1820
    src_node = opts.src_node
1821
    src_path = opts.src_dir
1822
    no_install = None
1823
    identify_defaults = opts.identify_defaults
1824
  else:
1825
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)
1826

    
1827
  op = opcodes.OpCreateInstance(instance_name=instance,
1828
                                disks=disks,
1829
                                disk_template=opts.disk_template,
1830
                                nics=nics,
1831
                                pnode=pnode, snode=snode,
1832
                                ip_check=opts.ip_check,
1833
                                name_check=opts.name_check,
1834
                                wait_for_sync=opts.wait_for_sync,
1835
                                file_storage_dir=opts.file_storage_dir,
1836
                                file_driver=opts.file_driver,
1837
                                iallocator=opts.iallocator,
1838
                                hypervisor=hypervisor,
1839
                                hvparams=hvparams,
1840
                                beparams=opts.beparams,
1841
                                osparams=opts.osparams,
1842
                                mode=mode,
1843
                                start=start,
1844
                                os_type=os_type,
1845
                                src_node=src_node,
1846
                                src_path=src_path,
1847
                                no_install=no_install,
1848
                                identify_defaults=identify_defaults)
1849

    
1850
  SubmitOrSend(op, opts)
1851
  return 0
1852
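# Illustrative note on the option shapes this function expects (derived from
# the checks above; the concrete values are made up):
#
#   opts.node  = "node1.example.com:node2.example.com"  # pnode[:snode]
#   opts.disks = [(0, {"size": "10g"}), (1, {"adopt": "some-lv"})]
#   opts.nics  = [(0, {"mode": "bridged", "link": "xen-br0"})]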

    
1853

    
1854
class _RunWhileClusterStoppedHelper:
1855
  """Helper class for L{RunWhileClusterStopped} to simplify state management
1856

1857
  """
1858
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
1859
    """Initializes this class.
1860

1861
    @type feedback_fn: callable
1862
    @param feedback_fn: Feedback function
1863
    @type cluster_name: string
1864
    @param cluster_name: Cluster name
1865
    @type master_node: string
1866
    @param master_node: Master node name
1867
    @type online_nodes: list
1868
    @param online_nodes: List of names of online nodes
1869

1870
    """
1871
    self.feedback_fn = feedback_fn
1872
    self.cluster_name = cluster_name
1873
    self.master_node = master_node
1874
    self.online_nodes = online_nodes
1875

    
1876
    self.ssh = ssh.SshRunner(self.cluster_name)
1877

    
1878
    self.nonmaster_nodes = [name for name in online_nodes
1879
                            if name != master_node]
1880

    
1881
    assert self.master_node not in self.nonmaster_nodes
1882

    
1883
  def _RunCmd(self, node_name, cmd):
1884
    """Runs a command on the local or a remote machine.
1885

1886
    @type node_name: string
1887
    @param node_name: Machine name
1888
    @type cmd: list
1889
    @param cmd: Command
1890

1891
    """
1892
    if node_name is None or node_name == self.master_node:
1893
      # No need to use SSH
1894
      result = utils.RunCmd(cmd)
1895
    else:
1896
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
1897

    
1898
    if result.failed:
1899
      errmsg = ["Failed to run command %s" % result.cmd]
1900
      if node_name:
1901
        errmsg.append("on node %s" % node_name)
1902
      errmsg.append(": exitcode %s and error %s" %
1903
                    (result.exit_code, result.output))
1904
      raise errors.OpExecError(" ".join(errmsg))
1905

    
1906
  def Call(self, fn, *args):
1907
    """Call function while all daemons are stopped.
1908

1909
    @type fn: callable
1910
    @param fn: Function to be called
1911

1912
    """
1913
    # Pause watcher by acquiring an exclusive lock on watcher state file
1914
    self.feedback_fn("Blocking watcher")
1915
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
1916
    try:
1917
      # TODO: Currently, this just blocks. There's no timeout.
1918
      # TODO: Should it be a shared lock?
1919
      watcher_block.Exclusive(blocking=True)
1920

    
1921
      # Stop master daemons, so that no new jobs can come in and all running
1922
      # ones are finished
1923
      self.feedback_fn("Stopping master daemons")
1924
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
1925
      try:
1926
        # Stop daemons on all nodes
1927
        for node_name in self.online_nodes:
1928
          self.feedback_fn("Stopping daemons on %s" % node_name)
1929
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
1930

    
1931
        # All daemons are shut down now
1932
        try:
1933
          return fn(self, *args)
1934
        except Exception, err:
1935
          _, errmsg = FormatError(err)
1936
          logging.exception("Caught exception")
1937
          self.feedback_fn(errmsg)
1938
          raise
1939
      finally:
1940
        # Start cluster again, master node last
1941
        for node_name in self.nonmaster_nodes + [self.master_node]:
1942
          self.feedback_fn("Starting daemons on %s" % node_name)
1943
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
1944
    finally:
1945
      # Resume watcher
1946
      watcher_block.Close()
1947

    
1948

    
1949
def RunWhileClusterStopped(feedback_fn, fn, *args):
1950
  """Calls a function while all cluster daemons are stopped.
1951

1952
  @type feedback_fn: callable
1953
  @param feedback_fn: Feedback function
1954
  @type fn: callable
1955
  @param fn: Function to be called when daemons are stopped
1956

1957
  """
1958
  feedback_fn("Gathering cluster information")
1959

    
1960
  # This ensures we're running on the master daemon
1961
  cl = GetClient()
1962

    
1963
  (cluster_name, master_node) = \
1964
    cl.QueryConfigValues(["cluster_name", "master_node"])
1965

    
1966
  online_nodes = GetOnlineNodes([], cl=cl)
1967

    
1968
  # Don't keep a reference to the client. The master daemon will go away.
1969
  del cl
1970

    
1971
  assert master_node in online_nodes
1972

    
1973
  return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
1974
                                       online_nodes).Call(fn, *args)
1975
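# Illustrative sketch (made-up callback; do not run casually, this stops all
# cluster daemons): the callable receives the helper instance as its first
# argument and can use its feedback_fn while the daemons are down.
#
#   def _DoMaintenance(ctx, flag):
#     ctx.feedback_fn("All daemons are stopped, flag=%s" % flag)
#     return 0
#
#   RunWhileClusterStopped(ToStdout, _DoMaintenance, True)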

    
1976

    
1977
def GenerateTable(headers, fields, separator, data,
1978
                  numfields=None, unitfields=None,
1979
                  units=None):
1980
  """Prints a table with headers and different fields.
1981

1982
  @type headers: dict
1983
  @param headers: dictionary mapping field names to headers for
1984
      the table
1985
  @type fields: list
1986
  @param fields: the field names corresponding to each row in
1987
      the data field
1988
  @param separator: the separator to be used; if this is None,
1989
      the default 'smart' algorithm is used which computes optimal
1990
      field width, otherwise just the separator is used between
1991
      each field
1992
  @type data: list
1993
  @param data: a list of lists, each sublist being one row to be output
1994
  @type numfields: list
1995
  @param numfields: a list with the fields that hold numeric
1996
      values and thus should be right-aligned
1997
  @type unitfields: list
1998
  @param unitfields: a list with the fields that hold numeric
1999
      values that should be formatted with the units field
2000
  @type units: string or None
2001
  @param units: the units we should use for formatting, or None for
2002
      automatic choice (human-readable for non-separator usage, otherwise
2003
      megabytes); this is a one-letter string
2004

2005
  """
2006
  if units is None:
2007
    if separator:
2008
      units = "m"
2009
    else:
2010
      units = "h"
2011

    
2012
  if numfields is None:
2013
    numfields = []
2014
  if unitfields is None:
2015
    unitfields = []
2016

    
2017
  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
2018
  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142
2019

    
2020
  format_fields = []
2021
  for field in fields:
2022
    if headers and field not in headers:
2023
      # TODO: handle better unknown fields (either revert to old
2024
      # style of raising exception, or deal more intelligently with
2025
      # variable fields)
2026
      headers[field] = field
2027
    if separator is not None:
2028
      format_fields.append("%s")
2029
    elif numfields.Matches(field):
2030
      format_fields.append("%*s")
2031
    else:
2032
      format_fields.append("%-*s")
2033

    
2034
  if separator is None:
2035
    mlens = [0 for name in fields]
2036
    format_str = ' '.join(format_fields)
2037
  else:
2038
    format_str = separator.replace("%", "%%").join(format_fields)
2039

    
2040
  for row in data:
2041
    if row is None:
2042
      continue
2043
    for idx, val in enumerate(row):
2044
      if unitfields.Matches(fields[idx]):
2045
        try:
2046
          val = int(val)
2047
        except (TypeError, ValueError):
2048
          pass
2049
        else:
2050
          val = row[idx] = utils.FormatUnit(val, units)
2051
      val = row[idx] = str(val)
2052
      if separator is None:
2053
        mlens[idx] = max(mlens[idx], len(val))
2054

    
2055
  result = []
2056
  if headers:
2057
    args = []
2058
    for idx, name in enumerate(fields):
2059
      hdr = headers[name]
2060
      if separator is None:
2061
        mlens[idx] = max(mlens[idx], len(hdr))
2062
        args.append(mlens[idx])
2063
      args.append(hdr)
2064
    result.append(format_str % tuple(args))
2065

    
2066
  if separator is None:
2067
    assert len(mlens) == len(fields)
2068

    
2069
    if fields and not numfields.Matches(fields[-1]):
2070
      mlens[-1] = 0
2071

    
2072
  for line in data:
2073
    args = []
2074
    if line is None:
2075
      line = ['-' for _ in fields]
2076
    for idx in range(len(fields)):
2077
      if separator is None:
2078
        args.append(mlens[idx])
2079
      args.append(line[idx])
2080
    result.append(format_str % tuple(args))
2081

    
2082
  return result
2083
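# Illustrative sketch (made-up headers and data): with separator=None the
# "smart" column widths are used and unit fields are rendered human-readable.
#
#   headers = {"name": "Node", "dfree": "DiskFree"}
#   fields = ["name", "dfree"]
#   data = [["node1.example.com", 102400], ["node2.example.com", 51200]]
#   for line in GenerateTable(headers, fields, None, data,
#                             numfields=["dfree"], unitfields=["dfree"]):
#     ToStdout("%s", line)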

    
2084

    
2085
def FormatTimestamp(ts):
2086
  """Formats a given timestamp.
2087

2088
  @type ts: timestamp
2089
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
2090

2091
  @rtype: string
2092
  @return: a string with the formatted timestamp
2093

2094
  """
2095
  if not isinstance(ts, (tuple, list)) or len(ts) != 2:
2096
    return '?'
2097
  sec, usec = ts
2098
  return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
2099
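# Illustrative examples (arbitrary timestamp; the exact output depends on the
# local timezone):
#
#   FormatTimestamp((1234567890, 42))  # -> "2009-02-13 23:31:30.000042"
#   FormatTimestamp("bogus")           # -> "?"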

    
2100

    
2101
def ParseTimespec(value):
2102
  """Parse a time specification.
2103

2104
  The following suffixes will be recognized:
2105

2106
    - s: seconds
2107
    - m: minutes
2108
    - h: hours
2109
    - d: days
2110
    - w: weeks
2111

2112
  Without any suffix, the value will be taken to be in seconds.
2113

2114
  """
2115
  value = str(value)
2116
  if not value:
2117
    raise errors.OpPrereqError("Empty time specification passed")
2118
  suffix_map = {
2119
    's': 1,
2120
    'm': 60,
2121
    'h': 3600,
2122
    'd': 86400,
2123
    'w': 604800,
2124
    }
2125
  if value[-1] not in suffix_map:
2126
    try:
2127
      value = int(value)
2128
    except (TypeError, ValueError):
2129
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2130
  else:
2131
    multiplier = suffix_map[value[-1]]
2132
    value = value[:-1]
2133
    if not value: # no data left after stripping the suffix
2134
      raise errors.OpPrereqError("Invalid time specification (only"
2135
                                 " suffix passed)")
2136
    try:
2137
      value = int(value) * multiplier
2138
    except (TypeError, ValueError):
2139
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2140
  return value
2141
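# Illustrative examples (arbitrary values):
#
#   ParseTimespec("30")   # -> 30 (no suffix means seconds)
#   ParseTimespec("45m")  # -> 2700
#   ParseTimespec("2h")   # -> 7200
#   ParseTimespec("1w")   # -> 604800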

    
2142

    
2143
def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
2144
                   filter_master=False):
2145
  """Returns the names of online nodes.
2146

2147
  This function will also log a note on stderr with the names of
2148
  the offline nodes that are skipped.
2149

2150
  @param nodes: if not empty, use only this subset of nodes (minus the
2151
      offline ones)
2152
  @param cl: if not None, luxi client to use
2153
  @type nowarn: boolean
2154
  @param nowarn: by default, this function will output a note with the
2155
      offline nodes that are skipped; if this parameter is True the
2156
      note is not displayed
2157
  @type secondary_ips: boolean
2158
  @param secondary_ips: if True, return the secondary IPs instead of the
2159
      names, useful for doing network traffic over the replication interface
2160
      (if any)
2161
  @type filter_master: boolean
2162
  @param filter_master: if True, do not return the master node in the list
2163
      (useful in coordination with secondary_ips where we cannot check our
2164
      node name against the list)
2165

2166
  """
2167
  if cl is None:
2168
    cl = GetClient()
2169

    
2170
  if secondary_ips:
2171
    name_idx = 2
2172
  else:
2173
    name_idx = 0
2174

    
2175
  if filter_master:
2176
    master_node = cl.QueryConfigValues(["master_node"])[0]
2177
    filter_fn = lambda x: x != master_node
2178
  else:
2179
    filter_fn = lambda _: True
2180

    
2181
  result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
2182
                         use_locking=False)
2183
  offline = [row[0] for row in result if row[1]]
2184
  if offline and not nowarn:
2185
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
2186
  return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
2187

    
2188

    
2189
def _ToStream(stream, txt, *args):
2190
  """Write a message to a stream, bypassing the logging system
2191

2192
  @type stream: file object
2193
  @param stream: the file to which we should write
2194
  @type txt: str
2195
  @param txt: the message
2196

2197
  """
2198
  if args:
2199
    args = tuple(args)
2200
    stream.write(txt % args)
2201
  else:
2202
    stream.write(txt)
2203
  stream.write('\n')
2204
  stream.flush()
2205

    
2206

    
2207
def ToStdout(txt, *args):
2208
  """Write a message to stdout only, bypassing the logging system
2209

2210
  This is just a wrapper over _ToStream.
2211

2212
  @type txt: str
2213
  @param txt: the message
2214

2215
  """
2216
  _ToStream(sys.stdout, txt, *args)
2217

    
2218

    
2219
def ToStderr(txt, *args):
2220
  """Write a message to stderr only, bypassing the logging system
2221

2222
  This is just a wrapper over _ToStream.
2223

2224
  @type txt: str
2225
  @param txt: the message
2226

2227
  """
2228
  _ToStream(sys.stderr, txt, *args)
2229

    
2230

    
2231
class JobExecutor(object):
2232
  """Class which manages the submission and execution of multiple jobs.
2233

2234
  Note that instances of this class should not be reused between
2235
  GetResults() calls.
2236

2237
  """
2238
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
2239
    self.queue = []
2240
    if cl is None:
2241
      cl = GetClient()
2242
    self.cl = cl
2243
    self.verbose = verbose
2244
    self.jobs = []
2245
    self.opts = opts
2246
    self.feedback_fn = feedback_fn
2247

    
2248
  def QueueJob(self, name, *ops):
2249
    """Record a job for later submit.
2250

2251
    @type name: string
2252
    @param name: a description of the job, will be used in GetResults()
2253
    """
2254
    SetGenericOpcodeOpts(ops, self.opts)
2255
    self.queue.append((name, ops))
2256

    
2257
  def SubmitPending(self, each=False):
2258
    """Submit all pending jobs.
2259

2260
    """
2261
    if each:
2262
      results = []
2263
      for row in self.queue:
2264
        # Unlike SubmitManyJobs, SubmitJob returns just the job ID and raises
2265
        # an exception if the submission fails, so failures are noticed anyway.
2266
        results.append([True, self.cl.SubmitJob(row[1])])
2267
    else:
2268
      results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
2269
    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
2270
                                                            self.queue)):
2271
      self.jobs.append((idx, status, data, name))
2272

    
2273
  def _ChooseJob(self):
2274
    """Choose a non-waiting/queued job to poll next.
2275

2276
    """
2277
    assert self.jobs, "_ChooseJob called with empty job list"
2278

    
2279
    result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
2280
    assert result
2281

    
2282
    for job_data, status in zip(self.jobs, result):
2283
      if status[0] in (constants.JOB_STATUS_QUEUED,
2284
                       constants.JOB_STATUS_WAITLOCK,
2285
                       constants.JOB_STATUS_CANCELING):
2286
        # job is still waiting
2287
        continue
2288
      # good candidate found
2289
      self.jobs.remove(job_data)
2290
      return job_data
2291

    
2292
    # no job found
2293
    return self.jobs.pop(0)
2294

    
2295
  def GetResults(self):
2296
    """Wait for and return the results of all jobs.
2297

2298
    @rtype: list
2299
    @return: list of tuples (success, job results), in the same order
2300
        as the submitted jobs; if a job has failed, instead of the result
2301
        there will be the error message
2302

2303
    """
2304
    if not self.jobs:
2305
      self.SubmitPending()
2306
    results = []
2307
    if self.verbose:
2308
      ok_jobs = [row[2] for row in self.jobs if row[1]]
2309
      if ok_jobs:
2310
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
2311

    
2312
    # first, remove any non-submitted jobs
2313
    self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
2314
    for idx, _, jid, name in failures:
2315
      ToStderr("Failed to submit job for %s: %s", name, jid)
2316
      results.append((idx, False, jid))
2317

    
2318
    while self.jobs:
2319
      (idx, _, jid, name) = self._ChooseJob()
2320
      ToStdout("Waiting for job %s for %s...", jid, name)
2321
      try:
2322
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
2323
        success = True
2324
      except (errors.GenericError, luxi.ProtocolError), err:
2325
        _, job_result = FormatError(err)
2326
        success = False
2327
        # the error message will always be shown, verbose or not
2328
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
2329

    
2330
      results.append((idx, success, job_result))
2331

    
2332
    # sort based on the index, then drop it
2333
    results.sort()
2334
    results = [i[1:] for i in results]
2335

    
2336
    return results
2337

    
2338
  def WaitOrShow(self, wait):
2339
    """Wait for job results or only print the job IDs.
2340

2341
    @type wait: boolean
2342
    @param wait: whether to wait or not
2343

2344
    """
2345
    if wait:
2346
      return self.GetResults()
2347
    else:
2348
      if not self.jobs:
2349
        self.SubmitPending()
2350
      for _, status, result, name in self.jobs:
2351
        if status:
2352
          ToStdout("%s: %s", result, name)
2353
        else:
2354
          ToStderr("Failure for %s: %s", name, result)