#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module dealing with command line parsing"""


import sys
import textwrap
import os.path
import time
import logging
from cStringIO import StringIO

from ganeti import utils
from ganeti import errors
from ganeti import constants
from ganeti import opcodes
from ganeti import luxi
from ganeti import ssconf
from ganeti import rpc
from ganeti import ssh

from optparse import (OptionParser, TitledHelpFormatter,
                      Option, OptionValueError)


__all__ = [
  # Command line options
  "ALLOCATABLE_OPT",
  "ALL_OPT",
  "AUTO_PROMOTE_OPT",
  "AUTO_REPLACE_OPT",
  "BACKEND_OPT",
  "CLEANUP_OPT",
  "CONFIRM_OPT",
  "CP_SIZE_OPT",
  "DEBUG_OPT",
  "DEBUG_SIMERR_OPT",
  "DISKIDX_OPT",
  "DISK_OPT",
  "DISK_TEMPLATE_OPT",
  "DRAINED_OPT",
  "EARLY_RELEASE_OPT",
  "ENABLED_HV_OPT",
  "ERROR_CODES_OPT",
  "FIELDS_OPT",
  "FILESTORE_DIR_OPT",
  "FILESTORE_DRIVER_OPT",
  "FORCE_OPT",
  "FORCE_VARIANT_OPT",
  "GLOBAL_FILEDIR_OPT",
  "HVLIST_OPT",
  "HVOPTS_OPT",
  "HYPERVISOR_OPT",
  "IALLOCATOR_OPT",
  "IGNORE_CONSIST_OPT",
  "IGNORE_FAILURES_OPT",
  "IGNORE_SECONDARIES_OPT",
  "IGNORE_SIZE_OPT",
  "MAC_PREFIX_OPT",
  "MAINTAIN_NODE_HEALTH_OPT",
  "MASTER_NETDEV_OPT",
  "MC_OPT",
  "NET_OPT",
  "NEW_CLUSTER_CERT_OPT",
  "NEW_CONFD_HMAC_KEY_OPT",
  "NEW_RAPI_CERT_OPT",
  "NEW_SECONDARY_OPT",
  "NIC_PARAMS_OPT",
  "NODE_LIST_OPT",
  "NODE_PLACEMENT_OPT",
  "NOHDR_OPT",
  "NOIPCHECK_OPT",
  "NO_INSTALL_OPT",
  "NONAMECHECK_OPT",
  "NOLVM_STORAGE_OPT",
  "NOMODIFY_ETCHOSTS_OPT",
  "NOMODIFY_SSH_SETUP_OPT",
  "NONICS_OPT",
  "NONLIVE_OPT",
  "NONPLUS1_OPT",
  "NOSHUTDOWN_OPT",
  "NOSTART_OPT",
  "NOSSH_KEYCHECK_OPT",
  "NOVOTING_OPT",
  "NWSYNC_OPT",
  "ON_PRIMARY_OPT",
  "ON_SECONDARY_OPT",
  "OFFLINE_OPT",
  "OS_OPT",
  "OS_SIZE_OPT",
  "RAPI_CERT_OPT",
  "READD_OPT",
  "REBOOT_TYPE_OPT",
  "SECONDARY_IP_OPT",
  "SELECT_OS_OPT",
  "SEP_OPT",
  "SHOWCMD_OPT",
  "SHUTDOWN_TIMEOUT_OPT",
  "SINGLE_NODE_OPT",
  "SRC_DIR_OPT",
  "SRC_NODE_OPT",
  "SUBMIT_OPT",
  "STATIC_OPT",
  "SYNC_OPT",
  "TAG_SRC_OPT",
  "TIMEOUT_OPT",
  "USEUNITS_OPT",
  "USE_REPL_NET_OPT",
  "VERBOSE_OPT",
  "VG_NAME_OPT",
  "YES_DOIT_OPT",
  # Generic functions for CLI programs
  "GenericMain",
  "GenericInstanceCreate",
  "GetClient",
  "GetOnlineNodes",
  "JobExecutor",
  "JobSubmittedException",
  "ParseTimespec",
  "RunWhileClusterStopped",
  "SubmitOpCode",
  "SubmitOrSend",
  "UsesRPC",
  # Formatting functions
  "ToStderr", "ToStdout",
  "FormatError",
  "GenerateTable",
  "AskUser",
  "FormatTimestamp",
  # Tags functions
  "ListTags",
  "AddTags",
  "RemoveTags",
  # command line options support infrastructure
  "ARGS_MANY_INSTANCES",
  "ARGS_MANY_NODES",
  "ARGS_NONE",
  "ARGS_ONE_INSTANCE",
  "ARGS_ONE_NODE",
  "ARGS_ONE_OS",
  "ArgChoice",
  "ArgCommand",
  "ArgFile",
  "ArgHost",
  "ArgInstance",
  "ArgJobId",
  "ArgNode",
  "ArgOs",
  "ArgSuggest",
  "ArgUnknown",
  "OPT_COMPL_INST_ADD_NODES",
  "OPT_COMPL_MANY_NODES",
  "OPT_COMPL_ONE_IALLOCATOR",
  "OPT_COMPL_ONE_INSTANCE",
  "OPT_COMPL_ONE_NODE",
  "OPT_COMPL_ONE_OS",
  "cli_option",
  "SplitNodeOption",
  "CalculateOSNames",
  ]

NO_PREFIX = "no_"
UN_PREFIX = "-"


class _Argument:
  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
    self.min = min
    self.max = max

  def __repr__(self):
    return ("<%s min=%s max=%s>" %
            (self.__class__.__name__, self.min, self.max))


class ArgSuggest(_Argument):
  """Suggesting argument.

  Value can be any of the ones passed to the constructor.

  """
  # pylint: disable-msg=W0622
  def __init__(self, min=0, max=None, choices=None):
    _Argument.__init__(self, min=min, max=max)
    self.choices = choices

  def __repr__(self):
    return ("<%s min=%s max=%s choices=%r>" %
            (self.__class__.__name__, self.min, self.max, self.choices))


class ArgChoice(ArgSuggest):
  """Choice argument.

  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
  but value must be one of the choices.

  """


class ArgUnknown(_Argument):
  """Unknown argument to program (e.g. determined at runtime).

  """


class ArgInstance(_Argument):
  """Instances argument.

  """


class ArgNode(_Argument):
  """Node argument.

  """

class ArgJobId(_Argument):
  """Job ID argument.

  """


class ArgFile(_Argument):
  """File path argument.

  """


class ArgCommand(_Argument):
  """Command argument.

  """


class ArgHost(_Argument):
  """Host argument.

  """


class ArgOs(_Argument):
  """OS argument.

  """


ARGS_NONE = []
ARGS_MANY_INSTANCES = [ArgInstance()]
ARGS_MANY_NODES = [ArgNode()]
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
ARGS_ONE_OS = [ArgOs(min=1, max=1)]


def _ExtractTagsObject(opts, args):
  """Extract the tag type object.

  Note that this function will modify its args parameter.

  """
  if not hasattr(opts, "tag_type"):
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
  kind = opts.tag_type
  if kind == constants.TAG_CLUSTER:
    retval = kind, kind
  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
    if not args:
      raise errors.OpPrereqError("no arguments passed to the command")
    name = args.pop(0)
    retval = kind, name
  else:
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
  return retval
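
# Illustrative example (added comment, not part of the original source): for a
# node-level tag command the object name is popped off the argument list, e.g.
#   opts.tag_type = constants.TAG_NODE
#   args = ["node1.example.com", "mytag"]
#   _ExtractTagsObject(opts, args)  # -> (constants.TAG_NODE, "node1.example.com")
#   # ... and args has been reduced to ["mytag"]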


def _ExtendTags(opts, args):
  """Extend the args if a source file has been given.

  This function will extend the tags with the contents of the file
  passed in the 'tags_source' attribute of the opts parameter. A file
  named '-' will be replaced by stdin.

  """
  fname = opts.tags_source
  if fname is None:
    return
  if fname == "-":
    new_fh = sys.stdin
  else:
    new_fh = open(fname, "r")
  new_data = []
  try:
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
    # because of python bug 1633941
    while True:
      line = new_fh.readline()
      if not line:
        break
      new_data.append(line.strip())
  finally:
    new_fh.close()
  args.extend(new_data)


def ListTags(opts, args):
  """List the tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  cl = GetClient()
  result = cl.QueryTags(kind, name)
  result = list(result)
  result.sort()
  for tag in result:
    ToStdout(tag)


def AddTags(opts, args):
  """Add tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be added")
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
  SubmitOpCode(op)


def RemoveTags(opts, args):
  """Remove tags from a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be removed")
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
  SubmitOpCode(op)


def check_unit(option, opt, value): # pylint: disable-msg=W0613
  """OptParser's custom converter for units.

  """
  try:
    return utils.ParseUnit(value)
  except errors.UnitParseError, err:
    raise OptionValueError("option %s: %s" % (opt, err))
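
# Illustrative usage (added comment; assumes utils.ParseUnit accepts plain
# numbers and m/g/t suffixes and returns a size in MiB, which is how the
# "unit" option type is used by the options below):
#   check_unit(None, "--os-size", "512")    # -> 512
#   check_unit(None, "--os-size", "4G")     # -> 4096
#   check_unit(None, "--os-size", "bogus")  # raises OptionValueError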


def _SplitKeyVal(opt, data):
  """Convert a KeyVal string into a dict.

  This function will convert a key=val[,...] string into a dict. Empty
  values will be converted specially: keys which have the prefix 'no_'
  will have the value=False and the prefix stripped, the others will
  have value=True.

  @type opt: string
  @param opt: a string holding the option name for which we process the
      data, used in building error messages
  @type data: string
  @param data: a string of the format key=val,key=val,...
  @rtype: dict
  @return: {key=val, key=val}
  @raises errors.ParameterError: if there are duplicate keys

  """
  kv_dict = {}
  if data:
    for elem in utils.UnescapeAndSplit(data, sep=","):
      if "=" in elem:
        key, val = elem.split("=", 1)
      else:
        if elem.startswith(NO_PREFIX):
          key, val = elem[len(NO_PREFIX):], False
        elif elem.startswith(UN_PREFIX):
          key, val = elem[len(UN_PREFIX):], None
        else:
          key, val = elem, True
      if key in kv_dict:
        raise errors.ParameterError("Duplicate key '%s' in option %s" %
                                    (key, opt))
      kv_dict[key] = val
  return kv_dict
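
# Illustrative examples (added comment, not part of the original source); note
# that the "-" prefix, not mentioned in the docstring above, maps the key to
# None:
#   _SplitKeyVal("-H", "mem=512,no_acpi")  # -> {'mem': '512', 'acpi': False}
#   _SplitKeyVal("-H", "acpi")             # -> {'acpi': True}
#   _SplitKeyVal("-H", "-mem")             # -> {'mem': None}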


def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser for ident:key=val,key=val options.

  This will store the parsed values as a tuple (ident, {key: val}). As such,
  multiple uses of this option via action=append are possible.

  """
  if ":" not in value:
    ident, rest = value, ''
  else:
    ident, rest = value.split(":", 1)

  if ident.startswith(NO_PREFIX):
    if rest:
      msg = "Cannot pass options when removing parameter groups: %s" % value
      raise errors.ParameterError(msg)
    retval = (ident[len(NO_PREFIX):], False)
  elif ident.startswith(UN_PREFIX):
    if rest:
      msg = "Cannot pass options when removing parameter groups: %s" % value
      raise errors.ParameterError(msg)
    retval = (ident[len(UN_PREFIX):], None)
  else:
    kv_dict = _SplitKeyVal(opt, rest)
    retval = (ident, kv_dict)
  return retval
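
# Illustrative examples (added comment, not part of the original source):
#   check_ident_key_val(None, "--net", "0:mac=aa:00:00:fa:3a:3f")
#       # -> ('0', {'mac': 'aa:00:00:fa:3a:3f'})
#   check_ident_key_val(None, "--disk", "no_0")  # -> ('0', False)
#   check_ident_key_val(None, "--disk", "-0")    # -> ('0', None)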


def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser for key=val,key=val options.

  This will store the parsed values as a dict {key: val}.

  """
  return _SplitKeyVal(opt, value)


def check_bool(option, opt, value): # pylint: disable-msg=W0613
  """Custom parser for yes/no options.

  This will store the parsed value as either True or False.

  """
  value = value.lower()
  if value == constants.VALUE_FALSE or value == "no":
    return False
  elif value == constants.VALUE_TRUE or value == "yes":
    return True
  else:
    raise errors.ParameterError("Invalid boolean value '%s'" % value)
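
# Illustrative examples (added comment; assumes constants.VALUE_TRUE and
# constants.VALUE_FALSE are the strings "true" and "false"):
#   check_bool(None, "--offline", "Yes")    # -> True
#   check_bool(None, "--offline", "false")  # -> False
#   check_bool(None, "--offline", "maybe")  # raises errors.ParameterError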


# completion_suggestion is normally a list. Using numeric values not evaluating
# to False for dynamic completion.
(OPT_COMPL_MANY_NODES,
 OPT_COMPL_ONE_NODE,
 OPT_COMPL_ONE_INSTANCE,
 OPT_COMPL_ONE_OS,
 OPT_COMPL_ONE_IALLOCATOR,
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)

OPT_COMPL_ALL = frozenset([
  OPT_COMPL_MANY_NODES,
  OPT_COMPL_ONE_NODE,
  OPT_COMPL_ONE_INSTANCE,
  OPT_COMPL_ONE_OS,
  OPT_COMPL_ONE_IALLOCATOR,
  OPT_COMPL_INST_ADD_NODES,
  ])


class CliOption(Option):
  """Custom option class for optparse.

  """
  ATTRS = Option.ATTRS + [
    "completion_suggest",
    ]
  TYPES = Option.TYPES + (
    "identkeyval",
    "keyval",
    "unit",
    "bool",
    )
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
  TYPE_CHECKER["keyval"] = check_key_val
  TYPE_CHECKER["unit"] = check_unit
  TYPE_CHECKER["bool"] = check_bool


# optparse.py sets make_option, so we do it for our own option class, too
cli_option = CliOption


_YORNO = "yes|no"

DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
                       help="Increase debugging level")

NOHDR_OPT = cli_option("--no-headers", default=False,
                       action="store_true", dest="no_headers",
                       help="Don't display column headers")

SEP_OPT = cli_option("--separator", default=None,
                     action="store", dest="separator",
                     help=("Separator between output fields"
                           " (defaults to one space)"))

USEUNITS_OPT = cli_option("--units", default=None,
                          dest="units", choices=('h', 'm', 'g', 't'),
                          help="Specify units for output (one of hmgt)")

FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
                        type="string", metavar="FIELDS",
                        help="Comma separated list of output fields")

FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
                       default=False, help="Force the operation")

CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
                         default=False, help="Do not require confirmation")

TAG_SRC_OPT = cli_option("--from", dest="tags_source",
                         default=None, help="File with tag names")

SUBMIT_OPT = cli_option("--submit", dest="submit_only",
                        default=False, action="store_true",
                        help=("Submit the job and return the job ID, but"
                              " don't wait for the job to finish"))

SYNC_OPT = cli_option("--sync", dest="do_locking",
                      default=False, action="store_true",
                      help=("Grab locks while doing the queries"
                            " in order to ensure more consistent results"))

_DRY_RUN_OPT = cli_option("--dry-run", default=False,
                          action="store_true",
                          help=("Do not execute the operation, just run the"
                                " check steps and verify if it could be"
                                " executed"))

VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
                         action="store_true",
                         help="Increase the verbosity of the operation")

DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
                              action="store_true", dest="simulate_errors",
                              help="Debugging option that makes the operation"
                              " treat most runtime checks as failed")

NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
                        default=True, action="store_false",
                        help="Don't wait for sync (DANGEROUS!)")

DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
                               help="Custom disk setup (diskless, file,"
                               " plain or drbd)",
                               default=None, metavar="TEMPL",
                               choices=list(constants.DISK_TEMPLATES))

NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
                        help="Do not create any network cards for"
                        " the instance")

FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                               help="Relative path under default cluster-wide"
                               " file storage dir to store file-based disks",
                               default=None, metavar="<DIR>")

FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
                                  help="Driver to use for image files",
                                  default="loop", metavar="<DRIVER>",
                                  choices=list(constants.FILE_DRIVER))

IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
                            help="Select nodes for the instance automatically"
                            " using the <NAME> iallocator plugin",
                            default=None, type="string",
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)

OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
                    metavar="<os>",
                    completion_suggest=OPT_COMPL_ONE_OS)

FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
                               action="store_true", default=False,
                               help="Force an unknown variant")

NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
                            action="store_true", default=False,
                            help="Do not install the OS (will"
                            " enable no-start)")

BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
                         type="keyval", default={},
                         help="Backend parameters")

HVOPTS_OPT = cli_option("-H", "--hypervisor-parameters", type="keyval",
                        default={}, dest="hvparams",
                        help="Hypervisor parameters")

HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
                            help="Hypervisor and hypervisor options, in the"
                            " format hypervisor:option=value,option=value,...",
                            default=None, type="identkeyval")

HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
                        help="Hypervisor and hypervisor options, in the"
                        " format hypervisor:option=value,option=value,...",
                        default=[], action="append", type="identkeyval")

NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
                           action="store_false",
                           help="Don't check that the instance's IP"
                           " is alive")

NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
                             default=True, action="store_false",
                             help="Don't check that the instance's name"
                             " is resolvable")

NET_OPT = cli_option("--net",
                     help="NIC parameters", default=[],
                     dest="nics", action="append", type="identkeyval")

DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
                      dest="disks", action="append", type="identkeyval")

DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
                         help="Comma-separated list of disks"
                         " indices to act on (e.g. 0,2) (optional,"
                         " defaults to all disks)")

OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
                         help="Enforces a single-disk configuration using the"
                         " given disk size, in MiB unless a suffix is used",
                         default=None, type="unit", metavar="<size>")

IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
                                dest="ignore_consistency",
                                action="store_true", default=False,
                                help="Ignore the consistency of the disks on"
                                " the secondary")

NONLIVE_OPT = cli_option("--non-live", dest="live",
                         default=True, action="store_false",
                         help="Do a non-live migration (this usually means"
                         " freeze the instance, save the state, transfer and"
                         " only then resume running on the secondary node)")

NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
                                help="Target node and optional secondary node",
                                metavar="<pnode>[:<snode>]",
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)

NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
                           action="append", metavar="<node>",
                           help="Use only this node (can be used multiple"
                           " times, if not given defaults to all nodes)",
                           completion_suggest=OPT_COMPL_ONE_NODE)

SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
                             metavar="<node>",
                             completion_suggest=OPT_COMPL_ONE_NODE)

NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
                         action="store_false",
                         help="Don't start the instance after creation")

SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
                         action="store_true", default=False,
                         help="Show command instead of executing it")

CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
                         default=False, action="store_true",
                         help="Instead of performing the migration, try to"
                         " recover from a failed cleanup. This is safe"
                         " to run even if the instance is healthy, but it"
                         " will create extra replication traffic and"
                         " briefly disrupt the replication (like during the"
                         " migration)")

STATIC_OPT = cli_option("-s", "--static", dest="static",
                        action="store_true", default=False,
                        help="Only show configuration data, not runtime data")

ALL_OPT = cli_option("--all", dest="show_all",
                     default=False, action="store_true",
                     help="Show info on all instances on the cluster."
                     " This can take a long time to run, use wisely")

SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
                           action="store_true", default=False,
                           help="Interactive OS reinstall, lists available"
                           " OS templates for selection")

IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
                                 action="store_true", default=False,
                                 help="Remove the instance from the cluster"
                                 " configuration even if there are failures"
                                 " during the removal process")

NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
                               help="Specifies the new secondary node",
                               metavar="NODE", default=None,
                               completion_suggest=OPT_COMPL_ONE_NODE)

ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
                            default=False, action="store_true",
                            help="Replace the disk(s) on the primary"
                            " node (only for the drbd template)")

ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
                              default=False, action="store_true",
                              help="Replace the disk(s) on the secondary"
                              " node (only for the drbd template)")

AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
                              default=False, action="store_true",
                              help="Lock all nodes and auto-promote as needed"
                              " to MC status")

AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
                              default=False, action="store_true",
                              help="Automatically replace faulty disks"
                              " (only for the drbd template)")

IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
                             default=False, action="store_true",
                             help="Ignore current recorded size"
                             " (useful for forcing activation when"
                             " the recorded size is wrong)")

SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
                          metavar="<node>",
                          completion_suggest=OPT_COMPL_ONE_NODE)

SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
                         metavar="<dir>")

SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
                              help="Specify the secondary ip for the node",
                              metavar="ADDRESS", default=None)

READD_OPT = cli_option("--readd", dest="readd",
                       default=False, action="store_true",
                       help="Readd old node after replacing it")

NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
                                default=True, action="store_false",
                                help="Disable SSH key fingerprint checking")


MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
                    type="bool", default=None, metavar=_YORNO,
                    help="Set the master_candidate flag on the node")

OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
                         type="bool", default=None,
                         help="Set the offline flag on the node")

DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
                         type="bool", default=None,
                         help="Set the drained flag on the node")

ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
                             type="bool", default=None, metavar=_YORNO,
                             help="Set the allocatable flag on a volume")

NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
                               help="Disable support for lvm based instances"
                               " (cluster-wide)",
                               action="store_false", default=True)

ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
                            dest="enabled_hypervisors",
                            help="Comma-separated list of hypervisors",
                            type="string", default=None)

NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
                            type="keyval", default={},
                            help="NIC parameters")

CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
                         dest="candidate_pool_size", type="int",
                         help="Set the candidate pool size")

VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
                         help="Enables LVM and specifies the volume group"
                         " name (cluster-wide) for disk allocation [xenvg]",
                         metavar="VG", default=None)

YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
                          help="Destroy cluster", action="store_true")

NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
                          help="Skip node agreement check (dangerous)",
                          action="store_true", default=False)

MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
                            help="Specify the mac prefix for the instance IP"
                            " addresses, in the format XX:XX:XX",
                            metavar="PREFIX",
                            default=None)

MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
                               help="Specify the node interface (cluster-wide)"
                               " on which the master IP address will be added"
                               " [%s]" % constants.DEFAULT_BRIDGE,
                               metavar="NETDEV",
                               default=constants.DEFAULT_BRIDGE)


GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                                help="Specify the default directory (cluster-"
                                "wide) for storing the file-based disks [%s]" %
                                constants.DEFAULT_FILE_STORAGE_DIR,
                                metavar="DIR",
                                default=constants.DEFAULT_FILE_STORAGE_DIR)

NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
                                   help="Don't modify /etc/hosts",
                                   action="store_false", default=True)

NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
                                    help="Don't initialize SSH keys",
                                    action="store_false", default=True)

ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
                             help="Enable parseable error messages",
                             action="store_true", default=False)

NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
                          help="Skip N+1 memory redundancy tests",
                          action="store_true", default=False)

REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
                             help="Type of reboot: soft/hard/full",
                             default=constants.INSTANCE_REBOOT_HARD,
                             metavar="<REBOOT>",
                             choices=list(constants.REBOOT_TYPES))

IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
                                    dest="ignore_secondaries",
                                    default=False, action="store_true",
                                    help="Ignore errors from secondaries")

NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
                            action="store_false", default=True,
                            help="Don't shutdown the instance (unsafe)")

TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                         help="Maximum time to wait")

SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
                         dest="shutdown_timeout", type="int",
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                         help="Maximum time to wait for instance shutdown")

EARLY_RELEASE_OPT = cli_option("--early-release",
                               dest="early_release", default=False,
                               action="store_true",
                               help="Release the locks on the secondary"
                               " node(s) early")

NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
                                  dest="new_cluster_cert",
                                  default=False, action="store_true",
                                  help="Generate a new cluster certificate")

RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
                           default=None,
                           help="File containing new RAPI certificate")

NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
                               default=None, action="store_true",
                               help=("Generate a new self-signed RAPI"
                                     " certificate"))

NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
                                    dest="new_confd_hmac_key",
                                    default=False, action="store_true",
                                    help=("Create a new HMAC key for %s" %
                                          constants.CONFD))

USE_REPL_NET_OPT = cli_option("--use-replication-network",
                              dest="use_replication_network",
                              help="Whether to use the replication network"
                              " for talking to the nodes",
                              action="store_true", default=False)

MAINTAIN_NODE_HEALTH_OPT = \
    cli_option("--maintain-node-health", dest="maintain_node_health",
               metavar=_YORNO, default=None, type="bool",
               help="Configure the cluster to automatically maintain node"
               " health, by shutting down unknown instances, shutting down"
               " unknown DRBD devices, etc.")


def _ParseArgs(argv, commands, aliases):
  """Parser for the command line arguments.

  This function parses the arguments and returns the function which
  must be executed together with its (modified) arguments.

  @param argv: the command line
  @param commands: dictionary with special contents, see the design
      doc for cmdline handling
  @param aliases: dictionary with command aliases {'alias': 'target', ...}

  """
  if len(argv) == 0:
    binary = "<command>"
  else:
    binary = argv[0].split("/")[-1]

  if len(argv) > 1 and argv[1] == "--version":
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
    # Quit right away. That way we don't have to care about this special
    # argument. optparse.py does it the same.
    sys.exit(0)

  if len(argv) < 2 or not (argv[1] in commands or
                           argv[1] in aliases):
    # let's do a nice thing
    sortedcmds = commands.keys()
    sortedcmds.sort()

    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
    ToStdout("")

    # compute the max line length for cmd + usage
    mlen = max([len(" %s" % cmd) for cmd in commands])
    mlen = min(60, mlen) # should not get here...

    # and format a nice command list
    ToStdout("Commands:")
    for cmd in sortedcmds:
      cmdstr = " %s" % (cmd,)
      help_text = commands[cmd][4]
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
      for line in help_lines:
        ToStdout("%-*s   %s", mlen, "", line)

    ToStdout("")

    return None, None, None

  # get command, unalias it, and look it up in commands
  cmd = argv.pop(1)
  if cmd in aliases:
    if cmd in commands:
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
                                   " command" % cmd)

    if aliases[cmd] not in commands:
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
                                   " command '%s'" % (cmd, aliases[cmd]))

    cmd = aliases[cmd]

  func, args_def, parser_opts, usage, description = commands[cmd]
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
                        description=description,
                        formatter=TitledHelpFormatter(),
                        usage="%%prog %s %s" % (cmd, usage))
  parser.disable_interspersed_args()
  options, args = parser.parse_args()

  if not _CheckArguments(cmd, args_def, args):
    return None, None, None

  return func, options, args


def _CheckArguments(cmd, args_def, args):
  """Verifies the arguments using the argument definition.

  Algorithm:

    1. Abort with error if values specified by user but none expected.

    1. For each argument in definition

      1. Keep running count of minimum number of values (min_count)
      1. Keep running count of maximum number of values (max_count)
      1. If it has an unlimited number of values

        1. Abort with error if it's not the last argument in the definition

    1. If last argument has limited number of values

      1. Abort with error if number of values doesn't match or is too large

    1. Abort with error if user didn't pass enough values (min_count)

  """
  if args and not args_def:
    ToStderr("Error: Command %s expects no arguments", cmd)
    return False

  min_count = None
  max_count = None
  check_max = None

  last_idx = len(args_def) - 1

  for idx, arg in enumerate(args_def):
    if min_count is None:
      min_count = arg.min
    elif arg.min is not None:
      min_count += arg.min

    if max_count is None:
      max_count = arg.max
    elif arg.max is not None:
      max_count += arg.max

    if idx == last_idx:
      check_max = (arg.max is not None)

    elif arg.max is None:
      raise errors.ProgrammerError("Only the last argument can have max=None")

  if check_max:
    # Command with exact number of arguments
    if (min_count is not None and max_count is not None and
        min_count == max_count and len(args) != min_count):
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
      return False

    # Command with limited number of arguments
    if max_count is not None and len(args) > max_count:
      ToStderr("Error: Command %s expects only %d argument(s)",
               cmd, max_count)
      return False

  # Command with some required arguments
  if min_count is not None and len(args) < min_count:
    ToStderr("Error: Command %s expects at least %d argument(s)",
             cmd, min_count)
    return False

  return True
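
# Illustrative examples (added comment, not part of the original source):
#   _CheckArguments("info", [ArgInstance(min=1, max=1)], ["inst1"])  # -> True
#   _CheckArguments("info", [ArgInstance(min=1, max=1)], [])
#       # prints "Error: Command info expects 1 argument(s)" and returns False
#   _CheckArguments("version", [], ["extra"])
#       # prints "Error: Command version expects no arguments", returns False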


def SplitNodeOption(value):
  """Splits the value of a --node option.

  """
  if value and ':' in value:
    return value.split(':', 1)
  else:
    return (value, None)
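
# Illustrative examples (added comment, not part of the original source); note
# that the first form returns a two-element list, the others a tuple:
#   SplitNodeOption("node1.example.com:node2.example.com")
#       # -> ['node1.example.com', 'node2.example.com']
#   SplitNodeOption("node1.example.com")  # -> ('node1.example.com', None)
#   SplitNodeOption(None)                 # -> (None, None)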


def CalculateOSNames(os_name, os_variants):
  """Calculates all the names an OS can be called, according to its variants.

  @type os_name: string
  @param os_name: base name of the os
  @type os_variants: list or None
  @param os_variants: list of supported variants
  @rtype: list
  @return: list of valid names

  """
  if os_variants:
    return ['%s+%s' % (os_name, v) for v in os_variants]
  else:
    return [os_name]
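
# Illustrative examples (added comment, not part of the original source):
#   CalculateOSNames("debootstrap", ["etch", "lenny"])
#       # -> ['debootstrap+etch', 'debootstrap+lenny']
#   CalculateOSNames("debootstrap", None)  # -> ['debootstrap']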


def UsesRPC(fn):
  def wrapper(*args, **kwargs):
    rpc.Init()
    try:
      return fn(*args, **kwargs)
    finally:
      rpc.Shutdown()
  return wrapper
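
# Illustrative usage (added comment, not part of the original source): the
# decorator runs rpc.Init() before the wrapped CLI function and always calls
# rpc.Shutdown() afterwards, e.g. (hypothetical function name):
#   @UsesRPC
#   def InitCluster(opts, args):
#     ...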


def AskUser(text, choices=None):
  """Ask the user a question.

  @param text: the question to ask

  @param choices: list with elements tuples (input_char, return_value,
      description); if not given, it will default to: [('y', True,
      'Perform the operation'), ('n', False, 'Do not do the operation')];
      note that the '?' char is reserved for help

  @return: one of the return values from the choices list; if input is
      not possible (i.e. not running with a tty), we return the last
      entry from the list

  """
  if choices is None:
    choices = [('y', True, 'Perform the operation'),
               ('n', False, 'Do not perform the operation')]
  if not choices or not isinstance(choices, list):
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
  for entry in choices:
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
      raise errors.ProgrammerError("Invalid choices element to AskUser")

  answer = choices[-1][1]
  new_text = []
  for line in text.splitlines():
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
  text = "\n".join(new_text)
  try:
    f = file("/dev/tty", "a+")
  except IOError:
    return answer
  try:
    chars = [entry[0] for entry in choices]
    chars[-1] = "[%s]" % chars[-1]
    chars.append('?')
    maps = dict([(entry[0], entry[1]) for entry in choices])
    while True:
      f.write(text)
      f.write('\n')
      f.write("/".join(chars))
      f.write(": ")
      line = f.readline(2).strip().lower()
      if line in maps:
        answer = maps[line]
        break
      elif line == '?':
        for entry in choices:
          f.write(" %s - %s\n" % (entry[0], entry[2]))
        f.write("\n")
        continue
  finally:
    f.close()
  return answer
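
# Illustrative usage (added comment, not part of the original source):
#   if not AskUser("Continue with the operation?"):
#     return 1
#   # With a custom list, pressing 'h' returns "hard"; without a tty the
#   # return value of the last entry (None here) is used:
#   choice = AskUser("Reboot type?", [("s", "soft", "Soft reboot"),
#                                     ("h", "hard", "Hard reboot"),
#                                     ("c", None, "Cancel")])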


class JobSubmittedException(Exception):
  """Job was submitted, client should exit.

  This exception has one argument, the ID of the job that was
  submitted. The handler should print this ID.

  This is not an error, just a structured way to exit from clients.

  """


def SendJob(ops, cl=None):
  """Function to submit an opcode without waiting for the results.

  @type ops: list
  @param ops: list of opcodes
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created

  """
  if cl is None:
    cl = GetClient()

  job_id = cl.SubmitJob(ops)

  return job_id


def PollJob(job_id, cl=None, feedback_fn=None):
  """Function to poll for the result of a job.

  @type job_id: job identifier
  @param job_id: the job to poll for results
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created

  """
  if cl is None:
    cl = GetClient()

  prev_job_info = None
  prev_logmsg_serial = None

  status = None

  notified_queued = False
  notified_waitlock = False

  while True:
    result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
                                     prev_logmsg_serial)
    if not result:
      # job not found, go away!
      raise errors.JobLost("Job with id %s lost" % job_id)
    elif result == constants.JOB_NOTCHANGED:
      if status is not None and not callable(feedback_fn):
        if status == constants.JOB_STATUS_QUEUED and not notified_queued:
          ToStderr("Job %s is waiting in queue", job_id)
          notified_queued = True
        elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
          ToStderr("Job %s is trying to acquire all necessary locks", job_id)
          notified_waitlock = True

      # Wait again
      continue

    # Split result, a tuple of (field values, log entries)
    (job_info, log_entries) = result
    (status, ) = job_info

    if log_entries:
      for log_entry in log_entries:
        (serial, timestamp, _, message) = log_entry
        if callable(feedback_fn):
          feedback_fn(log_entry[1:])
        else:
          encoded = utils.SafeEncode(message)
          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
        prev_logmsg_serial = max(prev_logmsg_serial, serial)

    # TODO: Handle canceled and archived jobs
    elif status in (constants.JOB_STATUS_SUCCESS,
                    constants.JOB_STATUS_ERROR,
                    constants.JOB_STATUS_CANCELING,
                    constants.JOB_STATUS_CANCELED):
      break

    prev_job_info = job_info

  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
  if not jobs:
    raise errors.JobLost("Job with id %s lost" % job_id)

  status, opstatus, result = jobs[0]
  if status == constants.JOB_STATUS_SUCCESS:
    return result
  elif status in (constants.JOB_STATUS_CANCELING,
                  constants.JOB_STATUS_CANCELED):
    raise errors.OpExecError("Job was canceled")
  else:
    has_ok = False
    for idx, (status, msg) in enumerate(zip(opstatus, result)):
      if status == constants.OP_STATUS_SUCCESS:
        has_ok = True
      elif status == constants.OP_STATUS_ERROR:
        errors.MaybeRaise(msg)
        if has_ok:
          raise errors.OpExecError("partial failure (opcode %d): %s" %
                                   (idx, msg))
        else:
          raise errors.OpExecError(str(msg))
    # default failure mode
    raise errors.OpExecError(result)


def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
  """Legacy function to submit an opcode.

  This is just a simple wrapper over the construction of the processor
  instance. It should be extended to better handle feedback and
  interaction functions.

  """
  if cl is None:
    cl = GetClient()

  SetGenericOpcodeOpts([op], opts)

  job_id = SendJob([op], cl)

  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)

  return op_results[0]


def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  This function will decide, based on the 'opts' parameter, whether to
  submit and wait for the result of the opcode (and return it), or
  whether to just send the job and print its identifier. It is used in
  order to simplify the implementation of the '--submit' option.

  It will also process the opcodes if we're sending them via SendJob
  (otherwise SubmitOpCode does it).

  """
  if opts and opts.submit_only:
    job = [op]
    SetGenericOpcodeOpts(job, opts)
    job_id = SendJob(job, cl=cl)
    raise JobSubmittedException(job_id)
  else:
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)


def SetGenericOpcodeOpts(opcode_list, options):
  """Processor for generic options.

  This function updates the given opcodes based on generic command
  line options (like debug, dry-run, etc.).

  @param opcode_list: list of opcodes
  @param options: command line options or None
  @return: None (in-place modification)

  """
  if not options:
    return
  for op in opcode_list:
    op.dry_run = options.dry_run
    op.debug_level = options.debug


def GetClient():
  # TODO: Cache object?
  try:
    client = luxi.Client()
  except luxi.NoMasterError:
    ss = ssconf.SimpleStore()

    # Try to read ssconf file
    try:
      ss.GetMasterNode()
    except errors.ConfigurationError:
      raise errors.OpPrereqError("Cluster not initialized or this machine is"
                                 " not part of a cluster")

    master, myself = ssconf.GetMasterAndMyself(ss=ss)
    if master != myself:
      raise errors.OpPrereqError("This is not the master node, please connect"
                                 " to node '%s' and rerun the command" %
                                 master)
    raise
  return client


def FormatError(err):
  """Return a formatted error message for a given error.

  This function takes an exception instance and returns a tuple
  consisting of two values: first, the recommended exit code, and
  second, a string describing the error message (not
  newline-terminated).

  """
  retcode = 1
  obuf = StringIO()
  msg = str(err)
  if isinstance(err, errors.ConfigurationError):
    txt = "Corrupt configuration file: %s" % msg
    logging.error(txt)
    obuf.write(txt + "\n")
    obuf.write("Aborting.")
    retcode = 2
  elif isinstance(err, errors.HooksAbort):
    obuf.write("Failure: hooks execution failed:\n")
    for node, script, out in err.args[0]:
      if out:
        obuf.write("  node: %s, script: %s, output: %s\n" %
                   (node, script, out))
      else:
        obuf.write("  node: %s, script: %s (no output)\n" %
                   (node, script))
  elif isinstance(err, errors.HooksFailure):
    obuf.write("Failure: hooks general failure: %s" % msg)
  elif isinstance(err, errors.ResolverError):
    this_host = utils.HostInfo.SysName()
    if err.args[0] == this_host:
      msg = "Failure: can't resolve my own hostname ('%s')"
    else:
      msg = "Failure: can't resolve hostname '%s'"
    obuf.write(msg % err.args[0])
  elif isinstance(err, errors.OpPrereqError):
    if len(err.args) == 2:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\nerror type: %s, error details:\n%s" %
                 (err.args[1], err.args[0]))
    else:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\n%s" % msg)
  elif isinstance(err, errors.OpExecError):
    obuf.write("Failure: command execution error:\n%s" % msg)
  elif isinstance(err, errors.TagError):
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
  elif isinstance(err, errors.JobQueueDrainError):
    obuf.write("Failure: the job queue is marked for drain and doesn't"
               " accept new requests\n")
  elif isinstance(err, errors.JobQueueFull):
    obuf.write("Failure: the job queue is full and doesn't accept new"
               " job submissions until old jobs are archived\n")
  elif isinstance(err, errors.TypeEnforcementError):
    obuf.write("Parameter Error: %s" % msg)
  elif isinstance(err, errors.ParameterError):
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
  elif isinstance(err, errors.GenericError):
    obuf.write("Unhandled Ganeti error: %s" % msg)
  elif isinstance(err, luxi.NoMasterError):
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
               " and listening for connections?")
  elif isinstance(err, luxi.TimeoutError):
    obuf.write("Timeout while talking to the master daemon. Error:\n"
               "%s" % msg)
  elif isinstance(err, luxi.ProtocolError):
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
               "%s" % msg)
  elif isinstance(err, JobSubmittedException):
    obuf.write("JobID: %s\n" % err.args[0])
    retcode = 0
  else:
    obuf.write("Unhandled exception: %s" % msg)
  return retcode, obuf.getvalue().rstrip('\n')


def GenericMain(commands, override=None, aliases=None):
  """Generic main function for all the gnt-* commands.

  Arguments:
    - commands: a dictionary with a special structure, see the design doc
                for command line handling.
    - override: if not None, we expect a dictionary with keys that will
                override command line options; this can be used to pass
                options from the scripts to generic functions
    - aliases: dictionary with command aliases {'alias': 'target', ...}

  """
  # save the program name and the entire command line for later logging
  if sys.argv:
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
    if len(sys.argv) >= 2:
      binary += " " + sys.argv[1]
      old_cmdline = " ".join(sys.argv[2:])
    else:
      old_cmdline = ""
  else:
    binary = "<unknown program>"
    old_cmdline = ""

  if aliases is None:
    aliases = {}

  try:
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
  except errors.ParameterError, err:
    result, err_msg = FormatError(err)
    ToStderr(err_msg)
    return 1

  if func is None: # parse error
    return 1

  if override is not None:
    for key, val in override.iteritems():
      setattr(options, key, val)

  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
                     stderr_logging=True, program=binary)

  if old_cmdline:
    logging.info("run with arguments '%s'", old_cmdline)
  else:
    logging.info("run with no arguments")

  try:
    result = func(options, args)
  except (errors.GenericError, luxi.ProtocolError,
          JobSubmittedException), err:
    result, err_msg = FormatError(err)
    logging.exception("Error during command processing")
    ToStderr(err_msg)

  return result
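
# Illustrative usage sketch (comment only): a gnt-* script builds a `commands`
# dictionary with the structure described in the design doc and then hands
# control over to GenericMain, e.g.:
#
#   if __name__ == "__main__":
#     sys.exit(GenericMain(commands))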


def GenericInstanceCreate(mode, opts, args):
  """Add an instance to the cluster via either creation or import.

  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new instance name
  @rtype: int
  @return: the desired exit code

  """
  instance = args[0]

  (pnode, snode) = SplitNodeOption(opts.node)

  hypervisor = None
  hvparams = {}
  if opts.hypervisor:
    hypervisor, hvparams = opts.hypervisor

  if opts.nics:
    try:
      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
    except ValueError, err:
      raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
    nics = [{}] * nic_max
    for nidx, ndict in opts.nics:
      nidx = int(nidx)
      if not isinstance(ndict, dict):
        msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
        raise errors.OpPrereqError(msg)
      nics[nidx] = ndict
  elif opts.no_nics:
    # no nics
    nics = []
  else:
    # default of one nic, all auto
    nics = [{}]

  if opts.disk_template == constants.DT_DISKLESS:
    if opts.disks or opts.sd_size is not None:
      raise errors.OpPrereqError("Diskless instance but disk"
                                 " information passed")
    disks = []
  else:
    if not opts.disks and not opts.sd_size:
      raise errors.OpPrereqError("No disk information specified")
    if opts.disks and opts.sd_size is not None:
      raise errors.OpPrereqError("Please use either the '--disk' or"
                                 " '-s' option")
    if opts.sd_size is not None:
      opts.disks = [(0, {"size": opts.sd_size})]
    try:
      disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
    except ValueError, err:
      raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
    disks = [{}] * disk_max
    for didx, ddict in opts.disks:
      didx = int(didx)
      if not isinstance(ddict, dict):
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
        raise errors.OpPrereqError(msg)
      elif "size" in ddict:
        if "adopt" in ddict:
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
                                     " (disk %d)" % didx)
        try:
          ddict["size"] = utils.ParseUnit(ddict["size"])
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
                                     (didx, err))
      elif "adopt" in ddict:
        if mode == constants.INSTANCE_IMPORT:
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
                                     " import")
        ddict["size"] = 0
      else:
        raise errors.OpPrereqError("Missing size or adoption source for"
                                   " disk %d" % didx)
      disks[didx] = ddict

  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)

  if mode == constants.INSTANCE_CREATE:
    start = opts.start
    os_type = opts.os
    src_node = None
    src_path = None
    no_install = opts.no_install
  elif mode == constants.INSTANCE_IMPORT:
    start = False
    os_type = None
    src_node = opts.src_node
    src_path = opts.src_dir
    no_install = None
  else:
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)

  op = opcodes.OpCreateInstance(instance_name=instance,
                                disks=disks,
                                disk_template=opts.disk_template,
                                nics=nics,
                                pnode=pnode, snode=snode,
                                ip_check=opts.ip_check,
                                name_check=opts.name_check,
                                wait_for_sync=opts.wait_for_sync,
                                file_storage_dir=opts.file_storage_dir,
                                file_driver=opts.file_driver,
                                iallocator=opts.iallocator,
                                hypervisor=hypervisor,
                                hvparams=hvparams,
                                beparams=opts.beparams,
                                mode=mode,
                                start=start,
                                os_type=os_type,
                                src_node=src_node,
                                src_path=src_path,
                                no_install=no_install)

  SubmitOrSend(op, opts)
  return 0
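
# Illustrative usage sketch (comment only): the gnt-instance script wires its
# "add" and "import" commands to this helper roughly as follows (the function
# names below are for illustration):
#
#   def AddInstance(opts, args):
#     return GenericInstanceCreate(constants.INSTANCE_CREATE, opts, args)
#
#   def ImportInstance(opts, args):
#     return GenericInstanceCreate(constants.INSTANCE_IMPORT, opts, args)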


class _RunWhileClusterStoppedHelper:
  """Helper class for L{RunWhileClusterStopped} to simplify state management

  """
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes

    self.ssh = ssh.SshRunner(self.cluster_name)

    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name
    @type cmd: list
    @param cmd: Command

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    @type fn: callable
    @param fn: Function to be called

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes
        for node_name in self.online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception, err:
          _, errmsg = FormatError(err)
          logging.exception("Caught exception")
          self.feedback_fn(errmsg)
          raise
      finally:
        # Start cluster again, master node last
        for node_name in self.nonmaster_nodes + [self.master_node]:
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
    finally:
      # Resume watcher
      watcher_block.Close()


def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Calls a function while all cluster daemons are stopped.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  feedback_fn("Gathering cluster information")

  # This ensures we're running on the master daemon
  cl = GetClient()

  (cluster_name, master_node) = \
    cl.QueryConfigValues(["cluster_name", "master_node"])

  online_nodes = GetOnlineNodes([], cl=cl)

  # Don't keep a reference to the client. The master daemon will go away.
  del cl

  assert master_node in online_nodes

  return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
                                       online_nodes).Call(fn, *args)
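
# Illustrative usage sketch (comment only, the callback name is hypothetical):
#
#   RunWhileClusterStopped(ToStdout, _DoMaintenance, some_arg)
#
# The callback receives the helper instance as its first argument (see
# Call above), followed by the extra positional arguments.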


def GenerateTable(headers, fields, separator, data,
                  numfields=None, unitfields=None,
                  units=None):
  """Formats a table with headers and different fields.

  @type headers: dict
  @param headers: dictionary mapping field names to headers for
      the table
  @type fields: list
  @param fields: the field names corresponding to each row in
      the data field
  @param separator: the separator to be used; if this is None,
      the default 'smart' algorithm is used which computes optimal
      field width, otherwise just the separator is used between
      each field
  @type data: list
  @param data: a list of lists, each sublist being one row to be output
  @type numfields: list
  @param numfields: a list with the fields that hold numeric
      values and thus should be right-aligned
  @type unitfields: list
  @param unitfields: a list with the fields that hold numeric
      values that should be formatted with the units field
  @type units: string or None
  @param units: the units we should use for formatting, or None for
      automatic choice (human-readable for non-separator usage, otherwise
      megabytes); this is a one-letter string
  @rtype: list
  @return: a list of strings, each string being one formatted output line

  """
  if units is None:
    if separator:
      units = "m"
    else:
      units = "h"

  if numfields is None:
    numfields = []
  if unitfields is None:
    unitfields = []

  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142

  format_fields = []
  for field in fields:
    if headers and field not in headers:
      # TODO: handle better unknown fields (either revert to old
      # style of raising exception, or deal more intelligently with
      # variable fields)
      headers[field] = field
    if separator is not None:
      format_fields.append("%s")
    elif numfields.Matches(field):
      format_fields.append("%*s")
    else:
      format_fields.append("%-*s")

  if separator is None:
    mlens = [0 for name in fields]
    format = ' '.join(format_fields)
  else:
    format = separator.replace("%", "%%").join(format_fields)

  for row in data:
    if row is None:
      continue
    for idx, val in enumerate(row):
      if unitfields.Matches(fields[idx]):
        try:
          val = int(val)
        except (TypeError, ValueError):
          pass
        else:
          val = row[idx] = utils.FormatUnit(val, units)
      val = row[idx] = str(val)
      if separator is None:
        mlens[idx] = max(mlens[idx], len(val))

  result = []
  if headers:
    args = []
    for idx, name in enumerate(fields):
      hdr = headers[name]
      if separator is None:
        mlens[idx] = max(mlens[idx], len(hdr))
        args.append(mlens[idx])
      args.append(hdr)
    result.append(format % tuple(args))

  if separator is None:
    assert len(mlens) == len(fields)

    if fields and not numfields.Matches(fields[-1]):
      mlens[-1] = 0

  for line in data:
    args = []
    if line is None:
      line = ['-' for _ in fields]
    for idx in range(len(fields)):
      if separator is None:
        args.append(mlens[idx])
      args.append(line[idx])
    result.append(format % tuple(args))

  return result
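
# Illustrative usage sketch (comment only): given some query results, a
# caller could render them like this:
#
#   lines = GenerateTable(headers={"name": "Node", "free": "MemFree"},
#                         fields=["name", "free"], separator=None,
#                         data=[["node1", 2048], ["node2", 512]],
#                         numfields=["free"], unitfields=["free"])
#   for line in lines:
#     ToStdout(line)
#
# With separator=None the columns are padded to the widest entry and the
# unit fields are rendered human-readable (units defaults to "h").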


def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: a string with the formatted timestamp

  """
  if not isinstance(ts, (tuple, list)) or len(ts) != 2:
    return '?'
  sec, usec = ts
  return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
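
# Illustrative usage sketch (comment only): job queue timestamps are
# (seconds, microseconds) pairs, so for example
#
#   FormatTimestamp((1234567890, 42))
#
# returns something like "2009-02-13 23:31:30.000042" (the exact date and
# time depend on the local timezone); any other input is rendered as "?".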


def ParseTimespec(value):
  """Parse a time specification.

  The following suffixes will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: days
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  """
  value = str(value)
  if not value:
    raise errors.OpPrereqError("Empty time specification passed")
  suffix_map = {
    's': 1,
    'm': 60,
    'h': 3600,
    'd': 86400,
    'w': 604800,
    }
  if value[-1] not in suffix_map:
    try:
      value = int(value)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  else:
    multiplier = suffix_map[value[-1]]
    value = value[:-1]
    if not value: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)")
    try:
      value = int(value) * multiplier
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  return value
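
# Illustrative usage sketch (comment only): all of the following return
# seconds, per the suffix table above:
#
#   ParseTimespec("30")   # -> 30
#   ParseTimespec("2m")   # -> 120
#   ParseTimespec("1d")   # -> 86400
#
# An empty value, a bare suffix or a non-numeric value raises
# errors.OpPrereqError.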


def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
                   filter_master=False):
  """Returns the names of online nodes.

  This function will also print a note on stderr with the names of
  the offline nodes that are skipped.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed
  @type secondary_ips: boolean
  @param secondary_ips: if True, return the secondary IPs instead of the
      names, useful for doing network traffic over the replication interface
      (if any)
  @type filter_master: boolean
  @param filter_master: if True, do not return the master node in the list
      (useful in coordination with secondary_ips where we cannot check our
      node name against the list)

  """
  if cl is None:
    cl = GetClient()

  if secondary_ips:
    name_idx = 2
  else:
    name_idx = 0

  if filter_master:
    master_node = cl.QueryConfigValues(["master_node"])[0]
    filter_fn = lambda x: x != master_node
  else:
    filter_fn = lambda _: True

  result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
                         use_locking=False)
  offline = [row[0] for row in result if row[1]]
  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
  return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
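
# Illustrative usage sketch (comment only):
#
#   GetOnlineNodes([])                        # names of all online nodes
#   GetOnlineNodes([], secondary_ips=True,    # their secondary IPs, without
#                  filter_master=True)        # the master node, e.g. for
#                                             # traffic over the replication
#                                             # network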


def _ToStream(stream, txt, *args):
  """Write a message to a stream, bypassing the logging system

  @type stream: file object
  @param stream: the file to which we should write
  @type txt: str
  @param txt: the message

  """
  if args:
    args = tuple(args)
    stream.write(txt % args)
  else:
    stream.write(txt)
  stream.write('\n')
  stream.flush()


def ToStdout(txt, *args):
  """Write a message to stdout only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stdout, txt, *args)


def ToStderr(txt, *args):
  """Write a message to stderr only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stderr, txt, *args)
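
# Illustrative usage sketch (comment only): the extra arguments are applied
# with the %-operator only when present, so both forms are safe:
#
#   ToStdout("Cluster verified")
#   ToStderr("Failed to submit job for %s: %s", name, jid)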


class JobExecutor(object):
  """Class which manages the submission and execution of multiple jobs.

  Note that instances of this class should not be reused between
  GetResults() calls.

  """
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
    self.queue = []
    if cl is None:
      cl = GetClient()
    self.cl = cl
    self.verbose = verbose
    self.jobs = []
    self.opts = opts
    self.feedback_fn = feedback_fn

  def QueueJob(self, name, *ops):
    """Record a job for later submit.

    @type name: string
    @param name: a description of the job, will be used in WaitJobSet

    """
    SetGenericOpcodeOpts(ops, self.opts)
    self.queue.append((name, ops))

  def SubmitPending(self):
    """Submit all pending jobs.

    """
    results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
                                                            self.queue)):
      self.jobs.append((idx, status, data, name))

  def _ChooseJob(self):
    """Choose a non-waiting/queued job to poll next.

    """
    assert self.jobs, "_ChooseJob called with empty job list"

    result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
    assert result

    for job_data, status in zip(self.jobs, result):
      if status[0] in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_WAITLOCK,
                       constants.JOB_STATUS_CANCELING):
        # job is still waiting
        continue
      # good candidate found
      self.jobs.remove(job_data)
      return job_data

    # no job found
    return self.jobs.pop(0)

  def GetResults(self):
    """Wait for and return the results of all jobs.

    @rtype: list
    @return: list of tuples (success, job results), in the same order
        as the submitted jobs; if a job has failed, instead of the result
        there will be the error message

    """
    if not self.jobs:
      self.SubmitPending()
    results = []
    if self.verbose:
      ok_jobs = [row[2] for row in self.jobs if row[1]]
      if ok_jobs:
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))

    # first, remove any non-submitted jobs
    self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
    for idx, _, jid, name in failures:
      ToStderr("Failed to submit job for %s: %s", name, jid)
      results.append((idx, False, jid))

    while self.jobs:
      (idx, _, jid, name) = self._ChooseJob()
      ToStdout("Waiting for job %s for %s...", jid, name)
      try:
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
        success = True
      except (errors.GenericError, luxi.ProtocolError), err:
        _, job_result = FormatError(err)
        success = False
        # the error message will always be shown, verbose or not
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)

      results.append((idx, success, job_result))

    # sort based on the index, then drop it
    results.sort()
    results = [i[1:] for i in results]

    return results

  def WaitOrShow(self, wait):
    """Wait for job results or only print the job IDs.

    @type wait: boolean
    @param wait: whether to wait or not

    """
    if wait:
      return self.GetResults()
    else:
      if not self.jobs:
        self.SubmitPending()
      for _, status, result, name in self.jobs:
        if status:
          ToStdout("%s: %s", result, name)
        else:
          ToStderr("Failure for %s: %s", name, result)