root / lib / cli.py @ bf4af505

#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module dealing with command line parsing"""


import sys
import textwrap
import os.path
import time
import logging
from cStringIO import StringIO

from ganeti import utils
from ganeti import errors
from ganeti import constants
from ganeti import opcodes
from ganeti import luxi
from ganeti import ssconf
from ganeti import rpc
from ganeti import ssh
from ganeti import compat

from optparse import (OptionParser, TitledHelpFormatter,
                      Option, OptionValueError)


__all__ = [
  # Command line options
  "ADD_UIDS_OPT",
  "ALLOCATABLE_OPT",
  "ALL_OPT",
  "AUTO_PROMOTE_OPT",
  "AUTO_REPLACE_OPT",
  "BACKEND_OPT",
  "CLEANUP_OPT",
  "CLUSTER_DOMAIN_SECRET_OPT",
  "CONFIRM_OPT",
  "CP_SIZE_OPT",
  "DEBUG_OPT",
  "DEBUG_SIMERR_OPT",
  "DISKIDX_OPT",
  "DISK_OPT",
  "DISK_TEMPLATE_OPT",
  "DRAINED_OPT",
  "DRBD_HELPER_OPT",
  "EARLY_RELEASE_OPT",
  "ENABLED_HV_OPT",
  "ERROR_CODES_OPT",
  "FIELDS_OPT",
  "FILESTORE_DIR_OPT",
  "FILESTORE_DRIVER_OPT",
  "FORCE_OPT",
  "FORCE_VARIANT_OPT",
  "GLOBAL_FILEDIR_OPT",
  "HVLIST_OPT",
  "HVOPTS_OPT",
  "HYPERVISOR_OPT",
  "IALLOCATOR_OPT",
  "DEFAULT_IALLOCATOR_OPT",
  "IDENTIFY_DEFAULTS_OPT",
  "IGNORE_CONSIST_OPT",
  "IGNORE_FAILURES_OPT",
  "IGNORE_REMOVE_FAILURES_OPT",
  "IGNORE_SECONDARIES_OPT",
  "IGNORE_SIZE_OPT",
  "MAC_PREFIX_OPT",
  "MAINTAIN_NODE_HEALTH_OPT",
  "MASTER_NETDEV_OPT",
  "MC_OPT",
  "NET_OPT",
  "NEW_CLUSTER_CERT_OPT",
  "NEW_CLUSTER_DOMAIN_SECRET_OPT",
  "NEW_CONFD_HMAC_KEY_OPT",
  "NEW_RAPI_CERT_OPT",
  "NEW_SECONDARY_OPT",
  "NIC_PARAMS_OPT",
  "NODE_LIST_OPT",
  "NODE_PLACEMENT_OPT",
  "NODRBD_STORAGE_OPT",
  "NOHDR_OPT",
  "NOIPCHECK_OPT",
  "NO_INSTALL_OPT",
  "NONAMECHECK_OPT",
  "NOLVM_STORAGE_OPT",
  "NOMODIFY_ETCHOSTS_OPT",
  "NOMODIFY_SSH_SETUP_OPT",
  "NONICS_OPT",
  "NONLIVE_OPT",
  "NONPLUS1_OPT",
  "NOSHUTDOWN_OPT",
  "NOSTART_OPT",
  "NOSSH_KEYCHECK_OPT",
  "NOVOTING_OPT",
  "NWSYNC_OPT",
  "ON_PRIMARY_OPT",
  "ON_SECONDARY_OPT",
  "OFFLINE_OPT",
  "OSPARAMS_OPT",
  "OS_OPT",
  "OS_SIZE_OPT",
  "RAPI_CERT_OPT",
  "READD_OPT",
  "REBOOT_TYPE_OPT",
  "REMOVE_INSTANCE_OPT",
  "REMOVE_UIDS_OPT",
  "ROMAN_OPT",
  "SECONDARY_IP_OPT",
  "SELECT_OS_OPT",
  "SEP_OPT",
  "SHOWCMD_OPT",
  "SHUTDOWN_TIMEOUT_OPT",
  "SINGLE_NODE_OPT",
  "SRC_DIR_OPT",
  "SRC_NODE_OPT",
  "SUBMIT_OPT",
  "STATIC_OPT",
  "SYNC_OPT",
  "TAG_SRC_OPT",
  "TIMEOUT_OPT",
  "UIDPOOL_OPT",
  "USEUNITS_OPT",
  "USE_REPL_NET_OPT",
  "VERBOSE_OPT",
  "VG_NAME_OPT",
  "YES_DOIT_OPT",
  # Generic functions for CLI programs
  "GenericMain",
  "GenericInstanceCreate",
  "GetClient",
  "GetOnlineNodes",
  "JobExecutor",
  "JobSubmittedException",
  "ParseTimespec",
  "RunWhileClusterStopped",
  "SubmitOpCode",
  "SubmitOrSend",
  "UsesRPC",
  # Formatting functions
  "ToStderr", "ToStdout",
  "FormatError",
  "GenerateTable",
  "AskUser",
  "FormatTimestamp",
  # Tags functions
  "ListTags",
  "AddTags",
  "RemoveTags",
  # command line options support infrastructure
  "ARGS_MANY_INSTANCES",
  "ARGS_MANY_NODES",
  "ARGS_NONE",
  "ARGS_ONE_INSTANCE",
  "ARGS_ONE_NODE",
  "ARGS_ONE_OS",
  "ArgChoice",
  "ArgCommand",
  "ArgFile",
  "ArgHost",
  "ArgInstance",
  "ArgJobId",
  "ArgNode",
  "ArgOs",
  "ArgSuggest",
  "ArgUnknown",
  "OPT_COMPL_INST_ADD_NODES",
  "OPT_COMPL_MANY_NODES",
  "OPT_COMPL_ONE_IALLOCATOR",
  "OPT_COMPL_ONE_INSTANCE",
  "OPT_COMPL_ONE_NODE",
  "OPT_COMPL_ONE_OS",
  "cli_option",
  "SplitNodeOption",
  "CalculateOSNames",
  ]

NO_PREFIX = "no_"
UN_PREFIX = "-"


class _Argument:
  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
    self.min = min
    self.max = max

  def __repr__(self):
    return ("<%s min=%s max=%s>" %
            (self.__class__.__name__, self.min, self.max))


class ArgSuggest(_Argument):
  """Suggesting argument.

  Value can be any of the ones passed to the constructor.

  """
  # pylint: disable-msg=W0622
  def __init__(self, min=0, max=None, choices=None):
    _Argument.__init__(self, min=min, max=max)
    self.choices = choices

  def __repr__(self):
    return ("<%s min=%s max=%s choices=%r>" %
            (self.__class__.__name__, self.min, self.max, self.choices))


class ArgChoice(ArgSuggest):
  """Choice argument.

  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
  but value must be one of the choices.

  """


class ArgUnknown(_Argument):
  """Unknown argument to program (e.g. determined at runtime).

  """


class ArgInstance(_Argument):
  """Instances argument.

  """


class ArgNode(_Argument):
  """Node argument.

  """

class ArgJobId(_Argument):
  """Job ID argument.

  """


class ArgFile(_Argument):
  """File path argument.

  """


class ArgCommand(_Argument):
  """Command argument.

  """


class ArgHost(_Argument):
  """Host argument.

  """


class ArgOs(_Argument):
  """OS argument.

  """


ARGS_NONE = []
ARGS_MANY_INSTANCES = [ArgInstance()]
ARGS_MANY_NODES = [ArgNode()]
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
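
# Illustrative sketch (not part of the original module): the ARGS_* lists
# above are meant to serve as the argument definition in a client's command
# table, whose entries are 5-tuples as unpacked by _ParseArgs below
# (function, arguments, options, usage, description). The command name and
# function used here are made up:
#
#   commands = {
#     "info": (ShowInstanceConfig, ARGS_MANY_INSTANCES, [STATIC_OPT],
#              "[-s] [<instance>...]", "Show information on instances"),
#     }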


def _ExtractTagsObject(opts, args):
  """Extract the tag type object.

  Note that this function will modify its args parameter.

  """
  if not hasattr(opts, "tag_type"):
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
  kind = opts.tag_type
  if kind == constants.TAG_CLUSTER:
    retval = kind, kind
  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
    if not args:
      raise errors.OpPrereqError("no arguments passed to the command")
    name = args.pop(0)
    retval = kind, name
  else:
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
  return retval


def _ExtendTags(opts, args):
  """Extend the args if a source file has been given.

  This function will extend the tags with the contents of the file
  passed in the 'tags_source' attribute of the opts parameter. A file
  named '-' will be replaced by stdin.

  """
  fname = opts.tags_source
  if fname is None:
    return
  if fname == "-":
    new_fh = sys.stdin
  else:
    new_fh = open(fname, "r")
  new_data = []
  try:
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
    # because of python bug 1633941
    while True:
      line = new_fh.readline()
      if not line:
        break
      new_data.append(line.strip())
  finally:
    new_fh.close()
  args.extend(new_data)


def ListTags(opts, args):
  """List the tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  cl = GetClient()
  result = cl.QueryTags(kind, name)
  result = list(result)
  result.sort()
  for tag in result:
    ToStdout(tag)


def AddTags(opts, args):
  """Add tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be added")
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
  SubmitOpCode(op)


def RemoveTags(opts, args):
  """Remove tags from a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be removed")
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
  SubmitOpCode(op)


def check_unit(option, opt, value): # pylint: disable-msg=W0613
  """OptParsers custom converter for units.

  """
  try:
    return utils.ParseUnit(value)
  except errors.UnitParseError, err:
    raise OptionValueError("option %s: %s" % (opt, err))


def _SplitKeyVal(opt, data):
  """Convert a KeyVal string into a dict.

  This function will convert a key=val[,...] string into a dict. Empty
  values will be converted specially: keys which have the prefix 'no_'
  will have the value=False and the prefix stripped, the others will
  have value=True.

  @type opt: string
  @param opt: a string holding the option name for which we process the
      data, used in building error messages
  @type data: string
  @param data: a string of the format key=val,key=val,...
  @rtype: dict
  @return: {key=val, key=val}
  @raises errors.ParameterError: if there are duplicate keys

  """
  kv_dict = {}
  if data:
    for elem in utils.UnescapeAndSplit(data, sep=","):
      if "=" in elem:
        key, val = elem.split("=", 1)
      else:
        if elem.startswith(NO_PREFIX):
          key, val = elem[len(NO_PREFIX):], False
        elif elem.startswith(UN_PREFIX):
          key, val = elem[len(UN_PREFIX):], None
        else:
          key, val = elem, True
      if key in kv_dict:
        raise errors.ParameterError("Duplicate key '%s' in option %s" %
                                    (key, opt))
      kv_dict[key] = val
  return kv_dict
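
# Illustrative example (not in the original source) of the three value forms
# accepted by _SplitKeyVal; the option and keys are made up:
#
#   _SplitKeyVal("-H", "kernel_path=/boot/vmlinuz,no_acpi,-initrd_path")
#   => {"kernel_path": "/boot/vmlinuz", "acpi": False, "initrd_path": None}
#
# i.e. "no_"-prefixed keys become False, "-"-prefixed keys become None and
# anything else without "=" becomes True.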


def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser for ident:key=val,key=val options.

  This will store the parsed values as a tuple (ident, {key: val}). As such,
  multiple uses of this option via action=append are possible.

  """
  if ":" not in value:
    ident, rest = value, ''
  else:
    ident, rest = value.split(":", 1)

  if ident.startswith(NO_PREFIX):
    if rest:
      msg = "Cannot pass options when removing parameter groups: %s" % value
      raise errors.ParameterError(msg)
    retval = (ident[len(NO_PREFIX):], False)
  elif ident.startswith(UN_PREFIX):
    if rest:
      msg = "Cannot pass options when removing parameter groups: %s" % value
      raise errors.ParameterError(msg)
    retval = (ident[len(UN_PREFIX):], None)
  else:
    kv_dict = _SplitKeyVal(opt, rest)
    retval = (ident, kv_dict)
  return retval
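
# Illustrative example (not in the original source); the identifier and
# parameters are made up:
#
#   check_ident_key_val(option, "--net", "0:link=br0,mode=bridged")
#   => ("0", {"link": "br0", "mode": "bridged"})
#   check_ident_key_val(option, "--net", "no_0")
#   => ("0", False)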


def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser for key=val,key=val options.

  This will store the parsed values as a dict {key: val}.

  """
  return _SplitKeyVal(opt, value)


def check_bool(option, opt, value): # pylint: disable-msg=W0613
  """Custom parser for yes/no options.

  This will store the parsed value as either True or False.

  """
  value = value.lower()
  if value == constants.VALUE_FALSE or value == "no":
    return False
  elif value == constants.VALUE_TRUE or value == "yes":
    return True
  else:
    raise errors.ParameterError("Invalid boolean value '%s'" % value)


# completion_suggest is normally a list. Numeric values that do not evaluate
# to False are used to request dynamic completion.
(OPT_COMPL_MANY_NODES,
 OPT_COMPL_ONE_NODE,
 OPT_COMPL_ONE_INSTANCE,
 OPT_COMPL_ONE_OS,
 OPT_COMPL_ONE_IALLOCATOR,
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)

OPT_COMPL_ALL = frozenset([
  OPT_COMPL_MANY_NODES,
  OPT_COMPL_ONE_NODE,
  OPT_COMPL_ONE_INSTANCE,
  OPT_COMPL_ONE_OS,
  OPT_COMPL_ONE_IALLOCATOR,
  OPT_COMPL_INST_ADD_NODES,
  ])


class CliOption(Option):
  """Custom option class for optparse.

  """
  ATTRS = Option.ATTRS + [
    "completion_suggest",
    ]
  TYPES = Option.TYPES + (
    "identkeyval",
    "keyval",
    "unit",
    "bool",
    )
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
  TYPE_CHECKER["keyval"] = check_key_val
  TYPE_CHECKER["unit"] = check_unit
  TYPE_CHECKER["bool"] = check_bool


# optparse.py sets make_option, so we do it for our own option class, too
cli_option = CliOption
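
# Illustrative sketch (not part of the original module) showing how an option
# using one of the custom types registered above could be declared; the
# option name is made up:
#
#   EXAMPLE_SIZE_OPT = cli_option("--example-size", dest="example_size",
#                                 type="unit", default=None,
#                                 help="Size in MiB unless a suffix is used")
#
# Values like "512" or "4g" would then be converted by check_unit (via
# utils.ParseUnit) before reaching the command implementation.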


_YORNO = "yes|no"

DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
                       help="Increase debugging level")

NOHDR_OPT = cli_option("--no-headers", default=False,
                       action="store_true", dest="no_headers",
                       help="Don't display column headers")

SEP_OPT = cli_option("--separator", default=None,
                     action="store", dest="separator",
                     help=("Separator between output fields"
                           " (defaults to one space)"))

USEUNITS_OPT = cli_option("--units", default=None,
                          dest="units", choices=('h', 'm', 'g', 't'),
                          help="Specify units for output (one of hmgt)")

FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
                        type="string", metavar="FIELDS",
                        help="Comma separated list of output fields")

FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
                       default=False, help="Force the operation")

CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
                         default=False, help="Do not require confirmation")

TAG_SRC_OPT = cli_option("--from", dest="tags_source",
                         default=None, help="File with tag names")

SUBMIT_OPT = cli_option("--submit", dest="submit_only",
                        default=False, action="store_true",
                        help=("Submit the job and return the job ID, but"
                              " don't wait for the job to finish"))

SYNC_OPT = cli_option("--sync", dest="do_locking",
                      default=False, action="store_true",
                      help=("Grab locks while doing the queries"
                            " in order to ensure more consistent results"))

_DRY_RUN_OPT = cli_option("--dry-run", default=False,
                          action="store_true",
                          help=("Do not execute the operation, just run the"
                                " check steps and verify if it could be"
                                " executed"))

VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
                         action="store_true",
                         help="Increase the verbosity of the operation")

DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
                              action="store_true", dest="simulate_errors",
                              help="Debugging option that makes the operation"
                              " treat most runtime checks as failed")

NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
                        default=True, action="store_false",
                        help="Don't wait for sync (DANGEROUS!)")

DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
                               help="Custom disk setup (diskless, file,"
                               " plain or drbd)",
                               default=None, metavar="TEMPL",
                               choices=list(constants.DISK_TEMPLATES))

NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
                        help="Do not create any network cards for"
                        " the instance")

FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                               help="Relative path under default cluster-wide"
                               " file storage dir to store file-based disks",
                               default=None, metavar="<DIR>")

FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
                                  help="Driver to use for image files",
                                  default="loop", metavar="<DRIVER>",
                                  choices=list(constants.FILE_DRIVER))

IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
                            help="Select nodes for the instance automatically"
                            " using the <NAME> iallocator plugin",
                            default=None, type="string",
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)

DEFAULT_IALLOCATOR_OPT = cli_option("-I", "--default-iallocator",
                            metavar="<NAME>",
                            help="Set the default instance allocator plugin",
                            default=None, type="string",
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)

OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
                    metavar="<os>",
                    completion_suggest=OPT_COMPL_ONE_OS)

OSPARAMS_OPT = cli_option("-O", "--os-parameters", dest="osparams",
                          type="keyval", default={},
                          help="OS parameters")

FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
                               action="store_true", default=False,
                               help="Force an unknown variant")

NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
                            action="store_true", default=False,
                            help="Do not install the OS (will"
                            " enable no-start)")

BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
                         type="keyval", default={},
                         help="Backend parameters")

HVOPTS_OPT = cli_option("-H", "--hypervisor-parameters", type="keyval",
                        default={}, dest="hvparams",
                        help="Hypervisor parameters")

HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
                            help="Hypervisor and hypervisor options, in the"
                            " format hypervisor:option=value,option=value,...",
                            default=None, type="identkeyval")

HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
                        help="Hypervisor and hypervisor options, in the"
                        " format hypervisor:option=value,option=value,...",
                        default=[], action="append", type="identkeyval")

NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
                           action="store_false",
                           help="Don't check that the instance's IP"
                           " is alive")

NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
                             default=True, action="store_false",
                             help="Don't check that the instance's name"
                             " is resolvable")

NET_OPT = cli_option("--net",
                     help="NIC parameters", default=[],
                     dest="nics", action="append", type="identkeyval")

DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
                      dest="disks", action="append", type="identkeyval")

DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
                         help="Comma-separated list of disk"
                         " indices to act on (e.g. 0,2) (optional,"
                         " defaults to all disks)")

OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
                         help="Enforces a single-disk configuration using the"
                         " given disk size, in MiB unless a suffix is used",
                         default=None, type="unit", metavar="<size>")

IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
                                dest="ignore_consistency",
                                action="store_true", default=False,
                                help="Ignore the consistency of the disks on"
                                " the secondary")

NONLIVE_OPT = cli_option("--non-live", dest="live",
                         default=True, action="store_false",
                         help="Do a non-live migration (this usually means"
                         " freeze the instance, save the state, transfer and"
                         " only then resume running on the secondary node)")

NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
                                help="Target node and optional secondary node",
                                metavar="<pnode>[:<snode>]",
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)

NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
                           action="append", metavar="<node>",
                           help="Use only this node (can be used multiple"
                           " times, if not given defaults to all nodes)",
                           completion_suggest=OPT_COMPL_ONE_NODE)

SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
                             metavar="<node>",
                             completion_suggest=OPT_COMPL_ONE_NODE)

NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
                         action="store_false",
                         help="Don't start the instance after creation")

SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
                         action="store_true", default=False,
                         help="Show command instead of executing it")

CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
                         default=False, action="store_true",
                         help="Instead of performing the migration, try to"
                         " recover from a failed cleanup. This is safe"
                         " to run even if the instance is healthy, but it"
                         " will create extra replication traffic and"
                         " briefly disrupt the replication (like during the"
                         " migration)")

STATIC_OPT = cli_option("-s", "--static", dest="static",
                        action="store_true", default=False,
                        help="Only show configuration data, not runtime data")

ALL_OPT = cli_option("--all", dest="show_all",
                     default=False, action="store_true",
                     help="Show info on all instances on the cluster."
                     " This can take a long time to run, use wisely")

SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
                           action="store_true", default=False,
                           help="Interactive OS reinstall, lists available"
                           " OS templates for selection")

IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
                                 action="store_true", default=False,
                                 help="Remove the instance from the cluster"
                                 " configuration even if there are failures"
                                 " during the removal process")

IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
                                        dest="ignore_remove_failures",
                                        action="store_true", default=False,
                                        help="Remove the instance from the"
                                        " cluster configuration even if there"
                                        " are failures during the removal"
                                        " process")

REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
                                 action="store_true", default=False,
                                 help="Remove the instance from the cluster")

NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
                               help="Specifies the new secondary node",
                               metavar="NODE", default=None,
                               completion_suggest=OPT_COMPL_ONE_NODE)

ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
                            default=False, action="store_true",
                            help="Replace the disk(s) on the primary"
                            " node (only for the drbd template)")

ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
                              default=False, action="store_true",
                              help="Replace the disk(s) on the secondary"
                              " node (only for the drbd template)")

AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
                              default=False, action="store_true",
                              help="Lock all nodes and auto-promote as needed"
                              " to MC status")

AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
                              default=False, action="store_true",
                              help="Automatically replace faulty disks"
                              " (only for the drbd template)")

IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
                             default=False, action="store_true",
                             help="Ignore current recorded size"
                             " (useful for forcing activation when"
                             " the recorded size is wrong)")

SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
                          metavar="<node>",
                          completion_suggest=OPT_COMPL_ONE_NODE)

SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
                         metavar="<dir>")

SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
                              help="Specify the secondary ip for the node",
                              metavar="ADDRESS", default=None)

READD_OPT = cli_option("--readd", dest="readd",
                       default=False, action="store_true",
                       help="Readd old node after replacing it")

NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
                                default=True, action="store_false",
                                help="Disable SSH key fingerprint checking")


MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
                    type="bool", default=None, metavar=_YORNO,
                    help="Set the master_candidate flag on the node")

OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
                         type="bool", default=None,
                         help="Set the offline flag on the node")

DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
                         type="bool", default=None,
                         help="Set the drained flag on the node")

ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
                             type="bool", default=None, metavar=_YORNO,
                             help="Set the allocatable flag on a volume")

NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
                               help="Disable support for lvm based instances"
                               " (cluster-wide)",
                               action="store_false", default=True)

ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
                            dest="enabled_hypervisors",
                            help="Comma-separated list of hypervisors",
                            type="string", default=None)

NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
                            type="keyval", default={},
                            help="NIC parameters")

CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
                         dest="candidate_pool_size", type="int",
                         help="Set the candidate pool size")

VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
                         help="Enables LVM and specifies the volume group"
                         " name (cluster-wide) for disk allocation [xenvg]",
                         metavar="VG", default=None)

YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
                          help="Destroy cluster", action="store_true")

NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
                          help="Skip node agreement check (dangerous)",
                          action="store_true", default=False)

MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
                            help="Specify the mac prefix for the instance"
                            " MAC addresses, in the format XX:XX:XX",
                            metavar="PREFIX",
                            default=None)
MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
                               help="Specify the node interface (cluster-wide)"
                               " on which the master IP address will be added"
                               " [%s]" % constants.DEFAULT_BRIDGE,
                               metavar="NETDEV",
                               default=constants.DEFAULT_BRIDGE)

GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                                help="Specify the default directory (cluster-"
                                "wide) for storing the file-based disks [%s]" %
                                constants.DEFAULT_FILE_STORAGE_DIR,
                                metavar="DIR",
                                default=constants.DEFAULT_FILE_STORAGE_DIR)

NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
                                   help="Don't modify /etc/hosts",
                                   action="store_false", default=True)

NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
                                    help="Don't initialize SSH keys",
                                    action="store_false", default=True)

ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
                             help="Enable parseable error messages",
                             action="store_true", default=False)

NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
                          help="Skip N+1 memory redundancy tests",
                          action="store_true", default=False)

REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
                             help="Type of reboot: soft/hard/full",
                             default=constants.INSTANCE_REBOOT_HARD,
                             metavar="<REBOOT>",
                             choices=list(constants.REBOOT_TYPES))

IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
                                    dest="ignore_secondaries",
                                    default=False, action="store_true",
                                    help="Ignore errors from secondaries")

NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
                            action="store_false", default=True,
                            help="Don't shutdown the instance (unsafe)")

TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                         help="Maximum time to wait")

SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
                         dest="shutdown_timeout", type="int",
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                         help="Maximum time to wait for instance shutdown")

EARLY_RELEASE_OPT = cli_option("--early-release",
                               dest="early_release", default=False,
                               action="store_true",
                               help="Release the locks on the secondary"
                               " node(s) early")

NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
                                  dest="new_cluster_cert",
                                  default=False, action="store_true",
                                  help="Generate a new cluster certificate")

RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
                           default=None,
                           help="File containing new RAPI certificate")

NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
                               default=None, action="store_true",
                               help=("Generate a new self-signed RAPI"
                                     " certificate"))

NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
                                    dest="new_confd_hmac_key",
                                    default=False, action="store_true",
                                    help=("Create a new HMAC key for %s" %
                                          constants.CONFD))

CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
                                       dest="cluster_domain_secret",
                                       default=None,
                                       help=("Load new cluster domain"
                                             " secret from file"))

NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
                                           dest="new_cluster_domain_secret",
                                           default=False, action="store_true",
                                           help=("Create a new cluster domain"
                                                 " secret"))

USE_REPL_NET_OPT = cli_option("--use-replication-network",
                              dest="use_replication_network",
                              help="Whether to use the replication network"
                              " for talking to the nodes",
                              action="store_true", default=False)

MAINTAIN_NODE_HEALTH_OPT = \
    cli_option("--maintain-node-health", dest="maintain_node_health",
               metavar=_YORNO, default=None, type="bool",
               help="Configure the cluster to automatically maintain node"
               " health, by shutting down unknown instances, shutting down"
               " unknown DRBD devices, etc.")

IDENTIFY_DEFAULTS_OPT = \
    cli_option("--identify-defaults", dest="identify_defaults",
               default=False, action="store_true",
               help="Identify which saved instance parameters are equal to"
               " the current cluster defaults and set them as such, instead"
               " of marking them as overridden")

UIDPOOL_OPT = cli_option("--uid-pool", default=None,
                         action="store", dest="uid_pool",
                         help=("A list of user-ids or user-id"
                               " ranges separated by commas"))

ADD_UIDS_OPT = cli_option("--add-uids", default=None,
                          action="store", dest="add_uids",
                          help=("A list of user-ids or user-id"
                                " ranges separated by commas, to be"
                                " added to the user-id pool"))

REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
                             action="store", dest="remove_uids",
                             help=("A list of user-ids or user-id"
                                   " ranges separated by commas, to be"
                                   " removed from the user-id pool"))

ROMAN_OPT = cli_option("--roman",
                       dest="roman_integers", default=False,
                       action="store_true",
                       help="Use roman numbers for positive integers")

DRBD_HELPER_OPT = cli_option("--drbd-usermode-helper", dest="drbd_helper",
                             action="store", default=None,
                             help="Specifies usermode helper for DRBD")

NODRBD_STORAGE_OPT = cli_option("--no-drbd-storage", dest="drbd_storage",
                                action="store_false", default=True,
                                help="Disable support for DRBD")


def _ParseArgs(argv, commands, aliases):
  """Parser for the command line arguments.

  This function parses the arguments and returns the function which
  must be executed together with its (modified) arguments.

  @param argv: the command line
  @param commands: dictionary with special contents, see the design
      doc for cmdline handling
  @param aliases: dictionary with command aliases {'alias': 'target', ...}

  """
  if len(argv) == 0:
    binary = "<command>"
  else:
    binary = argv[0].split("/")[-1]

  if len(argv) > 1 and argv[1] == "--version":
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
    # Quit right away. That way we don't have to care about this special
    # argument. optparse.py does it the same.
    sys.exit(0)

  if len(argv) < 2 or not (argv[1] in commands or
                           argv[1] in aliases):
    # let's do a nice thing
    sortedcmds = commands.keys()
    sortedcmds.sort()

    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
    ToStdout("")

    # compute the max line length for cmd + usage
    mlen = max([len(" %s" % cmd) for cmd in commands])
    mlen = min(60, mlen) # should not get here...

    # and format a nice command list
    ToStdout("Commands:")
    for cmd in sortedcmds:
      cmdstr = " %s" % (cmd,)
      help_text = commands[cmd][4]
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
      for line in help_lines:
        ToStdout("%-*s   %s", mlen, "", line)

    ToStdout("")

    return None, None, None

  # get command, unalias it, and look it up in commands
  cmd = argv.pop(1)
  if cmd in aliases:
    if cmd in commands:
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
                                   " command" % cmd)

    if aliases[cmd] not in commands:
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
                                   " command '%s'" % (cmd, aliases[cmd]))

    cmd = aliases[cmd]

  func, args_def, parser_opts, usage, description = commands[cmd]
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
                        description=description,
                        formatter=TitledHelpFormatter(),
                        usage="%%prog %s %s" % (cmd, usage))
  parser.disable_interspersed_args()
  options, args = parser.parse_args()

  if not _CheckArguments(cmd, args_def, args):
    return None, None, None

  return func, options, args


def _CheckArguments(cmd, args_def, args):
  """Verifies the arguments using the argument definition.

  Algorithm:

    1. Abort with error if values specified by user but none expected.

    1. For each argument in definition

      1. Keep running count of minimum number of values (min_count)
      1. Keep running count of maximum number of values (max_count)
      1. If it has an unlimited number of values

        1. Abort with error if it's not the last argument in the definition

    1. If last argument has limited number of values

      1. Abort with error if number of values doesn't match or is too large

    1. Abort with error if user didn't pass enough values (min_count)

  """
  if args and not args_def:
    ToStderr("Error: Command %s expects no arguments", cmd)
    return False

  min_count = None
  max_count = None
  check_max = None

  last_idx = len(args_def) - 1

  for idx, arg in enumerate(args_def):
    if min_count is None:
      min_count = arg.min
    elif arg.min is not None:
      min_count += arg.min

    if max_count is None:
      max_count = arg.max
    elif arg.max is not None:
      max_count += arg.max

    if idx == last_idx:
      check_max = (arg.max is not None)

    elif arg.max is None:
      raise errors.ProgrammerError("Only the last argument can have max=None")

  if check_max:
    # Command with exact number of arguments
    if (min_count is not None and max_count is not None and
        min_count == max_count and len(args) != min_count):
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
      return False

    # Command with limited number of arguments
    if max_count is not None and len(args) > max_count:
      ToStderr("Error: Command %s expects only %d argument(s)",
               cmd, max_count)
      return False

  # Command with some required arguments
  if min_count is not None and len(args) < min_count:
    ToStderr("Error: Command %s expects at least %d argument(s)",
             cmd, min_count)
    return False

  return True
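
# Illustrative example (not part of the original module) of how the argument
# definition drives the checks above; the node name is made up:
#
#   _CheckArguments("add", [ArgNode(min=1, max=1)], ["node1.example.com"])
#   => True   (exactly one value expected and given)
#   _CheckArguments("add", [ArgNode(min=1, max=1)], [])
#   => False  (prints "Error: Command add expects 1 argument(s)")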


def SplitNodeOption(value):
  """Splits the value of a --node option.

  """
  if value and ':' in value:
    return value.split(':', 1)
  else:
    return (value, None)
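
# Illustrative example (not in the original source); the node names are
# made up:
#
#   SplitNodeOption("node1.example.com:node2.example.com")
#   => ["node1.example.com", "node2.example.com"]
#   SplitNodeOption("node1.example.com")
#   => ("node1.example.com", None)
#
# Callers unpack two values either way; only the container type differs.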


def CalculateOSNames(os_name, os_variants):
  """Calculates all the names an OS can be called, according to its variants.

  @type os_name: string
  @param os_name: base name of the os
  @type os_variants: list or None
  @param os_variants: list of supported variants
  @rtype: list
  @return: list of valid names

  """
  if os_variants:
    return ['%s+%s' % (os_name, v) for v in os_variants]
  else:
    return [os_name]
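
# Illustrative example (not in the original source); the OS and variant
# names are made up:
#
#   CalculateOSNames("debootstrap", ["squeeze", "lenny"])
#   => ["debootstrap+squeeze", "debootstrap+lenny"]
#   CalculateOSNames("debootstrap", None)
#   => ["debootstrap"]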


def UsesRPC(fn):
  def wrapper(*args, **kwargs):
    rpc.Init()
    try:
      return fn(*args, **kwargs)
    finally:
      rpc.Shutdown()
  return wrapper
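
# UsesRPC is a decorator: it brings up the node RPC layer before running the
# wrapped function and shuts it down afterwards, even on error. Illustrative
# usage (not part of the original module; the function name is made up):
#
#   @UsesRPC
#   def InitCluster(opts, args):
#     ...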


def AskUser(text, choices=None):
  """Ask the user a question.

  @param text: the question to ask

  @param choices: list of tuples (input_char, return_value,
      description); if not given, it will default to: [('y', True,
      'Perform the operation'), ('n', False, 'Do not do the operation')];
      note that the '?' char is reserved for help

  @return: one of the return values from the choices list; if input is
      not possible (i.e. not running with a tty), we return the last
      entry from the list

  """
  if choices is None:
    choices = [('y', True, 'Perform the operation'),
               ('n', False, 'Do not perform the operation')]
  if not choices or not isinstance(choices, list):
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
  for entry in choices:
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
      raise errors.ProgrammerError("Invalid choices element to AskUser")

  answer = choices[-1][1]
  new_text = []
  for line in text.splitlines():
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
  text = "\n".join(new_text)
  try:
    f = file("/dev/tty", "a+")
  except IOError:
    return answer
  try:
    chars = [entry[0] for entry in choices]
    chars[-1] = "[%s]" % chars[-1]
    chars.append('?')
    maps = dict([(entry[0], entry[1]) for entry in choices])
    while True:
      f.write(text)
      f.write('\n')
      f.write("/".join(chars))
      f.write(": ")
      line = f.readline(2).strip().lower()
      if line in maps:
        answer = maps[line]
        break
      elif line == '?':
        for entry in choices:
          f.write(" %s - %s\n" % (entry[0], entry[2]))
        f.write("\n")
        continue
  finally:
    f.close()
  return answer
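
# Illustrative example (not in the original source); the question text is
# made up:
#
#   answer = AskUser("Shut down the instance?",
#                    [("y", True, "Perform the shutdown"),
#                     ("n", False, "Leave the instance running")])
#
# When no tty is available the last entry's return value (False here) is
# used, so the safe choice should come last.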


class JobSubmittedException(Exception):
  """Job was submitted, client should exit.

  This exception has one argument, the ID of the job that was
  submitted. The handler should print this ID.

  This is not an error, just a structured way to exit from clients.

  """


def SendJob(ops, cl=None):
  """Function to submit an opcode without waiting for the results.

  @type ops: list
  @param ops: list of opcodes
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created

  """
  if cl is None:
    cl = GetClient()

  job_id = cl.SubmitJob(ops)

  return job_id


def GenericPollJob(job_id, cbs, report_cbs):
  """Generic job-polling function.

  @type job_id: number
  @param job_id: Job ID
  @type cbs: Instance of L{JobPollCbBase}
  @param cbs: Data callbacks
  @type report_cbs: Instance of L{JobPollReportCbBase}
  @param report_cbs: Reporting callbacks

  """
  prev_job_info = None
  prev_logmsg_serial = None

  status = None

  while True:
    result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
                                      prev_logmsg_serial)
    if not result:
      # job not found, go away!
      raise errors.JobLost("Job with id %s lost" % job_id)

    if result == constants.JOB_NOTCHANGED:
      report_cbs.ReportNotChanged(job_id, status)

      # Wait again
      continue

    # Split result, a tuple of (field values, log entries)
    (job_info, log_entries) = result
    (status, ) = job_info

    if log_entries:
      for log_entry in log_entries:
        (serial, timestamp, log_type, message) = log_entry
        report_cbs.ReportLogMessage(job_id, serial, timestamp,
                                    log_type, message)
        prev_logmsg_serial = max(prev_logmsg_serial, serial)

    # TODO: Handle canceled and archived jobs
    elif status in (constants.JOB_STATUS_SUCCESS,
                    constants.JOB_STATUS_ERROR,
                    constants.JOB_STATUS_CANCELING,
                    constants.JOB_STATUS_CANCELED):
      break

    prev_job_info = job_info

  jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
  if not jobs:
    raise errors.JobLost("Job with id %s lost" % job_id)

  status, opstatus, result = jobs[0]

  if status == constants.JOB_STATUS_SUCCESS:
    return result

  if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
    raise errors.OpExecError("Job was canceled")

  has_ok = False
  for idx, (status, msg) in enumerate(zip(opstatus, result)):
    if status == constants.OP_STATUS_SUCCESS:
      has_ok = True
    elif status == constants.OP_STATUS_ERROR:
      errors.MaybeRaise(msg)

      if has_ok:
        raise errors.OpExecError("partial failure (opcode %d): %s" %
                                 (idx, msg))

      raise errors.OpExecError(str(msg))

  # default failure mode
  raise errors.OpExecError(result)


class JobPollCbBase:
  """Base class for L{GenericPollJob} callbacks.

  """
  def __init__(self):
    """Initializes this class.

    """

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial):
    """Waits for changes on a job.

    """
    raise NotImplementedError()

  def QueryJobs(self, job_ids, fields):
    """Returns the selected fields for the selected job IDs.

    @type job_ids: list of numbers
    @param job_ids: Job IDs
    @type fields: list of strings
    @param fields: Fields

    """
    raise NotImplementedError()


class JobPollReportCbBase:
  """Base class for L{GenericPollJob} reporting callbacks.

  """
  def __init__(self):
    """Initializes this class.

    """

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    raise NotImplementedError()

  def ReportNotChanged(self, job_id, status):
    """Called for if a job hasn't changed in a while.

    @type job_id: number
    @param job_id: Job ID
    @type status: string or None
    @param status: Job status if available

    """
    raise NotImplementedError()


class _LuxiJobPollCb(JobPollCbBase):
  def __init__(self, cl):
    """Initializes this class.

    """
    JobPollCbBase.__init__(self)
    self.cl = cl

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial):
    """Waits for changes on a job.

    """
    return self.cl.WaitForJobChangeOnce(job_id, fields,
                                        prev_job_info, prev_log_serial)

  def QueryJobs(self, job_ids, fields):
    """Returns the selected fields for the selected job IDs.

    """
    return self.cl.QueryJobs(job_ids, fields)


class FeedbackFnJobPollReportCb(JobPollReportCbBase):
  def __init__(self, feedback_fn):
    """Initializes this class.

    """
    JobPollReportCbBase.__init__(self)

    self.feedback_fn = feedback_fn

    assert callable(feedback_fn)

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    self.feedback_fn((timestamp, log_type, log_msg))

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    """
    # Ignore


class StdioJobPollReportCb(JobPollReportCbBase):
  def __init__(self):
    """Initializes this class.

    """
    JobPollReportCbBase.__init__(self)

    self.notified_queued = False
    self.notified_waitlock = False

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
             utils.SafeEncode(log_msg))

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    """
    if status is None:
      return

    if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
      ToStderr("Job %s is waiting in queue", job_id)
      self.notified_queued = True

    elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
      ToStderr("Job %s is trying to acquire all necessary locks", job_id)
      self.notified_waitlock = True


def PollJob(job_id, cl=None, feedback_fn=None):
  """Function to poll for the result of a job.

  @type job_id: job identified
1498
  @param job_id: the job to poll for results
1499
  @type cl: luxi.Client
1500
  @param cl: the luxi client to use for communicating with the master;
1501
             if None, a new client will be created
1502

1503
  """
1504
  if cl is None:
1505
    cl = GetClient()
1506

    
1507
  if feedback_fn:
1508
    reporter = FeedbackFnJobPollReportCb(feedback_fn)
1509
  else:
1510
    reporter = StdioJobPollReportCb()
1511

    
1512
  return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
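

# Illustrative sketch (hypothetical, not used by the gnt-* scripts): polling a
# job that was submitted earlier (e.g. via SendJob) while collecting its log
# messages through a custom feedback function. The function name and the
# "messages" list are invented for this example.
def _ExamplePollWithFeedback(job_id):
  messages = []

  def _Collect(msg):
    # PollJob passes (timestamp, log_type, log_msg) tuples to feedback_fn
    messages.append(msg)

  # Blocks until the job has finished; returns the list of opcode results
  # or raises errors.OpExecError on failure
  results = PollJob(job_id, feedback_fn=_Collect)
  return results, messages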


def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
  """Legacy function to submit an opcode.

  This is just a simple wrapper over the construction of the processor
  instance. It should be extended to better handle feedback and
  interaction functions.

  """
  if cl is None:
    cl = GetClient()

  SetGenericOpcodeOpts([op], opts)

  job_id = SendJob([op], cl)

  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)

  return op_results[0]


def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  This function will decide, based on the 'opts' parameter, whether to
  submit and wait for the result of the opcode (and return it), or
  whether to just send the job and print its identifier. It is used in
  order to simplify the implementation of the '--submit' option.

  It will also process the opcodes if we're sending them via SendJob
  (otherwise SubmitOpCode does it).

  """
  if opts and opts.submit_only:
    job = [op]
    SetGenericOpcodeOpts(job, opts)
    job_id = SendJob(job, cl=cl)
    raise JobSubmittedException(job_id)
  else:
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
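

# Illustrative sketch (hypothetical, not used by the gnt-* scripts): the usual
# shape of a command implementation that honours the '--submit' option. The
# opcode built here (opcodes.OpTestDelay and its fields) is only a stand-in;
# real commands construct whatever opcode they implement.
def _ExampleDelayCommand(opts, args):
  op = opcodes.OpTestDelay(duration=1.0, on_master=True, on_nodes=[])

  # With --submit, SubmitOrSend raises JobSubmittedException, which
  # GenericMain turns into a printed "JobID: ..." line and exit code 0;
  # otherwise it waits for the job and returns its result
  result = SubmitOrSend(op, opts)
  ToStdout("Result: %s", result)
  return 0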


def SetGenericOpcodeOpts(opcode_list, options):
  """Processor for generic options.

  This function updates the given opcodes based on generic command
  line options (like debug, dry-run, etc.).

  @param opcode_list: list of opcodes
  @param options: command line options or None
  @return: None (in-place modification)

  """
  if not options:
    return
  for op in opcode_list:
    op.dry_run = options.dry_run
    op.debug_level = options.debug


def GetClient():
  # TODO: Cache object?
  try:
    client = luxi.Client()
  except luxi.NoMasterError:
    ss = ssconf.SimpleStore()

    # Try to read ssconf file
    try:
      ss.GetMasterNode()
    except errors.ConfigurationError:
      raise errors.OpPrereqError("Cluster not initialized or this machine is"
                                 " not part of a cluster")

    master, myself = ssconf.GetMasterAndMyself(ss=ss)
    if master != myself:
      raise errors.OpPrereqError("This is not the master node, please connect"
                                 " to node '%s' and rerun the command" %
                                 master)
    raise
  return client


def FormatError(err):
  """Return a formatted error message for a given error.

  This function takes an exception instance and returns a tuple
  consisting of two values: first, the recommended exit code, and
  second, a string describing the error message (not
  newline-terminated).

  """
  retcode = 1
  obuf = StringIO()
  msg = str(err)
  if isinstance(err, errors.ConfigurationError):
    txt = "Corrupt configuration file: %s" % msg
    logging.error(txt)
    obuf.write(txt + "\n")
    obuf.write("Aborting.")
    retcode = 2
  elif isinstance(err, errors.HooksAbort):
    obuf.write("Failure: hooks execution failed:\n")
    for node, script, out in err.args[0]:
      if out:
        obuf.write("  node: %s, script: %s, output: %s\n" %
                   (node, script, out))
      else:
        obuf.write("  node: %s, script: %s (no output)\n" %
                   (node, script))
  elif isinstance(err, errors.HooksFailure):
    obuf.write("Failure: hooks general failure: %s" % msg)
  elif isinstance(err, errors.ResolverError):
    this_host = utils.HostInfo.SysName()
    if err.args[0] == this_host:
      msg = "Failure: can't resolve my own hostname ('%s')"
    else:
      msg = "Failure: can't resolve hostname '%s'"
    obuf.write(msg % err.args[0])
  elif isinstance(err, errors.OpPrereqError):
    if len(err.args) == 2:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\nerror type: %s, error details:\n%s" %
                 (err.args[1], err.args[0]))
    else:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\n%s" % msg)
  elif isinstance(err, errors.OpExecError):
    obuf.write("Failure: command execution error:\n%s" % msg)
  elif isinstance(err, errors.TagError):
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
  elif isinstance(err, errors.JobQueueDrainError):
    obuf.write("Failure: the job queue is marked for drain and doesn't"
               " accept new requests\n")
  elif isinstance(err, errors.JobQueueFull):
    obuf.write("Failure: the job queue is full and doesn't accept new"
               " job submissions until old jobs are archived\n")
  elif isinstance(err, errors.TypeEnforcementError):
    obuf.write("Parameter Error: %s" % msg)
  elif isinstance(err, errors.ParameterError):
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
  elif isinstance(err, luxi.NoMasterError):
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
               " and listening for connections?")
  elif isinstance(err, luxi.TimeoutError):
    obuf.write("Timeout while talking to the master daemon. Error:\n"
               "%s" % msg)
  elif isinstance(err, luxi.ProtocolError):
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
               "%s" % msg)
  elif isinstance(err, errors.GenericError):
    obuf.write("Unhandled Ganeti error: %s" % msg)
  elif isinstance(err, JobSubmittedException):
    obuf.write("JobID: %s\n" % err.args[0])
    retcode = 0
  else:
    obuf.write("Unhandled exception: %s" % msg)
  return retcode, obuf.getvalue().rstrip('\n')
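

# Illustrative sketch (hypothetical, not used by the gnt-* scripts): how
# callers such as GenericMain typically use FormatError - converting a Ganeti
# exception into an exit code plus a printable message. The function name is
# invented for this example.
def _ExampleRunAndFormat(fn, *args):
  try:
    return 0, fn(*args)
  except errors.GenericError, err:
    retcode, err_msg = FormatError(err)
    ToStderr(err_msg)
    return retcode, None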


def GenericMain(commands, override=None, aliases=None):
  """Generic main function for all the gnt-* commands.

  Arguments:
    - commands: a dictionary with a special structure, see the design doc
                for command line handling.
    - override: if not None, we expect a dictionary with keys that will
                override command line options; this can be used to pass
                options from the scripts to generic functions
    - aliases: dictionary with command aliases {'alias': 'target', ...}

  """
  # save the program name and the entire command line for later logging
  if sys.argv:
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
    if len(sys.argv) >= 2:
      binary += " " + sys.argv[1]
      old_cmdline = " ".join(sys.argv[2:])
    else:
      old_cmdline = ""
  else:
    binary = "<unknown program>"
    old_cmdline = ""

  if aliases is None:
    aliases = {}

  try:
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
  except errors.ParameterError, err:
    result, err_msg = FormatError(err)
    ToStderr(err_msg)
    return 1

  if func is None: # parse error
    return 1

  if override is not None:
    for key, val in override.iteritems():
      setattr(options, key, val)

  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
                     stderr_logging=True, program=binary)

  if old_cmdline:
    logging.info("run with arguments '%s'", old_cmdline)
  else:
    logging.info("run with no arguments")

  try:
    result = func(options, args)
  except (errors.GenericError, luxi.ProtocolError,
          JobSubmittedException), err:
    result, err_msg = FormatError(err)
    logging.exception("Error during command processing")
    ToStderr(err_msg)

  return result
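

# Illustrative sketch (hypothetical, not used by the gnt-* scripts): the
# skeleton of a script built around GenericMain. The command table layout
# (function, argument definitions, options, usage, description) is assumed
# here from how the existing gnt-* scripts use this module; "example-noop"
# and _ExampleNoop are invented names.
def _ExampleNoop(opts, args):
  ToStdout("Nothing to do (debug=%s)", opts.debug)
  return 0

_EXAMPLE_COMMANDS = {
  "example-noop": (_ExampleNoop, [], [DEBUG_OPT],
                   "", "Does nothing, used only to illustrate GenericMain"),
  }

# A script would then end with something like:
#   if __name__ == "__main__":
#     sys.exit(GenericMain(_EXAMPLE_COMMANDS))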


def GenericInstanceCreate(mode, opts, args):
  """Add an instance to the cluster via either creation or import.

  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new instance name
  @rtype: int
  @return: the desired exit code

  """
  instance = args[0]

  (pnode, snode) = SplitNodeOption(opts.node)

  hypervisor = None
  hvparams = {}
  if opts.hypervisor:
    hypervisor, hvparams = opts.hypervisor

  if opts.nics:
    try:
      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
    except ValueError, err:
      raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
    nics = [{}] * nic_max
    for nidx, ndict in opts.nics:
      nidx = int(nidx)
      if not isinstance(ndict, dict):
        msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
        raise errors.OpPrereqError(msg)
      nics[nidx] = ndict
  elif opts.no_nics:
    # no nics
    nics = []
  elif mode == constants.INSTANCE_CREATE:
    # default of one nic, all auto
    nics = [{}]
  else:
    # mode == import
    nics = []

  if opts.disk_template == constants.DT_DISKLESS:
    if opts.disks or opts.sd_size is not None:
      raise errors.OpPrereqError("Diskless instance but disk"
                                 " information passed")
    disks = []
  else:
    if (not opts.disks and not opts.sd_size
        and mode == constants.INSTANCE_CREATE):
      raise errors.OpPrereqError("No disk information specified")
    if opts.disks and opts.sd_size is not None:
      raise errors.OpPrereqError("Please use either the '--disk' or"
                                 " '-s' option")
    if opts.sd_size is not None:
      opts.disks = [(0, {"size": opts.sd_size})]

    if opts.disks:
      try:
        disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
      disks = [{}] * disk_max
    else:
      disks = []
    for didx, ddict in opts.disks:
      didx = int(didx)
      if not isinstance(ddict, dict):
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
        raise errors.OpPrereqError(msg)
      elif "size" in ddict:
        if "adopt" in ddict:
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
                                     " (disk %d)" % didx)
        try:
          ddict["size"] = utils.ParseUnit(ddict["size"])
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
                                     (didx, err))
      elif "adopt" in ddict:
        if mode == constants.INSTANCE_IMPORT:
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
                                     " import")
        ddict["size"] = 0
      else:
        raise errors.OpPrereqError("Missing size or adoption source for"
                                   " disk %d" % didx)
      disks[didx] = ddict

  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)

  if mode == constants.INSTANCE_CREATE:
    start = opts.start
    os_type = opts.os
    force_variant = opts.force_variant
    src_node = None
    src_path = None
    no_install = opts.no_install
    identify_defaults = False
  elif mode == constants.INSTANCE_IMPORT:
    start = False
    os_type = None
    force_variant = False
    src_node = opts.src_node
    src_path = opts.src_dir
    no_install = None
    identify_defaults = opts.identify_defaults
  else:
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)

  op = opcodes.OpCreateInstance(instance_name=instance,
                                disks=disks,
                                disk_template=opts.disk_template,
                                nics=nics,
                                pnode=pnode, snode=snode,
                                ip_check=opts.ip_check,
                                name_check=opts.name_check,
                                wait_for_sync=opts.wait_for_sync,
                                file_storage_dir=opts.file_storage_dir,
                                file_driver=opts.file_driver,
                                iallocator=opts.iallocator,
                                hypervisor=hypervisor,
                                hvparams=hvparams,
                                beparams=opts.beparams,
                                osparams=opts.osparams,
                                mode=mode,
                                start=start,
                                os_type=os_type,
                                force_variant=force_variant,
                                src_node=src_node,
                                src_path=src_path,
                                no_install=no_install,
                                identify_defaults=identify_defaults)

  SubmitOrSend(op, opts)
  return 0


class _RunWhileClusterStoppedHelper:
  """Helper class for L{RunWhileClusterStopped} to simplify state management

  """
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes

    self.ssh = ssh.SshRunner(self.cluster_name)

    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name
    @type cmd: list
    @param cmd: Command

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    @type fn: callable
    @param fn: Function to be called

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes
        for node_name in self.online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception, err:
          _, errmsg = FormatError(err)
          logging.exception("Caught exception")
          self.feedback_fn(errmsg)
          raise
      finally:
        # Start cluster again, master node last
        for node_name in self.nonmaster_nodes + [self.master_node]:
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
    finally:
      # Resume watcher
      watcher_block.Close()


def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Calls a function while all cluster daemons are stopped.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  feedback_fn("Gathering cluster information")

  # This ensures we're running on the master daemon
  cl = GetClient()

  (cluster_name, master_node) = \
    cl.QueryConfigValues(["cluster_name", "master_node"])

  online_nodes = GetOnlineNodes([], cl=cl)

  # Don't keep a reference to the client. The master daemon will go away.
  del cl

  assert master_node in online_nodes

  return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
                                       online_nodes).Call(fn, *args)
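

# Illustrative sketch (hypothetical, not used by the gnt-* scripts): running a
# small maintenance function with every cluster daemon stopped. The helper
# instance is passed as the first argument to the callback, so its _RunCmd
# method (private, used here only for illustration) can run commands on any
# node; the "hostname" command and the function names are placeholders.
def _ExampleClusterStoppedTask():
  def _CheckMasterHostname(helper):
    # Runs locally on the master node (node_name=None means "no SSH");
    # _RunCmd raises errors.OpExecError if the command fails
    helper._RunCmd(None, ["hostname"])

  RunWhileClusterStopped(ToStdout, _CheckMasterHostname)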


def GenerateTable(headers, fields, separator, data,
                  numfields=None, unitfields=None,
                  units=None):
  """Generates a table with headers and different fields.

  @type headers: dict
  @param headers: dictionary mapping field names to headers for
      the table
  @type fields: list
  @param fields: the field names corresponding to each row in
      the data field
  @param separator: the separator to be used; if this is None,
      the default 'smart' algorithm is used which computes optimal
      field width, otherwise just the separator is used between
      each field
  @type data: list
  @param data: a list of lists, each sublist being one row to be output
  @type numfields: list
  @param numfields: a list with the fields that hold numeric
      values and thus should be right-aligned
  @type unitfields: list
  @param unitfields: a list with the fields that hold numeric
      values that should be formatted with the units field
  @type units: string or None
  @param units: the units we should use for formatting, or None for
      automatic choice (human-readable for non-separator usage, otherwise
      megabytes); this is a one-letter string

  """
  if units is None:
    if separator:
      units = "m"
    else:
      units = "h"

  if numfields is None:
    numfields = []
  if unitfields is None:
    unitfields = []

  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142

  format_fields = []
  for field in fields:
    if headers and field not in headers:
      # TODO: handle better unknown fields (either revert to old
      # style of raising exception, or deal more intelligently with
      # variable fields)
      headers[field] = field
    if separator is not None:
      format_fields.append("%s")
    elif numfields.Matches(field):
      format_fields.append("%*s")
    else:
      format_fields.append("%-*s")

  if separator is None:
    mlens = [0 for name in fields]
    format_str = ' '.join(format_fields)
  else:
    format_str = separator.replace("%", "%%").join(format_fields)

  for row in data:
    if row is None:
      continue
    for idx, val in enumerate(row):
      if unitfields.Matches(fields[idx]):
        try:
          val = int(val)
        except (TypeError, ValueError):
          pass
        else:
          val = row[idx] = utils.FormatUnit(val, units)
      val = row[idx] = str(val)
      if separator is None:
        mlens[idx] = max(mlens[idx], len(val))

  result = []
  if headers:
    args = []
    for idx, name in enumerate(fields):
      hdr = headers[name]
      if separator is None:
        mlens[idx] = max(mlens[idx], len(hdr))
        args.append(mlens[idx])
      args.append(hdr)
    result.append(format_str % tuple(args))

  if separator is None:
    assert len(mlens) == len(fields)

    if fields and not numfields.Matches(fields[-1]):
      mlens[-1] = 0

  for line in data:
    args = []
    if line is None:
      line = ['-' for _ in fields]
    for idx in range(len(fields)):
      if separator is None:
        args.append(mlens[idx])
      args.append(line[idx])
    result.append(format_str % tuple(args))

  return result
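

# Illustrative sketch (hypothetical, not used by the gnt-* scripts): building
# and printing a small two-column table. Field names, headers and data are
# invented; "size" is declared both numeric (right-aligned) and
# unit-formatted, so 10240 is rendered human-readable.
def _ExamplePrintTable():
  headers = {"name": "Instance", "size": "DiskSize"}
  fields = ["name", "size"]
  data = [["instance1.example.com", 10240],
          ["instance2.example.com", 2048]]
  lines = GenerateTable(headers, fields, None, data,
                        numfields=["size"], unitfields=["size"])
  for line in lines:
    ToStdout(line)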


def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: a string with the formatted timestamp

  """
  if not isinstance(ts, (tuple, list)) or len(ts) != 2:
    return '?'
  sec, usec = ts
  return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
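

# Illustrative sketch (hypothetical, not used by the gnt-* scripts):
# formatting the current time the way job timestamps are displayed. The
# 2-tuple mirrors the (seconds, microseconds) timestamps used elsewhere.
def _ExampleFormatNow():
  now = time.time()
  ts = (int(now), int((now - int(now)) * 1000000))
  # Returns e.g. "2011-05-20 14:03:07.123456"
  return FormatTimestamp(ts)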


def ParseTimespec(value):
  """Parse a time specification.

  The following suffixes will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: days
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  """
  value = str(value)
  if not value:
    raise errors.OpPrereqError("Empty time specification passed")
  suffix_map = {
    's': 1,
    'm': 60,
    'h': 3600,
    'd': 86400,
    'w': 604800,
    }
  if value[-1] not in suffix_map:
    try:
      value = int(value)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  else:
    multiplier = suffix_map[value[-1]]
    value = value[:-1]
    if not value: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)")
    try:
      value = int(value) * multiplier
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  return value
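

# Illustrative sketch (hypothetical, not used by the gnt-* scripts): a few
# inputs and the number of seconds ParseTimespec returns for them.
def _ExampleTimespecs():
  assert ParseTimespec("30") == 30        # plain number: seconds
  assert ParseTimespec("2m") == 120       # 2 * 60
  assert ParseTimespec("3h") == 10800     # 3 * 3600
  assert ParseTimespec("1w") == 604800    # one week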


def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
                   filter_master=False):
  """Returns the names of online nodes.

  This function will also log a note on stderr with the names of
  the offline nodes that are skipped.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed
  @type secondary_ips: boolean
  @param secondary_ips: if True, return the secondary IPs instead of the
      names, useful for doing network traffic over the replication interface
      (if any)
  @type filter_master: boolean
  @param filter_master: if True, do not return the master node in the list
      (useful in coordination with secondary_ips where we cannot check our
      node name against the list)

  """
  if cl is None:
    cl = GetClient()

  if secondary_ips:
    name_idx = 2
  else:
    name_idx = 0

  if filter_master:
    master_node = cl.QueryConfigValues(["master_node"])[0]
    filter_fn = lambda x: x != master_node
  else:
    filter_fn = lambda _: True

  result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
                         use_locking=False)
  offline = [row[0] for row in result if row[1]]
  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
  return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
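

# Illustrative sketch (hypothetical, not used by the gnt-* scripts):
# collecting the secondary IPs of all online nodes except the master, e.g. to
# copy data over the replication network. The function name is invented.
def _ExampleReplicationTargets():
  return GetOnlineNodes([], secondary_ips=True, filter_master=True,
                        nowarn=True)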


def _ToStream(stream, txt, *args):
  """Write a message to a stream, bypassing the logging system

  @type stream: file object
  @param stream: the file to which we should write
  @type txt: str
  @param txt: the message

  """
  if args:
    args = tuple(args)
    stream.write(txt % args)
  else:
    stream.write(txt)
  stream.write('\n')
  stream.flush()


def ToStdout(txt, *args):
  """Write a message to stdout only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stdout, txt, *args)


def ToStderr(txt, *args):
  """Write a message to stderr only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stderr, txt, *args)


class JobExecutor(object):
  """Class which manages the submission and execution of multiple jobs.

  Note that instances of this class should not be reused between
  GetResults() calls.

  """
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
    self.queue = []
    if cl is None:
      cl = GetClient()
    self.cl = cl
    self.verbose = verbose
    self.jobs = []
    self.opts = opts
    self.feedback_fn = feedback_fn

  def QueueJob(self, name, *ops):
    """Record a job for later submit.

    @type name: string
    @param name: a description of the job, will be used in WaitJobSet
    """
    SetGenericOpcodeOpts(ops, self.opts)
    self.queue.append((name, ops))

  def SubmitPending(self, each=False):
    """Submit all pending jobs.

    """
    if each:
      results = []
      for row in self.queue:
        # SubmitJob will remove the success status, but raise an exception if
        # the submission fails, so we'll notice that anyway.
        results.append([True, self.cl.SubmitJob(row[1])])
    else:
      results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
                                                            self.queue)):
      self.jobs.append((idx, status, data, name))

  def _ChooseJob(self):
    """Choose a non-waiting/queued job to poll next.

    """
    assert self.jobs, "_ChooseJob called with empty job list"

    result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
    assert result

    for job_data, status in zip(self.jobs, result):
      if status[0] in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_WAITLOCK,
                       constants.JOB_STATUS_CANCELING):
        # job is still waiting
        continue
      # good candidate found
      self.jobs.remove(job_data)
      return job_data

    # no job found
    return self.jobs.pop(0)

  def GetResults(self):
    """Wait for and return the results of all jobs.

    @rtype: list
    @return: list of tuples (success, job results), in the same order
        as the submitted jobs; if a job has failed, instead of the result
        there will be the error message

    """
    if not self.jobs:
      self.SubmitPending()
    results = []
    if self.verbose:
      ok_jobs = [row[2] for row in self.jobs if row[1]]
      if ok_jobs:
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))

    # first, remove any non-submitted jobs
    self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
    for idx, _, jid, name in failures:
      ToStderr("Failed to submit job for %s: %s", name, jid)
      results.append((idx, False, jid))

    while self.jobs:
      (idx, _, jid, name) = self._ChooseJob()
      ToStdout("Waiting for job %s for %s...", jid, name)
      try:
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
        success = True
      except (errors.GenericError, luxi.ProtocolError), err:
        _, job_result = FormatError(err)
        success = False
        # the error message will always be shown, verbose or not
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)

      results.append((idx, success, job_result))

    # sort based on the index, then drop it
    results.sort()
    results = [i[1:] for i in results]

    return results

  def WaitOrShow(self, wait):
    """Wait for job results or only print the job IDs.

    @type wait: boolean
    @param wait: whether to wait or not

    """
    if wait:
      return self.GetResults()
    else:
      if not self.jobs:
        self.SubmitPending()
      for _, status, result, name in self.jobs:
        if status:
          ToStdout("%s: %s", result, name)
        else:
          ToStderr("Failure for %s: %s", name, result)
      return [row[1:3] for row in self.jobs]
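

# Illustrative sketch (hypothetical, not used by the gnt-* scripts):
# submitting several jobs in one go and collecting their results. The
# function name is invented; "named_ops" is assumed to be a list of
# (name, opcode) pairs built by the caller.
def _ExampleRunJobs(named_ops):
  jex = JobExecutor(verbose=True)
  for name, op in named_ops:
    jex.QueueJob(name, op)
  # GetResults() submits anything still pending, waits for every job and
  # returns (success, result-or-error-message) pairs in queueing order
  return jex.GetResults()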