jqueue: make replication on job update optional
[ganeti-local] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module dealing with command line parsing"""
23
24
25 import sys
26 import textwrap
27 import os.path
28 import time
29 import logging
30 from cStringIO import StringIO
31
32 from ganeti import utils
33 from ganeti import errors
34 from ganeti import constants
35 from ganeti import opcodes
36 from ganeti import luxi
37 from ganeti import ssconf
38 from ganeti import rpc
39 from ganeti import ssh
40 from ganeti import compat
41
42 from optparse import (OptionParser, TitledHelpFormatter,
43                       Option, OptionValueError)
44
45
46 __all__ = [
47   # Command line options
48   "ADD_UIDS_OPT",
49   "ALLOCATABLE_OPT",
50   "ALL_OPT",
51   "AUTO_PROMOTE_OPT",
52   "AUTO_REPLACE_OPT",
53   "BACKEND_OPT",
54   "CLEANUP_OPT",
55   "CLUSTER_DOMAIN_SECRET_OPT",
56   "CONFIRM_OPT",
57   "CP_SIZE_OPT",
58   "DEBUG_OPT",
59   "DEBUG_SIMERR_OPT",
60   "DISKIDX_OPT",
61   "DISK_OPT",
62   "DISK_TEMPLATE_OPT",
63   "DRAINED_OPT",
64   "EARLY_RELEASE_OPT",
65   "ENABLED_HV_OPT",
66   "ERROR_CODES_OPT",
67   "FIELDS_OPT",
68   "FILESTORE_DIR_OPT",
69   "FILESTORE_DRIVER_OPT",
70   "FORCE_OPT",
71   "FORCE_VARIANT_OPT",
72   "GLOBAL_FILEDIR_OPT",
73   "HVLIST_OPT",
74   "HVOPTS_OPT",
75   "HYPERVISOR_OPT",
76   "IALLOCATOR_OPT",
77   "IDENTIFY_DEFAULTS_OPT",
78   "IGNORE_CONSIST_OPT",
79   "IGNORE_FAILURES_OPT",
80   "IGNORE_REMOVE_FAILURES_OPT",
81   "IGNORE_SECONDARIES_OPT",
82   "IGNORE_SIZE_OPT",
83   "MAC_PREFIX_OPT",
84   "MAINTAIN_NODE_HEALTH_OPT",
85   "MASTER_NETDEV_OPT",
86   "MC_OPT",
87   "NET_OPT",
88   "NEW_CLUSTER_CERT_OPT",
89   "NEW_CLUSTER_DOMAIN_SECRET_OPT",
90   "NEW_CONFD_HMAC_KEY_OPT",
91   "NEW_RAPI_CERT_OPT",
92   "NEW_SECONDARY_OPT",
93   "NIC_PARAMS_OPT",
94   "NODE_LIST_OPT",
95   "NODE_PLACEMENT_OPT",
96   "NOHDR_OPT",
97   "NOIPCHECK_OPT",
98   "NO_INSTALL_OPT",
99   "NONAMECHECK_OPT",
100   "NOLVM_STORAGE_OPT",
101   "NOMODIFY_ETCHOSTS_OPT",
102   "NOMODIFY_SSH_SETUP_OPT",
103   "NONICS_OPT",
104   "NONLIVE_OPT",
105   "NONPLUS1_OPT",
106   "NOSHUTDOWN_OPT",
107   "NOSTART_OPT",
108   "NOSSH_KEYCHECK_OPT",
109   "NOVOTING_OPT",
110   "NWSYNC_OPT",
111   "ON_PRIMARY_OPT",
112   "ON_SECONDARY_OPT",
113   "OFFLINE_OPT",
114   "OS_OPT",
115   "OS_SIZE_OPT",
116   "RAPI_CERT_OPT",
117   "READD_OPT",
118   "REBOOT_TYPE_OPT",
119   "REMOVE_INSTANCE_OPT",
120   "REMOVE_UIDS_OPT",
121   "ROMAN_OPT",
122   "SECONDARY_IP_OPT",
123   "SELECT_OS_OPT",
124   "SEP_OPT",
125   "SHOWCMD_OPT",
126   "SHUTDOWN_TIMEOUT_OPT",
127   "SINGLE_NODE_OPT",
128   "SRC_DIR_OPT",
129   "SRC_NODE_OPT",
130   "SUBMIT_OPT",
131   "STATIC_OPT",
132   "SYNC_OPT",
133   "TAG_SRC_OPT",
134   "TIMEOUT_OPT",
135   "UIDPOOL_OPT",
136   "USEUNITS_OPT",
137   "USE_REPL_NET_OPT",
138   "VERBOSE_OPT",
139   "VG_NAME_OPT",
140   "YES_DOIT_OPT",
141   # Generic functions for CLI programs
142   "GenericMain",
143   "GenericInstanceCreate",
144   "GetClient",
145   "GetOnlineNodes",
146   "JobExecutor",
147   "JobSubmittedException",
148   "ParseTimespec",
149   "RunWhileClusterStopped",
150   "SubmitOpCode",
151   "SubmitOrSend",
152   "UsesRPC",
153   # Formatting functions
154   "ToStderr", "ToStdout",
155   "FormatError",
156   "GenerateTable",
157   "AskUser",
158   "FormatTimestamp",
159   # Tags functions
160   "ListTags",
161   "AddTags",
162   "RemoveTags",
163   # command line options support infrastructure
164   "ARGS_MANY_INSTANCES",
165   "ARGS_MANY_NODES",
166   "ARGS_NONE",
167   "ARGS_ONE_INSTANCE",
168   "ARGS_ONE_NODE",
169   "ARGS_ONE_OS",
170   "ArgChoice",
171   "ArgCommand",
172   "ArgFile",
173   "ArgHost",
174   "ArgInstance",
175   "ArgJobId",
176   "ArgNode",
177   "ArgOs",
178   "ArgSuggest",
179   "ArgUnknown",
180   "OPT_COMPL_INST_ADD_NODES",
181   "OPT_COMPL_MANY_NODES",
182   "OPT_COMPL_ONE_IALLOCATOR",
183   "OPT_COMPL_ONE_INSTANCE",
184   "OPT_COMPL_ONE_NODE",
185   "OPT_COMPL_ONE_OS",
186   "cli_option",
187   "SplitNodeOption",
188   "CalculateOSNames",
189   ]
190
191 NO_PREFIX = "no_"
192 UN_PREFIX = "-"
193
194
195 class _Argument:
196   def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
197     self.min = min
198     self.max = max
199
200   def __repr__(self):
201     return ("<%s min=%s max=%s>" %
202             (self.__class__.__name__, self.min, self.max))
203
204
205 class ArgSuggest(_Argument):
206   """Suggesting argument.
207
208   Value can be any of the ones passed to the constructor.
209
210   """
211   # pylint: disable-msg=W0622
212   def __init__(self, min=0, max=None, choices=None):
213     _Argument.__init__(self, min=min, max=max)
214     self.choices = choices
215
216   def __repr__(self):
217     return ("<%s min=%s max=%s choices=%r>" %
218             (self.__class__.__name__, self.min, self.max, self.choices))
219
220
221 class ArgChoice(ArgSuggest):
222   """Choice argument.
223
224   Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
225   but value must be one of the choices.
226
227   """
228
229
230 class ArgUnknown(_Argument):
231   """Unknown argument to program (e.g. determined at runtime).
232
233   """
234
235
236 class ArgInstance(_Argument):
237   """Instances argument.
238
239   """
240
241
242 class ArgNode(_Argument):
243   """Node argument.
244
245   """
246
247 class ArgJobId(_Argument):
248   """Job ID argument.
249
250   """
251
252
253 class ArgFile(_Argument):
254   """File path argument.
255
256   """
257
258
259 class ArgCommand(_Argument):
260   """Command argument.
261
262   """
263
264
265 class ArgHost(_Argument):
266   """Host argument.
267
268   """
269
270
271 class ArgOs(_Argument):
272   """OS argument.
273
274   """
275
276
277 ARGS_NONE = []
278 ARGS_MANY_INSTANCES = [ArgInstance()]
279 ARGS_MANY_NODES = [ArgNode()]
280 ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
281 ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
282 ARGS_ONE_OS = [ArgOs(min=1, max=1)]
283
284
285 def _ExtractTagsObject(opts, args):
286   """Extract the tag type object.
287
288   Note that this function will modify its args parameter.
289
290   """
291   if not hasattr(opts, "tag_type"):
292     raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
293   kind = opts.tag_type
294   if kind == constants.TAG_CLUSTER:
295     retval = kind, kind
296   elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
297     if not args:
298       raise errors.OpPrereqError("no arguments passed to the command")
299     name = args.pop(0)
300     retval = kind, name
301   else:
302     raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
303   return retval
304
305
306 def _ExtendTags(opts, args):
307   """Extend the args if a source file has been given.
308
309   This function will extend the tags with the contents of the file
310   passed in the 'tags_source' attribute of the opts parameter. A file
311   named '-' will be replaced by stdin.
312
313   """
314   fname = opts.tags_source
315   if fname is None:
316     return
317   if fname == "-":
318     new_fh = sys.stdin
319   else:
320     new_fh = open(fname, "r")
321   new_data = []
322   try:
323     # we don't use the nice 'new_data = [line.strip() for line in fh]'
324     # because of python bug 1633941
325     while True:
326       line = new_fh.readline()
327       if not line:
328         break
329       new_data.append(line.strip())
330   finally:
331     new_fh.close()
332   args.extend(new_data)
333
334
335 def ListTags(opts, args):
336   """List the tags on a given object.
337
338   This is a generic implementation that knows how to deal with all
339   three cases of tag objects (cluster, node, instance). The opts
340   argument is expected to contain a tag_type field denoting what
341   object type we work on.
342
343   """
344   kind, name = _ExtractTagsObject(opts, args)
345   cl = GetClient()
346   result = cl.QueryTags(kind, name)
347   result = list(result)
348   result.sort()
349   for tag in result:
350     ToStdout(tag)
351
352
353 def AddTags(opts, args):
354   """Add tags on a given object.
355
356   This is a generic implementation that knows how to deal with all
357   three cases of tag objects (cluster, node, instance). The opts
358   argument is expected to contain a tag_type field denoting what
359   object type we work on.
360
361   """
362   kind, name = _ExtractTagsObject(opts, args)
363   _ExtendTags(opts, args)
364   if not args:
365     raise errors.OpPrereqError("No tags to be added")
366   op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
367   SubmitOpCode(op)
368
369
370 def RemoveTags(opts, args):
371   """Remove tags from a given object.
372
373   This is a generic implementation that knows how to deal with all
374   three cases of tag objects (cluster, node, instance). The opts
375   argument is expected to contain a tag_type field denoting what
376   object type we work on.
377
378   """
379   kind, name = _ExtractTagsObject(opts, args)
380   _ExtendTags(opts, args)
381   if not args:
382     raise errors.OpPrereqError("No tags to be removed")
383   op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
384   SubmitOpCode(op)
385
386
387 def check_unit(option, opt, value): # pylint: disable-msg=W0613
388   """OptParsers custom converter for units.
389
390   """
391   try:
392     return utils.ParseUnit(value)
393   except errors.UnitParseError, err:
394     raise OptionValueError("option %s: %s" % (opt, err))
395
396
397 def _SplitKeyVal(opt, data):
398   """Convert a KeyVal string into a dict.
399
400   This function will convert a key=val[,...] string into a dict. Empty
401   values will be converted specially: keys which have the prefix 'no_'
402   will have the value=False and the prefix stripped, the others will
403   have value=True.
404
405   @type opt: string
406   @param opt: a string holding the option name for which we process the
407       data, used in building error messages
408   @type data: string
409   @param data: a string of the format key=val,key=val,...
410   @rtype: dict
411   @return: {key=val, key=val}
412   @raises errors.ParameterError: if there are duplicate keys
413
414   """
415   kv_dict = {}
416   if data:
417     for elem in utils.UnescapeAndSplit(data, sep=","):
418       if "=" in elem:
419         key, val = elem.split("=", 1)
420       else:
421         if elem.startswith(NO_PREFIX):
422           key, val = elem[len(NO_PREFIX):], False
423         elif elem.startswith(UN_PREFIX):
424           key, val = elem[len(UN_PREFIX):], None
425         else:
426           key, val = elem, True
427       if key in kv_dict:
428         raise errors.ParameterError("Duplicate key '%s' in option %s" %
429                                     (key, opt))
430       kv_dict[key] = val
431   return kv_dict
432
433
434 def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
435   """Custom parser for ident:key=val,key=val options.
436
437   This will store the parsed values as a tuple (ident, {key: val}). As such,
438   multiple uses of this option via action=append is possible.
439
440   """
441   if ":" not in value:
442     ident, rest = value, ''
443   else:
444     ident, rest = value.split(":", 1)
445
446   if ident.startswith(NO_PREFIX):
447     if rest:
448       msg = "Cannot pass options when removing parameter groups: %s" % value
449       raise errors.ParameterError(msg)
450     retval = (ident[len(NO_PREFIX):], False)
451   elif ident.startswith(UN_PREFIX):
452     if rest:
453       msg = "Cannot pass options when removing parameter groups: %s" % value
454       raise errors.ParameterError(msg)
455     retval = (ident[len(UN_PREFIX):], None)
456   else:
457     kv_dict = _SplitKeyVal(opt, rest)
458     retval = (ident, kv_dict)
459   return retval
460
461
462 def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
463   """Custom parser class for key=val,key=val options.
464
465   This will store the parsed values as a dict {key: val}.
466
467   """
468   return _SplitKeyVal(opt, value)
469
470
471 def check_bool(option, opt, value): # pylint: disable-msg=W0613
472   """Custom parser for yes/no options.
473
474   This will store the parsed value as either True or False.
475
476   """
477   value = value.lower()
478   if value == constants.VALUE_FALSE or value == "no":
479     return False
480   elif value == constants.VALUE_TRUE or value == "yes":
481     return True
482   else:
483     raise errors.ParameterError("Invalid boolean value '%s'" % value)
484
485
486 # completion_suggestion is normally a list. Using numeric values not evaluating
487 # to False for dynamic completion.
488 (OPT_COMPL_MANY_NODES,
489  OPT_COMPL_ONE_NODE,
490  OPT_COMPL_ONE_INSTANCE,
491  OPT_COMPL_ONE_OS,
492  OPT_COMPL_ONE_IALLOCATOR,
493  OPT_COMPL_INST_ADD_NODES) = range(100, 106)
494
495 OPT_COMPL_ALL = frozenset([
496   OPT_COMPL_MANY_NODES,
497   OPT_COMPL_ONE_NODE,
498   OPT_COMPL_ONE_INSTANCE,
499   OPT_COMPL_ONE_OS,
500   OPT_COMPL_ONE_IALLOCATOR,
501   OPT_COMPL_INST_ADD_NODES,
502   ])
503
504
505 class CliOption(Option):
506   """Custom option class for optparse.
507
508   """
509   ATTRS = Option.ATTRS + [
510     "completion_suggest",
511     ]
512   TYPES = Option.TYPES + (
513     "identkeyval",
514     "keyval",
515     "unit",
516     "bool",
517     )
518   TYPE_CHECKER = Option.TYPE_CHECKER.copy()
519   TYPE_CHECKER["identkeyval"] = check_ident_key_val
520   TYPE_CHECKER["keyval"] = check_key_val
521   TYPE_CHECKER["unit"] = check_unit
522   TYPE_CHECKER["bool"] = check_bool
523
524
525 # optparse.py sets make_option, so we do it for our own option class, too
526 cli_option = CliOption
527
528
529 _YORNO = "yes|no"
530
531 DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
532                        help="Increase debugging level")
533
534 NOHDR_OPT = cli_option("--no-headers", default=False,
535                        action="store_true", dest="no_headers",
536                        help="Don't display column headers")
537
538 SEP_OPT = cli_option("--separator", default=None,
539                      action="store", dest="separator",
540                      help=("Separator between output fields"
541                            " (defaults to one space)"))
542
543 USEUNITS_OPT = cli_option("--units", default=None,
544                           dest="units", choices=('h', 'm', 'g', 't'),
545                           help="Specify units for output (one of hmgt)")
546
547 FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
548                         type="string", metavar="FIELDS",
549                         help="Comma separated list of output fields")
550
551 FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
552                        default=False, help="Force the operation")
553
554 CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
555                          default=False, help="Do not require confirmation")
556
557 TAG_SRC_OPT = cli_option("--from", dest="tags_source",
558                          default=None, help="File with tag names")
559
560 SUBMIT_OPT = cli_option("--submit", dest="submit_only",
561                         default=False, action="store_true",
562                         help=("Submit the job and return the job ID, but"
563                               " don't wait for the job to finish"))
564
565 SYNC_OPT = cli_option("--sync", dest="do_locking",
566                       default=False, action="store_true",
567                       help=("Grab locks while doing the queries"
568                             " in order to ensure more consistent results"))
569
570 _DRY_RUN_OPT = cli_option("--dry-run", default=False,
571                           action="store_true",
572                           help=("Do not execute the operation, just run the"
573                                 " check steps and verify it it could be"
574                                 " executed"))
575
576 VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
577                          action="store_true",
578                          help="Increase the verbosity of the operation")
579
580 DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
581                               action="store_true", dest="simulate_errors",
582                               help="Debugging option that makes the operation"
583                               " treat most runtime checks as failed")
584
585 NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
586                         default=True, action="store_false",
587                         help="Don't wait for sync (DANGEROUS!)")
588
589 DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
590                                help="Custom disk setup (diskless, file,"
591                                " plain or drbd)",
592                                default=None, metavar="TEMPL",
593                                choices=list(constants.DISK_TEMPLATES))
594
595 NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
596                         help="Do not create any network cards for"
597                         " the instance")
598
599 FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
600                                help="Relative path under default cluster-wide"
601                                " file storage dir to store file-based disks",
602                                default=None, metavar="<DIR>")
603
604 FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
605                                   help="Driver to use for image files",
606                                   default="loop", metavar="<DRIVER>",
607                                   choices=list(constants.FILE_DRIVER))
608
609 IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
610                             help="Select nodes for the instance automatically"
611                             " using the <NAME> iallocator plugin",
612                             default=None, type="string",
613                             completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
614
615 OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
616                     metavar="<os>",
617                     completion_suggest=OPT_COMPL_ONE_OS)
618
619 FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
620                                action="store_true", default=False,
621                                help="Force an unknown variant")
622
623 NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
624                             action="store_true", default=False,
625                             help="Do not install the OS (will"
626                             " enable no-start)")
627
628 BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
629                          type="keyval", default={},
630                          help="Backend parameters")
631
632 HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
633                          default={}, dest="hvparams",
634                          help="Hypervisor parameters")
635
636 HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
637                             help="Hypervisor and hypervisor options, in the"
638                             " format hypervisor:option=value,option=value,...",
639                             default=None, type="identkeyval")
640
641 HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
642                         help="Hypervisor and hypervisor options, in the"
643                         " format hypervisor:option=value,option=value,...",
644                         default=[], action="append", type="identkeyval")
645
646 NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
647                            action="store_false",
648                            help="Don't check that the instance's IP"
649                            " is alive")
650
651 NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
652                              default=True, action="store_false",
653                              help="Don't check that the instance's name"
654                              " is resolvable")
655
656 NET_OPT = cli_option("--net",
657                      help="NIC parameters", default=[],
658                      dest="nics", action="append", type="identkeyval")
659
660 DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
661                       dest="disks", action="append", type="identkeyval")
662
663 DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
664                          help="Comma-separated list of disks"
665                          " indices to act on (e.g. 0,2) (optional,"
666                          " defaults to all disks)")
667
668 OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
669                          help="Enforces a single-disk configuration using the"
670                          " given disk size, in MiB unless a suffix is used",
671                          default=None, type="unit", metavar="<size>")
672
673 IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
674                                 dest="ignore_consistency",
675                                 action="store_true", default=False,
676                                 help="Ignore the consistency of the disks on"
677                                 " the secondary")
678
679 NONLIVE_OPT = cli_option("--non-live", dest="live",
680                          default=True, action="store_false",
681                          help="Do a non-live migration (this usually means"
682                          " freeze the instance, save the state, transfer and"
683                          " only then resume running on the secondary node)")
684
685 NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
686                                 help="Target node and optional secondary node",
687                                 metavar="<pnode>[:<snode>]",
688                                 completion_suggest=OPT_COMPL_INST_ADD_NODES)
689
690 NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
691                            action="append", metavar="<node>",
692                            help="Use only this node (can be used multiple"
693                            " times, if not given defaults to all nodes)",
694                            completion_suggest=OPT_COMPL_ONE_NODE)
695
696 SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
697                              metavar="<node>",
698                              completion_suggest=OPT_COMPL_ONE_NODE)
699
700 NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
701                          action="store_false",
702                          help="Don't start the instance after creation")
703
704 SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
705                          action="store_true", default=False,
706                          help="Show command instead of executing it")
707
708 CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
709                          default=False, action="store_true",
710                          help="Instead of performing the migration, try to"
711                          " recover from a failed cleanup. This is safe"
712                          " to run even if the instance is healthy, but it"
713                          " will create extra replication traffic and "
714                          " disrupt briefly the replication (like during the"
715                          " migration")
716
717 STATIC_OPT = cli_option("-s", "--static", dest="static",
718                         action="store_true", default=False,
719                         help="Only show configuration data, not runtime data")
720
721 ALL_OPT = cli_option("--all", dest="show_all",
722                      default=False, action="store_true",
723                      help="Show info on all instances on the cluster."
724                      " This can take a long time to run, use wisely")
725
726 SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
727                            action="store_true", default=False,
728                            help="Interactive OS reinstall, lists available"
729                            " OS templates for selection")
730
731 IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
732                                  action="store_true", default=False,
733                                  help="Remove the instance from the cluster"
734                                  " configuration even if there are failures"
735                                  " during the removal process")
736
737 IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
738                                         dest="ignore_remove_failures",
739                                         action="store_true", default=False,
740                                         help="Remove the instance from the"
741                                         " cluster configuration even if there"
742                                         " are failures during the removal"
743                                         " process")
744
745 REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
746                                  action="store_true", default=False,
747                                  help="Remove the instance from the cluster")
748
749 NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
750                                help="Specifies the new secondary node",
751                                metavar="NODE", default=None,
752                                completion_suggest=OPT_COMPL_ONE_NODE)
753
754 ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
755                             default=False, action="store_true",
756                             help="Replace the disk(s) on the primary"
757                             " node (only for the drbd template)")
758
759 ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
760                               default=False, action="store_true",
761                               help="Replace the disk(s) on the secondary"
762                               " node (only for the drbd template)")
763
764 AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
765                               default=False, action="store_true",
766                               help="Lock all nodes and auto-promote as needed"
767                               " to MC status")
768
769 AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
770                               default=False, action="store_true",
771                               help="Automatically replace faulty disks"
772                               " (only for the drbd template)")
773
774 IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
775                              default=False, action="store_true",
776                              help="Ignore current recorded size"
777                              " (useful for forcing activation when"
778                              " the recorded size is wrong)")
779
780 SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
781                           metavar="<node>",
782                           completion_suggest=OPT_COMPL_ONE_NODE)
783
784 SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
785                          metavar="<dir>")
786
787 SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
788                               help="Specify the secondary ip for the node",
789                               metavar="ADDRESS", default=None)
790
791 READD_OPT = cli_option("--readd", dest="readd",
792                        default=False, action="store_true",
793                        help="Readd old node after replacing it")
794
795 NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
796                                 default=True, action="store_false",
797                                 help="Disable SSH key fingerprint checking")
798
799
800 MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
801                     type="bool", default=None, metavar=_YORNO,
802                     help="Set the master_candidate flag on the node")
803
804 OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
805                          type="bool", default=None,
806                          help="Set the offline flag on the node")
807
808 DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
809                          type="bool", default=None,
810                          help="Set the drained flag on the node")
811
812 ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
813                              type="bool", default=None, metavar=_YORNO,
814                              help="Set the allocatable flag on a volume")
815
816 NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
817                                help="Disable support for lvm based instances"
818                                " (cluster-wide)",
819                                action="store_false", default=True)
820
821 ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
822                             dest="enabled_hypervisors",
823                             help="Comma-separated list of hypervisors",
824                             type="string", default=None)
825
826 NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
827                             type="keyval", default={},
828                             help="NIC parameters")
829
830 CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
831                          dest="candidate_pool_size", type="int",
832                          help="Set the candidate pool size")
833
834 VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
835                          help="Enables LVM and specifies the volume group"
836                          " name (cluster-wide) for disk allocation [xenvg]",
837                          metavar="VG", default=None)
838
839 YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
840                           help="Destroy cluster", action="store_true")
841
842 NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
843                           help="Skip node agreement check (dangerous)",
844                           action="store_true", default=False)
845
846 MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
847                             help="Specify the mac prefix for the instance IP"
848                             " addresses, in the format XX:XX:XX",
849                             metavar="PREFIX",
850                             default=None)
851
852 MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
853                                help="Specify the node interface (cluster-wide)"
854                                " on which the master IP address will be added "
855                                " [%s]" % constants.DEFAULT_BRIDGE,
856                                metavar="NETDEV",
857                                default=constants.DEFAULT_BRIDGE)
858
859 GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
860                                 help="Specify the default directory (cluster-"
861                                 "wide) for storing the file-based disks [%s]" %
862                                 constants.DEFAULT_FILE_STORAGE_DIR,
863                                 metavar="DIR",
864                                 default=constants.DEFAULT_FILE_STORAGE_DIR)
865
866 NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
867                                    help="Don't modify /etc/hosts",
868                                    action="store_false", default=True)
869
870 NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
871                                     help="Don't initialize SSH keys",
872                                     action="store_false", default=True)
873
874 ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
875                              help="Enable parseable error messages",
876                              action="store_true", default=False)
877
878 NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
879                           help="Skip N+1 memory redundancy tests",
880                           action="store_true", default=False)
881
882 REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
883                              help="Type of reboot: soft/hard/full",
884                              default=constants.INSTANCE_REBOOT_HARD,
885                              metavar="<REBOOT>",
886                              choices=list(constants.REBOOT_TYPES))
887
888 IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
889                                     dest="ignore_secondaries",
890                                     default=False, action="store_true",
891                                     help="Ignore errors from secondaries")
892
893 NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
894                             action="store_false", default=True,
895                             help="Don't shutdown the instance (unsafe)")
896
897 TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
898                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
899                          help="Maximum time to wait")
900
901 SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
902                          dest="shutdown_timeout", type="int",
903                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
904                          help="Maximum time to wait for instance shutdown")
905
906 EARLY_RELEASE_OPT = cli_option("--early-release",
907                                dest="early_release", default=False,
908                                action="store_true",
909                                help="Release the locks on the secondary"
910                                " node(s) early")
911
912 NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
913                                   dest="new_cluster_cert",
914                                   default=False, action="store_true",
915                                   help="Generate a new cluster certificate")
916
917 RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
918                            default=None,
919                            help="File containing new RAPI certificate")
920
921 NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
922                                default=None, action="store_true",
923                                help=("Generate a new self-signed RAPI"
924                                      " certificate"))
925
926 NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
927                                     dest="new_confd_hmac_key",
928                                     default=False, action="store_true",
929                                     help=("Create a new HMAC key for %s" %
930                                           constants.CONFD))
931
932 CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
933                                        dest="cluster_domain_secret",
934                                        default=None,
935                                        help=("Load new new cluster domain"
936                                              " secret from file"))
937
938 NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
939                                            dest="new_cluster_domain_secret",
940                                            default=False, action="store_true",
941                                            help=("Create a new cluster domain"
942                                                  " secret"))
943
944 USE_REPL_NET_OPT = cli_option("--use-replication-network",
945                               dest="use_replication_network",
946                               help="Whether to use the replication network"
947                               " for talking to the nodes",
948                               action="store_true", default=False)
949
950 MAINTAIN_NODE_HEALTH_OPT = \
951     cli_option("--maintain-node-health", dest="maintain_node_health",
952                metavar=_YORNO, default=None, type="bool",
953                help="Configure the cluster to automatically maintain node"
954                " health, by shutting down unknown instances, shutting down"
955                " unknown DRBD devices, etc.")
956
957 IDENTIFY_DEFAULTS_OPT = \
958     cli_option("--identify-defaults", dest="identify_defaults",
959                default=False, action="store_true",
960                help="Identify which saved instance parameters are equal to"
961                " the current cluster defaults and set them as such, instead"
962                " of marking them as overridden")
963
964 UIDPOOL_OPT = cli_option("--uid-pool", default=None,
965                          action="store", dest="uid_pool",
966                          help=("A list of user-ids or user-id"
967                                " ranges separated by commas"))
968
969 ADD_UIDS_OPT = cli_option("--add-uids", default=None,
970                           action="store", dest="add_uids",
971                           help=("A list of user-ids or user-id"
972                                 " ranges separated by commas, to be"
973                                 " added to the user-id pool"))
974
975 REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
976                              action="store", dest="remove_uids",
977                              help=("A list of user-ids or user-id"
978                                    " ranges separated by commas, to be"
979                                    " removed from the user-id pool"))
980
981 ROMAN_OPT = cli_option("--roman",
982                        dest="roman_integers", default=False,
983                        action="store_true",
984                        help="Use roman numbers for positive integers")
985
986
987
988 def _ParseArgs(argv, commands, aliases):
989   """Parser for the command line arguments.
990
991   This function parses the arguments and returns the function which
992   must be executed together with its (modified) arguments.
993
994   @param argv: the command line
995   @param commands: dictionary with special contents, see the design
996       doc for cmdline handling
997   @param aliases: dictionary with command aliases {'alias': 'target, ...}
998
999   """
1000   if len(argv) == 0:
1001     binary = "<command>"
1002   else:
1003     binary = argv[0].split("/")[-1]
1004
1005   if len(argv) > 1 and argv[1] == "--version":
1006     ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
1007     # Quit right away. That way we don't have to care about this special
1008     # argument. optparse.py does it the same.
1009     sys.exit(0)
1010
1011   if len(argv) < 2 or not (argv[1] in commands or
1012                            argv[1] in aliases):
1013     # let's do a nice thing
1014     sortedcmds = commands.keys()
1015     sortedcmds.sort()
1016
1017     ToStdout("Usage: %s {command} [options...] [argument...]", binary)
1018     ToStdout("%s <command> --help to see details, or man %s", binary, binary)
1019     ToStdout("")
1020
1021     # compute the max line length for cmd + usage
1022     mlen = max([len(" %s" % cmd) for cmd in commands])
1023     mlen = min(60, mlen) # should not get here...
1024
1025     # and format a nice command list
1026     ToStdout("Commands:")
1027     for cmd in sortedcmds:
1028       cmdstr = " %s" % (cmd,)
1029       help_text = commands[cmd][4]
1030       help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
1031       ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
1032       for line in help_lines:
1033         ToStdout("%-*s   %s", mlen, "", line)
1034
1035     ToStdout("")
1036
1037     return None, None, None
1038
1039   # get command, unalias it, and look it up in commands
1040   cmd = argv.pop(1)
1041   if cmd in aliases:
1042     if cmd in commands:
1043       raise errors.ProgrammerError("Alias '%s' overrides an existing"
1044                                    " command" % cmd)
1045
1046     if aliases[cmd] not in commands:
1047       raise errors.ProgrammerError("Alias '%s' maps to non-existing"
1048                                    " command '%s'" % (cmd, aliases[cmd]))
1049
1050     cmd = aliases[cmd]
1051
1052   func, args_def, parser_opts, usage, description = commands[cmd]
1053   parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
1054                         description=description,
1055                         formatter=TitledHelpFormatter(),
1056                         usage="%%prog %s %s" % (cmd, usage))
1057   parser.disable_interspersed_args()
1058   options, args = parser.parse_args()
1059
1060   if not _CheckArguments(cmd, args_def, args):
1061     return None, None, None
1062
1063   return func, options, args
1064
1065
1066 def _CheckArguments(cmd, args_def, args):
1067   """Verifies the arguments using the argument definition.
1068
1069   Algorithm:
1070
1071     1. Abort with error if values specified by user but none expected.
1072
1073     1. For each argument in definition
1074
1075       1. Keep running count of minimum number of values (min_count)
1076       1. Keep running count of maximum number of values (max_count)
1077       1. If it has an unlimited number of values
1078
1079         1. Abort with error if it's not the last argument in the definition
1080
1081     1. If last argument has limited number of values
1082
1083       1. Abort with error if number of values doesn't match or is too large
1084
1085     1. Abort with error if user didn't pass enough values (min_count)
1086
1087   """
1088   if args and not args_def:
1089     ToStderr("Error: Command %s expects no arguments", cmd)
1090     return False
1091
1092   min_count = None
1093   max_count = None
1094   check_max = None
1095
1096   last_idx = len(args_def) - 1
1097
1098   for idx, arg in enumerate(args_def):
1099     if min_count is None:
1100       min_count = arg.min
1101     elif arg.min is not None:
1102       min_count += arg.min
1103
1104     if max_count is None:
1105       max_count = arg.max
1106     elif arg.max is not None:
1107       max_count += arg.max
1108
1109     if idx == last_idx:
1110       check_max = (arg.max is not None)
1111
1112     elif arg.max is None:
1113       raise errors.ProgrammerError("Only the last argument can have max=None")
1114
1115   if check_max:
1116     # Command with exact number of arguments
1117     if (min_count is not None and max_count is not None and
1118         min_count == max_count and len(args) != min_count):
1119       ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1120       return False
1121
1122     # Command with limited number of arguments
1123     if max_count is not None and len(args) > max_count:
1124       ToStderr("Error: Command %s expects only %d argument(s)",
1125                cmd, max_count)
1126       return False
1127
1128   # Command with some required arguments
1129   if min_count is not None and len(args) < min_count:
1130     ToStderr("Error: Command %s expects at least %d argument(s)",
1131              cmd, min_count)
1132     return False
1133
1134   return True
1135
1136
1137 def SplitNodeOption(value):
1138   """Splits the value of a --node option.
1139
1140   """
1141   if value and ':' in value:
1142     return value.split(':', 1)
1143   else:
1144     return (value, None)
1145
1146
1147 def CalculateOSNames(os_name, os_variants):
1148   """Calculates all the names an OS can be called, according to its variants.
1149
1150   @type os_name: string
1151   @param os_name: base name of the os
1152   @type os_variants: list or None
1153   @param os_variants: list of supported variants
1154   @rtype: list
1155   @return: list of valid names
1156
1157   """
1158   if os_variants:
1159     return ['%s+%s' % (os_name, v) for v in os_variants]
1160   else:
1161     return [os_name]
1162
1163
1164 def UsesRPC(fn):
1165   def wrapper(*args, **kwargs):
1166     rpc.Init()
1167     try:
1168       return fn(*args, **kwargs)
1169     finally:
1170       rpc.Shutdown()
1171   return wrapper
1172
1173
1174 def AskUser(text, choices=None):
1175   """Ask the user a question.
1176
1177   @param text: the question to ask
1178
1179   @param choices: list with elements tuples (input_char, return_value,
1180       description); if not given, it will default to: [('y', True,
1181       'Perform the operation'), ('n', False, 'Do no do the operation')];
1182       note that the '?' char is reserved for help
1183
1184   @return: one of the return values from the choices list; if input is
1185       not possible (i.e. not running with a tty, we return the last
1186       entry from the list
1187
1188   """
1189   if choices is None:
1190     choices = [('y', True, 'Perform the operation'),
1191                ('n', False, 'Do not perform the operation')]
1192   if not choices or not isinstance(choices, list):
1193     raise errors.ProgrammerError("Invalid choices argument to AskUser")
1194   for entry in choices:
1195     if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
1196       raise errors.ProgrammerError("Invalid choices element to AskUser")
1197
1198   answer = choices[-1][1]
1199   new_text = []
1200   for line in text.splitlines():
1201     new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
1202   text = "\n".join(new_text)
1203   try:
1204     f = file("/dev/tty", "a+")
1205   except IOError:
1206     return answer
1207   try:
1208     chars = [entry[0] for entry in choices]
1209     chars[-1] = "[%s]" % chars[-1]
1210     chars.append('?')
1211     maps = dict([(entry[0], entry[1]) for entry in choices])
1212     while True:
1213       f.write(text)
1214       f.write('\n')
1215       f.write("/".join(chars))
1216       f.write(": ")
1217       line = f.readline(2).strip().lower()
1218       if line in maps:
1219         answer = maps[line]
1220         break
1221       elif line == '?':
1222         for entry in choices:
1223           f.write(" %s - %s\n" % (entry[0], entry[2]))
1224         f.write("\n")
1225         continue
1226   finally:
1227     f.close()
1228   return answer
1229
1230
1231 class JobSubmittedException(Exception):
1232   """Job was submitted, client should exit.
1233
1234   This exception has one argument, the ID of the job that was
1235   submitted. The handler should print this ID.
1236
1237   This is not an error, just a structured way to exit from clients.
1238
1239   """
1240
1241
1242 def SendJob(ops, cl=None):
1243   """Function to submit an opcode without waiting for the results.
1244
1245   @type ops: list
1246   @param ops: list of opcodes
1247   @type cl: luxi.Client
1248   @param cl: the luxi client to use for communicating with the master;
1249              if None, a new client will be created
1250
1251   """
1252   if cl is None:
1253     cl = GetClient()
1254
1255   job_id = cl.SubmitJob(ops)
1256
1257   return job_id
1258
1259
1260 def GenericPollJob(job_id, cbs, report_cbs):
1261   """Generic job-polling function.
1262
1263   @type job_id: number
1264   @param job_id: Job ID
1265   @type cbs: Instance of L{JobPollCbBase}
1266   @param cbs: Data callbacks
1267   @type report_cbs: Instance of L{JobPollReportCbBase}
1268   @param report_cbs: Reporting callbacks
1269
1270   """
1271   prev_job_info = None
1272   prev_logmsg_serial = None
1273
1274   status = None
1275
1276   while True:
1277     result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
1278                                       prev_logmsg_serial)
1279     if not result:
1280       # job not found, go away!
1281       raise errors.JobLost("Job with id %s lost" % job_id)
1282
1283     if result == constants.JOB_NOTCHANGED:
1284       report_cbs.ReportNotChanged(job_id, status)
1285
1286       # Wait again
1287       continue
1288
1289     # Split result, a tuple of (field values, log entries)
1290     (job_info, log_entries) = result
1291     (status, ) = job_info
1292
1293     if log_entries:
1294       for log_entry in log_entries:
1295         (serial, timestamp, log_type, message) = log_entry
1296         report_cbs.ReportLogMessage(job_id, serial, timestamp,
1297                                     log_type, message)
1298         prev_logmsg_serial = max(prev_logmsg_serial, serial)
1299
1300     # TODO: Handle canceled and archived jobs
1301     elif status in (constants.JOB_STATUS_SUCCESS,
1302                     constants.JOB_STATUS_ERROR,
1303                     constants.JOB_STATUS_CANCELING,
1304                     constants.JOB_STATUS_CANCELED):
1305       break
1306
1307     prev_job_info = job_info
1308
1309   jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
1310   if not jobs:
1311     raise errors.JobLost("Job with id %s lost" % job_id)
1312
1313   status, opstatus, result = jobs[0]
1314
1315   if status == constants.JOB_STATUS_SUCCESS:
1316     return result
1317
1318   if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
1319     raise errors.OpExecError("Job was canceled")
1320
1321   has_ok = False
1322   for idx, (status, msg) in enumerate(zip(opstatus, result)):
1323     if status == constants.OP_STATUS_SUCCESS:
1324       has_ok = True
1325     elif status == constants.OP_STATUS_ERROR:
1326       errors.MaybeRaise(msg)
1327
1328       if has_ok:
1329         raise errors.OpExecError("partial failure (opcode %d): %s" %
1330                                  (idx, msg))
1331
1332       raise errors.OpExecError(str(msg))
1333
1334   # default failure mode
1335   raise errors.OpExecError(result)
1336
1337
1338 class JobPollCbBase:
1339   """Base class for L{GenericPollJob} callbacks.
1340
1341   """
1342   def __init__(self):
1343     """Initializes this class.
1344
1345     """
1346
1347   def WaitForJobChangeOnce(self, job_id, fields,
1348                            prev_job_info, prev_log_serial):
1349     """Waits for changes on a job.
1350
1351     """
1352     raise NotImplementedError()
1353
1354   def QueryJobs(self, job_ids, fields):
1355     """Returns the selected fields for the selected job IDs.
1356
1357     @type job_ids: list of numbers
1358     @param job_ids: Job IDs
1359     @type fields: list of strings
1360     @param fields: Fields
1361
1362     """
1363     raise NotImplementedError()
1364
1365
1366 class JobPollReportCbBase:
1367   """Base class for L{GenericPollJob} reporting callbacks.
1368
1369   """
1370   def __init__(self):
1371     """Initializes this class.
1372
1373     """
1374
1375   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1376     """Handles a log message.
1377
1378     """
1379     raise NotImplementedError()
1380
1381   def ReportNotChanged(self, job_id, status):
1382     """Called for if a job hasn't changed in a while.
1383
1384     @type job_id: number
1385     @param job_id: Job ID
1386     @type status: string or None
1387     @param status: Job status if available
1388
1389     """
1390     raise NotImplementedError()
1391
1392
1393 class _LuxiJobPollCb(JobPollCbBase):
1394   def __init__(self, cl):
1395     """Initializes this class.
1396
1397     """
1398     JobPollCbBase.__init__(self)
1399     self.cl = cl
1400
1401   def WaitForJobChangeOnce(self, job_id, fields,
1402                            prev_job_info, prev_log_serial):
1403     """Waits for changes on a job.
1404
1405     """
1406     return self.cl.WaitForJobChangeOnce(job_id, fields,
1407                                         prev_job_info, prev_log_serial)
1408
1409   def QueryJobs(self, job_ids, fields):
1410     """Returns the selected fields for the selected job IDs.
1411
1412     """
1413     return self.cl.QueryJobs(job_ids, fields)
1414
1415
1416 class FeedbackFnJobPollReportCb(JobPollReportCbBase):
1417   def __init__(self, feedback_fn):
1418     """Initializes this class.
1419
1420     """
1421     JobPollReportCbBase.__init__(self)
1422
1423     self.feedback_fn = feedback_fn
1424
1425     assert callable(feedback_fn)
1426
1427   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1428     """Handles a log message.
1429
1430     """
1431     self.feedback_fn((timestamp, log_type, log_msg))
1432
1433   def ReportNotChanged(self, job_id, status):
1434     """Called if a job hasn't changed in a while.
1435
1436     """
1437     # Ignore
1438
1439
1440 class StdioJobPollReportCb(JobPollReportCbBase):
1441   def __init__(self):
1442     """Initializes this class.
1443
1444     """
1445     JobPollReportCbBase.__init__(self)
1446
1447     self.notified_queued = False
1448     self.notified_waitlock = False
1449
1450   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1451     """Handles a log message.
1452
1453     """
1454     ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
1455              utils.SafeEncode(log_msg))
1456
1457   def ReportNotChanged(self, job_id, status):
1458     """Called if a job hasn't changed in a while.
1459
1460     """
1461     if status is None:
1462       return
1463
1464     if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
1465       ToStderr("Job %s is waiting in queue", job_id)
1466       self.notified_queued = True
1467
1468     elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
1469       ToStderr("Job %s is trying to acquire all necessary locks", job_id)
1470       self.notified_waitlock = True
1471
1472
1473 def PollJob(job_id, cl=None, feedback_fn=None):
1474   """Function to poll for the result of a job.
1475
1476   @type job_id: job identified
1477   @param job_id: the job to poll for results
1478   @type cl: luxi.Client
1479   @param cl: the luxi client to use for communicating with the master;
1480              if None, a new client will be created
1481
1482   """
1483   if cl is None:
1484     cl = GetClient()
1485
1486   if feedback_fn:
1487     reporter = FeedbackFnJobPollReportCb(feedback_fn)
1488   else:
1489     reporter = StdioJobPollReportCb()
1490
1491   return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
1492
1493
1494 def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
1495   """Legacy function to submit an opcode.
1496
1497   This is just a simple wrapper over the construction of the processor
1498   instance. It should be extended to better handle feedback and
1499   interaction functions.
1500
1501   """
1502   if cl is None:
1503     cl = GetClient()
1504
1505   SetGenericOpcodeOpts([op], opts)
1506
1507   job_id = SendJob([op], cl)
1508
1509   op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
1510
1511   return op_results[0]
1512
1513
1514 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
1515   """Wrapper around SubmitOpCode or SendJob.
1516
1517   This function will decide, based on the 'opts' parameter, whether to
1518   submit and wait for the result of the opcode (and return it), or
1519   whether to just send the job and print its identifier. It is used in
1520   order to simplify the implementation of the '--submit' option.
1521
1522   It will also process the opcodes if we're sending the via SendJob
1523   (otherwise SubmitOpCode does it).
1524
1525   """
1526   if opts and opts.submit_only:
1527     job = [op]
1528     SetGenericOpcodeOpts(job, opts)
1529     job_id = SendJob(job, cl=cl)
1530     raise JobSubmittedException(job_id)
1531   else:
1532     return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
1533
1534
1535 def SetGenericOpcodeOpts(opcode_list, options):
1536   """Processor for generic options.
1537
1538   This function updates the given opcodes based on generic command
1539   line options (like debug, dry-run, etc.).
1540
1541   @param opcode_list: list of opcodes
1542   @param options: command line options or None
1543   @return: None (in-place modification)
1544
1545   """
1546   if not options:
1547     return
1548   for op in opcode_list:
1549     op.dry_run = options.dry_run
1550     op.debug_level = options.debug
1551
1552
1553 def GetClient():
1554   # TODO: Cache object?
1555   try:
1556     client = luxi.Client()
1557   except luxi.NoMasterError:
1558     ss = ssconf.SimpleStore()
1559
1560     # Try to read ssconf file
1561     try:
1562       ss.GetMasterNode()
1563     except errors.ConfigurationError:
1564       raise errors.OpPrereqError("Cluster not initialized or this machine is"
1565                                  " not part of a cluster")
1566
1567     master, myself = ssconf.GetMasterAndMyself(ss=ss)
1568     if master != myself:
1569       raise errors.OpPrereqError("This is not the master node, please connect"
1570                                  " to node '%s' and rerun the command" %
1571                                  master)
1572     raise
1573   return client
1574
1575
1576 def FormatError(err):
1577   """Return a formatted error message for a given error.
1578
1579   This function takes an exception instance and returns a tuple
1580   consisting of two values: first, the recommended exit code, and
1581   second, a string describing the error message (not
1582   newline-terminated).
1583
1584   """
1585   retcode = 1
1586   obuf = StringIO()
1587   msg = str(err)
1588   if isinstance(err, errors.ConfigurationError):
1589     txt = "Corrupt configuration file: %s" % msg
1590     logging.error(txt)
1591     obuf.write(txt + "\n")
1592     obuf.write("Aborting.")
1593     retcode = 2
1594   elif isinstance(err, errors.HooksAbort):
1595     obuf.write("Failure: hooks execution failed:\n")
1596     for node, script, out in err.args[0]:
1597       if out:
1598         obuf.write("  node: %s, script: %s, output: %s\n" %
1599                    (node, script, out))
1600       else:
1601         obuf.write("  node: %s, script: %s (no output)\n" %
1602                    (node, script))
1603   elif isinstance(err, errors.HooksFailure):
1604     obuf.write("Failure: hooks general failure: %s" % msg)
1605   elif isinstance(err, errors.ResolverError):
1606     this_host = utils.HostInfo.SysName()
1607     if err.args[0] == this_host:
1608       msg = "Failure: can't resolve my own hostname ('%s')"
1609     else:
1610       msg = "Failure: can't resolve hostname '%s'"
1611     obuf.write(msg % err.args[0])
1612   elif isinstance(err, errors.OpPrereqError):
1613     if len(err.args) == 2:
1614       obuf.write("Failure: prerequisites not met for this"
1615                " operation:\nerror type: %s, error details:\n%s" %
1616                  (err.args[1], err.args[0]))
1617     else:
1618       obuf.write("Failure: prerequisites not met for this"
1619                  " operation:\n%s" % msg)
1620   elif isinstance(err, errors.OpExecError):
1621     obuf.write("Failure: command execution error:\n%s" % msg)
1622   elif isinstance(err, errors.TagError):
1623     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
1624   elif isinstance(err, errors.JobQueueDrainError):
1625     obuf.write("Failure: the job queue is marked for drain and doesn't"
1626                " accept new requests\n")
1627   elif isinstance(err, errors.JobQueueFull):
1628     obuf.write("Failure: the job queue is full and doesn't accept new"
1629                " job submissions until old jobs are archived\n")
1630   elif isinstance(err, errors.TypeEnforcementError):
1631     obuf.write("Parameter Error: %s" % msg)
1632   elif isinstance(err, errors.ParameterError):
1633     obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
1634   elif isinstance(err, luxi.NoMasterError):
1635     obuf.write("Cannot communicate with the master daemon.\nIs it running"
1636                " and listening for connections?")
1637   elif isinstance(err, luxi.TimeoutError):
1638     obuf.write("Timeout while talking to the master daemon. Error:\n"
1639                "%s" % msg)
1640   elif isinstance(err, luxi.ProtocolError):
1641     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
1642                "%s" % msg)
1643   elif isinstance(err, errors.GenericError):
1644     obuf.write("Unhandled Ganeti error: %s" % msg)
1645   elif isinstance(err, JobSubmittedException):
1646     obuf.write("JobID: %s\n" % err.args[0])
1647     retcode = 0
1648   else:
1649     obuf.write("Unhandled exception: %s" % msg)
1650   return retcode, obuf.getvalue().rstrip('\n')
1651
1652
1653 def GenericMain(commands, override=None, aliases=None):
1654   """Generic main function for all the gnt-* commands.
1655
1656   Arguments:
1657     - commands: a dictionary with a special structure, see the design doc
1658                 for command line handling.
1659     - override: if not None, we expect a dictionary with keys that will
1660                 override command line options; this can be used to pass
1661                 options from the scripts to generic functions
1662     - aliases: dictionary with command aliases {'alias': 'target, ...}
1663
1664   """
1665   # save the program name and the entire command line for later logging
1666   if sys.argv:
1667     binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1668     if len(sys.argv) >= 2:
1669       binary += " " + sys.argv[1]
1670       old_cmdline = " ".join(sys.argv[2:])
1671     else:
1672       old_cmdline = ""
1673   else:
1674     binary = "<unknown program>"
1675     old_cmdline = ""
1676
1677   if aliases is None:
1678     aliases = {}
1679
1680   try:
1681     func, options, args = _ParseArgs(sys.argv, commands, aliases)
1682   except errors.ParameterError, err:
1683     result, err_msg = FormatError(err)
1684     ToStderr(err_msg)
1685     return 1
1686
1687   if func is None: # parse error
1688     return 1
1689
1690   if override is not None:
1691     for key, val in override.iteritems():
1692       setattr(options, key, val)
1693
1694   utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1695                      stderr_logging=True, program=binary)
1696
1697   if old_cmdline:
1698     logging.info("run with arguments '%s'", old_cmdline)
1699   else:
1700     logging.info("run with no arguments")
1701
1702   try:
1703     result = func(options, args)
1704   except (errors.GenericError, luxi.ProtocolError,
1705           JobSubmittedException), err:
1706     result, err_msg = FormatError(err)
1707     logging.exception("Error during command processing")
1708     ToStderr(err_msg)
1709
1710   return result
1711
1712
1713 def GenericInstanceCreate(mode, opts, args):
1714   """Add an instance to the cluster via either creation or import.
1715
1716   @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
1717   @param opts: the command line options selected by the user
1718   @type args: list
1719   @param args: should contain only one element, the new instance name
1720   @rtype: int
1721   @return: the desired exit code
1722
1723   """
1724   instance = args[0]
1725
1726   (pnode, snode) = SplitNodeOption(opts.node)
1727
1728   hypervisor = None
1729   hvparams = {}
1730   if opts.hypervisor:
1731     hypervisor, hvparams = opts.hypervisor
1732
1733   if opts.nics:
1734     try:
1735       nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
1736     except ValueError, err:
1737       raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
1738     nics = [{}] * nic_max
1739     for nidx, ndict in opts.nics:
1740       nidx = int(nidx)
1741       if not isinstance(ndict, dict):
1742         msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
1743         raise errors.OpPrereqError(msg)
1744       nics[nidx] = ndict
1745   elif opts.no_nics:
1746     # no nics
1747     nics = []
1748   elif mode == constants.INSTANCE_CREATE:
1749     # default of one nic, all auto
1750     nics = [{}]
1751   else:
1752     # mode == import
1753     nics = []
1754
1755   if opts.disk_template == constants.DT_DISKLESS:
1756     if opts.disks or opts.sd_size is not None:
1757       raise errors.OpPrereqError("Diskless instance but disk"
1758                                  " information passed")
1759     disks = []
1760   else:
1761     if (not opts.disks and not opts.sd_size
1762         and mode == constants.INSTANCE_CREATE):
1763       raise errors.OpPrereqError("No disk information specified")
1764     if opts.disks and opts.sd_size is not None:
1765       raise errors.OpPrereqError("Please use either the '--disk' or"
1766                                  " '-s' option")
1767     if opts.sd_size is not None:
1768       opts.disks = [(0, {"size": opts.sd_size})]
1769
1770     if opts.disks:
1771       try:
1772         disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
1773       except ValueError, err:
1774         raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
1775       disks = [{}] * disk_max
1776     else:
1777       disks = []
1778     for didx, ddict in opts.disks:
1779       didx = int(didx)
1780       if not isinstance(ddict, dict):
1781         msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
1782         raise errors.OpPrereqError(msg)
1783       elif "size" in ddict:
1784         if "adopt" in ddict:
1785           raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
1786                                      " (disk %d)" % didx)
1787         try:
1788           ddict["size"] = utils.ParseUnit(ddict["size"])
1789         except ValueError, err:
1790           raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
1791                                      (didx, err))
1792       elif "adopt" in ddict:
1793         if mode == constants.INSTANCE_IMPORT:
1794           raise errors.OpPrereqError("Disk adoption not allowed for instance"
1795                                      " import")
1796         ddict["size"] = 0
1797       else:
1798         raise errors.OpPrereqError("Missing size or adoption source for"
1799                                    " disk %d" % didx)
1800       disks[didx] = ddict
1801
1802   utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
1803   utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
1804
1805   if mode == constants.INSTANCE_CREATE:
1806     start = opts.start
1807     os_type = opts.os
1808     src_node = None
1809     src_path = None
1810     no_install = opts.no_install
1811     identify_defaults = False
1812   elif mode == constants.INSTANCE_IMPORT:
1813     start = False
1814     os_type = None
1815     src_node = opts.src_node
1816     src_path = opts.src_dir
1817     no_install = None
1818     identify_defaults = opts.identify_defaults
1819   else:
1820     raise errors.ProgrammerError("Invalid creation mode %s" % mode)
1821
1822   op = opcodes.OpCreateInstance(instance_name=instance,
1823                                 disks=disks,
1824                                 disk_template=opts.disk_template,
1825                                 nics=nics,
1826                                 pnode=pnode, snode=snode,
1827                                 ip_check=opts.ip_check,
1828                                 name_check=opts.name_check,
1829                                 wait_for_sync=opts.wait_for_sync,
1830                                 file_storage_dir=opts.file_storage_dir,
1831                                 file_driver=opts.file_driver,
1832                                 iallocator=opts.iallocator,
1833                                 hypervisor=hypervisor,
1834                                 hvparams=hvparams,
1835                                 beparams=opts.beparams,
1836                                 mode=mode,
1837                                 start=start,
1838                                 os_type=os_type,
1839                                 src_node=src_node,
1840                                 src_path=src_path,
1841                                 no_install=no_install,
1842                                 identify_defaults=identify_defaults)
1843
1844   SubmitOrSend(op, opts)
1845   return 0
1846
1847
1848 class _RunWhileClusterStoppedHelper:
1849   """Helper class for L{RunWhileClusterStopped} to simplify state management
1850
1851   """
1852   def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
1853     """Initializes this class.
1854
1855     @type feedback_fn: callable
1856     @param feedback_fn: Feedback function
1857     @type cluster_name: string
1858     @param cluster_name: Cluster name
1859     @type master_node: string
1860     @param master_node Master node name
1861     @type online_nodes: list
1862     @param online_nodes: List of names of online nodes
1863
1864     """
1865     self.feedback_fn = feedback_fn
1866     self.cluster_name = cluster_name
1867     self.master_node = master_node
1868     self.online_nodes = online_nodes
1869
1870     self.ssh = ssh.SshRunner(self.cluster_name)
1871
1872     self.nonmaster_nodes = [name for name in online_nodes
1873                             if name != master_node]
1874
1875     assert self.master_node not in self.nonmaster_nodes
1876
1877   def _RunCmd(self, node_name, cmd):
1878     """Runs a command on the local or a remote machine.
1879
1880     @type node_name: string
1881     @param node_name: Machine name
1882     @type cmd: list
1883     @param cmd: Command
1884
1885     """
1886     if node_name is None or node_name == self.master_node:
1887       # No need to use SSH
1888       result = utils.RunCmd(cmd)
1889     else:
1890       result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
1891
1892     if result.failed:
1893       errmsg = ["Failed to run command %s" % result.cmd]
1894       if node_name:
1895         errmsg.append("on node %s" % node_name)
1896       errmsg.append(": exitcode %s and error %s" %
1897                     (result.exit_code, result.output))
1898       raise errors.OpExecError(" ".join(errmsg))
1899
1900   def Call(self, fn, *args):
1901     """Call function while all daemons are stopped.
1902
1903     @type fn: callable
1904     @param fn: Function to be called
1905
1906     """
1907     # Pause watcher by acquiring an exclusive lock on watcher state file
1908     self.feedback_fn("Blocking watcher")
1909     watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
1910     try:
1911       # TODO: Currently, this just blocks. There's no timeout.
1912       # TODO: Should it be a shared lock?
1913       watcher_block.Exclusive(blocking=True)
1914
1915       # Stop master daemons, so that no new jobs can come in and all running
1916       # ones are finished
1917       self.feedback_fn("Stopping master daemons")
1918       self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
1919       try:
1920         # Stop daemons on all nodes
1921         for node_name in self.online_nodes:
1922           self.feedback_fn("Stopping daemons on %s" % node_name)
1923           self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
1924
1925         # All daemons are shut down now
1926         try:
1927           return fn(self, *args)
1928         except Exception, err:
1929           _, errmsg = FormatError(err)
1930           logging.exception("Caught exception")
1931           self.feedback_fn(errmsg)
1932           raise
1933       finally:
1934         # Start cluster again, master node last
1935         for node_name in self.nonmaster_nodes + [self.master_node]:
1936           self.feedback_fn("Starting daemons on %s" % node_name)
1937           self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
1938     finally:
1939       # Resume watcher
1940       watcher_block.Close()
1941
1942
1943 def RunWhileClusterStopped(feedback_fn, fn, *args):
1944   """Calls a function while all cluster daemons are stopped.
1945
1946   @type feedback_fn: callable
1947   @param feedback_fn: Feedback function
1948   @type fn: callable
1949   @param fn: Function to be called when daemons are stopped
1950
1951   """
1952   feedback_fn("Gathering cluster information")
1953
1954   # This ensures we're running on the master daemon
1955   cl = GetClient()
1956
1957   (cluster_name, master_node) = \
1958     cl.QueryConfigValues(["cluster_name", "master_node"])
1959
1960   online_nodes = GetOnlineNodes([], cl=cl)
1961
1962   # Don't keep a reference to the client. The master daemon will go away.
1963   del cl
1964
1965   assert master_node in online_nodes
1966
1967   return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
1968                                        online_nodes).Call(fn, *args)
1969
1970
1971 def GenerateTable(headers, fields, separator, data,
1972                   numfields=None, unitfields=None,
1973                   units=None):
1974   """Prints a table with headers and different fields.
1975
1976   @type headers: dict
1977   @param headers: dictionary mapping field names to headers for
1978       the table
1979   @type fields: list
1980   @param fields: the field names corresponding to each row in
1981       the data field
1982   @param separator: the separator to be used; if this is None,
1983       the default 'smart' algorithm is used which computes optimal
1984       field width, otherwise just the separator is used between
1985       each field
1986   @type data: list
1987   @param data: a list of lists, each sublist being one row to be output
1988   @type numfields: list
1989   @param numfields: a list with the fields that hold numeric
1990       values and thus should be right-aligned
1991   @type unitfields: list
1992   @param unitfields: a list with the fields that hold numeric
1993       values that should be formatted with the units field
1994   @type units: string or None
1995   @param units: the units we should use for formatting, or None for
1996       automatic choice (human-readable for non-separator usage, otherwise
1997       megabytes); this is a one-letter string
1998
1999   """
2000   if units is None:
2001     if separator:
2002       units = "m"
2003     else:
2004       units = "h"
2005
2006   if numfields is None:
2007     numfields = []
2008   if unitfields is None:
2009     unitfields = []
2010
2011   numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
2012   unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142
2013
2014   format_fields = []
2015   for field in fields:
2016     if headers and field not in headers:
2017       # TODO: handle better unknown fields (either revert to old
2018       # style of raising exception, or deal more intelligently with
2019       # variable fields)
2020       headers[field] = field
2021     if separator is not None:
2022       format_fields.append("%s")
2023     elif numfields.Matches(field):
2024       format_fields.append("%*s")
2025     else:
2026       format_fields.append("%-*s")
2027
2028   if separator is None:
2029     mlens = [0 for name in fields]
2030     format = ' '.join(format_fields)
2031   else:
2032     format = separator.replace("%", "%%").join(format_fields)
2033
2034   for row in data:
2035     if row is None:
2036       continue
2037     for idx, val in enumerate(row):
2038       if unitfields.Matches(fields[idx]):
2039         try:
2040           val = int(val)
2041         except (TypeError, ValueError):
2042           pass
2043         else:
2044           val = row[idx] = utils.FormatUnit(val, units)
2045       val = row[idx] = str(val)
2046       if separator is None:
2047         mlens[idx] = max(mlens[idx], len(val))
2048
2049   result = []
2050   if headers:
2051     args = []
2052     for idx, name in enumerate(fields):
2053       hdr = headers[name]
2054       if separator is None:
2055         mlens[idx] = max(mlens[idx], len(hdr))
2056         args.append(mlens[idx])
2057       args.append(hdr)
2058     result.append(format % tuple(args))
2059
2060   if separator is None:
2061     assert len(mlens) == len(fields)
2062
2063     if fields and not numfields.Matches(fields[-1]):
2064       mlens[-1] = 0
2065
2066   for line in data:
2067     args = []
2068     if line is None:
2069       line = ['-' for _ in fields]
2070     for idx in range(len(fields)):
2071       if separator is None:
2072         args.append(mlens[idx])
2073       args.append(line[idx])
2074     result.append(format % tuple(args))
2075
2076   return result
2077
2078
2079 def FormatTimestamp(ts):
2080   """Formats a given timestamp.
2081
2082   @type ts: timestamp
2083   @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
2084
2085   @rtype: string
2086   @return: a string with the formatted timestamp
2087
2088   """
2089   if not isinstance (ts, (tuple, list)) or len(ts) != 2:
2090     return '?'
2091   sec, usec = ts
2092   return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
2093
2094
2095 def ParseTimespec(value):
2096   """Parse a time specification.
2097
2098   The following suffixed will be recognized:
2099
2100     - s: seconds
2101     - m: minutes
2102     - h: hours
2103     - d: day
2104     - w: weeks
2105
2106   Without any suffix, the value will be taken to be in seconds.
2107
2108   """
2109   value = str(value)
2110   if not value:
2111     raise errors.OpPrereqError("Empty time specification passed")
2112   suffix_map = {
2113     's': 1,
2114     'm': 60,
2115     'h': 3600,
2116     'd': 86400,
2117     'w': 604800,
2118     }
2119   if value[-1] not in suffix_map:
2120     try:
2121       value = int(value)
2122     except (TypeError, ValueError):
2123       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2124   else:
2125     multiplier = suffix_map[value[-1]]
2126     value = value[:-1]
2127     if not value: # no data left after stripping the suffix
2128       raise errors.OpPrereqError("Invalid time specification (only"
2129                                  " suffix passed)")
2130     try:
2131       value = int(value) * multiplier
2132     except (TypeError, ValueError):
2133       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2134   return value
2135
2136
2137 def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
2138                    filter_master=False):
2139   """Returns the names of online nodes.
2140
2141   This function will also log a warning on stderr with the names of
2142   the online nodes.
2143
2144   @param nodes: if not empty, use only this subset of nodes (minus the
2145       offline ones)
2146   @param cl: if not None, luxi client to use
2147   @type nowarn: boolean
2148   @param nowarn: by default, this function will output a note with the
2149       offline nodes that are skipped; if this parameter is True the
2150       note is not displayed
2151   @type secondary_ips: boolean
2152   @param secondary_ips: if True, return the secondary IPs instead of the
2153       names, useful for doing network traffic over the replication interface
2154       (if any)
2155   @type filter_master: boolean
2156   @param filter_master: if True, do not return the master node in the list
2157       (useful in coordination with secondary_ips where we cannot check our
2158       node name against the list)
2159
2160   """
2161   if cl is None:
2162     cl = GetClient()
2163
2164   if secondary_ips:
2165     name_idx = 2
2166   else:
2167     name_idx = 0
2168
2169   if filter_master:
2170     master_node = cl.QueryConfigValues(["master_node"])[0]
2171     filter_fn = lambda x: x != master_node
2172   else:
2173     filter_fn = lambda _: True
2174
2175   result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
2176                          use_locking=False)
2177   offline = [row[0] for row in result if row[1]]
2178   if offline and not nowarn:
2179     ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
2180   return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
2181
2182
2183 def _ToStream(stream, txt, *args):
2184   """Write a message to a stream, bypassing the logging system
2185
2186   @type stream: file object
2187   @param stream: the file to which we should write
2188   @type txt: str
2189   @param txt: the message
2190
2191   """
2192   if args:
2193     args = tuple(args)
2194     stream.write(txt % args)
2195   else:
2196     stream.write(txt)
2197   stream.write('\n')
2198   stream.flush()
2199
2200
2201 def ToStdout(txt, *args):
2202   """Write a message to stdout only, bypassing the logging system
2203
2204   This is just a wrapper over _ToStream.
2205
2206   @type txt: str
2207   @param txt: the message
2208
2209   """
2210   _ToStream(sys.stdout, txt, *args)
2211
2212
2213 def ToStderr(txt, *args):
2214   """Write a message to stderr only, bypassing the logging system
2215
2216   This is just a wrapper over _ToStream.
2217
2218   @type txt: str
2219   @param txt: the message
2220
2221   """
2222   _ToStream(sys.stderr, txt, *args)
2223
2224
2225 class JobExecutor(object):
2226   """Class which manages the submission and execution of multiple jobs.
2227
2228   Note that instances of this class should not be reused between
2229   GetResults() calls.
2230
2231   """
2232   def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
2233     self.queue = []
2234     if cl is None:
2235       cl = GetClient()
2236     self.cl = cl
2237     self.verbose = verbose
2238     self.jobs = []
2239     self.opts = opts
2240     self.feedback_fn = feedback_fn
2241
2242   def QueueJob(self, name, *ops):
2243     """Record a job for later submit.
2244
2245     @type name: string
2246     @param name: a description of the job, will be used in WaitJobSet
2247     """
2248     SetGenericOpcodeOpts(ops, self.opts)
2249     self.queue.append((name, ops))
2250
2251   def SubmitPending(self, each=False):
2252     """Submit all pending jobs.
2253
2254     """
2255     if each:
2256       results = []
2257       for row in self.queue:
2258         # SubmitJob will remove the success status, but raise an exception if
2259         # the submission fails, so we'll notice that anyway.
2260         results.append([True, self.cl.SubmitJob(row[1])])
2261     else:
2262       results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
2263     for (idx, ((status, data), (name, _))) in enumerate(zip(results,
2264                                                             self.queue)):
2265       self.jobs.append((idx, status, data, name))
2266
2267   def _ChooseJob(self):
2268     """Choose a non-waiting/queued job to poll next.
2269
2270     """
2271     assert self.jobs, "_ChooseJob called with empty job list"
2272
2273     result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
2274     assert result
2275
2276     for job_data, status in zip(self.jobs, result):
2277       if status[0] in (constants.JOB_STATUS_QUEUED,
2278                     constants.JOB_STATUS_WAITLOCK,
2279                     constants.JOB_STATUS_CANCELING):
2280         # job is still waiting
2281         continue
2282       # good candidate found
2283       self.jobs.remove(job_data)
2284       return job_data
2285
2286     # no job found
2287     return self.jobs.pop(0)
2288
2289   def GetResults(self):
2290     """Wait for and return the results of all jobs.
2291
2292     @rtype: list
2293     @return: list of tuples (success, job results), in the same order
2294         as the submitted jobs; if a job has failed, instead of the result
2295         there will be the error message
2296
2297     """
2298     if not self.jobs:
2299       self.SubmitPending()
2300     results = []
2301     if self.verbose:
2302       ok_jobs = [row[2] for row in self.jobs if row[1]]
2303       if ok_jobs:
2304         ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
2305
2306     # first, remove any non-submitted jobs
2307     self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
2308     for idx, _, jid, name in failures:
2309       ToStderr("Failed to submit job for %s: %s", name, jid)
2310       results.append((idx, False, jid))
2311
2312     while self.jobs:
2313       (idx, _, jid, name) = self._ChooseJob()
2314       ToStdout("Waiting for job %s for %s...", jid, name)
2315       try:
2316         job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
2317         success = True
2318       except (errors.GenericError, luxi.ProtocolError), err:
2319         _, job_result = FormatError(err)
2320         success = False
2321         # the error message will always be shown, verbose or not
2322         ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
2323
2324       results.append((idx, success, job_result))
2325
2326     # sort based on the index, then drop it
2327     results.sort()
2328     results = [i[1:] for i in results]
2329
2330     return results
2331
2332   def WaitOrShow(self, wait):
2333     """Wait for job results or only print the job IDs.
2334
2335     @type wait: boolean
2336     @param wait: whether to wait or not
2337
2338     """
2339     if wait:
2340       return self.GetResults()
2341     else:
2342       if not self.jobs:
2343         self.SubmitPending()
2344       for _, status, result, name in self.jobs:
2345         if status:
2346           ToStdout("%s: %s", result, name)
2347         else:
2348           ToStderr("Failure for %s: %s", name, result)