4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module dealing with command line parsing"""
31 from cStringIO import StringIO
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import constants
36 from ganeti import opcodes
37 from ganeti import luxi
38 from ganeti import ssconf
39 from ganeti import rpc
41 from optparse import (OptionParser, make_option, TitledHelpFormatter,
42 Option, OptionValueError)
44 __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
45 "SubmitOpCode", "GetClient",
46 "cli_option", "ikv_option", "keyval_option",
47 "GenerateTable", "AskUser",
48 "ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
49 "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT", "SUBMIT_OPT",
50 "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
51 "FormatError", "SplitNodeOption", "SubmitOrSend",
52 "JobSubmittedException", "FormatTimestamp", "ParseTimespec",
53 "ValidateBeParams", "ToStderr", "ToStdout", "UsesRPC",
54 "GetOnlineNodes", "JobExecutor", "SYNC_OPT",
58 def _ExtractTagsObject(opts, args):
59 """Extract the tag type object.
61 Note that this function will modify its args parameter.
64 if not hasattr(opts, "tag_type"):
65 raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
67 if kind == constants.TAG_CLUSTER:
69 elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
71 raise errors.OpPrereqError("no arguments passed to the command")
75 raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
79 def _ExtendTags(opts, args):
80 """Extend the args if a source file has been given.
82 This function will extend the tags with the contents of the file
83 passed in the 'tags_source' attribute of the opts parameter. A file
84 named '-' will be replaced by stdin.
87 fname = opts.tags_source
93 new_fh = open(fname, "r")
96 # we don't use the nice 'new_data = [line.strip() for line in fh]'
97 # because of python bug 1633941
99 line = new_fh.readline()
102 new_data.append(line.strip())
105 args.extend(new_data)
108 def ListTags(opts, args):
109 """List the tags on a given object.
111 This is a generic implementation that knows how to deal with all
112 three cases of tag objects (cluster, node, instance). The opts
113 argument is expected to contain a tag_type field denoting what
114 object type we work on.
117 kind, name = _ExtractTagsObject(opts, args)
118 op = opcodes.OpGetTags(kind=kind, name=name)
119 result = SubmitOpCode(op)
120 result = list(result)
126 def AddTags(opts, args):
127 """Add tags on a given object.
129 This is a generic implementation that knows how to deal with all
130 three cases of tag objects (cluster, node, instance). The opts
131 argument is expected to contain a tag_type field denoting what
132 object type we work on.
135 kind, name = _ExtractTagsObject(opts, args)
136 _ExtendTags(opts, args)
138 raise errors.OpPrereqError("No tags to be added")
139 op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
143 def RemoveTags(opts, args):
144 """Remove tags from a given object.
146 This is a generic implementation that knows how to deal with all
147 three cases of tag objects (cluster, node, instance). The opts
148 argument is expected to contain a tag_type field denoting what
149 object type we work on.
152 kind, name = _ExtractTagsObject(opts, args)
153 _ExtendTags(opts, args)
155 raise errors.OpPrereqError("No tags to be removed")
156 op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
160 DEBUG_OPT = make_option("-d", "--debug", default=False,
162 help="Turn debugging on")
164 NOHDR_OPT = make_option("--no-headers", default=False,
165 action="store_true", dest="no_headers",
166 help="Don't display column headers")
168 SEP_OPT = make_option("--separator", default=None,
169 action="store", dest="separator",
170 help="Separator between output fields"
171 " (defaults to one space)")
173 USEUNITS_OPT = make_option("--units", default=None,
174 dest="units", choices=('h', 'm', 'g', 't'),
175 help="Specify units for output (one of hmgt)")
177 FIELDS_OPT = make_option("-o", "--output", dest="output", action="store",
178 type="string", help="Comma separated list of"
182 FORCE_OPT = make_option("-f", "--force", dest="force", action="store_true",
183 default=False, help="Force the operation")
185 TAG_SRC_OPT = make_option("--from", dest="tags_source",
186 default=None, help="File with tag names")
188 SUBMIT_OPT = make_option("--submit", dest="submit_only",
189 default=False, action="store_true",
190 help="Submit the job and return the job ID, but"
191 " don't wait for the job to finish")
193 SYNC_OPT = make_option("--sync", dest="do_locking",
194 default=False, action="store_true",
195 help="Grab locks while doing the queries"
196 " in order to ensure more consistent results")
200 """Macro-like function denoting a fixed number of arguments"""
204 def ARGS_ATLEAST(val):
205 """Macro-like function denoting a minimum number of arguments"""
210 ARGS_ONE = ARGS_FIXED(1)
211 ARGS_ANY = ARGS_ATLEAST(0)
214 def check_unit(option, opt, value):
215 """OptParsers custom converter for units.
219 return utils.ParseUnit(value)
220 except errors.UnitParseError, err:
221 raise OptionValueError("option %s: %s" % (opt, err))
224 class CliOption(Option):
225 """Custom option class for optparse.
228 TYPES = Option.TYPES + ("unit",)
229 TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
230 TYPE_CHECKER["unit"] = check_unit
233 def _SplitKeyVal(opt, data):
234 """Convert a KeyVal string into a dict.
236 This function will convert a key=val[,...] string into a dict. Empty
237 values will be converted specially: keys which have the prefix 'no_'
238 will have the value=False and the prefix stripped, the others will
242 @param opt: a string holding the option name for which we process the
243 data, used in building error messages
245 @param data: a string of the format key=val,key=val,...
247 @return: {key: val, key: val}
248 @raises errors.ParameterError: if there are duplicate keys
254 for elem in data.split(","):
256 key, val = elem.split("=", 1)
258 if elem.startswith(NO_PREFIX):
259 key, val = elem[len(NO_PREFIX):], False
260 elif elem.startswith(UN_PREFIX):
261 key, val = elem[len(UN_PREFIX):], None
263 key, val = elem, True
265 raise errors.ParameterError("Duplicate key '%s' in option %s" %
271 def check_ident_key_val(option, opt, value):
272 """Custom parser for the IdentKeyVal option type.
278 ident, rest = value.split(":", 1)
279 kv_dict = _SplitKeyVal(opt, rest)
280 retval = (ident, kv_dict)
284 class IdentKeyValOption(Option):
285 """Custom option class for ident:key=val,key=val options.
287 This will store the parsed values as a tuple (ident, {key: val}). As
288 such, multiple uses of this option via action=append are possible.
291 TYPES = Option.TYPES + ("identkeyval",)
292 TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
293 TYPE_CHECKER["identkeyval"] = check_ident_key_val
296 def check_key_val(option, opt, value):
297 """Custom parser for the KeyVal option type.
300 return _SplitKeyVal(opt, value)
303 class KeyValOption(Option):
304 """Custom option class for key=val,key=val options.
306 This will store the parsed values as a dict {key: val}.
309 TYPES = Option.TYPES + ("keyval",)
310 TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
311 TYPE_CHECKER["keyval"] = check_key_val
314 # optparse.py sets make_option, so we do it for our own option class, too
315 cli_option = CliOption
316 ikv_option = IdentKeyValOption
317 keyval_option = KeyValOption
320 def _ParseArgs(argv, commands, aliases):
321 """Parser for the command line arguments.
323 This function parses the arguments and returns the function which
324 must be executed together with its (modified) arguments.
326 @param argv: the command line
327 @param commands: dictionary with special contents, see the design
328 doc for cmdline handling
329 @param aliases: dictionary with command aliases {'alias': 'target', ...}
335 binary = argv[0].split("/")[-1]
337 if len(argv) > 1 and argv[1] == "--version":
338 print "%s (ganeti) %s" % (binary, constants.RELEASE_VERSION)
339 # Quit right away. That way we don't have to care about this special
340 # argument. optparse.py does it the same.
343 if len(argv) < 2 or not (argv[1] in commands or
345 # let's do a nice thing
346 sortedcmds = commands.keys()
348 print ("Usage: %(bin)s {command} [options...] [argument...]"
349 "\n%(bin)s <command> --help to see details, or"
350 " man %(bin)s\n" % {"bin": binary})
351 # compute the max line length for cmd + usage
352 mlen = max([len(" %s" % cmd) for cmd in commands])
353 mlen = min(60, mlen) # should not get here...
354 # and format a nice command list
356 for cmd in sortedcmds:
357 cmdstr = " %s" % (cmd,)
358 help_text = commands[cmd][4]
359 help_lines = textwrap.wrap(help_text, 79-3-mlen)
360 print "%-*s - %s" % (mlen, cmdstr, help_lines.pop(0))
361 for line in help_lines:
362 print "%-*s %s" % (mlen, "", line)
364 return None, None, None
366 # get command, unalias it, and look it up in commands
370 raise errors.ProgrammerError("Alias '%s' overrides an existing"
373 if aliases[cmd] not in commands:
374 raise errors.ProgrammerError("Alias '%s' maps to non-existing"
375 " command '%s'" % (cmd, aliases[cmd]))
379 func, nargs, parser_opts, usage, description = commands[cmd]
380 parser = OptionParser(option_list=parser_opts,
381 description=description,
382 formatter=TitledHelpFormatter(),
383 usage="%%prog %s %s" % (cmd, usage))
384 parser.disable_interspersed_args()
385 options, args = parser.parse_args()
388 print >> sys.stderr, ("Error: Command %s expects no arguments" % cmd)
389 return None, None, None
390 elif nargs < 0 and len(args) != -nargs:
391 print >> sys.stderr, ("Error: Command %s expects %d argument(s)" %
393 return None, None, None
394 elif nargs >= 0 and len(args) < nargs:
395 print >> sys.stderr, ("Error: Command %s expects at least %d argument(s)" %
397 return None, None, None
399 return func, options, args
402 def SplitNodeOption(value):
403 """Splits the value of a --node option.
406 if value and ':' in value:
407 return value.split(':', 1)
412 def ValidateBeParams(bep):
413 """Parse and check the given beparams.
415 The function will update in-place the given dictionary.
418 @param bep: input beparams
419 @raise errors.ParameterError: if the input values are not OK
420 @raise errors.UnitParseError: if the input values are not OK
423 if constants.BE_MEMORY in bep:
424 bep[constants.BE_MEMORY] = utils.ParseUnit(bep[constants.BE_MEMORY])
426 if constants.BE_VCPUS in bep:
428 bep[constants.BE_VCPUS] = int(bep[constants.BE_VCPUS])
430 raise errors.ParameterError("Invalid number of VCPUs")
434 def wrapper(*args, **kwargs):
437 return fn(*args, **kwargs)
443 def AskUser(text, choices=None):
444 """Ask the user a question.
446 @param text: the question to ask
448 @param choices: list with elements tuples (input_char, return_value,
449 description); if not given, it will default to: [('y', True,
450 'Perform the operation'), ('n', False, 'Do not perform the operation')];
451 note that the '?' char is reserved for help
453 @return: one of the return values from the choices list; if input is
454 not possible (i.e. not running with a tty), we return the last
459 choices = [('y', True, 'Perform the operation'),
460 ('n', False, 'Do not perform the operation')]
461 if not choices or not isinstance(choices, list):
462 raise errors.ProgrammerError("Invalid choiches argument to AskUser")
463 for entry in choices:
464 if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
465 raise errors.ProgrammerError("Invalid choiches element to AskUser")
467 answer = choices[-1][1]
469 for line in text.splitlines():
470 new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
471 text = "\n".join(new_text)
473 f = file("/dev/tty", "a+")
477 chars = [entry[0] for entry in choices]
478 chars[-1] = "[%s]" % chars[-1]
480 maps = dict([(entry[0], entry[1]) for entry in choices])
484 f.write("/".join(chars))
486 line = f.readline(2).strip().lower()
491 for entry in choices:
492 f.write(" %s - %s\n" % (entry[0], entry[2]))
500 class JobSubmittedException(Exception):
501 """Job was submitted, client should exit.
503 This exception has one argument, the ID of the job that was
504 submitted. The handler should print this ID.
506 This is not an error, just a structured way to exit from clients.
511 def SendJob(ops, cl=None):
512 """Function to submit a job (a list of opcodes) without waiting for the results.
515 @param ops: list of opcodes
516 @type cl: luxi.Client
517 @param cl: the luxi client to use for communicating with the master;
518 if None, a new client will be created
524 job_id = cl.SubmitJob(ops)
529 def PollJob(job_id, cl=None, feedback_fn=None):
530 """Function to poll for the result of a job.
532 @type job_id: job identifier
533 @param job_id: the job to poll for results
534 @type cl: luxi.Client
535 @param cl: the luxi client to use for communicating with the master;
536 if None, a new client will be created
543 prev_logmsg_serial = None
546 result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
549 # job not found, go away!
550 raise errors.JobLost("Job with id %s lost" % job_id)
552 # Split result, a tuple of (field values, log entries)
553 (job_info, log_entries) = result
554 (status, ) = job_info
557 for log_entry in log_entries:
558 (serial, timestamp, _, message) = log_entry
559 if callable(feedback_fn):
560 feedback_fn(log_entry[1:])
562 encoded = utils.SafeEncode(message)
563 print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), encoded)
564 prev_logmsg_serial = max(prev_logmsg_serial, serial)
566 # TODO: Handle canceled and archived jobs
567 elif status in (constants.JOB_STATUS_SUCCESS,
568 constants.JOB_STATUS_ERROR,
569 constants.JOB_STATUS_CANCELING,
570 constants.JOB_STATUS_CANCELED):
573 prev_job_info = job_info
575 jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
577 raise errors.JobLost("Job with id %s lost" % job_id)
579 status, opstatus, result = jobs[0]
580 if status == constants.JOB_STATUS_SUCCESS:
582 elif status in (constants.JOB_STATUS_CANCELING,
583 constants.JOB_STATUS_CANCELED):
584 raise errors.OpExecError("Job was canceled")
587 for idx, (status, msg) in enumerate(zip(opstatus, result)):
588 if status == constants.OP_STATUS_SUCCESS:
590 elif status == constants.OP_STATUS_ERROR:
592 raise errors.OpExecError("partial failure (opcode %d): %s" %
595 raise errors.OpExecError(str(msg))
596 # default failure mode
597 raise errors.OpExecError(result)
600 def SubmitOpCode(op, cl=None, feedback_fn=None):
601 """Legacy function to submit an opcode.
603 This is just a simple wrapper over the construction of the processor
604 instance. It should be extended to better handle feedback and
605 interaction functions.
611 job_id = SendJob([op], cl)
613 op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
618 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
619 """Wrapper around SubmitOpCode or SendJob.
621 This function will decide, based on the 'opts' parameter, whether to
622 submit and wait for the result of the opcode (and return it), or
623 whether to just send the job and print its identifier. It is used in
624 order to simplify the implementation of the '--submit' option.
627 if opts and opts.submit_only:
628 job_id = SendJob([op], cl=cl)
629 raise JobSubmittedException(job_id)
631 return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
635 # TODO: Cache object?
637 client = luxi.Client()
638 except luxi.NoMasterError:
639 master, myself = ssconf.GetMasterAndMyself()
641 raise errors.OpPrereqError("This is not the master node, please connect"
642 " to node '%s' and rerun the command" %
649 def FormatError(err):
650 """Return a formatted error message for a given error.
652 This function takes an exception instance and returns a tuple
653 consisting of two values: first, the recommended exit code, and
654 second, a string describing the error message (not
661 if isinstance(err, errors.ConfigurationError):
662 txt = "Corrupt configuration file: %s" % msg
664 obuf.write(txt + "\n")
665 obuf.write("Aborting.")
667 elif isinstance(err, errors.HooksAbort):
668 obuf.write("Failure: hooks execution failed:\n")
669 for node, script, out in err.args[0]:
671 obuf.write(" node: %s, script: %s, output: %s\n" %
674 obuf.write(" node: %s, script: %s (no output)\n" %
676 elif isinstance(err, errors.HooksFailure):
677 obuf.write("Failure: hooks general failure: %s" % msg)
678 elif isinstance(err, errors.ResolverError):
679 this_host = utils.HostInfo.SysName()
680 if err.args[0] == this_host:
681 msg = "Failure: can't resolve my own hostname ('%s')"
683 msg = "Failure: can't resolve hostname '%s'"
684 obuf.write(msg % err.args[0])
685 elif isinstance(err, errors.OpPrereqError):
686 obuf.write("Failure: prerequisites not met for this"
687 " operation:\n%s" % msg)
688 elif isinstance(err, errors.OpExecError):
689 obuf.write("Failure: command execution error:\n%s" % msg)
690 elif isinstance(err, errors.TagError):
691 obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
692 elif isinstance(err, errors.JobQueueDrainError):
693 obuf.write("Failure: the job queue is marked for drain and doesn't"
694 " accept new requests\n")
695 elif isinstance(err, errors.JobQueueFull):
696 obuf.write("Failure: the job queue is full and doesn't accept new"
697 " job submissions until old jobs are archived\n")
698 elif isinstance(err, errors.GenericError):
699 obuf.write("Unhandled Ganeti error: %s" % msg)
700 elif isinstance(err, luxi.NoMasterError):
701 obuf.write("Cannot communicate with the master daemon.\nIs it running"
702 " and listening for connections?")
703 elif isinstance(err, luxi.TimeoutError):
704 obuf.write("Timeout while talking to the master daemon. Error:\n"
706 elif isinstance(err, luxi.ProtocolError):
707 obuf.write("Unhandled protocol error while talking to the master daemon:\n"
709 elif isinstance(err, JobSubmittedException):
710 obuf.write("JobID: %s\n" % err.args[0])
713 obuf.write("Unhandled exception: %s" % msg)
714 return retcode, obuf.getvalue().rstrip('\n')
717 def GenericMain(commands, override=None, aliases=None):
718 """Generic main function for all the gnt-* commands.
721 - commands: a dictionary with a special structure, see the design doc
722 for command line handling.
723 - override: if not None, we expect a dictionary with keys that will
724 override command line options; this can be used to pass
725 options from the scripts to generic functions
726 - aliases: dictionary with command aliases {'alias': 'target', ...}
729 # save the program name and the entire command line for later logging
731 binary = os.path.basename(sys.argv[0]) or sys.argv[0]
732 if len(sys.argv) >= 2:
733 binary += " " + sys.argv[1]
734 old_cmdline = " ".join(sys.argv[2:])
738 binary = "<unknown program>"
744 func, options, args = _ParseArgs(sys.argv, commands, aliases)
745 if func is None: # parse error
748 if override is not None:
749 for key, val in override.iteritems():
750 setattr(options, key, val)
752 utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
753 stderr_logging=True, program=binary)
755 utils.debug = options.debug
758 logging.info("run with arguments '%s'", old_cmdline)
760 logging.info("run with no arguments")
763 result = func(options, args)
764 except (errors.GenericError, luxi.ProtocolError,
765 JobSubmittedException), err:
766 result, err_msg = FormatError(err)
767 logging.exception("Error durring command processing")
773 def GenerateTable(headers, fields, separator, data,
774 numfields=None, unitfields=None,
776 """Prints a table with headers and different fields.
779 @param headers: dictionary mapping field names to headers for
782 @param fields: the field names corresponding to each row in
784 @param separator: the separator to be used; if this is None,
785 the default 'smart' algorithm is used which computes optimal
786 field width, otherwise just the separator is used between
789 @param data: a list of lists, each sublist being one row to be output
790 @type numfields: list
791 @param numfields: a list with the fields that hold numeric
792 values and thus should be right-aligned
793 @type unitfields: list
794 @param unitfields: a list with the fields that hold numeric
795 values that should be formatted with the units field
796 @type units: string or None
797 @param units: the units we should use for formatting, or None for
798 automatic choice (human-readable for non-separator usage, otherwise
799 megabytes); this is a one-letter string
808 if numfields is None:
810 if unitfields is None:
813 numfields = utils.FieldSet(*numfields)
814 unitfields = utils.FieldSet(*unitfields)
818 if headers and field not in headers:
819 # FIXME: handle better unknown fields (either revert to old
820 # style of raising exception, or deal more intelligently with
822 headers[field] = field
823 if separator is not None:
824 format_fields.append("%s")
825 elif numfields.Matches(field):
826 format_fields.append("%*s")
828 format_fields.append("%-*s")
830 if separator is None:
831 mlens = [0 for name in fields]
832 format = ' '.join(format_fields)
834 format = separator.replace("%", "%%").join(format_fields)
837 for idx, val in enumerate(row):
838 if unitfields.Matches(fields[idx]):
844 val = row[idx] = utils.FormatUnit(val, units)
845 val = row[idx] = str(val)
846 if separator is None:
847 mlens[idx] = max(mlens[idx], len(val))
852 for idx, name in enumerate(fields):
854 if separator is None:
855 mlens[idx] = max(mlens[idx], len(hdr))
856 args.append(mlens[idx])
858 result.append(format % tuple(args))
862 for idx in xrange(len(fields)):
863 if separator is None:
864 args.append(mlens[idx])
865 args.append(line[idx])
866 result.append(format % tuple(args))
871 def FormatTimestamp(ts):
872 """Formats a given timestamp.
875 @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
878 @returns: a string with the formatted timestamp
881 if not isinstance (ts, (tuple, list)) or len(ts) != 2:
884 return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
887 def ParseTimespec(value):
888 """Parse a time specification.
890 The following suffixes will be recognized:
898 Without any suffix, the value will be taken to be in seconds.
903 raise errors.OpPrereqError("Empty time specification passed")
911 if value[-1] not in suffix_map:
915 raise errors.OpPrereqError("Invalid time specification '%s'" % value)
917 multiplier = suffix_map[value[-1]]
919 if not value: # no data left after stripping the suffix
920 raise errors.OpPrereqError("Invalid time specification (only"
923 value = int(value) * multiplier
925 raise errors.OpPrereqError("Invalid time specification '%s'" % value)
929 def GetOnlineNodes(nodes, cl=None, nowarn=False):
930 """Returns the names of online nodes.
932 This function will also log a warning on stderr with the names of
935 @param nodes: if not empty, use only this subset of nodes (minus the
937 @param cl: if not None, luxi client to use
938 @type nowarn: boolean
939 @param nowarn: by default, this function will output a note with the
940 offline nodes that are skipped; if this parameter is True the
941 note is not displayed
947 result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
949 offline = [row[0] for row in result if row[1]]
950 if offline and not nowarn:
951 ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline))
952 return [row[0] for row in result if not row[1]]
955 def _ToStream(stream, txt, *args):
956 """Write a message to a stream, bypassing the logging system
958 @type stream: file object
959 @param stream: the file to which we should write
961 @param txt: the message
966 stream.write(txt % args)
973 def ToStdout(txt, *args):
974 """Write a message to stdout only, bypassing the logging system
976 This is just a wrapper over _ToStream.
979 @param txt: the message
982 _ToStream(sys.stdout, txt, *args)
985 def ToStderr(txt, *args):
986 """Write a message to stderr only, bypassing the logging system
988 This is just a wrapper over _ToStream.
991 @param txt: the message
994 _ToStream(sys.stderr, txt, *args)
997 class JobExecutor(object):
998 """Class which manages the submission and execution of multiple jobs.
1000 Note that instances of this class should not be reused between
1004 def __init__(self, cl=None, verbose=True):
1009 self.verbose = verbose
1011 def QueueJob(self, name, *ops):
1012 """Submit a job for execution.
1015 @param name: a description of the job, shown by GetResults and WaitOrShow
1017 job_id = SendJob(ops, cl=self.cl)
1018 self.queue.append((job_id, name))
1020 def GetResults(self):
1021 """Wait for and return the results of all jobs.
1024 @return: list of tuples (success, job results), in the same order
1025 as the submitted jobs; if a job has failed, instead of the result
1026 there will be the error message
1031 ToStdout("Submitted jobs %s", ", ".join(row[0] for row in self.queue))
1032 for jid, name in self.queue:
1034 ToStdout("Waiting for job %s for %s...", jid, name)
1036 job_result = PollJob(jid, cl=self.cl)
1038 except (errors.GenericError, luxi.ProtocolError), err:
1039 _, job_result = FormatError(err)
1041 # the error message will always be shown, verbose or not
1042 ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
1044 results.append((success, job_result))
1047 def WaitOrShow(self, wait):
1048 """Wait for job results or only print the job IDs.
1051 @param wait: whether to wait or not
1055 return self.GetResults()
1057 for jid, name in self.queue:
1058 ToStdout("%s: %s", jid, name)