Statistics
| Branch: | Tag: | Revision:

root / lib / cli.py @ f942a838

History | View | Annotate | Download (69.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module dealing with command line parsing"""
23

    
24

    
25
import sys
26
import textwrap
27
import os.path
28
import time
29
import logging
30
from cStringIO import StringIO
31

    
32
from ganeti import utils
33
from ganeti import errors
34
from ganeti import constants
35
from ganeti import opcodes
36
from ganeti import luxi
37
from ganeti import ssconf
38
from ganeti import rpc
39
from ganeti import ssh
40

    
41
from optparse import (OptionParser, TitledHelpFormatter,
42
                      Option, OptionValueError)
43

    
44

    
45
__all__ = [
46
  # Command line options
47
  "ALLOCATABLE_OPT",
48
  "ALL_OPT",
49
  "AUTO_PROMOTE_OPT",
50
  "AUTO_REPLACE_OPT",
51
  "BACKEND_OPT",
52
  "CLEANUP_OPT",
53
  "CLUSTER_DOMAIN_SECRET_OPT",
54
  "CONFIRM_OPT",
55
  "CP_SIZE_OPT",
56
  "DEBUG_OPT",
57
  "DEBUG_SIMERR_OPT",
58
  "DISKIDX_OPT",
59
  "DISK_OPT",
60
  "DISK_TEMPLATE_OPT",
61
  "DRAINED_OPT",
62
  "EARLY_RELEASE_OPT",
63
  "ENABLED_HV_OPT",
64
  "ERROR_CODES_OPT",
65
  "FIELDS_OPT",
66
  "FILESTORE_DIR_OPT",
67
  "FILESTORE_DRIVER_OPT",
68
  "FORCE_OPT",
69
  "FORCE_VARIANT_OPT",
70
  "GLOBAL_FILEDIR_OPT",
71
  "HVLIST_OPT",
72
  "HVOPTS_OPT",
73
  "HYPERVISOR_OPT",
74
  "IALLOCATOR_OPT",
75
  "IGNORE_CONSIST_OPT",
76
  "IGNORE_FAILURES_OPT",
77
  "IGNORE_REMOVE_FAILURES_OPT",
78
  "IGNORE_SECONDARIES_OPT",
79
  "IGNORE_SIZE_OPT",
80
  "MAC_PREFIX_OPT",
81
  "MASTER_NETDEV_OPT",
82
  "MC_OPT",
83
  "NET_OPT",
84
  "NEW_CLUSTER_CERT_OPT",
85
  "NEW_CLUSTER_DOMAIN_SECRET_OPT",
86
  "NEW_CONFD_HMAC_KEY_OPT",
87
  "NEW_RAPI_CERT_OPT",
88
  "NEW_SECONDARY_OPT",
89
  "NIC_PARAMS_OPT",
90
  "NODE_LIST_OPT",
91
  "NODE_PLACEMENT_OPT",
92
  "NOHDR_OPT",
93
  "NOIPCHECK_OPT",
94
  "NO_INSTALL_OPT",
95
  "NONAMECHECK_OPT",
96
  "NOLVM_STORAGE_OPT",
97
  "NOMODIFY_ETCHOSTS_OPT",
98
  "NOMODIFY_SSH_SETUP_OPT",
99
  "NONICS_OPT",
100
  "NONLIVE_OPT",
101
  "NONPLUS1_OPT",
102
  "NOSHUTDOWN_OPT",
103
  "NOSTART_OPT",
104
  "NOSSH_KEYCHECK_OPT",
105
  "NOVOTING_OPT",
106
  "NWSYNC_OPT",
107
  "ON_PRIMARY_OPT",
108
  "ON_SECONDARY_OPT",
109
  "OFFLINE_OPT",
110
  "OS_OPT",
111
  "OS_SIZE_OPT",
112
  "RAPI_CERT_OPT",
113
  "READD_OPT",
114
  "REBOOT_TYPE_OPT",
115
  "REMOVE_INSTANCE_OPT",
116
  "SECONDARY_IP_OPT",
117
  "SELECT_OS_OPT",
118
  "SEP_OPT",
119
  "SHOWCMD_OPT",
120
  "SHUTDOWN_TIMEOUT_OPT",
121
  "SINGLE_NODE_OPT",
122
  "SRC_DIR_OPT",
123
  "SRC_NODE_OPT",
124
  "SUBMIT_OPT",
125
  "STATIC_OPT",
126
  "SYNC_OPT",
127
  "TAG_SRC_OPT",
128
  "TIMEOUT_OPT",
129
  "USEUNITS_OPT",
130
  "USE_REPL_NET_OPT",
131
  "VERBOSE_OPT",
132
  "VG_NAME_OPT",
133
  "YES_DOIT_OPT",
134
  # Generic functions for CLI programs
135
  "GenericMain",
136
  "GenericInstanceCreate",
137
  "GetClient",
138
  "GetOnlineNodes",
139
  "JobExecutor",
140
  "JobSubmittedException",
141
  "ParseTimespec",
142
  "RunWhileClusterStopped",
143
  "SubmitOpCode",
144
  "SubmitOrSend",
145
  "UsesRPC",
146
  # Formatting functions
147
  "ToStderr", "ToStdout",
148
  "FormatError",
149
  "GenerateTable",
150
  "AskUser",
151
  "FormatTimestamp",
152
  # Tags functions
153
  "ListTags",
154
  "AddTags",
155
  "RemoveTags",
156
  # command line options support infrastructure
157
  "ARGS_MANY_INSTANCES",
158
  "ARGS_MANY_NODES",
159
  "ARGS_NONE",
160
  "ARGS_ONE_INSTANCE",
161
  "ARGS_ONE_NODE",
162
  "ARGS_ONE_OS",
163
  "ArgChoice",
164
  "ArgCommand",
165
  "ArgFile",
166
  "ArgHost",
167
  "ArgInstance",
168
  "ArgJobId",
169
  "ArgNode",
170
  "ArgOs",
171
  "ArgSuggest",
172
  "ArgUnknown",
173
  "OPT_COMPL_INST_ADD_NODES",
174
  "OPT_COMPL_MANY_NODES",
175
  "OPT_COMPL_ONE_IALLOCATOR",
176
  "OPT_COMPL_ONE_INSTANCE",
177
  "OPT_COMPL_ONE_NODE",
178
  "OPT_COMPL_ONE_OS",
179
  "cli_option",
180
  "SplitNodeOption",
181
  "CalculateOSNames",
182
  ]
183

    
184
NO_PREFIX = "no_"
185
UN_PREFIX = "-"
186

    
187

    
188
class _Argument:
189
  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
190
    self.min = min
191
    self.max = max
192

    
193
  def __repr__(self):
194
    return ("<%s min=%s max=%s>" %
195
            (self.__class__.__name__, self.min, self.max))
196

    
197

    
198
class ArgSuggest(_Argument):
  """Argument with a list of suggested values.

  The values passed to the constructor are offered as suggestions;
  any value is still accepted.

  """
  # pylint: disable-msg=W0622
  def __init__(self, min=0, max=None, choices=None):
    _Argument.__init__(self, min=min, max=max)
    self.choices = choices

  def __repr__(self):
    details = (self.__class__.__name__, self.min, self.max, self.choices)
    return "<%s min=%s max=%s choices=%r>" % details
212

    
213

    
214
class ArgChoice(ArgSuggest):
  """Choice argument.

  Like L{ArgSuggest} the possible values come from the constructor,
  but here the supplied value must be one of the choices.

  """
221

    
222

    
223
class ArgUnknown(_Argument):
  """Argument whose kind is not known statically (e.g. determined at runtime).

  """
227

    
228

    
229
class ArgInstance(_Argument):
  """Argument denoting instance name(s).

  """
233

    
234

    
235
class ArgNode(_Argument):
  """Argument denoting node name(s).

  """
239

    
240
class ArgJobId(_Argument):
  """Argument denoting a job ID.

  """
244

    
245

    
246
class ArgFile(_Argument):
  """Argument denoting a file path.

  """
250

    
251

    
252
class ArgCommand(_Argument):
  """Argument denoting a command to execute.

  """
256

    
257

    
258
class ArgHost(_Argument):
  """Argument denoting a host name.

  """
262

    
263

    
264
class ArgOs(_Argument):
  """Argument denoting an OS name.

  """
268

    
269

    
270
ARGS_NONE = []
271
ARGS_MANY_INSTANCES = [ArgInstance()]
272
ARGS_MANY_NODES = [ArgNode()]
273
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
274
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
275
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
276

    
277

    
278
def _ExtractTagsObject(opts, args):
  """Determine the (kind, name) pair a tag operation works on.

  Note that this function will modify its args parameter: for node and
  instance tags the first positional argument is consumed as the
  object name.

  """
  if not hasattr(opts, "tag_type"):
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
  kind = opts.tag_type
  if kind == constants.TAG_CLUSTER:
    # the cluster is a singleton, so no name is needed; the kind
    # doubles as the name
    return kind, kind
  if kind in (constants.TAG_NODE, constants.TAG_INSTANCE):
    if not args:
      raise errors.OpPrereqError("no arguments passed to the command")
    return kind, args.pop(0)
  raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
297

    
298

    
299
def _ExtendTags(opts, args):
300
  """Extend the args if a source file has been given.
301

302
  This function will extend the tags with the contents of the file
303
  passed in the 'tags_source' attribute of the opts parameter. A file
304
  named '-' will be replaced by stdin.
305

306
  """
307
  fname = opts.tags_source
308
  if fname is None:
309
    return
310
  if fname == "-":
311
    new_fh = sys.stdin
312
  else:
313
    new_fh = open(fname, "r")
314
  new_data = []
315
  try:
316
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
317
    # because of python bug 1633941
318
    while True:
319
      line = new_fh.readline()
320
      if not line:
321
        break
322
      new_data.append(line.strip())
323
  finally:
324
    new_fh.close()
325
  args.extend(new_data)
326

    
327

    
328
def ListTags(opts, args):
  """List the tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  cl = GetClient()
  # print one tag per line, in sorted order
  for tag in sorted(cl.QueryTags(kind, name)):
    ToStdout(tag)
344

    
345

    
346
def AddTags(opts, args):
  """Add tags on a given object.

  Generic implementation covering all three tag object kinds (cluster,
  node, instance); opts.tag_type selects the kind. Extra tags may come
  from a file via opts.tags_source (see L{_ExtendTags}).

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be added")
  SubmitOpCode(opcodes.OpAddTags(kind=kind, name=name, tags=args))
361

    
362

    
363
def RemoveTags(opts, args):
  """Remove tags from a given object.

  Generic implementation covering all three tag object kinds (cluster,
  node, instance); opts.tag_type selects the kind. Extra tags may come
  from a file via opts.tags_source (see L{_ExtendTags}).

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be removed")
  SubmitOpCode(opcodes.OpDelTags(kind=kind, name=name, tags=args))
378

    
379

    
380
def check_unit(option, opt, value): # pylint: disable-msg=W0613
  """OptParsers custom converter for units.

  Delegates the actual parsing to utils.ParseUnit and re-raises any
  parse failure as OptionValueError so optparse reports it as a
  normal command line error.

  @param option: the optparse Option instance (unused, part of the
      type-checker signature)
  @param opt: the option name, used in the error message
  @param value: the raw string value to convert

  """
  try:
    return utils.ParseUnit(value)
  except errors.UnitParseError, err:
    raise OptionValueError("option %s: %s" % (opt, err))
388

    
389

    
390
def _SplitKeyVal(opt, data):
  """Convert a KeyVal string into a dict.

  Parses a "key=val[,key=val...]" string (escaping handled by
  utils.UnescapeAndSplit). Elements without '=' are treated specially:
  a 'no_' prefix gives value=False (prefix stripped), a '-' prefix
  gives value=None (prefix stripped), anything else gives value=True.

  @type opt: string
  @param opt: a string holding the option name for which we process the
      data, used in building error messages
  @type data: string
  @param data: a string of the format key=val,key=val,...
  @rtype: dict
  @return: mapping of keys to parsed values
  @raises errors.ParameterError: if there are duplicate keys

  """
  parsed = {}
  if data:
    for elem in utils.UnescapeAndSplit(data, sep=","):
      if "=" in elem:
        key, val = elem.split("=", 1)
      elif elem.startswith(NO_PREFIX):
        key, val = elem[len(NO_PREFIX):], False
      elif elem.startswith(UN_PREFIX):
        key, val = elem[len(UN_PREFIX):], None
      else:
        key, val = elem, True
      if key in parsed:
        raise errors.ParameterError("Duplicate key '%s' in option %s" %
                                    (key, opt))
      parsed[key] = val
  return parsed
425

    
426

    
427
def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser for ident:key=val,key=val options.

  This will store the parsed values as a tuple (ident, {key: val}). As such,
  multiple uses of this option via action=append is possible.

  An ident prefixed with 'no_' yields (stripped_ident, False) and one
  prefixed with '-' yields (stripped_ident, None); in both cases no
  key=val part may be present.

  """
  if ":" in value:
    ident, rest = value.split(":", 1)
  else:
    ident, rest = value, ''

  # removal/reset forms may not carry any options
  for prefix, marker in ((NO_PREFIX, False), (UN_PREFIX, None)):
    if ident.startswith(prefix):
      if rest:
        msg = "Cannot pass options when removing parameter groups: %s" % value
        raise errors.ParameterError(msg)
      return (ident[len(prefix):], marker)

  return (ident, _SplitKeyVal(opt, rest))
453

    
454

    
455
def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser class for key=val,key=val options.

  This will store the parsed values as a dict {key: val}.

  @param option: the optparse Option instance (unused, part of the
      type-checker signature)
  @param opt: the option name, forwarded to L{_SplitKeyVal} for error
      reporting
  @param value: the raw "key=val,key=val" string

  """
  return _SplitKeyVal(opt, value)
462

    
463

    
464
# completion_suggestion is normally a list. Using numeric values not evaluating
465
# to False for dynamic completion.
466
(OPT_COMPL_MANY_NODES,
467
 OPT_COMPL_ONE_NODE,
468
 OPT_COMPL_ONE_INSTANCE,
469
 OPT_COMPL_ONE_OS,
470
 OPT_COMPL_ONE_IALLOCATOR,
471
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)
472

    
473
OPT_COMPL_ALL = frozenset([
474
  OPT_COMPL_MANY_NODES,
475
  OPT_COMPL_ONE_NODE,
476
  OPT_COMPL_ONE_INSTANCE,
477
  OPT_COMPL_ONE_OS,
478
  OPT_COMPL_ONE_IALLOCATOR,
479
  OPT_COMPL_INST_ADD_NODES,
480
  ])
481

    
482

    
483
class CliOption(Option):
  """Custom option class for optparse.

  Extends the stock optparse Option with:
    - an extra "completion_suggest" attribute (an OPT_COMPL_* value,
      presumably consumed by shell completion code elsewhere — see the
      OPT_COMPL_* definitions)
    - three additional option types ("identkeyval", "keyval", "unit"),
      wired to the check_* converter functions in this module

  """
  # allow completion_suggest=... to be passed to the constructor
  ATTRS = Option.ATTRS + [
    "completion_suggest",
    ]
  # register the new type names...
  TYPES = Option.TYPES + (
    "identkeyval",
    "keyval",
    "unit",
    )
  # ...and their converter/validator callbacks
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
  TYPE_CHECKER["keyval"] = check_key_val
  TYPE_CHECKER["unit"] = check_unit
499

    
500

    
501
# optparse.py sets make_option, so we do it for our own option class, too
502
cli_option = CliOption
503

    
504

    
505
_YESNO = ("yes", "no")
506
_YORNO = "yes|no"
507

    
508
DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
509
                       help="Increase debugging level")
510

    
511
NOHDR_OPT = cli_option("--no-headers", default=False,
512
                       action="store_true", dest="no_headers",
513
                       help="Don't display column headers")
514

    
515
SEP_OPT = cli_option("--separator", default=None,
516
                     action="store", dest="separator",
517
                     help=("Separator between output fields"
518
                           " (defaults to one space)"))
519

    
520
USEUNITS_OPT = cli_option("--units", default=None,
521
                          dest="units", choices=('h', 'm', 'g', 't'),
522
                          help="Specify units for output (one of hmgt)")
523

    
524
FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
525
                        type="string", metavar="FIELDS",
526
                        help="Comma separated list of output fields")
527

    
528
FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
529
                       default=False, help="Force the operation")
530

    
531
CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
532
                         default=False, help="Do not require confirmation")
533

    
534
TAG_SRC_OPT = cli_option("--from", dest="tags_source",
535
                         default=None, help="File with tag names")
536

    
537
SUBMIT_OPT = cli_option("--submit", dest="submit_only",
538
                        default=False, action="store_true",
539
                        help=("Submit the job and return the job ID, but"
540
                              " don't wait for the job to finish"))
541

    
542
SYNC_OPT = cli_option("--sync", dest="do_locking",
543
                      default=False, action="store_true",
544
                      help=("Grab locks while doing the queries"
545
                            " in order to ensure more consistent results"))
546

    
547
# Fix help-text typo ("verify it it" -> "verify if it")
_DRY_RUN_OPT = cli_option("--dry-run", default=False,
                          action="store_true",
                          help=("Do not execute the operation, just run the"
                                " check steps and verify if it could be"
                                " executed"))
552

    
553
VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
554
                         action="store_true",
555
                         help="Increase the verbosity of the operation")
556

    
557
DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
558
                              action="store_true", dest="simulate_errors",
559
                              help="Debugging option that makes the operation"
560
                              " treat most runtime checks as failed")
561

    
562
NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
563
                        default=True, action="store_false",
564
                        help="Don't wait for sync (DANGEROUS!)")
565

    
566
DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
567
                               help="Custom disk setup (diskless, file,"
568
                               " plain or drbd)",
569
                               default=None, metavar="TEMPL",
570
                               choices=list(constants.DISK_TEMPLATES))
571

    
572
NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
573
                        help="Do not create any network cards for"
574
                        " the instance")
575

    
576
FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
577
                               help="Relative path under default cluster-wide"
578
                               " file storage dir to store file-based disks",
579
                               default=None, metavar="<DIR>")
580

    
581
FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
582
                                  help="Driver to use for image files",
583
                                  default="loop", metavar="<DRIVER>",
584
                                  choices=list(constants.FILE_DRIVER))
585

    
586
IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
587
                            help="Select nodes for the instance automatically"
588
                            " using the <NAME> iallocator plugin",
589
                            default=None, type="string",
590
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
591

    
592
OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
593
                    metavar="<os>",
594
                    completion_suggest=OPT_COMPL_ONE_OS)
595

    
596
FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
597
                               action="store_true", default=False,
598
                               help="Force an unknown variant")
599

    
600
NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
601
                            action="store_true", default=False,
602
                            help="Do not install the OS (will"
603
                            " enable no-start)")
604

    
605
BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
606
                         type="keyval", default={},
607
                         help="Backend parameters")
608

    
609
HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
610
                         default={}, dest="hvparams",
611
                         help="Hypervisor parameters")
612

    
613
HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
614
                            help="Hypervisor and hypervisor options, in the"
615
                            " format hypervisor:option=value,option=value,...",
616
                            default=None, type="identkeyval")
617

    
618
HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
619
                        help="Hypervisor and hypervisor options, in the"
620
                        " format hypervisor:option=value,option=value,...",
621
                        default=[], action="append", type="identkeyval")
622

    
623
NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
624
                           action="store_false",
625
                           help="Don't check that the instance's IP"
626
                           " is alive")
627

    
628
NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
629
                             default=True, action="store_false",
630
                             help="Don't check that the instance's name"
631
                             " is resolvable")
632

    
633
NET_OPT = cli_option("--net",
634
                     help="NIC parameters", default=[],
635
                     dest="nics", action="append", type="identkeyval")
636

    
637
DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
638
                      dest="disks", action="append", type="identkeyval")
639

    
640
DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
641
                         help="Comma-separated list of disks"
642
                         " indices to act on (e.g. 0,2) (optional,"
643
                         " defaults to all disks)")
644

    
645
OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
646
                         help="Enforces a single-disk configuration using the"
647
                         " given disk size, in MiB unless a suffix is used",
648
                         default=None, type="unit", metavar="<size>")
649

    
650
IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
651
                                dest="ignore_consistency",
652
                                action="store_true", default=False,
653
                                help="Ignore the consistency of the disks on"
654
                                " the secondary")
655

    
656
NONLIVE_OPT = cli_option("--non-live", dest="live",
657
                         default=True, action="store_false",
658
                         help="Do a non-live migration (this usually means"
659
                         " freeze the instance, save the state, transfer and"
660
                         " only then resume running on the secondary node)")
661

    
662
NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
663
                                help="Target node and optional secondary node",
664
                                metavar="<pnode>[:<snode>]",
665
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)
666

    
667
NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
668
                           action="append", metavar="<node>",
669
                           help="Use only this node (can be used multiple"
670
                           " times, if not given defaults to all nodes)",
671
                           completion_suggest=OPT_COMPL_ONE_NODE)
672

    
673
SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
674
                             metavar="<node>",
675
                             completion_suggest=OPT_COMPL_ONE_NODE)
676

    
677
NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
678
                         action="store_false",
679
                         help="Don't start the instance after creation")
680

    
681
SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
682
                         action="store_true", default=False,
683
                         help="Show command instead of executing it")
684

    
685
# Fix help text: remove doubled space after "and" and close the
# unbalanced parenthesis around "(like during the migration"
CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
                         default=False, action="store_true",
                         help="Instead of performing the migration, try to"
                         " recover from a failed cleanup. This is safe"
                         " to run even if the instance is healthy, but it"
                         " will create extra replication traffic and"
                         " disrupt briefly the replication (like during the"
                         " migration)")
693

    
694
STATIC_OPT = cli_option("-s", "--static", dest="static",
695
                        action="store_true", default=False,
696
                        help="Only show configuration data, not runtime data")
697

    
698
ALL_OPT = cli_option("--all", dest="show_all",
699
                     default=False, action="store_true",
700
                     help="Show info on all instances on the cluster."
701
                     " This can take a long time to run, use wisely")
702

    
703
SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
704
                           action="store_true", default=False,
705
                           help="Interactive OS reinstall, lists available"
706
                           " OS templates for selection")
707

    
708
IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
709
                                 action="store_true", default=False,
710
                                 help="Remove the instance from the cluster"
711
                                 " configuration even if there are failures"
712
                                 " during the removal process")
713

    
714
IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
715
                                        dest="ignore_remove_failures",
716
                                        action="store_true", default=False,
717
                                        help="Remove the instance from the"
718
                                        " cluster configuration even if there"
719
                                        " are failures during the removal"
720
                                        " process")
721

    
722
REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
723
                                 action="store_true", default=False,
724
                                 help="Remove the instance from the cluster")
725

    
726
NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
727
                               help="Specifies the new secondary node",
728
                               metavar="NODE", default=None,
729
                               completion_suggest=OPT_COMPL_ONE_NODE)
730

    
731
ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
732
                            default=False, action="store_true",
733
                            help="Replace the disk(s) on the primary"
734
                            " node (only for the drbd template)")
735

    
736
ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
737
                              default=False, action="store_true",
738
                              help="Replace the disk(s) on the secondary"
739
                              " node (only for the drbd template)")
740

    
741
AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
742
                              default=False, action="store_true",
743
                              help="Lock all nodes and auto-promote as needed"
744
                              " to MC status")
745

    
746
AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
747
                              default=False, action="store_true",
748
                              help="Automatically replace faulty disks"
749
                              " (only for the drbd template)")
750

    
751
IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
752
                             default=False, action="store_true",
753
                             help="Ignore current recorded size"
754
                             " (useful for forcing activation when"
755
                             " the recorded size is wrong)")
756

    
757
SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
758
                          metavar="<node>",
759
                          completion_suggest=OPT_COMPL_ONE_NODE)
760

    
761
SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
762
                         metavar="<dir>")
763

    
764
SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
765
                              help="Specify the secondary ip for the node",
766
                              metavar="ADDRESS", default=None)
767

    
768
READD_OPT = cli_option("--readd", dest="readd",
769
                       default=False, action="store_true",
770
                       help="Readd old node after replacing it")
771

    
772
NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
773
                                default=True, action="store_false",
774
                                help="Disable SSH key fingerprint checking")
775

    
776

    
777
MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
778
                    choices=_YESNO, default=None, metavar=_YORNO,
779
                    help="Set the master_candidate flag on the node")
780

    
781
OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
782
                         choices=_YESNO, default=None,
783
                         help="Set the offline flag on the node")
784

    
785
# Command line options shared by the cli scripts (exported via __all__).
# Fix: the --cluster-domain-secret help text read "Load new new cluster
# domain secret" (duplicated word).

DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
                         choices=_YESNO, default=None,
                         help="Set the drained flag on the node")

ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
                             choices=_YESNO, default=None, metavar=_YORNO,
                             help="Set the allocatable flag on a volume")

NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
                               help="Disable support for lvm based instances"
                               " (cluster-wide)",
                               action="store_false", default=True)

ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
                            dest="enabled_hypervisors",
                            help="Comma-separated list of hypervisors",
                            type="string", default=None)

NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
                            type="keyval", default={},
                            help="NIC parameters")

CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
                         dest="candidate_pool_size", type="int",
                         help="Set the candidate pool size")

VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
                         help="Enables LVM and specifies the volume group"
                         " name (cluster-wide) for disk allocation [xenvg]",
                         metavar="VG", default=None)

YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
                          help="Destroy cluster", action="store_true")

NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
                          help="Skip node agreement check (dangerous)",
                          action="store_true", default=False)

MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
                            help="Specify the mac prefix for the instance IP"
                            " addresses, in the format XX:XX:XX",
                            metavar="PREFIX",
                            default=None)

MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
                               help="Specify the node interface (cluster-wide)"
                               " on which the master IP address will be added "
                               " [%s]" % constants.DEFAULT_BRIDGE,
                               metavar="NETDEV",
                               default=constants.DEFAULT_BRIDGE)

GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                                help="Specify the default directory (cluster-"
                                "wide) for storing the file-based disks [%s]" %
                                constants.DEFAULT_FILE_STORAGE_DIR,
                                metavar="DIR",
                                default=constants.DEFAULT_FILE_STORAGE_DIR)

NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
                                   help="Don't modify /etc/hosts",
                                   action="store_false", default=True)

NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
                                    help="Don't initialize SSH keys",
                                    action="store_false", default=True)

ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
                             help="Enable parseable error messages",
                             action="store_true", default=False)

NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
                          help="Skip N+1 memory redundancy tests",
                          action="store_true", default=False)

REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
                             help="Type of reboot: soft/hard/full",
                             default=constants.INSTANCE_REBOOT_HARD,
                             metavar="<REBOOT>",
                             choices=list(constants.REBOOT_TYPES))

IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
                                    dest="ignore_secondaries",
                                    default=False, action="store_true",
                                    help="Ignore errors from secondaries")

NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
                            action="store_false", default=True,
                            help="Don't shutdown the instance (unsafe)")

TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                         help="Maximum time to wait")

SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
                         dest="shutdown_timeout", type="int",
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                         help="Maximum time to wait for instance shutdown")

EARLY_RELEASE_OPT = cli_option("--early-release",
                               dest="early_release", default=False,
                               action="store_true",
                               help="Release the locks on the secondary"
                               " node(s) early")

NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
                                  dest="new_cluster_cert",
                                  default=False, action="store_true",
                                  help="Generate a new cluster certificate")

RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
                           default=None,
                           help="File containing new RAPI certificate")

# NOTE(review): unlike the sibling store_true flags this one defaults to
# None instead of False — verify whether any caller distinguishes None
# from False before normalizing it.
NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
                               default=None, action="store_true",
                               help=("Generate a new self-signed RAPI"
                                     " certificate"))

NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
                                    dest="new_confd_hmac_key",
                                    default=False, action="store_true",
                                    help=("Create a new HMAC key for %s" %
                                          constants.CONFD))

CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
                                       dest="cluster_domain_secret",
                                       default=None,
                                       help=("Load new cluster domain"
                                             " secret from file"))

NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
                                           dest="new_cluster_domain_secret",
                                           default=False, action="store_true",
                                           help=("Create a new cluster domain"
                                                 " secret"))

USE_REPL_NET_OPT = cli_option("--use-replication-network",
                              dest="use_replication_network",
                              help="Whether to use the replication network"
                              " for talking to the nodes",
                              action="store_true", default=False)

def _ParseArgs(argv, commands, aliases):
  """Parser for the command line arguments.

  Resolves the requested command (unaliasing it if needed), builds an
  option parser for it and parses the remaining arguments, returning
  the function to execute together with its options and arguments.

  @param argv: the command line
  @param commands: dictionary with special contents, see the design
      doc for cmdline handling
  @param aliases: dictionary with command aliases {'alias': 'target, ...}
  @return: a (function, options, arguments) tuple; all three are None
      when the command line could not be handled

  """
  binary = argv[0].split("/")[-1] if len(argv) > 0 else "<command>"

  if len(argv) > 1 and argv[1] == "--version":
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
    # Quit right away. That way we don't have to care about this special
    # argument. optparse.py does it the same.
    sys.exit(0)

  if len(argv) < 2 or not (argv[1] in commands or
                           argv[1] in aliases):
    # unknown or missing command: show a sorted command overview
    cmd_names = sorted(commands.keys())

    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
    ToStdout("")

    # compute the max line length for cmd + usage
    width = min(60, max([len(" %s" % name) for name in commands]))

    # and format a nice command list
    ToStdout("Commands:")
    for name in cmd_names:
      wrapped = textwrap.wrap(commands[name][4], 79 - 3 - width)
      ToStdout("%-*s - %s", width, " %s" % name, wrapped.pop(0))
      for extra in wrapped:
        ToStdout("%-*s   %s", width, "", extra)

    ToStdout("")

    return None, None, None

  # get command, unalias it, and look it up in commands
  cmd = argv.pop(1)
  if cmd in aliases:
    if cmd in commands:
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
                                   " command" % cmd)

    if aliases[cmd] not in commands:
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
                                   " command '%s'" % (cmd, aliases[cmd]))

    cmd = aliases[cmd]

  func, args_def, parser_opts, usage, description = commands[cmd]
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
                        description=description,
                        formatter=TitledHelpFormatter(),
                        usage="%%prog %s %s" % (cmd, usage))
  parser.disable_interspersed_args()
  options, args = parser.parse_args()

  if not _CheckArguments(cmd, args_def, args):
    return None, None, None

  return func, options, args

def _CheckArguments(cmd, args_def, args):
1007
  """Verifies the arguments using the argument definition.
1008

1009
  Algorithm:
1010

1011
    1. Abort with error if values specified by user but none expected.
1012

1013
    1. For each argument in definition
1014

1015
      1. Keep running count of minimum number of values (min_count)
1016
      1. Keep running count of maximum number of values (max_count)
1017
      1. If it has an unlimited number of values
1018

1019
        1. Abort with error if it's not the last argument in the definition
1020

1021
    1. If last argument has limited number of values
1022

1023
      1. Abort with error if number of values doesn't match or is too large
1024

1025
    1. Abort with error if user didn't pass enough values (min_count)
1026

1027
  """
1028
  if args and not args_def:
1029
    ToStderr("Error: Command %s expects no arguments", cmd)
1030
    return False
1031

    
1032
  min_count = None
1033
  max_count = None
1034
  check_max = None
1035

    
1036
  last_idx = len(args_def) - 1
1037

    
1038
  for idx, arg in enumerate(args_def):
1039
    if min_count is None:
1040
      min_count = arg.min
1041
    elif arg.min is not None:
1042
      min_count += arg.min
1043

    
1044
    if max_count is None:
1045
      max_count = arg.max
1046
    elif arg.max is not None:
1047
      max_count += arg.max
1048

    
1049
    if idx == last_idx:
1050
      check_max = (arg.max is not None)
1051

    
1052
    elif arg.max is None:
1053
      raise errors.ProgrammerError("Only the last argument can have max=None")
1054

    
1055
  if check_max:
1056
    # Command with exact number of arguments
1057
    if (min_count is not None and max_count is not None and
1058
        min_count == max_count and len(args) != min_count):
1059
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1060
      return False
1061

    
1062
    # Command with limited number of arguments
1063
    if max_count is not None and len(args) > max_count:
1064
      ToStderr("Error: Command %s expects only %d argument(s)",
1065
               cmd, max_count)
1066
      return False
1067

    
1068
  # Command with some required arguments
1069
  if min_count is not None and len(args) < min_count:
1070
    ToStderr("Error: Command %s expects at least %d argument(s)",
1071
             cmd, min_count)
1072
    return False
1073

    
1074
  return True
1075

    
1076

    
1077
def SplitNodeOption(value):
  """Splits the value of a --node option.

  @return: a (primary, secondary) pair; secondary is None when the
      value contains no colon separator

  """
  if not value or ':' not in value:
    return (value, None)
  return value.split(':', 1)

def CalculateOSNames(os_name, os_variants):
  """Calculates all the names an OS can be called, according to its variants.

  @type os_name: string
  @param os_name: base name of the os
  @type os_variants: list or None
  @param os_variants: list of supported variants
  @rtype: list
  @return: list of valid names

  """
  if not os_variants:
    # no variants: the base name is the only valid one
    return [os_name]
  return ['%s+%s' % (os_name, variant) for variant in os_variants]

def UsesRPC(fn):
  """Decorator bracketing a function call with RPC layer setup/teardown.

  rpc.Init() is called before the wrapped function and rpc.Shutdown()
  is guaranteed to run afterwards, even on exceptions.

  """
  def wrapper(*args, **kwargs):
    rpc.Init()
    try:
      return fn(*args, **kwargs)
    finally:
      rpc.Shutdown()
  return wrapper

def AskUser(text, choices=None):
  """Ask the user a question.

  @param text: the question to ask

  @param choices: list with elements tuples (input_char, return_value,
      description); if not given, it will default to: [('y', True,
      'Perform the operation'), ('n', False, 'Do no do the operation')];
      note that the '?' char is reserved for help

  @return: one of the return values from the choices list; if input is
      not possible (i.e. not running with a tty, we return the last
      entry from the list

  """
  if choices is None:
    choices = [('y', True, 'Perform the operation'),
               ('n', False, 'Do not perform the operation')]
  if not choices or not isinstance(choices, list):
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
  for entry in choices:
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
      raise errors.ProgrammerError("Invalid choices element to AskUser")

  # the last choice doubles as the default answer (used without a tty
  # and on EOF/unmatched input)
  answer = choices[-1][1]
  # re-wrap the question text to 70 columns, preserving line breaks
  new_text = []
  for line in text.splitlines():
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
  text = "\n".join(new_text)
  try:
    # talk directly to the controlling terminal, bypassing stdin/stdout
    f = file("/dev/tty", "a+")
  except IOError:
    # no controlling terminal available: return the default answer
    return answer
  try:
    chars = [entry[0] for entry in choices]
    chars[-1] = "[%s]" % chars[-1]  # brackets mark the default choice
    chars.append('?')
    maps = dict([(entry[0], entry[1]) for entry in choices])
    while True:
      f.write(text)
      f.write('\n')
      f.write("/".join(chars))
      f.write(": ")
      # readline(2) caps the read at two bytes, i.e. one answer
      # character plus (usually) the newline
      line = f.readline(2).strip().lower()
      if line in maps:
        answer = maps[line]
        break
      elif line == '?':
        # '?' prints the description of every choice, then re-asks
        for entry in choices:
          f.write(" %s - %s\n" % (entry[0], entry[2]))
        f.write("\n")
        continue
  finally:
    f.close()
  return answer

class JobSubmittedException(Exception):
  """Signals that a job was submitted and the client should exit.

  The single exception argument is the ID of the submitted job; the
  handler is expected to print this ID.

  Raising this is not an error condition, merely a structured way for
  clients to terminate after a successful submission.

  """

def SendJob(ops, cl=None):
  """Function to submit an opcode without waiting for the results.

  @type ops: list
  @param ops: list of opcodes
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created
  @return: the job ID assigned by the master

  """
  if cl is None:
    cl = GetClient()
  return cl.SubmitJob(ops)

def PollJob(job_id, cl=None, feedback_fn=None):
  """Function to poll for the result of a job.

  Blocks until the job reaches a final state, relaying its log
  messages either to feedback_fn (if callable) or to stdout, then
  returns the job's opcode results on success or raises on failure.

  @type job_id: job identified
  @param job_id: the job to poll for results
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created
  @raise errors.JobLost: if the job disappears from the queue
  @raise errors.OpExecError: if the job finished unsuccessfully or
      was canceled

  """
  if cl is None:
    cl = GetClient()

  prev_job_info = None
  prev_logmsg_serial = None

  status = None

  # track whether we already told the user why the job is stalled
  notified_queued = False
  notified_waitlock = False

  while True:
    result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
                                     prev_logmsg_serial)
    if not result:
      # job not found, go away!
      raise errors.JobLost("Job with id %s lost" % job_id)
    elif result == constants.JOB_NOTCHANGED:
      # wait timed out with no change; without a feedback_fn, keep the
      # interactive user informed (once per state) about the delay
      if status is not None and not callable(feedback_fn):
        if status == constants.JOB_STATUS_QUEUED and not notified_queued:
          ToStderr("Job %s is waiting in queue", job_id)
          notified_queued = True
        elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
          ToStderr("Job %s is trying to acquire all necessary locks", job_id)
          notified_waitlock = True

      # Wait again
      continue

    # Split result, a tuple of (field values, log entries)
    (job_info, log_entries) = result
    (status, ) = job_info

    if log_entries:
      for log_entry in log_entries:
        (serial, timestamp, _, message) = log_entry
        if callable(feedback_fn):
          feedback_fn(log_entry[1:])
        else:
          encoded = utils.SafeEncode(message)
          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
        # NOTE: max(None, serial) relies on Python 2 comparison
        # semantics for the first message (prev_logmsg_serial is None)
        prev_logmsg_serial = max(prev_logmsg_serial, serial)

    # TODO: Handle canceled and archived jobs
    # NOTE(review): being an elif of "if log_entries", the end-state
    # check only runs on wakeups that delivered no new log messages; a
    # finished job is still caught on the following wakeup once all its
    # logs have been drained
    elif status in (constants.JOB_STATUS_SUCCESS,
                    constants.JOB_STATUS_ERROR,
                    constants.JOB_STATUS_CANCELING,
                    constants.JOB_STATUS_CANCELED):
      break

    prev_job_info = job_info

  # the loop is done; fetch the final status and per-opcode results
  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
  if not jobs:
    raise errors.JobLost("Job with id %s lost" % job_id)

  status, opstatus, result = jobs[0]
  if status == constants.JOB_STATUS_SUCCESS:
    return result
  elif status in (constants.JOB_STATUS_CANCELING,
                  constants.JOB_STATUS_CANCELED):
    raise errors.OpExecError("Job was canceled")
  else:
    # find the first failing opcode and report it, noting whether some
    # earlier opcodes had already succeeded (partial failure)
    has_ok = False
    for idx, (status, msg) in enumerate(zip(opstatus, result)):
      if status == constants.OP_STATUS_SUCCESS:
        has_ok = True
      elif status == constants.OP_STATUS_ERROR:
        errors.MaybeRaise(msg)
        if has_ok:
          raise errors.OpExecError("partial failure (opcode %d): %s" %
                                   (idx, msg))
        else:
          raise errors.OpExecError(str(msg))
    # default failure mode
    raise errors.OpExecError(result)

def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
  """Legacy function to submit an opcode.

  This is just a simple wrapper over the construction of the processor
  instance. It should be extended to better handle feedback and
  interaction functions.

  @return: the result of the submitted opcode

  """
  if cl is None:
    cl = GetClient()

  SetGenericOpcodeOpts([op], opts)

  results = PollJob(SendJob([op], cl), cl=cl, feedback_fn=feedback_fn)

  return results[0]

def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  Based on the 'opts' parameter this either submits the opcode and
  waits for (and returns) its result, or merely sends the job and
  raises L{JobSubmittedException} carrying the job ID for the caller
  to print. It is used to implement the '--submit' option.

  On the SendJob path the opcodes are preprocessed here via
  L{SetGenericOpcodeOpts} (otherwise SubmitOpCode does it).

  """
  if not (opts and opts.submit_only):
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)

  job = [op]
  SetGenericOpcodeOpts(job, opts)
  raise JobSubmittedException(SendJob(job, cl=cl))

def SetGenericOpcodeOpts(opcode_list, options):
  """Processor for generic options.

  This function updates the given opcodes based on generic command
  line options (like debug, dry-run, etc.).

  @param opcode_list: list of opcodes
  @param options: command line options or None
  @return: None (in-place modification)

  """
  if not options:
    # nothing to copy over
    return
  for opcode in opcode_list:
    opcode.dry_run = options.dry_run
    opcode.debug_level = options.debug

def GetClient():
  """Connects to the master daemon and returns a luxi client.

  When the master daemon cannot be reached, a diagnosis is attempted
  to produce a more helpful error before re-raising.

  @raise errors.OpPrereqError: if the cluster is not initialized, or
      this node is not the master
  @raise luxi.NoMasterError: if this is the master node but the master
      daemon is unreachable

  """
  # TODO: Cache object?
  try:
    client = luxi.Client()
  except luxi.NoMasterError:
    ss = ssconf.SimpleStore()

    # Try to read ssconf file
    try:
      ss.GetMasterNode()
    except errors.ConfigurationError:
      raise errors.OpPrereqError("Cluster not initialized or this machine is"
                                 " not part of a cluster")

    master, myself = ssconf.GetMasterAndMyself(ss=ss)
    if master != myself:
      raise errors.OpPrereqError("This is not the master node, please connect"
                                 " to node '%s' and rerun the command" %
                                 master)
    # we are on the master but the daemon is down: re-raise NoMasterError
    raise
  return client

def FormatError(err):
  """Return a formatted error message for a given error.

  This function takes an exception instance and returns a tuple
  consisting of two values: first, the recommended exit code, and
  second, a string describing the error message (not
  newline-terminated).

  """
  retcode = 1
  obuf = StringIO()
  msg = str(err)
  # NOTE(review): the isinstance chain is order-sensitive — the more
  # specific error types are tested before the catch-alls (e.g.
  # errors.GenericError); keep that ordering when editing
  if isinstance(err, errors.ConfigurationError):
    txt = "Corrupt configuration file: %s" % msg
    logging.error(txt)
    obuf.write(txt + "\n")
    obuf.write("Aborting.")
    retcode = 2
  elif isinstance(err, errors.HooksAbort):
    obuf.write("Failure: hooks execution failed:\n")
    # err.args[0] is a list of (node, script, output) tuples
    for node, script, out in err.args[0]:
      if out:
        obuf.write("  node: %s, script: %s, output: %s\n" %
                   (node, script, out))
      else:
        obuf.write("  node: %s, script: %s (no output)\n" %
                   (node, script))
  elif isinstance(err, errors.HooksFailure):
    obuf.write("Failure: hooks general failure: %s" % msg)
  elif isinstance(err, errors.ResolverError):
    # distinguish "can't resolve myself" from a generic lookup failure
    this_host = utils.HostInfo.SysName()
    if err.args[0] == this_host:
      msg = "Failure: can't resolve my own hostname ('%s')"
    else:
      msg = "Failure: can't resolve hostname '%s'"
    obuf.write(msg % err.args[0])
  elif isinstance(err, errors.OpPrereqError):
    if len(err.args) == 2:
      # two-argument form carries (message, error code)
      obuf.write("Failure: prerequisites not met for this"
               " operation:\nerror type: %s, error details:\n%s" %
                 (err.args[1], err.args[0]))
    else:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\n%s" % msg)
  elif isinstance(err, errors.OpExecError):
    obuf.write("Failure: command execution error:\n%s" % msg)
  elif isinstance(err, errors.TagError):
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
  elif isinstance(err, errors.JobQueueDrainError):
    obuf.write("Failure: the job queue is marked for drain and doesn't"
               " accept new requests\n")
  elif isinstance(err, errors.JobQueueFull):
    obuf.write("Failure: the job queue is full and doesn't accept new"
               " job submissions until old jobs are archived\n")
  elif isinstance(err, errors.TypeEnforcementError):
    obuf.write("Parameter Error: %s" % msg)
  elif isinstance(err, errors.ParameterError):
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
  elif isinstance(err, luxi.NoMasterError):
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
               " and listening for connections?")
  elif isinstance(err, luxi.TimeoutError):
    obuf.write("Timeout while talking to the master daemon. Error:\n"
               "%s" % msg)
  elif isinstance(err, luxi.ProtocolError):
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
               "%s" % msg)
  elif isinstance(err, errors.GenericError):
    # catch-all for the remaining Ganeti errors
    obuf.write("Unhandled Ganeti error: %s" % msg)
  elif isinstance(err, JobSubmittedException):
    # not an error: report the job ID and signal success
    obuf.write("JobID: %s\n" % err.args[0])
    retcode = 0
  else:
    obuf.write("Unhandled exception: %s" % msg)
  return retcode, obuf.getvalue().rstrip('\n')

def GenericMain(commands, override=None, aliases=None):
  """Generic main function for all the gnt-* commands.

  Arguments:
    - commands: a dictionary with a special structure, see the design doc
                for command line handling.
    - override: if not None, we expect a dictionary with keys that will
                override command line options; this can be used to pass
                options from the scripts to generic functions
    - aliases: dictionary with command aliases {'alias': 'target, ...}

  @return: the exit code of the executed command, or 1 on parse errors

  """
  # save the program name and the entire command line for later logging
  if sys.argv:
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
    if len(sys.argv) >= 2:
      # include the sub-command in the logged program name
      binary += " " + sys.argv[1]
      old_cmdline = " ".join(sys.argv[2:])
    else:
      old_cmdline = ""
  else:
    binary = "<unknown program>"
    old_cmdline = ""

  if aliases is None:
    aliases = {}

  try:
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
  except errors.ParameterError, err:
    result, err_msg = FormatError(err)
    ToStderr(err_msg)
    return 1

  if func is None: # parse error
    return 1

  if override is not None:
    # forced values take precedence over the parsed command line
    for key, val in override.iteritems():
      setattr(options, key, val)

  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
                     stderr_logging=True, program=binary)

  if old_cmdline:
    logging.info("run with arguments '%s'", old_cmdline)
  else:
    logging.info("run with no arguments")

  try:
    result = func(options, args)
  except (errors.GenericError, luxi.ProtocolError,
          JobSubmittedException), err:
    # known exceptions are turned into an exit code plus message via
    # FormatError (JobSubmittedException yields exit code 0)
    result, err_msg = FormatError(err)
    logging.exception("Error during command processing")
    ToStderr(err_msg)

  return result

def GenericInstanceCreate(mode, opts, args):
  """Add an instance to the cluster via either creation or import.

  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new instance name
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: on invalid NIC/disk specifications
  @raise errors.ProgrammerError: if mode is not one of the two constants

  """
  instance = args[0]

  (pnode, snode) = SplitNodeOption(opts.node)

  hypervisor = None
  hvparams = {}
  if opts.hypervisor:
    # opts.hypervisor is a (name, parameters) pair when given
    hypervisor, hvparams = opts.hypervisor

  if opts.nics:
    # opts.nics is a list of (index, dict) pairs; build a dense list of
    # NIC definitions sized to the highest user-given index
    try:
      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
    except ValueError, err:
      raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
    nics = [{}] * nic_max
    for nidx, ndict in opts.nics:
      nidx = int(nidx)
      if not isinstance(ndict, dict):
        msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
        raise errors.OpPrereqError(msg)
      nics[nidx] = ndict
  elif opts.no_nics:
    # no nics
    nics = []
  else:
    # default of one nic, all auto
    nics = [{}]

  if opts.disk_template == constants.DT_DISKLESS:
    # diskless instances must not carry any disk specification
    if opts.disks or opts.sd_size is not None:
      raise errors.OpPrereqError("Diskless instance but disk"
                                 " information passed")
    disks = []
  else:
    if not opts.disks and not opts.sd_size:
      raise errors.OpPrereqError("No disk information specified")
    if opts.disks and opts.sd_size is not None:
      # --disk and -s are mutually exclusive
      raise errors.OpPrereqError("Please use either the '--disk' or"
                                 " '-s' option")
    if opts.sd_size is not None:
      # -s: a single disk of the given size at index 0
      opts.disks = [(0, {"size": opts.sd_size})]
    try:
      disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
    except ValueError, err:
      raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
    disks = [{}] * disk_max
    for didx, ddict in opts.disks:
      didx = int(didx)
      if not isinstance(ddict, dict):
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
        raise errors.OpPrereqError(msg)
      elif "size" in ddict:
        if "adopt" in ddict:
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
                                     " (disk %d)" % didx)
        try:
          # parse the size through utils.ParseUnit; it raises
          # ValueError on an unparseable value
          ddict["size"] = utils.ParseUnit(ddict["size"])
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
                                     (didx, err))
      elif "adopt" in ddict:
        if mode == constants.INSTANCE_IMPORT:
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
                                     " import")
        # no size given for adopted volumes; use 0 as a placeholder
        ddict["size"] = 0
      else:
        raise errors.OpPrereqError("Missing size or adoption source for"
                                   " disk %d" % didx)
      disks[didx] = ddict

  # validate/coerce the backend and hypervisor parameter value types
  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)

  if mode == constants.INSTANCE_CREATE:
    start = opts.start
    os_type = opts.os
    src_node = None
    src_path = None
    no_install = opts.no_install
  elif mode == constants.INSTANCE_IMPORT:
    # imported instances are not started and get their OS from the source
    start = False
    os_type = None
    src_node = opts.src_node
    src_path = opts.src_dir
    no_install = None
  else:
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)

  op = opcodes.OpCreateInstance(instance_name=instance,
                                disks=disks,
                                disk_template=opts.disk_template,
                                nics=nics,
                                pnode=pnode, snode=snode,
                                ip_check=opts.ip_check,
                                name_check=opts.name_check,
                                wait_for_sync=opts.wait_for_sync,
                                file_storage_dir=opts.file_storage_dir,
                                file_driver=opts.file_driver,
                                iallocator=opts.iallocator,
                                hypervisor=hypervisor,
                                hvparams=hvparams,
                                beparams=opts.beparams,
                                mode=mode,
                                start=start,
                                os_type=os_type,
                                src_node=src_node,
                                src_path=src_path,
                                no_install=no_install)

  SubmitOrSend(op, opts)
  return 0

class _RunWhileClusterStoppedHelper:
  """Helper class for L{RunWhileClusterStopped} to simplify state management

  """
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes

    # SSH runner used for executing commands on non-master nodes
    self.ssh = ssh.SshRunner(self.cluster_name)

    # All online nodes except the master; kept separately so daemons can be
    # restarted on the master node last (see L{Call})
    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name; C{None} or the master node's name means
        the command is executed locally instead of via SSH
    @type cmd: list
    @param cmd: Command (as an argument list)
    @raise errors.OpExecError: if the command fails

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    @type fn: callable
    @param fn: Function to be called; receives this helper instance as its
        first argument, followed by C{args}
    @return: C{fn}'s return value

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes
        for node_name in self.online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception, err:
          # Log and report the error, but re-raise so the caller sees it
          _, errmsg = FormatError(err)
          logging.exception("Caught exception")
          self.feedback_fn(errmsg)
          raise
      finally:
        # Start cluster again, master node last
        for node_name in self.nonmaster_nodes + [self.master_node]:
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
    finally:
      # Resume watcher
      watcher_block.Close()


def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Calls a function while all cluster daemons are stopped.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  feedback_fn("Gathering cluster information")

  # Talking to the master daemon ensures we're running on the master node
  client = GetClient()

  (cluster_name, master_node) = \
    client.QueryConfigValues(["cluster_name", "master_node"])

  nodes = GetOnlineNodes([], cl=client)

  # Drop the client reference; the master daemon is about to go away
  del client

  assert master_node in nodes

  helper = _RunWhileClusterStoppedHelper(feedback_fn, cluster_name,
                                         master_node, nodes)
  return helper.Call(fn, *args)


def GenerateTable(headers, fields, separator, data,
                  numfields=None, unitfields=None,
                  units=None):
  """Prints a table with headers and different fields.

  Note that the rows in C{data} are modified in place: unit fields are
  formatted and all values are converted to strings.

  @type headers: dict
  @param headers: dictionary mapping field names to headers for
      the table
  @type fields: list
  @param fields: the field names corresponding to each row in
      the data field
  @param separator: the separator to be used; if this is None,
      the default 'smart' algorithm is used which computes optimal
      field width, otherwise just the separator is used between
      each field
  @type data: list
  @param data: a list of lists, each sublist being one row to be output
  @type numfields: list
  @param numfields: a list with the fields that hold numeric
      values and thus should be right-aligned
  @type unitfields: list
  @param unitfields: a list with the fields that hold numeric
      values that should be formatted with the units field
  @type units: string or None
  @param units: the units we should use for formatting, or None for
      automatic choice (human-readable for non-separator usage, otherwise
      megabytes); this is a one-letter string
  @rtype: list of strings
  @return: the formatted table lines (header line first, if headers given)

  """
  # Default unit: megabytes when machine-parseable output was requested
  # (separator given), human-readable otherwise
  if units is None:
    if separator:
      units = "m"
    else:
      units = "h"

  if numfields is None:
    numfields = []
  if unitfields is None:
    unitfields = []

  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142

  # Build one "%"-format specifier per column: plain "%s" for separator
  # mode, width-parameterized (right- or left-aligned) otherwise
  format_fields = []
  for field in fields:
    if headers and field not in headers:
      # TODO: handle better unknown fields (either revert to old
      # style of raising exception, or deal more intelligently with
      # variable fields)
      headers[field] = field
    if separator is not None:
      format_fields.append("%s")
    elif numfields.Matches(field):
      format_fields.append("%*s")
    else:
      format_fields.append("%-*s")

  if separator is None:
    # mlens tracks the maximum width seen per column
    mlens = [0 for name in fields]
    format = ' '.join(format_fields)
  else:
    # escape "%" in the separator so it survives the "%" formatting below
    format = separator.replace("%", "%%").join(format_fields)

  # First pass: format unit fields, stringify all values and (in smart
  # mode) record the per-column maximum widths; rows are updated in place
  for row in data:
    if row is None:
      continue
    for idx, val in enumerate(row):
      if unitfields.Matches(fields[idx]):
        try:
          val = int(val)
        except (TypeError, ValueError):
          # non-numeric values are left untouched
          pass
        else:
          val = row[idx] = utils.FormatUnit(val, units)
      val = row[idx] = str(val)
      if separator is None:
        mlens[idx] = max(mlens[idx], len(val))

  result = []
  if headers:
    args = []
    for idx, name in enumerate(fields):
      hdr = headers[name]
      if separator is None:
        # headers also count towards the column width
        mlens[idx] = max(mlens[idx], len(hdr))
        args.append(mlens[idx])
      args.append(hdr)
    result.append(format % tuple(args))

  if separator is None:
    assert len(mlens) == len(fields)

    # zero width for the last column (unless right-aligned/numeric) so
    # lines carry no trailing whitespace
    if fields and not numfields.Matches(fields[-1]):
      mlens[-1] = 0

  # Second pass: emit the data rows; None rows become all-dash rows
  for line in data:
    args = []
    if line is None:
      line = ['-' for _ in fields]
    for idx in range(len(fields)):
      if separator is None:
        args.append(mlens[idx])
      args.append(line[idx])
    result.append(format % tuple(args))

  return result


def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: a string with the formatted timestamp

  """
  # Anything but a two-element sequence cannot be formatted
  if isinstance(ts, (tuple, list)) and len(ts) == 2:
    (sec, usec) = ts
    return "%s.%06d" % (time.strftime("%F %T", time.localtime(sec)), usec)
  return '?'


def ParseTimespec(value):
  """Parse a time specification.

  The following suffixed will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: day
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  """
  value = str(value)
  if not value:
    raise errors.OpPrereqError("Empty time specification passed")

  # Multipliers for the recognized one-letter suffixes
  suffix_map = {
    's': 1,
    'm': 60,
    'h': 3600,
    'd': 86400,
    'w': 604800,
    }

  suffix = value[-1]
  if suffix in suffix_map:
    digits = value[:-1]
    if not digits: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)")
    try:
      return int(digits) * suffix_map[suffix]
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % digits)

  # No suffix: the whole value must be a plain number of seconds
  try:
    return int(value)
  except (TypeError, ValueError):
    raise errors.OpPrereqError("Invalid time specification '%s'" % value)


def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
                   filter_master=False):
  """Returns the names of online nodes.

  This function will also log a note on stderr with the names of
  the offline nodes being skipped, unless C{nowarn} is set.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed
  @type secondary_ips: boolean
  @param secondary_ips: if True, return the secondary IPs instead of the
      names, useful for doing network traffic over the replication interface
      (if any)
  @type filter_master: boolean
  @param filter_master: if True, do not return the master node in the list
      (useful in coordination with secondary_ips where we cannot check our
      node name against the list)

  """
  if cl is None:
    cl = GetClient()

  # Index into each result row: 0 is the node name, 2 the secondary IP
  if secondary_ips:
    name_idx = 2
  else:
    name_idx = 0

  if filter_master:
    master_node = cl.QueryConfigValues(["master_node"])[0]
    def _Accept(name):
      return name != master_node
  else:
    def _Accept(_):
      return True

  rows = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
                       use_locking=False)

  offline = [row[0] for row in rows if row[1]]
  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))

  return [row[name_idx] for row in rows if not row[1] and _Accept(row[0])]


def _ToStream(stream, txt, *args):
1967
  """Write a message to a stream, bypassing the logging system
1968

1969
  @type stream: file object
1970
  @param stream: the file to which we should write
1971
  @type txt: str
1972
  @param txt: the message
1973

1974
  """
1975
  if args:
1976
    args = tuple(args)
1977
    stream.write(txt % args)
1978
  else:
1979
    stream.write(txt)
1980
  stream.write('\n')
1981
  stream.flush()
1982

    
1983

    
1984
def ToStdout(txt, *args):
  """Write a message to standard output, bypassing the logging system.

  This is a thin wrapper over L{_ToStream}.

  @type txt: str
  @param txt: the message

  """
  stream = sys.stdout
  _ToStream(stream, txt, *args)


def ToStderr(txt, *args):
  """Write a message to standard error, bypassing the logging system.

  This is a thin wrapper over L{_ToStream}.

  @type txt: str
  @param txt: the message

  """
  stream = sys.stderr
  _ToStream(stream, txt, *args)


class JobExecutor(object):
2009
  """Class which manages the submission and execution of multiple jobs.
2010

2011
  Note that instances of this class should not be reused between
2012
  GetResults() calls.
2013

2014
  """
2015
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
2016
    self.queue = []
2017
    if cl is None:
2018
      cl = GetClient()
2019
    self.cl = cl
2020
    self.verbose = verbose
2021
    self.jobs = []
2022
    self.opts = opts
2023
    self.feedback_fn = feedback_fn
2024

    
2025
  def QueueJob(self, name, *ops):
2026
    """Record a job for later submit.
2027

2028
    @type name: string
2029
    @param name: a description of the job, will be used in WaitJobSet
2030
    """
2031
    SetGenericOpcodeOpts(ops, self.opts)
2032
    self.queue.append((name, ops))
2033

    
2034
  def SubmitPending(self):
2035
    """Submit all pending jobs.
2036

2037
    """
2038
    results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
2039
    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
2040
                                                            self.queue)):
2041
      self.jobs.append((idx, status, data, name))
2042

    
2043
  def _ChooseJob(self):
2044
    """Choose a non-waiting/queued job to poll next.
2045

2046
    """
2047
    assert self.jobs, "_ChooseJob called with empty job list"
2048

    
2049
    result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
2050
    assert result
2051

    
2052
    for job_data, status in zip(self.jobs, result):
2053
      if status[0] in (constants.JOB_STATUS_QUEUED,
2054
                    constants.JOB_STATUS_WAITLOCK,
2055
                    constants.JOB_STATUS_CANCELING):
2056
        # job is still waiting
2057
        continue
2058
      # good candidate found
2059
      self.jobs.remove(job_data)
2060
      return job_data
2061

    
2062
    # no job found
2063
    return self.jobs.pop(0)
2064

    
2065
  def GetResults(self):
2066
    """Wait for and return the results of all jobs.
2067

2068
    @rtype: list
2069
    @return: list of tuples (success, job results), in the same order
2070
        as the submitted jobs; if a job has failed, instead of the result
2071
        there will be the error message
2072

2073
    """
2074
    if not self.jobs:
2075
      self.SubmitPending()
2076
    results = []
2077
    if self.verbose:
2078
      ok_jobs = [row[2] for row in self.jobs if row[1]]
2079
      if ok_jobs:
2080
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
2081

    
2082
    # first, remove any non-submitted jobs
2083
    self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
2084
    for idx, _, jid, name in failures:
2085
      ToStderr("Failed to submit job for %s: %s", name, jid)
2086
      results.append((idx, False, jid))
2087

    
2088
    while self.jobs:
2089
      (idx, _, jid, name) = self._ChooseJob()
2090
      ToStdout("Waiting for job %s for %s...", jid, name)
2091
      try:
2092
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
2093
        success = True
2094
      except (errors.GenericError, luxi.ProtocolError), err:
2095
        _, job_result = FormatError(err)
2096
        success = False
2097
        # the error message will always be shown, verbose or not
2098
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
2099

    
2100
      results.append((idx, success, job_result))
2101

    
2102
    # sort based on the index, then drop it
2103
    results.sort()
2104
    results = [i[1:] for i in results]
2105

    
2106
    return results
2107

    
2108
  def WaitOrShow(self, wait):
2109
    """Wait for job results or only print the job IDs.
2110

2111
    @type wait: boolean
2112
    @param wait: whether to wait or not
2113

2114
    """
2115
    if wait:
2116
      return self.GetResults()
2117
    else:
2118
      if not self.jobs:
2119
        self.SubmitPending()
2120
      for status, result, name in self.jobs:
2121
        if status:
2122
          ToStdout("%s: %s", result, name)
2123
        else:
2124
          ToStderr("Failure for %s: %s", name, result)