cli: Use ToStdout/ToStderr instead of print
[ganeti-local] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module dealing with command line parsing"""
23
24
25 import sys
26 import textwrap
27 import os.path
28 import copy
29 import time
30 import logging
31 from cStringIO import StringIO
32
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import constants
36 from ganeti import opcodes
37 from ganeti import luxi
38 from ganeti import ssconf
39 from ganeti import rpc
40
41 from optparse import (OptionParser, make_option, TitledHelpFormatter,
42                       Option, OptionValueError)
43
44
45 __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
46            "SubmitOpCode", "GetClient",
47            "cli_option", "ikv_option", "keyval_option",
48            "GenerateTable", "AskUser",
49            "ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
50            "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT", "SUBMIT_OPT",
51            "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
52            "FormatError", "SplitNodeOption", "SubmitOrSend",
53            "JobSubmittedException", "FormatTimestamp", "ParseTimespec",
54            "ToStderr", "ToStdout", "UsesRPC",
55            "GetOnlineNodes", "JobExecutor", "SYNC_OPT",
56            ]
57
58
59
60 def _ExtractTagsObject(opts, args):
61   """Extract the tag type object.
62
63   Note that this function will modify its args parameter.
64
65   """
66   if not hasattr(opts, "tag_type"):
67     raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
68   kind = opts.tag_type
69   if kind == constants.TAG_CLUSTER:
70     retval = kind, kind
71   elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
72     if not args:
73       raise errors.OpPrereqError("no arguments passed to the command")
74     name = args.pop(0)
75     retval = kind, name
76   else:
77     raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
78   return retval
79
80
81 def _ExtendTags(opts, args):
82   """Extend the args if a source file has been given.
83
84   This function will extend the tags with the contents of the file
85   passed in the 'tags_source' attribute of the opts parameter. A file
86   named '-' will be replaced by stdin.
87
88   """
89   fname = opts.tags_source
90   if fname is None:
91     return
92   if fname == "-":
93     new_fh = sys.stdin
94   else:
95     new_fh = open(fname, "r")
96   new_data = []
97   try:
98     # we don't use the nice 'new_data = [line.strip() for line in fh]'
99     # because of python bug 1633941
100     while True:
101       line = new_fh.readline()
102       if not line:
103         break
104       new_data.append(line.strip())
105   finally:
106     new_fh.close()
107   args.extend(new_data)
108
109
110 def ListTags(opts, args):
111   """List the tags on a given object.
112
113   This is a generic implementation that knows how to deal with all
114   three cases of tag objects (cluster, node, instance). The opts
115   argument is expected to contain a tag_type field denoting what
116   object type we work on.
117
118   """
119   kind, name = _ExtractTagsObject(opts, args)
120   op = opcodes.OpGetTags(kind=kind, name=name)
121   result = SubmitOpCode(op)
122   result = list(result)
123   result.sort()
124   for tag in result:
125     ToStdout(tag)
126
127
128 def AddTags(opts, args):
129   """Add tags on a given object.
130
131   This is a generic implementation that knows how to deal with all
132   three cases of tag objects (cluster, node, instance). The opts
133   argument is expected to contain a tag_type field denoting what
134   object type we work on.
135
136   """
137   kind, name = _ExtractTagsObject(opts, args)
138   _ExtendTags(opts, args)
139   if not args:
140     raise errors.OpPrereqError("No tags to be added")
141   op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
142   SubmitOpCode(op)
143
144
145 def RemoveTags(opts, args):
146   """Remove tags from a given object.
147
148   This is a generic implementation that knows how to deal with all
149   three cases of tag objects (cluster, node, instance). The opts
150   argument is expected to contain a tag_type field denoting what
151   object type we work on.
152
153   """
154   kind, name = _ExtractTagsObject(opts, args)
155   _ExtendTags(opts, args)
156   if not args:
157     raise errors.OpPrereqError("No tags to be removed")
158   op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
159   SubmitOpCode(op)
160
161
162 DEBUG_OPT = make_option("-d", "--debug", default=False,
163                         action="store_true",
164                         help="Turn debugging on")
165
166 NOHDR_OPT = make_option("--no-headers", default=False,
167                         action="store_true", dest="no_headers",
168                         help="Don't display column headers")
169
170 SEP_OPT = make_option("--separator", default=None,
171                       action="store", dest="separator",
172                       help="Separator between output fields"
173                       " (defaults to one space)")
174
175 USEUNITS_OPT = make_option("--units", default=None,
176                            dest="units", choices=('h', 'm', 'g', 't'),
177                            help="Specify units for output (one of hmgt)")
178
179 FIELDS_OPT = make_option("-o", "--output", dest="output", action="store",
180                          type="string", help="Comma separated list of"
181                          " output fields",
182                          metavar="FIELDS")
183
184 FORCE_OPT = make_option("-f", "--force", dest="force", action="store_true",
185                         default=False, help="Force the operation")
186
187 TAG_SRC_OPT = make_option("--from", dest="tags_source",
188                           default=None, help="File with tag names")
189
190 SUBMIT_OPT = make_option("--submit", dest="submit_only",
191                          default=False, action="store_true",
192                          help="Submit the job and return the job ID, but"
193                          " don't wait for the job to finish")
194
195 SYNC_OPT = make_option("--sync", dest="do_locking",
196                        default=False, action="store_true",
197                        help="Grab locks while doing the queries"
198                        " in order to ensure more consistent results")
199
200
201 def ARGS_FIXED(val):
202   """Macro-like function denoting a fixed number of arguments"""
203   return -val
204
205
206 def ARGS_ATLEAST(val):
207   """Macro-like function denoting a minimum number of arguments"""
208   return val
209
210
211 ARGS_NONE = None
212 ARGS_ONE = ARGS_FIXED(1)
213 ARGS_ANY = ARGS_ATLEAST(0)
214
215
216 def check_unit(option, opt, value):
217   """OptParsers custom converter for units.
218
219   """
220   try:
221     return utils.ParseUnit(value)
222   except errors.UnitParseError, err:
223     raise OptionValueError("option %s: %s" % (opt, err))
224
225
226 class CliOption(Option):
227   """Custom option class for optparse.
228
229   """
230   TYPES = Option.TYPES + ("unit",)
231   TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
232   TYPE_CHECKER["unit"] = check_unit
233
234
235 def _SplitKeyVal(opt, data):
236   """Convert a KeyVal string into a dict.
237
238   This function will convert a key=val[,...] string into a dict. Empty
239   values will be converted specially: keys which have the prefix 'no_'
240   will have the value=False and the prefix stripped, the others will
241   have value=True.
242
243   @type opt: string
244   @param opt: a string holding the option name for which we process the
245       data, used in building error messages
246   @type data: string
247   @param data: a string of the format key=val,key=val,...
248   @rtype: dict
249   @return: {key=val, key=val}
250   @raises errors.ParameterError: if there are duplicate keys
251
252   """
253   NO_PREFIX = "no_"
254   UN_PREFIX = "-"
255   kv_dict = {}
256   for elem in data.split(","):
257     if "=" in elem:
258       key, val = elem.split("=", 1)
259     else:
260       if elem.startswith(NO_PREFIX):
261         key, val = elem[len(NO_PREFIX):], False
262       elif elem.startswith(UN_PREFIX):
263         key, val = elem[len(UN_PREFIX):], None
264       else:
265         key, val = elem, True
266     if key in kv_dict:
267       raise errors.ParameterError("Duplicate key '%s' in option %s" %
268                                   (key, opt))
269     kv_dict[key] = val
270   return kv_dict
271
272
273 def check_ident_key_val(option, opt, value):
274   """Custom parser for the IdentKeyVal option type.
275
276   """
277   if ":" not in value:
278     retval =  (value, {})
279   else:
280     ident, rest = value.split(":", 1)
281     kv_dict = _SplitKeyVal(opt, rest)
282     retval = (ident, kv_dict)
283   return retval
284
285
286 class IdentKeyValOption(Option):
287   """Custom option class for ident:key=val,key=val options.
288
289   This will store the parsed values as a tuple (ident, {key: val}). As
290   such, multiple uses of this option via action=append is possible.
291
292   """
293   TYPES = Option.TYPES + ("identkeyval",)
294   TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
295   TYPE_CHECKER["identkeyval"] = check_ident_key_val
296
297
298 def check_key_val(option, opt, value):
299   """Custom parser for the KeyVal option type.
300
301   """
302   return _SplitKeyVal(opt, value)
303
304
305 class KeyValOption(Option):
306   """Custom option class for key=val,key=val options.
307
308   This will store the parsed values as a dict {key: val}.
309
310   """
311   TYPES = Option.TYPES + ("keyval",)
312   TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
313   TYPE_CHECKER["keyval"] = check_key_val
314
315
316 # optparse.py sets make_option, so we do it for our own option class, too
317 cli_option = CliOption
318 ikv_option = IdentKeyValOption
319 keyval_option = KeyValOption
320
321
322 def _ParseArgs(argv, commands, aliases):
323   """Parser for the command line arguments.
324
325   This function parses the arguments and returns the function which
326   must be executed together with its (modified) arguments.
327
328   @param argv: the command line
329   @param commands: dictionary with special contents, see the design
330       doc for cmdline handling
331   @param aliases: dictionary with command aliases {'alias': 'target, ...}
332
333   """
334   if len(argv) == 0:
335     binary = "<command>"
336   else:
337     binary = argv[0].split("/")[-1]
338
339   if len(argv) > 1 and argv[1] == "--version":
340     ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
341     # Quit right away. That way we don't have to care about this special
342     # argument. optparse.py does it the same.
343     sys.exit(0)
344
345   if len(argv) < 2 or not (argv[1] in commands or
346                            argv[1] in aliases):
347     # let's do a nice thing
348     sortedcmds = commands.keys()
349     sortedcmds.sort()
350
351     ToStdout("Usage: %s {command} [options...] [argument...]", binary)
352     ToStdout("%s <command> --help to see details, or man %s", binary, binary)
353     ToStdout("")
354
355     # compute the max line length for cmd + usage
356     mlen = max([len(" %s" % cmd) for cmd in commands])
357     mlen = min(60, mlen) # should not get here...
358
359     # and format a nice command list
360     ToStdout("Commands:")
361     for cmd in sortedcmds:
362       cmdstr = " %s" % (cmd,)
363       help_text = commands[cmd][4]
364       help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
365       ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
366       for line in help_lines:
367         ToStdout("%-*s   %s", mlen, "", line)
368
369     ToStdout("")
370
371     return None, None, None
372
373   # get command, unalias it, and look it up in commands
374   cmd = argv.pop(1)
375   if cmd in aliases:
376     if cmd in commands:
377       raise errors.ProgrammerError("Alias '%s' overrides an existing"
378                                    " command" % cmd)
379
380     if aliases[cmd] not in commands:
381       raise errors.ProgrammerError("Alias '%s' maps to non-existing"
382                                    " command '%s'" % (cmd, aliases[cmd]))
383
384     cmd = aliases[cmd]
385
386   func, nargs, parser_opts, usage, description = commands[cmd]
387   parser = OptionParser(option_list=parser_opts,
388                         description=description,
389                         formatter=TitledHelpFormatter(),
390                         usage="%%prog %s %s" % (cmd, usage))
391   parser.disable_interspersed_args()
392   options, args = parser.parse_args()
393   if nargs is None:
394     if len(args) != 0:
395       ToStderr("Error: Command %s expects no arguments", cmd)
396       return None, None, None
397   elif nargs < 0 and len(args) != -nargs:
398     ToStderr("Error: Command %s expects %d argument(s)", cmd, -nargs)
399     return None, None, None
400   elif nargs >= 0 and len(args) < nargs:
401     ToStderr("Error: Command %s expects at least %d argument(s)", cmd, nargs)
402     return None, None, None
403
404   return func, options, args
405
406
407 def SplitNodeOption(value):
408   """Splits the value of a --node option.
409
410   """
411   if value and ':' in value:
412     return value.split(':', 1)
413   else:
414     return (value, None)
415
416
417 def UsesRPC(fn):
418   def wrapper(*args, **kwargs):
419     rpc.Init()
420     try:
421       return fn(*args, **kwargs)
422     finally:
423       rpc.Shutdown()
424   return wrapper
425
426
427 def AskUser(text, choices=None):
428   """Ask the user a question.
429
430   @param text: the question to ask
431
432   @param choices: list with elements tuples (input_char, return_value,
433       description); if not given, it will default to: [('y', True,
434       'Perform the operation'), ('n', False, 'Do no do the operation')];
435       note that the '?' char is reserved for help
436
437   @return: one of the return values from the choices list; if input is
438       not possible (i.e. not running with a tty, we return the last
439       entry from the list
440
441   """
442   if choices is None:
443     choices = [('y', True, 'Perform the operation'),
444                ('n', False, 'Do not perform the operation')]
445   if not choices or not isinstance(choices, list):
446     raise errors.ProgrammerError("Invalid choices argument to AskUser")
447   for entry in choices:
448     if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
449       raise errors.ProgrammerError("Invalid choices element to AskUser")
450
451   answer = choices[-1][1]
452   new_text = []
453   for line in text.splitlines():
454     new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
455   text = "\n".join(new_text)
456   try:
457     f = file("/dev/tty", "a+")
458   except IOError:
459     return answer
460   try:
461     chars = [entry[0] for entry in choices]
462     chars[-1] = "[%s]" % chars[-1]
463     chars.append('?')
464     maps = dict([(entry[0], entry[1]) for entry in choices])
465     while True:
466       f.write(text)
467       f.write('\n')
468       f.write("/".join(chars))
469       f.write(": ")
470       line = f.readline(2).strip().lower()
471       if line in maps:
472         answer = maps[line]
473         break
474       elif line == '?':
475         for entry in choices:
476           f.write(" %s - %s\n" % (entry[0], entry[2]))
477         f.write("\n")
478         continue
479   finally:
480     f.close()
481   return answer
482
483
484 class JobSubmittedException(Exception):
485   """Job was submitted, client should exit.
486
487   This exception has one argument, the ID of the job that was
488   submitted. The handler should print this ID.
489
490   This is not an error, just a structured way to exit from clients.
491
492   """
493
494
495 def SendJob(ops, cl=None):
496   """Function to submit an opcode without waiting for the results.
497
498   @type ops: list
499   @param ops: list of opcodes
500   @type cl: luxi.Client
501   @param cl: the luxi client to use for communicating with the master;
502              if None, a new client will be created
503
504   """
505   if cl is None:
506     cl = GetClient()
507
508   job_id = cl.SubmitJob(ops)
509
510   return job_id
511
512
513 def PollJob(job_id, cl=None, feedback_fn=None):
514   """Function to poll for the result of a job.
515
516   @type job_id: job identified
517   @param job_id: the job to poll for results
518   @type cl: luxi.Client
519   @param cl: the luxi client to use for communicating with the master;
520              if None, a new client will be created
521
522   """
523   if cl is None:
524     cl = GetClient()
525
526   prev_job_info = None
527   prev_logmsg_serial = None
528
529   while True:
530     result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
531                                  prev_logmsg_serial)
532     if not result:
533       # job not found, go away!
534       raise errors.JobLost("Job with id %s lost" % job_id)
535
536     # Split result, a tuple of (field values, log entries)
537     (job_info, log_entries) = result
538     (status, ) = job_info
539
540     if log_entries:
541       for log_entry in log_entries:
542         (serial, timestamp, _, message) = log_entry
543         if callable(feedback_fn):
544           feedback_fn(log_entry[1:])
545         else:
546           encoded = utils.SafeEncode(message)
547           ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
548         prev_logmsg_serial = max(prev_logmsg_serial, serial)
549
550     # TODO: Handle canceled and archived jobs
551     elif status in (constants.JOB_STATUS_SUCCESS,
552                     constants.JOB_STATUS_ERROR,
553                     constants.JOB_STATUS_CANCELING,
554                     constants.JOB_STATUS_CANCELED):
555       break
556
557     prev_job_info = job_info
558
559   jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
560   if not jobs:
561     raise errors.JobLost("Job with id %s lost" % job_id)
562
563   status, opstatus, result = jobs[0]
564   if status == constants.JOB_STATUS_SUCCESS:
565     return result
566   elif status in (constants.JOB_STATUS_CANCELING,
567                   constants.JOB_STATUS_CANCELED):
568     raise errors.OpExecError("Job was canceled")
569   else:
570     has_ok = False
571     for idx, (status, msg) in enumerate(zip(opstatus, result)):
572       if status == constants.OP_STATUS_SUCCESS:
573         has_ok = True
574       elif status == constants.OP_STATUS_ERROR:
575         if has_ok:
576           raise errors.OpExecError("partial failure (opcode %d): %s" %
577                                    (idx, msg))
578         else:
579           raise errors.OpExecError(str(msg))
580     # default failure mode
581     raise errors.OpExecError(result)
582
583
584 def SubmitOpCode(op, cl=None, feedback_fn=None):
585   """Legacy function to submit an opcode.
586
587   This is just a simple wrapper over the construction of the processor
588   instance. It should be extended to better handle feedback and
589   interaction functions.
590
591   """
592   if cl is None:
593     cl = GetClient()
594
595   job_id = SendJob([op], cl)
596
597   op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
598
599   return op_results[0]
600
601
602 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
603   """Wrapper around SubmitOpCode or SendJob.
604
605   This function will decide, based on the 'opts' parameter, whether to
606   submit and wait for the result of the opcode (and return it), or
607   whether to just send the job and print its identifier. It is used in
608   order to simplify the implementation of the '--submit' option.
609
610   """
611   if opts and opts.submit_only:
612     job_id = SendJob([op], cl=cl)
613     raise JobSubmittedException(job_id)
614   else:
615     return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
616
617
618 def GetClient():
619   # TODO: Cache object?
620   try:
621     client = luxi.Client()
622   except luxi.NoMasterError:
623     master, myself = ssconf.GetMasterAndMyself()
624     if master != myself:
625       raise errors.OpPrereqError("This is not the master node, please connect"
626                                  " to node '%s' and rerun the command" %
627                                  master)
628     else:
629       raise
630   return client
631
632
633 def FormatError(err):
634   """Return a formatted error message for a given error.
635
636   This function takes an exception instance and returns a tuple
637   consisting of two values: first, the recommended exit code, and
638   second, a string describing the error message (not
639   newline-terminated).
640
641   """
642   retcode = 1
643   obuf = StringIO()
644   msg = str(err)
645   if isinstance(err, errors.ConfigurationError):
646     txt = "Corrupt configuration file: %s" % msg
647     logging.error(txt)
648     obuf.write(txt + "\n")
649     obuf.write("Aborting.")
650     retcode = 2
651   elif isinstance(err, errors.HooksAbort):
652     obuf.write("Failure: hooks execution failed:\n")
653     for node, script, out in err.args[0]:
654       if out:
655         obuf.write("  node: %s, script: %s, output: %s\n" %
656                    (node, script, out))
657       else:
658         obuf.write("  node: %s, script: %s (no output)\n" %
659                    (node, script))
660   elif isinstance(err, errors.HooksFailure):
661     obuf.write("Failure: hooks general failure: %s" % msg)
662   elif isinstance(err, errors.ResolverError):
663     this_host = utils.HostInfo.SysName()
664     if err.args[0] == this_host:
665       msg = "Failure: can't resolve my own hostname ('%s')"
666     else:
667       msg = "Failure: can't resolve hostname '%s'"
668     obuf.write(msg % err.args[0])
669   elif isinstance(err, errors.OpPrereqError):
670     obuf.write("Failure: prerequisites not met for this"
671                " operation:\n%s" % msg)
672   elif isinstance(err, errors.OpExecError):
673     obuf.write("Failure: command execution error:\n%s" % msg)
674   elif isinstance(err, errors.TagError):
675     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
676   elif isinstance(err, errors.JobQueueDrainError):
677     obuf.write("Failure: the job queue is marked for drain and doesn't"
678                " accept new requests\n")
679   elif isinstance(err, errors.JobQueueFull):
680     obuf.write("Failure: the job queue is full and doesn't accept new"
681                " job submissions until old jobs are archived\n")
682   elif isinstance(err, errors.TypeEnforcementError):
683     obuf.write("Parameter Error: %s" % msg)
684   elif isinstance(err, errors.ParameterError):
685     obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
686   elif isinstance(err, errors.GenericError):
687     obuf.write("Unhandled Ganeti error: %s" % msg)
688   elif isinstance(err, luxi.NoMasterError):
689     obuf.write("Cannot communicate with the master daemon.\nIs it running"
690                " and listening for connections?")
691   elif isinstance(err, luxi.TimeoutError):
692     obuf.write("Timeout while talking to the master daemon. Error:\n"
693                "%s" % msg)
694   elif isinstance(err, luxi.ProtocolError):
695     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
696                "%s" % msg)
697   elif isinstance(err, JobSubmittedException):
698     obuf.write("JobID: %s\n" % err.args[0])
699     retcode = 0
700   else:
701     obuf.write("Unhandled exception: %s" % msg)
702   return retcode, obuf.getvalue().rstrip('\n')
703
704
705 def GenericMain(commands, override=None, aliases=None):
706   """Generic main function for all the gnt-* commands.
707
708   Arguments:
709     - commands: a dictionary with a special structure, see the design doc
710                 for command line handling.
711     - override: if not None, we expect a dictionary with keys that will
712                 override command line options; this can be used to pass
713                 options from the scripts to generic functions
714     - aliases: dictionary with command aliases {'alias': 'target, ...}
715
716   """
717   # save the program name and the entire command line for later logging
718   if sys.argv:
719     binary = os.path.basename(sys.argv[0]) or sys.argv[0]
720     if len(sys.argv) >= 2:
721       binary += " " + sys.argv[1]
722       old_cmdline = " ".join(sys.argv[2:])
723     else:
724       old_cmdline = ""
725   else:
726     binary = "<unknown program>"
727     old_cmdline = ""
728
729   if aliases is None:
730     aliases = {}
731
732   func, options, args = _ParseArgs(sys.argv, commands, aliases)
733   if func is None: # parse error
734     return 1
735
736   if override is not None:
737     for key, val in override.iteritems():
738       setattr(options, key, val)
739
740   utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
741                      stderr_logging=True, program=binary)
742
743   if old_cmdline:
744     logging.info("run with arguments '%s'", old_cmdline)
745   else:
746     logging.info("run with no arguments")
747
748   try:
749     result = func(options, args)
750   except (errors.GenericError, luxi.ProtocolError,
751           JobSubmittedException), err:
752     result, err_msg = FormatError(err)
753     logging.exception("Error during command processing")
754     ToStderr(err_msg)
755
756   return result
757
758
759 def GenerateTable(headers, fields, separator, data,
760                   numfields=None, unitfields=None,
761                   units=None):
762   """Prints a table with headers and different fields.
763
764   @type headers: dict
765   @param headers: dictionary mapping field names to headers for
766       the table
767   @type fields: list
768   @param fields: the field names corresponding to each row in
769       the data field
770   @param separator: the separator to be used; if this is None,
771       the default 'smart' algorithm is used which computes optimal
772       field width, otherwise just the separator is used between
773       each field
774   @type data: list
775   @param data: a list of lists, each sublist being one row to be output
776   @type numfields: list
777   @param numfields: a list with the fields that hold numeric
778       values and thus should be right-aligned
779   @type unitfields: list
780   @param unitfields: a list with the fields that hold numeric
781       values that should be formatted with the units field
782   @type units: string or None
783   @param units: the units we should use for formatting, or None for
784       automatic choice (human-readable for non-separator usage, otherwise
785       megabytes); this is a one-letter string
786
787   """
788   if units is None:
789     if separator:
790       units = "m"
791     else:
792       units = "h"
793
794   if numfields is None:
795     numfields = []
796   if unitfields is None:
797     unitfields = []
798
799   numfields = utils.FieldSet(*numfields)
800   unitfields = utils.FieldSet(*unitfields)
801
802   format_fields = []
803   for field in fields:
804     if headers and field not in headers:
805       # TODO: handle better unknown fields (either revert to old
806       # style of raising exception, or deal more intelligently with
807       # variable fields)
808       headers[field] = field
809     if separator is not None:
810       format_fields.append("%s")
811     elif numfields.Matches(field):
812       format_fields.append("%*s")
813     else:
814       format_fields.append("%-*s")
815
816   if separator is None:
817     mlens = [0 for name in fields]
818     format = ' '.join(format_fields)
819   else:
820     format = separator.replace("%", "%%").join(format_fields)
821
822   for row in data:
823     if row is None:
824       continue
825     for idx, val in enumerate(row):
826       if unitfields.Matches(fields[idx]):
827         try:
828           val = int(val)
829         except ValueError:
830           pass
831         else:
832           val = row[idx] = utils.FormatUnit(val, units)
833       val = row[idx] = str(val)
834       if separator is None:
835         mlens[idx] = max(mlens[idx], len(val))
836
837   result = []
838   if headers:
839     args = []
840     for idx, name in enumerate(fields):
841       hdr = headers[name]
842       if separator is None:
843         mlens[idx] = max(mlens[idx], len(hdr))
844         args.append(mlens[idx])
845       args.append(hdr)
846     result.append(format % tuple(args))
847
848   for line in data:
849     args = []
850     if line is None:
851       line = ['-' for _ in fields]
852     for idx in xrange(len(fields)):
853       if separator is None:
854         args.append(mlens[idx])
855       args.append(line[idx])
856     result.append(format % tuple(args))
857
858   return result
859
860
861 def FormatTimestamp(ts):
862   """Formats a given timestamp.
863
864   @type ts: timestamp
865   @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
866
867   @rtype: string
868   @return: a string with the formatted timestamp
869
870   """
871   if not isinstance (ts, (tuple, list)) or len(ts) != 2:
872     return '?'
873   sec, usec = ts
874   return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
875
876
877 def ParseTimespec(value):
878   """Parse a time specification.
879
880   The following suffixed will be recognized:
881
882     - s: seconds
883     - m: minutes
884     - h: hours
885     - d: day
886     - w: weeks
887
888   Without any suffix, the value will be taken to be in seconds.
889
890   """
891   value = str(value)
892   if not value:
893     raise errors.OpPrereqError("Empty time specification passed")
894   suffix_map = {
895     's': 1,
896     'm': 60,
897     'h': 3600,
898     'd': 86400,
899     'w': 604800,
900     }
901   if value[-1] not in suffix_map:
902     try:
903       value = int(value)
904     except ValueError:
905       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
906   else:
907     multiplier = suffix_map[value[-1]]
908     value = value[:-1]
909     if not value: # no data left after stripping the suffix
910       raise errors.OpPrereqError("Invalid time specification (only"
911                                  " suffix passed)")
912     try:
913       value = int(value) * multiplier
914     except ValueError:
915       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
916   return value
917
918
919 def GetOnlineNodes(nodes, cl=None, nowarn=False):
920   """Returns the names of online nodes.
921
922   This function will also log a warning on stderr with the names of
923   the online nodes.
924
925   @param nodes: if not empty, use only this subset of nodes (minus the
926       offline ones)
927   @param cl: if not None, luxi client to use
928   @type nowarn: boolean
929   @param nowarn: by default, this function will output a note with the
930       offline nodes that are skipped; if this parameter is True the
931       note is not displayed
932
933   """
934   if cl is None:
935     cl = GetClient()
936
937   result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
938                          use_locking=False)
939   offline = [row[0] for row in result if row[1]]
940   if offline and not nowarn:
941     ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline))
942   return [row[0] for row in result if not row[1]]
943
944
945 def _ToStream(stream, txt, *args):
946   """Write a message to a stream, bypassing the logging system
947
948   @type stream: file object
949   @param stream: the file to which we should write
950   @type txt: str
951   @param txt: the message
952
953   """
954   if args:
955     args = tuple(args)
956     stream.write(txt % args)
957   else:
958     stream.write(txt)
959   stream.write('\n')
960   stream.flush()
961
962
963 def ToStdout(txt, *args):
964   """Write a message to stdout only, bypassing the logging system
965
966   This is just a wrapper over _ToStream.
967
968   @type txt: str
969   @param txt: the message
970
971   """
972   _ToStream(sys.stdout, txt, *args)
973
974
975 def ToStderr(txt, *args):
976   """Write a message to stderr only, bypassing the logging system
977
978   This is just a wrapper over _ToStream.
979
980   @type txt: str
981   @param txt: the message
982
983   """
984   _ToStream(sys.stderr, txt, *args)
985
986
987 class JobExecutor(object):
988   """Class which manages the submission and execution of multiple jobs.
989
990   Note that instances of this class should not be reused between
991   GetResults() calls.
992
993   """
994   def __init__(self, cl=None, verbose=True):
995     self.queue = []
996     if cl is None:
997       cl = GetClient()
998     self.cl = cl
999     self.verbose = verbose
1000     self.jobs = []
1001
1002   def QueueJob(self, name, *ops):
1003     """Record a job for later submit.
1004
1005     @type name: string
1006     @param name: a description of the job, will be used in WaitJobSet
1007     """
1008     self.queue.append((name, ops))
1009
1010   def SubmitPending(self):
1011     """Submit all pending jobs.
1012
1013     """
1014     results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
1015     for ((status, data), (name, _)) in zip(results, self.queue):
1016       self.jobs.append((status, data, name))
1017
1018   def GetResults(self):
1019     """Wait for and return the results of all jobs.
1020
1021     @rtype: list
1022     @return: list of tuples (success, job results), in the same order
1023         as the submitted jobs; if a job has failed, instead of the result
1024         there will be the error message
1025
1026     """
1027     if not self.jobs:
1028       self.SubmitPending()
1029     results = []
1030     if self.verbose:
1031       ok_jobs = [row[1] for row in self.jobs if row[0]]
1032       if ok_jobs:
1033         ToStdout("Submitted jobs %s", ", ".join(ok_jobs))
1034     for submit_status, jid, name in self.jobs:
1035       if not submit_status:
1036         ToStderr("Failed to submit job for %s: %s", name, jid)
1037         results.append((False, jid))
1038         continue
1039       if self.verbose:
1040         ToStdout("Waiting for job %s for %s...", jid, name)
1041       try:
1042         job_result = PollJob(jid, cl=self.cl)
1043         success = True
1044       except (errors.GenericError, luxi.ProtocolError), err:
1045         _, job_result = FormatError(err)
1046         success = False
1047         # the error message will always be shown, verbose or not
1048         ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
1049
1050       results.append((success, job_result))
1051     return results
1052
1053   def WaitOrShow(self, wait):
1054     """Wait for job results or only print the job IDs.
1055
1056     @type wait: boolean
1057     @param wait: whether to wait or not
1058
1059     """
1060     if wait:
1061       return self.GetResults()
1062     else:
1063       if not self.jobs:
1064         self.SubmitPending()
1065       for status, result, name in self.jobs:
1066         if status:
1067           ToStdout("%s: %s", result, name)
1068         else:
1069           ToStderr("Failure for %s: %s", name, result)