cluster verify: show correctly drained nodes
[ganeti-local] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module dealing with command line parsing"""
23
24
25 import sys
26 import textwrap
27 import os.path
28 import copy
29 import time
30 import logging
31 from cStringIO import StringIO
32
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import constants
36 from ganeti import opcodes
37 from ganeti import luxi
38 from ganeti import ssconf
39 from ganeti import rpc
40
41 from optparse import (OptionParser, make_option, TitledHelpFormatter,
42                       Option, OptionValueError)
43
44 __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
45            "SubmitOpCode", "GetClient",
46            "cli_option", "ikv_option", "keyval_option",
47            "GenerateTable", "AskUser",
48            "ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
49            "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT", "SUBMIT_OPT",
50            "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
51            "FormatError", "SplitNodeOption", "SubmitOrSend",
52            "JobSubmittedException", "FormatTimestamp", "ParseTimespec",
53            "ValidateBeParams", "ToStderr", "ToStdout", "UsesRPC",
54            "GetOnlineNodes", "JobExecutor", "SYNC_OPT",
55            ]
56
57
58 def _ExtractTagsObject(opts, args):
59   """Extract the tag type object.
60
61   Note that this function will modify its args parameter.
62
63   """
64   if not hasattr(opts, "tag_type"):
65     raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
66   kind = opts.tag_type
67   if kind == constants.TAG_CLUSTER:
68     retval = kind, kind
69   elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
70     if not args:
71       raise errors.OpPrereqError("no arguments passed to the command")
72     name = args.pop(0)
73     retval = kind, name
74   else:
75     raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
76   return retval
77
78
79 def _ExtendTags(opts, args):
80   """Extend the args if a source file has been given.
81
82   This function will extend the tags with the contents of the file
83   passed in the 'tags_source' attribute of the opts parameter. A file
84   named '-' will be replaced by stdin.
85
86   """
87   fname = opts.tags_source
88   if fname is None:
89     return
90   if fname == "-":
91     new_fh = sys.stdin
92   else:
93     new_fh = open(fname, "r")
94   new_data = []
95   try:
96     # we don't use the nice 'new_data = [line.strip() for line in fh]'
97     # because of python bug 1633941
98     while True:
99       line = new_fh.readline()
100       if not line:
101         break
102       new_data.append(line.strip())
103   finally:
104     new_fh.close()
105   args.extend(new_data)
106
107
108 def ListTags(opts, args):
109   """List the tags on a given object.
110
111   This is a generic implementation that knows how to deal with all
112   three cases of tag objects (cluster, node, instance). The opts
113   argument is expected to contain a tag_type field denoting what
114   object type we work on.
115
116   """
117   kind, name = _ExtractTagsObject(opts, args)
118   op = opcodes.OpGetTags(kind=kind, name=name)
119   result = SubmitOpCode(op)
120   result = list(result)
121   result.sort()
122   for tag in result:
123     print tag
124
125
126 def AddTags(opts, args):
127   """Add tags on a given object.
128
129   This is a generic implementation that knows how to deal with all
130   three cases of tag objects (cluster, node, instance). The opts
131   argument is expected to contain a tag_type field denoting what
132   object type we work on.
133
134   """
135   kind, name = _ExtractTagsObject(opts, args)
136   _ExtendTags(opts, args)
137   if not args:
138     raise errors.OpPrereqError("No tags to be added")
139   op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
140   SubmitOpCode(op)
141
142
143 def RemoveTags(opts, args):
144   """Remove tags from a given object.
145
146   This is a generic implementation that knows how to deal with all
147   three cases of tag objects (cluster, node, instance). The opts
148   argument is expected to contain a tag_type field denoting what
149   object type we work on.
150
151   """
152   kind, name = _ExtractTagsObject(opts, args)
153   _ExtendTags(opts, args)
154   if not args:
155     raise errors.OpPrereqError("No tags to be removed")
156   op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
157   SubmitOpCode(op)
158
159
160 DEBUG_OPT = make_option("-d", "--debug", default=False,
161                         action="store_true",
162                         help="Turn debugging on")
163
164 NOHDR_OPT = make_option("--no-headers", default=False,
165                         action="store_true", dest="no_headers",
166                         help="Don't display column headers")
167
168 SEP_OPT = make_option("--separator", default=None,
169                       action="store", dest="separator",
170                       help="Separator between output fields"
171                       " (defaults to one space)")
172
173 USEUNITS_OPT = make_option("--units", default=None,
174                            dest="units", choices=('h', 'm', 'g', 't'),
175                            help="Specify units for output (one of hmgt)")
176
177 FIELDS_OPT = make_option("-o", "--output", dest="output", action="store",
178                          type="string", help="Comma separated list of"
179                          " output fields",
180                          metavar="FIELDS")
181
182 FORCE_OPT = make_option("-f", "--force", dest="force", action="store_true",
183                         default=False, help="Force the operation")
184
185 TAG_SRC_OPT = make_option("--from", dest="tags_source",
186                           default=None, help="File with tag names")
187
188 SUBMIT_OPT = make_option("--submit", dest="submit_only",
189                          default=False, action="store_true",
190                          help="Submit the job and return the job ID, but"
191                          " don't wait for the job to finish")
192
193 SYNC_OPT = make_option("--sync", dest="do_locking",
194                        default=False, action="store_true",
195                        help="Grab locks while doing the queries"
196                        " in order to ensure more consistent results")
197
198
199 def ARGS_FIXED(val):
200   """Macro-like function denoting a fixed number of arguments"""
201   return -val
202
203
204 def ARGS_ATLEAST(val):
205   """Macro-like function denoting a minimum number of arguments"""
206   return val
207
208
209 ARGS_NONE = None
210 ARGS_ONE = ARGS_FIXED(1)
211 ARGS_ANY = ARGS_ATLEAST(0)
212
213
214 def check_unit(option, opt, value):
215   """OptParsers custom converter for units.
216
217   """
218   try:
219     return utils.ParseUnit(value)
220   except errors.UnitParseError, err:
221     raise OptionValueError("option %s: %s" % (opt, err))
222
223
224 class CliOption(Option):
225   """Custom option class for optparse.
226
227   """
228   TYPES = Option.TYPES + ("unit",)
229   TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
230   TYPE_CHECKER["unit"] = check_unit
231
232
233 def _SplitKeyVal(opt, data):
234   """Convert a KeyVal string into a dict.
235
236   This function will convert a key=val[,...] string into a dict. Empty
237   values will be converted specially: keys which have the prefix 'no_'
238   will have the value=False and the prefix stripped, the others will
239   have value=True.
240
241   @type opt: string
242   @param opt: a string holding the option name for which we process the
243       data, used in building error messages
244   @type data: string
245   @param data: a string of the format key=val,key=val,...
246   @rtype: dict
247   @return: {key=val, key=val}
248   @raises errors.ParameterError: if there are duplicate keys
249
250   """
251   NO_PREFIX = "no_"
252   UN_PREFIX = "-"
253   kv_dict = {}
254   for elem in data.split(","):
255     if "=" in elem:
256       key, val = elem.split("=", 1)
257     else:
258       if elem.startswith(NO_PREFIX):
259         key, val = elem[len(NO_PREFIX):], False
260       elif elem.startswith(UN_PREFIX):
261         key, val = elem[len(UN_PREFIX):], None
262       else:
263         key, val = elem, True
264     if key in kv_dict:
265       raise errors.ParameterError("Duplicate key '%s' in option %s" %
266                                   (key, opt))
267     kv_dict[key] = val
268   return kv_dict
269
270
271 def check_ident_key_val(option, opt, value):
272   """Custom parser for the IdentKeyVal option type.
273
274   """
275   if ":" not in value:
276     retval =  (value, {})
277   else:
278     ident, rest = value.split(":", 1)
279     kv_dict = _SplitKeyVal(opt, rest)
280     retval = (ident, kv_dict)
281   return retval
282
283
284 class IdentKeyValOption(Option):
285   """Custom option class for ident:key=val,key=val options.
286
287   This will store the parsed values as a tuple (ident, {key: val}). As
288   such, multiple uses of this option via action=append is possible.
289
290   """
291   TYPES = Option.TYPES + ("identkeyval",)
292   TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
293   TYPE_CHECKER["identkeyval"] = check_ident_key_val
294
295
296 def check_key_val(option, opt, value):
297   """Custom parser for the KeyVal option type.
298
299   """
300   return _SplitKeyVal(opt, value)
301
302
303 class KeyValOption(Option):
304   """Custom option class for key=val,key=val options.
305
306   This will store the parsed values as a dict {key: val}.
307
308   """
309   TYPES = Option.TYPES + ("keyval",)
310   TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
311   TYPE_CHECKER["keyval"] = check_key_val
312
313
314 # optparse.py sets make_option, so we do it for our own option class, too
315 cli_option = CliOption
316 ikv_option = IdentKeyValOption
317 keyval_option = KeyValOption
318
319
320 def _ParseArgs(argv, commands, aliases):
321   """Parser for the command line arguments.
322
323   This function parses the arguements and returns the function which
324   must be executed together with its (modified) arguments.
325
326   @param argv: the command line
327   @param commands: dictionary with special contents, see the design
328       doc for cmdline handling
329   @param aliases: dictionary with command aliases {'alias': 'target, ...}
330
331   """
332   if len(argv) == 0:
333     binary = "<command>"
334   else:
335     binary = argv[0].split("/")[-1]
336
337   if len(argv) > 1 and argv[1] == "--version":
338     print "%s (ganeti) %s" % (binary, constants.RELEASE_VERSION)
339     # Quit right away. That way we don't have to care about this special
340     # argument. optparse.py does it the same.
341     sys.exit(0)
342
343   if len(argv) < 2 or not (argv[1] in commands or
344                            argv[1] in aliases):
345     # let's do a nice thing
346     sortedcmds = commands.keys()
347     sortedcmds.sort()
348     print ("Usage: %(bin)s {command} [options...] [argument...]"
349            "\n%(bin)s <command> --help to see details, or"
350            " man %(bin)s\n" % {"bin": binary})
351     # compute the max line length for cmd + usage
352     mlen = max([len(" %s" % cmd) for cmd in commands])
353     mlen = min(60, mlen) # should not get here...
354     # and format a nice command list
355     print "Commands:"
356     for cmd in sortedcmds:
357       cmdstr = " %s" % (cmd,)
358       help_text = commands[cmd][4]
359       help_lines = textwrap.wrap(help_text, 79-3-mlen)
360       print "%-*s - %s" % (mlen, cmdstr, help_lines.pop(0))
361       for line in help_lines:
362         print "%-*s   %s" % (mlen, "", line)
363     print
364     return None, None, None
365
366   # get command, unalias it, and look it up in commands
367   cmd = argv.pop(1)
368   if cmd in aliases:
369     if cmd in commands:
370       raise errors.ProgrammerError("Alias '%s' overrides an existing"
371                                    " command" % cmd)
372
373     if aliases[cmd] not in commands:
374       raise errors.ProgrammerError("Alias '%s' maps to non-existing"
375                                    " command '%s'" % (cmd, aliases[cmd]))
376
377     cmd = aliases[cmd]
378
379   func, nargs, parser_opts, usage, description = commands[cmd]
380   parser = OptionParser(option_list=parser_opts,
381                         description=description,
382                         formatter=TitledHelpFormatter(),
383                         usage="%%prog %s %s" % (cmd, usage))
384   parser.disable_interspersed_args()
385   options, args = parser.parse_args()
386   if nargs is None:
387     if len(args) != 0:
388       print >> sys.stderr, ("Error: Command %s expects no arguments" % cmd)
389       return None, None, None
390   elif nargs < 0 and len(args) != -nargs:
391     print >> sys.stderr, ("Error: Command %s expects %d argument(s)" %
392                          (cmd, -nargs))
393     return None, None, None
394   elif nargs >= 0 and len(args) < nargs:
395     print >> sys.stderr, ("Error: Command %s expects at least %d argument(s)" %
396                          (cmd, nargs))
397     return None, None, None
398
399   return func, options, args
400
401
402 def SplitNodeOption(value):
403   """Splits the value of a --node option.
404
405   """
406   if value and ':' in value:
407     return value.split(':', 1)
408   else:
409     return (value, None)
410
411
412 def ValidateBeParams(bep):
413   """Parse and check the given beparams.
414
415   The function will update in-place the given dictionary.
416
417   @type bep: dict
418   @param bep: input beparams
419   @raise errors.ParameterError: if the input values are not OK
420   @raise errors.UnitParseError: if the input values are not OK
421
422   """
423   if constants.BE_MEMORY in bep:
424     bep[constants.BE_MEMORY] = utils.ParseUnit(bep[constants.BE_MEMORY])
425
426   if constants.BE_VCPUS in bep:
427     try:
428       bep[constants.BE_VCPUS] = int(bep[constants.BE_VCPUS])
429     except ValueError:
430       raise errors.ParameterError("Invalid number of VCPUs")
431
432
433 def UsesRPC(fn):
434   def wrapper(*args, **kwargs):
435     rpc.Init()
436     try:
437       return fn(*args, **kwargs)
438     finally:
439       rpc.Shutdown()
440   return wrapper
441
442
443 def AskUser(text, choices=None):
444   """Ask the user a question.
445
446   @param text: the question to ask
447
448   @param choices: list with elements tuples (input_char, return_value,
449       description); if not given, it will default to: [('y', True,
450       'Perform the operation'), ('n', False, 'Do no do the operation')];
451       note that the '?' char is reserved for help
452
453   @return: one of the return values from the choices list; if input is
454       not possible (i.e. not running with a tty, we return the last
455       entry from the list
456
457   """
458   if choices is None:
459     choices = [('y', True, 'Perform the operation'),
460                ('n', False, 'Do not perform the operation')]
461   if not choices or not isinstance(choices, list):
462     raise errors.ProgrammerError("Invalid choiches argument to AskUser")
463   for entry in choices:
464     if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
465       raise errors.ProgrammerError("Invalid choiches element to AskUser")
466
467   answer = choices[-1][1]
468   new_text = []
469   for line in text.splitlines():
470     new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
471   text = "\n".join(new_text)
472   try:
473     f = file("/dev/tty", "a+")
474   except IOError:
475     return answer
476   try:
477     chars = [entry[0] for entry in choices]
478     chars[-1] = "[%s]" % chars[-1]
479     chars.append('?')
480     maps = dict([(entry[0], entry[1]) for entry in choices])
481     while True:
482       f.write(text)
483       f.write('\n')
484       f.write("/".join(chars))
485       f.write(": ")
486       line = f.readline(2).strip().lower()
487       if line in maps:
488         answer = maps[line]
489         break
490       elif line == '?':
491         for entry in choices:
492           f.write(" %s - %s\n" % (entry[0], entry[2]))
493         f.write("\n")
494         continue
495   finally:
496     f.close()
497   return answer
498
499
500 class JobSubmittedException(Exception):
501   """Job was submitted, client should exit.
502
503   This exception has one argument, the ID of the job that was
504   submitted. The handler should print this ID.
505
506   This is not an error, just a structured way to exit from clients.
507
508   """
509
510
511 def SendJob(ops, cl=None):
512   """Function to submit an opcode without waiting for the results.
513
514   @type ops: list
515   @param ops: list of opcodes
516   @type cl: luxi.Client
517   @param cl: the luxi client to use for communicating with the master;
518              if None, a new client will be created
519
520   """
521   if cl is None:
522     cl = GetClient()
523
524   job_id = cl.SubmitJob(ops)
525
526   return job_id
527
528
529 def PollJob(job_id, cl=None, feedback_fn=None):
530   """Function to poll for the result of a job.
531
532   @type job_id: job identified
533   @param job_id: the job to poll for results
534   @type cl: luxi.Client
535   @param cl: the luxi client to use for communicating with the master;
536              if None, a new client will be created
537
538   """
539   if cl is None:
540     cl = GetClient()
541
542   prev_job_info = None
543   prev_logmsg_serial = None
544
545   while True:
546     result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
547                                  prev_logmsg_serial)
548     if not result:
549       # job not found, go away!
550       raise errors.JobLost("Job with id %s lost" % job_id)
551
552     # Split result, a tuple of (field values, log entries)
553     (job_info, log_entries) = result
554     (status, ) = job_info
555
556     if log_entries:
557       for log_entry in log_entries:
558         (serial, timestamp, _, message) = log_entry
559         if callable(feedback_fn):
560           feedback_fn(log_entry[1:])
561         else:
562           encoded = utils.SafeEncode(message)
563           print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), encoded)
564         prev_logmsg_serial = max(prev_logmsg_serial, serial)
565
566     # TODO: Handle canceled and archived jobs
567     elif status in (constants.JOB_STATUS_SUCCESS,
568                     constants.JOB_STATUS_ERROR,
569                     constants.JOB_STATUS_CANCELING,
570                     constants.JOB_STATUS_CANCELED):
571       break
572
573     prev_job_info = job_info
574
575   jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
576   if not jobs:
577     raise errors.JobLost("Job with id %s lost" % job_id)
578
579   status, opstatus, result = jobs[0]
580   if status == constants.JOB_STATUS_SUCCESS:
581     return result
582   elif status in (constants.JOB_STATUS_CANCELING,
583                   constants.JOB_STATUS_CANCELED):
584     raise errors.OpExecError("Job was canceled")
585   else:
586     has_ok = False
587     for idx, (status, msg) in enumerate(zip(opstatus, result)):
588       if status == constants.OP_STATUS_SUCCESS:
589         has_ok = True
590       elif status == constants.OP_STATUS_ERROR:
591         if has_ok:
592           raise errors.OpExecError("partial failure (opcode %d): %s" %
593                                    (idx, msg))
594         else:
595           raise errors.OpExecError(str(msg))
596     # default failure mode
597     raise errors.OpExecError(result)
598
599
600 def SubmitOpCode(op, cl=None, feedback_fn=None):
601   """Legacy function to submit an opcode.
602
603   This is just a simple wrapper over the construction of the processor
604   instance. It should be extended to better handle feedback and
605   interaction functions.
606
607   """
608   if cl is None:
609     cl = GetClient()
610
611   job_id = SendJob([op], cl)
612
613   op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
614
615   return op_results[0]
616
617
618 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
619   """Wrapper around SubmitOpCode or SendJob.
620
621   This function will decide, based on the 'opts' parameter, whether to
622   submit and wait for the result of the opcode (and return it), or
623   whether to just send the job and print its identifier. It is used in
624   order to simplify the implementation of the '--submit' option.
625
626   """
627   if opts and opts.submit_only:
628     job_id = SendJob([op], cl=cl)
629     raise JobSubmittedException(job_id)
630   else:
631     return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
632
633
634 def GetClient():
635   # TODO: Cache object?
636   try:
637     client = luxi.Client()
638   except luxi.NoMasterError:
639     master, myself = ssconf.GetMasterAndMyself()
640     if master != myself:
641       raise errors.OpPrereqError("This is not the master node, please connect"
642                                  " to node '%s' and rerun the command" %
643                                  master)
644     else:
645       raise
646   return client
647
648
649 def FormatError(err):
650   """Return a formatted error message for a given error.
651
652   This function takes an exception instance and returns a tuple
653   consisting of two values: first, the recommended exit code, and
654   second, a string describing the error message (not
655   newline-terminated).
656
657   """
658   retcode = 1
659   obuf = StringIO()
660   msg = str(err)
661   if isinstance(err, errors.ConfigurationError):
662     txt = "Corrupt configuration file: %s" % msg
663     logging.error(txt)
664     obuf.write(txt + "\n")
665     obuf.write("Aborting.")
666     retcode = 2
667   elif isinstance(err, errors.HooksAbort):
668     obuf.write("Failure: hooks execution failed:\n")
669     for node, script, out in err.args[0]:
670       if out:
671         obuf.write("  node: %s, script: %s, output: %s\n" %
672                    (node, script, out))
673       else:
674         obuf.write("  node: %s, script: %s (no output)\n" %
675                    (node, script))
676   elif isinstance(err, errors.HooksFailure):
677     obuf.write("Failure: hooks general failure: %s" % msg)
678   elif isinstance(err, errors.ResolverError):
679     this_host = utils.HostInfo.SysName()
680     if err.args[0] == this_host:
681       msg = "Failure: can't resolve my own hostname ('%s')"
682     else:
683       msg = "Failure: can't resolve hostname '%s'"
684     obuf.write(msg % err.args[0])
685   elif isinstance(err, errors.OpPrereqError):
686     obuf.write("Failure: prerequisites not met for this"
687                " operation:\n%s" % msg)
688   elif isinstance(err, errors.OpExecError):
689     obuf.write("Failure: command execution error:\n%s" % msg)
690   elif isinstance(err, errors.TagError):
691     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
692   elif isinstance(err, errors.JobQueueDrainError):
693     obuf.write("Failure: the job queue is marked for drain and doesn't"
694                " accept new requests\n")
695   elif isinstance(err, errors.JobQueueFull):
696     obuf.write("Failure: the job queue is full and doesn't accept new"
697                " job submissions until old jobs are archived\n")
698   elif isinstance(err, errors.GenericError):
699     obuf.write("Unhandled Ganeti error: %s" % msg)
700   elif isinstance(err, luxi.NoMasterError):
701     obuf.write("Cannot communicate with the master daemon.\nIs it running"
702                " and listening for connections?")
703   elif isinstance(err, luxi.TimeoutError):
704     obuf.write("Timeout while talking to the master daemon. Error:\n"
705                "%s" % msg)
706   elif isinstance(err, luxi.ProtocolError):
707     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
708                "%s" % msg)
709   elif isinstance(err, JobSubmittedException):
710     obuf.write("JobID: %s\n" % err.args[0])
711     retcode = 0
712   else:
713     obuf.write("Unhandled exception: %s" % msg)
714   return retcode, obuf.getvalue().rstrip('\n')
715
716
717 def GenericMain(commands, override=None, aliases=None):
718   """Generic main function for all the gnt-* commands.
719
720   Arguments:
721     - commands: a dictionary with a special structure, see the design doc
722                 for command line handling.
723     - override: if not None, we expect a dictionary with keys that will
724                 override command line options; this can be used to pass
725                 options from the scripts to generic functions
726     - aliases: dictionary with command aliases {'alias': 'target, ...}
727
728   """
729   # save the program name and the entire command line for later logging
730   if sys.argv:
731     binary = os.path.basename(sys.argv[0]) or sys.argv[0]
732     if len(sys.argv) >= 2:
733       binary += " " + sys.argv[1]
734       old_cmdline = " ".join(sys.argv[2:])
735     else:
736       old_cmdline = ""
737   else:
738     binary = "<unknown program>"
739     old_cmdline = ""
740
741   if aliases is None:
742     aliases = {}
743
744   func, options, args = _ParseArgs(sys.argv, commands, aliases)
745   if func is None: # parse error
746     return 1
747
748   if override is not None:
749     for key, val in override.iteritems():
750       setattr(options, key, val)
751
752   utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
753                      stderr_logging=True, program=binary)
754
755   utils.debug = options.debug
756
757   if old_cmdline:
758     logging.info("run with arguments '%s'", old_cmdline)
759   else:
760     logging.info("run with no arguments")
761
762   try:
763     result = func(options, args)
764   except (errors.GenericError, luxi.ProtocolError,
765           JobSubmittedException), err:
766     result, err_msg = FormatError(err)
767     logging.exception("Error durring command processing")
768     ToStderr(err_msg)
769
770   return result
771
772
773 def GenerateTable(headers, fields, separator, data,
774                   numfields=None, unitfields=None,
775                   units=None):
776   """Prints a table with headers and different fields.
777
778   @type headers: dict
779   @param headers: dictionary mapping field names to headers for
780       the table
781   @type fields: list
782   @param fields: the field names corresponding to each row in
783       the data field
784   @param separator: the separator to be used; if this is None,
785       the default 'smart' algorithm is used which computes optimal
786       field width, otherwise just the separator is used between
787       each field
788   @type data: list
789   @param data: a list of lists, each sublist being one row to be output
790   @type numfields: list
791   @param numfields: a list with the fields that hold numeric
792       values and thus should be right-aligned
793   @type unitfields: list
794   @param unitfields: a list with the fields that hold numeric
795       values that should be formatted with the units field
796   @type units: string or None
797   @param units: the units we should use for formatting, or None for
798       automatic choice (human-readable for non-separator usage, otherwise
799       megabytes); this is a one-letter string
800
801   """
802   if units is None:
803     if separator:
804       units = "m"
805     else:
806       units = "h"
807
808   if numfields is None:
809     numfields = []
810   if unitfields is None:
811     unitfields = []
812
813   numfields = utils.FieldSet(*numfields)
814   unitfields = utils.FieldSet(*unitfields)
815
816   format_fields = []
817   for field in fields:
818     if headers and field not in headers:
819       # FIXME: handle better unknown fields (either revert to old
820       # style of raising exception, or deal more intelligently with
821       # variable fields)
822       headers[field] = field
823     if separator is not None:
824       format_fields.append("%s")
825     elif numfields.Matches(field):
826       format_fields.append("%*s")
827     else:
828       format_fields.append("%-*s")
829
830   if separator is None:
831     mlens = [0 for name in fields]
832     format = ' '.join(format_fields)
833   else:
834     format = separator.replace("%", "%%").join(format_fields)
835
836   for row in data:
837     for idx, val in enumerate(row):
838       if unitfields.Matches(fields[idx]):
839         try:
840           val = int(val)
841         except ValueError:
842           pass
843         else:
844           val = row[idx] = utils.FormatUnit(val, units)
845       val = row[idx] = str(val)
846       if separator is None:
847         mlens[idx] = max(mlens[idx], len(val))
848
849   result = []
850   if headers:
851     args = []
852     for idx, name in enumerate(fields):
853       hdr = headers[name]
854       if separator is None:
855         mlens[idx] = max(mlens[idx], len(hdr))
856         args.append(mlens[idx])
857       args.append(hdr)
858     result.append(format % tuple(args))
859
860   for line in data:
861     args = []
862     for idx in xrange(len(fields)):
863       if separator is None:
864         args.append(mlens[idx])
865       args.append(line[idx])
866     result.append(format % tuple(args))
867
868   return result
869
870
871 def FormatTimestamp(ts):
872   """Formats a given timestamp.
873
874   @type ts: timestamp
875   @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
876
877   @rtype: string
878   @returns: a string with the formatted timestamp
879
880   """
881   if not isinstance (ts, (tuple, list)) or len(ts) != 2:
882     return '?'
883   sec, usec = ts
884   return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
885
886
887 def ParseTimespec(value):
888   """Parse a time specification.
889
890   The following suffixed will be recognized:
891
892     - s: seconds
893     - m: minutes
894     - h: hours
895     - d: day
896     - w: weeks
897
898   Without any suffix, the value will be taken to be in seconds.
899
900   """
901   value = str(value)
902   if not value:
903     raise errors.OpPrereqError("Empty time specification passed")
904   suffix_map = {
905     's': 1,
906     'm': 60,
907     'h': 3600,
908     'd': 86400,
909     'w': 604800,
910     }
911   if value[-1] not in suffix_map:
912     try:
913       value = int(value)
914     except ValueError:
915       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
916   else:
917     multiplier = suffix_map[value[-1]]
918     value = value[:-1]
919     if not value: # no data left after stripping the suffix
920       raise errors.OpPrereqError("Invalid time specification (only"
921                                  " suffix passed)")
922     try:
923       value = int(value) * multiplier
924     except ValueError:
925       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
926   return value
927
928
929 def GetOnlineNodes(nodes, cl=None, nowarn=False):
930   """Returns the names of online nodes.
931
932   This function will also log a warning on stderr with the names of
933   the online nodes.
934
935   @param nodes: if not empty, use only this subset of nodes (minus the
936       offline ones)
937   @param cl: if not None, luxi client to use
938   @type nowarn: boolean
939   @param nowarn: by default, this function will output a note with the
940       offline nodes that are skipped; if this parameter is True the
941       note is not displayed
942
943   """
944   if cl is None:
945     cl = GetClient()
946
947   result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
948                          use_locking=False)
949   offline = [row[0] for row in result if row[1]]
950   if offline and not nowarn:
951     ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline))
952   return [row[0] for row in result if not row[1]]
953
954
955 def _ToStream(stream, txt, *args):
956   """Write a message to a stream, bypassing the logging system
957
958   @type stream: file object
959   @param stream: the file to which we should write
960   @type txt: str
961   @param txt: the message
962
963   """
964   if args:
965     args = tuple(args)
966     stream.write(txt % args)
967   else:
968     stream.write(txt)
969   stream.write('\n')
970   stream.flush()
971
972
973 def ToStdout(txt, *args):
974   """Write a message to stdout only, bypassing the logging system
975
976   This is just a wrapper over _ToStream.
977
978   @type txt: str
979   @param txt: the message
980
981   """
982   _ToStream(sys.stdout, txt, *args)
983
984
985 def ToStderr(txt, *args):
986   """Write a message to stderr only, bypassing the logging system
987
988   This is just a wrapper over _ToStream.
989
990   @type txt: str
991   @param txt: the message
992
993   """
994   _ToStream(sys.stderr, txt, *args)
995
996
997 class JobExecutor(object):
998   """Class which manages the submission and execution of multiple jobs.
999
1000   Note that instances of this class should not be reused between
1001   GetResults() calls.
1002
1003   """
1004   def __init__(self, cl=None, verbose=True):
1005     self.queue = []
1006     if cl is None:
1007       cl = GetClient()
1008     self.cl = cl
1009     self.verbose = verbose
1010
1011   def QueueJob(self, name, *ops):
1012     """Submit a job for execution.
1013
1014     @type name: string
1015     @param name: a description of the job, will be used in WaitJobSet
1016     """
1017     job_id = SendJob(ops, cl=self.cl)
1018     self.queue.append((job_id, name))
1019
1020   def GetResults(self):
1021     """Wait for and return the results of all jobs.
1022
1023     @rtype: list
1024     @return: list of tuples (success, job results), in the same order
1025         as the submitted jobs; if a job has failed, instead of the result
1026         there will be the error message
1027
1028     """
1029     results = []
1030     if self.verbose:
1031       ToStdout("Submitted jobs %s", ", ".join(row[0] for row in self.queue))
1032     for jid, name in self.queue:
1033       if self.verbose:
1034         ToStdout("Waiting for job %s for %s...", jid, name)
1035       try:
1036         job_result = PollJob(jid, cl=self.cl)
1037         success = True
1038       except (errors.GenericError, luxi.ProtocolError), err:
1039         _, job_result = FormatError(err)
1040         success = False
1041         # the error message will always be shown, verbose or not
1042         ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
1043
1044       results.append((success, job_result))
1045     return results
1046
1047   def WaitOrShow(self, wait):
1048     """Wait for job results or only print the job IDs.
1049
1050     @type wait: boolean
1051     @param wait: whether to wait or not
1052
1053     """
1054     if wait:
1055       return self.GetResults()
1056     else:
1057       for jid, name in self.queue:
1058         ToStdout("%s: %s", jid, name)