Unify the “--candidate-pool-size” option
[ganeti-local] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module dealing with command line parsing"""
23
24
25 import sys
26 import textwrap
27 import os.path
28 import copy
29 import time
30 import logging
31 from cStringIO import StringIO
32
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import constants
36 from ganeti import opcodes
37 from ganeti import luxi
38 from ganeti import ssconf
39 from ganeti import rpc
40
41 from optparse import (OptionParser, TitledHelpFormatter,
42                       Option, OptionValueError)
43
44
45 __all__ = [
46   # Command line options
47   "ALLOCATABLE_OPT",
48   "ALL_OPT",
49   "AUTO_REPLACE_OPT",
50   "BACKEND_OPT",
51   "CLEANUP_OPT",
52   "CONFIRM_OPT",
53   "CP_SIZE_OPT",
54   "DEBUG_OPT",
55   "DEBUG_SIMERR_OPT",
56   "DISKIDX_OPT",
57   "DISK_OPT",
58   "DISK_TEMPLATE_OPT",
59   "DRAINED_OPT",
60   "ENABLED_HV_OPT",
61   "FIELDS_OPT",
62   "FILESTORE_DIR_OPT",
63   "FILESTORE_DRIVER_OPT",
64   "HVLIST_OPT",
65   "HVOPTS_OPT",
66   "HYPERVISOR_OPT",
67   "IALLOCATOR_OPT",
68   "IGNORE_CONSIST_OPT",
69   "IGNORE_FAILURES_OPT",
70   "IGNORE_SIZE_OPT",
71   "FORCE_OPT",
72   "MC_OPT",
73   "NET_OPT",
74   "NEW_SECONDARY_OPT",
75   "NIC_PARAMS_OPT",
76   "NODE_LIST_OPT",
77   "NODE_PLACEMENT_OPT",
78   "NOHDR_OPT",
79   "NOIPCHECK_OPT",
80   "NOLVM_STORAGE_OPT",
81   "NONICS_OPT",
82   "NONLIVE_OPT",
83   "NOSTART_OPT",
84   "NOSSH_KEYCHECK_OPT",
85   "NWSYNC_OPT",
86   "ON_PRIMARY_OPT",
87   "ON_SECONDARY_OPT",
88   "OFFLINE_OPT",
89   "OS_OPT",
90   "OS_SIZE_OPT",
91   "READD_OPT",
92   "SECONDARY_IP_OPT",
93   "SELECT_OS_OPT",
94   "SEP_OPT",
95   "SHOWCMD_OPT",
96   "SINGLE_NODE_OPT",
97   "SRC_DIR_OPT",
98   "SRC_NODE_OPT",
99   "SUBMIT_OPT",
100   "STATIC_OPT",
101   "SYNC_OPT",
102   "TAG_SRC_OPT",
103   "USEUNITS_OPT",
104   "VERBOSE_OPT",
105   # Generic functions for CLI programs
106   "GenericMain",
107   "GetClient",
108   "GetOnlineNodes",
109   "JobExecutor",
110   "JobSubmittedException",
111   "ParseTimespec",
112   "SubmitOpCode",
113   "SubmitOrSend",
114   "UsesRPC",
115   # Formatting functions
116   "ToStderr", "ToStdout",
117   "FormatError",
118   "GenerateTable",
119   "AskUser",
120   "FormatTimestamp",
121   # Tags functions
122   "ListTags",
123   "AddTags",
124   "RemoveTags",
125   # command line options support infrastructure
126   "ARGS_MANY_INSTANCES",
127   "ARGS_MANY_NODES",
128   "ARGS_NONE",
129   "ARGS_ONE_INSTANCE",
130   "ARGS_ONE_NODE",
131   "ArgChoice",
132   "ArgCommand",
133   "ArgFile",
134   "ArgHost",
135   "ArgInstance",
136   "ArgJobId",
137   "ArgNode",
138   "ArgSuggest",
139   "ArgUnknown",
140   "OPT_COMPL_INST_ADD_NODES",
141   "OPT_COMPL_MANY_NODES",
142   "OPT_COMPL_ONE_IALLOCATOR",
143   "OPT_COMPL_ONE_INSTANCE",
144   "OPT_COMPL_ONE_NODE",
145   "OPT_COMPL_ONE_OS",
146   "cli_option",
147   "SplitNodeOption",
148   ]
149
150 NO_PREFIX = "no_"
151 UN_PREFIX = "-"
152
153
154 class _Argument:
155   def __init__(self, min=0, max=None):
156     self.min = min
157     self.max = max
158
159   def __repr__(self):
160     return ("<%s min=%s max=%s>" %
161             (self.__class__.__name__, self.min, self.max))
162
163
164 class ArgSuggest(_Argument):
165   """Suggesting argument.
166
167   Value can be any of the ones passed to the constructor.
168
169   """
170   def __init__(self, min=0, max=None, choices=None):
171     _Argument.__init__(self, min=min, max=max)
172     self.choices = choices
173
174   def __repr__(self):
175     return ("<%s min=%s max=%s choices=%r>" %
176             (self.__class__.__name__, self.min, self.max, self.choices))
177
178
179 class ArgChoice(ArgSuggest):
180   """Choice argument.
181
182   Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
183   but value must be one of the choices.
184
185   """
186
187
188 class ArgUnknown(_Argument):
189   """Unknown argument to program (e.g. determined at runtime).
190
191   """
192
193
194 class ArgInstance(_Argument):
195   """Instances argument.
196
197   """
198
199
200 class ArgNode(_Argument):
201   """Node argument.
202
203   """
204
205 class ArgJobId(_Argument):
206   """Job ID argument.
207
208   """
209
210
211 class ArgFile(_Argument):
212   """File path argument.
213
214   """
215
216
217 class ArgCommand(_Argument):
218   """Command argument.
219
220   """
221
222
223 class ArgHost(_Argument):
224   """Host argument.
225
226   """
227
228
229 ARGS_NONE = []
230 ARGS_MANY_INSTANCES = [ArgInstance()]
231 ARGS_MANY_NODES = [ArgNode()]
232 ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
233 ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
234
235
236
237 def _ExtractTagsObject(opts, args):
238   """Extract the tag type object.
239
240   Note that this function will modify its args parameter.
241
242   """
243   if not hasattr(opts, "tag_type"):
244     raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
245   kind = opts.tag_type
246   if kind == constants.TAG_CLUSTER:
247     retval = kind, kind
248   elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
249     if not args:
250       raise errors.OpPrereqError("no arguments passed to the command")
251     name = args.pop(0)
252     retval = kind, name
253   else:
254     raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
255   return retval
256
257
258 def _ExtendTags(opts, args):
259   """Extend the args if a source file has been given.
260
261   This function will extend the tags with the contents of the file
262   passed in the 'tags_source' attribute of the opts parameter. A file
263   named '-' will be replaced by stdin.
264
265   """
266   fname = opts.tags_source
267   if fname is None:
268     return
269   if fname == "-":
270     new_fh = sys.stdin
271   else:
272     new_fh = open(fname, "r")
273   new_data = []
274   try:
275     # we don't use the nice 'new_data = [line.strip() for line in fh]'
276     # because of python bug 1633941
277     while True:
278       line = new_fh.readline()
279       if not line:
280         break
281       new_data.append(line.strip())
282   finally:
283     new_fh.close()
284   args.extend(new_data)
285
286
287 def ListTags(opts, args):
288   """List the tags on a given object.
289
290   This is a generic implementation that knows how to deal with all
291   three cases of tag objects (cluster, node, instance). The opts
292   argument is expected to contain a tag_type field denoting what
293   object type we work on.
294
295   """
296   kind, name = _ExtractTagsObject(opts, args)
297   op = opcodes.OpGetTags(kind=kind, name=name)
298   result = SubmitOpCode(op)
299   result = list(result)
300   result.sort()
301   for tag in result:
302     ToStdout(tag)
303
304
305 def AddTags(opts, args):
306   """Add tags on a given object.
307
308   This is a generic implementation that knows how to deal with all
309   three cases of tag objects (cluster, node, instance). The opts
310   argument is expected to contain a tag_type field denoting what
311   object type we work on.
312
313   """
314   kind, name = _ExtractTagsObject(opts, args)
315   _ExtendTags(opts, args)
316   if not args:
317     raise errors.OpPrereqError("No tags to be added")
318   op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
319   SubmitOpCode(op)
320
321
322 def RemoveTags(opts, args):
323   """Remove tags from a given object.
324
325   This is a generic implementation that knows how to deal with all
326   three cases of tag objects (cluster, node, instance). The opts
327   argument is expected to contain a tag_type field denoting what
328   object type we work on.
329
330   """
331   kind, name = _ExtractTagsObject(opts, args)
332   _ExtendTags(opts, args)
333   if not args:
334     raise errors.OpPrereqError("No tags to be removed")
335   op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
336   SubmitOpCode(op)
337
338
339 def check_unit(option, opt, value):
340   """OptParsers custom converter for units.
341
342   """
343   try:
344     return utils.ParseUnit(value)
345   except errors.UnitParseError, err:
346     raise OptionValueError("option %s: %s" % (opt, err))
347
348
349 def _SplitKeyVal(opt, data):
350   """Convert a KeyVal string into a dict.
351
352   This function will convert a key=val[,...] string into a dict. Empty
353   values will be converted specially: keys which have the prefix 'no_'
354   will have the value=False and the prefix stripped, the others will
355   have value=True.
356
357   @type opt: string
358   @param opt: a string holding the option name for which we process the
359       data, used in building error messages
360   @type data: string
361   @param data: a string of the format key=val,key=val,...
362   @rtype: dict
363   @return: {key=val, key=val}
364   @raises errors.ParameterError: if there are duplicate keys
365
366   """
367   kv_dict = {}
368   if data:
369     for elem in data.split(","):
370       if "=" in elem:
371         key, val = elem.split("=", 1)
372       else:
373         if elem.startswith(NO_PREFIX):
374           key, val = elem[len(NO_PREFIX):], False
375         elif elem.startswith(UN_PREFIX):
376           key, val = elem[len(UN_PREFIX):], None
377         else:
378           key, val = elem, True
379       if key in kv_dict:
380         raise errors.ParameterError("Duplicate key '%s' in option %s" %
381                                     (key, opt))
382       kv_dict[key] = val
383   return kv_dict
384
385
386 def check_ident_key_val(option, opt, value):
387   """Custom parser for ident:key=val,key=val options.
388
389   This will store the parsed values as a tuple (ident, {key: val}). As such,
390   multiple uses of this option via action=append is possible.
391
392   """
393   if ":" not in value:
394     ident, rest = value, ''
395   else:
396     ident, rest = value.split(":", 1)
397
398   if ident.startswith(NO_PREFIX):
399     if rest:
400       msg = "Cannot pass options when removing parameter groups: %s" % value
401       raise errors.ParameterError(msg)
402     retval = (ident[len(NO_PREFIX):], False)
403   elif ident.startswith(UN_PREFIX):
404     if rest:
405       msg = "Cannot pass options when removing parameter groups: %s" % value
406       raise errors.ParameterError(msg)
407     retval = (ident[len(UN_PREFIX):], None)
408   else:
409     kv_dict = _SplitKeyVal(opt, rest)
410     retval = (ident, kv_dict)
411   return retval
412
413
414 def check_key_val(option, opt, value):
415   """Custom parser class for key=val,key=val options.
416
417   This will store the parsed values as a dict {key: val}.
418
419   """
420   return _SplitKeyVal(opt, value)
421
422
423 # completion_suggestion is normally a list. Using numeric values not evaluating
424 # to False for dynamic completion.
425 (OPT_COMPL_MANY_NODES,
426  OPT_COMPL_ONE_NODE,
427  OPT_COMPL_ONE_INSTANCE,
428  OPT_COMPL_ONE_OS,
429  OPT_COMPL_ONE_IALLOCATOR,
430  OPT_COMPL_INST_ADD_NODES) = range(100, 106)
431
432 OPT_COMPL_ALL = frozenset([
433   OPT_COMPL_MANY_NODES,
434   OPT_COMPL_ONE_NODE,
435   OPT_COMPL_ONE_INSTANCE,
436   OPT_COMPL_ONE_OS,
437   OPT_COMPL_ONE_IALLOCATOR,
438   OPT_COMPL_INST_ADD_NODES,
439   ])
440
441
442 class CliOption(Option):
443   """Custom option class for optparse.
444
445   """
446   ATTRS = Option.ATTRS + [
447     "completion_suggest",
448     ]
449   TYPES = Option.TYPES + (
450     "identkeyval",
451     "keyval",
452     "unit",
453     )
454   TYPE_CHECKER = Option.TYPE_CHECKER.copy()
455   TYPE_CHECKER["identkeyval"] = check_ident_key_val
456   TYPE_CHECKER["keyval"] = check_key_val
457   TYPE_CHECKER["unit"] = check_unit
458
459
460 # optparse.py sets make_option, so we do it for our own option class, too
461 cli_option = CliOption
462
463
464 _YESNO = ("yes", "no")
465 _YORNO = "yes|no"
466
467 DEBUG_OPT = cli_option("-d", "--debug", default=False,
468                        action="store_true",
469                        help="Turn debugging on")
470
471 NOHDR_OPT = cli_option("--no-headers", default=False,
472                        action="store_true", dest="no_headers",
473                        help="Don't display column headers")
474
475 SEP_OPT = cli_option("--separator", default=None,
476                      action="store", dest="separator",
477                      help=("Separator between output fields"
478                            " (defaults to one space)"))
479
480 USEUNITS_OPT = cli_option("--units", default=None,
481                           dest="units", choices=('h', 'm', 'g', 't'),
482                           help="Specify units for output (one of hmgt)")
483
484 FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
485                         type="string", metavar="FIELDS",
486                         help="Comma separated list of output fields")
487
488 FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
489                        default=False, help="Force the operation")
490
491 CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
492                          default=False, help="Do not require confirmation")
493
494 TAG_SRC_OPT = cli_option("--from", dest="tags_source",
495                          default=None, help="File with tag names")
496
497 SUBMIT_OPT = cli_option("--submit", dest="submit_only",
498                         default=False, action="store_true",
499                         help=("Submit the job and return the job ID, but"
500                               " don't wait for the job to finish"))
501
502 SYNC_OPT = cli_option("--sync", dest="do_locking",
503                       default=False, action="store_true",
504                       help=("Grab locks while doing the queries"
505                             " in order to ensure more consistent results"))
506
507 _DRY_RUN_OPT = cli_option("--dry-run", default=False,
508                           action="store_true",
509                           help=("Do not execute the operation, just run the"
510                                 " check steps and verify it it could be"
511                                 " executed"))
512
513 VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
514                          action="store_true",
515                          help="Increase the verbosity of the operation")
516
517 DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
518                               action="store_true", dest="simulate_errors",
519                               help="Debugging option that makes the operation"
520                               " treat most runtime checks as failed")
521
522 NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
523                         default=True, action="store_false",
524                         help="Don't wait for sync (DANGEROUS!)")
525
526 DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
527                                help="Custom disk setup (diskless, file,"
528                                " plain or drbd)",
529                                default=None, metavar="TEMPL",
530                                choices=list(constants.DISK_TEMPLATES))
531
532 NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
533                         help="Do not create any network cards for"
534                         " the instance")
535
536 FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
537                                help="Relative path under default cluster-wide"
538                                " file storage dir to store file-based disks",
539                                default=None, metavar="<DIR>")
540
541 FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
542                                   help="Driver to use for image files",
543                                   default="loop", metavar="<DRIVER>",
544                                   choices=list(constants.FILE_DRIVER))
545
546 IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
547                             help="Select nodes for the instance automatically"
548                             " using the <NAME> iallocator plugin",
549                             default=None, type="string",
550                             completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
551
552 OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
553                     metavar="<os>",
554                     completion_suggest=OPT_COMPL_ONE_OS)
555
556 BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
557                          type="keyval", default={},
558                          help="Backend parameters")
559
560 HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
561                          default={}, dest="hvparams",
562                          help="Hypervisor parameters")
563
564 HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
565                             help="Hypervisor and hypervisor options, in the"
566                             " format hypervisor:option=value,option=value,...",
567                             default=None, type="identkeyval")
568
569 HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
570                         help="Hypervisor and hypervisor options, in the"
571                         " format hypervisor:option=value,option=value,...",
572                         default=[], action="append", type="identkeyval")
573
574 NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
575                            action="store_false",
576                            help="Don't check that the instance's IP"
577                            " is alive")
578
579 NET_OPT = cli_option("--net",
580                      help="NIC parameters", default=[],
581                      dest="nics", action="append", type="identkeyval")
582
583 DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
584                       dest="disks", action="append", type="identkeyval")
585
586 DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
587                          help="Comma-separated list of disks"
588                          " indices to act on (e.g. 0,2) (optional,"
589                          " defaults to all disks)")
590
591 OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
592                          help="Enforces a single-disk configuration using the"
593                          " given disk size, in MiB unless a suffix is used",
594                          default=None, type="unit", metavar="<size>")
595
596 IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
597                                 dest="ignore_consistency",
598                                 action="store_true", default=False,
599                                 help="Ignore the consistency of the disks on"
600                                 " the secondary")
601
602 NONLIVE_OPT = cli_option("--non-live", dest="live",
603                          default=True, action="store_false",
604                          help="Do a non-live migration (this usually means"
605                          " freeze the instance, save the state, transfer and"
606                          " only then resume running on the secondary node)")
607
608 NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
609                                 help="Target node and optional secondary node",
610                                 metavar="<pnode>[:<snode>]",
611                                 completion_suggest=OPT_COMPL_INST_ADD_NODES)
612
613 NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
614                            action="append", metavar="<node>",
615                            help="Use only this node (can be used multiple"
616                            " times, if not given defaults to all nodes)",
617                            completion_suggest=OPT_COMPL_ONE_NODE)
618
619 SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
620                              metavar="<node>",
621                              completion_suggest=OPT_COMPL_ONE_NODE)
622
623 NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
624                          action="store_false",
625                          help="Don't start the instance after creation")
626
627 SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
628                          action="store_true", default=False,
629                          help="Show command instead of executing it")
630
631 CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
632                          default=False, action="store_true",
633                          help="Instead of performing the migration, try to"
634                          " recover from a failed cleanup. This is safe"
635                          " to run even if the instance is healthy, but it"
636                          " will create extra replication traffic and "
637                          " disrupt briefly the replication (like during the"
638                          " migration")
639
640 STATIC_OPT = cli_option("-s", "--static", dest="static",
641                         action="store_true", default=False,
642                         help="Only show configuration data, not runtime data")
643
644 ALL_OPT = cli_option("--all", dest="show_all",
645                      default=False, action="store_true",
646                      help="Show info on all instances on the cluster."
647                      " This can take a long time to run, use wisely")
648
649 SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
650                            action="store_true", default=False,
651                            help="Interactive OS reinstall, lists available"
652                            " OS templates for selection")
653
654 IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
655                                  action="store_true", default=False,
656                                  help="Remove the instance from the cluster"
657                                  " configuration even if there are failures"
658                                  " during the removal process")
659
660 NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
661                                help="Specifies the new secondary node",
662                                metavar="NODE", default=None,
663                                completion_suggest=OPT_COMPL_ONE_NODE)
664
665 ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
666                             default=False, action="store_true",
667                             help="Replace the disk(s) on the primary"
668                             " node (only for the drbd template)")
669
670 ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
671                               default=False, action="store_true",
672                               help="Replace the disk(s) on the secondary"
673                               " node (only for the drbd template)")
674
675 AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
676                               default=False, action="store_true",
677                               help="Automatically replace faulty disks"
678                               " (only for the drbd template)")
679
680 IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
681                              default=False, action="store_true",
682                              help="Ignore current recorded size"
683                              " (useful for forcing activation when"
684                              " the recorded size is wrong)")
685
686 SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
687                           metavar="<node>",
688                           completion_suggest=OPT_COMPL_ONE_NODE)
689
690 SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
691                          metavar="<dir>")
692
693 SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
694                               help="Specify the secondary ip for the node",
695                               metavar="ADDRESS", default=None)
696
697 READD_OPT = cli_option("--readd", dest="readd",
698                        default=False, action="store_true",
699                        help="Readd old node after replacing it")
700
701 NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
702                                 default=True, action="store_false",
703                                 help="Disable SSH key fingerprint checking")
704
705
706 MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
707                     choices=_YESNO, default=None, metavar=_YORNO,
708                     help="Set the master_candidate flag on the node")
709
710 OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
711                          choices=_YESNO, default=None,
712                          help="Set the offline flag on the node")
713
714 DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
715                          choices=_YESNO, default=None,
716                          help="Set the drained flag on the node")
717
718 ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
719                              choices=_YESNO, default=None, metavar=_YORNO,
720                              help="Set the allocatable flag on a volume")
721
722 NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
723                                help="Disable support for lvm based instances"
724                                " (cluster-wide)",
725                                action="store_false", default=True)
726
727 ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
728                             dest="enabled_hypervisors",
729                             help="Comma-separated list of hypervisors",
730                             type="string", default=None)
731
732 NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
733                             type="keyval", default={},
734                             help="NIC parameters")
735
736 CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
737                          dest="candidate_pool_size", type="int",
738                          help="Set the candidate pool size")
739
740
741 def _ParseArgs(argv, commands, aliases):
742   """Parser for the command line arguments.
743
744   This function parses the arguments and returns the function which
745   must be executed together with its (modified) arguments.
746
747   @param argv: the command line
748   @param commands: dictionary with special contents, see the design
749       doc for cmdline handling
750   @param aliases: dictionary with command aliases {'alias': 'target, ...}
751
752   """
753   if len(argv) == 0:
754     binary = "<command>"
755   else:
756     binary = argv[0].split("/")[-1]
757
758   if len(argv) > 1 and argv[1] == "--version":
759     ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
760     # Quit right away. That way we don't have to care about this special
761     # argument. optparse.py does it the same.
762     sys.exit(0)
763
764   if len(argv) < 2 or not (argv[1] in commands or
765                            argv[1] in aliases):
766     # let's do a nice thing
767     sortedcmds = commands.keys()
768     sortedcmds.sort()
769
770     ToStdout("Usage: %s {command} [options...] [argument...]", binary)
771     ToStdout("%s <command> --help to see details, or man %s", binary, binary)
772     ToStdout("")
773
774     # compute the max line length for cmd + usage
775     mlen = max([len(" %s" % cmd) for cmd in commands])
776     mlen = min(60, mlen) # should not get here...
777
778     # and format a nice command list
779     ToStdout("Commands:")
780     for cmd in sortedcmds:
781       cmdstr = " %s" % (cmd,)
782       help_text = commands[cmd][4]
783       help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
784       ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
785       for line in help_lines:
786         ToStdout("%-*s   %s", mlen, "", line)
787
788     ToStdout("")
789
790     return None, None, None
791
792   # get command, unalias it, and look it up in commands
793   cmd = argv.pop(1)
794   if cmd in aliases:
795     if cmd in commands:
796       raise errors.ProgrammerError("Alias '%s' overrides an existing"
797                                    " command" % cmd)
798
799     if aliases[cmd] not in commands:
800       raise errors.ProgrammerError("Alias '%s' maps to non-existing"
801                                    " command '%s'" % (cmd, aliases[cmd]))
802
803     cmd = aliases[cmd]
804
805   func, args_def, parser_opts, usage, description = commands[cmd]
806   parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT],
807                         description=description,
808                         formatter=TitledHelpFormatter(),
809                         usage="%%prog %s %s" % (cmd, usage))
810   parser.disable_interspersed_args()
811   options, args = parser.parse_args()
812
813   if not _CheckArguments(cmd, args_def, args):
814     return None, None, None
815
816   return func, options, args
817
818
819 def _CheckArguments(cmd, args_def, args):
820   """Verifies the arguments using the argument definition.
821
822   Algorithm:
823
824     1. Abort with error if values specified by user but none expected.
825
826     1. For each argument in definition
827
828       1. Keep running count of minimum number of values (min_count)
829       1. Keep running count of maximum number of values (max_count)
830       1. If it has an unlimited number of values
831
832         1. Abort with error if it's not the last argument in the definition
833
834     1. If last argument has limited number of values
835
836       1. Abort with error if number of values doesn't match or is too large
837
838     1. Abort with error if user didn't pass enough values (min_count)
839
840   """
841   if args and not args_def:
842     ToStderr("Error: Command %s expects no arguments", cmd)
843     return False
844
845   min_count = None
846   max_count = None
847   check_max = None
848
849   last_idx = len(args_def) - 1
850
851   for idx, arg in enumerate(args_def):
852     if min_count is None:
853       min_count = arg.min
854     elif arg.min is not None:
855       min_count += arg.min
856
857     if max_count is None:
858       max_count = arg.max
859     elif arg.max is not None:
860       max_count += arg.max
861
862     if idx == last_idx:
863       check_max = (arg.max is not None)
864
865     elif arg.max is None:
866       raise errors.ProgrammerError("Only the last argument can have max=None")
867
868   if check_max:
869     # Command with exact number of arguments
870     if (min_count is not None and max_count is not None and
871         min_count == max_count and len(args) != min_count):
872       ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
873       return False
874
875     # Command with limited number of arguments
876     if max_count is not None and len(args) > max_count:
877       ToStderr("Error: Command %s expects only %d argument(s)",
878                cmd, max_count)
879       return False
880
881   # Command with some required arguments
882   if min_count is not None and len(args) < min_count:
883     ToStderr("Error: Command %s expects at least %d argument(s)",
884              cmd, min_count)
885     return False
886
887   return True
888
889
890 def SplitNodeOption(value):
891   """Splits the value of a --node option.
892
893   """
894   if value and ':' in value:
895     return value.split(':', 1)
896   else:
897     return (value, None)
898
899
900 def UsesRPC(fn):
901   def wrapper(*args, **kwargs):
902     rpc.Init()
903     try:
904       return fn(*args, **kwargs)
905     finally:
906       rpc.Shutdown()
907   return wrapper
908
909
910 def AskUser(text, choices=None):
911   """Ask the user a question.
912
913   @param text: the question to ask
914
915   @param choices: list with elements tuples (input_char, return_value,
916       description); if not given, it will default to: [('y', True,
917       'Perform the operation'), ('n', False, 'Do no do the operation')];
918       note that the '?' char is reserved for help
919
920   @return: one of the return values from the choices list; if input is
921       not possible (i.e. not running with a tty, we return the last
922       entry from the list
923
924   """
925   if choices is None:
926     choices = [('y', True, 'Perform the operation'),
927                ('n', False, 'Do not perform the operation')]
928   if not choices or not isinstance(choices, list):
929     raise errors.ProgrammerError("Invalid choices argument to AskUser")
930   for entry in choices:
931     if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
932       raise errors.ProgrammerError("Invalid choices element to AskUser")
933
934   answer = choices[-1][1]
935   new_text = []
936   for line in text.splitlines():
937     new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
938   text = "\n".join(new_text)
939   try:
940     f = file("/dev/tty", "a+")
941   except IOError:
942     return answer
943   try:
944     chars = [entry[0] for entry in choices]
945     chars[-1] = "[%s]" % chars[-1]
946     chars.append('?')
947     maps = dict([(entry[0], entry[1]) for entry in choices])
948     while True:
949       f.write(text)
950       f.write('\n')
951       f.write("/".join(chars))
952       f.write(": ")
953       line = f.readline(2).strip().lower()
954       if line in maps:
955         answer = maps[line]
956         break
957       elif line == '?':
958         for entry in choices:
959           f.write(" %s - %s\n" % (entry[0], entry[2]))
960         f.write("\n")
961         continue
962   finally:
963     f.close()
964   return answer
965
966
967 class JobSubmittedException(Exception):
968   """Job was submitted, client should exit.
969
970   This exception has one argument, the ID of the job that was
971   submitted. The handler should print this ID.
972
973   This is not an error, just a structured way to exit from clients.
974
975   """
976
977
978 def SendJob(ops, cl=None):
979   """Function to submit an opcode without waiting for the results.
980
981   @type ops: list
982   @param ops: list of opcodes
983   @type cl: luxi.Client
984   @param cl: the luxi client to use for communicating with the master;
985              if None, a new client will be created
986
987   """
988   if cl is None:
989     cl = GetClient()
990
991   job_id = cl.SubmitJob(ops)
992
993   return job_id
994
995
996 def PollJob(job_id, cl=None, feedback_fn=None):
997   """Function to poll for the result of a job.
998
999   @type job_id: job identified
1000   @param job_id: the job to poll for results
1001   @type cl: luxi.Client
1002   @param cl: the luxi client to use for communicating with the master;
1003              if None, a new client will be created
1004
1005   """
1006   if cl is None:
1007     cl = GetClient()
1008
1009   prev_job_info = None
1010   prev_logmsg_serial = None
1011
1012   while True:
1013     result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
1014                                  prev_logmsg_serial)
1015     if not result:
1016       # job not found, go away!
1017       raise errors.JobLost("Job with id %s lost" % job_id)
1018
1019     # Split result, a tuple of (field values, log entries)
1020     (job_info, log_entries) = result
1021     (status, ) = job_info
1022
1023     if log_entries:
1024       for log_entry in log_entries:
1025         (serial, timestamp, _, message) = log_entry
1026         if callable(feedback_fn):
1027           feedback_fn(log_entry[1:])
1028         else:
1029           encoded = utils.SafeEncode(message)
1030           ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
1031         prev_logmsg_serial = max(prev_logmsg_serial, serial)
1032
1033     # TODO: Handle canceled and archived jobs
1034     elif status in (constants.JOB_STATUS_SUCCESS,
1035                     constants.JOB_STATUS_ERROR,
1036                     constants.JOB_STATUS_CANCELING,
1037                     constants.JOB_STATUS_CANCELED):
1038       break
1039
1040     prev_job_info = job_info
1041
1042   jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
1043   if not jobs:
1044     raise errors.JobLost("Job with id %s lost" % job_id)
1045
1046   status, opstatus, result = jobs[0]
1047   if status == constants.JOB_STATUS_SUCCESS:
1048     return result
1049   elif status in (constants.JOB_STATUS_CANCELING,
1050                   constants.JOB_STATUS_CANCELED):
1051     raise errors.OpExecError("Job was canceled")
1052   else:
1053     has_ok = False
1054     for idx, (status, msg) in enumerate(zip(opstatus, result)):
1055       if status == constants.OP_STATUS_SUCCESS:
1056         has_ok = True
1057       elif status == constants.OP_STATUS_ERROR:
1058         errors.MaybeRaise(msg)
1059         if has_ok:
1060           raise errors.OpExecError("partial failure (opcode %d): %s" %
1061                                    (idx, msg))
1062         else:
1063           raise errors.OpExecError(str(msg))
1064     # default failure mode
1065     raise errors.OpExecError(result)
1066
1067
1068 def SubmitOpCode(op, cl=None, feedback_fn=None):
1069   """Legacy function to submit an opcode.
1070
1071   This is just a simple wrapper over the construction of the processor
1072   instance. It should be extended to better handle feedback and
1073   interaction functions.
1074
1075   """
1076   if cl is None:
1077     cl = GetClient()
1078
1079   job_id = SendJob([op], cl)
1080
1081   op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
1082
1083   return op_results[0]
1084
1085
1086 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
1087   """Wrapper around SubmitOpCode or SendJob.
1088
1089   This function will decide, based on the 'opts' parameter, whether to
1090   submit and wait for the result of the opcode (and return it), or
1091   whether to just send the job and print its identifier. It is used in
1092   order to simplify the implementation of the '--submit' option.
1093
1094   It will also add the dry-run parameter from the options passed, if true.
1095
1096   """
1097   if opts and opts.dry_run:
1098     op.dry_run = opts.dry_run
1099   if opts and opts.submit_only:
1100     job_id = SendJob([op], cl=cl)
1101     raise JobSubmittedException(job_id)
1102   else:
1103     return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
1104
1105
1106 def GetClient():
1107   # TODO: Cache object?
1108   try:
1109     client = luxi.Client()
1110   except luxi.NoMasterError:
1111     master, myself = ssconf.GetMasterAndMyself()
1112     if master != myself:
1113       raise errors.OpPrereqError("This is not the master node, please connect"
1114                                  " to node '%s' and rerun the command" %
1115                                  master)
1116     else:
1117       raise
1118   return client
1119
1120
1121 def FormatError(err):
1122   """Return a formatted error message for a given error.
1123
1124   This function takes an exception instance and returns a tuple
1125   consisting of two values: first, the recommended exit code, and
1126   second, a string describing the error message (not
1127   newline-terminated).
1128
1129   """
1130   retcode = 1
1131   obuf = StringIO()
1132   msg = str(err)
1133   if isinstance(err, errors.ConfigurationError):
1134     txt = "Corrupt configuration file: %s" % msg
1135     logging.error(txt)
1136     obuf.write(txt + "\n")
1137     obuf.write("Aborting.")
1138     retcode = 2
1139   elif isinstance(err, errors.HooksAbort):
1140     obuf.write("Failure: hooks execution failed:\n")
1141     for node, script, out in err.args[0]:
1142       if out:
1143         obuf.write("  node: %s, script: %s, output: %s\n" %
1144                    (node, script, out))
1145       else:
1146         obuf.write("  node: %s, script: %s (no output)\n" %
1147                    (node, script))
1148   elif isinstance(err, errors.HooksFailure):
1149     obuf.write("Failure: hooks general failure: %s" % msg)
1150   elif isinstance(err, errors.ResolverError):
1151     this_host = utils.HostInfo.SysName()
1152     if err.args[0] == this_host:
1153       msg = "Failure: can't resolve my own hostname ('%s')"
1154     else:
1155       msg = "Failure: can't resolve hostname '%s'"
1156     obuf.write(msg % err.args[0])
1157   elif isinstance(err, errors.OpPrereqError):
1158     obuf.write("Failure: prerequisites not met for this"
1159                " operation:\n%s" % msg)
1160   elif isinstance(err, errors.OpExecError):
1161     obuf.write("Failure: command execution error:\n%s" % msg)
1162   elif isinstance(err, errors.TagError):
1163     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
1164   elif isinstance(err, errors.JobQueueDrainError):
1165     obuf.write("Failure: the job queue is marked for drain and doesn't"
1166                " accept new requests\n")
1167   elif isinstance(err, errors.JobQueueFull):
1168     obuf.write("Failure: the job queue is full and doesn't accept new"
1169                " job submissions until old jobs are archived\n")
1170   elif isinstance(err, errors.TypeEnforcementError):
1171     obuf.write("Parameter Error: %s" % msg)
1172   elif isinstance(err, errors.ParameterError):
1173     obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
1174   elif isinstance(err, errors.GenericError):
1175     obuf.write("Unhandled Ganeti error: %s" % msg)
1176   elif isinstance(err, luxi.NoMasterError):
1177     obuf.write("Cannot communicate with the master daemon.\nIs it running"
1178                " and listening for connections?")
1179   elif isinstance(err, luxi.TimeoutError):
1180     obuf.write("Timeout while talking to the master daemon. Error:\n"
1181                "%s" % msg)
1182   elif isinstance(err, luxi.ProtocolError):
1183     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
1184                "%s" % msg)
1185   elif isinstance(err, JobSubmittedException):
1186     obuf.write("JobID: %s\n" % err.args[0])
1187     retcode = 0
1188   else:
1189     obuf.write("Unhandled exception: %s" % msg)
1190   return retcode, obuf.getvalue().rstrip('\n')
1191
1192
1193 def GenericMain(commands, override=None, aliases=None):
1194   """Generic main function for all the gnt-* commands.
1195
1196   Arguments:
1197     - commands: a dictionary with a special structure, see the design doc
1198                 for command line handling.
1199     - override: if not None, we expect a dictionary with keys that will
1200                 override command line options; this can be used to pass
1201                 options from the scripts to generic functions
1202     - aliases: dictionary with command aliases {'alias': 'target, ...}
1203
1204   """
1205   # save the program name and the entire command line for later logging
1206   if sys.argv:
1207     binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1208     if len(sys.argv) >= 2:
1209       binary += " " + sys.argv[1]
1210       old_cmdline = " ".join(sys.argv[2:])
1211     else:
1212       old_cmdline = ""
1213   else:
1214     binary = "<unknown program>"
1215     old_cmdline = ""
1216
1217   if aliases is None:
1218     aliases = {}
1219
1220   try:
1221     func, options, args = _ParseArgs(sys.argv, commands, aliases)
1222   except errors.ParameterError, err:
1223     result, err_msg = FormatError(err)
1224     ToStderr(err_msg)
1225     return 1
1226
1227   if func is None: # parse error
1228     return 1
1229
1230   if override is not None:
1231     for key, val in override.iteritems():
1232       setattr(options, key, val)
1233
1234   utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1235                      stderr_logging=True, program=binary)
1236
1237   if old_cmdline:
1238     logging.info("run with arguments '%s'", old_cmdline)
1239   else:
1240     logging.info("run with no arguments")
1241
1242   try:
1243     result = func(options, args)
1244   except (errors.GenericError, luxi.ProtocolError,
1245           JobSubmittedException), err:
1246     result, err_msg = FormatError(err)
1247     logging.exception("Error during command processing")
1248     ToStderr(err_msg)
1249
1250   return result
1251
1252
1253 def GenerateTable(headers, fields, separator, data,
1254                   numfields=None, unitfields=None,
1255                   units=None):
1256   """Prints a table with headers and different fields.
1257
1258   @type headers: dict
1259   @param headers: dictionary mapping field names to headers for
1260       the table
1261   @type fields: list
1262   @param fields: the field names corresponding to each row in
1263       the data field
1264   @param separator: the separator to be used; if this is None,
1265       the default 'smart' algorithm is used which computes optimal
1266       field width, otherwise just the separator is used between
1267       each field
1268   @type data: list
1269   @param data: a list of lists, each sublist being one row to be output
1270   @type numfields: list
1271   @param numfields: a list with the fields that hold numeric
1272       values and thus should be right-aligned
1273   @type unitfields: list
1274   @param unitfields: a list with the fields that hold numeric
1275       values that should be formatted with the units field
1276   @type units: string or None
1277   @param units: the units we should use for formatting, or None for
1278       automatic choice (human-readable for non-separator usage, otherwise
1279       megabytes); this is a one-letter string
1280
1281   """
1282   if units is None:
1283     if separator:
1284       units = "m"
1285     else:
1286       units = "h"
1287
1288   if numfields is None:
1289     numfields = []
1290   if unitfields is None:
1291     unitfields = []
1292
1293   numfields = utils.FieldSet(*numfields)
1294   unitfields = utils.FieldSet(*unitfields)
1295
1296   format_fields = []
1297   for field in fields:
1298     if headers and field not in headers:
1299       # TODO: handle better unknown fields (either revert to old
1300       # style of raising exception, or deal more intelligently with
1301       # variable fields)
1302       headers[field] = field
1303     if separator is not None:
1304       format_fields.append("%s")
1305     elif numfields.Matches(field):
1306       format_fields.append("%*s")
1307     else:
1308       format_fields.append("%-*s")
1309
1310   if separator is None:
1311     mlens = [0 for name in fields]
1312     format = ' '.join(format_fields)
1313   else:
1314     format = separator.replace("%", "%%").join(format_fields)
1315
1316   for row in data:
1317     if row is None:
1318       continue
1319     for idx, val in enumerate(row):
1320       if unitfields.Matches(fields[idx]):
1321         try:
1322           val = int(val)
1323         except ValueError:
1324           pass
1325         else:
1326           val = row[idx] = utils.FormatUnit(val, units)
1327       val = row[idx] = str(val)
1328       if separator is None:
1329         mlens[idx] = max(mlens[idx], len(val))
1330
1331   result = []
1332   if headers:
1333     args = []
1334     for idx, name in enumerate(fields):
1335       hdr = headers[name]
1336       if separator is None:
1337         mlens[idx] = max(mlens[idx], len(hdr))
1338         args.append(mlens[idx])
1339       args.append(hdr)
1340     result.append(format % tuple(args))
1341
1342   for line in data:
1343     args = []
1344     if line is None:
1345       line = ['-' for _ in fields]
1346     for idx in xrange(len(fields)):
1347       if separator is None:
1348         args.append(mlens[idx])
1349       args.append(line[idx])
1350     result.append(format % tuple(args))
1351
1352   return result
1353
1354
1355 def FormatTimestamp(ts):
1356   """Formats a given timestamp.
1357
1358   @type ts: timestamp
1359   @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
1360
1361   @rtype: string
1362   @return: a string with the formatted timestamp
1363
1364   """
1365   if not isinstance (ts, (tuple, list)) or len(ts) != 2:
1366     return '?'
1367   sec, usec = ts
1368   return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
1369
1370
1371 def ParseTimespec(value):
1372   """Parse a time specification.
1373
1374   The following suffixed will be recognized:
1375
1376     - s: seconds
1377     - m: minutes
1378     - h: hours
1379     - d: day
1380     - w: weeks
1381
1382   Without any suffix, the value will be taken to be in seconds.
1383
1384   """
1385   value = str(value)
1386   if not value:
1387     raise errors.OpPrereqError("Empty time specification passed")
1388   suffix_map = {
1389     's': 1,
1390     'm': 60,
1391     'h': 3600,
1392     'd': 86400,
1393     'w': 604800,
1394     }
1395   if value[-1] not in suffix_map:
1396     try:
1397       value = int(value)
1398     except ValueError:
1399       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
1400   else:
1401     multiplier = suffix_map[value[-1]]
1402     value = value[:-1]
1403     if not value: # no data left after stripping the suffix
1404       raise errors.OpPrereqError("Invalid time specification (only"
1405                                  " suffix passed)")
1406     try:
1407       value = int(value) * multiplier
1408     except ValueError:
1409       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
1410   return value
1411
1412
1413 def GetOnlineNodes(nodes, cl=None, nowarn=False):
1414   """Returns the names of online nodes.
1415
1416   This function will also log a warning on stderr with the names of
1417   the online nodes.
1418
1419   @param nodes: if not empty, use only this subset of nodes (minus the
1420       offline ones)
1421   @param cl: if not None, luxi client to use
1422   @type nowarn: boolean
1423   @param nowarn: by default, this function will output a note with the
1424       offline nodes that are skipped; if this parameter is True the
1425       note is not displayed
1426
1427   """
1428   if cl is None:
1429     cl = GetClient()
1430
1431   result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
1432                          use_locking=False)
1433   offline = [row[0] for row in result if row[1]]
1434   if offline and not nowarn:
1435     ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline))
1436   return [row[0] for row in result if not row[1]]
1437
1438
1439 def _ToStream(stream, txt, *args):
1440   """Write a message to a stream, bypassing the logging system
1441
1442   @type stream: file object
1443   @param stream: the file to which we should write
1444   @type txt: str
1445   @param txt: the message
1446
1447   """
1448   if args:
1449     args = tuple(args)
1450     stream.write(txt % args)
1451   else:
1452     stream.write(txt)
1453   stream.write('\n')
1454   stream.flush()
1455
1456
1457 def ToStdout(txt, *args):
1458   """Write a message to stdout only, bypassing the logging system
1459
1460   This is just a wrapper over _ToStream.
1461
1462   @type txt: str
1463   @param txt: the message
1464
1465   """
1466   _ToStream(sys.stdout, txt, *args)
1467
1468
1469 def ToStderr(txt, *args):
1470   """Write a message to stderr only, bypassing the logging system
1471
1472   This is just a wrapper over _ToStream.
1473
1474   @type txt: str
1475   @param txt: the message
1476
1477   """
1478   _ToStream(sys.stderr, txt, *args)
1479
1480
1481 class JobExecutor(object):
1482   """Class which manages the submission and execution of multiple jobs.
1483
1484   Note that instances of this class should not be reused between
1485   GetResults() calls.
1486
1487   """
1488   def __init__(self, cl=None, verbose=True):
1489     self.queue = []
1490     if cl is None:
1491       cl = GetClient()
1492     self.cl = cl
1493     self.verbose = verbose
1494     self.jobs = []
1495
1496   def QueueJob(self, name, *ops):
1497     """Record a job for later submit.
1498
1499     @type name: string
1500     @param name: a description of the job, will be used in WaitJobSet
1501     """
1502     self.queue.append((name, ops))
1503
1504   def SubmitPending(self):
1505     """Submit all pending jobs.
1506
1507     """
1508     results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
1509     for ((status, data), (name, _)) in zip(results, self.queue):
1510       self.jobs.append((status, data, name))
1511
1512   def GetResults(self):
1513     """Wait for and return the results of all jobs.
1514
1515     @rtype: list
1516     @return: list of tuples (success, job results), in the same order
1517         as the submitted jobs; if a job has failed, instead of the result
1518         there will be the error message
1519
1520     """
1521     if not self.jobs:
1522       self.SubmitPending()
1523     results = []
1524     if self.verbose:
1525       ok_jobs = [row[1] for row in self.jobs if row[0]]
1526       if ok_jobs:
1527         ToStdout("Submitted jobs %s", ", ".join(ok_jobs))
1528     for submit_status, jid, name in self.jobs:
1529       if not submit_status:
1530         ToStderr("Failed to submit job for %s: %s", name, jid)
1531         results.append((False, jid))
1532         continue
1533       if self.verbose:
1534         ToStdout("Waiting for job %s for %s...", jid, name)
1535       try:
1536         job_result = PollJob(jid, cl=self.cl)
1537         success = True
1538       except (errors.GenericError, luxi.ProtocolError), err:
1539         _, job_result = FormatError(err)
1540         success = False
1541         # the error message will always be shown, verbose or not
1542         ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
1543
1544       results.append((success, job_result))
1545     return results
1546
1547   def WaitOrShow(self, wait):
1548     """Wait for job results or only print the job IDs.
1549
1550     @type wait: boolean
1551     @param wait: whether to wait or not
1552
1553     """
1554     if wait:
1555       return self.GetResults()
1556     else:
1557       if not self.jobs:
1558         self.SubmitPending()
1559       for status, result, name in self.jobs:
1560         if status:
1561           ToStdout("%s: %s", result, name)
1562         else:
1563           ToStderr("Failure for %s: %s", name, result)