Fix two potentially endless loops in http library
[ganeti-local] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module dealing with command line parsing"""
23
24
25 import sys
26 import textwrap
27 import os.path
28 import copy
29 import time
30 import logging
31 from cStringIO import StringIO
32
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import constants
36 from ganeti import opcodes
37 from ganeti import luxi
38 from ganeti import ssconf
39 from ganeti import rpc
40
41 from optparse import (OptionParser, make_option, TitledHelpFormatter,
42                       Option, OptionValueError)
43
44
45 __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
46            "SubmitOpCode", "GetClient",
47            "cli_option", "ikv_option", "keyval_option",
48            "GenerateTable", "AskUser",
49            "ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
50            "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT", "SUBMIT_OPT",
51            "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
52            "FormatError", "SplitNodeOption", "SubmitOrSend",
53            "JobSubmittedException", "FormatTimestamp", "ParseTimespec",
54            "ToStderr", "ToStdout", "UsesRPC",
55            "GetOnlineNodes", "JobExecutor", "SYNC_OPT",
56            ]
57
58
59
60 def _ExtractTagsObject(opts, args):
61   """Extract the tag type object.
62
63   Note that this function will modify its args parameter.
64
65   """
66   if not hasattr(opts, "tag_type"):
67     raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
68   kind = opts.tag_type
69   if kind == constants.TAG_CLUSTER:
70     retval = kind, kind
71   elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
72     if not args:
73       raise errors.OpPrereqError("no arguments passed to the command")
74     name = args.pop(0)
75     retval = kind, name
76   else:
77     raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
78   return retval
79
80
81 def _ExtendTags(opts, args):
82   """Extend the args if a source file has been given.
83
84   This function will extend the tags with the contents of the file
85   passed in the 'tags_source' attribute of the opts parameter. A file
86   named '-' will be replaced by stdin.
87
88   """
89   fname = opts.tags_source
90   if fname is None:
91     return
92   if fname == "-":
93     new_fh = sys.stdin
94   else:
95     new_fh = open(fname, "r")
96   new_data = []
97   try:
98     # we don't use the nice 'new_data = [line.strip() for line in fh]'
99     # because of python bug 1633941
100     while True:
101       line = new_fh.readline()
102       if not line:
103         break
104       new_data.append(line.strip())
105   finally:
106     new_fh.close()
107   args.extend(new_data)
108
109
110 def ListTags(opts, args):
111   """List the tags on a given object.
112
113   This is a generic implementation that knows how to deal with all
114   three cases of tag objects (cluster, node, instance). The opts
115   argument is expected to contain a tag_type field denoting what
116   object type we work on.
117
118   """
119   kind, name = _ExtractTagsObject(opts, args)
120   op = opcodes.OpGetTags(kind=kind, name=name)
121   result = SubmitOpCode(op)
122   result = list(result)
123   result.sort()
124   for tag in result:
125     ToStdout(tag)
126
127
128 def AddTags(opts, args):
129   """Add tags on a given object.
130
131   This is a generic implementation that knows how to deal with all
132   three cases of tag objects (cluster, node, instance). The opts
133   argument is expected to contain a tag_type field denoting what
134   object type we work on.
135
136   """
137   kind, name = _ExtractTagsObject(opts, args)
138   _ExtendTags(opts, args)
139   if not args:
140     raise errors.OpPrereqError("No tags to be added")
141   op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
142   SubmitOpCode(op)
143
144
145 def RemoveTags(opts, args):
146   """Remove tags from a given object.
147
148   This is a generic implementation that knows how to deal with all
149   three cases of tag objects (cluster, node, instance). The opts
150   argument is expected to contain a tag_type field denoting what
151   object type we work on.
152
153   """
154   kind, name = _ExtractTagsObject(opts, args)
155   _ExtendTags(opts, args)
156   if not args:
157     raise errors.OpPrereqError("No tags to be removed")
158   op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
159   SubmitOpCode(op)
160
161
162 DEBUG_OPT = make_option("-d", "--debug", default=False,
163                         action="store_true",
164                         help="Turn debugging on")
165
166 NOHDR_OPT = make_option("--no-headers", default=False,
167                         action="store_true", dest="no_headers",
168                         help="Don't display column headers")
169
170 SEP_OPT = make_option("--separator", default=None,
171                       action="store", dest="separator",
172                       help="Separator between output fields"
173                       " (defaults to one space)")
174
175 USEUNITS_OPT = make_option("--units", default=None,
176                            dest="units", choices=('h', 'm', 'g', 't'),
177                            help="Specify units for output (one of hmgt)")
178
179 FIELDS_OPT = make_option("-o", "--output", dest="output", action="store",
180                          type="string", help="Comma separated list of"
181                          " output fields",
182                          metavar="FIELDS")
183
184 FORCE_OPT = make_option("-f", "--force", dest="force", action="store_true",
185                         default=False, help="Force the operation")
186
187 TAG_SRC_OPT = make_option("--from", dest="tags_source",
188                           default=None, help="File with tag names")
189
190 SUBMIT_OPT = make_option("--submit", dest="submit_only",
191                          default=False, action="store_true",
192                          help="Submit the job and return the job ID, but"
193                          " don't wait for the job to finish")
194
195 SYNC_OPT = make_option("--sync", dest="do_locking",
196                        default=False, action="store_true",
197                        help="Grab locks while doing the queries"
198                        " in order to ensure more consistent results")
199
200
201 def ARGS_FIXED(val): # pylint: disable-msg=C0103
202   """Macro-like function denoting a fixed number of arguments"""
203   return -val
204
205
206 def ARGS_ATLEAST(val): # pylint: disable-msg=C0103
207   """Macro-like function denoting a minimum number of arguments"""
208   return val
209
210
211 ARGS_NONE = None
212 ARGS_ONE = ARGS_FIXED(1)
213 ARGS_ANY = ARGS_ATLEAST(0)
214
215
216 def check_unit(option, opt, value):
217   """OptParsers custom converter for units.
218
219   """
220   try:
221     return utils.ParseUnit(value)
222   except errors.UnitParseError, err:
223     raise OptionValueError("option %s: %s" % (opt, err))
224
225
226 class CliOption(Option):
227   """Custom option class for optparse.
228
229   """
230   TYPES = Option.TYPES + ("unit",)
231   TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
232   TYPE_CHECKER["unit"] = check_unit
233
234
235 def _SplitKeyVal(opt, data):
236   """Convert a KeyVal string into a dict.
237
238   This function will convert a key=val[,...] string into a dict. Empty
239   values will be converted specially: keys which have the prefix 'no_'
240   will have the value=False and the prefix stripped, the others will
241   have value=True.
242
243   @type opt: string
244   @param opt: a string holding the option name for which we process the
245       data, used in building error messages
246   @type data: string
247   @param data: a string of the format key=val,key=val,...
248   @rtype: dict
249   @return: {key=val, key=val}
250   @raises errors.ParameterError: if there are duplicate keys
251
252   """
253   NO_PREFIX = "no_"
254   UN_PREFIX = "-"
255   kv_dict = {}
256   for elem in data.split(","):
257     if "=" in elem:
258       key, val = elem.split("=", 1)
259     else:
260       if elem.startswith(NO_PREFIX):
261         key, val = elem[len(NO_PREFIX):], False
262       elif elem.startswith(UN_PREFIX):
263         key, val = elem[len(UN_PREFIX):], None
264       else:
265         key, val = elem, True
266     if key in kv_dict:
267       raise errors.ParameterError("Duplicate key '%s' in option %s" %
268                                   (key, opt))
269     kv_dict[key] = val
270   return kv_dict
271
272
273 def check_ident_key_val(option, opt, value):
274   """Custom parser for the IdentKeyVal option type.
275
276   """
277   if ":" not in value:
278     retval =  (value, {})
279   else:
280     ident, rest = value.split(":", 1)
281     kv_dict = _SplitKeyVal(opt, rest)
282     retval = (ident, kv_dict)
283   return retval
284
285
286 class IdentKeyValOption(Option):
287   """Custom option class for ident:key=val,key=val options.
288
289   This will store the parsed values as a tuple (ident, {key: val}). As
290   such, multiple uses of this option via action=append is possible.
291
292   """
293   TYPES = Option.TYPES + ("identkeyval",)
294   TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
295   TYPE_CHECKER["identkeyval"] = check_ident_key_val
296
297
298 def check_key_val(option, opt, value):
299   """Custom parser for the KeyVal option type.
300
301   """
302   return _SplitKeyVal(opt, value)
303
304
305 class KeyValOption(Option):
306   """Custom option class for key=val,key=val options.
307
308   This will store the parsed values as a dict {key: val}.
309
310   """
311   TYPES = Option.TYPES + ("keyval",)
312   TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
313   TYPE_CHECKER["keyval"] = check_key_val
314
315
316 # optparse.py sets make_option, so we do it for our own option class, too
317 cli_option = CliOption
318 ikv_option = IdentKeyValOption
319 keyval_option = KeyValOption
320
321
322 def _ParseArgs(argv, commands, aliases):
323   """Parser for the command line arguments.
324
325   This function parses the arguments and returns the function which
326   must be executed together with its (modified) arguments.
327
328   @param argv: the command line
329   @param commands: dictionary with special contents, see the design
330       doc for cmdline handling
331   @param aliases: dictionary with command aliases {'alias': 'target, ...}
332
333   """
334   if len(argv) == 0:
335     binary = "<command>"
336   else:
337     binary = argv[0].split("/")[-1]
338
339   if len(argv) > 1 and argv[1] == "--version":
340     ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
341     # Quit right away. That way we don't have to care about this special
342     # argument. optparse.py does it the same.
343     sys.exit(0)
344
345   if len(argv) < 2 or not (argv[1] in commands or
346                            argv[1] in aliases):
347     # let's do a nice thing
348     sortedcmds = commands.keys()
349     sortedcmds.sort()
350
351     ToStdout("Usage: %s {command} [options...] [argument...]", binary)
352     ToStdout("%s <command> --help to see details, or man %s", binary, binary)
353     ToStdout("")
354
355     # compute the max line length for cmd + usage
356     mlen = max([len(" %s" % cmd) for cmd in commands])
357     mlen = min(60, mlen) # should not get here...
358
359     # and format a nice command list
360     ToStdout("Commands:")
361     for cmd in sortedcmds:
362       cmdstr = " %s" % (cmd,)
363       help_text = commands[cmd][4]
364       help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
365       ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
366       for line in help_lines:
367         ToStdout("%-*s   %s", mlen, "", line)
368
369     ToStdout("")
370
371     return None, None, None
372
373   # get command, unalias it, and look it up in commands
374   cmd = argv.pop(1)
375   if cmd in aliases:
376     if cmd in commands:
377       raise errors.ProgrammerError("Alias '%s' overrides an existing"
378                                    " command" % cmd)
379
380     if aliases[cmd] not in commands:
381       raise errors.ProgrammerError("Alias '%s' maps to non-existing"
382                                    " command '%s'" % (cmd, aliases[cmd]))
383
384     cmd = aliases[cmd]
385
386   func, nargs, parser_opts, usage, description = commands[cmd]
387   parser = OptionParser(option_list=parser_opts,
388                         description=description,
389                         formatter=TitledHelpFormatter(),
390                         usage="%%prog %s %s" % (cmd, usage))
391   parser.disable_interspersed_args()
392   options, args = parser.parse_args()
393   if nargs is None:
394     if len(args) != 0:
395       ToStderr("Error: Command %s expects no arguments", cmd)
396       return None, None, None
397   elif nargs < 0 and len(args) != -nargs:
398     ToStderr("Error: Command %s expects %d argument(s)", cmd, -nargs)
399     return None, None, None
400   elif nargs >= 0 and len(args) < nargs:
401     ToStderr("Error: Command %s expects at least %d argument(s)", cmd, nargs)
402     return None, None, None
403
404   return func, options, args
405
406
407 def SplitNodeOption(value):
408   """Splits the value of a --node option.
409
410   """
411   if value and ':' in value:
412     return value.split(':', 1)
413   else:
414     return (value, None)
415
416
417 def UsesRPC(fn):
418   def wrapper(*args, **kwargs):
419     rpc.Init()
420     try:
421       return fn(*args, **kwargs)
422     finally:
423       rpc.Shutdown()
424   return wrapper
425
426
427 def AskUser(text, choices=None):
428   """Ask the user a question.
429
430   @param text: the question to ask
431
432   @param choices: list with elements tuples (input_char, return_value,
433       description); if not given, it will default to: [('y', True,
434       'Perform the operation'), ('n', False, 'Do no do the operation')];
435       note that the '?' char is reserved for help
436
437   @return: one of the return values from the choices list; if input is
438       not possible (i.e. not running with a tty, we return the last
439       entry from the list
440
441   """
442   if choices is None:
443     choices = [('y', True, 'Perform the operation'),
444                ('n', False, 'Do not perform the operation')]
445   if not choices or not isinstance(choices, list):
446     raise errors.ProgrammerError("Invalid choices argument to AskUser")
447   for entry in choices:
448     if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
449       raise errors.ProgrammerError("Invalid choices element to AskUser")
450
451   answer = choices[-1][1]
452   new_text = []
453   for line in text.splitlines():
454     new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
455   text = "\n".join(new_text)
456   try:
457     f = file("/dev/tty", "a+")
458   except IOError:
459     return answer
460   try:
461     chars = [entry[0] for entry in choices]
462     chars[-1] = "[%s]" % chars[-1]
463     chars.append('?')
464     maps = dict([(entry[0], entry[1]) for entry in choices])
465     while True:
466       f.write(text)
467       f.write('\n')
468       f.write("/".join(chars))
469       f.write(": ")
470       line = f.readline(2).strip().lower()
471       if line in maps:
472         answer = maps[line]
473         break
474       elif line == '?':
475         for entry in choices:
476           f.write(" %s - %s\n" % (entry[0], entry[2]))
477         f.write("\n")
478         continue
479   finally:
480     f.close()
481   return answer
482
483
484 class JobSubmittedException(Exception):
485   """Job was submitted, client should exit.
486
487   This exception has one argument, the ID of the job that was
488   submitted. The handler should print this ID.
489
490   This is not an error, just a structured way to exit from clients.
491
492   """
493
494
495 def SendJob(ops, cl=None):
496   """Function to submit an opcode without waiting for the results.
497
498   @type ops: list
499   @param ops: list of opcodes
500   @type cl: luxi.Client
501   @param cl: the luxi client to use for communicating with the master;
502              if None, a new client will be created
503
504   """
505   if cl is None:
506     cl = GetClient()
507
508   job_id = cl.SubmitJob(ops)
509
510   return job_id
511
512
513 def PollJob(job_id, cl=None, feedback_fn=None):
514   """Function to poll for the result of a job.
515
516   @type job_id: job identified
517   @param job_id: the job to poll for results
518   @type cl: luxi.Client
519   @param cl: the luxi client to use for communicating with the master;
520              if None, a new client will be created
521
522   """
523   if cl is None:
524     cl = GetClient()
525
526   prev_job_info = None
527   prev_logmsg_serial = None
528
529   while True:
530     result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
531                                  prev_logmsg_serial)
532     if not result:
533       # job not found, go away!
534       raise errors.JobLost("Job with id %s lost" % job_id)
535
536     # Split result, a tuple of (field values, log entries)
537     (job_info, log_entries) = result
538     (status, ) = job_info
539
540     if log_entries:
541       for log_entry in log_entries:
542         (serial, timestamp, _, message) = log_entry
543         if callable(feedback_fn):
544           feedback_fn(log_entry[1:])
545         else:
546           encoded = utils.SafeEncode(message)
547           ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
548         prev_logmsg_serial = max(prev_logmsg_serial, serial)
549
550     # TODO: Handle canceled and archived jobs
551     elif status in (constants.JOB_STATUS_SUCCESS,
552                     constants.JOB_STATUS_ERROR,
553                     constants.JOB_STATUS_CANCELING,
554                     constants.JOB_STATUS_CANCELED):
555       break
556
557     prev_job_info = job_info
558
559   jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
560   if not jobs:
561     raise errors.JobLost("Job with id %s lost" % job_id)
562
563   status, opstatus, result = jobs[0]
564   if status == constants.JOB_STATUS_SUCCESS:
565     return result
566   elif status in (constants.JOB_STATUS_CANCELING,
567                   constants.JOB_STATUS_CANCELED):
568     raise errors.OpExecError("Job was canceled")
569   else:
570     has_ok = False
571     for idx, (status, msg) in enumerate(zip(opstatus, result)):
572       if status == constants.OP_STATUS_SUCCESS:
573         has_ok = True
574       elif status == constants.OP_STATUS_ERROR:
575         errors.MaybeRaise(msg)
576         if has_ok:
577           raise errors.OpExecError("partial failure (opcode %d): %s" %
578                                    (idx, msg))
579         else:
580           raise errors.OpExecError(str(msg))
581     # default failure mode
582     raise errors.OpExecError(result)
583
584
585 def SubmitOpCode(op, cl=None, feedback_fn=None):
586   """Legacy function to submit an opcode.
587
588   This is just a simple wrapper over the construction of the processor
589   instance. It should be extended to better handle feedback and
590   interaction functions.
591
592   """
593   if cl is None:
594     cl = GetClient()
595
596   job_id = SendJob([op], cl)
597
598   op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
599
600   return op_results[0]
601
602
603 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
604   """Wrapper around SubmitOpCode or SendJob.
605
606   This function will decide, based on the 'opts' parameter, whether to
607   submit and wait for the result of the opcode (and return it), or
608   whether to just send the job and print its identifier. It is used in
609   order to simplify the implementation of the '--submit' option.
610
611   """
612   if opts and opts.submit_only:
613     job_id = SendJob([op], cl=cl)
614     raise JobSubmittedException(job_id)
615   else:
616     return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
617
618
619 def GetClient():
620   # TODO: Cache object?
621   try:
622     client = luxi.Client()
623   except luxi.NoMasterError:
624     master, myself = ssconf.GetMasterAndMyself()
625     if master != myself:
626       raise errors.OpPrereqError("This is not the master node, please connect"
627                                  " to node '%s' and rerun the command" %
628                                  master)
629     else:
630       raise
631   return client
632
633
634 def FormatError(err):
635   """Return a formatted error message for a given error.
636
637   This function takes an exception instance and returns a tuple
638   consisting of two values: first, the recommended exit code, and
639   second, a string describing the error message (not
640   newline-terminated).
641
642   """
643   retcode = 1
644   obuf = StringIO()
645   msg = str(err)
646   if isinstance(err, errors.ConfigurationError):
647     txt = "Corrupt configuration file: %s" % msg
648     logging.error(txt)
649     obuf.write(txt + "\n")
650     obuf.write("Aborting.")
651     retcode = 2
652   elif isinstance(err, errors.HooksAbort):
653     obuf.write("Failure: hooks execution failed:\n")
654     for node, script, out in err.args[0]:
655       if out:
656         obuf.write("  node: %s, script: %s, output: %s\n" %
657                    (node, script, out))
658       else:
659         obuf.write("  node: %s, script: %s (no output)\n" %
660                    (node, script))
661   elif isinstance(err, errors.HooksFailure):
662     obuf.write("Failure: hooks general failure: %s" % msg)
663   elif isinstance(err, errors.ResolverError):
664     this_host = utils.HostInfo.SysName()
665     if err.args[0] == this_host:
666       msg = "Failure: can't resolve my own hostname ('%s')"
667     else:
668       msg = "Failure: can't resolve hostname '%s'"
669     obuf.write(msg % err.args[0])
670   elif isinstance(err, errors.OpPrereqError):
671     obuf.write("Failure: prerequisites not met for this"
672                " operation:\n%s" % msg)
673   elif isinstance(err, errors.OpExecError):
674     obuf.write("Failure: command execution error:\n%s" % msg)
675   elif isinstance(err, errors.TagError):
676     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
677   elif isinstance(err, errors.JobQueueDrainError):
678     obuf.write("Failure: the job queue is marked for drain and doesn't"
679                " accept new requests\n")
680   elif isinstance(err, errors.JobQueueFull):
681     obuf.write("Failure: the job queue is full and doesn't accept new"
682                " job submissions until old jobs are archived\n")
683   elif isinstance(err, errors.TypeEnforcementError):
684     obuf.write("Parameter Error: %s" % msg)
685   elif isinstance(err, errors.ParameterError):
686     obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
687   elif isinstance(err, errors.GenericError):
688     obuf.write("Unhandled Ganeti error: %s" % msg)
689   elif isinstance(err, luxi.NoMasterError):
690     obuf.write("Cannot communicate with the master daemon.\nIs it running"
691                " and listening for connections?")
692   elif isinstance(err, luxi.TimeoutError):
693     obuf.write("Timeout while talking to the master daemon. Error:\n"
694                "%s" % msg)
695   elif isinstance(err, luxi.ProtocolError):
696     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
697                "%s" % msg)
698   elif isinstance(err, JobSubmittedException):
699     obuf.write("JobID: %s\n" % err.args[0])
700     retcode = 0
701   else:
702     obuf.write("Unhandled exception: %s" % msg)
703   return retcode, obuf.getvalue().rstrip('\n')
704
705
706 def GenericMain(commands, override=None, aliases=None):
707   """Generic main function for all the gnt-* commands.
708
709   Arguments:
710     - commands: a dictionary with a special structure, see the design doc
711                 for command line handling.
712     - override: if not None, we expect a dictionary with keys that will
713                 override command line options; this can be used to pass
714                 options from the scripts to generic functions
715     - aliases: dictionary with command aliases {'alias': 'target, ...}
716
717   """
718   # save the program name and the entire command line for later logging
719   if sys.argv:
720     binary = os.path.basename(sys.argv[0]) or sys.argv[0]
721     if len(sys.argv) >= 2:
722       binary += " " + sys.argv[1]
723       old_cmdline = " ".join(sys.argv[2:])
724     else:
725       old_cmdline = ""
726   else:
727     binary = "<unknown program>"
728     old_cmdline = ""
729
730   if aliases is None:
731     aliases = {}
732
733   func, options, args = _ParseArgs(sys.argv, commands, aliases)
734   if func is None: # parse error
735     return 1
736
737   if override is not None:
738     for key, val in override.iteritems():
739       setattr(options, key, val)
740
741   utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
742                      stderr_logging=True, program=binary)
743
744   if old_cmdline:
745     logging.info("run with arguments '%s'", old_cmdline)
746   else:
747     logging.info("run with no arguments")
748
749   try:
750     result = func(options, args)
751   except (errors.GenericError, luxi.ProtocolError,
752           JobSubmittedException), err:
753     result, err_msg = FormatError(err)
754     logging.exception("Error during command processing")
755     ToStderr(err_msg)
756
757   return result
758
759
760 def GenerateTable(headers, fields, separator, data,
761                   numfields=None, unitfields=None,
762                   units=None):
763   """Prints a table with headers and different fields.
764
765   @type headers: dict
766   @param headers: dictionary mapping field names to headers for
767       the table
768   @type fields: list
769   @param fields: the field names corresponding to each row in
770       the data field
771   @param separator: the separator to be used; if this is None,
772       the default 'smart' algorithm is used which computes optimal
773       field width, otherwise just the separator is used between
774       each field
775   @type data: list
776   @param data: a list of lists, each sublist being one row to be output
777   @type numfields: list
778   @param numfields: a list with the fields that hold numeric
779       values and thus should be right-aligned
780   @type unitfields: list
781   @param unitfields: a list with the fields that hold numeric
782       values that should be formatted with the units field
783   @type units: string or None
784   @param units: the units we should use for formatting, or None for
785       automatic choice (human-readable for non-separator usage, otherwise
786       megabytes); this is a one-letter string
787
788   """
789   if units is None:
790     if separator:
791       units = "m"
792     else:
793       units = "h"
794
795   if numfields is None:
796     numfields = []
797   if unitfields is None:
798     unitfields = []
799
800   numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
801   unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142
802
803   format_fields = []
804   for field in fields:
805     if headers and field not in headers:
806       # TODO: handle better unknown fields (either revert to old
807       # style of raising exception, or deal more intelligently with
808       # variable fields)
809       headers[field] = field
810     if separator is not None:
811       format_fields.append("%s")
812     elif numfields.Matches(field):
813       format_fields.append("%*s")
814     else:
815       format_fields.append("%-*s")
816
817   if separator is None:
818     mlens = [0 for name in fields]
819     format = ' '.join(format_fields)
820   else:
821     format = separator.replace("%", "%%").join(format_fields)
822
823   for row in data:
824     if row is None:
825       continue
826     for idx, val in enumerate(row):
827       if unitfields.Matches(fields[idx]):
828         try:
829           val = int(val)
830         except (TypeError, ValueError):
831           pass
832         else:
833           val = row[idx] = utils.FormatUnit(val, units)
834       val = row[idx] = str(val)
835       if separator is None:
836         mlens[idx] = max(mlens[idx], len(val))
837
838   result = []
839   if headers:
840     args = []
841     for idx, name in enumerate(fields):
842       hdr = headers[name]
843       if separator is None:
844         mlens[idx] = max(mlens[idx], len(hdr))
845         args.append(mlens[idx])
846       args.append(hdr)
847     result.append(format % tuple(args))
848
849   for line in data:
850     args = []
851     if line is None:
852       line = ['-' for _ in fields]
853     for idx in xrange(len(fields)):
854       if separator is None:
855         args.append(mlens[idx])
856       args.append(line[idx])
857     result.append(format % tuple(args))
858
859   return result
860
861
862 def FormatTimestamp(ts):
863   """Formats a given timestamp.
864
865   @type ts: timestamp
866   @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
867
868   @rtype: string
869   @return: a string with the formatted timestamp
870
871   """
872   if not isinstance (ts, (tuple, list)) or len(ts) != 2:
873     return '?'
874   sec, usec = ts
875   return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
876
877
878 def ParseTimespec(value):
879   """Parse a time specification.
880
881   The following suffixed will be recognized:
882
883     - s: seconds
884     - m: minutes
885     - h: hours
886     - d: day
887     - w: weeks
888
889   Without any suffix, the value will be taken to be in seconds.
890
891   """
892   value = str(value)
893   if not value:
894     raise errors.OpPrereqError("Empty time specification passed")
895   suffix_map = {
896     's': 1,
897     'm': 60,
898     'h': 3600,
899     'd': 86400,
900     'w': 604800,
901     }
902   if value[-1] not in suffix_map:
903     try:
904       value = int(value)
905     except (TypeError, ValueError):
906       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
907   else:
908     multiplier = suffix_map[value[-1]]
909     value = value[:-1]
910     if not value: # no data left after stripping the suffix
911       raise errors.OpPrereqError("Invalid time specification (only"
912                                  " suffix passed)")
913     try:
914       value = int(value) * multiplier
915     except (TypeError, ValueError):
916       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
917   return value
918
919
920 def GetOnlineNodes(nodes, cl=None, nowarn=False):
921   """Returns the names of online nodes.
922
923   This function will also log a warning on stderr with the names of
924   the online nodes.
925
926   @param nodes: if not empty, use only this subset of nodes (minus the
927       offline ones)
928   @param cl: if not None, luxi client to use
929   @type nowarn: boolean
930   @param nowarn: by default, this function will output a note with the
931       offline nodes that are skipped; if this parameter is True the
932       note is not displayed
933
934   """
935   if cl is None:
936     cl = GetClient()
937
938   result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
939                          use_locking=False)
940   offline = [row[0] for row in result if row[1]]
941   if offline and not nowarn:
942     ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline))
943   return [row[0] for row in result if not row[1]]
944
945
946 def _ToStream(stream, txt, *args):
947   """Write a message to a stream, bypassing the logging system
948
949   @type stream: file object
950   @param stream: the file to which we should write
951   @type txt: str
952   @param txt: the message
953
954   """
955   if args:
956     args = tuple(args)
957     stream.write(txt % args)
958   else:
959     stream.write(txt)
960   stream.write('\n')
961   stream.flush()
962
963
964 def ToStdout(txt, *args):
965   """Write a message to stdout only, bypassing the logging system
966
967   This is just a wrapper over _ToStream.
968
969   @type txt: str
970   @param txt: the message
971
972   """
973   _ToStream(sys.stdout, txt, *args)
974
975
976 def ToStderr(txt, *args):
977   """Write a message to stderr only, bypassing the logging system
978
979   This is just a wrapper over _ToStream.
980
981   @type txt: str
982   @param txt: the message
983
984   """
985   _ToStream(sys.stderr, txt, *args)
986
987
988 class JobExecutor(object):
989   """Class which manages the submission and execution of multiple jobs.
990
991   Note that instances of this class should not be reused between
992   GetResults() calls.
993
994   """
995   def __init__(self, cl=None, verbose=True):
996     self.queue = []
997     if cl is None:
998       cl = GetClient()
999     self.cl = cl
1000     self.verbose = verbose
1001     self.jobs = []
1002
1003   def QueueJob(self, name, *ops):
1004     """Record a job for later submit.
1005
1006     @type name: string
1007     @param name: a description of the job, will be used in WaitJobSet
1008     """
1009     self.queue.append((name, ops))
1010
1011   def SubmitPending(self):
1012     """Submit all pending jobs.
1013
1014     """
1015     results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
1016     for ((status, data), (name, _)) in zip(results, self.queue):
1017       self.jobs.append((status, data, name))
1018
1019   def GetResults(self):
1020     """Wait for and return the results of all jobs.
1021
1022     @rtype: list
1023     @return: list of tuples (success, job results), in the same order
1024         as the submitted jobs; if a job has failed, instead of the result
1025         there will be the error message
1026
1027     """
1028     if not self.jobs:
1029       self.SubmitPending()
1030     results = []
1031     if self.verbose:
1032       ok_jobs = [row[1] for row in self.jobs if row[0]]
1033       if ok_jobs:
1034         ToStdout("Submitted jobs %s", ", ".join(ok_jobs))
1035     for submit_status, jid, name in self.jobs:
1036       if not submit_status:
1037         ToStderr("Failed to submit job for %s: %s", name, jid)
1038         results.append((False, jid))
1039         continue
1040       if self.verbose:
1041         ToStdout("Waiting for job %s for %s...", jid, name)
1042       try:
1043         job_result = PollJob(jid, cl=self.cl)
1044         success = True
1045       except (errors.GenericError, luxi.ProtocolError), err:
1046         _, job_result = FormatError(err)
1047         success = False
1048         # the error message will always be shown, verbose or not
1049         ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
1050
1051       results.append((success, job_result))
1052     return results
1053
1054   def WaitOrShow(self, wait):
1055     """Wait for job results or only print the job IDs.
1056
1057     @type wait: boolean
1058     @param wait: whether to wait or not
1059
1060     """
1061     if wait:
1062       return self.GetResults()
1063     else:
1064       if not self.jobs:
1065         self.SubmitPending()
1066       for status, result, name in self.jobs:
1067         if status:
1068           ToStdout("%s: %s", result, name)
1069         else:
1070           ToStderr("Failure for %s: %s", name, result)