
root / lib / cli.py @ 1338f2b4


#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module dealing with command line parsing"""


import sys
import textwrap
import os.path
import time
import logging
from cStringIO import StringIO

from ganeti import utils
from ganeti import errors
from ganeti import constants
from ganeti import opcodes
from ganeti import luxi
from ganeti import ssconf
from ganeti import rpc
from ganeti import ssh

from optparse import (OptionParser, TitledHelpFormatter,
                      Option, OptionValueError)


__all__ = [
  # Command line options
  "ALLOCATABLE_OPT",
  "ALL_OPT",
  "AUTO_PROMOTE_OPT",
  "AUTO_REPLACE_OPT",
  "BACKEND_OPT",
  "CLEANUP_OPT",
  "CONFIRM_OPT",
  "CP_SIZE_OPT",
  "DEBUG_OPT",
  "DEBUG_SIMERR_OPT",
  "DISKIDX_OPT",
  "DISK_OPT",
  "DISK_TEMPLATE_OPT",
  "DRAINED_OPT",
  "EARLY_RELEASE_OPT",
  "ENABLED_HV_OPT",
  "ERROR_CODES_OPT",
  "FIELDS_OPT",
  "FILESTORE_DIR_OPT",
  "FILESTORE_DRIVER_OPT",
  "FORCE_OPT",
  "FORCE_VARIANT_OPT",
  "GLOBAL_FILEDIR_OPT",
  "HVLIST_OPT",
  "HVOPTS_OPT",
  "HYPERVISOR_OPT",
  "IALLOCATOR_OPT",
  "IDENTIFY_DEFAULTS_OPT",
  "IGNORE_CONSIST_OPT",
  "IGNORE_FAILURES_OPT",
  "IGNORE_SECONDARIES_OPT",
  "IGNORE_SIZE_OPT",
  "MAC_PREFIX_OPT",
  "MAINTAIN_NODE_HEALTH_OPT",
  "MASTER_NETDEV_OPT",
  "MC_OPT",
  "NET_OPT",
  "NEW_CLUSTER_CERT_OPT",
  "NEW_CONFD_HMAC_KEY_OPT",
  "NEW_RAPI_CERT_OPT",
  "NEW_SECONDARY_OPT",
  "NIC_PARAMS_OPT",
  "NODE_LIST_OPT",
  "NODE_PLACEMENT_OPT",
  "NOHDR_OPT",
  "NOIPCHECK_OPT",
  "NO_INSTALL_OPT",
  "NONAMECHECK_OPT",
  "NOLVM_STORAGE_OPT",
  "NOMODIFY_ETCHOSTS_OPT",
  "NOMODIFY_SSH_SETUP_OPT",
  "NONICS_OPT",
  "NONLIVE_OPT",
  "NONPLUS1_OPT",
  "NOSHUTDOWN_OPT",
  "NOSTART_OPT",
  "NOSSH_KEYCHECK_OPT",
  "NOVOTING_OPT",
  "NWSYNC_OPT",
  "ON_PRIMARY_OPT",
  "ON_SECONDARY_OPT",
  "OFFLINE_OPT",
  "OS_OPT",
  "OS_SIZE_OPT",
  "RAPI_CERT_OPT",
  "READD_OPT",
  "REBOOT_TYPE_OPT",
  "SECONDARY_IP_OPT",
  "SELECT_OS_OPT",
  "SEP_OPT",
  "SHOWCMD_OPT",
  "SHUTDOWN_TIMEOUT_OPT",
  "SINGLE_NODE_OPT",
  "SRC_DIR_OPT",
  "SRC_NODE_OPT",
  "SUBMIT_OPT",
  "STATIC_OPT",
  "SYNC_OPT",
  "TAG_SRC_OPT",
  "TIMEOUT_OPT",
  "UIDPOOL_OPT",
  "USEUNITS_OPT",
  "USE_REPL_NET_OPT",
  "VERBOSE_OPT",
  "VG_NAME_OPT",
  "YES_DOIT_OPT",
  # Generic functions for CLI programs
  "GenericMain",
  "GenericInstanceCreate",
  "GetClient",
  "GetOnlineNodes",
  "JobExecutor",
  "JobSubmittedException",
  "ParseTimespec",
  "RunWhileClusterStopped",
  "SubmitOpCode",
  "SubmitOrSend",
  "UsesRPC",
  # Formatting functions
  "ToStderr", "ToStdout",
  "FormatError",
  "GenerateTable",
  "AskUser",
  "FormatTimestamp",
  # Tags functions
  "ListTags",
  "AddTags",
  "RemoveTags",
  # command line options support infrastructure
  "ARGS_MANY_INSTANCES",
  "ARGS_MANY_NODES",
  "ARGS_NONE",
  "ARGS_ONE_INSTANCE",
  "ARGS_ONE_NODE",
  "ARGS_ONE_OS",
  "ArgChoice",
  "ArgCommand",
  "ArgFile",
  "ArgHost",
  "ArgInstance",
  "ArgJobId",
  "ArgNode",
  "ArgOs",
  "ArgSuggest",
  "ArgUnknown",
  "OPT_COMPL_INST_ADD_NODES",
  "OPT_COMPL_MANY_NODES",
  "OPT_COMPL_ONE_IALLOCATOR",
  "OPT_COMPL_ONE_INSTANCE",
  "OPT_COMPL_ONE_NODE",
  "OPT_COMPL_ONE_OS",
  "cli_option",
  "SplitNodeOption",
  "CalculateOSNames",
  ]

NO_PREFIX = "no_"
UN_PREFIX = "-"


class _Argument:
  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
    self.min = min
    self.max = max

  def __repr__(self):
    return ("<%s min=%s max=%s>" %
            (self.__class__.__name__, self.min, self.max))


class ArgSuggest(_Argument):
  """Suggesting argument.

  Value can be any of the ones passed to the constructor.

  """
  # pylint: disable-msg=W0622
  def __init__(self, min=0, max=None, choices=None):
    _Argument.__init__(self, min=min, max=max)
    self.choices = choices

  def __repr__(self):
    return ("<%s min=%s max=%s choices=%r>" %
            (self.__class__.__name__, self.min, self.max, self.choices))


class ArgChoice(ArgSuggest):
  """Choice argument.

  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
  but value must be one of the choices.

  """


class ArgUnknown(_Argument):
  """Unknown argument to program (e.g. determined at runtime).

  """


class ArgInstance(_Argument):
  """Instances argument.

  """


class ArgNode(_Argument):
  """Node argument.

  """

class ArgJobId(_Argument):
  """Job ID argument.

  """


class ArgFile(_Argument):
  """File path argument.

  """


class ArgCommand(_Argument):
  """Command argument.

  """


class ArgHost(_Argument):
  """Host argument.

  """


class ArgOs(_Argument):
  """OS argument.

  """


ARGS_NONE = []
ARGS_MANY_INSTANCES = [ArgInstance()]
ARGS_MANY_NODES = [ArgNode()]
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
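
# Illustrative note (not in the original module): a command that takes exactly
# one node name followed by any number of instance names could describe its
# positional arguments as
#   [ArgNode(min=1, max=1), ArgInstance()]
# _CheckArguments() below enforces these min/max counts when the command runs.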


def _ExtractTagsObject(opts, args):
  """Extract the tag type object.

  Note that this function will modify its args parameter.

  """
  if not hasattr(opts, "tag_type"):
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
  kind = opts.tag_type
  if kind == constants.TAG_CLUSTER:
    retval = kind, kind
  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
    if not args:
      raise errors.OpPrereqError("no arguments passed to the command")
    name = args.pop(0)
    retval = kind, name
  else:
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
  return retval


def _ExtendTags(opts, args):
  """Extend the args if a source file has been given.

  This function will extend the tags with the contents of the file
  passed in the 'tags_source' attribute of the opts parameter. A file
  named '-' will be replaced by stdin.

  """
  fname = opts.tags_source
  if fname is None:
    return
  if fname == "-":
    new_fh = sys.stdin
  else:
    new_fh = open(fname, "r")
  new_data = []
  try:
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
    # because of python bug 1633941
    while True:
      line = new_fh.readline()
      if not line:
        break
      new_data.append(line.strip())
  finally:
    new_fh.close()
  args.extend(new_data)


def ListTags(opts, args):
  """List the tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  cl = GetClient()
  result = cl.QueryTags(kind, name)
  result = list(result)
  result.sort()
  for tag in result:
    ToStdout(tag)


def AddTags(opts, args):
  """Add tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be added")
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
  SubmitOpCode(op)


def RemoveTags(opts, args):
  """Remove tags from a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be removed")
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
  SubmitOpCode(op)


def check_unit(option, opt, value): # pylint: disable-msg=W0613
  """optparse's custom converter for units.

  """
  try:
    return utils.ParseUnit(value)
  except errors.UnitParseError, err:
    raise OptionValueError("option %s: %s" % (opt, err))


def _SplitKeyVal(opt, data):
  """Convert a KeyVal string into a dict.

  This function will convert a key=val[,...] string into a dict. Empty
  values will be converted specially: keys which have the prefix 'no_'
  will have the value=False and the prefix stripped, keys which have the
  prefix '-' will have the value=None and the prefix stripped, and the
  others will have value=True.

  @type opt: string
  @param opt: a string holding the option name for which we process the
      data, used in building error messages
  @type data: string
  @param data: a string of the format key=val,key=val,...
  @rtype: dict
  @return: {key=val, key=val}
  @raises errors.ParameterError: if there are duplicate keys

  """
  kv_dict = {}
  if data:
    for elem in utils.UnescapeAndSplit(data, sep=","):
      if "=" in elem:
        key, val = elem.split("=", 1)
      else:
        if elem.startswith(NO_PREFIX):
          key, val = elem[len(NO_PREFIX):], False
        elif elem.startswith(UN_PREFIX):
          key, val = elem[len(UN_PREFIX):], None
        else:
          key, val = elem, True
      if key in kv_dict:
        raise errors.ParameterError("Duplicate key '%s' in option %s" %
                                    (key, opt))
      kv_dict[key] = val
  return kv_dict
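
# Illustrative example (not in the original module): for a "keyval" option
# such as -B/--backend-parameters, _SplitKeyVal("-B",
# "memory=512,no_ip_check,-kernel_path") returns
#   {'memory': '512', 'ip_check': False, 'kernel_path': None}
# (the key names are placeholders); duplicate keys raise errors.ParameterError.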


def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser for ident:key=val,key=val options.

  This will store the parsed values as a tuple (ident, {key: val}). As such,
  multiple uses of this option via action=append is possible.

  """
  if ":" not in value:
    ident, rest = value, ''
  else:
    ident, rest = value.split(":", 1)

  if ident.startswith(NO_PREFIX):
    if rest:
      msg = "Cannot pass options when removing parameter groups: %s" % value
      raise errors.ParameterError(msg)
    retval = (ident[len(NO_PREFIX):], False)
  elif ident.startswith(UN_PREFIX):
    if rest:
      msg = "Cannot pass options when removing parameter groups: %s" % value
      raise errors.ParameterError(msg)
    retval = (ident[len(UN_PREFIX):], None)
  else:
    kv_dict = _SplitKeyVal(opt, rest)
    retval = (ident, kv_dict)
  return retval
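
# Illustrative example (not in the original module): with type "identkeyval"
# (e.g. the -H/--hypervisor-parameters option below), a value such as
# "xen-pvm:kernel_path=/boot/vmlinuz,no_acpi" is parsed into
#   ("xen-pvm", {'kernel_path': '/boot/vmlinuz', 'acpi': False})
# while "no_xen-pvm" alone yields ("xen-pvm", False), meaning "remove group".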


def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser class for key=val,key=val options.

  This will store the parsed values as a dict {key: val}.

  """
  return _SplitKeyVal(opt, value)


def check_bool(option, opt, value): # pylint: disable-msg=W0613
  """Custom parser for yes/no options.

  This will store the parsed value as either True or False.

  """
  value = value.lower()
  if value == constants.VALUE_FALSE or value == "no":
    return False
  elif value == constants.VALUE_TRUE or value == "yes":
    return True
  else:
    raise errors.ParameterError("Invalid boolean value '%s'" % value)


# completion_suggestion is normally a list. Using numeric values not evaluating
# to False for dynamic completion.
(OPT_COMPL_MANY_NODES,
 OPT_COMPL_ONE_NODE,
 OPT_COMPL_ONE_INSTANCE,
 OPT_COMPL_ONE_OS,
 OPT_COMPL_ONE_IALLOCATOR,
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)

OPT_COMPL_ALL = frozenset([
  OPT_COMPL_MANY_NODES,
  OPT_COMPL_ONE_NODE,
  OPT_COMPL_ONE_INSTANCE,
  OPT_COMPL_ONE_OS,
  OPT_COMPL_ONE_IALLOCATOR,
  OPT_COMPL_INST_ADD_NODES,
  ])


class CliOption(Option):
  """Custom option class for optparse.

  """
  ATTRS = Option.ATTRS + [
    "completion_suggest",
    ]
  TYPES = Option.TYPES + (
    "identkeyval",
    "keyval",
    "unit",
    "bool",
    )
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
  TYPE_CHECKER["keyval"] = check_key_val
  TYPE_CHECKER["unit"] = check_unit
  TYPE_CHECKER["bool"] = check_bool


# optparse.py sets make_option, so we do it for our own option class, too
cli_option = CliOption


_YORNO = "yes|no"

DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
                       help="Increase debugging level")

NOHDR_OPT = cli_option("--no-headers", default=False,
                       action="store_true", dest="no_headers",
                       help="Don't display column headers")

SEP_OPT = cli_option("--separator", default=None,
                     action="store", dest="separator",
                     help=("Separator between output fields"
                           " (defaults to one space)"))

USEUNITS_OPT = cli_option("--units", default=None,
                          dest="units", choices=('h', 'm', 'g', 't'),
                          help="Specify units for output (one of hmgt)")

FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
                        type="string", metavar="FIELDS",
                        help="Comma separated list of output fields")

FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
                       default=False, help="Force the operation")

CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
                         default=False, help="Do not require confirmation")

TAG_SRC_OPT = cli_option("--from", dest="tags_source",
                         default=None, help="File with tag names")

SUBMIT_OPT = cli_option("--submit", dest="submit_only",
                        default=False, action="store_true",
                        help=("Submit the job and return the job ID, but"
                              " don't wait for the job to finish"))

SYNC_OPT = cli_option("--sync", dest="do_locking",
                      default=False, action="store_true",
                      help=("Grab locks while doing the queries"
                            " in order to ensure more consistent results"))

_DRY_RUN_OPT = cli_option("--dry-run", default=False,
                          action="store_true",
                          help=("Do not execute the operation, just run the"
                                " check steps and verify if it could be"
                                " executed"))

VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
                         action="store_true",
                         help="Increase the verbosity of the operation")

DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
                              action="store_true", dest="simulate_errors",
                              help="Debugging option that makes the operation"
                              " treat most runtime checks as failed")

NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
                        default=True, action="store_false",
                        help="Don't wait for sync (DANGEROUS!)")

DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
                               help="Custom disk setup (diskless, file,"
                               " plain or drbd)",
                               default=None, metavar="TEMPL",
                               choices=list(constants.DISK_TEMPLATES))

NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
                        help="Do not create any network cards for"
                        " the instance")

FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                               help="Relative path under default cluster-wide"
                               " file storage dir to store file-based disks",
                               default=None, metavar="<DIR>")

FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
                                  help="Driver to use for image files",
                                  default="loop", metavar="<DRIVER>",
                                  choices=list(constants.FILE_DRIVER))

IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
                            help="Select nodes for the instance automatically"
                            " using the <NAME> iallocator plugin",
                            default=None, type="string",
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)

OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
                    metavar="<os>",
                    completion_suggest=OPT_COMPL_ONE_OS)

FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
                               action="store_true", default=False,
                               help="Force an unknown variant")

NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
                            action="store_true", default=False,
                            help="Do not install the OS (will"
                            " enable no-start)")

BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
                         type="keyval", default={},
                         help="Backend parameters")

HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
                         default={}, dest="hvparams",
                         help="Hypervisor parameters")

HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
                            help="Hypervisor and hypervisor options, in the"
                            " format hypervisor:option=value,option=value,...",
                            default=None, type="identkeyval")

HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
                        help="Hypervisor and hypervisor options, in the"
                        " format hypervisor:option=value,option=value,...",
                        default=[], action="append", type="identkeyval")

NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
                           action="store_false",
                           help="Don't check that the instance's IP"
                           " is alive")

NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
                             default=True, action="store_false",
                             help="Don't check that the instance's name"
                             " is resolvable")

NET_OPT = cli_option("--net",
                     help="NIC parameters", default=[],
                     dest="nics", action="append", type="identkeyval")

DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
                      dest="disks", action="append", type="identkeyval")

DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
                         help="Comma-separated list of disks"
                         " indices to act on (e.g. 0,2) (optional,"
                         " defaults to all disks)")

OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
                         help="Enforces a single-disk configuration using the"
                         " given disk size, in MiB unless a suffix is used",
                         default=None, type="unit", metavar="<size>")

IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
                                dest="ignore_consistency",
                                action="store_true", default=False,
                                help="Ignore the consistency of the disks on"
                                " the secondary")

NONLIVE_OPT = cli_option("--non-live", dest="live",
                         default=True, action="store_false",
                         help="Do a non-live migration (this usually means"
                         " freeze the instance, save the state, transfer and"
                         " only then resume running on the secondary node)")

NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
                                help="Target node and optional secondary node",
                                metavar="<pnode>[:<snode>]",
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)

NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
                           action="append", metavar="<node>",
                           help="Use only this node (can be used multiple"
                           " times, if not given defaults to all nodes)",
                           completion_suggest=OPT_COMPL_ONE_NODE)

SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
                             metavar="<node>",
                             completion_suggest=OPT_COMPL_ONE_NODE)

NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
                         action="store_false",
                         help="Don't start the instance after creation")

SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
                         action="store_true", default=False,
                         help="Show command instead of executing it")

CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
                         default=False, action="store_true",
                         help="Instead of performing the migration, try to"
                         " recover from a failed cleanup. This is safe"
                         " to run even if the instance is healthy, but it"
                         " will create extra replication traffic and"
                         " briefly disrupt the replication (like during the"
                         " migration)")

STATIC_OPT = cli_option("-s", "--static", dest="static",
                        action="store_true", default=False,
                        help="Only show configuration data, not runtime data")

ALL_OPT = cli_option("--all", dest="show_all",
                     default=False, action="store_true",
                     help="Show info on all instances on the cluster."
                     " This can take a long time to run, use wisely")

SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
                           action="store_true", default=False,
                           help="Interactive OS reinstall, lists available"
                           " OS templates for selection")

IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
                                 action="store_true", default=False,
                                 help="Remove the instance from the cluster"
                                 " configuration even if there are failures"
                                 " during the removal process")

NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
                               help="Specifies the new secondary node",
                               metavar="NODE", default=None,
                               completion_suggest=OPT_COMPL_ONE_NODE)

ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
                            default=False, action="store_true",
                            help="Replace the disk(s) on the primary"
                            " node (only for the drbd template)")

ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
                              default=False, action="store_true",
                              help="Replace the disk(s) on the secondary"
                              " node (only for the drbd template)")

AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
                              default=False, action="store_true",
                              help="Lock all nodes and auto-promote as needed"
                              " to MC status")

AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
                              default=False, action="store_true",
                              help="Automatically replace faulty disks"
                              " (only for the drbd template)")

IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
                             default=False, action="store_true",
                             help="Ignore current recorded size"
                             " (useful for forcing activation when"
                             " the recorded size is wrong)")

SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
                          metavar="<node>",
                          completion_suggest=OPT_COMPL_ONE_NODE)

SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
                         metavar="<dir>")

SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
                              help="Specify the secondary ip for the node",
                              metavar="ADDRESS", default=None)

READD_OPT = cli_option("--readd", dest="readd",
                       default=False, action="store_true",
                       help="Readd old node after replacing it")

NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
                                default=True, action="store_false",
                                help="Disable SSH key fingerprint checking")


MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
                    type="bool", default=None, metavar=_YORNO,
                    help="Set the master_candidate flag on the node")

OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
                         type="bool", default=None,
                         help="Set the offline flag on the node")

DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
                         type="bool", default=None,
                         help="Set the drained flag on the node")

ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
                             type="bool", default=None, metavar=_YORNO,
                             help="Set the allocatable flag on a volume")

NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
                               help="Disable support for lvm based instances"
                               " (cluster-wide)",
                               action="store_false", default=True)

ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
                            dest="enabled_hypervisors",
                            help="Comma-separated list of hypervisors",
                            type="string", default=None)

NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
                            type="keyval", default={},
                            help="NIC parameters")

CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
                         dest="candidate_pool_size", type="int",
                         help="Set the candidate pool size")

VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
                         help="Enables LVM and specifies the volume group"
                         " name (cluster-wide) for disk allocation [xenvg]",
                         metavar="VG", default=None)

YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
                          help="Destroy cluster", action="store_true")

NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
                          help="Skip node agreement check (dangerous)",
                          action="store_true", default=False)

MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
                            help="Specify the mac prefix for the instance"
                            " MAC addresses, in the format XX:XX:XX",
                            metavar="PREFIX",
                            default=None)

MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
                               help="Specify the node interface (cluster-wide)"
                               " on which the master IP address will be added"
                               " [%s]" % constants.DEFAULT_BRIDGE,
                               metavar="NETDEV",
                               default=constants.DEFAULT_BRIDGE)


GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                                help="Specify the default directory (cluster-"
                                "wide) for storing the file-based disks [%s]" %
                                constants.DEFAULT_FILE_STORAGE_DIR,
                                metavar="DIR",
                                default=constants.DEFAULT_FILE_STORAGE_DIR)

NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
                                   help="Don't modify /etc/hosts",
                                   action="store_false", default=True)

NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
                                    help="Don't initialize SSH keys",
                                    action="store_false", default=True)

ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
                             help="Enable parseable error messages",
                             action="store_true", default=False)

NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
                          help="Skip N+1 memory redundancy tests",
                          action="store_true", default=False)

REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
                             help="Type of reboot: soft/hard/full",
                             default=constants.INSTANCE_REBOOT_HARD,
                             metavar="<REBOOT>",
                             choices=list(constants.REBOOT_TYPES))

IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
                                    dest="ignore_secondaries",
                                    default=False, action="store_true",
                                    help="Ignore errors from secondaries")

NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
                            action="store_false", default=True,
                            help="Don't shutdown the instance (unsafe)")

TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                         help="Maximum time to wait")

SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
                         dest="shutdown_timeout", type="int",
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                         help="Maximum time to wait for instance shutdown")

EARLY_RELEASE_OPT = cli_option("--early-release",
                               dest="early_release", default=False,
                               action="store_true",
                               help="Release the locks on the secondary"
                               " node(s) early")

NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
                                  dest="new_cluster_cert",
                                  default=False, action="store_true",
                                  help="Generate a new cluster certificate")

RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
                           default=None,
                           help="File containing new RAPI certificate")

NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
                               default=None, action="store_true",
                               help=("Generate a new self-signed RAPI"
                                     " certificate"))

NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
                                    dest="new_confd_hmac_key",
                                    default=False, action="store_true",
                                    help=("Create a new HMAC key for %s" %
                                          constants.CONFD))

USE_REPL_NET_OPT = cli_option("--use-replication-network",
                              dest="use_replication_network",
                              help="Whether to use the replication network"
                              " for talking to the nodes",
                              action="store_true", default=False)

MAINTAIN_NODE_HEALTH_OPT = \
    cli_option("--maintain-node-health", dest="maintain_node_health",
               metavar=_YORNO, default=None, type="bool",
               help="Configure the cluster to automatically maintain node"
               " health, by shutting down unknown instances, shutting down"
               " unknown DRBD devices, etc.")

IDENTIFY_DEFAULTS_OPT = \
    cli_option("--identify-defaults", dest="identify_defaults",
               default=False, action="store_true",
               help="Identify which saved instance parameters are equal to"
               " the current cluster defaults and set them as such, instead"
               " of marking them as overridden")

UIDPOOL_OPT = cli_option("--uid-pool", default=None,
                         action="store", dest="uid_pool",
                         help=("A list of user-ids or user-id"
                               " ranges separated by commas"))


def _ParseArgs(argv, commands, aliases):
  """Parser for the command line arguments.

  This function parses the arguments and returns the function which
  must be executed together with its (modified) arguments.

  @param argv: the command line
  @param commands: dictionary with special contents, see the design
      doc for cmdline handling
  @param aliases: dictionary with command aliases {'alias': 'target', ...}

  """
  if len(argv) == 0:
    binary = "<command>"
  else:
    binary = argv[0].split("/")[-1]

  if len(argv) > 1 and argv[1] == "--version":
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
    # Quit right away. That way we don't have to care about this special
    # argument. optparse.py does it the same.
    sys.exit(0)

  if len(argv) < 2 or not (argv[1] in commands or
                           argv[1] in aliases):
    # let's do a nice thing
    sortedcmds = commands.keys()
    sortedcmds.sort()

    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
    ToStdout("")

    # compute the max line length for cmd + usage
    mlen = max([len(" %s" % cmd) for cmd in commands])
    mlen = min(60, mlen) # should not get here...

    # and format a nice command list
    ToStdout("Commands:")
    for cmd in sortedcmds:
      cmdstr = " %s" % (cmd,)
      help_text = commands[cmd][4]
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
      for line in help_lines:
        ToStdout("%-*s   %s", mlen, "", line)

    ToStdout("")

    return None, None, None

  # get command, unalias it, and look it up in commands
  cmd = argv.pop(1)
  if cmd in aliases:
    if cmd in commands:
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
                                   " command" % cmd)

    if aliases[cmd] not in commands:
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
                                   " command '%s'" % (cmd, aliases[cmd]))

    cmd = aliases[cmd]

  func, args_def, parser_opts, usage, description = commands[cmd]
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
                        description=description,
                        formatter=TitledHelpFormatter(),
                        usage="%%prog %s %s" % (cmd, usage))
  parser.disable_interspersed_args()
  options, args = parser.parse_args()

  if not _CheckArguments(cmd, args_def, args):
    return None, None, None

  return func, options, args
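
# Illustrative sketch (not in the original module) of the commands dictionary
# expected above: each entry maps a command name to a 5-tuple of
# (function, argument definition, option list, usage string, description), e.g.
#   commands = {
#     'list': (PrintNodes, ARGS_MANY_NODES, [NOHDR_OPT, SEP_OPT],
#              "[nodes...]", "Lists the nodes in the cluster"),
#     }
# where PrintNodes is a hypothetical handler taking (options, args).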


def _CheckArguments(cmd, args_def, args):
  """Verifies the arguments using the argument definition.

  Algorithm:

    1. Abort with error if values specified by user but none expected.

    1. For each argument in definition

      1. Keep running count of minimum number of values (min_count)
      1. Keep running count of maximum number of values (max_count)
      1. If it has an unlimited number of values

        1. Abort with error if it's not the last argument in the definition

    1. If last argument has limited number of values

      1. Abort with error if number of values doesn't match or is too large

    1. Abort with error if user didn't pass enough values (min_count)

  """
  if args and not args_def:
    ToStderr("Error: Command %s expects no arguments", cmd)
    return False

  min_count = None
  max_count = None
  check_max = None

  last_idx = len(args_def) - 1

  for idx, arg in enumerate(args_def):
    if min_count is None:
      min_count = arg.min
    elif arg.min is not None:
      min_count += arg.min

    if max_count is None:
      max_count = arg.max
    elif arg.max is not None:
      max_count += arg.max

    if idx == last_idx:
      check_max = (arg.max is not None)

    elif arg.max is None:
      raise errors.ProgrammerError("Only the last argument can have max=None")

  if check_max:
    # Command with exact number of arguments
    if (min_count is not None and max_count is not None and
        min_count == max_count and len(args) != min_count):
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
      return False

    # Command with limited number of arguments
    if max_count is not None and len(args) > max_count:
      ToStderr("Error: Command %s expects only %d argument(s)",
               cmd, max_count)
      return False

  # Command with some required arguments
  if min_count is not None and len(args) < min_count:
    ToStderr("Error: Command %s expects at least %d argument(s)",
             cmd, min_count)
    return False

  return True


def SplitNodeOption(value):
  """Splits the value of a --node option.

  """
  if value and ':' in value:
    return value.split(':', 1)
  else:
    return (value, None)
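
# For example, SplitNodeOption("node1.example.com:node2.example.com") returns
# ['node1.example.com', 'node2.example.com'], while SplitNodeOption("node1")
# returns ('node1', None); note the mixed list/tuple return types.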


def CalculateOSNames(os_name, os_variants):
  """Calculates all the names an OS can be called, according to its variants.

  @type os_name: string
  @param os_name: base name of the os
  @type os_variants: list or None
  @param os_variants: list of supported variants
  @rtype: list
  @return: list of valid names

  """
  if os_variants:
    return ['%s+%s' % (os_name, v) for v in os_variants]
  else:
    return [os_name]
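
# For example, CalculateOSNames("debootstrap", ["etch", "lenny"]) returns
# ['debootstrap+etch', 'debootstrap+lenny'], and with os_variants=None it
# simply returns ['debootstrap'].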


def UsesRPC(fn):
  def wrapper(*args, **kwargs):
    rpc.Init()
    try:
      return fn(*args, **kwargs)
    finally:
      rpc.Shutdown()
  return wrapper
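
# Illustrative usage (not in the original module): UsesRPC decorates command
# handlers that need the node RPC layer, making sure rpc.Init() and
# rpc.Shutdown() bracket the call:
#
#   @UsesRPC
#   def DestroyCluster(opts, args):  # hypothetical handler
#     ...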


def AskUser(text, choices=None):
  """Ask the user a question.

  @param text: the question to ask

  @param choices: list with elements tuples (input_char, return_value,
      description); if not given, it will default to: [('y', True,
      'Perform the operation'), ('n', False, 'Do not perform the operation')];
      note that the '?' char is reserved for help

  @return: one of the return values from the choices list; if input is
      not possible (i.e. not running with a tty), we return the last
      entry from the list

  """
  if choices is None:
    choices = [('y', True, 'Perform the operation'),
               ('n', False, 'Do not perform the operation')]
  if not choices or not isinstance(choices, list):
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
  for entry in choices:
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
      raise errors.ProgrammerError("Invalid choices element to AskUser")

  answer = choices[-1][1]
  new_text = []
  for line in text.splitlines():
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
  text = "\n".join(new_text)
  try:
    f = file("/dev/tty", "a+")
  except IOError:
    return answer
  try:
    chars = [entry[0] for entry in choices]
    chars[-1] = "[%s]" % chars[-1]
    chars.append('?')
    maps = dict([(entry[0], entry[1]) for entry in choices])
    while True:
      f.write(text)
      f.write('\n')
      f.write("/".join(chars))
      f.write(": ")
      line = f.readline(2).strip().lower()
      if line in maps:
        answer = maps[line]
        break
      elif line == '?':
        for entry in choices:
          f.write(" %s - %s\n" % (entry[0], entry[2]))
        f.write("\n")
        continue
  finally:
    f.close()
  return answer
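
# For example, with the default choices AskUser("Continue?") returns True when
# the user types 'y' and False for 'n'; without a controlling tty it falls
# back to the last entry in the choices list (False here) without prompting.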


class JobSubmittedException(Exception):
  """Job was submitted, client should exit.

  This exception has one argument, the ID of the job that was
  submitted. The handler should print this ID.

  This is not an error, just a structured way to exit from clients.

  """


def SendJob(ops, cl=None):
  """Function to submit an opcode without waiting for the results.

  @type ops: list
  @param ops: list of opcodes
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created

  """
  if cl is None:
    cl = GetClient()

  job_id = cl.SubmitJob(ops)

  return job_id


def PollJob(job_id, cl=None, feedback_fn=None):
  """Function to poll for the result of a job.

  @type job_id: job identifier
  @param job_id: the job to poll for results
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created

  """
  if cl is None:
    cl = GetClient()

  prev_job_info = None
  prev_logmsg_serial = None

  status = None

  notified_queued = False
  notified_waitlock = False

  while True:
    result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
                                     prev_logmsg_serial)
    if not result:
      # job not found, go away!
      raise errors.JobLost("Job with id %s lost" % job_id)
    elif result == constants.JOB_NOTCHANGED:
      if status is not None and not callable(feedback_fn):
        if status == constants.JOB_STATUS_QUEUED and not notified_queued:
          ToStderr("Job %s is waiting in queue", job_id)
          notified_queued = True
        elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
          ToStderr("Job %s is trying to acquire all necessary locks", job_id)
          notified_waitlock = True

      # Wait again
      continue

    # Split result, a tuple of (field values, log entries)
    (job_info, log_entries) = result
    (status, ) = job_info

    if log_entries:
      for log_entry in log_entries:
        (serial, timestamp, _, message) = log_entry
        if callable(feedback_fn):
          feedback_fn(log_entry[1:])
        else:
          encoded = utils.SafeEncode(message)
          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
        prev_logmsg_serial = max(prev_logmsg_serial, serial)

    # TODO: Handle canceled and archived jobs
    elif status in (constants.JOB_STATUS_SUCCESS,
                    constants.JOB_STATUS_ERROR,
                    constants.JOB_STATUS_CANCELING,
                    constants.JOB_STATUS_CANCELED):
      break

    prev_job_info = job_info

  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
  if not jobs:
    raise errors.JobLost("Job with id %s lost" % job_id)

  status, opstatus, result = jobs[0]
  if status == constants.JOB_STATUS_SUCCESS:
    return result
  elif status in (constants.JOB_STATUS_CANCELING,
                  constants.JOB_STATUS_CANCELED):
    raise errors.OpExecError("Job was canceled")
  else:
    has_ok = False
    for idx, (status, msg) in enumerate(zip(opstatus, result)):
      if status == constants.OP_STATUS_SUCCESS:
        has_ok = True
      elif status == constants.OP_STATUS_ERROR:
        errors.MaybeRaise(msg)
        if has_ok:
          raise errors.OpExecError("partial failure (opcode %d): %s" %
                                   (idx, msg))
        else:
          raise errors.OpExecError(str(msg))
    # default failure mode
    raise errors.OpExecError(result)


def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
  """Legacy function to submit an opcode.

  This is just a simple wrapper over the construction of the processor
  instance. It should be extended to better handle feedback and
  interaction functions.

  """
  if cl is None:
    cl = GetClient()

  SetGenericOpcodeOpts([op], opts)

  job_id = SendJob([op], cl)

  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)

  return op_results[0]


def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  This function will decide, based on the 'opts' parameter, whether to
  submit and wait for the result of the opcode (and return it), or
  whether to just send the job and print its identifier. It is used in
  order to simplify the implementation of the '--submit' option.

  It will also process the opcodes if we're sending them via SendJob
  (otherwise SubmitOpCode does it).

  """
  if opts and opts.submit_only:
    job = [op]
    SetGenericOpcodeOpts(job, opts)
    job_id = SendJob(job, cl=cl)
    raise JobSubmittedException(job_id)
  else:
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
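
# Illustrative example (not in the original module): a command handler that
# honours the --submit flag would typically end with something like
#   op = opcodes.OpRebootInstance(instance_name=args[0])  # hypothetical opcode
#   SubmitOrSend(op, opts)
# With --submit this raises JobSubmittedException, which FormatError turns
# into a "JobID: ..." message and a zero exit code; otherwise it waits for
# the job and returns its result.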
1338

    
1339

    
1340
def SetGenericOpcodeOpts(opcode_list, options):
1341
  """Processor for generic options.
1342

1343
  This function updates the given opcodes based on generic command
1344
  line options (like debug, dry-run, etc.).
1345

1346
  @param opcode_list: list of opcodes
1347
  @param options: command line options or None
1348
  @return: None (in-place modification)
1349

1350
  """
1351
  if not options:
1352
    return
1353
  for op in opcode_list:
1354
    op.dry_run = options.dry_run
1355
    op.debug_level = options.debug
1356

    
1357

    
1358
def GetClient():
1359
  # TODO: Cache object?
1360
  try:
1361
    client = luxi.Client()
1362
  except luxi.NoMasterError:
1363
    ss = ssconf.SimpleStore()
1364

    
1365
    # Try to read ssconf file
1366
    try:
1367
      ss.GetMasterNode()
1368
    except errors.ConfigurationError:
1369
      raise errors.OpPrereqError("Cluster not initialized or this machine is"
                                 " not part of a cluster")

    master, myself = ssconf.GetMasterAndMyself(ss=ss)
    if master != myself:
      raise errors.OpPrereqError("This is not the master node, please connect"
                                 " to node '%s' and rerun the command" %
                                 master)
    raise
  return client


def FormatError(err):
  """Return a formatted error message for a given error.

  This function takes an exception instance and returns a tuple
  consisting of two values: first, the recommended exit code, and
  second, a string describing the error message (not
  newline-terminated).

  """
  retcode = 1
  obuf = StringIO()
  msg = str(err)
  if isinstance(err, errors.ConfigurationError):
    txt = "Corrupt configuration file: %s" % msg
    logging.error(txt)
    obuf.write(txt + "\n")
    obuf.write("Aborting.")
    retcode = 2
  elif isinstance(err, errors.HooksAbort):
    obuf.write("Failure: hooks execution failed:\n")
    for node, script, out in err.args[0]:
      if out:
        obuf.write("  node: %s, script: %s, output: %s\n" %
                   (node, script, out))
      else:
        obuf.write("  node: %s, script: %s (no output)\n" %
                   (node, script))
  elif isinstance(err, errors.HooksFailure):
    obuf.write("Failure: hooks general failure: %s" % msg)
  elif isinstance(err, errors.ResolverError):
    this_host = utils.HostInfo.SysName()
    if err.args[0] == this_host:
      msg = "Failure: can't resolve my own hostname ('%s')"
    else:
      msg = "Failure: can't resolve hostname '%s'"
    obuf.write(msg % err.args[0])
  elif isinstance(err, errors.OpPrereqError):
    if len(err.args) == 2:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\nerror type: %s, error details:\n%s" %
                 (err.args[1], err.args[0]))
    else:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\n%s" % msg)
  elif isinstance(err, errors.OpExecError):
    obuf.write("Failure: command execution error:\n%s" % msg)
  elif isinstance(err, errors.TagError):
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
  elif isinstance(err, errors.JobQueueDrainError):
    obuf.write("Failure: the job queue is marked for drain and doesn't"
               " accept new requests\n")
  elif isinstance(err, errors.JobQueueFull):
    obuf.write("Failure: the job queue is full and doesn't accept new"
               " job submissions until old jobs are archived\n")
  elif isinstance(err, errors.TypeEnforcementError):
    obuf.write("Parameter Error: %s" % msg)
  elif isinstance(err, errors.ParameterError):
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
  elif isinstance(err, errors.GenericError):
    obuf.write("Unhandled Ganeti error: %s" % msg)
  elif isinstance(err, luxi.NoMasterError):
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
               " and listening for connections?")
  elif isinstance(err, luxi.TimeoutError):
    obuf.write("Timeout while talking to the master daemon. Error:\n"
               "%s" % msg)
  elif isinstance(err, luxi.ProtocolError):
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
               "%s" % msg)
  elif isinstance(err, JobSubmittedException):
    obuf.write("JobID: %s\n" % err.args[0])
    retcode = 0
  else:
    obuf.write("Unhandled exception: %s" % msg)
  return retcode, obuf.getvalue().rstrip('\n')
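# Example of the (exit code, message) contract above -- kept as a comment so
# that importing this module stays side-effect free:
#
#   retcode, msg = FormatError(errors.OpExecError("disk failure"))
#   # retcode == 1
#   # msg == "Failure: command execution error:\ndisk failure"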


def GenericMain(commands, override=None, aliases=None):
  """Generic main function for all the gnt-* commands.

  Arguments:
    - commands: a dictionary with a special structure, see the design doc
                for command line handling.
    - override: if not None, we expect a dictionary with keys that will
                override command line options; this can be used to pass
                options from the scripts to generic functions
    - aliases: dictionary with command aliases {'alias': 'target', ...}

  """
  # save the program name and the entire command line for later logging
  if sys.argv:
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
    if len(sys.argv) >= 2:
      binary += " " + sys.argv[1]
      old_cmdline = " ".join(sys.argv[2:])
    else:
      old_cmdline = ""
  else:
    binary = "<unknown program>"
    old_cmdline = ""

  if aliases is None:
    aliases = {}

  try:
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
  except errors.ParameterError, err:
    result, err_msg = FormatError(err)
    ToStderr(err_msg)
    return 1

  if func is None: # parse error
    return 1

  if override is not None:
    for key, val in override.iteritems():
      setattr(options, key, val)

  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
                     stderr_logging=True, program=binary)

  if old_cmdline:
    logging.info("run with arguments '%s'", old_cmdline)
  else:
    logging.info("run with no arguments")

  try:
    result = func(options, args)
  except (errors.GenericError, luxi.ProtocolError,
          JobSubmittedException), err:
    result, err_msg = FormatError(err)
    logging.exception("Error during command processing")
    ToStderr(err_msg)

  return result
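# Sketch of how a gnt-* script could drive GenericMain (illustrative only;
# "ShowVersion" and the per-command tuple layout shown are placeholders --
# the real structure is whatever _ParseArgs expects):
#
#   commands = {
#     "version": (ShowVersion, ARGS_NONE, [DEBUG_OPT], "", "Show the version"),
#   }
#
#   if __name__ == "__main__":
#     sys.exit(GenericMain(commands, aliases={"ver": "version"}))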


def GenericInstanceCreate(mode, opts, args):
  """Add an instance to the cluster via either creation or import.

  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new instance name
  @rtype: int
  @return: the desired exit code

  """
  instance = args[0]

  (pnode, snode) = SplitNodeOption(opts.node)

  hypervisor = None
  hvparams = {}
  if opts.hypervisor:
    hypervisor, hvparams = opts.hypervisor

  if opts.nics:
    try:
      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
    except ValueError, err:
      raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
    nics = [{}] * nic_max
    for nidx, ndict in opts.nics:
      nidx = int(nidx)
      if not isinstance(ndict, dict):
        msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
        raise errors.OpPrereqError(msg)
      nics[nidx] = ndict
  elif opts.no_nics:
    # no nics
    nics = []
  elif mode == constants.INSTANCE_CREATE:
    # default of one nic, all auto
    nics = [{}]
  else:
    # mode == import
    nics = []

  if opts.disk_template == constants.DT_DISKLESS:
    if opts.disks or opts.sd_size is not None:
      raise errors.OpPrereqError("Diskless instance but disk"
                                 " information passed")
    disks = []
  else:
    if (not opts.disks and not opts.sd_size
        and mode == constants.INSTANCE_CREATE):
      raise errors.OpPrereqError("No disk information specified")
    if opts.disks and opts.sd_size is not None:
      raise errors.OpPrereqError("Please use either the '--disk' or"
                                 " '-s' option")
    if opts.sd_size is not None:
      opts.disks = [(0, {"size": opts.sd_size})]

    if opts.disks:
      try:
        disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
      disks = [{}] * disk_max
    else:
      disks = []
    for didx, ddict in opts.disks:
      didx = int(didx)
      if not isinstance(ddict, dict):
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
        raise errors.OpPrereqError(msg)
      elif "size" in ddict:
        if "adopt" in ddict:
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
                                     " (disk %d)" % didx)
        try:
          ddict["size"] = utils.ParseUnit(ddict["size"])
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
                                     (didx, err))
      elif "adopt" in ddict:
        if mode == constants.INSTANCE_IMPORT:
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
                                     " import")
        ddict["size"] = 0
      else:
        raise errors.OpPrereqError("Missing size or adoption source for"
                                   " disk %d" % didx)
      disks[didx] = ddict

  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)

  if mode == constants.INSTANCE_CREATE:
    start = opts.start
    os_type = opts.os
    src_node = None
    src_path = None
    no_install = opts.no_install
    identify_defaults = False
  elif mode == constants.INSTANCE_IMPORT:
    start = False
    os_type = None
    src_node = opts.src_node
    src_path = opts.src_dir
    no_install = None
    identify_defaults = opts.identify_defaults
  else:
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)

  op = opcodes.OpCreateInstance(instance_name=instance,
                                disks=disks,
                                disk_template=opts.disk_template,
                                nics=nics,
                                pnode=pnode, snode=snode,
                                ip_check=opts.ip_check,
                                name_check=opts.name_check,
                                wait_for_sync=opts.wait_for_sync,
                                file_storage_dir=opts.file_storage_dir,
                                file_driver=opts.file_driver,
                                iallocator=opts.iallocator,
                                hypervisor=hypervisor,
                                hvparams=hvparams,
                                beparams=opts.beparams,
                                mode=mode,
                                start=start,
                                os_type=os_type,
                                src_node=src_node,
                                src_path=src_path,
                                no_install=no_install,
                                identify_defaults=identify_defaults)

  SubmitOrSend(op, opts)
  return 0
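# Illustrative wiring (a sketch; the AddInstance/ImportInstance names are
# placeholders for the actual gnt-instance command functions):
#
#   def AddInstance(opts, args):
#     return GenericInstanceCreate(constants.INSTANCE_CREATE, opts, args)
#
#   def ImportInstance(opts, args):
#     return GenericInstanceCreate(constants.INSTANCE_IMPORT, opts, args)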


class _RunWhileClusterStoppedHelper:
  """Helper class for L{RunWhileClusterStopped} to simplify state management

  """
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes

    self.ssh = ssh.SshRunner(self.cluster_name)

    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name
    @type cmd: list
    @param cmd: Command

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    @type fn: callable
    @param fn: Function to be called

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes
        for node_name in self.online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception, err:
          _, errmsg = FormatError(err)
          logging.exception("Caught exception")
          self.feedback_fn(errmsg)
          raise
      finally:
        # Start cluster again, master node last
        for node_name in self.nonmaster_nodes + [self.master_node]:
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
    finally:
      # Resume watcher
      watcher_block.Close()


def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Calls a function while all cluster daemons are stopped.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  feedback_fn("Gathering cluster information")

  # This ensures we're running on the master daemon
  cl = GetClient()

  (cluster_name, master_node) = \
    cl.QueryConfigValues(["cluster_name", "master_node"])

  online_nodes = GetOnlineNodes([], cl=cl)

  # Don't keep a reference to the client. The master daemon will go away.
  del cl

  assert master_node in online_nodes

  return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
                                       online_nodes).Call(fn, *args)

    
1775

    
1776
def GenerateTable(headers, fields, separator, data,
1777
                  numfields=None, unitfields=None,
1778
                  units=None):
1779
  """Prints a table with headers and different fields.
1780

1781
  @type headers: dict
1782
  @param headers: dictionary mapping field names to headers for
1783
      the table
1784
  @type fields: list
1785
  @param fields: the field names corresponding to each row in
1786
      the data field
1787
  @param separator: the separator to be used; if this is None,
1788
      the default 'smart' algorithm is used which computes optimal
1789
      field width, otherwise just the separator is used between
1790
      each field
1791
  @type data: list
1792
  @param data: a list of lists, each sublist being one row to be output
1793
  @type numfields: list
1794
  @param numfields: a list with the fields that hold numeric
1795
      values and thus should be right-aligned
1796
  @type unitfields: list
1797
  @param unitfields: a list with the fields that hold numeric
1798
      values that should be formatted with the units field
1799
  @type units: string or None
1800
  @param units: the units we should use for formatting, or None for
1801
      automatic choice (human-readable for non-separator usage, otherwise
1802
      megabytes); this is a one-letter string
1803

1804
  """
1805
  if units is None:
1806
    if separator:
1807
      units = "m"
1808
    else:
1809
      units = "h"
1810

    
1811
  if numfields is None:
1812
    numfields = []
1813
  if unitfields is None:
1814
    unitfields = []
1815

    
1816
  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
1817
  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142
1818

    
1819
  format_fields = []
1820
  for field in fields:
1821
    if headers and field not in headers:
1822
      # TODO: handle better unknown fields (either revert to old
1823
      # style of raising exception, or deal more intelligently with
1824
      # variable fields)
1825
      headers[field] = field
1826
    if separator is not None:
1827
      format_fields.append("%s")
1828
    elif numfields.Matches(field):
1829
      format_fields.append("%*s")
1830
    else:
1831
      format_fields.append("%-*s")
1832

    
1833
  if separator is None:
1834
    mlens = [0 for name in fields]
1835
    format = ' '.join(format_fields)
1836
  else:
1837
    format = separator.replace("%", "%%").join(format_fields)
1838

    
1839
  for row in data:
1840
    if row is None:
1841
      continue
1842
    for idx, val in enumerate(row):
1843
      if unitfields.Matches(fields[idx]):
1844
        try:
1845
          val = int(val)
1846
        except (TypeError, ValueError):
1847
          pass
1848
        else:
1849
          val = row[idx] = utils.FormatUnit(val, units)
1850
      val = row[idx] = str(val)
1851
      if separator is None:
1852
        mlens[idx] = max(mlens[idx], len(val))
1853

    
1854
  result = []
1855
  if headers:
1856
    args = []
1857
    for idx, name in enumerate(fields):
1858
      hdr = headers[name]
1859
      if separator is None:
1860
        mlens[idx] = max(mlens[idx], len(hdr))
1861
        args.append(mlens[idx])
1862
      args.append(hdr)
1863
    result.append(format % tuple(args))
1864

    
1865
  if separator is None:
1866
    assert len(mlens) == len(fields)
1867

    
1868
    if fields and not numfields.Matches(fields[-1]):
1869
      mlens[-1] = 0
1870

    
1871
  for line in data:
1872
    args = []
1873
    if line is None:
1874
      line = ['-' for _ in fields]
1875
    for idx in range(len(fields)):
1876
      if separator is None:
1877
        args.append(mlens[idx])
1878
      args.append(line[idx])
1879
    result.append(format % tuple(args))
1880

    
1881
  return result
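# Example call (illustrative data; the column widths are computed by the
# function itself):
#
#   lines = GenerateTable(headers={"name": "Node", "pinst": "Pinst"},
#                         fields=["name", "pinst"], separator=None,
#                         data=[["node1.example.com", 2], ["node2", 11]],
#                         numfields=["pinst"])
#   # -> three strings: a header line plus one line per data row, with the
#   #    "name" column left-aligned and the numeric "pinst" column
#   #    right-aligned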


def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: a string with the formatted timestamp

  """
  if not isinstance(ts, (tuple, list)) or len(ts) != 2:
    return '?'
  sec, usec = ts
  return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
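# Examples (the date part of the first result depends on the local timezone):
#
#   FormatTimestamp((1234567890, 999999))  # e.g. "2009-02-13 23:31:30.999999"
#   FormatTimestamp("bogus")               # -> "?"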


def ParseTimespec(value):
  """Parse a time specification.

  The following suffixes will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: days
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  """
  value = str(value)
  if not value:
    raise errors.OpPrereqError("Empty time specification passed")
  suffix_map = {
    's': 1,
    'm': 60,
    'h': 3600,
    'd': 86400,
    'w': 604800,
    }
  if value[-1] not in suffix_map:
    try:
      value = int(value)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  else:
    multiplier = suffix_map[value[-1]]
    value = value[:-1]
    if not value: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)")
    try:
      value = int(value) * multiplier
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  return value
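# Examples, following directly from the suffix table in the docstring:
#
#   ParseTimespec("30")  # -> 30 (plain seconds)
#   ParseTimespec("1h")  # -> 3600
#   ParseTimespec("2w")  # -> 1209600
#   ParseTimespec("h")   # raises errors.OpPrereqError (suffix without a value)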


def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
                   filter_master=False):
  """Returns the names of online nodes.

  This function will also log a note on stderr with the names of
  the offline nodes that were skipped, unless C{nowarn} is set.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed
  @type secondary_ips: boolean
  @param secondary_ips: if True, return the secondary IPs instead of the
      names, useful for doing network traffic over the replication interface
      (if any)
  @type filter_master: boolean
  @param filter_master: if True, do not return the master node in the list
      (useful in coordination with secondary_ips where we cannot check our
      node name against the list)

  """
  if cl is None:
    cl = GetClient()

  if secondary_ips:
    name_idx = 2
  else:
    name_idx = 0

  if filter_master:
    master_node = cl.QueryConfigValues(["master_node"])[0]
    filter_fn = lambda x: x != master_node
  else:
    filter_fn = lambda _: True

  result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
                         use_locking=False)
  offline = [row[0] for row in result if row[1]]
  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
  return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
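# Illustrative call (requires a reachable master daemon):
#
#   all_names = GetOnlineNodes([], nowarn=True)
#   # -> names of all online nodes, silently skipping offline ones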


def _ToStream(stream, txt, *args):
  """Write a message to a stream, bypassing the logging system

  @type stream: file object
  @param stream: the file to which we should write
  @type txt: str
  @param txt: the message

  """
  if args:
    args = tuple(args)
    stream.write(txt % args)
  else:
    stream.write(txt)
  stream.write('\n')
  stream.flush()


def ToStdout(txt, *args):
  """Write a message to stdout only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stdout, txt, *args)


def ToStderr(txt, *args):
  """Write a message to stderr only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stderr, txt, *args)


class JobExecutor(object):
  """Class which manages the submission and execution of multiple jobs.

  Note that instances of this class should not be reused between
  GetResults() calls.

  """
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
    self.queue = []
    if cl is None:
      cl = GetClient()
    self.cl = cl
    self.verbose = verbose
    self.jobs = []
    self.opts = opts
    self.feedback_fn = feedback_fn

  def QueueJob(self, name, *ops):
    """Record a job for later submit.

    @type name: string
    @param name: a description of the job, will be used in WaitJobSet

    """
    SetGenericOpcodeOpts(ops, self.opts)
    self.queue.append((name, ops))

  def SubmitPending(self):
    """Submit all pending jobs.

    """
    results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
                                                            self.queue)):
      self.jobs.append((idx, status, data, name))

  def _ChooseJob(self):
    """Choose a non-waiting/queued job to poll next.

    """
    assert self.jobs, "_ChooseJob called with empty job list"

    result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
    assert result

    for job_data, status in zip(self.jobs, result):
      if status[0] in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_WAITLOCK,
                       constants.JOB_STATUS_CANCELING):
        # job is still waiting
        continue
      # good candidate found
      self.jobs.remove(job_data)
      return job_data

    # no job found
    return self.jobs.pop(0)

  def GetResults(self):
    """Wait for and return the results of all jobs.

    @rtype: list
    @return: list of tuples (success, job results), in the same order
        as the submitted jobs; if a job has failed, instead of the result
        there will be the error message

    """
    if not self.jobs:
      self.SubmitPending()
    results = []
    if self.verbose:
      ok_jobs = [row[2] for row in self.jobs if row[1]]
      if ok_jobs:
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))

    # first, remove any non-submitted jobs
    self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
    for idx, _, jid, name in failures:
      ToStderr("Failed to submit job for %s: %s", name, jid)
      results.append((idx, False, jid))

    while self.jobs:
      (idx, _, jid, name) = self._ChooseJob()
      ToStdout("Waiting for job %s for %s...", jid, name)
      try:
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
        success = True
      except (errors.GenericError, luxi.ProtocolError), err:
        _, job_result = FormatError(err)
        success = False
        # the error message will always be shown, verbose or not
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)

      results.append((idx, success, job_result))

    # sort based on the index, then drop it
    results.sort()
    results = [i[1:] for i in results]

    return results

  def WaitOrShow(self, wait):
    """Wait for job results or only print the job IDs.

    @type wait: boolean
    @param wait: whether to wait or not

    """
    if wait:
      return self.GetResults()
    else:
      if not self.jobs:
        self.SubmitPending()
      for _, status, result, name in self.jobs:
        if status:
          ToStdout("%s: %s", result, name)
        else:
          ToStderr("Failure for %s: %s", name, result)