root / lib / cli.py @ 74adc100

#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module dealing with command line parsing"""


import sys
import textwrap
import os.path
import time
import logging
from cStringIO import StringIO

from ganeti import utils
from ganeti import errors
from ganeti import constants
from ganeti import opcodes
from ganeti import luxi
from ganeti import ssconf
from ganeti import rpc
from ganeti import ssh

from optparse import (OptionParser, TitledHelpFormatter,
                      Option, OptionValueError)


__all__ = [
  # Command line options
  "ALLOCATABLE_OPT",
  "ALL_OPT",
  "AUTO_PROMOTE_OPT",
  "AUTO_REPLACE_OPT",
  "BACKEND_OPT",
  "CLEANUP_OPT",
  "CONFIRM_OPT",
  "CP_SIZE_OPT",
  "DEBUG_OPT",
  "DEBUG_SIMERR_OPT",
  "DISKIDX_OPT",
  "DISK_OPT",
  "DISK_TEMPLATE_OPT",
  "DRAINED_OPT",
  "EARLY_RELEASE_OPT",
  "ENABLED_HV_OPT",
  "ERROR_CODES_OPT",
  "FIELDS_OPT",
  "FILESTORE_DIR_OPT",
  "FILESTORE_DRIVER_OPT",
  "FORCE_OPT",
  "FORCE_VARIANT_OPT",
  "GLOBAL_FILEDIR_OPT",
  "HVLIST_OPT",
  "HVOPTS_OPT",
  "HYPERVISOR_OPT",
  "IALLOCATOR_OPT",
  "IGNORE_CONSIST_OPT",
  "IGNORE_FAILURES_OPT",
  "IGNORE_SECONDARIES_OPT",
  "IGNORE_SIZE_OPT",
  "MAC_PREFIX_OPT",
  "MASTER_NETDEV_OPT",
  "MC_OPT",
  "NET_OPT",
  "NEW_CLUSTER_CERT_OPT",
  "NEW_CONFD_HMAC_KEY_OPT",
  "NEW_RAPI_CERT_OPT",
  "NEW_SECONDARY_OPT",
  "NIC_PARAMS_OPT",
  "NODE_LIST_OPT",
  "NODE_PLACEMENT_OPT",
  "NOHDR_OPT",
  "NOIPCHECK_OPT",
  "NO_INSTALL_OPT",
  "NONAMECHECK_OPT",
  "NOLVM_STORAGE_OPT",
  "NOMODIFY_ETCHOSTS_OPT",
  "NOMODIFY_SSH_SETUP_OPT",
  "NONICS_OPT",
  "NONLIVE_OPT",
  "NONPLUS1_OPT",
  "NOSHUTDOWN_OPT",
  "NOSTART_OPT",
  "NOSSH_KEYCHECK_OPT",
  "NOVOTING_OPT",
  "NWSYNC_OPT",
  "ON_PRIMARY_OPT",
  "ON_SECONDARY_OPT",
  "OFFLINE_OPT",
  "OS_OPT",
  "OS_SIZE_OPT",
  "RAPI_CERT_OPT",
  "READD_OPT",
  "REBOOT_TYPE_OPT",
  "SECONDARY_IP_OPT",
  "SELECT_OS_OPT",
  "SEP_OPT",
  "SHOWCMD_OPT",
  "SHUTDOWN_TIMEOUT_OPT",
  "SINGLE_NODE_OPT",
  "SRC_DIR_OPT",
  "SRC_NODE_OPT",
  "SUBMIT_OPT",
  "STATIC_OPT",
  "SYNC_OPT",
  "TAG_SRC_OPT",
  "TIMEOUT_OPT",
  "USEUNITS_OPT",
  "USE_REPL_NET_OPT",
  "VERBOSE_OPT",
  "VG_NAME_OPT",
  "YES_DOIT_OPT",
  # Generic functions for CLI programs
  "GenericMain",
  "GenericInstanceCreate",
  "GetClient",
  "GetOnlineNodes",
  "JobExecutor",
  "JobSubmittedException",
  "ParseTimespec",
  "RunWhileClusterStopped",
  "SubmitOpCode",
  "SubmitOrSend",
  "UsesRPC",
  # Formatting functions
  "ToStderr", "ToStdout",
  "FormatError",
  "GenerateTable",
  "AskUser",
  "FormatTimestamp",
  # Tags functions
  "ListTags",
  "AddTags",
  "RemoveTags",
  # command line options support infrastructure
  "ARGS_MANY_INSTANCES",
  "ARGS_MANY_NODES",
  "ARGS_NONE",
  "ARGS_ONE_INSTANCE",
  "ARGS_ONE_NODE",
  "ARGS_ONE_OS",
  "ArgChoice",
  "ArgCommand",
  "ArgFile",
  "ArgHost",
  "ArgInstance",
  "ArgJobId",
  "ArgNode",
  "ArgOs",
  "ArgSuggest",
  "ArgUnknown",
  "OPT_COMPL_INST_ADD_NODES",
  "OPT_COMPL_MANY_NODES",
  "OPT_COMPL_ONE_IALLOCATOR",
  "OPT_COMPL_ONE_INSTANCE",
  "OPT_COMPL_ONE_NODE",
  "OPT_COMPL_ONE_OS",
  "cli_option",
  "SplitNodeOption",
  "CalculateOSNames",
  ]

NO_PREFIX = "no_"
UN_PREFIX = "-"


class _Argument:
  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
    self.min = min
    self.max = max

  def __repr__(self):
    return ("<%s min=%s max=%s>" %
            (self.__class__.__name__, self.min, self.max))


class ArgSuggest(_Argument):
  """Suggesting argument.

  Value can be any of the ones passed to the constructor.

  """
  # pylint: disable-msg=W0622
  def __init__(self, min=0, max=None, choices=None):
    _Argument.__init__(self, min=min, max=max)
    self.choices = choices

  def __repr__(self):
    return ("<%s min=%s max=%s choices=%r>" %
            (self.__class__.__name__, self.min, self.max, self.choices))


class ArgChoice(ArgSuggest):
  """Choice argument.

  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
  but value must be one of the choices.

  """


class ArgUnknown(_Argument):
  """Unknown argument to program (e.g. determined at runtime).

  """


class ArgInstance(_Argument):
  """Instances argument.

  """


class ArgNode(_Argument):
  """Node argument.

  """

class ArgJobId(_Argument):
  """Job ID argument.

  """


class ArgFile(_Argument):
  """File path argument.

  """


class ArgCommand(_Argument):
  """Command argument.

  """


class ArgHost(_Argument):
  """Host argument.

  """


class ArgOs(_Argument):
  """OS argument.

  """


ARGS_NONE = []
ARGS_MANY_INSTANCES = [ArgInstance()]
ARGS_MANY_NODES = [ArgNode()]
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
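
# Illustrative note (not part of the original module): the ARGS_* definitions
# above are used as the argument specification in the per-script "commands"
# dictionaries handed to GenericMain/_ParseArgs below, where every entry is a
# five-element tuple unpacked as (function, args_def, options, usage,
# description). A sketch of one entry, with hypothetical names:
#
#   commands = {
#     "info": (ShowInstanceInfo, ARGS_MANY_INSTANCES, [STATIC_OPT, ALL_OPT],
#              "[-s] [--all] [<instance>...]",
#              "Show information about one or more instances"),
#     }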


def _ExtractTagsObject(opts, args):
  """Extract the tag type object.

  Note that this function will modify its args parameter.

  """
  if not hasattr(opts, "tag_type"):
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
  kind = opts.tag_type
  if kind == constants.TAG_CLUSTER:
    retval = kind, kind
  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
    if not args:
      raise errors.OpPrereqError("no arguments passed to the command")
    name = args.pop(0)
    retval = kind, name
  else:
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
  return retval


def _ExtendTags(opts, args):
  """Extend the args if a source file has been given.

  This function will extend the tags with the contents of the file
  passed in the 'tags_source' attribute of the opts parameter. A file
  named '-' will be replaced by stdin.

  """
  fname = opts.tags_source
  if fname is None:
    return
  if fname == "-":
    new_fh = sys.stdin
  else:
    new_fh = open(fname, "r")
  new_data = []
  try:
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
    # because of python bug 1633941
    while True:
      line = new_fh.readline()
      if not line:
        break
      new_data.append(line.strip())
  finally:
    new_fh.close()
  args.extend(new_data)


def ListTags(opts, args):
  """List the tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  cl = GetClient()
  result = cl.QueryTags(kind, name)
  result = list(result)
  result.sort()
  for tag in result:
    ToStdout(tag)


def AddTags(opts, args):
  """Add tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be added")
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
  SubmitOpCode(op)


def RemoveTags(opts, args):
  """Remove tags from a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be removed")
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
  SubmitOpCode(op)


def check_unit(option, opt, value): # pylint: disable-msg=W0613
  """OptParser's custom converter for units.

  """
  try:
    return utils.ParseUnit(value)
  except errors.UnitParseError, err:
    raise OptionValueError("option %s: %s" % (opt, err))


def _SplitKeyVal(opt, data):
  """Convert a KeyVal string into a dict.

  This function will convert a key=val[,...] string into a dict. Empty
  values will be converted specially: keys which have the prefix 'no_'
  will have the value=False and the prefix stripped, keys which have
  the prefix '-' will have the value=None and the prefix stripped, and
  the others will have value=True.

  @type opt: string
  @param opt: a string holding the option name for which we process the
      data, used in building error messages
  @type data: string
  @param data: a string of the format key=val,key=val,...
  @rtype: dict
  @return: {key=val, key=val}
  @raises errors.ParameterError: if there are duplicate keys

  """
  kv_dict = {}
  if data:
    for elem in utils.UnescapeAndSplit(data, sep=","):
      if "=" in elem:
        key, val = elem.split("=", 1)
      else:
        if elem.startswith(NO_PREFIX):
          key, val = elem[len(NO_PREFIX):], False
        elif elem.startswith(UN_PREFIX):
          key, val = elem[len(UN_PREFIX):], None
        else:
          key, val = elem, True
      if key in kv_dict:
        raise errors.ParameterError("Duplicate key '%s' in option %s" %
                                    (key, opt))
      kv_dict[key] = val
  return kv_dict
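
# Illustrative example (not part of the original module), assuming a
# hypothetical "-H" option value; values stay strings, while the "no_"/"-"
# prefixes turn into False/None respectively:
#   _SplitKeyVal("-H", "kernel_path=/vmlinuz,no_acpi,-root_path")
#   -> {'kernel_path': '/vmlinuz', 'acpi': False, 'root_path': None}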


def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser for ident:key=val,key=val options.

  This will store the parsed values as a tuple (ident, {key: val}). As such,
  multiple uses of this option via action=append are possible.

  """
  if ":" not in value:
    ident, rest = value, ''
  else:
    ident, rest = value.split(":", 1)

  if ident.startswith(NO_PREFIX):
    if rest:
      msg = "Cannot pass options when removing parameter groups: %s" % value
      raise errors.ParameterError(msg)
    retval = (ident[len(NO_PREFIX):], False)
  elif ident.startswith(UN_PREFIX):
    if rest:
      msg = "Cannot pass options when removing parameter groups: %s" % value
      raise errors.ParameterError(msg)
    retval = (ident[len(UN_PREFIX):], None)
  else:
    kv_dict = _SplitKeyVal(opt, rest)
    retval = (ident, kv_dict)
  return retval
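
# Illustrative examples (not part of the original module), e.g. for a
# "-H"-style hypervisor option:
#   check_ident_key_val(None, "-H", "xen-pvm:no_acpi,kernel_path=/vmlinuz")
#   -> ('xen-pvm', {'acpi': False, 'kernel_path': '/vmlinuz'})
#   check_ident_key_val(None, "-H", "no_xen-pvm")
#   -> ('xen-pvm', False)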


def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
  """Custom parser for key=val,key=val options.

  This will store the parsed values as a dict {key: val}.

  """
  return _SplitKeyVal(opt, value)


# completion_suggest is normally a list. Using numeric values that do not
# evaluate to False for dynamic completion.
(OPT_COMPL_MANY_NODES,
 OPT_COMPL_ONE_NODE,
 OPT_COMPL_ONE_INSTANCE,
 OPT_COMPL_ONE_OS,
 OPT_COMPL_ONE_IALLOCATOR,
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)

OPT_COMPL_ALL = frozenset([
  OPT_COMPL_MANY_NODES,
  OPT_COMPL_ONE_NODE,
  OPT_COMPL_ONE_INSTANCE,
  OPT_COMPL_ONE_OS,
  OPT_COMPL_ONE_IALLOCATOR,
  OPT_COMPL_INST_ADD_NODES,
  ])


class CliOption(Option):
  """Custom option class for optparse.

  """
  ATTRS = Option.ATTRS + [
    "completion_suggest",
    ]
  TYPES = Option.TYPES + (
    "identkeyval",
    "keyval",
    "unit",
    )
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
  TYPE_CHECKER["keyval"] = check_key_val
  TYPE_CHECKER["unit"] = check_unit


# optparse.py sets make_option, so we do it for our own option class, too
cli_option = CliOption


_YESNO = ("yes", "no")
_YORNO = "yes|no"

DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
                       help="Increase debugging level")

NOHDR_OPT = cli_option("--no-headers", default=False,
                       action="store_true", dest="no_headers",
                       help="Don't display column headers")

SEP_OPT = cli_option("--separator", default=None,
                     action="store", dest="separator",
                     help=("Separator between output fields"
                           " (defaults to one space)"))

USEUNITS_OPT = cli_option("--units", default=None,
                          dest="units", choices=('h', 'm', 'g', 't'),
                          help="Specify units for output (one of hmgt)")

FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
                        type="string", metavar="FIELDS",
                        help="Comma separated list of output fields")

FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
                       default=False, help="Force the operation")

CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
                         default=False, help="Do not require confirmation")

TAG_SRC_OPT = cli_option("--from", dest="tags_source",
                         default=None, help="File with tag names")

SUBMIT_OPT = cli_option("--submit", dest="submit_only",
                        default=False, action="store_true",
                        help=("Submit the job and return the job ID, but"
                              " don't wait for the job to finish"))

SYNC_OPT = cli_option("--sync", dest="do_locking",
                      default=False, action="store_true",
                      help=("Grab locks while doing the queries"
                            " in order to ensure more consistent results"))

_DRY_RUN_OPT = cli_option("--dry-run", default=False,
                          action="store_true",
                          help=("Do not execute the operation, just run the"
                                " check steps and verify if it could be"
                                " executed"))

VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
                         action="store_true",
                         help="Increase the verbosity of the operation")

DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
                              action="store_true", dest="simulate_errors",
                              help="Debugging option that makes the operation"
                              " treat most runtime checks as failed")

NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
                        default=True, action="store_false",
                        help="Don't wait for sync (DANGEROUS!)")

DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
                               help="Custom disk setup (diskless, file,"
                               " plain or drbd)",
                               default=None, metavar="TEMPL",
                               choices=list(constants.DISK_TEMPLATES))

NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
                        help="Do not create any network cards for"
                        " the instance")

FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                               help="Relative path under default cluster-wide"
                               " file storage dir to store file-based disks",
                               default=None, metavar="<DIR>")

FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
                                  help="Driver to use for image files",
                                  default="loop", metavar="<DRIVER>",
                                  choices=list(constants.FILE_DRIVER))

IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
                            help="Select nodes for the instance automatically"
                            " using the <NAME> iallocator plugin",
                            default=None, type="string",
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)

OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
                    metavar="<os>",
                    completion_suggest=OPT_COMPL_ONE_OS)

FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
                               action="store_true", default=False,
                               help="Force an unknown variant")

NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
                            action="store_true", default=False,
                            help="Do not install the OS (will"
                            " enable no-start)")

BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
                         type="keyval", default={},
                         help="Backend parameters")

HVOPTS_OPT = cli_option("-H", "--hypervisor-parameters", type="keyval",
                        default={}, dest="hvparams",
                        help="Hypervisor parameters")

HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
                            help="Hypervisor and hypervisor options, in the"
                            " format hypervisor:option=value,option=value,...",
                            default=None, type="identkeyval")

HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
                        help="Hypervisor and hypervisor options, in the"
                        " format hypervisor:option=value,option=value,...",
                        default=[], action="append", type="identkeyval")

NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
                           action="store_false",
                           help="Don't check that the instance's IP"
                           " is alive")

NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
                             default=True, action="store_false",
                             help="Don't check that the instance's name"
                             " is resolvable")

NET_OPT = cli_option("--net",
                     help="NIC parameters", default=[],
                     dest="nics", action="append", type="identkeyval")

DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
                      dest="disks", action="append", type="identkeyval")

DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
                         help="Comma-separated list of disk"
                         " indices to act on (e.g. 0,2) (optional,"
                         " defaults to all disks)")

OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
                         help="Enforces a single-disk configuration using the"
                         " given disk size, in MiB unless a suffix is used",
                         default=None, type="unit", metavar="<size>")

IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
                                dest="ignore_consistency",
                                action="store_true", default=False,
                                help="Ignore the consistency of the disks on"
                                " the secondary")

NONLIVE_OPT = cli_option("--non-live", dest="live",
                         default=True, action="store_false",
                         help="Do a non-live migration (this usually means"
                         " freeze the instance, save the state, transfer and"
                         " only then resume running on the secondary node)")

NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
                                help="Target node and optional secondary node",
                                metavar="<pnode>[:<snode>]",
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)

NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
                           action="append", metavar="<node>",
                           help="Use only this node (can be used multiple"
                           " times, if not given defaults to all nodes)",
                           completion_suggest=OPT_COMPL_ONE_NODE)

SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
                             metavar="<node>",
                             completion_suggest=OPT_COMPL_ONE_NODE)

NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
                         action="store_false",
                         help="Don't start the instance after creation")

SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
                         action="store_true", default=False,
                         help="Show command instead of executing it")

CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
                         default=False, action="store_true",
                         help="Instead of performing the migration, try to"
                         " recover from a failed cleanup. This is safe"
                         " to run even if the instance is healthy, but it"
                         " will create extra replication traffic and"
                         " briefly disrupt the replication (like during the"
                         " migration)")

STATIC_OPT = cli_option("-s", "--static", dest="static",
                        action="store_true", default=False,
                        help="Only show configuration data, not runtime data")

ALL_OPT = cli_option("--all", dest="show_all",
                     default=False, action="store_true",
                     help="Show info on all instances on the cluster."
                     " This can take a long time to run, use wisely")

SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
                           action="store_true", default=False,
                           help="Interactive OS reinstall, lists available"
                           " OS templates for selection")

IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
                                 action="store_true", default=False,
                                 help="Remove the instance from the cluster"
                                 " configuration even if there are failures"
                                 " during the removal process")

NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
                               help="Specifies the new secondary node",
                               metavar="NODE", default=None,
                               completion_suggest=OPT_COMPL_ONE_NODE)

ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
                            default=False, action="store_true",
                            help="Replace the disk(s) on the primary"
                            " node (only for the drbd template)")

ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
                              default=False, action="store_true",
                              help="Replace the disk(s) on the secondary"
                              " node (only for the drbd template)")

AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
                              default=False, action="store_true",
                              help="Lock all nodes and auto-promote as needed"
                              " to MC status")

AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
                              default=False, action="store_true",
                              help="Automatically replace faulty disks"
                              " (only for the drbd template)")

IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
                             default=False, action="store_true",
                             help="Ignore current recorded size"
                             " (useful for forcing activation when"
                             " the recorded size is wrong)")

SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
                          metavar="<node>",
                          completion_suggest=OPT_COMPL_ONE_NODE)

SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
                         metavar="<dir>")

SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
                              help="Specify the secondary ip for the node",
                              metavar="ADDRESS", default=None)

READD_OPT = cli_option("--readd", dest="readd",
                       default=False, action="store_true",
                       help="Readd old node after replacing it")

NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
                                default=True, action="store_false",
                                help="Disable SSH key fingerprint checking")


MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
                    choices=_YESNO, default=None, metavar=_YORNO,
                    help="Set the master_candidate flag on the node")

OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
                         choices=_YESNO, default=None,
                         help="Set the offline flag on the node")

DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
                         choices=_YESNO, default=None,
                         help="Set the drained flag on the node")

ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
                             choices=_YESNO, default=None, metavar=_YORNO,
                             help="Set the allocatable flag on a volume")

NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
                               help="Disable support for lvm based instances"
                               " (cluster-wide)",
                               action="store_false", default=True)

ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
                            dest="enabled_hypervisors",
                            help="Comma-separated list of hypervisors",
                            type="string", default=None)

NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
                            type="keyval", default={},
                            help="NIC parameters")

CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
                         dest="candidate_pool_size", type="int",
                         help="Set the candidate pool size")

VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
                         help="Enables LVM and specifies the volume group"
                         " name (cluster-wide) for disk allocation [xenvg]",
                         metavar="VG", default=None)

YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
                          help="Destroy cluster", action="store_true")

NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
                          help="Skip node agreement check (dangerous)",
                          action="store_true", default=False)

MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
                            help="Specify the mac prefix for the instance IP"
                            " addresses, in the format XX:XX:XX",
                            metavar="PREFIX",
                            default=None)

MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
                               help="Specify the node interface (cluster-wide)"
                               " on which the master IP address will be added"
                               " [%s]" % constants.DEFAULT_BRIDGE,
                               metavar="NETDEV",
                               default=constants.DEFAULT_BRIDGE)


GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                                help="Specify the default directory (cluster-"
                                "wide) for storing the file-based disks [%s]" %
                                constants.DEFAULT_FILE_STORAGE_DIR,
                                metavar="DIR",
                                default=constants.DEFAULT_FILE_STORAGE_DIR)

NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
                                   help="Don't modify /etc/hosts",
                                   action="store_false", default=True)

NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
                                    help="Don't initialize SSH keys",
                                    action="store_false", default=True)

ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
                             help="Enable parseable error messages",
                             action="store_true", default=False)

NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
                          help="Skip N+1 memory redundancy tests",
                          action="store_true", default=False)

REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
                             help="Type of reboot: soft/hard/full",
                             default=constants.INSTANCE_REBOOT_HARD,
                             metavar="<REBOOT>",
                             choices=list(constants.REBOOT_TYPES))

IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
                                    dest="ignore_secondaries",
                                    default=False, action="store_true",
                                    help="Ignore errors from secondaries")

NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
                            action="store_false", default=True,
                            help="Don't shutdown the instance (unsafe)")

TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                         help="Maximum time to wait")

SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
                                  dest="shutdown_timeout", type="int",
                                  default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                                  help="Maximum time to wait for instance"
                                  " shutdown")

EARLY_RELEASE_OPT = cli_option("--early-release",
                               dest="early_release", default=False,
                               action="store_true",
                               help="Release the locks on the secondary"
                               " node(s) early")

NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
                                  dest="new_cluster_cert",
                                  default=False, action="store_true",
                                  help="Generate a new cluster certificate")

RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
                           default=None,
                           help="File containing new RAPI certificate")

NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
                               default=None, action="store_true",
                               help=("Generate a new self-signed RAPI"
                                     " certificate"))

NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
                                    dest="new_confd_hmac_key",
                                    default=False, action="store_true",
                                    help=("Create a new HMAC key for %s" %
                                          constants.CONFD))

USE_REPL_NET_OPT = cli_option("--use-replication-network",
                              dest="use_replication_network",
                              help="Whether to use the replication network"
                              " for talking to the nodes",
                              action="store_true", default=False)


def _ParseArgs(argv, commands, aliases):
  """Parser for the command line arguments.

  This function parses the arguments and returns the function which
  must be executed together with its (modified) arguments.

  @param argv: the command line
  @param commands: dictionary with special contents, see the design
      doc for cmdline handling
  @param aliases: dictionary with command aliases {'alias': 'target', ...}

  """
  if len(argv) == 0:
    binary = "<command>"
  else:
    binary = argv[0].split("/")[-1]

  if len(argv) > 1 and argv[1] == "--version":
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
    # Quit right away. That way we don't have to care about this special
    # argument. optparse.py does it the same.
    sys.exit(0)

  if len(argv) < 2 or not (argv[1] in commands or
                           argv[1] in aliases):
    # let's do a nice thing
    sortedcmds = commands.keys()
    sortedcmds.sort()

    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
    ToStdout("")

    # compute the max line length for cmd + usage
    mlen = max([len(" %s" % cmd) for cmd in commands])
    mlen = min(60, mlen) # should not get here...

    # and format a nice command list
    ToStdout("Commands:")
    for cmd in sortedcmds:
      cmdstr = " %s" % (cmd,)
      help_text = commands[cmd][4]
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
      for line in help_lines:
        ToStdout("%-*s   %s", mlen, "", line)

    ToStdout("")

    return None, None, None

  # get command, unalias it, and look it up in commands
  cmd = argv.pop(1)
  if cmd in aliases:
    if cmd in commands:
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
                                   " command" % cmd)

    if aliases[cmd] not in commands:
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
                                   " command '%s'" % (cmd, aliases[cmd]))

    cmd = aliases[cmd]

  func, args_def, parser_opts, usage, description = commands[cmd]
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
                        description=description,
                        formatter=TitledHelpFormatter(),
                        usage="%%prog %s %s" % (cmd, usage))
  parser.disable_interspersed_args()
  options, args = parser.parse_args()

  if not _CheckArguments(cmd, args_def, args):
    return None, None, None

  return func, options, args


def _CheckArguments(cmd, args_def, args):
  """Verifies the arguments using the argument definition.

  Algorithm:

    1. Abort with error if values specified by user but none expected.

    1. For each argument in definition

      1. Keep running count of minimum number of values (min_count)
      1. Keep running count of maximum number of values (max_count)
      1. If it has an unlimited number of values

        1. Abort with error if it's not the last argument in the definition

    1. If last argument has limited number of values

      1. Abort with error if number of values doesn't match or is too large

    1. Abort with error if user didn't pass enough values (min_count)

  """
  if args and not args_def:
    ToStderr("Error: Command %s expects no arguments", cmd)
    return False

  min_count = None
  max_count = None
  check_max = None

  last_idx = len(args_def) - 1

  for idx, arg in enumerate(args_def):
    if min_count is None:
      min_count = arg.min
    elif arg.min is not None:
      min_count += arg.min

    if max_count is None:
      max_count = arg.max
    elif arg.max is not None:
      max_count += arg.max

    if idx == last_idx:
      check_max = (arg.max is not None)

    elif arg.max is None:
      raise errors.ProgrammerError("Only the last argument can have max=None")

  if check_max:
    # Command with exact number of arguments
    if (min_count is not None and max_count is not None and
        min_count == max_count and len(args) != min_count):
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
      return False

    # Command with limited number of arguments
    if max_count is not None and len(args) > max_count:
      ToStderr("Error: Command %s expects only %d argument(s)",
               cmd, max_count)
      return False

  # Command with some required arguments
  if min_count is not None and len(args) < min_count:
    ToStderr("Error: Command %s expects at least %d argument(s)",
             cmd, min_count)
    return False

  return True
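
# Illustrative example (not part of the original module): for a command
# defined with ARGS_ONE_INSTANCE, _CheckArguments("info", ARGS_ONE_INSTANCE,
# ["instance1"]) returns True, while an empty argument list makes it print
# "Error: Command info expects 1 argument(s)" and return False.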


def SplitNodeOption(value):
  """Splits the value of a --node option.

  """
  if value and ':' in value:
    return value.split(':', 1)
  else:
    return (value, None)
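
# Illustrative examples (not part of the original module); note the mixed
# return types, a list when a secondary node is given and a tuple otherwise:
#   SplitNodeOption("node1.example.com:node2.example.com")
#   -> ['node1.example.com', 'node2.example.com']
#   SplitNodeOption("node1.example.com") -> ('node1.example.com', None)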


def CalculateOSNames(os_name, os_variants):
  """Calculates all the names an OS can be called, according to its variants.

  @type os_name: string
  @param os_name: base name of the os
  @type os_variants: list or None
  @param os_variants: list of supported variants
  @rtype: list
  @return: list of valid names

  """
  if os_variants:
    return ['%s+%s' % (os_name, v) for v in os_variants]
  else:
    return [os_name]
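
# Illustrative examples (not part of the original module):
#   CalculateOSNames("debootstrap", ["etch", "lenny"])
#   -> ['debootstrap+etch', 'debootstrap+lenny']
#   CalculateOSNames("debootstrap", None) -> ['debootstrap']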


def UsesRPC(fn):
  def wrapper(*args, **kwargs):
    rpc.Init()
    try:
      return fn(*args, **kwargs)
    finally:
      rpc.Shutdown()
  return wrapper
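
# Illustrative usage sketch (not part of the original module; the decorated
# function name is hypothetical). The decorator brackets the call with
# rpc.Init()/rpc.Shutdown(), which is needed by commands that talk to the
# nodes directly instead of going through the master daemon:
#
#   @UsesRPC
#   def InitCluster(opts, args):
#     ...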


def AskUser(text, choices=None):
  """Ask the user a question.

  @param text: the question to ask

  @param choices: list with elements tuples (input_char, return_value,
      description); if not given, it will default to: [('y', True,
      'Perform the operation'), ('n', False, 'Do not perform the operation')];
      note that the '?' char is reserved for help

  @return: one of the return values from the choices list; if input is
      not possible (i.e. not running with a tty), we return the last
      entry from the list

  """
  if choices is None:
    choices = [('y', True, 'Perform the operation'),
               ('n', False, 'Do not perform the operation')]
  if not choices or not isinstance(choices, list):
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
  for entry in choices:
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
      raise errors.ProgrammerError("Invalid choices element to AskUser")

  answer = choices[-1][1]
  new_text = []
  for line in text.splitlines():
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
  text = "\n".join(new_text)
  try:
    f = file("/dev/tty", "a+")
  except IOError:
    return answer
  try:
    chars = [entry[0] for entry in choices]
    chars[-1] = "[%s]" % chars[-1]
    chars.append('?')
    maps = dict([(entry[0], entry[1]) for entry in choices])
    while True:
      f.write(text)
      f.write('\n')
      f.write("/".join(chars))
      f.write(": ")
      line = f.readline(2).strip().lower()
      if line in maps:
        answer = maps[line]
        break
      elif line == '?':
        for entry in choices:
          f.write(" %s - %s\n" % (entry[0], entry[2]))
        f.write("\n")
        continue
  finally:
    f.close()
  return answer
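
# Illustrative example (not part of the original module): a custom prompt;
# without a tty the last (default) entry's return value, False here, is
# returned, and the prompt is rendered as "y/all/[n]/?":
#
#   choices = [("y", True, "Perform the operation"),
#              ("all", "all", "Perform it on all listed objects"),
#              ("n", False, "Do not perform the operation")]
#   confirm = AskUser("Continue with the operation?", choices)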


class JobSubmittedException(Exception):
  """Job was submitted, client should exit.

  This exception has one argument, the ID of the job that was
  submitted. The handler should print this ID.

  This is not an error, just a structured way to exit from clients.

  """


def SendJob(ops, cl=None):
  """Function to submit an opcode without waiting for the results.

  @type ops: list
  @param ops: list of opcodes
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created

  """
  if cl is None:
    cl = GetClient()

  job_id = cl.SubmitJob(ops)

  return job_id


def PollJob(job_id, cl=None, feedback_fn=None):
  """Function to poll for the result of a job.

  @type job_id: job identifier
  @param job_id: the job to poll for results
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created

  """
  if cl is None:
    cl = GetClient()

  prev_job_info = None
  prev_logmsg_serial = None

  status = None

  notified_queued = False
  notified_waitlock = False

  while True:
    result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
                                     prev_logmsg_serial)
    if not result:
      # job not found, go away!
      raise errors.JobLost("Job with id %s lost" % job_id)
    elif result == constants.JOB_NOTCHANGED:
      if status is not None and not callable(feedback_fn):
        if status == constants.JOB_STATUS_QUEUED and not notified_queued:
          ToStderr("Job %s is waiting in queue", job_id)
          notified_queued = True
        elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
          ToStderr("Job %s is trying to acquire all necessary locks", job_id)
          notified_waitlock = True

      # Wait again
      continue

    # Split result, a tuple of (field values, log entries)
    (job_info, log_entries) = result
    (status, ) = job_info

    if log_entries:
      for log_entry in log_entries:
        (serial, timestamp, _, message) = log_entry
        if callable(feedback_fn):
          feedback_fn(log_entry[1:])
        else:
          encoded = utils.SafeEncode(message)
          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
        prev_logmsg_serial = max(prev_logmsg_serial, serial)

    # TODO: Handle canceled and archived jobs
    elif status in (constants.JOB_STATUS_SUCCESS,
                    constants.JOB_STATUS_ERROR,
                    constants.JOB_STATUS_CANCELING,
                    constants.JOB_STATUS_CANCELED):
      break

    prev_job_info = job_info

  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
  if not jobs:
    raise errors.JobLost("Job with id %s lost" % job_id)

  status, opstatus, result = jobs[0]
  if status == constants.JOB_STATUS_SUCCESS:
    return result
  elif status in (constants.JOB_STATUS_CANCELING,
                  constants.JOB_STATUS_CANCELED):
    raise errors.OpExecError("Job was canceled")
  else:
    has_ok = False
    for idx, (status, msg) in enumerate(zip(opstatus, result)):
      if status == constants.OP_STATUS_SUCCESS:
        has_ok = True
      elif status == constants.OP_STATUS_ERROR:
        errors.MaybeRaise(msg)
        if has_ok:
          raise errors.OpExecError("partial failure (opcode %d): %s" %
                                   (idx, msg))
        else:
          raise errors.OpExecError(str(msg))
    # default failure mode
    raise errors.OpExecError(result)


def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
  """Legacy function to submit an opcode.

  This is just a simple wrapper over the construction of the processor
  instance. It should be extended to better handle feedback and
  interaction functions.

  """
  if cl is None:
    cl = GetClient()

  SetGenericOpcodeOpts([op], opts)

  job_id = SendJob([op], cl)

  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)

  return op_results[0]


def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  This function will decide, based on the 'opts' parameter, whether to
  submit and wait for the result of the opcode (and return it), or
  whether to just send the job and print its identifier. It is used in
  order to simplify the implementation of the '--submit' option.

  It will also process the opcodes if we're sending them via SendJob
  (otherwise SubmitOpCode does it).

  """
  if opts and opts.submit_only:
    job = [op]
    SetGenericOpcodeOpts(job, opts)
    job_id = SendJob(job, cl=cl)
    raise JobSubmittedException(job_id)
  else:
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
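
# Illustrative usage sketch (not part of the original module; the opcode and
# function names are hypothetical). A typical gnt-* command implementation
# builds an opcode and hands it to SubmitOrSend, so that "--submit" (see
# SUBMIT_OPT above) automatically becomes a SendJob followed by a
# JobSubmittedException instead of a blocking SubmitOpCode:
#
#   def StartupInstance(opts, args):
#     op = opcodes.OpStartupInstance(instance_name=args[0], force=opts.force)
#     SubmitOrSend(op, opts)
#     return 0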


def SetGenericOpcodeOpts(opcode_list, options):
  """Processor for generic options.

  This function updates the given opcodes based on generic command
  line options (like debug, dry-run, etc.).

  @param opcode_list: list of opcodes
  @param options: command line options or None
  @return: None (in-place modification)

  """
  if not options:
    return
  for op in opcode_list:
    op.dry_run = options.dry_run
    op.debug_level = options.debug


def GetClient():
  # TODO: Cache object?
  try:
    client = luxi.Client()
  except luxi.NoMasterError:
    ss = ssconf.SimpleStore()

    # Try to read ssconf file
    try:
      ss.GetMasterNode()
    except errors.ConfigurationError:
      raise errors.OpPrereqError("Cluster not initialized or this machine is"
                                 " not part of a cluster")

    master, myself = ssconf.GetMasterAndMyself(ss=ss)
    if master != myself:
      raise errors.OpPrereqError("This is not the master node, please connect"
                                 " to node '%s' and rerun the command" %
                                 master)
    raise
  return client


def FormatError(err):
  """Return a formatted error message for a given error.

  This function takes an exception instance and returns a tuple
  consisting of two values: first, the recommended exit code, and
  second, a string describing the error message (not
  newline-terminated).

  """
  retcode = 1
  obuf = StringIO()
  msg = str(err)
  if isinstance(err, errors.ConfigurationError):
    txt = "Corrupt configuration file: %s" % msg
    logging.error(txt)
    obuf.write(txt + "\n")
    obuf.write("Aborting.")
    retcode = 2
  elif isinstance(err, errors.HooksAbort):
    obuf.write("Failure: hooks execution failed:\n")
    for node, script, out in err.args[0]:
      if out:
        obuf.write("  node: %s, script: %s, output: %s\n" %
                   (node, script, out))
      else:
        obuf.write("  node: %s, script: %s (no output)\n" %
                   (node, script))
  elif isinstance(err, errors.HooksFailure):
    obuf.write("Failure: hooks general failure: %s" % msg)
  elif isinstance(err, errors.ResolverError):
    this_host = utils.HostInfo.SysName()
    if err.args[0] == this_host:
      msg = "Failure: can't resolve my own hostname ('%s')"
    else:
      msg = "Failure: can't resolve hostname '%s'"
    obuf.write(msg % err.args[0])
  elif isinstance(err, errors.OpPrereqError):
    if len(err.args) == 2:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\nerror type: %s, error details:\n%s" %
                 (err.args[1], err.args[0]))
    else:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\n%s" % msg)
  elif isinstance(err, errors.OpExecError):
    obuf.write("Failure: command execution error:\n%s" % msg)
  elif isinstance(err, errors.TagError):
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
  elif isinstance(err, errors.JobQueueDrainError):
    obuf.write("Failure: the job queue is marked for drain and doesn't"
               " accept new requests\n")
  elif isinstance(err, errors.JobQueueFull):
    obuf.write("Failure: the job queue is full and doesn't accept new"
               " job submissions until old jobs are archived\n")
  elif isinstance(err, errors.TypeEnforcementError):
    obuf.write("Parameter Error: %s" % msg)
  elif isinstance(err, errors.ParameterError):
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
  elif isinstance(err, errors.GenericError):
    obuf.write("Unhandled Ganeti error: %s" % msg)
  elif isinstance(err, luxi.NoMasterError):
    obuf.write("Cannot communicate with the master daemon.\nIs it running"
               " and listening for connections?")
  elif isinstance(err, luxi.TimeoutError):
    obuf.write("Timeout while talking to the master daemon. Error:\n"
               "%s" % msg)
  elif isinstance(err, luxi.ProtocolError):
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
               "%s" % msg)
  elif isinstance(err, JobSubmittedException):
    obuf.write("JobID: %s\n" % err.args[0])
    retcode = 0
  else:
    obuf.write("Unhandled exception: %s" % msg)
  return retcode, obuf.getvalue().rstrip('\n')


def GenericMain(commands, override=None, aliases=None):
  """Generic main function for all the gnt-* commands.

  Arguments:
    - commands: a dictionary with a special structure, see the design doc
                for command line handling.
    - override: if not None, we expect a dictionary with keys that will
                override command line options; this can be used to pass
                options from the scripts to generic functions
    - aliases: dictionary with command aliases {'alias': 'target', ...}

  """
  # save the program name and the entire command line for later logging
  if sys.argv:
    binary = os.path.basename(sys.argv[0]) or sys.argv[0]
    if len(sys.argv) >= 2:
      binary += " " + sys.argv[1]
      old_cmdline = " ".join(sys.argv[2:])
    else:
      old_cmdline = ""
  else:
    binary = "<unknown program>"
    old_cmdline = ""

  if aliases is None:
    aliases = {}

  try:
    func, options, args = _ParseArgs(sys.argv, commands, aliases)
  except errors.ParameterError, err:
    result, err_msg = FormatError(err)
    ToStderr(err_msg)
    return 1

  if func is None: # parse error
    return 1

  if override is not None:
    for key, val in override.iteritems():
      setattr(options, key, val)

  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
                     stderr_logging=True, program=binary)

  if old_cmdline:
    logging.info("run with arguments '%s'", old_cmdline)
  else:
    logging.info("run with no arguments")

  try:
    result = func(options, args)
  except (errors.GenericError, luxi.ProtocolError,
          JobSubmittedException), err:
    result, err_msg = FormatError(err)
    logging.exception("Error during command processing")
    ToStderr(err_msg)

  return result
1478

    
1479

    
1480
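# Illustrative invocation (a sketch, not part of this module): each gnt-*
# script defines its own 'commands' table, structured as described in the
# command line handling design doc, and then hands control to GenericMain.
# The "show" alias below is purely hypothetical.
#
#   if __name__ == "__main__":
#     sys.exit(GenericMain(commands, aliases={"show": "info"}))

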
def GenericInstanceCreate(mode, opts, args):
  """Add an instance to the cluster via either creation or import.

  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new instance name
  @rtype: int
  @return: the desired exit code

  """
  instance = args[0]

  (pnode, snode) = SplitNodeOption(opts.node)

  hypervisor = None
  hvparams = {}
  if opts.hypervisor:
    hypervisor, hvparams = opts.hypervisor

  if opts.nics:
    try:
      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
    except ValueError, err:
      raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
    nics = [{}] * nic_max
    for nidx, ndict in opts.nics:
      nidx = int(nidx)
      if not isinstance(ndict, dict):
        msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
        raise errors.OpPrereqError(msg)
      nics[nidx] = ndict
  elif opts.no_nics:
    # no nics
    nics = []
  else:
    # default of one nic, all auto
    nics = [{}]

  if opts.disk_template == constants.DT_DISKLESS:
    if opts.disks or opts.sd_size is not None:
      raise errors.OpPrereqError("Diskless instance but disk"
                                 " information passed")
    disks = []
  else:
    if not opts.disks and not opts.sd_size:
      raise errors.OpPrereqError("No disk information specified")
    if opts.disks and opts.sd_size is not None:
      raise errors.OpPrereqError("Please use either the '--disk' or"
                                 " '-s' option")
    if opts.sd_size is not None:
      opts.disks = [(0, {"size": opts.sd_size})]
    try:
      disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
    except ValueError, err:
      raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
    disks = [{}] * disk_max
    for didx, ddict in opts.disks:
      didx = int(didx)
      if not isinstance(ddict, dict):
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
        raise errors.OpPrereqError(msg)
      elif "size" in ddict:
        if "adopt" in ddict:
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
                                     " (disk %d)" % didx)
        try:
          ddict["size"] = utils.ParseUnit(ddict["size"])
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
                                     (didx, err))
      elif "adopt" in ddict:
        if mode == constants.INSTANCE_IMPORT:
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
                                     " import")
        ddict["size"] = 0
      else:
        raise errors.OpPrereqError("Missing size or adoption source for"
                                   " disk %d" % didx)
      disks[didx] = ddict

  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)

  if mode == constants.INSTANCE_CREATE:
    start = opts.start
    os_type = opts.os
    src_node = None
    src_path = None
    no_install = opts.no_install
  elif mode == constants.INSTANCE_IMPORT:
    start = False
    os_type = None
    src_node = opts.src_node
    src_path = opts.src_dir
    no_install = None
  else:
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)

  op = opcodes.OpCreateInstance(instance_name=instance,
                                disks=disks,
                                disk_template=opts.disk_template,
                                nics=nics,
                                pnode=pnode, snode=snode,
                                ip_check=opts.ip_check,
                                name_check=opts.name_check,
                                wait_for_sync=opts.wait_for_sync,
                                file_storage_dir=opts.file_storage_dir,
                                file_driver=opts.file_driver,
                                iallocator=opts.iallocator,
                                hypervisor=hypervisor,
                                hvparams=hvparams,
                                beparams=opts.beparams,
                                mode=mode,
                                start=start,
                                os_type=os_type,
                                src_node=src_node,
                                src_path=src_path,
                                no_install=no_install)

  SubmitOrSend(op, opts)
  return 0


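# Shape of the parsed options consumed above (illustrative values only, not a
# definitive reference): opts.nics and opts.disks are lists of (index, dict)
# pairs as produced by the --net/--disk option parsers, e.g.
#
#   opts.nics  = [("0", {"mode": "bridged", "link": "xen-br0"})]
#   opts.disks = [("0", {"size": "10g"}), ("1", {"adopt": "data_lv"})]

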
class _RunWhileClusterStoppedHelper:
  """Helper class for L{RunWhileClusterStopped} to simplify state management

  """
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes

    self.ssh = ssh.SshRunner(self.cluster_name)

    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name
    @type cmd: list
    @param cmd: Command

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    @type fn: callable
    @param fn: Function to be called

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes
        for node_name in self.online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception, err:
          _, errmsg = FormatError(err)
          logging.exception("Caught exception")
          self.feedback_fn(errmsg)
          raise
      finally:
        # Start cluster again, master node last
        for node_name in self.nonmaster_nodes + [self.master_node]:
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
    finally:
      # Resume watcher
      watcher_block.Close()


def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Calls a function while all cluster daemons are stopped.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  feedback_fn("Gathering cluster information")

  # This ensures we're running on the master daemon
  cl = GetClient()

  (cluster_name, master_node) = \
    cl.QueryConfigValues(["cluster_name", "master_node"])

  online_nodes = GetOnlineNodes([], cl=cl)

  # Don't keep a reference to the client. The master daemon will go away.
  del cl

  assert master_node in online_nodes

  return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
                                       online_nodes).Call(fn, *args)


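# Illustrative use (a sketch; _CheckWhileStopped is a hypothetical callback):
# the callback receives the helper object, so it can run further commands on
# the stopped cluster through the helper's _RunCmd method.
#
#   def _CheckWhileStopped(helper):
#     helper.feedback_fn("All daemons are down")
#     helper._RunCmd(None, ["hostname"])
#
#   RunWhileClusterStopped(ToStdout, _CheckWhileStopped)

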
def GenerateTable(headers, fields, separator, data,
                  numfields=None, unitfields=None,
                  units=None):
  """Formats a table with headers and different fields.

  @type headers: dict
  @param headers: dictionary mapping field names to headers for
      the table
  @type fields: list
  @param fields: the field names corresponding to each row in
      the data field
  @param separator: the separator to be used; if this is None,
      the default 'smart' algorithm is used which computes optimal
      field width, otherwise just the separator is used between
      each field
  @type data: list
  @param data: a list of lists, each sublist being one row to be output
  @type numfields: list
  @param numfields: a list with the fields that hold numeric
      values and thus should be right-aligned
  @type unitfields: list
  @param unitfields: a list with the fields that hold numeric
      values that should be formatted with the units field
  @type units: string or None
  @param units: the units we should use for formatting, or None for
      automatic choice (human-readable for non-separator usage, otherwise
      megabytes); this is a one-letter string
  @rtype: list
  @return: the list of formatted output lines

  """
  if units is None:
    if separator:
      units = "m"
    else:
      units = "h"

  if numfields is None:
    numfields = []
  if unitfields is None:
    unitfields = []

  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142

  format_fields = []
  for field in fields:
    if headers and field not in headers:
      # TODO: handle better unknown fields (either revert to old
      # style of raising exception, or deal more intelligently with
      # variable fields)
      headers[field] = field
    if separator is not None:
      format_fields.append("%s")
    elif numfields.Matches(field):
      format_fields.append("%*s")
    else:
      format_fields.append("%-*s")

  if separator is None:
    mlens = [0 for name in fields]
    format = ' '.join(format_fields)
  else:
    format = separator.replace("%", "%%").join(format_fields)

  for row in data:
    if row is None:
      continue
    for idx, val in enumerate(row):
      if unitfields.Matches(fields[idx]):
        try:
          val = int(val)
        except (TypeError, ValueError):
          pass
        else:
          val = row[idx] = utils.FormatUnit(val, units)
      val = row[idx] = str(val)
      if separator is None:
        mlens[idx] = max(mlens[idx], len(val))

  result = []
  if headers:
    args = []
    for idx, name in enumerate(fields):
      hdr = headers[name]
      if separator is None:
        mlens[idx] = max(mlens[idx], len(hdr))
        args.append(mlens[idx])
      args.append(hdr)
    result.append(format % tuple(args))

  if separator is None:
    assert len(mlens) == len(fields)

    if fields and not numfields.Matches(fields[-1]):
      mlens[-1] = 0

  for line in data:
    args = []
    if line is None:
      line = ['-' for _ in fields]
    for idx in range(len(fields)):
      if separator is None:
        args.append(mlens[idx])
      args.append(line[idx])
    result.append(format % tuple(args))

  return result


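# Example usage (a sketch with made-up node data): a small table using the
# 'smart' column-width algorithm (separator=None) and human-readable units
# for the memory column.
#
#   lines = GenerateTable(headers={"name": "Node", "mfree": "MemFree"},
#                         fields=["name", "mfree"],
#                         separator=None,
#                         data=[["node1.example.com", 2048],
#                               ["node2.example.com", 512]],
#                         numfields=["mfree"], unitfields=["mfree"])
#   for line in lines:
#     ToStdout(line)

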
def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: a string with the formatted timestamp

  """
  if not isinstance(ts, (tuple, list)) or len(ts) != 2:
    return '?'
  sec, usec = ts
  return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec


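# Examples (a sketch; the exact date/time shown depends on the local
# timezone):
#
#   FormatTimestamp((1234567890, 42))   # -> e.g. "2009-02-13 23:31:30.000042"
#   FormatTimestamp("not-a-timestamp")  # -> "?"

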
def ParseTimespec(value):
  """Parse a time specification.

  The following suffixes will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: days
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  """
  value = str(value)
  if not value:
    raise errors.OpPrereqError("Empty time specification passed")
  suffix_map = {
    's': 1,
    'm': 60,
    'h': 3600,
    'd': 86400,
    'w': 604800,
    }
  if value[-1] not in suffix_map:
    try:
      value = int(value)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  else:
    multiplier = suffix_map[value[-1]]
    value = value[:-1]
    if not value: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)")
    try:
      value = int(value) * multiplier
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  return value


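# Examples of the accepted formats (a sketch):
#
#   ParseTimespec("30")  # -> 30       (plain seconds)
#   ParseTimespec("2m")  # -> 120
#   ParseTimespec("1h")  # -> 3600
#   ParseTimespec("2w")  # -> 1209600
#   ParseTimespec("w")   # raises OpPrereqError (suffix without a value)

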
def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
                   filter_master=False):
  """Returns the names of online nodes.

  This function will also log a note on stderr with the names of
  the offline nodes that are skipped.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed
  @type secondary_ips: boolean
  @param secondary_ips: if True, return the secondary IPs instead of the
      names, useful for doing network traffic over the replication interface
      (if any)
  @type filter_master: boolean
  @param filter_master: if True, do not return the master node in the list
      (useful in coordination with secondary_ips where we cannot check our
      node name against the list)

  """
  if cl is None:
    cl = GetClient()

  if secondary_ips:
    name_idx = 2
  else:
    name_idx = 0

  if filter_master:
    master_node = cl.QueryConfigValues(["master_node"])[0]
    filter_fn = lambda x: x != master_node
  else:
    filter_fn = lambda _: True

  result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
                         use_locking=False)
  offline = [row[0] for row in result if row[1]]
  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
  return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]


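# Example (a sketch): fetching the secondary IPs of all online nodes except
# the master, e.g. for pushing data over the replication network.
#
#   sips = GetOnlineNodes([], secondary_ips=True, filter_master=True)

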
def _ToStream(stream, txt, *args):
  """Write a message to a stream, bypassing the logging system

  @type stream: file object
  @param stream: the file to which we should write
  @type txt: str
  @param txt: the message

  """
  if args:
    args = tuple(args)
    stream.write(txt % args)
  else:
    stream.write(txt)
  stream.write('\n')
  stream.flush()


def ToStdout(txt, *args):
  """Write a message to stdout only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stdout, txt, *args)


def ToStderr(txt, *args):
  """Write a message to stderr only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stderr, txt, *args)


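# Examples (a sketch): both helpers accept either a plain string or a format
# string plus arguments, mirroring the logging call style.
#
#   ToStdout("Cluster verified")
#   ToStderr("Node %s is unreachable (attempt %d)", "node1.example.com", 3)

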
class JobExecutor(object):
  """Class which manages the submission and execution of multiple jobs.

  Note that instances of this class should not be reused between
  GetResults() calls.

  """
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
    self.queue = []
    if cl is None:
      cl = GetClient()
    self.cl = cl
    self.verbose = verbose
    self.jobs = []
    self.opts = opts
    self.feedback_fn = feedback_fn

  def QueueJob(self, name, *ops):
    """Record a job for later submit.

    @type name: string
    @param name: a description of the job, will be used in the job
        wait/result messages

    """
    SetGenericOpcodeOpts(ops, self.opts)
    self.queue.append((name, ops))

  def SubmitPending(self):
    """Submit all pending jobs.

    """
    results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
                                                            self.queue)):
      self.jobs.append((idx, status, data, name))

  def _ChooseJob(self):
    """Choose a non-waiting/queued job to poll next.

    """
    assert self.jobs, "_ChooseJob called with empty job list"

    result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
    assert result

    for job_data, status in zip(self.jobs, result):
      if status[0] in (constants.JOB_STATUS_QUEUED,
                       constants.JOB_STATUS_WAITLOCK,
                       constants.JOB_STATUS_CANCELING):
        # job is still waiting
        continue
      # good candidate found
      self.jobs.remove(job_data)
      return job_data

    # no job found
    return self.jobs.pop(0)

  def GetResults(self):
    """Wait for and return the results of all jobs.

    @rtype: list
    @return: list of tuples (success, job results), in the same order
        as the submitted jobs; if a job has failed, instead of the result
        there will be the error message

    """
    if not self.jobs:
      self.SubmitPending()
    results = []
    if self.verbose:
      ok_jobs = [row[2] for row in self.jobs if row[1]]
      if ok_jobs:
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))

    # first, remove any non-submitted jobs
    self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
    for idx, _, jid, name in failures:
      ToStderr("Failed to submit job for %s: %s", name, jid)
      results.append((idx, False, jid))

    while self.jobs:
      (idx, _, jid, name) = self._ChooseJob()
      ToStdout("Waiting for job %s for %s...", jid, name)
      try:
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
        success = True
      except (errors.GenericError, luxi.ProtocolError), err:
        _, job_result = FormatError(err)
        success = False
        # the error message will always be shown, verbose or not
        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)

      results.append((idx, success, job_result))

    # sort based on the index, then drop it
    results.sort()
    results = [i[1:] for i in results]

    return results

  def WaitOrShow(self, wait):
    """Wait for job results or only print the job IDs.

    @type wait: boolean
    @param wait: whether to wait or not

    """
    if wait:
      return self.GetResults()
    else:
      if not self.jobs:
        self.SubmitPending()
      # each entry in self.jobs is an (idx, status, job_id_or_error, name)
      # tuple, as appended by SubmitPending
      for (_, status, result, name) in self.jobs:
        if status:
          ToStdout("%s: %s", result, name)
        else:
          ToStderr("Failure for %s: %s", name, result)