Statistics
| Branch: | Tag: | Revision:

root / lib / cli.py @ 583163a6

History | View | Annotate | Download (77 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module dealing with command line parsing"""
23

    
24

    
25
import sys
26
import textwrap
27
import os.path
28
import time
29
import logging
30
from cStringIO import StringIO
31

    
32
from ganeti import utils
33
from ganeti import errors
34
from ganeti import constants
35
from ganeti import opcodes
36
from ganeti import luxi
37
from ganeti import ssconf
38
from ganeti import rpc
39
from ganeti import ssh
40
from ganeti import compat
41
from ganeti import netutils
42

    
43
from optparse import (OptionParser, TitledHelpFormatter,
44
                      Option, OptionValueError)
45

    
46

    
47
# Names exported by "from ganeti.cli import *", grouped by purpose.
__all__ = [
  # Predefined command line options
  "ADD_UIDS_OPT",
  "ALLOCATABLE_OPT",
  "ALL_OPT",
  "AUTO_PROMOTE_OPT",
  "AUTO_REPLACE_OPT",
  "BACKEND_OPT",
  "CLEANUP_OPT",
  "CLUSTER_DOMAIN_SECRET_OPT",
  "CONFIRM_OPT",
  "CP_SIZE_OPT",
  "DEBUG_OPT",
  "DEBUG_SIMERR_OPT",
  "DISKIDX_OPT",
  "DISK_OPT",
  "DISK_TEMPLATE_OPT",
  "DRAINED_OPT",
  "DRBD_HELPER_OPT",
  "EARLY_RELEASE_OPT",
  "ENABLED_HV_OPT",
  "ERROR_CODES_OPT",
  "FIELDS_OPT",
  "FILESTORE_DIR_OPT",
  "FILESTORE_DRIVER_OPT",
  "FORCE_OPT",
  "FORCE_VARIANT_OPT",
  "GLOBAL_FILEDIR_OPT",
  "HVLIST_OPT",
  "HVOPTS_OPT",
  "HYPERVISOR_OPT",
  "IALLOCATOR_OPT",
  "DEFAULT_IALLOCATOR_OPT",
  "IDENTIFY_DEFAULTS_OPT",
  "IGNORE_CONSIST_OPT",
  "IGNORE_FAILURES_OPT",
  "IGNORE_REMOVE_FAILURES_OPT",
  "IGNORE_SECONDARIES_OPT",
  "IGNORE_SIZE_OPT",
  "MAC_PREFIX_OPT",
  "MAINTAIN_NODE_HEALTH_OPT",
  "MASTER_NETDEV_OPT",
  "MC_OPT",
  "NET_OPT",
  "NEW_CLUSTER_CERT_OPT",
  "NEW_CLUSTER_DOMAIN_SECRET_OPT",
  "NEW_CONFD_HMAC_KEY_OPT",
  "NEW_RAPI_CERT_OPT",
  "NEW_SECONDARY_OPT",
  "NIC_PARAMS_OPT",
  "NODE_LIST_OPT",
  "NODE_PLACEMENT_OPT",
  "NODRBD_STORAGE_OPT",
  "NOHDR_OPT",
  "NOIPCHECK_OPT",
  "NO_INSTALL_OPT",
  "NONAMECHECK_OPT",
  "NOLVM_STORAGE_OPT",
  "NOMODIFY_ETCHOSTS_OPT",
  "NOMODIFY_SSH_SETUP_OPT",
  "NONICS_OPT",
  "NONLIVE_OPT",
  "NONPLUS1_OPT",
  "NOSHUTDOWN_OPT",
  "NOSTART_OPT",
  "NOSSH_KEYCHECK_OPT",
  "NOVOTING_OPT",
  "NWSYNC_OPT",
  "ON_PRIMARY_OPT",
  "ON_SECONDARY_OPT",
  "OFFLINE_OPT",
  "OSPARAMS_OPT",
  "OS_OPT",
  "OS_SIZE_OPT",
  "RAPI_CERT_OPT",
  "READD_OPT",
  "REBOOT_TYPE_OPT",
  "REMOVE_INSTANCE_OPT",
  "REMOVE_UIDS_OPT",
  "ROMAN_OPT",
  "SECONDARY_IP_OPT",
  "SELECT_OS_OPT",
  "SEP_OPT",
  "SHOWCMD_OPT",
  "SHUTDOWN_TIMEOUT_OPT",
  "SINGLE_NODE_OPT",
  "SRC_DIR_OPT",
  "SRC_NODE_OPT",
  "SUBMIT_OPT",
  "STATIC_OPT",
  "SYNC_OPT",
  "TAG_SRC_OPT",
  "TIMEOUT_OPT",
  "UIDPOOL_OPT",
  "USEUNITS_OPT",
  "USE_REPL_NET_OPT",
  "VERBOSE_OPT",
  "VG_NAME_OPT",
  "YES_DOIT_OPT",

  # Generic helpers for CLI programs
  "GenericMain",
  "GenericInstanceCreate",
  "GetClient",
  "GetOnlineNodes",
  "JobExecutor",
  "JobSubmittedException",
  "ParseTimespec",
  "RunWhileClusterStopped",
  "SubmitOpCode",
  "SubmitOrSend",
  "UsesRPC",

  # Output and formatting helpers
  "ToStderr",
  "ToStdout",
  "FormatError",
  "GenerateTable",
  "AskUser",
  "FormatTimestamp",
  "FormatLogMessage",

  # Tag handling
  "ListTags",
  "AddTags",
  "RemoveTags",

  # Infrastructure for declaring arguments and options
  "ARGS_MANY_INSTANCES",
  "ARGS_MANY_NODES",
  "ARGS_NONE",
  "ARGS_ONE_INSTANCE",
  "ARGS_ONE_NODE",
  "ARGS_ONE_OS",
  "ArgChoice",
  "ArgCommand",
  "ArgFile",
  "ArgHost",
  "ArgInstance",
  "ArgJobId",
  "ArgNode",
  "ArgOs",
  "ArgSuggest",
  "ArgUnknown",
  "OPT_COMPL_INST_ADD_NODES",
  "OPT_COMPL_MANY_NODES",
  "OPT_COMPL_ONE_IALLOCATOR",
  "OPT_COMPL_ONE_INSTANCE",
  "OPT_COMPL_ONE_NODE",
  "OPT_COMPL_ONE_OS",
  "cli_option",
  "SplitNodeOption",
  "CalculateOSNames",
  ]
196

    
197
NO_PREFIX = "no_"
198
UN_PREFIX = "-"
199

    
200

    
201
class _Argument:
  """Base class for the descriptors of positional command line arguments.

  It only records the C{min} and C{max} values given to the constructor.

  """
  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
    self.min, self.max = min, max

  def __repr__(self):
    # self.__class__ (not type(self)): this is an old-style class
    fields = {
      "cls": self.__class__.__name__,
      "min": self.min,
      "max": self.max,
      }
    return "<%(cls)s min=%(min)s max=%(max)s>" % fields
209

    
210

    
211
class ArgSuggest(_Argument):
  """Argument with a list of suggested values.

  The value can be any of the ones passed to the constructor as C{choices}.

  """
  # pylint: disable-msg=W0622
  def __init__(self, min=0, max=None, choices=None):
    _Argument.__init__(self, min=min, max=max)
    self.choices = choices

  def __repr__(self):
    fields = (self.__class__.__name__, self.min, self.max, self.choices)
    return "<%s min=%s max=%s choices=%r>" % fields
225

    
226

    
227
class ArgChoice(ArgSuggest):
  """Argument restricted to a fixed set of choices.

  Same as L{ArgSuggest}, except that the value must be one of the choices
  passed to the constructor.

  """
234

    
235

    
236
class ArgUnknown(_Argument):
  """Argument of unknown kind (e.g. only determined at runtime).

  """
240

    
241

    
242
class ArgInstance(_Argument):
  """Argument naming instances.

  """
246

    
247

    
248
class ArgNode(_Argument):
  """Argument naming a node.

  """
252

    
253
class ArgJobId(_Argument):
  """Argument holding a job ID.

  """
257

    
258

    
259
class ArgFile(_Argument):
  """Argument holding a file path.

  """
263

    
264

    
265
class ArgCommand(_Argument):
  """Argument holding a command.

  """
269

    
270

    
271
class ArgHost(_Argument):
  """Argument naming a host.

  """
275

    
276

    
277
class ArgOs(_Argument):
  """Argument naming an OS.

  """
281

    
282

    
283
ARGS_NONE = []
284
ARGS_MANY_INSTANCES = [ArgInstance()]
285
ARGS_MANY_NODES = [ArgNode()]
286
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
287
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
288
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
289

    
290

    
291
def _ExtractTagsObject(opts, args):
  """Work out which object a tag command operates on.

  For cluster tags the result is (kind, kind). For node and instance tags
  the object name is taken from the front of C{args}, which means this
  function modifies its C{args} parameter.

  @param opts: the parsed options; must have a C{tag_type} attribute
  @type args: list
  @param args: the remaining command line arguments
  @rtype: tuple
  @return: (kind, name)
  @raise errors.ProgrammerError: if C{tag_type} is missing or not handled
  @raise errors.OpPrereqError: if a name is needed but C{args} is empty

  """
  if not hasattr(opts, "tag_type"):
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")

  kind = opts.tag_type
  if kind == constants.TAG_CLUSTER:
    return kind, kind

  if kind in (constants.TAG_NODE, constants.TAG_INSTANCE):
    if not args:
      raise errors.OpPrereqError("no arguments passed to the command")
    return kind, args.pop(0)

  raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
310

    
311

    
312
def _ExtendTags(opts, args):
  """Extend the args if a source file has been given.

  This function will extend the tags with the contents of the file
  passed in the 'tags_source' attribute of the opts parameter. A file
  named '-' will be replaced by stdin. Each line is stripped of
  surrounding whitespace and appended to C{args} in place; nothing is
  done if 'tags_source' is None.

  @param opts: the parsed options; must have a C{tags_source} attribute
  @type args: list
  @param args: the list of tags to extend

  """
  fname = opts.tags_source
  if fname is None:
    return

  from_stdin = (fname == "-")
  if from_stdin:
    new_fh = sys.stdin
  else:
    new_fh = open(fname, "r")

  new_data = []
  try:
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
    # because of python bug 1633941
    while True:
      line = new_fh.readline()
      if not line:
        break
      new_data.append(line.strip())
  finally:
    # Only close what was opened here; sys.stdin belongs to the process
    # and must stay usable for later prompts or reads.
    if not from_stdin:
      new_fh.close()
  args.extend(new_data)
339

    
340

    
341
def ListTags(opts, args):
  """Print the tags of a given object, sorted, one per line.

  Generic implementation covering all three kinds of tag objects
  (cluster, node, instance). The opts argument is expected to contain a
  tag_type field denoting what object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  client = GetClient()
  for tag in sorted(client.QueryTags(kind, name)):
    ToStdout(tag)
357

    
358

    
359
def AddTags(opts, args):
  """Submit an opcode adding tags to a given object.

  Generic implementation covering all three kinds of tag objects
  (cluster, node, instance). The opts argument is expected to contain a
  tag_type field denoting what object type we work on. The tags are the
  remaining arguments plus the contents of the optional tags source file.

  @raise errors.OpPrereqError: if there are no tags to add

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be added")
  SubmitOpCode(opcodes.OpAddTags(kind=kind, name=name, tags=args))
374

    
375

    
376
def RemoveTags(opts, args):
  """Submit an opcode removing tags from a given object.

  Generic implementation covering all three kinds of tag objects
  (cluster, node, instance). The opts argument is expected to contain a
  tag_type field denoting what object type we work on. The tags are the
  remaining arguments plus the contents of the optional tags source file.

  @raise errors.OpPrereqError: if there are no tags to remove

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be removed")
  SubmitOpCode(opcodes.OpDelTags(kind=kind, name=name, tags=args))
391

    
392

    
393
def check_unit(option, opt, value): # pylint: disable-msg=W0613
394
  """OptParsers custom converter for units.
395

396
  """
397
  try:
398
    return utils.ParseUnit(value)
399
  except errors.UnitParseError, err:
400
    raise OptionValueError("option %s: %s" % (opt, err))
401

    
402

    
403
def _SplitKeyVal(opt, data):
  """Convert a KeyVal string into a dict.

  A key=val[,...] string is turned into a dict. Elements without a "="
  are handled specially: with the prefix 'no_' the key gets the value
  False, with the prefix '-' it gets the value None (in both cases the
  prefix is stripped), and any other bare key gets the value True.

  @type opt: string
  @param opt: a string holding the option name for which we process the
      data, used in building error messages
  @type data: string
  @param data: a string of the format key=val,key=val,...
  @rtype: dict
  @return: {key=val, key=val}
  @raises errors.ParameterError: if there are duplicate keys

  """
  kv_dict = {}
  if not data:
    return kv_dict

  for elem in utils.UnescapeAndSplit(data, sep=","):
    if "=" in elem:
      key, val = elem.split("=", 1)
    elif elem.startswith(NO_PREFIX):
      key, val = elem[len(NO_PREFIX):], False
    elif elem.startswith(UN_PREFIX):
      key, val = elem[len(UN_PREFIX):], None
    else:
      key, val = elem, True

    if key in kv_dict:
      raise errors.ParameterError("Duplicate key '%s' in option %s" %
                                  (key, opt))
    kv_dict[key] = val

  return kv_dict
438

    
439

    
440
def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser for ident:key=val,key=val options.

  The parsed value is a tuple (ident, {key: val}), so the option can be
  given several times via action=append. An ident starting with 'no_'
  yields (ident-without-prefix, False) and one starting with '-' yields
  (ident-without-prefix, None); no key=val part is allowed in these cases.

  @raise errors.ParameterError: if key=val data follows a prefixed ident

  """
  if ":" in value:
    ident, rest = value.split(":", 1)
  else:
    ident, rest = value, ''

  # same precedence as before: NO_PREFIX is checked first, then UN_PREFIX
  for prefix, marker in ((NO_PREFIX, False), (UN_PREFIX, None)):
    if ident.startswith(prefix):
      if rest:
        msg = "Cannot pass options when removing parameter groups: %s" % value
        raise errors.ParameterError(msg)
      return (ident[len(prefix):], marker)

  return (ident, _SplitKeyVal(opt, rest))
466

    
467

    
468
def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser for key=val,key=val options.

  The value is handed to L{_SplitKeyVal} and the resulting dict
  {key: val} is what gets stored.

  """
  parsed = _SplitKeyVal(opt, value)
  return parsed
475

    
476

    
477
def check_bool(option, opt, value): # pylint: disable-msg=W0613
  """Custom parser for yes/no options.

  The comparison is case-insensitive. C{constants.VALUE_FALSE} and "no"
  give False, C{constants.VALUE_TRUE} and "yes" give True.

  @raise errors.ParameterError: for any other value

  """
  value = value.lower()
  if value in (constants.VALUE_FALSE, "no"):
    return False
  if value in (constants.VALUE_TRUE, "yes"):
    return True
  raise errors.ParameterError("Invalid boolean value '%s'" % value)
490

    
491

    
492
# completion_suggest is normally a list. Dynamic completion is requested with
# one of the numeric markers below instead; they are all non-zero so that
# none of them evaluates to False.
OPT_COMPL_MANY_NODES = 100
OPT_COMPL_ONE_NODE = 101
OPT_COMPL_ONE_INSTANCE = 102
OPT_COMPL_ONE_OS = 103
OPT_COMPL_ONE_IALLOCATOR = 104
OPT_COMPL_INST_ADD_NODES = 105

OPT_COMPL_ALL = frozenset((
  OPT_COMPL_MANY_NODES,
  OPT_COMPL_ONE_NODE,
  OPT_COMPL_ONE_INSTANCE,
  OPT_COMPL_ONE_OS,
  OPT_COMPL_ONE_IALLOCATOR,
  OPT_COMPL_INST_ADD_NODES,
  ))
509

    
510

    
511
class CliOption(Option):
  """Custom option class for optparse.

  Adds the C{completion_suggest} attribute and the option types
  "identkeyval", "keyval", "unit" and "bool", each wired to the matching
  check_* function of this module.

  """
  ATTRS = Option.ATTRS + ["completion_suggest"]
  TYPES = Option.TYPES + ("identkeyval", "keyval", "unit", "bool")
  # built as a new dict so that optparse's own table is left untouched
  TYPE_CHECKER = dict(Option.TYPE_CHECKER,
                      identkeyval=check_ident_key_val,
                      keyval=check_key_val,
                      unit=check_unit,
                      bool=check_bool)
529

    
530

    
531
# optparse.py sets make_option, so we do it for our own option class, too
532
cli_option = CliOption
533

    
534

    
535
# metavar used by the "bool"-typed options of this module
_YORNO = "yes|no"

DEBUG_OPT = cli_option("-d", "--debug", action="count", default=0,
                       help="Increase debugging level")

NOHDR_OPT = cli_option("--no-headers", dest="no_headers",
                       action="store_true", default=False,
                       help="Don't display column headers")

SEP_OPT = cli_option("--separator", dest="separator", action="store",
                     default=None,
                     help=("Separator between output fields"
                           " (defaults to one space)"))

USEUNITS_OPT = cli_option("--units", dest="units",
                          choices=('h', 'm', 'g', 't'), default=None,
                          help="Specify units for output (one of hmgt)")

FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
                        type="string", metavar="FIELDS",
                        help="Comma separated list of output fields")

FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
                       default=False, help="Force the operation")

CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
                         default=False, help="Do not require confirmation")

TAG_SRC_OPT = cli_option("--from", dest="tags_source", default=None,
                         help="File with tag names")

SUBMIT_OPT = cli_option("--submit", dest="submit_only",
                        action="store_true", default=False,
                        help=("Submit the job and return the job ID, but"
                              " don't wait for the job to finish"))

SYNC_OPT = cli_option("--sync", dest="do_locking",
                      action="store_true", default=False,
                      help=("Grab locks while doing the queries"
                            " in order to ensure more consistent results"))
575

    
576
# No explicit dest is given, so optparse derives it from the long option
# name ("dry_run"). The help text used to read "verify it it could be".
_DRY_RUN_OPT = cli_option("--dry-run", default=False,
                          action="store_true",
                          help=("Do not execute the operation, just run the"
                                " check steps and verify if it could be"
                                " executed"))
581

    
582
# NOTE(review): several options in this module use a mutable literal ({} or
# []) as their default. optparse hands the same default object to every
# parse, and the "append" action extends it in place, so values would
# accumulate if one process parsed arguments more than once -- presumably
# each CLI run parses only once; confirm before reusing these elsewhere.

VERBOSE_OPT = cli_option("-v", "--verbose", action="store_true",
                         default=False,
                         help="Increase the verbosity of the operation")

DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors",
                              dest="simulate_errors",
                              action="store_true", default=False,
                              help=("Debugging option that makes the operation"
                                    " treat most runtime checks as failed"))

NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
                        action="store_false", default=True,
                        help="Don't wait for sync (DANGEROUS!)")

DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
                               choices=list(constants.DISK_TEMPLATES),
                               default=None, metavar="TEMPL",
                               help=("Custom disk setup (diskless, file,"
                                     " plain or drbd)"))

NONICS_OPT = cli_option("--no-nics", action="store_true", default=False,
                        help=("Do not create any network cards for"
                              " the instance"))

FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                               default=None, metavar="<DIR>",
                               help=("Relative path under default cluster-wide"
                                     " file storage dir to store file-based"
                                     " disks"))

FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
                                  choices=list(constants.FILE_DRIVER),
                                  default="loop", metavar="<DRIVER>",
                                  help="Driver to use for image files")

IALLOCATOR_OPT = cli_option("-I", "--iallocator", type="string",
                            default=None, metavar="<NAME>",
                            help=("Select nodes for the instance automatically"
                                  " using the <NAME> iallocator plugin"),
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)

DEFAULT_IALLOCATOR_OPT = cli_option("-I", "--default-iallocator",
                            type="string", default=None, metavar="<NAME>",
                            help="Set the default instance allocator plugin",
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)

OS_OPT = cli_option("-o", "--os-type", dest="os", metavar="<os>",
                    help="What OS to run",
                    completion_suggest=OPT_COMPL_ONE_OS)

OSPARAMS_OPT = cli_option("-O", "--os-parameters", dest="osparams",
                          type="keyval", default={},
                          help="OS parameters")

FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
                               action="store_true", default=False,
                               help="Force an unknown variant")

NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
                            action="store_true", default=False,
                            help=("Do not install the OS (will"
                                  " enable no-start)"))

BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
                         type="keyval", default={},
                         help="Backend parameters")

HVOPTS_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
                        type="keyval", default={},
                        help="Hypervisor parameters")

HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
                            type="identkeyval", default=None,
                            help=("Hypervisor and hypervisor options, in the"
                                  " format hypervisor:option=value,"
                                  "option=value,..."))

HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
                        action="append", type="identkeyval", default=[],
                        help=("Hypervisor and hypervisor options, in the"
                              " format hypervisor:option=value,"
                              "option=value,..."))

NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check",
                           action="store_false", default=True,
                           help=("Don't check that the instance's IP"
                                 " is alive"))

NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
                             action="store_false", default=True,
                             help=("Don't check that the instance's name"
                                   " is resolvable"))

NET_OPT = cli_option("--net", dest="nics", action="append",
                     type="identkeyval", default=[],
                     help="NIC parameters")

DISK_OPT = cli_option("--disk", dest="disks", action="append",
                      type="identkeyval", default=[],
                      help="Disk parameters")

DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
                         help=("Comma-separated list of disks"
                               " indices to act on (e.g. 0,2) (optional,"
                               " defaults to all disks)"))

OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size", type="unit",
                         default=None, metavar="<size>",
                         help=("Enforces a single-disk configuration using the"
                               " given disk size, in MiB unless a suffix is"
                               " used"))

IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
                                dest="ignore_consistency",
                                action="store_true", default=False,
                                help=("Ignore the consistency of the disks on"
                                      " the secondary"))

NONLIVE_OPT = cli_option("--non-live", dest="live",
                         action="store_false", default=True,
                         help=("Do a non-live migration (this usually means"
                               " freeze the instance, save the state, transfer"
                               " and only then resume running on the secondary"
                               " node)"))

NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
                                metavar="<pnode>[:<snode>]",
                                help="Target node and optional secondary node",
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)

NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", action="append",
                           default=[], metavar="<node>",
                           help=("Use only this node (can be used multiple"
                                 " times, if not given defaults to all nodes)"),
                           completion_suggest=OPT_COMPL_ONE_NODE)

SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", metavar="<node>",
                             help="Target node",
                             completion_suggest=OPT_COMPL_ONE_NODE)

NOSTART_OPT = cli_option("--no-start", dest="start",
                         action="store_false", default=True,
                         help="Don't start the instance after creation")

SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
                         action="store_true", default=False,
                         help="Show command instead of executing it")
723

    
724
# The help text used to contain a doubled space ("traffic and  disrupt") and
# never closed its parenthesis; both are fixed here.
CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
                         default=False, action="store_true",
                         help=("Instead of performing the migration, try to"
                               " recover from a failed cleanup. This is safe"
                               " to run even if the instance is healthy, but it"
                               " will create extra replication traffic and"
                               " disrupt briefly the replication (like during"
                               " the migration)"))
732

    
733
STATIC_OPT = cli_option("-s", "--static", dest="static",
                        action="store_true", default=False,
                        help="Only show configuration data, not runtime data")

ALL_OPT = cli_option("--all", dest="show_all",
                     action="store_true", default=False,
                     help=("Show info on all instances on the cluster."
                           " This can take a long time to run, use wisely"))

SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
                           action="store_true", default=False,
                           help=("Interactive OS reinstall, lists available"
                                 " OS templates for selection"))

IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
                                 action="store_true", default=False,
                                 help=("Remove the instance from the cluster"
                                       " configuration even if there are"
                                       " failures during the removal process"))

IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
                                        dest="ignore_remove_failures",
                                        action="store_true", default=False,
                                        help=("Remove the instance from the"
                                              " cluster configuration even if"
                                              " there are failures during the"
                                              " removal process"))

REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
                                 action="store_true", default=False,
                                 help="Remove the instance from the cluster")

NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
                               default=None, metavar="NODE",
                               help="Specifies the new secondary node",
                               completion_suggest=OPT_COMPL_ONE_NODE)

ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
                            action="store_true", default=False,
                            help=("Replace the disk(s) on the primary"
                                  " node (only for the drbd template)"))

ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
                              action="store_true", default=False,
                              help=("Replace the disk(s) on the secondary"
                                    " node (only for the drbd template)"))

AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
                              action="store_true", default=False,
                              help=("Lock all nodes and auto-promote as needed"
                                    " to MC status"))

AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
                              action="store_true", default=False,
                              help=("Automatically replace faulty disks"
                                    " (only for the drbd template)"))

IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
                             action="store_true", default=False,
                             help=("Ignore current recorded size"
                                   " (useful for forcing activation when"
                                   " the recorded size is wrong)"))

SRC_NODE_OPT = cli_option("--src-node", dest="src_node", metavar="<node>",
                          help="Source node",
                          completion_suggest=OPT_COMPL_ONE_NODE)

SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", metavar="<dir>",
                         help="Source directory")

SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
                              default=None, metavar="ADDRESS",
                              help="Specify the secondary ip for the node")

READD_OPT = cli_option("--readd", dest="readd",
                       action="store_true", default=False,
                       help="Readd old node after replacing it")

NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
                                action="store_false", default=True,
                                help="Disable SSH key fingerprint checking")


MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
                    type="bool", default=None, metavar=_YORNO,
                    help="Set the master_candidate flag on the node")

OFFLINE_OPT = cli_option("-O", "--offline", dest="offline",
                         type="bool", default=None, metavar=_YORNO,
                         help="Set the offline flag on the node")

DRAINED_OPT = cli_option("-D", "--drained", dest="drained",
                         type="bool", default=None, metavar=_YORNO,
                         help="Set the drained flag on the node")

ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
                             type="bool", default=None, metavar=_YORNO,
                             help="Set the allocatable flag on a volume")

NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
                               action="store_false", default=True,
                               help=("Disable support for lvm based instances"
                                     " (cluster-wide)"))

ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
                            dest="enabled_hypervisors",
                            type="string", default=None,
                            help="Comma-separated list of hypervisors")

NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
                            type="keyval", default={},
                            help="NIC parameters")

CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size",
                         dest="candidate_pool_size", type="int",
                         default=None,
                         help="Set the candidate pool size")

VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
                         default=None, metavar="VG",
                         help=("Enables LVM and specifies the volume group"
                               " name (cluster-wide) for disk allocation"
                               " [xenvg]"))

YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
                          action="store_true",
                          help="Destroy cluster")

NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
                          action="store_true", default=False,
                          help="Skip node agreement check (dangerous)")

MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
                            default=None, metavar="PREFIX",
                            help=("Specify the mac prefix for the instance IP"
                                  " addresses, in the format XX:XX:XX"))
867

    
868
# Network device for the master IP address; both the default value and the
# default shown in the help text are constants.DEFAULT_BRIDGE.
MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
                               help="Specify the node interface (cluster-wide)"
                               " on which the master IP address will be added"
                               " [%s]" % constants.DEFAULT_BRIDGE,
                               metavar="NETDEV",
                               default=constants.DEFAULT_BRIDGE)
874

    
875
# Keyword arguments are listed in a fixed order: dest, type/action/choices,
# default, metavar, help.

GLOBAL_FILEDIR_OPT = cli_option(
  "--file-storage-dir",
  dest="file_storage_dir",
  default=constants.DEFAULT_FILE_STORAGE_DIR,
  metavar="DIR",
  help=("Specify the default directory (cluster-wide) for storing the"
        " file-based disks [%s]" % constants.DEFAULT_FILE_STORAGE_DIR))

# Inverted flag: giving the option stores False in "modify_etc_hosts"
NOMODIFY_ETCHOSTS_OPT = cli_option(
  "--no-etc-hosts",
  dest="modify_etc_hosts",
  action="store_false",
  default=True,
  help="Don't modify /etc/hosts")

# Inverted flag: giving the option stores False in "modify_ssh_setup"
NOMODIFY_SSH_SETUP_OPT = cli_option(
  "--no-ssh-init",
  dest="modify_ssh_setup",
  action="store_false",
  default=True,
  help="Don't initialize SSH keys")

ERROR_CODES_OPT = cli_option(
  "--error-codes",
  dest="error_codes",
  action="store_true",
  default=False,
  help="Enable parseable error messages")

NONPLUS1_OPT = cli_option(
  "--no-nplus1-mem",
  dest="skip_nplusone_mem",
  action="store_true",
  default=False,
  help="Skip N+1 memory redundancy tests")

REBOOT_TYPE_OPT = cli_option(
  "-t", "--type",
  dest="reboot_type",
  choices=list(constants.REBOOT_TYPES),
  default=constants.INSTANCE_REBOOT_HARD,
  metavar="<REBOOT>",
  help="Type of reboot: soft/hard/full")

IGNORE_SECONDARIES_OPT = cli_option(
  "--ignore-secondaries",
  dest="ignore_secondaries",
  action="store_true",
  default=False,
  help="Ignore errors from secondaries")

# Inverted flag: giving the option stores False in "shutdown"
NOSHUTDOWN_OPT = cli_option(
  "--noshutdown",
  dest="shutdown",
  action="store_false",
  default=True,
  help="Don't shutdown the instance (unsafe)")

TIMEOUT_OPT = cli_option(
  "--timeout",
  dest="timeout",
  type="int",
  default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
  help="Maximum time to wait")

SHUTDOWN_TIMEOUT_OPT = cli_option(
  "--shutdown-timeout",
  dest="shutdown_timeout",
  type="int",
  default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
  help="Maximum time to wait for instance shutdown")

EARLY_RELEASE_OPT = cli_option(
  "--early-release",
  dest="early_release",
  action="store_true",
  default=False,
  help="Release the locks on the secondary node(s) early")

NEW_CLUSTER_CERT_OPT = cli_option(
  "--new-cluster-certificate",
  dest="new_cluster_cert",
  action="store_true",
  default=False,
  help="Generate a new cluster certificate")

RAPI_CERT_OPT = cli_option(
  "--rapi-certificate",
  dest="rapi_cert",
  default=None,
  help="File containing new RAPI certificate")

# Note: unlike the other "store_true" flags here, the default is None
NEW_RAPI_CERT_OPT = cli_option(
  "--new-rapi-certificate",
  dest="new_rapi_cert",
  action="store_true",
  default=None,
  help="Generate a new self-signed RAPI certificate")

NEW_CONFD_HMAC_KEY_OPT = cli_option(
  "--new-confd-hmac-key",
  dest="new_confd_hmac_key",
  action="store_true",
  default=False,
  help="Create a new HMAC key for %s" % constants.CONFD)
947

    
948
# Path of a file holding the new cluster domain secret (default: None)
CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
                                       dest="cluster_domain_secret",
                                       default=None,
                                       help=("Load new cluster domain"
                                             " secret from file"))
953

    
954
# Keyword arguments are listed in a fixed order: dest, type/action, default,
# metavar, help.

NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option(
  "--new-cluster-domain-secret",
  dest="new_cluster_domain_secret",
  action="store_true",
  default=False,
  help="Create a new cluster domain secret")

USE_REPL_NET_OPT = cli_option(
  "--use-replication-network",
  dest="use_replication_network",
  action="store_true",
  default=False,
  help="Whether to use the replication network for talking to the nodes")

# "bool"-typed option; None means the option was not given
MAINTAIN_NODE_HEALTH_OPT = cli_option(
  "--maintain-node-health",
  dest="maintain_node_health",
  type="bool",
  default=None,
  metavar=_YORNO,
  help=("Configure the cluster to automatically maintain node health, by"
        " shutting down unknown instances, shutting down unknown DRBD"
        " devices, etc."))

IDENTIFY_DEFAULTS_OPT = cli_option(
  "--identify-defaults",
  dest="identify_defaults",
  action="store_true",
  default=False,
  help=("Identify which saved instance parameters are equal to the current"
        " cluster defaults and set them as such, instead of marking them as"
        " overridden"))

UIDPOOL_OPT = cli_option(
  "--uid-pool",
  dest="uid_pool",
  action="store",
  default=None,
  help="A list of user-ids or user-id ranges separated by commas")

ADD_UIDS_OPT = cli_option(
  "--add-uids",
  dest="add_uids",
  action="store",
  default=None,
  help=("A list of user-ids or user-id ranges separated by commas, to be"
        " added to the user-id pool"))

REMOVE_UIDS_OPT = cli_option(
  "--remove-uids",
  dest="remove_uids",
  action="store",
  default=None,
  help=("A list of user-ids or user-id ranges separated by commas, to be"
        " removed from the user-id pool"))

ROMAN_OPT = cli_option(
  "--roman",
  dest="roman_integers",
  action="store_true",
  default=False,
  help="Use roman numbers for positive integers")

DRBD_HELPER_OPT = cli_option(
  "--drbd-usermode-helper",
  dest="drbd_helper",
  action="store",
  default=None,
  help="Specifies usermode helper for DRBD")

# Inverted flag: giving the option stores False in "drbd_storage"
NODRBD_STORAGE_OPT = cli_option(
  "--no-drbd-storage",
  dest="drbd_storage",
  action="store_false",
  default=True,
  help="Disable support for DRBD")
1009

    
1010

    
1011
def _ParseArgs(argv, commands, aliases):
  """Parser for the command line arguments.

  This function parses the arguments and returns the function which
  must be executed together with its (modified) arguments.

  @param argv: the command line; the command name (second element) is
      removed from this list in place
  @param commands: dictionary with special contents, see the design
      doc for cmdline handling
  @param aliases: dictionary with command aliases {'alias': 'target', ...}
  @return: tuple of (function, options, arguments); all three elements
      are None if the usage text was printed or if the arguments did not
      match the command's argument definition
  @raise errors.ProgrammerError: if an alias has the same name as a
      command or points to a command which doesn't exist

  """
  if len(argv) == 0:
    binary = "<command>"
  else:
    binary = argv[0].split("/")[-1]

  if len(argv) > 1 and argv[1] == "--version":
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
    # Quit right away. That way we don't have to care about this special
    # argument. optparse.py does it the same.
    sys.exit(0)

  if len(argv) < 2 or not (argv[1] in commands or
                           argv[1] in aliases):
    # let's do a nice thing
    sortedcmds = sorted(commands)

    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
    ToStdout("")

    # compute the max line length for cmd + usage; the "or [0]" keeps max()
    # from failing if the command table is empty
    mlen = max([len(" %s" % cmd) for cmd in commands] or [0])
    mlen = min(60, mlen) # should not get here...

    # and format a nice command list
    ToStdout("Commands:")
    for cmd in sortedcmds:
      cmdstr = " %s" % (cmd,)
      help_text = commands[cmd][4]
      # textwrap.wrap returns an empty list for an empty help text; make
      # sure there's always a first line to print
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen) or [""]
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
      for line in help_lines:
        ToStdout("%-*s   %s", mlen, "", line)

    ToStdout("")

    return None, None, None

  # get command, unalias it, and look it up in commands
  cmd = argv.pop(1)
  if cmd in aliases:
    if cmd in commands:
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
                                   " command" % cmd)

    if aliases[cmd] not in commands:
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
                                   " command '%s'" % (cmd, aliases[cmd]))

    cmd = aliases[cmd]

  func, args_def, parser_opts, usage, description = commands[cmd]
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
                        description=description,
                        formatter=TitledHelpFormatter(),
                        usage="%%prog %s %s" % (cmd, usage))
  parser.disable_interspersed_args()
  # Parse the list we were given (the command name was popped above) instead
  # of letting optparse fall back to sys.argv; this is the same thing when
  # argv is sys.argv, and correct when it is not
  options, args = parser.parse_args(argv[1:])

  if not _CheckArguments(cmd, args_def, args):
    return None, None, None

  return func, options, args
1087

    
1088

    
1089
def _CheckArguments(cmd, args_def, args):
1090
  """Verifies the arguments using the argument definition.
1091

1092
  Algorithm:
1093

1094
    1. Abort with error if values specified by user but none expected.
1095

1096
    1. For each argument in definition
1097

1098
      1. Keep running count of minimum number of values (min_count)
1099
      1. Keep running count of maximum number of values (max_count)
1100
      1. If it has an unlimited number of values
1101

1102
        1. Abort with error if it's not the last argument in the definition
1103

1104
    1. If last argument has limited number of values
1105

1106
      1. Abort with error if number of values doesn't match or is too large
1107

1108
    1. Abort with error if user didn't pass enough values (min_count)
1109

1110
  """
1111
  if args and not args_def:
1112
    ToStderr("Error: Command %s expects no arguments", cmd)
1113
    return False
1114

    
1115
  min_count = None
1116
  max_count = None
1117
  check_max = None
1118

    
1119
  last_idx = len(args_def) - 1
1120

    
1121
  for idx, arg in enumerate(args_def):
1122
    if min_count is None:
1123
      min_count = arg.min
1124
    elif arg.min is not None:
1125
      min_count += arg.min
1126

    
1127
    if max_count is None:
1128
      max_count = arg.max
1129
    elif arg.max is not None:
1130
      max_count += arg.max
1131

    
1132
    if idx == last_idx:
1133
      check_max = (arg.max is not None)
1134

    
1135
    elif arg.max is None:
1136
      raise errors.ProgrammerError("Only the last argument can have max=None")
1137

    
1138
  if check_max:
1139
    # Command with exact number of arguments
1140
    if (min_count is not None and max_count is not None and
1141
        min_count == max_count and len(args) != min_count):
1142
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1143
      return False
1144

    
1145
    # Command with limited number of arguments
1146
    if max_count is not None and len(args) > max_count:
1147
      ToStderr("Error: Command %s expects only %d argument(s)",
1148
               cmd, max_count)
1149
      return False
1150

    
1151
  # Command with some required arguments
1152
  if min_count is not None and len(args) < min_count:
1153
    ToStderr("Error: Command %s expects at least %d argument(s)",
1154
             cmd, min_count)
1155
    return False
1156

    
1157
  return True
1158

    
1159

    
1160
def SplitNodeOption(value):
  """Splits the value of a --node option at the first colon.

  @param value: the option value, may be None or empty
  @return: a two-element list with the parts before and after the first
      colon if the value contains one, otherwise the tuple (value, None)

  """
  if not value or ":" not in value:
    return (value, None)

  return value.split(":", 1)
1168

    
1169

    
1170
def CalculateOSNames(os_name, os_variants):
  """Computes the names under which an OS is known, given its variants.

  @type os_name: string
  @param os_name: base name of the os
  @type os_variants: list or None
  @param os_variants: list of supported variants
  @rtype: list
  @return: one "name+variant" entry per variant, or just the base name
      if there are no variants

  """
  if not os_variants:
    return [os_name]

  names = []
  for variant in os_variants:
    names.append("%s+%s" % (os_name, variant))
  return names
1185

    
1186

    
1187
def UsesRPC(fn):
  """Decorator bracketing a function call with rpc.Init/rpc.Shutdown.

  @param fn: the function to wrap
  @return: a function calling rpc.Init(), then fn with the arguments it
      was given, and finally rpc.Shutdown(), even if fn raised an
      exception

  """
  def wrapper(*args, **kwargs):
    rpc.Init()
    try:
      result = fn(*args, **kwargs)
    finally:
      # Runs on normal return as well as on exceptions
      rpc.Shutdown()
    return result

  return wrapper
1195

    
1196

    
1197
def AskUser(text, choices=None):
  """Ask the user a question.

  @param text: the question to ask

  @param choices: list with elements tuples (input_char, return_value,
      description); if not given, it will default to: [('y', True,
      'Perform the operation'), ('n', False, 'Do not perform the
      operation')]; note that the '?' char is reserved for help

  @return: one of the return values from the choices list; if input is
      not possible (i.e. /dev/tty can't be opened or end-of-file is
      reached while reading the answer), we return the value of the
      last entry from the list
  @raise errors.ProgrammerError: if choices is not a non-empty list of
      tuples with at least three elements, or if an entry uses '?'

  """
  if choices is None:
    choices = [('y', True, 'Perform the operation'),
               ('n', False, 'Do not perform the operation')]
  if not choices or not isinstance(choices, list):
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
  for entry in choices:
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
      raise errors.ProgrammerError("Invalid choices element to AskUser")

  # The last entry is the default answer
  answer = choices[-1][1]
  new_text = []
  for line in text.splitlines():
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
  text = "\n".join(new_text)
  try:
    f = open("/dev/tty", "a+")
  except IOError:
    return answer
  try:
    chars = [entry[0] for entry in choices]
    # Mark the default choice in the prompt
    chars[-1] = "[%s]" % chars[-1]
    chars.append('?')
    maps = dict([(entry[0], entry[1]) for entry in choices])
    while True:
      f.write(text)
      f.write('\n')
      f.write("/".join(chars))
      f.write(": ")
      raw = f.readline(2)
      if not raw:
        # End of file; no more input will arrive, so asking again would
        # loop forever. Fall back to the default answer.
        break
      line = raw.strip().lower()
      if line in maps:
        answer = maps[line]
        break
      elif line == '?':
        for entry in choices:
          f.write(" %s - %s\n" % (entry[0], entry[2]))
        f.write("\n")
  finally:
    f.close()
  return answer
1252

    
1253

    
1254
class JobSubmittedException(Exception):
  """Signals that a job was submitted and the client should exit.

  The single argument of this exception is the ID of the submitted job;
  whoever handles the exception is expected to print that ID.

  It doesn't indicate an error, it's merely a structured way for
  clients to exit.

  """
1263

    
1264

    
1265
def SendJob(ops, cl=None):
  """Submits a list of opcodes as a job, without waiting for the result.

  @type ops: list
  @param ops: list of opcodes
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created
  @return: the value returned by the client's SubmitJob method

  """
  if cl is None:
    cl = GetClient()

  return cl.SubmitJob(ops)
1281

    
1282

    
1283
def GenericPollJob(job_id, cbs, report_cbs):
  """Generic job-polling function.

  Waits for changes on the job until it reaches a final status, passing
  log messages and "not changed" notifications to the reporting
  callbacks, and then evaluates the job's result.

  @type job_id: number
  @param job_id: Job ID
  @type cbs: Instance of L{JobPollCbBase}
  @param cbs: Data callbacks
  @type report_cbs: Instance of L{JobPollReportCbBase}
  @param report_cbs: Reporting callbacks
  @return: the "opresult" field of the job if its status is success
  @raise errors.JobLost: if the job can't be found
  @raise errors.OpExecError: if the job was canceled or failed (unless
      errors.MaybeRaise raised something else for a failed opcode)

  """
  final_states = (constants.JOB_STATUS_SUCCESS,
                  constants.JOB_STATUS_ERROR,
                  constants.JOB_STATUS_CANCELING,
                  constants.JOB_STATUS_CANCELED)

  last_job_info = None
  last_log_serial = None

  status = None

  while True:
    result = cbs.WaitForJobChangeOnce(job_id, ["status"], last_job_info,
                                      last_log_serial)
    if not result:
      # job not found, go away!
      raise errors.JobLost("Job with id %s lost" % job_id)

    if result == constants.JOB_NOTCHANGED:
      # Nothing new; report the last known status and wait again
      report_cbs.ReportNotChanged(job_id, status)
      continue

    # The result is a tuple of (field values, log entries)
    (job_info, log_entries) = result
    (status, ) = job_info

    if log_entries:
      for (serial, timestamp, log_type, message) in log_entries:
        report_cbs.ReportLogMessage(job_id, serial, timestamp,
                                    log_type, message)
        last_log_serial = max(last_log_serial, serial)

    # TODO: Handle canceled and archived jobs
    # Note: the final status is only acted upon in an iteration which
    # didn't deliver any log entries
    elif status in final_states:
      break

    last_job_info = job_info

  jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
  if not jobs:
    raise errors.JobLost("Job with id %s lost" % job_id)

  status, opstatus, result = jobs[0]

  if status == constants.JOB_STATUS_SUCCESS:
    return result

  if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
    raise errors.OpExecError("Job was canceled")

  # Look for the first failed opcode
  seen_success = False
  for idx, (op_status, op_msg) in enumerate(zip(opstatus, result)):
    if op_status == constants.OP_STATUS_SUCCESS:
      seen_success = True
      continue

    if op_status != constants.OP_STATUS_ERROR:
      continue

    errors.MaybeRaise(op_msg)

    if seen_success:
      raise errors.OpExecError("partial failure (opcode %d): %s" %
                               (idx, op_msg))

    raise errors.OpExecError(str(op_msg))

  # default failure mode
  raise errors.OpExecError(result)
1359

    
1360

    
1361
class JobPollCbBase:
  """Interface for the data callbacks used by L{GenericPollJob}.

  Both methods must be overridden by subclasses.

  """
  def __init__(self):
    """Initializes this class.

    """
    pass

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial):
    """Waits for changes on a job.

    Not implemented in the base class.

    """
    raise NotImplementedError

  def QueryJobs(self, job_ids, fields):
    """Returns the selected fields for the selected job IDs.

    Not implemented in the base class.

    @type job_ids: list of numbers
    @param job_ids: Job IDs
    @type fields: list of strings
    @param fields: Fields

    """
    raise NotImplementedError
1387

    
1388

    
1389
class JobPollReportCbBase:
  """Interface for the reporting callbacks used by L{GenericPollJob}.

  Both methods must be overridden by subclasses.

  """
  def __init__(self):
    """Initializes this class.

    """
    pass

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    Not implemented in the base class.

    """
    raise NotImplementedError

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    Not implemented in the base class.

    @type job_id: number
    @param job_id: Job ID
    @type status: string or None
    @param status: Job status if available

    """
    raise NotImplementedError
1414

    
1415

    
1416
class _LuxiJobPollCb(JobPollCbBase):
  """Data callbacks for L{GenericPollJob} forwarding to a client object.

  """
  def __init__(self, cl):
    """Initializes this class.

    @param cl: client object; its WaitForJobChangeOnce and QueryJobs
        methods are used to implement the callbacks

    """
    JobPollCbBase.__init__(self)
    self.cl = cl

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial):
    """Waits for changes on a job.

    The call is passed on to the client unchanged.

    """
    client = self.cl
    return client.WaitForJobChangeOnce(job_id, fields,
                                       prev_job_info, prev_log_serial)

  def QueryJobs(self, job_ids, fields):
    """Returns the selected fields for the selected job IDs.

    The call is passed on to the client unchanged.

    """
    client = self.cl
    return client.QueryJobs(job_ids, fields)
1437

    
1438

    
1439
class FeedbackFnJobPollReportCb(JobPollReportCbBase):
  """Reporting callbacks passing log messages to a feedback function.

  """
  def __init__(self, feedback_fn):
    """Initializes this class.

    @param feedback_fn: callable which is invoked with one argument, a
        (timestamp, log_type, log_msg) tuple, for every log message

    """
    JobPollReportCbBase.__init__(self)

    self.feedback_fn = feedback_fn

    assert callable(feedback_fn)

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    The job ID and the serial number are not passed on.

    """
    entry = (timestamp, log_type, log_msg)
    self.feedback_fn(entry)

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    """
    # Intentionally does nothing
1461

    
1462

    
1463
class StdioJobPollReportCb(JobPollReportCbBase):
  """Reporting callbacks writing via ToStdout and ToStderr.

  """
  def __init__(self):
    """Initializes this class.

    """
    JobPollReportCbBase.__init__(self)

    # Each of the two notices below is shown at most once per instance
    self.notified_queued = False
    self.notified_waitlock = False

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    Writes the time and the formatted message via ToStdout.

    """
    when = time.ctime(utils.MergeTime(timestamp))
    ToStdout("%s %s", when, FormatLogMessage(log_type, log_msg))

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    Writes a one-time notice via ToStderr if the job is queued or
    waiting for locks.

    """
    if status is None:
      return

    if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
      self.notified_queued = True
      ToStderr("Job %s is waiting in queue", job_id)
      return

    if status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
      self.notified_waitlock = True
      ToStderr("Job %s is trying to acquire all necessary locks", job_id)
1494

    
1495

    
1496
def FormatLogMessage(log_type, log_msg):
  """Formats a job message according to its type.

  @param log_type: the message type; messages of any type other than
      constants.ELOG_MESSAGE are converted using str() first
  @param log_msg: the message
  @return: the result of utils.SafeEncode on the message

  """
  if log_type == constants.ELOG_MESSAGE:
    text = log_msg
  else:
    text = str(log_msg)

  return utils.SafeEncode(text)
1504

    
1505

    
1506
def PollJob(job_id, cl=None, feedback_fn=None, reporter=None):
  """Function to poll for the result of a job.

  @type job_id: job identifier
  @param job_id: the job to poll for results
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created
  @param feedback_fn: if given (and no reporter is), log messages are
      passed to this function through L{FeedbackFnJobPollReportCb}
  @param reporter: reporting callbacks to use; if neither this nor a
      feedback function is given, L{StdioJobPollReportCb} is used
  @return: the result of L{GenericPollJob}
  @raise errors.ProgrammerError: if both a reporter and a feedback
      function are given

  """
  if cl is None:
    cl = GetClient()

  if reporter is not None:
    if feedback_fn:
      raise errors.ProgrammerError("Can't specify reporter and feedback"
                                   " function")
  elif feedback_fn:
    reporter = FeedbackFnJobPollReportCb(feedback_fn)
  else:
    reporter = StdioJobPollReportCb()

  data_cbs = _LuxiJobPollCb(cl)
  return GenericPollJob(job_id, data_cbs, reporter)
1528

    
1529

    
1530
def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None, reporter=None):
  """Legacy function to submit an opcode.

  The opcode is updated from the generic options, submitted as a job
  of its own and polled until the job has finished.

  @param op: the opcode to submit
  @param cl: the client to use; if None, a new one is created
  @param feedback_fn: passed on to L{PollJob}
  @param opts: command line options or None, passed to
      L{SetGenericOpcodeOpts}
  @param reporter: passed on to L{PollJob}
  @return: the first element of the result returned by L{PollJob}

  """
  if cl is None:
    cl = GetClient()

  SetGenericOpcodeOpts([op], opts)

  results = PollJob(SendJob([op], cl), cl=cl, feedback_fn=feedback_fn,
                    reporter=reporter)

  # Single-opcode job, hence there's exactly one result of interest
  return results[0]
1549

    
1550

    
1551
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  Depending on the 'opts' parameter, the opcode is either submitted
  and waited for (in which case its result is returned), or the job is
  only sent and its identifier handed to the caller by raising
  L{JobSubmittedException}. This simplifies the implementation of the
  '--submit' option.

  The generic options are applied to the opcode here if it's sent via
  SendJob (otherwise SubmitOpCode does it).

  @param op: the opcode
  @param opts: command line options or None; "submit_only" is read
  @param cl: the client to use, passed on unchanged
  @param feedback_fn: passed to L{SubmitOpCode}
  @return: the result of L{SubmitOpCode}
  @raise JobSubmittedException: if the job was only sent; the
      exception's argument is the job ID

  """
  if not (opts and opts.submit_only):
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)

  job = [op]
  SetGenericOpcodeOpts(job, opts)
  raise JobSubmittedException(SendJob(job, cl=cl))
1570

    
1571

    
1572
def SetGenericOpcodeOpts(opcode_list, options):
  """Processor for generic options.

  Copies the generic command line options onto each of the given
  opcodes: "dry_run" is set from options.dry_run and "debug_level"
  from options.debug. Nothing is done if there are no options.

  @param opcode_list: list of opcodes
  @param options: command line options or None
  @return: None (in-place modification)

  """
  if options:
    for opcode in opcode_list:
      opcode.dry_run = options.dry_run
      opcode.debug_level = options.debug
1588

    
1589

    
1590
def GetClient():
  """Creates a luxi client.

  If creating the client fails with luxi.NoMasterError, the ssconf
  data is consulted to give a more helpful error where possible.

  @return: a new luxi.Client instance
  @raise errors.OpPrereqError: if the master can't be reached and
      either the master node can't be read from ssconf or this node is
      not the master
  @raise luxi.NoMasterError: if the master can't be reached although
      this node is the master

  """
  # TODO: Cache object?
  try:
    return luxi.Client()
  except luxi.NoMasterError:
    store = ssconf.SimpleStore()

    # Try to read ssconf file
    try:
      store.GetMasterNode()
    except errors.ConfigurationError:
      raise errors.OpPrereqError("Cluster not initialized or this machine is"
                                 " not part of a cluster")

    (master, myself) = ssconf.GetMasterAndMyself(ss=store)
    if master != myself:
      raise errors.OpPrereqError("This is not the master node, please connect"
                                 " to node '%s' and rerun the command" %
                                 master)

    # We are the master, so pass the original error on to the caller
    raise
1611

    
1612

    
1613
def FormatError(err):
  """Return a formatted error message for a given error.

  @param err: the exception instance to describe
  @rtype: tuple
  @return: (exit code, message); the exit code is the recommended one
      (2 for configuration errors, 0 for L{JobSubmittedException}, 1
      otherwise) and the message has no trailing newlines

  """
  exit_code = 1
  obuf = StringIO()
  out = obuf.write
  msg = str(err)

  # Note: the order of the checks below matters, as an exception is
  # handled by the first branch whose class it is an instance of
  if isinstance(err, errors.ConfigurationError):
    txt = "Corrupt configuration file: %s" % msg
    logging.error(txt)
    out(txt + "\n")
    out("Aborting.")
    exit_code = 2

  elif isinstance(err, errors.HooksAbort):
    out("Failure: hooks execution failed:\n")
    for (node, script, output) in err.args[0]:
      if output:
        out("  node: %s, script: %s, output: %s\n" % (node, script, output))
      else:
        out("  node: %s, script: %s (no output)\n" % (node, script))

  elif isinstance(err, errors.HooksFailure):
    out("Failure: hooks general failure: %s" % msg)

  elif isinstance(err, errors.ResolverError):
    this_host = netutils.HostInfo.SysName()
    if err.args[0] == this_host:
      template = "Failure: can't resolve my own hostname ('%s')"
    else:
      template = "Failure: can't resolve hostname '%s'"
    out(template % err.args[0])

  elif isinstance(err, errors.OpPrereqError):
    if len(err.args) == 2:
      # Second argument is the error type, first one the details
      out("Failure: prerequisites not met for this operation:\n"
          "error type: %s, error details:\n%s" % (err.args[1], err.args[0]))
    else:
      out("Failure: prerequisites not met for this operation:\n%s" % msg)

  elif isinstance(err, errors.OpExecError):
    out("Failure: command execution error:\n%s" % msg)

  elif isinstance(err, errors.TagError):
    out("Failure: invalid tag(s) given:\n%s" % msg)

  elif isinstance(err, errors.JobQueueDrainError):
    out("Failure: the job queue is marked for drain and doesn't accept new"
        " requests\n")

  elif isinstance(err, errors.JobQueueFull):
    out("Failure: the job queue is full and doesn't accept new job"
        " submissions until old jobs are archived\n")

  elif isinstance(err, errors.TypeEnforcementError):
    out("Parameter Error: %s" % msg)

  elif isinstance(err, errors.ParameterError):
    out("Failure: unknown/wrong parameter name '%s'" % msg)

  elif isinstance(err, luxi.NoMasterError):
    out("Cannot communicate with the master daemon.\n"
        "Is it running and listening for connections?")

  elif isinstance(err, luxi.TimeoutError):
    out("Timeout while talking to the master daemon. Error:\n%s" % msg)

  elif isinstance(err, luxi.ProtocolError):
    out("Unhandled protocol error while talking to the master daemon:\n%s" %
        msg)

  elif isinstance(err, errors.GenericError):
    out("Unhandled Ganeti error: %s" % msg)

  elif isinstance(err, JobSubmittedException):
    # Not an error, hence the exit code of zero
    out("JobID: %s\n" % err.args[0])
    exit_code = 0

  else:
    out("Unhandled exception: %s" % msg)

  return exit_code, obuf.getvalue().rstrip('\n')
1688

    
1689

    
1690
def GenericMain(commands, override=None, aliases=None):
1691
  """Generic main function for all the gnt-* commands.
1692

1693
  Arguments:
1694
    - commands: a dictionary with a special structure, see the design doc
1695
                for command line handling.
1696
    - override: if not None, we expect a dictionary with keys that will
1697
                override command line options; this can be used to pass
1698
                options from the scripts to generic functions
1699
    - aliases: dictionary with command aliases {'alias': 'target, ...}
1700

1701
  """
1702
  # save the program name and the entire command line for later logging
1703
  if sys.argv:
1704
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1705
    if len(sys.argv) >= 2:
1706
      binary += " " + sys.argv[1]
1707
      old_cmdline = " ".join(sys.argv[2:])
1708
    else:
1709
      old_cmdline = ""
1710
  else:
1711
    binary = "<unknown program>"
1712
    old_cmdline = ""
1713

    
1714
  if aliases is None:
1715
    aliases = {}
1716

    
1717
  try:
1718
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
1719
  except errors.ParameterError, err:
1720
    result, err_msg = FormatError(err)
1721
    ToStderr(err_msg)
1722
    return 1
1723

    
1724
  if func is None: # parse error
1725
    return 1
1726

    
1727
  if override is not None:
1728
    for key, val in override.iteritems():
1729
      setattr(options, key, val)
1730

    
1731
  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1732
                     stderr_logging=True, program=binary)
1733

    
1734
  if old_cmdline:
1735
    logging.info("run with arguments '%s'", old_cmdline)
1736
  else:
1737
    logging.info("run with no arguments")
1738

    
1739
  try:
1740
    result = func(options, args)
1741
  except (errors.GenericError, luxi.ProtocolError,
1742
          JobSubmittedException), err:
1743
    result, err_msg = FormatError(err)
1744
    logging.exception("Error during command processing")
1745
    ToStderr(err_msg)
1746

    
1747
  return result
1748

    
1749

    
1750
def GenericInstanceCreate(mode, opts, args):
1751
  """Add an instance to the cluster via either creation or import.
1752

1753
  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
1754
  @param opts: the command line options selected by the user
1755
  @type args: list
1756
  @param args: should contain only one element, the new instance name
1757
  @rtype: int
1758
  @return: the desired exit code
1759

1760
  """
1761
  instance = args[0]
1762

    
1763
  (pnode, snode) = SplitNodeOption(opts.node)
1764

    
1765
  hypervisor = None
1766
  hvparams = {}
1767
  if opts.hypervisor:
1768
    hypervisor, hvparams = opts.hypervisor
1769

    
1770
  if opts.nics:
1771
    try:
1772
      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
1773
    except ValueError, err:
1774
      raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
1775
    nics = [{}] * nic_max
1776
    for nidx, ndict in opts.nics:
1777
      nidx = int(nidx)
1778
      if not isinstance(ndict, dict):
1779
        msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
1780
        raise errors.OpPrereqError(msg)
1781
      nics[nidx] = ndict
1782
  elif opts.no_nics:
1783
    # no nics
1784
    nics = []
1785
  elif mode == constants.INSTANCE_CREATE:
1786
    # default of one nic, all auto
1787
    nics = [{}]
1788
  else:
1789
    # mode == import
1790
    nics = []
1791

    
1792
  if opts.disk_template == constants.DT_DISKLESS:
1793
    if opts.disks or opts.sd_size is not None:
1794
      raise errors.OpPrereqError("Diskless instance but disk"
1795
                                 " information passed")
1796
    disks = []
1797
  else:
1798
    if (not opts.disks and not opts.sd_size
1799
        and mode == constants.INSTANCE_CREATE):
1800
      raise errors.OpPrereqError("No disk information specified")
1801
    if opts.disks and opts.sd_size is not None:
1802
      raise errors.OpPrereqError("Please use either the '--disk' or"
1803
                                 " '-s' option")
1804
    if opts.sd_size is not None:
1805
      opts.disks = [(0, {"size": opts.sd_size})]
1806

    
1807
    if opts.disks:
1808
      try:
1809
        disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
1810
      except ValueError, err:
1811
        raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
1812
      disks = [{}] * disk_max
1813
    else:
1814
      disks = []
1815
    for didx, ddict in opts.disks:
1816
      didx = int(didx)
1817
      if not isinstance(ddict, dict):
1818
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
1819
        raise errors.OpPrereqError(msg)
1820
      elif "size" in ddict:
1821
        if "adopt" in ddict:
1822
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
1823
                                     " (disk %d)" % didx)
1824
        try:
1825
          ddict["size"] = utils.ParseUnit(ddict["size"])
1826
        except ValueError, err:
1827
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
1828
                                     (didx, err))
1829
      elif "adopt" in ddict:
1830
        if mode == constants.INSTANCE_IMPORT:
1831
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
1832
                                     " import")
1833
        ddict["size"] = 0
1834
      else:
1835
        raise errors.OpPrereqError("Missing size or adoption source for"
1836
                                   " disk %d" % didx)
1837
      disks[didx] = ddict
1838

    
1839
  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
1840
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
1841

    
1842
  if mode == constants.INSTANCE_CREATE:
1843
    start = opts.start
1844
    os_type = opts.os
1845
    force_variant = opts.force_variant
1846
    src_node = None
1847
    src_path = None
1848
    no_install = opts.no_install
1849
    identify_defaults = False
1850
  elif mode == constants.INSTANCE_IMPORT:
1851
    start = False
1852
    os_type = None
1853
    force_variant = False
1854
    src_node = opts.src_node
1855
    src_path = opts.src_dir
1856
    no_install = None
1857
    identify_defaults = opts.identify_defaults
1858
  else:
1859
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)
1860

    
1861
  op = opcodes.OpCreateInstance(instance_name=instance,
1862
                                disks=disks,
1863
                                disk_template=opts.disk_template,
1864
                                nics=nics,
1865
                                pnode=pnode, snode=snode,
1866
                                ip_check=opts.ip_check,
1867
                                name_check=opts.name_check,
1868
                                wait_for_sync=opts.wait_for_sync,
1869
                                file_storage_dir=opts.file_storage_dir,
1870
                                file_driver=opts.file_driver,
1871
                                iallocator=opts.iallocator,
1872
                                hypervisor=hypervisor,
1873
                                hvparams=hvparams,
1874
                                beparams=opts.beparams,
1875
                                osparams=opts.osparams,
1876
                                mode=mode,
1877
                                start=start,
1878
                                os_type=os_type,
1879
                                force_variant=force_variant,
1880
                                src_node=src_node,
1881
                                src_path=src_path,
1882
                                no_install=no_install,
1883
                                identify_defaults=identify_defaults)
1884

    
1885
  SubmitOrSend(op, opts)
1886
  return 0
1887

    
1888

    
1889
class _RunWhileClusterStoppedHelper:
1890
  """Helper class for L{RunWhileClusterStopped} to simplify state management
1891

1892
  """
1893
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
1894
    """Initializes this class.
1895

1896
    @type feedback_fn: callable
1897
    @param feedback_fn: Feedback function
1898
    @type cluster_name: string
1899
    @param cluster_name: Cluster name
1900
    @type master_node: string
1901
    @param master_node Master node name
1902
    @type online_nodes: list
1903
    @param online_nodes: List of names of online nodes
1904

1905
    """
1906
    self.feedback_fn = feedback_fn
1907
    self.cluster_name = cluster_name
1908
    self.master_node = master_node
1909
    self.online_nodes = online_nodes
1910

    
1911
    self.ssh = ssh.SshRunner(self.cluster_name)
1912

    
1913
    self.nonmaster_nodes = [name for name in online_nodes
1914
                            if name != master_node]
1915

    
1916
    assert self.master_node not in self.nonmaster_nodes
1917

    
1918
  def _RunCmd(self, node_name, cmd):
1919
    """Runs a command on the local or a remote machine.
1920

1921
    @type node_name: string
1922
    @param node_name: Machine name
1923
    @type cmd: list
1924
    @param cmd: Command
1925

1926
    """
1927
    if node_name is None or node_name == self.master_node:
1928
      # No need to use SSH
1929
      result = utils.RunCmd(cmd)
1930
    else:
1931
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
1932

    
1933
    if result.failed:
1934
      errmsg = ["Failed to run command %s" % result.cmd]
1935
      if node_name:
1936
        errmsg.append("on node %s" % node_name)
1937
      errmsg.append(": exitcode %s and error %s" %
1938
                    (result.exit_code, result.output))
1939
      raise errors.OpExecError(" ".join(errmsg))
1940

    
1941
  def Call(self, fn, *args):
1942
    """Call function while all daemons are stopped.
1943

1944
    @type fn: callable
1945
    @param fn: Function to be called
1946

1947
    """
1948
    # Pause watcher by acquiring an exclusive lock on watcher state file
1949
    self.feedback_fn("Blocking watcher")
1950
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
1951
    try:
1952
      # TODO: Currently, this just blocks. There's no timeout.
1953
      # TODO: Should it be a shared lock?
1954
      watcher_block.Exclusive(blocking=True)
1955

    
1956
      # Stop master daemons, so that no new jobs can come in and all running
1957
      # ones are finished
1958
      self.feedback_fn("Stopping master daemons")
1959
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
1960
      try:
1961
        # Stop daemons on all nodes
1962
        for node_name in self.online_nodes:
1963
          self.feedback_fn("Stopping daemons on %s" % node_name)
1964
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
1965

    
1966
        # All daemons are shut down now
1967
        try:
1968
          return fn(self, *args)
1969
        except Exception, err:
1970
          _, errmsg = FormatError(err)
1971
          logging.exception("Caught exception")
1972
          self.feedback_fn(errmsg)
1973
          raise
1974
      finally:
1975
        # Start cluster again, master node last
1976
        for node_name in self.nonmaster_nodes + [self.master_node]:
1977
          self.feedback_fn("Starting daemons on %s" % node_name)
1978
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
1979
    finally:
1980
      # Resume watcher
1981
      watcher_block.Close()
1982

    
1983

    
1984
def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Calls a function while all cluster daemons are stopped.

  The cluster name, the master node and the list of online nodes are
  queried first; the actual work is then delegated to
  L{_RunWhileClusterStoppedHelper}.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped
  @return: the value returned by the helper's C{Call} method

  """
  feedback_fn("Gathering cluster information")

  # Getting a client is meant to ensure we are talking to the master
  # daemon (GetClient is defined elsewhere in this module)
  client = GetClient()

  config_values = client.QueryConfigValues(["cluster_name", "master_node"])
  (cluster_name, master_node) = config_values

  online_nodes = GetOnlineNodes([], cl=client)

  # Don't keep a reference to the client. The master daemon will go away.
  del client

  assert master_node in online_nodes

  helper = _RunWhileClusterStoppedHelper(feedback_fn, cluster_name,
                                         master_node, online_nodes)
  return helper.Call(fn, *args)
2010

    
2011

    
2012
def GenerateTable(headers, fields, separator, data,
                  numfields=None, unitfields=None,
                  units=None):
  """Builds the lines of a table with headers and different fields.

  Nothing is printed; the formatted lines are returned. Neither C{headers}
  nor the rows in C{data} are modified, the function works on copies.

  @type headers: dict
  @param headers: dictionary mapping field names to headers for
      the table; fields without an entry use the field name as header and
      no header line is generated if this is empty or None
  @type fields: list
  @param fields: the field names corresponding to each row in
      the data field
  @param separator: the separator to be used; if this is None,
      the default 'smart' algorithm is used which computes optimal
      field width, otherwise just the separator is used between
      each field
  @type data: list
  @param data: a list of lists, each sublist being one row to be output;
      a row which is None is rendered as a row of '-' values
  @type numfields: list
  @param numfields: a list with the fields that hold numeric
      values and thus should be right-aligned
  @type unitfields: list
  @param unitfields: a list with the fields that hold numeric
      values that should be formatted with the units field
  @type units: string or None
  @param units: the units we should use for formatting, or None for
      automatic choice (human-readable for non-separator usage, otherwise
      megabytes); this is a one-letter string
  @rtype: list
  @return: the formatted lines, the header line (if any) being the first

  """
  if units is None:
    if separator:
      units = "m"
    else:
      units = "h"

  if numfields is None:
    numfields = []
  if unitfields is None:
    unitfields = []

  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142

  if headers:
    # Unknown fields are added below; do that on a private copy so that
    # the caller's dictionary is left untouched
    headers = dict(headers)

  format_fields = []
  for field in fields:
    if headers and field not in headers:
      # TODO: handle better unknown fields (either revert to old
      # style of raising exception, or deal more intelligently with
      # variable fields)
      headers[field] = field
    if separator is not None:
      format_fields.append("%s")
    elif numfields.Matches(field):
      # right-aligned, width passed as argument
      format_fields.append("%*s")
    else:
      # left-aligned, width passed as argument
      format_fields.append("%-*s")

  if separator is None:
    # maximum width seen so far for each column
    mlens = [0 for name in fields]
    format_str = ' '.join(format_fields)
  else:
    format_str = separator.replace("%", "%%").join(format_fields)

  # Convert all values to strings (formatting unit fields on the way) and
  # compute the column widths; the converted rows are kept separately
  # instead of being written back into the caller's data
  rows = []
  for row in data:
    if row is None:
      rows.append(None)
      continue
    row = list(row)
    for idx, val in enumerate(row):
      if unitfields.Matches(fields[idx]):
        try:
          val = int(val)
        except (TypeError, ValueError):
          # not a number, output the value as it is
          pass
        else:
          val = utils.FormatUnit(val, units)
      val = row[idx] = str(val)
      if separator is None:
        mlens[idx] = max(mlens[idx], len(val))
    rows.append(row)

  result = []
  if headers:
    args = []
    for idx, name in enumerate(fields):
      hdr = headers[name]
      if separator is None:
        mlens[idx] = max(mlens[idx], len(hdr))
        args.append(mlens[idx])
      args.append(hdr)
    result.append(format_str % tuple(args))

  if separator is None:
    assert len(mlens) == len(fields)

    # Do not pad the last column, unless it is right-aligned
    if fields and not numfields.Matches(fields[-1]):
      mlens[-1] = 0

  for line in rows:
    args = []
    if line is None:
      line = ['-' for _ in fields]
    for idx in range(len(fields)):
      if separator is None:
        args.append(mlens[idx])
      args.append(line[idx])
    result.append(format_str % tuple(args))

  return result
2118

    
2119

    
2120
def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
  @rtype: string
  @return: a string with the formatted timestamp (local time, with
      microseconds), or C{'?'} if C{ts} is not a two-element tuple or list

  """
  if not isinstance(ts, (tuple, list)) or len(ts) != 2:
    return '?'
  sec, usec = ts
  # "%F %T" is only understood by some C libraries; spell the same format
  # out with the portable directives
  formatted = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(sec))
  return formatted + ".%06d" % usec
2134

    
2135

    
2136
def ParseTimespec(value):
  """Parse a time specification.

  The following suffixes will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: days
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  @param value: the time specification; it is converted to a string
      before being parsed
  @return: the number of seconds
  @raise errors.OpPrereqError: if the specification is empty, consists
      only of a suffix or its numeric part is not an integer

  """
  spec = str(value)
  if not spec:
    raise errors.OpPrereqError("Empty time specification passed")
  suffix_map = {
    's': 1,
    'm': 60,
    'h': 3600,
    'd': 86400,
    'w': 604800,
    }
  multiplier = suffix_map.get(spec[-1])
  if multiplier is None:
    # no known suffix, the whole value is a number of seconds
    number = spec
    multiplier = 1
  else:
    number = spec[:-1]
    if not number: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)")
  try:
    return int(number) * multiplier
  except (TypeError, ValueError):
    # Report what the user passed, not the value without its suffix
    raise errors.OpPrereqError("Invalid time specification '%s'" % spec)
2176

    
2177

    
2178
def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
                   filter_master=False):
  """Returns the names of online nodes.

  Unless C{nowarn} is set, this function will also write a note to stderr
  with the names of the offline nodes which are skipped.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed
  @type secondary_ips: boolean
  @param secondary_ips: if True, return the secondary IPs instead of the
      names, useful for doing network traffic over the replication interface
      (if any)
  @type filter_master: boolean
  @param filter_master: if True, do not return the master node in the list
      (useful in coordination with secondary_ips where we cannot check our
      node name against the list)
  @rtype: list
  @return: the names (or secondary IPs) of the selected online nodes

  """
  if cl is None:
    cl = GetClient()

  # Index of the returned value in each result row: name or secondary IP
  wanted_idx = 0
  if secondary_ips:
    wanted_idx = 2

  master_node = None
  if filter_master:
    master_node = cl.QueryConfigValues(["master_node"])[0]

  node_rows = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
                            use_locking=False)

  offline = []
  for row in node_rows:
    if row[1]:
      offline.append(row[0])
  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))

  selected = []
  for row in node_rows:
    if row[1]:
      # offline node
      continue
    # the master is always recognized by its name, even when IPs are wanted
    if not filter_master or row[0] != master_node:
      selected.append(row[wanted_idx])
  return selected
2222

    
2223

    
2224
def _ToStream(stream, txt, *args):
2225
  """Write a message to a stream, bypassing the logging system
2226

2227
  @type stream: file object
2228
  @param stream: the file to which we should write
2229
  @type txt: str
2230
  @param txt: the message
2231

2232
  """
2233
  if args:
2234
    args = tuple(args)
2235
    stream.write(txt % args)
2236
  else:
2237
    stream.write(txt)
2238
  stream.write('\n')
2239
  stream.flush()
2240

    
2241

    
2242
def ToStdout(txt, *args):
  """Write a message to stdout only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message
  @param args: optional values to interpolate into the message

  """
  target = sys.stdout
  _ToStream(target, txt, *args)
2252

    
2253

    
2254
def ToStderr(txt, *args):
  """Write a message to stderr only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message
  @param args: optional values to interpolate into the message

  """
  target = sys.stderr
  _ToStream(target, txt, *args)
2264

    
2265

    
2266
class JobExecutor(object):
2267
  """Class which manages the submission and execution of multiple jobs.
2268

2269
  Note that instances of this class should not be reused between
2270
  GetResults() calls.
2271

2272
  """
2273
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
2274
    self.queue = []
2275
    if cl is None:
2276
      cl = GetClient()
2277
    self.cl = cl
2278
    self.verbose = verbose
2279
    self.jobs = []
2280
    self.opts = opts
2281
    self.feedback_fn = feedback_fn
2282

    
2283
  def QueueJob(self, name, *ops):
2284
    """Record a job for later submit.
2285

2286
    @type name: string
2287
    @param name: a description of the job, will be used in WaitJobSet
2288
    """
2289
    SetGenericOpcodeOpts(ops, self.opts)
2290
    self.queue.append((name, ops))
2291

    
2292
  def SubmitPending(self, each=False):
2293
    """Submit all pending jobs.
2294

2295
    """
2296
    if each:
2297
      results = []
2298
      for row in self.queue:
2299
        # SubmitJob will remove the success status, but raise an exception if
2300
        # the submission fails, so we'll notice that anyway.
2301
        results.append([True, self.cl.SubmitJob(row[1])])
2302
    else:
2303
      results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
2304
    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
2305
                                                            self.queue)):
2306
      self.jobs.append((idx, status, data, name))
2307

    
2308
  def _ChooseJob(self):
2309
    """Choose a non-waiting/queued job to poll next.
2310

2311
    """
2312
    assert self.jobs, "_ChooseJob called with empty job list"
2313

    
2314
    result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
2315
    assert result
2316

    
2317
    for job_data, status in zip(self.jobs, result):
2318
      if status[0] in (constants.JOB_STATUS_QUEUED,
2319
                    constants.JOB_STATUS_WAITLOCK,
2320
                    constants.JOB_STATUS_CANCELING):
2321
        # job is still waiting
2322
        continue
2323
      # good candidate found
2324
      self.jobs.remove(job_data)
2325
      return job_data
2326

    
2327
    # no job found
2328
    return self.jobs.pop(0)
2329

    
2330
  def GetResults(self):
2331
    """Wait for and return the results of all jobs.
2332

2333
    @rtype: list
2334
    @return: list of tuples (success, job results), in the same order
2335
        as the submitted jobs; if a job has failed, instead of the result
2336
        there will be the error message
2337

2338
    """
2339
    if not self.jobs:
2340
      self.SubmitPending()
2341
    results = []
2342
    if self.verbose:
2343
      ok_jobs = [row[2] for row in self.jobs if row[1]]
2344
      if ok_jobs:
2345
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
2346

    
2347
    # first, remove any non-submitted jobs
2348
    self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
2349
    for idx, _, jid, name in failures:
2350
      ToStderr("Failed to submit job for %s: %s", name, jid)
2351
      results.append((idx, False, jid))
2352

    
2353
    while self.jobs:
2354
      (idx, _, jid, name) = self._ChooseJob()
2355
      ToStdout("Waiting for job %s for %s...", jid, name)
2356
      try:
2357
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
2358
        success = True
2359
      except (errors.GenericError, luxi.ProtocolError), err:
2360
        _, job_result = FormatError(err)
2361
        success = False
2362
        # the error message will always be shown, verbose or not
2363
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
2364

    
2365
      results.append((idx, success, job_result))
2366

    
2367
    # sort based on the index, then drop it
2368
    results.sort()
2369
    results = [i[1:] for i in results]
2370

    
2371
    return results
2372

    
2373
  def WaitOrShow(self, wait):
2374
    """Wait for job results or only print the job IDs.
2375

2376
    @type wait: boolean
2377
    @param wait: whether to wait or not
2378

2379
    """
2380
    if wait:
2381
      return self.GetResults()
2382
    else:
2383
      if not self.jobs:
2384
        self.SubmitPending()
2385
      for _, status, result, name in self.jobs:
2386
        if status:
2387
          ToStdout("%s: %s", result, name)
2388
        else:
2389
          ToStderr("Failure for %s: %s", name, result)
2390
      return [row[1:3] for row in self.jobs]