Statistics
| Branch: | Tag: | Revision:

root / lib / cli.py @ 0af0f641

History | View | Annotate | Download (68.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module dealing with command line parsing"""
23

    
24

    
25
import sys
26
import textwrap
27
import os.path
28
import time
29
import logging
30
from cStringIO import StringIO
31

    
32
from ganeti import utils
33
from ganeti import errors
34
from ganeti import constants
35
from ganeti import opcodes
36
from ganeti import luxi
37
from ganeti import ssconf
38
from ganeti import rpc
39
from ganeti import ssh
40

    
41
from optparse import (OptionParser, TitledHelpFormatter,
42
                      Option, OptionValueError)
43

    
44

    
45
# Public API of this module; consumers do "from ganeti.cli import *",
# so every exported option/function/class must be listed here.
__all__ = [
  # Command line options
  "ALLOCATABLE_OPT",
  "ALL_OPT",
  "AUTO_PROMOTE_OPT",
  "AUTO_REPLACE_OPT",
  "BACKEND_OPT",
  "CLEANUP_OPT",
  "CONFIRM_OPT",
  "CP_SIZE_OPT",
  "DEBUG_OPT",
  "DEBUG_SIMERR_OPT",
  "DISKIDX_OPT",
  "DISK_OPT",
  "DISK_TEMPLATE_OPT",
  "DRAINED_OPT",
  "EARLY_RELEASE_OPT",
  "ENABLED_HV_OPT",
  "ERROR_CODES_OPT",
  "FIELDS_OPT",
  "FILESTORE_DIR_OPT",
  "FILESTORE_DRIVER_OPT",
  "FORCE_OPT",
  "FORCE_VARIANT_OPT",
  "GLOBAL_FILEDIR_OPT",
  "HVLIST_OPT",
  "HVOPTS_OPT",
  "HYPERVISOR_OPT",
  "IALLOCATOR_OPT",
  "IGNORE_CONSIST_OPT",
  "IGNORE_FAILURES_OPT",
  "IGNORE_SECONDARIES_OPT",
  "IGNORE_SIZE_OPT",
  "MAC_PREFIX_OPT",
  "MAINTAIN_NODE_HEALTH_OPT",
  "MASTER_NETDEV_OPT",
  "MC_OPT",
  "NET_OPT",
  "NEW_CLUSTER_CERT_OPT",
  "NEW_CONFD_HMAC_KEY_OPT",
  "NEW_RAPI_CERT_OPT",
  "NEW_SECONDARY_OPT",
  "NIC_PARAMS_OPT",
  "NODE_LIST_OPT",
  "NODE_PLACEMENT_OPT",
  "NOHDR_OPT",
  "NOIPCHECK_OPT",
  "NO_INSTALL_OPT",
  "NONAMECHECK_OPT",
  "NOLVM_STORAGE_OPT",
  "NOMODIFY_ETCHOSTS_OPT",
  "NOMODIFY_SSH_SETUP_OPT",
  "NONICS_OPT",
  "NONLIVE_OPT",
  "NONPLUS1_OPT",
  "NOSHUTDOWN_OPT",
  "NOSTART_OPT",
  "NOSSH_KEYCHECK_OPT",
  "NOVOTING_OPT",
  "NWSYNC_OPT",
  "ON_PRIMARY_OPT",
  "ON_SECONDARY_OPT",
  "OFFLINE_OPT",
  "OS_OPT",
  "OS_SIZE_OPT",
  "RAPI_CERT_OPT",
  "READD_OPT",
  "REBOOT_TYPE_OPT",
  "SECONDARY_IP_OPT",
  "SELECT_OS_OPT",
  "SEP_OPT",
  "SHOWCMD_OPT",
  "SHUTDOWN_TIMEOUT_OPT",
  "SINGLE_NODE_OPT",
  "SRC_DIR_OPT",
  "SRC_NODE_OPT",
  "SUBMIT_OPT",
  "STATIC_OPT",
  "SYNC_OPT",
  "TAG_SRC_OPT",
  "TIMEOUT_OPT",
  "USEUNITS_OPT",
  "USE_REPL_NET_OPT",
  "VERBOSE_OPT",
  "VG_NAME_OPT",
  "YES_DOIT_OPT",
  # Generic functions for CLI programs
  "GenericMain",
  "GenericInstanceCreate",
  "GetClient",
  "GetOnlineNodes",
  "JobExecutor",
  "JobSubmittedException",
  "ParseTimespec",
  "RunWhileClusterStopped",
  "SubmitOpCode",
  "SubmitOrSend",
  "UsesRPC",
  # Formatting functions
  "ToStderr", "ToStdout",
  "FormatError",
  "GenerateTable",
  "AskUser",
  "FormatTimestamp",
  # Tags functions
  "ListTags",
  "AddTags",
  "RemoveTags",
  # command line options support infrastructure
  "ARGS_MANY_INSTANCES",
  "ARGS_MANY_NODES",
  "ARGS_NONE",
  "ARGS_ONE_INSTANCE",
  "ARGS_ONE_NODE",
  "ARGS_ONE_OS",
  "ArgChoice",
  "ArgCommand",
  "ArgFile",
  "ArgHost",
  "ArgInstance",
  "ArgJobId",
  "ArgNode",
  "ArgOs",
  "ArgSuggest",
  "ArgUnknown",
  "OPT_COMPL_INST_ADD_NODES",
  "OPT_COMPL_MANY_NODES",
  "OPT_COMPL_ONE_IALLOCATOR",
  "OPT_COMPL_ONE_INSTANCE",
  "OPT_COMPL_ONE_NODE",
  "OPT_COMPL_ONE_OS",
  "cli_option",
  "SplitNodeOption",
  "CalculateOSNames",
  ]

# Prefix marking a "key" in key=val option strings as "set to False"
NO_PREFIX = "no_"
# Prefix marking a "key" in key=val option strings as "reset to default" (None)
UN_PREFIX = "-"
183

    
184

    
185
class _Argument:
186
  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
187
    self.min = min
188
    self.max = max
189

    
190
  def __repr__(self):
191
    return ("<%s min=%s max=%s>" %
192
            (self.__class__.__name__, self.min, self.max))
193

    
194

    
195
class ArgSuggest(_Argument):
  """Suggesting argument.

  Value can be any of the ones passed to the constructor.

  """
  # pylint: disable-msg=W0622
  def __init__(self, min=0, max=None, choices=None):
    _Argument.__init__(self, min=min, max=max)
    self.choices = choices

  def __repr__(self):
    cls_name = self.__class__.__name__
    return ("<%s min=%s max=%s choices=%r>" %
            (cls_name, self.min, self.max, self.choices))
209

    
210

    
211
class ArgChoice(ArgSuggest):
  """Choice argument.

  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
  but value must be one of the choices.

  """
218

    
219

    
220
class ArgUnknown(_Argument):
  """Unknown argument to program (e.g. determined at runtime).

  Used when the argument's meaning cannot be described statically.

  """
224

    
225

    
226
class ArgInstance(_Argument):
  """Instances argument.

  Marks a positional argument as naming one or more instances.

  """
230

    
231

    
232
class ArgNode(_Argument):
  """Node argument.

  Marks a positional argument as naming one or more nodes.

  """
236

    
237
class ArgJobId(_Argument):
  """Job ID argument.

  Marks a positional argument as a job identifier.

  """
241

    
242

    
243
class ArgFile(_Argument):
  """File path argument.

  Marks a positional argument as a filesystem path.

  """
247

    
248

    
249
class ArgCommand(_Argument):
  """Command argument.

  Marks a positional argument as an external command to run.

  """
253

    
254

    
255
class ArgHost(_Argument):
  """Host argument.

  Marks a positional argument as a hostname.

  """
259

    
260

    
261
class ArgOs(_Argument):
  """OS argument.

  Marks a positional argument as an OS template name.

  """
265

    
266

    
267
ARGS_NONE = []
268
ARGS_MANY_INSTANCES = [ArgInstance()]
269
ARGS_MANY_NODES = [ArgNode()]
270
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
271
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
272
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
273

    
274

    
275
def _ExtractTagsObject(opts, args):
  """Extract the tag type object.

  Note that this function will modify its args parameter.

  """
  if not hasattr(opts, "tag_type"):
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
  kind = opts.tag_type
  if kind == constants.TAG_CLUSTER:
    # the cluster is a singleton, so the "name" is the kind itself
    return kind, kind
  if kind in (constants.TAG_NODE, constants.TAG_INSTANCE):
    if not args:
      raise errors.OpPrereqError("no arguments passed to the command")
    # consume the object name from the argument list
    return kind, args.pop(0)
  raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
294

    
295

    
296
def _ExtendTags(opts, args):
297
  """Extend the args if a source file has been given.
298

299
  This function will extend the tags with the contents of the file
300
  passed in the 'tags_source' attribute of the opts parameter. A file
301
  named '-' will be replaced by stdin.
302

303
  """
304
  fname = opts.tags_source
305
  if fname is None:
306
    return
307
  if fname == "-":
308
    new_fh = sys.stdin
309
  else:
310
    new_fh = open(fname, "r")
311
  new_data = []
312
  try:
313
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
314
    # because of python bug 1633941
315
    while True:
316
      line = new_fh.readline()
317
      if not line:
318
        break
319
      new_data.append(line.strip())
320
  finally:
321
    new_fh.close()
322
  args.extend(new_data)
323

    
324

    
325
def ListTags(opts, args):
  """List the tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  client = GetClient()
  # print one tag per line, sorted for stable output
  for tag in sorted(client.QueryTags(kind, name)):
    ToStdout(tag)
341

    
342

    
343
def AddTags(opts, args):
  """Add tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  # optionally pull extra tags from opts.tags_source (file or stdin)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be added")
  SubmitOpCode(opcodes.OpAddTags(kind=kind, name=name, tags=args))
358

    
359

    
360
def RemoveTags(opts, args):
  """Remove tags from a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  # optionally pull extra tags from opts.tags_source (file or stdin)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be removed")
  SubmitOpCode(opcodes.OpDelTags(kind=kind, name=name, tags=args))
375

    
376

    
377
def check_unit(option, opt, value): # pylint: disable-msg=W0613
  """OptParsers custom converter for units.

  Delegates to L{utils.ParseUnit} (presumably returning an integer
  amount in MiB -- confirm against utils); parse failures are turned
  into L{OptionValueError} so optparse reports a clean usage error.

  """
  try:
    return utils.ParseUnit(value)
  except errors.UnitParseError, err:
    raise OptionValueError("option %s: %s" % (opt, err))
385

    
386

    
387
def _SplitKeyVal(opt, data):
  """Convert a KeyVal string into a dict.

  This function will convert a key=val[,...] string into a dict. Empty
  values will be converted specially: keys which have the prefix 'no_'
  will have the value=False and the prefix stripped, the others will
  have value=True.

  @type opt: string
  @param opt: a string holding the option name for which we process the
      data, used in building error messages
  @type data: string
  @param data: a string of the format key=val,key=val,...
  @rtype: dict
  @return: {key=val, key=val}
  @raises errors.ParameterError: if there are duplicate keys

  """
  kv_dict = {}
  if not data:
    return kv_dict
  for elem in utils.UnescapeAndSplit(data, sep=","):
    if "=" in elem:
      key, val = elem.split("=", 1)
    elif elem.startswith(NO_PREFIX):
      # "no_foo" means foo=False
      key, val = elem[len(NO_PREFIX):], False
    elif elem.startswith(UN_PREFIX):
      # "-foo" means foo=None (reset to default)
      key, val = elem[len(UN_PREFIX):], None
    else:
      # bare "foo" means foo=True
      key, val = elem, True
    if key in kv_dict:
      raise errors.ParameterError("Duplicate key '%s' in option %s" %
                                  (key, opt))
    kv_dict[key] = val
  return kv_dict
422

    
423

    
424
def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser for ident:key=val,key=val options.

  This will store the parsed values as a tuple (ident, {key: val}). As such,
  multiple uses of this option via action=append is possible.

  """
  if ":" in value:
    ident, rest = value.split(":", 1)
  else:
    ident, rest = value, ''

  # "no_ident"/"-ident" remove/reset a whole parameter group and thus
  # must not carry any key=val payload
  for prefix, marker in ((NO_PREFIX, False), (UN_PREFIX, None)):
    if ident.startswith(prefix):
      if rest:
        msg = "Cannot pass options when removing parameter groups: %s" % value
        raise errors.ParameterError(msg)
      return (ident[len(prefix):], marker)

  return (ident, _SplitKeyVal(opt, rest))
450

    
451

    
452
def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser class for key=val,key=val options.

  This will store the parsed values as a dict {key: val}; see
  L{_SplitKeyVal} for the handling of the 'no_' and '-' key prefixes.

  """
  return _SplitKeyVal(opt, value)
459

    
460

    
461
def check_bool(option, opt, value): # pylint: disable-msg=W0613
  """Custom parser for yes/no options.

  This will store the parsed value as either True or False.

  """
  # comparison is case-insensitive
  value = value.lower()
  if value in (constants.VALUE_FALSE, "no"):
    return False
  if value in (constants.VALUE_TRUE, "yes"):
    return True
  raise errors.ParameterError("Invalid boolean value '%s'" % value)
474

    
475

    
476
# completion_suggestion is normally a list. Using numeric values not evaluating
# to False for dynamic completion.
(OPT_COMPL_MANY_NODES,
 OPT_COMPL_ONE_NODE,
 OPT_COMPL_ONE_INSTANCE,
 OPT_COMPL_ONE_OS,
 OPT_COMPL_ONE_IALLOCATOR,
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)

# set of all dynamic-completion markers, for membership tests
OPT_COMPL_ALL = frozenset([
  OPT_COMPL_MANY_NODES,
  OPT_COMPL_ONE_NODE,
  OPT_COMPL_ONE_INSTANCE,
  OPT_COMPL_ONE_OS,
  OPT_COMPL_ONE_IALLOCATOR,
  OPT_COMPL_INST_ADD_NODES,
  ])
493

    
494

    
495
class CliOption(Option):
  """Custom option class for optparse.

  Extends the stock optparse Option with:
    - a "completion_suggest" attribute, carrying one of the OPT_COMPL_*
      markers used for dynamic completion (see the comment above the
      OPT_COMPL_* constants)
    - four extra value types ("identkeyval", "keyval", "unit", "bool"),
      backed by the check_* converter functions defined above

  """
  ATTRS = Option.ATTRS + [
    "completion_suggest",
    ]
  TYPES = Option.TYPES + (
    "identkeyval",
    "keyval",
    "unit",
    "bool",
    )
  # copy so we don't mutate the base class' checker table
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
  TYPE_CHECKER["keyval"] = check_key_val
  TYPE_CHECKER["unit"] = check_unit
  TYPE_CHECKER["bool"] = check_bool
513

    
514

    
515
# optparse.py sets make_option, so we do it for our own option class, too
516
cli_option = CliOption
517

    
518

    
519
_YORNO = "yes|no"
520

    
521
# Generic output/confirmation options shared by most commands
DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
                       help="Increase debugging level")

NOHDR_OPT = cli_option("--no-headers", default=False,
                       action="store_true", dest="no_headers",
                       help="Don't display column headers")

SEP_OPT = cli_option("--separator", default=None,
                     action="store", dest="separator",
                     help=("Separator between output fields"
                           " (defaults to one space)"))

USEUNITS_OPT = cli_option("--units", default=None,
                          dest="units", choices=('h', 'm', 'g', 't'),
                          help="Specify units for output (one of hmgt)")

FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
                        type="string", metavar="FIELDS",
                        help="Comma separated list of output fields")

FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
                       default=False, help="Force the operation")

CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
                         default=False, help="Do not require confirmation")

# file with one tag per line; "-" means stdin (see _ExtendTags)
TAG_SRC_OPT = cli_option("--from", dest="tags_source",
                         default=None, help="File with tag names")

SUBMIT_OPT = cli_option("--submit", dest="submit_only",
                        default=False, action="store_true",
                        help=("Submit the job and return the job ID, but"
                              " don't wait for the job to finish"))

SYNC_OPT = cli_option("--sync", dest="do_locking",
                      default=False, action="store_true",
                      help=("Grab locks while doing the queries"
                            " in order to ensure more consistent results"))
559

    
560
# Added automatically to every command by GenericMain; private to this module.
# Fixed help-text typo: "verify it it could be" -> "verify if it could be".
_DRY_RUN_OPT = cli_option("--dry-run", default=False,
                          action="store_true",
                          help=("Do not execute the operation, just run the"
                                " check steps and verify if it could be"
                                " executed"))
565

    
566
VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
                         action="store_true",
                         help="Increase the verbosity of the operation")

DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
                              action="store_true", dest="simulate_errors",
                              help="Debugging option that makes the operation"
                              " treat most runtime checks as failed")

# negative flag, positive destination: default True, store_false
NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
                        default=True, action="store_false",
                        help="Don't wait for sync (DANGEROUS!)")

DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
                               help="Custom disk setup (diskless, file,"
                               " plain or drbd)",
                               default=None, metavar="TEMPL",
                               choices=list(constants.DISK_TEMPLATES))

NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
                        help="Do not create any network cards for"
                        " the instance")

FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                               help="Relative path under default cluster-wide"
                               " file storage dir to store file-based disks",
                               default=None, metavar="<DIR>")

FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
                                  help="Driver to use for image files",
                                  default="loop", metavar="<DRIVER>",
                                  choices=list(constants.FILE_DRIVER))

IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
                            help="Select nodes for the instance automatically"
                            " using the <NAME> iallocator plugin",
                            default=None, type="string",
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)

OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
                    metavar="<os>",
                    completion_suggest=OPT_COMPL_ONE_OS)

FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
                               action="store_true", default=False,
                               help="Force an unknown variant")

NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
                            action="store_true", default=False,
                            help="Do not install the OS (will"
                            " enable no-start)")

BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
                         type="keyval", default={},
                         help="Backend parameters")

# "-H" is shared by the next three options; each command table uses at
# most one of them, so the short option never conflicts in practice
HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
                         default={}, dest="hvparams",
                         help="Hypervisor parameters")

HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
                            help="Hypervisor and hypervisor options, in the"
                            " format hypervisor:option=value,option=value,...",
                            default=None, type="identkeyval")

HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
                        help="Hypervisor and hypervisor options, in the"
                        " format hypervisor:option=value,option=value,...",
                        default=[], action="append", type="identkeyval")
635

    
636
NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
                           action="store_false",
                           help="Don't check that the instance's IP"
                           " is alive")

NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
                             default=True, action="store_false",
                             help="Don't check that the instance's name"
                             " is resolvable")

NET_OPT = cli_option("--net",
                     help="NIC parameters", default=[],
                     dest="nics", action="append", type="identkeyval")

DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
                      dest="disks", action="append", type="identkeyval")

# note: plain string list of indices here, vs. identkeyval in DISK_OPT
DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
                         help="Comma-separated list of disks"
                         " indices to act on (e.g. 0,2) (optional,"
                         " defaults to all disks)")

OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
                         help="Enforces a single-disk configuration using the"
                         " given disk size, in MiB unless a suffix is used",
                         default=None, type="unit", metavar="<size>")

IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
                                dest="ignore_consistency",
                                action="store_true", default=False,
                                help="Ignore the consistency of the disks on"
                                " the secondary")

NONLIVE_OPT = cli_option("--non-live", dest="live",
                         default=True, action="store_false",
                         help="Do a non-live migration (this usually means"
                         " freeze the instance, save the state, transfer and"
                         " only then resume running on the secondary node)")

# "-n" is shared by the next three options and NEW_SECONDARY_OPT below;
# each command uses only one of them
NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
                                help="Target node and optional secondary node",
                                metavar="<pnode>[:<snode>]",
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)

NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
                           action="append", metavar="<node>",
                           help="Use only this node (can be used multiple"
                           " times, if not given defaults to all nodes)",
                           completion_suggest=OPT_COMPL_ONE_NODE)

SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
                             metavar="<node>",
                             completion_suggest=OPT_COMPL_ONE_NODE)
689

    
690
NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
                         action="store_false",
                         help="Don't start the instance after creation")

SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
                         action="store_true", default=False,
                         help="Show command instead of executing it")
697

    
698
# Fixed help text: removed the doubled space after "and" and closed the
# unbalanced "(like during the migration" parenthesis.
CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
                         default=False, action="store_true",
                         help="Instead of performing the migration, try to"
                         " recover from a failed cleanup. This is safe"
                         " to run even if the instance is healthy, but it"
                         " will create extra replication traffic and"
                         " disrupt briefly the replication (like during the"
                         " migration)")
706

    
707
STATIC_OPT = cli_option("-s", "--static", dest="static",
                        action="store_true", default=False,
                        help="Only show configuration data, not runtime data")

ALL_OPT = cli_option("--all", dest="show_all",
                     default=False, action="store_true",
                     help="Show info on all instances on the cluster."
                     " This can take a long time to run, use wisely")

SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
                           action="store_true", default=False,
                           help="Interactive OS reinstall, lists available"
                           " OS templates for selection")

IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
                                 action="store_true", default=False,
                                 help="Remove the instance from the cluster"
                                 " configuration even if there are failures"
                                 " during the removal process")

NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
                               help="Specifies the new secondary node",
                               metavar="NODE", default=None,
                               completion_suggest=OPT_COMPL_ONE_NODE)

ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
                            default=False, action="store_true",
                            help="Replace the disk(s) on the primary"
                            " node (only for the drbd template)")

ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
                              default=False, action="store_true",
                              help="Replace the disk(s) on the secondary"
                              " node (only for the drbd template)")

AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
                              default=False, action="store_true",
                              help="Lock all nodes and auto-promote as needed"
                              " to MC status")

AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
                              default=False, action="store_true",
                              help="Automatically replace faulty disks"
                              " (only for the drbd template)")

IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
                             default=False, action="store_true",
                             help="Ignore current recorded size"
                             " (useful for forcing activation when"
                             " the recorded size is wrong)")

SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
                          metavar="<node>",
                          completion_suggest=OPT_COMPL_ONE_NODE)

SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
                         metavar="<dir>")

SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
                              help="Specify the secondary ip for the node",
                              metavar="ADDRESS", default=None)

READD_OPT = cli_option("--readd", dest="readd",
                       default=False, action="store_true",
                       help="Readd old node after replacing it")

NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
                                default=True, action="store_false",
                                help="Disable SSH key fingerprint checking")
776

    
777

    
778
# Tri-state (yes/no/unset) node and volume flags; default None means
# "leave unchanged". Note "-C" is also used by CP_SIZE_OPT below, in a
# different command.
MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
                    type="bool", default=None, metavar=_YORNO,
                    help="Set the master_candidate flag on the node")

OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
                         type="bool", default=None,
                         help="Set the offline flag on the node")

DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
                         type="bool", default=None,
                         help="Set the drained flag on the node")

ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
                             type="bool", default=None, metavar=_YORNO,
                             help="Set the allocatable flag on a volume")

NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
                               help="Disable support for lvm based instances"
                               " (cluster-wide)",
                               action="store_false", default=True)

ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
                            dest="enabled_hypervisors",
                            help="Comma-separated list of hypervisors",
                            type="string", default=None)

NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
                            type="keyval", default={},
                            help="NIC parameters")

CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
                         dest="candidate_pool_size", type="int",
                         help="Set the candidate pool size")

VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
                         help="Enables LVM and specifies the volume group"
                         " name (cluster-wide) for disk allocation [xenvg]",
                         metavar="VG", default=None)

YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
                          help="Destroy cluster", action="store_true")

NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
                          help="Skip node agreement check (dangerous)",
                          action="store_true", default=False)
823

    
824
MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
                            help="Specify the mac prefix for the instance IP"
                            " addresses, in the format XX:XX:XX",
                            metavar="PREFIX",
                            default=None)
829

    
830
MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
831
                               help="Specify the node interface (cluster-wide)"
832
                               " on which the master IP address will be added "
833
                               " [%s]" % constants.DEFAULT_BRIDGE,
834
                               metavar="NETDEV",
835
                               default=constants.DEFAULT_BRIDGE)
836

    
837

    
838
GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
839
                                help="Specify the default directory (cluster-"
840
                                "wide) for storing the file-based disks [%s]" %
841
                                constants.DEFAULT_FILE_STORAGE_DIR,
842
                                metavar="DIR",
843
                                default=constants.DEFAULT_FILE_STORAGE_DIR)
844

    
845
NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
846
                                   help="Don't modify /etc/hosts",
847
                                   action="store_false", default=True)
848

    
849
NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
850
                                    help="Don't initialize SSH keys",
851
                                    action="store_false", default=True)
852

    
853
ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
854
                             help="Enable parseable error messages",
855
                             action="store_true", default=False)
856

    
857
NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
858
                          help="Skip N+1 memory redundancy tests",
859
                          action="store_true", default=False)
860

    
861
REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
862
                             help="Type of reboot: soft/hard/full",
863
                             default=constants.INSTANCE_REBOOT_HARD,
864
                             metavar="<REBOOT>",
865
                             choices=list(constants.REBOOT_TYPES))
866

    
867
IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
868
                                    dest="ignore_secondaries",
869
                                    default=False, action="store_true",
870
                                    help="Ignore errors from secondaries")
871

    
872
NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
873
                            action="store_false", default=True,
874
                            help="Don't shutdown the instance (unsafe)")
875

    
876
TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
877
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
878
                         help="Maximum time to wait")
879

    
880
SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
881
                         dest="shutdown_timeout", type="int",
882
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
883
                         help="Maximum time to wait for instance shutdown")
884

    
885
EARLY_RELEASE_OPT = cli_option("--early-release",
886
                               dest="early_release", default=False,
887
                               action="store_true",
888
                               help="Release the locks on the secondary"
889
                               " node(s) early")
890

    
891
NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
892
                                  dest="new_cluster_cert",
893
                                  default=False, action="store_true",
894
                                  help="Generate a new cluster certificate")
895

    
896
RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
897
                           default=None,
898
                           help="File containing new RAPI certificate")
899

    
900
NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
901
                               default=None, action="store_true",
902
                               help=("Generate a new self-signed RAPI"
903
                                     " certificate"))
904

    
905
NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
906
                                    dest="new_confd_hmac_key",
907
                                    default=False, action="store_true",
908
                                    help=("Create a new HMAC key for %s" %
909
                                          constants.CONFD))
910

    
911
USE_REPL_NET_OPT = cli_option("--use-replication-network",
912
                              dest="use_replication_network",
913
                              help="Whether to use the replication network"
914
                              " for talking to the nodes",
915
                              action="store_true", default=False)
916

    
917
MAINTAIN_NODE_HEALTH_OPT = \
918
    cli_option("--maintain-node-health", dest="maintain_node_health",
919
               metavar=_YORNO, default=None, type="bool",
920
               help="Configure the cluster to automatically maintain node"
921
               " health, by shutting down unknown instances, shutting down"
922
               " unknown DRBD devices, etc.")
923

    
924

    
925
def _ParseArgs(argv, commands, aliases):
  """Parser for the command line arguments.

  This function parses the arguments and returns the function which
  must be executed together with its (modified) arguments.

  @param argv: the command line
  @param commands: dictionary with special contents, see the design
      doc for cmdline handling
  @param aliases: dictionary with command aliases {'alias': 'target, ...}
  @return: (function, options, args) on success; (None, None, None)
      after printing usage or argument errors

  """
  if len(argv) == 0:
    binary = "<command>"
  else:
    binary = argv[0].split("/")[-1]

  if len(argv) > 1 and argv[1] == "--version":
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
    # Quit right away. That way we don't have to care about this special
    # argument. optparse.py does it the same.
    sys.exit(0)

  if len(argv) < 2 or not (argv[1] in commands or
                           argv[1] in aliases):
    # let's do a nice thing
    sortedcmds = sorted(commands.keys())

    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
    ToStdout("")

    # compute the max line length for cmd + usage
    mlen = max([len(" %s" % cmd) for cmd in commands])
    mlen = min(60, mlen) # should not get here...

    # and format a nice command list
    ToStdout("Commands:")
    for cmd in sortedcmds:
      cmdstr = " %s" % (cmd,)
      help_text = commands[cmd][4]
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
      for line in help_lines:
        ToStdout("%-*s   %s", mlen, "", line)

    ToStdout("")

    return None, None, None

  # get command, unalias it, and look it up in commands
  cmd = argv.pop(1)
  if cmd in aliases:
    if cmd in commands:
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
                                   " command" % cmd)

    if aliases[cmd] not in commands:
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
                                   " command '%s'" % (cmd, aliases[cmd]))

    cmd = aliases[cmd]

  func, args_def, parser_opts, usage, description = commands[cmd]
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
                        description=description,
                        formatter=TitledHelpFormatter(),
                        usage="%%prog %s %s" % (cmd, usage))
  parser.disable_interspersed_args()
  # Parse the argument vector we were given rather than relying on
  # optparse's implicit sys.argv[1:] default; previously this only
  # worked because callers pass sys.argv itself, which the pop(1)
  # above mutates in place.
  options, args = parser.parse_args(argv[1:])

  if not _CheckArguments(cmd, args_def, args):
    return None, None, None

  return func, options, args
1003
def _CheckArguments(cmd, args_def, args):
1004
  """Verifies the arguments using the argument definition.
1005

1006
  Algorithm:
1007

1008
    1. Abort with error if values specified by user but none expected.
1009

1010
    1. For each argument in definition
1011

1012
      1. Keep running count of minimum number of values (min_count)
1013
      1. Keep running count of maximum number of values (max_count)
1014
      1. If it has an unlimited number of values
1015

1016
        1. Abort with error if it's not the last argument in the definition
1017

1018
    1. If last argument has limited number of values
1019

1020
      1. Abort with error if number of values doesn't match or is too large
1021

1022
    1. Abort with error if user didn't pass enough values (min_count)
1023

1024
  """
1025
  if args and not args_def:
1026
    ToStderr("Error: Command %s expects no arguments", cmd)
1027
    return False
1028

    
1029
  min_count = None
1030
  max_count = None
1031
  check_max = None
1032

    
1033
  last_idx = len(args_def) - 1
1034

    
1035
  for idx, arg in enumerate(args_def):
1036
    if min_count is None:
1037
      min_count = arg.min
1038
    elif arg.min is not None:
1039
      min_count += arg.min
1040

    
1041
    if max_count is None:
1042
      max_count = arg.max
1043
    elif arg.max is not None:
1044
      max_count += arg.max
1045

    
1046
    if idx == last_idx:
1047
      check_max = (arg.max is not None)
1048

    
1049
    elif arg.max is None:
1050
      raise errors.ProgrammerError("Only the last argument can have max=None")
1051

    
1052
  if check_max:
1053
    # Command with exact number of arguments
1054
    if (min_count is not None and max_count is not None and
1055
        min_count == max_count and len(args) != min_count):
1056
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1057
      return False
1058

    
1059
    # Command with limited number of arguments
1060
    if max_count is not None and len(args) > max_count:
1061
      ToStderr("Error: Command %s expects only %d argument(s)",
1062
               cmd, max_count)
1063
      return False
1064

    
1065
  # Command with some required arguments
1066
  if min_count is not None and len(args) < min_count:
1067
    ToStderr("Error: Command %s expects at least %d argument(s)",
1068
             cmd, min_count)
1069
    return False
1070

    
1071
  return True
1072

    
1073

    
1074
def SplitNodeOption(value):
  """Splits the value of a --node option.

  Returns a two-element list for "pnode:snode" style values, and a
  (value, None) tuple when there is no colon (or no value at all).

  """
  if not value or ':' not in value:
    return (value, None)
  return value.split(':', 1)
def CalculateOSNames(os_name, os_variants):
  """Calculates all the names an OS can be called, according to its variants.

  @type os_name: string
  @param os_name: base name of the os
  @type os_variants: list or None
  @param os_variants: list of supported variants
  @rtype: list
  @return: list of valid names

  """
  if not os_variants:
    # no variants: the bare name is the only valid one
    return [os_name]
  return ['%s+%s' % (os_name, variant) for variant in os_variants]
def UsesRPC(fn):
  """Decorator wrapping a call in an rpc.Init()/rpc.Shutdown() pair.

  The RPC layer is initialized before invoking C{fn} and shut down
  afterwards, even if the call raises.

  @param fn: the function to wrap
  @return: the wrapped function, with the original's metadata preserved

  """
  # local import so the file-level import block stays untouched
  import functools

  @functools.wraps(fn)
  def wrapper(*args, **kwargs):
    rpc.Init()
    try:
      return fn(*args, **kwargs)
    finally:
      # always release RPC resources, also on exceptions
      rpc.Shutdown()
  return wrapper
def AskUser(text, choices=None):
  """Ask the user a question.

  @param text: the question to ask

  @param choices: list with elements tuples (input_char, return_value,
      description); if not given, it will default to: [('y', True,
      'Perform the operation'), ('n', False, 'Do no do the operation')];
      note that the '?' char is reserved for help

  @return: one of the return values from the choices list; if input is
      not possible (i.e. not running with a tty, we return the last
      entry from the list

  """
  if choices is None:
    choices = [('y', True, 'Perform the operation'),
               ('n', False, 'Do not perform the operation')]
  if not choices or not isinstance(choices, list):
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
  for entry in choices:
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
      raise errors.ProgrammerError("Invalid choices element to AskUser")

  # The default answer is the last choice's return value; it is also
  # used when no controlling terminal is available
  answer = choices[-1][1]
  new_text = []
  for line in text.splitlines():
    # wrap each paragraph to 70 columns without collapsing whitespace
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
  text = "\n".join(new_text)
  try:
    # talk to the controlling terminal directly so redirected
    # stdin/stdout don't interfere with the interactive prompt
    f = file("/dev/tty", "a+")
  except IOError:
    return answer
  try:
    chars = [entry[0] for entry in choices]
    # the default (last) choice is displayed in brackets
    chars[-1] = "[%s]" % chars[-1]
    chars.append('?')
    maps = dict([(entry[0], entry[1]) for entry in choices])
    while True:
      f.write(text)
      f.write('\n')
      f.write("/".join(chars))
      f.write(": ")
      # read at most 2 characters (the answer char plus the newline)
      line = f.readline(2).strip().lower()
      if line in maps:
        answer = maps[line]
        break
      elif line == '?':
        # print per-choice help, then prompt again
        for entry in choices:
          f.write(" %s - %s\n" % (entry[0], entry[2]))
        f.write("\n")
        continue
  finally:
    f.close()
  return answer
class JobSubmittedException(Exception):
  """Job was submitted, client should exit.

  This exception has one argument, the ID of the job that was
  submitted. The handler should print this ID.

  This is not an error, just a structured way to exit from clients.

  Raised by L{SubmitOrSend} when only job submission was requested;
  L{FormatError} maps it to a zero exit code and prints the job ID.

  """
def SendJob(ops, cl=None):
  """Submits a list of opcodes as one job, without waiting for results.

  @type ops: list
  @param ops: the opcodes constituting the job
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created
  @return: the ID of the submitted job

  """
  if cl is None:
    cl = GetClient()
  return cl.SubmitJob(ops)
def PollJob(job_id, cl=None, feedback_fn=None):
  """Function to poll for the result of a job.

  @type job_id: job identified
  @param job_id: the job to poll for results
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created
  @type feedback_fn: callable or None
  @param feedback_fn: if callable, invoked with each log entry's
      (timestamp, type, message) tuple instead of printing to stdout
  @return: the job's opresult list on success
  @raise errors.JobLost: if the job disappears from the queue
  @raise errors.OpExecError: if the job fails or is canceled

  """
  if cl is None:
    cl = GetClient()

  prev_job_info = None
  prev_logmsg_serial = None

  status = None

  # flags ensuring the queued/waitlock notices are printed at most once
  notified_queued = False
  notified_waitlock = False

  while True:
    result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
                                     prev_logmsg_serial)
    if not result:
      # job not found, go away!
      raise errors.JobLost("Job with id %s lost" % job_id)
    elif result == constants.JOB_NOTCHANGED:
      if status is not None and not callable(feedback_fn):
        if status == constants.JOB_STATUS_QUEUED and not notified_queued:
          ToStderr("Job %s is waiting in queue", job_id)
          notified_queued = True
        elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
          ToStderr("Job %s is trying to acquire all necessary locks", job_id)
          notified_waitlock = True

      # Wait again
      continue

    # Split result, a tuple of (field values, log entries)
    (job_info, log_entries) = result
    (status, ) = job_info

    if log_entries:
      for log_entry in log_entries:
        (serial, timestamp, _, message) = log_entry
        if callable(feedback_fn):
          feedback_fn(log_entry[1:])
        else:
          encoded = utils.SafeEncode(message)
          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
        # NOTE: max(None, serial) relies on Python 2 comparison rules
        # (None sorts before any integer), making the initial None safe
        prev_logmsg_serial = max(prev_logmsg_serial, serial)

    # TODO: Handle canceled and archived jobs
    elif status in (constants.JOB_STATUS_SUCCESS,
                    constants.JOB_STATUS_ERROR,
                    constants.JOB_STATUS_CANCELING,
                    constants.JOB_STATUS_CANCELED):
      # the loop only terminates on an iteration that delivered no new
      # log entries, so pending log messages are always flushed first
      break

    prev_job_info = job_info

  # fetch the final job state to build the return value/exception
  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
  if not jobs:
    raise errors.JobLost("Job with id %s lost" % job_id)

  status, opstatus, result = jobs[0]
  if status == constants.JOB_STATUS_SUCCESS:
    return result
  elif status in (constants.JOB_STATUS_CANCELING,
                  constants.JOB_STATUS_CANCELED):
    raise errors.OpExecError("Job was canceled")
  else:
    # find the first failed opcode; report partial failure if some
    # earlier opcodes succeeded
    has_ok = False
    for idx, (status, msg) in enumerate(zip(opstatus, result)):
      if status == constants.OP_STATUS_SUCCESS:
        has_ok = True
      elif status == constants.OP_STATUS_ERROR:
        errors.MaybeRaise(msg)
        if has_ok:
          raise errors.OpExecError("partial failure (opcode %d): %s" %
                                   (idx, msg))
        else:
          raise errors.OpExecError(str(msg))
    # default failure mode
    raise errors.OpExecError(result)
def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
  """Legacy function to submit an opcode.

  This is just a simple wrapper over the construction of the processor
  instance. It should be extended to better handle feedback and
  interaction functions.

  """
  if cl is None:
    cl = GetClient()

  SetGenericOpcodeOpts([op], opts)

  # submit the single-opcode job and block until it has finished
  results = PollJob(SendJob([op], cl), cl=cl, feedback_fn=feedback_fn)
  return results[0]
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  This function will decide, based on the 'opts' parameter, whether to
  submit and wait for the result of the opcode (and return it), or
  whether to just send the job and print its identifier. It is used in
  order to simplify the implementation of the '--submit' option.

  It will also process the opcodes if we're sending the via SendJob
  (otherwise SubmitOpCode does it).

  """
  if not (opts and opts.submit_only):
    # synchronous path: submit the opcode and wait for its result
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)

  # submit-only path: send the job and exit via the structured exception
  job = [op]
  SetGenericOpcodeOpts(job, opts)
  raise JobSubmittedException(SendJob(job, cl=cl))
def SetGenericOpcodeOpts(opcode_list, options):
  """Processor for generic options.

  This function updates the given opcodes based on generic command
  line options (like debug, dry-run, etc.).

  @param opcode_list: list of opcodes
  @param options: command line options or None
  @return: None (in-place modification)

  """
  if not options:
    return
  # read the option values once, then stamp them onto every opcode
  dry_run = options.dry_run
  debug_level = options.debug
  for op in opcode_list:
    op.dry_run = dry_run
    op.debug_level = debug_level
def GetClient():
  """Returns a luxi.Client connected to the master daemon.

  If the master daemon cannot be reached, a more helpful
  L{errors.OpPrereqError} is raised when possible (cluster not
  initialized, or running on a non-master node); otherwise the
  original L{luxi.NoMasterError} is re-raised.

  """
  # TODO: Cache object?
  try:
    client = luxi.Client()
  except luxi.NoMasterError:
    ss = ssconf.SimpleStore()

    # Try to read ssconf file
    try:
      ss.GetMasterNode()
    except errors.ConfigurationError:
      raise errors.OpPrereqError("Cluster not initialized or this machine is"
                                 " not part of a cluster")

    master, myself = ssconf.GetMasterAndMyself(ss=ss)
    if master != myself:
      raise errors.OpPrereqError("This is not the master node, please connect"
                                 " to node '%s' and rerun the command" %
                                 master)
    # we are on the master but still couldn't connect: re-raise the
    # original NoMasterError
    raise
  return client
def FormatError(err):
  """Return a formatted error message for a given error.

  This function takes an exception instance and returns a tuple
  consisting of two values: first, the recommended exit code, and
  second, a string describing the error message (not
  newline-terminated).

  """
  retcode = 1
  obuf = StringIO()
  msg = str(err)
  # NOTE: the isinstance chain below is ordered from specific to
  # generic; the errors.GenericError branch must stay after all of its
  # subclasses or it would shadow them
  if isinstance(err, errors.ConfigurationError):
    txt = "Corrupt configuration file: %s" % msg
    logging.error(txt)
    obuf.write(txt + "\n")
    obuf.write("Aborting.")
    retcode = 2
  elif isinstance(err, errors.HooksAbort):
    obuf.write("Failure: hooks execution failed:\n")
    # err.args[0] holds a list of (node, script, output) tuples
    for node, script, out in err.args[0]:
      if out:
        obuf.write("  node: %s, script: %s, output: %s\n" %
                   (node, script, out))
      else:
        obuf.write("  node: %s, script: %s (no output)\n" %
                   (node, script))
  elif isinstance(err, errors.HooksFailure):
    obuf.write("Failure: hooks general failure: %s" % msg)
  elif isinstance(err, errors.ResolverError):
    # distinguish "can't resolve myself" from a remote-name failure
    this_host = utils.HostInfo.SysName()
    if err.args[0] == this_host:
      msg = "Failure: can't resolve my own hostname ('%s')"
    else:
      msg = "Failure: can't resolve hostname '%s'"
    obuf.write(msg % err.args[0])
  elif isinstance(err, errors.OpPrereqError):
    if len(err.args) == 2:
      obuf.write("Failure: prerequisites not met for this"
               " operation:\nerror type: %s, error details:\n%s" %
                 (err.args[1], err.args[0]))
    else:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\n%s" % msg)
  elif isinstance(err, errors.OpExecError):
    obuf.write("Failure: command execution error:\n%s" % msg)
  elif isinstance(err, errors.TagError):
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
  elif isinstance(err, errors.JobQueueDrainError):
    obuf.write("Failure: the job queue is marked for drain and doesn't"
               " accept new requests\n")
  elif isinstance(err, errors.JobQueueFull):
    obuf.write("Failure: the job queue is full and doesn't accept new"
               " job submissions until old jobs are archived\n")
  elif isinstance(err, errors.TypeEnforcementError):
    obuf.write("Parameter Error: %s" % msg)
  elif isinstance(err, errors.ParameterError):
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
  elif isinstance(err, errors.GenericError):
    obuf.write("Unhandled Ganeti error: %s" % msg)
  elif isinstance(err, luxi.NoMasterError):
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
               " and listening for connections?")
  elif isinstance(err, luxi.TimeoutError):
    obuf.write("Timeout while talking to the master daemon. Error:\n"
               "%s" % msg)
  elif isinstance(err, luxi.ProtocolError):
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
               "%s" % msg)
  elif isinstance(err, JobSubmittedException):
    # not an error: the client just submitted a job, exit code 0
    obuf.write("JobID: %s\n" % err.args[0])
    retcode = 0
  else:
    obuf.write("Unhandled exception: %s" % msg)
  return retcode, obuf.getvalue().rstrip('\n')
def GenericMain(commands, override=None, aliases=None):
  """Generic main function for all the gnt-* commands.

  Arguments:
    - commands: a dictionary with a special structure, see the design doc
                for command line handling.
    - override: if not None, we expect a dictionary with keys that will
                override command line options; this can be used to pass
                options from the scripts to generic functions
    - aliases: dictionary with command aliases {'alias': 'target, ...}

  Returns the exit code of the command (or 1 on parse errors).

  """
  # save the program name and the entire command line for later logging
  if sys.argv:
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
    if len(sys.argv) >= 2:
      binary += " " + sys.argv[1]
      old_cmdline = " ".join(sys.argv[2:])
    else:
      old_cmdline = ""
  else:
    binary = "<unknown program>"
    old_cmdline = ""

  if aliases is None:
    aliases = {}

  try:
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
  except errors.ParameterError, err:
    result, err_msg = FormatError(err)
    ToStderr(err_msg)
    return 1

  if func is None: # parse error
    return 1

  if override is not None:
    # overrides take precedence over the parsed command-line values
    for key, val in override.iteritems():
      setattr(options, key, val)

  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
                     stderr_logging=True, program=binary)

  if old_cmdline:
    logging.info("run with arguments '%s'", old_cmdline)
  else:
    logging.info("run with no arguments")

  try:
    result = func(options, args)
  except (errors.GenericError, luxi.ProtocolError,
          JobSubmittedException), err:
    # FormatError maps JobSubmittedException to exit code 0
    result, err_msg = FormatError(err)
    logging.exception("Error during command processing")
    ToStderr(err_msg)

  return result
def GenericInstanceCreate(mode, opts, args):
  """Add an instance to the cluster via either creation or import.

  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new instance name
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: on invalid NIC/disk specifications
  @raise errors.ProgrammerError: if C{mode} is not one of the two
      supported creation modes

  """
  instance = args[0]

  (pnode, snode) = SplitNodeOption(opts.node)

  hypervisor = None
  hvparams = {}
  if opts.hypervisor:
    # opts.hypervisor is a (name, parameters) pair when given
    hypervisor, hvparams = opts.hypervisor

  if opts.nics:
    # NICs come in as (index, dict) pairs; build a dense list first
    try:
      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
    except ValueError, err:
      raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
    nics = [{}] * nic_max
    for nidx, ndict in opts.nics:
      nidx = int(nidx)
      if not isinstance(ndict, dict):
        msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
        raise errors.OpPrereqError(msg)
      nics[nidx] = ndict
  elif opts.no_nics:
    # no nics
    nics = []
  elif mode == constants.INSTANCE_CREATE:
    # default of one nic, all auto
    nics = [{}]
  else:
    # mode == import
    nics = []

  if opts.disk_template == constants.DT_DISKLESS:
    if opts.disks or opts.sd_size is not None:
      raise errors.OpPrereqError("Diskless instance but disk"
                                 " information passed")
    disks = []
  else:
    if (not opts.disks and not opts.sd_size
        and mode == constants.INSTANCE_CREATE):
      raise errors.OpPrereqError("No disk information specified")
    if opts.disks and opts.sd_size is not None:
      raise errors.OpPrereqError("Please use either the '--disk' or"
                                 " '-s' option")
    if opts.sd_size is not None:
      # convert the legacy single-disk '-s' option into the disk list
      opts.disks = [(0, {"size": opts.sd_size})]

    if opts.disks:
      # disks come in as (index, dict) pairs like the NICs above
      try:
        disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
      disks = [{}] * disk_max
    else:
      disks = []
    for didx, ddict in opts.disks:
      didx = int(didx)
      if not isinstance(ddict, dict):
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
        raise errors.OpPrereqError(msg)
      elif "size" in ddict:
        if "adopt" in ddict:
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
                                     " (disk %d)" % didx)
        try:
          # normalize the human-readable size (e.g. "10G") in place
          ddict["size"] = utils.ParseUnit(ddict["size"])
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
                                     (didx, err))
      elif "adopt" in ddict:
        if mode == constants.INSTANCE_IMPORT:
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
                                     " import")
        # adopted volumes carry no size at creation time
        ddict["size"] = 0
      else:
        raise errors.OpPrereqError("Missing size or adoption source for"
                                   " disk %d" % didx)
      disks[didx] = ddict

  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)

  # mode-specific opcode fields
  if mode == constants.INSTANCE_CREATE:
    start = opts.start
    os_type = opts.os
    src_node = None
    src_path = None
    no_install = opts.no_install
  elif mode == constants.INSTANCE_IMPORT:
    start = False
    os_type = None
    src_node = opts.src_node
    src_path = opts.src_dir
    no_install = None
  else:
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)

  op = opcodes.OpCreateInstance(instance_name=instance,
                                disks=disks,
                                disk_template=opts.disk_template,
                                nics=nics,
                                pnode=pnode, snode=snode,
                                ip_check=opts.ip_check,
                                name_check=opts.name_check,
                                wait_for_sync=opts.wait_for_sync,
                                file_storage_dir=opts.file_storage_dir,
                                file_driver=opts.file_driver,
                                iallocator=opts.iallocator,
                                hypervisor=hypervisor,
                                hvparams=hvparams,
                                beparams=opts.beparams,
                                mode=mode,
                                start=start,
                                os_type=os_type,
                                src_node=src_node,
                                src_path=src_path,
                                no_install=no_install)

  SubmitOrSend(op, opts)
  return 0
class _RunWhileClusterStoppedHelper:
  """Helper class for L{RunWhileClusterStopped} to simplify state management

  Holds the cluster topology gathered beforehand and knows how to run
  commands locally or over SSH while the cluster daemons are down.

  """
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes

    # SSH runner used for commands on remote (non-master) nodes
    self.ssh = ssh.SshRunner(self.cluster_name)

    # All online nodes except the master; when restarting, these are
    # started first and the master last
    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name; if None or equal to the master node,
        the command is run locally instead of via SSH
    @type cmd: list
    @param cmd: Command

    @raise errors.OpExecError: if the command exits with a failure

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    Blocks the watcher, stops all daemons on all online nodes, invokes
    C{fn}, then restarts the daemons (master node last) and resumes the
    watcher, regardless of whether C{fn} succeeded.

    @type fn: callable
    @param fn: Function to be called; receives this helper instance as
        its first argument, followed by C{args}

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes
        for node_name in self.online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception, err:
          # Report the formatted error via feedback, then re-raise so the
          # caller still sees the original exception
          _, errmsg = FormatError(err)
          logging.exception("Caught exception")
          self.feedback_fn(errmsg)
          raise
      finally:
        # Start cluster again, master node last
        for node_name in self.nonmaster_nodes + [self.master_node]:
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
    finally:
      # Resume watcher
      watcher_block.Close()
1731
def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Calls a function while all cluster daemons are stopped.

  Collects the cluster name, master node and list of online nodes from
  the master daemon, then delegates to L{_RunWhileClusterStoppedHelper}
  to stop all daemons, run C{fn} and restart everything afterwards.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  feedback_fn("Gathering cluster information")

  # This ensures we're running on the master daemon
  client = GetClient()

  (cluster_name, master_node) = \
    client.QueryConfigValues(["cluster_name", "master_node"])
  node_list = GetOnlineNodes([], cl=client)

  # Don't keep a reference to the client. The master daemon will go away.
  del client

  assert master_node in node_list

  helper = _RunWhileClusterStoppedHelper(feedback_fn, cluster_name,
                                         master_node, node_list)
  return helper.Call(fn, *args)
1759
def GenerateTable(headers, fields, separator, data,
                  numfields=None, unitfields=None,
                  units=None):
  """Prints a table with headers and different fields.

  @type headers: dict
  @param headers: dictionary mapping field names to headers for
      the table; note: unknown fields are added to this dictionary,
      i.e. it is modified in place
  @type fields: list
  @param fields: the field names corresponding to each row in
      the data field
  @param separator: the separator to be used; if this is None,
      the default 'smart' algorithm is used which computes optimal
      field width, otherwise just the separator is used between
      each field
  @type data: list
  @param data: a list of lists, each sublist being one row to be output;
      note: the cell values are converted to strings (and unit-formatted)
      in place, i.e. the rows are modified
  @type numfields: list
  @param numfields: a list with the fields that hold numeric
      values and thus should be right-aligned
  @type unitfields: list
  @param unitfields: a list with the fields that hold numeric
      values that should be formatted with the units field
  @type units: string or None
  @param units: the units we should use for formatting, or None for
      automatic choice (human-readable for non-separator usage, otherwise
      megabytes); this is a one-letter string
  @rtype: list of strings
  @return: the formatted table lines (header line first, if any)

  """
  # Default units: raw megabytes with a separator, human-readable otherwise
  if units is None:
    if separator:
      units = "m"
    else:
      units = "h"

  if numfields is None:
    numfields = []
  if unitfields is None:
    unitfields = []

  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142

  # Build one %-format specifier per column: plain "%s" when a separator
  # is used, otherwise width-padded ("%*s" right-aligned for numeric
  # fields, "%-*s" left-aligned for the rest)
  format_fields = []
  for field in fields:
    if headers and field not in headers:
      # TODO: handle better unknown fields (either revert to old
      # style of raising exception, or deal more intelligently with
      # variable fields)
      headers[field] = field
    if separator is not None:
      format_fields.append("%s")
    elif numfields.Matches(field):
      format_fields.append("%*s")
    else:
      format_fields.append("%-*s")

  if separator is None:
    # mlens tracks the maximum width seen for each column
    mlens = [0 for name in fields]
    format = ' '.join(format_fields)
  else:
    # Escape "%" in the separator so the final %-formatting is safe
    format = separator.replace("%", "%%").join(format_fields)

  # First pass over the data: unit-format and stringify every cell
  # (in place) and record the column widths
  for row in data:
    if row is None:
      continue
    for idx, val in enumerate(row):
      if unitfields.Matches(fields[idx]):
        try:
          val = int(val)
        except (TypeError, ValueError):
          # Non-numeric value in a unit field: leave it as-is
          pass
        else:
          val = row[idx] = utils.FormatUnit(val, units)
      val = row[idx] = str(val)
      if separator is None:
        mlens[idx] = max(mlens[idx], len(val))

  result = []
  if headers:
    # Header line; widths may grow to accommodate the header text
    args = []
    for idx, name in enumerate(fields):
      hdr = headers[name]
      if separator is None:
        mlens[idx] = max(mlens[idx], len(hdr))
        args.append(mlens[idx])
      args.append(hdr)
    result.append(format % tuple(args))

  if separator is None:
    assert len(mlens) == len(fields)

    # Avoid trailing whitespace: a left-aligned last column needs no
    # padding (right-aligned numeric columns keep their width)
    if fields and not numfields.Matches(fields[-1]):
      mlens[-1] = 0

  # Second pass: render each data row (None rows become all dashes)
  for line in data:
    args = []
    if line is None:
      line = ['-' for _ in fields]
    for idx in range(len(fields)):
      if separator is None:
        args.append(mlens[idx])
      args.append(line[idx])
    result.append(format % tuple(args))

  return result
1867
def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: the timestamp formatted as "YYYY-MM-DD HH:MM:SS.uuuuuu" in
      local time, or "?" if the input is not a two-element tuple/list

  """
  if isinstance(ts, (tuple, list)) and len(ts) == 2:
    (sec, usec) = ts
    when = time.localtime(sec)
    return "%s.%06d" % (time.strftime("%F %T", when), usec)
  return "?"
1883
def ParseTimespec(value):
  """Parse a time specification.

  The following suffixes are recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: day
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  @rtype: int
  @return: the specification converted to seconds
  @raise errors.OpPrereqError: if the specification is empty or invalid

  """
  value = str(value)
  if not value:
    raise errors.OpPrereqError("Empty time specification passed")

  suffix_map = {
    's': 1,
    'm': 60,
    'h': 3600,
    'd': 86400,
    'w': 604800,
    }

  if value[-1] in suffix_map:
    # Strip the suffix and scale by the corresponding factor
    multiplier = suffix_map[value[-1]]
    value = value[:-1]
    if not value: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)")
    try:
      return int(value) * multiplier
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)

  # No recognized suffix: plain seconds
  try:
    return int(value)
  except (TypeError, ValueError):
    raise errors.OpPrereqError("Invalid time specification '%s'" % value)
1925
def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
                   filter_master=False):
  """Returns the names of online nodes.

  This function will also log a warning on stderr with the names of
  the online nodes.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed
  @type secondary_ips: boolean
  @param secondary_ips: if True, return the secondary IPs instead of the
      names, useful for doing network traffic over the replication interface
      (if any)
  @type filter_master: boolean
  @param filter_master: if True, do not return the master node in the list
      (useful in coordination with secondary_ips where we cannot check our
      node name against the list)

  """
  if cl is None:
    cl = GetClient()

  # Column of the returned value: node name or secondary IP
  if secondary_ips:
    name_idx = 2
  else:
    name_idx = 0

  if filter_master:
    master_node = cl.QueryConfigValues(["master_node"])[0]
    def _Accept(name):
      # Exclude the master node from the result
      return name != master_node
  else:
    def _Accept(_):
      return True

  node_data = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
                            use_locking=False)

  skipped = [row[0] for row in node_data if row[1]]
  if skipped and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(skipped))

  return [row[name_idx]
          for row in node_data
          if not row[1] and _Accept(row[0])]
1971
def _ToStream(stream, txt, *args):
1972
  """Write a message to a stream, bypassing the logging system
1973

1974
  @type stream: file object
1975
  @param stream: the file to which we should write
1976
  @type txt: str
1977
  @param txt: the message
1978

1979
  """
1980
  if args:
1981
    args = tuple(args)
1982
    stream.write(txt % args)
1983
  else:
1984
    stream.write(txt)
1985
  stream.write('\n')
1986
  stream.flush()
1987

    
1988

    
1989
def ToStdout(txt, *args):
  """Write a message to standard output, bypassing the logging system.

  Thin convenience wrapper around L{_ToStream} using C{sys.stdout}.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stdout, txt, *args)
2001
def ToStderr(txt, *args):
  """Write a message to standard error, bypassing the logging system.

  Thin convenience wrapper around L{_ToStream} using C{sys.stderr}.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stderr, txt, *args)
2013
class JobExecutor(object):
2014
  """Class which manages the submission and execution of multiple jobs.
2015

2016
  Note that instances of this class should not be reused between
2017
  GetResults() calls.
2018

2019
  """
2020
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
2021
    self.queue = []
2022
    if cl is None:
2023
      cl = GetClient()
2024
    self.cl = cl
2025
    self.verbose = verbose
2026
    self.jobs = []
2027
    self.opts = opts
2028
    self.feedback_fn = feedback_fn
2029

    
2030
  def QueueJob(self, name, *ops):
2031
    """Record a job for later submit.
2032

2033
    @type name: string
2034
    @param name: a description of the job, will be used in WaitJobSet
2035
    """
2036
    SetGenericOpcodeOpts(ops, self.opts)
2037
    self.queue.append((name, ops))
2038

    
2039
  def SubmitPending(self):
2040
    """Submit all pending jobs.
2041

2042
    """
2043
    results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
2044
    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
2045
                                                            self.queue)):
2046
      self.jobs.append((idx, status, data, name))
2047

    
2048
  def _ChooseJob(self):
2049
    """Choose a non-waiting/queued job to poll next.
2050

2051
    """
2052
    assert self.jobs, "_ChooseJob called with empty job list"
2053

    
2054
    result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
2055
    assert result
2056

    
2057
    for job_data, status in zip(self.jobs, result):
2058
      if status[0] in (constants.JOB_STATUS_QUEUED,
2059
                    constants.JOB_STATUS_WAITLOCK,
2060
                    constants.JOB_STATUS_CANCELING):
2061
        # job is still waiting
2062
        continue
2063
      # good candidate found
2064
      self.jobs.remove(job_data)
2065
      return job_data
2066

    
2067
    # no job found
2068
    return self.jobs.pop(0)
2069

    
2070
  def GetResults(self):
2071
    """Wait for and return the results of all jobs.
2072

2073
    @rtype: list
2074
    @return: list of tuples (success, job results), in the same order
2075
        as the submitted jobs; if a job has failed, instead of the result
2076
        there will be the error message
2077

2078
    """
2079
    if not self.jobs:
2080
      self.SubmitPending()
2081
    results = []
2082
    if self.verbose:
2083
      ok_jobs = [row[2] for row in self.jobs if row[1]]
2084
      if ok_jobs:
2085
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
2086

    
2087
    # first, remove any non-submitted jobs
2088
    self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
2089
    for idx, _, jid, name in failures:
2090
      ToStderr("Failed to submit job for %s: %s", name, jid)
2091
      results.append((idx, False, jid))
2092

    
2093
    while self.jobs:
2094
      (idx, _, jid, name) = self._ChooseJob()
2095
      ToStdout("Waiting for job %s for %s...", jid, name)
2096
      try:
2097
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
2098
        success = True
2099
      except (errors.GenericError, luxi.ProtocolError), err:
2100
        _, job_result = FormatError(err)
2101
        success = False
2102
        # the error message will always be shown, verbose or not
2103
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
2104

    
2105
      results.append((idx, success, job_result))
2106

    
2107
    # sort based on the index, then drop it
2108
    results.sort()
2109
    results = [i[1:] for i in results]
2110

    
2111
    return results
2112

    
2113
  def WaitOrShow(self, wait):
2114
    """Wait for job results or only print the job IDs.
2115

2116
    @type wait: boolean
2117
    @param wait: whether to wait or not
2118

2119
    """
2120
    if wait:
2121
      return self.GetResults()
2122
    else:
2123
      if not self.jobs:
2124
        self.SubmitPending()
2125
      for _, status, result, name in self.jobs:
2126
        if status:
2127
          ToStdout("%s: %s", result, name)
2128
        else:
2129
          ToStderr("Failure for %s: %s", name, result)