Statistics
| Branch: | Tag: | Revision:

root / lib / cli.py @ e588764d

History | View | Annotate | Download (69.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module dealing with command line parsing"""
23

    
24

    
25
import sys
26
import textwrap
27
import os.path
28
import time
29
import logging
30
from cStringIO import StringIO
31

    
32
from ganeti import utils
33
from ganeti import errors
34
from ganeti import constants
35
from ganeti import opcodes
36
from ganeti import luxi
37
from ganeti import ssconf
38
from ganeti import rpc
39
from ganeti import ssh
40

    
41
from optparse import (OptionParser, TitledHelpFormatter,
42
                      Option, OptionValueError)
43

    
44

    
45
__all__ = [
46
  # Command line options
47
  "ALLOCATABLE_OPT",
48
  "ALL_OPT",
49
  "AUTO_PROMOTE_OPT",
50
  "AUTO_REPLACE_OPT",
51
  "BACKEND_OPT",
52
  "CLEANUP_OPT",
53
  "CONFIRM_OPT",
54
  "CP_SIZE_OPT",
55
  "DEBUG_OPT",
56
  "DEBUG_SIMERR_OPT",
57
  "DISKIDX_OPT",
58
  "DISK_OPT",
59
  "DISK_TEMPLATE_OPT",
60
  "DRAINED_OPT",
61
  "EARLY_RELEASE_OPT",
62
  "ENABLED_HV_OPT",
63
  "ERROR_CODES_OPT",
64
  "FIELDS_OPT",
65
  "FILESTORE_DIR_OPT",
66
  "FILESTORE_DRIVER_OPT",
67
  "FORCE_OPT",
68
  "FORCE_VARIANT_OPT",
69
  "GLOBAL_FILEDIR_OPT",
70
  "HVLIST_OPT",
71
  "HVOPTS_OPT",
72
  "HYPERVISOR_OPT",
73
  "IALLOCATOR_OPT",
74
  "IDENTIFY_DEFAULTS_OPT",
75
  "IGNORE_CONSIST_OPT",
76
  "IGNORE_FAILURES_OPT",
77
  "IGNORE_SECONDARIES_OPT",
78
  "IGNORE_SIZE_OPT",
79
  "MAC_PREFIX_OPT",
80
  "MAINTAIN_NODE_HEALTH_OPT",
81
  "MASTER_NETDEV_OPT",
82
  "MC_OPT",
83
  "NET_OPT",
84
  "NEW_CLUSTER_CERT_OPT",
85
  "NEW_CONFD_HMAC_KEY_OPT",
86
  "NEW_RAPI_CERT_OPT",
87
  "NEW_SECONDARY_OPT",
88
  "NIC_PARAMS_OPT",
89
  "NODE_LIST_OPT",
90
  "NODE_PLACEMENT_OPT",
91
  "NOHDR_OPT",
92
  "NOIPCHECK_OPT",
93
  "NO_INSTALL_OPT",
94
  "NONAMECHECK_OPT",
95
  "NOLVM_STORAGE_OPT",
96
  "NOMODIFY_ETCHOSTS_OPT",
97
  "NOMODIFY_SSH_SETUP_OPT",
98
  "NONICS_OPT",
99
  "NONLIVE_OPT",
100
  "NONPLUS1_OPT",
101
  "NOSHUTDOWN_OPT",
102
  "NOSTART_OPT",
103
  "NOSSH_KEYCHECK_OPT",
104
  "NOVOTING_OPT",
105
  "NWSYNC_OPT",
106
  "ON_PRIMARY_OPT",
107
  "ON_SECONDARY_OPT",
108
  "OFFLINE_OPT",
109
  "OS_OPT",
110
  "OS_SIZE_OPT",
111
  "RAPI_CERT_OPT",
112
  "READD_OPT",
113
  "REBOOT_TYPE_OPT",
114
  "SECONDARY_IP_OPT",
115
  "SELECT_OS_OPT",
116
  "SEP_OPT",
117
  "SHOWCMD_OPT",
118
  "SHUTDOWN_TIMEOUT_OPT",
119
  "SINGLE_NODE_OPT",
120
  "SRC_DIR_OPT",
121
  "SRC_NODE_OPT",
122
  "SUBMIT_OPT",
123
  "STATIC_OPT",
124
  "SYNC_OPT",
125
  "TAG_SRC_OPT",
126
  "TIMEOUT_OPT",
127
  "USEUNITS_OPT",
128
  "USE_REPL_NET_OPT",
129
  "VERBOSE_OPT",
130
  "VG_NAME_OPT",
131
  "YES_DOIT_OPT",
132
  # Generic functions for CLI programs
133
  "GenericMain",
134
  "GenericInstanceCreate",
135
  "GetClient",
136
  "GetOnlineNodes",
137
  "JobExecutor",
138
  "JobSubmittedException",
139
  "ParseTimespec",
140
  "RunWhileClusterStopped",
141
  "SubmitOpCode",
142
  "SubmitOrSend",
143
  "UsesRPC",
144
  # Formatting functions
145
  "ToStderr", "ToStdout",
146
  "FormatError",
147
  "GenerateTable",
148
  "AskUser",
149
  "FormatTimestamp",
150
  # Tags functions
151
  "ListTags",
152
  "AddTags",
153
  "RemoveTags",
154
  # command line options support infrastructure
155
  "ARGS_MANY_INSTANCES",
156
  "ARGS_MANY_NODES",
157
  "ARGS_NONE",
158
  "ARGS_ONE_INSTANCE",
159
  "ARGS_ONE_NODE",
160
  "ARGS_ONE_OS",
161
  "ArgChoice",
162
  "ArgCommand",
163
  "ArgFile",
164
  "ArgHost",
165
  "ArgInstance",
166
  "ArgJobId",
167
  "ArgNode",
168
  "ArgOs",
169
  "ArgSuggest",
170
  "ArgUnknown",
171
  "OPT_COMPL_INST_ADD_NODES",
172
  "OPT_COMPL_MANY_NODES",
173
  "OPT_COMPL_ONE_IALLOCATOR",
174
  "OPT_COMPL_ONE_INSTANCE",
175
  "OPT_COMPL_ONE_NODE",
176
  "OPT_COMPL_ONE_OS",
177
  "cli_option",
178
  "SplitNodeOption",
179
  "CalculateOSNames",
180
  ]
181

    
182
NO_PREFIX = "no_"
183
UN_PREFIX = "-"
184

    
185

    
186
class _Argument:
187
  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
188
    self.min = min
189
    self.max = max
190

    
191
  def __repr__(self):
192
    return ("<%s min=%s max=%s>" %
193
            (self.__class__.__name__, self.min, self.max))
194

    
195

    
196
class ArgSuggest(_Argument):
197
  """Suggesting argument.
198

199
  Value can be any of the ones passed to the constructor.
200

201
  """
202
  # pylint: disable-msg=W0622
203
  def __init__(self, min=0, max=None, choices=None):
204
    _Argument.__init__(self, min=min, max=max)
205
    self.choices = choices
206

    
207
  def __repr__(self):
208
    return ("<%s min=%s max=%s choices=%r>" %
209
            (self.__class__.__name__, self.min, self.max, self.choices))
210

    
211

    
212
class ArgChoice(ArgSuggest):
213
  """Choice argument.
214

215
  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
216
  but value must be one of the choices.
217

218
  """
219

    
220

    
221
class ArgUnknown(_Argument):
222
  """Unknown argument to program (e.g. determined at runtime).
223

224
  """
225

    
226

    
227
class ArgInstance(_Argument):
228
  """Instances argument.
229

230
  """
231

    
232

    
233
class ArgNode(_Argument):
234
  """Node argument.
235

236
  """
237

    
238
class ArgJobId(_Argument):
239
  """Job ID argument.
240

241
  """
242

    
243

    
244
class ArgFile(_Argument):
245
  """File path argument.
246

247
  """
248

    
249

    
250
class ArgCommand(_Argument):
251
  """Command argument.
252

253
  """
254

    
255

    
256
class ArgHost(_Argument):
257
  """Host argument.
258

259
  """
260

    
261

    
262
class ArgOs(_Argument):
263
  """OS argument.
264

265
  """
266

    
267

    
268
ARGS_NONE = []
269
ARGS_MANY_INSTANCES = [ArgInstance()]
270
ARGS_MANY_NODES = [ArgNode()]
271
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
272
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
273
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
274

    
275

    
276
def _ExtractTagsObject(opts, args):
277
  """Extract the tag type object.
278

279
  Note that this function will modify its args parameter.
280

281
  """
282
  if not hasattr(opts, "tag_type"):
283
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
284
  kind = opts.tag_type
285
  if kind == constants.TAG_CLUSTER:
286
    retval = kind, kind
287
  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
288
    if not args:
289
      raise errors.OpPrereqError("no arguments passed to the command")
290
    name = args.pop(0)
291
    retval = kind, name
292
  else:
293
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
294
  return retval
295

    
296

    
297
def _ExtendTags(opts, args):
298
  """Extend the args if a source file has been given.
299

300
  This function will extend the tags with the contents of the file
301
  passed in the 'tags_source' attribute of the opts parameter. A file
302
  named '-' will be replaced by stdin.
303

304
  """
305
  fname = opts.tags_source
306
  if fname is None:
307
    return
308
  if fname == "-":
309
    new_fh = sys.stdin
310
  else:
311
    new_fh = open(fname, "r")
312
  new_data = []
313
  try:
314
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
315
    # because of python bug 1633941
316
    while True:
317
      line = new_fh.readline()
318
      if not line:
319
        break
320
      new_data.append(line.strip())
321
  finally:
322
    new_fh.close()
323
  args.extend(new_data)
324

    
325

    
326
def ListTags(opts, args):
327
  """List the tags on a given object.
328

329
  This is a generic implementation that knows how to deal with all
330
  three cases of tag objects (cluster, node, instance). The opts
331
  argument is expected to contain a tag_type field denoting what
332
  object type we work on.
333

334
  """
335
  kind, name = _ExtractTagsObject(opts, args)
336
  cl = GetClient()
337
  result = cl.QueryTags(kind, name)
338
  result = list(result)
339
  result.sort()
340
  for tag in result:
341
    ToStdout(tag)
342

    
343

    
344
def AddTags(opts, args):
345
  """Add tags on a given object.
346

347
  This is a generic implementation that knows how to deal with all
348
  three cases of tag objects (cluster, node, instance). The opts
349
  argument is expected to contain a tag_type field denoting what
350
  object type we work on.
351

352
  """
353
  kind, name = _ExtractTagsObject(opts, args)
354
  _ExtendTags(opts, args)
355
  if not args:
356
    raise errors.OpPrereqError("No tags to be added")
357
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
358
  SubmitOpCode(op)
359

    
360

    
361
def RemoveTags(opts, args):
362
  """Remove tags from a given object.
363

364
  This is a generic implementation that knows how to deal with all
365
  three cases of tag objects (cluster, node, instance). The opts
366
  argument is expected to contain a tag_type field denoting what
367
  object type we work on.
368

369
  """
370
  kind, name = _ExtractTagsObject(opts, args)
371
  _ExtendTags(opts, args)
372
  if not args:
373
    raise errors.OpPrereqError("No tags to be removed")
374
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
375
  SubmitOpCode(op)
376

    
377

    
378
def check_unit(option, opt, value): # pylint: disable-msg=W0613
379
  """OptParsers custom converter for units.
380

381
  """
382
  try:
383
    return utils.ParseUnit(value)
384
  except errors.UnitParseError, err:
385
    raise OptionValueError("option %s: %s" % (opt, err))
386

    
387

    
388
def _SplitKeyVal(opt, data):
389
  """Convert a KeyVal string into a dict.
390

391
  This function will convert a key=val[,...] string into a dict. Empty
392
  values will be converted specially: keys which have the prefix 'no_'
393
  will have the value=False and the prefix stripped, the others will
394
  have value=True.
395

396
  @type opt: string
397
  @param opt: a string holding the option name for which we process the
398
      data, used in building error messages
399
  @type data: string
400
  @param data: a string of the format key=val,key=val,...
401
  @rtype: dict
402
  @return: {key=val, key=val}
403
  @raises errors.ParameterError: if there are duplicate keys
404

405
  """
406
  kv_dict = {}
407
  if data:
408
    for elem in utils.UnescapeAndSplit(data, sep=","):
409
      if "=" in elem:
410
        key, val = elem.split("=", 1)
411
      else:
412
        if elem.startswith(NO_PREFIX):
413
          key, val = elem[len(NO_PREFIX):], False
414
        elif elem.startswith(UN_PREFIX):
415
          key, val = elem[len(UN_PREFIX):], None
416
        else:
417
          key, val = elem, True
418
      if key in kv_dict:
419
        raise errors.ParameterError("Duplicate key '%s' in option %s" %
420
                                    (key, opt))
421
      kv_dict[key] = val
422
  return kv_dict
423

    
424

    
425
def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
426
  """Custom parser for ident:key=val,key=val options.
427

428
  This will store the parsed values as a tuple (ident, {key: val}). As such,
429
  multiple uses of this option via action=append is possible.
430

431
  """
432
  if ":" not in value:
433
    ident, rest = value, ''
434
  else:
435
    ident, rest = value.split(":", 1)
436

    
437
  if ident.startswith(NO_PREFIX):
438
    if rest:
439
      msg = "Cannot pass options when removing parameter groups: %s" % value
440
      raise errors.ParameterError(msg)
441
    retval = (ident[len(NO_PREFIX):], False)
442
  elif ident.startswith(UN_PREFIX):
443
    if rest:
444
      msg = "Cannot pass options when removing parameter groups: %s" % value
445
      raise errors.ParameterError(msg)
446
    retval = (ident[len(UN_PREFIX):], None)
447
  else:
448
    kv_dict = _SplitKeyVal(opt, rest)
449
    retval = (ident, kv_dict)
450
  return retval
451

    
452

    
453
def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
454
  """Custom parser class for key=val,key=val options.
455

456
  This will store the parsed values as a dict {key: val}.
457

458
  """
459
  return _SplitKeyVal(opt, value)
460

    
461

    
462
def check_bool(option, opt, value): # pylint: disable-msg=W0613
463
  """Custom parser for yes/no options.
464

465
  This will store the parsed value as either True or False.
466

467
  """
468
  value = value.lower()
469
  if value == constants.VALUE_FALSE or value == "no":
470
    return False
471
  elif value == constants.VALUE_TRUE or value == "yes":
472
    return True
473
  else:
474
    raise errors.ParameterError("Invalid boolean value '%s'" % value)
475

    
476

    
477
# completion_suggestion is normally a list. Using numeric values not evaluating
478
# to False for dynamic completion.
479
(OPT_COMPL_MANY_NODES,
480
 OPT_COMPL_ONE_NODE,
481
 OPT_COMPL_ONE_INSTANCE,
482
 OPT_COMPL_ONE_OS,
483
 OPT_COMPL_ONE_IALLOCATOR,
484
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)
485

    
486
OPT_COMPL_ALL = frozenset([
487
  OPT_COMPL_MANY_NODES,
488
  OPT_COMPL_ONE_NODE,
489
  OPT_COMPL_ONE_INSTANCE,
490
  OPT_COMPL_ONE_OS,
491
  OPT_COMPL_ONE_IALLOCATOR,
492
  OPT_COMPL_INST_ADD_NODES,
493
  ])
494

    
495

    
496
class CliOption(Option):
497
  """Custom option class for optparse.
498

499
  """
500
  ATTRS = Option.ATTRS + [
501
    "completion_suggest",
502
    ]
503
  TYPES = Option.TYPES + (
504
    "identkeyval",
505
    "keyval",
506
    "unit",
507
    "bool",
508
    )
509
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
510
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
511
  TYPE_CHECKER["keyval"] = check_key_val
512
  TYPE_CHECKER["unit"] = check_unit
513
  TYPE_CHECKER["bool"] = check_bool
514

    
515

    
516
# optparse.py sets make_option, so we do it for our own option class, too
517
cli_option = CliOption
518

    
519

    
520
_YORNO = "yes|no"
521

    
522
DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
523
                       help="Increase debugging level")
524

    
525
NOHDR_OPT = cli_option("--no-headers", default=False,
526
                       action="store_true", dest="no_headers",
527
                       help="Don't display column headers")
528

    
529
SEP_OPT = cli_option("--separator", default=None,
530
                     action="store", dest="separator",
531
                     help=("Separator between output fields"
532
                           " (defaults to one space)"))
533

    
534
USEUNITS_OPT = cli_option("--units", default=None,
535
                          dest="units", choices=('h', 'm', 'g', 't'),
536
                          help="Specify units for output (one of hmgt)")
537

    
538
FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
539
                        type="string", metavar="FIELDS",
540
                        help="Comma separated list of output fields")
541

    
542
FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
543
                       default=False, help="Force the operation")
544

    
545
CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
546
                         default=False, help="Do not require confirmation")
547

    
548
TAG_SRC_OPT = cli_option("--from", dest="tags_source",
549
                         default=None, help="File with tag names")
550

    
551
SUBMIT_OPT = cli_option("--submit", dest="submit_only",
552
                        default=False, action="store_true",
553
                        help=("Submit the job and return the job ID, but"
554
                              " don't wait for the job to finish"))
555

    
556
SYNC_OPT = cli_option("--sync", dest="do_locking",
557
                      default=False, action="store_true",
558
                      help=("Grab locks while doing the queries"
559
                            " in order to ensure more consistent results"))
560

    
561
_DRY_RUN_OPT = cli_option("--dry-run", default=False,
562
                          action="store_true",
563
                          help=("Do not execute the operation, just run the"
564
                                " check steps and verify it it could be"
565
                                " executed"))
566

    
567
VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
568
                         action="store_true",
569
                         help="Increase the verbosity of the operation")
570

    
571
DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
572
                              action="store_true", dest="simulate_errors",
573
                              help="Debugging option that makes the operation"
574
                              " treat most runtime checks as failed")
575

    
576
NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
577
                        default=True, action="store_false",
578
                        help="Don't wait for sync (DANGEROUS!)")
579

    
580
DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
581
                               help="Custom disk setup (diskless, file,"
582
                               " plain or drbd)",
583
                               default=None, metavar="TEMPL",
584
                               choices=list(constants.DISK_TEMPLATES))
585

    
586
NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
587
                        help="Do not create any network cards for"
588
                        " the instance")
589

    
590
FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
591
                               help="Relative path under default cluster-wide"
592
                               " file storage dir to store file-based disks",
593
                               default=None, metavar="<DIR>")
594

    
595
FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
596
                                  help="Driver to use for image files",
597
                                  default="loop", metavar="<DRIVER>",
598
                                  choices=list(constants.FILE_DRIVER))
599

    
600
IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
601
                            help="Select nodes for the instance automatically"
602
                            " using the <NAME> iallocator plugin",
603
                            default=None, type="string",
604
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
605

    
606
OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
607
                    metavar="<os>",
608
                    completion_suggest=OPT_COMPL_ONE_OS)
609

    
610
FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
611
                               action="store_true", default=False,
612
                               help="Force an unknown variant")
613

    
614
NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
615
                            action="store_true", default=False,
616
                            help="Do not install the OS (will"
617
                            " enable no-start)")
618

    
619
BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
620
                         type="keyval", default={},
621
                         help="Backend parameters")
622

    
623
HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
624
                         default={}, dest="hvparams",
625
                         help="Hypervisor parameters")
626

    
627
HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
628
                            help="Hypervisor and hypervisor options, in the"
629
                            " format hypervisor:option=value,option=value,...",
630
                            default=None, type="identkeyval")
631

    
632
HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
633
                        help="Hypervisor and hypervisor options, in the"
634
                        " format hypervisor:option=value,option=value,...",
635
                        default=[], action="append", type="identkeyval")
636

    
637
NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
638
                           action="store_false",
639
                           help="Don't check that the instance's IP"
640
                           " is alive")
641

    
642
NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
643
                             default=True, action="store_false",
644
                             help="Don't check that the instance's name"
645
                             " is resolvable")
646

    
647
NET_OPT = cli_option("--net",
648
                     help="NIC parameters", default=[],
649
                     dest="nics", action="append", type="identkeyval")
650

    
651
DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
652
                      dest="disks", action="append", type="identkeyval")
653

    
654
DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
655
                         help="Comma-separated list of disks"
656
                         " indices to act on (e.g. 0,2) (optional,"
657
                         " defaults to all disks)")
658

    
659
OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
660
                         help="Enforces a single-disk configuration using the"
661
                         " given disk size, in MiB unless a suffix is used",
662
                         default=None, type="unit", metavar="<size>")
663

    
664
IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
665
                                dest="ignore_consistency",
666
                                action="store_true", default=False,
667
                                help="Ignore the consistency of the disks on"
668
                                " the secondary")
669

    
670
NONLIVE_OPT = cli_option("--non-live", dest="live",
671
                         default=True, action="store_false",
672
                         help="Do a non-live migration (this usually means"
673
                         " freeze the instance, save the state, transfer and"
674
                         " only then resume running on the secondary node)")
675

    
676
NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
677
                                help="Target node and optional secondary node",
678
                                metavar="<pnode>[:<snode>]",
679
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)
680

    
681
NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
682
                           action="append", metavar="<node>",
683
                           help="Use only this node (can be used multiple"
684
                           " times, if not given defaults to all nodes)",
685
                           completion_suggest=OPT_COMPL_ONE_NODE)
686

    
687
SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
688
                             metavar="<node>",
689
                             completion_suggest=OPT_COMPL_ONE_NODE)
690

    
691
NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
692
                         action="store_false",
693
                         help="Don't start the instance after creation")
694

    
695
SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
696
                         action="store_true", default=False,
697
                         help="Show command instead of executing it")
698

    
699
CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
700
                         default=False, action="store_true",
701
                         help="Instead of performing the migration, try to"
702
                         " recover from a failed cleanup. This is safe"
703
                         " to run even if the instance is healthy, but it"
704
                         " will create extra replication traffic and "
705
                         " disrupt briefly the replication (like during the"
706
                         " migration")
707

    
708
STATIC_OPT = cli_option("-s", "--static", dest="static",
709
                        action="store_true", default=False,
710
                        help="Only show configuration data, not runtime data")
711

    
712
ALL_OPT = cli_option("--all", dest="show_all",
713
                     default=False, action="store_true",
714
                     help="Show info on all instances on the cluster."
715
                     " This can take a long time to run, use wisely")
716

    
717
SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
718
                           action="store_true", default=False,
719
                           help="Interactive OS reinstall, lists available"
720
                           " OS templates for selection")
721

    
722
IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
723
                                 action="store_true", default=False,
724
                                 help="Remove the instance from the cluster"
725
                                 " configuration even if there are failures"
726
                                 " during the removal process")
727

    
728
NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
729
                               help="Specifies the new secondary node",
730
                               metavar="NODE", default=None,
731
                               completion_suggest=OPT_COMPL_ONE_NODE)
732

    
733
ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
734
                            default=False, action="store_true",
735
                            help="Replace the disk(s) on the primary"
736
                            " node (only for the drbd template)")
737

    
738
ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
739
                              default=False, action="store_true",
740
                              help="Replace the disk(s) on the secondary"
741
                              " node (only for the drbd template)")
742

    
743
AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
744
                              default=False, action="store_true",
745
                              help="Lock all nodes and auto-promote as needed"
746
                              " to MC status")
747

    
748
AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
749
                              default=False, action="store_true",
750
                              help="Automatically replace faulty disks"
751
                              " (only for the drbd template)")
752

    
753
IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
754
                             default=False, action="store_true",
755
                             help="Ignore current recorded size"
756
                             " (useful for forcing activation when"
757
                             " the recorded size is wrong)")
758

    
759
SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
760
                          metavar="<node>",
761
                          completion_suggest=OPT_COMPL_ONE_NODE)
762

    
763
SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
764
                         metavar="<dir>")
765

    
766
SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
767
                              help="Specify the secondary ip for the node",
768
                              metavar="ADDRESS", default=None)
769

    
770
READD_OPT = cli_option("--readd", dest="readd",
771
                       default=False, action="store_true",
772
                       help="Readd old node after replacing it")
773

    
774
NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
775
                                default=True, action="store_false",
776
                                help="Disable SSH key fingerprint checking")
777

    
778

    
779
MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
780
                    type="bool", default=None, metavar=_YORNO,
781
                    help="Set the master_candidate flag on the node")
782

    
783
OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
784
                         type="bool", default=None,
785
                         help="Set the offline flag on the node")
786

    
787
DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
788
                         type="bool", default=None,
789
                         help="Set the drained flag on the node")
790

    
791
ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
792
                             type="bool", default=None, metavar=_YORNO,
793
                             help="Set the allocatable flag on a volume")
794

    
795
NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
796
                               help="Disable support for lvm based instances"
797
                               " (cluster-wide)",
798
                               action="store_false", default=True)
799

    
800
ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
801
                            dest="enabled_hypervisors",
802
                            help="Comma-separated list of hypervisors",
803
                            type="string", default=None)
804

    
805
NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
806
                            type="keyval", default={},
807
                            help="NIC parameters")
808

    
809
CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
810
                         dest="candidate_pool_size", type="int",
811
                         help="Set the candidate pool size")
812

    
813
VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
814
                         help="Enables LVM and specifies the volume group"
815
                         " name (cluster-wide) for disk allocation [xenvg]",
816
                         metavar="VG", default=None)
817

    
818
YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
819
                          help="Destroy cluster", action="store_true")
820

    
821
NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
822
                          help="Skip node agreement check (dangerous)",
823
                          action="store_true", default=False)
824

    
825
MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
826
                            help="Specify the mac prefix for the instance IP"
827
                            " addresses, in the format XX:XX:XX",
828
                            metavar="PREFIX",
829
                            default=None)
830

    
831
MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
832
                               help="Specify the node interface (cluster-wide)"
833
                               " on which the master IP address will be added "
834
                               " [%s]" % constants.DEFAULT_BRIDGE,
835
                               metavar="NETDEV",
836
                               default=constants.DEFAULT_BRIDGE)
837

    
838

    
839
GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
840
                                help="Specify the default directory (cluster-"
841
                                "wide) for storing the file-based disks [%s]" %
842
                                constants.DEFAULT_FILE_STORAGE_DIR,
843
                                metavar="DIR",
844
                                default=constants.DEFAULT_FILE_STORAGE_DIR)
845

    
846
NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
847
                                   help="Don't modify /etc/hosts",
848
                                   action="store_false", default=True)
849

    
850
NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
851
                                    help="Don't initialize SSH keys",
852
                                    action="store_false", default=True)
853

    
854
ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
855
                             help="Enable parseable error messages",
856
                             action="store_true", default=False)
857

    
858
NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
859
                          help="Skip N+1 memory redundancy tests",
860
                          action="store_true", default=False)
861

    
862
REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
863
                             help="Type of reboot: soft/hard/full",
864
                             default=constants.INSTANCE_REBOOT_HARD,
865
                             metavar="<REBOOT>",
866
                             choices=list(constants.REBOOT_TYPES))
867

    
868
IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
869
                                    dest="ignore_secondaries",
870
                                    default=False, action="store_true",
871
                                    help="Ignore errors from secondaries")
872

    
873
NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
874
                            action="store_false", default=True,
875
                            help="Don't shutdown the instance (unsafe)")
876

    
877
TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
878
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
879
                         help="Maximum time to wait")
880

    
881
SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
882
                         dest="shutdown_timeout", type="int",
883
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
884
                         help="Maximum time to wait for instance shutdown")
885

    
886
EARLY_RELEASE_OPT = cli_option("--early-release",
887
                               dest="early_release", default=False,
888
                               action="store_true",
889
                               help="Release the locks on the secondary"
890
                               " node(s) early")
891

    
892
NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
893
                                  dest="new_cluster_cert",
894
                                  default=False, action="store_true",
895
                                  help="Generate a new cluster certificate")
896

    
897
RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
898
                           default=None,
899
                           help="File containing new RAPI certificate")
900

    
901
NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
902
                               default=None, action="store_true",
903
                               help=("Generate a new self-signed RAPI"
904
                                     " certificate"))
905

    
906
NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
907
                                    dest="new_confd_hmac_key",
908
                                    default=False, action="store_true",
909
                                    help=("Create a new HMAC key for %s" %
910
                                          constants.CONFD))
911

    
912
USE_REPL_NET_OPT = cli_option("--use-replication-network",
913
                              dest="use_replication_network",
914
                              help="Whether to use the replication network"
915
                              " for talking to the nodes",
916
                              action="store_true", default=False)
917

    
918
MAINTAIN_NODE_HEALTH_OPT = \
919
    cli_option("--maintain-node-health", dest="maintain_node_health",
920
               metavar=_YORNO, default=None, type="bool",
921
               help="Configure the cluster to automatically maintain node"
922
               " health, by shutting down unknown instances, shutting down"
923
               " unknown DRBD devices, etc.")
924

    
925
IDENTIFY_DEFAULTS_OPT = \
926
    cli_option("--identify-defaults", dest="identify_defaults",
927
               default=False, action="store_true",
928
               help="Identify which saved instance parameters are equal to"
929
               " the current cluster defaults and set them as such, instead"
930
               " of marking them as overridden")
931

    
932

    
933
def _ParseArgs(argv, commands, aliases):
934
  """Parser for the command line arguments.
935

936
  This function parses the arguments and returns the function which
937
  must be executed together with its (modified) arguments.
938

939
  @param argv: the command line
940
  @param commands: dictionary with special contents, see the design
941
      doc for cmdline handling
942
  @param aliases: dictionary with command aliases {'alias': 'target, ...}
943

944
  """
945
  if len(argv) == 0:
946
    binary = "<command>"
947
  else:
948
    binary = argv[0].split("/")[-1]
949

    
950
  if len(argv) > 1 and argv[1] == "--version":
951
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
952
    # Quit right away. That way we don't have to care about this special
953
    # argument. optparse.py does it the same.
954
    sys.exit(0)
955

    
956
  if len(argv) < 2 or not (argv[1] in commands or
957
                           argv[1] in aliases):
958
    # let's do a nice thing
959
    sortedcmds = commands.keys()
960
    sortedcmds.sort()
961

    
962
    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
963
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
964
    ToStdout("")
965

    
966
    # compute the max line length for cmd + usage
967
    mlen = max([len(" %s" % cmd) for cmd in commands])
968
    mlen = min(60, mlen) # should not get here...
969

    
970
    # and format a nice command list
971
    ToStdout("Commands:")
972
    for cmd in sortedcmds:
973
      cmdstr = " %s" % (cmd,)
974
      help_text = commands[cmd][4]
975
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
976
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
977
      for line in help_lines:
978
        ToStdout("%-*s   %s", mlen, "", line)
979

    
980
    ToStdout("")
981

    
982
    return None, None, None
983

    
984
  # get command, unalias it, and look it up in commands
985
  cmd = argv.pop(1)
986
  if cmd in aliases:
987
    if cmd in commands:
988
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
989
                                   " command" % cmd)
990

    
991
    if aliases[cmd] not in commands:
992
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
993
                                   " command '%s'" % (cmd, aliases[cmd]))
994

    
995
    cmd = aliases[cmd]
996

    
997
  func, args_def, parser_opts, usage, description = commands[cmd]
998
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
999
                        description=description,
1000
                        formatter=TitledHelpFormatter(),
1001
                        usage="%%prog %s %s" % (cmd, usage))
1002
  parser.disable_interspersed_args()
1003
  options, args = parser.parse_args()
1004

    
1005
  if not _CheckArguments(cmd, args_def, args):
1006
    return None, None, None
1007

    
1008
  return func, options, args
1009

    
1010

    
1011
def _CheckArguments(cmd, args_def, args):
1012
  """Verifies the arguments using the argument definition.
1013

1014
  Algorithm:
1015

1016
    1. Abort with error if values specified by user but none expected.
1017

1018
    1. For each argument in definition
1019

1020
      1. Keep running count of minimum number of values (min_count)
1021
      1. Keep running count of maximum number of values (max_count)
1022
      1. If it has an unlimited number of values
1023

1024
        1. Abort with error if it's not the last argument in the definition
1025

1026
    1. If last argument has limited number of values
1027

1028
      1. Abort with error if number of values doesn't match or is too large
1029

1030
    1. Abort with error if user didn't pass enough values (min_count)
1031

1032
  """
1033
  if args and not args_def:
1034
    ToStderr("Error: Command %s expects no arguments", cmd)
1035
    return False
1036

    
1037
  min_count = None
1038
  max_count = None
1039
  check_max = None
1040

    
1041
  last_idx = len(args_def) - 1
1042

    
1043
  for idx, arg in enumerate(args_def):
1044
    if min_count is None:
1045
      min_count = arg.min
1046
    elif arg.min is not None:
1047
      min_count += arg.min
1048

    
1049
    if max_count is None:
1050
      max_count = arg.max
1051
    elif arg.max is not None:
1052
      max_count += arg.max
1053

    
1054
    if idx == last_idx:
1055
      check_max = (arg.max is not None)
1056

    
1057
    elif arg.max is None:
1058
      raise errors.ProgrammerError("Only the last argument can have max=None")
1059

    
1060
  if check_max:
1061
    # Command with exact number of arguments
1062
    if (min_count is not None and max_count is not None and
1063
        min_count == max_count and len(args) != min_count):
1064
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1065
      return False
1066

    
1067
    # Command with limited number of arguments
1068
    if max_count is not None and len(args) > max_count:
1069
      ToStderr("Error: Command %s expects only %d argument(s)",
1070
               cmd, max_count)
1071
      return False
1072

    
1073
  # Command with some required arguments
1074
  if min_count is not None and len(args) < min_count:
1075
    ToStderr("Error: Command %s expects at least %d argument(s)",
1076
             cmd, min_count)
1077
    return False
1078

    
1079
  return True
1080

    
1081

    
1082
def SplitNodeOption(value):
1083
  """Splits the value of a --node option.
1084

1085
  """
1086
  if value and ':' in value:
1087
    return value.split(':', 1)
1088
  else:
1089
    return (value, None)
1090

    
1091

    
1092
def CalculateOSNames(os_name, os_variants):
1093
  """Calculates all the names an OS can be called, according to its variants.
1094

1095
  @type os_name: string
1096
  @param os_name: base name of the os
1097
  @type os_variants: list or None
1098
  @param os_variants: list of supported variants
1099
  @rtype: list
1100
  @return: list of valid names
1101

1102
  """
1103
  if os_variants:
1104
    return ['%s+%s' % (os_name, v) for v in os_variants]
1105
  else:
1106
    return [os_name]
1107

    
1108

    
1109
def UsesRPC(fn):
1110
  def wrapper(*args, **kwargs):
1111
    rpc.Init()
1112
    try:
1113
      return fn(*args, **kwargs)
1114
    finally:
1115
      rpc.Shutdown()
1116
  return wrapper
1117

    
1118

    
1119
def AskUser(text, choices=None):
1120
  """Ask the user a question.
1121

1122
  @param text: the question to ask
1123

1124
  @param choices: list with elements tuples (input_char, return_value,
1125
      description); if not given, it will default to: [('y', True,
1126
      'Perform the operation'), ('n', False, 'Do no do the operation')];
1127
      note that the '?' char is reserved for help
1128

1129
  @return: one of the return values from the choices list; if input is
1130
      not possible (i.e. not running with a tty, we return the last
1131
      entry from the list
1132

1133
  """
1134
  if choices is None:
1135
    choices = [('y', True, 'Perform the operation'),
1136
               ('n', False, 'Do not perform the operation')]
1137
  if not choices or not isinstance(choices, list):
1138
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
1139
  for entry in choices:
1140
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
1141
      raise errors.ProgrammerError("Invalid choices element to AskUser")
1142

    
1143
  answer = choices[-1][1]
1144
  new_text = []
1145
  for line in text.splitlines():
1146
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
1147
  text = "\n".join(new_text)
1148
  try:
1149
    f = file("/dev/tty", "a+")
1150
  except IOError:
1151
    return answer
1152
  try:
1153
    chars = [entry[0] for entry in choices]
1154
    chars[-1] = "[%s]" % chars[-1]
1155
    chars.append('?')
1156
    maps = dict([(entry[0], entry[1]) for entry in choices])
1157
    while True:
1158
      f.write(text)
1159
      f.write('\n')
1160
      f.write("/".join(chars))
1161
      f.write(": ")
1162
      line = f.readline(2).strip().lower()
1163
      if line in maps:
1164
        answer = maps[line]
1165
        break
1166
      elif line == '?':
1167
        for entry in choices:
1168
          f.write(" %s - %s\n" % (entry[0], entry[2]))
1169
        f.write("\n")
1170
        continue
1171
  finally:
1172
    f.close()
1173
  return answer
1174

    
1175

    
1176
class JobSubmittedException(Exception):
1177
  """Job was submitted, client should exit.
1178

1179
  This exception has one argument, the ID of the job that was
1180
  submitted. The handler should print this ID.
1181

1182
  This is not an error, just a structured way to exit from clients.
1183

1184
  """
1185

    
1186

    
1187
def SendJob(ops, cl=None):
1188
  """Function to submit an opcode without waiting for the results.
1189

1190
  @type ops: list
1191
  @param ops: list of opcodes
1192
  @type cl: luxi.Client
1193
  @param cl: the luxi client to use for communicating with the master;
1194
             if None, a new client will be created
1195

1196
  """
1197
  if cl is None:
1198
    cl = GetClient()
1199

    
1200
  job_id = cl.SubmitJob(ops)
1201

    
1202
  return job_id
1203

    
1204

    
1205
def PollJob(job_id, cl=None, feedback_fn=None):
1206
  """Function to poll for the result of a job.
1207

1208
  @type job_id: job identified
1209
  @param job_id: the job to poll for results
1210
  @type cl: luxi.Client
1211
  @param cl: the luxi client to use for communicating with the master;
1212
             if None, a new client will be created
1213

1214
  """
1215
  if cl is None:
1216
    cl = GetClient()
1217

    
1218
  prev_job_info = None
1219
  prev_logmsg_serial = None
1220

    
1221
  status = None
1222

    
1223
  notified_queued = False
1224
  notified_waitlock = False
1225

    
1226
  while True:
1227
    result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
1228
                                     prev_logmsg_serial)
1229
    if not result:
1230
      # job not found, go away!
1231
      raise errors.JobLost("Job with id %s lost" % job_id)
1232
    elif result == constants.JOB_NOTCHANGED:
1233
      if status is not None and not callable(feedback_fn):
1234
        if status == constants.JOB_STATUS_QUEUED and not notified_queued:
1235
          ToStderr("Job %s is waiting in queue", job_id)
1236
          notified_queued = True
1237
        elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
1238
          ToStderr("Job %s is trying to acquire all necessary locks", job_id)
1239
          notified_waitlock = True
1240

    
1241
      # Wait again
1242
      continue
1243

    
1244
    # Split result, a tuple of (field values, log entries)
1245
    (job_info, log_entries) = result
1246
    (status, ) = job_info
1247

    
1248
    if log_entries:
1249
      for log_entry in log_entries:
1250
        (serial, timestamp, _, message) = log_entry
1251
        if callable(feedback_fn):
1252
          feedback_fn(log_entry[1:])
1253
        else:
1254
          encoded = utils.SafeEncode(message)
1255
          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
1256
        prev_logmsg_serial = max(prev_logmsg_serial, serial)
1257

    
1258
    # TODO: Handle canceled and archived jobs
1259
    elif status in (constants.JOB_STATUS_SUCCESS,
1260
                    constants.JOB_STATUS_ERROR,
1261
                    constants.JOB_STATUS_CANCELING,
1262
                    constants.JOB_STATUS_CANCELED):
1263
      break
1264

    
1265
    prev_job_info = job_info
1266

    
1267
  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
1268
  if not jobs:
1269
    raise errors.JobLost("Job with id %s lost" % job_id)
1270

    
1271
  status, opstatus, result = jobs[0]
1272
  if status == constants.JOB_STATUS_SUCCESS:
1273
    return result
1274
  elif status in (constants.JOB_STATUS_CANCELING,
1275
                  constants.JOB_STATUS_CANCELED):
1276
    raise errors.OpExecError("Job was canceled")
1277
  else:
1278
    has_ok = False
1279
    for idx, (status, msg) in enumerate(zip(opstatus, result)):
1280
      if status == constants.OP_STATUS_SUCCESS:
1281
        has_ok = True
1282
      elif status == constants.OP_STATUS_ERROR:
1283
        errors.MaybeRaise(msg)
1284
        if has_ok:
1285
          raise errors.OpExecError("partial failure (opcode %d): %s" %
1286
                                   (idx, msg))
1287
        else:
1288
          raise errors.OpExecError(str(msg))
1289
    # default failure mode
1290
    raise errors.OpExecError(result)
1291

    
1292

    
1293
def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
1294
  """Legacy function to submit an opcode.
1295

1296
  This is just a simple wrapper over the construction of the processor
1297
  instance. It should be extended to better handle feedback and
1298
  interaction functions.
1299

1300
  """
1301
  if cl is None:
1302
    cl = GetClient()
1303

    
1304
  SetGenericOpcodeOpts([op], opts)
1305

    
1306
  job_id = SendJob([op], cl)
1307

    
1308
  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
1309

    
1310
  return op_results[0]
1311

    
1312

    
1313
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
1314
  """Wrapper around SubmitOpCode or SendJob.
1315

1316
  This function will decide, based on the 'opts' parameter, whether to
1317
  submit and wait for the result of the opcode (and return it), or
1318
  whether to just send the job and print its identifier. It is used in
1319
  order to simplify the implementation of the '--submit' option.
1320

1321
  It will also process the opcodes if we're sending the via SendJob
1322
  (otherwise SubmitOpCode does it).
1323

1324
  """
1325
  if opts and opts.submit_only:
1326
    job = [op]
1327
    SetGenericOpcodeOpts(job, opts)
1328
    job_id = SendJob(job, cl=cl)
1329
    raise JobSubmittedException(job_id)
1330
  else:
1331
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
1332

    
1333

    
1334
def SetGenericOpcodeOpts(opcode_list, options):
1335
  """Processor for generic options.
1336

1337
  This function updates the given opcodes based on generic command
1338
  line options (like debug, dry-run, etc.).
1339

1340
  @param opcode_list: list of opcodes
1341
  @param options: command line options or None
1342
  @return: None (in-place modification)
1343

1344
  """
1345
  if not options:
1346
    return
1347
  for op in opcode_list:
1348
    op.dry_run = options.dry_run
1349
    op.debug_level = options.debug
1350

    
1351

    
1352
def GetClient():
1353
  # TODO: Cache object?
1354
  try:
1355
    client = luxi.Client()
1356
  except luxi.NoMasterError:
1357
    ss = ssconf.SimpleStore()
1358

    
1359
    # Try to read ssconf file
1360
    try:
1361
      ss.GetMasterNode()
1362
    except errors.ConfigurationError:
1363
      raise errors.OpPrereqError("Cluster not initialized or this machine is"
1364
                                 " not part of a cluster")
1365

    
1366
    master, myself = ssconf.GetMasterAndMyself(ss=ss)
1367
    if master != myself:
1368
      raise errors.OpPrereqError("This is not the master node, please connect"
1369
                                 " to node '%s' and rerun the command" %
1370
                                 master)
1371
    raise
1372
  return client
1373

    
1374

    
1375
def FormatError(err):
  """Return a formatted error message for a given error.

  This function takes an exception instance and returns a tuple
  consisting of two values: first, the recommended exit code, and
  second, a string describing the error message (not
  newline-terminated).

  """
  retcode = 1
  obuf = StringIO()
  msg = str(err)
  if isinstance(err, errors.ConfigurationError):
    txt = "Corrupt configuration file: %s" % msg
    logging.error(txt)
    obuf.write(txt + "\n")
    obuf.write("Aborting.")
    retcode = 2
  elif isinstance(err, errors.HooksAbort):
    obuf.write("Failure: hooks execution failed:\n")
    for node, script, out in err.args[0]:
      if out:
        obuf.write("  node: %s, script: %s, output: %s\n" %
                   (node, script, out))
      else:
        obuf.write("  node: %s, script: %s (no output)\n" %
                   (node, script))
  elif isinstance(err, errors.HooksFailure):
    obuf.write("Failure: hooks general failure: %s" % msg)
  elif isinstance(err, errors.ResolverError):
    this_host = utils.HostInfo.SysName()
    if err.args[0] == this_host:
      msg = "Failure: can't resolve my own hostname ('%s')"
    else:
      msg = "Failure: can't resolve hostname '%s'"
    obuf.write(msg % err.args[0])
  elif isinstance(err, errors.OpPrereqError):
    if len(err.args) == 2:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\nerror type: %s, error details:\n%s" %
                 (err.args[1], err.args[0]))
    else:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\n%s" % msg)
  elif isinstance(err, errors.OpExecError):
    obuf.write("Failure: command execution error:\n%s" % msg)
  elif isinstance(err, errors.TagError):
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
  elif isinstance(err, errors.JobQueueDrainError):
    obuf.write("Failure: the job queue is marked for drain and doesn't"
               " accept new requests\n")
  elif isinstance(err, errors.JobQueueFull):
    obuf.write("Failure: the job queue is full and doesn't accept new"
               " job submissions until old jobs are archived\n")
  elif isinstance(err, errors.TypeEnforcementError):
    obuf.write("Parameter Error: %s" % msg)
  elif isinstance(err, errors.ParameterError):
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
  elif isinstance(err, errors.GenericError):
    obuf.write("Unhandled Ganeti error: %s" % msg)
  elif isinstance(err, luxi.NoMasterError):
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
               " and listening for connections?")
  elif isinstance(err, luxi.TimeoutError):
    obuf.write("Timeout while talking to the master daemon. Error:\n"
               "%s" % msg)
  elif isinstance(err, luxi.ProtocolError):
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
               "%s" % msg)
  elif isinstance(err, JobSubmittedException):
    obuf.write("JobID: %s\n" % err.args[0])
    retcode = 0
  else:
    obuf.write("Unhandled exception: %s" % msg)
  return retcode, obuf.getvalue().rstrip('\n')


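# Illustrative sketch, not part of the original module: the function name below is
# made up and only shows how callers typically combine FormatError() with ToStderr().
def _ExampleHandleError(err):
  """Prints a formatted error and returns the suggested exit code (example only).

  """
  retcode, message = FormatError(err)
  ToStderr(message)
  return retcode

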
def GenericMain(commands, override=None, aliases=None):
  """Generic main function for all the gnt-* commands.

  Arguments:
    - commands: a dictionary with a special structure, see the design doc
                for command line handling.
    - override: if not None, we expect a dictionary with keys that will
                override command line options; this can be used to pass
                options from the scripts to generic functions
    - aliases: dictionary with command aliases {'alias': 'target', ...}

  """
  # save the program name and the entire command line for later logging
  if sys.argv:
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
    if len(sys.argv) >= 2:
      binary += " " + sys.argv[1]
      old_cmdline = " ".join(sys.argv[2:])
    else:
      old_cmdline = ""
  else:
    binary = "<unknown program>"
    old_cmdline = ""

  if aliases is None:
    aliases = {}

  try:
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
  except errors.ParameterError, err:
    result, err_msg = FormatError(err)
    ToStderr(err_msg)
    return 1

  if func is None: # parse error
    return 1

  if override is not None:
    for key, val in override.iteritems():
      setattr(options, key, val)

  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
                     stderr_logging=True, program=binary)

  if old_cmdline:
    logging.info("run with arguments '%s'", old_cmdline)
  else:
    logging.info("run with no arguments")

  try:
    result = func(options, args)
  except (errors.GenericError, luxi.ProtocolError,
          JobSubmittedException), err:
    result, err_msg = FormatError(err)
    logging.exception("Error during command processing")
    ToStderr(err_msg)

  return result


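# Illustrative sketch (assumed layout, not part of the original module): a gnt-*
# script normally builds a 'commands' dictionary mapping each sub-command to a
# (function, arguments, options, usage, description) tuple and hands it over to
# GenericMain, roughly:
#
#   commands = {
#     "info": (ShowInfo, ARGS_NONE, [DEBUG_OPT], "", "Show some information"),
#     }
#
#   if __name__ == "__main__":
#     sys.exit(GenericMain(commands))
#
# 'ShowInfo' is a hypothetical command handler taking (options, args).

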
def GenericInstanceCreate(mode, opts, args):
  """Add an instance to the cluster via either creation or import.

  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new instance name
  @rtype: int
  @return: the desired exit code

  """
  instance = args[0]

  (pnode, snode) = SplitNodeOption(opts.node)

  hypervisor = None
  hvparams = {}
  if opts.hypervisor:
    hypervisor, hvparams = opts.hypervisor

  if opts.nics:
    try:
      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
    except ValueError, err:
      raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
    nics = [{}] * nic_max
    for nidx, ndict in opts.nics:
      nidx = int(nidx)
      if not isinstance(ndict, dict):
        msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
        raise errors.OpPrereqError(msg)
      nics[nidx] = ndict
  elif opts.no_nics:
    # no nics
    nics = []
  elif mode == constants.INSTANCE_CREATE:
    # default of one nic, all auto
    nics = [{}]
  else:
    # mode == import
    nics = []

  if opts.disk_template == constants.DT_DISKLESS:
    if opts.disks or opts.sd_size is not None:
      raise errors.OpPrereqError("Diskless instance but disk"
                                 " information passed")
    disks = []
  else:
    if (not opts.disks and not opts.sd_size
        and mode == constants.INSTANCE_CREATE):
      raise errors.OpPrereqError("No disk information specified")
    if opts.disks and opts.sd_size is not None:
      raise errors.OpPrereqError("Please use either the '--disk' or"
                                 " '-s' option")
    if opts.sd_size is not None:
      opts.disks = [(0, {"size": opts.sd_size})]

    if opts.disks:
      try:
        disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
      disks = [{}] * disk_max
    else:
      disks = []
    for didx, ddict in opts.disks:
      didx = int(didx)
      if not isinstance(ddict, dict):
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
        raise errors.OpPrereqError(msg)
      elif "size" in ddict:
        if "adopt" in ddict:
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
                                     " (disk %d)" % didx)
        try:
          ddict["size"] = utils.ParseUnit(ddict["size"])
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
                                     (didx, err))
      elif "adopt" in ddict:
        if mode == constants.INSTANCE_IMPORT:
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
                                     " import")
        ddict["size"] = 0
      else:
        raise errors.OpPrereqError("Missing size or adoption source for"
                                   " disk %d" % didx)
      disks[didx] = ddict

  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)

  if mode == constants.INSTANCE_CREATE:
    start = opts.start
    os_type = opts.os
    src_node = None
    src_path = None
    no_install = opts.no_install
    identify_defaults = False
  elif mode == constants.INSTANCE_IMPORT:
    start = False
    os_type = None
    src_node = opts.src_node
    src_path = opts.src_dir
    no_install = None
    identify_defaults = opts.identify_defaults
  else:
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)

  op = opcodes.OpCreateInstance(instance_name=instance,
                                disks=disks,
                                disk_template=opts.disk_template,
                                nics=nics,
                                pnode=pnode, snode=snode,
                                ip_check=opts.ip_check,
                                name_check=opts.name_check,
                                wait_for_sync=opts.wait_for_sync,
                                file_storage_dir=opts.file_storage_dir,
                                file_driver=opts.file_driver,
                                iallocator=opts.iallocator,
                                hypervisor=hypervisor,
                                hvparams=hvparams,
                                beparams=opts.beparams,
                                mode=mode,
                                start=start,
                                os_type=os_type,
                                src_node=src_node,
                                src_path=src_path,
                                no_install=no_install,
                                identify_defaults=identify_defaults)

  SubmitOrSend(op, opts)
  return 0


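# Illustrative sketch (assumed, not part of the original module): gnt-instance can
# reuse this helper for both creation modes along these lines:
#
#   def AddInstance(opts, args):
#     return GenericInstanceCreate(constants.INSTANCE_CREATE, opts, args)
#
#   def ImportInstance(opts, args):
#     return GenericInstanceCreate(constants.INSTANCE_IMPORT, opts, args)

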
class _RunWhileClusterStoppedHelper:
  """Helper class for L{RunWhileClusterStopped} to simplify state management

  """
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes

    self.ssh = ssh.SshRunner(self.cluster_name)

    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name
    @type cmd: list
    @param cmd: Command

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    @type fn: callable
    @param fn: Function to be called

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes
        for node_name in self.online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception, err:
          _, errmsg = FormatError(err)
          logging.exception("Caught exception")
          self.feedback_fn(errmsg)
          raise
      finally:
        # Start cluster again, master node last
        for node_name in self.nonmaster_nodes + [self.master_node]:
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
    finally:
      # Resume watcher
      watcher_block.Close()


def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Calls a function while all cluster daemons are stopped.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  feedback_fn("Gathering cluster information")

  # This ensures we're running on the master daemon
  cl = GetClient()

  (cluster_name, master_node) = \
    cl.QueryConfigValues(["cluster_name", "master_node"])

  online_nodes = GetOnlineNodes([], cl=cl)

  # Don't keep a reference to the client. The master daemon will go away.
  del cl

  assert master_node in online_nodes

  return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
                                       online_nodes).Call(fn, *args)


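# Illustrative usage sketch, not part of the original module: '_OfflineStepFn' is a
# made-up callable; it receives the helper instance as its first argument and may use
# helper._RunCmd() on any online node while every daemon is down, e.g.:
#
#   def _OfflineStepFn(helper, some_arg):
#     helper._RunCmd(None, ["hostname"])   # runs locally on the master node
#
#   RunWhileClusterStopped(ToStdout, _OfflineStepFn, "some value")

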
def GenerateTable(headers, fields, separator, data,
                  numfields=None, unitfields=None,
                  units=None):
  """Prints a table with headers and different fields.

  @type headers: dict
  @param headers: dictionary mapping field names to headers for
      the table
  @type fields: list
  @param fields: the field names corresponding to each row in
      the data field
  @param separator: the separator to be used; if this is None,
      the default 'smart' algorithm is used which computes optimal
      field width, otherwise just the separator is used between
      each field
  @type data: list
  @param data: a list of lists, each sublist being one row to be output
  @type numfields: list
  @param numfields: a list with the fields that hold numeric
      values and thus should be right-aligned
  @type unitfields: list
  @param unitfields: a list with the fields that hold numeric
      values that should be formatted with the units field
  @type units: string or None
  @param units: the units we should use for formatting, or None for
      automatic choice (human-readable for non-separator usage, otherwise
      megabytes); this is a one-letter string

  """
  if units is None:
    if separator:
      units = "m"
    else:
      units = "h"

  if numfields is None:
    numfields = []
  if unitfields is None:
    unitfields = []

  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142

  format_fields = []
  for field in fields:
    if headers and field not in headers:
      # TODO: handle better unknown fields (either revert to old
      # style of raising exception, or deal more intelligently with
      # variable fields)
      headers[field] = field
    if separator is not None:
      format_fields.append("%s")
    elif numfields.Matches(field):
      format_fields.append("%*s")
    else:
      format_fields.append("%-*s")

  if separator is None:
    mlens = [0 for name in fields]
    format = ' '.join(format_fields)
  else:
    format = separator.replace("%", "%%").join(format_fields)

  for row in data:
    if row is None:
      continue
    for idx, val in enumerate(row):
      if unitfields.Matches(fields[idx]):
        try:
          val = int(val)
        except (TypeError, ValueError):
          pass
        else:
          val = row[idx] = utils.FormatUnit(val, units)
      val = row[idx] = str(val)
      if separator is None:
        mlens[idx] = max(mlens[idx], len(val))

  result = []
  if headers:
    args = []
    for idx, name in enumerate(fields):
      hdr = headers[name]
      if separator is None:
        mlens[idx] = max(mlens[idx], len(hdr))
        args.append(mlens[idx])
      args.append(hdr)
    result.append(format % tuple(args))

  if separator is None:
    assert len(mlens) == len(fields)

    if fields and not numfields.Matches(fields[-1]):
      mlens[-1] = 0

  for line in data:
    args = []
    if line is None:
      line = ['-' for _ in fields]
    for idx in range(len(fields)):
      if separator is None:
        args.append(mlens[idx])
      args.append(line[idx])
    result.append(format % tuple(args))

  return result


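# Illustrative sketch, not part of the original module: a tiny name/size listing;
# the field names and data below are made up for the example.
def _ExampleGenerateTable():
  """Returns the formatted lines for a small two-column table (example only).

  """
  headers = {"name": "Name", "size": "Size"}
  fields = ["name", "size"]
  data = [["disk0", 10240], ["disk1", 2048]]
  # With separator=None the smart layout is used and the "size" values are
  # rendered human-readable and right-aligned.
  return GenerateTable(headers, fields, None, data,
                       numfields=["size"], unitfields=["size"])

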
def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: a string with the formatted timestamp

  """
  if not isinstance(ts, (tuple, list)) or len(ts) != 2:
    return '?'
  sec, usec = ts
  return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec


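# Illustrative example, not part of the original module:
#   FormatTimestamp((1234567890, 123456))
# returns something like "2009-02-13 23:31:30.123456" (the date and time depend on
# the local timezone), while a malformed value simply yields "?".

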
def ParseTimespec(value):
  """Parse a time specification.

  The following suffixes will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: days
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  """
  value = str(value)
  if not value:
    raise errors.OpPrereqError("Empty time specification passed")
  suffix_map = {
    's': 1,
    'm': 60,
    'h': 3600,
    'd': 86400,
    'w': 604800,
    }
  if value[-1] not in suffix_map:
    try:
      value = int(value)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  else:
    multiplier = suffix_map[value[-1]]
    value = value[:-1]
    if not value: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)")
    try:
      value = int(value) * multiplier
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  return value


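# Illustrative examples, not part of the original module:
#   ParseTimespec("30")  => 30       (no suffix, taken as seconds)
#   ParseTimespec("2h")  => 7200
#   ParseTimespec("1w")  => 604800
#   ParseTimespec("h")   raises OpPrereqError (suffix without a value)

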
def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
                   filter_master=False):
  """Returns the names of online nodes.

  This function will also log a note on stderr with the names of
  the offline nodes that are skipped, unless C{nowarn} is set.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed
  @type secondary_ips: boolean
  @param secondary_ips: if True, return the secondary IPs instead of the
      names, useful for doing network traffic over the replication interface
      (if any)
  @type filter_master: boolean
  @param filter_master: if True, do not return the master node in the list
      (useful in coordination with secondary_ips where we cannot check our
      node name against the list)

  """
  if cl is None:
    cl = GetClient()

  if secondary_ips:
    name_idx = 2
  else:
    name_idx = 0

  if filter_master:
    master_node = cl.QueryConfigValues(["master_node"])[0]
    filter_fn = lambda x: x != master_node
  else:
    filter_fn = lambda _: True

  result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
                         use_locking=False)
  offline = [row[0] for row in result if row[1]]
  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
  return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]


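# Illustrative usage sketch, not part of the original module: collecting the
# secondary IPs of all online nodes except the master, e.g. before pushing data
# over the replication network:
#
#   sips = GetOnlineNodes([], secondary_ips=True, filter_master=True)

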
def _ToStream(stream, txt, *args):
  """Write a message to a stream, bypassing the logging system

  @type stream: file object
  @param stream: the file to which we should write
  @type txt: str
  @param txt: the message

  """
  if args:
    args = tuple(args)
    stream.write(txt % args)
  else:
    stream.write(txt)
  stream.write('\n')
  stream.flush()


def ToStdout(txt, *args):
  """Write a message to stdout only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stdout, txt, *args)


def ToStderr(txt, *args):
  """Write a message to stderr only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stderr, txt, *args)


class JobExecutor(object):
  """Class which manages the submission and execution of multiple jobs.

  Note that instances of this class should not be reused between
  GetResults() calls.

  """
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
    self.queue = []
    if cl is None:
      cl = GetClient()
    self.cl = cl
    self.verbose = verbose
    self.jobs = []
    self.opts = opts
    self.feedback_fn = feedback_fn

  def QueueJob(self, name, *ops):
    """Record a job for later submit.

    @type name: string
    @param name: a description of the job, will be used in WaitJobSet

    """
    SetGenericOpcodeOpts(ops, self.opts)
    self.queue.append((name, ops))

  def SubmitPending(self):
    """Submit all pending jobs.

    """
    results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
                                                            self.queue)):
      self.jobs.append((idx, status, data, name))

  def _ChooseJob(self):
    """Choose a non-waiting/queued job to poll next.

    """
    assert self.jobs, "_ChooseJob called with empty job list"

    result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
    assert result

    for job_data, status in zip(self.jobs, result):
      if status[0] in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_WAITLOCK,
                       constants.JOB_STATUS_CANCELING):
        # job is still waiting
        continue
      # good candidate found
      self.jobs.remove(job_data)
      return job_data

    # no job found
    return self.jobs.pop(0)

  def GetResults(self):
    """Wait for and return the results of all jobs.

    @rtype: list
    @return: list of tuples (success, job results), in the same order
        as the submitted jobs; if a job has failed, instead of the result
        there will be the error message

    """
    if not self.jobs:
      self.SubmitPending()
    results = []
    if self.verbose:
      ok_jobs = [row[2] for row in self.jobs if row[1]]
      if ok_jobs:
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))

    # first, remove any non-submitted jobs
    self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
    for idx, _, jid, name in failures:
      ToStderr("Failed to submit job for %s: %s", name, jid)
      results.append((idx, False, jid))

    while self.jobs:
      (idx, _, jid, name) = self._ChooseJob()
      ToStdout("Waiting for job %s for %s...", jid, name)
      try:
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
        success = True
      except (errors.GenericError, luxi.ProtocolError), err:
        _, job_result = FormatError(err)
        success = False
        # the error message will always be shown, verbose or not
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)

      results.append((idx, success, job_result))

    # sort based on the index, then drop it
    results.sort()
    results = [i[1:] for i in results]

    return results

  def WaitOrShow(self, wait):
    """Wait for job results or only print the job IDs.

    @type wait: boolean
    @param wait: whether to wait or not

    """
    if wait:
      return self.GetResults()
    else:
      if not self.jobs:
        self.SubmitPending()
      for _, status, result, name in self.jobs:
        if status:
          ToStdout("%s: %s", result, name)
        else:
          ToStderr("Failure for %s: %s", name, result)