Convert export_info rpc to new style result
[ganeti-local] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module dealing with command line parsing"""
23
24
25 import sys
26 import textwrap
27 import os.path
28 import copy
29 import time
30 import logging
31 from cStringIO import StringIO
32
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import constants
36 from ganeti import opcodes
37 from ganeti import luxi
38 from ganeti import ssconf
39 from ganeti import rpc
40
41 from optparse import (OptionParser, make_option, TitledHelpFormatter,
42                       Option, OptionValueError)
43
44 __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
45            "SubmitOpCode", "GetClient",
46            "cli_option", "ikv_option", "keyval_option",
47            "GenerateTable", "AskUser",
48            "ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
49            "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT", "SUBMIT_OPT",
50            "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
51            "FormatError", "SplitNodeOption", "SubmitOrSend",
52            "JobSubmittedException", "FormatTimestamp", "ParseTimespec",
53            "ToStderr", "ToStdout", "UsesRPC",
54            "GetOnlineNodes", "JobExecutor", "SYNC_OPT", "CONFIRM_OPT",
55            ]
56
57
58 def _ExtractTagsObject(opts, args):
59   """Extract the tag type object.
60
61   Note that this function will modify its args parameter.
62
63   """
64   if not hasattr(opts, "tag_type"):
65     raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
66   kind = opts.tag_type
67   if kind == constants.TAG_CLUSTER:
68     retval = kind, kind
69   elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
70     if not args:
71       raise errors.OpPrereqError("no arguments passed to the command")
72     name = args.pop(0)
73     retval = kind, name
74   else:
75     raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
76   return retval
77
78
79 def _ExtendTags(opts, args):
80   """Extend the args if a source file has been given.
81
82   This function will extend the tags with the contents of the file
83   passed in the 'tags_source' attribute of the opts parameter. A file
84   named '-' will be replaced by stdin.
85
86   """
87   fname = opts.tags_source
88   if fname is None:
89     return
90   if fname == "-":
91     new_fh = sys.stdin
92   else:
93     new_fh = open(fname, "r")
94   new_data = []
95   try:
96     # we don't use the nice 'new_data = [line.strip() for line in fh]'
97     # because of python bug 1633941
98     while True:
99       line = new_fh.readline()
100       if not line:
101         break
102       new_data.append(line.strip())
103   finally:
104     new_fh.close()
105   args.extend(new_data)
106
107
108 def ListTags(opts, args):
109   """List the tags on a given object.
110
111   This is a generic implementation that knows how to deal with all
112   three cases of tag objects (cluster, node, instance). The opts
113   argument is expected to contain a tag_type field denoting what
114   object type we work on.
115
116   """
117   kind, name = _ExtractTagsObject(opts, args)
118   op = opcodes.OpGetTags(kind=kind, name=name)
119   result = SubmitOpCode(op)
120   result = list(result)
121   result.sort()
122   for tag in result:
123     print tag
124
125
126 def AddTags(opts, args):
127   """Add tags on a given object.
128
129   This is a generic implementation that knows how to deal with all
130   three cases of tag objects (cluster, node, instance). The opts
131   argument is expected to contain a tag_type field denoting what
132   object type we work on.
133
134   """
135   kind, name = _ExtractTagsObject(opts, args)
136   _ExtendTags(opts, args)
137   if not args:
138     raise errors.OpPrereqError("No tags to be added")
139   op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
140   SubmitOpCode(op)
141
142
143 def RemoveTags(opts, args):
144   """Remove tags from a given object.
145
146   This is a generic implementation that knows how to deal with all
147   three cases of tag objects (cluster, node, instance). The opts
148   argument is expected to contain a tag_type field denoting what
149   object type we work on.
150
151   """
152   kind, name = _ExtractTagsObject(opts, args)
153   _ExtendTags(opts, args)
154   if not args:
155     raise errors.OpPrereqError("No tags to be removed")
156   op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
157   SubmitOpCode(op)
158
159
160 DEBUG_OPT = make_option("-d", "--debug", default=False,
161                         action="store_true",
162                         help="Turn debugging on")
163
164 NOHDR_OPT = make_option("--no-headers", default=False,
165                         action="store_true", dest="no_headers",
166                         help="Don't display column headers")
167
168 SEP_OPT = make_option("--separator", default=None,
169                       action="store", dest="separator",
170                       help="Separator between output fields"
171                       " (defaults to one space)")
172
173 USEUNITS_OPT = make_option("--units", default=None,
174                            dest="units", choices=('h', 'm', 'g', 't'),
175                            help="Specify units for output (one of hmgt)")
176
177 FIELDS_OPT = make_option("-o", "--output", dest="output", action="store",
178                          type="string", help="Comma separated list of"
179                          " output fields",
180                          metavar="FIELDS")
181
182 FORCE_OPT = make_option("-f", "--force", dest="force", action="store_true",
183                         default=False, help="Force the operation")
184
185 CONFIRM_OPT = make_option("--yes", dest="confirm", action="store_true",
186                           default=False, help="Do not require confirmation")
187
188 TAG_SRC_OPT = make_option("--from", dest="tags_source",
189                           default=None, help="File with tag names")
190
191 SUBMIT_OPT = make_option("--submit", dest="submit_only",
192                          default=False, action="store_true",
193                          help="Submit the job and return the job ID, but"
194                          " don't wait for the job to finish")
195
196 SYNC_OPT = make_option("--sync", dest="do_locking",
197                        default=False, action="store_true",
198                        help="Grab locks while doing the queries"
199                        " in order to ensure more consistent results")
200
201
202 def ARGS_FIXED(val):
203   """Macro-like function denoting a fixed number of arguments"""
204   return -val
205
206
207 def ARGS_ATLEAST(val):
208   """Macro-like function denoting a minimum number of arguments"""
209   return val
210
211
212 ARGS_NONE = None
213 ARGS_ONE = ARGS_FIXED(1)
214 ARGS_ANY = ARGS_ATLEAST(0)
215
216
217 def check_unit(option, opt, value):
218   """OptParsers custom converter for units.
219
220   """
221   try:
222     return utils.ParseUnit(value)
223   except errors.UnitParseError, err:
224     raise OptionValueError("option %s: %s" % (opt, err))
225
226
227 class CliOption(Option):
228   """Custom option class for optparse.
229
230   """
231   TYPES = Option.TYPES + ("unit",)
232   TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
233   TYPE_CHECKER["unit"] = check_unit
234
235
236 def _SplitKeyVal(opt, data):
237   """Convert a KeyVal string into a dict.
238
239   This function will convert a key=val[,...] string into a dict. Empty
240   values will be converted specially: keys which have the prefix 'no_'
241   will have the value=False and the prefix stripped, the others will
242   have value=True.
243
244   @type opt: string
245   @param opt: a string holding the option name for which we process the
246       data, used in building error messages
247   @type data: string
248   @param data: a string of the format key=val,key=val,...
249   @rtype: dict
250   @return: {key=val, key=val}
251   @raises errors.ParameterError: if there are duplicate keys
252
253   """
254   NO_PREFIX = "no_"
255   UN_PREFIX = "-"
256   kv_dict = {}
257   for elem in data.split(","):
258     if "=" in elem:
259       key, val = elem.split("=", 1)
260     else:
261       if elem.startswith(NO_PREFIX):
262         key, val = elem[len(NO_PREFIX):], False
263       elif elem.startswith(UN_PREFIX):
264         key, val = elem[len(UN_PREFIX):], None
265       else:
266         key, val = elem, True
267     if key in kv_dict:
268       raise errors.ParameterError("Duplicate key '%s' in option %s" %
269                                   (key, opt))
270     kv_dict[key] = val
271   return kv_dict
272
273
274 def check_ident_key_val(option, opt, value):
275   """Custom parser for the IdentKeyVal option type.
276
277   """
278   if ":" not in value:
279     retval =  (value, {})
280   else:
281     ident, rest = value.split(":", 1)
282     kv_dict = _SplitKeyVal(opt, rest)
283     retval = (ident, kv_dict)
284   return retval
285
286
287 class IdentKeyValOption(Option):
288   """Custom option class for ident:key=val,key=val options.
289
290   This will store the parsed values as a tuple (ident, {key: val}). As
291   such, multiple uses of this option via action=append is possible.
292
293   """
294   TYPES = Option.TYPES + ("identkeyval",)
295   TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
296   TYPE_CHECKER["identkeyval"] = check_ident_key_val
297
298
299 def check_key_val(option, opt, value):
300   """Custom parser for the KeyVal option type.
301
302   """
303   return _SplitKeyVal(opt, value)
304
305
306 class KeyValOption(Option):
307   """Custom option class for key=val,key=val options.
308
309   This will store the parsed values as a dict {key: val}.
310
311   """
312   TYPES = Option.TYPES + ("keyval",)
313   TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
314   TYPE_CHECKER["keyval"] = check_key_val
315
316
317 # optparse.py sets make_option, so we do it for our own option class, too
318 cli_option = CliOption
319 ikv_option = IdentKeyValOption
320 keyval_option = KeyValOption
321
322
323 def _ParseArgs(argv, commands, aliases):
324   """Parser for the command line arguments.
325
326   This function parses the arguements and returns the function which
327   must be executed together with its (modified) arguments.
328
329   @param argv: the command line
330   @param commands: dictionary with special contents, see the design
331       doc for cmdline handling
332   @param aliases: dictionary with command aliases {'alias': 'target, ...}
333
334   """
335   if len(argv) == 0:
336     binary = "<command>"
337   else:
338     binary = argv[0].split("/")[-1]
339
340   if len(argv) > 1 and argv[1] == "--version":
341     print "%s (ganeti) %s" % (binary, constants.RELEASE_VERSION)
342     # Quit right away. That way we don't have to care about this special
343     # argument. optparse.py does it the same.
344     sys.exit(0)
345
346   if len(argv) < 2 or not (argv[1] in commands or
347                            argv[1] in aliases):
348     # let's do a nice thing
349     sortedcmds = commands.keys()
350     sortedcmds.sort()
351     print ("Usage: %(bin)s {command} [options...] [argument...]"
352            "\n%(bin)s <command> --help to see details, or"
353            " man %(bin)s\n" % {"bin": binary})
354     # compute the max line length for cmd + usage
355     mlen = max([len(" %s" % cmd) for cmd in commands])
356     mlen = min(60, mlen) # should not get here...
357     # and format a nice command list
358     print "Commands:"
359     for cmd in sortedcmds:
360       cmdstr = " %s" % (cmd,)
361       help_text = commands[cmd][4]
362       help_lines = textwrap.wrap(help_text, 79-3-mlen)
363       print "%-*s - %s" % (mlen, cmdstr, help_lines.pop(0))
364       for line in help_lines:
365         print "%-*s   %s" % (mlen, "", line)
366     print
367     return None, None, None
368
369   # get command, unalias it, and look it up in commands
370   cmd = argv.pop(1)
371   if cmd in aliases:
372     if cmd in commands:
373       raise errors.ProgrammerError("Alias '%s' overrides an existing"
374                                    " command" % cmd)
375
376     if aliases[cmd] not in commands:
377       raise errors.ProgrammerError("Alias '%s' maps to non-existing"
378                                    " command '%s'" % (cmd, aliases[cmd]))
379
380     cmd = aliases[cmd]
381
382   func, nargs, parser_opts, usage, description = commands[cmd]
383   parser = OptionParser(option_list=parser_opts,
384                         description=description,
385                         formatter=TitledHelpFormatter(),
386                         usage="%%prog %s %s" % (cmd, usage))
387   parser.disable_interspersed_args()
388   options, args = parser.parse_args()
389   if nargs is None:
390     if len(args) != 0:
391       print >> sys.stderr, ("Error: Command %s expects no arguments" % cmd)
392       return None, None, None
393   elif nargs < 0 and len(args) != -nargs:
394     print >> sys.stderr, ("Error: Command %s expects %d argument(s)" %
395                          (cmd, -nargs))
396     return None, None, None
397   elif nargs >= 0 and len(args) < nargs:
398     print >> sys.stderr, ("Error: Command %s expects at least %d argument(s)" %
399                          (cmd, nargs))
400     return None, None, None
401
402   return func, options, args
403
404
405 def SplitNodeOption(value):
406   """Splits the value of a --node option.
407
408   """
409   if value and ':' in value:
410     return value.split(':', 1)
411   else:
412     return (value, None)
413
414
415 def UsesRPC(fn):
416   def wrapper(*args, **kwargs):
417     rpc.Init()
418     try:
419       return fn(*args, **kwargs)
420     finally:
421       rpc.Shutdown()
422   return wrapper
423
424
425 def AskUser(text, choices=None):
426   """Ask the user a question.
427
428   @param text: the question to ask
429
430   @param choices: list with elements tuples (input_char, return_value,
431       description); if not given, it will default to: [('y', True,
432       'Perform the operation'), ('n', False, 'Do no do the operation')];
433       note that the '?' char is reserved for help
434
435   @return: one of the return values from the choices list; if input is
436       not possible (i.e. not running with a tty, we return the last
437       entry from the list
438
439   """
440   if choices is None:
441     choices = [('y', True, 'Perform the operation'),
442                ('n', False, 'Do not perform the operation')]
443   if not choices or not isinstance(choices, list):
444     raise errors.ProgrammerError("Invalid choiches argument to AskUser")
445   for entry in choices:
446     if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
447       raise errors.ProgrammerError("Invalid choiches element to AskUser")
448
449   answer = choices[-1][1]
450   new_text = []
451   for line in text.splitlines():
452     new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
453   text = "\n".join(new_text)
454   try:
455     f = file("/dev/tty", "a+")
456   except IOError:
457     return answer
458   try:
459     chars = [entry[0] for entry in choices]
460     chars[-1] = "[%s]" % chars[-1]
461     chars.append('?')
462     maps = dict([(entry[0], entry[1]) for entry in choices])
463     while True:
464       f.write(text)
465       f.write('\n')
466       f.write("/".join(chars))
467       f.write(": ")
468       line = f.readline(2).strip().lower()
469       if line in maps:
470         answer = maps[line]
471         break
472       elif line == '?':
473         for entry in choices:
474           f.write(" %s - %s\n" % (entry[0], entry[2]))
475         f.write("\n")
476         continue
477   finally:
478     f.close()
479   return answer
480
481
482 class JobSubmittedException(Exception):
483   """Job was submitted, client should exit.
484
485   This exception has one argument, the ID of the job that was
486   submitted. The handler should print this ID.
487
488   This is not an error, just a structured way to exit from clients.
489
490   """
491
492
493 def SendJob(ops, cl=None):
494   """Function to submit an opcode without waiting for the results.
495
496   @type ops: list
497   @param ops: list of opcodes
498   @type cl: luxi.Client
499   @param cl: the luxi client to use for communicating with the master;
500              if None, a new client will be created
501
502   """
503   if cl is None:
504     cl = GetClient()
505
506   job_id = cl.SubmitJob(ops)
507
508   return job_id
509
510
511 def PollJob(job_id, cl=None, feedback_fn=None):
512   """Function to poll for the result of a job.
513
514   @type job_id: job identified
515   @param job_id: the job to poll for results
516   @type cl: luxi.Client
517   @param cl: the luxi client to use for communicating with the master;
518              if None, a new client will be created
519
520   """
521   if cl is None:
522     cl = GetClient()
523
524   prev_job_info = None
525   prev_logmsg_serial = None
526
527   while True:
528     result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
529                                  prev_logmsg_serial)
530     if not result:
531       # job not found, go away!
532       raise errors.JobLost("Job with id %s lost" % job_id)
533
534     # Split result, a tuple of (field values, log entries)
535     (job_info, log_entries) = result
536     (status, ) = job_info
537
538     if log_entries:
539       for log_entry in log_entries:
540         (serial, timestamp, _, message) = log_entry
541         if callable(feedback_fn):
542           feedback_fn(log_entry[1:])
543         else:
544           encoded = utils.SafeEncode(message)
545           print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), encoded)
546         prev_logmsg_serial = max(prev_logmsg_serial, serial)
547
548     # TODO: Handle canceled and archived jobs
549     elif status in (constants.JOB_STATUS_SUCCESS,
550                     constants.JOB_STATUS_ERROR,
551                     constants.JOB_STATUS_CANCELING,
552                     constants.JOB_STATUS_CANCELED):
553       break
554
555     prev_job_info = job_info
556
557   jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
558   if not jobs:
559     raise errors.JobLost("Job with id %s lost" % job_id)
560
561   status, opstatus, result = jobs[0]
562   if status == constants.JOB_STATUS_SUCCESS:
563     return result
564   elif status in (constants.JOB_STATUS_CANCELING,
565                   constants.JOB_STATUS_CANCELED):
566     raise errors.OpExecError("Job was canceled")
567   else:
568     has_ok = False
569     for idx, (status, msg) in enumerate(zip(opstatus, result)):
570       if status == constants.OP_STATUS_SUCCESS:
571         has_ok = True
572       elif status == constants.OP_STATUS_ERROR:
573         if has_ok:
574           raise errors.OpExecError("partial failure (opcode %d): %s" %
575                                    (idx, msg))
576         else:
577           raise errors.OpExecError(str(msg))
578     # default failure mode
579     raise errors.OpExecError(result)
580
581
582 def SubmitOpCode(op, cl=None, feedback_fn=None):
583   """Legacy function to submit an opcode.
584
585   This is just a simple wrapper over the construction of the processor
586   instance. It should be extended to better handle feedback and
587   interaction functions.
588
589   """
590   if cl is None:
591     cl = GetClient()
592
593   job_id = SendJob([op], cl)
594
595   op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
596
597   return op_results[0]
598
599
600 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
601   """Wrapper around SubmitOpCode or SendJob.
602
603   This function will decide, based on the 'opts' parameter, whether to
604   submit and wait for the result of the opcode (and return it), or
605   whether to just send the job and print its identifier. It is used in
606   order to simplify the implementation of the '--submit' option.
607
608   """
609   if opts and opts.submit_only:
610     job_id = SendJob([op], cl=cl)
611     raise JobSubmittedException(job_id)
612   else:
613     return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
614
615
616 def GetClient():
617   # TODO: Cache object?
618   try:
619     client = luxi.Client()
620   except luxi.NoMasterError:
621     master, myself = ssconf.GetMasterAndMyself()
622     if master != myself:
623       raise errors.OpPrereqError("This is not the master node, please connect"
624                                  " to node '%s' and rerun the command" %
625                                  master)
626     else:
627       raise
628   return client
629
630
631 def FormatError(err):
632   """Return a formatted error message for a given error.
633
634   This function takes an exception instance and returns a tuple
635   consisting of two values: first, the recommended exit code, and
636   second, a string describing the error message (not
637   newline-terminated).
638
639   """
640   retcode = 1
641   obuf = StringIO()
642   msg = str(err)
643   if isinstance(err, errors.ConfigurationError):
644     txt = "Corrupt configuration file: %s" % msg
645     logging.error(txt)
646     obuf.write(txt + "\n")
647     obuf.write("Aborting.")
648     retcode = 2
649   elif isinstance(err, errors.HooksAbort):
650     obuf.write("Failure: hooks execution failed:\n")
651     for node, script, out in err.args[0]:
652       if out:
653         obuf.write("  node: %s, script: %s, output: %s\n" %
654                    (node, script, out))
655       else:
656         obuf.write("  node: %s, script: %s (no output)\n" %
657                    (node, script))
658   elif isinstance(err, errors.HooksFailure):
659     obuf.write("Failure: hooks general failure: %s" % msg)
660   elif isinstance(err, errors.ResolverError):
661     this_host = utils.HostInfo.SysName()
662     if err.args[0] == this_host:
663       msg = "Failure: can't resolve my own hostname ('%s')"
664     else:
665       msg = "Failure: can't resolve hostname '%s'"
666     obuf.write(msg % err.args[0])
667   elif isinstance(err, errors.OpPrereqError):
668     obuf.write("Failure: prerequisites not met for this"
669                " operation:\n%s" % msg)
670   elif isinstance(err, errors.OpExecError):
671     obuf.write("Failure: command execution error:\n%s" % msg)
672   elif isinstance(err, errors.TagError):
673     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
674   elif isinstance(err, errors.JobQueueDrainError):
675     obuf.write("Failure: the job queue is marked for drain and doesn't"
676                " accept new requests\n")
677   elif isinstance(err, errors.JobQueueFull):
678     obuf.write("Failure: the job queue is full and doesn't accept new"
679                " job submissions until old jobs are archived\n")
680   elif isinstance(err, errors.TypeEnforcementError):
681     obuf.write("Parameter Error: %s" % msg)
682   elif isinstance(err, errors.GenericError):
683     obuf.write("Unhandled Ganeti error: %s" % msg)
684   elif isinstance(err, luxi.NoMasterError):
685     obuf.write("Cannot communicate with the master daemon.\nIs it running"
686                " and listening for connections?")
687   elif isinstance(err, luxi.TimeoutError):
688     obuf.write("Timeout while talking to the master daemon. Error:\n"
689                "%s" % msg)
690   elif isinstance(err, luxi.ProtocolError):
691     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
692                "%s" % msg)
693   elif isinstance(err, JobSubmittedException):
694     obuf.write("JobID: %s\n" % err.args[0])
695     retcode = 0
696   else:
697     obuf.write("Unhandled exception: %s" % msg)
698   return retcode, obuf.getvalue().rstrip('\n')
699
700
701 def GenericMain(commands, override=None, aliases=None):
702   """Generic main function for all the gnt-* commands.
703
704   Arguments:
705     - commands: a dictionary with a special structure, see the design doc
706                 for command line handling.
707     - override: if not None, we expect a dictionary with keys that will
708                 override command line options; this can be used to pass
709                 options from the scripts to generic functions
710     - aliases: dictionary with command aliases {'alias': 'target, ...}
711
712   """
713   # save the program name and the entire command line for later logging
714   if sys.argv:
715     binary = os.path.basename(sys.argv[0]) or sys.argv[0]
716     if len(sys.argv) >= 2:
717       binary += " " + sys.argv[1]
718       old_cmdline = " ".join(sys.argv[2:])
719     else:
720       old_cmdline = ""
721   else:
722     binary = "<unknown program>"
723     old_cmdline = ""
724
725   if aliases is None:
726     aliases = {}
727
728   func, options, args = _ParseArgs(sys.argv, commands, aliases)
729   if func is None: # parse error
730     return 1
731
732   if override is not None:
733     for key, val in override.iteritems():
734       setattr(options, key, val)
735
736   utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
737                      stderr_logging=True, program=binary)
738
739   utils.debug = options.debug
740
741   if old_cmdline:
742     logging.info("run with arguments '%s'", old_cmdline)
743   else:
744     logging.info("run with no arguments")
745
746   try:
747     result = func(options, args)
748   except (errors.GenericError, luxi.ProtocolError,
749           JobSubmittedException), err:
750     result, err_msg = FormatError(err)
751     logging.exception("Error durring command processing")
752     ToStderr(err_msg)
753
754   return result
755
756
757 def GenerateTable(headers, fields, separator, data,
758                   numfields=None, unitfields=None,
759                   units=None):
760   """Prints a table with headers and different fields.
761
762   @type headers: dict
763   @param headers: dictionary mapping field names to headers for
764       the table
765   @type fields: list
766   @param fields: the field names corresponding to each row in
767       the data field
768   @param separator: the separator to be used; if this is None,
769       the default 'smart' algorithm is used which computes optimal
770       field width, otherwise just the separator is used between
771       each field
772   @type data: list
773   @param data: a list of lists, each sublist being one row to be output
774   @type numfields: list
775   @param numfields: a list with the fields that hold numeric
776       values and thus should be right-aligned
777   @type unitfields: list
778   @param unitfields: a list with the fields that hold numeric
779       values that should be formatted with the units field
780   @type units: string or None
781   @param units: the units we should use for formatting, or None for
782       automatic choice (human-readable for non-separator usage, otherwise
783       megabytes); this is a one-letter string
784
785   """
786   if units is None:
787     if separator:
788       units = "m"
789     else:
790       units = "h"
791
792   if numfields is None:
793     numfields = []
794   if unitfields is None:
795     unitfields = []
796
797   numfields = utils.FieldSet(*numfields)
798   unitfields = utils.FieldSet(*unitfields)
799
800   format_fields = []
801   for field in fields:
802     if headers and field not in headers:
803       # TODO: handle better unknown fields (either revert to old
804       # style of raising exception, or deal more intelligently with
805       # variable fields)
806       headers[field] = field
807     if separator is not None:
808       format_fields.append("%s")
809     elif numfields.Matches(field):
810       format_fields.append("%*s")
811     else:
812       format_fields.append("%-*s")
813
814   if separator is None:
815     mlens = [0 for name in fields]
816     format = ' '.join(format_fields)
817   else:
818     format = separator.replace("%", "%%").join(format_fields)
819
820   for row in data:
821     if row is None:
822       continue
823     for idx, val in enumerate(row):
824       if unitfields.Matches(fields[idx]):
825         try:
826           val = int(val)
827         except ValueError:
828           pass
829         else:
830           val = row[idx] = utils.FormatUnit(val, units)
831       val = row[idx] = str(val)
832       if separator is None:
833         mlens[idx] = max(mlens[idx], len(val))
834
835   result = []
836   if headers:
837     args = []
838     for idx, name in enumerate(fields):
839       hdr = headers[name]
840       if separator is None:
841         mlens[idx] = max(mlens[idx], len(hdr))
842         args.append(mlens[idx])
843       args.append(hdr)
844     result.append(format % tuple(args))
845
846   for line in data:
847     args = []
848     if line is None:
849       line = ['-' for _ in fields]
850     for idx in xrange(len(fields)):
851       if separator is None:
852         args.append(mlens[idx])
853       args.append(line[idx])
854     result.append(format % tuple(args))
855
856   return result
857
858
859 def FormatTimestamp(ts):
860   """Formats a given timestamp.
861
862   @type ts: timestamp
863   @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
864
865   @rtype: string
866   @return: a string with the formatted timestamp
867
868   """
869   if not isinstance (ts, (tuple, list)) or len(ts) != 2:
870     return '?'
871   sec, usec = ts
872   return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
873
874
875 def ParseTimespec(value):
876   """Parse a time specification.
877
878   The following suffixed will be recognized:
879
880     - s: seconds
881     - m: minutes
882     - h: hours
883     - d: day
884     - w: weeks
885
886   Without any suffix, the value will be taken to be in seconds.
887
888   """
889   value = str(value)
890   if not value:
891     raise errors.OpPrereqError("Empty time specification passed")
892   suffix_map = {
893     's': 1,
894     'm': 60,
895     'h': 3600,
896     'd': 86400,
897     'w': 604800,
898     }
899   if value[-1] not in suffix_map:
900     try:
901       value = int(value)
902     except ValueError:
903       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
904   else:
905     multiplier = suffix_map[value[-1]]
906     value = value[:-1]
907     if not value: # no data left after stripping the suffix
908       raise errors.OpPrereqError("Invalid time specification (only"
909                                  " suffix passed)")
910     try:
911       value = int(value) * multiplier
912     except ValueError:
913       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
914   return value
915
916
917 def GetOnlineNodes(nodes, cl=None, nowarn=False):
918   """Returns the names of online nodes.
919
920   This function will also log a warning on stderr with the names of
921   the online nodes.
922
923   @param nodes: if not empty, use only this subset of nodes (minus the
924       offline ones)
925   @param cl: if not None, luxi client to use
926   @type nowarn: boolean
927   @param nowarn: by default, this function will output a note with the
928       offline nodes that are skipped; if this parameter is True the
929       note is not displayed
930
931   """
932   if cl is None:
933     cl = GetClient()
934
935   result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
936                          use_locking=False)
937   offline = [row[0] for row in result if row[1]]
938   if offline and not nowarn:
939     ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline))
940   return [row[0] for row in result if not row[1]]
941
942
943 def _ToStream(stream, txt, *args):
944   """Write a message to a stream, bypassing the logging system
945
946   @type stream: file object
947   @param stream: the file to which we should write
948   @type txt: str
949   @param txt: the message
950
951   """
952   if args:
953     args = tuple(args)
954     stream.write(txt % args)
955   else:
956     stream.write(txt)
957   stream.write('\n')
958   stream.flush()
959
960
961 def ToStdout(txt, *args):
962   """Write a message to stdout only, bypassing the logging system
963
964   This is just a wrapper over _ToStream.
965
966   @type txt: str
967   @param txt: the message
968
969   """
970   _ToStream(sys.stdout, txt, *args)
971
972
973 def ToStderr(txt, *args):
974   """Write a message to stderr only, bypassing the logging system
975
976   This is just a wrapper over _ToStream.
977
978   @type txt: str
979   @param txt: the message
980
981   """
982   _ToStream(sys.stderr, txt, *args)
983
984
985 class JobExecutor(object):
986   """Class which manages the submission and execution of multiple jobs.
987
988   Note that instances of this class should not be reused between
989   GetResults() calls.
990
991   """
992   def __init__(self, cl=None, verbose=True):
993     self.queue = []
994     if cl is None:
995       cl = GetClient()
996     self.cl = cl
997     self.verbose = verbose
998     self.jobs = []
999
1000   def QueueJob(self, name, *ops):
1001     """Record a job for later submit.
1002
1003     @type name: string
1004     @param name: a description of the job, will be used in WaitJobSet
1005     """
1006     self.queue.append((name, ops))
1007
1008
1009   def SubmitPending(self):
1010     """Submit all pending jobs.
1011
1012     """
1013     results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
1014     for ((status, data), (name, _)) in zip(results, self.queue):
1015       self.jobs.append((status, data, name))
1016
1017   def GetResults(self):
1018     """Wait for and return the results of all jobs.
1019
1020     @rtype: list
1021     @return: list of tuples (success, job results), in the same order
1022         as the submitted jobs; if a job has failed, instead of the result
1023         there will be the error message
1024
1025     """
1026     if not self.jobs:
1027       self.SubmitPending()
1028     results = []
1029     if self.verbose:
1030       ok_jobs = [row[1] for row in self.jobs if row[0]]
1031       if ok_jobs:
1032         ToStdout("Submitted jobs %s", ", ".join(ok_jobs))
1033     for submit_status, jid, name in self.jobs:
1034       if not submit_status:
1035         ToStderr("Failed to submit job for %s: %s", name, jid)
1036         results.append((False, jid))
1037         continue
1038       if self.verbose:
1039         ToStdout("Waiting for job %s for %s...", jid, name)
1040       try:
1041         job_result = PollJob(jid, cl=self.cl)
1042         success = True
1043       except (errors.GenericError, luxi.ProtocolError), err:
1044         _, job_result = FormatError(err)
1045         success = False
1046         # the error message will always be shown, verbose or not
1047         ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
1048
1049       results.append((success, job_result))
1050     return results
1051
1052   def WaitOrShow(self, wait):
1053     """Wait for job results or only print the job IDs.
1054
1055     @type wait: boolean
1056     @param wait: whether to wait or not
1057
1058     """
1059     if wait:
1060       return self.GetResults()
1061     else:
1062       if not self.jobs:
1063         self.SubmitPending()
1064       for status, result, name in self.jobs:
1065         if status:
1066           ToStdout("%s: %s", result, name)
1067         else:
1068           ToStderr("Failure for %s: %s", name, result)