#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module dealing with command line parsing"""


import sys
import textwrap
import os.path
import time
import logging
from cStringIO import StringIO

from ganeti import utils
from ganeti import errors
from ganeti import constants
from ganeti import opcodes
from ganeti import luxi
from ganeti import ssconf
from ganeti import rpc
from ganeti import ssh
from ganeti import compat

from optparse import (OptionParser, TitledHelpFormatter,
                      Option, OptionValueError)


__all__ = [
  # Command line options
  "ADD_UIDS_OPT",
  "ALLOCATABLE_OPT",
  "ALL_OPT",
  "AUTO_PROMOTE_OPT",
  "AUTO_REPLACE_OPT",
  "BACKEND_OPT",
  "CLEANUP_OPT",
  "CLUSTER_DOMAIN_SECRET_OPT",
  "CONFIRM_OPT",
  "CP_SIZE_OPT",
  "DEBUG_OPT",
  "DEBUG_SIMERR_OPT",
  "DISKIDX_OPT",
  "DISK_OPT",
  "DISK_TEMPLATE_OPT",
  "DRAINED_OPT",
  "EARLY_RELEASE_OPT",
  "ENABLED_HV_OPT",
  "ERROR_CODES_OPT",
  "FIELDS_OPT",
  "FILESTORE_DIR_OPT",
  "FILESTORE_DRIVER_OPT",
  "FORCE_OPT",
  "FORCE_VARIANT_OPT",
  "GLOBAL_FILEDIR_OPT",
  "HVLIST_OPT",
  "HVOPTS_OPT",
  "HYPERVISOR_OPT",
  "IALLOCATOR_OPT",
  "IDENTIFY_DEFAULTS_OPT",
  "IGNORE_CONSIST_OPT",
  "IGNORE_FAILURES_OPT",
  "IGNORE_REMOVE_FAILURES_OPT",
  "IGNORE_SECONDARIES_OPT",
  "IGNORE_SIZE_OPT",
  "MAC_PREFIX_OPT",
  "MAINTAIN_NODE_HEALTH_OPT",
  "MASTER_NETDEV_OPT",
  "MC_OPT",
  "NET_OPT",
  "NEW_CLUSTER_CERT_OPT",
  "NEW_CLUSTER_DOMAIN_SECRET_OPT",
  "NEW_CONFD_HMAC_KEY_OPT",
  "NEW_RAPI_CERT_OPT",
  "NEW_SECONDARY_OPT",
  "NIC_PARAMS_OPT",
  "NODE_LIST_OPT",
  "NODE_PLACEMENT_OPT",
  "NOHDR_OPT",
  "NOIPCHECK_OPT",
  "NO_INSTALL_OPT",
  "NONAMECHECK_OPT",
  "NOLVM_STORAGE_OPT",
  "NOMODIFY_ETCHOSTS_OPT",
  "NOMODIFY_SSH_SETUP_OPT",
  "NONICS_OPT",
  "NONLIVE_OPT",
  "NONPLUS1_OPT",
  "NOSHUTDOWN_OPT",
  "NOSTART_OPT",
  "NOSSH_KEYCHECK_OPT",
  "NOVOTING_OPT",
  "NWSYNC_OPT",
  "ON_PRIMARY_OPT",
  "ON_SECONDARY_OPT",
  "OFFLINE_OPT",
  "OS_OPT",
  "OS_SIZE_OPT",
  "RAPI_CERT_OPT",
  "READD_OPT",
  "REBOOT_TYPE_OPT",
  "REMOVE_INSTANCE_OPT",
  "REMOVE_UIDS_OPT",
  "ROMAN_OPT",
  "SECONDARY_IP_OPT",
  "SELECT_OS_OPT",
  "SEP_OPT",
  "SHOWCMD_OPT",
  "SHUTDOWN_TIMEOUT_OPT",
  "SINGLE_NODE_OPT",
  "SRC_DIR_OPT",
  "SRC_NODE_OPT",
  "SUBMIT_OPT",
  "STATIC_OPT",
  "SYNC_OPT",
  "TAG_SRC_OPT",
  "TIMEOUT_OPT",
  "UIDPOOL_OPT",
  "USEUNITS_OPT",
  "USE_REPL_NET_OPT",
  "VERBOSE_OPT",
  "VG_NAME_OPT",
  "YES_DOIT_OPT",
  # Generic functions for CLI programs
  "GenericMain",
  "GenericInstanceCreate",
  "GetClient",
  "GetOnlineNodes",
  "JobExecutor",
  "JobSubmittedException",
  "ParseTimespec",
  "RunWhileClusterStopped",
  "SubmitOpCode",
  "SubmitOrSend",
  "UsesRPC",
  # Formatting functions
  "ToStderr", "ToStdout",
  "FormatError",
  "GenerateTable",
  "AskUser",
  "FormatTimestamp",
  # Tags functions
  "ListTags",
  "AddTags",
  "RemoveTags",
  # command line options support infrastructure
  "ARGS_MANY_INSTANCES",
  "ARGS_MANY_NODES",
  "ARGS_NONE",
  "ARGS_ONE_INSTANCE",
  "ARGS_ONE_NODE",
  "ARGS_ONE_OS",
  "ArgChoice",
  "ArgCommand",
  "ArgFile",
  "ArgHost",
  "ArgInstance",
  "ArgJobId",
  "ArgNode",
  "ArgOs",
  "ArgSuggest",
  "ArgUnknown",
  "OPT_COMPL_INST_ADD_NODES",
  "OPT_COMPL_MANY_NODES",
  "OPT_COMPL_ONE_IALLOCATOR",
  "OPT_COMPL_ONE_INSTANCE",
  "OPT_COMPL_ONE_NODE",
  "OPT_COMPL_ONE_OS",
  "cli_option",
  "SplitNodeOption",
  "CalculateOSNames",
  ]

NO_PREFIX = "no_"
UN_PREFIX = "-"


class _Argument:
  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
    self.min = min
    self.max = max

  def __repr__(self):
    return ("<%s min=%s max=%s>" %
            (self.__class__.__name__, self.min, self.max))


class ArgSuggest(_Argument):
  """Suggesting argument.

  Value can be any of the ones passed to the constructor.

  """
  # pylint: disable-msg=W0622
  def __init__(self, min=0, max=None, choices=None):
    _Argument.__init__(self, min=min, max=max)
    self.choices = choices

  def __repr__(self):
    return ("<%s min=%s max=%s choices=%r>" %
            (self.__class__.__name__, self.min, self.max, self.choices))


class ArgChoice(ArgSuggest):
  """Choice argument.

  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
  but value must be one of the choices.

  """


class ArgUnknown(_Argument):
  """Unknown argument to program (e.g. determined at runtime).

  """


class ArgInstance(_Argument):
  """Instances argument.

  """


class ArgNode(_Argument):
  """Node argument.

  """

class ArgJobId(_Argument):
  """Job ID argument.

  """


class ArgFile(_Argument):
  """File path argument.

  """


class ArgCommand(_Argument):
  """Command argument.

  """


class ArgHost(_Argument):
  """Host argument.

  """


class ArgOs(_Argument):
  """OS argument.

  """


ARGS_NONE = []
ARGS_MANY_INSTANCES = [ArgInstance()]
ARGS_MANY_NODES = [ArgNode()]
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
ARGS_ONE_OS = [ArgOs(min=1, max=1)]


def _ExtractTagsObject(opts, args):
  """Extract the tag type object.

  Note that this function will modify its args parameter.

  """
  if not hasattr(opts, "tag_type"):
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
  kind = opts.tag_type
  if kind == constants.TAG_CLUSTER:
    retval = kind, kind
  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
    if not args:
      raise errors.OpPrereqError("no arguments passed to the command")
    name = args.pop(0)
    retval = kind, name
  else:
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
  return retval


def _ExtendTags(opts, args):
  """Extend the args if a source file has been given.

  This function will extend the tags with the contents of the file
  passed in the 'tags_source' attribute of the opts parameter. A file
  named '-' will be replaced by stdin.

  """
  fname = opts.tags_source
  if fname is None:
    return
  if fname == "-":
    new_fh = sys.stdin
  else:
    new_fh = open(fname, "r")
  new_data = []
  try:
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
    # because of python bug 1633941
    while True:
      line = new_fh.readline()
      if not line:
        break
      new_data.append(line.strip())
  finally:
    new_fh.close()
  args.extend(new_data)


def ListTags(opts, args):
  """List the tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  cl = GetClient()
  result = cl.QueryTags(kind, name)
  result = list(result)
  result.sort()
  for tag in result:
    ToStdout(tag)


def AddTags(opts, args):
  """Add tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be added")
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
  SubmitOpCode(op)


def RemoveTags(opts, args):
  """Remove tags from a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be removed")
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
  SubmitOpCode(op)


def check_unit(option, opt, value): # pylint: disable-msg=W0613
  """OptParser's custom converter for units.

  """
  try:
    return utils.ParseUnit(value)
  except errors.UnitParseError, err:
    raise OptionValueError("option %s: %s" % (opt, err))


def _SplitKeyVal(opt, data):
  """Convert a KeyVal string into a dict.

  This function will convert a key=val[,...] string into a dict. Empty
  values will be converted specially: keys which have the prefix 'no_'
  will have the value=False and the prefix stripped, keys which have
  the prefix '-' will have the value=None and the prefix stripped, and
  the others will have value=True.

  @type opt: string
  @param opt: a string holding the option name for which we process the
      data, used in building error messages
  @type data: string
  @param data: a string of the format key=val,key=val,...
  @rtype: dict
  @return: {key=val, key=val}
  @raises errors.ParameterError: if there are duplicate keys

  """
  kv_dict = {}
  if data:
    for elem in utils.UnescapeAndSplit(data, sep=","):
      if "=" in elem:
        key, val = elem.split("=", 1)
      else:
        if elem.startswith(NO_PREFIX):
          key, val = elem[len(NO_PREFIX):], False
        elif elem.startswith(UN_PREFIX):
          key, val = elem[len(UN_PREFIX):], None
        else:
          key, val = elem, True
      if key in kv_dict:
        raise errors.ParameterError("Duplicate key '%s' in option %s" %
                                    (key, opt))
      kv_dict[key] = val
  return kv_dict


def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser for ident:key=val,key=val options.

  This will store the parsed values as a tuple (ident, {key: val}). As such,
  multiple uses of this option via action=append are possible.

  """
  if ":" not in value:
    ident, rest = value, ''
  else:
    ident, rest = value.split(":", 1)

  if ident.startswith(NO_PREFIX):
    if rest:
      msg = "Cannot pass options when removing parameter groups: %s" % value
      raise errors.ParameterError(msg)
    retval = (ident[len(NO_PREFIX):], False)
  elif ident.startswith(UN_PREFIX):
    if rest:
      msg = "Cannot pass options when removing parameter groups: %s" % value
      raise errors.ParameterError(msg)
    retval = (ident[len(UN_PREFIX):], None)
  else:
    kv_dict = _SplitKeyVal(opt, rest)
    retval = (ident, kv_dict)
  return retval
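
# Illustrative example (editor's note, not part of the upstream module): a
# value of "0:size=10240,mode=rw" would be parsed into
# ("0", {"size": "10240", "mode": "rw"}), while a bare "no_link" would be
# parsed into ("link", False).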


def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser for key=val,key=val options.

  This will store the parsed values as a dict {key: val}.

  """
  return _SplitKeyVal(opt, value)


def check_bool(option, opt, value): # pylint: disable-msg=W0613
  """Custom parser for yes/no options.

  This will store the parsed value as either True or False.

  """
  value = value.lower()
  if value == constants.VALUE_FALSE or value == "no":
    return False
  elif value == constants.VALUE_TRUE or value == "yes":
    return True
  else:
    raise errors.ParameterError("Invalid boolean value '%s'" % value)


# completion_suggestion is normally a list. Using numeric values not evaluating
# to False for dynamic completion.
(OPT_COMPL_MANY_NODES,
 OPT_COMPL_ONE_NODE,
 OPT_COMPL_ONE_INSTANCE,
 OPT_COMPL_ONE_OS,
 OPT_COMPL_ONE_IALLOCATOR,
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)

OPT_COMPL_ALL = frozenset([
  OPT_COMPL_MANY_NODES,
  OPT_COMPL_ONE_NODE,
  OPT_COMPL_ONE_INSTANCE,
  OPT_COMPL_ONE_OS,
  OPT_COMPL_ONE_IALLOCATOR,
  OPT_COMPL_INST_ADD_NODES,
  ])


class CliOption(Option):
  """Custom option class for optparse.

  """
  ATTRS = Option.ATTRS + [
    "completion_suggest",
    ]
  TYPES = Option.TYPES + (
    "identkeyval",
    "keyval",
    "unit",
    "bool",
    )
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
  TYPE_CHECKER["keyval"] = check_key_val
  TYPE_CHECKER["unit"] = check_unit
  TYPE_CHECKER["bool"] = check_bool


# optparse.py sets make_option, so we do it for our own option class, too
cli_option = CliOption


_YORNO = "yes|no"

DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
                       help="Increase debugging level")

NOHDR_OPT = cli_option("--no-headers", default=False,
                       action="store_true", dest="no_headers",
                       help="Don't display column headers")

SEP_OPT = cli_option("--separator", default=None,
                     action="store", dest="separator",
                     help=("Separator between output fields"
                           " (defaults to one space)"))

USEUNITS_OPT = cli_option("--units", default=None,
                          dest="units", choices=('h', 'm', 'g', 't'),
                          help="Specify units for output (one of hmgt)")

FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
                        type="string", metavar="FIELDS",
                        help="Comma separated list of output fields")

FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
                       default=False, help="Force the operation")

CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
                         default=False, help="Do not require confirmation")

TAG_SRC_OPT = cli_option("--from", dest="tags_source",
                         default=None, help="File with tag names")

SUBMIT_OPT = cli_option("--submit", dest="submit_only",
                        default=False, action="store_true",
                        help=("Submit the job and return the job ID, but"
                              " don't wait for the job to finish"))

SYNC_OPT = cli_option("--sync", dest="do_locking",
                      default=False, action="store_true",
                      help=("Grab locks while doing the queries"
                            " in order to ensure more consistent results"))

_DRY_RUN_OPT = cli_option("--dry-run", default=False,
                          action="store_true",
                          help=("Do not execute the operation, just run the"
                                " check steps and verify if it could be"
                                " executed"))

VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
                         action="store_true",
                         help="Increase the verbosity of the operation")

DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
                              action="store_true", dest="simulate_errors",
                              help="Debugging option that makes the operation"
                              " treat most runtime checks as failed")

NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
                        default=True, action="store_false",
                        help="Don't wait for sync (DANGEROUS!)")

DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
                               help="Custom disk setup (diskless, file,"
                               " plain or drbd)",
                               default=None, metavar="TEMPL",
                               choices=list(constants.DISK_TEMPLATES))

NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
                        help="Do not create any network cards for"
                        " the instance")

FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                               help="Relative path under default cluster-wide"
                               " file storage dir to store file-based disks",
                               default=None, metavar="<DIR>")

FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
                                  help="Driver to use for image files",
                                  default="loop", metavar="<DRIVER>",
                                  choices=list(constants.FILE_DRIVER))

IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
                            help="Select nodes for the instance automatically"
                            " using the <NAME> iallocator plugin",
                            default=None, type="string",
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)

OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
                    metavar="<os>",
                    completion_suggest=OPT_COMPL_ONE_OS)

FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
                               action="store_true", default=False,
                               help="Force an unknown variant")

NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
                            action="store_true", default=False,
                            help="Do not install the OS (will"
                            " enable no-start)")

BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
                         type="keyval", default={},
                         help="Backend parameters")

HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
                         default={}, dest="hvparams",
                         help="Hypervisor parameters")

HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
                            help="Hypervisor and hypervisor options, in the"
                            " format hypervisor:option=value,option=value,...",
                            default=None, type="identkeyval")

HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
                        help="Hypervisor and hypervisor options, in the"
                        " format hypervisor:option=value,option=value,...",
                        default=[], action="append", type="identkeyval")

NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
                           action="store_false",
                           help="Don't check that the instance's IP"
                           " is alive")

NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
                             default=True, action="store_false",
                             help="Don't check that the instance's name"
                             " is resolvable")

NET_OPT = cli_option("--net",
                     help="NIC parameters", default=[],
                     dest="nics", action="append", type="identkeyval")

DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
                      dest="disks", action="append", type="identkeyval")

DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
                         help="Comma-separated list of disk"
                         " indices to act on (e.g. 0,2) (optional,"
                         " defaults to all disks)")

OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
                         help="Enforces a single-disk configuration using the"
                         " given disk size, in MiB unless a suffix is used",
                         default=None, type="unit", metavar="<size>")

IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
                                dest="ignore_consistency",
                                action="store_true", default=False,
                                help="Ignore the consistency of the disks on"
                                " the secondary")

NONLIVE_OPT = cli_option("--non-live", dest="live",
                         default=True, action="store_false",
                         help="Do a non-live migration (this usually means"
                         " freeze the instance, save the state, transfer and"
                         " only then resume running on the secondary node)")

NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
                                help="Target node and optional secondary node",
                                metavar="<pnode>[:<snode>]",
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)

NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
                           action="append", metavar="<node>",
                           help="Use only this node (can be used multiple"
                           " times, if not given defaults to all nodes)",
                           completion_suggest=OPT_COMPL_ONE_NODE)

SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
                             metavar="<node>",
                             completion_suggest=OPT_COMPL_ONE_NODE)

NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
                         action="store_false",
                         help="Don't start the instance after creation")

SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
                         action="store_true", default=False,
                         help="Show command instead of executing it")

CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
                         default=False, action="store_true",
                         help="Instead of performing the migration, try to"
                         " recover from a failed cleanup. This is safe"
                         " to run even if the instance is healthy, but it"
                         " will create extra replication traffic and"
                         " briefly disrupt the replication (like during the"
                         " migration)")

    
717
STATIC_OPT = cli_option("-s", "--static", dest="static",
718
                        action="store_true", default=False,
719
                        help="Only show configuration data, not runtime data")
720

    
721
ALL_OPT = cli_option("--all", dest="show_all",
722
                     default=False, action="store_true",
723
                     help="Show info on all instances on the cluster."
724
                     " This can take a long time to run, use wisely")
725

    
726
SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
727
                           action="store_true", default=False,
728
                           help="Interactive OS reinstall, lists available"
729
                           " OS templates for selection")
730

    
731
IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
732
                                 action="store_true", default=False,
733
                                 help="Remove the instance from the cluster"
734
                                 " configuration even if there are failures"
735
                                 " during the removal process")
736

    
737
IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
738
                                        dest="ignore_remove_failures",
739
                                        action="store_true", default=False,
740
                                        help="Remove the instance from the"
741
                                        " cluster configuration even if there"
742
                                        " are failures during the removal"
743
                                        " process")
744

    
745
REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
746
                                 action="store_true", default=False,
747
                                 help="Remove the instance from the cluster")
748

    
749
NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
750
                               help="Specifies the new secondary node",
751
                               metavar="NODE", default=None,
752
                               completion_suggest=OPT_COMPL_ONE_NODE)
753

    
754
ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
755
                            default=False, action="store_true",
756
                            help="Replace the disk(s) on the primary"
757
                            " node (only for the drbd template)")
758

    
759
ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
760
                              default=False, action="store_true",
761
                              help="Replace the disk(s) on the secondary"
762
                              " node (only for the drbd template)")
763

    
764
AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
765
                              default=False, action="store_true",
766
                              help="Lock all nodes and auto-promote as needed"
767
                              " to MC status")
768

    
769
AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
770
                              default=False, action="store_true",
771
                              help="Automatically replace faulty disks"
772
                              " (only for the drbd template)")
773

    
774
IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
775
                             default=False, action="store_true",
776
                             help="Ignore current recorded size"
777
                             " (useful for forcing activation when"
778
                             " the recorded size is wrong)")
779

    
780
SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
781
                          metavar="<node>",
782
                          completion_suggest=OPT_COMPL_ONE_NODE)
783

    
784
SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
785
                         metavar="<dir>")
786

    
787
SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
788
                              help="Specify the secondary ip for the node",
789
                              metavar="ADDRESS", default=None)
790

    
791
READD_OPT = cli_option("--readd", dest="readd",
792
                       default=False, action="store_true",
793
                       help="Readd old node after replacing it")
794

    
795
NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
796
                                default=True, action="store_false",
797
                                help="Disable SSH key fingerprint checking")
798

    
799

    
800
MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
801
                    type="bool", default=None, metavar=_YORNO,
802
                    help="Set the master_candidate flag on the node")
803

    
804
OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
805
                         type="bool", default=None,
806
                         help="Set the offline flag on the node")
807

    
808
DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
809
                         type="bool", default=None,
810
                         help="Set the drained flag on the node")
811

    
812
ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
813
                             type="bool", default=None, metavar=_YORNO,
814
                             help="Set the allocatable flag on a volume")
815

    
816
NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
817
                               help="Disable support for lvm based instances"
818
                               " (cluster-wide)",
819
                               action="store_false", default=True)
820

    
821
ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
822
                            dest="enabled_hypervisors",
823
                            help="Comma-separated list of hypervisors",
824
                            type="string", default=None)

NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
                            type="keyval", default={},
                            help="NIC parameters")

CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
                         dest="candidate_pool_size", type="int",
                         help="Set the candidate pool size")

VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
                         help="Enables LVM and specifies the volume group"
                         " name (cluster-wide) for disk allocation [xenvg]",
                         metavar="VG", default=None)

YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
                          help="Destroy cluster", action="store_true")

NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
                          help="Skip node agreement check (dangerous)",
                          action="store_true", default=False)

MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
                            help="Specify the mac prefix for the instance IP"
                            " addresses, in the format XX:XX:XX",
                            metavar="PREFIX",
                            default=None)

MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
                               help="Specify the node interface (cluster-wide)"
                               " on which the master IP address will be added"
                               " [%s]" % constants.DEFAULT_BRIDGE,
                               metavar="NETDEV",
                               default=constants.DEFAULT_BRIDGE)

GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                                help="Specify the default directory (cluster-"
                                "wide) for storing the file-based disks [%s]" %
                                constants.DEFAULT_FILE_STORAGE_DIR,
                                metavar="DIR",
                                default=constants.DEFAULT_FILE_STORAGE_DIR)

NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
                                   help="Don't modify /etc/hosts",
                                   action="store_false", default=True)

NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
                                    help="Don't initialize SSH keys",
                                    action="store_false", default=True)

ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
                             help="Enable parseable error messages",
                             action="store_true", default=False)

NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
                          help="Skip N+1 memory redundancy tests",
                          action="store_true", default=False)

REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
                             help="Type of reboot: soft/hard/full",
                             default=constants.INSTANCE_REBOOT_HARD,
                             metavar="<REBOOT>",
                             choices=list(constants.REBOOT_TYPES))

IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
                                    dest="ignore_secondaries",
                                    default=False, action="store_true",
                                    help="Ignore errors from secondaries")

NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
                            action="store_false", default=True,
                            help="Don't shutdown the instance (unsafe)")

TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                         help="Maximum time to wait")

SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
                         dest="shutdown_timeout", type="int",
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                         help="Maximum time to wait for instance shutdown")

EARLY_RELEASE_OPT = cli_option("--early-release",
                               dest="early_release", default=False,
                               action="store_true",
                               help="Release the locks on the secondary"
                               " node(s) early")

NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
                                  dest="new_cluster_cert",
                                  default=False, action="store_true",
                                  help="Generate a new cluster certificate")

RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
                           default=None,
                           help="File containing new RAPI certificate")

NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
                               default=None, action="store_true",
                               help=("Generate a new self-signed RAPI"
                                     " certificate"))

NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
                                    dest="new_confd_hmac_key",
                                    default=False, action="store_true",
                                    help=("Create a new HMAC key for %s" %
                                          constants.CONFD))

CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
                                       dest="cluster_domain_secret",
                                       default=None,
                                       help=("Load new cluster domain"
                                             " secret from file"))

NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
                                           dest="new_cluster_domain_secret",
                                           default=False, action="store_true",
                                           help=("Create a new cluster domain"
                                                 " secret"))

USE_REPL_NET_OPT = cli_option("--use-replication-network",
                              dest="use_replication_network",
                              help="Whether to use the replication network"
                              " for talking to the nodes",
                              action="store_true", default=False)

MAINTAIN_NODE_HEALTH_OPT = \
    cli_option("--maintain-node-health", dest="maintain_node_health",
               metavar=_YORNO, default=None, type="bool",
               help="Configure the cluster to automatically maintain node"
               " health, by shutting down unknown instances, shutting down"
               " unknown DRBD devices, etc.")

IDENTIFY_DEFAULTS_OPT = \
    cli_option("--identify-defaults", dest="identify_defaults",
               default=False, action="store_true",
               help="Identify which saved instance parameters are equal to"
               " the current cluster defaults and set them as such, instead"
               " of marking them as overridden")

UIDPOOL_OPT = cli_option("--uid-pool", default=None,
                         action="store", dest="uid_pool",
                         help=("A list of user-ids or user-id"
                               " ranges separated by commas"))

ADD_UIDS_OPT = cli_option("--add-uids", default=None,
                          action="store", dest="add_uids",
                          help=("A list of user-ids or user-id"
                                " ranges separated by commas, to be"
                                " added to the user-id pool"))

REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
                             action="store", dest="remove_uids",
                             help=("A list of user-ids or user-id"
                                   " ranges separated by commas, to be"
                                   " removed from the user-id pool"))

ROMAN_OPT = cli_option("--roman",
                       dest="roman_integers", default=False,
                       action="store_true",
                       help="Use roman numerals for positive integers")



def _ParseArgs(argv, commands, aliases):
  """Parser for the command line arguments.

  This function parses the arguments and returns the function which
  must be executed together with its (modified) arguments.

  @param argv: the command line
  @param commands: dictionary with special contents, see the design
      doc for cmdline handling
  @param aliases: dictionary with command aliases {'alias': 'target', ...}

  """
  if len(argv) == 0:
    binary = "<command>"
  else:
    binary = argv[0].split("/")[-1]

  if len(argv) > 1 and argv[1] == "--version":
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
    # Quit right away. That way we don't have to care about this special
    # argument. optparse.py does it the same.
    sys.exit(0)

  if len(argv) < 2 or not (argv[1] in commands or
                           argv[1] in aliases):
    # let's do a nice thing
    sortedcmds = commands.keys()
    sortedcmds.sort()

    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
    ToStdout("")

    # compute the max line length for cmd + usage
    mlen = max([len(" %s" % cmd) for cmd in commands])
    mlen = min(60, mlen) # should not get here...

    # and format a nice command list
    ToStdout("Commands:")
    for cmd in sortedcmds:
      cmdstr = " %s" % (cmd,)
      help_text = commands[cmd][4]
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
      for line in help_lines:
        ToStdout("%-*s   %s", mlen, "", line)

    ToStdout("")

    return None, None, None

  # get command, unalias it, and look it up in commands
  cmd = argv.pop(1)
  if cmd in aliases:
    if cmd in commands:
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
                                   " command" % cmd)

    if aliases[cmd] not in commands:
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
                                   " command '%s'" % (cmd, aliases[cmd]))

    cmd = aliases[cmd]

  func, args_def, parser_opts, usage, description = commands[cmd]
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
                        description=description,
                        formatter=TitledHelpFormatter(),
                        usage="%%prog %s %s" % (cmd, usage))
  parser.disable_interspersed_args()
  options, args = parser.parse_args()

  if not _CheckArguments(cmd, args_def, args):
    return None, None, None

  return func, options, args


def _CheckArguments(cmd, args_def, args):
  """Verifies the arguments using the argument definition.

  Algorithm:

    1. Abort with error if values specified by user but none expected.

    1. For each argument in definition

      1. Keep running count of minimum number of values (min_count)
      1. Keep running count of maximum number of values (max_count)
      1. If it has an unlimited number of values

        1. Abort with error if it's not the last argument in the definition

    1. If last argument has limited number of values

      1. Abort with error if number of values doesn't match or is too large

    1. Abort with error if user didn't pass enough values (min_count)

  """
  if args and not args_def:
    ToStderr("Error: Command %s expects no arguments", cmd)
    return False

  min_count = None
  max_count = None
  check_max = None

  last_idx = len(args_def) - 1

  for idx, arg in enumerate(args_def):
    if min_count is None:
      min_count = arg.min
    elif arg.min is not None:
      min_count += arg.min

    if max_count is None:
      max_count = arg.max
    elif arg.max is not None:
      max_count += arg.max

    if idx == last_idx:
      check_max = (arg.max is not None)

    elif arg.max is None:
      raise errors.ProgrammerError("Only the last argument can have max=None")

  if check_max:
    # Command with exact number of arguments
    if (min_count is not None and max_count is not None and
        min_count == max_count and len(args) != min_count):
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
      return False

    # Command with limited number of arguments
    if max_count is not None and len(args) > max_count:
      ToStderr("Error: Command %s expects only %d argument(s)",
               cmd, max_count)
      return False

  # Command with some required arguments
  if min_count is not None and len(args) < min_count:
    ToStderr("Error: Command %s expects at least %d argument(s)",
             cmd, min_count)
    return False

  return True


def SplitNodeOption(value):
  """Splits the value of a --node option.

  """
  if value and ':' in value:
    return value.split(':', 1)
  else:
    return (value, None)
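
# Illustrative example (editor's note, not part of the upstream module):
# SplitNodeOption("node1.example.com:node2.example.com") returns the list
# ["node1.example.com", "node2.example.com"], while SplitNodeOption("node1")
# returns the tuple ("node1", None).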


def CalculateOSNames(os_name, os_variants):
  """Calculates all the names an OS can be called, according to its variants.

  @type os_name: string
  @param os_name: base name of the os
  @type os_variants: list or None
  @param os_variants: list of supported variants
  @rtype: list
  @return: list of valid names

  """
  if os_variants:
    return ['%s+%s' % (os_name, v) for v in os_variants]
  else:
    return [os_name]
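
# Illustrative example (editor's note, not part of the upstream module):
# CalculateOSNames("debootstrap", ["wheezy", "squeeze"]) would return
# ["debootstrap+wheezy", "debootstrap+squeeze"]; with os_variants=None the
# result is just ["debootstrap"].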


def UsesRPC(fn):
  def wrapper(*args, **kwargs):
    rpc.Init()
    try:
      return fn(*args, **kwargs)
    finally:
      rpc.Shutdown()
  return wrapper
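
# Illustrative usage (editor's note, not part of the upstream module): UsesRPC
# is meant to be applied as a decorator to CLI functions that need the RPC
# layer, e.g.:
#
#   @UsesRPC
#   def SomeClusterCommand(opts, args):  # hypothetical function name
#     ...
#
# so that rpc.Init() and rpc.Shutdown() bracket the call.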


def AskUser(text, choices=None):
  """Ask the user a question.

  @param text: the question to ask

  @param choices: list with elements tuples (input_char, return_value,
      description); if not given, it will default to: [('y', True,
      'Perform the operation'), ('n', False, 'Do not perform the operation')];
      note that the '?' char is reserved for help

  @return: one of the return values from the choices list; if input is
      not possible (i.e. not running with a tty), we return the last
      entry from the list

  """
  if choices is None:
    choices = [('y', True, 'Perform the operation'),
               ('n', False, 'Do not perform the operation')]
  if not choices or not isinstance(choices, list):
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
  for entry in choices:
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
      raise errors.ProgrammerError("Invalid choices element to AskUser")

  answer = choices[-1][1]
  new_text = []
  for line in text.splitlines():
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
  text = "\n".join(new_text)
  try:
    f = file("/dev/tty", "a+")
  except IOError:
    return answer
  try:
    chars = [entry[0] for entry in choices]
    chars[-1] = "[%s]" % chars[-1]
    chars.append('?')
    maps = dict([(entry[0], entry[1]) for entry in choices])
    while True:
      f.write(text)
      f.write('\n')
      f.write("/".join(chars))
      f.write(": ")
      line = f.readline(2).strip().lower()
      if line in maps:
        answer = maps[line]
        break
      elif line == '?':
        for entry in choices:
          f.write(" %s - %s\n" % (entry[0], entry[2]))
        f.write("\n")
        continue
  finally:
    f.close()
  return answer


class JobSubmittedException(Exception):
  """Job was submitted, client should exit.

  This exception has one argument, the ID of the job that was
  submitted. The handler should print this ID.

  This is not an error, just a structured way to exit from clients.

  """


def SendJob(ops, cl=None):
  """Function to submit an opcode without waiting for the results.

  @type ops: list
  @param ops: list of opcodes
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created

  """
  if cl is None:
    cl = GetClient()

  job_id = cl.SubmitJob(ops)

  return job_id


def GenericPollJob(job_id, cbs, report_cbs):
  """Generic job-polling function.

  @type job_id: number
  @param job_id: Job ID
  @type cbs: Instance of L{JobPollCbBase}
  @param cbs: Data callbacks
  @type report_cbs: Instance of L{JobPollReportCbBase}
  @param report_cbs: Reporting callbacks

  """
  prev_job_info = None
  prev_logmsg_serial = None

  status = None

  while True:
    result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
                                      prev_logmsg_serial)
    if not result:
      # job not found, go away!
      raise errors.JobLost("Job with id %s lost" % job_id)

    if result == constants.JOB_NOTCHANGED:
      report_cbs.ReportNotChanged(job_id, status)

      # Wait again
      continue

    # Split result, a tuple of (field values, log entries)
    (job_info, log_entries) = result
    (status, ) = job_info

    if log_entries:
      for log_entry in log_entries:
        (serial, timestamp, log_type, message) = log_entry
        report_cbs.ReportLogMessage(job_id, serial, timestamp,
                                    log_type, message)
        prev_logmsg_serial = max(prev_logmsg_serial, serial)

    # TODO: Handle canceled and archived jobs
    elif status in (constants.JOB_STATUS_SUCCESS,
                    constants.JOB_STATUS_ERROR,
                    constants.JOB_STATUS_CANCELING,
                    constants.JOB_STATUS_CANCELED):
      break

    prev_job_info = job_info

  jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
  if not jobs:
    raise errors.JobLost("Job with id %s lost" % job_id)

  status, opstatus, result = jobs[0]

  if status == constants.JOB_STATUS_SUCCESS:
    return result

  if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
    raise errors.OpExecError("Job was canceled")

  has_ok = False
  for idx, (status, msg) in enumerate(zip(opstatus, result)):
    if status == constants.OP_STATUS_SUCCESS:
      has_ok = True
    elif status == constants.OP_STATUS_ERROR:
      errors.MaybeRaise(msg)

      if has_ok:
        raise errors.OpExecError("partial failure (opcode %d): %s" %
                                 (idx, msg))

      raise errors.OpExecError(str(msg))

  # default failure mode
  raise errors.OpExecError(result)
1336

    
1337

    
1338
class JobPollCbBase:
1339
  """Base class for L{GenericPollJob} callbacks.
1340

1341
  """
1342
  def __init__(self):
1343
    """Initializes this class.
1344

1345
    """
1346

    
1347
  def WaitForJobChangeOnce(self, job_id, fields,
1348
                           prev_job_info, prev_log_serial):
1349
    """Waits for changes on a job.
1350

1351
    """
1352
    raise NotImplementedError()
1353

    
1354
  def QueryJobs(self, job_ids, fields):
1355
    """Returns the selected fields for the selected job IDs.
1356

1357
    @type job_ids: list of numbers
1358
    @param job_ids: Job IDs
1359
    @type fields: list of strings
1360
    @param fields: Fields
1361

1362
    """
1363
    raise NotImplementedError()
1364

    
1365

    
1366
class JobPollReportCbBase:
1367
  """Base class for L{GenericPollJob} reporting callbacks.
1368

1369
  """
1370
  def __init__(self):
1371
    """Initializes this class.
1372

1373
    """
1374

    
1375
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1376
    """Handles a log message.
1377

1378
    """
1379
    raise NotImplementedError()
1380

    
1381
  def ReportNotChanged(self, job_id, status):
1382
    """Called for if a job hasn't changed in a while.
1383

1384
    @type job_id: number
1385
    @param job_id: Job ID
1386
    @type status: string or None
1387
    @param status: Job status if available
1388

1389
    """
1390
    raise NotImplementedError()
1391

    
1392

    
1393
class _LuxiJobPollCb(JobPollCbBase):
1394
  def __init__(self, cl):
1395
    """Initializes this class.
1396

1397
    """
1398
    JobPollCbBase.__init__(self)
1399
    self.cl = cl
1400

    
1401
  def WaitForJobChangeOnce(self, job_id, fields,
1402
                           prev_job_info, prev_log_serial):
1403
    """Waits for changes on a job.
1404

1405
    """
1406
    return self.cl.WaitForJobChangeOnce(job_id, fields,
1407
                                        prev_job_info, prev_log_serial)
1408

    
1409
  def QueryJobs(self, job_ids, fields):
1410
    """Returns the selected fields for the selected job IDs.
1411

1412
    """
1413
    return self.cl.QueryJobs(job_ids, fields)
1414

    
1415

    
1416
class FeedbackFnJobPollReportCb(JobPollReportCbBase):
1417
  def __init__(self, feedback_fn):
1418
    """Initializes this class.
1419

1420
    """
1421
    JobPollReportCbBase.__init__(self)
1422

    
1423
    self.feedback_fn = feedback_fn
1424

    
1425
    assert callable(feedback_fn)
1426

    
1427
  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1428
    """Handles a log message.
1429

1430
    """
1431
    self.feedback_fn((timestamp, log_type, log_msg))
1432

    
1433
  def ReportNotChanged(self, job_id, status):
1434
    """Called if a job hasn't changed in a while.
1435

1436
    """
1437
    # Ignore


class StdioJobPollReportCb(JobPollReportCbBase):
  def __init__(self):
    """Initializes this class.

    """
    JobPollReportCbBase.__init__(self)

    self.notified_queued = False
    self.notified_waitlock = False

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
             utils.SafeEncode(log_msg))

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    """
    if status is None:
      return

    if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
      ToStderr("Job %s is waiting in queue", job_id)
      self.notified_queued = True

    elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
      ToStderr("Job %s is trying to acquire all necessary locks", job_id)
      self.notified_waitlock = True


def PollJob(job_id, cl=None, feedback_fn=None):
  """Function to poll for the result of a job.

  @type job_id: job identifier
  @param job_id: the job to poll for results
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created

  """
  if cl is None:
    cl = GetClient()

  if feedback_fn:
    reporter = FeedbackFnJobPollReportCb(feedback_fn)
  else:
    reporter = StdioJobPollReportCb()

  return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
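
# Illustrative sketch (not part of the original module): a typical caller
# submits an opcode as a job and then polls it; without a feedback_fn, log
# messages and status notes go to stdout/stderr via StdioJobPollReportCb.
# 'op' below is only a placeholder for any opcodes.OpCode instance.
#
#   cl = GetClient()
#   job_id = SendJob([op], cl)
#   results = PollJob(job_id, cl=cl)   # blocks until the job finishes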


def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
  """Legacy function to submit an opcode.

  This is just a simple wrapper over the construction of the processor
  instance. It should be extended to better handle feedback and
  interaction functions.

  """
  if cl is None:
    cl = GetClient()

  SetGenericOpcodeOpts([op], opts)

  job_id = SendJob([op], cl)

  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)

  return op_results[0]


def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  This function will decide, based on the 'opts' parameter, whether to
  submit and wait for the result of the opcode (and return it), or
  whether to just send the job and print its identifier. It is used in
  order to simplify the implementation of the '--submit' option.

  It will also process the opcodes if we're sending them via SendJob
  (otherwise SubmitOpCode does it).

  """
  if opts and opts.submit_only:
    job = [op]
    SetGenericOpcodeOpts(job, opts)
    job_id = SendJob(job, cl=cl)
    raise JobSubmittedException(job_id)
  else:
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
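
# Illustrative sketch (assumption, not in the original file): when the
# "--submit" option sets opts.submit_only, SubmitOrSend only queues the job
# and raises JobSubmittedException, which FormatError/GenericMain turn into
# a "JobID: <id>" message and exit code 0; otherwise it behaves like
# SubmitOpCode and returns the first opcode's result.  'op' and 'opts' are
# placeholders.
#
#   result = SubmitOrSend(op, opts, cl=cl)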


def SetGenericOpcodeOpts(opcode_list, options):
  """Processor for generic options.

  This function updates the given opcodes based on generic command
  line options (like debug, dry-run, etc.).

  @param opcode_list: list of opcodes
  @param options: command line options or None
  @return: None (in-place modification)

  """
  if not options:
    return
  for op in opcode_list:
    op.dry_run = options.dry_run
    op.debug_level = options.debug


def GetClient():
  # TODO: Cache object?
  try:
    client = luxi.Client()
  except luxi.NoMasterError:
    ss = ssconf.SimpleStore()

    # Try to read ssconf file
    try:
      ss.GetMasterNode()
    except errors.ConfigurationError:
      raise errors.OpPrereqError("Cluster not initialized or this machine is"
                                 " not part of a cluster")

    master, myself = ssconf.GetMasterAndMyself(ss=ss)
    if master != myself:
      raise errors.OpPrereqError("This is not the master node, please connect"
                                 " to node '%s' and rerun the command" %
                                 master)
    raise
  return client
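
# Illustrative note (not part of the original module): GetClient() is the
# usual entry point for talking to the master daemon, e.g.
#
#   cl = GetClient()
#   (cluster_name,) = cl.QueryConfigValues(["cluster_name"])
#
# On nodes that are not the master, the OpPrereqError raised above tells the
# user where to rerun the command instead of failing with a bare socket
# error.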


def FormatError(err):
  """Return a formatted error message for a given error.

  This function takes an exception instance and returns a tuple
  consisting of two values: first, the recommended exit code, and
  second, a string describing the error message (not
  newline-terminated).

  """
  retcode = 1
  obuf = StringIO()
  msg = str(err)
  if isinstance(err, errors.ConfigurationError):
    txt = "Corrupt configuration file: %s" % msg
    logging.error(txt)
    obuf.write(txt + "\n")
    obuf.write("Aborting.")
    retcode = 2
  elif isinstance(err, errors.HooksAbort):
    obuf.write("Failure: hooks execution failed:\n")
    for node, script, out in err.args[0]:
      if out:
        obuf.write("  node: %s, script: %s, output: %s\n" %
                   (node, script, out))
      else:
        obuf.write("  node: %s, script: %s (no output)\n" %
                   (node, script))
  elif isinstance(err, errors.HooksFailure):
    obuf.write("Failure: hooks general failure: %s" % msg)
  elif isinstance(err, errors.ResolverError):
    this_host = utils.HostInfo.SysName()
    if err.args[0] == this_host:
      msg = "Failure: can't resolve my own hostname ('%s')"
    else:
      msg = "Failure: can't resolve hostname '%s'"
    obuf.write(msg % err.args[0])
  elif isinstance(err, errors.OpPrereqError):
    if len(err.args) == 2:
      obuf.write("Failure: prerequisites not met for this"
               " operation:\nerror type: %s, error details:\n%s" %
                 (err.args[1], err.args[0]))
    else:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\n%s" % msg)
  elif isinstance(err, errors.OpExecError):
    obuf.write("Failure: command execution error:\n%s" % msg)
  elif isinstance(err, errors.TagError):
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
  elif isinstance(err, errors.JobQueueDrainError):
    obuf.write("Failure: the job queue is marked for drain and doesn't"
               " accept new requests\n")
  elif isinstance(err, errors.JobQueueFull):
    obuf.write("Failure: the job queue is full and doesn't accept new"
               " job submissions until old jobs are archived\n")
  elif isinstance(err, errors.TypeEnforcementError):
    obuf.write("Parameter Error: %s" % msg)
  elif isinstance(err, errors.ParameterError):
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
  elif isinstance(err, luxi.NoMasterError):
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
               " and listening for connections?")
  elif isinstance(err, luxi.TimeoutError):
    obuf.write("Timeout while talking to the master daemon. Error:\n"
               "%s" % msg)
  elif isinstance(err, luxi.ProtocolError):
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
               "%s" % msg)
  elif isinstance(err, errors.GenericError):
    obuf.write("Unhandled Ganeti error: %s" % msg)
  elif isinstance(err, JobSubmittedException):
    obuf.write("JobID: %s\n" % err.args[0])
    retcode = 0
  else:
    obuf.write("Unhandled exception: %s" % msg)
  return retcode, obuf.getvalue().rstrip('\n')
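
# Illustrative sketch (not in the original file): callers typically pair
# FormatError with ToStderr and use the returned code as the process exit
# status, mirroring what GenericMain does below.
#
#   try:
#     result = func(options, args)
#   except errors.GenericError, err:
#     retcode, msg = FormatError(err)
#     ToStderr(msg)
#     sys.exit(retcode)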


def GenericMain(commands, override=None, aliases=None):
  """Generic main function for all the gnt-* commands.

  Arguments:
    - commands: a dictionary with a special structure, see the design doc
                for command line handling.
    - override: if not None, we expect a dictionary with keys that will
                override command line options; this can be used to pass
                options from the scripts to generic functions
    - aliases: dictionary with command aliases {'alias': 'target', ...}

  """
  # save the program name and the entire command line for later logging
  if sys.argv:
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
    if len(sys.argv) >= 2:
      binary += " " + sys.argv[1]
      old_cmdline = " ".join(sys.argv[2:])
    else:
      old_cmdline = ""
  else:
    binary = "<unknown program>"
    old_cmdline = ""

  if aliases is None:
    aliases = {}

  try:
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
  except errors.ParameterError, err:
    result, err_msg = FormatError(err)
    ToStderr(err_msg)
    return 1

  if func is None: # parse error
    return 1

  if override is not None:
    for key, val in override.iteritems():
      setattr(options, key, val)

  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
                     stderr_logging=True, program=binary)

  if old_cmdline:
    logging.info("run with arguments '%s'", old_cmdline)
  else:
    logging.info("run with no arguments")

  try:
    result = func(options, args)
  except (errors.GenericError, luxi.ProtocolError,
          JobSubmittedException), err:
    result, err_msg = FormatError(err)
    logging.exception("Error during command processing")
    ToStderr(err_msg)

  return result
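
# Illustrative sketch (hypothetical command table, not from the original
# file): each gnt-* script builds a 'commands' dictionary mapping the
# sub-command name to a tuple of (function, argument definition, options,
# usage suffix, description) and hands it to GenericMain; ShowInfo below is
# only a placeholder for such a function.
#
#   commands = {
#     "info": (ShowInfo, ARGS_NONE, [], "", "Show some information"),
#     }
#   sys.exit(GenericMain(commands))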


def GenericInstanceCreate(mode, opts, args):
  """Add an instance to the cluster via either creation or import.

  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new instance name
  @rtype: int
  @return: the desired exit code

  """
  instance = args[0]

  (pnode, snode) = SplitNodeOption(opts.node)

  hypervisor = None
  hvparams = {}
  if opts.hypervisor:
    hypervisor, hvparams = opts.hypervisor

  if opts.nics:
    try:
      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
    except ValueError, err:
      raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
    nics = [{}] * nic_max
    for nidx, ndict in opts.nics:
      nidx = int(nidx)
      if not isinstance(ndict, dict):
        msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
        raise errors.OpPrereqError(msg)
      nics[nidx] = ndict
  elif opts.no_nics:
    # no nics
    nics = []
  elif mode == constants.INSTANCE_CREATE:
    # default of one nic, all auto
    nics = [{}]
  else:
    # mode == import
    nics = []

  if opts.disk_template == constants.DT_DISKLESS:
    if opts.disks or opts.sd_size is not None:
      raise errors.OpPrereqError("Diskless instance but disk"
                                 " information passed")
    disks = []
  else:
    if (not opts.disks and not opts.sd_size
        and mode == constants.INSTANCE_CREATE):
      raise errors.OpPrereqError("No disk information specified")
    if opts.disks and opts.sd_size is not None:
      raise errors.OpPrereqError("Please use either the '--disk' or"
                                 " '-s' option")
    if opts.sd_size is not None:
      opts.disks = [(0, {"size": opts.sd_size})]

    if opts.disks:
      try:
        disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
      disks = [{}] * disk_max
    else:
      disks = []
    for didx, ddict in opts.disks:
      didx = int(didx)
      if not isinstance(ddict, dict):
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
        raise errors.OpPrereqError(msg)
      elif "size" in ddict:
        if "adopt" in ddict:
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
                                     " (disk %d)" % didx)
        try:
          ddict["size"] = utils.ParseUnit(ddict["size"])
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
                                     (didx, err))
      elif "adopt" in ddict:
        if mode == constants.INSTANCE_IMPORT:
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
                                     " import")
        ddict["size"] = 0
      else:
        raise errors.OpPrereqError("Missing size or adoption source for"
                                   " disk %d" % didx)
      disks[didx] = ddict

  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)

  if mode == constants.INSTANCE_CREATE:
    start = opts.start
    os_type = opts.os
    src_node = None
    src_path = None
    no_install = opts.no_install
    identify_defaults = False
  elif mode == constants.INSTANCE_IMPORT:
    start = False
    os_type = None
    src_node = opts.src_node
    src_path = opts.src_dir
    no_install = None
    identify_defaults = opts.identify_defaults
  else:
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)

  op = opcodes.OpCreateInstance(instance_name=instance,
                                disks=disks,
                                disk_template=opts.disk_template,
                                nics=nics,
                                pnode=pnode, snode=snode,
                                ip_check=opts.ip_check,
                                name_check=opts.name_check,
                                wait_for_sync=opts.wait_for_sync,
                                file_storage_dir=opts.file_storage_dir,
                                file_driver=opts.file_driver,
                                iallocator=opts.iallocator,
                                hypervisor=hypervisor,
                                hvparams=hvparams,
                                beparams=opts.beparams,
                                mode=mode,
                                start=start,
                                os_type=os_type,
                                src_node=src_node,
                                src_path=src_path,
                                no_install=no_install,
                                identify_defaults=identify_defaults)

  SubmitOrSend(op, opts)
  return 0


class _RunWhileClusterStoppedHelper:
  """Helper class for L{RunWhileClusterStopped} to simplify state management

  """
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes

    self.ssh = ssh.SshRunner(self.cluster_name)

    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name
    @type cmd: list
    @param cmd: Command

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    @type fn: callable
    @param fn: Function to be called

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes
        for node_name in self.online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception, err:
          _, errmsg = FormatError(err)
          logging.exception("Caught exception")
          self.feedback_fn(errmsg)
          raise
      finally:
        # Start cluster again, master node last
        for node_name in self.nonmaster_nodes + [self.master_node]:
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
    finally:
      # Resume watcher
      watcher_block.Close()


def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Calls a function while all cluster daemons are stopped.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  feedback_fn("Gathering cluster information")

  # This ensures we're running on the master daemon
  cl = GetClient()

  (cluster_name, master_node) = \
    cl.QueryConfigValues(["cluster_name", "master_node"])

  online_nodes = GetOnlineNodes([], cl=cl)

  # Don't keep a reference to the client. The master daemon will go away.
  del cl

  assert master_node in online_nodes

  return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
                                       online_nodes).Call(fn, *args)
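
# Illustrative sketch (not part of the original module): a caller passes a
# callback that runs while every daemon on every online node is stopped; the
# callback receives the _RunWhileClusterStoppedHelper instance as its first
# argument, because the helper invokes fn(self, *args).
#
#   def _DoOfflineWork(helper):
#     helper.feedback_fn("all daemons are down, doing offline work")
#
#   RunWhileClusterStopped(ToStdout, _DoOfflineWork)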


def GenerateTable(headers, fields, separator, data,
                  numfields=None, unitfields=None,
                  units=None):
  """Prints a table with headers and different fields.

  @type headers: dict
  @param headers: dictionary mapping field names to headers for
      the table
  @type fields: list
  @param fields: the field names corresponding to each row in
      the data field
  @param separator: the separator to be used; if this is None,
      the default 'smart' algorithm is used which computes optimal
      field width, otherwise just the separator is used between
      each field
  @type data: list
  @param data: a list of lists, each sublist being one row to be output
  @type numfields: list
  @param numfields: a list with the fields that hold numeric
      values and thus should be right-aligned
  @type unitfields: list
  @param unitfields: a list with the fields that hold numeric
      values that should be formatted with the units field
  @type units: string or None
  @param units: the units we should use for formatting, or None for
      automatic choice (human-readable for non-separator usage, otherwise
      megabytes); this is a one-letter string

  """
  if units is None:
    if separator:
      units = "m"
    else:
      units = "h"

  if numfields is None:
    numfields = []
  if unitfields is None:
    unitfields = []

  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142

  format_fields = []
  for field in fields:
    if headers and field not in headers:
      # TODO: handle better unknown fields (either revert to old
      # style of raising exception, or deal more intelligently with
      # variable fields)
      headers[field] = field
    if separator is not None:
      format_fields.append("%s")
    elif numfields.Matches(field):
      format_fields.append("%*s")
    else:
      format_fields.append("%-*s")

  if separator is None:
    mlens = [0 for name in fields]
    format = ' '.join(format_fields)
  else:
    format = separator.replace("%", "%%").join(format_fields)

  for row in data:
    if row is None:
      continue
    for idx, val in enumerate(row):
      if unitfields.Matches(fields[idx]):
        try:
          val = int(val)
        except (TypeError, ValueError):
          pass
        else:
          val = row[idx] = utils.FormatUnit(val, units)
      val = row[idx] = str(val)
      if separator is None:
        mlens[idx] = max(mlens[idx], len(val))

  result = []
  if headers:
    args = []
    for idx, name in enumerate(fields):
      hdr = headers[name]
      if separator is None:
        mlens[idx] = max(mlens[idx], len(hdr))
        args.append(mlens[idx])
      args.append(hdr)
    result.append(format % tuple(args))

  if separator is None:
    assert len(mlens) == len(fields)

    if fields and not numfields.Matches(fields[-1]):
      mlens[-1] = 0

  for line in data:
    args = []
    if line is None:
      line = ['-' for _ in fields]
    for idx in range(len(fields)):
      if separator is None:
        args.append(mlens[idx])
      args.append(line[idx])
    result.append(format % tuple(args))

  return result
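
# Illustrative sketch (sample data is made up): in "smart" mode
# (separator=None) numeric fields are right-aligned and every column is
# padded to its longest value, while a non-None separator simply joins the
# raw field values.
#
#   lines = GenerateTable(headers={"name": "Node", "dfree": "DFree"},
#                         fields=["name", "dfree"], separator=None,
#                         data=[["node1.example.com", 10240]],
#                         numfields=["dfree"], unitfields=["dfree"])
#   for line in lines:
#     ToStdout(line)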


def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: a string with the formatted timestamp

  """
  if not isinstance(ts, (tuple, list)) or len(ts) != 2:
    return '?'
  sec, usec = ts
  return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec


def ParseTimespec(value):
  """Parse a time specification.

  The following suffixes will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: days
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  """
  value = str(value)
  if not value:
    raise errors.OpPrereqError("Empty time specification passed")
  suffix_map = {
    's': 1,
    'm': 60,
    'h': 3600,
    'd': 86400,
    'w': 604800,
    }
  if value[-1] not in suffix_map:
    try:
      value = int(value)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  else:
    multiplier = suffix_map[value[-1]]
    value = value[:-1]
    if not value: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)")
    try:
      value = int(value) * multiplier
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  return value
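
# Illustrative examples (not in the original file):
#
#   ParseTimespec("30")   # -> 30 (plain values are seconds)
#   ParseTimespec("2h")   # -> 7200
#   ParseTimespec("1w")   # -> 604800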


def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
                   filter_master=False):
  """Returns the names of online nodes.

  This function will also log a warning on stderr with the names of
  the offline nodes that are skipped.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed
  @type secondary_ips: boolean
  @param secondary_ips: if True, return the secondary IPs instead of the
      names, useful for doing network traffic over the replication interface
      (if any)
  @type filter_master: boolean
  @param filter_master: if True, do not return the master node in the list
      (useful in coordination with secondary_ips where we cannot check our
      node name against the list)

  """
  if cl is None:
    cl = GetClient()

  if secondary_ips:
    name_idx = 2
  else:
    name_idx = 0

  if filter_master:
    master_node = cl.QueryConfigValues(["master_node"])[0]
    filter_fn = lambda x: x != master_node
  else:
    filter_fn = lambda _: True

  result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
                         use_locking=False)
  offline = [row[0] for row in result if row[1]]
  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
  return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
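
# Illustrative sketch (not part of the original module): gnt-* scripts use
# this e.g. to address every reachable node except the master:
#
#   nodes = GetOnlineNodes([], cl=GetClient(), filter_master=True)
#   # 'nodes' is a list of node names (or secondary IPs when
#   # secondary_ips=True), with offline nodes already filtered out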


def _ToStream(stream, txt, *args):
  """Write a message to a stream, bypassing the logging system

  @type stream: file object
  @param stream: the file to which we should write
  @type txt: str
  @param txt: the message

  """
  if args:
    args = tuple(args)
    stream.write(txt % args)
  else:
    stream.write(txt)
  stream.write('\n')
  stream.flush()


def ToStdout(txt, *args):
  """Write a message to stdout only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stdout, txt, *args)


def ToStderr(txt, *args):
  """Write a message to stderr only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stderr, txt, *args)


class JobExecutor(object):
  """Class which manages the submission and execution of multiple jobs.

  Note that instances of this class should not be reused between
  GetResults() calls.

  """
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
    self.queue = []
    if cl is None:
      cl = GetClient()
    self.cl = cl
    self.verbose = verbose
    self.jobs = []
    self.opts = opts
    self.feedback_fn = feedback_fn

  def QueueJob(self, name, *ops):
    """Record a job for later submit.

    @type name: string
    @param name: a description of the job, will be used in WaitJobSet
    """
    SetGenericOpcodeOpts(ops, self.opts)
    self.queue.append((name, ops))

  def SubmitPending(self, each=False):
    """Submit all pending jobs.

    """
    if each:
      results = []
      for row in self.queue:
        # SubmitJob will remove the success status, but raise an exception if
        # the submission fails, so we'll notice that anyway.
        results.append([True, self.cl.SubmitJob(row[1])])
    else:
      results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
                                                            self.queue)):
      self.jobs.append((idx, status, data, name))

  def _ChooseJob(self):
    """Choose a non-waiting/queued job to poll next.

    """
    assert self.jobs, "_ChooseJob called with empty job list"

    result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
    assert result

    for job_data, status in zip(self.jobs, result):
      if status[0] in (constants.JOB_STATUS_QUEUED,
                    constants.JOB_STATUS_WAITLOCK,
                    constants.JOB_STATUS_CANCELING):
        # job is still waiting
        continue
      # good candidate found
      self.jobs.remove(job_data)
      return job_data

    # no job found
    return self.jobs.pop(0)

  def GetResults(self):
    """Wait for and return the results of all jobs.

    @rtype: list
    @return: list of tuples (success, job results), in the same order
        as the submitted jobs; if a job has failed, instead of the result
        there will be the error message

    """
    if not self.jobs:
      self.SubmitPending()
    results = []
    if self.verbose:
      ok_jobs = [row[2] for row in self.jobs if row[1]]
      if ok_jobs:
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))

    # first, remove any non-submitted jobs
    self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
    for idx, _, jid, name in failures:
      ToStderr("Failed to submit job for %s: %s", name, jid)
      results.append((idx, False, jid))

    while self.jobs:
      (idx, _, jid, name) = self._ChooseJob()
      ToStdout("Waiting for job %s for %s...", jid, name)
      try:
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
        success = True
      except (errors.GenericError, luxi.ProtocolError), err:
        _, job_result = FormatError(err)
        success = False
        # the error message will always be shown, verbose or not
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)

      results.append((idx, success, job_result))

    # sort based on the index, then drop it
    results.sort()
    results = [i[1:] for i in results]

    return results

  def WaitOrShow(self, wait):
    """Wait for job results or only print the job IDs.

    @type wait: boolean
    @param wait: whether to wait or not

    """
    if wait:
      return self.GetResults()
    else:
      if not self.jobs:
        self.SubmitPending()
      for _, status, result, name in self.jobs:
        if status:
          ToStdout("%s: %s", result, name)
        else:
          ToStderr("Failure for %s: %s", name, result)