Generate a shared HMAC key at cluster init time
[ganeti-local] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module dealing with command line parsing"""
23
24
25 import sys
26 import textwrap
27 import os.path
28 import copy
29 import time
30 import logging
31 from cStringIO import StringIO
32
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import constants
36 from ganeti import opcodes
37 from ganeti import luxi
38 from ganeti import ssconf
39 from ganeti import rpc
40
41 from optparse import (OptionParser, make_option, TitledHelpFormatter,
42                       Option, OptionValueError)
43
44 __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
45            "SubmitOpCode", "GetClient",
46            "cli_option", "ikv_option", "keyval_option",
47            "GenerateTable", "AskUser",
48            "ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
49            "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT", "SUBMIT_OPT",
50            "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
51            "FormatError", "SplitNodeOption", "SubmitOrSend",
52            "JobSubmittedException", "FormatTimestamp", "ParseTimespec",
53            "ToStderr", "ToStdout", "UsesRPC",
54            "GetOnlineNodes", "JobExecutor", "SYNC_OPT", "CONFIRM_OPT",
55            ]
56
57 NO_PREFIX = "no_"
58 UN_PREFIX = "-"
59
60 def _ExtractTagsObject(opts, args):
61   """Extract the tag type object.
62
63   Note that this function will modify its args parameter.
64
65   """
66   if not hasattr(opts, "tag_type"):
67     raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
68   kind = opts.tag_type
69   if kind == constants.TAG_CLUSTER:
70     retval = kind, kind
71   elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
72     if not args:
73       raise errors.OpPrereqError("no arguments passed to the command")
74     name = args.pop(0)
75     retval = kind, name
76   else:
77     raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
78   return retval
79
80
81 def _ExtendTags(opts, args):
82   """Extend the args if a source file has been given.
83
84   This function will extend the tags with the contents of the file
85   passed in the 'tags_source' attribute of the opts parameter. A file
86   named '-' will be replaced by stdin.
87
88   """
89   fname = opts.tags_source
90   if fname is None:
91     return
92   if fname == "-":
93     new_fh = sys.stdin
94   else:
95     new_fh = open(fname, "r")
96   new_data = []
97   try:
98     # we don't use the nice 'new_data = [line.strip() for line in fh]'
99     # because of python bug 1633941
100     while True:
101       line = new_fh.readline()
102       if not line:
103         break
104       new_data.append(line.strip())
105   finally:
106     new_fh.close()
107   args.extend(new_data)
108
109
110 def ListTags(opts, args):
111   """List the tags on a given object.
112
113   This is a generic implementation that knows how to deal with all
114   three cases of tag objects (cluster, node, instance). The opts
115   argument is expected to contain a tag_type field denoting what
116   object type we work on.
117
118   """
119   kind, name = _ExtractTagsObject(opts, args)
120   op = opcodes.OpGetTags(kind=kind, name=name)
121   result = SubmitOpCode(op)
122   result = list(result)
123   result.sort()
124   for tag in result:
125     print tag
126
127
128 def AddTags(opts, args):
129   """Add tags on a given object.
130
131   This is a generic implementation that knows how to deal with all
132   three cases of tag objects (cluster, node, instance). The opts
133   argument is expected to contain a tag_type field denoting what
134   object type we work on.
135
136   """
137   kind, name = _ExtractTagsObject(opts, args)
138   _ExtendTags(opts, args)
139   if not args:
140     raise errors.OpPrereqError("No tags to be added")
141   op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
142   SubmitOpCode(op)
143
144
145 def RemoveTags(opts, args):
146   """Remove tags from a given object.
147
148   This is a generic implementation that knows how to deal with all
149   three cases of tag objects (cluster, node, instance). The opts
150   argument is expected to contain a tag_type field denoting what
151   object type we work on.
152
153   """
154   kind, name = _ExtractTagsObject(opts, args)
155   _ExtendTags(opts, args)
156   if not args:
157     raise errors.OpPrereqError("No tags to be removed")
158   op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
159   SubmitOpCode(op)
160
161
162 DEBUG_OPT = make_option("-d", "--debug", default=False,
163                         action="store_true",
164                         help="Turn debugging on")
165
166 NOHDR_OPT = make_option("--no-headers", default=False,
167                         action="store_true", dest="no_headers",
168                         help="Don't display column headers")
169
170 SEP_OPT = make_option("--separator", default=None,
171                       action="store", dest="separator",
172                       help="Separator between output fields"
173                       " (defaults to one space)")
174
175 USEUNITS_OPT = make_option("--units", default=None,
176                            dest="units", choices=('h', 'm', 'g', 't'),
177                            help="Specify units for output (one of hmgt)")
178
179 FIELDS_OPT = make_option("-o", "--output", dest="output", action="store",
180                          type="string", help="Comma separated list of"
181                          " output fields",
182                          metavar="FIELDS")
183
184 FORCE_OPT = make_option("-f", "--force", dest="force", action="store_true",
185                         default=False, help="Force the operation")
186
187 CONFIRM_OPT = make_option("--yes", dest="confirm", action="store_true",
188                           default=False, help="Do not require confirmation")
189
190 TAG_SRC_OPT = make_option("--from", dest="tags_source",
191                           default=None, help="File with tag names")
192
193 SUBMIT_OPT = make_option("--submit", dest="submit_only",
194                          default=False, action="store_true",
195                          help="Submit the job and return the job ID, but"
196                          " don't wait for the job to finish")
197
198 SYNC_OPT = make_option("--sync", dest="do_locking",
199                        default=False, action="store_true",
200                        help="Grab locks while doing the queries"
201                        " in order to ensure more consistent results")
202
203 _DRY_RUN_OPT = make_option("--dry-run", default=False,
204                           action="store_true",
205                           help="Do not execute the operation, just run the"
206                           " check steps and verify it it could be executed")
207
208
209 def ARGS_FIXED(val):
210   """Macro-like function denoting a fixed number of arguments"""
211   return -val
212
213
214 def ARGS_ATLEAST(val):
215   """Macro-like function denoting a minimum number of arguments"""
216   return val
217
218
219 ARGS_NONE = None
220 ARGS_ONE = ARGS_FIXED(1)
221 ARGS_ANY = ARGS_ATLEAST(0)
222
223
224 def check_unit(option, opt, value):
225   """OptParsers custom converter for units.
226
227   """
228   try:
229     return utils.ParseUnit(value)
230   except errors.UnitParseError, err:
231     raise OptionValueError("option %s: %s" % (opt, err))
232
233
234 class CliOption(Option):
235   """Custom option class for optparse.
236
237   """
238   TYPES = Option.TYPES + ("unit",)
239   TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
240   TYPE_CHECKER["unit"] = check_unit
241
242
243 def _SplitKeyVal(opt, data):
244   """Convert a KeyVal string into a dict.
245
246   This function will convert a key=val[,...] string into a dict. Empty
247   values will be converted specially: keys which have the prefix 'no_'
248   will have the value=False and the prefix stripped, the others will
249   have value=True.
250
251   @type opt: string
252   @param opt: a string holding the option name for which we process the
253       data, used in building error messages
254   @type data: string
255   @param data: a string of the format key=val,key=val,...
256   @rtype: dict
257   @return: {key=val, key=val}
258   @raises errors.ParameterError: if there are duplicate keys
259
260   """
261   kv_dict = {}
262   if data:
263     for elem in data.split(","):
264       if "=" in elem:
265         key, val = elem.split("=", 1)
266       else:
267         if elem.startswith(NO_PREFIX):
268           key, val = elem[len(NO_PREFIX):], False
269         elif elem.startswith(UN_PREFIX):
270           key, val = elem[len(UN_PREFIX):], None
271         else:
272           key, val = elem, True
273       if key in kv_dict:
274         raise errors.ParameterError("Duplicate key '%s' in option %s" %
275                                     (key, opt))
276       kv_dict[key] = val
277   return kv_dict
278
279
280 def check_ident_key_val(option, opt, value):
281   """Custom parser for the IdentKeyVal option type.
282
283   """
284   if ":" not in value:
285     ident, rest = value, ''
286   else:
287     ident, rest = value.split(":", 1)
288
289   if ident.startswith(NO_PREFIX):
290     if rest:
291       msg = "Cannot pass options when removing parameter groups: %s" % value
292       raise errors.ParameterError(msg)
293     retval = (ident[len(NO_PREFIX):], False)
294   elif ident.startswith(UN_PREFIX):
295     if rest:
296       msg = "Cannot pass options when removing parameter groups: %s" % value
297       raise errors.ParameterError(msg)
298     retval = (ident[len(UN_PREFIX):], None)
299   else:
300     kv_dict = _SplitKeyVal(opt, rest)
301     retval = (ident, kv_dict)
302   return retval
303
304
305 class IdentKeyValOption(Option):
306   """Custom option class for ident:key=val,key=val options.
307
308   This will store the parsed values as a tuple (ident, {key: val}). As
309   such, multiple uses of this option via action=append is possible.
310
311   """
312   TYPES = Option.TYPES + ("identkeyval",)
313   TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
314   TYPE_CHECKER["identkeyval"] = check_ident_key_val
315
316
317 def check_key_val(option, opt, value):
318   """Custom parser for the KeyVal option type.
319
320   """
321   return _SplitKeyVal(opt, value)
322
323
324 class KeyValOption(Option):
325   """Custom option class for key=val,key=val options.
326
327   This will store the parsed values as a dict {key: val}.
328
329   """
330   TYPES = Option.TYPES + ("keyval",)
331   TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
332   TYPE_CHECKER["keyval"] = check_key_val
333
334
335 # optparse.py sets make_option, so we do it for our own option class, too
336 cli_option = CliOption
337 ikv_option = IdentKeyValOption
338 keyval_option = KeyValOption
339
340
341 def _ParseArgs(argv, commands, aliases):
342   """Parser for the command line arguments.
343
344   This function parses the arguments and returns the function which
345   must be executed together with its (modified) arguments.
346
347   @param argv: the command line
348   @param commands: dictionary with special contents, see the design
349       doc for cmdline handling
350   @param aliases: dictionary with command aliases {'alias': 'target, ...}
351
352   """
353   if len(argv) == 0:
354     binary = "<command>"
355   else:
356     binary = argv[0].split("/")[-1]
357
358   if len(argv) > 1 and argv[1] == "--version":
359     print "%s (ganeti) %s" % (binary, constants.RELEASE_VERSION)
360     # Quit right away. That way we don't have to care about this special
361     # argument. optparse.py does it the same.
362     sys.exit(0)
363
364   if len(argv) < 2 or not (argv[1] in commands or
365                            argv[1] in aliases):
366     # let's do a nice thing
367     sortedcmds = commands.keys()
368     sortedcmds.sort()
369     print ("Usage: %(bin)s {command} [options...] [argument...]"
370            "\n%(bin)s <command> --help to see details, or"
371            " man %(bin)s\n" % {"bin": binary})
372     # compute the max line length for cmd + usage
373     mlen = max([len(" %s" % cmd) for cmd in commands])
374     mlen = min(60, mlen) # should not get here...
375     # and format a nice command list
376     print "Commands:"
377     for cmd in sortedcmds:
378       cmdstr = " %s" % (cmd,)
379       help_text = commands[cmd][4]
380       help_lines = textwrap.wrap(help_text, 79-3-mlen)
381       print "%-*s - %s" % (mlen, cmdstr, help_lines.pop(0))
382       for line in help_lines:
383         print "%-*s   %s" % (mlen, "", line)
384     print
385     return None, None, None
386
387   # get command, unalias it, and look it up in commands
388   cmd = argv.pop(1)
389   if cmd in aliases:
390     if cmd in commands:
391       raise errors.ProgrammerError("Alias '%s' overrides an existing"
392                                    " command" % cmd)
393
394     if aliases[cmd] not in commands:
395       raise errors.ProgrammerError("Alias '%s' maps to non-existing"
396                                    " command '%s'" % (cmd, aliases[cmd]))
397
398     cmd = aliases[cmd]
399
400   func, nargs, parser_opts, usage, description = commands[cmd]
401   parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT],
402                         description=description,
403                         formatter=TitledHelpFormatter(),
404                         usage="%%prog %s %s" % (cmd, usage))
405   parser.disable_interspersed_args()
406   options, args = parser.parse_args()
407   if nargs is None:
408     if len(args) != 0:
409       print >> sys.stderr, ("Error: Command %s expects no arguments" % cmd)
410       return None, None, None
411   elif nargs < 0 and len(args) != -nargs:
412     print >> sys.stderr, ("Error: Command %s expects %d argument(s)" %
413                          (cmd, -nargs))
414     return None, None, None
415   elif nargs >= 0 and len(args) < nargs:
416     print >> sys.stderr, ("Error: Command %s expects at least %d argument(s)" %
417                          (cmd, nargs))
418     return None, None, None
419
420   return func, options, args
421
422
423 def SplitNodeOption(value):
424   """Splits the value of a --node option.
425
426   """
427   if value and ':' in value:
428     return value.split(':', 1)
429   else:
430     return (value, None)
431
432
433 def UsesRPC(fn):
434   def wrapper(*args, **kwargs):
435     rpc.Init()
436     try:
437       return fn(*args, **kwargs)
438     finally:
439       rpc.Shutdown()
440   return wrapper
441
442
443 def AskUser(text, choices=None):
444   """Ask the user a question.
445
446   @param text: the question to ask
447
448   @param choices: list with elements tuples (input_char, return_value,
449       description); if not given, it will default to: [('y', True,
450       'Perform the operation'), ('n', False, 'Do no do the operation')];
451       note that the '?' char is reserved for help
452
453   @return: one of the return values from the choices list; if input is
454       not possible (i.e. not running with a tty, we return the last
455       entry from the list
456
457   """
458   if choices is None:
459     choices = [('y', True, 'Perform the operation'),
460                ('n', False, 'Do not perform the operation')]
461   if not choices or not isinstance(choices, list):
462     raise errors.ProgrammerError("Invalid choices argument to AskUser")
463   for entry in choices:
464     if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
465       raise errors.ProgrammerError("Invalid choices element to AskUser")
466
467   answer = choices[-1][1]
468   new_text = []
469   for line in text.splitlines():
470     new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
471   text = "\n".join(new_text)
472   try:
473     f = file("/dev/tty", "a+")
474   except IOError:
475     return answer
476   try:
477     chars = [entry[0] for entry in choices]
478     chars[-1] = "[%s]" % chars[-1]
479     chars.append('?')
480     maps = dict([(entry[0], entry[1]) for entry in choices])
481     while True:
482       f.write(text)
483       f.write('\n')
484       f.write("/".join(chars))
485       f.write(": ")
486       line = f.readline(2).strip().lower()
487       if line in maps:
488         answer = maps[line]
489         break
490       elif line == '?':
491         for entry in choices:
492           f.write(" %s - %s\n" % (entry[0], entry[2]))
493         f.write("\n")
494         continue
495   finally:
496     f.close()
497   return answer
498
499
500 class JobSubmittedException(Exception):
501   """Job was submitted, client should exit.
502
503   This exception has one argument, the ID of the job that was
504   submitted. The handler should print this ID.
505
506   This is not an error, just a structured way to exit from clients.
507
508   """
509
510
511 def SendJob(ops, cl=None):
512   """Function to submit an opcode without waiting for the results.
513
514   @type ops: list
515   @param ops: list of opcodes
516   @type cl: luxi.Client
517   @param cl: the luxi client to use for communicating with the master;
518              if None, a new client will be created
519
520   """
521   if cl is None:
522     cl = GetClient()
523
524   job_id = cl.SubmitJob(ops)
525
526   return job_id
527
528
529 def PollJob(job_id, cl=None, feedback_fn=None):
530   """Function to poll for the result of a job.
531
532   @type job_id: job identified
533   @param job_id: the job to poll for results
534   @type cl: luxi.Client
535   @param cl: the luxi client to use for communicating with the master;
536              if None, a new client will be created
537
538   """
539   if cl is None:
540     cl = GetClient()
541
542   prev_job_info = None
543   prev_logmsg_serial = None
544
545   while True:
546     result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
547                                  prev_logmsg_serial)
548     if not result:
549       # job not found, go away!
550       raise errors.JobLost("Job with id %s lost" % job_id)
551
552     # Split result, a tuple of (field values, log entries)
553     (job_info, log_entries) = result
554     (status, ) = job_info
555
556     if log_entries:
557       for log_entry in log_entries:
558         (serial, timestamp, _, message) = log_entry
559         if callable(feedback_fn):
560           feedback_fn(log_entry[1:])
561         else:
562           encoded = utils.SafeEncode(message)
563           print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), encoded)
564         prev_logmsg_serial = max(prev_logmsg_serial, serial)
565
566     # TODO: Handle canceled and archived jobs
567     elif status in (constants.JOB_STATUS_SUCCESS,
568                     constants.JOB_STATUS_ERROR,
569                     constants.JOB_STATUS_CANCELING,
570                     constants.JOB_STATUS_CANCELED):
571       break
572
573     prev_job_info = job_info
574
575   jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
576   if not jobs:
577     raise errors.JobLost("Job with id %s lost" % job_id)
578
579   status, opstatus, result = jobs[0]
580   if status == constants.JOB_STATUS_SUCCESS:
581     return result
582   elif status in (constants.JOB_STATUS_CANCELING,
583                   constants.JOB_STATUS_CANCELED):
584     raise errors.OpExecError("Job was canceled")
585   else:
586     has_ok = False
587     for idx, (status, msg) in enumerate(zip(opstatus, result)):
588       if status == constants.OP_STATUS_SUCCESS:
589         has_ok = True
590       elif status == constants.OP_STATUS_ERROR:
591         if has_ok:
592           raise errors.OpExecError("partial failure (opcode %d): %s" %
593                                    (idx, msg))
594         else:
595           raise errors.OpExecError(str(msg))
596     # default failure mode
597     raise errors.OpExecError(result)
598
599
600 def SubmitOpCode(op, cl=None, feedback_fn=None):
601   """Legacy function to submit an opcode.
602
603   This is just a simple wrapper over the construction of the processor
604   instance. It should be extended to better handle feedback and
605   interaction functions.
606
607   """
608   if cl is None:
609     cl = GetClient()
610
611   job_id = SendJob([op], cl)
612
613   op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
614
615   return op_results[0]
616
617
618 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
619   """Wrapper around SubmitOpCode or SendJob.
620
621   This function will decide, based on the 'opts' parameter, whether to
622   submit and wait for the result of the opcode (and return it), or
623   whether to just send the job and print its identifier. It is used in
624   order to simplify the implementation of the '--submit' option.
625
626   It will also add the dry-run parameter from the options passed, if true.
627
628   """
629   if opts and opts.dry_run:
630     op.dry_run = opts.dry_run
631   if opts and opts.submit_only:
632     job_id = SendJob([op], cl=cl)
633     raise JobSubmittedException(job_id)
634   else:
635     return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
636
637
638 def GetClient():
639   # TODO: Cache object?
640   try:
641     client = luxi.Client()
642   except luxi.NoMasterError:
643     master, myself = ssconf.GetMasterAndMyself()
644     if master != myself:
645       raise errors.OpPrereqError("This is not the master node, please connect"
646                                  " to node '%s' and rerun the command" %
647                                  master)
648     else:
649       raise
650   return client
651
652
653 def FormatError(err):
654   """Return a formatted error message for a given error.
655
656   This function takes an exception instance and returns a tuple
657   consisting of two values: first, the recommended exit code, and
658   second, a string describing the error message (not
659   newline-terminated).
660
661   """
662   retcode = 1
663   obuf = StringIO()
664   msg = str(err)
665   if isinstance(err, errors.ConfigurationError):
666     txt = "Corrupt configuration file: %s" % msg
667     logging.error(txt)
668     obuf.write(txt + "\n")
669     obuf.write("Aborting.")
670     retcode = 2
671   elif isinstance(err, errors.HooksAbort):
672     obuf.write("Failure: hooks execution failed:\n")
673     for node, script, out in err.args[0]:
674       if out:
675         obuf.write("  node: %s, script: %s, output: %s\n" %
676                    (node, script, out))
677       else:
678         obuf.write("  node: %s, script: %s (no output)\n" %
679                    (node, script))
680   elif isinstance(err, errors.HooksFailure):
681     obuf.write("Failure: hooks general failure: %s" % msg)
682   elif isinstance(err, errors.ResolverError):
683     this_host = utils.HostInfo.SysName()
684     if err.args[0] == this_host:
685       msg = "Failure: can't resolve my own hostname ('%s')"
686     else:
687       msg = "Failure: can't resolve hostname '%s'"
688     obuf.write(msg % err.args[0])
689   elif isinstance(err, errors.OpPrereqError):
690     obuf.write("Failure: prerequisites not met for this"
691                " operation:\n%s" % msg)
692   elif isinstance(err, errors.OpExecError):
693     obuf.write("Failure: command execution error:\n%s" % msg)
694   elif isinstance(err, errors.TagError):
695     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
696   elif isinstance(err, errors.JobQueueDrainError):
697     obuf.write("Failure: the job queue is marked for drain and doesn't"
698                " accept new requests\n")
699   elif isinstance(err, errors.JobQueueFull):
700     obuf.write("Failure: the job queue is full and doesn't accept new"
701                " job submissions until old jobs are archived\n")
702   elif isinstance(err, errors.TypeEnforcementError):
703     obuf.write("Parameter Error: %s" % msg)
704   elif isinstance(err, errors.ParameterError):
705     obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
706   elif isinstance(err, errors.GenericError):
707     obuf.write("Unhandled Ganeti error: %s" % msg)
708   elif isinstance(err, luxi.NoMasterError):
709     obuf.write("Cannot communicate with the master daemon.\nIs it running"
710                " and listening for connections?")
711   elif isinstance(err, luxi.TimeoutError):
712     obuf.write("Timeout while talking to the master daemon. Error:\n"
713                "%s" % msg)
714   elif isinstance(err, luxi.ProtocolError):
715     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
716                "%s" % msg)
717   elif isinstance(err, JobSubmittedException):
718     obuf.write("JobID: %s\n" % err.args[0])
719     retcode = 0
720   else:
721     obuf.write("Unhandled exception: %s" % msg)
722   return retcode, obuf.getvalue().rstrip('\n')
723
724
725 def GenericMain(commands, override=None, aliases=None):
726   """Generic main function for all the gnt-* commands.
727
728   Arguments:
729     - commands: a dictionary with a special structure, see the design doc
730                 for command line handling.
731     - override: if not None, we expect a dictionary with keys that will
732                 override command line options; this can be used to pass
733                 options from the scripts to generic functions
734     - aliases: dictionary with command aliases {'alias': 'target, ...}
735
736   """
737   # save the program name and the entire command line for later logging
738   if sys.argv:
739     binary = os.path.basename(sys.argv[0]) or sys.argv[0]
740     if len(sys.argv) >= 2:
741       binary += " " + sys.argv[1]
742       old_cmdline = " ".join(sys.argv[2:])
743     else:
744       old_cmdline = ""
745   else:
746     binary = "<unknown program>"
747     old_cmdline = ""
748
749   if aliases is None:
750     aliases = {}
751
752   try:
753     func, options, args = _ParseArgs(sys.argv, commands, aliases)
754   except errors.ParameterError, err:
755     result, err_msg = FormatError(err)
756     ToStderr(err_msg)
757     return 1
758
759   if func is None: # parse error
760     return 1
761
762   if override is not None:
763     for key, val in override.iteritems():
764       setattr(options, key, val)
765
766   utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
767                      stderr_logging=True, program=binary)
768
769   utils.debug = options.debug
770
771   if old_cmdline:
772     logging.info("run with arguments '%s'", old_cmdline)
773   else:
774     logging.info("run with no arguments")
775
776   try:
777     result = func(options, args)
778   except (errors.GenericError, luxi.ProtocolError,
779           JobSubmittedException), err:
780     result, err_msg = FormatError(err)
781     logging.exception("Error during command processing")
782     ToStderr(err_msg)
783
784   return result
785
786
787 def GenerateTable(headers, fields, separator, data,
788                   numfields=None, unitfields=None,
789                   units=None):
790   """Prints a table with headers and different fields.
791
792   @type headers: dict
793   @param headers: dictionary mapping field names to headers for
794       the table
795   @type fields: list
796   @param fields: the field names corresponding to each row in
797       the data field
798   @param separator: the separator to be used; if this is None,
799       the default 'smart' algorithm is used which computes optimal
800       field width, otherwise just the separator is used between
801       each field
802   @type data: list
803   @param data: a list of lists, each sublist being one row to be output
804   @type numfields: list
805   @param numfields: a list with the fields that hold numeric
806       values and thus should be right-aligned
807   @type unitfields: list
808   @param unitfields: a list with the fields that hold numeric
809       values that should be formatted with the units field
810   @type units: string or None
811   @param units: the units we should use for formatting, or None for
812       automatic choice (human-readable for non-separator usage, otherwise
813       megabytes); this is a one-letter string
814
815   """
816   if units is None:
817     if separator:
818       units = "m"
819     else:
820       units = "h"
821
822   if numfields is None:
823     numfields = []
824   if unitfields is None:
825     unitfields = []
826
827   numfields = utils.FieldSet(*numfields)
828   unitfields = utils.FieldSet(*unitfields)
829
830   format_fields = []
831   for field in fields:
832     if headers and field not in headers:
833       # TODO: handle better unknown fields (either revert to old
834       # style of raising exception, or deal more intelligently with
835       # variable fields)
836       headers[field] = field
837     if separator is not None:
838       format_fields.append("%s")
839     elif numfields.Matches(field):
840       format_fields.append("%*s")
841     else:
842       format_fields.append("%-*s")
843
844   if separator is None:
845     mlens = [0 for name in fields]
846     format = ' '.join(format_fields)
847   else:
848     format = separator.replace("%", "%%").join(format_fields)
849
850   for row in data:
851     if row is None:
852       continue
853     for idx, val in enumerate(row):
854       if unitfields.Matches(fields[idx]):
855         try:
856           val = int(val)
857         except ValueError:
858           pass
859         else:
860           val = row[idx] = utils.FormatUnit(val, units)
861       val = row[idx] = str(val)
862       if separator is None:
863         mlens[idx] = max(mlens[idx], len(val))
864
865   result = []
866   if headers:
867     args = []
868     for idx, name in enumerate(fields):
869       hdr = headers[name]
870       if separator is None:
871         mlens[idx] = max(mlens[idx], len(hdr))
872         args.append(mlens[idx])
873       args.append(hdr)
874     result.append(format % tuple(args))
875
876   for line in data:
877     args = []
878     if line is None:
879       line = ['-' for _ in fields]
880     for idx in xrange(len(fields)):
881       if separator is None:
882         args.append(mlens[idx])
883       args.append(line[idx])
884     result.append(format % tuple(args))
885
886   return result
887
888
889 def FormatTimestamp(ts):
890   """Formats a given timestamp.
891
892   @type ts: timestamp
893   @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
894
895   @rtype: string
896   @return: a string with the formatted timestamp
897
898   """
899   if not isinstance (ts, (tuple, list)) or len(ts) != 2:
900     return '?'
901   sec, usec = ts
902   return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
903
904
905 def ParseTimespec(value):
906   """Parse a time specification.
907
908   The following suffixed will be recognized:
909
910     - s: seconds
911     - m: minutes
912     - h: hours
913     - d: day
914     - w: weeks
915
916   Without any suffix, the value will be taken to be in seconds.
917
918   """
919   value = str(value)
920   if not value:
921     raise errors.OpPrereqError("Empty time specification passed")
922   suffix_map = {
923     's': 1,
924     'm': 60,
925     'h': 3600,
926     'd': 86400,
927     'w': 604800,
928     }
929   if value[-1] not in suffix_map:
930     try:
931       value = int(value)
932     except ValueError:
933       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
934   else:
935     multiplier = suffix_map[value[-1]]
936     value = value[:-1]
937     if not value: # no data left after stripping the suffix
938       raise errors.OpPrereqError("Invalid time specification (only"
939                                  " suffix passed)")
940     try:
941       value = int(value) * multiplier
942     except ValueError:
943       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
944   return value
945
946
947 def GetOnlineNodes(nodes, cl=None, nowarn=False):
948   """Returns the names of online nodes.
949
950   This function will also log a warning on stderr with the names of
951   the online nodes.
952
953   @param nodes: if not empty, use only this subset of nodes (minus the
954       offline ones)
955   @param cl: if not None, luxi client to use
956   @type nowarn: boolean
957   @param nowarn: by default, this function will output a note with the
958       offline nodes that are skipped; if this parameter is True the
959       note is not displayed
960
961   """
962   if cl is None:
963     cl = GetClient()
964
965   result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
966                          use_locking=False)
967   offline = [row[0] for row in result if row[1]]
968   if offline and not nowarn:
969     ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline))
970   return [row[0] for row in result if not row[1]]
971
972
973 def _ToStream(stream, txt, *args):
974   """Write a message to a stream, bypassing the logging system
975
976   @type stream: file object
977   @param stream: the file to which we should write
978   @type txt: str
979   @param txt: the message
980
981   """
982   if args:
983     args = tuple(args)
984     stream.write(txt % args)
985   else:
986     stream.write(txt)
987   stream.write('\n')
988   stream.flush()
989
990
991 def ToStdout(txt, *args):
992   """Write a message to stdout only, bypassing the logging system
993
994   This is just a wrapper over _ToStream.
995
996   @type txt: str
997   @param txt: the message
998
999   """
1000   _ToStream(sys.stdout, txt, *args)
1001
1002
1003 def ToStderr(txt, *args):
1004   """Write a message to stderr only, bypassing the logging system
1005
1006   This is just a wrapper over _ToStream.
1007
1008   @type txt: str
1009   @param txt: the message
1010
1011   """
1012   _ToStream(sys.stderr, txt, *args)
1013
1014
1015 class JobExecutor(object):
1016   """Class which manages the submission and execution of multiple jobs.
1017
1018   Note that instances of this class should not be reused between
1019   GetResults() calls.
1020
1021   """
1022   def __init__(self, cl=None, verbose=True):
1023     self.queue = []
1024     if cl is None:
1025       cl = GetClient()
1026     self.cl = cl
1027     self.verbose = verbose
1028     self.jobs = []
1029
1030   def QueueJob(self, name, *ops):
1031     """Record a job for later submit.
1032
1033     @type name: string
1034     @param name: a description of the job, will be used in WaitJobSet
1035     """
1036     self.queue.append((name, ops))
1037
1038   def SubmitPending(self):
1039     """Submit all pending jobs.
1040
1041     """
1042     results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
1043     for ((status, data), (name, _)) in zip(results, self.queue):
1044       self.jobs.append((status, data, name))
1045
1046   def GetResults(self):
1047     """Wait for and return the results of all jobs.
1048
1049     @rtype: list
1050     @return: list of tuples (success, job results), in the same order
1051         as the submitted jobs; if a job has failed, instead of the result
1052         there will be the error message
1053
1054     """
1055     if not self.jobs:
1056       self.SubmitPending()
1057     results = []
1058     if self.verbose:
1059       ok_jobs = [row[1] for row in self.jobs if row[0]]
1060       if ok_jobs:
1061         ToStdout("Submitted jobs %s", ", ".join(ok_jobs))
1062     for submit_status, jid, name in self.jobs:
1063       if not submit_status:
1064         ToStderr("Failed to submit job for %s: %s", name, jid)
1065         results.append((False, jid))
1066         continue
1067       if self.verbose:
1068         ToStdout("Waiting for job %s for %s...", jid, name)
1069       try:
1070         job_result = PollJob(jid, cl=self.cl)
1071         success = True
1072       except (errors.GenericError, luxi.ProtocolError), err:
1073         _, job_result = FormatError(err)
1074         success = False
1075         # the error message will always be shown, verbose or not
1076         ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
1077
1078       results.append((success, job_result))
1079     return results
1080
1081   def WaitOrShow(self, wait):
1082     """Wait for job results or only print the job IDs.
1083
1084     @type wait: boolean
1085     @param wait: whether to wait or not
1086
1087     """
1088     if wait:
1089       return self.GetResults()
1090     else:
1091       if not self.jobs:
1092         self.SubmitPending()
1093       for status, result, name in self.jobs:
1094         if status:
1095           ToStdout("%s: %s", result, name)
1096         else:
1097           ToStderr("Failure for %s: %s", name, result)