cli.SubmitOpCode: Support custom job reporter
[ganeti-local] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module dealing with command line parsing"""
23
24
25 import sys
26 import textwrap
27 import os.path
28 import time
29 import logging
30 from cStringIO import StringIO
31
32 from ganeti import utils
33 from ganeti import errors
34 from ganeti import constants
35 from ganeti import opcodes
36 from ganeti import luxi
37 from ganeti import ssconf
38 from ganeti import rpc
39 from ganeti import ssh
40 from ganeti import compat
41 from ganeti import netutils
42
43 from optparse import (OptionParser, TitledHelpFormatter,
44                       Option, OptionValueError)
45
46
47 __all__ = [
48   # Command line options
49   "ADD_UIDS_OPT",
50   "ALLOCATABLE_OPT",
51   "ALL_OPT",
52   "AUTO_PROMOTE_OPT",
53   "AUTO_REPLACE_OPT",
54   "BACKEND_OPT",
55   "CLEANUP_OPT",
56   "CLUSTER_DOMAIN_SECRET_OPT",
57   "CONFIRM_OPT",
58   "CP_SIZE_OPT",
59   "DEBUG_OPT",
60   "DEBUG_SIMERR_OPT",
61   "DISKIDX_OPT",
62   "DISK_OPT",
63   "DISK_TEMPLATE_OPT",
64   "DRAINED_OPT",
65   "DRBD_HELPER_OPT",
66   "EARLY_RELEASE_OPT",
67   "ENABLED_HV_OPT",
68   "ERROR_CODES_OPT",
69   "FIELDS_OPT",
70   "FILESTORE_DIR_OPT",
71   "FILESTORE_DRIVER_OPT",
72   "FORCE_OPT",
73   "FORCE_VARIANT_OPT",
74   "GLOBAL_FILEDIR_OPT",
75   "HVLIST_OPT",
76   "HVOPTS_OPT",
77   "HYPERVISOR_OPT",
78   "IALLOCATOR_OPT",
79   "DEFAULT_IALLOCATOR_OPT",
80   "IDENTIFY_DEFAULTS_OPT",
81   "IGNORE_CONSIST_OPT",
82   "IGNORE_FAILURES_OPT",
83   "IGNORE_REMOVE_FAILURES_OPT",
84   "IGNORE_SECONDARIES_OPT",
85   "IGNORE_SIZE_OPT",
86   "MAC_PREFIX_OPT",
87   "MAINTAIN_NODE_HEALTH_OPT",
88   "MASTER_NETDEV_OPT",
89   "MC_OPT",
90   "NET_OPT",
91   "NEW_CLUSTER_CERT_OPT",
92   "NEW_CLUSTER_DOMAIN_SECRET_OPT",
93   "NEW_CONFD_HMAC_KEY_OPT",
94   "NEW_RAPI_CERT_OPT",
95   "NEW_SECONDARY_OPT",
96   "NIC_PARAMS_OPT",
97   "NODE_LIST_OPT",
98   "NODE_PLACEMENT_OPT",
99   "NODRBD_STORAGE_OPT",
100   "NOHDR_OPT",
101   "NOIPCHECK_OPT",
102   "NO_INSTALL_OPT",
103   "NONAMECHECK_OPT",
104   "NOLVM_STORAGE_OPT",
105   "NOMODIFY_ETCHOSTS_OPT",
106   "NOMODIFY_SSH_SETUP_OPT",
107   "NONICS_OPT",
108   "NONLIVE_OPT",
109   "NONPLUS1_OPT",
110   "NOSHUTDOWN_OPT",
111   "NOSTART_OPT",
112   "NOSSH_KEYCHECK_OPT",
113   "NOVOTING_OPT",
114   "NWSYNC_OPT",
115   "ON_PRIMARY_OPT",
116   "ON_SECONDARY_OPT",
117   "OFFLINE_OPT",
118   "OSPARAMS_OPT",
119   "OS_OPT",
120   "OS_SIZE_OPT",
121   "RAPI_CERT_OPT",
122   "READD_OPT",
123   "REBOOT_TYPE_OPT",
124   "REMOVE_INSTANCE_OPT",
125   "REMOVE_UIDS_OPT",
126   "ROMAN_OPT",
127   "SECONDARY_IP_OPT",
128   "SELECT_OS_OPT",
129   "SEP_OPT",
130   "SHOWCMD_OPT",
131   "SHUTDOWN_TIMEOUT_OPT",
132   "SINGLE_NODE_OPT",
133   "SRC_DIR_OPT",
134   "SRC_NODE_OPT",
135   "SUBMIT_OPT",
136   "STATIC_OPT",
137   "SYNC_OPT",
138   "TAG_SRC_OPT",
139   "TIMEOUT_OPT",
140   "UIDPOOL_OPT",
141   "USEUNITS_OPT",
142   "USE_REPL_NET_OPT",
143   "VERBOSE_OPT",
144   "VG_NAME_OPT",
145   "YES_DOIT_OPT",
146   # Generic functions for CLI programs
147   "GenericMain",
148   "GenericInstanceCreate",
149   "GetClient",
150   "GetOnlineNodes",
151   "JobExecutor",
152   "JobSubmittedException",
153   "ParseTimespec",
154   "RunWhileClusterStopped",
155   "SubmitOpCode",
156   "SubmitOrSend",
157   "UsesRPC",
158   # Formatting functions
159   "ToStderr", "ToStdout",
160   "FormatError",
161   "GenerateTable",
162   "AskUser",
163   "FormatTimestamp",
164   "FormatLogMessage",
165   # Tags functions
166   "ListTags",
167   "AddTags",
168   "RemoveTags",
169   # command line options support infrastructure
170   "ARGS_MANY_INSTANCES",
171   "ARGS_MANY_NODES",
172   "ARGS_NONE",
173   "ARGS_ONE_INSTANCE",
174   "ARGS_ONE_NODE",
175   "ARGS_ONE_OS",
176   "ArgChoice",
177   "ArgCommand",
178   "ArgFile",
179   "ArgHost",
180   "ArgInstance",
181   "ArgJobId",
182   "ArgNode",
183   "ArgOs",
184   "ArgSuggest",
185   "ArgUnknown",
186   "OPT_COMPL_INST_ADD_NODES",
187   "OPT_COMPL_MANY_NODES",
188   "OPT_COMPL_ONE_IALLOCATOR",
189   "OPT_COMPL_ONE_INSTANCE",
190   "OPT_COMPL_ONE_NODE",
191   "OPT_COMPL_ONE_OS",
192   "cli_option",
193   "SplitNodeOption",
194   "CalculateOSNames",
195   ]
196
197 NO_PREFIX = "no_"
198 UN_PREFIX = "-"
199
200
201 class _Argument:
202   def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
203     self.min = min
204     self.max = max
205
206   def __repr__(self):
207     return ("<%s min=%s max=%s>" %
208             (self.__class__.__name__, self.min, self.max))
209
210
211 class ArgSuggest(_Argument):
212   """Suggesting argument.
213
214   Value can be any of the ones passed to the constructor.
215
216   """
217   # pylint: disable-msg=W0622
218   def __init__(self, min=0, max=None, choices=None):
219     _Argument.__init__(self, min=min, max=max)
220     self.choices = choices
221
222   def __repr__(self):
223     return ("<%s min=%s max=%s choices=%r>" %
224             (self.__class__.__name__, self.min, self.max, self.choices))
225
226
227 class ArgChoice(ArgSuggest):
228   """Choice argument.
229
230   Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
231   but value must be one of the choices.
232
233   """
234
235
236 class ArgUnknown(_Argument):
237   """Unknown argument to program (e.g. determined at runtime).
238
239   """
240
241
242 class ArgInstance(_Argument):
243   """Instances argument.
244
245   """
246
247
248 class ArgNode(_Argument):
249   """Node argument.
250
251   """
252
253 class ArgJobId(_Argument):
254   """Job ID argument.
255
256   """
257
258
259 class ArgFile(_Argument):
260   """File path argument.
261
262   """
263
264
265 class ArgCommand(_Argument):
266   """Command argument.
267
268   """
269
270
271 class ArgHost(_Argument):
272   """Host argument.
273
274   """
275
276
277 class ArgOs(_Argument):
278   """OS argument.
279
280   """
281
282
283 ARGS_NONE = []
284 ARGS_MANY_INSTANCES = [ArgInstance()]
285 ARGS_MANY_NODES = [ArgNode()]
286 ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
287 ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
288 ARGS_ONE_OS = [ArgOs(min=1, max=1)]
289
290
291 def _ExtractTagsObject(opts, args):
292   """Extract the tag type object.
293
294   Note that this function will modify its args parameter.
295
296   """
297   if not hasattr(opts, "tag_type"):
298     raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
299   kind = opts.tag_type
300   if kind == constants.TAG_CLUSTER:
301     retval = kind, kind
302   elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
303     if not args:
304       raise errors.OpPrereqError("no arguments passed to the command")
305     name = args.pop(0)
306     retval = kind, name
307   else:
308     raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
309   return retval
310
311
312 def _ExtendTags(opts, args):
313   """Extend the args if a source file has been given.
314
315   This function will extend the tags with the contents of the file
316   passed in the 'tags_source' attribute of the opts parameter. A file
317   named '-' will be replaced by stdin.
318
319   """
320   fname = opts.tags_source
321   if fname is None:
322     return
323   if fname == "-":
324     new_fh = sys.stdin
325   else:
326     new_fh = open(fname, "r")
327   new_data = []
328   try:
329     # we don't use the nice 'new_data = [line.strip() for line in fh]'
330     # because of python bug 1633941
331     while True:
332       line = new_fh.readline()
333       if not line:
334         break
335       new_data.append(line.strip())
336   finally:
337     new_fh.close()
338   args.extend(new_data)
339
340
341 def ListTags(opts, args):
342   """List the tags on a given object.
343
344   This is a generic implementation that knows how to deal with all
345   three cases of tag objects (cluster, node, instance). The opts
346   argument is expected to contain a tag_type field denoting what
347   object type we work on.
348
349   """
350   kind, name = _ExtractTagsObject(opts, args)
351   cl = GetClient()
352   result = cl.QueryTags(kind, name)
353   result = list(result)
354   result.sort()
355   for tag in result:
356     ToStdout(tag)
357
358
359 def AddTags(opts, args):
360   """Add tags on a given object.
361
362   This is a generic implementation that knows how to deal with all
363   three cases of tag objects (cluster, node, instance). The opts
364   argument is expected to contain a tag_type field denoting what
365   object type we work on.
366
367   """
368   kind, name = _ExtractTagsObject(opts, args)
369   _ExtendTags(opts, args)
370   if not args:
371     raise errors.OpPrereqError("No tags to be added")
372   op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
373   SubmitOpCode(op)
374
375
376 def RemoveTags(opts, args):
377   """Remove tags from a given object.
378
379   This is a generic implementation that knows how to deal with all
380   three cases of tag objects (cluster, node, instance). The opts
381   argument is expected to contain a tag_type field denoting what
382   object type we work on.
383
384   """
385   kind, name = _ExtractTagsObject(opts, args)
386   _ExtendTags(opts, args)
387   if not args:
388     raise errors.OpPrereqError("No tags to be removed")
389   op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
390   SubmitOpCode(op)
391
392
393 def check_unit(option, opt, value): # pylint: disable-msg=W0613
394   """OptParsers custom converter for units.
395
396   """
397   try:
398     return utils.ParseUnit(value)
399   except errors.UnitParseError, err:
400     raise OptionValueError("option %s: %s" % (opt, err))
401
402
403 def _SplitKeyVal(opt, data):
404   """Convert a KeyVal string into a dict.
405
406   This function will convert a key=val[,...] string into a dict. Empty
407   values will be converted specially: keys which have the prefix 'no_'
408   will have the value=False and the prefix stripped, the others will
409   have value=True.
410
411   @type opt: string
412   @param opt: a string holding the option name for which we process the
413       data, used in building error messages
414   @type data: string
415   @param data: a string of the format key=val,key=val,...
416   @rtype: dict
417   @return: {key=val, key=val}
418   @raises errors.ParameterError: if there are duplicate keys
419
420   """
421   kv_dict = {}
422   if data:
423     for elem in utils.UnescapeAndSplit(data, sep=","):
424       if "=" in elem:
425         key, val = elem.split("=", 1)
426       else:
427         if elem.startswith(NO_PREFIX):
428           key, val = elem[len(NO_PREFIX):], False
429         elif elem.startswith(UN_PREFIX):
430           key, val = elem[len(UN_PREFIX):], None
431         else:
432           key, val = elem, True
433       if key in kv_dict:
434         raise errors.ParameterError("Duplicate key '%s' in option %s" %
435                                     (key, opt))
436       kv_dict[key] = val
437   return kv_dict
438
439
440 def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
441   """Custom parser for ident:key=val,key=val options.
442
443   This will store the parsed values as a tuple (ident, {key: val}). As such,
444   multiple uses of this option via action=append is possible.
445
446   """
447   if ":" not in value:
448     ident, rest = value, ''
449   else:
450     ident, rest = value.split(":", 1)
451
452   if ident.startswith(NO_PREFIX):
453     if rest:
454       msg = "Cannot pass options when removing parameter groups: %s" % value
455       raise errors.ParameterError(msg)
456     retval = (ident[len(NO_PREFIX):], False)
457   elif ident.startswith(UN_PREFIX):
458     if rest:
459       msg = "Cannot pass options when removing parameter groups: %s" % value
460       raise errors.ParameterError(msg)
461     retval = (ident[len(UN_PREFIX):], None)
462   else:
463     kv_dict = _SplitKeyVal(opt, rest)
464     retval = (ident, kv_dict)
465   return retval
466
467
468 def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
469   """Custom parser class for key=val,key=val options.
470
471   This will store the parsed values as a dict {key: val}.
472
473   """
474   return _SplitKeyVal(opt, value)
475
476
477 def check_bool(option, opt, value): # pylint: disable-msg=W0613
478   """Custom parser for yes/no options.
479
480   This will store the parsed value as either True or False.
481
482   """
483   value = value.lower()
484   if value == constants.VALUE_FALSE or value == "no":
485     return False
486   elif value == constants.VALUE_TRUE or value == "yes":
487     return True
488   else:
489     raise errors.ParameterError("Invalid boolean value '%s'" % value)
490
491
492 # completion_suggestion is normally a list. Using numeric values not evaluating
493 # to False for dynamic completion.
494 (OPT_COMPL_MANY_NODES,
495  OPT_COMPL_ONE_NODE,
496  OPT_COMPL_ONE_INSTANCE,
497  OPT_COMPL_ONE_OS,
498  OPT_COMPL_ONE_IALLOCATOR,
499  OPT_COMPL_INST_ADD_NODES) = range(100, 106)
500
501 OPT_COMPL_ALL = frozenset([
502   OPT_COMPL_MANY_NODES,
503   OPT_COMPL_ONE_NODE,
504   OPT_COMPL_ONE_INSTANCE,
505   OPT_COMPL_ONE_OS,
506   OPT_COMPL_ONE_IALLOCATOR,
507   OPT_COMPL_INST_ADD_NODES,
508   ])
509
510
511 class CliOption(Option):
512   """Custom option class for optparse.
513
514   """
515   ATTRS = Option.ATTRS + [
516     "completion_suggest",
517     ]
518   TYPES = Option.TYPES + (
519     "identkeyval",
520     "keyval",
521     "unit",
522     "bool",
523     )
524   TYPE_CHECKER = Option.TYPE_CHECKER.copy()
525   TYPE_CHECKER["identkeyval"] = check_ident_key_val
526   TYPE_CHECKER["keyval"] = check_key_val
527   TYPE_CHECKER["unit"] = check_unit
528   TYPE_CHECKER["bool"] = check_bool
529
530
531 # optparse.py sets make_option, so we do it for our own option class, too
532 cli_option = CliOption
533
534
535 _YORNO = "yes|no"
536
537 DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
538                        help="Increase debugging level")
539
540 NOHDR_OPT = cli_option("--no-headers", default=False,
541                        action="store_true", dest="no_headers",
542                        help="Don't display column headers")
543
544 SEP_OPT = cli_option("--separator", default=None,
545                      action="store", dest="separator",
546                      help=("Separator between output fields"
547                            " (defaults to one space)"))
548
549 USEUNITS_OPT = cli_option("--units", default=None,
550                           dest="units", choices=('h', 'm', 'g', 't'),
551                           help="Specify units for output (one of hmgt)")
552
553 FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
554                         type="string", metavar="FIELDS",
555                         help="Comma separated list of output fields")
556
557 FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
558                        default=False, help="Force the operation")
559
560 CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
561                          default=False, help="Do not require confirmation")
562
563 TAG_SRC_OPT = cli_option("--from", dest="tags_source",
564                          default=None, help="File with tag names")
565
566 SUBMIT_OPT = cli_option("--submit", dest="submit_only",
567                         default=False, action="store_true",
568                         help=("Submit the job and return the job ID, but"
569                               " don't wait for the job to finish"))
570
571 SYNC_OPT = cli_option("--sync", dest="do_locking",
572                       default=False, action="store_true",
573                       help=("Grab locks while doing the queries"
574                             " in order to ensure more consistent results"))
575
576 _DRY_RUN_OPT = cli_option("--dry-run", default=False,
577                           action="store_true",
578                           help=("Do not execute the operation, just run the"
579                                 " check steps and verify it it could be"
580                                 " executed"))
581
582 VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
583                          action="store_true",
584                          help="Increase the verbosity of the operation")
585
586 DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
587                               action="store_true", dest="simulate_errors",
588                               help="Debugging option that makes the operation"
589                               " treat most runtime checks as failed")
590
591 NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
592                         default=True, action="store_false",
593                         help="Don't wait for sync (DANGEROUS!)")
594
595 DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
596                                help="Custom disk setup (diskless, file,"
597                                " plain or drbd)",
598                                default=None, metavar="TEMPL",
599                                choices=list(constants.DISK_TEMPLATES))
600
601 NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
602                         help="Do not create any network cards for"
603                         " the instance")
604
605 FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
606                                help="Relative path under default cluster-wide"
607                                " file storage dir to store file-based disks",
608                                default=None, metavar="<DIR>")
609
610 FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
611                                   help="Driver to use for image files",
612                                   default="loop", metavar="<DRIVER>",
613                                   choices=list(constants.FILE_DRIVER))
614
615 IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
616                             help="Select nodes for the instance automatically"
617                             " using the <NAME> iallocator plugin",
618                             default=None, type="string",
619                             completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
620
621 DEFAULT_IALLOCATOR_OPT = cli_option("-I", "--default-iallocator",
622                             metavar="<NAME>",
623                             help="Set the default instance allocator plugin",
624                             default=None, type="string",
625                             completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
626
627 OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
628                     metavar="<os>",
629                     completion_suggest=OPT_COMPL_ONE_OS)
630
631 OSPARAMS_OPT = cli_option("-O", "--os-parameters", dest="osparams",
632                          type="keyval", default={},
633                          help="OS parameters")
634
635 FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
636                                action="store_true", default=False,
637                                help="Force an unknown variant")
638
639 NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
640                             action="store_true", default=False,
641                             help="Do not install the OS (will"
642                             " enable no-start)")
643
644 BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
645                          type="keyval", default={},
646                          help="Backend parameters")
647
648 HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
649                          default={}, dest="hvparams",
650                          help="Hypervisor parameters")
651
652 HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
653                             help="Hypervisor and hypervisor options, in the"
654                             " format hypervisor:option=value,option=value,...",
655                             default=None, type="identkeyval")
656
657 HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
658                         help="Hypervisor and hypervisor options, in the"
659                         " format hypervisor:option=value,option=value,...",
660                         default=[], action="append", type="identkeyval")
661
662 NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
663                            action="store_false",
664                            help="Don't check that the instance's IP"
665                            " is alive")
666
667 NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
668                              default=True, action="store_false",
669                              help="Don't check that the instance's name"
670                              " is resolvable")
671
672 NET_OPT = cli_option("--net",
673                      help="NIC parameters", default=[],
674                      dest="nics", action="append", type="identkeyval")
675
676 DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
677                       dest="disks", action="append", type="identkeyval")
678
679 DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
680                          help="Comma-separated list of disks"
681                          " indices to act on (e.g. 0,2) (optional,"
682                          " defaults to all disks)")
683
684 OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
685                          help="Enforces a single-disk configuration using the"
686                          " given disk size, in MiB unless a suffix is used",
687                          default=None, type="unit", metavar="<size>")
688
689 IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
690                                 dest="ignore_consistency",
691                                 action="store_true", default=False,
692                                 help="Ignore the consistency of the disks on"
693                                 " the secondary")
694
695 NONLIVE_OPT = cli_option("--non-live", dest="live",
696                          default=True, action="store_false",
697                          help="Do a non-live migration (this usually means"
698                          " freeze the instance, save the state, transfer and"
699                          " only then resume running on the secondary node)")
700
701 NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
702                                 help="Target node and optional secondary node",
703                                 metavar="<pnode>[:<snode>]",
704                                 completion_suggest=OPT_COMPL_INST_ADD_NODES)
705
706 NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
707                            action="append", metavar="<node>",
708                            help="Use only this node (can be used multiple"
709                            " times, if not given defaults to all nodes)",
710                            completion_suggest=OPT_COMPL_ONE_NODE)
711
712 SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
713                              metavar="<node>",
714                              completion_suggest=OPT_COMPL_ONE_NODE)
715
716 NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
717                          action="store_false",
718                          help="Don't start the instance after creation")
719
720 SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
721                          action="store_true", default=False,
722                          help="Show command instead of executing it")
723
724 CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
725                          default=False, action="store_true",
726                          help="Instead of performing the migration, try to"
727                          " recover from a failed cleanup. This is safe"
728                          " to run even if the instance is healthy, but it"
729                          " will create extra replication traffic and "
730                          " disrupt briefly the replication (like during the"
731                          " migration")
732
733 STATIC_OPT = cli_option("-s", "--static", dest="static",
734                         action="store_true", default=False,
735                         help="Only show configuration data, not runtime data")
736
737 ALL_OPT = cli_option("--all", dest="show_all",
738                      default=False, action="store_true",
739                      help="Show info on all instances on the cluster."
740                      " This can take a long time to run, use wisely")
741
742 SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
743                            action="store_true", default=False,
744                            help="Interactive OS reinstall, lists available"
745                            " OS templates for selection")
746
747 IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
748                                  action="store_true", default=False,
749                                  help="Remove the instance from the cluster"
750                                  " configuration even if there are failures"
751                                  " during the removal process")
752
753 IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
754                                         dest="ignore_remove_failures",
755                                         action="store_true", default=False,
756                                         help="Remove the instance from the"
757                                         " cluster configuration even if there"
758                                         " are failures during the removal"
759                                         " process")
760
761 REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
762                                  action="store_true", default=False,
763                                  help="Remove the instance from the cluster")
764
765 NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
766                                help="Specifies the new secondary node",
767                                metavar="NODE", default=None,
768                                completion_suggest=OPT_COMPL_ONE_NODE)
769
770 ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
771                             default=False, action="store_true",
772                             help="Replace the disk(s) on the primary"
773                             " node (only for the drbd template)")
774
775 ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
776                               default=False, action="store_true",
777                               help="Replace the disk(s) on the secondary"
778                               " node (only for the drbd template)")
779
780 AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
781                               default=False, action="store_true",
782                               help="Lock all nodes and auto-promote as needed"
783                               " to MC status")
784
785 AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
786                               default=False, action="store_true",
787                               help="Automatically replace faulty disks"
788                               " (only for the drbd template)")
789
790 IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
791                              default=False, action="store_true",
792                              help="Ignore current recorded size"
793                              " (useful for forcing activation when"
794                              " the recorded size is wrong)")
795
796 SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
797                           metavar="<node>",
798                           completion_suggest=OPT_COMPL_ONE_NODE)
799
800 SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
801                          metavar="<dir>")
802
803 SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
804                               help="Specify the secondary ip for the node",
805                               metavar="ADDRESS", default=None)
806
807 READD_OPT = cli_option("--readd", dest="readd",
808                        default=False, action="store_true",
809                        help="Readd old node after replacing it")
810
811 NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
812                                 default=True, action="store_false",
813                                 help="Disable SSH key fingerprint checking")
814
815
816 MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
817                     type="bool", default=None, metavar=_YORNO,
818                     help="Set the master_candidate flag on the node")
819
820 OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
821                          type="bool", default=None,
822                          help="Set the offline flag on the node")
823
824 DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
825                          type="bool", default=None,
826                          help="Set the drained flag on the node")
827
828 ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
829                              type="bool", default=None, metavar=_YORNO,
830                              help="Set the allocatable flag on a volume")
831
832 NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
833                                help="Disable support for lvm based instances"
834                                " (cluster-wide)",
835                                action="store_false", default=True)
836
837 ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
838                             dest="enabled_hypervisors",
839                             help="Comma-separated list of hypervisors",
840                             type="string", default=None)
841
842 NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
843                             type="keyval", default={},
844                             help="NIC parameters")
845
846 CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
847                          dest="candidate_pool_size", type="int",
848                          help="Set the candidate pool size")
849
850 VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
851                          help="Enables LVM and specifies the volume group"
852                          " name (cluster-wide) for disk allocation [xenvg]",
853                          metavar="VG", default=None)
854
855 YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
856                           help="Destroy cluster", action="store_true")
857
858 NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
859                           help="Skip node agreement check (dangerous)",
860                           action="store_true", default=False)
861
862 MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
863                             help="Specify the mac prefix for the instance IP"
864                             " addresses, in the format XX:XX:XX",
865                             metavar="PREFIX",
866                             default=None)
867
868 MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
869                                help="Specify the node interface (cluster-wide)"
870                                " on which the master IP address will be added "
871                                " [%s]" % constants.DEFAULT_BRIDGE,
872                                metavar="NETDEV",
873                                default=constants.DEFAULT_BRIDGE)
874
875 GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
876                                 help="Specify the default directory (cluster-"
877                                 "wide) for storing the file-based disks [%s]" %
878                                 constants.DEFAULT_FILE_STORAGE_DIR,
879                                 metavar="DIR",
880                                 default=constants.DEFAULT_FILE_STORAGE_DIR)
881
882 NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
883                                    help="Don't modify /etc/hosts",
884                                    action="store_false", default=True)
885
886 NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
887                                     help="Don't initialize SSH keys",
888                                     action="store_false", default=True)
889
890 ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
891                              help="Enable parseable error messages",
892                              action="store_true", default=False)
893
894 NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
895                           help="Skip N+1 memory redundancy tests",
896                           action="store_true", default=False)
897
898 REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
899                              help="Type of reboot: soft/hard/full",
900                              default=constants.INSTANCE_REBOOT_HARD,
901                              metavar="<REBOOT>",
902                              choices=list(constants.REBOOT_TYPES))
903
904 IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
905                                     dest="ignore_secondaries",
906                                     default=False, action="store_true",
907                                     help="Ignore errors from secondaries")
908
909 NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
910                             action="store_false", default=True,
911                             help="Don't shutdown the instance (unsafe)")
912
913 TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
914                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
915                          help="Maximum time to wait")
916
917 SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
918                          dest="shutdown_timeout", type="int",
919                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
920                          help="Maximum time to wait for instance shutdown")
921
922 EARLY_RELEASE_OPT = cli_option("--early-release",
923                                dest="early_release", default=False,
924                                action="store_true",
925                                help="Release the locks on the secondary"
926                                " node(s) early")
927
928 NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
929                                   dest="new_cluster_cert",
930                                   default=False, action="store_true",
931                                   help="Generate a new cluster certificate")
932
933 RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
934                            default=None,
935                            help="File containing new RAPI certificate")
936
937 NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
938                                default=None, action="store_true",
939                                help=("Generate a new self-signed RAPI"
940                                      " certificate"))
941
942 NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
943                                     dest="new_confd_hmac_key",
944                                     default=False, action="store_true",
945                                     help=("Create a new HMAC key for %s" %
946                                           constants.CONFD))
947
948 CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
949                                        dest="cluster_domain_secret",
950                                        default=None,
951                                        help=("Load new new cluster domain"
952                                              " secret from file"))
953
954 NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
955                                            dest="new_cluster_domain_secret",
956                                            default=False, action="store_true",
957                                            help=("Create a new cluster domain"
958                                                  " secret"))
959
960 USE_REPL_NET_OPT = cli_option("--use-replication-network",
961                               dest="use_replication_network",
962                               help="Whether to use the replication network"
963                               " for talking to the nodes",
964                               action="store_true", default=False)
965
966 MAINTAIN_NODE_HEALTH_OPT = \
967     cli_option("--maintain-node-health", dest="maintain_node_health",
968                metavar=_YORNO, default=None, type="bool",
969                help="Configure the cluster to automatically maintain node"
970                " health, by shutting down unknown instances, shutting down"
971                " unknown DRBD devices, etc.")
972
973 IDENTIFY_DEFAULTS_OPT = \
974     cli_option("--identify-defaults", dest="identify_defaults",
975                default=False, action="store_true",
976                help="Identify which saved instance parameters are equal to"
977                " the current cluster defaults and set them as such, instead"
978                " of marking them as overridden")
979
980 UIDPOOL_OPT = cli_option("--uid-pool", default=None,
981                          action="store", dest="uid_pool",
982                          help=("A list of user-ids or user-id"
983                                " ranges separated by commas"))
984
985 ADD_UIDS_OPT = cli_option("--add-uids", default=None,
986                           action="store", dest="add_uids",
987                           help=("A list of user-ids or user-id"
988                                 " ranges separated by commas, to be"
989                                 " added to the user-id pool"))
990
991 REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
992                              action="store", dest="remove_uids",
993                              help=("A list of user-ids or user-id"
994                                    " ranges separated by commas, to be"
995                                    " removed from the user-id pool"))
996
997 ROMAN_OPT = cli_option("--roman",
998                        dest="roman_integers", default=False,
999                        action="store_true",
1000                        help="Use roman numbers for positive integers")
1001
1002 DRBD_HELPER_OPT = cli_option("--drbd-usermode-helper", dest="drbd_helper",
1003                              action="store", default=None,
1004                              help="Specifies usermode helper for DRBD")
1005
1006 NODRBD_STORAGE_OPT = cli_option("--no-drbd-storage", dest="drbd_storage",
1007                                 action="store_false", default=True,
1008                                 help="Disable support for DRBD")
1009
1010
1011 def _ParseArgs(argv, commands, aliases):
1012   """Parser for the command line arguments.
1013
1014   This function parses the arguments and returns the function which
1015   must be executed together with its (modified) arguments.
1016
1017   @param argv: the command line
1018   @param commands: dictionary with special contents, see the design
1019       doc for cmdline handling
1020   @param aliases: dictionary with command aliases {'alias': 'target, ...}
1021
1022   """
1023   if len(argv) == 0:
1024     binary = "<command>"
1025   else:
1026     binary = argv[0].split("/")[-1]
1027
1028   if len(argv) > 1 and argv[1] == "--version":
1029     ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
1030     # Quit right away. That way we don't have to care about this special
1031     # argument. optparse.py does it the same.
1032     sys.exit(0)
1033
1034   if len(argv) < 2 or not (argv[1] in commands or
1035                            argv[1] in aliases):
1036     # let's do a nice thing
1037     sortedcmds = commands.keys()
1038     sortedcmds.sort()
1039
1040     ToStdout("Usage: %s {command} [options...] [argument...]", binary)
1041     ToStdout("%s <command> --help to see details, or man %s", binary, binary)
1042     ToStdout("")
1043
1044     # compute the max line length for cmd + usage
1045     mlen = max([len(" %s" % cmd) for cmd in commands])
1046     mlen = min(60, mlen) # should not get here...
1047
1048     # and format a nice command list
1049     ToStdout("Commands:")
1050     for cmd in sortedcmds:
1051       cmdstr = " %s" % (cmd,)
1052       help_text = commands[cmd][4]
1053       help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
1054       ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
1055       for line in help_lines:
1056         ToStdout("%-*s   %s", mlen, "", line)
1057
1058     ToStdout("")
1059
1060     return None, None, None
1061
1062   # get command, unalias it, and look it up in commands
1063   cmd = argv.pop(1)
1064   if cmd in aliases:
1065     if cmd in commands:
1066       raise errors.ProgrammerError("Alias '%s' overrides an existing"
1067                                    " command" % cmd)
1068
1069     if aliases[cmd] not in commands:
1070       raise errors.ProgrammerError("Alias '%s' maps to non-existing"
1071                                    " command '%s'" % (cmd, aliases[cmd]))
1072
1073     cmd = aliases[cmd]
1074
1075   func, args_def, parser_opts, usage, description = commands[cmd]
1076   parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
1077                         description=description,
1078                         formatter=TitledHelpFormatter(),
1079                         usage="%%prog %s %s" % (cmd, usage))
1080   parser.disable_interspersed_args()
1081   options, args = parser.parse_args()
1082
1083   if not _CheckArguments(cmd, args_def, args):
1084     return None, None, None
1085
1086   return func, options, args
1087
1088
1089 def _CheckArguments(cmd, args_def, args):
1090   """Verifies the arguments using the argument definition.
1091
1092   Algorithm:
1093
1094     1. Abort with error if values specified by user but none expected.
1095
1096     1. For each argument in definition
1097
1098       1. Keep running count of minimum number of values (min_count)
1099       1. Keep running count of maximum number of values (max_count)
1100       1. If it has an unlimited number of values
1101
1102         1. Abort with error if it's not the last argument in the definition
1103
1104     1. If last argument has limited number of values
1105
1106       1. Abort with error if number of values doesn't match or is too large
1107
1108     1. Abort with error if user didn't pass enough values (min_count)
1109
1110   """
1111   if args and not args_def:
1112     ToStderr("Error: Command %s expects no arguments", cmd)
1113     return False
1114
1115   min_count = None
1116   max_count = None
1117   check_max = None
1118
1119   last_idx = len(args_def) - 1
1120
1121   for idx, arg in enumerate(args_def):
1122     if min_count is None:
1123       min_count = arg.min
1124     elif arg.min is not None:
1125       min_count += arg.min
1126
1127     if max_count is None:
1128       max_count = arg.max
1129     elif arg.max is not None:
1130       max_count += arg.max
1131
1132     if idx == last_idx:
1133       check_max = (arg.max is not None)
1134
1135     elif arg.max is None:
1136       raise errors.ProgrammerError("Only the last argument can have max=None")
1137
1138   if check_max:
1139     # Command with exact number of arguments
1140     if (min_count is not None and max_count is not None and
1141         min_count == max_count and len(args) != min_count):
1142       ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1143       return False
1144
1145     # Command with limited number of arguments
1146     if max_count is not None and len(args) > max_count:
1147       ToStderr("Error: Command %s expects only %d argument(s)",
1148                cmd, max_count)
1149       return False
1150
1151   # Command with some required arguments
1152   if min_count is not None and len(args) < min_count:
1153     ToStderr("Error: Command %s expects at least %d argument(s)",
1154              cmd, min_count)
1155     return False
1156
1157   return True
1158
1159
1160 def SplitNodeOption(value):
1161   """Splits the value of a --node option.
1162
1163   """
1164   if value and ':' in value:
1165     return value.split(':', 1)
1166   else:
1167     return (value, None)
1168
1169
1170 def CalculateOSNames(os_name, os_variants):
1171   """Calculates all the names an OS can be called, according to its variants.
1172
1173   @type os_name: string
1174   @param os_name: base name of the os
1175   @type os_variants: list or None
1176   @param os_variants: list of supported variants
1177   @rtype: list
1178   @return: list of valid names
1179
1180   """
1181   if os_variants:
1182     return ['%s+%s' % (os_name, v) for v in os_variants]
1183   else:
1184     return [os_name]
1185
1186
1187 def UsesRPC(fn):
1188   def wrapper(*args, **kwargs):
1189     rpc.Init()
1190     try:
1191       return fn(*args, **kwargs)
1192     finally:
1193       rpc.Shutdown()
1194   return wrapper
1195
1196
1197 def AskUser(text, choices=None):
1198   """Ask the user a question.
1199
1200   @param text: the question to ask
1201
1202   @param choices: list with elements tuples (input_char, return_value,
1203       description); if not given, it will default to: [('y', True,
1204       'Perform the operation'), ('n', False, 'Do no do the operation')];
1205       note that the '?' char is reserved for help
1206
1207   @return: one of the return values from the choices list; if input is
1208       not possible (i.e. not running with a tty, we return the last
1209       entry from the list
1210
1211   """
1212   if choices is None:
1213     choices = [('y', True, 'Perform the operation'),
1214                ('n', False, 'Do not perform the operation')]
1215   if not choices or not isinstance(choices, list):
1216     raise errors.ProgrammerError("Invalid choices argument to AskUser")
1217   for entry in choices:
1218     if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
1219       raise errors.ProgrammerError("Invalid choices element to AskUser")
1220
1221   answer = choices[-1][1]
1222   new_text = []
1223   for line in text.splitlines():
1224     new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
1225   text = "\n".join(new_text)
1226   try:
1227     f = file("/dev/tty", "a+")
1228   except IOError:
1229     return answer
1230   try:
1231     chars = [entry[0] for entry in choices]
1232     chars[-1] = "[%s]" % chars[-1]
1233     chars.append('?')
1234     maps = dict([(entry[0], entry[1]) for entry in choices])
1235     while True:
1236       f.write(text)
1237       f.write('\n')
1238       f.write("/".join(chars))
1239       f.write(": ")
1240       line = f.readline(2).strip().lower()
1241       if line in maps:
1242         answer = maps[line]
1243         break
1244       elif line == '?':
1245         for entry in choices:
1246           f.write(" %s - %s\n" % (entry[0], entry[2]))
1247         f.write("\n")
1248         continue
1249   finally:
1250     f.close()
1251   return answer
1252
1253
1254 class JobSubmittedException(Exception):
1255   """Job was submitted, client should exit.
1256
1257   This exception has one argument, the ID of the job that was
1258   submitted. The handler should print this ID.
1259
1260   This is not an error, just a structured way to exit from clients.
1261
1262   """
1263
1264
1265 def SendJob(ops, cl=None):
1266   """Function to submit an opcode without waiting for the results.
1267
1268   @type ops: list
1269   @param ops: list of opcodes
1270   @type cl: luxi.Client
1271   @param cl: the luxi client to use for communicating with the master;
1272              if None, a new client will be created
1273
1274   """
1275   if cl is None:
1276     cl = GetClient()
1277
1278   job_id = cl.SubmitJob(ops)
1279
1280   return job_id
1281
1282
1283 def GenericPollJob(job_id, cbs, report_cbs):
1284   """Generic job-polling function.
1285
1286   @type job_id: number
1287   @param job_id: Job ID
1288   @type cbs: Instance of L{JobPollCbBase}
1289   @param cbs: Data callbacks
1290   @type report_cbs: Instance of L{JobPollReportCbBase}
1291   @param report_cbs: Reporting callbacks
1292
1293   """
1294   prev_job_info = None
1295   prev_logmsg_serial = None
1296
1297   status = None
1298
1299   while True:
1300     result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
1301                                       prev_logmsg_serial)
1302     if not result:
1303       # job not found, go away!
1304       raise errors.JobLost("Job with id %s lost" % job_id)
1305
1306     if result == constants.JOB_NOTCHANGED:
1307       report_cbs.ReportNotChanged(job_id, status)
1308
1309       # Wait again
1310       continue
1311
1312     # Split result, a tuple of (field values, log entries)
1313     (job_info, log_entries) = result
1314     (status, ) = job_info
1315
1316     if log_entries:
1317       for log_entry in log_entries:
1318         (serial, timestamp, log_type, message) = log_entry
1319         report_cbs.ReportLogMessage(job_id, serial, timestamp,
1320                                     log_type, message)
1321         prev_logmsg_serial = max(prev_logmsg_serial, serial)
1322
1323     # TODO: Handle canceled and archived jobs
1324     elif status in (constants.JOB_STATUS_SUCCESS,
1325                     constants.JOB_STATUS_ERROR,
1326                     constants.JOB_STATUS_CANCELING,
1327                     constants.JOB_STATUS_CANCELED):
1328       break
1329
1330     prev_job_info = job_info
1331
1332   jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
1333   if not jobs:
1334     raise errors.JobLost("Job with id %s lost" % job_id)
1335
1336   status, opstatus, result = jobs[0]
1337
1338   if status == constants.JOB_STATUS_SUCCESS:
1339     return result
1340
1341   if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
1342     raise errors.OpExecError("Job was canceled")
1343
1344   has_ok = False
1345   for idx, (status, msg) in enumerate(zip(opstatus, result)):
1346     if status == constants.OP_STATUS_SUCCESS:
1347       has_ok = True
1348     elif status == constants.OP_STATUS_ERROR:
1349       errors.MaybeRaise(msg)
1350
1351       if has_ok:
1352         raise errors.OpExecError("partial failure (opcode %d): %s" %
1353                                  (idx, msg))
1354
1355       raise errors.OpExecError(str(msg))
1356
1357   # default failure mode
1358   raise errors.OpExecError(result)
1359
1360
1361 class JobPollCbBase:
1362   """Base class for L{GenericPollJob} callbacks.
1363
1364   """
1365   def __init__(self):
1366     """Initializes this class.
1367
1368     """
1369
1370   def WaitForJobChangeOnce(self, job_id, fields,
1371                            prev_job_info, prev_log_serial):
1372     """Waits for changes on a job.
1373
1374     """
1375     raise NotImplementedError()
1376
1377   def QueryJobs(self, job_ids, fields):
1378     """Returns the selected fields for the selected job IDs.
1379
1380     @type job_ids: list of numbers
1381     @param job_ids: Job IDs
1382     @type fields: list of strings
1383     @param fields: Fields
1384
1385     """
1386     raise NotImplementedError()
1387
1388
1389 class JobPollReportCbBase:
1390   """Base class for L{GenericPollJob} reporting callbacks.
1391
1392   """
1393   def __init__(self):
1394     """Initializes this class.
1395
1396     """
1397
1398   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1399     """Handles a log message.
1400
1401     """
1402     raise NotImplementedError()
1403
1404   def ReportNotChanged(self, job_id, status):
1405     """Called for if a job hasn't changed in a while.
1406
1407     @type job_id: number
1408     @param job_id: Job ID
1409     @type status: string or None
1410     @param status: Job status if available
1411
1412     """
1413     raise NotImplementedError()
1414
1415
1416 class _LuxiJobPollCb(JobPollCbBase):
1417   def __init__(self, cl):
1418     """Initializes this class.
1419
1420     """
1421     JobPollCbBase.__init__(self)
1422     self.cl = cl
1423
1424   def WaitForJobChangeOnce(self, job_id, fields,
1425                            prev_job_info, prev_log_serial):
1426     """Waits for changes on a job.
1427
1428     """
1429     return self.cl.WaitForJobChangeOnce(job_id, fields,
1430                                         prev_job_info, prev_log_serial)
1431
1432   def QueryJobs(self, job_ids, fields):
1433     """Returns the selected fields for the selected job IDs.
1434
1435     """
1436     return self.cl.QueryJobs(job_ids, fields)
1437
1438
1439 class FeedbackFnJobPollReportCb(JobPollReportCbBase):
1440   def __init__(self, feedback_fn):
1441     """Initializes this class.
1442
1443     """
1444     JobPollReportCbBase.__init__(self)
1445
1446     self.feedback_fn = feedback_fn
1447
1448     assert callable(feedback_fn)
1449
1450   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1451     """Handles a log message.
1452
1453     """
1454     self.feedback_fn((timestamp, log_type, log_msg))
1455
1456   def ReportNotChanged(self, job_id, status):
1457     """Called if a job hasn't changed in a while.
1458
1459     """
1460     # Ignore
1461
1462
1463 class StdioJobPollReportCb(JobPollReportCbBase):
1464   def __init__(self):
1465     """Initializes this class.
1466
1467     """
1468     JobPollReportCbBase.__init__(self)
1469
1470     self.notified_queued = False
1471     self.notified_waitlock = False
1472
1473   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1474     """Handles a log message.
1475
1476     """
1477     ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
1478              FormatLogMessage(log_type, log_msg))
1479
1480   def ReportNotChanged(self, job_id, status):
1481     """Called if a job hasn't changed in a while.
1482
1483     """
1484     if status is None:
1485       return
1486
1487     if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
1488       ToStderr("Job %s is waiting in queue", job_id)
1489       self.notified_queued = True
1490
1491     elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
1492       ToStderr("Job %s is trying to acquire all necessary locks", job_id)
1493       self.notified_waitlock = True
1494
1495
1496 def FormatLogMessage(log_type, log_msg):
1497   """Formats a job message according to its type.
1498
1499   """
1500   if log_type != constants.ELOG_MESSAGE:
1501     log_msg = str(log_msg)
1502
1503   return utils.SafeEncode(log_msg)
1504
1505
1506 def PollJob(job_id, cl=None, feedback_fn=None, reporter=None):
1507   """Function to poll for the result of a job.
1508
1509   @type job_id: job identified
1510   @param job_id: the job to poll for results
1511   @type cl: luxi.Client
1512   @param cl: the luxi client to use for communicating with the master;
1513              if None, a new client will be created
1514
1515   """
1516   if cl is None:
1517     cl = GetClient()
1518
1519   if reporter is None:
1520     if feedback_fn:
1521       reporter = FeedbackFnJobPollReportCb(feedback_fn)
1522     else:
1523       reporter = StdioJobPollReportCb()
1524   elif feedback_fn:
1525     raise errors.ProgrammerError("Can't specify reporter and feedback function")
1526
1527   return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
1528
1529
1530 def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None, reporter=None):
1531   """Legacy function to submit an opcode.
1532
1533   This is just a simple wrapper over the construction of the processor
1534   instance. It should be extended to better handle feedback and
1535   interaction functions.
1536
1537   """
1538   if cl is None:
1539     cl = GetClient()
1540
1541   SetGenericOpcodeOpts([op], opts)
1542
1543   job_id = SendJob([op], cl)
1544
1545   op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn,
1546                        reporter=reporter)
1547
1548   return op_results[0]
1549
1550
1551 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
1552   """Wrapper around SubmitOpCode or SendJob.
1553
1554   This function will decide, based on the 'opts' parameter, whether to
1555   submit and wait for the result of the opcode (and return it), or
1556   whether to just send the job and print its identifier. It is used in
1557   order to simplify the implementation of the '--submit' option.
1558
1559   It will also process the opcodes if we're sending the via SendJob
1560   (otherwise SubmitOpCode does it).
1561
1562   """
1563   if opts and opts.submit_only:
1564     job = [op]
1565     SetGenericOpcodeOpts(job, opts)
1566     job_id = SendJob(job, cl=cl)
1567     raise JobSubmittedException(job_id)
1568   else:
1569     return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
1570
1571
1572 def SetGenericOpcodeOpts(opcode_list, options):
1573   """Processor for generic options.
1574
1575   This function updates the given opcodes based on generic command
1576   line options (like debug, dry-run, etc.).
1577
1578   @param opcode_list: list of opcodes
1579   @param options: command line options or None
1580   @return: None (in-place modification)
1581
1582   """
1583   if not options:
1584     return
1585   for op in opcode_list:
1586     op.dry_run = options.dry_run
1587     op.debug_level = options.debug
1588
1589
1590 def GetClient():
1591   # TODO: Cache object?
1592   try:
1593     client = luxi.Client()
1594   except luxi.NoMasterError:
1595     ss = ssconf.SimpleStore()
1596
1597     # Try to read ssconf file
1598     try:
1599       ss.GetMasterNode()
1600     except errors.ConfigurationError:
1601       raise errors.OpPrereqError("Cluster not initialized or this machine is"
1602                                  " not part of a cluster")
1603
1604     master, myself = ssconf.GetMasterAndMyself(ss=ss)
1605     if master != myself:
1606       raise errors.OpPrereqError("This is not the master node, please connect"
1607                                  " to node '%s' and rerun the command" %
1608                                  master)
1609     raise
1610   return client
1611
1612
1613 def FormatError(err):
1614   """Return a formatted error message for a given error.
1615
1616   This function takes an exception instance and returns a tuple
1617   consisting of two values: first, the recommended exit code, and
1618   second, a string describing the error message (not
1619   newline-terminated).
1620
1621   """
1622   retcode = 1
1623   obuf = StringIO()
1624   msg = str(err)
1625   if isinstance(err, errors.ConfigurationError):
1626     txt = "Corrupt configuration file: %s" % msg
1627     logging.error(txt)
1628     obuf.write(txt + "\n")
1629     obuf.write("Aborting.")
1630     retcode = 2
1631   elif isinstance(err, errors.HooksAbort):
1632     obuf.write("Failure: hooks execution failed:\n")
1633     for node, script, out in err.args[0]:
1634       if out:
1635         obuf.write("  node: %s, script: %s, output: %s\n" %
1636                    (node, script, out))
1637       else:
1638         obuf.write("  node: %s, script: %s (no output)\n" %
1639                    (node, script))
1640   elif isinstance(err, errors.HooksFailure):
1641     obuf.write("Failure: hooks general failure: %s" % msg)
1642   elif isinstance(err, errors.ResolverError):
1643     this_host = netutils.HostInfo.SysName()
1644     if err.args[0] == this_host:
1645       msg = "Failure: can't resolve my own hostname ('%s')"
1646     else:
1647       msg = "Failure: can't resolve hostname '%s'"
1648     obuf.write(msg % err.args[0])
1649   elif isinstance(err, errors.OpPrereqError):
1650     if len(err.args) == 2:
1651       obuf.write("Failure: prerequisites not met for this"
1652                " operation:\nerror type: %s, error details:\n%s" %
1653                  (err.args[1], err.args[0]))
1654     else:
1655       obuf.write("Failure: prerequisites not met for this"
1656                  " operation:\n%s" % msg)
1657   elif isinstance(err, errors.OpExecError):
1658     obuf.write("Failure: command execution error:\n%s" % msg)
1659   elif isinstance(err, errors.TagError):
1660     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
1661   elif isinstance(err, errors.JobQueueDrainError):
1662     obuf.write("Failure: the job queue is marked for drain and doesn't"
1663                " accept new requests\n")
1664   elif isinstance(err, errors.JobQueueFull):
1665     obuf.write("Failure: the job queue is full and doesn't accept new"
1666                " job submissions until old jobs are archived\n")
1667   elif isinstance(err, errors.TypeEnforcementError):
1668     obuf.write("Parameter Error: %s" % msg)
1669   elif isinstance(err, errors.ParameterError):
1670     obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
1671   elif isinstance(err, luxi.NoMasterError):
1672     obuf.write("Cannot communicate with the master daemon.\nIs it running"
1673                " and listening for connections?")
1674   elif isinstance(err, luxi.TimeoutError):
1675     obuf.write("Timeout while talking to the master daemon. Error:\n"
1676                "%s" % msg)
1677   elif isinstance(err, luxi.ProtocolError):
1678     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
1679                "%s" % msg)
1680   elif isinstance(err, errors.GenericError):
1681     obuf.write("Unhandled Ganeti error: %s" % msg)
1682   elif isinstance(err, JobSubmittedException):
1683     obuf.write("JobID: %s\n" % err.args[0])
1684     retcode = 0
1685   else:
1686     obuf.write("Unhandled exception: %s" % msg)
1687   return retcode, obuf.getvalue().rstrip('\n')
1688
1689
1690 def GenericMain(commands, override=None, aliases=None):
1691   """Generic main function for all the gnt-* commands.
1692
1693   Arguments:
1694     - commands: a dictionary with a special structure, see the design doc
1695                 for command line handling.
1696     - override: if not None, we expect a dictionary with keys that will
1697                 override command line options; this can be used to pass
1698                 options from the scripts to generic functions
1699     - aliases: dictionary with command aliases {'alias': 'target, ...}
1700
1701   """
1702   # save the program name and the entire command line for later logging
1703   if sys.argv:
1704     binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1705     if len(sys.argv) >= 2:
1706       binary += " " + sys.argv[1]
1707       old_cmdline = " ".join(sys.argv[2:])
1708     else:
1709       old_cmdline = ""
1710   else:
1711     binary = "<unknown program>"
1712     old_cmdline = ""
1713
1714   if aliases is None:
1715     aliases = {}
1716
1717   try:
1718     func, options, args = _ParseArgs(sys.argv, commands, aliases)
1719   except errors.ParameterError, err:
1720     result, err_msg = FormatError(err)
1721     ToStderr(err_msg)
1722     return 1
1723
1724   if func is None: # parse error
1725     return 1
1726
1727   if override is not None:
1728     for key, val in override.iteritems():
1729       setattr(options, key, val)
1730
1731   utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1732                      stderr_logging=True, program=binary)
1733
1734   if old_cmdline:
1735     logging.info("run with arguments '%s'", old_cmdline)
1736   else:
1737     logging.info("run with no arguments")
1738
1739   try:
1740     result = func(options, args)
1741   except (errors.GenericError, luxi.ProtocolError,
1742           JobSubmittedException), err:
1743     result, err_msg = FormatError(err)
1744     logging.exception("Error during command processing")
1745     ToStderr(err_msg)
1746
1747   return result
1748
1749
1750 def GenericInstanceCreate(mode, opts, args):
1751   """Add an instance to the cluster via either creation or import.
1752
1753   @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
1754   @param opts: the command line options selected by the user
1755   @type args: list
1756   @param args: should contain only one element, the new instance name
1757   @rtype: int
1758   @return: the desired exit code
1759
1760   """
1761   instance = args[0]
1762
1763   (pnode, snode) = SplitNodeOption(opts.node)
1764
1765   hypervisor = None
1766   hvparams = {}
1767   if opts.hypervisor:
1768     hypervisor, hvparams = opts.hypervisor
1769
1770   if opts.nics:
1771     try:
1772       nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
1773     except ValueError, err:
1774       raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
1775     nics = [{}] * nic_max
1776     for nidx, ndict in opts.nics:
1777       nidx = int(nidx)
1778       if not isinstance(ndict, dict):
1779         msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
1780         raise errors.OpPrereqError(msg)
1781       nics[nidx] = ndict
1782   elif opts.no_nics:
1783     # no nics
1784     nics = []
1785   elif mode == constants.INSTANCE_CREATE:
1786     # default of one nic, all auto
1787     nics = [{}]
1788   else:
1789     # mode == import
1790     nics = []
1791
1792   if opts.disk_template == constants.DT_DISKLESS:
1793     if opts.disks or opts.sd_size is not None:
1794       raise errors.OpPrereqError("Diskless instance but disk"
1795                                  " information passed")
1796     disks = []
1797   else:
1798     if (not opts.disks and not opts.sd_size
1799         and mode == constants.INSTANCE_CREATE):
1800       raise errors.OpPrereqError("No disk information specified")
1801     if opts.disks and opts.sd_size is not None:
1802       raise errors.OpPrereqError("Please use either the '--disk' or"
1803                                  " '-s' option")
1804     if opts.sd_size is not None:
1805       opts.disks = [(0, {"size": opts.sd_size})]
1806
1807     if opts.disks:
1808       try:
1809         disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
1810       except ValueError, err:
1811         raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
1812       disks = [{}] * disk_max
1813     else:
1814       disks = []
1815     for didx, ddict in opts.disks:
1816       didx = int(didx)
1817       if not isinstance(ddict, dict):
1818         msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
1819         raise errors.OpPrereqError(msg)
1820       elif "size" in ddict:
1821         if "adopt" in ddict:
1822           raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
1823                                      " (disk %d)" % didx)
1824         try:
1825           ddict["size"] = utils.ParseUnit(ddict["size"])
1826         except ValueError, err:
1827           raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
1828                                      (didx, err))
1829       elif "adopt" in ddict:
1830         if mode == constants.INSTANCE_IMPORT:
1831           raise errors.OpPrereqError("Disk adoption not allowed for instance"
1832                                      " import")
1833         ddict["size"] = 0
1834       else:
1835         raise errors.OpPrereqError("Missing size or adoption source for"
1836                                    " disk %d" % didx)
1837       disks[didx] = ddict
1838
1839   utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
1840   utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
1841
1842   if mode == constants.INSTANCE_CREATE:
1843     start = opts.start
1844     os_type = opts.os
1845     force_variant = opts.force_variant
1846     src_node = None
1847     src_path = None
1848     no_install = opts.no_install
1849     identify_defaults = False
1850   elif mode == constants.INSTANCE_IMPORT:
1851     start = False
1852     os_type = None
1853     force_variant = False
1854     src_node = opts.src_node
1855     src_path = opts.src_dir
1856     no_install = None
1857     identify_defaults = opts.identify_defaults
1858   else:
1859     raise errors.ProgrammerError("Invalid creation mode %s" % mode)
1860
1861   op = opcodes.OpCreateInstance(instance_name=instance,
1862                                 disks=disks,
1863                                 disk_template=opts.disk_template,
1864                                 nics=nics,
1865                                 pnode=pnode, snode=snode,
1866                                 ip_check=opts.ip_check,
1867                                 name_check=opts.name_check,
1868                                 wait_for_sync=opts.wait_for_sync,
1869                                 file_storage_dir=opts.file_storage_dir,
1870                                 file_driver=opts.file_driver,
1871                                 iallocator=opts.iallocator,
1872                                 hypervisor=hypervisor,
1873                                 hvparams=hvparams,
1874                                 beparams=opts.beparams,
1875                                 osparams=opts.osparams,
1876                                 mode=mode,
1877                                 start=start,
1878                                 os_type=os_type,
1879                                 force_variant=force_variant,
1880                                 src_node=src_node,
1881                                 src_path=src_path,
1882                                 no_install=no_install,
1883                                 identify_defaults=identify_defaults)
1884
1885   SubmitOrSend(op, opts)
1886   return 0
1887
1888
1889 class _RunWhileClusterStoppedHelper:
1890   """Helper class for L{RunWhileClusterStopped} to simplify state management
1891
1892   """
1893   def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
1894     """Initializes this class.
1895
1896     @type feedback_fn: callable
1897     @param feedback_fn: Feedback function
1898     @type cluster_name: string
1899     @param cluster_name: Cluster name
1900     @type master_node: string
1901     @param master_node Master node name
1902     @type online_nodes: list
1903     @param online_nodes: List of names of online nodes
1904
1905     """
1906     self.feedback_fn = feedback_fn
1907     self.cluster_name = cluster_name
1908     self.master_node = master_node
1909     self.online_nodes = online_nodes
1910
1911     self.ssh = ssh.SshRunner(self.cluster_name)
1912
1913     self.nonmaster_nodes = [name for name in online_nodes
1914                             if name != master_node]
1915
1916     assert self.master_node not in self.nonmaster_nodes
1917
1918   def _RunCmd(self, node_name, cmd):
1919     """Runs a command on the local or a remote machine.
1920
1921     @type node_name: string
1922     @param node_name: Machine name
1923     @type cmd: list
1924     @param cmd: Command
1925
1926     """
1927     if node_name is None or node_name == self.master_node:
1928       # No need to use SSH
1929       result = utils.RunCmd(cmd)
1930     else:
1931       result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
1932
1933     if result.failed:
1934       errmsg = ["Failed to run command %s" % result.cmd]
1935       if node_name:
1936         errmsg.append("on node %s" % node_name)
1937       errmsg.append(": exitcode %s and error %s" %
1938                     (result.exit_code, result.output))
1939       raise errors.OpExecError(" ".join(errmsg))
1940
1941   def Call(self, fn, *args):
1942     """Call function while all daemons are stopped.
1943
1944     @type fn: callable
1945     @param fn: Function to be called
1946
1947     """
1948     # Pause watcher by acquiring an exclusive lock on watcher state file
1949     self.feedback_fn("Blocking watcher")
1950     watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
1951     try:
1952       # TODO: Currently, this just blocks. There's no timeout.
1953       # TODO: Should it be a shared lock?
1954       watcher_block.Exclusive(blocking=True)
1955
1956       # Stop master daemons, so that no new jobs can come in and all running
1957       # ones are finished
1958       self.feedback_fn("Stopping master daemons")
1959       self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
1960       try:
1961         # Stop daemons on all nodes
1962         for node_name in self.online_nodes:
1963           self.feedback_fn("Stopping daemons on %s" % node_name)
1964           self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
1965
1966         # All daemons are shut down now
1967         try:
1968           return fn(self, *args)
1969         except Exception, err:
1970           _, errmsg = FormatError(err)
1971           logging.exception("Caught exception")
1972           self.feedback_fn(errmsg)
1973           raise
1974       finally:
1975         # Start cluster again, master node last
1976         for node_name in self.nonmaster_nodes + [self.master_node]:
1977           self.feedback_fn("Starting daemons on %s" % node_name)
1978           self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
1979     finally:
1980       # Resume watcher
1981       watcher_block.Close()
1982
1983
1984 def RunWhileClusterStopped(feedback_fn, fn, *args):
1985   """Calls a function while all cluster daemons are stopped.
1986
1987   @type feedback_fn: callable
1988   @param feedback_fn: Feedback function
1989   @type fn: callable
1990   @param fn: Function to be called when daemons are stopped
1991
1992   """
1993   feedback_fn("Gathering cluster information")
1994
1995   # This ensures we're running on the master daemon
1996   cl = GetClient()
1997
1998   (cluster_name, master_node) = \
1999     cl.QueryConfigValues(["cluster_name", "master_node"])
2000
2001   online_nodes = GetOnlineNodes([], cl=cl)
2002
2003   # Don't keep a reference to the client. The master daemon will go away.
2004   del cl
2005
2006   assert master_node in online_nodes
2007
2008   return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
2009                                        online_nodes).Call(fn, *args)
2010
2011
2012 def GenerateTable(headers, fields, separator, data,
2013                   numfields=None, unitfields=None,
2014                   units=None):
2015   """Prints a table with headers and different fields.
2016
2017   @type headers: dict
2018   @param headers: dictionary mapping field names to headers for
2019       the table
2020   @type fields: list
2021   @param fields: the field names corresponding to each row in
2022       the data field
2023   @param separator: the separator to be used; if this is None,
2024       the default 'smart' algorithm is used which computes optimal
2025       field width, otherwise just the separator is used between
2026       each field
2027   @type data: list
2028   @param data: a list of lists, each sublist being one row to be output
2029   @type numfields: list
2030   @param numfields: a list with the fields that hold numeric
2031       values and thus should be right-aligned
2032   @type unitfields: list
2033   @param unitfields: a list with the fields that hold numeric
2034       values that should be formatted with the units field
2035   @type units: string or None
2036   @param units: the units we should use for formatting, or None for
2037       automatic choice (human-readable for non-separator usage, otherwise
2038       megabytes); this is a one-letter string
2039
2040   """
2041   if units is None:
2042     if separator:
2043       units = "m"
2044     else:
2045       units = "h"
2046
2047   if numfields is None:
2048     numfields = []
2049   if unitfields is None:
2050     unitfields = []
2051
2052   numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
2053   unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142
2054
2055   format_fields = []
2056   for field in fields:
2057     if headers and field not in headers:
2058       # TODO: handle better unknown fields (either revert to old
2059       # style of raising exception, or deal more intelligently with
2060       # variable fields)
2061       headers[field] = field
2062     if separator is not None:
2063       format_fields.append("%s")
2064     elif numfields.Matches(field):
2065       format_fields.append("%*s")
2066     else:
2067       format_fields.append("%-*s")
2068
2069   if separator is None:
2070     mlens = [0 for name in fields]
2071     format_str = ' '.join(format_fields)
2072   else:
2073     format_str = separator.replace("%", "%%").join(format_fields)
2074
2075   for row in data:
2076     if row is None:
2077       continue
2078     for idx, val in enumerate(row):
2079       if unitfields.Matches(fields[idx]):
2080         try:
2081           val = int(val)
2082         except (TypeError, ValueError):
2083           pass
2084         else:
2085           val = row[idx] = utils.FormatUnit(val, units)
2086       val = row[idx] = str(val)
2087       if separator is None:
2088         mlens[idx] = max(mlens[idx], len(val))
2089
2090   result = []
2091   if headers:
2092     args = []
2093     for idx, name in enumerate(fields):
2094       hdr = headers[name]
2095       if separator is None:
2096         mlens[idx] = max(mlens[idx], len(hdr))
2097         args.append(mlens[idx])
2098       args.append(hdr)
2099     result.append(format_str % tuple(args))
2100
2101   if separator is None:
2102     assert len(mlens) == len(fields)
2103
2104     if fields and not numfields.Matches(fields[-1]):
2105       mlens[-1] = 0
2106
2107   for line in data:
2108     args = []
2109     if line is None:
2110       line = ['-' for _ in fields]
2111     for idx in range(len(fields)):
2112       if separator is None:
2113         args.append(mlens[idx])
2114       args.append(line[idx])
2115     result.append(format_str % tuple(args))
2116
2117   return result
2118
2119
2120 def FormatTimestamp(ts):
2121   """Formats a given timestamp.
2122
2123   @type ts: timestamp
2124   @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
2125
2126   @rtype: string
2127   @return: a string with the formatted timestamp
2128
2129   """
2130   if not isinstance (ts, (tuple, list)) or len(ts) != 2:
2131     return '?'
2132   sec, usec = ts
2133   return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
2134
2135
2136 def ParseTimespec(value):
2137   """Parse a time specification.
2138
2139   The following suffixed will be recognized:
2140
2141     - s: seconds
2142     - m: minutes
2143     - h: hours
2144     - d: day
2145     - w: weeks
2146
2147   Without any suffix, the value will be taken to be in seconds.
2148
2149   """
2150   value = str(value)
2151   if not value:
2152     raise errors.OpPrereqError("Empty time specification passed")
2153   suffix_map = {
2154     's': 1,
2155     'm': 60,
2156     'h': 3600,
2157     'd': 86400,
2158     'w': 604800,
2159     }
2160   if value[-1] not in suffix_map:
2161     try:
2162       value = int(value)
2163     except (TypeError, ValueError):
2164       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2165   else:
2166     multiplier = suffix_map[value[-1]]
2167     value = value[:-1]
2168     if not value: # no data left after stripping the suffix
2169       raise errors.OpPrereqError("Invalid time specification (only"
2170                                  " suffix passed)")
2171     try:
2172       value = int(value) * multiplier
2173     except (TypeError, ValueError):
2174       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2175   return value
2176
2177
2178 def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
2179                    filter_master=False):
2180   """Returns the names of online nodes.
2181
2182   This function will also log a warning on stderr with the names of
2183   the online nodes.
2184
2185   @param nodes: if not empty, use only this subset of nodes (minus the
2186       offline ones)
2187   @param cl: if not None, luxi client to use
2188   @type nowarn: boolean
2189   @param nowarn: by default, this function will output a note with the
2190       offline nodes that are skipped; if this parameter is True the
2191       note is not displayed
2192   @type secondary_ips: boolean
2193   @param secondary_ips: if True, return the secondary IPs instead of the
2194       names, useful for doing network traffic over the replication interface
2195       (if any)
2196   @type filter_master: boolean
2197   @param filter_master: if True, do not return the master node in the list
2198       (useful in coordination with secondary_ips where we cannot check our
2199       node name against the list)
2200
2201   """
2202   if cl is None:
2203     cl = GetClient()
2204
2205   if secondary_ips:
2206     name_idx = 2
2207   else:
2208     name_idx = 0
2209
2210   if filter_master:
2211     master_node = cl.QueryConfigValues(["master_node"])[0]
2212     filter_fn = lambda x: x != master_node
2213   else:
2214     filter_fn = lambda _: True
2215
2216   result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
2217                          use_locking=False)
2218   offline = [row[0] for row in result if row[1]]
2219   if offline and not nowarn:
2220     ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
2221   return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
2222
2223
2224 def _ToStream(stream, txt, *args):
2225   """Write a message to a stream, bypassing the logging system
2226
2227   @type stream: file object
2228   @param stream: the file to which we should write
2229   @type txt: str
2230   @param txt: the message
2231
2232   """
2233   if args:
2234     args = tuple(args)
2235     stream.write(txt % args)
2236   else:
2237     stream.write(txt)
2238   stream.write('\n')
2239   stream.flush()
2240
2241
2242 def ToStdout(txt, *args):
2243   """Write a message to stdout only, bypassing the logging system
2244
2245   This is just a wrapper over _ToStream.
2246
2247   @type txt: str
2248   @param txt: the message
2249
2250   """
2251   _ToStream(sys.stdout, txt, *args)
2252
2253
2254 def ToStderr(txt, *args):
2255   """Write a message to stderr only, bypassing the logging system
2256
2257   This is just a wrapper over _ToStream.
2258
2259   @type txt: str
2260   @param txt: the message
2261
2262   """
2263   _ToStream(sys.stderr, txt, *args)
2264
2265
2266 class JobExecutor(object):
2267   """Class which manages the submission and execution of multiple jobs.
2268
2269   Note that instances of this class should not be reused between
2270   GetResults() calls.
2271
2272   """
2273   def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
2274     self.queue = []
2275     if cl is None:
2276       cl = GetClient()
2277     self.cl = cl
2278     self.verbose = verbose
2279     self.jobs = []
2280     self.opts = opts
2281     self.feedback_fn = feedback_fn
2282
2283   def QueueJob(self, name, *ops):
2284     """Record a job for later submit.
2285
2286     @type name: string
2287     @param name: a description of the job, will be used in WaitJobSet
2288     """
2289     SetGenericOpcodeOpts(ops, self.opts)
2290     self.queue.append((name, ops))
2291
2292   def SubmitPending(self, each=False):
2293     """Submit all pending jobs.
2294
2295     """
2296     if each:
2297       results = []
2298       for row in self.queue:
2299         # SubmitJob will remove the success status, but raise an exception if
2300         # the submission fails, so we'll notice that anyway.
2301         results.append([True, self.cl.SubmitJob(row[1])])
2302     else:
2303       results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
2304     for (idx, ((status, data), (name, _))) in enumerate(zip(results,
2305                                                             self.queue)):
2306       self.jobs.append((idx, status, data, name))
2307
2308   def _ChooseJob(self):
2309     """Choose a non-waiting/queued job to poll next.
2310
2311     """
2312     assert self.jobs, "_ChooseJob called with empty job list"
2313
2314     result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
2315     assert result
2316
2317     for job_data, status in zip(self.jobs, result):
2318       if status[0] in (constants.JOB_STATUS_QUEUED,
2319                     constants.JOB_STATUS_WAITLOCK,
2320                     constants.JOB_STATUS_CANCELING):
2321         # job is still waiting
2322         continue
2323       # good candidate found
2324       self.jobs.remove(job_data)
2325       return job_data
2326
2327     # no job found
2328     return self.jobs.pop(0)
2329
2330   def GetResults(self):
2331     """Wait for and return the results of all jobs.
2332
2333     @rtype: list
2334     @return: list of tuples (success, job results), in the same order
2335         as the submitted jobs; if a job has failed, instead of the result
2336         there will be the error message
2337
2338     """
2339     if not self.jobs:
2340       self.SubmitPending()
2341     results = []
2342     if self.verbose:
2343       ok_jobs = [row[2] for row in self.jobs if row[1]]
2344       if ok_jobs:
2345         ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
2346
2347     # first, remove any non-submitted jobs
2348     self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
2349     for idx, _, jid, name in failures:
2350       ToStderr("Failed to submit job for %s: %s", name, jid)
2351       results.append((idx, False, jid))
2352
2353     while self.jobs:
2354       (idx, _, jid, name) = self._ChooseJob()
2355       ToStdout("Waiting for job %s for %s...", jid, name)
2356       try:
2357         job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
2358         success = True
2359       except (errors.GenericError, luxi.ProtocolError), err:
2360         _, job_result = FormatError(err)
2361         success = False
2362         # the error message will always be shown, verbose or not
2363         ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
2364
2365       results.append((idx, success, job_result))
2366
2367     # sort based on the index, then drop it
2368     results.sort()
2369     results = [i[1:] for i in results]
2370
2371     return results
2372
2373   def WaitOrShow(self, wait):
2374     """Wait for job results or only print the job IDs.
2375
2376     @type wait: boolean
2377     @param wait: whether to wait or not
2378
2379     """
2380     if wait:
2381       return self.GetResults()
2382     else:
2383       if not self.jobs:
2384         self.SubmitPending()
2385       for _, status, result, name in self.jobs:
2386         if status:
2387           ToStdout("%s: %s", result, name)
2388         else:
2389           ToStderr("Failure for %s: %s", name, result)
2390       return [row[1:3] for row in self.jobs]