Statistics
| Branch: | Tag: | Revision:

root / lib / cli.py @ a744b676

History | View | Annotate | Download (76.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module dealing with command line parsing"""
23

    
24

    
25
import sys
26
import textwrap
27
import os.path
28
import time
29
import logging
30
from cStringIO import StringIO
31

    
32
from ganeti import utils
33
from ganeti import errors
34
from ganeti import constants
35
from ganeti import opcodes
36
from ganeti import luxi
37
from ganeti import ssconf
38
from ganeti import rpc
39
from ganeti import ssh
40
from ganeti import compat
41
from ganeti import netutils
42

    
43
from optparse import (OptionParser, TitledHelpFormatter,
44
                      Option, OptionValueError)
45

    
46

    
47
__all__ = [
48
  # Command line options
49
  "ADD_UIDS_OPT",
50
  "ALLOCATABLE_OPT",
51
  "ALL_OPT",
52
  "AUTO_PROMOTE_OPT",
53
  "AUTO_REPLACE_OPT",
54
  "BACKEND_OPT",
55
  "CLEANUP_OPT",
56
  "CLUSTER_DOMAIN_SECRET_OPT",
57
  "CONFIRM_OPT",
58
  "CP_SIZE_OPT",
59
  "DEBUG_OPT",
60
  "DEBUG_SIMERR_OPT",
61
  "DISKIDX_OPT",
62
  "DISK_OPT",
63
  "DISK_TEMPLATE_OPT",
64
  "DRAINED_OPT",
65
  "DRBD_HELPER_OPT",
66
  "EARLY_RELEASE_OPT",
67
  "ENABLED_HV_OPT",
68
  "ERROR_CODES_OPT",
69
  "FIELDS_OPT",
70
  "FILESTORE_DIR_OPT",
71
  "FILESTORE_DRIVER_OPT",
72
  "FORCE_OPT",
73
  "FORCE_VARIANT_OPT",
74
  "GLOBAL_FILEDIR_OPT",
75
  "HVLIST_OPT",
76
  "HVOPTS_OPT",
77
  "HYPERVISOR_OPT",
78
  "IALLOCATOR_OPT",
79
  "DEFAULT_IALLOCATOR_OPT",
80
  "IDENTIFY_DEFAULTS_OPT",
81
  "IGNORE_CONSIST_OPT",
82
  "IGNORE_FAILURES_OPT",
83
  "IGNORE_REMOVE_FAILURES_OPT",
84
  "IGNORE_SECONDARIES_OPT",
85
  "IGNORE_SIZE_OPT",
86
  "MAC_PREFIX_OPT",
87
  "MAINTAIN_NODE_HEALTH_OPT",
88
  "MASTER_NETDEV_OPT",
89
  "MC_OPT",
90
  "NET_OPT",
91
  "NEW_CLUSTER_CERT_OPT",
92
  "NEW_CLUSTER_DOMAIN_SECRET_OPT",
93
  "NEW_CONFD_HMAC_KEY_OPT",
94
  "NEW_RAPI_CERT_OPT",
95
  "NEW_SECONDARY_OPT",
96
  "NIC_PARAMS_OPT",
97
  "NODE_LIST_OPT",
98
  "NODE_PLACEMENT_OPT",
99
  "NODRBD_STORAGE_OPT",
100
  "NOHDR_OPT",
101
  "NOIPCHECK_OPT",
102
  "NO_INSTALL_OPT",
103
  "NONAMECHECK_OPT",
104
  "NOLVM_STORAGE_OPT",
105
  "NOMODIFY_ETCHOSTS_OPT",
106
  "NOMODIFY_SSH_SETUP_OPT",
107
  "NONICS_OPT",
108
  "NONLIVE_OPT",
109
  "NONPLUS1_OPT",
110
  "NOSHUTDOWN_OPT",
111
  "NOSTART_OPT",
112
  "NOSSH_KEYCHECK_OPT",
113
  "NOVOTING_OPT",
114
  "NWSYNC_OPT",
115
  "ON_PRIMARY_OPT",
116
  "ON_SECONDARY_OPT",
117
  "OFFLINE_OPT",
118
  "OSPARAMS_OPT",
119
  "OS_OPT",
120
  "OS_SIZE_OPT",
121
  "RAPI_CERT_OPT",
122
  "READD_OPT",
123
  "REBOOT_TYPE_OPT",
124
  "REMOVE_INSTANCE_OPT",
125
  "REMOVE_UIDS_OPT",
126
  "ROMAN_OPT",
127
  "SECONDARY_IP_OPT",
128
  "SELECT_OS_OPT",
129
  "SEP_OPT",
130
  "SHOWCMD_OPT",
131
  "SHUTDOWN_TIMEOUT_OPT",
132
  "SINGLE_NODE_OPT",
133
  "SRC_DIR_OPT",
134
  "SRC_NODE_OPT",
135
  "SUBMIT_OPT",
136
  "STATIC_OPT",
137
  "SYNC_OPT",
138
  "TAG_SRC_OPT",
139
  "TIMEOUT_OPT",
140
  "UIDPOOL_OPT",
141
  "USEUNITS_OPT",
142
  "USE_REPL_NET_OPT",
143
  "VERBOSE_OPT",
144
  "VG_NAME_OPT",
145
  "YES_DOIT_OPT",
146
  # Generic functions for CLI programs
147
  "GenericMain",
148
  "GenericInstanceCreate",
149
  "GetClient",
150
  "GetOnlineNodes",
151
  "JobExecutor",
152
  "JobSubmittedException",
153
  "ParseTimespec",
154
  "RunWhileClusterStopped",
155
  "SubmitOpCode",
156
  "SubmitOrSend",
157
  "UsesRPC",
158
  # Formatting functions
159
  "ToStderr", "ToStdout",
160
  "FormatError",
161
  "GenerateTable",
162
  "AskUser",
163
  "FormatTimestamp",
164
  # Tags functions
165
  "ListTags",
166
  "AddTags",
167
  "RemoveTags",
168
  # command line options support infrastructure
169
  "ARGS_MANY_INSTANCES",
170
  "ARGS_MANY_NODES",
171
  "ARGS_NONE",
172
  "ARGS_ONE_INSTANCE",
173
  "ARGS_ONE_NODE",
174
  "ARGS_ONE_OS",
175
  "ArgChoice",
176
  "ArgCommand",
177
  "ArgFile",
178
  "ArgHost",
179
  "ArgInstance",
180
  "ArgJobId",
181
  "ArgNode",
182
  "ArgOs",
183
  "ArgSuggest",
184
  "ArgUnknown",
185
  "OPT_COMPL_INST_ADD_NODES",
186
  "OPT_COMPL_MANY_NODES",
187
  "OPT_COMPL_ONE_IALLOCATOR",
188
  "OPT_COMPL_ONE_INSTANCE",
189
  "OPT_COMPL_ONE_NODE",
190
  "OPT_COMPL_ONE_OS",
191
  "cli_option",
192
  "SplitNodeOption",
193
  "CalculateOSNames",
194
  ]
195

    
196
NO_PREFIX = "no_"
197
UN_PREFIX = "-"
198

    
199

    
200
class _Argument:
201
  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
202
    self.min = min
203
    self.max = max
204

    
205
  def __repr__(self):
206
    return ("<%s min=%s max=%s>" %
207
            (self.__class__.__name__, self.min, self.max))
208

    
209

    
210
class ArgSuggest(_Argument):
211
  """Suggesting argument.
212

213
  Value can be any of the ones passed to the constructor.
214

215
  """
216
  # pylint: disable-msg=W0622
217
  def __init__(self, min=0, max=None, choices=None):
218
    _Argument.__init__(self, min=min, max=max)
219
    self.choices = choices
220

    
221
  def __repr__(self):
222
    return ("<%s min=%s max=%s choices=%r>" %
223
            (self.__class__.__name__, self.min, self.max, self.choices))
224

    
225

    
226
class ArgChoice(ArgSuggest):
227
  """Choice argument.
228

229
  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
230
  but value must be one of the choices.
231

232
  """
233

    
234

    
235
class ArgUnknown(_Argument):
236
  """Unknown argument to program (e.g. determined at runtime).
237

238
  """
239

    
240

    
241
class ArgInstance(_Argument):
242
  """Instances argument.
243

244
  """
245

    
246

    
247
class ArgNode(_Argument):
248
  """Node argument.
249

250
  """
251

    
252
class ArgJobId(_Argument):
253
  """Job ID argument.
254

255
  """
256

    
257

    
258
class ArgFile(_Argument):
259
  """File path argument.
260

261
  """
262

    
263

    
264
class ArgCommand(_Argument):
265
  """Command argument.
266

267
  """
268

    
269

    
270
class ArgHost(_Argument):
271
  """Host argument.
272

273
  """
274

    
275

    
276
class ArgOs(_Argument):
277
  """OS argument.
278

279
  """
280

    
281

    
282
ARGS_NONE = []
283
ARGS_MANY_INSTANCES = [ArgInstance()]
284
ARGS_MANY_NODES = [ArgNode()]
285
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
286
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
287
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
288

    
289

    
290
def _ExtractTagsObject(opts, args):
291
  """Extract the tag type object.
292

293
  Note that this function will modify its args parameter.
294

295
  """
296
  if not hasattr(opts, "tag_type"):
297
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
298
  kind = opts.tag_type
299
  if kind == constants.TAG_CLUSTER:
300
    retval = kind, kind
301
  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
302
    if not args:
303
      raise errors.OpPrereqError("no arguments passed to the command")
304
    name = args.pop(0)
305
    retval = kind, name
306
  else:
307
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
308
  return retval
309

    
310

    
311
def _ExtendTags(opts, args):
312
  """Extend the args if a source file has been given.
313

314
  This function will extend the tags with the contents of the file
315
  passed in the 'tags_source' attribute of the opts parameter. A file
316
  named '-' will be replaced by stdin.
317

318
  """
319
  fname = opts.tags_source
320
  if fname is None:
321
    return
322
  if fname == "-":
323
    new_fh = sys.stdin
324
  else:
325
    new_fh = open(fname, "r")
326
  new_data = []
327
  try:
328
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
329
    # because of python bug 1633941
330
    while True:
331
      line = new_fh.readline()
332
      if not line:
333
        break
334
      new_data.append(line.strip())
335
  finally:
336
    new_fh.close()
337
  args.extend(new_data)
338

    
339

    
340
def ListTags(opts, args):
341
  """List the tags on a given object.
342

343
  This is a generic implementation that knows how to deal with all
344
  three cases of tag objects (cluster, node, instance). The opts
345
  argument is expected to contain a tag_type field denoting what
346
  object type we work on.
347

348
  """
349
  kind, name = _ExtractTagsObject(opts, args)
350
  cl = GetClient()
351
  result = cl.QueryTags(kind, name)
352
  result = list(result)
353
  result.sort()
354
  for tag in result:
355
    ToStdout(tag)
356

    
357

    
358
def AddTags(opts, args):
359
  """Add tags on a given object.
360

361
  This is a generic implementation that knows how to deal with all
362
  three cases of tag objects (cluster, node, instance). The opts
363
  argument is expected to contain a tag_type field denoting what
364
  object type we work on.
365

366
  """
367
  kind, name = _ExtractTagsObject(opts, args)
368
  _ExtendTags(opts, args)
369
  if not args:
370
    raise errors.OpPrereqError("No tags to be added")
371
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
372
  SubmitOpCode(op)
373

    
374

    
375
def RemoveTags(opts, args):
376
  """Remove tags from a given object.
377

378
  This is a generic implementation that knows how to deal with all
379
  three cases of tag objects (cluster, node, instance). The opts
380
  argument is expected to contain a tag_type field denoting what
381
  object type we work on.
382

383
  """
384
  kind, name = _ExtractTagsObject(opts, args)
385
  _ExtendTags(opts, args)
386
  if not args:
387
    raise errors.OpPrereqError("No tags to be removed")
388
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
389
  SubmitOpCode(op)
390

    
391

    
392
def check_unit(option, opt, value): # pylint: disable-msg=W0613
393
  """OptParsers custom converter for units.
394

395
  """
396
  try:
397
    return utils.ParseUnit(value)
398
  except errors.UnitParseError, err:
399
    raise OptionValueError("option %s: %s" % (opt, err))
400

    
401

    
402
def _SplitKeyVal(opt, data):
403
  """Convert a KeyVal string into a dict.
404

405
  This function will convert a key=val[,...] string into a dict. Empty
406
  values will be converted specially: keys which have the prefix 'no_'
407
  will have the value=False and the prefix stripped, the others will
408
  have value=True.
409

410
  @type opt: string
411
  @param opt: a string holding the option name for which we process the
412
      data, used in building error messages
413
  @type data: string
414
  @param data: a string of the format key=val,key=val,...
415
  @rtype: dict
416
  @return: {key=val, key=val}
417
  @raises errors.ParameterError: if there are duplicate keys
418

419
  """
420
  kv_dict = {}
421
  if data:
422
    for elem in utils.UnescapeAndSplit(data, sep=","):
423
      if "=" in elem:
424
        key, val = elem.split("=", 1)
425
      else:
426
        if elem.startswith(NO_PREFIX):
427
          key, val = elem[len(NO_PREFIX):], False
428
        elif elem.startswith(UN_PREFIX):
429
          key, val = elem[len(UN_PREFIX):], None
430
        else:
431
          key, val = elem, True
432
      if key in kv_dict:
433
        raise errors.ParameterError("Duplicate key '%s' in option %s" %
434
                                    (key, opt))
435
      kv_dict[key] = val
436
  return kv_dict
437

    
438

    
439
def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
440
  """Custom parser for ident:key=val,key=val options.
441

442
  This will store the parsed values as a tuple (ident, {key: val}). As such,
443
  multiple uses of this option via action=append is possible.
444

445
  """
446
  if ":" not in value:
447
    ident, rest = value, ''
448
  else:
449
    ident, rest = value.split(":", 1)
450

    
451
  if ident.startswith(NO_PREFIX):
452
    if rest:
453
      msg = "Cannot pass options when removing parameter groups: %s" % value
454
      raise errors.ParameterError(msg)
455
    retval = (ident[len(NO_PREFIX):], False)
456
  elif ident.startswith(UN_PREFIX):
457
    if rest:
458
      msg = "Cannot pass options when removing parameter groups: %s" % value
459
      raise errors.ParameterError(msg)
460
    retval = (ident[len(UN_PREFIX):], None)
461
  else:
462
    kv_dict = _SplitKeyVal(opt, rest)
463
    retval = (ident, kv_dict)
464
  return retval
465

    
466

    
467
def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
468
  """Custom parser class for key=val,key=val options.
469

470
  This will store the parsed values as a dict {key: val}.
471

472
  """
473
  return _SplitKeyVal(opt, value)
474

    
475

    
476
def check_bool(option, opt, value): # pylint: disable-msg=W0613
477
  """Custom parser for yes/no options.
478

479
  This will store the parsed value as either True or False.
480

481
  """
482
  value = value.lower()
483
  if value == constants.VALUE_FALSE or value == "no":
484
    return False
485
  elif value == constants.VALUE_TRUE or value == "yes":
486
    return True
487
  else:
488
    raise errors.ParameterError("Invalid boolean value '%s'" % value)
489

    
490

    
491
# completion_suggestion is normally a list. Using numeric values not evaluating
492
# to False for dynamic completion.
493
(OPT_COMPL_MANY_NODES,
494
 OPT_COMPL_ONE_NODE,
495
 OPT_COMPL_ONE_INSTANCE,
496
 OPT_COMPL_ONE_OS,
497
 OPT_COMPL_ONE_IALLOCATOR,
498
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)
499

    
500
OPT_COMPL_ALL = frozenset([
501
  OPT_COMPL_MANY_NODES,
502
  OPT_COMPL_ONE_NODE,
503
  OPT_COMPL_ONE_INSTANCE,
504
  OPT_COMPL_ONE_OS,
505
  OPT_COMPL_ONE_IALLOCATOR,
506
  OPT_COMPL_INST_ADD_NODES,
507
  ])
508

    
509

    
510
class CliOption(Option):
511
  """Custom option class for optparse.
512

513
  """
514
  ATTRS = Option.ATTRS + [
515
    "completion_suggest",
516
    ]
517
  TYPES = Option.TYPES + (
518
    "identkeyval",
519
    "keyval",
520
    "unit",
521
    "bool",
522
    )
523
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
524
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
525
  TYPE_CHECKER["keyval"] = check_key_val
526
  TYPE_CHECKER["unit"] = check_unit
527
  TYPE_CHECKER["bool"] = check_bool
528

    
529

    
530
# optparse.py sets make_option, so we do it for our own option class, too
531
cli_option = CliOption
532

    
533

    
534
_YORNO = "yes|no"
535

    
536
DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
537
                       help="Increase debugging level")
538

    
539
NOHDR_OPT = cli_option("--no-headers", default=False,
540
                       action="store_true", dest="no_headers",
541
                       help="Don't display column headers")
542

    
543
SEP_OPT = cli_option("--separator", default=None,
544
                     action="store", dest="separator",
545
                     help=("Separator between output fields"
546
                           " (defaults to one space)"))
547

    
548
USEUNITS_OPT = cli_option("--units", default=None,
549
                          dest="units", choices=('h', 'm', 'g', 't'),
550
                          help="Specify units for output (one of hmgt)")
551

    
552
FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
553
                        type="string", metavar="FIELDS",
554
                        help="Comma separated list of output fields")
555

    
556
FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
557
                       default=False, help="Force the operation")
558

    
559
CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
560
                         default=False, help="Do not require confirmation")
561

    
562
TAG_SRC_OPT = cli_option("--from", dest="tags_source",
563
                         default=None, help="File with tag names")
564

    
565
SUBMIT_OPT = cli_option("--submit", dest="submit_only",
566
                        default=False, action="store_true",
567
                        help=("Submit the job and return the job ID, but"
568
                              " don't wait for the job to finish"))
569

    
570
SYNC_OPT = cli_option("--sync", dest="do_locking",
571
                      default=False, action="store_true",
572
                      help=("Grab locks while doing the queries"
573
                            " in order to ensure more consistent results"))
574

    
575
_DRY_RUN_OPT = cli_option("--dry-run", default=False,
576
                          action="store_true",
577
                          help=("Do not execute the operation, just run the"
578
                                " check steps and verify it it could be"
579
                                " executed"))
580

    
581
VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
582
                         action="store_true",
583
                         help="Increase the verbosity of the operation")
584

    
585
DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
586
                              action="store_true", dest="simulate_errors",
587
                              help="Debugging option that makes the operation"
588
                              " treat most runtime checks as failed")
589

    
590
NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
591
                        default=True, action="store_false",
592
                        help="Don't wait for sync (DANGEROUS!)")
593

    
594
DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
595
                               help="Custom disk setup (diskless, file,"
596
                               " plain or drbd)",
597
                               default=None, metavar="TEMPL",
598
                               choices=list(constants.DISK_TEMPLATES))
599

    
600
NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
601
                        help="Do not create any network cards for"
602
                        " the instance")
603

    
604
FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
605
                               help="Relative path under default cluster-wide"
606
                               " file storage dir to store file-based disks",
607
                               default=None, metavar="<DIR>")
608

    
609
FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
610
                                  help="Driver to use for image files",
611
                                  default="loop", metavar="<DRIVER>",
612
                                  choices=list(constants.FILE_DRIVER))
613

    
614
IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
615
                            help="Select nodes for the instance automatically"
616
                            " using the <NAME> iallocator plugin",
617
                            default=None, type="string",
618
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
619

    
620
DEFAULT_IALLOCATOR_OPT = cli_option("-I", "--default-iallocator",
621
                            metavar="<NAME>",
622
                            help="Set the default instance allocator plugin",
623
                            default=None, type="string",
624
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
625

    
626
OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
627
                    metavar="<os>",
628
                    completion_suggest=OPT_COMPL_ONE_OS)
629

    
630
OSPARAMS_OPT = cli_option("-O", "--os-parameters", dest="osparams",
631
                         type="keyval", default={},
632
                         help="OS parameters")
633

    
634
FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
635
                               action="store_true", default=False,
636
                               help="Force an unknown variant")
637

    
638
NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
639
                            action="store_true", default=False,
640
                            help="Do not install the OS (will"
641
                            " enable no-start)")
642

    
643
BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
644
                         type="keyval", default={},
645
                         help="Backend parameters")
646

    
647
HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
648
                         default={}, dest="hvparams",
649
                         help="Hypervisor parameters")
650

    
651
HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
652
                            help="Hypervisor and hypervisor options, in the"
653
                            " format hypervisor:option=value,option=value,...",
654
                            default=None, type="identkeyval")
655

    
656
HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
657
                        help="Hypervisor and hypervisor options, in the"
658
                        " format hypervisor:option=value,option=value,...",
659
                        default=[], action="append", type="identkeyval")
660

    
661
NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
662
                           action="store_false",
663
                           help="Don't check that the instance's IP"
664
                           " is alive")
665

    
666
NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
667
                             default=True, action="store_false",
668
                             help="Don't check that the instance's name"
669
                             " is resolvable")
670

    
671
NET_OPT = cli_option("--net",
672
                     help="NIC parameters", default=[],
673
                     dest="nics", action="append", type="identkeyval")
674

    
675
DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
676
                      dest="disks", action="append", type="identkeyval")
677

    
678
DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
679
                         help="Comma-separated list of disks"
680
                         " indices to act on (e.g. 0,2) (optional,"
681
                         " defaults to all disks)")
682

    
683
OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
684
                         help="Enforces a single-disk configuration using the"
685
                         " given disk size, in MiB unless a suffix is used",
686
                         default=None, type="unit", metavar="<size>")
687

    
688
IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
689
                                dest="ignore_consistency",
690
                                action="store_true", default=False,
691
                                help="Ignore the consistency of the disks on"
692
                                " the secondary")
693

    
694
NONLIVE_OPT = cli_option("--non-live", dest="live",
695
                         default=True, action="store_false",
696
                         help="Do a non-live migration (this usually means"
697
                         " freeze the instance, save the state, transfer and"
698
                         " only then resume running on the secondary node)")
699

    
700
NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
701
                                help="Target node and optional secondary node",
702
                                metavar="<pnode>[:<snode>]",
703
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)
704

    
705
NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
706
                           action="append", metavar="<node>",
707
                           help="Use only this node (can be used multiple"
708
                           " times, if not given defaults to all nodes)",
709
                           completion_suggest=OPT_COMPL_ONE_NODE)
710

    
711
SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
712
                             metavar="<node>",
713
                             completion_suggest=OPT_COMPL_ONE_NODE)
714

    
715
NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
716
                         action="store_false",
717
                         help="Don't start the instance after creation")
718

    
719
SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
720
                         action="store_true", default=False,
721
                         help="Show command instead of executing it")
722

    
723
CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
724
                         default=False, action="store_true",
725
                         help="Instead of performing the migration, try to"
726
                         " recover from a failed cleanup. This is safe"
727
                         " to run even if the instance is healthy, but it"
728
                         " will create extra replication traffic and "
729
                         " disrupt briefly the replication (like during the"
730
                         " migration")
731

    
732
STATIC_OPT = cli_option("-s", "--static", dest="static",
733
                        action="store_true", default=False,
734
                        help="Only show configuration data, not runtime data")
735

    
736
ALL_OPT = cli_option("--all", dest="show_all",
737
                     default=False, action="store_true",
738
                     help="Show info on all instances on the cluster."
739
                     " This can take a long time to run, use wisely")
740

    
741
SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
742
                           action="store_true", default=False,
743
                           help="Interactive OS reinstall, lists available"
744
                           " OS templates for selection")
745

    
746
IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
747
                                 action="store_true", default=False,
748
                                 help="Remove the instance from the cluster"
749
                                 " configuration even if there are failures"
750
                                 " during the removal process")
751

    
752
IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
753
                                        dest="ignore_remove_failures",
754
                                        action="store_true", default=False,
755
                                        help="Remove the instance from the"
756
                                        " cluster configuration even if there"
757
                                        " are failures during the removal"
758
                                        " process")
759

    
760
REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
761
                                 action="store_true", default=False,
762
                                 help="Remove the instance from the cluster")
763

    
764
NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
765
                               help="Specifies the new secondary node",
766
                               metavar="NODE", default=None,
767
                               completion_suggest=OPT_COMPL_ONE_NODE)
768

    
769
ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
770
                            default=False, action="store_true",
771
                            help="Replace the disk(s) on the primary"
772
                            " node (only for the drbd template)")
773

    
774
ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
775
                              default=False, action="store_true",
776
                              help="Replace the disk(s) on the secondary"
777
                              " node (only for the drbd template)")
778

    
779
AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
780
                              default=False, action="store_true",
781
                              help="Lock all nodes and auto-promote as needed"
782
                              " to MC status")
783

    
784
AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
785
                              default=False, action="store_true",
786
                              help="Automatically replace faulty disks"
787
                              " (only for the drbd template)")
788

    
789
IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
790
                             default=False, action="store_true",
791
                             help="Ignore current recorded size"
792
                             " (useful for forcing activation when"
793
                             " the recorded size is wrong)")
794

    
795
SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
796
                          metavar="<node>",
797
                          completion_suggest=OPT_COMPL_ONE_NODE)
798

    
799
SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
800
                         metavar="<dir>")
801

    
802
SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
803
                              help="Specify the secondary ip for the node",
804
                              metavar="ADDRESS", default=None)
805

    
806
READD_OPT = cli_option("--readd", dest="readd",
807
                       default=False, action="store_true",
808
                       help="Readd old node after replacing it")
809

    
810
NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
811
                                default=True, action="store_false",
812
                                help="Disable SSH key fingerprint checking")
813

    
814

    
815
MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
816
                    type="bool", default=None, metavar=_YORNO,
817
                    help="Set the master_candidate flag on the node")
818

    
819
OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
820
                         type="bool", default=None,
821
                         help="Set the offline flag on the node")
822

    
823
DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
824
                         type="bool", default=None,
825
                         help="Set the drained flag on the node")
826

    
827
ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
828
                             type="bool", default=None, metavar=_YORNO,
829
                             help="Set the allocatable flag on a volume")
830

    
831
NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
832
                               help="Disable support for lvm based instances"
833
                               " (cluster-wide)",
834
                               action="store_false", default=True)
835

    
836
ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
837
                            dest="enabled_hypervisors",
838
                            help="Comma-separated list of hypervisors",
839
                            type="string", default=None)
840

    
841
NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
842
                            type="keyval", default={},
843
                            help="NIC parameters")
844

    
845
CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
846
                         dest="candidate_pool_size", type="int",
847
                         help="Set the candidate pool size")
848

    
849
VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
850
                         help="Enables LVM and specifies the volume group"
851
                         " name (cluster-wide) for disk allocation [xenvg]",
852
                         metavar="VG", default=None)
853

    
854
YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
855
                          help="Destroy cluster", action="store_true")
856

    
857
NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
858
                          help="Skip node agreement check (dangerous)",
859
                          action="store_true", default=False)
860

    
861
MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
862
                            help="Specify the mac prefix for the instance IP"
863
                            " addresses, in the format XX:XX:XX",
864
                            metavar="PREFIX",
865
                            default=None)
866

    
867
MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
868
                               help="Specify the node interface (cluster-wide)"
869
                               " on which the master IP address will be added "
870
                               " [%s]" % constants.DEFAULT_BRIDGE,
871
                               metavar="NETDEV",
872
                               default=constants.DEFAULT_BRIDGE)
873

    
874
GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
875
                                help="Specify the default directory (cluster-"
876
                                "wide) for storing the file-based disks [%s]" %
877
                                constants.DEFAULT_FILE_STORAGE_DIR,
878
                                metavar="DIR",
879
                                default=constants.DEFAULT_FILE_STORAGE_DIR)
880

    
881
NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
882
                                   help="Don't modify /etc/hosts",
883
                                   action="store_false", default=True)
884

    
885
NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
886
                                    help="Don't initialize SSH keys",
887
                                    action="store_false", default=True)
888

    
889
ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
890
                             help="Enable parseable error messages",
891
                             action="store_true", default=False)
892

    
893
NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
894
                          help="Skip N+1 memory redundancy tests",
895
                          action="store_true", default=False)
896

    
897
REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
898
                             help="Type of reboot: soft/hard/full",
899
                             default=constants.INSTANCE_REBOOT_HARD,
900
                             metavar="<REBOOT>",
901
                             choices=list(constants.REBOOT_TYPES))
902

    
903
IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
904
                                    dest="ignore_secondaries",
905
                                    default=False, action="store_true",
906
                                    help="Ignore errors from secondaries")
907

    
908
NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
909
                            action="store_false", default=True,
910
                            help="Don't shutdown the instance (unsafe)")
911

    
912
TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
913
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
914
                         help="Maximum time to wait")
915

    
916
SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
917
                         dest="shutdown_timeout", type="int",
918
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
919
                         help="Maximum time to wait for instance shutdown")
920

    
921
EARLY_RELEASE_OPT = cli_option("--early-release",
922
                               dest="early_release", default=False,
923
                               action="store_true",
924
                               help="Release the locks on the secondary"
925
                               " node(s) early")
926

    
927
NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
928
                                  dest="new_cluster_cert",
929
                                  default=False, action="store_true",
930
                                  help="Generate a new cluster certificate")
931

    
932
RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
933
                           default=None,
934
                           help="File containing new RAPI certificate")
935

    
936
NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
937
                               default=None, action="store_true",
938
                               help=("Generate a new self-signed RAPI"
939
                                     " certificate"))
940

    
941
NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
942
                                    dest="new_confd_hmac_key",
943
                                    default=False, action="store_true",
944
                                    help=("Create a new HMAC key for %s" %
945
                                          constants.CONFD))
946

    
947
CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
948
                                       dest="cluster_domain_secret",
949
                                       default=None,
950
                                       help=("Load new new cluster domain"
951
                                             " secret from file"))
952

    
953
NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
954
                                           dest="new_cluster_domain_secret",
955
                                           default=False, action="store_true",
956
                                           help=("Create a new cluster domain"
957
                                                 " secret"))
958

    
959
USE_REPL_NET_OPT = cli_option("--use-replication-network",
960
                              dest="use_replication_network",
961
                              help="Whether to use the replication network"
962
                              " for talking to the nodes",
963
                              action="store_true", default=False)
964

    
965
MAINTAIN_NODE_HEALTH_OPT = \
966
    cli_option("--maintain-node-health", dest="maintain_node_health",
967
               metavar=_YORNO, default=None, type="bool",
968
               help="Configure the cluster to automatically maintain node"
969
               " health, by shutting down unknown instances, shutting down"
970
               " unknown DRBD devices, etc.")
971

    
972
IDENTIFY_DEFAULTS_OPT = \
973
    cli_option("--identify-defaults", dest="identify_defaults",
974
               default=False, action="store_true",
975
               help="Identify which saved instance parameters are equal to"
976
               " the current cluster defaults and set them as such, instead"
977
               " of marking them as overridden")
978

    
979
UIDPOOL_OPT = cli_option("--uid-pool", default=None,
980
                         action="store", dest="uid_pool",
981
                         help=("A list of user-ids or user-id"
982
                               " ranges separated by commas"))
983

    
984
ADD_UIDS_OPT = cli_option("--add-uids", default=None,
985
                          action="store", dest="add_uids",
986
                          help=("A list of user-ids or user-id"
987
                                " ranges separated by commas, to be"
988
                                " added to the user-id pool"))
989

    
990
REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
991
                             action="store", dest="remove_uids",
992
                             help=("A list of user-ids or user-id"
993
                                   " ranges separated by commas, to be"
994
                                   " removed from the user-id pool"))
995

    
996
ROMAN_OPT = cli_option("--roman",
997
                       dest="roman_integers", default=False,
998
                       action="store_true",
999
                       help="Use roman numbers for positive integers")
1000

    
1001
DRBD_HELPER_OPT = cli_option("--drbd-usermode-helper", dest="drbd_helper",
1002
                             action="store", default=None,
1003
                             help="Specifies usermode helper for DRBD")
1004

    
1005
NODRBD_STORAGE_OPT = cli_option("--no-drbd-storage", dest="drbd_storage",
1006
                                action="store_false", default=True,
1007
                                help="Disable support for DRBD")
1008

    
1009

    
1010
def _ParseArgs(argv, commands, aliases):
1011
  """Parser for the command line arguments.
1012

1013
  This function parses the arguments and returns the function which
1014
  must be executed together with its (modified) arguments.
1015

1016
  @param argv: the command line
1017
  @param commands: dictionary with special contents, see the design
1018
      doc for cmdline handling
1019
  @param aliases: dictionary with command aliases {'alias': 'target, ...}
1020

1021
  """
1022
  if len(argv) == 0:
1023
    binary = "<command>"
1024
  else:
1025
    binary = argv[0].split("/")[-1]
1026

    
1027
  if len(argv) > 1 and argv[1] == "--version":
1028
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
1029
    # Quit right away. That way we don't have to care about this special
1030
    # argument. optparse.py does it the same.
1031
    sys.exit(0)
1032

    
1033
  if len(argv) < 2 or not (argv[1] in commands or
1034
                           argv[1] in aliases):
1035
    # let's do a nice thing
1036
    sortedcmds = commands.keys()
1037
    sortedcmds.sort()
1038

    
1039
    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
1040
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
1041
    ToStdout("")
1042

    
1043
    # compute the max line length for cmd + usage
1044
    mlen = max([len(" %s" % cmd) for cmd in commands])
1045
    mlen = min(60, mlen) # should not get here...
1046

    
1047
    # and format a nice command list
1048
    ToStdout("Commands:")
1049
    for cmd in sortedcmds:
1050
      cmdstr = " %s" % (cmd,)
1051
      help_text = commands[cmd][4]
1052
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
1053
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
1054
      for line in help_lines:
1055
        ToStdout("%-*s   %s", mlen, "", line)
1056

    
1057
    ToStdout("")
1058

    
1059
    return None, None, None
1060

    
1061
  # get command, unalias it, and look it up in commands
1062
  cmd = argv.pop(1)
1063
  if cmd in aliases:
1064
    if cmd in commands:
1065
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
1066
                                   " command" % cmd)
1067

    
1068
    if aliases[cmd] not in commands:
1069
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
1070
                                   " command '%s'" % (cmd, aliases[cmd]))
1071

    
1072
    cmd = aliases[cmd]
1073

    
1074
  func, args_def, parser_opts, usage, description = commands[cmd]
1075
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
1076
                        description=description,
1077
                        formatter=TitledHelpFormatter(),
1078
                        usage="%%prog %s %s" % (cmd, usage))
1079
  parser.disable_interspersed_args()
1080
  options, args = parser.parse_args()
1081

    
1082
  if not _CheckArguments(cmd, args_def, args):
1083
    return None, None, None
1084

    
1085
  return func, options, args
1086

    
1087

    
1088
def _CheckArguments(cmd, args_def, args):
1089
  """Verifies the arguments using the argument definition.
1090

1091
  Algorithm:
1092

1093
    1. Abort with error if values specified by user but none expected.
1094

1095
    1. For each argument in definition
1096

1097
      1. Keep running count of minimum number of values (min_count)
1098
      1. Keep running count of maximum number of values (max_count)
1099
      1. If it has an unlimited number of values
1100

1101
        1. Abort with error if it's not the last argument in the definition
1102

1103
    1. If last argument has limited number of values
1104

1105
      1. Abort with error if number of values doesn't match or is too large
1106

1107
    1. Abort with error if user didn't pass enough values (min_count)
1108

1109
  """
1110
  if args and not args_def:
1111
    ToStderr("Error: Command %s expects no arguments", cmd)
1112
    return False
1113

    
1114
  min_count = None
1115
  max_count = None
1116
  check_max = None
1117

    
1118
  last_idx = len(args_def) - 1
1119

    
1120
  for idx, arg in enumerate(args_def):
1121
    if min_count is None:
1122
      min_count = arg.min
1123
    elif arg.min is not None:
1124
      min_count += arg.min
1125

    
1126
    if max_count is None:
1127
      max_count = arg.max
1128
    elif arg.max is not None:
1129
      max_count += arg.max
1130

    
1131
    if idx == last_idx:
1132
      check_max = (arg.max is not None)
1133

    
1134
    elif arg.max is None:
1135
      raise errors.ProgrammerError("Only the last argument can have max=None")
1136

    
1137
  if check_max:
1138
    # Command with exact number of arguments
1139
    if (min_count is not None and max_count is not None and
1140
        min_count == max_count and len(args) != min_count):
1141
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1142
      return False
1143

    
1144
    # Command with limited number of arguments
1145
    if max_count is not None and len(args) > max_count:
1146
      ToStderr("Error: Command %s expects only %d argument(s)",
1147
               cmd, max_count)
1148
      return False
1149

    
1150
  # Command with some required arguments
1151
  if min_count is not None and len(args) < min_count:
1152
    ToStderr("Error: Command %s expects at least %d argument(s)",
1153
             cmd, min_count)
1154
    return False
1155

    
1156
  return True
1157

    
1158

    
1159
def SplitNodeOption(value):
1160
  """Splits the value of a --node option.
1161

1162
  """
1163
  if value and ':' in value:
1164
    return value.split(':', 1)
1165
  else:
1166
    return (value, None)
1167

    
1168

    
1169
def CalculateOSNames(os_name, os_variants):
1170
  """Calculates all the names an OS can be called, according to its variants.
1171

1172
  @type os_name: string
1173
  @param os_name: base name of the os
1174
  @type os_variants: list or None
1175
  @param os_variants: list of supported variants
1176
  @rtype: list
1177
  @return: list of valid names
1178

1179
  """
1180
  if os_variants:
1181
    return ['%s+%s' % (os_name, v) for v in os_variants]
1182
  else:
1183
    return [os_name]
1184

    
1185

    
1186
def UsesRPC(fn):
1187
  def wrapper(*args, **kwargs):
1188
    rpc.Init()
1189
    try:
1190
      return fn(*args, **kwargs)
1191
    finally:
1192
      rpc.Shutdown()
1193
  return wrapper
1194

    
1195

    
1196
def AskUser(text, choices=None):
1197
  """Ask the user a question.
1198

1199
  @param text: the question to ask
1200

1201
  @param choices: list with elements tuples (input_char, return_value,
1202
      description); if not given, it will default to: [('y', True,
1203
      'Perform the operation'), ('n', False, 'Do no do the operation')];
1204
      note that the '?' char is reserved for help
1205

1206
  @return: one of the return values from the choices list; if input is
1207
      not possible (i.e. not running with a tty, we return the last
1208
      entry from the list
1209

1210
  """
1211
  if choices is None:
1212
    choices = [('y', True, 'Perform the operation'),
1213
               ('n', False, 'Do not perform the operation')]
1214
  if not choices or not isinstance(choices, list):
1215
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
1216
  for entry in choices:
1217
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
1218
      raise errors.ProgrammerError("Invalid choices element to AskUser")
1219

    
1220
  answer = choices[-1][1]
1221
  new_text = []
1222
  for line in text.splitlines():
1223
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
1224
  text = "\n".join(new_text)
1225
  try:
1226
    f = file("/dev/tty", "a+")
1227
  except IOError:
1228
    return answer
1229
  try:
1230
    chars = [entry[0] for entry in choices]
1231
    chars[-1] = "[%s]" % chars[-1]
1232
    chars.append('?')
1233
    maps = dict([(entry[0], entry[1]) for entry in choices])
1234
    while True:
1235
      f.write(text)
1236
      f.write('\n')
1237
      f.write("/".join(chars))
1238
      f.write(": ")
1239
      line = f.readline(2).strip().lower()
1240
      if line in maps:
1241
        answer = maps[line]
1242
        break
1243
      elif line == '?':
1244
        for entry in choices:
1245
          f.write(" %s - %s\n" % (entry[0], entry[2]))
1246
        f.write("\n")
1247
        continue
1248
  finally:
1249
    f.close()
1250
  return answer
1251

    
1252

    
1253
class JobSubmittedException(Exception):
1254
  """Job was submitted, client should exit.
1255

1256
  This exception has one argument, the ID of the job that was
1257
  submitted. The handler should print this ID.
1258

1259
  This is not an error, just a structured way to exit from clients.
1260

1261
  """
1262

    
1263

    
1264
def SendJob(ops, cl=None):
1265
  """Function to submit an opcode without waiting for the results.
1266

1267
  @type ops: list
1268
  @param ops: list of opcodes
1269
  @type cl: luxi.Client
1270
  @param cl: the luxi client to use for communicating with the master;
1271
             if None, a new client will be created
1272

1273
  """
1274
  if cl is None:
1275
    cl = GetClient()
1276

    
1277
  job_id = cl.SubmitJob(ops)
1278

    
1279
  return job_id
1280

    
1281

    
1282
def GenericPollJob(job_id, cbs, report_cbs):
1283
  """Generic job-polling function.
1284

1285
  @type job_id: number
1286
  @param job_id: Job ID
1287
  @type cbs: Instance of L{JobPollCbBase}
1288
  @param cbs: Data callbacks
1289
  @type report_cbs: Instance of L{JobPollReportCbBase}
1290
  @param report_cbs: Reporting callbacks
1291

1292
  """
1293
  prev_job_info = None
1294
  prev_logmsg_serial = None
1295

    
1296
  status = None
1297

    
1298
  while True:
1299
    result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
1300
                                      prev_logmsg_serial)
1301
    if not result:
1302
      # job not found, go away!
1303
      raise errors.JobLost("Job with id %s lost" % job_id)
1304

    
1305
    if result == constants.JOB_NOTCHANGED:
1306
      report_cbs.ReportNotChanged(job_id, status)
1307

    
1308
      # Wait again
1309
      continue
1310

    
1311
    # Split result, a tuple of (field values, log entries)
1312
    (job_info, log_entries) = result
1313
    (status, ) = job_info
1314

    
1315
    if log_entries:
1316
      for log_entry in log_entries:
1317
        (serial, timestamp, log_type, message) = log_entry
1318
        report_cbs.ReportLogMessage(job_id, serial, timestamp,
1319
                                    log_type, message)
1320
        prev_logmsg_serial = max(prev_logmsg_serial, serial)
1321

    
1322
    # TODO: Handle canceled and archived jobs
1323
    elif status in (constants.JOB_STATUS_SUCCESS,
1324
                    constants.JOB_STATUS_ERROR,
1325
                    constants.JOB_STATUS_CANCELING,
1326
                    constants.JOB_STATUS_CANCELED):
1327
      break
1328

    
1329
    prev_job_info = job_info
1330

    
1331
  jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
1332
  if not jobs:
1333
    raise errors.JobLost("Job with id %s lost" % job_id)
1334

    
1335
  status, opstatus, result = jobs[0]
1336

    
1337
  if status == constants.JOB_STATUS_SUCCESS:
1338
    return result
1339

    
1340
  if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
1341
    raise errors.OpExecError("Job was canceled")
1342

    
1343
  has_ok = False
1344
  for idx, (status, msg) in enumerate(zip(opstatus, result)):
1345
    if status == constants.OP_STATUS_SUCCESS:
1346
      has_ok = True
1347
    elif status == constants.OP_STATUS_ERROR:
1348
      errors.MaybeRaise(msg)
1349

    
1350
      if has_ok:
1351
        raise errors.OpExecError("partial failure (opcode %d): %s" %
1352
                                 (idx, msg))
1353

    
1354
      raise errors.OpExecError(str(msg))
1355

    
1356
  # default failure mode
1357
  raise errors.OpExecError(result)
1358

    
1359

    
1360
class JobPollCbBase:
1361
  """Base class for L{GenericPollJob} callbacks.
1362

1363
  """
1364
  def __init__(self):
1365
    """Initializes this class.
1366

1367
    """
1368

    
1369
  def WaitForJobChangeOnce(self, job_id, fields,
1370
                           prev_job_info, prev_log_serial):
1371
    """Waits for changes on a job.
1372

1373
    """
1374
    raise NotImplementedError()
1375

    
1376
  def QueryJobs(self, job_ids, fields):
1377
    """Returns the selected fields for the selected job IDs.
1378

1379
    @type job_ids: list of numbers
1380
    @param job_ids: Job IDs
1381
    @type fields: list of strings
1382
    @param fields: Fields
1383

1384
    """
1385
    raise NotImplementedError()
1386

    
1387

    
1388
class JobPollReportCbBase:
1389
  """Base class for L{GenericPollJob} reporting callbacks.
1390

1391
  """
1392
  def __init__(self):
1393
    """Initializes this class.
1394

1395
    """
1396

    
1397
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1398
    """Handles a log message.
1399

1400
    """
1401
    raise NotImplementedError()
1402

    
1403
  def ReportNotChanged(self, job_id, status):
1404
    """Called for if a job hasn't changed in a while.
1405

1406
    @type job_id: number
1407
    @param job_id: Job ID
1408
    @type status: string or None
1409
    @param status: Job status if available
1410

1411
    """
1412
    raise NotImplementedError()
1413

    
1414

    
1415
class _LuxiJobPollCb(JobPollCbBase):
1416
  def __init__(self, cl):
1417
    """Initializes this class.
1418

1419
    """
1420
    JobPollCbBase.__init__(self)
1421
    self.cl = cl
1422

    
1423
  def WaitForJobChangeOnce(self, job_id, fields,
1424
                           prev_job_info, prev_log_serial):
1425
    """Waits for changes on a job.
1426

1427
    """
1428
    return self.cl.WaitForJobChangeOnce(job_id, fields,
1429
                                        prev_job_info, prev_log_serial)
1430

    
1431
  def QueryJobs(self, job_ids, fields):
1432
    """Returns the selected fields for the selected job IDs.
1433

1434
    """
1435
    return self.cl.QueryJobs(job_ids, fields)


class FeedbackFnJobPollReportCb(JobPollReportCbBase):
  def __init__(self, feedback_fn):
    """Initializes this class.

    """
    JobPollReportCbBase.__init__(self)

    self.feedback_fn = feedback_fn

    assert callable(feedback_fn)

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    self.feedback_fn((timestamp, log_type, log_msg))

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    """
    # Ignore


class StdioJobPollReportCb(JobPollReportCbBase):
  def __init__(self):
    """Initializes this class.

    """
    JobPollReportCbBase.__init__(self)

    self.notified_queued = False
    self.notified_waitlock = False

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
             utils.SafeEncode(log_msg))

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    """
    if status is None:
      return

    if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
      ToStderr("Job %s is waiting in queue", job_id)
      self.notified_queued = True

    elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
      ToStderr("Job %s is trying to acquire all necessary locks", job_id)
      self.notified_waitlock = True
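

# Illustrative sketch only (not part of the original module): a minimal
# custom reporting callback that collects job log messages in memory
# instead of printing them.  The class name is hypothetical; it simply
# implements the JobPollReportCbBase interface defined above.
class _BufferingJobPollReportCb(JobPollReportCbBase):
  def __init__(self):
    JobPollReportCbBase.__init__(self)
    self.messages = []

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    # Store the raw message together with its metadata
    self.messages.append((job_id, serial, timestamp, log_type, log_msg))

  def ReportNotChanged(self, job_id, status):
    # No progress output is wanted for this reporter
    pass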


def PollJob(job_id, cl=None, feedback_fn=None):
  """Function to poll for the result of a job.

  @type job_id: job identifier
  @param job_id: the job to poll for results
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created

  """
  if cl is None:
    cl = GetClient()

  if feedback_fn:
    reporter = FeedbackFnJobPollReportCb(feedback_fn)
  else:
    reporter = StdioJobPollReportCb()

  return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
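

# Illustrative sketch only (not part of the original module): polling a
# previously submitted job and forwarding its log messages to a callable.
# The helper name is hypothetical.
def _example_poll_job(job_id):
  def _feedback(msg):
    # msg is a (timestamp, log_type, log_msg) tuple, see
    # FeedbackFnJobPollReportCb.ReportLogMessage above
    ToStdout("job %s: %s", job_id, msg[2])

  # Blocks until the job finishes and returns the list of opcode results
  return PollJob(job_id, feedback_fn=_feedback)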


def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
  """Legacy function to submit an opcode.

  This is just a simple wrapper over the construction of the processor
  instance. It should be extended to better handle feedback and
  interaction functions.

  """
  if cl is None:
    cl = GetClient()

  SetGenericOpcodeOpts([op], opts)

  job_id = SendJob([op], cl)

  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)

  return op_results[0]


def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  This function will decide, based on the 'opts' parameter, whether to
  submit and wait for the result of the opcode (and return it), or
  whether to just send the job and print its identifier. It is used in
  order to simplify the implementation of the '--submit' option.

  It will also process the opcodes if we're sending them via SendJob
  (otherwise SubmitOpCode does it).

  """
  if opts and opts.submit_only:
    job = [op]
    SetGenericOpcodeOpts(job, opts)
    job_id = SendJob(job, cl=cl)
    raise JobSubmittedException(job_id)
  else:
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
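

# Illustrative sketch only (not part of the original module): how a gnt-*
# command function typically wraps an opcode in SubmitOrSend.  The opcode
# and option attributes shown are examples; with --submit the call raises
# JobSubmittedException, which GenericMain turns into a printed job ID.
def _example_submit_or_send(opts, args):
  op = opcodes.OpRebootInstance(instance_name=args[0],
                                reboot_type=opts.reboot_type,
                                ignore_secondaries=opts.ignore_secondaries)
  SubmitOrSend(op, opts)
  return 0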


def SetGenericOpcodeOpts(opcode_list, options):
  """Processor for generic options.

  This function updates the given opcodes based on generic command
  line options (like debug, dry-run, etc.).

  @param opcode_list: list of opcodes
  @param options: command line options or None
  @return: None (in-place modification)

  """
  if not options:
    return
  for op in opcode_list:
    op.dry_run = options.dry_run
    op.debug_level = options.debug


def GetClient():
  # TODO: Cache object?
  try:
    client = luxi.Client()
  except luxi.NoMasterError:
    ss = ssconf.SimpleStore()

    # Try to read ssconf file
    try:
      ss.GetMasterNode()
    except errors.ConfigurationError:
      raise errors.OpPrereqError("Cluster not initialized or this machine is"
                                 " not part of a cluster")

    master, myself = ssconf.GetMasterAndMyself(ss=ss)
    if master != myself:
      raise errors.OpPrereqError("This is not the master node, please connect"
                                 " to node '%s' and rerun the command" %
                                 master)
    raise
  return client


def FormatError(err):
  """Return a formatted error message for a given error.

  This function takes an exception instance and returns a tuple
  consisting of two values: first, the recommended exit code, and
  second, a string describing the error message (not
  newline-terminated).

  """
  retcode = 1
  obuf = StringIO()
  msg = str(err)
  if isinstance(err, errors.ConfigurationError):
    txt = "Corrupt configuration file: %s" % msg
    logging.error(txt)
    obuf.write(txt + "\n")
    obuf.write("Aborting.")
    retcode = 2
  elif isinstance(err, errors.HooksAbort):
    obuf.write("Failure: hooks execution failed:\n")
    for node, script, out in err.args[0]:
      if out:
        obuf.write("  node: %s, script: %s, output: %s\n" %
                   (node, script, out))
      else:
        obuf.write("  node: %s, script: %s (no output)\n" %
                   (node, script))
  elif isinstance(err, errors.HooksFailure):
    obuf.write("Failure: hooks general failure: %s" % msg)
  elif isinstance(err, errors.ResolverError):
    this_host = netutils.HostInfo.SysName()
    if err.args[0] == this_host:
      msg = "Failure: can't resolve my own hostname ('%s')"
    else:
      msg = "Failure: can't resolve hostname '%s'"
    obuf.write(msg % err.args[0])
  elif isinstance(err, errors.OpPrereqError):
    if len(err.args) == 2:
      obuf.write("Failure: prerequisites not met for this"
               " operation:\nerror type: %s, error details:\n%s" %
                 (err.args[1], err.args[0]))
    else:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\n%s" % msg)
  elif isinstance(err, errors.OpExecError):
    obuf.write("Failure: command execution error:\n%s" % msg)
  elif isinstance(err, errors.TagError):
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
  elif isinstance(err, errors.JobQueueDrainError):
    obuf.write("Failure: the job queue is marked for drain and doesn't"
               " accept new requests\n")
  elif isinstance(err, errors.JobQueueFull):
    obuf.write("Failure: the job queue is full and doesn't accept new"
               " job submissions until old jobs are archived\n")
  elif isinstance(err, errors.TypeEnforcementError):
    obuf.write("Parameter Error: %s" % msg)
  elif isinstance(err, errors.ParameterError):
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
  elif isinstance(err, luxi.NoMasterError):
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
               " and listening for connections?")
  elif isinstance(err, luxi.TimeoutError):
    obuf.write("Timeout while talking to the master daemon. Error:\n"
               "%s" % msg)
  elif isinstance(err, luxi.ProtocolError):
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
               "%s" % msg)
  elif isinstance(err, errors.GenericError):
    obuf.write("Unhandled Ganeti error: %s" % msg)
  elif isinstance(err, JobSubmittedException):
    obuf.write("JobID: %s\n" % err.args[0])
    retcode = 0
  else:
    obuf.write("Unhandled exception: %s" % msg)
  return retcode, obuf.getvalue().rstrip('\n')
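

# Illustrative sketch only (not part of the original module): converting a
# caught Ganeti exception into an exit code and a printable message, the
# same way GenericMain below does it.
def _example_format_error(fn, *args):
  try:
    return 0, fn(*args)
  except errors.GenericError, err:
    retcode, err_msg = FormatError(err)
    ToStderr(err_msg)
    return retcode, None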


def GenericMain(commands, override=None, aliases=None):
  """Generic main function for all the gnt-* commands.

  Arguments:
    - commands: a dictionary with a special structure, see the design doc
                for command line handling.
    - override: if not None, we expect a dictionary with keys that will
                override command line options; this can be used to pass
                options from the scripts to generic functions
    - aliases: dictionary with command aliases {'alias': 'target', ...}

  """
  # save the program name and the entire command line for later logging
  if sys.argv:
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
    if len(sys.argv) >= 2:
      binary += " " + sys.argv[1]
      old_cmdline = " ".join(sys.argv[2:])
    else:
      old_cmdline = ""
  else:
    binary = "<unknown program>"
    old_cmdline = ""

  if aliases is None:
    aliases = {}

  try:
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
  except errors.ParameterError, err:
    result, err_msg = FormatError(err)
    ToStderr(err_msg)
    return 1

  if func is None: # parse error
    return 1

  if override is not None:
    for key, val in override.iteritems():
      setattr(options, key, val)

  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
                     stderr_logging=True, program=binary)

  if old_cmdline:
    logging.info("run with arguments '%s'", old_cmdline)
  else:
    logging.info("run with no arguments")

  try:
    result = func(options, args)
  except (errors.GenericError, luxi.ProtocolError,
          JobSubmittedException), err:
    result, err_msg = FormatError(err)
    logging.exception("Error during command processing")
    ToStderr(err_msg)

  return result
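

# Illustrative sketch only (not part of the original module): the shape of
# the "commands" dictionary a gnt-* script would pass to GenericMain.  The
# command name, handler and descriptions are hypothetical; the exact tuple
# layout expected by _ParseArgs is described in the design doc referenced
# above (handler, positional argument definitions, options, usage string,
# description).
def _example_generic_main():
  def _ExampleList(opts, args):
    ToStdout("would list things here (debug=%s)", opts.debug)
    return 0

  commands = {
    "list": (_ExampleList, [], [DEBUG_OPT],
             "", "Lists something"),
    }
  return GenericMain(commands, aliases={"ls": "list"})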


def GenericInstanceCreate(mode, opts, args):
  """Add an instance to the cluster via either creation or import.

  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new instance name
  @rtype: int
  @return: the desired exit code

  """
  instance = args[0]

  (pnode, snode) = SplitNodeOption(opts.node)

  hypervisor = None
  hvparams = {}
  if opts.hypervisor:
    hypervisor, hvparams = opts.hypervisor

  if opts.nics:
    try:
      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
    except ValueError, err:
      raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
    nics = [{}] * nic_max
    for nidx, ndict in opts.nics:
      nidx = int(nidx)
      if not isinstance(ndict, dict):
        msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
        raise errors.OpPrereqError(msg)
      nics[nidx] = ndict
  elif opts.no_nics:
    # no nics
    nics = []
  elif mode == constants.INSTANCE_CREATE:
    # default of one nic, all auto
    nics = [{}]
  else:
    # mode == import
    nics = []

  if opts.disk_template == constants.DT_DISKLESS:
    if opts.disks or opts.sd_size is not None:
      raise errors.OpPrereqError("Diskless instance but disk"
                                 " information passed")
    disks = []
  else:
    if (not opts.disks and not opts.sd_size
        and mode == constants.INSTANCE_CREATE):
      raise errors.OpPrereqError("No disk information specified")
    if opts.disks and opts.sd_size is not None:
      raise errors.OpPrereqError("Please use either the '--disk' or"
                                 " '-s' option")
    if opts.sd_size is not None:
      opts.disks = [(0, {"size": opts.sd_size})]

    if opts.disks:
      try:
        disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
      disks = [{}] * disk_max
    else:
      disks = []
    for didx, ddict in opts.disks:
      didx = int(didx)
      if not isinstance(ddict, dict):
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
        raise errors.OpPrereqError(msg)
      elif "size" in ddict:
        if "adopt" in ddict:
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
                                     " (disk %d)" % didx)
        try:
          ddict["size"] = utils.ParseUnit(ddict["size"])
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
                                     (didx, err))
      elif "adopt" in ddict:
        if mode == constants.INSTANCE_IMPORT:
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
                                     " import")
        ddict["size"] = 0
      else:
        raise errors.OpPrereqError("Missing size or adoption source for"
                                   " disk %d" % didx)
      disks[didx] = ddict

  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)

  if mode == constants.INSTANCE_CREATE:
    start = opts.start
    os_type = opts.os
    force_variant = opts.force_variant
    src_node = None
    src_path = None
    no_install = opts.no_install
    identify_defaults = False
  elif mode == constants.INSTANCE_IMPORT:
    start = False
    os_type = None
    force_variant = False
    src_node = opts.src_node
    src_path = opts.src_dir
    no_install = None
    identify_defaults = opts.identify_defaults
  else:
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)

  op = opcodes.OpCreateInstance(instance_name=instance,
                                disks=disks,
                                disk_template=opts.disk_template,
                                nics=nics,
                                pnode=pnode, snode=snode,
                                ip_check=opts.ip_check,
                                name_check=opts.name_check,
                                wait_for_sync=opts.wait_for_sync,
                                file_storage_dir=opts.file_storage_dir,
                                file_driver=opts.file_driver,
                                iallocator=opts.iallocator,
                                hypervisor=hypervisor,
                                hvparams=hvparams,
                                beparams=opts.beparams,
                                osparams=opts.osparams,
                                mode=mode,
                                start=start,
                                os_type=os_type,
                                force_variant=force_variant,
                                src_node=src_node,
                                src_path=src_path,
                                no_install=no_install,
                                identify_defaults=identify_defaults)

  SubmitOrSend(op, opts)
  return 0


class _RunWhileClusterStoppedHelper:
  """Helper class for L{RunWhileClusterStopped} to simplify state management

  """
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes

    self.ssh = ssh.SshRunner(self.cluster_name)

    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name
    @type cmd: list
    @param cmd: Command

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    @type fn: callable
    @param fn: Function to be called

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes
        for node_name in self.online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception, err:
          _, errmsg = FormatError(err)
          logging.exception("Caught exception")
          self.feedback_fn(errmsg)
          raise
      finally:
        # Start cluster again, master node last
        for node_name in self.nonmaster_nodes + [self.master_node]:
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
    finally:
      # Resume watcher
      watcher_block.Close()


def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Calls a function while all cluster daemons are stopped.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  feedback_fn("Gathering cluster information")

  # This ensures we're running on the master daemon
  cl = GetClient()

  (cluster_name, master_node) = \
    cl.QueryConfigValues(["cluster_name", "master_node"])

  online_nodes = GetOnlineNodes([], cl=cl)

  # Don't keep a reference to the client. The master daemon will go away.
  del cl

  assert master_node in online_nodes

  return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
                                       online_nodes).Call(fn, *args)
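

# Illustrative sketch only (not part of the original module): running a
# maintenance function while every cluster daemon is stopped.  The function
# receives the _RunWhileClusterStoppedHelper instance as its first argument
# and may use its _RunCmd method; the command and path are only examples.
def _example_run_while_stopped():
  def _touch_marker(helper, path):
    # Runs on the master node while no Ganeti daemons are running
    helper._RunCmd(None, ["touch", path])

  RunWhileClusterStopped(ToStdout, _touch_marker, "/tmp/ganeti-maintenance")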


def GenerateTable(headers, fields, separator, data,
                  numfields=None, unitfields=None,
                  units=None):
  """Generates a table with headers and different fields.

  @type headers: dict
  @param headers: dictionary mapping field names to headers for
      the table
  @type fields: list
  @param fields: the field names corresponding to each row in
      the data field
  @param separator: the separator to be used; if this is None,
      the default 'smart' algorithm is used which computes optimal
      field width, otherwise just the separator is used between
      each field
  @type data: list
  @param data: a list of lists, each sublist being one row to be output
  @type numfields: list
  @param numfields: a list with the fields that hold numeric
      values and thus should be right-aligned
  @type unitfields: list
  @param unitfields: a list with the fields that hold numeric
      values that should be formatted with the units field
  @type units: string or None
  @param units: the units we should use for formatting, or None for
      automatic choice (human-readable for non-separator usage, otherwise
      megabytes); this is a one-letter string

  """
  if units is None:
    if separator:
      units = "m"
    else:
      units = "h"

  if numfields is None:
    numfields = []
  if unitfields is None:
    unitfields = []

  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142

  format_fields = []
  for field in fields:
    if headers and field not in headers:
      # TODO: handle better unknown fields (either revert to old
      # style of raising exception, or deal more intelligently with
      # variable fields)
      headers[field] = field
    if separator is not None:
      format_fields.append("%s")
    elif numfields.Matches(field):
      format_fields.append("%*s")
    else:
      format_fields.append("%-*s")

  if separator is None:
    mlens = [0 for name in fields]
    format_str = ' '.join(format_fields)
  else:
    format_str = separator.replace("%", "%%").join(format_fields)

  for row in data:
    if row is None:
      continue
    for idx, val in enumerate(row):
      if unitfields.Matches(fields[idx]):
        try:
          val = int(val)
        except (TypeError, ValueError):
          pass
        else:
          val = row[idx] = utils.FormatUnit(val, units)
      val = row[idx] = str(val)
      if separator is None:
        mlens[idx] = max(mlens[idx], len(val))

  result = []
  if headers:
    args = []
    for idx, name in enumerate(fields):
      hdr = headers[name]
      if separator is None:
        mlens[idx] = max(mlens[idx], len(hdr))
        args.append(mlens[idx])
      args.append(hdr)
    result.append(format_str % tuple(args))

  if separator is None:
    assert len(mlens) == len(fields)

    if fields and not numfields.Matches(fields[-1]):
      mlens[-1] = 0

  for line in data:
    args = []
    if line is None:
      line = ['-' for _ in fields]
    for idx in range(len(fields)):
      if separator is None:
        args.append(mlens[idx])
      args.append(line[idx])
    result.append(format_str % tuple(args))

  return result
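

# Illustrative sketch only (not part of the original module): building a
# small two-column table and writing it to stdout.  Field and header names
# are made up.
def _example_generate_table():
  headers = {"name": "Node", "dfree": "DFree"}
  fields = ["name", "dfree"]
  data = [["node1.example.com", 102400], ["node2.example.com", 51200]]
  for line in GenerateTable(headers, fields, None, data,
                            numfields=["dfree"], unitfields=["dfree"]):
    ToStdout(line)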


def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: a string with the formatted timestamp

  """
  if not isinstance(ts, (tuple, list)) or len(ts) != 2:
    return '?'
  sec, usec = ts
  return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec


def ParseTimespec(value):
  """Parse a time specification.

  The following suffixes will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: days
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  """
  value = str(value)
  if not value:
    raise errors.OpPrereqError("Empty time specification passed")
  suffix_map = {
    's': 1,
    'm': 60,
    'h': 3600,
    'd': 86400,
    'w': 604800,
    }
  if value[-1] not in suffix_map:
    try:
      value = int(value)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  else:
    multiplier = suffix_map[value[-1]]
    value = value[:-1]
    if not value: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)")
    try:
      value = int(value) * multiplier
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  return value
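

# Illustrative sketch only (not part of the original module): a few worked
# conversions showing the suffix handling above.
def _example_parse_timespec():
  assert ParseTimespec("30") == 30         # no suffix: seconds
  assert ParseTimespec("2m") == 120        # minutes
  assert ParseTimespec("1h") == 3600       # hours
  assert ParseTimespec("1w") == 7 * 86400  # weeks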


def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
                   filter_master=False):
  """Returns the names of online nodes.

  This function will also log a warning on stderr with the names of
  the offline nodes that were skipped.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed
  @type secondary_ips: boolean
  @param secondary_ips: if True, return the secondary IPs instead of the
      names, useful for doing network traffic over the replication interface
      (if any)
  @type filter_master: boolean
  @param filter_master: if True, do not return the master node in the list
      (useful in coordination with secondary_ips where we cannot check our
      node name against the list)

  """
  if cl is None:
    cl = GetClient()

  if secondary_ips:
    name_idx = 2
  else:
    name_idx = 0

  if filter_master:
    master_node = cl.QueryConfigValues(["master_node"])[0]
    filter_fn = lambda x: x != master_node
  else:
    filter_fn = lambda _: True

  result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
                         use_locking=False)
  offline = [row[0] for row in result if row[1]]
  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
  return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
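

# Illustrative sketch only (not part of the original module): collecting
# the secondary IPs of all online nodes except the master, e.g. before
# pushing data over the replication network.
def _example_get_online_nodes():
  return GetOnlineNodes([], secondary_ips=True, filter_master=True,
                        nowarn=True)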


def _ToStream(stream, txt, *args):
  """Write a message to a stream, bypassing the logging system

  @type stream: file object
  @param stream: the file to which we should write
  @type txt: str
  @param txt: the message

  """
  if args:
    args = tuple(args)
    stream.write(txt % args)
  else:
    stream.write(txt)
  stream.write('\n')
  stream.flush()


def ToStdout(txt, *args):
  """Write a message to stdout only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stdout, txt, *args)


def ToStderr(txt, *args):
  """Write a message to stderr only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stderr, txt, *args)


class JobExecutor(object):
  """Class which manages the submission and execution of multiple jobs.

  Note that instances of this class should not be reused between
  GetResults() calls.

  """
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
    self.queue = []
    if cl is None:
      cl = GetClient()
    self.cl = cl
    self.verbose = verbose
    self.jobs = []
    self.opts = opts
    self.feedback_fn = feedback_fn

  def QueueJob(self, name, *ops):
    """Record a job for later submit.

    @type name: string
    @param name: a description of the job, will be used in WaitJobSet
    """
    SetGenericOpcodeOpts(ops, self.opts)
    self.queue.append((name, ops))

  def SubmitPending(self, each=False):
    """Submit all pending jobs.

    """
    if each:
      results = []
      for row in self.queue:
        # SubmitJob will remove the success status, but raise an exception if
        # the submission fails, so we'll notice that anyway.
        results.append([True, self.cl.SubmitJob(row[1])])
    else:
      results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
                                                            self.queue)):
      self.jobs.append((idx, status, data, name))

  def _ChooseJob(self):
    """Choose a non-waiting/queued job to poll next.

    """
    assert self.jobs, "_ChooseJob called with empty job list"

    result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
    assert result

    for job_data, status in zip(self.jobs, result):
      if status[0] in (constants.JOB_STATUS_QUEUED,
                    constants.JOB_STATUS_WAITLOCK,
                    constants.JOB_STATUS_CANCELING):
        # job is still waiting
        continue
      # good candidate found
      self.jobs.remove(job_data)
      return job_data

    # no job found
    return self.jobs.pop(0)

  def GetResults(self):
    """Wait for and return the results of all jobs.

    @rtype: list
    @return: list of tuples (success, job results), in the same order
        as the submitted jobs; if a job has failed, instead of the result
        there will be the error message

    """
    if not self.jobs:
      self.SubmitPending()
    results = []
    if self.verbose:
      ok_jobs = [row[2] for row in self.jobs if row[1]]
      if ok_jobs:
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))

    # first, remove any non-submitted jobs
    self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
    for idx, _, jid, name in failures:
      ToStderr("Failed to submit job for %s: %s", name, jid)
      results.append((idx, False, jid))

    while self.jobs:
      (idx, _, jid, name) = self._ChooseJob()
      ToStdout("Waiting for job %s for %s...", jid, name)
      try:
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
        success = True
      except (errors.GenericError, luxi.ProtocolError), err:
        _, job_result = FormatError(err)
        success = False
        # the error message will always be shown, verbose or not
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)

      results.append((idx, success, job_result))

    # sort based on the index, then drop it
    results.sort()
    results = [i[1:] for i in results]

    return results

  def WaitOrShow(self, wait):
    """Wait for job results or only print the job IDs.

    @type wait: boolean
    @param wait: whether to wait or not

    """
    if wait:
      return self.GetResults()
    else:
      if not self.jobs:
        self.SubmitPending()
      for _, status, result, name in self.jobs:
        if status:
          ToStdout("%s: %s", result, name)
        else:
          ToStderr("Failure for %s: %s", name, result)
      return [row[1:3] for row in self.jobs]
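

# Illustrative sketch only (not part of the original module): submitting
# several jobs at once and waiting for all of them.  The opcode class and
# the instance names are examples only.
def _example_job_executor(opts, instance_names):
  jex = JobExecutor(opts=opts)
  for name in instance_names:
    jex.QueueJob(name, opcodes.OpShutdownInstance(instance_name=name))
  # Returns a list of (success, result-or-error-message) tuples, in
  # submission order
  return jex.GetResults()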