17dc47c174d21617eb4b85943ec007fb27e784b0
[ganeti-local] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module dealing with command line parsing"""
23
24
25 import sys
26 import textwrap
27 import os.path
28 import copy
29 import time
30 import logging
31 from cStringIO import StringIO
32
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import constants
36 from ganeti import opcodes
37 from ganeti import luxi
38 from ganeti import ssconf
39 from ganeti import rpc
40
41 from optparse import (OptionParser, make_option, TitledHelpFormatter,
42                       Option, OptionValueError)
43
44 __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
45            "SubmitOpCode", "GetClient",
46            "cli_option", "ikv_option", "keyval_option",
47            "GenerateTable", "AskUser",
48            "ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
49            "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT", "SUBMIT_OPT",
50            "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
51            "FormatError", "SplitNodeOption", "SubmitOrSend",
52            "JobSubmittedException", "FormatTimestamp", "ParseTimespec",
53            "ToStderr", "ToStdout", "UsesRPC",
54            "GetOnlineNodes", "JobExecutor", "SYNC_OPT",
55            ]
56
57
58 def _ExtractTagsObject(opts, args):
59   """Extract the tag type object.
60
61   Note that this function will modify its args parameter.
62
63   """
64   if not hasattr(opts, "tag_type"):
65     raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
66   kind = opts.tag_type
67   if kind == constants.TAG_CLUSTER:
68     retval = kind, kind
69   elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
70     if not args:
71       raise errors.OpPrereqError("no arguments passed to the command")
72     name = args.pop(0)
73     retval = kind, name
74   else:
75     raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
76   return retval
77
78
79 def _ExtendTags(opts, args):
80   """Extend the args if a source file has been given.
81
82   This function will extend the tags with the contents of the file
83   passed in the 'tags_source' attribute of the opts parameter. A file
84   named '-' will be replaced by stdin.
85
86   """
87   fname = opts.tags_source
88   if fname is None:
89     return
90   if fname == "-":
91     new_fh = sys.stdin
92   else:
93     new_fh = open(fname, "r")
94   new_data = []
95   try:
96     # we don't use the nice 'new_data = [line.strip() for line in fh]'
97     # because of python bug 1633941
98     while True:
99       line = new_fh.readline()
100       if not line:
101         break
102       new_data.append(line.strip())
103   finally:
104     new_fh.close()
105   args.extend(new_data)
106
107
108 def ListTags(opts, args):
109   """List the tags on a given object.
110
111   This is a generic implementation that knows how to deal with all
112   three cases of tag objects (cluster, node, instance). The opts
113   argument is expected to contain a tag_type field denoting what
114   object type we work on.
115
116   """
117   kind, name = _ExtractTagsObject(opts, args)
118   op = opcodes.OpGetTags(kind=kind, name=name)
119   result = SubmitOpCode(op)
120   result = list(result)
121   result.sort()
122   for tag in result:
123     print tag
124
125
126 def AddTags(opts, args):
127   """Add tags on a given object.
128
129   This is a generic implementation that knows how to deal with all
130   three cases of tag objects (cluster, node, instance). The opts
131   argument is expected to contain a tag_type field denoting what
132   object type we work on.
133
134   """
135   kind, name = _ExtractTagsObject(opts, args)
136   _ExtendTags(opts, args)
137   if not args:
138     raise errors.OpPrereqError("No tags to be added")
139   op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
140   SubmitOpCode(op)
141
142
143 def RemoveTags(opts, args):
144   """Remove tags from a given object.
145
146   This is a generic implementation that knows how to deal with all
147   three cases of tag objects (cluster, node, instance). The opts
148   argument is expected to contain a tag_type field denoting what
149   object type we work on.
150
151   """
152   kind, name = _ExtractTagsObject(opts, args)
153   _ExtendTags(opts, args)
154   if not args:
155     raise errors.OpPrereqError("No tags to be removed")
156   op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
157   SubmitOpCode(op)
158
159
160 DEBUG_OPT = make_option("-d", "--debug", default=False,
161                         action="store_true",
162                         help="Turn debugging on")
163
164 NOHDR_OPT = make_option("--no-headers", default=False,
165                         action="store_true", dest="no_headers",
166                         help="Don't display column headers")
167
168 SEP_OPT = make_option("--separator", default=None,
169                       action="store", dest="separator",
170                       help="Separator between output fields"
171                       " (defaults to one space)")
172
173 USEUNITS_OPT = make_option("--units", default=None,
174                            dest="units", choices=('h', 'm', 'g', 't'),
175                            help="Specify units for output (one of hmgt)")
176
177 FIELDS_OPT = make_option("-o", "--output", dest="output", action="store",
178                          type="string", help="Comma separated list of"
179                          " output fields",
180                          metavar="FIELDS")
181
182 FORCE_OPT = make_option("-f", "--force", dest="force", action="store_true",
183                         default=False, help="Force the operation")
184
185 TAG_SRC_OPT = make_option("--from", dest="tags_source",
186                           default=None, help="File with tag names")
187
188 SUBMIT_OPT = make_option("--submit", dest="submit_only",
189                          default=False, action="store_true",
190                          help="Submit the job and return the job ID, but"
191                          " don't wait for the job to finish")
192
193 SYNC_OPT = make_option("--sync", dest="do_locking",
194                        default=False, action="store_true",
195                        help="Grab locks while doing the queries"
196                        " in order to ensure more consistent results")
197
198
199 def ARGS_FIXED(val):
200   """Macro-like function denoting a fixed number of arguments"""
201   return -val
202
203
204 def ARGS_ATLEAST(val):
205   """Macro-like function denoting a minimum number of arguments"""
206   return val
207
208
209 ARGS_NONE = None
210 ARGS_ONE = ARGS_FIXED(1)
211 ARGS_ANY = ARGS_ATLEAST(0)
212
213
214 def check_unit(option, opt, value):
215   """OptParsers custom converter for units.
216
217   """
218   try:
219     return utils.ParseUnit(value)
220   except errors.UnitParseError, err:
221     raise OptionValueError("option %s: %s" % (opt, err))
222
223
224 class CliOption(Option):
225   """Custom option class for optparse.
226
227   """
228   TYPES = Option.TYPES + ("unit",)
229   TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
230   TYPE_CHECKER["unit"] = check_unit
231
232
233 def _SplitKeyVal(opt, data):
234   """Convert a KeyVal string into a dict.
235
236   This function will convert a key=val[,...] string into a dict. Empty
237   values will be converted specially: keys which have the prefix 'no_'
238   will have the value=False and the prefix stripped, the others will
239   have value=True.
240
241   @type opt: string
242   @param opt: a string holding the option name for which we process the
243       data, used in building error messages
244   @type data: string
245   @param data: a string of the format key=val,key=val,...
246   @rtype: dict
247   @return: {key=val, key=val}
248   @raises errors.ParameterError: if there are duplicate keys
249
250   """
251   NO_PREFIX = "no_"
252   UN_PREFIX = "-"
253   kv_dict = {}
254   for elem in data.split(","):
255     if "=" in elem:
256       key, val = elem.split("=", 1)
257     else:
258       if elem.startswith(NO_PREFIX):
259         key, val = elem[len(NO_PREFIX):], False
260       elif elem.startswith(UN_PREFIX):
261         key, val = elem[len(UN_PREFIX):], None
262       else:
263         key, val = elem, True
264     if key in kv_dict:
265       raise errors.ParameterError("Duplicate key '%s' in option %s" %
266                                   (key, opt))
267     kv_dict[key] = val
268   return kv_dict
269
270
271 def check_ident_key_val(option, opt, value):
272   """Custom parser for the IdentKeyVal option type.
273
274   """
275   if ":" not in value:
276     retval =  (value, {})
277   else:
278     ident, rest = value.split(":", 1)
279     kv_dict = _SplitKeyVal(opt, rest)
280     retval = (ident, kv_dict)
281   return retval
282
283
284 class IdentKeyValOption(Option):
285   """Custom option class for ident:key=val,key=val options.
286
287   This will store the parsed values as a tuple (ident, {key: val}). As
288   such, multiple uses of this option via action=append is possible.
289
290   """
291   TYPES = Option.TYPES + ("identkeyval",)
292   TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
293   TYPE_CHECKER["identkeyval"] = check_ident_key_val
294
295
296 def check_key_val(option, opt, value):
297   """Custom parser for the KeyVal option type.
298
299   """
300   return _SplitKeyVal(opt, value)
301
302
303 class KeyValOption(Option):
304   """Custom option class for key=val,key=val options.
305
306   This will store the parsed values as a dict {key: val}.
307
308   """
309   TYPES = Option.TYPES + ("keyval",)
310   TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
311   TYPE_CHECKER["keyval"] = check_key_val
312
313
314 # optparse.py sets make_option, so we do it for our own option class, too
315 cli_option = CliOption
316 ikv_option = IdentKeyValOption
317 keyval_option = KeyValOption
318
319
320 def _ParseArgs(argv, commands, aliases):
321   """Parser for the command line arguments.
322
323   This function parses the arguements and returns the function which
324   must be executed together with its (modified) arguments.
325
326   @param argv: the command line
327   @param commands: dictionary with special contents, see the design
328       doc for cmdline handling
329   @param aliases: dictionary with command aliases {'alias': 'target, ...}
330
331   """
332   if len(argv) == 0:
333     binary = "<command>"
334   else:
335     binary = argv[0].split("/")[-1]
336
337   if len(argv) > 1 and argv[1] == "--version":
338     print "%s (ganeti) %s" % (binary, constants.RELEASE_VERSION)
339     # Quit right away. That way we don't have to care about this special
340     # argument. optparse.py does it the same.
341     sys.exit(0)
342
343   if len(argv) < 2 or not (argv[1] in commands or
344                            argv[1] in aliases):
345     # let's do a nice thing
346     sortedcmds = commands.keys()
347     sortedcmds.sort()
348     print ("Usage: %(bin)s {command} [options...] [argument...]"
349            "\n%(bin)s <command> --help to see details, or"
350            " man %(bin)s\n" % {"bin": binary})
351     # compute the max line length for cmd + usage
352     mlen = max([len(" %s" % cmd) for cmd in commands])
353     mlen = min(60, mlen) # should not get here...
354     # and format a nice command list
355     print "Commands:"
356     for cmd in sortedcmds:
357       cmdstr = " %s" % (cmd,)
358       help_text = commands[cmd][4]
359       help_lines = textwrap.wrap(help_text, 79-3-mlen)
360       print "%-*s - %s" % (mlen, cmdstr, help_lines.pop(0))
361       for line in help_lines:
362         print "%-*s   %s" % (mlen, "", line)
363     print
364     return None, None, None
365
366   # get command, unalias it, and look it up in commands
367   cmd = argv.pop(1)
368   if cmd in aliases:
369     if cmd in commands:
370       raise errors.ProgrammerError("Alias '%s' overrides an existing"
371                                    " command" % cmd)
372
373     if aliases[cmd] not in commands:
374       raise errors.ProgrammerError("Alias '%s' maps to non-existing"
375                                    " command '%s'" % (cmd, aliases[cmd]))
376
377     cmd = aliases[cmd]
378
379   func, nargs, parser_opts, usage, description = commands[cmd]
380   parser = OptionParser(option_list=parser_opts,
381                         description=description,
382                         formatter=TitledHelpFormatter(),
383                         usage="%%prog %s %s" % (cmd, usage))
384   parser.disable_interspersed_args()
385   options, args = parser.parse_args()
386   if nargs is None:
387     if len(args) != 0:
388       print >> sys.stderr, ("Error: Command %s expects no arguments" % cmd)
389       return None, None, None
390   elif nargs < 0 and len(args) != -nargs:
391     print >> sys.stderr, ("Error: Command %s expects %d argument(s)" %
392                          (cmd, -nargs))
393     return None, None, None
394   elif nargs >= 0 and len(args) < nargs:
395     print >> sys.stderr, ("Error: Command %s expects at least %d argument(s)" %
396                          (cmd, nargs))
397     return None, None, None
398
399   return func, options, args
400
401
402 def SplitNodeOption(value):
403   """Splits the value of a --node option.
404
405   """
406   if value and ':' in value:
407     return value.split(':', 1)
408   else:
409     return (value, None)
410
411
412 def UsesRPC(fn):
413   def wrapper(*args, **kwargs):
414     rpc.Init()
415     try:
416       return fn(*args, **kwargs)
417     finally:
418       rpc.Shutdown()
419   return wrapper
420
421
422 def AskUser(text, choices=None):
423   """Ask the user a question.
424
425   @param text: the question to ask
426
427   @param choices: list with elements tuples (input_char, return_value,
428       description); if not given, it will default to: [('y', True,
429       'Perform the operation'), ('n', False, 'Do no do the operation')];
430       note that the '?' char is reserved for help
431
432   @return: one of the return values from the choices list; if input is
433       not possible (i.e. not running with a tty, we return the last
434       entry from the list
435
436   """
437   if choices is None:
438     choices = [('y', True, 'Perform the operation'),
439                ('n', False, 'Do not perform the operation')]
440   if not choices or not isinstance(choices, list):
441     raise errors.ProgrammerError("Invalid choiches argument to AskUser")
442   for entry in choices:
443     if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
444       raise errors.ProgrammerError("Invalid choiches element to AskUser")
445
446   answer = choices[-1][1]
447   new_text = []
448   for line in text.splitlines():
449     new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
450   text = "\n".join(new_text)
451   try:
452     f = file("/dev/tty", "a+")
453   except IOError:
454     return answer
455   try:
456     chars = [entry[0] for entry in choices]
457     chars[-1] = "[%s]" % chars[-1]
458     chars.append('?')
459     maps = dict([(entry[0], entry[1]) for entry in choices])
460     while True:
461       f.write(text)
462       f.write('\n')
463       f.write("/".join(chars))
464       f.write(": ")
465       line = f.readline(2).strip().lower()
466       if line in maps:
467         answer = maps[line]
468         break
469       elif line == '?':
470         for entry in choices:
471           f.write(" %s - %s\n" % (entry[0], entry[2]))
472         f.write("\n")
473         continue
474   finally:
475     f.close()
476   return answer
477
478
479 class JobSubmittedException(Exception):
480   """Job was submitted, client should exit.
481
482   This exception has one argument, the ID of the job that was
483   submitted. The handler should print this ID.
484
485   This is not an error, just a structured way to exit from clients.
486
487   """
488
489
490 def SendJob(ops, cl=None):
491   """Function to submit an opcode without waiting for the results.
492
493   @type ops: list
494   @param ops: list of opcodes
495   @type cl: luxi.Client
496   @param cl: the luxi client to use for communicating with the master;
497              if None, a new client will be created
498
499   """
500   if cl is None:
501     cl = GetClient()
502
503   job_id = cl.SubmitJob(ops)
504
505   return job_id
506
507
508 def PollJob(job_id, cl=None, feedback_fn=None):
509   """Function to poll for the result of a job.
510
511   @type job_id: job identified
512   @param job_id: the job to poll for results
513   @type cl: luxi.Client
514   @param cl: the luxi client to use for communicating with the master;
515              if None, a new client will be created
516
517   """
518   if cl is None:
519     cl = GetClient()
520
521   prev_job_info = None
522   prev_logmsg_serial = None
523
524   while True:
525     result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
526                                  prev_logmsg_serial)
527     if not result:
528       # job not found, go away!
529       raise errors.JobLost("Job with id %s lost" % job_id)
530
531     # Split result, a tuple of (field values, log entries)
532     (job_info, log_entries) = result
533     (status, ) = job_info
534
535     if log_entries:
536       for log_entry in log_entries:
537         (serial, timestamp, _, message) = log_entry
538         if callable(feedback_fn):
539           feedback_fn(log_entry[1:])
540         else:
541           encoded = utils.SafeEncode(message)
542           print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), encoded)
543         prev_logmsg_serial = max(prev_logmsg_serial, serial)
544
545     # TODO: Handle canceled and archived jobs
546     elif status in (constants.JOB_STATUS_SUCCESS,
547                     constants.JOB_STATUS_ERROR,
548                     constants.JOB_STATUS_CANCELING,
549                     constants.JOB_STATUS_CANCELED):
550       break
551
552     prev_job_info = job_info
553
554   jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
555   if not jobs:
556     raise errors.JobLost("Job with id %s lost" % job_id)
557
558   status, opstatus, result = jobs[0]
559   if status == constants.JOB_STATUS_SUCCESS:
560     return result
561   elif status in (constants.JOB_STATUS_CANCELING,
562                   constants.JOB_STATUS_CANCELED):
563     raise errors.OpExecError("Job was canceled")
564   else:
565     has_ok = False
566     for idx, (status, msg) in enumerate(zip(opstatus, result)):
567       if status == constants.OP_STATUS_SUCCESS:
568         has_ok = True
569       elif status == constants.OP_STATUS_ERROR:
570         if has_ok:
571           raise errors.OpExecError("partial failure (opcode %d): %s" %
572                                    (idx, msg))
573         else:
574           raise errors.OpExecError(str(msg))
575     # default failure mode
576     raise errors.OpExecError(result)
577
578
579 def SubmitOpCode(op, cl=None, feedback_fn=None):
580   """Legacy function to submit an opcode.
581
582   This is just a simple wrapper over the construction of the processor
583   instance. It should be extended to better handle feedback and
584   interaction functions.
585
586   """
587   if cl is None:
588     cl = GetClient()
589
590   job_id = SendJob([op], cl)
591
592   op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
593
594   return op_results[0]
595
596
597 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
598   """Wrapper around SubmitOpCode or SendJob.
599
600   This function will decide, based on the 'opts' parameter, whether to
601   submit and wait for the result of the opcode (and return it), or
602   whether to just send the job and print its identifier. It is used in
603   order to simplify the implementation of the '--submit' option.
604
605   """
606   if opts and opts.submit_only:
607     job_id = SendJob([op], cl=cl)
608     raise JobSubmittedException(job_id)
609   else:
610     return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
611
612
613 def GetClient():
614   # TODO: Cache object?
615   try:
616     client = luxi.Client()
617   except luxi.NoMasterError:
618     master, myself = ssconf.GetMasterAndMyself()
619     if master != myself:
620       raise errors.OpPrereqError("This is not the master node, please connect"
621                                  " to node '%s' and rerun the command" %
622                                  master)
623     else:
624       raise
625   return client
626
627
628 def FormatError(err):
629   """Return a formatted error message for a given error.
630
631   This function takes an exception instance and returns a tuple
632   consisting of two values: first, the recommended exit code, and
633   second, a string describing the error message (not
634   newline-terminated).
635
636   """
637   retcode = 1
638   obuf = StringIO()
639   msg = str(err)
640   if isinstance(err, errors.ConfigurationError):
641     txt = "Corrupt configuration file: %s" % msg
642     logging.error(txt)
643     obuf.write(txt + "\n")
644     obuf.write("Aborting.")
645     retcode = 2
646   elif isinstance(err, errors.HooksAbort):
647     obuf.write("Failure: hooks execution failed:\n")
648     for node, script, out in err.args[0]:
649       if out:
650         obuf.write("  node: %s, script: %s, output: %s\n" %
651                    (node, script, out))
652       else:
653         obuf.write("  node: %s, script: %s (no output)\n" %
654                    (node, script))
655   elif isinstance(err, errors.HooksFailure):
656     obuf.write("Failure: hooks general failure: %s" % msg)
657   elif isinstance(err, errors.ResolverError):
658     this_host = utils.HostInfo.SysName()
659     if err.args[0] == this_host:
660       msg = "Failure: can't resolve my own hostname ('%s')"
661     else:
662       msg = "Failure: can't resolve hostname '%s'"
663     obuf.write(msg % err.args[0])
664   elif isinstance(err, errors.OpPrereqError):
665     obuf.write("Failure: prerequisites not met for this"
666                " operation:\n%s" % msg)
667   elif isinstance(err, errors.OpExecError):
668     obuf.write("Failure: command execution error:\n%s" % msg)
669   elif isinstance(err, errors.TagError):
670     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
671   elif isinstance(err, errors.JobQueueDrainError):
672     obuf.write("Failure: the job queue is marked for drain and doesn't"
673                " accept new requests\n")
674   elif isinstance(err, errors.JobQueueFull):
675     obuf.write("Failure: the job queue is full and doesn't accept new"
676                " job submissions until old jobs are archived\n")
677   elif isinstance(err, errors.TypeEnforcementError):
678     obuf.write("Parameter Error: %s" % msg)
679   elif isinstance(err, errors.GenericError):
680     obuf.write("Unhandled Ganeti error: %s" % msg)
681   elif isinstance(err, luxi.NoMasterError):
682     obuf.write("Cannot communicate with the master daemon.\nIs it running"
683                " and listening for connections?")
684   elif isinstance(err, luxi.TimeoutError):
685     obuf.write("Timeout while talking to the master daemon. Error:\n"
686                "%s" % msg)
687   elif isinstance(err, luxi.ProtocolError):
688     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
689                "%s" % msg)
690   elif isinstance(err, JobSubmittedException):
691     obuf.write("JobID: %s\n" % err.args[0])
692     retcode = 0
693   else:
694     obuf.write("Unhandled exception: %s" % msg)
695   return retcode, obuf.getvalue().rstrip('\n')
696
697
698 def GenericMain(commands, override=None, aliases=None):
699   """Generic main function for all the gnt-* commands.
700
701   Arguments:
702     - commands: a dictionary with a special structure, see the design doc
703                 for command line handling.
704     - override: if not None, we expect a dictionary with keys that will
705                 override command line options; this can be used to pass
706                 options from the scripts to generic functions
707     - aliases: dictionary with command aliases {'alias': 'target, ...}
708
709   """
710   # save the program name and the entire command line for later logging
711   if sys.argv:
712     binary = os.path.basename(sys.argv[0]) or sys.argv[0]
713     if len(sys.argv) >= 2:
714       binary += " " + sys.argv[1]
715       old_cmdline = " ".join(sys.argv[2:])
716     else:
717       old_cmdline = ""
718   else:
719     binary = "<unknown program>"
720     old_cmdline = ""
721
722   if aliases is None:
723     aliases = {}
724
725   func, options, args = _ParseArgs(sys.argv, commands, aliases)
726   if func is None: # parse error
727     return 1
728
729   if override is not None:
730     for key, val in override.iteritems():
731       setattr(options, key, val)
732
733   utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
734                      stderr_logging=True, program=binary)
735
736   utils.debug = options.debug
737
738   if old_cmdline:
739     logging.info("run with arguments '%s'", old_cmdline)
740   else:
741     logging.info("run with no arguments")
742
743   try:
744     result = func(options, args)
745   except (errors.GenericError, luxi.ProtocolError,
746           JobSubmittedException), err:
747     result, err_msg = FormatError(err)
748     logging.exception("Error durring command processing")
749     ToStderr(err_msg)
750
751   return result
752
753
754 def GenerateTable(headers, fields, separator, data,
755                   numfields=None, unitfields=None,
756                   units=None):
757   """Prints a table with headers and different fields.
758
759   @type headers: dict
760   @param headers: dictionary mapping field names to headers for
761       the table
762   @type fields: list
763   @param fields: the field names corresponding to each row in
764       the data field
765   @param separator: the separator to be used; if this is None,
766       the default 'smart' algorithm is used which computes optimal
767       field width, otherwise just the separator is used between
768       each field
769   @type data: list
770   @param data: a list of lists, each sublist being one row to be output
771   @type numfields: list
772   @param numfields: a list with the fields that hold numeric
773       values and thus should be right-aligned
774   @type unitfields: list
775   @param unitfields: a list with the fields that hold numeric
776       values that should be formatted with the units field
777   @type units: string or None
778   @param units: the units we should use for formatting, or None for
779       automatic choice (human-readable for non-separator usage, otherwise
780       megabytes); this is a one-letter string
781
782   """
783   if units is None:
784     if separator:
785       units = "m"
786     else:
787       units = "h"
788
789   if numfields is None:
790     numfields = []
791   if unitfields is None:
792     unitfields = []
793
794   numfields = utils.FieldSet(*numfields)
795   unitfields = utils.FieldSet(*unitfields)
796
797   format_fields = []
798   for field in fields:
799     if headers and field not in headers:
800       # TODO: handle better unknown fields (either revert to old
801       # style of raising exception, or deal more intelligently with
802       # variable fields)
803       headers[field] = field
804     if separator is not None:
805       format_fields.append("%s")
806     elif numfields.Matches(field):
807       format_fields.append("%*s")
808     else:
809       format_fields.append("%-*s")
810
811   if separator is None:
812     mlens = [0 for name in fields]
813     format = ' '.join(format_fields)
814   else:
815     format = separator.replace("%", "%%").join(format_fields)
816
817   for row in data:
818     if row is None:
819       continue
820     for idx, val in enumerate(row):
821       if unitfields.Matches(fields[idx]):
822         try:
823           val = int(val)
824         except ValueError:
825           pass
826         else:
827           val = row[idx] = utils.FormatUnit(val, units)
828       val = row[idx] = str(val)
829       if separator is None:
830         mlens[idx] = max(mlens[idx], len(val))
831
832   result = []
833   if headers:
834     args = []
835     for idx, name in enumerate(fields):
836       hdr = headers[name]
837       if separator is None:
838         mlens[idx] = max(mlens[idx], len(hdr))
839         args.append(mlens[idx])
840       args.append(hdr)
841     result.append(format % tuple(args))
842
843   for line in data:
844     args = []
845     if line is None:
846       line = ['-' for _ in fields]
847     for idx in xrange(len(fields)):
848       if separator is None:
849         args.append(mlens[idx])
850       args.append(line[idx])
851     result.append(format % tuple(args))
852
853   return result
854
855
856 def FormatTimestamp(ts):
857   """Formats a given timestamp.
858
859   @type ts: timestamp
860   @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
861
862   @rtype: string
863   @return: a string with the formatted timestamp
864
865   """
866   if not isinstance (ts, (tuple, list)) or len(ts) != 2:
867     return '?'
868   sec, usec = ts
869   return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
870
871
872 def ParseTimespec(value):
873   """Parse a time specification.
874
875   The following suffixed will be recognized:
876
877     - s: seconds
878     - m: minutes
879     - h: hours
880     - d: day
881     - w: weeks
882
883   Without any suffix, the value will be taken to be in seconds.
884
885   """
886   value = str(value)
887   if not value:
888     raise errors.OpPrereqError("Empty time specification passed")
889   suffix_map = {
890     's': 1,
891     'm': 60,
892     'h': 3600,
893     'd': 86400,
894     'w': 604800,
895     }
896   if value[-1] not in suffix_map:
897     try:
898       value = int(value)
899     except ValueError:
900       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
901   else:
902     multiplier = suffix_map[value[-1]]
903     value = value[:-1]
904     if not value: # no data left after stripping the suffix
905       raise errors.OpPrereqError("Invalid time specification (only"
906                                  " suffix passed)")
907     try:
908       value = int(value) * multiplier
909     except ValueError:
910       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
911   return value
912
913
914 def GetOnlineNodes(nodes, cl=None, nowarn=False):
915   """Returns the names of online nodes.
916
917   This function will also log a warning on stderr with the names of
918   the online nodes.
919
920   @param nodes: if not empty, use only this subset of nodes (minus the
921       offline ones)
922   @param cl: if not None, luxi client to use
923   @type nowarn: boolean
924   @param nowarn: by default, this function will output a note with the
925       offline nodes that are skipped; if this parameter is True the
926       note is not displayed
927
928   """
929   if cl is None:
930     cl = GetClient()
931
932   result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
933                          use_locking=False)
934   offline = [row[0] for row in result if row[1]]
935   if offline and not nowarn:
936     ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline))
937   return [row[0] for row in result if not row[1]]
938
939
940 def _ToStream(stream, txt, *args):
941   """Write a message to a stream, bypassing the logging system
942
943   @type stream: file object
944   @param stream: the file to which we should write
945   @type txt: str
946   @param txt: the message
947
948   """
949   if args:
950     args = tuple(args)
951     stream.write(txt % args)
952   else:
953     stream.write(txt)
954   stream.write('\n')
955   stream.flush()
956
957
958 def ToStdout(txt, *args):
959   """Write a message to stdout only, bypassing the logging system
960
961   This is just a wrapper over _ToStream.
962
963   @type txt: str
964   @param txt: the message
965
966   """
967   _ToStream(sys.stdout, txt, *args)
968
969
970 def ToStderr(txt, *args):
971   """Write a message to stderr only, bypassing the logging system
972
973   This is just a wrapper over _ToStream.
974
975   @type txt: str
976   @param txt: the message
977
978   """
979   _ToStream(sys.stderr, txt, *args)
980
981
982 class JobExecutor(object):
983   """Class which manages the submission and execution of multiple jobs.
984
985   Note that instances of this class should not be reused between
986   GetResults() calls.
987
988   """
989   def __init__(self, cl=None, verbose=True):
990     self.queue = []
991     if cl is None:
992       cl = GetClient()
993     self.cl = cl
994     self.verbose = verbose
995
996   def QueueJob(self, name, *ops):
997     """Submit a job for execution.
998
999     @type name: string
1000     @param name: a description of the job, will be used in WaitJobSet
1001     """
1002     job_id = SendJob(ops, cl=self.cl)
1003     self.queue.append((job_id, name))
1004
1005   def GetResults(self):
1006     """Wait for and return the results of all jobs.
1007
1008     @rtype: list
1009     @return: list of tuples (success, job results), in the same order
1010         as the submitted jobs; if a job has failed, instead of the result
1011         there will be the error message
1012
1013     """
1014     results = []
1015     if self.verbose:
1016       ToStdout("Submitted jobs %s", ", ".join(row[0] for row in self.queue))
1017     for jid, name in self.queue:
1018       if self.verbose:
1019         ToStdout("Waiting for job %s for %s...", jid, name)
1020       try:
1021         job_result = PollJob(jid, cl=self.cl)
1022         success = True
1023       except (errors.GenericError, luxi.ProtocolError), err:
1024         _, job_result = FormatError(err)
1025         success = False
1026         # the error message will always be shown, verbose or not
1027         ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
1028
1029       results.append((success, job_result))
1030     return results
1031
1032   def WaitOrShow(self, wait):
1033     """Wait for job results or only print the job IDs.
1034
1035     @type wait: boolean
1036     @param wait: whether to wait or not
1037
1038     """
1039     if wait:
1040       return self.GetResults()
1041     else:
1042       for jid, name in self.queue:
1043         ToStdout("%s: %s", jid, name)