# Statistics
# | Branch: | Tag: | Revision:
#
# root / lib / cli.py @ fdad8c4d
#
# History | View | Annotate | Download (70.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module dealing with command line parsing"""
23

    
24

    
25
import sys
26
import textwrap
27
import os.path
28
import time
29
import logging
30
from cStringIO import StringIO
31

    
32
from ganeti import utils
33
from ganeti import errors
34
from ganeti import constants
35
from ganeti import opcodes
36
from ganeti import luxi
37
from ganeti import ssconf
38
from ganeti import rpc
39
from ganeti import ssh
40

    
41
from optparse import (OptionParser, TitledHelpFormatter,
42
                      Option, OptionValueError)
43

    
44

    
45
# Public API of this module, grouped by purpose.
__all__ = [
  # Command line options
  "ADD_UIDS_OPT",
  "ALLOCATABLE_OPT",
  "ALL_OPT",
  "AUTO_PROMOTE_OPT",
  "AUTO_REPLACE_OPT",
  "BACKEND_OPT",
  "CLEANUP_OPT",
  "CONFIRM_OPT",
  "CP_SIZE_OPT",
  "DEBUG_OPT",
  "DEBUG_SIMERR_OPT",
  "DISKIDX_OPT",
  "DISK_OPT",
  "DISK_TEMPLATE_OPT",
  "DRAINED_OPT",
  "EARLY_RELEASE_OPT",
  "ENABLED_HV_OPT",
  "ERROR_CODES_OPT",
  "FIELDS_OPT",
  "FILESTORE_DIR_OPT",
  "FILESTORE_DRIVER_OPT",
  "FORCE_OPT",
  "FORCE_VARIANT_OPT",
  "GLOBAL_FILEDIR_OPT",
  "HVLIST_OPT",
  "HVOPTS_OPT",
  "HYPERVISOR_OPT",
  "IALLOCATOR_OPT",
  "IDENTIFY_DEFAULTS_OPT",
  "IGNORE_CONSIST_OPT",
  "IGNORE_FAILURES_OPT",
  "IGNORE_SECONDARIES_OPT",
  "IGNORE_SIZE_OPT",
  "MAC_PREFIX_OPT",
  "MAINTAIN_NODE_HEALTH_OPT",
  "MASTER_NETDEV_OPT",
  "MC_OPT",
  "NET_OPT",
  "NEW_CLUSTER_CERT_OPT",
  "NEW_CONFD_HMAC_KEY_OPT",
  "NEW_RAPI_CERT_OPT",
  "NEW_SECONDARY_OPT",
  "NIC_PARAMS_OPT",
  "NODE_LIST_OPT",
  "NODE_PLACEMENT_OPT",
  "NOHDR_OPT",
  "NOIPCHECK_OPT",
  "NO_INSTALL_OPT",
  "NONAMECHECK_OPT",
  "NOLVM_STORAGE_OPT",
  "NOMODIFY_ETCHOSTS_OPT",
  "NOMODIFY_SSH_SETUP_OPT",
  "NONICS_OPT",
  "NONLIVE_OPT",
  "NONPLUS1_OPT",
  "NOSHUTDOWN_OPT",
  "NOSTART_OPT",
  "NOSSH_KEYCHECK_OPT",
  "NOVOTING_OPT",
  "NWSYNC_OPT",
  "ON_PRIMARY_OPT",
  "ON_SECONDARY_OPT",
  "OFFLINE_OPT",
  "OS_OPT",
  "OS_SIZE_OPT",
  "RAPI_CERT_OPT",
  "READD_OPT",
  "REBOOT_TYPE_OPT",
  "REMOVE_UIDS_OPT",
  "SECONDARY_IP_OPT",
  "SELECT_OS_OPT",
  "SEP_OPT",
  "SHOWCMD_OPT",
  "SHUTDOWN_TIMEOUT_OPT",
  "SINGLE_NODE_OPT",
  "SRC_DIR_OPT",
  "SRC_NODE_OPT",
  "SUBMIT_OPT",
  "STATIC_OPT",
  "SYNC_OPT",
  "TAG_SRC_OPT",
  "TIMEOUT_OPT",
  "UIDPOOL_OPT",
  "USEUNITS_OPT",
  "USE_REPL_NET_OPT",
  "VERBOSE_OPT",
  "VG_NAME_OPT",
  "YES_DOIT_OPT",
  # Generic functions for CLI programs
  "GenericMain",
  "GenericInstanceCreate",
  "GetClient",
  "GetOnlineNodes",
  "JobExecutor",
  "JobSubmittedException",
  "ParseTimespec",
  "RunWhileClusterStopped",
  "SubmitOpCode",
  "SubmitOrSend",
  "UsesRPC",
  # Formatting functions
  "ToStderr", "ToStdout",
  "FormatError",
  "GenerateTable",
  "AskUser",
  "FormatTimestamp",
  # Tags functions
  "ListTags",
  "AddTags",
  "RemoveTags",
  # command line options support infrastructure
  "ARGS_MANY_INSTANCES",
  "ARGS_MANY_NODES",
  "ARGS_NONE",
  "ARGS_ONE_INSTANCE",
  "ARGS_ONE_NODE",
  "ARGS_ONE_OS",
  "ArgChoice",
  "ArgCommand",
  "ArgFile",
  "ArgHost",
  "ArgInstance",
  "ArgJobId",
  "ArgNode",
  "ArgOs",
  "ArgSuggest",
  "ArgUnknown",
  "OPT_COMPL_INST_ADD_NODES",
  "OPT_COMPL_MANY_NODES",
  "OPT_COMPL_ONE_IALLOCATOR",
  "OPT_COMPL_ONE_INSTANCE",
  "OPT_COMPL_ONE_NODE",
  "OPT_COMPL_ONE_OS",
  "cli_option",
  "SplitNodeOption",
  "CalculateOSNames",
  ]
184

    
185
NO_PREFIX = "no_"
186
UN_PREFIX = "-"
187

    
188

    
189
class _Argument:
190
  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
191
    self.min = min
192
    self.max = max
193

    
194
  def __repr__(self):
195
    return ("<%s min=%s max=%s>" %
196
            (self.__class__.__name__, self.min, self.max))
197

    
198

    
199
class ArgSuggest(_Argument):
  """Suggesting argument.

  Value can be any of the ones passed to the constructor; the list is
  carried along purely as suggestion metadata.

  """
  # pylint: disable-msg=W0622
  def __init__(self, min=0, max=None, choices=None):
    _Argument.__init__(self, min=min, max=max)
    self.choices = choices

  def __repr__(self):
    return "<%s min=%s max=%s choices=%r>" % (self.__class__.__name__,
                                              self.min, self.max,
                                              self.choices)
213

    
214

    
215
class ArgChoice(ArgSuggest):
  """Choice argument.

  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
  but value must be one of the choices.

  """
222

    
223

    
224
class ArgUnknown(_Argument):
  """Unknown argument to program (e.g. determined at runtime).

  Marker subclass only; adds no behavior over L{_Argument}.

  """
228

    
229

    
230
class ArgInstance(_Argument):
  """Instances argument.

  Marker subclass only; adds no behavior over L{_Argument}.

  """
234

    
235

    
236
class ArgNode(_Argument):
  """Node argument.

  Marker subclass only; adds no behavior over L{_Argument}.

  """
240

    
241
class ArgJobId(_Argument):
  """Job ID argument.

  Marker subclass only; adds no behavior over L{_Argument}.

  """
245

    
246

    
247
class ArgFile(_Argument):
  """File path argument.

  Marker subclass only; adds no behavior over L{_Argument}.

  """
251

    
252

    
253
class ArgCommand(_Argument):
  """Command argument.

  Marker subclass only; adds no behavior over L{_Argument}.

  """
257

    
258

    
259
class ArgHost(_Argument):
  """Host argument.

  Marker subclass only; adds no behavior over L{_Argument}.

  """
263

    
264

    
265
class ArgOs(_Argument):
  """OS argument.

  Marker subclass only; adds no behavior over L{_Argument}.

  """
269

    
270

    
271
ARGS_NONE = []
272
ARGS_MANY_INSTANCES = [ArgInstance()]
273
ARGS_MANY_NODES = [ArgNode()]
274
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
275
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
276
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
277

    
278

    
279
def _ExtractTagsObject(opts, args):
  """Extract the tag target as a (kind, name) pair.

  The tag kind is taken from the tag_type attribute of opts; for node
  and instance tags the target name is popped off the front of args,
  so this function modifies its args parameter.

  """
  if not hasattr(opts, "tag_type"):
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
  kind = opts.tag_type
  if kind == constants.TAG_CLUSTER:
    return kind, kind
  if kind in (constants.TAG_NODE, constants.TAG_INSTANCE):
    if not args:
      raise errors.OpPrereqError("no arguments passed to the command")
    return kind, args.pop(0)
  raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
298

    
299

    
300
def _ExtendTags(opts, args):
301
  """Extend the args if a source file has been given.
302

303
  This function will extend the tags with the contents of the file
304
  passed in the 'tags_source' attribute of the opts parameter. A file
305
  named '-' will be replaced by stdin.
306

307
  """
308
  fname = opts.tags_source
309
  if fname is None:
310
    return
311
  if fname == "-":
312
    new_fh = sys.stdin
313
  else:
314
    new_fh = open(fname, "r")
315
  new_data = []
316
  try:
317
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
318
    # because of python bug 1633941
319
    while True:
320
      line = new_fh.readline()
321
      if not line:
322
        break
323
      new_data.append(line.strip())
324
  finally:
325
    new_fh.close()
326
  args.extend(new_data)
327

    
328

    
329
def ListTags(opts, args):
  """List the tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  cl = GetClient()
  for tag in sorted(cl.QueryTags(kind, name)):
    ToStdout(tag)
345

    
346

    
347
def AddTags(opts, args):
  """Add tags on a given object.

  Generic implementation covering all three tag targets (cluster, node,
  instance); the target type is read from the tag_type field of opts.
  Additional tags may come from the file named by opts.tags_source.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be added")
  SubmitOpCode(opcodes.OpAddTags(kind=kind, name=name, tags=args))
362

    
363

    
364
def RemoveTags(opts, args):
  """Remove tags from a given object.

  Generic implementation covering all three tag targets (cluster, node,
  instance); the target type is read from the tag_type field of opts.
  Additional tags may come from the file named by opts.tags_source.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be removed")
  SubmitOpCode(opcodes.OpDelTags(kind=kind, name=name, tags=args))
379

    
380

    
381
def check_unit(option, opt, value): # pylint: disable-msg=W0613
  """OptParsers custom converter for units.

  Delegates to utils.ParseUnit and re-raises parse failures as
  OptionValueError so they surface as normal optparse option errors.

  """
  try:
    return utils.ParseUnit(value)
  except errors.UnitParseError, err:
    raise OptionValueError("option %s: %s" % (opt, err))
389

    
390

    
391
def _SplitKeyVal(opt, data):
  """Convert a KeyVal string into a dict.

  This function will convert a key=val[,...] string into a dict. Empty
  values will be converted specially: keys which have the prefix 'no_'
  will have the value=False and the prefix stripped, the others will
  have value=True.

  @type opt: string
  @param opt: a string holding the option name for which we process the
      data, used in building error messages
  @type data: string
  @param data: a string of the format key=val,key=val,...
  @rtype: dict
  @return: {key=val, key=val}
  @raises errors.ParameterError: if there are duplicate keys

  """
  kv_dict = {}
  if not data:
    return kv_dict
  for elem in utils.UnescapeAndSplit(data, sep=","):
    if "=" in elem:
      key, val = elem.split("=", 1)
    elif elem.startswith(NO_PREFIX):
      key, val = elem[len(NO_PREFIX):], False
    elif elem.startswith(UN_PREFIX):
      key, val = elem[len(UN_PREFIX):], None
    else:
      key, val = elem, True
    if key in kv_dict:
      raise errors.ParameterError("Duplicate key '%s' in option %s" %
                                  (key, opt))
    kv_dict[key] = val
  return kv_dict
426

    
427

    
428
def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser for ident:key=val,key=val options.

  This will store the parsed values as a tuple (ident, {key: val}). As such,
  multiple uses of this option via action=append is possible.

  """
  if ":" in value:
    ident, rest = value.split(":", 1)
  else:
    ident, rest = value, ''
  # "no_ident" and "-ident" remove a whole parameter group and thus may
  # not carry any key=val data of their own
  for prefix, flag in ((NO_PREFIX, False), (UN_PREFIX, None)):
    if ident.startswith(prefix):
      if rest:
        msg = "Cannot pass options when removing parameter groups: %s" % value
        raise errors.ParameterError(msg)
      return (ident[len(prefix):], flag)
  return (ident, _SplitKeyVal(opt, rest))
454

    
455

    
456
def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser class for key=val,key=val options.

  This will store the parsed values as a dict {key: val}.

  Thin wrapper around L{_SplitKeyVal}; see there for the "no_"/"-"
  prefix handling and duplicate-key errors.

  """
  return _SplitKeyVal(opt, value)
463

    
464

    
465
def check_bool(option, opt, value): # pylint: disable-msg=W0613
  """Custom parser for yes/no options.

  This will store the parsed value as either True or False.

  """
  value = value.lower()
  if value in (constants.VALUE_FALSE, "no"):
    return False
  if value in (constants.VALUE_TRUE, "yes"):
    return True
  raise errors.ParameterError("Invalid boolean value '%s'" % value)
478

    
479

    
480
# completion_suggestion is normally a list. Using numeric values not evaluating
# to False for dynamic completion.
(OPT_COMPL_MANY_NODES,
 OPT_COMPL_ONE_NODE,
 OPT_COMPL_ONE_INSTANCE,
 OPT_COMPL_ONE_OS,
 OPT_COMPL_ONE_IALLOCATOR,
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)

# All dynamic-completion markers, for quick membership tests.
OPT_COMPL_ALL = frozenset([
  OPT_COMPL_MANY_NODES,
  OPT_COMPL_ONE_NODE,
  OPT_COMPL_ONE_INSTANCE,
  OPT_COMPL_ONE_OS,
  OPT_COMPL_ONE_IALLOCATOR,
  OPT_COMPL_INST_ADD_NODES,
  ])
497

    
498

    
499
class CliOption(Option):
  """Custom option class for optparse.

  Extends the standard optparse Option with a "completion_suggest"
  attribute and registers the extra value types ("identkeyval",
  "keyval", "unit", "bool") implemented by the check_* converters in
  this module.

  """
  ATTRS = Option.ATTRS + [
    "completion_suggest",
    ]
  TYPES = Option.TYPES + (
    "identkeyval",
    "keyval",
    "unit",
    "bool",
    )
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
  TYPE_CHECKER["keyval"] = check_key_val
  TYPE_CHECKER["unit"] = check_unit
  TYPE_CHECKER["bool"] = check_bool
517

    
518

    
519
# optparse.py sets make_option, so we do it for our own option class, too
520
cli_option = CliOption
521

    
522

    
523
# Shared metavar for yes/no-valued options.
_YORNO = "yes|no"

# Generic output/behavior options shared by most commands.
DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
                       help="Increase debugging level")

NOHDR_OPT = cli_option("--no-headers", default=False,
                       action="store_true", dest="no_headers",
                       help="Don't display column headers")

SEP_OPT = cli_option("--separator", default=None,
                     action="store", dest="separator",
                     help=("Separator between output fields"
                           " (defaults to one space)"))

USEUNITS_OPT = cli_option("--units", default=None,
                          dest="units", choices=('h', 'm', 'g', 't'),
                          help="Specify units for output (one of hmgt)")

FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
                        type="string", metavar="FIELDS",
                        help="Comma separated list of output fields")

FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
                       default=False, help="Force the operation")

CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
                         default=False, help="Do not require confirmation")

TAG_SRC_OPT = cli_option("--from", dest="tags_source",
                         default=None, help="File with tag names")

SUBMIT_OPT = cli_option("--submit", dest="submit_only",
                        default=False, action="store_true",
                        help=("Submit the job and return the job ID, but"
                              " don't wait for the job to finish"))

SYNC_OPT = cli_option("--sync", dest="do_locking",
                      default=False, action="store_true",
                      help=("Grab locks while doing the queries"
                            " in order to ensure more consistent results"))

# NOTE: help text fixed ("verify it it" -> "verify if it")
_DRY_RUN_OPT = cli_option("--dry-run", default=False,
                          action="store_true",
                          help=("Do not execute the operation, just run the"
                                " check steps and verify if it could be"
                                " executed"))

VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
                         action="store_true",
                         help="Increase the verbosity of the operation")

DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
                              action="store_true", dest="simulate_errors",
                              help="Debugging option that makes the operation"
                              " treat most runtime checks as failed")

NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
                        default=True, action="store_false",
                        help="Don't wait for sync (DANGEROUS!)")

# Instance creation/modification options.
DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
                               help="Custom disk setup (diskless, file,"
                               " plain or drbd)",
                               default=None, metavar="TEMPL",
                               choices=list(constants.DISK_TEMPLATES))

NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
                        help="Do not create any network cards for"
                        " the instance")

FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                               help="Relative path under default cluster-wide"
                               " file storage dir to store file-based disks",
                               default=None, metavar="<DIR>")

FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
                                  help="Driver to use for image files",
                                  default="loop", metavar="<DRIVER>",
                                  choices=list(constants.FILE_DRIVER))

IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
                            help="Select nodes for the instance automatically"
                            " using the <NAME> iallocator plugin",
                            default=None, type="string",
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)

OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
                    metavar="<os>",
                    completion_suggest=OPT_COMPL_ONE_OS)

FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
                               action="store_true", default=False,
                               help="Force an unknown variant")

NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
                            action="store_true", default=False,
                            help="Do not install the OS (will"
                            " enable no-start)")

BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
                         type="keyval", default={},
                         help="Backend parameters")

HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
                         default={}, dest="hvparams",
                         help="Hypervisor parameters")

HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
                            help="Hypervisor and hypervisor options, in the"
                            " format hypervisor:option=value,option=value,...",
                            default=None, type="identkeyval")

HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
                        help="Hypervisor and hypervisor options, in the"
                        " format hypervisor:option=value,option=value,...",
                        default=[], action="append", type="identkeyval")

NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
                           action="store_false",
                           help="Don't check that the instance's IP"
                           " is alive")

NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
                             default=True, action="store_false",
                             help="Don't check that the instance's name"
                             " is resolvable")

NET_OPT = cli_option("--net",
                     help="NIC parameters", default=[],
                     dest="nics", action="append", type="identkeyval")

DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
                      dest="disks", action="append", type="identkeyval")

DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
                         help="Comma-separated list of disks"
                         " indices to act on (e.g. 0,2) (optional,"
                         " defaults to all disks)")

OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
                         help="Enforces a single-disk configuration using the"
                         " given disk size, in MiB unless a suffix is used",
                         default=None, type="unit", metavar="<size>")

IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
                                dest="ignore_consistency",
                                action="store_true", default=False,
                                help="Ignore the consistency of the disks on"
                                " the secondary")

NONLIVE_OPT = cli_option("--non-live", dest="live",
                         default=True, action="store_false",
                         help="Do a non-live migration (this usually means"
                         " freeze the instance, save the state, transfer and"
                         " only then resume running on the secondary node)")

NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
                                help="Target node and optional secondary node",
                                metavar="<pnode>[:<snode>]",
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)

NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
                           action="append", metavar="<node>",
                           help="Use only this node (can be used multiple"
                           " times, if not given defaults to all nodes)",
                           completion_suggest=OPT_COMPL_ONE_NODE)

SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
                             metavar="<node>",
                             completion_suggest=OPT_COMPL_ONE_NODE)

NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
                         action="store_false",
                         help="Don't start the instance after creation")

SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
                         action="store_true", default=False,
                         help="Show command instead of executing it")

# NOTE: help text fixed (balanced the "(like during the migration)"
# parenthesis and removed a doubled space)
CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
                         default=False, action="store_true",
                         help="Instead of performing the migration, try to"
                         " recover from a failed cleanup. This is safe"
                         " to run even if the instance is healthy, but it"
                         " will create extra replication traffic and"
                         " disrupt briefly the replication (like during the"
                         " migration)")

STATIC_OPT = cli_option("-s", "--static", dest="static",
                        action="store_true", default=False,
                        help="Only show configuration data, not runtime data")

ALL_OPT = cli_option("--all", dest="show_all",
                     default=False, action="store_true",
                     help="Show info on all instances on the cluster."
                     " This can take a long time to run, use wisely")

SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
                           action="store_true", default=False,
                           help="Interactive OS reinstall, lists available"
                           " OS templates for selection")

IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
                                 action="store_true", default=False,
                                 help="Remove the instance from the cluster"
                                 " configuration even if there are failures"
                                 " during the removal process")

# Disk-replacement options.
NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
                               help="Specifies the new secondary node",
                               metavar="NODE", default=None,
                               completion_suggest=OPT_COMPL_ONE_NODE)

ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
                            default=False, action="store_true",
                            help="Replace the disk(s) on the primary"
                            " node (only for the drbd template)")

ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
                              default=False, action="store_true",
                              help="Replace the disk(s) on the secondary"
                              " node (only for the drbd template)")

AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
                              default=False, action="store_true",
                              help="Lock all nodes and auto-promote as needed"
                              " to MC status")

AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
                              default=False, action="store_true",
                              help="Automatically replace faulty disks"
                              " (only for the drbd template)")

IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
                             default=False, action="store_true",
                             help="Ignore current recorded size"
                             " (useful for forcing activation when"
                             " the recorded size is wrong)")

SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
                          metavar="<node>",
                          completion_suggest=OPT_COMPL_ONE_NODE)

SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
                         metavar="<dir>")

# Node management options.
SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
                              help="Specify the secondary ip for the node",
                              metavar="ADDRESS", default=None)

READD_OPT = cli_option("--readd", dest="readd",
                       default=False, action="store_true",
                       help="Readd old node after replacing it")

NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
                                default=True, action="store_false",
                                help="Disable SSH key fingerprint checking")


MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
                    type="bool", default=None, metavar=_YORNO,
                    help="Set the master_candidate flag on the node")

OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
                         type="bool", default=None,
                         help="Set the offline flag on the node")

DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
                         type="bool", default=None,
                         help="Set the drained flag on the node")

ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
                             type="bool", default=None, metavar=_YORNO,
                             help="Set the allocatable flag on a volume")

# Cluster configuration options.
NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
                               help="Disable support for lvm based instances"
                               " (cluster-wide)",
                               action="store_false", default=True)

ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
                            dest="enabled_hypervisors",
                            help="Comma-separated list of hypervisors",
                            type="string", default=None)

NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
                            type="keyval", default={},
                            help="NIC parameters")

CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
                         dest="candidate_pool_size", type="int",
                         help="Set the candidate pool size")

VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
                         help="Enables LVM and specifies the volume group"
                         " name (cluster-wide) for disk allocation [xenvg]",
                         metavar="VG", default=None)

YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
                          help="Destroy cluster", action="store_true")

NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
                          help="Skip node agreement check (dangerous)",
                          action="store_true", default=False)

MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
                            help="Specify the mac prefix for the instance IP"
                            " addresses, in the format XX:XX:XX",
                            metavar="PREFIX",
                            default=None)

# NOTE: help text fixed (removed a doubled space before "[%s]")
MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
                               help="Specify the node interface (cluster-wide)"
                               " on which the master IP address will be added"
                               " [%s]" % constants.DEFAULT_BRIDGE,
                               metavar="NETDEV",
                               default=constants.DEFAULT_BRIDGE)


GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                                help="Specify the default directory (cluster-"
                                "wide) for storing the file-based disks [%s]" %
                                constants.DEFAULT_FILE_STORAGE_DIR,
                                metavar="DIR",
                                default=constants.DEFAULT_FILE_STORAGE_DIR)

NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
                                   help="Don't modify /etc/hosts",
                                   action="store_false", default=True)

NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
                                    help="Don't initialize SSH keys",
                                    action="store_false", default=True)

ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
                             help="Enable parseable error messages",
                             action="store_true", default=False)

NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
                          help="Skip N+1 memory redundancy tests",
                          action="store_true", default=False)
865
REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
866
                             help="Type of reboot: soft/hard/full",
867
                             default=constants.INSTANCE_REBOOT_HARD,
868
                             metavar="<REBOOT>",
869
                             choices=list(constants.REBOOT_TYPES))
870

    
871
IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
872
                                    dest="ignore_secondaries",
873
                                    default=False, action="store_true",
874
                                    help="Ignore errors from secondaries")
875

    
876
NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
877
                            action="store_false", default=True,
878
                            help="Don't shutdown the instance (unsafe)")
879

    
880
TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
881
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
882
                         help="Maximum time to wait")
883

    
884
SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
885
                         dest="shutdown_timeout", type="int",
886
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
887
                         help="Maximum time to wait for instance shutdown")
888

    
889
EARLY_RELEASE_OPT = cli_option("--early-release",
890
                               dest="early_release", default=False,
891
                               action="store_true",
892
                               help="Release the locks on the secondary"
893
                               " node(s) early")
894

    
895
NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
896
                                  dest="new_cluster_cert",
897
                                  default=False, action="store_true",
898
                                  help="Generate a new cluster certificate")
899

    
900
RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
901
                           default=None,
902
                           help="File containing new RAPI certificate")
903

    
904
NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
905
                               default=None, action="store_true",
906
                               help=("Generate a new self-signed RAPI"
907
                                     " certificate"))
908

    
909
NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
910
                                    dest="new_confd_hmac_key",
911
                                    default=False, action="store_true",
912
                                    help=("Create a new HMAC key for %s" %
913
                                          constants.CONFD))
914

    
915
USE_REPL_NET_OPT = cli_option("--use-replication-network",
916
                              dest="use_replication_network",
917
                              help="Whether to use the replication network"
918
                              " for talking to the nodes",
919
                              action="store_true", default=False)
920

    
921
MAINTAIN_NODE_HEALTH_OPT = \
922
    cli_option("--maintain-node-health", dest="maintain_node_health",
923
               metavar=_YORNO, default=None, type="bool",
924
               help="Configure the cluster to automatically maintain node"
925
               " health, by shutting down unknown instances, shutting down"
926
               " unknown DRBD devices, etc.")
927

    
928
IDENTIFY_DEFAULTS_OPT = \
929
    cli_option("--identify-defaults", dest="identify_defaults",
930
               default=False, action="store_true",
931
               help="Identify which saved instance parameters are equal to"
932
               " the current cluster defaults and set them as such, instead"
933
               " of marking them as overridden")
934

    
935
UIDPOOL_OPT = cli_option("--uid-pool", default=None,
936
                         action="store", dest="uid_pool",
937
                         help=("A list of user-ids or user-id"
938
                               " ranges separated by commas"))
939

    
940
ADD_UIDS_OPT = cli_option("--add-uids", default=None,
941
                          action="store", dest="add_uids",
942
                          help=("A list of user-ids or user-id"
943
                                " ranges separated by commas, to be"
944
                                " added to the user-id pool"))
945

    
946
REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
947
                             action="store", dest="remove_uids",
948
                             help=("A list of user-ids or user-id"
949
                                   " ranges separated by commas, to be"
950
                                   " removed from the user-id pool"))
951

    
952

    
953
def _ParseArgs(argv, commands, aliases):
  """Parser for the command line arguments.

  This function parses the arguments and returns the function which
  must be executed together with its (modified) arguments.

  @param argv: the command line
  @param commands: dictionary with special contents, see the design
      doc for cmdline handling
  @param aliases: dictionary with command aliases {'alias': 'target, ...}
  @return: a tuple (function, options, arguments); all three are None
      when only usage/version was printed or argument checking failed

  """
  if len(argv) == 0:
    binary = "<command>"
  else:
    binary = argv[0].split("/")[-1]

  if len(argv) > 1 and argv[1] == "--version":
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
    # Quit right away. That way we don't have to care about this special
    # argument. optparse.py does it the same.
    sys.exit(0)

  if len(argv) < 2 or not (argv[1] in commands or
                           argv[1] in aliases):
    # let's do a nice thing: no/unknown command gets the usage screen
    sortedcmds = commands.keys()
    sortedcmds.sort()

    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
    ToStdout("")

    # compute the max line length for cmd + usage
    mlen = max([len(" %s" % cmd) for cmd in commands])
    mlen = min(60, mlen) # should not get here...

    # and format a nice command list
    ToStdout("Commands:")
    for cmd in sortedcmds:
      cmdstr = " %s" % (cmd,)
      # commands[cmd] is (function, args_def, options, usage, description)
      help_text = commands[cmd][4]
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
      for line in help_lines:
        ToStdout("%-*s   %s", mlen, "", line)

    ToStdout("")

    return None, None, None

  # get command, unalias it, and look it up in commands
  cmd = argv.pop(1)
  if cmd in aliases:
    if cmd in commands:
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
                                   " command" % cmd)

    if aliases[cmd] not in commands:
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
                                   " command '%s'" % (cmd, aliases[cmd]))

    cmd = aliases[cmd]

  func, args_def, parser_opts, usage, description = commands[cmd]
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
                        description=description,
                        formatter=TitledHelpFormatter(),
                        usage="%%prog %s %s" % (cmd, usage))
  parser.disable_interspersed_args()
  # NOTE(review): parse_args() with no argument reads sys.argv; this
  # works because callers pass sys.argv as argv, so the pop(1) above has
  # already removed the command name from it -- confirm before ever
  # passing a different argv.
  options, args = parser.parse_args()

  if not _CheckArguments(cmd, args_def, args):
    return None, None, None

  return func, options, args
1031
def _CheckArguments(cmd, args_def, args):
1032
  """Verifies the arguments using the argument definition.
1033

1034
  Algorithm:
1035

1036
    1. Abort with error if values specified by user but none expected.
1037

1038
    1. For each argument in definition
1039

1040
      1. Keep running count of minimum number of values (min_count)
1041
      1. Keep running count of maximum number of values (max_count)
1042
      1. If it has an unlimited number of values
1043

1044
        1. Abort with error if it's not the last argument in the definition
1045

1046
    1. If last argument has limited number of values
1047

1048
      1. Abort with error if number of values doesn't match or is too large
1049

1050
    1. Abort with error if user didn't pass enough values (min_count)
1051

1052
  """
1053
  if args and not args_def:
1054
    ToStderr("Error: Command %s expects no arguments", cmd)
1055
    return False
1056

    
1057
  min_count = None
1058
  max_count = None
1059
  check_max = None
1060

    
1061
  last_idx = len(args_def) - 1
1062

    
1063
  for idx, arg in enumerate(args_def):
1064
    if min_count is None:
1065
      min_count = arg.min
1066
    elif arg.min is not None:
1067
      min_count += arg.min
1068

    
1069
    if max_count is None:
1070
      max_count = arg.max
1071
    elif arg.max is not None:
1072
      max_count += arg.max
1073

    
1074
    if idx == last_idx:
1075
      check_max = (arg.max is not None)
1076

    
1077
    elif arg.max is None:
1078
      raise errors.ProgrammerError("Only the last argument can have max=None")
1079

    
1080
  if check_max:
1081
    # Command with exact number of arguments
1082
    if (min_count is not None and max_count is not None and
1083
        min_count == max_count and len(args) != min_count):
1084
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1085
      return False
1086

    
1087
    # Command with limited number of arguments
1088
    if max_count is not None and len(args) > max_count:
1089
      ToStderr("Error: Command %s expects only %d argument(s)",
1090
               cmd, max_count)
1091
      return False
1092

    
1093
  # Command with some required arguments
1094
  if min_count is not None and len(args) < min_count:
1095
    ToStderr("Error: Command %s expects at least %d argument(s)",
1096
             cmd, min_count)
1097
    return False
1098

    
1099
  return True
1100

    
1101

    
1102
def SplitNodeOption(value):
  """Splits the value of a --node option.

  A value of the form "pnode:snode" is split on the first colon;
  anything else (including None or the empty string) yields
  (value, None).

  """
  if not value or ':' not in value:
    return (value, None)
  return value.split(':', 1)
1112
def CalculateOSNames(os_name, os_variants):
  """Calculates all the names an OS can be called, according to its variants.

  @type os_name: string
  @param os_name: base name of the os
  @type os_variants: list or None
  @param os_variants: list of supported variants
  @rtype: list
  @return: the base name alone if there are no variants, otherwise one
      "name+variant" entry per variant

  """
  if not os_variants:
    return [os_name]
  return ["%s+%s" % (os_name, variant) for variant in os_variants]
1129
def UsesRPC(fn):
  """Decorator that brackets a function with rpc.Init()/rpc.Shutdown().

  Ensures the RPC layer is initialized before the wrapped function runs
  and is always shut down afterwards, even when the function raises.

  @param fn: the function to wrap
  @return: the wrapped function

  """
  # Local import: this module's top-level import block predates the need
  # for functools and is outside this change's scope.
  import functools

  # functools.wraps preserves fn's name/docstring, so --help output and
  # tracebacks refer to the real function instead of "wrapper"
  @functools.wraps(fn)
  def wrapper(*args, **kwargs):
    rpc.Init()
    try:
      return fn(*args, **kwargs)
    finally:
      rpc.Shutdown()
  return wrapper
1139
def AskUser(text, choices=None):
  """Ask the user a question.

  @param text: the question to ask

  @param choices: list with elements tuples (input_char, return_value,
      description); if not given, it will default to: [('y', True,
      'Perform the operation'), ('n', False, 'Do no do the operation')];
      note that the '?' char is reserved for help

  @return: one of the return values from the choices list; if input is
      not possible (i.e. not running with a tty, we return the last
      entry from the list

  """
  if choices is None:
    choices = [('y', True, 'Perform the operation'),
               ('n', False, 'Do not perform the operation')]
  if not choices or not isinstance(choices, list):
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
  for entry in choices:
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
      raise errors.ProgrammerError("Invalid choices element to AskUser")

  # The last choice doubles as the default/fallback answer
  answer = choices[-1][1]
  new_text = []
  for line in text.splitlines():
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
  text = "\n".join(new_text)
  try:
    # Talk to the controlling terminal directly, bypassing stdin/stdout
    f = file("/dev/tty", "a+")
  except IOError:
    # No tty available: silently fall back to the default answer
    return answer
  try:
    chars = [entry[0] for entry in choices]
    # Mark the default (last) choice with brackets in the prompt
    chars[-1] = "[%s]" % chars[-1]
    chars.append('?')
    maps = dict([(entry[0], entry[1]) for entry in choices])
    while True:
      f.write(text)
      f.write('\n')
      f.write("/".join(chars))
      f.write(": ")
      # readline(2) reads at most two bytes, i.e. one answer character
      # plus (possibly) the newline
      line = f.readline(2).strip().lower()
      if line in maps:
        answer = maps[line]
        break
      elif line == '?':
        # Print per-choice help and re-prompt
        for entry in choices:
          f.write(" %s - %s\n" % (entry[0], entry[2]))
        f.write("\n")
        continue
  finally:
    f.close()
  return answer
1196
class JobSubmittedException(Exception):
  """Job was submitted, client should exit.

  This exception has one argument, the ID of the job that was
  submitted. The handler should print this ID.

  This is not an error, just a structured way to exit from clients:
  L{FormatError} recognizes it, prints the job ID and maps it to exit
  code 0.

  """
1207
def SendJob(ops, cl=None):
  """Function to submit an opcode without waiting for the results.

  @type ops: list
  @param ops: list of opcodes
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created
  @return: the ID of the submitted job

  """
  client = cl if cl is not None else GetClient()
  return client.SubmitJob(ops)
1225
def PollJob(job_id, cl=None, feedback_fn=None):
  """Function to poll for the result of a job.

  @type job_id: job identified
  @param job_id: the job to poll for results
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created
  @param feedback_fn: if callable, log messages are passed to it instead
      of being printed to stdout
  @return: the "opresult" list of the job on success
  @raise errors.JobLost: if the job disappears from the queue
  @raise errors.OpExecError: if the job was canceled or any opcode failed

  """
  if cl is None:
    cl = GetClient()

  # State carried across WaitForJobChangeOnce calls
  prev_job_info = None
  prev_logmsg_serial = None

  status = None

  # Whether we already told the user about these wait states
  notified_queued = False
  notified_waitlock = False

  while True:
    result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
                                     prev_logmsg_serial)
    if not result:
      # job not found, go away!
      raise errors.JobLost("Job with id %s lost" % job_id)
    elif result == constants.JOB_NOTCHANGED:
      if status is not None and not callable(feedback_fn):
        if status == constants.JOB_STATUS_QUEUED and not notified_queued:
          ToStderr("Job %s is waiting in queue", job_id)
          notified_queued = True
        elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
          ToStderr("Job %s is trying to acquire all necessary locks", job_id)
          notified_waitlock = True

      # Wait again
      continue

    # Split result, a tuple of (field values, log entries)
    (job_info, log_entries) = result
    (status, ) = job_info

    if log_entries:
      for log_entry in log_entries:
        (serial, timestamp, _, message) = log_entry
        if callable(feedback_fn):
          feedback_fn(log_entry[1:])
        else:
          encoded = utils.SafeEncode(message)
          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
        # max(None, serial) works under Python 2 for the first message
        prev_logmsg_serial = max(prev_logmsg_serial, serial)

    # TODO: Handle canceled and archived jobs
    # NOTE(review): this elif belongs to "if log_entries", so the loop
    # is only left on a final status when no log entries arrived in the
    # same update -- pending messages get drained first.
    elif status in (constants.JOB_STATUS_SUCCESS,
                    constants.JOB_STATUS_ERROR,
                    constants.JOB_STATUS_CANCELING,
                    constants.JOB_STATUS_CANCELED):
      break

    prev_job_info = job_info

  # The loop has ended; fetch the final state for the verdict
  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
  if not jobs:
    raise errors.JobLost("Job with id %s lost" % job_id)

  status, opstatus, result = jobs[0]
  if status == constants.JOB_STATUS_SUCCESS:
    return result
  elif status in (constants.JOB_STATUS_CANCELING,
                  constants.JOB_STATUS_CANCELED):
    raise errors.OpExecError("Job was canceled")
  else:
    has_ok = False
    for idx, (status, msg) in enumerate(zip(opstatus, result)):
      if status == constants.OP_STATUS_SUCCESS:
        has_ok = True
      elif status == constants.OP_STATUS_ERROR:
        # Re-raise an encapsulated exception if one was serialized
        errors.MaybeRaise(msg)
        if has_ok:
          raise errors.OpExecError("partial failure (opcode %d): %s" %
                                   (idx, msg))
        else:
          raise errors.OpExecError(str(msg))
    # default failure mode
    raise errors.OpExecError(result)
1313
def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
  """Legacy function to submit an opcode.

  Wraps the single opcode in a job, submits it and polls until the job
  finishes, returning the result of its (only) opcode. It should be
  extended to better handle feedback and interaction functions.

  """
  client = cl
  if client is None:
    client = GetClient()

  SetGenericOpcodeOpts([op], opts)

  job_id = SendJob([op], client)
  return PollJob(job_id, cl=client, feedback_fn=feedback_fn)[0]
    
1333
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  This function will decide, based on the 'opts' parameter, whether to
  submit and wait for the result of the opcode (and return it), or
  whether to just send the job and print its identifier. It is used in
  order to simplify the implementation of the '--submit' option.

  It will also process the opcodes if we're sending the via SendJob
  (otherwise SubmitOpCode does it).

  """
  if opts and opts.submit_only:
    # Fire-and-forget: signal the job ID via the structured exception
    job = [op]
    SetGenericOpcodeOpts(job, opts)
    raise JobSubmittedException(SendJob(job, cl=cl))

  return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
1354
def SetGenericOpcodeOpts(opcode_list, options):
  """Processor for generic options.

  Copies the generic command line options (debug, dry-run, ...) onto
  every opcode in the given list.

  @param opcode_list: list of opcodes
  @param options: command line options or None
  @return: None (in-place modification)

  """
  if not options:
    return
  dry_run = options.dry_run
  debug_level = options.debug
  for op in opcode_list:
    op.dry_run = dry_run
    op.debug_level = debug_level
1372
def GetClient():
  """Connects to the master daemon and returns a luxi client.

  If no master daemon can be reached, tries to diagnose why via the
  ssconf files and raises a more helpful error where possible.

  @rtype: luxi.Client
  @raise errors.OpPrereqError: if the cluster is not initialized, or
      this machine is not the master node
  @raise luxi.NoMasterError: re-raised when this IS the master node but
      the master daemon is unreachable

  """
  # TODO: Cache object?
  try:
    client = luxi.Client()
  except luxi.NoMasterError:
    ss = ssconf.SimpleStore()

    # Try to read ssconf file
    try:
      ss.GetMasterNode()
    except errors.ConfigurationError:
      raise errors.OpPrereqError("Cluster not initialized or this machine is"
                                 " not part of a cluster")

    master, myself = ssconf.GetMasterAndMyself(ss=ss)
    if master != myself:
      raise errors.OpPrereqError("This is not the master node, please connect"
                                 " to node '%s' and rerun the command" %
                                 master)
    # We are the master but the daemon is down: propagate NoMasterError
    raise
  return client
1395
def FormatError(err):
  """Return a formatted error message for a given error.

  This function takes an exception instance and returns a tuple
  consisting of two values: first, the recommended exit code, and
  second, a string describing the error message (not
  newline-terminated).

  """
  # Default exit code for errors; specific cases below override it
  retcode = 1
  obuf = StringIO()
  msg = str(err)
  # Most-specific exception types are tested first
  if isinstance(err, errors.ConfigurationError):
    txt = "Corrupt configuration file: %s" % msg
    logging.error(txt)
    obuf.write(txt + "\n")
    obuf.write("Aborting.")
    retcode = 2
  elif isinstance(err, errors.HooksAbort):
    obuf.write("Failure: hooks execution failed:\n")
    # err.args[0] is a list of (node, script, output) tuples
    for node, script, out in err.args[0]:
      if out:
        obuf.write("  node: %s, script: %s, output: %s\n" %
                   (node, script, out))
      else:
        obuf.write("  node: %s, script: %s (no output)\n" %
                   (node, script))
  elif isinstance(err, errors.HooksFailure):
    obuf.write("Failure: hooks general failure: %s" % msg)
  elif isinstance(err, errors.ResolverError):
    this_host = utils.HostInfo.SysName()
    if err.args[0] == this_host:
      msg = "Failure: can't resolve my own hostname ('%s')"
    else:
      msg = "Failure: can't resolve hostname '%s'"
    obuf.write(msg % err.args[0])
  elif isinstance(err, errors.OpPrereqError):
    if len(err.args) == 2:
      # Two-argument form carries a structured error type as args[1]
      obuf.write("Failure: prerequisites not met for this"
               " operation:\nerror type: %s, error details:\n%s" %
                 (err.args[1], err.args[0]))
    else:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\n%s" % msg)
  elif isinstance(err, errors.OpExecError):
    obuf.write("Failure: command execution error:\n%s" % msg)
  elif isinstance(err, errors.TagError):
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
  elif isinstance(err, errors.JobQueueDrainError):
    obuf.write("Failure: the job queue is marked for drain and doesn't"
               " accept new requests\n")
  elif isinstance(err, errors.JobQueueFull):
    obuf.write("Failure: the job queue is full and doesn't accept new"
               " job submissions until old jobs are archived\n")
  elif isinstance(err, errors.TypeEnforcementError):
    obuf.write("Parameter Error: %s" % msg)
  elif isinstance(err, errors.ParameterError):
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
  elif isinstance(err, errors.GenericError):
    # Catch-all for the ganeti exception hierarchy
    obuf.write("Unhandled Ganeti error: %s" % msg)
  elif isinstance(err, luxi.NoMasterError):
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
               " and listening for connections?")
  elif isinstance(err, luxi.TimeoutError):
    obuf.write("Timeout while talking to the master daemon. Error:\n"
               "%s" % msg)
  elif isinstance(err, luxi.ProtocolError):
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
               "%s" % msg)
  elif isinstance(err, JobSubmittedException):
    # Not an error: --submit path, report the job ID and exit cleanly
    obuf.write("JobID: %s\n" % err.args[0])
    retcode = 0
  else:
    obuf.write("Unhandled exception: %s" % msg)
  return retcode, obuf.getvalue().rstrip('\n')
1472
def GenericMain(commands, override=None, aliases=None):
  """Generic main function for all the gnt-* commands.

  Arguments:
    - commands: a dictionary with a special structure, see the design doc
                for command line handling.
    - override: if not None, we expect a dictionary with keys that will
                override command line options; this can be used to pass
                options from the scripts to generic functions
    - aliases: dictionary with command aliases {'alias': 'target, ...}

  Returns the exit code of the command function, or 1 on parse errors.

  """
  # save the program name and the entire command line for later logging
  if sys.argv:
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
    if len(sys.argv) >= 2:
      binary += " " + sys.argv[1]
      old_cmdline = " ".join(sys.argv[2:])
    else:
      old_cmdline = ""
  else:
    binary = "<unknown program>"
    old_cmdline = ""

  if aliases is None:
    aliases = {}

  try:
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
  except errors.ParameterError, err:
    result, err_msg = FormatError(err)
    ToStderr(err_msg)
    return 1

  if func is None: # parse error
    return 1

  # Apply forced option values from the calling script
  if override is not None:
    for key, val in override.iteritems():
      setattr(options, key, val)

  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
                     stderr_logging=True, program=binary)

  if old_cmdline:
    logging.info("run with arguments '%s'", old_cmdline)
  else:
    logging.info("run with no arguments")

  try:
    result = func(options, args)
  except (errors.GenericError, luxi.ProtocolError,
          JobSubmittedException), err:
    # FormatError turns the exception into (exit code, message); note
    # that JobSubmittedException maps to exit code 0
    result, err_msg = FormatError(err)
    logging.exception("Error during command processing")
    ToStderr(err_msg)

  return result
1532
def GenericInstanceCreate(mode, opts, args):
  """Add an instance to the cluster via either creation or import.

  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new instance name
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: on invalid NIC/disk specifications
  @raise errors.ProgrammerError: if mode is not one of the two constants

  """
  instance = args[0]

  (pnode, snode) = SplitNodeOption(opts.node)

  hypervisor = None
  hvparams = {}
  if opts.hypervisor:
    # opts.hypervisor is a (name, parameters) pair
    hypervisor, hvparams = opts.hypervisor

  if opts.nics:
    try:
      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
    except ValueError, err:
      raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
    # NOTE(review): [{}] * n repeats ONE shared dict object; safe here
    # only because entries are replaced below, never mutated in place
    nics = [{}] * nic_max
    for nidx, ndict in opts.nics:
      nidx = int(nidx)
      if not isinstance(ndict, dict):
        msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
        raise errors.OpPrereqError(msg)
      nics[nidx] = ndict
  elif opts.no_nics:
    # no nics
    nics = []
  elif mode == constants.INSTANCE_CREATE:
    # default of one nic, all auto
    nics = [{}]
  else:
    # mode == import
    nics = []

  if opts.disk_template == constants.DT_DISKLESS:
    if opts.disks or opts.sd_size is not None:
      raise errors.OpPrereqError("Diskless instance but disk"
                                 " information passed")
    disks = []
  else:
    if (not opts.disks and not opts.sd_size
        and mode == constants.INSTANCE_CREATE):
      raise errors.OpPrereqError("No disk information specified")
    if opts.disks and opts.sd_size is not None:
      raise errors.OpPrereqError("Please use either the '--disk' or"
                                 " '-s' option")
    if opts.sd_size is not None:
      # Legacy single-disk syntax: convert to the --disk representation
      opts.disks = [(0, {"size": opts.sd_size})]

    if opts.disks:
      try:
        disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
      # Same shared-dict caveat as for nics above
      disks = [{}] * disk_max
    else:
      disks = []
    for didx, ddict in opts.disks:
      didx = int(didx)
      if not isinstance(ddict, dict):
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
        raise errors.OpPrereqError(msg)
      elif "size" in ddict:
        if "adopt" in ddict:
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
                                     " (disk %d)" % didx)
        try:
          ddict["size"] = utils.ParseUnit(ddict["size"])
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
                                     (didx, err))
      elif "adopt" in ddict:
        if mode == constants.INSTANCE_IMPORT:
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
                                     " import")
        # Adopted volumes keep their existing size
        ddict["size"] = 0
      else:
        raise errors.OpPrereqError("Missing size or adoption source for"
                                   " disk %d" % didx)
      disks[didx] = ddict

  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)

  # Mode-specific opcode parameters
  if mode == constants.INSTANCE_CREATE:
    start = opts.start
    os_type = opts.os
    src_node = None
    src_path = None
    no_install = opts.no_install
    identify_defaults = False
  elif mode == constants.INSTANCE_IMPORT:
    start = False
    os_type = None
    src_node = opts.src_node
    src_path = opts.src_dir
    no_install = None
    identify_defaults = opts.identify_defaults
  else:
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)

  op = opcodes.OpCreateInstance(instance_name=instance,
                                disks=disks,
                                disk_template=opts.disk_template,
                                nics=nics,
                                pnode=pnode, snode=snode,
                                ip_check=opts.ip_check,
                                name_check=opts.name_check,
                                wait_for_sync=opts.wait_for_sync,
                                file_storage_dir=opts.file_storage_dir,
                                file_driver=opts.file_driver,
                                iallocator=opts.iallocator,
                                hypervisor=hypervisor,
                                hvparams=hvparams,
                                beparams=opts.beparams,
                                mode=mode,
                                start=start,
                                os_type=os_type,
                                src_node=src_node,
                                src_path=src_path,
                                no_install=no_install,
                                identify_defaults=identify_defaults)

  SubmitOrSend(op, opts)
  return 0
1667
class _RunWhileClusterStoppedHelper:
  """Helper class for L{RunWhileClusterStopped} to simplify state management

  Bundles the cluster name, master node and online node list together
  with an SSH runner, so that commands can be run locally (on the
  master) or remotely (via SSH) while the cluster daemons are stopped.

  """
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes

    # SSH runner used for all non-master nodes
    self.ssh = ssh.SshRunner(self.cluster_name)

    # All online nodes except the master; daemons are restarted on these
    # first, with the master last (see L{Call})
    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name; C{None} (or the master node's name)
        means the command is run locally instead of over SSH
    @type cmd: list
    @param cmd: Command

    @raise errors.OpExecError: if the command exits with a non-zero status

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    Note that C{fn} is invoked as C{fn(self, *args)}, i.e. it receives
    this helper instance as its first argument.

    @type fn: callable
    @param fn: Function to be called

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes
        for node_name in self.online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception, err:
          # Log and report the error, but re-raise so the finally blocks
          # below still restart the daemons and release the watcher lock
          _, errmsg = FormatError(err)
          logging.exception("Caught exception")
          self.feedback_fn(errmsg)
          raise
      finally:
        # Start cluster again, master node last
        for node_name in self.nonmaster_nodes + [self.master_node]:
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
    finally:
      # Resume watcher
      watcher_block.Close()
1760

    
1761

    
1762
def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Calls a function while all cluster daemons are stopped.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  feedback_fn("Gathering cluster information")

  # Talking via luxi ensures we're running on the master daemon
  client = GetClient()

  (cluster_name, master_node) = client.QueryConfigValues(["cluster_name",
                                                          "master_node"])

  online_nodes = GetOnlineNodes([], cl=client)

  # Drop the client reference; the master daemon will go away once stopped
  del client

  assert master_node in online_nodes

  helper = _RunWhileClusterStoppedHelper(feedback_fn, cluster_name,
                                         master_node, online_nodes)
  return helper.Call(fn, *args)
1788

    
1789

    
1790
def GenerateTable(headers, fields, separator, data,
                  numfields=None, unitfields=None,
                  units=None):
  """Prints a table with headers and different fields.

  @type headers: dict
  @param headers: dictionary mapping field names to headers for
      the table
  @type fields: list
  @param fields: the field names corresponding to each row in
      the data field
  @param separator: the separator to be used; if this is None,
      the default 'smart' algorithm is used which computes optimal
      field width, otherwise just the separator is used between
      each field
  @type data: list
  @param data: a list of lists, each sublist being one row to be output
  @type numfields: list
  @param numfields: a list with the fields that hold numeric
      values and thus should be right-aligned
  @type unitfields: list
  @param unitfields: a list with the fields that hold numeric
      values that should be formatted with the units field
  @type units: string or None
  @param units: the units we should use for formatting, or None for
      automatic choice (human-readable for non-separator usage, otherwise
      megabytes); this is a one-letter string

  """
  # Pick default units: human-readable for aligned output, megabytes
  # when a separator is given
  if units is None:
    if separator:
      units = "m"
    else:
      units = "h"

  if numfields is None:
    numfields = []
  if unitfields is None:
    unitfields = []

  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142

  # One %-format specifier per column; width-parameterized only in
  # "smart" (no-separator) mode
  col_formats = []
  for name in fields:
    if headers and name not in headers:
      # TODO: handle better unknown fields (either revert to old
      # style of raising exception, or deal more intelligently with
      # variable fields)
      headers[name] = name
    if separator is not None:
      col_formats.append("%s")
    elif numfields.Matches(name):
      col_formats.append("%*s")
    else:
      col_formats.append("%-*s")

  if separator is None:
    widths = [0] * len(fields)
    fmt = " ".join(col_formats)
  else:
    # Escape literal percent signs in the separator itself
    fmt = separator.replace("%", "%%").join(col_formats)

  # Normalize every cell to a string (mutating the rows in place),
  # applying unit formatting and tracking per-column maximum widths
  for row in data:
    if row is None:
      continue
    for col, cell in enumerate(row):
      if unitfields.Matches(fields[col]):
        try:
          cell = int(cell)
        except (TypeError, ValueError):
          pass
        else:
          cell = row[col] = utils.FormatUnit(cell, units)
      cell = row[col] = str(cell)
      if separator is None:
        widths[col] = max(widths[col], len(cell))

  output = []
  if headers:
    hdr_args = []
    for col, name in enumerate(fields):
      hdr = headers[name]
      if separator is None:
        widths[col] = max(widths[col], len(hdr))
        hdr_args.append(widths[col])
      hdr_args.append(hdr)
    output.append(fmt % tuple(hdr_args))

  if separator is None:
    assert len(widths) == len(fields)

    # Avoid trailing whitespace: never pad the last column unless it
    # is right-aligned (numeric)
    if fields and not numfields.Matches(fields[-1]):
      widths[-1] = 0

  for row in data:
    row_args = []
    if row is None:
      row = ["-" for _ in fields]
    for col in range(len(fields)):
      if separator is None:
        row_args.append(widths[col])
      row_args.append(row[col])
    output.append(fmt % tuple(row_args))

  return output
1896

    
1897

    
1898
def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: a string with the formatted timestamp, or C{'?'} if the input
      is not a two-element tuple/list

  """
  if not isinstance(ts, (tuple, list)) or len(ts) != 2:
    return '?'
  (sec, usec) = ts
  # Local time with microsecond precision appended
  formatted = time.strftime("%F %T", time.localtime(sec))
  return formatted + ".%06d" % usec
1912

    
1913

    
1914
def ParseTimespec(value):
  """Parse a time specification.

  The following suffixed will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: day
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  @rtype: int
  @return: the specification converted to seconds
  @raise errors.OpPrereqError: for empty or malformed specifications

  """
  text = str(value)
  if not text:
    raise errors.OpPrereqError("Empty time specification passed")
  multipliers = {
    's': 1,
    'm': 60,
    'h': 3600,
    'd': 86400,
    'w': 604800,
    }
  suffix = text[-1]
  if suffix in multipliers:
    digits = text[:-1]
    if not digits: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)")
    try:
      return int(digits) * multipliers[suffix]
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % digits)
  # No recognized suffix: the whole string must be an integer (seconds)
  try:
    return int(text)
  except (TypeError, ValueError):
    raise errors.OpPrereqError("Invalid time specification '%s'" % text)
1954

    
1955

    
1956
def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
                   filter_master=False):
  """Returns the names of online nodes.

  This function will also log a warning on stderr with the names of
  the online nodes.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed
  @type secondary_ips: boolean
  @param secondary_ips: if True, return the secondary IPs instead of the
      names, useful for doing network traffic over the replication interface
      (if any)
  @type filter_master: boolean
  @param filter_master: if True, do not return the master node in the list
      (useful in coordination with secondary_ips where we cannot check our
      node name against the list)

  """
  if cl is None:
    cl = GetClient()

  # Column to report per node: 2 is the secondary IP, 0 the name
  if secondary_ips:
    report_idx = 2
  else:
    report_idx = 0

  if filter_master:
    master_node = cl.QueryConfigValues(["master_node"])[0]
    keep_fn = lambda name: name != master_node
  else:
    keep_fn = lambda name: True

  rows = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
                       use_locking=False)
  offline = [row[0] for row in rows if row[1]]
  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
  return [row[report_idx] for row in rows if not row[1] and keep_fn(row[0])]
2000

    
2001

    
2002
def _ToStream(stream, txt, *args):
  """Write a message to a stream, bypassing the logging system

  The message is %-formatted only when positional arguments are given,
  so literal percent signs in C{txt} are safe when called without args.

  @type stream: file object
  @param stream: the file to which we should write
  @type txt: str
  @param txt: the message

  """
  if args:
    stream.write(txt % args)
  else:
    stream.write(txt)
  stream.write("\n")
  stream.flush()
2018

    
2019

    
2020
def ToStdout(txt, *args):
  """Write a message to stdout only, bypassing the logging system

  Thin convenience wrapper over L{_ToStream}.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stdout, txt, *args)
2030

    
2031

    
2032
def ToStderr(txt, *args):
  """Write a message to stderr only, bypassing the logging system

  Thin convenience wrapper over L{_ToStream}.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stderr, txt, *args)
2042

    
2043

    
2044
class JobExecutor(object):
2045
  """Class which manages the submission and execution of multiple jobs.
2046

2047
  Note that instances of this class should not be reused between
2048
  GetResults() calls.
2049

2050
  """
2051
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
    """Initializes the executor.

    @param cl: luxi client to use, or None to create a new one
    @type verbose: boolean
    @param verbose: whether to announce submitted job IDs

    """
    # Connect to the master daemon unless a client was handed in
    if cl is None:
      cl = GetClient()
    self.cl = cl
    self.verbose = verbose
    self.opts = opts
    self.feedback_fn = feedback_fn
    # (name, ops) tuples not yet submitted
    self.queue = []
    # (idx, submit status, job id or error, name) for submitted jobs
    self.jobs = []
2060

    
2061
  def QueueJob(self, name, *ops):
    """Record a job for later submit.

    @type name: string
    @param name: a description of the job, will be used in WaitJobSet
    """
    # Apply generic options (priority, dry-run, ...) to all opcodes first
    SetGenericOpcodeOpts(ops, self.opts)
    entry = (name, ops)
    self.queue.append(entry)
2069

    
2070
  def SubmitPending(self):
    """Submit all pending jobs.

    All queued jobs are sent in a single SubmitManyJobs call; per-job
    submission outcomes are recorded in C{self.jobs}.

    """
    ops_list = [ops for (_, ops) in self.queue]
    outcomes = self.cl.SubmitManyJobs(ops_list)
    for idx, (outcome, (name, _)) in enumerate(zip(outcomes, self.queue)):
      (status, data) = outcome
      # data is the job ID on success, the error message otherwise
      self.jobs.append((idx, status, data, name))
2078

    
2079
  def _ChooseJob(self):
    """Choose a non-waiting/queued job to poll next.

    Falls back to the oldest job when every job is still waiting.

    """
    assert self.jobs, "_ChooseJob called with empty job list"

    statuses = self.cl.QueryJobs([entry[2] for entry in self.jobs], ["status"])
    assert statuses

    waiting_states = (constants.JOB_STATUS_QUEUED,
                      constants.JOB_STATUS_WAITLOCK,
                      constants.JOB_STATUS_CANCELING)
    for entry, status_row in zip(self.jobs, statuses):
      if status_row[0] not in waiting_states:
        # good candidate found: already past the waiting states
        self.jobs.remove(entry)
        return entry

    # no job found; just take the first one
    return self.jobs.pop(0)
2100

    
2101
  def GetResults(self):
    """Wait for and return the results of all jobs.

    Any jobs still in the internal queue are submitted first. Each
    element of C{self.jobs} is a C{(idx, submit_status, job_id_or_error,
    name)} tuple as appended by L{SubmitPending}.

    @rtype: list
    @return: list of tuples (success, job results), in the same order
        as the submitted jobs; if a job has failed, instead of the result
        there will be the error message

    """
    if not self.jobs:
      self.SubmitPending()
    results = []
    if self.verbose:
      # announce only the successfully submitted job IDs
      ok_jobs = [row[2] for row in self.jobs if row[1]]
      if ok_jobs:
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))

    # first, remove any non-submitted jobs
    self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
    for idx, _, jid, name in failures:
      # for failed submissions, the third tuple element holds the error text
      ToStderr("Failed to submit job for %s: %s", name, jid)
      results.append((idx, False, jid))

    while self.jobs:
      (idx, _, jid, name) = self._ChooseJob()
      ToStdout("Waiting for job %s for %s...", jid, name)
      try:
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
        success = True
      except (errors.GenericError, luxi.ProtocolError), err:
        _, job_result = FormatError(err)
        success = False
        # the error message will always be shown, verbose or not
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)

      results.append((idx, success, job_result))

    # sort based on the index, then drop it; this restores submission order
    results.sort()
    results = [i[1:] for i in results]

    return results
2143

    
2144
  def WaitOrShow(self, wait):
    """Wait for job results or only print the job IDs.

    @type wait: boolean
    @param wait: whether to wait or not

    """
    if wait:
      return self.GetResults()

    # Not waiting: just submit (if needed) and report the job IDs
    if not self.jobs:
      self.SubmitPending()
    for (_, status, result, name) in self.jobs:
      if status:
        ToStdout("%s: %s", result, name)
      else:
        ToStderr("Failure for %s: %s", name, result)