root / lib / cli.py @ 53a8a54d
#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module dealing with command line parsing"""


import sys
import textwrap
import os.path
import time
import logging
from cStringIO import StringIO

from ganeti import utils
from ganeti import errors
from ganeti import constants
from ganeti import opcodes
from ganeti import luxi
from ganeti import ssconf
from ganeti import rpc
from ganeti import ssh
from ganeti import compat

from optparse import (OptionParser, TitledHelpFormatter,
                      Option, OptionValueError)

__all__ = [
47
  # Command line options
48
  "ADD_UIDS_OPT",
49
  "ALLOCATABLE_OPT",
50
  "ALL_OPT",
51
  "AUTO_PROMOTE_OPT",
52
  "AUTO_REPLACE_OPT",
53
  "BACKEND_OPT",
54
  "CLEANUP_OPT",
55
  "CLUSTER_DOMAIN_SECRET_OPT",
56
  "CONFIRM_OPT",
57
  "CP_SIZE_OPT",
58
  "DEBUG_OPT",
59
  "DEBUG_SIMERR_OPT",
60
  "DISKIDX_OPT",
61
  "DISK_OPT",
62
  "DISK_TEMPLATE_OPT",
63
  "DRAINED_OPT",
64
  "DRBD_HELPER_OPT",
65
  "EARLY_RELEASE_OPT",
66
  "ENABLED_HV_OPT",
67
  "ERROR_CODES_OPT",
68
  "FIELDS_OPT",
69
  "FILESTORE_DIR_OPT",
70
  "FILESTORE_DRIVER_OPT",
71
  "FORCE_OPT",
72
  "FORCE_VARIANT_OPT",
73
  "GLOBAL_FILEDIR_OPT",
74
  "HVLIST_OPT",
75
  "HVOPTS_OPT",
76
  "HYPERVISOR_OPT",
77
  "IALLOCATOR_OPT",
78
  "IDENTIFY_DEFAULTS_OPT",
79
  "IGNORE_CONSIST_OPT",
80
  "IGNORE_FAILURES_OPT",
81
  "IGNORE_REMOVE_FAILURES_OPT",
82
  "IGNORE_SECONDARIES_OPT",
83
  "IGNORE_SIZE_OPT",
84
  "MAC_PREFIX_OPT",
85
  "MAINTAIN_NODE_HEALTH_OPT",
86
  "MASTER_NETDEV_OPT",
87
  "MC_OPT",
88
  "NET_OPT",
89
  "NEW_CLUSTER_CERT_OPT",
90
  "NEW_CLUSTER_DOMAIN_SECRET_OPT",
91
  "NEW_CONFD_HMAC_KEY_OPT",
92
  "NEW_RAPI_CERT_OPT",
93
  "NEW_SECONDARY_OPT",
94
  "NIC_PARAMS_OPT",
95
  "NODE_LIST_OPT",
96
  "NODE_PLACEMENT_OPT",
97
  "NODRBD_STORAGE_OPT",
98
  "NOHDR_OPT",
99
  "NOIPCHECK_OPT",
100
  "NO_INSTALL_OPT",
101
  "NONAMECHECK_OPT",
102
  "NOLVM_STORAGE_OPT",
103
  "NOMODIFY_ETCHOSTS_OPT",
104
  "NOMODIFY_SSH_SETUP_OPT",
105
  "NONICS_OPT",
106
  "NONLIVE_OPT",
107
  "NONPLUS1_OPT",
108
  "NOSHUTDOWN_OPT",
109
  "NOSTART_OPT",
110
  "NOSSH_KEYCHECK_OPT",
111
  "NOVOTING_OPT",
112
  "NWSYNC_OPT",
113
  "ON_PRIMARY_OPT",
114
  "ON_SECONDARY_OPT",
115
  "OFFLINE_OPT",
116
  "OSPARAMS_OPT",
117
  "OS_OPT",
118
  "OS_SIZE_OPT",
119
  "RAPI_CERT_OPT",
120
  "READD_OPT",
121
  "REBOOT_TYPE_OPT",
122
  "REMOVE_INSTANCE_OPT",
123
  "REMOVE_UIDS_OPT",
124
  "ROMAN_OPT",
125
  "SECONDARY_IP_OPT",
126
  "SELECT_OS_OPT",
127
  "SEP_OPT",
128
  "SHOWCMD_OPT",
129
  "SHUTDOWN_TIMEOUT_OPT",
130
  "SINGLE_NODE_OPT",
131
  "SRC_DIR_OPT",
132
  "SRC_NODE_OPT",
133
  "SUBMIT_OPT",
134
  "STATIC_OPT",
135
  "SYNC_OPT",
136
  "TAG_SRC_OPT",
137
  "TIMEOUT_OPT",
138
  "UIDPOOL_OPT",
139
  "USEUNITS_OPT",
140
  "USE_REPL_NET_OPT",
141
  "VERBOSE_OPT",
142
  "VG_NAME_OPT",
143
  "YES_DOIT_OPT",
144
  # Generic functions for CLI programs
145
  "GenericMain",
146
  "GenericInstanceCreate",
147
  "GetClient",
148
  "GetOnlineNodes",
149
  "JobExecutor",
150
  "JobSubmittedException",
151
  "ParseTimespec",
152
  "RunWhileClusterStopped",
153
  "SubmitOpCode",
154
  "SubmitOrSend",
155
  "UsesRPC",
156
  # Formatting functions
157
  "ToStderr", "ToStdout",
158
  "FormatError",
159
  "GenerateTable",
160
  "AskUser",
161
  "FormatTimestamp",
162
  # Tags functions
163
  "ListTags",
164
  "AddTags",
165
  "RemoveTags",
166
  # command line options support infrastructure
167
  "ARGS_MANY_INSTANCES",
168
  "ARGS_MANY_NODES",
169
  "ARGS_NONE",
170
  "ARGS_ONE_INSTANCE",
171
  "ARGS_ONE_NODE",
172
  "ARGS_ONE_OS",
173
  "ArgChoice",
174
  "ArgCommand",
175
  "ArgFile",
176
  "ArgHost",
177
  "ArgInstance",
178
  "ArgJobId",
179
  "ArgNode",
180
  "ArgOs",
181
  "ArgSuggest",
182
  "ArgUnknown",
183
  "OPT_COMPL_INST_ADD_NODES",
184
  "OPT_COMPL_MANY_NODES",
185
  "OPT_COMPL_ONE_IALLOCATOR",
186
  "OPT_COMPL_ONE_INSTANCE",
187
  "OPT_COMPL_ONE_NODE",
188
  "OPT_COMPL_ONE_OS",
189
  "cli_option",
190
  "SplitNodeOption",
191
  "CalculateOSNames",
192
  ]
193

    
194
NO_PREFIX = "no_"
195
UN_PREFIX = "-"
196

    
197

    
198
class _Argument:
199
  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
200
    self.min = min
201
    self.max = max
202

    
203
  def __repr__(self):
204
    return ("<%s min=%s max=%s>" %
205
            (self.__class__.__name__, self.min, self.max))
206

    
207

    
208
class ArgSuggest(_Argument):
209
  """Suggesting argument.
210

211
  Value can be any of the ones passed to the constructor.
212

213
  """
214
  # pylint: disable-msg=W0622
215
  def __init__(self, min=0, max=None, choices=None):
216
    _Argument.__init__(self, min=min, max=max)
217
    self.choices = choices
218

    
219
  def __repr__(self):
220
    return ("<%s min=%s max=%s choices=%r>" %
221
            (self.__class__.__name__, self.min, self.max, self.choices))
222

    
223

    
224
class ArgChoice(ArgSuggest):
225
  """Choice argument.
226

227
  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
228
  but value must be one of the choices.
229

230
  """
231

    
232

    
233
class ArgUnknown(_Argument):
234
  """Unknown argument to program (e.g. determined at runtime).
235

236
  """
237

    
238

    
239
class ArgInstance(_Argument):
240
  """Instances argument.
241

242
  """
243

    
244

    
245
class ArgNode(_Argument):
246
  """Node argument.
247

248
  """
249

    
250
class ArgJobId(_Argument):
251
  """Job ID argument.
252

253
  """
254

    
255

    
256
class ArgFile(_Argument):
257
  """File path argument.
258

259
  """
260

    
261

    
262
class ArgCommand(_Argument):
263
  """Command argument.
264

265
  """
266

    
267

    
268
class ArgHost(_Argument):
269
  """Host argument.
270

271
  """
272

    
273

    
274
class ArgOs(_Argument):
275
  """OS argument.
276

277
  """
278

    
279

    
280
ARGS_NONE = []
281
ARGS_MANY_INSTANCES = [ArgInstance()]
282
ARGS_MANY_NODES = [ArgNode()]
283
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
284
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
285
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
286

    
287

    
288
def _ExtractTagsObject(opts, args):
289
  """Extract the tag type object.
290

291
  Note that this function will modify its args parameter.
292

293
  """
294
  if not hasattr(opts, "tag_type"):
295
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
296
  kind = opts.tag_type
297
  if kind == constants.TAG_CLUSTER:
298
    retval = kind, kind
299
  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
300
    if not args:
301
      raise errors.OpPrereqError("no arguments passed to the command")
302
    name = args.pop(0)
303
    retval = kind, name
304
  else:
305
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
306
  return retval
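# Illustrative example (added comment, not part of the original module):
# for opts.tag_type == constants.TAG_CLUSTER this returns (kind, kind);
# for TAG_NODE or TAG_INSTANCE it pops the object name from args, so
# args == ["node1.example.com"] yields (kind, "node1.example.com").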
307

    
308

    
309
def _ExtendTags(opts, args):
310
  """Extend the args if a source file has been given.
311

312
  This function will extend the tags with the contents of the file
313
  passed in the 'tags_source' attribute of the opts parameter. A file
314
  named '-' will be replaced by stdin.
315

316
  """
317
  fname = opts.tags_source
318
  if fname is None:
319
    return
320
  if fname == "-":
321
    new_fh = sys.stdin
322
  else:
323
    new_fh = open(fname, "r")
324
  new_data = []
325
  try:
326
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
327
    # because of python bug 1633941
328
    while True:
329
      line = new_fh.readline()
330
      if not line:
331
        break
332
      new_data.append(line.strip())
333
  finally:
334
    new_fh.close()
335
  args.extend(new_data)
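# Illustrative example (added comment, not part of the original module):
# with opts.tags_source == "-" the extra tag names are read from stdin,
# one per line, stripped and appended to args before the opcode is built.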
336

    
337

    
338
def ListTags(opts, args):
339
  """List the tags on a given object.
340

341
  This is a generic implementation that knows how to deal with all
342
  three cases of tag objects (cluster, node, instance). The opts
343
  argument is expected to contain a tag_type field denoting what
344
  object type we work on.
345

346
  """
347
  kind, name = _ExtractTagsObject(opts, args)
348
  cl = GetClient()
349
  result = cl.QueryTags(kind, name)
350
  result = list(result)
351
  result.sort()
352
  for tag in result:
353
    ToStdout(tag)
354

    
355

    
356
def AddTags(opts, args):
357
  """Add tags on a given object.
358

359
  This is a generic implementation that knows how to deal with all
360
  three cases of tag objects (cluster, node, instance). The opts
361
  argument is expected to contain a tag_type field denoting what
362
  object type we work on.
363

364
  """
365
  kind, name = _ExtractTagsObject(opts, args)
366
  _ExtendTags(opts, args)
367
  if not args:
368
    raise errors.OpPrereqError("No tags to be added")
369
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
370
  SubmitOpCode(op)
371

    
372

    
373
def RemoveTags(opts, args):
374
  """Remove tags from a given object.
375

376
  This is a generic implementation that knows how to deal with all
377
  three cases of tag objects (cluster, node, instance). The opts
378
  argument is expected to contain a tag_type field denoting what
379
  object type we work on.
380

381
  """
382
  kind, name = _ExtractTagsObject(opts, args)
383
  _ExtendTags(opts, args)
384
  if not args:
385
    raise errors.OpPrereqError("No tags to be removed")
386
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
387
  SubmitOpCode(op)
388

    
389

    
390
def check_unit(option, opt, value): # pylint: disable-msg=W0613
  """OptParser's custom converter for units.

  """
  try:
    return utils.ParseUnit(value)
  except errors.UnitParseError, err:
    raise OptionValueError("option %s: %s" % (opt, err))


def _SplitKeyVal(opt, data):
  """Convert a KeyVal string into a dict.

  This function will convert a key=val[,...] string into a dict. Empty
  values will be converted specially: keys which have the prefix 'no_'
  will have the value=False and the prefix stripped, keys which have the
  prefix '-' will have the value=None and the prefix stripped, and the
  others will have value=True.

  @type opt: string
  @param opt: a string holding the option name for which we process the
      data, used in building error messages
  @type data: string
  @param data: a string of the format key=val,key=val,...
  @rtype: dict
  @return: {key=val, key=val}
  @raises errors.ParameterError: if there are duplicate keys

  """
  kv_dict = {}
  if data:
    for elem in utils.UnescapeAndSplit(data, sep=","):
      if "=" in elem:
        key, val = elem.split("=", 1)
      else:
        if elem.startswith(NO_PREFIX):
          key, val = elem[len(NO_PREFIX):], False
        elif elem.startswith(UN_PREFIX):
          key, val = elem[len(UN_PREFIX):], None
        else:
          key, val = elem, True
      if key in kv_dict:
        raise errors.ParameterError("Duplicate key '%s' in option %s" %
                                    (key, opt))
      kv_dict[key] = val
  return kv_dict
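# Illustrative example (added comment, not part of the original module):
#   _SplitKeyVal("-B", "memory=512,auto_balance,no_acpi")
# returns {"memory": "512", "auto_balance": True, "acpi": False}; a key
# prefixed with "-" (e.g. "-acpi") is mapped to None instead.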
435

    
436

    
437
def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
438
  """Custom parser for ident:key=val,key=val options.
439

440
  This will store the parsed values as a tuple (ident, {key: val}). As such,
441
  multiple uses of this option via action=append is possible.
442

443
  """
444
  if ":" not in value:
445
    ident, rest = value, ''
446
  else:
447
    ident, rest = value.split(":", 1)
448

    
449
  if ident.startswith(NO_PREFIX):
450
    if rest:
451
      msg = "Cannot pass options when removing parameter groups: %s" % value
452
      raise errors.ParameterError(msg)
453
    retval = (ident[len(NO_PREFIX):], False)
454
  elif ident.startswith(UN_PREFIX):
455
    if rest:
456
      msg = "Cannot pass options when removing parameter groups: %s" % value
457
      raise errors.ParameterError(msg)
458
    retval = (ident[len(UN_PREFIX):], None)
459
  else:
460
    kv_dict = _SplitKeyVal(opt, rest)
461
    retval = (ident, kv_dict)
462
  return retval
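# Illustrative example (added comment, not part of the original module):
# a value such as "0:ip=1.2.3.4,mode=routed" parses to
# ("0", {"ip": "1.2.3.4", "mode": "routed"}), while "no_link" parses to
# ("link", False).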
463

    
464

    
465
def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
466
  """Custom parser class for key=val,key=val options.
467

468
  This will store the parsed values as a dict {key: val}.
469

470
  """
471
  return _SplitKeyVal(opt, value)
472

    
473

    
474
def check_bool(option, opt, value): # pylint: disable-msg=W0613
475
  """Custom parser for yes/no options.
476

477
  This will store the parsed value as either True or False.
478

479
  """
480
  value = value.lower()
481
  if value == constants.VALUE_FALSE or value == "no":
482
    return False
483
  elif value == constants.VALUE_TRUE or value == "yes":
484
    return True
485
  else:
486
    raise errors.ParameterError("Invalid boolean value '%s'" % value)
487

    
488

    
489
# completion_suggestion is normally a list. Using numeric values not evaluating
490
# to False for dynamic completion.
491
(OPT_COMPL_MANY_NODES,
492
 OPT_COMPL_ONE_NODE,
493
 OPT_COMPL_ONE_INSTANCE,
494
 OPT_COMPL_ONE_OS,
495
 OPT_COMPL_ONE_IALLOCATOR,
496
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)
497

    
498
OPT_COMPL_ALL = frozenset([
499
  OPT_COMPL_MANY_NODES,
500
  OPT_COMPL_ONE_NODE,
501
  OPT_COMPL_ONE_INSTANCE,
502
  OPT_COMPL_ONE_OS,
503
  OPT_COMPL_ONE_IALLOCATOR,
504
  OPT_COMPL_INST_ADD_NODES,
505
  ])
506

    
507

    
508
class CliOption(Option):
509
  """Custom option class for optparse.
510

511
  """
512
  ATTRS = Option.ATTRS + [
513
    "completion_suggest",
514
    ]
515
  TYPES = Option.TYPES + (
516
    "identkeyval",
517
    "keyval",
518
    "unit",
519
    "bool",
520
    )
521
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
522
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
523
  TYPE_CHECKER["keyval"] = check_key_val
524
  TYPE_CHECKER["unit"] = check_unit
525
  TYPE_CHECKER["bool"] = check_bool
526

    
527

    
528
# optparse.py sets make_option, so we do it for our own option class, too
529
cli_option = CliOption
530

    
531

    
532
_YORNO = "yes|no"
533

    
534
DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
535
                       help="Increase debugging level")
536

    
537
NOHDR_OPT = cli_option("--no-headers", default=False,
538
                       action="store_true", dest="no_headers",
539
                       help="Don't display column headers")
540

    
541
SEP_OPT = cli_option("--separator", default=None,
542
                     action="store", dest="separator",
543
                     help=("Separator between output fields"
544
                           " (defaults to one space)"))
545

    
546
USEUNITS_OPT = cli_option("--units", default=None,
547
                          dest="units", choices=('h', 'm', 'g', 't'),
548
                          help="Specify units for output (one of hmgt)")
549

    
550
FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
551
                        type="string", metavar="FIELDS",
552
                        help="Comma separated list of output fields")
553

    
554
FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
555
                       default=False, help="Force the operation")
556

    
557
CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
558
                         default=False, help="Do not require confirmation")
559

    
560
TAG_SRC_OPT = cli_option("--from", dest="tags_source",
561
                         default=None, help="File with tag names")
562

    
563
SUBMIT_OPT = cli_option("--submit", dest="submit_only",
564
                        default=False, action="store_true",
565
                        help=("Submit the job and return the job ID, but"
566
                              " don't wait for the job to finish"))
567

    
568
SYNC_OPT = cli_option("--sync", dest="do_locking",
569
                      default=False, action="store_true",
570
                      help=("Grab locks while doing the queries"
571
                            " in order to ensure more consistent results"))
572

    
573
_DRY_RUN_OPT = cli_option("--dry-run", default=False,
574
                          action="store_true",
575
                          help=("Do not execute the operation, just run the"
576
                                " check steps and verify it it could be"
577
                                " executed"))
578

    
579
VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
580
                         action="store_true",
581
                         help="Increase the verbosity of the operation")
582

    
583
DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
584
                              action="store_true", dest="simulate_errors",
585
                              help="Debugging option that makes the operation"
586
                              " treat most runtime checks as failed")
587

    
588
NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
589
                        default=True, action="store_false",
590
                        help="Don't wait for sync (DANGEROUS!)")
591

    
592
DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
593
                               help="Custom disk setup (diskless, file,"
594
                               " plain or drbd)",
595
                               default=None, metavar="TEMPL",
596
                               choices=list(constants.DISK_TEMPLATES))
597

    
598
NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
599
                        help="Do not create any network cards for"
600
                        " the instance")
601

    
602
FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
603
                               help="Relative path under default cluster-wide"
604
                               " file storage dir to store file-based disks",
605
                               default=None, metavar="<DIR>")
606

    
607
FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
608
                                  help="Driver to use for image files",
609
                                  default="loop", metavar="<DRIVER>",
610
                                  choices=list(constants.FILE_DRIVER))
611

    
612
IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
613
                            help="Select nodes for the instance automatically"
614
                            " using the <NAME> iallocator plugin",
615
                            default=None, type="string",
616
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
617

    
618
OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
619
                    metavar="<os>",
620
                    completion_suggest=OPT_COMPL_ONE_OS)
621

    
622
OSPARAMS_OPT = cli_option("-O", "--os-parameters", dest="osparams",
623
                         type="keyval", default={},
624
                         help="OS parameters")
625

    
626
FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
627
                               action="store_true", default=False,
628
                               help="Force an unknown variant")
629

    
630
NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
631
                            action="store_true", default=False,
632
                            help="Do not install the OS (will"
633
                            " enable no-start)")
634

    
635
BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
636
                         type="keyval", default={},
637
                         help="Backend parameters")
638

    
639
HVOPTS_OPT = cli_option("-H", "--hypervisor-parameters", type="keyval",
640
                         default={}, dest="hvparams",
641
                         help="Hypervisor parameters")
642

    
643
HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
644
                            help="Hypervisor and hypervisor options, in the"
645
                            " format hypervisor:option=value,option=value,...",
646
                            default=None, type="identkeyval")
647

    
648
HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
649
                        help="Hypervisor and hypervisor options, in the"
650
                        " format hypervisor:option=value,option=value,...",
651
                        default=[], action="append", type="identkeyval")
652

    
653
NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
654
                           action="store_false",
655
                           help="Don't check that the instance's IP"
656
                           " is alive")
657

    
658
NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
659
                             default=True, action="store_false",
660
                             help="Don't check that the instance's name"
661
                             " is resolvable")
662

    
663
NET_OPT = cli_option("--net",
664
                     help="NIC parameters", default=[],
665
                     dest="nics", action="append", type="identkeyval")
666

    
667
DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
668
                      dest="disks", action="append", type="identkeyval")
669

    
670
DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
671
                         help="Comma-separated list of disks"
672
                         " indices to act on (e.g. 0,2) (optional,"
673
                         " defaults to all disks)")
674

    
675
OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
676
                         help="Enforces a single-disk configuration using the"
677
                         " given disk size, in MiB unless a suffix is used",
678
                         default=None, type="unit", metavar="<size>")
679

    
680
IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
681
                                dest="ignore_consistency",
682
                                action="store_true", default=False,
683
                                help="Ignore the consistency of the disks on"
684
                                " the secondary")
685

    
686
NONLIVE_OPT = cli_option("--non-live", dest="live",
687
                         default=True, action="store_false",
688
                         help="Do a non-live migration (this usually means"
689
                         " freeze the instance, save the state, transfer and"
690
                         " only then resume running on the secondary node)")
691

    
692
NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
693
                                help="Target node and optional secondary node",
694
                                metavar="<pnode>[:<snode>]",
695
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)
696

    
697
NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
698
                           action="append", metavar="<node>",
699
                           help="Use only this node (can be used multiple"
700
                           " times, if not given defaults to all nodes)",
701
                           completion_suggest=OPT_COMPL_ONE_NODE)
702

    
703
SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
704
                             metavar="<node>",
705
                             completion_suggest=OPT_COMPL_ONE_NODE)
706

    
707
NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
708
                         action="store_false",
709
                         help="Don't start the instance after creation")
710

    
711
SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
712
                         action="store_true", default=False,
713
                         help="Show command instead of executing it")
714

    
715
CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
716
                         default=False, action="store_true",
717
                         help="Instead of performing the migration, try to"
718
                         " recover from a failed cleanup. This is safe"
719
                         " to run even if the instance is healthy, but it"
720
                         " will create extra replication traffic and "
721
                         " disrupt briefly the replication (like during the"
722
                         " migration")
723

    
724
STATIC_OPT = cli_option("-s", "--static", dest="static",
725
                        action="store_true", default=False,
726
                        help="Only show configuration data, not runtime data")
727

    
728
ALL_OPT = cli_option("--all", dest="show_all",
729
                     default=False, action="store_true",
730
                     help="Show info on all instances on the cluster."
731
                     " This can take a long time to run, use wisely")
732

    
733
SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
734
                           action="store_true", default=False,
735
                           help="Interactive OS reinstall, lists available"
736
                           " OS templates for selection")
737

    
738
IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
739
                                 action="store_true", default=False,
740
                                 help="Remove the instance from the cluster"
741
                                 " configuration even if there are failures"
742
                                 " during the removal process")
743

    
744
IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
745
                                        dest="ignore_remove_failures",
746
                                        action="store_true", default=False,
747
                                        help="Remove the instance from the"
748
                                        " cluster configuration even if there"
749
                                        " are failures during the removal"
750
                                        " process")
751

    
752
REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
753
                                 action="store_true", default=False,
754
                                 help="Remove the instance from the cluster")
755

    
756
NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
757
                               help="Specifies the new secondary node",
758
                               metavar="NODE", default=None,
759
                               completion_suggest=OPT_COMPL_ONE_NODE)
760

    
761
ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
762
                            default=False, action="store_true",
763
                            help="Replace the disk(s) on the primary"
764
                            " node (only for the drbd template)")
765

    
766
ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
767
                              default=False, action="store_true",
768
                              help="Replace the disk(s) on the secondary"
769
                              " node (only for the drbd template)")
770

    
771
AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
772
                              default=False, action="store_true",
773
                              help="Lock all nodes and auto-promote as needed"
774
                              " to MC status")
775

    
776
AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
777
                              default=False, action="store_true",
778
                              help="Automatically replace faulty disks"
779
                              " (only for the drbd template)")
780

    
781
IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
782
                             default=False, action="store_true",
783
                             help="Ignore current recorded size"
784
                             " (useful for forcing activation when"
785
                             " the recorded size is wrong)")
786

    
787
SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
788
                          metavar="<node>",
789
                          completion_suggest=OPT_COMPL_ONE_NODE)
790

    
791
SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
792
                         metavar="<dir>")
793

    
794
SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
795
                              help="Specify the secondary ip for the node",
796
                              metavar="ADDRESS", default=None)
797

    
798
READD_OPT = cli_option("--readd", dest="readd",
799
                       default=False, action="store_true",
800
                       help="Readd old node after replacing it")
801

    
802
NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
803
                                default=True, action="store_false",
804
                                help="Disable SSH key fingerprint checking")
805

    
806

    
807
MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
808
                    type="bool", default=None, metavar=_YORNO,
809
                    help="Set the master_candidate flag on the node")
810

    
811
OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
812
                         type="bool", default=None,
813
                         help="Set the offline flag on the node")
814

    
815
DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
816
                         type="bool", default=None,
817
                         help="Set the drained flag on the node")
818

    
819
ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
820
                             type="bool", default=None, metavar=_YORNO,
821
                             help="Set the allocatable flag on a volume")
822

    
823
NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
824
                               help="Disable support for lvm based instances"
825
                               " (cluster-wide)",
826
                               action="store_false", default=True)
827

    
828
ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
829
                            dest="enabled_hypervisors",
830
                            help="Comma-separated list of hypervisors",
831
                            type="string", default=None)
832

    
833
NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
834
                            type="keyval", default={},
835
                            help="NIC parameters")
836

    
837
CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
838
                         dest="candidate_pool_size", type="int",
839
                         help="Set the candidate pool size")
840

    
841
VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
842
                         help="Enables LVM and specifies the volume group"
843
                         " name (cluster-wide) for disk allocation [xenvg]",
844
                         metavar="VG", default=None)
845

    
846
YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
847
                          help="Destroy cluster", action="store_true")
848

    
849
NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
850
                          help="Skip node agreement check (dangerous)",
851
                          action="store_true", default=False)
852

    
853
MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
854
                            help="Specify the mac prefix for the instance IP"
855
                            " addresses, in the format XX:XX:XX",
856
                            metavar="PREFIX",
857
                            default=None)
858

    
859
MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
860
                               help="Specify the node interface (cluster-wide)"
861
                               " on which the master IP address will be added "
862
                               " [%s]" % constants.DEFAULT_BRIDGE,
863
                               metavar="NETDEV",
864
                               default=constants.DEFAULT_BRIDGE)
865

    
866
GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
867
                                help="Specify the default directory (cluster-"
868
                                "wide) for storing the file-based disks [%s]" %
869
                                constants.DEFAULT_FILE_STORAGE_DIR,
870
                                metavar="DIR",
871
                                default=constants.DEFAULT_FILE_STORAGE_DIR)
872

    
873
NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
874
                                   help="Don't modify /etc/hosts",
875
                                   action="store_false", default=True)
876

    
877
NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
878
                                    help="Don't initialize SSH keys",
879
                                    action="store_false", default=True)
880

    
881
ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
882
                             help="Enable parseable error messages",
883
                             action="store_true", default=False)
884

    
885
NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
886
                          help="Skip N+1 memory redundancy tests",
887
                          action="store_true", default=False)
888

    
889
REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
890
                             help="Type of reboot: soft/hard/full",
891
                             default=constants.INSTANCE_REBOOT_HARD,
892
                             metavar="<REBOOT>",
893
                             choices=list(constants.REBOOT_TYPES))
894

    
895
IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
896
                                    dest="ignore_secondaries",
897
                                    default=False, action="store_true",
898
                                    help="Ignore errors from secondaries")
899

    
900
NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
901
                            action="store_false", default=True,
902
                            help="Don't shutdown the instance (unsafe)")
903

    
904
TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
905
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
906
                         help="Maximum time to wait")
907

    
908
SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
909
                         dest="shutdown_timeout", type="int",
910
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
911
                         help="Maximum time to wait for instance shutdown")
912

    
913
EARLY_RELEASE_OPT = cli_option("--early-release",
914
                               dest="early_release", default=False,
915
                               action="store_true",
916
                               help="Release the locks on the secondary"
917
                               " node(s) early")
918

    
919
NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
920
                                  dest="new_cluster_cert",
921
                                  default=False, action="store_true",
922
                                  help="Generate a new cluster certificate")
923

    
924
RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
925
                           default=None,
926
                           help="File containing new RAPI certificate")
927

    
928
NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
929
                               default=None, action="store_true",
930
                               help=("Generate a new self-signed RAPI"
931
                                     " certificate"))
932

    
933
NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
934
                                    dest="new_confd_hmac_key",
935
                                    default=False, action="store_true",
936
                                    help=("Create a new HMAC key for %s" %
937
                                          constants.CONFD))
938

    
939
CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
940
                                       dest="cluster_domain_secret",
941
                                       default=None,
942
                                       help=("Load new new cluster domain"
943
                                             " secret from file"))
944

    
945
NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
946
                                           dest="new_cluster_domain_secret",
947
                                           default=False, action="store_true",
948
                                           help=("Create a new cluster domain"
949
                                                 " secret"))
950

    
951
USE_REPL_NET_OPT = cli_option("--use-replication-network",
952
                              dest="use_replication_network",
953
                              help="Whether to use the replication network"
954
                              " for talking to the nodes",
955
                              action="store_true", default=False)
956

    
957
MAINTAIN_NODE_HEALTH_OPT = \
958
    cli_option("--maintain-node-health", dest="maintain_node_health",
959
               metavar=_YORNO, default=None, type="bool",
960
               help="Configure the cluster to automatically maintain node"
961
               " health, by shutting down unknown instances, shutting down"
962
               " unknown DRBD devices, etc.")
963

    
964
IDENTIFY_DEFAULTS_OPT = \
965
    cli_option("--identify-defaults", dest="identify_defaults",
966
               default=False, action="store_true",
967
               help="Identify which saved instance parameters are equal to"
968
               " the current cluster defaults and set them as such, instead"
969
               " of marking them as overridden")
970

    
971
UIDPOOL_OPT = cli_option("--uid-pool", default=None,
972
                         action="store", dest="uid_pool",
973
                         help=("A list of user-ids or user-id"
974
                               " ranges separated by commas"))
975

    
976
ADD_UIDS_OPT = cli_option("--add-uids", default=None,
977
                          action="store", dest="add_uids",
978
                          help=("A list of user-ids or user-id"
979
                                " ranges separated by commas, to be"
980
                                " added to the user-id pool"))
981

    
982
REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
983
                             action="store", dest="remove_uids",
984
                             help=("A list of user-ids or user-id"
985
                                   " ranges separated by commas, to be"
986
                                   " removed from the user-id pool"))
987

    
988
ROMAN_OPT = cli_option("--roman",
989
                       dest="roman_integers", default=False,
990
                       action="store_true",
991
                       help="Use roman numbers for positive integers")
992

    
993
DRBD_HELPER_OPT = cli_option("--drbd-usermode-helper", dest="drbd_helper",
994
                             action="store", default=None,
995
                             help="Specifies usermode helper for DRBD")
996

    
997
NODRBD_STORAGE_OPT = cli_option("--no-drbd-storage", dest="drbd_storage",
998
                                action="store_false", default=True,
999
                                help="Disable support for DRBD")
1000

    
1001

    
1002
def _ParseArgs(argv, commands, aliases):
1003
  """Parser for the command line arguments.
1004

1005
  This function parses the arguments and returns the function which
1006
  must be executed together with its (modified) arguments.
1007

1008
  @param argv: the command line
1009
  @param commands: dictionary with special contents, see the design
1010
      doc for cmdline handling
1011
  @param aliases: dictionary with command aliases {'alias': 'target', ...}
1012

1013
  """
1014
  if len(argv) == 0:
1015
    binary = "<command>"
1016
  else:
1017
    binary = argv[0].split("/")[-1]
1018

    
1019
  if len(argv) > 1 and argv[1] == "--version":
1020
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
1021
    # Quit right away. That way we don't have to care about this special
1022
    # argument. optparse.py does the same.
1023
    sys.exit(0)
1024

    
1025
  if len(argv) < 2 or not (argv[1] in commands or
1026
                           argv[1] in aliases):
1027
    # let's do a nice thing
1028
    sortedcmds = commands.keys()
1029
    sortedcmds.sort()
1030

    
1031
    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
1032
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
1033
    ToStdout("")
1034

    
1035
    # compute the max line length for cmd + usage
1036
    mlen = max([len(" %s" % cmd) for cmd in commands])
1037
    mlen = min(60, mlen) # should not get here...
1038

    
1039
    # and format a nice command list
1040
    ToStdout("Commands:")
1041
    for cmd in sortedcmds:
1042
      cmdstr = " %s" % (cmd,)
1043
      help_text = commands[cmd][4]
1044
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
1045
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
1046
      for line in help_lines:
1047
        ToStdout("%-*s   %s", mlen, "", line)
1048

    
1049
    ToStdout("")
1050

    
1051
    return None, None, None
1052

    
1053
  # get command, unalias it, and look it up in commands
1054
  cmd = argv.pop(1)
1055
  if cmd in aliases:
1056
    if cmd in commands:
1057
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
1058
                                   " command" % cmd)
1059

    
1060
    if aliases[cmd] not in commands:
1061
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
1062
                                   " command '%s'" % (cmd, aliases[cmd]))
1063

    
1064
    cmd = aliases[cmd]
1065

    
1066
  func, args_def, parser_opts, usage, description = commands[cmd]
1067
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
1068
                        description=description,
1069
                        formatter=TitledHelpFormatter(),
1070
                        usage="%%prog %s %s" % (cmd, usage))
1071
  parser.disable_interspersed_args()
1072
  options, args = parser.parse_args()
1073

    
1074
  if not _CheckArguments(cmd, args_def, args):
1075
    return None, None, None
1076

    
1077
  return func, options, args
1078

    
1079

    
1080
def _CheckArguments(cmd, args_def, args):
1081
  """Verifies the arguments using the argument definition.
1082

1083
  Algorithm:
1084

1085
    1. Abort with error if values specified by user but none expected.
1086

1087
    1. For each argument in definition
1088

1089
      1. Keep running count of minimum number of values (min_count)
1090
      1. Keep running count of maximum number of values (max_count)
1091
      1. If it has an unlimited number of values
1092

1093
        1. Abort with error if it's not the last argument in the definition
1094

1095
    1. If last argument has limited number of values
1096

1097
      1. Abort with error if number of values doesn't match or is too large
1098

1099
    1. Abort with error if user didn't pass enough values (min_count)
1100

1101
  """
1102
  if args and not args_def:
1103
    ToStderr("Error: Command %s expects no arguments", cmd)
1104
    return False
1105

    
1106
  min_count = None
1107
  max_count = None
1108
  check_max = None
1109

    
1110
  last_idx = len(args_def) - 1
1111

    
1112
  for idx, arg in enumerate(args_def):
1113
    if min_count is None:
1114
      min_count = arg.min
1115
    elif arg.min is not None:
1116
      min_count += arg.min
1117

    
1118
    if max_count is None:
1119
      max_count = arg.max
1120
    elif arg.max is not None:
1121
      max_count += arg.max
1122

    
1123
    if idx == last_idx:
1124
      check_max = (arg.max is not None)
1125

    
1126
    elif arg.max is None:
1127
      raise errors.ProgrammerError("Only the last argument can have max=None")
1128

    
1129
  if check_max:
1130
    # Command with exact number of arguments
1131
    if (min_count is not None and max_count is not None and
1132
        min_count == max_count and len(args) != min_count):
1133
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1134
      return False
1135

    
1136
    # Command with limited number of arguments
1137
    if max_count is not None and len(args) > max_count:
1138
      ToStderr("Error: Command %s expects only %d argument(s)",
1139
               cmd, max_count)
1140
      return False
1141

    
1142
  # Command with some required arguments
1143
  if min_count is not None and len(args) < min_count:
1144
    ToStderr("Error: Command %s expects at least %d argument(s)",
1145
             cmd, min_count)
1146
    return False
1147

    
1148
  return True
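# Illustrative example (added comment, not part of the original module):
# with args_def == ARGS_ONE_INSTANCE (min=1, max=1), zero or two
# positional arguments print an error and return False, while exactly
# one argument returns True.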
1149

    
1150

    
1151
def SplitNodeOption(value):
1152
  """Splits the value of a --node option.
1153

1154
  """
1155
  if value and ':' in value:
1156
    return value.split(':', 1)
1157
  else:
1158
    return (value, None)
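# Illustrative example (added comment, not part of the original module):
# "node1.example.com:node2.example.com" splits into the primary and
# secondary names, while a bare "node1.example.com" yields
# ("node1.example.com", None).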
1159

    
1160

    
1161
def CalculateOSNames(os_name, os_variants):
1162
  """Calculates all the names an OS can be called, according to its variants.
1163

1164
  @type os_name: string
1165
  @param os_name: base name of the os
1166
  @type os_variants: list or None
1167
  @param os_variants: list of supported variants
1168
  @rtype: list
1169
  @return: list of valid names
1170

1171
  """
1172
  if os_variants:
1173
    return ['%s+%s' % (os_name, v) for v in os_variants]
1174
  else:
1175
    return [os_name]
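# Illustrative example (added comment, not part of the original module):
# CalculateOSNames("debootstrap", ["lenny", "squeeze"]) returns
# ["debootstrap+lenny", "debootstrap+squeeze"], while a None or empty
# variant list returns just ["debootstrap"].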
1176

    
1177

    
1178
def UsesRPC(fn):
1179
  def wrapper(*args, **kwargs):
1180
    rpc.Init()
1181
    try:
1182
      return fn(*args, **kwargs)
1183
    finally:
1184
      rpc.Shutdown()
1185
  return wrapper
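# Illustrative usage (added comment; the decorated function name is
# hypothetical): wrapping a CLI entry point brackets it with rpc.Init()
# and rpc.Shutdown():
#
#   @UsesRPC
#   def InitCluster(opts, args):
#     ...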
1186

    
1187

    
1188
def AskUser(text, choices=None):
1189
  """Ask the user a question.
1190

1191
  @param text: the question to ask
1192

1193
  @param choices: list with elements tuples (input_char, return_value,
1194
      description); if not given, it will default to: [('y', True,
1195
      'Perform the operation'), ('n', False, 'Do not do the operation')];
1196
      note that the '?' char is reserved for help
1197

1198
  @return: one of the return values from the choices list; if input is
1199
      not possible (i.e. not running with a tty), we return the last
1200
      entry from the list
1201

1202
  """
1203
  if choices is None:
1204
    choices = [('y', True, 'Perform the operation'),
1205
               ('n', False, 'Do not perform the operation')]
1206
  if not choices or not isinstance(choices, list):
1207
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
1208
  for entry in choices:
1209
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
1210
      raise errors.ProgrammerError("Invalid choices element to AskUser")
1211

    
1212
  answer = choices[-1][1]
1213
  new_text = []
1214
  for line in text.splitlines():
1215
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
1216
  text = "\n".join(new_text)
1217
  try:
1218
    f = file("/dev/tty", "a+")
1219
  except IOError:
1220
    return answer
1221
  try:
1222
    chars = [entry[0] for entry in choices]
1223
    chars[-1] = "[%s]" % chars[-1]
1224
    chars.append('?')
1225
    maps = dict([(entry[0], entry[1]) for entry in choices])
1226
    while True:
1227
      f.write(text)
1228
      f.write('\n')
1229
      f.write("/".join(chars))
1230
      f.write(": ")
1231
      line = f.readline(2).strip().lower()
1232
      if line in maps:
1233
        answer = maps[line]
1234
        break
1235
      elif line == '?':
1236
        for entry in choices:
1237
          f.write(" %s - %s\n" % (entry[0], entry[2]))
1238
        f.write("\n")
1239
        continue
1240
  finally:
1241
    f.close()
1242
  return answer
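# Illustrative example (added comment, not part of the original module):
# a three-way question can be asked with
#   AskUser("Continue?", choices=[
#     ("y", True, "Perform the operation"),
#     ("n", False, "Do not perform the operation"),
#     ("a", "abort", "Abort the whole run"),
#   ])
# The last entry doubles as the default returned when no tty is available.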
1243

    
1244

    
1245
class JobSubmittedException(Exception):
1246
  """Job was submitted, client should exit.
1247

1248
  This exception has one argument, the ID of the job that was
1249
  submitted. The handler should print this ID.
1250

1251
  This is not an error, just a structured way to exit from clients.
1252

1253
  """
1254

    
1255

    
1256
def SendJob(ops, cl=None):
1257
  """Function to submit an opcode without waiting for the results.
1258

1259
  @type ops: list
1260
  @param ops: list of opcodes
1261
  @type cl: luxi.Client
1262
  @param cl: the luxi client to use for communicating with the master;
1263
             if None, a new client will be created
1264

1265
  """
1266
  if cl is None:
1267
    cl = GetClient()
1268

    
1269
  job_id = cl.SubmitJob(ops)
1270

    
1271
  return job_id
1272

    
1273

    
1274
def GenericPollJob(job_id, cbs, report_cbs):
1275
  """Generic job-polling function.
1276

1277
  @type job_id: number
1278
  @param job_id: Job ID
1279
  @type cbs: Instance of L{JobPollCbBase}
1280
  @param cbs: Data callbacks
1281
  @type report_cbs: Instance of L{JobPollReportCbBase}
1282
  @param report_cbs: Reporting callbacks
1283

1284
  """
1285
  prev_job_info = None
1286
  prev_logmsg_serial = None
1287

    
1288
  status = None
1289

    
1290
  while True:
1291
    result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
1292
                                      prev_logmsg_serial)
1293
    if not result:
1294
      # job not found, go away!
1295
      raise errors.JobLost("Job with id %s lost" % job_id)
1296

    
1297
    if result == constants.JOB_NOTCHANGED:
1298
      report_cbs.ReportNotChanged(job_id, status)
1299

    
1300
      # Wait again
1301
      continue
1302

    
1303
    # Split result, a tuple of (field values, log entries)
1304
    (job_info, log_entries) = result
1305
    (status, ) = job_info
1306

    
1307
    if log_entries:
1308
      for log_entry in log_entries:
1309
        (serial, timestamp, log_type, message) = log_entry
1310
        report_cbs.ReportLogMessage(job_id, serial, timestamp,
1311
                                    log_type, message)
1312
        prev_logmsg_serial = max(prev_logmsg_serial, serial)
1313

    
1314
    # TODO: Handle canceled and archived jobs
1315
    elif status in (constants.JOB_STATUS_SUCCESS,
1316
                    constants.JOB_STATUS_ERROR,
1317
                    constants.JOB_STATUS_CANCELING,
1318
                    constants.JOB_STATUS_CANCELED):
1319
      break
1320

    
1321
    prev_job_info = job_info
1322

    
1323
  jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
1324
  if not jobs:
1325
    raise errors.JobLost("Job with id %s lost" % job_id)
1326

    
1327
  status, opstatus, result = jobs[0]
1328

    
1329
  if status == constants.JOB_STATUS_SUCCESS:
1330
    return result
1331

    
1332
  if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
1333
    raise errors.OpExecError("Job was canceled")
1334

    
1335
  has_ok = False
1336
  for idx, (status, msg) in enumerate(zip(opstatus, result)):
1337
    if status == constants.OP_STATUS_SUCCESS:
1338
      has_ok = True
1339
    elif status == constants.OP_STATUS_ERROR:
1340
      errors.MaybeRaise(msg)
1341

    
1342
      if has_ok:
1343
        raise errors.OpExecError("partial failure (opcode %d): %s" %
1344
                                 (idx, msg))
1345

    
1346
      raise errors.OpExecError(str(msg))
1347

    
1348
  # default failure mode
1349
  raise errors.OpExecError(result)
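# Note (added comment, not part of the original module): PollJob below
# drives this loop with _LuxiJobPollCb for data and, depending on the
# caller, StdioJobPollReportCb or FeedbackFnJobPollReportCb for reporting.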
1350

    
1351

    
1352
class JobPollCbBase:
1353
  """Base class for L{GenericPollJob} callbacks.
1354

1355
  """
1356
  def __init__(self):
1357
    """Initializes this class.
1358

1359
    """
1360

    
1361
  def WaitForJobChangeOnce(self, job_id, fields,
1362
                           prev_job_info, prev_log_serial):
1363
    """Waits for changes on a job.
1364

1365
    """
1366
    raise NotImplementedError()
1367

    
1368
  def QueryJobs(self, job_ids, fields):
1369
    """Returns the selected fields for the selected job IDs.
1370

1371
    @type job_ids: list of numbers
1372
    @param job_ids: Job IDs
1373
    @type fields: list of strings
1374
    @param fields: Fields
1375

1376
    """
1377
    raise NotImplementedError()
1378

    
1379

    
1380
class JobPollReportCbBase:
1381
  """Base class for L{GenericPollJob} reporting callbacks.
1382

1383
  """
1384
  def __init__(self):
1385
    """Initializes this class.
1386

1387
    """
1388

    
1389
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1390
    """Handles a log message.
1391

1392
    """
1393
    raise NotImplementedError()
1394

    
1395
  def ReportNotChanged(self, job_id, status):
1396
    """Called for if a job hasn't changed in a while.
1397

1398
    @type job_id: number
1399
    @param job_id: Job ID
1400
    @type status: string or None
1401
    @param status: Job status if available
1402

1403
    """
1404
    raise NotImplementedError()
1405

    
1406

    
1407
class _LuxiJobPollCb(JobPollCbBase):
1408
  def __init__(self, cl):
1409
    """Initializes this class.
1410

1411
    """
1412
    JobPollCbBase.__init__(self)
1413
    self.cl = cl
1414

    
1415
  def WaitForJobChangeOnce(self, job_id, fields,
1416
                           prev_job_info, prev_log_serial):
1417
    """Waits for changes on a job.
1418

1419
    """
1420
    return self.cl.WaitForJobChangeOnce(job_id, fields,
1421
                                        prev_job_info, prev_log_serial)
1422

    
1423
  def QueryJobs(self, job_ids, fields):
1424
    """Returns the selected fields for the selected job IDs.
1425

1426
    """
1427
    return self.cl.QueryJobs(job_ids, fields)
1428

    
1429

    
1430
class FeedbackFnJobPollReportCb(JobPollReportCbBase):
1431
  def __init__(self, feedback_fn):
1432
    """Initializes this class.
1433

1434
    """
1435
    JobPollReportCbBase.__init__(self)
1436

    
1437
    self.feedback_fn = feedback_fn
1438

    
1439
    assert callable(feedback_fn)
1440

    
1441
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1442
    """Handles a log message.
1443

1444
    """
1445
    self.feedback_fn((timestamp, log_type, log_msg))
1446

    
1447
  def ReportNotChanged(self, job_id, status):
1448
    """Called if a job hasn't changed in a while.
1449

1450
    """
1451
    # Ignore


class StdioJobPollReportCb(JobPollReportCbBase):
  def __init__(self):
    """Initializes this class.

    """
    JobPollReportCbBase.__init__(self)

    self.notified_queued = False
    self.notified_waitlock = False

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
             utils.SafeEncode(log_msg))

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    """
    if status is None:
      return

    if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
      ToStderr("Job %s is waiting in queue", job_id)
      self.notified_queued = True

    elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
      ToStderr("Job %s is trying to acquire all necessary locks", job_id)
      self.notified_waitlock = True


def PollJob(job_id, cl=None, feedback_fn=None):
  """Function to poll for the result of a job.

  @type job_id: job identifier
  @param job_id: the job to poll for results
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created

  """
  if cl is None:
    cl = GetClient()

  if feedback_fn:
    reporter = FeedbackFnJobPollReportCb(feedback_fn)
  else:
    reporter = StdioJobPollReportCb()

  return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
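
# Example (editor's sketch, not part of the original module): a gnt-* script
# could submit a single opcode and poll it with a custom feedback function;
# "op" stands for any opcodes.OpCode instance.
#
#   cl = GetClient()
#   job_id = SendJob([op], cl=cl)
#   results = PollJob(job_id, cl=cl,
#                     feedback_fn=lambda msg: ToStdout("%s", str(msg)))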


def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
  """Legacy function to submit an opcode.

  This is just a simple wrapper over the construction of the processor
  instance. It should be extended to better handle feedback and
  interaction functions.

  """
  if cl is None:
    cl = GetClient()

  SetGenericOpcodeOpts([op], opts)

  job_id = SendJob([op], cl)

  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)

  return op_results[0]


def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  This function will decide, based on the 'opts' parameter, whether to
  submit and wait for the result of the opcode (and return it), or
  whether to just send the job and print its identifier. It is used in
  order to simplify the implementation of the '--submit' option.

  It will also process the opcodes if we're sending them via SendJob
  (otherwise SubmitOpCode does it).

  """
  if opts and opts.submit_only:
    job = [op]
    SetGenericOpcodeOpts(job, opts)
    job_id = SendJob(job, cl=cl)
    raise JobSubmittedException(job_id)
  else:
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
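
# Example (editor's sketch): how a command implementation typically uses
# SubmitOrSend; when the user passed --submit, the JobSubmittedException is
# caught by GenericMain/FormatError and only the job ID is printed.
#
#   def DoSomething(opts, args):
#     op = ...  # build the opcode from opts/args
#     SubmitOrSend(op, opts)
#     return 0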


def SetGenericOpcodeOpts(opcode_list, options):
  """Processor for generic options.

  This function updates the given opcodes based on generic command
  line options (like debug, dry-run, etc.).

  @param opcode_list: list of opcodes
  @param options: command line options or None
  @return: None (in-place modification)

  """
  if not options:
    return
  for op in opcode_list:
    op.dry_run = options.dry_run
    op.debug_level = options.debug
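
# Example (editor's sketch): "options" here is the parsed option object
# produced by _ParseArgs, so after
#
#   SetGenericOpcodeOpts([op1, op2], options)
#
# both opcodes carry the user's --dry-run and --debug settings in place.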


def GetClient():
  # TODO: Cache object?
  try:
    client = luxi.Client()
  except luxi.NoMasterError:
    ss = ssconf.SimpleStore()

    # Try to read ssconf file
    try:
      ss.GetMasterNode()
    except errors.ConfigurationError:
      raise errors.OpPrereqError("Cluster not initialized or this machine is"
                                 " not part of a cluster")

    master, myself = ssconf.GetMasterAndMyself(ss=ss)
    if master != myself:
      raise errors.OpPrereqError("This is not the master node, please connect"
                                 " to node '%s' and rerun the command" %
                                 master)
    raise
  return client
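
# Example (editor's sketch): callers that want a friendly error message
# instead of a traceback can wrap GetClient() with FormatError, e.g.
#
#   try:
#     cl = GetClient()
#   except errors.OpPrereqError, err:
#     retcode, msg = FormatError(err)
#     ToStderr(msg)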
1588

    
1589

    
1590
def FormatError(err):
1591
  """Return a formatted error message for a given error.
1592

1593
  This function takes an exception instance and returns a tuple
1594
  consisting of two values: first, the recommended exit code, and
1595
  second, a string describing the error message (not
1596
  newline-terminated).
1597

1598
  """
1599
  retcode = 1
1600
  obuf = StringIO()
1601
  msg = str(err)
1602
  if isinstance(err, errors.ConfigurationError):
1603
    txt = "Corrupt configuration file: %s" % msg
1604
    logging.error(txt)
1605
    obuf.write(txt + "\n")
1606
    obuf.write("Aborting.")
1607
    retcode = 2
1608
  elif isinstance(err, errors.HooksAbort):
1609
    obuf.write("Failure: hooks execution failed:\n")
1610
    for node, script, out in err.args[0]:
1611
      if out:
1612
        obuf.write("  node: %s, script: %s, output: %s\n" %
1613
                   (node, script, out))
1614
      else:
1615
        obuf.write("  node: %s, script: %s (no output)\n" %
1616
                   (node, script))
1617
  elif isinstance(err, errors.HooksFailure):
1618
    obuf.write("Failure: hooks general failure: %s" % msg)
1619
  elif isinstance(err, errors.ResolverError):
1620
    this_host = utils.HostInfo.SysName()
1621
    if err.args[0] == this_host:
1622
      msg = "Failure: can't resolve my own hostname ('%s')"
1623
    else:
1624
      msg = "Failure: can't resolve hostname '%s'"
1625
    obuf.write(msg % err.args[0])
1626
  elif isinstance(err, errors.OpPrereqError):
1627
    if len(err.args) == 2:
1628
      obuf.write("Failure: prerequisites not met for this"
1629
               " operation:\nerror type: %s, error details:\n%s" %
1630
                 (err.args[1], err.args[0]))
1631
    else:
1632
      obuf.write("Failure: prerequisites not met for this"
1633
                 " operation:\n%s" % msg)
1634
  elif isinstance(err, errors.OpExecError):
1635
    obuf.write("Failure: command execution error:\n%s" % msg)
1636
  elif isinstance(err, errors.TagError):
1637
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
1638
  elif isinstance(err, errors.JobQueueDrainError):
1639
    obuf.write("Failure: the job queue is marked for drain and doesn't"
1640
               " accept new requests\n")
1641
  elif isinstance(err, errors.JobQueueFull):
1642
    obuf.write("Failure: the job queue is full and doesn't accept new"
1643
               " job submissions until old jobs are archived\n")
1644
  elif isinstance(err, errors.TypeEnforcementError):
1645
    obuf.write("Parameter Error: %s" % msg)
1646
  elif isinstance(err, errors.ParameterError):
1647
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
1648
  elif isinstance(err, luxi.NoMasterError):
1649
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
1650
               " and listening for connections?")
1651
  elif isinstance(err, luxi.TimeoutError):
1652
    obuf.write("Timeout while talking to the master daemon. Error:\n"
1653
               "%s" % msg)
1654
  elif isinstance(err, luxi.ProtocolError):
1655
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
1656
               "%s" % msg)
1657
  elif isinstance(err, errors.GenericError):
1658
    obuf.write("Unhandled Ganeti error: %s" % msg)
1659
  elif isinstance(err, JobSubmittedException):
1660
    obuf.write("JobID: %s\n" % err.args[0])
1661
    retcode = 0
1662
  else:
1663
    obuf.write("Unhandled exception: %s" % msg)
1664
  return retcode, obuf.getvalue().rstrip('\n')
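
# Example (editor's sketch): FormatError maps exceptions to (exit code,
# message) pairs, so a caller can do
#
#   try:
#     result = func(options, args)
#   except errors.GenericError, err:
#     retcode, msg = FormatError(err)
#     ToStderr(msg)
#     sys.exit(retcode)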
1665

    
1666

    
1667
def GenericMain(commands, override=None, aliases=None):
1668
  """Generic main function for all the gnt-* commands.
1669

1670
  Arguments:
1671
    - commands: a dictionary with a special structure, see the design doc
1672
                for command line handling.
1673
    - override: if not None, we expect a dictionary with keys that will
1674
                override command line options; this can be used to pass
1675
                options from the scripts to generic functions
1676
    - aliases: dictionary with command aliases {'alias': 'target, ...}
1677

1678
  """
1679
  # save the program name and the entire command line for later logging
1680
  if sys.argv:
1681
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1682
    if len(sys.argv) >= 2:
1683
      binary += " " + sys.argv[1]
1684
      old_cmdline = " ".join(sys.argv[2:])
1685
    else:
1686
      old_cmdline = ""
1687
  else:
1688
    binary = "<unknown program>"
1689
    old_cmdline = ""
1690

    
1691
  if aliases is None:
1692
    aliases = {}
1693

    
1694
  try:
1695
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
1696
  except errors.ParameterError, err:
1697
    result, err_msg = FormatError(err)
1698
    ToStderr(err_msg)
1699
    return 1
1700

    
1701
  if func is None: # parse error
1702
    return 1
1703

    
1704
  if override is not None:
1705
    for key, val in override.iteritems():
1706
      setattr(options, key, val)
1707

    
1708
  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1709
                     stderr_logging=True, program=binary)
1710

    
1711
  if old_cmdline:
1712
    logging.info("run with arguments '%s'", old_cmdline)
1713
  else:
1714
    logging.info("run with no arguments")
1715

    
1716
  try:
1717
    result = func(options, args)
1718
  except (errors.GenericError, luxi.ProtocolError,
1719
          JobSubmittedException), err:
1720
    result, err_msg = FormatError(err)
1721
    logging.exception("Error during command processing")
1722
    ToStderr(err_msg)
1723

    
1724
  return result
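
# Example (editor's sketch): the typical layout of a gnt-* script built on top
# of GenericMain; the exact structure of each "commands" entry (function,
# arguments, options, usage, description) is defined by the command line
# handling design, so the entry below is only illustrative.
#
#   commands = {
#     'info': (ShowInfo, ARGS_NONE, [DEBUG_OPT], "", "Show information"),
#     }
#
#   if __name__ == '__main__':
#     sys.exit(GenericMain(commands))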


def GenericInstanceCreate(mode, opts, args):
  """Add an instance to the cluster via either creation or import.

  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new instance name
  @rtype: int
  @return: the desired exit code

  """
  instance = args[0]

  (pnode, snode) = SplitNodeOption(opts.node)

  hypervisor = None
  hvparams = {}
  if opts.hypervisor:
    hypervisor, hvparams = opts.hypervisor

  if opts.nics:
    try:
      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
    except ValueError, err:
      raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
    nics = [{}] * nic_max
    for nidx, ndict in opts.nics:
      nidx = int(nidx)
      if not isinstance(ndict, dict):
        msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
        raise errors.OpPrereqError(msg)
      nics[nidx] = ndict
  elif opts.no_nics:
    # no nics
    nics = []
  elif mode == constants.INSTANCE_CREATE:
    # default of one nic, all auto
    nics = [{}]
  else:
    # mode == import
    nics = []

  if opts.disk_template == constants.DT_DISKLESS:
    if opts.disks or opts.sd_size is not None:
      raise errors.OpPrereqError("Diskless instance but disk"
                                 " information passed")
    disks = []
  else:
    if (not opts.disks and not opts.sd_size
        and mode == constants.INSTANCE_CREATE):
      raise errors.OpPrereqError("No disk information specified")
    if opts.disks and opts.sd_size is not None:
      raise errors.OpPrereqError("Please use either the '--disk' or"
                                 " '-s' option")
    if opts.sd_size is not None:
      opts.disks = [(0, {"size": opts.sd_size})]

    if opts.disks:
      try:
        disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
      disks = [{}] * disk_max
    else:
      disks = []
    for didx, ddict in opts.disks:
      didx = int(didx)
      if not isinstance(ddict, dict):
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
        raise errors.OpPrereqError(msg)
      elif "size" in ddict:
        if "adopt" in ddict:
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
                                     " (disk %d)" % didx)
        try:
          ddict["size"] = utils.ParseUnit(ddict["size"])
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
                                     (didx, err))
      elif "adopt" in ddict:
        if mode == constants.INSTANCE_IMPORT:
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
                                     " import")
        ddict["size"] = 0
      else:
        raise errors.OpPrereqError("Missing size or adoption source for"
                                   " disk %d" % didx)
      disks[didx] = ddict

  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)

  if mode == constants.INSTANCE_CREATE:
    start = opts.start
    os_type = opts.os
    force_variant = opts.force_variant
    src_node = None
    src_path = None
    no_install = opts.no_install
    identify_defaults = False
  elif mode == constants.INSTANCE_IMPORT:
    start = False
    os_type = None
    force_variant = False
    src_node = opts.src_node
    src_path = opts.src_dir
    no_install = None
    identify_defaults = opts.identify_defaults
  else:
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)

  op = opcodes.OpCreateInstance(instance_name=instance,
                                disks=disks,
                                disk_template=opts.disk_template,
                                nics=nics,
                                pnode=pnode, snode=snode,
                                ip_check=opts.ip_check,
                                name_check=opts.name_check,
                                wait_for_sync=opts.wait_for_sync,
                                file_storage_dir=opts.file_storage_dir,
                                file_driver=opts.file_driver,
                                iallocator=opts.iallocator,
                                hypervisor=hypervisor,
                                hvparams=hvparams,
                                beparams=opts.beparams,
                                osparams=opts.osparams,
                                mode=mode,
                                start=start,
                                os_type=os_type,
                                force_variant=force_variant,
                                src_node=src_node,
                                src_path=src_path,
                                no_install=no_install,
                                identify_defaults=identify_defaults)

  SubmitOrSend(op, opts)
  return 0
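
# Example (editor's sketch): opts.disks and opts.nics arrive as lists of
# (index, dict) pairs produced by the option parser, e.g. roughly the
# equivalent of "--disk 0:size=10G --disk 1:adopt=lv0" would be
#
#   opts.disks = [("0", {"size": "10G"}), ("1", {"adopt": "lv0"})]
#
# which the code above turns into a dense list indexed by disk number.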


class _RunWhileClusterStoppedHelper:
  """Helper class for L{RunWhileClusterStopped} to simplify state management

  """
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes

    self.ssh = ssh.SshRunner(self.cluster_name)

    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name
    @type cmd: list
    @param cmd: Command

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    @type fn: callable
    @param fn: Function to be called

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes
        for node_name in self.online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception, err:
          _, errmsg = FormatError(err)
          logging.exception("Caught exception")
          self.feedback_fn(errmsg)
          raise
      finally:
        # Start cluster again, master node last
        for node_name in self.nonmaster_nodes + [self.master_node]:
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
    finally:
      # Resume watcher
      watcher_block.Close()


def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Calls a function while all cluster daemons are stopped.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  feedback_fn("Gathering cluster information")

  # This ensures we're running on the master daemon
  cl = GetClient()

  (cluster_name, master_node) = \
    cl.QueryConfigValues(["cluster_name", "master_node"])

  online_nodes = GetOnlineNodes([], cl=cl)

  # Don't keep a reference to the client. The master daemon will go away.
  del cl

  assert master_node in online_nodes

  return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
                                       online_nodes).Call(fn, *args)
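
# Example (editor's sketch): the callable receives the helper instance as its
# first argument, so it can reuse _RunCmd while everything is down; the
# function name below is purely illustrative.
#
#   def _DoOfflineWork(helper, new_name):
#     helper.feedback_fn("Cluster is stopped, working on %s" % new_name)
#     # ... modify on-disk configuration here ...
#
#   RunWhileClusterStopped(ToStdout, _DoOfflineWork, "new.example.com")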


def GenerateTable(headers, fields, separator, data,
                  numfields=None, unitfields=None,
                  units=None):
  """Generates a table with headers and different fields.

  @type headers: dict
  @param headers: dictionary mapping field names to headers for
      the table
  @type fields: list
  @param fields: the field names corresponding to each row in
      the data field
  @param separator: the separator to be used; if this is None,
      the default 'smart' algorithm is used which computes optimal
      field width, otherwise just the separator is used between
      each field
  @type data: list
  @param data: a list of lists, each sublist being one row to be output
  @type numfields: list
  @param numfields: a list with the fields that hold numeric
      values and thus should be right-aligned
  @type unitfields: list
  @param unitfields: a list with the fields that hold numeric
      values that should be formatted with the units field
  @type units: string or None
  @param units: the units we should use for formatting, or None for
      automatic choice (human-readable for non-separator usage, otherwise
      megabytes); this is a one-letter string

  """
  if units is None:
    if separator:
      units = "m"
    else:
      units = "h"

  if numfields is None:
    numfields = []
  if unitfields is None:
    unitfields = []

  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142

  format_fields = []
  for field in fields:
    if headers and field not in headers:
      # TODO: handle better unknown fields (either revert to old
      # style of raising exception, or deal more intelligently with
      # variable fields)
      headers[field] = field
    if separator is not None:
      format_fields.append("%s")
    elif numfields.Matches(field):
      format_fields.append("%*s")
    else:
      format_fields.append("%-*s")

  if separator is None:
    mlens = [0 for name in fields]
    format_str = ' '.join(format_fields)
  else:
    format_str = separator.replace("%", "%%").join(format_fields)

  for row in data:
    if row is None:
      continue
    for idx, val in enumerate(row):
      if unitfields.Matches(fields[idx]):
        try:
          val = int(val)
        except (TypeError, ValueError):
          pass
        else:
          val = row[idx] = utils.FormatUnit(val, units)
      val = row[idx] = str(val)
      if separator is None:
        mlens[idx] = max(mlens[idx], len(val))

  result = []
  if headers:
    args = []
    for idx, name in enumerate(fields):
      hdr = headers[name]
      if separator is None:
        mlens[idx] = max(mlens[idx], len(hdr))
        args.append(mlens[idx])
      args.append(hdr)
    result.append(format_str % tuple(args))

  if separator is None:
    assert len(mlens) == len(fields)

    if fields and not numfields.Matches(fields[-1]):
      mlens[-1] = 0

  for line in data:
    args = []
    if line is None:
      line = ['-' for _ in fields]
    for idx in range(len(fields)):
      if separator is None:
        args.append(mlens[idx])
      args.append(line[idx])
    result.append(format_str % tuple(args))

  return result
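
# Example (editor's sketch): generating a small two-column listing with the
# "smart" width algorithm (separator=None) and human-readable units.
#
#   lines = GenerateTable({"name": "Node", "dfree": "DFree"},
#                         ["name", "dfree"], None,
#                         [["node1", 2048], ["node2", 10240]],
#                         numfields=["dfree"], unitfields=["dfree"])
#   for line in lines:
#     ToStdout("%s", line)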


def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: a string with the formatted timestamp

  """
  if not isinstance(ts, (tuple, list)) or len(ts) != 2:
    return '?'
  sec, usec = ts
  return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec


def ParseTimespec(value):
  """Parse a time specification.

  The following suffixes will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: days
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  """
  value = str(value)
  if not value:
    raise errors.OpPrereqError("Empty time specification passed")
  suffix_map = {
    's': 1,
    'm': 60,
    'h': 3600,
    'd': 86400,
    'w': 604800,
    }
  if value[-1] not in suffix_map:
    try:
      value = int(value)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  else:
    multiplier = suffix_map[value[-1]]
    value = value[:-1]
    if not value: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)")
    try:
      value = int(value) * multiplier
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  return value
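
# Example (editor's sketch): all of the following return a number of seconds.
#
#   ParseTimespec("30")    # -> 30
#   ParseTimespec("15m")   # -> 900
#   ParseTimespec("2h")    # -> 7200
#   ParseTimespec("1w")    # -> 604800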
2153

    
2154

    
2155
def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
2156
                   filter_master=False):
2157
  """Returns the names of online nodes.
2158

2159
  This function will also log a warning on stderr with the names of
2160
  the online nodes.
2161

2162
  @param nodes: if not empty, use only this subset of nodes (minus the
2163
      offline ones)
2164
  @param cl: if not None, luxi client to use
2165
  @type nowarn: boolean
2166
  @param nowarn: by default, this function will output a note with the
2167
      offline nodes that are skipped; if this parameter is True the
2168
      note is not displayed
2169
  @type secondary_ips: boolean
2170
  @param secondary_ips: if True, return the secondary IPs instead of the
2171
      names, useful for doing network traffic over the replication interface
2172
      (if any)
2173
  @type filter_master: boolean
2174
  @param filter_master: if True, do not return the master node in the list
2175
      (useful in coordination with secondary_ips where we cannot check our
2176
      node name against the list)
2177

2178
  """
2179
  if cl is None:
2180
    cl = GetClient()
2181

    
2182
  if secondary_ips:
2183
    name_idx = 2
2184
  else:
2185
    name_idx = 0
2186

    
2187
  if filter_master:
2188
    master_node = cl.QueryConfigValues(["master_node"])[0]
2189
    filter_fn = lambda x: x != master_node
2190
  else:
2191
    filter_fn = lambda _: True
2192

    
2193
  result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
2194
                         use_locking=False)
2195
  offline = [row[0] for row in result if row[1]]
2196
  if offline and not nowarn:
2197
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
2198
  return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
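
# Example (editor's sketch): fetching the secondary IPs of all online nodes
# except the master, e.g. before pushing data over the replication network.
#
#   sips = GetOnlineNodes([], secondary_ips=True, filter_master=True)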


def _ToStream(stream, txt, *args):
  """Write a message to a stream, bypassing the logging system

  @type stream: file object
  @param stream: the file to which we should write
  @type txt: str
  @param txt: the message

  """
  if args:
    args = tuple(args)
    stream.write(txt % args)
  else:
    stream.write(txt)
  stream.write('\n')
  stream.flush()


def ToStdout(txt, *args):
  """Write a message to stdout only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stdout, txt, *args)


def ToStderr(txt, *args):
  """Write a message to stderr only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stderr, txt, *args)
2243
class JobExecutor(object):
2244
  """Class which manages the submission and execution of multiple jobs.
2245

2246
  Note that instances of this class should not be reused between
2247
  GetResults() calls.
2248

2249
  """
2250
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
2251
    self.queue = []
2252
    if cl is None:
2253
      cl = GetClient()
2254
    self.cl = cl
2255
    self.verbose = verbose
2256
    self.jobs = []
2257
    self.opts = opts
2258
    self.feedback_fn = feedback_fn
2259

    
2260
  def QueueJob(self, name, *ops):
2261
    """Record a job for later submit.
2262

2263
    @type name: string
2264
    @param name: a description of the job, will be used in WaitJobSet
2265
    """
2266
    SetGenericOpcodeOpts(ops, self.opts)
2267
    self.queue.append((name, ops))
2268

    
2269
  def SubmitPending(self, each=False):
2270
    """Submit all pending jobs.
2271

2272
    """
2273
    if each:
2274
      results = []
2275
      for row in self.queue:
2276
        # SubmitJob will remove the success status, but raise an exception if
2277
        # the submission fails, so we'll notice that anyway.
2278
        results.append([True, self.cl.SubmitJob(row[1])])
2279
    else:
2280
      results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
2281
    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
2282
                                                            self.queue)):
2283
      self.jobs.append((idx, status, data, name))
2284

    
2285
  def _ChooseJob(self):
2286
    """Choose a non-waiting/queued job to poll next.
2287

2288
    """
2289
    assert self.jobs, "_ChooseJob called with empty job list"
2290

    
2291
    result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
2292
    assert result
2293

    
2294
    for job_data, status in zip(self.jobs, result):
2295
      if status[0] in (constants.JOB_STATUS_QUEUED,
2296
                    constants.JOB_STATUS_WAITLOCK,
2297
                    constants.JOB_STATUS_CANCELING):
2298
        # job is still waiting
2299
        continue
2300
      # good candidate found
2301
      self.jobs.remove(job_data)
2302
      return job_data
2303

    
2304
    # no job found
2305
    return self.jobs.pop(0)
2306

    
2307
  def GetResults(self):
2308
    """Wait for and return the results of all jobs.
2309

2310
    @rtype: list
2311
    @return: list of tuples (success, job results), in the same order
2312
        as the submitted jobs; if a job has failed, instead of the result
2313
        there will be the error message
2314

2315
    """
2316
    if not self.jobs:
2317
      self.SubmitPending()
2318
    results = []
2319
    if self.verbose:
2320
      ok_jobs = [row[2] for row in self.jobs if row[1]]
2321
      if ok_jobs:
2322
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
2323

    
2324
    # first, remove any non-submitted jobs
2325
    self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
2326
    for idx, _, jid, name in failures:
2327
      ToStderr("Failed to submit job for %s: %s", name, jid)
2328
      results.append((idx, False, jid))
2329

    
2330
    while self.jobs:
2331
      (idx, _, jid, name) = self._ChooseJob()
2332
      ToStdout("Waiting for job %s for %s...", jid, name)
2333
      try:
2334
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
2335
        success = True
2336
      except (errors.GenericError, luxi.ProtocolError), err:
2337
        _, job_result = FormatError(err)
2338
        success = False
2339
        # the error message will always be shown, verbose or not
2340
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
2341

    
2342
      results.append((idx, success, job_result))
2343

    
2344
    # sort based on the index, then drop it
2345
    results.sort()
2346
    results = [i[1:] for i in results]
2347

    
2348
    return results
2349

    
2350
  def WaitOrShow(self, wait):
2351
    """Wait for job results or only print the job IDs.
2352

2353
    @type wait: boolean
2354
    @param wait: whether to wait or not
2355

2356
    """
2357
    if wait:
2358
      return self.GetResults()
2359
    else:
2360
      if not self.jobs:
2361
        self.SubmitPending()
2362
      for _, status, result, name in self.jobs:
2363
        if status:
2364
          ToStdout("%s: %s", result, name)
2365
        else:
2366
          ToStderr("Failure for %s: %s", name, result)
2367
      return [row[1:3] for row in self.jobs]
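
# Example (editor's sketch): submitting several independent jobs and waiting
# for all of them; each call to QueueJob takes a human-readable name plus one
# or more opcodes making up that job.
#
#   jex = JobExecutor(opts=opts)
#   for instance_name, op in work:  # "work" built by the caller
#     jex.QueueJob(instance_name, op)
#   for success, result in jex.GetResults():
#     if not success:
#       ToStderr("Job failed: %s", result)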