
root / lib / cli.py @ e7b61bb0


#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module dealing with command line parsing"""


import sys
import textwrap
import os.path
import time
import logging
from cStringIO import StringIO

from ganeti import utils
from ganeti import errors
from ganeti import constants
from ganeti import opcodes
from ganeti import luxi
from ganeti import ssconf
from ganeti import rpc
from ganeti import ssh

from optparse import (OptionParser, TitledHelpFormatter,
                      Option, OptionValueError)

__all__ = [
  # Command line options
  "ALLOCATABLE_OPT",
  "ALL_OPT",
  "AUTO_PROMOTE_OPT",
  "AUTO_REPLACE_OPT",
  "BACKEND_OPT",
  "CLEANUP_OPT",
  "CONFIRM_OPT",
  "CP_SIZE_OPT",
  "DEBUG_OPT",
  "DEBUG_SIMERR_OPT",
  "DISKIDX_OPT",
  "DISK_OPT",
  "DISK_TEMPLATE_OPT",
  "DRAINED_OPT",
  "EARLY_RELEASE_OPT",
  "ENABLED_HV_OPT",
  "ERROR_CODES_OPT",
  "FIELDS_OPT",
  "FILESTORE_DIR_OPT",
  "FILESTORE_DRIVER_OPT",
  "FORCE_OPT",
  "FORCE_VARIANT_OPT",
  "GLOBAL_FILEDIR_OPT",
  "HVLIST_OPT",
  "HVOPTS_OPT",
  "HYPERVISOR_OPT",
  "IALLOCATOR_OPT",
  "IGNORE_CONSIST_OPT",
  "IGNORE_FAILURES_OPT",
  "IGNORE_SECONDARIES_OPT",
  "IGNORE_SIZE_OPT",
  "MAC_PREFIX_OPT",
  "MASTER_NETDEV_OPT",
  "MC_OPT",
  "NET_OPT",
  "NEW_CLUSTER_CERT_OPT",
  "NEW_CONFD_HMAC_KEY_OPT",
  "NEW_RAPI_CERT_OPT",
  "NEW_SECONDARY_OPT",
  "NIC_PARAMS_OPT",
  "NODE_LIST_OPT",
  "NODE_PLACEMENT_OPT",
  "NOHDR_OPT",
  "NOIPCHECK_OPT",
  "NO_INSTALL_OPT",
  "NONAMECHECK_OPT",
  "NOLVM_STORAGE_OPT",
  "NOMODIFY_ETCHOSTS_OPT",
  "NOMODIFY_SSH_SETUP_OPT",
  "NONICS_OPT",
  "NONLIVE_OPT",
  "NONPLUS1_OPT",
  "NOSHUTDOWN_OPT",
  "NOSTART_OPT",
  "NOSSH_KEYCHECK_OPT",
  "NOVOTING_OPT",
  "NWSYNC_OPT",
  "ON_PRIMARY_OPT",
  "ON_SECONDARY_OPT",
  "OFFLINE_OPT",
  "OS_OPT",
  "OS_SIZE_OPT",
  "RAPI_CERT_OPT",
  "READD_OPT",
  "REBOOT_TYPE_OPT",
  "SECONDARY_IP_OPT",
  "SELECT_OS_OPT",
  "SEP_OPT",
  "SHOWCMD_OPT",
  "SHUTDOWN_TIMEOUT_OPT",
  "SINGLE_NODE_OPT",
  "SRC_DIR_OPT",
  "SRC_NODE_OPT",
  "SUBMIT_OPT",
  "STATIC_OPT",
  "SYNC_OPT",
  "TAG_SRC_OPT",
  "TIMEOUT_OPT",
  "USEUNITS_OPT",
  "USE_REPL_NET_OPT",
  "VERBOSE_OPT",
  "VG_NAME_OPT",
  "YES_DOIT_OPT",
  # Generic functions for CLI programs
  "GenericMain",
  "GenericInstanceCreate",
  "GetClient",
  "GetOnlineNodes",
  "JobExecutor",
  "JobSubmittedException",
  "ParseTimespec",
  "RunWhileClusterStopped",
  "SubmitOpCode",
  "SubmitOrSend",
  "UsesRPC",
  # Formatting functions
  "ToStderr", "ToStdout",
  "FormatError",
  "GenerateTable",
  "AskUser",
  "FormatTimestamp",
  # Tags functions
  "ListTags",
  "AddTags",
  "RemoveTags",
  # command line options support infrastructure
  "ARGS_MANY_INSTANCES",
  "ARGS_MANY_NODES",
  "ARGS_NONE",
  "ARGS_ONE_INSTANCE",
  "ARGS_ONE_NODE",
  "ARGS_ONE_OS",
  "ArgChoice",
  "ArgCommand",
  "ArgFile",
  "ArgHost",
  "ArgInstance",
  "ArgJobId",
  "ArgNode",
  "ArgOs",
  "ArgSuggest",
  "ArgUnknown",
  "OPT_COMPL_INST_ADD_NODES",
  "OPT_COMPL_MANY_NODES",
  "OPT_COMPL_ONE_IALLOCATOR",
  "OPT_COMPL_ONE_INSTANCE",
  "OPT_COMPL_ONE_NODE",
  "OPT_COMPL_ONE_OS",
  "cli_option",
  "SplitNodeOption",
  "CalculateOSNames",
  ]

NO_PREFIX = "no_"
UN_PREFIX = "-"


class _Argument:
  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
    self.min = min
    self.max = max

  def __repr__(self):
    return ("<%s min=%s max=%s>" %
            (self.__class__.__name__, self.min, self.max))


class ArgSuggest(_Argument):
  """Suggesting argument.

  Value can be any of the ones passed to the constructor.

  """
  # pylint: disable-msg=W0622
  def __init__(self, min=0, max=None, choices=None):
    _Argument.__init__(self, min=min, max=max)
    self.choices = choices

  def __repr__(self):
    return ("<%s min=%s max=%s choices=%r>" %
            (self.__class__.__name__, self.min, self.max, self.choices))


class ArgChoice(ArgSuggest):
  """Choice argument.

  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
  but value must be one of the choices.

  """


class ArgUnknown(_Argument):
  """Unknown argument to program (e.g. determined at runtime).

  """


class ArgInstance(_Argument):
  """Instances argument.

  """


class ArgNode(_Argument):
  """Node argument.

  """

class ArgJobId(_Argument):
  """Job ID argument.

  """


class ArgFile(_Argument):
  """File path argument.

  """


class ArgCommand(_Argument):
  """Command argument.

  """


class ArgHost(_Argument):
  """Host argument.

  """


class ArgOs(_Argument):
  """OS argument.

  """


ARGS_NONE = []
ARGS_MANY_INSTANCES = [ArgInstance()]
ARGS_MANY_NODES = [ArgNode()]
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
ARGS_ONE_OS = [ArgOs(min=1, max=1)]

def _ExtractTagsObject(opts, args):
  """Extract the tag type object.

  Note that this function will modify its args parameter.

  """
  if not hasattr(opts, "tag_type"):
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
  kind = opts.tag_type
  if kind == constants.TAG_CLUSTER:
    retval = kind, kind
  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
    if not args:
      raise errors.OpPrereqError("no arguments passed to the command")
    name = args.pop(0)
    retval = kind, name
  else:
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
  return retval


def _ExtendTags(opts, args):
  """Extend the args if a source file has been given.

  This function will extend the tags with the contents of the file
  passed in the 'tags_source' attribute of the opts parameter. A file
  named '-' will be replaced by stdin.

  """
  fname = opts.tags_source
  if fname is None:
    return
  if fname == "-":
    new_fh = sys.stdin
  else:
    new_fh = open(fname, "r")
  new_data = []
  try:
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
    # because of python bug 1633941
    while True:
      line = new_fh.readline()
      if not line:
        break
      new_data.append(line.strip())
  finally:
    new_fh.close()
  args.extend(new_data)


def ListTags(opts, args):
  """List the tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  cl = GetClient()
  result = cl.QueryTags(kind, name)
  result = list(result)
  result.sort()
  for tag in result:
    ToStdout(tag)


def AddTags(opts, args):
  """Add tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be added")
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
  SubmitOpCode(op)


def RemoveTags(opts, args):
  """Remove tags from a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be removed")
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
  SubmitOpCode(op)

def check_unit(option, opt, value): # pylint: disable-msg=W0613
  """OptParser's custom converter for units.

  """
  try:
    return utils.ParseUnit(value)
  except errors.UnitParseError, err:
    raise OptionValueError("option %s: %s" % (opt, err))


def _SplitKeyVal(opt, data):
  """Convert a KeyVal string into a dict.

  This function will convert a key=val[,...] string into a dict. Empty
  values will be converted specially: keys which have the prefix 'no_'
  will have the value=False and the prefix stripped, the others will
  have value=True.

  @type opt: string
  @param opt: a string holding the option name for which we process the
      data, used in building error messages
  @type data: string
  @param data: a string of the format key=val,key=val,...
  @rtype: dict
  @return: {key=val, key=val}
  @raises errors.ParameterError: if there are duplicate keys

  """
  kv_dict = {}
  if data:
    for elem in utils.UnescapeAndSplit(data, sep=","):
      if "=" in elem:
        key, val = elem.split("=", 1)
      else:
        if elem.startswith(NO_PREFIX):
          key, val = elem[len(NO_PREFIX):], False
        elif elem.startswith(UN_PREFIX):
          key, val = elem[len(UN_PREFIX):], None
        else:
          key, val = elem, True
      if key in kv_dict:
        raise errors.ParameterError("Duplicate key '%s' in option %s" %
                                    (key, opt))
      kv_dict[key] = val
  return kv_dict
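
# Illustrative example (editor's addition, not part of the original module):
# given the NO_PREFIX/UN_PREFIX conventions above, a call like
#   _SplitKeyVal("-B", "vcpus=2,no_acpi,-kernel_path")
# returns {"vcpus": "2", "acpi": False, "kernel_path": None}.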


def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser for ident:key=val,key=val options.

  This will store the parsed values as a tuple (ident, {key: val}). As such,
  multiple uses of this option via action=append are possible.

  """
  if ":" not in value:
    ident, rest = value, ''
  else:
    ident, rest = value.split(":", 1)

  if ident.startswith(NO_PREFIX):
    if rest:
      msg = "Cannot pass options when removing parameter groups: %s" % value
      raise errors.ParameterError(msg)
    retval = (ident[len(NO_PREFIX):], False)
  elif ident.startswith(UN_PREFIX):
    if rest:
      msg = "Cannot pass options when removing parameter groups: %s" % value
      raise errors.ParameterError(msg)
    retval = (ident[len(UN_PREFIX):], None)
  else:
    kv_dict = _SplitKeyVal(opt, rest)
    retval = (ident, kv_dict)
  return retval
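
# Illustrative example (editor's addition): with the "identkeyval" type
# registered below, an option value such as "0:mac=00:11:22:33:44:55,link=br0"
# is parsed into ("0", {"mac": "00:11:22:33:44:55", "link": "br0"}), while
# "no_0" yields ("0", False).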


def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser class for key=val,key=val options.

  This will store the parsed values as a dict {key: val}.

  """
  return _SplitKeyVal(opt, value)


def check_bool(option, opt, value): # pylint: disable-msg=W0613
  """Custom parser for yes/no options.

  This will store the parsed value as either True or False.

  """
  value = value.lower()
  if value == constants.VALUE_FALSE or value == "no":
    return False
  elif value == constants.VALUE_TRUE or value == "yes":
    return True
  else:
    raise errors.ParameterError("Invalid boolean value '%s'" % value)


# completion_suggestion is normally a list. Using numeric values not evaluating
# to False for dynamic completion.
(OPT_COMPL_MANY_NODES,
 OPT_COMPL_ONE_NODE,
 OPT_COMPL_ONE_INSTANCE,
 OPT_COMPL_ONE_OS,
 OPT_COMPL_ONE_IALLOCATOR,
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)

OPT_COMPL_ALL = frozenset([
  OPT_COMPL_MANY_NODES,
  OPT_COMPL_ONE_NODE,
  OPT_COMPL_ONE_INSTANCE,
  OPT_COMPL_ONE_OS,
  OPT_COMPL_ONE_IALLOCATOR,
  OPT_COMPL_INST_ADD_NODES,
  ])

class CliOption(Option):
  """Custom option class for optparse.

  """
  ATTRS = Option.ATTRS + [
    "completion_suggest",
    ]
  TYPES = Option.TYPES + (
    "identkeyval",
    "keyval",
    "unit",
    "bool",
    )
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
  TYPE_CHECKER["keyval"] = check_key_val
  TYPE_CHECKER["unit"] = check_unit
  TYPE_CHECKER["bool"] = check_bool


# optparse.py sets make_option, so we do it for our own option class, too
cli_option = CliOption


_YORNO = "yes|no"

DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
                       help="Increase debugging level")

NOHDR_OPT = cli_option("--no-headers", default=False,
                       action="store_true", dest="no_headers",
                       help="Don't display column headers")

SEP_OPT = cli_option("--separator", default=None,
                     action="store", dest="separator",
                     help=("Separator between output fields"
                           " (defaults to one space)"))

USEUNITS_OPT = cli_option("--units", default=None,
                          dest="units", choices=('h', 'm', 'g', 't'),
                          help="Specify units for output (one of hmgt)")

FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
                        type="string", metavar="FIELDS",
                        help="Comma separated list of output fields")

FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
                       default=False, help="Force the operation")

CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
                         default=False, help="Do not require confirmation")

TAG_SRC_OPT = cli_option("--from", dest="tags_source",
                         default=None, help="File with tag names")

SUBMIT_OPT = cli_option("--submit", dest="submit_only",
                        default=False, action="store_true",
                        help=("Submit the job and return the job ID, but"
                              " don't wait for the job to finish"))

SYNC_OPT = cli_option("--sync", dest="do_locking",
                      default=False, action="store_true",
                      help=("Grab locks while doing the queries"
                            " in order to ensure more consistent results"))

_DRY_RUN_OPT = cli_option("--dry-run", default=False,
                          action="store_true",
                          help=("Do not execute the operation, just run the"
                                " check steps and verify if it could be"
                                " executed"))

VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
                         action="store_true",
                         help="Increase the verbosity of the operation")

DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
                              action="store_true", dest="simulate_errors",
                              help="Debugging option that makes the operation"
                              " treat most runtime checks as failed")

NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
                        default=True, action="store_false",
                        help="Don't wait for sync (DANGEROUS!)")

DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
                               help="Custom disk setup (diskless, file,"
                               " plain or drbd)",
                               default=None, metavar="TEMPL",
                               choices=list(constants.DISK_TEMPLATES))

NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
                        help="Do not create any network cards for"
                        " the instance")

FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                               help="Relative path under default cluster-wide"
                               " file storage dir to store file-based disks",
                               default=None, metavar="<DIR>")

FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
                                  help="Driver to use for image files",
                                  default="loop", metavar="<DRIVER>",
                                  choices=list(constants.FILE_DRIVER))

IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
                            help="Select nodes for the instance automatically"
                            " using the <NAME> iallocator plugin",
                            default=None, type="string",
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)

OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
                    metavar="<os>",
                    completion_suggest=OPT_COMPL_ONE_OS)

FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
                               action="store_true", default=False,
                               help="Force an unknown variant")

NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
                            action="store_true", default=False,
                            help="Do not install the OS (will"
                            " enable no-start)")

BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
                         type="keyval", default={},
                         help="Backend parameters")

HVOPTS_OPT = cli_option("-H", "--hypervisor-parameters", type="keyval",
                        default={}, dest="hvparams",
                        help="Hypervisor parameters")

HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
                            help="Hypervisor and hypervisor options, in the"
                            " format hypervisor:option=value,option=value,...",
                            default=None, type="identkeyval")

HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
                        help="Hypervisor and hypervisor options, in the"
                        " format hypervisor:option=value,option=value,...",
                        default=[], action="append", type="identkeyval")

NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
                           action="store_false",
                           help="Don't check that the instance's IP"
                           " is alive")

NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
                             default=True, action="store_false",
                             help="Don't check that the instance's name"
                             " is resolvable")

NET_OPT = cli_option("--net",
                     help="NIC parameters", default=[],
                     dest="nics", action="append", type="identkeyval")

DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
                      dest="disks", action="append", type="identkeyval")

DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
                         help="Comma-separated list of disk"
                         " indices to act on (e.g. 0,2) (optional,"
                         " defaults to all disks)")

OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
                         help="Enforces a single-disk configuration using the"
                         " given disk size, in MiB unless a suffix is used",
                         default=None, type="unit", metavar="<size>")

IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
                                dest="ignore_consistency",
                                action="store_true", default=False,
                                help="Ignore the consistency of the disks on"
                                " the secondary")

NONLIVE_OPT = cli_option("--non-live", dest="live",
                         default=True, action="store_false",
                         help="Do a non-live migration (this usually means"
                         " freeze the instance, save the state, transfer and"
                         " only then resume running on the secondary node)")

NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
                                help="Target node and optional secondary node",
                                metavar="<pnode>[:<snode>]",
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)

NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
                           action="append", metavar="<node>",
                           help="Use only this node (can be used multiple"
                           " times, if not given defaults to all nodes)",
                           completion_suggest=OPT_COMPL_ONE_NODE)

SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
                             metavar="<node>",
                             completion_suggest=OPT_COMPL_ONE_NODE)

NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
                         action="store_false",
                         help="Don't start the instance after creation")

SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
                         action="store_true", default=False,
                         help="Show command instead of executing it")

CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
                         default=False, action="store_true",
                         help="Instead of performing the migration, try to"
                         " recover from a failed cleanup. This is safe"
                         " to run even if the instance is healthy, but it"
                         " will create extra replication traffic and"
                         " briefly disrupt the replication (like during the"
                         " migration)")

STATIC_OPT = cli_option("-s", "--static", dest="static",
                        action="store_true", default=False,
                        help="Only show configuration data, not runtime data")

ALL_OPT = cli_option("--all", dest="show_all",
                     default=False, action="store_true",
                     help="Show info on all instances on the cluster."
                     " This can take a long time to run, use wisely")

SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
                           action="store_true", default=False,
                           help="Interactive OS reinstall, lists available"
                           " OS templates for selection")

IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
                                 action="store_true", default=False,
                                 help="Remove the instance from the cluster"
                                 " configuration even if there are failures"
                                 " during the removal process")

NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
                               help="Specifies the new secondary node",
                               metavar="NODE", default=None,
                               completion_suggest=OPT_COMPL_ONE_NODE)

ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
                            default=False, action="store_true",
                            help="Replace the disk(s) on the primary"
                            " node (only for the drbd template)")

ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
                              default=False, action="store_true",
                              help="Replace the disk(s) on the secondary"
                              " node (only for the drbd template)")

AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
                              default=False, action="store_true",
                              help="Lock all nodes and auto-promote as needed"
                              " to MC status")

AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
                              default=False, action="store_true",
                              help="Automatically replace faulty disks"
                              " (only for the drbd template)")

IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
                             default=False, action="store_true",
                             help="Ignore current recorded size"
                             " (useful for forcing activation when"
                             " the recorded size is wrong)")

SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
                          metavar="<node>",
                          completion_suggest=OPT_COMPL_ONE_NODE)

SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
                         metavar="<dir>")

SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
                              help="Specify the secondary ip for the node",
                              metavar="ADDRESS", default=None)

READD_OPT = cli_option("--readd", dest="readd",
                       default=False, action="store_true",
                       help="Readd old node after replacing it")

NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
                                default=True, action="store_false",
                                help="Disable SSH key fingerprint checking")


MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
                    type="bool", default=None, metavar=_YORNO,
                    help="Set the master_candidate flag on the node")

OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
                         type="bool", default=None,
                         help="Set the offline flag on the node")

DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
                         type="bool", default=None,
                         help="Set the drained flag on the node")

ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
                             type="bool", default=None, metavar=_YORNO,
                             help="Set the allocatable flag on a volume")

NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
                               help="Disable support for lvm based instances"
                               " (cluster-wide)",
                               action="store_false", default=True)

ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
                            dest="enabled_hypervisors",
                            help="Comma-separated list of hypervisors",
                            type="string", default=None)

NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
                            type="keyval", default={},
                            help="NIC parameters")

CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
                         dest="candidate_pool_size", type="int",
                         help="Set the candidate pool size")

VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
                         help="Enables LVM and specifies the volume group"
                         " name (cluster-wide) for disk allocation [xenvg]",
                         metavar="VG", default=None)

YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
                          help="Destroy cluster", action="store_true")

NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
                          help="Skip node agreement check (dangerous)",
                          action="store_true", default=False)

MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
                            help="Specify the MAC prefix for the instance MAC"
                            " addresses, in the format XX:XX:XX",
                            metavar="PREFIX",
                            default=None)

MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
                               help="Specify the node interface (cluster-wide)"
                               " on which the master IP address will be added "
                               " [%s]" % constants.DEFAULT_BRIDGE,
                               metavar="NETDEV",
                               default=constants.DEFAULT_BRIDGE)


GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                                help="Specify the default directory (cluster-"
                                "wide) for storing the file-based disks [%s]" %
                                constants.DEFAULT_FILE_STORAGE_DIR,
                                metavar="DIR",
                                default=constants.DEFAULT_FILE_STORAGE_DIR)

NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
                                   help="Don't modify /etc/hosts",
                                   action="store_false", default=True)

NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
                                    help="Don't initialize SSH keys",
                                    action="store_false", default=True)

ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
                             help="Enable parseable error messages",
                             action="store_true", default=False)

NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
                          help="Skip N+1 memory redundancy tests",
                          action="store_true", default=False)

REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
                             help="Type of reboot: soft/hard/full",
                             default=constants.INSTANCE_REBOOT_HARD,
                             metavar="<REBOOT>",
                             choices=list(constants.REBOOT_TYPES))

IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
                                    dest="ignore_secondaries",
                                    default=False, action="store_true",
                                    help="Ignore errors from secondaries")

NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
                            action="store_false", default=True,
                            help="Don't shutdown the instance (unsafe)")

TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                         help="Maximum time to wait")

SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
                         dest="shutdown_timeout", type="int",
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                         help="Maximum time to wait for instance shutdown")

EARLY_RELEASE_OPT = cli_option("--early-release",
                               dest="early_release", default=False,
                               action="store_true",
                               help="Release the locks on the secondary"
                               " node(s) early")

NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
                                  dest="new_cluster_cert",
                                  default=False, action="store_true",
                                  help="Generate a new cluster certificate")

RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
                           default=None,
                           help="File containing new RAPI certificate")

NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
                               default=None, action="store_true",
                               help=("Generate a new self-signed RAPI"
                                     " certificate"))

NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
                                    dest="new_confd_hmac_key",
                                    default=False, action="store_true",
                                    help=("Create a new HMAC key for %s" %
                                          constants.CONFD))

USE_REPL_NET_OPT = cli_option("--use-replication-network",
                              dest="use_replication_network",
                              help="Whether to use the replication network"
                              " for talking to the nodes",
                              action="store_true", default=False)


def _ParseArgs(argv, commands, aliases):
  """Parser for the command line arguments.

  This function parses the arguments and returns the function which
  must be executed together with its (modified) arguments.

  @param argv: the command line
  @param commands: dictionary with special contents, see the design
      doc for cmdline handling
  @param aliases: dictionary with command aliases {'alias': 'target', ...}

  """
  if len(argv) == 0:
    binary = "<command>"
  else:
    binary = argv[0].split("/")[-1]

  if len(argv) > 1 and argv[1] == "--version":
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
    # Quit right away. That way we don't have to care about this special
    # argument. optparse.py does it the same.
    sys.exit(0)

  if len(argv) < 2 or not (argv[1] in commands or
                           argv[1] in aliases):
    # let's do a nice thing
    sortedcmds = commands.keys()
    sortedcmds.sort()

    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
    ToStdout("")

    # compute the max line length for cmd + usage
    mlen = max([len(" %s" % cmd) for cmd in commands])
    mlen = min(60, mlen) # should not get here...

    # and format a nice command list
    ToStdout("Commands:")
    for cmd in sortedcmds:
      cmdstr = " %s" % (cmd,)
      help_text = commands[cmd][4]
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
      for line in help_lines:
        ToStdout("%-*s   %s", mlen, "", line)

    ToStdout("")

    return None, None, None

  # get command, unalias it, and look it up in commands
  cmd = argv.pop(1)
  if cmd in aliases:
    if cmd in commands:
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
                                   " command" % cmd)

    if aliases[cmd] not in commands:
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
                                   " command '%s'" % (cmd, aliases[cmd]))

    cmd = aliases[cmd]

  func, args_def, parser_opts, usage, description = commands[cmd]
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
                        description=description,
                        formatter=TitledHelpFormatter(),
                        usage="%%prog %s %s" % (cmd, usage))
  parser.disable_interspersed_args()
  options, args = parser.parse_args()

  if not _CheckArguments(cmd, args_def, args):
    return None, None, None

  return func, options, args


def _CheckArguments(cmd, args_def, args):
  """Verifies the arguments using the argument definition.

  Algorithm:

    1. Abort with error if values specified by user but none expected.

    1. For each argument in definition

      1. Keep running count of minimum number of values (min_count)
      1. Keep running count of maximum number of values (max_count)
      1. If it has an unlimited number of values

        1. Abort with error if it's not the last argument in the definition

    1. If last argument has limited number of values

      1. Abort with error if number of values doesn't match or is too large

    1. Abort with error if user didn't pass enough values (min_count)

  """
  if args and not args_def:
    ToStderr("Error: Command %s expects no arguments", cmd)
    return False

  min_count = None
  max_count = None
  check_max = None

  last_idx = len(args_def) - 1

  for idx, arg in enumerate(args_def):
    if min_count is None:
      min_count = arg.min
    elif arg.min is not None:
      min_count += arg.min

    if max_count is None:
      max_count = arg.max
    elif arg.max is not None:
      max_count += arg.max

    if idx == last_idx:
      check_max = (arg.max is not None)

    elif arg.max is None:
      raise errors.ProgrammerError("Only the last argument can have max=None")

  if check_max:
    # Command with exact number of arguments
    if (min_count is not None and max_count is not None and
        min_count == max_count and len(args) != min_count):
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
      return False

    # Command with limited number of arguments
    if max_count is not None and len(args) > max_count:
      ToStderr("Error: Command %s expects only %d argument(s)",
               cmd, max_count)
      return False

  # Command with some required arguments
  if min_count is not None and len(args) < min_count:
    ToStderr("Error: Command %s expects at least %d argument(s)",
             cmd, min_count)
    return False

  return True


def SplitNodeOption(value):
  """Splits the value of a --node option.

  """
  if value and ':' in value:
    return value.split(':', 1)
  else:
    return (value, None)
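
# Illustrative example (editor's addition): SplitNodeOption("node1:node2")
# unpacks to ("node1", "node2"), while SplitNodeOption("node1") gives
# ("node1", None); this matches the <pnode>[:<snode>] syntax of "-n".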


def CalculateOSNames(os_name, os_variants):
  """Calculates all the names an OS can be called, according to its variants.

  @type os_name: string
  @param os_name: base name of the os
  @type os_variants: list or None
  @param os_variants: list of supported variants
  @rtype: list
  @return: list of valid names

  """
  if os_variants:
    return ['%s+%s' % (os_name, v) for v in os_variants]
  else:
    return [os_name]
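
# Illustrative example (editor's addition, hypothetical OS/variant names):
#   CalculateOSNames("debootstrap", ["etch", "lenny"])
# returns ['debootstrap+etch', 'debootstrap+lenny'], while an OS without
# variants simply yields [os_name].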


def UsesRPC(fn):
  def wrapper(*args, **kwargs):
    rpc.Init()
    try:
      return fn(*args, **kwargs)
    finally:
      rpc.Shutdown()
  return wrapper
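
# Illustrative usage (editor's addition, hypothetical command function):
#
#   @UsesRPC
#   def SomeCommandFn(opts, args):
#     ...
#
# so that rpc.Init()/rpc.Shutdown() bracket the call even when it raises.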


def AskUser(text, choices=None):
  """Ask the user a question.

  @param text: the question to ask

  @param choices: list with elements tuples (input_char, return_value,
      description); if not given, it will default to: [('y', True,
      'Perform the operation'), ('n', False, 'Do not do the operation')];
      note that the '?' char is reserved for help

  @return: one of the return values from the choices list; if input is
      not possible (i.e. not running with a tty), we return the last
      entry from the list

  """
  if choices is None:
    choices = [('y', True, 'Perform the operation'),
               ('n', False, 'Do not perform the operation')]
  if not choices or not isinstance(choices, list):
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
  for entry in choices:
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
      raise errors.ProgrammerError("Invalid choices element to AskUser")

  answer = choices[-1][1]
  new_text = []
  for line in text.splitlines():
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
  text = "\n".join(new_text)
  try:
    f = file("/dev/tty", "a+")
  except IOError:
    return answer
  try:
    chars = [entry[0] for entry in choices]
    chars[-1] = "[%s]" % chars[-1]
    chars.append('?')
    maps = dict([(entry[0], entry[1]) for entry in choices])
    while True:
      f.write(text)
      f.write('\n')
      f.write("/".join(chars))
      f.write(": ")
      line = f.readline(2).strip().lower()
      if line in maps:
        answer = maps[line]
        break
      elif line == '?':
        for entry in choices:
          f.write(" %s - %s\n" % (entry[0], entry[2]))
        f.write("\n")
        continue
  finally:
    f.close()
  return answer
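
# Illustrative usage (editor's addition, hypothetical instance name): a
# typical confirmation prompt looks like
#   usertext = "Destroy instance 'inst1'?"
#   if not (opts.confirm or AskUser(usertext)):
#     return 1
# With no tty available, AskUser falls back to the last choice (here: False).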


class JobSubmittedException(Exception):
  """Job was submitted, client should exit.

  This exception has one argument, the ID of the job that was
  submitted. The handler should print this ID.

  This is not an error, just a structured way to exit from clients.

  """


def SendJob(ops, cl=None):
  """Function to submit an opcode without waiting for the results.

  @type ops: list
  @param ops: list of opcodes
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created

  """
  if cl is None:
    cl = GetClient()

  job_id = cl.SubmitJob(ops)

  return job_id


def PollJob(job_id, cl=None, feedback_fn=None):
  """Function to poll for the result of a job.

  @type job_id: job identifier
  @param job_id: the job to poll for results
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created

  """
  if cl is None:
    cl = GetClient()

  prev_job_info = None
  prev_logmsg_serial = None

  status = None

  notified_queued = False
  notified_waitlock = False

  while True:
    result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
                                     prev_logmsg_serial)
    if not result:
      # job not found, go away!
      raise errors.JobLost("Job with id %s lost" % job_id)
    elif result == constants.JOB_NOTCHANGED:
      if status is not None and not callable(feedback_fn):
        if status == constants.JOB_STATUS_QUEUED and not notified_queued:
          ToStderr("Job %s is waiting in queue", job_id)
          notified_queued = True
        elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
          ToStderr("Job %s is trying to acquire all necessary locks", job_id)
          notified_waitlock = True

      # Wait again
      continue

    # Split result, a tuple of (field values, log entries)
    (job_info, log_entries) = result
    (status, ) = job_info

    if log_entries:
      for log_entry in log_entries:
        (serial, timestamp, _, message) = log_entry
        if callable(feedback_fn):
          feedback_fn(log_entry[1:])
        else:
          encoded = utils.SafeEncode(message)
          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
        prev_logmsg_serial = max(prev_logmsg_serial, serial)

    # TODO: Handle canceled and archived jobs
    elif status in (constants.JOB_STATUS_SUCCESS,
                    constants.JOB_STATUS_ERROR,
                    constants.JOB_STATUS_CANCELING,
                    constants.JOB_STATUS_CANCELED):
      break

    prev_job_info = job_info

  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
  if not jobs:
    raise errors.JobLost("Job with id %s lost" % job_id)

  status, opstatus, result = jobs[0]
  if status == constants.JOB_STATUS_SUCCESS:
    return result
  elif status in (constants.JOB_STATUS_CANCELING,
                  constants.JOB_STATUS_CANCELED):
    raise errors.OpExecError("Job was canceled")
  else:
    has_ok = False
    for idx, (status, msg) in enumerate(zip(opstatus, result)):
      if status == constants.OP_STATUS_SUCCESS:
        has_ok = True
      elif status == constants.OP_STATUS_ERROR:
        errors.MaybeRaise(msg)
        if has_ok:
          raise errors.OpExecError("partial failure (opcode %d): %s" %
                                   (idx, msg))
        else:
          raise errors.OpExecError(str(msg))
    # default failure mode
    raise errors.OpExecError(result)
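
# Illustrative usage (editor's addition, hypothetical opcodes op1/op2): the
# two calls above are usually combined along the lines of
#   job_id = SendJob([op1, op2], cl=cl)
#   results = PollJob(job_id, cl=cl)
# which is essentially what SubmitOpCode below does for a single opcode.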


def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
  """Legacy function to submit an opcode.

  This is just a simple wrapper over the construction of the processor
  instance. It should be extended to better handle feedback and
  interaction functions.

  """
  if cl is None:
    cl = GetClient()

  SetGenericOpcodeOpts([op], opts)

  job_id = SendJob([op], cl)

  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)

  return op_results[0]


def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  This function will decide, based on the 'opts' parameter, whether to
  submit and wait for the result of the opcode (and return it), or
  whether to just send the job and print its identifier. It is used in
  order to simplify the implementation of the '--submit' option.

  It will also process the opcodes if we're sending them via SendJob
  (otherwise SubmitOpCode does it).

  """
  if opts and opts.submit_only:
    job = [op]
    SetGenericOpcodeOpts(job, opts)
    job_id = SendJob(job, cl=cl)
    raise JobSubmittedException(job_id)
  else:
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
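
# Illustrative usage (editor's addition): a command implementation typically
# ends with something like
#   op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
#   SubmitOrSend(op, opts)
# so that "--submit" makes it return the job ID immediately instead of
# waiting (the ID is surfaced via JobSubmittedException and FormatError).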


def SetGenericOpcodeOpts(opcode_list, options):
  """Processor for generic options.

  This function updates the given opcodes based on generic command
  line options (like debug, dry-run, etc.).

  @param opcode_list: list of opcodes
  @param options: command line options or None
  @return: None (in-place modification)

  """
  if not options:
    return
  for op in opcode_list:
    op.dry_run = options.dry_run
    op.debug_level = options.debug


def GetClient():
  # TODO: Cache object?
  try:
    client = luxi.Client()
  except luxi.NoMasterError:
    ss = ssconf.SimpleStore()

    # Try to read ssconf file
    try:
      ss.GetMasterNode()
    except errors.ConfigurationError:
      raise errors.OpPrereqError("Cluster not initialized or this machine is"
                                 " not part of a cluster")

    master, myself = ssconf.GetMasterAndMyself(ss=ss)
    if master != myself:
      raise errors.OpPrereqError("This is not the master node, please connect"
                                 " to node '%s' and rerun the command" %
                                 master)
    raise
  return client


def FormatError(err):
  """Return a formatted error message for a given error.

  This function takes an exception instance and returns a tuple
  consisting of two values: first, the recommended exit code, and
  second, a string describing the error message (not
  newline-terminated).

  """
  retcode = 1
  obuf = StringIO()
  msg = str(err)
  if isinstance(err, errors.ConfigurationError):
    txt = "Corrupt configuration file: %s" % msg
    logging.error(txt)
    obuf.write(txt + "\n")
    obuf.write("Aborting.")
    retcode = 2
  elif isinstance(err, errors.HooksAbort):
    obuf.write("Failure: hooks execution failed:\n")
    for node, script, out in err.args[0]:
      if out:
        obuf.write("  node: %s, script: %s, output: %s\n" %
                   (node, script, out))
      else:
        obuf.write("  node: %s, script: %s (no output)\n" %
                   (node, script))
  elif isinstance(err, errors.HooksFailure):
    obuf.write("Failure: hooks general failure: %s" % msg)
  elif isinstance(err, errors.ResolverError):
    this_host = utils.HostInfo.SysName()
    if err.args[0] == this_host:
      msg = "Failure: can't resolve my own hostname ('%s')"
    else:
      msg = "Failure: can't resolve hostname '%s'"
    obuf.write(msg % err.args[0])
  elif isinstance(err, errors.OpPrereqError):
    if len(err.args) == 2:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\nerror type: %s, error details:\n%s" %
                 (err.args[1], err.args[0]))
    else:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\n%s" % msg)
  elif isinstance(err, errors.OpExecError):
    obuf.write("Failure: command execution error:\n%s" % msg)
  elif isinstance(err, errors.TagError):
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
  elif isinstance(err, errors.JobQueueDrainError):
    obuf.write("Failure: the job queue is marked for drain and doesn't"
               " accept new requests\n")
  elif isinstance(err, errors.JobQueueFull):
    obuf.write("Failure: the job queue is full and doesn't accept new"
               " job submissions until old jobs are archived\n")
  elif isinstance(err, errors.TypeEnforcementError):
    obuf.write("Parameter Error: %s" % msg)
  elif isinstance(err, errors.ParameterError):
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
  elif isinstance(err, errors.GenericError):
    obuf.write("Unhandled Ganeti error: %s" % msg)
  elif isinstance(err, luxi.NoMasterError):
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
               " and listening for connections?")
  elif isinstance(err, luxi.TimeoutError):
    obuf.write("Timeout while talking to the master daemon. Error:\n"
               "%s" % msg)
  elif isinstance(err, luxi.ProtocolError):
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
               "%s" % msg)
  elif isinstance(err, JobSubmittedException):
    obuf.write("JobID: %s\n" % err.args[0])
    retcode = 0
  else:
    obuf.write("Unhandled exception: %s" % msg)
  return retcode, obuf.getvalue().rstrip('\n')


1436
def GenericMain(commands, override=None, aliases=None):
1437
  """Generic main function for all the gnt-* commands.
1438

1439
  Arguments:
1440
    - commands: a dictionary with a special structure, see the design doc
1441
                for command line handling.
1442
    - override: if not None, we expect a dictionary with keys that will
1443
                override command line options; this can be used to pass
1444
                options from the scripts to generic functions
1445
    - aliases: dictionary with command aliases {'alias': 'target, ...}
1446

1447
  """
1448
  # save the program name and the entire command line for later logging
1449
  if sys.argv:
1450
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1451
    if len(sys.argv) >= 2:
1452
      binary += " " + sys.argv[1]
1453
      old_cmdline = " ".join(sys.argv[2:])
1454
    else:
1455
      old_cmdline = ""
1456
  else:
1457
    binary = "<unknown program>"
1458
    old_cmdline = ""
1459

    
1460
  if aliases is None:
1461
    aliases = {}
1462

    
1463
  try:
1464
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
1465
  except errors.ParameterError, err:
1466
    result, err_msg = FormatError(err)
1467
    ToStderr(err_msg)
1468
    return 1
1469

    
1470
  if func is None: # parse error
1471
    return 1
1472

    
1473
  if override is not None:
1474
    for key, val in override.iteritems():
1475
      setattr(options, key, val)
1476

    
1477
  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1478
                     stderr_logging=True, program=binary)
1479

    
1480
  if old_cmdline:
1481
    logging.info("run with arguments '%s'", old_cmdline)
1482
  else:
1483
    logging.info("run with no arguments")
1484

    
1485
  try:
1486
    result = func(options, args)
1487
  except (errors.GenericError, luxi.ProtocolError,
1488
          JobSubmittedException), err:
1489
    result, err_msg = FormatError(err)
1490
    logging.exception("Error during command processing")
1491
    ToStderr(err_msg)
1492

    
1493
  return result
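
# Usage sketch (illustrative only): each gnt-* script builds a "commands"
# dictionary in the structure described by the design doc and then calls
# GenericMain from its entry point, along the lines of:
#
#   if __name__ == "__main__":
#     sys.exit(GenericMain(commands, aliases={"del": "remove"}))
#
# ("commands" and the "del"/"remove" alias are hypothetical names here.)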


def GenericInstanceCreate(mode, opts, args):
  """Add an instance to the cluster via either creation or import.

  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new instance name
  @rtype: int
  @return: the desired exit code

  """
  instance = args[0]

  (pnode, snode) = SplitNodeOption(opts.node)

  hypervisor = None
  hvparams = {}
  if opts.hypervisor:
    hypervisor, hvparams = opts.hypervisor

  if opts.nics:
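    # opts.nics is a list of (index, settings-dict) pairs as built by the
    # option parser, e.g. [("0", {"mac": "auto"}), ("1", {"link": "br100"})]
    # (illustrative values; the exact keys depend on the NIC parameters
    # supported by the cluster)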
    try:
      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
    except ValueError, err:
      raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
    nics = [{}] * nic_max
    for nidx, ndict in opts.nics:
      nidx = int(nidx)
      if not isinstance(ndict, dict):
        msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
        raise errors.OpPrereqError(msg)
      nics[nidx] = ndict
  elif opts.no_nics:
    # no nics
    nics = []
  else:
    # default of one nic, all auto
    nics = [{}]

  if opts.disk_template == constants.DT_DISKLESS:
    if opts.disks or opts.sd_size is not None:
      raise errors.OpPrereqError("Diskless instance but disk"
                                 " information passed")
    disks = []
  else:
    if not opts.disks and not opts.sd_size:
      raise errors.OpPrereqError("No disk information specified")
    if opts.disks and opts.sd_size is not None:
      raise errors.OpPrereqError("Please use either the '--disk' or"
                                 " '-s' option")
    if opts.sd_size is not None:
      opts.disks = [(0, {"size": opts.sd_size})]
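    # At this point opts.disks is a list of (index, settings-dict) pairs,
    # e.g. [(0, {"size": "10g"})] or [("0", {"adopt": "some_lv"})]
    # (illustrative values): "size" is parsed with utils.ParseUnit below,
    # while "adopt" names an existing volume to reuse.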
    try:
      disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
    except ValueError, err:
      raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
    disks = [{}] * disk_max
    for didx, ddict in opts.disks:
      didx = int(didx)
      if not isinstance(ddict, dict):
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
        raise errors.OpPrereqError(msg)
      elif "size" in ddict:
        if "adopt" in ddict:
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
                                     " (disk %d)" % didx)
        try:
          ddict["size"] = utils.ParseUnit(ddict["size"])
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
                                     (didx, err))
      elif "adopt" in ddict:
        if mode == constants.INSTANCE_IMPORT:
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
                                     " import")
        ddict["size"] = 0
      else:
        raise errors.OpPrereqError("Missing size or adoption source for"
                                   " disk %d" % didx)
      disks[didx] = ddict

  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)

  if mode == constants.INSTANCE_CREATE:
    start = opts.start
    os_type = opts.os
    src_node = None
    src_path = None
    no_install = opts.no_install
  elif mode == constants.INSTANCE_IMPORT:
    start = False
    os_type = None
    src_node = opts.src_node
    src_path = opts.src_dir
    no_install = None
  else:
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)

  op = opcodes.OpCreateInstance(instance_name=instance,
                                disks=disks,
                                disk_template=opts.disk_template,
                                nics=nics,
                                pnode=pnode, snode=snode,
                                ip_check=opts.ip_check,
                                name_check=opts.name_check,
                                wait_for_sync=opts.wait_for_sync,
                                file_storage_dir=opts.file_storage_dir,
                                file_driver=opts.file_driver,
                                iallocator=opts.iallocator,
                                hypervisor=hypervisor,
                                hvparams=hvparams,
                                beparams=opts.beparams,
                                mode=mode,
                                start=start,
                                os_type=os_type,
                                src_node=src_node,
                                src_path=src_path,
                                no_install=no_install)

  SubmitOrSend(op, opts)
  return 0
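
# Illustrative sketch (not part of the original code): client scripts are
# expected to wrap this helper with the desired mode, roughly:
#
#   def AddInstance(opts, args):
#     return GenericInstanceCreate(constants.INSTANCE_CREATE, opts, args)
#
# (AddInstance is a hypothetical name here; gnt-instance defines the real
# wrappers.)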


class _RunWhileClusterStoppedHelper:
  """Helper class for L{RunWhileClusterStopped} to simplify state management

  """
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes

    self.ssh = ssh.SshRunner(self.cluster_name)

    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name
    @type cmd: list
    @param cmd: Command

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    @type fn: callable
    @param fn: Function to be called

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes
        for node_name in self.online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception, err:
          _, errmsg = FormatError(err)
          logging.exception("Caught exception")
          self.feedback_fn(errmsg)
          raise
      finally:
        # Start cluster again, master node last
        for node_name in self.nonmaster_nodes + [self.master_node]:
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
    finally:
      # Resume watcher
      watcher_block.Close()


def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Calls a function while all cluster daemons are stopped.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  feedback_fn("Gathering cluster information")

  # This ensures we're running on the master daemon
  cl = GetClient()

  (cluster_name, master_node) = \
    cl.QueryConfigValues(["cluster_name", "master_node"])

  online_nodes = GetOnlineNodes([], cl=cl)

  # Don't keep a reference to the client. The master daemon will go away.
  del cl

  assert master_node in online_nodes

  return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
                                       online_nodes).Call(fn, *args)
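
# Usage sketch (illustrative only): the callback gets the helper instance as
# its first argument and its return value is passed through, e.g.:
#
#   def _WhileStopped(helper, arg):   # hypothetical callback
#     # all cluster daemons are stopped while this runs
#     return arg
#
#   RunWhileClusterStopped(ToStdout, _WhileStopped, "some-value")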


def GenerateTable(headers, fields, separator, data,
                  numfields=None, unitfields=None,
                  units=None):
  """Generates a table (as a list of formatted lines) with headers and
  different fields.

  @type headers: dict
  @param headers: dictionary mapping field names to headers for
      the table
  @type fields: list
  @param fields: the field names corresponding to each row in
      the data field
  @param separator: the separator to be used; if this is None,
      the default 'smart' algorithm is used which computes optimal
      field width, otherwise just the separator is used between
      each field
  @type data: list
  @param data: a list of lists, each sublist being one row to be output
  @type numfields: list
  @param numfields: a list with the fields that hold numeric
      values and thus should be right-aligned
  @type unitfields: list
  @param unitfields: a list with the fields that hold numeric
      values that should be formatted with the units field
  @type units: string or None
  @param units: the units we should use for formatting, or None for
      automatic choice (human-readable for non-separator usage, otherwise
      megabytes); this is a one-letter string

  """
  if units is None:
    if separator:
      units = "m"
    else:
      units = "h"

  if numfields is None:
    numfields = []
  if unitfields is None:
    unitfields = []

  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142

  format_fields = []
  for field in fields:
    if headers and field not in headers:
      # TODO: handle better unknown fields (either revert to old
      # style of raising exception, or deal more intelligently with
      # variable fields)
      headers[field] = field
    if separator is not None:
      format_fields.append("%s")
    elif numfields.Matches(field):
      format_fields.append("%*s")
    else:
      format_fields.append("%-*s")

  if separator is None:
    mlens = [0 for name in fields]
    format = ' '.join(format_fields)
  else:
    format = separator.replace("%", "%%").join(format_fields)

  for row in data:
    if row is None:
      continue
    for idx, val in enumerate(row):
      if unitfields.Matches(fields[idx]):
        try:
          val = int(val)
        except (TypeError, ValueError):
          pass
        else:
          val = row[idx] = utils.FormatUnit(val, units)
      val = row[idx] = str(val)
      if separator is None:
        mlens[idx] = max(mlens[idx], len(val))

  result = []
  if headers:
    args = []
    for idx, name in enumerate(fields):
      hdr = headers[name]
      if separator is None:
        mlens[idx] = max(mlens[idx], len(hdr))
        args.append(mlens[idx])
      args.append(hdr)
    result.append(format % tuple(args))

  if separator is None:
    assert len(mlens) == len(fields)

    if fields and not numfields.Matches(fields[-1]):
      mlens[-1] = 0

  for line in data:
    args = []
    if line is None:
      line = ['-' for _ in fields]
    for idx in range(len(fields)):
      if separator is None:
        args.append(mlens[idx])
      args.append(line[idx])
    result.append(format % tuple(args))

  return result
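
# Usage sketch (illustrative only): GenerateTable returns the formatted lines
# rather than printing them, so callers typically do something like:
#
#   for line in GenerateTable({"name": "Node", "dfree": "DFree"},
#                             ["name", "dfree"], None, rows,
#                             numfields=["dfree"], unitfields=["dfree"]):
#     ToStdout(line)
#
# ("rows" and the field names are hypothetical; with separator=None the
# human-readable unit formatting is chosen automatically.)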


def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: a string with the formatted timestamp

  """
  if not isinstance(ts, (tuple, list)) or len(ts) != 2:
    return '?'
  sec, usec = ts
  return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
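
# Example: FormatTimestamp((1234567890, 123456)) returns something like
# "2009-02-13 23:31:30.123456"; the date/time part depends on the local
# timezone, while the microseconds are appended verbatim.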


def ParseTimespec(value):
  """Parse a time specification.

  The following suffixes will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: days
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.
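
  For example, "30s" is parsed as 30, "2h" as 7200 and a plain "45" as 45
  (seconds).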

  """
  value = str(value)
  if not value:
    raise errors.OpPrereqError("Empty time specification passed")
  suffix_map = {
    's': 1,
    'm': 60,
    'h': 3600,
    'd': 86400,
    'w': 604800,
    }
  if value[-1] not in suffix_map:
    try:
      value = int(value)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  else:
    multiplier = suffix_map[value[-1]]
    value = value[:-1]
    if not value: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)")
    try:
      value = int(value) * multiplier
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  return value


def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
                   filter_master=False):
  """Returns the names of online nodes.

  This function will also log a note on stderr with the names of
  the offline nodes that are skipped.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed
  @type secondary_ips: boolean
  @param secondary_ips: if True, return the secondary IPs instead of the
      names, useful for doing network traffic over the replication interface
      (if any)
  @type filter_master: boolean
  @param filter_master: if True, do not return the master node in the list
      (useful in coordination with secondary_ips where we cannot check our
      node name against the list)

  """
  if cl is None:
    cl = GetClient()

  if secondary_ips:
    name_idx = 2
  else:
    name_idx = 0

  if filter_master:
    master_node = cl.QueryConfigValues(["master_node"])[0]
    filter_fn = lambda x: x != master_node
  else:
    filter_fn = lambda _: True

  result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
                         use_locking=False)
  offline = [row[0] for row in result if row[1]]
  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
  return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]


def _ToStream(stream, txt, *args):
  """Write a message to a stream, bypassing the logging system

  @type stream: file object
  @param stream: the file to which we should write
  @type txt: str
  @param txt: the message

  """
  if args:
    args = tuple(args)
    stream.write(txt % args)
  else:
    stream.write(txt)
  stream.write('\n')
  stream.flush()
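
# For example, _ToStream(sys.stdout, "Done %d/%d", 3, 5) writes "Done 3/5"
# plus a newline and flushes; with no extra arguments the text is written
# unmodified, so literal '%' characters are safe in that case.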


def ToStdout(txt, *args):
  """Write a message to stdout only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stdout, txt, *args)


def ToStderr(txt, *args):
  """Write a message to stderr only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stderr, txt, *args)


class JobExecutor(object):
  """Class which manages the submission and execution of multiple jobs.

  Note that instances of this class should not be reused between
  GetResults() calls.

  """
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
    self.queue = []
    if cl is None:
      cl = GetClient()
    self.cl = cl
    self.verbose = verbose
    self.jobs = []
    self.opts = opts
    self.feedback_fn = feedback_fn

  def QueueJob(self, name, *ops):
    """Record a job for later submit.

    @type name: string
    @param name: a description of the job, used in the progress and result
        messages

    """
    SetGenericOpcodeOpts(ops, self.opts)
    self.queue.append((name, ops))

  def SubmitPending(self):
    """Submit all pending jobs.

    """
    results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
                                                            self.queue)):
      self.jobs.append((idx, status, data, name))

  def _ChooseJob(self):
    """Choose a non-waiting/queued job to poll next.

    """
    assert self.jobs, "_ChooseJob called with empty job list"

    result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
    assert result

    for job_data, status in zip(self.jobs, result):
      if status[0] in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_WAITLOCK,
                       constants.JOB_STATUS_CANCELING):
        # job is still waiting
        continue
      # good candidate found
      self.jobs.remove(job_data)
      return job_data

    # no job found
    return self.jobs.pop(0)

  def GetResults(self):
    """Wait for and return the results of all jobs.

    @rtype: list
    @return: list of tuples (success, job results), in the same order
        as the submitted jobs; if a job has failed, instead of the result
        there will be the error message

    """
    if not self.jobs:
      self.SubmitPending()
    results = []
    if self.verbose:
      ok_jobs = [row[2] for row in self.jobs if row[1]]
      if ok_jobs:
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))

    # first, remove any non-submitted jobs
    self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
    for idx, _, jid, name in failures:
      ToStderr("Failed to submit job for %s: %s", name, jid)
      results.append((idx, False, jid))

    while self.jobs:
      (idx, _, jid, name) = self._ChooseJob()
      ToStdout("Waiting for job %s for %s...", jid, name)
      try:
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
        success = True
      except (errors.GenericError, luxi.ProtocolError), err:
        _, job_result = FormatError(err)
        success = False
        # the error message will always be shown, verbose or not
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)

      results.append((idx, success, job_result))

    # sort based on the index, then drop it
    results.sort()
    results = [i[1:] for i in results]

    return results

  def WaitOrShow(self, wait):
    """Wait for job results or only print the job IDs.

    @type wait: boolean
    @param wait: whether to wait or not

    """
    if wait:
      return self.GetResults()
    else:
      if not self.jobs:
        self.SubmitPending()
      # each entry in self.jobs is (index, submit status, job id, name)
      for _, status, result, name in self.jobs:
        if status:
          ToStdout("%s: %s", result, name)
        else:
          ToStderr("Failure for %s: %s", name, result)