Support for resolving hostnames to IPv6 addresses
[ganeti-local] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module dealing with command line parsing"""
23
24
25 import sys
26 import textwrap
27 import os.path
28 import time
29 import logging
30 from cStringIO import StringIO
31
32 from ganeti import utils
33 from ganeti import errors
34 from ganeti import constants
35 from ganeti import opcodes
36 from ganeti import luxi
37 from ganeti import ssconf
38 from ganeti import rpc
39 from ganeti import ssh
40 from ganeti import compat
41 from ganeti import netutils
42
43 from optparse import (OptionParser, TitledHelpFormatter,
44                       Option, OptionValueError)
45
46
47 __all__ = [
48   # Command line options
49   "ADD_UIDS_OPT",
50   "ALLOCATABLE_OPT",
51   "ALL_OPT",
52   "AUTO_PROMOTE_OPT",
53   "AUTO_REPLACE_OPT",
54   "BACKEND_OPT",
55   "CLEANUP_OPT",
56   "CLUSTER_DOMAIN_SECRET_OPT",
57   "CONFIRM_OPT",
58   "CP_SIZE_OPT",
59   "DEBUG_OPT",
60   "DEBUG_SIMERR_OPT",
61   "DISKIDX_OPT",
62   "DISK_OPT",
63   "DISK_TEMPLATE_OPT",
64   "DRAINED_OPT",
65   "DRBD_HELPER_OPT",
66   "EARLY_RELEASE_OPT",
67   "ENABLED_HV_OPT",
68   "ERROR_CODES_OPT",
69   "FIELDS_OPT",
70   "FILESTORE_DIR_OPT",
71   "FILESTORE_DRIVER_OPT",
72   "FORCE_OPT",
73   "FORCE_VARIANT_OPT",
74   "GLOBAL_FILEDIR_OPT",
75   "HVLIST_OPT",
76   "HVOPTS_OPT",
77   "HYPERVISOR_OPT",
78   "IALLOCATOR_OPT",
79   "DEFAULT_IALLOCATOR_OPT",
80   "IDENTIFY_DEFAULTS_OPT",
81   "IGNORE_CONSIST_OPT",
82   "IGNORE_FAILURES_OPT",
83   "IGNORE_REMOVE_FAILURES_OPT",
84   "IGNORE_SECONDARIES_OPT",
85   "IGNORE_SIZE_OPT",
86   "MAC_PREFIX_OPT",
87   "MAINTAIN_NODE_HEALTH_OPT",
88   "MASTER_NETDEV_OPT",
89   "MC_OPT",
90   "MIGRATION_MODE_OPT",
91   "NET_OPT",
92   "NEW_CLUSTER_CERT_OPT",
93   "NEW_CLUSTER_DOMAIN_SECRET_OPT",
94   "NEW_CONFD_HMAC_KEY_OPT",
95   "NEW_RAPI_CERT_OPT",
96   "NEW_SECONDARY_OPT",
97   "NIC_PARAMS_OPT",
98   "NODE_LIST_OPT",
99   "NODE_PLACEMENT_OPT",
100   "NODRBD_STORAGE_OPT",
101   "NOHDR_OPT",
102   "NOIPCHECK_OPT",
103   "NO_INSTALL_OPT",
104   "NONAMECHECK_OPT",
105   "NOLVM_STORAGE_OPT",
106   "NOMODIFY_ETCHOSTS_OPT",
107   "NOMODIFY_SSH_SETUP_OPT",
108   "NONICS_OPT",
109   "NONLIVE_OPT",
110   "NONPLUS1_OPT",
111   "NOSHUTDOWN_OPT",
112   "NOSTART_OPT",
113   "NOSSH_KEYCHECK_OPT",
114   "NOVOTING_OPT",
115   "NWSYNC_OPT",
116   "ON_PRIMARY_OPT",
117   "ON_SECONDARY_OPT",
118   "OFFLINE_OPT",
119   "OSPARAMS_OPT",
120   "OS_OPT",
121   "OS_SIZE_OPT",
122   "RAPI_CERT_OPT",
123   "READD_OPT",
124   "REBOOT_TYPE_OPT",
125   "REMOVE_INSTANCE_OPT",
126   "REMOVE_UIDS_OPT",
127   "RESERVED_LVS_OPT",
128   "ROMAN_OPT",
129   "SECONDARY_IP_OPT",
130   "SELECT_OS_OPT",
131   "SEP_OPT",
132   "SHOWCMD_OPT",
133   "SHUTDOWN_TIMEOUT_OPT",
134   "SINGLE_NODE_OPT",
135   "SRC_DIR_OPT",
136   "SRC_NODE_OPT",
137   "SUBMIT_OPT",
138   "STATIC_OPT",
139   "SYNC_OPT",
140   "TAG_SRC_OPT",
141   "TIMEOUT_OPT",
142   "UIDPOOL_OPT",
143   "USEUNITS_OPT",
144   "USE_REPL_NET_OPT",
145   "VERBOSE_OPT",
146   "VG_NAME_OPT",
147   "YES_DOIT_OPT",
148   # Generic functions for CLI programs
149   "GenericMain",
150   "GenericInstanceCreate",
151   "GetClient",
152   "GetOnlineNodes",
153   "JobExecutor",
154   "JobSubmittedException",
155   "ParseTimespec",
156   "RunWhileClusterStopped",
157   "SubmitOpCode",
158   "SubmitOrSend",
159   "UsesRPC",
160   # Formatting functions
161   "ToStderr", "ToStdout",
162   "FormatError",
163   "GenerateTable",
164   "AskUser",
165   "FormatTimestamp",
166   "FormatLogMessage",
167   # Tags functions
168   "ListTags",
169   "AddTags",
170   "RemoveTags",
171   # command line options support infrastructure
172   "ARGS_MANY_INSTANCES",
173   "ARGS_MANY_NODES",
174   "ARGS_NONE",
175   "ARGS_ONE_INSTANCE",
176   "ARGS_ONE_NODE",
177   "ARGS_ONE_OS",
178   "ArgChoice",
179   "ArgCommand",
180   "ArgFile",
181   "ArgHost",
182   "ArgInstance",
183   "ArgJobId",
184   "ArgNode",
185   "ArgOs",
186   "ArgSuggest",
187   "ArgUnknown",
188   "OPT_COMPL_INST_ADD_NODES",
189   "OPT_COMPL_MANY_NODES",
190   "OPT_COMPL_ONE_IALLOCATOR",
191   "OPT_COMPL_ONE_INSTANCE",
192   "OPT_COMPL_ONE_NODE",
193   "OPT_COMPL_ONE_OS",
194   "cli_option",
195   "SplitNodeOption",
196   "CalculateOSNames",
197   ]
198
199 NO_PREFIX = "no_"
200 UN_PREFIX = "-"
201
202
203 class _Argument:
204   def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
205     self.min = min
206     self.max = max
207
208   def __repr__(self):
209     return ("<%s min=%s max=%s>" %
210             (self.__class__.__name__, self.min, self.max))
211
212
213 class ArgSuggest(_Argument):
214   """Suggesting argument.
215
216   Value can be any of the ones passed to the constructor.
217
218   """
219   # pylint: disable-msg=W0622
220   def __init__(self, min=0, max=None, choices=None):
221     _Argument.__init__(self, min=min, max=max)
222     self.choices = choices
223
224   def __repr__(self):
225     return ("<%s min=%s max=%s choices=%r>" %
226             (self.__class__.__name__, self.min, self.max, self.choices))
227
228
229 class ArgChoice(ArgSuggest):
230   """Choice argument.
231
232   Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
233   but value must be one of the choices.
234
235   """
236
237
238 class ArgUnknown(_Argument):
239   """Unknown argument to program (e.g. determined at runtime).
240
241   """
242
243
244 class ArgInstance(_Argument):
245   """Instances argument.
246
247   """
248
249
250 class ArgNode(_Argument):
251   """Node argument.
252
253   """
254
255 class ArgJobId(_Argument):
256   """Job ID argument.
257
258   """
259
260
261 class ArgFile(_Argument):
262   """File path argument.
263
264   """
265
266
267 class ArgCommand(_Argument):
268   """Command argument.
269
270   """
271
272
273 class ArgHost(_Argument):
274   """Host argument.
275
276   """
277
278
279 class ArgOs(_Argument):
280   """OS argument.
281
282   """
283
284
285 ARGS_NONE = []
286 ARGS_MANY_INSTANCES = [ArgInstance()]
287 ARGS_MANY_NODES = [ArgNode()]
288 ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
289 ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
290 ARGS_ONE_OS = [ArgOs(min=1, max=1)]
291
292
293 def _ExtractTagsObject(opts, args):
294   """Extract the tag type object.
295
296   Note that this function will modify its args parameter.
297
298   """
299   if not hasattr(opts, "tag_type"):
300     raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
301   kind = opts.tag_type
302   if kind == constants.TAG_CLUSTER:
303     retval = kind, kind
304   elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
305     if not args:
306       raise errors.OpPrereqError("no arguments passed to the command")
307     name = args.pop(0)
308     retval = kind, name
309   else:
310     raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
311   return retval
312
313
314 def _ExtendTags(opts, args):
315   """Extend the args if a source file has been given.
316
317   This function will extend the tags with the contents of the file
318   passed in the 'tags_source' attribute of the opts parameter. A file
319   named '-' will be replaced by stdin.
320
321   """
322   fname = opts.tags_source
323   if fname is None:
324     return
325   if fname == "-":
326     new_fh = sys.stdin
327   else:
328     new_fh = open(fname, "r")
329   new_data = []
330   try:
331     # we don't use the nice 'new_data = [line.strip() for line in fh]'
332     # because of python bug 1633941
333     while True:
334       line = new_fh.readline()
335       if not line:
336         break
337       new_data.append(line.strip())
338   finally:
339     new_fh.close()
340   args.extend(new_data)
341
342
343 def ListTags(opts, args):
344   """List the tags on a given object.
345
346   This is a generic implementation that knows how to deal with all
347   three cases of tag objects (cluster, node, instance). The opts
348   argument is expected to contain a tag_type field denoting what
349   object type we work on.
350
351   """
352   kind, name = _ExtractTagsObject(opts, args)
353   cl = GetClient()
354   result = cl.QueryTags(kind, name)
355   result = list(result)
356   result.sort()
357   for tag in result:
358     ToStdout(tag)
359
360
361 def AddTags(opts, args):
362   """Add tags on a given object.
363
364   This is a generic implementation that knows how to deal with all
365   three cases of tag objects (cluster, node, instance). The opts
366   argument is expected to contain a tag_type field denoting what
367   object type we work on.
368
369   """
370   kind, name = _ExtractTagsObject(opts, args)
371   _ExtendTags(opts, args)
372   if not args:
373     raise errors.OpPrereqError("No tags to be added")
374   op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
375   SubmitOpCode(op)
376
377
378 def RemoveTags(opts, args):
379   """Remove tags from a given object.
380
381   This is a generic implementation that knows how to deal with all
382   three cases of tag objects (cluster, node, instance). The opts
383   argument is expected to contain a tag_type field denoting what
384   object type we work on.
385
386   """
387   kind, name = _ExtractTagsObject(opts, args)
388   _ExtendTags(opts, args)
389   if not args:
390     raise errors.OpPrereqError("No tags to be removed")
391   op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
392   SubmitOpCode(op)
393
394
395 def check_unit(option, opt, value): # pylint: disable-msg=W0613
396   """OptParsers custom converter for units.
397
398   """
399   try:
400     return utils.ParseUnit(value)
401   except errors.UnitParseError, err:
402     raise OptionValueError("option %s: %s" % (opt, err))
403
404
405 def _SplitKeyVal(opt, data):
406   """Convert a KeyVal string into a dict.
407
408   This function will convert a key=val[,...] string into a dict. Empty
409   values will be converted specially: keys which have the prefix 'no_'
410   will have the value=False and the prefix stripped, the others will
411   have value=True.
412
413   @type opt: string
414   @param opt: a string holding the option name for which we process the
415       data, used in building error messages
416   @type data: string
417   @param data: a string of the format key=val,key=val,...
418   @rtype: dict
419   @return: {key=val, key=val}
420   @raises errors.ParameterError: if there are duplicate keys
421
422   """
423   kv_dict = {}
424   if data:
425     for elem in utils.UnescapeAndSplit(data, sep=","):
426       if "=" in elem:
427         key, val = elem.split("=", 1)
428       else:
429         if elem.startswith(NO_PREFIX):
430           key, val = elem[len(NO_PREFIX):], False
431         elif elem.startswith(UN_PREFIX):
432           key, val = elem[len(UN_PREFIX):], None
433         else:
434           key, val = elem, True
435       if key in kv_dict:
436         raise errors.ParameterError("Duplicate key '%s' in option %s" %
437                                     (key, opt))
438       kv_dict[key] = val
439   return kv_dict
440
441
442 def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
443   """Custom parser for ident:key=val,key=val options.
444
445   This will store the parsed values as a tuple (ident, {key: val}). As such,
446   multiple uses of this option via action=append is possible.
447
448   """
449   if ":" not in value:
450     ident, rest = value, ''
451   else:
452     ident, rest = value.split(":", 1)
453
454   if ident.startswith(NO_PREFIX):
455     if rest:
456       msg = "Cannot pass options when removing parameter groups: %s" % value
457       raise errors.ParameterError(msg)
458     retval = (ident[len(NO_PREFIX):], False)
459   elif ident.startswith(UN_PREFIX):
460     if rest:
461       msg = "Cannot pass options when removing parameter groups: %s" % value
462       raise errors.ParameterError(msg)
463     retval = (ident[len(UN_PREFIX):], None)
464   else:
465     kv_dict = _SplitKeyVal(opt, rest)
466     retval = (ident, kv_dict)
467   return retval
468
469
470 def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
471   """Custom parser class for key=val,key=val options.
472
473   This will store the parsed values as a dict {key: val}.
474
475   """
476   return _SplitKeyVal(opt, value)
477
478
479 def check_bool(option, opt, value): # pylint: disable-msg=W0613
480   """Custom parser for yes/no options.
481
482   This will store the parsed value as either True or False.
483
484   """
485   value = value.lower()
486   if value == constants.VALUE_FALSE or value == "no":
487     return False
488   elif value == constants.VALUE_TRUE or value == "yes":
489     return True
490   else:
491     raise errors.ParameterError("Invalid boolean value '%s'" % value)
492
493
494 # completion_suggestion is normally a list. Using numeric values not evaluating
495 # to False for dynamic completion.
496 (OPT_COMPL_MANY_NODES,
497  OPT_COMPL_ONE_NODE,
498  OPT_COMPL_ONE_INSTANCE,
499  OPT_COMPL_ONE_OS,
500  OPT_COMPL_ONE_IALLOCATOR,
501  OPT_COMPL_INST_ADD_NODES) = range(100, 106)
502
503 OPT_COMPL_ALL = frozenset([
504   OPT_COMPL_MANY_NODES,
505   OPT_COMPL_ONE_NODE,
506   OPT_COMPL_ONE_INSTANCE,
507   OPT_COMPL_ONE_OS,
508   OPT_COMPL_ONE_IALLOCATOR,
509   OPT_COMPL_INST_ADD_NODES,
510   ])
511
512
513 class CliOption(Option):
514   """Custom option class for optparse.
515
516   """
517   ATTRS = Option.ATTRS + [
518     "completion_suggest",
519     ]
520   TYPES = Option.TYPES + (
521     "identkeyval",
522     "keyval",
523     "unit",
524     "bool",
525     )
526   TYPE_CHECKER = Option.TYPE_CHECKER.copy()
527   TYPE_CHECKER["identkeyval"] = check_ident_key_val
528   TYPE_CHECKER["keyval"] = check_key_val
529   TYPE_CHECKER["unit"] = check_unit
530   TYPE_CHECKER["bool"] = check_bool
531
532
533 # optparse.py sets make_option, so we do it for our own option class, too
534 cli_option = CliOption
535
536
537 _YORNO = "yes|no"
538
539 DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
540                        help="Increase debugging level")
541
542 NOHDR_OPT = cli_option("--no-headers", default=False,
543                        action="store_true", dest="no_headers",
544                        help="Don't display column headers")
545
546 SEP_OPT = cli_option("--separator", default=None,
547                      action="store", dest="separator",
548                      help=("Separator between output fields"
549                            " (defaults to one space)"))
550
551 USEUNITS_OPT = cli_option("--units", default=None,
552                           dest="units", choices=('h', 'm', 'g', 't'),
553                           help="Specify units for output (one of hmgt)")
554
555 FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
556                         type="string", metavar="FIELDS",
557                         help="Comma separated list of output fields")
558
559 FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
560                        default=False, help="Force the operation")
561
562 CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
563                          default=False, help="Do not require confirmation")
564
565 TAG_SRC_OPT = cli_option("--from", dest="tags_source",
566                          default=None, help="File with tag names")
567
568 SUBMIT_OPT = cli_option("--submit", dest="submit_only",
569                         default=False, action="store_true",
570                         help=("Submit the job and return the job ID, but"
571                               " don't wait for the job to finish"))
572
573 SYNC_OPT = cli_option("--sync", dest="do_locking",
574                       default=False, action="store_true",
575                       help=("Grab locks while doing the queries"
576                             " in order to ensure more consistent results"))
577
578 _DRY_RUN_OPT = cli_option("--dry-run", default=False,
579                           action="store_true",
580                           help=("Do not execute the operation, just run the"
581                                 " check steps and verify it it could be"
582                                 " executed"))
583
584 VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
585                          action="store_true",
586                          help="Increase the verbosity of the operation")
587
588 DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
589                               action="store_true", dest="simulate_errors",
590                               help="Debugging option that makes the operation"
591                               " treat most runtime checks as failed")
592
593 NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
594                         default=True, action="store_false",
595                         help="Don't wait for sync (DANGEROUS!)")
596
597 DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
598                                help="Custom disk setup (diskless, file,"
599                                " plain or drbd)",
600                                default=None, metavar="TEMPL",
601                                choices=list(constants.DISK_TEMPLATES))
602
603 NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
604                         help="Do not create any network cards for"
605                         " the instance")
606
607 FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
608                                help="Relative path under default cluster-wide"
609                                " file storage dir to store file-based disks",
610                                default=None, metavar="<DIR>")
611
612 FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
613                                   help="Driver to use for image files",
614                                   default="loop", metavar="<DRIVER>",
615                                   choices=list(constants.FILE_DRIVER))
616
617 IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
618                             help="Select nodes for the instance automatically"
619                             " using the <NAME> iallocator plugin",
620                             default=None, type="string",
621                             completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
622
623 DEFAULT_IALLOCATOR_OPT = cli_option("-I", "--default-iallocator",
624                             metavar="<NAME>",
625                             help="Set the default instance allocator plugin",
626                             default=None, type="string",
627                             completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
628
629 OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
630                     metavar="<os>",
631                     completion_suggest=OPT_COMPL_ONE_OS)
632
633 OSPARAMS_OPT = cli_option("-O", "--os-parameters", dest="osparams",
634                          type="keyval", default={},
635                          help="OS parameters")
636
637 FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
638                                action="store_true", default=False,
639                                help="Force an unknown variant")
640
641 NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
642                             action="store_true", default=False,
643                             help="Do not install the OS (will"
644                             " enable no-start)")
645
646 BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
647                          type="keyval", default={},
648                          help="Backend parameters")
649
650 HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
651                          default={}, dest="hvparams",
652                          help="Hypervisor parameters")
653
654 HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
655                             help="Hypervisor and hypervisor options, in the"
656                             " format hypervisor:option=value,option=value,...",
657                             default=None, type="identkeyval")
658
659 HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
660                         help="Hypervisor and hypervisor options, in the"
661                         " format hypervisor:option=value,option=value,...",
662                         default=[], action="append", type="identkeyval")
663
664 NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
665                            action="store_false",
666                            help="Don't check that the instance's IP"
667                            " is alive")
668
669 NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
670                              default=True, action="store_false",
671                              help="Don't check that the instance's name"
672                              " is resolvable")
673
674 NET_OPT = cli_option("--net",
675                      help="NIC parameters", default=[],
676                      dest="nics", action="append", type="identkeyval")
677
678 DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
679                       dest="disks", action="append", type="identkeyval")
680
681 DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
682                          help="Comma-separated list of disks"
683                          " indices to act on (e.g. 0,2) (optional,"
684                          " defaults to all disks)")
685
686 OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
687                          help="Enforces a single-disk configuration using the"
688                          " given disk size, in MiB unless a suffix is used",
689                          default=None, type="unit", metavar="<size>")
690
691 IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
692                                 dest="ignore_consistency",
693                                 action="store_true", default=False,
694                                 help="Ignore the consistency of the disks on"
695                                 " the secondary")
696
697 NONLIVE_OPT = cli_option("--non-live", dest="live",
698                          default=True, action="store_false",
699                          help="Do a non-live migration (this usually means"
700                          " freeze the instance, save the state, transfer and"
701                          " only then resume running on the secondary node)")
702
703 MIGRATION_MODE_OPT = cli_option("--migration-mode", dest="migration_mode",
704                                 default=None,
705                                 choices=list(constants.HT_MIGRATION_MODES),
706                                 help="Override default migration mode (choose"
707                                 " either live or non-live")
708
709 NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
710                                 help="Target node and optional secondary node",
711                                 metavar="<pnode>[:<snode>]",
712                                 completion_suggest=OPT_COMPL_INST_ADD_NODES)
713
714 NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
715                            action="append", metavar="<node>",
716                            help="Use only this node (can be used multiple"
717                            " times, if not given defaults to all nodes)",
718                            completion_suggest=OPT_COMPL_ONE_NODE)
719
720 SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
721                              metavar="<node>",
722                              completion_suggest=OPT_COMPL_ONE_NODE)
723
724 NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
725                          action="store_false",
726                          help="Don't start the instance after creation")
727
728 SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
729                          action="store_true", default=False,
730                          help="Show command instead of executing it")
731
732 CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
733                          default=False, action="store_true",
734                          help="Instead of performing the migration, try to"
735                          " recover from a failed cleanup. This is safe"
736                          " to run even if the instance is healthy, but it"
737                          " will create extra replication traffic and "
738                          " disrupt briefly the replication (like during the"
739                          " migration")
740
741 STATIC_OPT = cli_option("-s", "--static", dest="static",
742                         action="store_true", default=False,
743                         help="Only show configuration data, not runtime data")
744
745 ALL_OPT = cli_option("--all", dest="show_all",
746                      default=False, action="store_true",
747                      help="Show info on all instances on the cluster."
748                      " This can take a long time to run, use wisely")
749
750 SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
751                            action="store_true", default=False,
752                            help="Interactive OS reinstall, lists available"
753                            " OS templates for selection")
754
755 IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
756                                  action="store_true", default=False,
757                                  help="Remove the instance from the cluster"
758                                  " configuration even if there are failures"
759                                  " during the removal process")
760
761 IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
762                                         dest="ignore_remove_failures",
763                                         action="store_true", default=False,
764                                         help="Remove the instance from the"
765                                         " cluster configuration even if there"
766                                         " are failures during the removal"
767                                         " process")
768
769 REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
770                                  action="store_true", default=False,
771                                  help="Remove the instance from the cluster")
772
773 NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
774                                help="Specifies the new secondary node",
775                                metavar="NODE", default=None,
776                                completion_suggest=OPT_COMPL_ONE_NODE)
777
778 ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
779                             default=False, action="store_true",
780                             help="Replace the disk(s) on the primary"
781                             " node (only for the drbd template)")
782
783 ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
784                               default=False, action="store_true",
785                               help="Replace the disk(s) on the secondary"
786                               " node (only for the drbd template)")
787
788 AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
789                               default=False, action="store_true",
790                               help="Lock all nodes and auto-promote as needed"
791                               " to MC status")
792
793 AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
794                               default=False, action="store_true",
795                               help="Automatically replace faulty disks"
796                               " (only for the drbd template)")
797
798 IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
799                              default=False, action="store_true",
800                              help="Ignore current recorded size"
801                              " (useful for forcing activation when"
802                              " the recorded size is wrong)")
803
804 SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
805                           metavar="<node>",
806                           completion_suggest=OPT_COMPL_ONE_NODE)
807
808 SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
809                          metavar="<dir>")
810
811 SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
812                               help="Specify the secondary ip for the node",
813                               metavar="ADDRESS", default=None)
814
815 READD_OPT = cli_option("--readd", dest="readd",
816                        default=False, action="store_true",
817                        help="Readd old node after replacing it")
818
819 NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
820                                 default=True, action="store_false",
821                                 help="Disable SSH key fingerprint checking")
822
823
824 MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
825                     type="bool", default=None, metavar=_YORNO,
826                     help="Set the master_candidate flag on the node")
827
828 OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
829                          type="bool", default=None,
830                          help="Set the offline flag on the node")
831
832 DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
833                          type="bool", default=None,
834                          help="Set the drained flag on the node")
835
836 ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
837                              type="bool", default=None, metavar=_YORNO,
838                              help="Set the allocatable flag on a volume")
839
840 NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
841                                help="Disable support for lvm based instances"
842                                " (cluster-wide)",
843                                action="store_false", default=True)
844
845 ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
846                             dest="enabled_hypervisors",
847                             help="Comma-separated list of hypervisors",
848                             type="string", default=None)
849
850 NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
851                             type="keyval", default={},
852                             help="NIC parameters")
853
854 CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
855                          dest="candidate_pool_size", type="int",
856                          help="Set the candidate pool size")
857
858 VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
859                          help="Enables LVM and specifies the volume group"
860                          " name (cluster-wide) for disk allocation [xenvg]",
861                          metavar="VG", default=None)
862
863 YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
864                           help="Destroy cluster", action="store_true")
865
866 NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
867                           help="Skip node agreement check (dangerous)",
868                           action="store_true", default=False)
869
870 MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
871                             help="Specify the mac prefix for the instance IP"
872                             " addresses, in the format XX:XX:XX",
873                             metavar="PREFIX",
874                             default=None)
875
876 MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
877                                help="Specify the node interface (cluster-wide)"
878                                " on which the master IP address will be added "
879                                " [%s]" % constants.DEFAULT_BRIDGE,
880                                metavar="NETDEV",
881                                default=constants.DEFAULT_BRIDGE)
882
883 GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
884                                 help="Specify the default directory (cluster-"
885                                 "wide) for storing the file-based disks [%s]" %
886                                 constants.DEFAULT_FILE_STORAGE_DIR,
887                                 metavar="DIR",
888                                 default=constants.DEFAULT_FILE_STORAGE_DIR)
889
890 NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
891                                    help="Don't modify /etc/hosts",
892                                    action="store_false", default=True)
893
894 NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
895                                     help="Don't initialize SSH keys",
896                                     action="store_false", default=True)
897
898 ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
899                              help="Enable parseable error messages",
900                              action="store_true", default=False)
901
902 NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
903                           help="Skip N+1 memory redundancy tests",
904                           action="store_true", default=False)
905
906 REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
907                              help="Type of reboot: soft/hard/full",
908                              default=constants.INSTANCE_REBOOT_HARD,
909                              metavar="<REBOOT>",
910                              choices=list(constants.REBOOT_TYPES))
911
912 IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
913                                     dest="ignore_secondaries",
914                                     default=False, action="store_true",
915                                     help="Ignore errors from secondaries")
916
917 NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
918                             action="store_false", default=True,
919                             help="Don't shutdown the instance (unsafe)")
920
921 TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
922                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
923                          help="Maximum time to wait")
924
925 SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
926                          dest="shutdown_timeout", type="int",
927                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
928                          help="Maximum time to wait for instance shutdown")
929
930 EARLY_RELEASE_OPT = cli_option("--early-release",
931                                dest="early_release", default=False,
932                                action="store_true",
933                                help="Release the locks on the secondary"
934                                " node(s) early")
935
936 NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
937                                   dest="new_cluster_cert",
938                                   default=False, action="store_true",
939                                   help="Generate a new cluster certificate")
940
941 RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
942                            default=None,
943                            help="File containing new RAPI certificate")
944
945 NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
946                                default=None, action="store_true",
947                                help=("Generate a new self-signed RAPI"
948                                      " certificate"))
949
950 NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
951                                     dest="new_confd_hmac_key",
952                                     default=False, action="store_true",
953                                     help=("Create a new HMAC key for %s" %
954                                           constants.CONFD))
955
956 CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
957                                        dest="cluster_domain_secret",
958                                        default=None,
959                                        help=("Load new new cluster domain"
960                                              " secret from file"))
961
962 NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
963                                            dest="new_cluster_domain_secret",
964                                            default=False, action="store_true",
965                                            help=("Create a new cluster domain"
966                                                  " secret"))
967
968 USE_REPL_NET_OPT = cli_option("--use-replication-network",
969                               dest="use_replication_network",
970                               help="Whether to use the replication network"
971                               " for talking to the nodes",
972                               action="store_true", default=False)
973
974 MAINTAIN_NODE_HEALTH_OPT = \
975     cli_option("--maintain-node-health", dest="maintain_node_health",
976                metavar=_YORNO, default=None, type="bool",
977                help="Configure the cluster to automatically maintain node"
978                " health, by shutting down unknown instances, shutting down"
979                " unknown DRBD devices, etc.")
980
981 IDENTIFY_DEFAULTS_OPT = \
982     cli_option("--identify-defaults", dest="identify_defaults",
983                default=False, action="store_true",
984                help="Identify which saved instance parameters are equal to"
985                " the current cluster defaults and set them as such, instead"
986                " of marking them as overridden")
987
988 UIDPOOL_OPT = cli_option("--uid-pool", default=None,
989                          action="store", dest="uid_pool",
990                          help=("A list of user-ids or user-id"
991                                " ranges separated by commas"))
992
993 ADD_UIDS_OPT = cli_option("--add-uids", default=None,
994                           action="store", dest="add_uids",
995                           help=("A list of user-ids or user-id"
996                                 " ranges separated by commas, to be"
997                                 " added to the user-id pool"))
998
999 REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
1000                              action="store", dest="remove_uids",
1001                              help=("A list of user-ids or user-id"
1002                                    " ranges separated by commas, to be"
1003                                    " removed from the user-id pool"))
1004
1005 RESERVED_LVS_OPT = cli_option("--reserved-lvs", default=None,
1006                              action="store", dest="reserved_lvs",
1007                              help=("A comma-separated list of reserved"
1008                                    " logical volumes names, that will be"
1009                                    " ignored by cluster verify"))
1010
1011 ROMAN_OPT = cli_option("--roman",
1012                        dest="roman_integers", default=False,
1013                        action="store_true",
1014                        help="Use roman numbers for positive integers")
1015
1016 DRBD_HELPER_OPT = cli_option("--drbd-usermode-helper", dest="drbd_helper",
1017                              action="store", default=None,
1018                              help="Specifies usermode helper for DRBD")
1019
1020 NODRBD_STORAGE_OPT = cli_option("--no-drbd-storage", dest="drbd_storage",
1021                                 action="store_false", default=True,
1022                                 help="Disable support for DRBD")
1023
1024
1025 def _ParseArgs(argv, commands, aliases):
1026   """Parser for the command line arguments.
1027
1028   This function parses the arguments and returns the function which
1029   must be executed together with its (modified) arguments.
1030
1031   @param argv: the command line
1032   @param commands: dictionary with special contents, see the design
1033       doc for cmdline handling
1034   @param aliases: dictionary with command aliases {'alias': 'target, ...}
1035
1036   """
1037   if len(argv) == 0:
1038     binary = "<command>"
1039   else:
1040     binary = argv[0].split("/")[-1]
1041
1042   if len(argv) > 1 and argv[1] == "--version":
1043     ToStdout("%s (ganeti %s) %s", binary, constants.VCS_VERSION,
1044              constants.RELEASE_VERSION)
1045     # Quit right away. That way we don't have to care about this special
1046     # argument. optparse.py does it the same.
1047     sys.exit(0)
1048
1049   if len(argv) < 2 or not (argv[1] in commands or
1050                            argv[1] in aliases):
1051     # let's do a nice thing
1052     sortedcmds = commands.keys()
1053     sortedcmds.sort()
1054
1055     ToStdout("Usage: %s {command} [options...] [argument...]", binary)
1056     ToStdout("%s <command> --help to see details, or man %s", binary, binary)
1057     ToStdout("")
1058
1059     # compute the max line length for cmd + usage
1060     mlen = max([len(" %s" % cmd) for cmd in commands])
1061     mlen = min(60, mlen) # should not get here...
1062
1063     # and format a nice command list
1064     ToStdout("Commands:")
1065     for cmd in sortedcmds:
1066       cmdstr = " %s" % (cmd,)
1067       help_text = commands[cmd][4]
1068       help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
1069       ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
1070       for line in help_lines:
1071         ToStdout("%-*s   %s", mlen, "", line)
1072
1073     ToStdout("")
1074
1075     return None, None, None
1076
1077   # get command, unalias it, and look it up in commands
1078   cmd = argv.pop(1)
1079   if cmd in aliases:
1080     if cmd in commands:
1081       raise errors.ProgrammerError("Alias '%s' overrides an existing"
1082                                    " command" % cmd)
1083
1084     if aliases[cmd] not in commands:
1085       raise errors.ProgrammerError("Alias '%s' maps to non-existing"
1086                                    " command '%s'" % (cmd, aliases[cmd]))
1087
1088     cmd = aliases[cmd]
1089
1090   func, args_def, parser_opts, usage, description = commands[cmd]
1091   parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
1092                         description=description,
1093                         formatter=TitledHelpFormatter(),
1094                         usage="%%prog %s %s" % (cmd, usage))
1095   parser.disable_interspersed_args()
1096   options, args = parser.parse_args()
1097
1098   if not _CheckArguments(cmd, args_def, args):
1099     return None, None, None
1100
1101   return func, options, args
1102
1103
1104 def _CheckArguments(cmd, args_def, args):
1105   """Verifies the arguments using the argument definition.
1106
1107   Algorithm:
1108
1109     1. Abort with error if values specified by user but none expected.
1110
1111     1. For each argument in definition
1112
1113       1. Keep running count of minimum number of values (min_count)
1114       1. Keep running count of maximum number of values (max_count)
1115       1. If it has an unlimited number of values
1116
1117         1. Abort with error if it's not the last argument in the definition
1118
1119     1. If last argument has limited number of values
1120
1121       1. Abort with error if number of values doesn't match or is too large
1122
1123     1. Abort with error if user didn't pass enough values (min_count)
1124
1125   """
1126   if args and not args_def:
1127     ToStderr("Error: Command %s expects no arguments", cmd)
1128     return False
1129
1130   min_count = None
1131   max_count = None
1132   check_max = None
1133
1134   last_idx = len(args_def) - 1
1135
1136   for idx, arg in enumerate(args_def):
1137     if min_count is None:
1138       min_count = arg.min
1139     elif arg.min is not None:
1140       min_count += arg.min
1141
1142     if max_count is None:
1143       max_count = arg.max
1144     elif arg.max is not None:
1145       max_count += arg.max
1146
1147     if idx == last_idx:
1148       check_max = (arg.max is not None)
1149
1150     elif arg.max is None:
1151       raise errors.ProgrammerError("Only the last argument can have max=None")
1152
1153   if check_max:
1154     # Command with exact number of arguments
1155     if (min_count is not None and max_count is not None and
1156         min_count == max_count and len(args) != min_count):
1157       ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1158       return False
1159
1160     # Command with limited number of arguments
1161     if max_count is not None and len(args) > max_count:
1162       ToStderr("Error: Command %s expects only %d argument(s)",
1163                cmd, max_count)
1164       return False
1165
1166   # Command with some required arguments
1167   if min_count is not None and len(args) < min_count:
1168     ToStderr("Error: Command %s expects at least %d argument(s)",
1169              cmd, min_count)
1170     return False
1171
1172   return True
1173
1174
1175 def SplitNodeOption(value):
1176   """Splits the value of a --node option.
1177
1178   """
1179   if value and ':' in value:
1180     return value.split(':', 1)
1181   else:
1182     return (value, None)
1183
1184
1185 def CalculateOSNames(os_name, os_variants):
1186   """Calculates all the names an OS can be called, according to its variants.
1187
1188   @type os_name: string
1189   @param os_name: base name of the os
1190   @type os_variants: list or None
1191   @param os_variants: list of supported variants
1192   @rtype: list
1193   @return: list of valid names
1194
1195   """
1196   if os_variants:
1197     return ['%s+%s' % (os_name, v) for v in os_variants]
1198   else:
1199     return [os_name]
1200
1201
1202 UsesRPC = rpc.RunWithRPC
1203
1204
1205 def AskUser(text, choices=None):
1206   """Ask the user a question.
1207
1208   @param text: the question to ask
1209
1210   @param choices: list with elements tuples (input_char, return_value,
1211       description); if not given, it will default to: [('y', True,
1212       'Perform the operation'), ('n', False, 'Do no do the operation')];
1213       note that the '?' char is reserved for help
1214
1215   @return: one of the return values from the choices list; if input is
1216       not possible (i.e. not running with a tty, we return the last
1217       entry from the list
1218
1219   """
1220   if choices is None:
1221     choices = [('y', True, 'Perform the operation'),
1222                ('n', False, 'Do not perform the operation')]
1223   if not choices or not isinstance(choices, list):
1224     raise errors.ProgrammerError("Invalid choices argument to AskUser")
1225   for entry in choices:
1226     if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
1227       raise errors.ProgrammerError("Invalid choices element to AskUser")
1228
1229   answer = choices[-1][1]
1230   new_text = []
1231   for line in text.splitlines():
1232     new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
1233   text = "\n".join(new_text)
1234   try:
1235     f = file("/dev/tty", "a+")
1236   except IOError:
1237     return answer
1238   try:
1239     chars = [entry[0] for entry in choices]
1240     chars[-1] = "[%s]" % chars[-1]
1241     chars.append('?')
1242     maps = dict([(entry[0], entry[1]) for entry in choices])
1243     while True:
1244       f.write(text)
1245       f.write('\n')
1246       f.write("/".join(chars))
1247       f.write(": ")
1248       line = f.readline(2).strip().lower()
1249       if line in maps:
1250         answer = maps[line]
1251         break
1252       elif line == '?':
1253         for entry in choices:
1254           f.write(" %s - %s\n" % (entry[0], entry[2]))
1255         f.write("\n")
1256         continue
1257   finally:
1258     f.close()
1259   return answer
1260
1261
1262 class JobSubmittedException(Exception):
1263   """Job was submitted, client should exit.
1264
1265   This exception has one argument, the ID of the job that was
1266   submitted. The handler should print this ID.
1267
1268   This is not an error, just a structured way to exit from clients.
1269
1270   """
1271
1272
1273 def SendJob(ops, cl=None):
1274   """Function to submit an opcode without waiting for the results.
1275
1276   @type ops: list
1277   @param ops: list of opcodes
1278   @type cl: luxi.Client
1279   @param cl: the luxi client to use for communicating with the master;
1280              if None, a new client will be created
1281
1282   """
1283   if cl is None:
1284     cl = GetClient()
1285
1286   job_id = cl.SubmitJob(ops)
1287
1288   return job_id
1289
1290
1291 def GenericPollJob(job_id, cbs, report_cbs):
1292   """Generic job-polling function.
1293
1294   @type job_id: number
1295   @param job_id: Job ID
1296   @type cbs: Instance of L{JobPollCbBase}
1297   @param cbs: Data callbacks
1298   @type report_cbs: Instance of L{JobPollReportCbBase}
1299   @param report_cbs: Reporting callbacks
1300
1301   """
1302   prev_job_info = None
1303   prev_logmsg_serial = None
1304
1305   status = None
1306
1307   while True:
1308     result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
1309                                       prev_logmsg_serial)
1310     if not result:
1311       # job not found, go away!
1312       raise errors.JobLost("Job with id %s lost" % job_id)
1313
1314     if result == constants.JOB_NOTCHANGED:
1315       report_cbs.ReportNotChanged(job_id, status)
1316
1317       # Wait again
1318       continue
1319
1320     # Split result, a tuple of (field values, log entries)
1321     (job_info, log_entries) = result
1322     (status, ) = job_info
1323
1324     if log_entries:
1325       for log_entry in log_entries:
1326         (serial, timestamp, log_type, message) = log_entry
1327         report_cbs.ReportLogMessage(job_id, serial, timestamp,
1328                                     log_type, message)
1329         prev_logmsg_serial = max(prev_logmsg_serial, serial)
1330
1331     # TODO: Handle canceled and archived jobs
1332     elif status in (constants.JOB_STATUS_SUCCESS,
1333                     constants.JOB_STATUS_ERROR,
1334                     constants.JOB_STATUS_CANCELING,
1335                     constants.JOB_STATUS_CANCELED):
1336       break
1337
1338     prev_job_info = job_info
1339
1340   jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
1341   if not jobs:
1342     raise errors.JobLost("Job with id %s lost" % job_id)
1343
1344   status, opstatus, result = jobs[0]
1345
1346   if status == constants.JOB_STATUS_SUCCESS:
1347     return result
1348
1349   if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
1350     raise errors.OpExecError("Job was canceled")
1351
1352   has_ok = False
1353   for idx, (status, msg) in enumerate(zip(opstatus, result)):
1354     if status == constants.OP_STATUS_SUCCESS:
1355       has_ok = True
1356     elif status == constants.OP_STATUS_ERROR:
1357       errors.MaybeRaise(msg)
1358
1359       if has_ok:
1360         raise errors.OpExecError("partial failure (opcode %d): %s" %
1361                                  (idx, msg))
1362
1363       raise errors.OpExecError(str(msg))
1364
1365   # default failure mode
1366   raise errors.OpExecError(result)
1367
1368
1369 class JobPollCbBase:
1370   """Base class for L{GenericPollJob} callbacks.
1371
1372   """
1373   def __init__(self):
1374     """Initializes this class.
1375
1376     """
1377
1378   def WaitForJobChangeOnce(self, job_id, fields,
1379                            prev_job_info, prev_log_serial):
1380     """Waits for changes on a job.
1381
1382     """
1383     raise NotImplementedError()
1384
1385   def QueryJobs(self, job_ids, fields):
1386     """Returns the selected fields for the selected job IDs.
1387
1388     @type job_ids: list of numbers
1389     @param job_ids: Job IDs
1390     @type fields: list of strings
1391     @param fields: Fields
1392
1393     """
1394     raise NotImplementedError()
1395
1396
1397 class JobPollReportCbBase:
1398   """Base class for L{GenericPollJob} reporting callbacks.
1399
1400   """
1401   def __init__(self):
1402     """Initializes this class.
1403
1404     """
1405
1406   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1407     """Handles a log message.
1408
1409     """
1410     raise NotImplementedError()
1411
1412   def ReportNotChanged(self, job_id, status):
1413     """Called for if a job hasn't changed in a while.
1414
1415     @type job_id: number
1416     @param job_id: Job ID
1417     @type status: string or None
1418     @param status: Job status if available
1419
1420     """
1421     raise NotImplementedError()
1422
1423
1424 class _LuxiJobPollCb(JobPollCbBase):
1425   def __init__(self, cl):
1426     """Initializes this class.
1427
1428     """
1429     JobPollCbBase.__init__(self)
1430     self.cl = cl
1431
1432   def WaitForJobChangeOnce(self, job_id, fields,
1433                            prev_job_info, prev_log_serial):
1434     """Waits for changes on a job.
1435
1436     """
1437     return self.cl.WaitForJobChangeOnce(job_id, fields,
1438                                         prev_job_info, prev_log_serial)
1439
1440   def QueryJobs(self, job_ids, fields):
1441     """Returns the selected fields for the selected job IDs.
1442
1443     """
1444     return self.cl.QueryJobs(job_ids, fields)
1445
1446
1447 class FeedbackFnJobPollReportCb(JobPollReportCbBase):
1448   def __init__(self, feedback_fn):
1449     """Initializes this class.
1450
1451     """
1452     JobPollReportCbBase.__init__(self)
1453
1454     self.feedback_fn = feedback_fn
1455
1456     assert callable(feedback_fn)
1457
1458   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1459     """Handles a log message.
1460
1461     """
1462     self.feedback_fn((timestamp, log_type, log_msg))
1463
1464   def ReportNotChanged(self, job_id, status):
1465     """Called if a job hasn't changed in a while.
1466
1467     """
1468     # Ignore
1469
1470
1471 class StdioJobPollReportCb(JobPollReportCbBase):
1472   def __init__(self):
1473     """Initializes this class.
1474
1475     """
1476     JobPollReportCbBase.__init__(self)
1477
1478     self.notified_queued = False
1479     self.notified_waitlock = False
1480
1481   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1482     """Handles a log message.
1483
1484     """
1485     ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
1486              FormatLogMessage(log_type, log_msg))
1487
1488   def ReportNotChanged(self, job_id, status):
1489     """Called if a job hasn't changed in a while.
1490
1491     """
1492     if status is None:
1493       return
1494
1495     if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
1496       ToStderr("Job %s is waiting in queue", job_id)
1497       self.notified_queued = True
1498
1499     elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
1500       ToStderr("Job %s is trying to acquire all necessary locks", job_id)
1501       self.notified_waitlock = True
1502
1503
1504 def FormatLogMessage(log_type, log_msg):
1505   """Formats a job message according to its type.
1506
1507   """
1508   if log_type != constants.ELOG_MESSAGE:
1509     log_msg = str(log_msg)
1510
1511   return utils.SafeEncode(log_msg)
1512
1513
1514 def PollJob(job_id, cl=None, feedback_fn=None, reporter=None):
1515   """Function to poll for the result of a job.
1516
1517   @type job_id: job identified
1518   @param job_id: the job to poll for results
1519   @type cl: luxi.Client
1520   @param cl: the luxi client to use for communicating with the master;
1521              if None, a new client will be created
1522
1523   """
1524   if cl is None:
1525     cl = GetClient()
1526
1527   if reporter is None:
1528     if feedback_fn:
1529       reporter = FeedbackFnJobPollReportCb(feedback_fn)
1530     else:
1531       reporter = StdioJobPollReportCb()
1532   elif feedback_fn:
1533     raise errors.ProgrammerError("Can't specify reporter and feedback function")
1534
1535   return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
1536
1537
1538 def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None, reporter=None):
1539   """Legacy function to submit an opcode.
1540
1541   This is just a simple wrapper over the construction of the processor
1542   instance. It should be extended to better handle feedback and
1543   interaction functions.
1544
1545   """
1546   if cl is None:
1547     cl = GetClient()
1548
1549   SetGenericOpcodeOpts([op], opts)
1550
1551   job_id = SendJob([op], cl=cl)
1552
1553   op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn,
1554                        reporter=reporter)
1555
1556   return op_results[0]
1557
1558
1559 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
1560   """Wrapper around SubmitOpCode or SendJob.
1561
1562   This function will decide, based on the 'opts' parameter, whether to
1563   submit and wait for the result of the opcode (and return it), or
1564   whether to just send the job and print its identifier. It is used in
1565   order to simplify the implementation of the '--submit' option.
1566
1567   It will also process the opcodes if we're sending the via SendJob
1568   (otherwise SubmitOpCode does it).
1569
1570   """
1571   if opts and opts.submit_only:
1572     job = [op]
1573     SetGenericOpcodeOpts(job, opts)
1574     job_id = SendJob(job, cl=cl)
1575     raise JobSubmittedException(job_id)
1576   else:
1577     return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
1578
1579
1580 def SetGenericOpcodeOpts(opcode_list, options):
1581   """Processor for generic options.
1582
1583   This function updates the given opcodes based on generic command
1584   line options (like debug, dry-run, etc.).
1585
1586   @param opcode_list: list of opcodes
1587   @param options: command line options or None
1588   @return: None (in-place modification)
1589
1590   """
1591   if not options:
1592     return
1593   for op in opcode_list:
1594     op.dry_run = options.dry_run
1595     op.debug_level = options.debug
1596
1597
1598 def GetClient():
1599   # TODO: Cache object?
1600   try:
1601     client = luxi.Client()
1602   except luxi.NoMasterError:
1603     ss = ssconf.SimpleStore()
1604
1605     # Try to read ssconf file
1606     try:
1607       ss.GetMasterNode()
1608     except errors.ConfigurationError:
1609       raise errors.OpPrereqError("Cluster not initialized or this machine is"
1610                                  " not part of a cluster")
1611
1612     master, myself = ssconf.GetMasterAndMyself(ss=ss)
1613     if master != myself:
1614       raise errors.OpPrereqError("This is not the master node, please connect"
1615                                  " to node '%s' and rerun the command" %
1616                                  master)
1617     raise
1618   return client
1619
1620
1621 def FormatError(err):
1622   """Return a formatted error message for a given error.
1623
1624   This function takes an exception instance and returns a tuple
1625   consisting of two values: first, the recommended exit code, and
1626   second, a string describing the error message (not
1627   newline-terminated).
1628
1629   """
1630   retcode = 1
1631   obuf = StringIO()
1632   msg = str(err)
1633   if isinstance(err, errors.ConfigurationError):
1634     txt = "Corrupt configuration file: %s" % msg
1635     logging.error(txt)
1636     obuf.write(txt + "\n")
1637     obuf.write("Aborting.")
1638     retcode = 2
1639   elif isinstance(err, errors.HooksAbort):
1640     obuf.write("Failure: hooks execution failed:\n")
1641     for node, script, out in err.args[0]:
1642       if out:
1643         obuf.write("  node: %s, script: %s, output: %s\n" %
1644                    (node, script, out))
1645       else:
1646         obuf.write("  node: %s, script: %s (no output)\n" %
1647                    (node, script))
1648   elif isinstance(err, errors.HooksFailure):
1649     obuf.write("Failure: hooks general failure: %s" % msg)
1650   elif isinstance(err, errors.ResolverError):
1651     this_host = netutils.Hostname.GetSysName()
1652     if err.args[0] == this_host:
1653       msg = "Failure: can't resolve my own hostname ('%s')"
1654     else:
1655       msg = "Failure: can't resolve hostname '%s'"
1656     obuf.write(msg % err.args[0])
1657   elif isinstance(err, errors.OpPrereqError):
1658     if len(err.args) == 2:
1659       obuf.write("Failure: prerequisites not met for this"
1660                " operation:\nerror type: %s, error details:\n%s" %
1661                  (err.args[1], err.args[0]))
1662     else:
1663       obuf.write("Failure: prerequisites not met for this"
1664                  " operation:\n%s" % msg)
1665   elif isinstance(err, errors.OpExecError):
1666     obuf.write("Failure: command execution error:\n%s" % msg)
1667   elif isinstance(err, errors.TagError):
1668     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
1669   elif isinstance(err, errors.JobQueueDrainError):
1670     obuf.write("Failure: the job queue is marked for drain and doesn't"
1671                " accept new requests\n")
1672   elif isinstance(err, errors.JobQueueFull):
1673     obuf.write("Failure: the job queue is full and doesn't accept new"
1674                " job submissions until old jobs are archived\n")
1675   elif isinstance(err, errors.TypeEnforcementError):
1676     obuf.write("Parameter Error: %s" % msg)
1677   elif isinstance(err, errors.ParameterError):
1678     obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
1679   elif isinstance(err, luxi.NoMasterError):
1680     obuf.write("Cannot communicate with the master daemon.\nIs it running"
1681                " and listening for connections?")
1682   elif isinstance(err, luxi.TimeoutError):
1683     obuf.write("Timeout while talking to the master daemon. Error:\n"
1684                "%s" % msg)
1685   elif isinstance(err, luxi.PermissionError):
1686     obuf.write("It seems you don't have permissions to connect to the"
1687                " master daemon.\nPlease retry as a different user.")
1688   elif isinstance(err, luxi.ProtocolError):
1689     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
1690                "%s" % msg)
1691   elif isinstance(err, errors.JobLost):
1692     obuf.write("Error checking job status: %s" % msg)
1693   elif isinstance(err, errors.GenericError):
1694     obuf.write("Unhandled Ganeti error: %s" % msg)
1695   elif isinstance(err, JobSubmittedException):
1696     obuf.write("JobID: %s\n" % err.args[0])
1697     retcode = 0
1698   else:
1699     obuf.write("Unhandled exception: %s" % msg)
1700   return retcode, obuf.getvalue().rstrip('\n')
1701
1702
1703 def GenericMain(commands, override=None, aliases=None):
1704   """Generic main function for all the gnt-* commands.
1705
1706   Arguments:
1707     - commands: a dictionary with a special structure, see the design doc
1708                 for command line handling.
1709     - override: if not None, we expect a dictionary with keys that will
1710                 override command line options; this can be used to pass
1711                 options from the scripts to generic functions
1712     - aliases: dictionary with command aliases {'alias': 'target, ...}
1713
1714   """
1715   # save the program name and the entire command line for later logging
1716   if sys.argv:
1717     binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1718     if len(sys.argv) >= 2:
1719       binary += " " + sys.argv[1]
1720       old_cmdline = " ".join(sys.argv[2:])
1721     else:
1722       old_cmdline = ""
1723   else:
1724     binary = "<unknown program>"
1725     old_cmdline = ""
1726
1727   if aliases is None:
1728     aliases = {}
1729
1730   try:
1731     func, options, args = _ParseArgs(sys.argv, commands, aliases)
1732   except errors.ParameterError, err:
1733     result, err_msg = FormatError(err)
1734     ToStderr(err_msg)
1735     return 1
1736
1737   if func is None: # parse error
1738     return 1
1739
1740   if override is not None:
1741     for key, val in override.iteritems():
1742       setattr(options, key, val)
1743
1744   utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1745                      stderr_logging=True, program=binary)
1746
1747   if old_cmdline:
1748     logging.info("run with arguments '%s'", old_cmdline)
1749   else:
1750     logging.info("run with no arguments")
1751
1752   try:
1753     result = func(options, args)
1754   except (errors.GenericError, luxi.ProtocolError,
1755           JobSubmittedException), err:
1756     result, err_msg = FormatError(err)
1757     logging.exception("Error during command processing")
1758     ToStderr(err_msg)
1759
1760   return result
1761
1762
1763 def GenericInstanceCreate(mode, opts, args):
1764   """Add an instance to the cluster via either creation or import.
1765
1766   @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
1767   @param opts: the command line options selected by the user
1768   @type args: list
1769   @param args: should contain only one element, the new instance name
1770   @rtype: int
1771   @return: the desired exit code
1772
1773   """
1774   instance = args[0]
1775
1776   (pnode, snode) = SplitNodeOption(opts.node)
1777
1778   hypervisor = None
1779   hvparams = {}
1780   if opts.hypervisor:
1781     hypervisor, hvparams = opts.hypervisor
1782
1783   if opts.nics:
1784     try:
1785       nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
1786     except ValueError, err:
1787       raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
1788     nics = [{}] * nic_max
1789     for nidx, ndict in opts.nics:
1790       nidx = int(nidx)
1791       if not isinstance(ndict, dict):
1792         msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
1793         raise errors.OpPrereqError(msg)
1794       nics[nidx] = ndict
1795   elif opts.no_nics:
1796     # no nics
1797     nics = []
1798   elif mode == constants.INSTANCE_CREATE:
1799     # default of one nic, all auto
1800     nics = [{}]
1801   else:
1802     # mode == import
1803     nics = []
1804
1805   if opts.disk_template == constants.DT_DISKLESS:
1806     if opts.disks or opts.sd_size is not None:
1807       raise errors.OpPrereqError("Diskless instance but disk"
1808                                  " information passed")
1809     disks = []
1810   else:
1811     if (not opts.disks and not opts.sd_size
1812         and mode == constants.INSTANCE_CREATE):
1813       raise errors.OpPrereqError("No disk information specified")
1814     if opts.disks and opts.sd_size is not None:
1815       raise errors.OpPrereqError("Please use either the '--disk' or"
1816                                  " '-s' option")
1817     if opts.sd_size is not None:
1818       opts.disks = [(0, {"size": opts.sd_size})]
1819
1820     if opts.disks:
1821       try:
1822         disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
1823       except ValueError, err:
1824         raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
1825       disks = [{}] * disk_max
1826     else:
1827       disks = []
1828     for didx, ddict in opts.disks:
1829       didx = int(didx)
1830       if not isinstance(ddict, dict):
1831         msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
1832         raise errors.OpPrereqError(msg)
1833       elif "size" in ddict:
1834         if "adopt" in ddict:
1835           raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
1836                                      " (disk %d)" % didx)
1837         try:
1838           ddict["size"] = utils.ParseUnit(ddict["size"])
1839         except ValueError, err:
1840           raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
1841                                      (didx, err))
1842       elif "adopt" in ddict:
1843         if mode == constants.INSTANCE_IMPORT:
1844           raise errors.OpPrereqError("Disk adoption not allowed for instance"
1845                                      " import")
1846         ddict["size"] = 0
1847       else:
1848         raise errors.OpPrereqError("Missing size or adoption source for"
1849                                    " disk %d" % didx)
1850       disks[didx] = ddict
1851
1852   utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
1853   utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
1854
1855   if mode == constants.INSTANCE_CREATE:
1856     start = opts.start
1857     os_type = opts.os
1858     force_variant = opts.force_variant
1859     src_node = None
1860     src_path = None
1861     no_install = opts.no_install
1862     identify_defaults = False
1863   elif mode == constants.INSTANCE_IMPORT:
1864     start = False
1865     os_type = None
1866     force_variant = False
1867     src_node = opts.src_node
1868     src_path = opts.src_dir
1869     no_install = None
1870     identify_defaults = opts.identify_defaults
1871   else:
1872     raise errors.ProgrammerError("Invalid creation mode %s" % mode)
1873
1874   op = opcodes.OpCreateInstance(instance_name=instance,
1875                                 disks=disks,
1876                                 disk_template=opts.disk_template,
1877                                 nics=nics,
1878                                 pnode=pnode, snode=snode,
1879                                 ip_check=opts.ip_check,
1880                                 name_check=opts.name_check,
1881                                 wait_for_sync=opts.wait_for_sync,
1882                                 file_storage_dir=opts.file_storage_dir,
1883                                 file_driver=opts.file_driver,
1884                                 iallocator=opts.iallocator,
1885                                 hypervisor=hypervisor,
1886                                 hvparams=hvparams,
1887                                 beparams=opts.beparams,
1888                                 osparams=opts.osparams,
1889                                 mode=mode,
1890                                 start=start,
1891                                 os_type=os_type,
1892                                 force_variant=force_variant,
1893                                 src_node=src_node,
1894                                 src_path=src_path,
1895                                 no_install=no_install,
1896                                 identify_defaults=identify_defaults)
1897
1898   SubmitOrSend(op, opts)
1899   return 0
1900
1901
1902 class _RunWhileClusterStoppedHelper:
1903   """Helper class for L{RunWhileClusterStopped} to simplify state management
1904
1905   """
1906   def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
1907     """Initializes this class.
1908
1909     @type feedback_fn: callable
1910     @param feedback_fn: Feedback function
1911     @type cluster_name: string
1912     @param cluster_name: Cluster name
1913     @type master_node: string
1914     @param master_node Master node name
1915     @type online_nodes: list
1916     @param online_nodes: List of names of online nodes
1917
1918     """
1919     self.feedback_fn = feedback_fn
1920     self.cluster_name = cluster_name
1921     self.master_node = master_node
1922     self.online_nodes = online_nodes
1923
1924     self.ssh = ssh.SshRunner(self.cluster_name)
1925
1926     self.nonmaster_nodes = [name for name in online_nodes
1927                             if name != master_node]
1928
1929     assert self.master_node not in self.nonmaster_nodes
1930
1931   def _RunCmd(self, node_name, cmd):
1932     """Runs a command on the local or a remote machine.
1933
1934     @type node_name: string
1935     @param node_name: Machine name
1936     @type cmd: list
1937     @param cmd: Command
1938
1939     """
1940     if node_name is None or node_name == self.master_node:
1941       # No need to use SSH
1942       result = utils.RunCmd(cmd)
1943     else:
1944       result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
1945
1946     if result.failed:
1947       errmsg = ["Failed to run command %s" % result.cmd]
1948       if node_name:
1949         errmsg.append("on node %s" % node_name)
1950       errmsg.append(": exitcode %s and error %s" %
1951                     (result.exit_code, result.output))
1952       raise errors.OpExecError(" ".join(errmsg))
1953
1954   def Call(self, fn, *args):
1955     """Call function while all daemons are stopped.
1956
1957     @type fn: callable
1958     @param fn: Function to be called
1959
1960     """
1961     # Pause watcher by acquiring an exclusive lock on watcher state file
1962     self.feedback_fn("Blocking watcher")
1963     watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
1964     try:
1965       # TODO: Currently, this just blocks. There's no timeout.
1966       # TODO: Should it be a shared lock?
1967       watcher_block.Exclusive(blocking=True)
1968
1969       # Stop master daemons, so that no new jobs can come in and all running
1970       # ones are finished
1971       self.feedback_fn("Stopping master daemons")
1972       self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
1973       try:
1974         # Stop daemons on all nodes
1975         for node_name in self.online_nodes:
1976           self.feedback_fn("Stopping daemons on %s" % node_name)
1977           self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
1978
1979         # All daemons are shut down now
1980         try:
1981           return fn(self, *args)
1982         except Exception, err:
1983           _, errmsg = FormatError(err)
1984           logging.exception("Caught exception")
1985           self.feedback_fn(errmsg)
1986           raise
1987       finally:
1988         # Start cluster again, master node last
1989         for node_name in self.nonmaster_nodes + [self.master_node]:
1990           self.feedback_fn("Starting daemons on %s" % node_name)
1991           self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
1992     finally:
1993       # Resume watcher
1994       watcher_block.Close()
1995
1996
1997 def RunWhileClusterStopped(feedback_fn, fn, *args):
1998   """Calls a function while all cluster daemons are stopped.
1999
2000   @type feedback_fn: callable
2001   @param feedback_fn: Feedback function
2002   @type fn: callable
2003   @param fn: Function to be called when daemons are stopped
2004
2005   """
2006   feedback_fn("Gathering cluster information")
2007
2008   # This ensures we're running on the master daemon
2009   cl = GetClient()
2010
2011   (cluster_name, master_node) = \
2012     cl.QueryConfigValues(["cluster_name", "master_node"])
2013
2014   online_nodes = GetOnlineNodes([], cl=cl)
2015
2016   # Don't keep a reference to the client. The master daemon will go away.
2017   del cl
2018
2019   assert master_node in online_nodes
2020
2021   return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
2022                                        online_nodes).Call(fn, *args)
2023
2024
2025 def GenerateTable(headers, fields, separator, data,
2026                   numfields=None, unitfields=None,
2027                   units=None):
2028   """Prints a table with headers and different fields.
2029
2030   @type headers: dict
2031   @param headers: dictionary mapping field names to headers for
2032       the table
2033   @type fields: list
2034   @param fields: the field names corresponding to each row in
2035       the data field
2036   @param separator: the separator to be used; if this is None,
2037       the default 'smart' algorithm is used which computes optimal
2038       field width, otherwise just the separator is used between
2039       each field
2040   @type data: list
2041   @param data: a list of lists, each sublist being one row to be output
2042   @type numfields: list
2043   @param numfields: a list with the fields that hold numeric
2044       values and thus should be right-aligned
2045   @type unitfields: list
2046   @param unitfields: a list with the fields that hold numeric
2047       values that should be formatted with the units field
2048   @type units: string or None
2049   @param units: the units we should use for formatting, or None for
2050       automatic choice (human-readable for non-separator usage, otherwise
2051       megabytes); this is a one-letter string
2052
2053   """
2054   if units is None:
2055     if separator:
2056       units = "m"
2057     else:
2058       units = "h"
2059
2060   if numfields is None:
2061     numfields = []
2062   if unitfields is None:
2063     unitfields = []
2064
2065   numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
2066   unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142
2067
2068   format_fields = []
2069   for field in fields:
2070     if headers and field not in headers:
2071       # TODO: handle better unknown fields (either revert to old
2072       # style of raising exception, or deal more intelligently with
2073       # variable fields)
2074       headers[field] = field
2075     if separator is not None:
2076       format_fields.append("%s")
2077     elif numfields.Matches(field):
2078       format_fields.append("%*s")
2079     else:
2080       format_fields.append("%-*s")
2081
2082   if separator is None:
2083     mlens = [0 for name in fields]
2084     format_str = ' '.join(format_fields)
2085   else:
2086     format_str = separator.replace("%", "%%").join(format_fields)
2087
2088   for row in data:
2089     if row is None:
2090       continue
2091     for idx, val in enumerate(row):
2092       if unitfields.Matches(fields[idx]):
2093         try:
2094           val = int(val)
2095         except (TypeError, ValueError):
2096           pass
2097         else:
2098           val = row[idx] = utils.FormatUnit(val, units)
2099       val = row[idx] = str(val)
2100       if separator is None:
2101         mlens[idx] = max(mlens[idx], len(val))
2102
2103   result = []
2104   if headers:
2105     args = []
2106     for idx, name in enumerate(fields):
2107       hdr = headers[name]
2108       if separator is None:
2109         mlens[idx] = max(mlens[idx], len(hdr))
2110         args.append(mlens[idx])
2111       args.append(hdr)
2112     result.append(format_str % tuple(args))
2113
2114   if separator is None:
2115     assert len(mlens) == len(fields)
2116
2117     if fields and not numfields.Matches(fields[-1]):
2118       mlens[-1] = 0
2119
2120   for line in data:
2121     args = []
2122     if line is None:
2123       line = ['-' for _ in fields]
2124     for idx in range(len(fields)):
2125       if separator is None:
2126         args.append(mlens[idx])
2127       args.append(line[idx])
2128     result.append(format_str % tuple(args))
2129
2130   return result
2131
2132
2133 def FormatTimestamp(ts):
2134   """Formats a given timestamp.
2135
2136   @type ts: timestamp
2137   @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
2138
2139   @rtype: string
2140   @return: a string with the formatted timestamp
2141
2142   """
2143   if not isinstance (ts, (tuple, list)) or len(ts) != 2:
2144     return '?'
2145   sec, usec = ts
2146   return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
2147
2148
2149 def ParseTimespec(value):
2150   """Parse a time specification.
2151
2152   The following suffixed will be recognized:
2153
2154     - s: seconds
2155     - m: minutes
2156     - h: hours
2157     - d: day
2158     - w: weeks
2159
2160   Without any suffix, the value will be taken to be in seconds.
2161
2162   """
2163   value = str(value)
2164   if not value:
2165     raise errors.OpPrereqError("Empty time specification passed")
2166   suffix_map = {
2167     's': 1,
2168     'm': 60,
2169     'h': 3600,
2170     'd': 86400,
2171     'w': 604800,
2172     }
2173   if value[-1] not in suffix_map:
2174     try:
2175       value = int(value)
2176     except (TypeError, ValueError):
2177       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2178   else:
2179     multiplier = suffix_map[value[-1]]
2180     value = value[:-1]
2181     if not value: # no data left after stripping the suffix
2182       raise errors.OpPrereqError("Invalid time specification (only"
2183                                  " suffix passed)")
2184     try:
2185       value = int(value) * multiplier
2186     except (TypeError, ValueError):
2187       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2188   return value
2189
2190
2191 def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
2192                    filter_master=False):
2193   """Returns the names of online nodes.
2194
2195   This function will also log a warning on stderr with the names of
2196   the online nodes.
2197
2198   @param nodes: if not empty, use only this subset of nodes (minus the
2199       offline ones)
2200   @param cl: if not None, luxi client to use
2201   @type nowarn: boolean
2202   @param nowarn: by default, this function will output a note with the
2203       offline nodes that are skipped; if this parameter is True the
2204       note is not displayed
2205   @type secondary_ips: boolean
2206   @param secondary_ips: if True, return the secondary IPs instead of the
2207       names, useful for doing network traffic over the replication interface
2208       (if any)
2209   @type filter_master: boolean
2210   @param filter_master: if True, do not return the master node in the list
2211       (useful in coordination with secondary_ips where we cannot check our
2212       node name against the list)
2213
2214   """
2215   if cl is None:
2216     cl = GetClient()
2217
2218   if secondary_ips:
2219     name_idx = 2
2220   else:
2221     name_idx = 0
2222
2223   if filter_master:
2224     master_node = cl.QueryConfigValues(["master_node"])[0]
2225     filter_fn = lambda x: x != master_node
2226   else:
2227     filter_fn = lambda _: True
2228
2229   result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
2230                          use_locking=False)
2231   offline = [row[0] for row in result if row[1]]
2232   if offline and not nowarn:
2233     ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
2234   return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
2235
2236
2237 def _ToStream(stream, txt, *args):
2238   """Write a message to a stream, bypassing the logging system
2239
2240   @type stream: file object
2241   @param stream: the file to which we should write
2242   @type txt: str
2243   @param txt: the message
2244
2245   """
2246   if args:
2247     args = tuple(args)
2248     stream.write(txt % args)
2249   else:
2250     stream.write(txt)
2251   stream.write('\n')
2252   stream.flush()
2253
2254
2255 def ToStdout(txt, *args):
2256   """Write a message to stdout only, bypassing the logging system
2257
2258   This is just a wrapper over _ToStream.
2259
2260   @type txt: str
2261   @param txt: the message
2262
2263   """
2264   _ToStream(sys.stdout, txt, *args)
2265
2266
2267 def ToStderr(txt, *args):
2268   """Write a message to stderr only, bypassing the logging system
2269
2270   This is just a wrapper over _ToStream.
2271
2272   @type txt: str
2273   @param txt: the message
2274
2275   """
2276   _ToStream(sys.stderr, txt, *args)
2277
2278
2279 class JobExecutor(object):
2280   """Class which manages the submission and execution of multiple jobs.
2281
2282   Note that instances of this class should not be reused between
2283   GetResults() calls.
2284
2285   """
2286   def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
2287     self.queue = []
2288     if cl is None:
2289       cl = GetClient()
2290     self.cl = cl
2291     self.verbose = verbose
2292     self.jobs = []
2293     self.opts = opts
2294     self.feedback_fn = feedback_fn
2295
2296   def QueueJob(self, name, *ops):
2297     """Record a job for later submit.
2298
2299     @type name: string
2300     @param name: a description of the job, will be used in WaitJobSet
2301     """
2302     SetGenericOpcodeOpts(ops, self.opts)
2303     self.queue.append((name, ops))
2304
2305   def SubmitPending(self, each=False):
2306     """Submit all pending jobs.
2307
2308     """
2309     if each:
2310       results = []
2311       for row in self.queue:
2312         # SubmitJob will remove the success status, but raise an exception if
2313         # the submission fails, so we'll notice that anyway.
2314         results.append([True, self.cl.SubmitJob(row[1])])
2315     else:
2316       results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
2317     for (idx, ((status, data), (name, _))) in enumerate(zip(results,
2318                                                             self.queue)):
2319       self.jobs.append((idx, status, data, name))
2320
2321   def _ChooseJob(self):
2322     """Choose a non-waiting/queued job to poll next.
2323
2324     """
2325     assert self.jobs, "_ChooseJob called with empty job list"
2326
2327     result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
2328     assert result
2329
2330     for job_data, status in zip(self.jobs, result):
2331       if (isinstance(status, list) and status and
2332           status[0] in (constants.JOB_STATUS_QUEUED,
2333                         constants.JOB_STATUS_WAITLOCK,
2334                         constants.JOB_STATUS_CANCELING)):
2335         # job is still present and waiting
2336         continue
2337       # good candidate found (either running job or lost job)
2338       self.jobs.remove(job_data)
2339       return job_data
2340
2341     # no job found
2342     return self.jobs.pop(0)
2343
2344   def GetResults(self):
2345     """Wait for and return the results of all jobs.
2346
2347     @rtype: list
2348     @return: list of tuples (success, job results), in the same order
2349         as the submitted jobs; if a job has failed, instead of the result
2350         there will be the error message
2351
2352     """
2353     if not self.jobs:
2354       self.SubmitPending()
2355     results = []
2356     if self.verbose:
2357       ok_jobs = [row[2] for row in self.jobs if row[1]]
2358       if ok_jobs:
2359         ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
2360
2361     # first, remove any non-submitted jobs
2362     self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
2363     for idx, _, jid, name in failures:
2364       ToStderr("Failed to submit job for %s: %s", name, jid)
2365       results.append((idx, False, jid))
2366
2367     while self.jobs:
2368       (idx, _, jid, name) = self._ChooseJob()
2369       ToStdout("Waiting for job %s for %s...", jid, name)
2370       try:
2371         job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
2372         success = True
2373       except errors.JobLost, err:
2374         _, job_result = FormatError(err)
2375         ToStderr("Job %s for %s has been archived, cannot check its result",
2376                  jid, name)
2377         success = False
2378       except (errors.GenericError, luxi.ProtocolError), err:
2379         _, job_result = FormatError(err)
2380         success = False
2381         # the error message will always be shown, verbose or not
2382         ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
2383
2384       results.append((idx, success, job_result))
2385
2386     # sort based on the index, then drop it
2387     results.sort()
2388     results = [i[1:] for i in results]
2389
2390     return results
2391
2392   def WaitOrShow(self, wait):
2393     """Wait for job results or only print the job IDs.
2394
2395     @type wait: boolean
2396     @param wait: whether to wait or not
2397
2398     """
2399     if wait:
2400       return self.GetResults()
2401     else:
2402       if not self.jobs:
2403         self.SubmitPending()
2404       for _, status, result, name in self.jobs:
2405         if status:
2406           ToStdout("%s: %s", result, name)
2407         else:
2408           ToStderr("Failure for %s: %s", name, result)
2409       return [row[1:3] for row in self.jobs]