Fix breakage introduced by commit 8044bf655
[ganeti-local] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module dealing with command line parsing"""
23
24
25 import sys
26 import textwrap
27 import os.path
28 import time
29 import logging
30 from cStringIO import StringIO
31
32 from ganeti import utils
33 from ganeti import errors
34 from ganeti import constants
35 from ganeti import opcodes
36 from ganeti import luxi
37 from ganeti import ssconf
38 from ganeti import rpc
39 from ganeti import ssh
40 from ganeti import compat
41 from ganeti import netutils
42
43 from optparse import (OptionParser, TitledHelpFormatter,
44                       Option, OptionValueError)
45
46
47 __all__ = [
48   # Command line options
49   "ADD_UIDS_OPT",
50   "ALLOCATABLE_OPT",
51   "ALL_OPT",
52   "AUTO_PROMOTE_OPT",
53   "AUTO_REPLACE_OPT",
54   "BACKEND_OPT",
55   "CLEANUP_OPT",
56   "CLUSTER_DOMAIN_SECRET_OPT",
57   "CONFIRM_OPT",
58   "CP_SIZE_OPT",
59   "DEBUG_OPT",
60   "DEBUG_SIMERR_OPT",
61   "DISKIDX_OPT",
62   "DISK_OPT",
63   "DISK_TEMPLATE_OPT",
64   "DRAINED_OPT",
65   "DRY_RUN_OPT",
66   "DRBD_HELPER_OPT",
67   "EARLY_RELEASE_OPT",
68   "ENABLED_HV_OPT",
69   "ERROR_CODES_OPT",
70   "FIELDS_OPT",
71   "FILESTORE_DIR_OPT",
72   "FILESTORE_DRIVER_OPT",
73   "FORCE_OPT",
74   "FORCE_VARIANT_OPT",
75   "GLOBAL_FILEDIR_OPT",
76   "HVLIST_OPT",
77   "HVOPTS_OPT",
78   "HYPERVISOR_OPT",
79   "IALLOCATOR_OPT",
80   "DEFAULT_IALLOCATOR_OPT",
81   "IDENTIFY_DEFAULTS_OPT",
82   "IGNORE_CONSIST_OPT",
83   "IGNORE_FAILURES_OPT",
84   "IGNORE_REMOVE_FAILURES_OPT",
85   "IGNORE_SECONDARIES_OPT",
86   "IGNORE_SIZE_OPT",
87   "MAC_PREFIX_OPT",
88   "MAINTAIN_NODE_HEALTH_OPT",
89   "MASTER_NETDEV_OPT",
90   "MC_OPT",
91   "MIGRATION_MODE_OPT",
92   "NET_OPT",
93   "NEW_CLUSTER_CERT_OPT",
94   "NEW_CLUSTER_DOMAIN_SECRET_OPT",
95   "NEW_CONFD_HMAC_KEY_OPT",
96   "NEW_RAPI_CERT_OPT",
97   "NEW_SECONDARY_OPT",
98   "NIC_PARAMS_OPT",
99   "NODE_LIST_OPT",
100   "NODE_PLACEMENT_OPT",
101   "NODRBD_STORAGE_OPT",
102   "NOHDR_OPT",
103   "NOIPCHECK_OPT",
104   "NO_INSTALL_OPT",
105   "NONAMECHECK_OPT",
106   "NOLVM_STORAGE_OPT",
107   "NOMODIFY_ETCHOSTS_OPT",
108   "NOMODIFY_SSH_SETUP_OPT",
109   "NONICS_OPT",
110   "NONLIVE_OPT",
111   "NONPLUS1_OPT",
112   "NOSHUTDOWN_OPT",
113   "NOSTART_OPT",
114   "NOSSH_KEYCHECK_OPT",
115   "NOVOTING_OPT",
116   "NWSYNC_OPT",
117   "ON_PRIMARY_OPT",
118   "ON_SECONDARY_OPT",
119   "OFFLINE_OPT",
120   "OSPARAMS_OPT",
121   "OS_OPT",
122   "OS_SIZE_OPT",
123   "RAPI_CERT_OPT",
124   "READD_OPT",
125   "REBOOT_TYPE_OPT",
126   "REMOVE_INSTANCE_OPT",
127   "REMOVE_UIDS_OPT",
128   "RESERVED_LVS_OPT",
129   "ROMAN_OPT",
130   "SECONDARY_IP_OPT",
131   "SELECT_OS_OPT",
132   "SEP_OPT",
133   "SHOWCMD_OPT",
134   "SHUTDOWN_TIMEOUT_OPT",
135   "SINGLE_NODE_OPT",
136   "SRC_DIR_OPT",
137   "SRC_NODE_OPT",
138   "SUBMIT_OPT",
139   "STATIC_OPT",
140   "SYNC_OPT",
141   "TAG_SRC_OPT",
142   "TIMEOUT_OPT",
143   "UIDPOOL_OPT",
144   "USEUNITS_OPT",
145   "USE_REPL_NET_OPT",
146   "VERBOSE_OPT",
147   "VG_NAME_OPT",
148   "YES_DOIT_OPT",
149   # Generic functions for CLI programs
150   "GenericMain",
151   "GenericInstanceCreate",
152   "GetClient",
153   "GetOnlineNodes",
154   "JobExecutor",
155   "JobSubmittedException",
156   "ParseTimespec",
157   "RunWhileClusterStopped",
158   "SubmitOpCode",
159   "SubmitOrSend",
160   "UsesRPC",
161   # Formatting functions
162   "ToStderr", "ToStdout",
163   "FormatError",
164   "GenerateTable",
165   "AskUser",
166   "FormatTimestamp",
167   "FormatLogMessage",
168   # Tags functions
169   "ListTags",
170   "AddTags",
171   "RemoveTags",
172   # command line options support infrastructure
173   "ARGS_MANY_INSTANCES",
174   "ARGS_MANY_NODES",
175   "ARGS_NONE",
176   "ARGS_ONE_INSTANCE",
177   "ARGS_ONE_NODE",
178   "ARGS_ONE_OS",
179   "ArgChoice",
180   "ArgCommand",
181   "ArgFile",
182   "ArgHost",
183   "ArgInstance",
184   "ArgJobId",
185   "ArgNode",
186   "ArgOs",
187   "ArgSuggest",
188   "ArgUnknown",
189   "OPT_COMPL_INST_ADD_NODES",
190   "OPT_COMPL_MANY_NODES",
191   "OPT_COMPL_ONE_IALLOCATOR",
192   "OPT_COMPL_ONE_INSTANCE",
193   "OPT_COMPL_ONE_NODE",
194   "OPT_COMPL_ONE_OS",
195   "cli_option",
196   "SplitNodeOption",
197   "CalculateOSNames",
198   "ParseFields",
199   ]
200
201 NO_PREFIX = "no_"
202 UN_PREFIX = "-"
203
204
205 class _Argument:
206   def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
207     self.min = min
208     self.max = max
209
210   def __repr__(self):
211     return ("<%s min=%s max=%s>" %
212             (self.__class__.__name__, self.min, self.max))
213
214
215 class ArgSuggest(_Argument):
216   """Suggesting argument.
217
218   Value can be any of the ones passed to the constructor.
219
220   """
221   # pylint: disable-msg=W0622
222   def __init__(self, min=0, max=None, choices=None):
223     _Argument.__init__(self, min=min, max=max)
224     self.choices = choices
225
226   def __repr__(self):
227     return ("<%s min=%s max=%s choices=%r>" %
228             (self.__class__.__name__, self.min, self.max, self.choices))
229
230
231 class ArgChoice(ArgSuggest):
232   """Choice argument.
233
234   Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
235   but value must be one of the choices.
236
237   """
238
239
240 class ArgUnknown(_Argument):
241   """Unknown argument to program (e.g. determined at runtime).
242
243   """
244
245
246 class ArgInstance(_Argument):
247   """Instances argument.
248
249   """
250
251
252 class ArgNode(_Argument):
253   """Node argument.
254
255   """
256
257 class ArgJobId(_Argument):
258   """Job ID argument.
259
260   """
261
262
263 class ArgFile(_Argument):
264   """File path argument.
265
266   """
267
268
269 class ArgCommand(_Argument):
270   """Command argument.
271
272   """
273
274
275 class ArgHost(_Argument):
276   """Host argument.
277
278   """
279
280
281 class ArgOs(_Argument):
282   """OS argument.
283
284   """
285
286
287 ARGS_NONE = []
288 ARGS_MANY_INSTANCES = [ArgInstance()]
289 ARGS_MANY_NODES = [ArgNode()]
290 ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
291 ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
292 ARGS_ONE_OS = [ArgOs(min=1, max=1)]
293
294
295 def _ExtractTagsObject(opts, args):
296   """Extract the tag type object.
297
298   Note that this function will modify its args parameter.
299
300   """
301   if not hasattr(opts, "tag_type"):
302     raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
303   kind = opts.tag_type
304   if kind == constants.TAG_CLUSTER:
305     retval = kind, kind
306   elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
307     if not args:
308       raise errors.OpPrereqError("no arguments passed to the command")
309     name = args.pop(0)
310     retval = kind, name
311   else:
312     raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
313   return retval
314
315
316 def _ExtendTags(opts, args):
317   """Extend the args if a source file has been given.
318
319   This function will extend the tags with the contents of the file
320   passed in the 'tags_source' attribute of the opts parameter. A file
321   named '-' will be replaced by stdin.
322
323   """
324   fname = opts.tags_source
325   if fname is None:
326     return
327   if fname == "-":
328     new_fh = sys.stdin
329   else:
330     new_fh = open(fname, "r")
331   new_data = []
332   try:
333     # we don't use the nice 'new_data = [line.strip() for line in fh]'
334     # because of python bug 1633941
335     while True:
336       line = new_fh.readline()
337       if not line:
338         break
339       new_data.append(line.strip())
340   finally:
341     new_fh.close()
342   args.extend(new_data)
343
344
345 def ListTags(opts, args):
346   """List the tags on a given object.
347
348   This is a generic implementation that knows how to deal with all
349   three cases of tag objects (cluster, node, instance). The opts
350   argument is expected to contain a tag_type field denoting what
351   object type we work on.
352
353   """
354   kind, name = _ExtractTagsObject(opts, args)
355   cl = GetClient()
356   result = cl.QueryTags(kind, name)
357   result = list(result)
358   result.sort()
359   for tag in result:
360     ToStdout(tag)
361
362
363 def AddTags(opts, args):
364   """Add tags on a given object.
365
366   This is a generic implementation that knows how to deal with all
367   three cases of tag objects (cluster, node, instance). The opts
368   argument is expected to contain a tag_type field denoting what
369   object type we work on.
370
371   """
372   kind, name = _ExtractTagsObject(opts, args)
373   _ExtendTags(opts, args)
374   if not args:
375     raise errors.OpPrereqError("No tags to be added")
376   op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
377   SubmitOpCode(op)
378
379
380 def RemoveTags(opts, args):
381   """Remove tags from a given object.
382
383   This is a generic implementation that knows how to deal with all
384   three cases of tag objects (cluster, node, instance). The opts
385   argument is expected to contain a tag_type field denoting what
386   object type we work on.
387
388   """
389   kind, name = _ExtractTagsObject(opts, args)
390   _ExtendTags(opts, args)
391   if not args:
392     raise errors.OpPrereqError("No tags to be removed")
393   op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
394   SubmitOpCode(op)
395
396
397 def check_unit(option, opt, value): # pylint: disable-msg=W0613
398   """OptParsers custom converter for units.
399
400   """
401   try:
402     return utils.ParseUnit(value)
403   except errors.UnitParseError, err:
404     raise OptionValueError("option %s: %s" % (opt, err))
405
406
407 def _SplitKeyVal(opt, data):
408   """Convert a KeyVal string into a dict.
409
410   This function will convert a key=val[,...] string into a dict. Empty
411   values will be converted specially: keys which have the prefix 'no_'
412   will have the value=False and the prefix stripped, the others will
413   have value=True.
414
415   @type opt: string
416   @param opt: a string holding the option name for which we process the
417       data, used in building error messages
418   @type data: string
419   @param data: a string of the format key=val,key=val,...
420   @rtype: dict
421   @return: {key=val, key=val}
422   @raises errors.ParameterError: if there are duplicate keys
423
424   """
425   kv_dict = {}
426   if data:
427     for elem in utils.UnescapeAndSplit(data, sep=","):
428       if "=" in elem:
429         key, val = elem.split("=", 1)
430       else:
431         if elem.startswith(NO_PREFIX):
432           key, val = elem[len(NO_PREFIX):], False
433         elif elem.startswith(UN_PREFIX):
434           key, val = elem[len(UN_PREFIX):], None
435         else:
436           key, val = elem, True
437       if key in kv_dict:
438         raise errors.ParameterError("Duplicate key '%s' in option %s" %
439                                     (key, opt))
440       kv_dict[key] = val
441   return kv_dict
442
443
444 def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
445   """Custom parser for ident:key=val,key=val options.
446
447   This will store the parsed values as a tuple (ident, {key: val}). As such,
448   multiple uses of this option via action=append is possible.
449
450   """
451   if ":" not in value:
452     ident, rest = value, ''
453   else:
454     ident, rest = value.split(":", 1)
455
456   if ident.startswith(NO_PREFIX):
457     if rest:
458       msg = "Cannot pass options when removing parameter groups: %s" % value
459       raise errors.ParameterError(msg)
460     retval = (ident[len(NO_PREFIX):], False)
461   elif ident.startswith(UN_PREFIX):
462     if rest:
463       msg = "Cannot pass options when removing parameter groups: %s" % value
464       raise errors.ParameterError(msg)
465     retval = (ident[len(UN_PREFIX):], None)
466   else:
467     kv_dict = _SplitKeyVal(opt, rest)
468     retval = (ident, kv_dict)
469   return retval
470
471
472 def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
473   """Custom parser class for key=val,key=val options.
474
475   This will store the parsed values as a dict {key: val}.
476
477   """
478   return _SplitKeyVal(opt, value)
479
480
481 def check_bool(option, opt, value): # pylint: disable-msg=W0613
482   """Custom parser for yes/no options.
483
484   This will store the parsed value as either True or False.
485
486   """
487   value = value.lower()
488   if value == constants.VALUE_FALSE or value == "no":
489     return False
490   elif value == constants.VALUE_TRUE or value == "yes":
491     return True
492   else:
493     raise errors.ParameterError("Invalid boolean value '%s'" % value)
494
495
496 # completion_suggestion is normally a list. Using numeric values not evaluating
497 # to False for dynamic completion.
498 (OPT_COMPL_MANY_NODES,
499  OPT_COMPL_ONE_NODE,
500  OPT_COMPL_ONE_INSTANCE,
501  OPT_COMPL_ONE_OS,
502  OPT_COMPL_ONE_IALLOCATOR,
503  OPT_COMPL_INST_ADD_NODES) = range(100, 106)
504
505 OPT_COMPL_ALL = frozenset([
506   OPT_COMPL_MANY_NODES,
507   OPT_COMPL_ONE_NODE,
508   OPT_COMPL_ONE_INSTANCE,
509   OPT_COMPL_ONE_OS,
510   OPT_COMPL_ONE_IALLOCATOR,
511   OPT_COMPL_INST_ADD_NODES,
512   ])
513
514
515 class CliOption(Option):
516   """Custom option class for optparse.
517
518   """
519   ATTRS = Option.ATTRS + [
520     "completion_suggest",
521     ]
522   TYPES = Option.TYPES + (
523     "identkeyval",
524     "keyval",
525     "unit",
526     "bool",
527     )
528   TYPE_CHECKER = Option.TYPE_CHECKER.copy()
529   TYPE_CHECKER["identkeyval"] = check_ident_key_val
530   TYPE_CHECKER["keyval"] = check_key_val
531   TYPE_CHECKER["unit"] = check_unit
532   TYPE_CHECKER["bool"] = check_bool
533
534
535 # optparse.py sets make_option, so we do it for our own option class, too
536 cli_option = CliOption
537
538
539 _YORNO = "yes|no"
540
541 DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
542                        help="Increase debugging level")
543
544 NOHDR_OPT = cli_option("--no-headers", default=False,
545                        action="store_true", dest="no_headers",
546                        help="Don't display column headers")
547
548 SEP_OPT = cli_option("--separator", default=None,
549                      action="store", dest="separator",
550                      help=("Separator between output fields"
551                            " (defaults to one space)"))
552
553 USEUNITS_OPT = cli_option("--units", default=None,
554                           dest="units", choices=('h', 'm', 'g', 't'),
555                           help="Specify units for output (one of hmgt)")
556
557 FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
558                         type="string", metavar="FIELDS",
559                         help="Comma separated list of output fields")
560
561 FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
562                        default=False, help="Force the operation")
563
564 CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
565                          default=False, help="Do not require confirmation")
566
567 TAG_SRC_OPT = cli_option("--from", dest="tags_source",
568                          default=None, help="File with tag names")
569
570 SUBMIT_OPT = cli_option("--submit", dest="submit_only",
571                         default=False, action="store_true",
572                         help=("Submit the job and return the job ID, but"
573                               " don't wait for the job to finish"))
574
575 SYNC_OPT = cli_option("--sync", dest="do_locking",
576                       default=False, action="store_true",
577                       help=("Grab locks while doing the queries"
578                             " in order to ensure more consistent results"))
579
580 DRY_RUN_OPT = cli_option("--dry-run", default=False,
581                          action="store_true",
582                          help=("Do not execute the operation, just run the"
583                                " check steps and verify it it could be"
584                                " executed"))
585
586 VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
587                          action="store_true",
588                          help="Increase the verbosity of the operation")
589
590 DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
591                               action="store_true", dest="simulate_errors",
592                               help="Debugging option that makes the operation"
593                               " treat most runtime checks as failed")
594
595 NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
596                         default=True, action="store_false",
597                         help="Don't wait for sync (DANGEROUS!)")
598
599 DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
600                                help="Custom disk setup (diskless, file,"
601                                " plain or drbd)",
602                                default=None, metavar="TEMPL",
603                                choices=list(constants.DISK_TEMPLATES))
604
605 NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
606                         help="Do not create any network cards for"
607                         " the instance")
608
609 FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
610                                help="Relative path under default cluster-wide"
611                                " file storage dir to store file-based disks",
612                                default=None, metavar="<DIR>")
613
614 FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
615                                   help="Driver to use for image files",
616                                   default="loop", metavar="<DRIVER>",
617                                   choices=list(constants.FILE_DRIVER))
618
619 IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
620                             help="Select nodes for the instance automatically"
621                             " using the <NAME> iallocator plugin",
622                             default=None, type="string",
623                             completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
624
625 DEFAULT_IALLOCATOR_OPT = cli_option("-I", "--default-iallocator",
626                             metavar="<NAME>",
627                             help="Set the default instance allocator plugin",
628                             default=None, type="string",
629                             completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
630
631 OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
632                     metavar="<os>",
633                     completion_suggest=OPT_COMPL_ONE_OS)
634
635 OSPARAMS_OPT = cli_option("-O", "--os-parameters", dest="osparams",
636                          type="keyval", default={},
637                          help="OS parameters")
638
639 FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
640                                action="store_true", default=False,
641                                help="Force an unknown variant")
642
643 NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
644                             action="store_true", default=False,
645                             help="Do not install the OS (will"
646                             " enable no-start)")
647
648 BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
649                          type="keyval", default={},
650                          help="Backend parameters")
651
652 HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
653                          default={}, dest="hvparams",
654                          help="Hypervisor parameters")
655
656 HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
657                             help="Hypervisor and hypervisor options, in the"
658                             " format hypervisor:option=value,option=value,...",
659                             default=None, type="identkeyval")
660
661 HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
662                         help="Hypervisor and hypervisor options, in the"
663                         " format hypervisor:option=value,option=value,...",
664                         default=[], action="append", type="identkeyval")
665
666 NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
667                            action="store_false",
668                            help="Don't check that the instance's IP"
669                            " is alive")
670
671 NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
672                              default=True, action="store_false",
673                              help="Don't check that the instance's name"
674                              " is resolvable")
675
676 NET_OPT = cli_option("--net",
677                      help="NIC parameters", default=[],
678                      dest="nics", action="append", type="identkeyval")
679
680 DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
681                       dest="disks", action="append", type="identkeyval")
682
683 DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
684                          help="Comma-separated list of disks"
685                          " indices to act on (e.g. 0,2) (optional,"
686                          " defaults to all disks)")
687
688 OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
689                          help="Enforces a single-disk configuration using the"
690                          " given disk size, in MiB unless a suffix is used",
691                          default=None, type="unit", metavar="<size>")
692
693 IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
694                                 dest="ignore_consistency",
695                                 action="store_true", default=False,
696                                 help="Ignore the consistency of the disks on"
697                                 " the secondary")
698
699 NONLIVE_OPT = cli_option("--non-live", dest="live",
700                          default=True, action="store_false",
701                          help="Do a non-live migration (this usually means"
702                          " freeze the instance, save the state, transfer and"
703                          " only then resume running on the secondary node)")
704
705 MIGRATION_MODE_OPT = cli_option("--migration-mode", dest="migration_mode",
706                                 default=None,
707                                 choices=list(constants.HT_MIGRATION_MODES),
708                                 help="Override default migration mode (choose"
709                                 " either live or non-live")
710
711 NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
712                                 help="Target node and optional secondary node",
713                                 metavar="<pnode>[:<snode>]",
714                                 completion_suggest=OPT_COMPL_INST_ADD_NODES)
715
716 NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
717                            action="append", metavar="<node>",
718                            help="Use only this node (can be used multiple"
719                            " times, if not given defaults to all nodes)",
720                            completion_suggest=OPT_COMPL_ONE_NODE)
721
722 SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
723                              metavar="<node>",
724                              completion_suggest=OPT_COMPL_ONE_NODE)
725
726 NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
727                          action="store_false",
728                          help="Don't start the instance after creation")
729
730 SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
731                          action="store_true", default=False,
732                          help="Show command instead of executing it")
733
734 CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
735                          default=False, action="store_true",
736                          help="Instead of performing the migration, try to"
737                          " recover from a failed cleanup. This is safe"
738                          " to run even if the instance is healthy, but it"
739                          " will create extra replication traffic and "
740                          " disrupt briefly the replication (like during the"
741                          " migration")
742
743 STATIC_OPT = cli_option("-s", "--static", dest="static",
744                         action="store_true", default=False,
745                         help="Only show configuration data, not runtime data")
746
747 ALL_OPT = cli_option("--all", dest="show_all",
748                      default=False, action="store_true",
749                      help="Show info on all instances on the cluster."
750                      " This can take a long time to run, use wisely")
751
752 SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
753                            action="store_true", default=False,
754                            help="Interactive OS reinstall, lists available"
755                            " OS templates for selection")
756
757 IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
758                                  action="store_true", default=False,
759                                  help="Remove the instance from the cluster"
760                                  " configuration even if there are failures"
761                                  " during the removal process")
762
763 IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
764                                         dest="ignore_remove_failures",
765                                         action="store_true", default=False,
766                                         help="Remove the instance from the"
767                                         " cluster configuration even if there"
768                                         " are failures during the removal"
769                                         " process")
770
771 REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
772                                  action="store_true", default=False,
773                                  help="Remove the instance from the cluster")
774
775 NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
776                                help="Specifies the new secondary node",
777                                metavar="NODE", default=None,
778                                completion_suggest=OPT_COMPL_ONE_NODE)
779
780 ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
781                             default=False, action="store_true",
782                             help="Replace the disk(s) on the primary"
783                             " node (only for the drbd template)")
784
785 ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
786                               default=False, action="store_true",
787                               help="Replace the disk(s) on the secondary"
788                               " node (only for the drbd template)")
789
790 AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
791                               default=False, action="store_true",
792                               help="Lock all nodes and auto-promote as needed"
793                               " to MC status")
794
795 AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
796                               default=False, action="store_true",
797                               help="Automatically replace faulty disks"
798                               " (only for the drbd template)")
799
800 IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
801                              default=False, action="store_true",
802                              help="Ignore current recorded size"
803                              " (useful for forcing activation when"
804                              " the recorded size is wrong)")
805
806 SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
807                           metavar="<node>",
808                           completion_suggest=OPT_COMPL_ONE_NODE)
809
810 SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
811                          metavar="<dir>")
812
813 SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
814                               help="Specify the secondary ip for the node",
815                               metavar="ADDRESS", default=None)
816
817 READD_OPT = cli_option("--readd", dest="readd",
818                        default=False, action="store_true",
819                        help="Readd old node after replacing it")
820
821 NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
822                                 default=True, action="store_false",
823                                 help="Disable SSH key fingerprint checking")
824
825
826 MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
827                     type="bool", default=None, metavar=_YORNO,
828                     help="Set the master_candidate flag on the node")
829
830 OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
831                          type="bool", default=None,
832                          help="Set the offline flag on the node")
833
834 DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
835                          type="bool", default=None,
836                          help="Set the drained flag on the node")
837
838 ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
839                              type="bool", default=None, metavar=_YORNO,
840                              help="Set the allocatable flag on a volume")
841
842 NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
843                                help="Disable support for lvm based instances"
844                                " (cluster-wide)",
845                                action="store_false", default=True)
846
847 ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
848                             dest="enabled_hypervisors",
849                             help="Comma-separated list of hypervisors",
850                             type="string", default=None)
851
852 NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
853                             type="keyval", default={},
854                             help="NIC parameters")
855
856 CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
857                          dest="candidate_pool_size", type="int",
858                          help="Set the candidate pool size")
859
860 VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
861                          help="Enables LVM and specifies the volume group"
862                          " name (cluster-wide) for disk allocation [xenvg]",
863                          metavar="VG", default=None)
864
865 YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
866                           help="Destroy cluster", action="store_true")
867
868 NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
869                           help="Skip node agreement check (dangerous)",
870                           action="store_true", default=False)
871
872 MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
873                             help="Specify the mac prefix for the instance IP"
874                             " addresses, in the format XX:XX:XX",
875                             metavar="PREFIX",
876                             default=None)
877
878 MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
879                                help="Specify the node interface (cluster-wide)"
880                                " on which the master IP address will be added "
881                                " [%s]" % constants.DEFAULT_BRIDGE,
882                                metavar="NETDEV",
883                                default=constants.DEFAULT_BRIDGE)
884
885 GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
886                                 help="Specify the default directory (cluster-"
887                                 "wide) for storing the file-based disks [%s]" %
888                                 constants.DEFAULT_FILE_STORAGE_DIR,
889                                 metavar="DIR",
890                                 default=constants.DEFAULT_FILE_STORAGE_DIR)
891
892 NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
893                                    help="Don't modify /etc/hosts",
894                                    action="store_false", default=True)
895
896 NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
897                                     help="Don't initialize SSH keys",
898                                     action="store_false", default=True)
899
900 ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
901                              help="Enable parseable error messages",
902                              action="store_true", default=False)
903
904 NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
905                           help="Skip N+1 memory redundancy tests",
906                           action="store_true", default=False)
907
908 REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
909                              help="Type of reboot: soft/hard/full",
910                              default=constants.INSTANCE_REBOOT_HARD,
911                              metavar="<REBOOT>",
912                              choices=list(constants.REBOOT_TYPES))
913
914 IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
915                                     dest="ignore_secondaries",
916                                     default=False, action="store_true",
917                                     help="Ignore errors from secondaries")
918
919 NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
920                             action="store_false", default=True,
921                             help="Don't shutdown the instance (unsafe)")
922
923 TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
924                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
925                          help="Maximum time to wait")
926
927 SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
928                          dest="shutdown_timeout", type="int",
929                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
930                          help="Maximum time to wait for instance shutdown")
931
932 EARLY_RELEASE_OPT = cli_option("--early-release",
933                                dest="early_release", default=False,
934                                action="store_true",
935                                help="Release the locks on the secondary"
936                                " node(s) early")
937
938 NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
939                                   dest="new_cluster_cert",
940                                   default=False, action="store_true",
941                                   help="Generate a new cluster certificate")
942
943 RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
944                            default=None,
945                            help="File containing new RAPI certificate")
946
947 NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
948                                default=None, action="store_true",
949                                help=("Generate a new self-signed RAPI"
950                                      " certificate"))
951
952 NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
953                                     dest="new_confd_hmac_key",
954                                     default=False, action="store_true",
955                                     help=("Create a new HMAC key for %s" %
956                                           constants.CONFD))
957
958 CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
959                                        dest="cluster_domain_secret",
960                                        default=None,
961                                        help=("Load new new cluster domain"
962                                              " secret from file"))
963
964 NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
965                                            dest="new_cluster_domain_secret",
966                                            default=False, action="store_true",
967                                            help=("Create a new cluster domain"
968                                                  " secret"))
969
970 USE_REPL_NET_OPT = cli_option("--use-replication-network",
971                               dest="use_replication_network",
972                               help="Whether to use the replication network"
973                               " for talking to the nodes",
974                               action="store_true", default=False)
975
976 MAINTAIN_NODE_HEALTH_OPT = \
977     cli_option("--maintain-node-health", dest="maintain_node_health",
978                metavar=_YORNO, default=None, type="bool",
979                help="Configure the cluster to automatically maintain node"
980                " health, by shutting down unknown instances, shutting down"
981                " unknown DRBD devices, etc.")
982
983 IDENTIFY_DEFAULTS_OPT = \
984     cli_option("--identify-defaults", dest="identify_defaults",
985                default=False, action="store_true",
986                help="Identify which saved instance parameters are equal to"
987                " the current cluster defaults and set them as such, instead"
988                " of marking them as overridden")
989
990 UIDPOOL_OPT = cli_option("--uid-pool", default=None,
991                          action="store", dest="uid_pool",
992                          help=("A list of user-ids or user-id"
993                                " ranges separated by commas"))
994
995 ADD_UIDS_OPT = cli_option("--add-uids", default=None,
996                           action="store", dest="add_uids",
997                           help=("A list of user-ids or user-id"
998                                 " ranges separated by commas, to be"
999                                 " added to the user-id pool"))
1000
1001 REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
1002                              action="store", dest="remove_uids",
1003                              help=("A list of user-ids or user-id"
1004                                    " ranges separated by commas, to be"
1005                                    " removed from the user-id pool"))
1006
1007 RESERVED_LVS_OPT = cli_option("--reserved-lvs", default=None,
1008                              action="store", dest="reserved_lvs",
1009                              help=("A comma-separated list of reserved"
1010                                    " logical volumes names, that will be"
1011                                    " ignored by cluster verify"))
1012
1013 ROMAN_OPT = cli_option("--roman",
1014                        dest="roman_integers", default=False,
1015                        action="store_true",
1016                        help="Use roman numbers for positive integers")
1017
1018 DRBD_HELPER_OPT = cli_option("--drbd-usermode-helper", dest="drbd_helper",
1019                              action="store", default=None,
1020                              help="Specifies usermode helper for DRBD")
1021
1022 NODRBD_STORAGE_OPT = cli_option("--no-drbd-storage", dest="drbd_storage",
1023                                 action="store_false", default=True,
1024                                 help="Disable support for DRBD")
1025
1026
1027 def _ParseArgs(argv, commands, aliases):
1028   """Parser for the command line arguments.
1029
1030   This function parses the arguments and returns the function which
1031   must be executed together with its (modified) arguments.
1032
1033   @param argv: the command line
1034   @param commands: dictionary with special contents, see the design
1035       doc for cmdline handling
1036   @param aliases: dictionary with command aliases {'alias': 'target, ...}
1037
1038   """
1039   if len(argv) == 0:
1040     binary = "<command>"
1041   else:
1042     binary = argv[0].split("/")[-1]
1043
1044   if len(argv) > 1 and argv[1] == "--version":
1045     ToStdout("%s (ganeti %s) %s", binary, constants.VCS_VERSION,
1046              constants.RELEASE_VERSION)
1047     # Quit right away. That way we don't have to care about this special
1048     # argument. optparse.py does it the same.
1049     sys.exit(0)
1050
1051   if len(argv) < 2 or not (argv[1] in commands or
1052                            argv[1] in aliases):
1053     # let's do a nice thing
1054     sortedcmds = commands.keys()
1055     sortedcmds.sort()
1056
1057     ToStdout("Usage: %s {command} [options...] [argument...]", binary)
1058     ToStdout("%s <command> --help to see details, or man %s", binary, binary)
1059     ToStdout("")
1060
1061     # compute the max line length for cmd + usage
1062     mlen = max([len(" %s" % cmd) for cmd in commands])
1063     mlen = min(60, mlen) # should not get here...
1064
1065     # and format a nice command list
1066     ToStdout("Commands:")
1067     for cmd in sortedcmds:
1068       cmdstr = " %s" % (cmd,)
1069       help_text = commands[cmd][4]
1070       help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
1071       ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
1072       for line in help_lines:
1073         ToStdout("%-*s   %s", mlen, "", line)
1074
1075     ToStdout("")
1076
1077     return None, None, None
1078
1079   # get command, unalias it, and look it up in commands
1080   cmd = argv.pop(1)
1081   if cmd in aliases:
1082     if cmd in commands:
1083       raise errors.ProgrammerError("Alias '%s' overrides an existing"
1084                                    " command" % cmd)
1085
1086     if aliases[cmd] not in commands:
1087       raise errors.ProgrammerError("Alias '%s' maps to non-existing"
1088                                    " command '%s'" % (cmd, aliases[cmd]))
1089
1090     cmd = aliases[cmd]
1091
1092   func, args_def, parser_opts, usage, description = commands[cmd]
1093   parser = OptionParser(option_list=parser_opts + [DEBUG_OPT],
1094                         description=description,
1095                         formatter=TitledHelpFormatter(),
1096                         usage="%%prog %s %s" % (cmd, usage))
1097   parser.disable_interspersed_args()
1098   options, args = parser.parse_args()
1099
1100   if not _CheckArguments(cmd, args_def, args):
1101     return None, None, None
1102
1103   return func, options, args
1104
1105
1106 def _CheckArguments(cmd, args_def, args):
1107   """Verifies the arguments using the argument definition.
1108
1109   Algorithm:
1110
1111     1. Abort with error if values specified by user but none expected.
1112
1113     1. For each argument in definition
1114
1115       1. Keep running count of minimum number of values (min_count)
1116       1. Keep running count of maximum number of values (max_count)
1117       1. If it has an unlimited number of values
1118
1119         1. Abort with error if it's not the last argument in the definition
1120
1121     1. If last argument has limited number of values
1122
1123       1. Abort with error if number of values doesn't match or is too large
1124
1125     1. Abort with error if user didn't pass enough values (min_count)
1126
1127   """
1128   if args and not args_def:
1129     ToStderr("Error: Command %s expects no arguments", cmd)
1130     return False
1131
1132   min_count = None
1133   max_count = None
1134   check_max = None
1135
1136   last_idx = len(args_def) - 1
1137
1138   for idx, arg in enumerate(args_def):
1139     if min_count is None:
1140       min_count = arg.min
1141     elif arg.min is not None:
1142       min_count += arg.min
1143
1144     if max_count is None:
1145       max_count = arg.max
1146     elif arg.max is not None:
1147       max_count += arg.max
1148
1149     if idx == last_idx:
1150       check_max = (arg.max is not None)
1151
1152     elif arg.max is None:
1153       raise errors.ProgrammerError("Only the last argument can have max=None")
1154
1155   if check_max:
1156     # Command with exact number of arguments
1157     if (min_count is not None and max_count is not None and
1158         min_count == max_count and len(args) != min_count):
1159       ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1160       return False
1161
1162     # Command with limited number of arguments
1163     if max_count is not None and len(args) > max_count:
1164       ToStderr("Error: Command %s expects only %d argument(s)",
1165                cmd, max_count)
1166       return False
1167
1168   # Command with some required arguments
1169   if min_count is not None and len(args) < min_count:
1170     ToStderr("Error: Command %s expects at least %d argument(s)",
1171              cmd, min_count)
1172     return False
1173
1174   return True
1175
1176
1177 def SplitNodeOption(value):
1178   """Splits the value of a --node option.
1179
1180   """
1181   if value and ':' in value:
1182     return value.split(':', 1)
1183   else:
1184     return (value, None)
1185
1186
1187 def CalculateOSNames(os_name, os_variants):
1188   """Calculates all the names an OS can be called, according to its variants.
1189
1190   @type os_name: string
1191   @param os_name: base name of the os
1192   @type os_variants: list or None
1193   @param os_variants: list of supported variants
1194   @rtype: list
1195   @return: list of valid names
1196
1197   """
1198   if os_variants:
1199     return ['%s+%s' % (os_name, v) for v in os_variants]
1200   else:
1201     return [os_name]
1202
1203
1204 def ParseFields(selected, default):
1205   """Parses the values of "--field"-like options.
1206
1207   @type selected: string or None
1208   @param selected: User-selected options
1209   @type default: list
1210   @param default: Default fields
1211
1212   """
1213   if selected is None:
1214     return default
1215
1216   if selected.startswith("+"):
1217     return default + selected[1:].split(",")
1218
1219   return selected.split(",")
1220
1221
1222 UsesRPC = rpc.RunWithRPC
1223
1224
1225 def AskUser(text, choices=None):
1226   """Ask the user a question.
1227
1228   @param text: the question to ask
1229
1230   @param choices: list with elements tuples (input_char, return_value,
1231       description); if not given, it will default to: [('y', True,
1232       'Perform the operation'), ('n', False, 'Do no do the operation')];
1233       note that the '?' char is reserved for help
1234
1235   @return: one of the return values from the choices list; if input is
1236       not possible (i.e. not running with a tty, we return the last
1237       entry from the list
1238
1239   """
1240   if choices is None:
1241     choices = [('y', True, 'Perform the operation'),
1242                ('n', False, 'Do not perform the operation')]
1243   if not choices or not isinstance(choices, list):
1244     raise errors.ProgrammerError("Invalid choices argument to AskUser")
1245   for entry in choices:
1246     if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
1247       raise errors.ProgrammerError("Invalid choices element to AskUser")
1248
1249   answer = choices[-1][1]
1250   new_text = []
1251   for line in text.splitlines():
1252     new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
1253   text = "\n".join(new_text)
1254   try:
1255     f = file("/dev/tty", "a+")
1256   except IOError:
1257     return answer
1258   try:
1259     chars = [entry[0] for entry in choices]
1260     chars[-1] = "[%s]" % chars[-1]
1261     chars.append('?')
1262     maps = dict([(entry[0], entry[1]) for entry in choices])
1263     while True:
1264       f.write(text)
1265       f.write('\n')
1266       f.write("/".join(chars))
1267       f.write(": ")
1268       line = f.readline(2).strip().lower()
1269       if line in maps:
1270         answer = maps[line]
1271         break
1272       elif line == '?':
1273         for entry in choices:
1274           f.write(" %s - %s\n" % (entry[0], entry[2]))
1275         f.write("\n")
1276         continue
1277   finally:
1278     f.close()
1279   return answer
1280
1281
1282 class JobSubmittedException(Exception):
1283   """Job was submitted, client should exit.
1284
1285   This exception has one argument, the ID of the job that was
1286   submitted. The handler should print this ID.
1287
1288   This is not an error, just a structured way to exit from clients.
1289
1290   """
1291
1292
1293 def SendJob(ops, cl=None):
1294   """Function to submit an opcode without waiting for the results.
1295
1296   @type ops: list
1297   @param ops: list of opcodes
1298   @type cl: luxi.Client
1299   @param cl: the luxi client to use for communicating with the master;
1300              if None, a new client will be created
1301
1302   """
1303   if cl is None:
1304     cl = GetClient()
1305
1306   job_id = cl.SubmitJob(ops)
1307
1308   return job_id
1309
1310
1311 def GenericPollJob(job_id, cbs, report_cbs):
1312   """Generic job-polling function.
1313
1314   @type job_id: number
1315   @param job_id: Job ID
1316   @type cbs: Instance of L{JobPollCbBase}
1317   @param cbs: Data callbacks
1318   @type report_cbs: Instance of L{JobPollReportCbBase}
1319   @param report_cbs: Reporting callbacks
1320
1321   """
1322   prev_job_info = None
1323   prev_logmsg_serial = None
1324
1325   status = None
1326
1327   while True:
1328     result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
1329                                       prev_logmsg_serial)
1330     if not result:
1331       # job not found, go away!
1332       raise errors.JobLost("Job with id %s lost" % job_id)
1333
1334     if result == constants.JOB_NOTCHANGED:
1335       report_cbs.ReportNotChanged(job_id, status)
1336
1337       # Wait again
1338       continue
1339
1340     # Split result, a tuple of (field values, log entries)
1341     (job_info, log_entries) = result
1342     (status, ) = job_info
1343
1344     if log_entries:
1345       for log_entry in log_entries:
1346         (serial, timestamp, log_type, message) = log_entry
1347         report_cbs.ReportLogMessage(job_id, serial, timestamp,
1348                                     log_type, message)
1349         prev_logmsg_serial = max(prev_logmsg_serial, serial)
1350
1351     # TODO: Handle canceled and archived jobs
1352     elif status in (constants.JOB_STATUS_SUCCESS,
1353                     constants.JOB_STATUS_ERROR,
1354                     constants.JOB_STATUS_CANCELING,
1355                     constants.JOB_STATUS_CANCELED):
1356       break
1357
1358     prev_job_info = job_info
1359
1360   jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
1361   if not jobs:
1362     raise errors.JobLost("Job with id %s lost" % job_id)
1363
1364   status, opstatus, result = jobs[0]
1365
1366   if status == constants.JOB_STATUS_SUCCESS:
1367     return result
1368
1369   if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
1370     raise errors.OpExecError("Job was canceled")
1371
1372   has_ok = False
1373   for idx, (status, msg) in enumerate(zip(opstatus, result)):
1374     if status == constants.OP_STATUS_SUCCESS:
1375       has_ok = True
1376     elif status == constants.OP_STATUS_ERROR:
1377       errors.MaybeRaise(msg)
1378
1379       if has_ok:
1380         raise errors.OpExecError("partial failure (opcode %d): %s" %
1381                                  (idx, msg))
1382
1383       raise errors.OpExecError(str(msg))
1384
1385   # default failure mode
1386   raise errors.OpExecError(result)
1387
1388
1389 class JobPollCbBase:
1390   """Base class for L{GenericPollJob} callbacks.
1391
1392   """
1393   def __init__(self):
1394     """Initializes this class.
1395
1396     """
1397
1398   def WaitForJobChangeOnce(self, job_id, fields,
1399                            prev_job_info, prev_log_serial):
1400     """Waits for changes on a job.
1401
1402     """
1403     raise NotImplementedError()
1404
1405   def QueryJobs(self, job_ids, fields):
1406     """Returns the selected fields for the selected job IDs.
1407
1408     @type job_ids: list of numbers
1409     @param job_ids: Job IDs
1410     @type fields: list of strings
1411     @param fields: Fields
1412
1413     """
1414     raise NotImplementedError()
1415
1416
1417 class JobPollReportCbBase:
1418   """Base class for L{GenericPollJob} reporting callbacks.
1419
1420   """
1421   def __init__(self):
1422     """Initializes this class.
1423
1424     """
1425
1426   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1427     """Handles a log message.
1428
1429     """
1430     raise NotImplementedError()
1431
1432   def ReportNotChanged(self, job_id, status):
1433     """Called for if a job hasn't changed in a while.
1434
1435     @type job_id: number
1436     @param job_id: Job ID
1437     @type status: string or None
1438     @param status: Job status if available
1439
1440     """
1441     raise NotImplementedError()
1442
1443
1444 class _LuxiJobPollCb(JobPollCbBase):
1445   def __init__(self, cl):
1446     """Initializes this class.
1447
1448     """
1449     JobPollCbBase.__init__(self)
1450     self.cl = cl
1451
1452   def WaitForJobChangeOnce(self, job_id, fields,
1453                            prev_job_info, prev_log_serial):
1454     """Waits for changes on a job.
1455
1456     """
1457     return self.cl.WaitForJobChangeOnce(job_id, fields,
1458                                         prev_job_info, prev_log_serial)
1459
1460   def QueryJobs(self, job_ids, fields):
1461     """Returns the selected fields for the selected job IDs.
1462
1463     """
1464     return self.cl.QueryJobs(job_ids, fields)
1465
1466
1467 class FeedbackFnJobPollReportCb(JobPollReportCbBase):
1468   def __init__(self, feedback_fn):
1469     """Initializes this class.
1470
1471     """
1472     JobPollReportCbBase.__init__(self)
1473
1474     self.feedback_fn = feedback_fn
1475
1476     assert callable(feedback_fn)
1477
1478   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1479     """Handles a log message.
1480
1481     """
1482     self.feedback_fn((timestamp, log_type, log_msg))
1483
1484   def ReportNotChanged(self, job_id, status):
1485     """Called if a job hasn't changed in a while.
1486
1487     """
1488     # Ignore
1489
1490
1491 class StdioJobPollReportCb(JobPollReportCbBase):
1492   def __init__(self):
1493     """Initializes this class.
1494
1495     """
1496     JobPollReportCbBase.__init__(self)
1497
1498     self.notified_queued = False
1499     self.notified_waitlock = False
1500
1501   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1502     """Handles a log message.
1503
1504     """
1505     ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
1506              FormatLogMessage(log_type, log_msg))
1507
1508   def ReportNotChanged(self, job_id, status):
1509     """Called if a job hasn't changed in a while.
1510
1511     """
1512     if status is None:
1513       return
1514
1515     if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
1516       ToStderr("Job %s is waiting in queue", job_id)
1517       self.notified_queued = True
1518
1519     elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
1520       ToStderr("Job %s is trying to acquire all necessary locks", job_id)
1521       self.notified_waitlock = True
1522
1523
1524 def FormatLogMessage(log_type, log_msg):
1525   """Formats a job message according to its type.
1526
1527   """
1528   if log_type != constants.ELOG_MESSAGE:
1529     log_msg = str(log_msg)
1530
1531   return utils.SafeEncode(log_msg)
1532
1533
1534 def PollJob(job_id, cl=None, feedback_fn=None, reporter=None):
1535   """Function to poll for the result of a job.
1536
1537   @type job_id: job identified
1538   @param job_id: the job to poll for results
1539   @type cl: luxi.Client
1540   @param cl: the luxi client to use for communicating with the master;
1541              if None, a new client will be created
1542
1543   """
1544   if cl is None:
1545     cl = GetClient()
1546
1547   if reporter is None:
1548     if feedback_fn:
1549       reporter = FeedbackFnJobPollReportCb(feedback_fn)
1550     else:
1551       reporter = StdioJobPollReportCb()
1552   elif feedback_fn:
1553     raise errors.ProgrammerError("Can't specify reporter and feedback function")
1554
1555   return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
1556
1557
1558 def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None, reporter=None):
1559   """Legacy function to submit an opcode.
1560
1561   This is just a simple wrapper over the construction of the processor
1562   instance. It should be extended to better handle feedback and
1563   interaction functions.
1564
1565   """
1566   if cl is None:
1567     cl = GetClient()
1568
1569   SetGenericOpcodeOpts([op], opts)
1570
1571   job_id = SendJob([op], cl=cl)
1572
1573   op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn,
1574                        reporter=reporter)
1575
1576   return op_results[0]
1577
1578
1579 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
1580   """Wrapper around SubmitOpCode or SendJob.
1581
1582   This function will decide, based on the 'opts' parameter, whether to
1583   submit and wait for the result of the opcode (and return it), or
1584   whether to just send the job and print its identifier. It is used in
1585   order to simplify the implementation of the '--submit' option.
1586
1587   It will also process the opcodes if we're sending the via SendJob
1588   (otherwise SubmitOpCode does it).
1589
1590   """
1591   if opts and opts.submit_only:
1592     job = [op]
1593     SetGenericOpcodeOpts(job, opts)
1594     job_id = SendJob(job, cl=cl)
1595     raise JobSubmittedException(job_id)
1596   else:
1597     return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
1598
1599
1600 def SetGenericOpcodeOpts(opcode_list, options):
1601   """Processor for generic options.
1602
1603   This function updates the given opcodes based on generic command
1604   line options (like debug, dry-run, etc.).
1605
1606   @param opcode_list: list of opcodes
1607   @param options: command line options or None
1608   @return: None (in-place modification)
1609
1610   """
1611   if not options:
1612     return
1613   for op in opcode_list:
1614     if hasattr(options, "dry_run"):
1615       op.dry_run = options.dry_run
1616     op.debug_level = options.debug
1617
1618
1619 def GetClient():
1620   # TODO: Cache object?
1621   try:
1622     client = luxi.Client()
1623   except luxi.NoMasterError:
1624     ss = ssconf.SimpleStore()
1625
1626     # Try to read ssconf file
1627     try:
1628       ss.GetMasterNode()
1629     except errors.ConfigurationError:
1630       raise errors.OpPrereqError("Cluster not initialized or this machine is"
1631                                  " not part of a cluster")
1632
1633     master, myself = ssconf.GetMasterAndMyself(ss=ss)
1634     if master != myself:
1635       raise errors.OpPrereqError("This is not the master node, please connect"
1636                                  " to node '%s' and rerun the command" %
1637                                  master)
1638     raise
1639   return client
1640
1641
1642 def FormatError(err):
1643   """Return a formatted error message for a given error.
1644
1645   This function takes an exception instance and returns a tuple
1646   consisting of two values: first, the recommended exit code, and
1647   second, a string describing the error message (not
1648   newline-terminated).
1649
1650   """
1651   retcode = 1
1652   obuf = StringIO()
1653   msg = str(err)
1654   if isinstance(err, errors.ConfigurationError):
1655     txt = "Corrupt configuration file: %s" % msg
1656     logging.error(txt)
1657     obuf.write(txt + "\n")
1658     obuf.write("Aborting.")
1659     retcode = 2
1660   elif isinstance(err, errors.HooksAbort):
1661     obuf.write("Failure: hooks execution failed:\n")
1662     for node, script, out in err.args[0]:
1663       if out:
1664         obuf.write("  node: %s, script: %s, output: %s\n" %
1665                    (node, script, out))
1666       else:
1667         obuf.write("  node: %s, script: %s (no output)\n" %
1668                    (node, script))
1669   elif isinstance(err, errors.HooksFailure):
1670     obuf.write("Failure: hooks general failure: %s" % msg)
1671   elif isinstance(err, errors.ResolverError):
1672     this_host = netutils.HostInfo.SysName()
1673     if err.args[0] == this_host:
1674       msg = "Failure: can't resolve my own hostname ('%s')"
1675     else:
1676       msg = "Failure: can't resolve hostname '%s'"
1677     obuf.write(msg % err.args[0])
1678   elif isinstance(err, errors.OpPrereqError):
1679     if len(err.args) == 2:
1680       obuf.write("Failure: prerequisites not met for this"
1681                " operation:\nerror type: %s, error details:\n%s" %
1682                  (err.args[1], err.args[0]))
1683     else:
1684       obuf.write("Failure: prerequisites not met for this"
1685                  " operation:\n%s" % msg)
1686   elif isinstance(err, errors.OpExecError):
1687     obuf.write("Failure: command execution error:\n%s" % msg)
1688   elif isinstance(err, errors.TagError):
1689     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
1690   elif isinstance(err, errors.JobQueueDrainError):
1691     obuf.write("Failure: the job queue is marked for drain and doesn't"
1692                " accept new requests\n")
1693   elif isinstance(err, errors.JobQueueFull):
1694     obuf.write("Failure: the job queue is full and doesn't accept new"
1695                " job submissions until old jobs are archived\n")
1696   elif isinstance(err, errors.TypeEnforcementError):
1697     obuf.write("Parameter Error: %s" % msg)
1698   elif isinstance(err, errors.ParameterError):
1699     obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
1700   elif isinstance(err, luxi.NoMasterError):
1701     obuf.write("Cannot communicate with the master daemon.\nIs it running"
1702                " and listening for connections?")
1703   elif isinstance(err, luxi.TimeoutError):
1704     obuf.write("Timeout while talking to the master daemon. Error:\n"
1705                "%s" % msg)
1706   elif isinstance(err, luxi.PermissionError):
1707     obuf.write("It seems you don't have permissions to connect to the"
1708                " master daemon.\nPlease retry as a different user.")
1709   elif isinstance(err, luxi.ProtocolError):
1710     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
1711                "%s" % msg)
1712   elif isinstance(err, errors.JobLost):
1713     obuf.write("Error checking job status: %s" % msg)
1714   elif isinstance(err, errors.GenericError):
1715     obuf.write("Unhandled Ganeti error: %s" % msg)
1716   elif isinstance(err, JobSubmittedException):
1717     obuf.write("JobID: %s\n" % err.args[0])
1718     retcode = 0
1719   else:
1720     obuf.write("Unhandled exception: %s" % msg)
1721   return retcode, obuf.getvalue().rstrip('\n')
1722
1723
1724 def GenericMain(commands, override=None, aliases=None):
1725   """Generic main function for all the gnt-* commands.
1726
1727   Arguments:
1728     - commands: a dictionary with a special structure, see the design doc
1729                 for command line handling.
1730     - override: if not None, we expect a dictionary with keys that will
1731                 override command line options; this can be used to pass
1732                 options from the scripts to generic functions
1733     - aliases: dictionary with command aliases {'alias': 'target, ...}
1734
1735   """
1736   # save the program name and the entire command line for later logging
1737   if sys.argv:
1738     binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1739     if len(sys.argv) >= 2:
1740       binary += " " + sys.argv[1]
1741       old_cmdline = " ".join(sys.argv[2:])
1742     else:
1743       old_cmdline = ""
1744   else:
1745     binary = "<unknown program>"
1746     old_cmdline = ""
1747
1748   if aliases is None:
1749     aliases = {}
1750
1751   try:
1752     func, options, args = _ParseArgs(sys.argv, commands, aliases)
1753   except errors.ParameterError, err:
1754     result, err_msg = FormatError(err)
1755     ToStderr(err_msg)
1756     return 1
1757
1758   if func is None: # parse error
1759     return 1
1760
1761   if override is not None:
1762     for key, val in override.iteritems():
1763       setattr(options, key, val)
1764
1765   utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1766                      stderr_logging=True, program=binary)
1767
1768   if old_cmdline:
1769     logging.info("run with arguments '%s'", old_cmdline)
1770   else:
1771     logging.info("run with no arguments")
1772
1773   try:
1774     result = func(options, args)
1775   except (errors.GenericError, luxi.ProtocolError,
1776           JobSubmittedException), err:
1777     result, err_msg = FormatError(err)
1778     logging.exception("Error during command processing")
1779     ToStderr(err_msg)
1780
1781   return result
1782
1783
1784 def GenericInstanceCreate(mode, opts, args):
1785   """Add an instance to the cluster via either creation or import.
1786
1787   @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
1788   @param opts: the command line options selected by the user
1789   @type args: list
1790   @param args: should contain only one element, the new instance name
1791   @rtype: int
1792   @return: the desired exit code
1793
1794   """
1795   instance = args[0]
1796
1797   (pnode, snode) = SplitNodeOption(opts.node)
1798
1799   hypervisor = None
1800   hvparams = {}
1801   if opts.hypervisor:
1802     hypervisor, hvparams = opts.hypervisor
1803
1804   if opts.nics:
1805     try:
1806       nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
1807     except ValueError, err:
1808       raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
1809     nics = [{}] * nic_max
1810     for nidx, ndict in opts.nics:
1811       nidx = int(nidx)
1812       if not isinstance(ndict, dict):
1813         msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
1814         raise errors.OpPrereqError(msg)
1815       nics[nidx] = ndict
1816   elif opts.no_nics:
1817     # no nics
1818     nics = []
1819   elif mode == constants.INSTANCE_CREATE:
1820     # default of one nic, all auto
1821     nics = [{}]
1822   else:
1823     # mode == import
1824     nics = []
1825
1826   if opts.disk_template == constants.DT_DISKLESS:
1827     if opts.disks or opts.sd_size is not None:
1828       raise errors.OpPrereqError("Diskless instance but disk"
1829                                  " information passed")
1830     disks = []
1831   else:
1832     if (not opts.disks and not opts.sd_size
1833         and mode == constants.INSTANCE_CREATE):
1834       raise errors.OpPrereqError("No disk information specified")
1835     if opts.disks and opts.sd_size is not None:
1836       raise errors.OpPrereqError("Please use either the '--disk' or"
1837                                  " '-s' option")
1838     if opts.sd_size is not None:
1839       opts.disks = [(0, {"size": opts.sd_size})]
1840
1841     if opts.disks:
1842       try:
1843         disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
1844       except ValueError, err:
1845         raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
1846       disks = [{}] * disk_max
1847     else:
1848       disks = []
1849     for didx, ddict in opts.disks:
1850       didx = int(didx)
1851       if not isinstance(ddict, dict):
1852         msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
1853         raise errors.OpPrereqError(msg)
1854       elif "size" in ddict:
1855         if "adopt" in ddict:
1856           raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
1857                                      " (disk %d)" % didx)
1858         try:
1859           ddict["size"] = utils.ParseUnit(ddict["size"])
1860         except ValueError, err:
1861           raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
1862                                      (didx, err))
1863       elif "adopt" in ddict:
1864         if mode == constants.INSTANCE_IMPORT:
1865           raise errors.OpPrereqError("Disk adoption not allowed for instance"
1866                                      " import")
1867         ddict["size"] = 0
1868       else:
1869         raise errors.OpPrereqError("Missing size or adoption source for"
1870                                    " disk %d" % didx)
1871       disks[didx] = ddict
1872
1873   utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
1874   utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
1875
1876   if mode == constants.INSTANCE_CREATE:
1877     start = opts.start
1878     os_type = opts.os
1879     force_variant = opts.force_variant
1880     src_node = None
1881     src_path = None
1882     no_install = opts.no_install
1883     identify_defaults = False
1884   elif mode == constants.INSTANCE_IMPORT:
1885     start = False
1886     os_type = None
1887     force_variant = False
1888     src_node = opts.src_node
1889     src_path = opts.src_dir
1890     no_install = None
1891     identify_defaults = opts.identify_defaults
1892   else:
1893     raise errors.ProgrammerError("Invalid creation mode %s" % mode)
1894
1895   op = opcodes.OpCreateInstance(instance_name=instance,
1896                                 disks=disks,
1897                                 disk_template=opts.disk_template,
1898                                 nics=nics,
1899                                 pnode=pnode, snode=snode,
1900                                 ip_check=opts.ip_check,
1901                                 name_check=opts.name_check,
1902                                 wait_for_sync=opts.wait_for_sync,
1903                                 file_storage_dir=opts.file_storage_dir,
1904                                 file_driver=opts.file_driver,
1905                                 iallocator=opts.iallocator,
1906                                 hypervisor=hypervisor,
1907                                 hvparams=hvparams,
1908                                 beparams=opts.beparams,
1909                                 osparams=opts.osparams,
1910                                 mode=mode,
1911                                 start=start,
1912                                 os_type=os_type,
1913                                 force_variant=force_variant,
1914                                 src_node=src_node,
1915                                 src_path=src_path,
1916                                 no_install=no_install,
1917                                 identify_defaults=identify_defaults)
1918
1919   SubmitOrSend(op, opts)
1920   return 0
1921
1922
1923 class _RunWhileClusterStoppedHelper:
1924   """Helper class for L{RunWhileClusterStopped} to simplify state management
1925
1926   """
1927   def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
1928     """Initializes this class.
1929
1930     @type feedback_fn: callable
1931     @param feedback_fn: Feedback function
1932     @type cluster_name: string
1933     @param cluster_name: Cluster name
1934     @type master_node: string
1935     @param master_node Master node name
1936     @type online_nodes: list
1937     @param online_nodes: List of names of online nodes
1938
1939     """
1940     self.feedback_fn = feedback_fn
1941     self.cluster_name = cluster_name
1942     self.master_node = master_node
1943     self.online_nodes = online_nodes
1944
1945     self.ssh = ssh.SshRunner(self.cluster_name)
1946
1947     self.nonmaster_nodes = [name for name in online_nodes
1948                             if name != master_node]
1949
1950     assert self.master_node not in self.nonmaster_nodes
1951
1952   def _RunCmd(self, node_name, cmd):
1953     """Runs a command on the local or a remote machine.
1954
1955     @type node_name: string
1956     @param node_name: Machine name
1957     @type cmd: list
1958     @param cmd: Command
1959
1960     """
1961     if node_name is None or node_name == self.master_node:
1962       # No need to use SSH
1963       result = utils.RunCmd(cmd)
1964     else:
1965       result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
1966
1967     if result.failed:
1968       errmsg = ["Failed to run command %s" % result.cmd]
1969       if node_name:
1970         errmsg.append("on node %s" % node_name)
1971       errmsg.append(": exitcode %s and error %s" %
1972                     (result.exit_code, result.output))
1973       raise errors.OpExecError(" ".join(errmsg))
1974
1975   def Call(self, fn, *args):
1976     """Call function while all daemons are stopped.
1977
1978     @type fn: callable
1979     @param fn: Function to be called
1980
1981     """
1982     # Pause watcher by acquiring an exclusive lock on watcher state file
1983     self.feedback_fn("Blocking watcher")
1984     watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
1985     try:
1986       # TODO: Currently, this just blocks. There's no timeout.
1987       # TODO: Should it be a shared lock?
1988       watcher_block.Exclusive(blocking=True)
1989
1990       # Stop master daemons, so that no new jobs can come in and all running
1991       # ones are finished
1992       self.feedback_fn("Stopping master daemons")
1993       self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
1994       try:
1995         # Stop daemons on all nodes
1996         for node_name in self.online_nodes:
1997           self.feedback_fn("Stopping daemons on %s" % node_name)
1998           self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
1999
2000         # All daemons are shut down now
2001         try:
2002           return fn(self, *args)
2003         except Exception, err:
2004           _, errmsg = FormatError(err)
2005           logging.exception("Caught exception")
2006           self.feedback_fn(errmsg)
2007           raise
2008       finally:
2009         # Start cluster again, master node last
2010         for node_name in self.nonmaster_nodes + [self.master_node]:
2011           self.feedback_fn("Starting daemons on %s" % node_name)
2012           self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
2013     finally:
2014       # Resume watcher
2015       watcher_block.Close()
2016
2017
2018 def RunWhileClusterStopped(feedback_fn, fn, *args):
2019   """Calls a function while all cluster daemons are stopped.
2020
2021   @type feedback_fn: callable
2022   @param feedback_fn: Feedback function
2023   @type fn: callable
2024   @param fn: Function to be called when daemons are stopped
2025
2026   """
2027   feedback_fn("Gathering cluster information")
2028
2029   # This ensures we're running on the master daemon
2030   cl = GetClient()
2031
2032   (cluster_name, master_node) = \
2033     cl.QueryConfigValues(["cluster_name", "master_node"])
2034
2035   online_nodes = GetOnlineNodes([], cl=cl)
2036
2037   # Don't keep a reference to the client. The master daemon will go away.
2038   del cl
2039
2040   assert master_node in online_nodes
2041
2042   return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
2043                                        online_nodes).Call(fn, *args)
2044
2045
2046 def GenerateTable(headers, fields, separator, data,
2047                   numfields=None, unitfields=None,
2048                   units=None):
2049   """Prints a table with headers and different fields.
2050
2051   @type headers: dict
2052   @param headers: dictionary mapping field names to headers for
2053       the table
2054   @type fields: list
2055   @param fields: the field names corresponding to each row in
2056       the data field
2057   @param separator: the separator to be used; if this is None,
2058       the default 'smart' algorithm is used which computes optimal
2059       field width, otherwise just the separator is used between
2060       each field
2061   @type data: list
2062   @param data: a list of lists, each sublist being one row to be output
2063   @type numfields: list
2064   @param numfields: a list with the fields that hold numeric
2065       values and thus should be right-aligned
2066   @type unitfields: list
2067   @param unitfields: a list with the fields that hold numeric
2068       values that should be formatted with the units field
2069   @type units: string or None
2070   @param units: the units we should use for formatting, or None for
2071       automatic choice (human-readable for non-separator usage, otherwise
2072       megabytes); this is a one-letter string
2073
2074   """
2075   if units is None:
2076     if separator:
2077       units = "m"
2078     else:
2079       units = "h"
2080
2081   if numfields is None:
2082     numfields = []
2083   if unitfields is None:
2084     unitfields = []
2085
2086   numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
2087   unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142
2088
2089   format_fields = []
2090   for field in fields:
2091     if headers and field not in headers:
2092       # TODO: handle better unknown fields (either revert to old
2093       # style of raising exception, or deal more intelligently with
2094       # variable fields)
2095       headers[field] = field
2096     if separator is not None:
2097       format_fields.append("%s")
2098     elif numfields.Matches(field):
2099       format_fields.append("%*s")
2100     else:
2101       format_fields.append("%-*s")
2102
2103   if separator is None:
2104     mlens = [0 for name in fields]
2105     format_str = ' '.join(format_fields)
2106   else:
2107     format_str = separator.replace("%", "%%").join(format_fields)
2108
2109   for row in data:
2110     if row is None:
2111       continue
2112     for idx, val in enumerate(row):
2113       if unitfields.Matches(fields[idx]):
2114         try:
2115           val = int(val)
2116         except (TypeError, ValueError):
2117           pass
2118         else:
2119           val = row[idx] = utils.FormatUnit(val, units)
2120       val = row[idx] = str(val)
2121       if separator is None:
2122         mlens[idx] = max(mlens[idx], len(val))
2123
2124   result = []
2125   if headers:
2126     args = []
2127     for idx, name in enumerate(fields):
2128       hdr = headers[name]
2129       if separator is None:
2130         mlens[idx] = max(mlens[idx], len(hdr))
2131         args.append(mlens[idx])
2132       args.append(hdr)
2133     result.append(format_str % tuple(args))
2134
2135   if separator is None:
2136     assert len(mlens) == len(fields)
2137
2138     if fields and not numfields.Matches(fields[-1]):
2139       mlens[-1] = 0
2140
2141   for line in data:
2142     args = []
2143     if line is None:
2144       line = ['-' for _ in fields]
2145     for idx in range(len(fields)):
2146       if separator is None:
2147         args.append(mlens[idx])
2148       args.append(line[idx])
2149     result.append(format_str % tuple(args))
2150
2151   return result
2152
2153
2154 def FormatTimestamp(ts):
2155   """Formats a given timestamp.
2156
2157   @type ts: timestamp
2158   @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
2159
2160   @rtype: string
2161   @return: a string with the formatted timestamp
2162
2163   """
2164   if not isinstance (ts, (tuple, list)) or len(ts) != 2:
2165     return '?'
2166   sec, usec = ts
2167   return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
2168
2169
2170 def ParseTimespec(value):
2171   """Parse a time specification.
2172
2173   The following suffixed will be recognized:
2174
2175     - s: seconds
2176     - m: minutes
2177     - h: hours
2178     - d: day
2179     - w: weeks
2180
2181   Without any suffix, the value will be taken to be in seconds.
2182
2183   """
2184   value = str(value)
2185   if not value:
2186     raise errors.OpPrereqError("Empty time specification passed")
2187   suffix_map = {
2188     's': 1,
2189     'm': 60,
2190     'h': 3600,
2191     'd': 86400,
2192     'w': 604800,
2193     }
2194   if value[-1] not in suffix_map:
2195     try:
2196       value = int(value)
2197     except (TypeError, ValueError):
2198       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2199   else:
2200     multiplier = suffix_map[value[-1]]
2201     value = value[:-1]
2202     if not value: # no data left after stripping the suffix
2203       raise errors.OpPrereqError("Invalid time specification (only"
2204                                  " suffix passed)")
2205     try:
2206       value = int(value) * multiplier
2207     except (TypeError, ValueError):
2208       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2209   return value
2210
2211
2212 def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
2213                    filter_master=False):
2214   """Returns the names of online nodes.
2215
2216   This function will also log a warning on stderr with the names of
2217   the online nodes.
2218
2219   @param nodes: if not empty, use only this subset of nodes (minus the
2220       offline ones)
2221   @param cl: if not None, luxi client to use
2222   @type nowarn: boolean
2223   @param nowarn: by default, this function will output a note with the
2224       offline nodes that are skipped; if this parameter is True the
2225       note is not displayed
2226   @type secondary_ips: boolean
2227   @param secondary_ips: if True, return the secondary IPs instead of the
2228       names, useful for doing network traffic over the replication interface
2229       (if any)
2230   @type filter_master: boolean
2231   @param filter_master: if True, do not return the master node in the list
2232       (useful in coordination with secondary_ips where we cannot check our
2233       node name against the list)
2234
2235   """
2236   if cl is None:
2237     cl = GetClient()
2238
2239   if secondary_ips:
2240     name_idx = 2
2241   else:
2242     name_idx = 0
2243
2244   if filter_master:
2245     master_node = cl.QueryConfigValues(["master_node"])[0]
2246     filter_fn = lambda x: x != master_node
2247   else:
2248     filter_fn = lambda _: True
2249
2250   result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
2251                          use_locking=False)
2252   offline = [row[0] for row in result if row[1]]
2253   if offline and not nowarn:
2254     ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
2255   return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
2256
2257
2258 def _ToStream(stream, txt, *args):
2259   """Write a message to a stream, bypassing the logging system
2260
2261   @type stream: file object
2262   @param stream: the file to which we should write
2263   @type txt: str
2264   @param txt: the message
2265
2266   """
2267   if args:
2268     args = tuple(args)
2269     stream.write(txt % args)
2270   else:
2271     stream.write(txt)
2272   stream.write('\n')
2273   stream.flush()
2274
2275
2276 def ToStdout(txt, *args):
2277   """Write a message to stdout only, bypassing the logging system
2278
2279   This is just a wrapper over _ToStream.
2280
2281   @type txt: str
2282   @param txt: the message
2283
2284   """
2285   _ToStream(sys.stdout, txt, *args)
2286
2287
2288 def ToStderr(txt, *args):
2289   """Write a message to stderr only, bypassing the logging system
2290
2291   This is just a wrapper over _ToStream.
2292
2293   @type txt: str
2294   @param txt: the message
2295
2296   """
2297   _ToStream(sys.stderr, txt, *args)
2298
2299
2300 class JobExecutor(object):
2301   """Class which manages the submission and execution of multiple jobs.
2302
2303   Note that instances of this class should not be reused between
2304   GetResults() calls.
2305
2306   """
2307   def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
2308     self.queue = []
2309     if cl is None:
2310       cl = GetClient()
2311     self.cl = cl
2312     self.verbose = verbose
2313     self.jobs = []
2314     self.opts = opts
2315     self.feedback_fn = feedback_fn
2316
2317   def QueueJob(self, name, *ops):
2318     """Record a job for later submit.
2319
2320     @type name: string
2321     @param name: a description of the job, will be used in WaitJobSet
2322     """
2323     SetGenericOpcodeOpts(ops, self.opts)
2324     self.queue.append((name, ops))
2325
2326   def SubmitPending(self, each=False):
2327     """Submit all pending jobs.
2328
2329     """
2330     if each:
2331       results = []
2332       for row in self.queue:
2333         # SubmitJob will remove the success status, but raise an exception if
2334         # the submission fails, so we'll notice that anyway.
2335         results.append([True, self.cl.SubmitJob(row[1])])
2336     else:
2337       results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
2338     for (idx, ((status, data), (name, _))) in enumerate(zip(results,
2339                                                             self.queue)):
2340       self.jobs.append((idx, status, data, name))
2341
2342   def _ChooseJob(self):
2343     """Choose a non-waiting/queued job to poll next.
2344
2345     """
2346     assert self.jobs, "_ChooseJob called with empty job list"
2347
2348     result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
2349     assert result
2350
2351     for job_data, status in zip(self.jobs, result):
2352       if (isinstance(status, list) and status and
2353           status[0] in (constants.JOB_STATUS_QUEUED,
2354                         constants.JOB_STATUS_WAITLOCK,
2355                         constants.JOB_STATUS_CANCELING)):
2356         # job is still present and waiting
2357         continue
2358       # good candidate found (either running job or lost job)
2359       self.jobs.remove(job_data)
2360       return job_data
2361
2362     # no job found
2363     return self.jobs.pop(0)
2364
2365   def GetResults(self):
2366     """Wait for and return the results of all jobs.
2367
2368     @rtype: list
2369     @return: list of tuples (success, job results), in the same order
2370         as the submitted jobs; if a job has failed, instead of the result
2371         there will be the error message
2372
2373     """
2374     if not self.jobs:
2375       self.SubmitPending()
2376     results = []
2377     if self.verbose:
2378       ok_jobs = [row[2] for row in self.jobs if row[1]]
2379       if ok_jobs:
2380         ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
2381
2382     # first, remove any non-submitted jobs
2383     self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
2384     for idx, _, jid, name in failures:
2385       ToStderr("Failed to submit job for %s: %s", name, jid)
2386       results.append((idx, False, jid))
2387
2388     while self.jobs:
2389       (idx, _, jid, name) = self._ChooseJob()
2390       ToStdout("Waiting for job %s for %s...", jid, name)
2391       try:
2392         job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
2393         success = True
2394       except errors.JobLost, err:
2395         _, job_result = FormatError(err)
2396         ToStderr("Job %s for %s has been archived, cannot check its result",
2397                  jid, name)
2398         success = False
2399       except (errors.GenericError, luxi.ProtocolError), err:
2400         _, job_result = FormatError(err)
2401         success = False
2402         # the error message will always be shown, verbose or not
2403         ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
2404
2405       results.append((idx, success, job_result))
2406
2407     # sort based on the index, then drop it
2408     results.sort()
2409     results = [i[1:] for i in results]
2410
2411     return results
2412
2413   def WaitOrShow(self, wait):
2414     """Wait for job results or only print the job IDs.
2415
2416     @type wait: boolean
2417     @param wait: whether to wait or not
2418
2419     """
2420     if wait:
2421       return self.GetResults()
2422     else:
2423       if not self.jobs:
2424         self.SubmitPending()
2425       for _, status, result, name in self.jobs:
2426         if status:
2427           ToStdout("%s: %s", result, name)
2428         else:
2429           ToStderr("Failure for %s: %s", name, result)
2430       return [row[1:3] for row in self.jobs]