4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module dealing with command line parsing"""
31 from cStringIO import StringIO
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import constants
36 from ganeti import opcodes
37 from ganeti import luxi
38 from ganeti import ssconf
39 from ganeti import rpc
41 from optparse import (OptionParser, make_option, TitledHelpFormatter,
42 Option, OptionValueError)
44 __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
45 "SubmitOpCode", "GetClient",
46 "cli_option", "ikv_option", "keyval_option",
47 "GenerateTable", "AskUser",
48 "ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
49 "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT", "SUBMIT_OPT",
50 "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
51 "FormatError", "SplitNodeOption", "SubmitOrSend",
52 "JobSubmittedException", "FormatTimestamp", "ParseTimespec",
53 "ToStderr", "ToStdout", "UsesRPC",
54 "GetOnlineNodes", "JobExecutor", "SYNC_OPT",
58 def _ExtractTagsObject(opts, args):
59 """Extract the tag type object.
61 Note that this function will modify its args parameter.
64 if not hasattr(opts, "tag_type"):
65 raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
67 if kind == constants.TAG_CLUSTER:
69 elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
71 raise errors.OpPrereqError("no arguments passed to the command")
75 raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
79 def _ExtendTags(opts, args):
80 """Extend the args if a source file has been given.
82 This function will extend the tags with the contents of the file
83 passed in the 'tags_source' attribute of the opts parameter. A file
84 named '-' will be replaced by stdin.
87 fname = opts.tags_source
93 new_fh = open(fname, "r")
96 # we don't use the nice 'new_data = [line.strip() for line in fh]'
97 # because of python bug 1633941
99 line = new_fh.readline()
102 new_data.append(line.strip())
105 args.extend(new_data)
108 def ListTags(opts, args):
109 """List the tags on a given object.
111 This is a generic implementation that knows how to deal with all
112 three cases of tag objects (cluster, node, instance). The opts
113 argument is expected to contain a tag_type field denoting what
114 object type we work on.
117 kind, name = _ExtractTagsObject(opts, args)
118 op = opcodes.OpGetTags(kind=kind, name=name)
119 result = SubmitOpCode(op)
120 result = list(result)
126 def AddTags(opts, args):
127 """Add tags on a given object.
129 This is a generic implementation that knows how to deal with all
130 three cases of tag objects (cluster, node, instance). The opts
131 argument is expected to contain a tag_type field denoting what
132 object type we work on.
135 kind, name = _ExtractTagsObject(opts, args)
136 _ExtendTags(opts, args)
138 raise errors.OpPrereqError("No tags to be added")
139 op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
143 def RemoveTags(opts, args):
144 """Remove tags from a given object.
146 This is a generic implementation that knows how to deal with all
147 three cases of tag objects (cluster, node, instance). The opts
148 argument is expected to contain a tag_type field denoting what
149 object type we work on.
152 kind, name = _ExtractTagsObject(opts, args)
153 _ExtendTags(opts, args)
155 raise errors.OpPrereqError("No tags to be removed")
156 op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
160 DEBUG_OPT = make_option("-d", "--debug", default=False,
162 help="Turn debugging on")
164 NOHDR_OPT = make_option("--no-headers", default=False,
165 action="store_true", dest="no_headers",
166 help="Don't display column headers")
168 SEP_OPT = make_option("--separator", default=None,
169 action="store", dest="separator",
170 help="Separator between output fields"
171 " (defaults to one space)")
173 USEUNITS_OPT = make_option("--units", default=None,
174 dest="units", choices=('h', 'm', 'g', 't'),
175 help="Specify units for output (one of hmgt)")
177 FIELDS_OPT = make_option("-o", "--output", dest="output", action="store",
178 type="string", help="Comma separated list of"
182 FORCE_OPT = make_option("-f", "--force", dest="force", action="store_true",
183 default=False, help="Force the operation")
185 TAG_SRC_OPT = make_option("--from", dest="tags_source",
186 default=None, help="File with tag names")
188 SUBMIT_OPT = make_option("--submit", dest="submit_only",
189 default=False, action="store_true",
190 help="Submit the job and return the job ID, but"
191 " don't wait for the job to finish")
193 SYNC_OPT = make_option("--sync", dest="do_locking",
194 default=False, action="store_true",
195 help="Grab locks while doing the queries"
196 " in order to ensure more consistent results")
200 """Macro-like function denoting a fixed number of arguments"""
204 def ARGS_ATLEAST(val):
205 """Macro-like function denoting a minimum number of arguments"""
210 ARGS_ONE = ARGS_FIXED(1)
211 ARGS_ANY = ARGS_ATLEAST(0)
214 def check_unit(option, opt, value):
215 """OptParsers custom converter for units.
219 return utils.ParseUnit(value)
220 except errors.UnitParseError, err:
221 raise OptionValueError("option %s: %s" % (opt, err))
224 class CliOption(Option):
225 """Custom option class for optparse.
228 TYPES = Option.TYPES + ("unit",)
229 TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
230 TYPE_CHECKER["unit"] = check_unit
233 def _SplitKeyVal(opt, data):
234 """Convert a KeyVal string into a dict.
236 This function will convert a key=val[,...] string into a dict. Empty
237 values will be converted specially: keys which have the prefix 'no_'
238 will have the value=False and the prefix stripped, the others will
242 @param opt: a string holding the option name for which we process the
243 data, used in building error messages
245 @param data: a string of the format key=val,key=val,...
247 @return: {key=val, key=val}
248 @raises errors.ParameterError: if there are duplicate keys
254 for elem in data.split(","):
256 key, val = elem.split("=", 1)
258 if elem.startswith(NO_PREFIX):
259 key, val = elem[len(NO_PREFIX):], False
260 elif elem.startswith(UN_PREFIX):
261 key, val = elem[len(UN_PREFIX):], None
263 key, val = elem, True
265 raise errors.ParameterError("Duplicate key '%s' in option %s" %
271 def check_ident_key_val(option, opt, value):
272 """Custom parser for the IdentKeyVal option type.
278 ident, rest = value.split(":", 1)
279 kv_dict = _SplitKeyVal(opt, rest)
280 retval = (ident, kv_dict)
284 class IdentKeyValOption(Option):
285 """Custom option class for ident:key=val,key=val options.
287 This will store the parsed values as a tuple (ident, {key: val}). As
288 such, multiple uses of this option via action=append is possible.
291 TYPES = Option.TYPES + ("identkeyval",)
292 TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
293 TYPE_CHECKER["identkeyval"] = check_ident_key_val
296 def check_key_val(option, opt, value):
297 """Custom parser for the KeyVal option type.
300 return _SplitKeyVal(opt, value)
303 class KeyValOption(Option):
304 """Custom option class for key=val,key=val options.
306 This will store the parsed values as a dict {key: val}.
309 TYPES = Option.TYPES + ("keyval",)
310 TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
311 TYPE_CHECKER["keyval"] = check_key_val
314 # optparse.py sets make_option, so we do it for our own option class, too
315 cli_option = CliOption
316 ikv_option = IdentKeyValOption
317 keyval_option = KeyValOption
320 def _ParseArgs(argv, commands, aliases):
321 """Parser for the command line arguments.
323 This function parses the arguements and returns the function which
324 must be executed together with its (modified) arguments.
326 @param argv: the command line
327 @param commands: dictionary with special contents, see the design
328 doc for cmdline handling
329 @param aliases: dictionary with command aliases {'alias': 'target, ...}
335 binary = argv[0].split("/")[-1]
337 if len(argv) > 1 and argv[1] == "--version":
338 print "%s (ganeti) %s" % (binary, constants.RELEASE_VERSION)
339 # Quit right away. That way we don't have to care about this special
340 # argument. optparse.py does it the same.
343 if len(argv) < 2 or not (argv[1] in commands or
345 # let's do a nice thing
346 sortedcmds = commands.keys()
348 print ("Usage: %(bin)s {command} [options...] [argument...]"
349 "\n%(bin)s <command> --help to see details, or"
350 " man %(bin)s\n" % {"bin": binary})
351 # compute the max line length for cmd + usage
352 mlen = max([len(" %s" % cmd) for cmd in commands])
353 mlen = min(60, mlen) # should not get here...
354 # and format a nice command list
356 for cmd in sortedcmds:
357 cmdstr = " %s" % (cmd,)
358 help_text = commands[cmd][4]
359 help_lines = textwrap.wrap(help_text, 79-3-mlen)
360 print "%-*s - %s" % (mlen, cmdstr, help_lines.pop(0))
361 for line in help_lines:
362 print "%-*s %s" % (mlen, "", line)
364 return None, None, None
366 # get command, unalias it, and look it up in commands
370 raise errors.ProgrammerError("Alias '%s' overrides an existing"
373 if aliases[cmd] not in commands:
374 raise errors.ProgrammerError("Alias '%s' maps to non-existing"
375 " command '%s'" % (cmd, aliases[cmd]))
379 func, nargs, parser_opts, usage, description = commands[cmd]
380 parser = OptionParser(option_list=parser_opts,
381 description=description,
382 formatter=TitledHelpFormatter(),
383 usage="%%prog %s %s" % (cmd, usage))
384 parser.disable_interspersed_args()
385 options, args = parser.parse_args()
388 print >> sys.stderr, ("Error: Command %s expects no arguments" % cmd)
389 return None, None, None
390 elif nargs < 0 and len(args) != -nargs:
391 print >> sys.stderr, ("Error: Command %s expects %d argument(s)" %
393 return None, None, None
394 elif nargs >= 0 and len(args) < nargs:
395 print >> sys.stderr, ("Error: Command %s expects at least %d argument(s)" %
397 return None, None, None
399 return func, options, args
402 def SplitNodeOption(value):
403 """Splits the value of a --node option.
406 if value and ':' in value:
407 return value.split(':', 1)
413 def wrapper(*args, **kwargs):
416 return fn(*args, **kwargs)
422 def AskUser(text, choices=None):
423 """Ask the user a question.
425 @param text: the question to ask
427 @param choices: list with elements tuples (input_char, return_value,
428 description); if not given, it will default to: [('y', True,
429 'Perform the operation'), ('n', False, 'Do no do the operation')];
430 note that the '?' char is reserved for help
432 @return: one of the return values from the choices list; if input is
433 not possible (i.e. not running with a tty, we return the last
438 choices = [('y', True, 'Perform the operation'),
439 ('n', False, 'Do not perform the operation')]
440 if not choices or not isinstance(choices, list):
441 raise errors.ProgrammerError("Invalid choiches argument to AskUser")
442 for entry in choices:
443 if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
444 raise errors.ProgrammerError("Invalid choiches element to AskUser")
446 answer = choices[-1][1]
448 for line in text.splitlines():
449 new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
450 text = "\n".join(new_text)
452 f = file("/dev/tty", "a+")
456 chars = [entry[0] for entry in choices]
457 chars[-1] = "[%s]" % chars[-1]
459 maps = dict([(entry[0], entry[1]) for entry in choices])
463 f.write("/".join(chars))
465 line = f.readline(2).strip().lower()
470 for entry in choices:
471 f.write(" %s - %s\n" % (entry[0], entry[2]))
479 class JobSubmittedException(Exception):
480 """Job was submitted, client should exit.
482 This exception has one argument, the ID of the job that was
483 submitted. The handler should print this ID.
485 This is not an error, just a structured way to exit from clients.
490 def SendJob(ops, cl=None):
491 """Function to submit an opcode without waiting for the results.
494 @param ops: list of opcodes
495 @type cl: luxi.Client
496 @param cl: the luxi client to use for communicating with the master;
497 if None, a new client will be created
503 job_id = cl.SubmitJob(ops)
508 def PollJob(job_id, cl=None, feedback_fn=None):
509 """Function to poll for the result of a job.
511 @type job_id: job identified
512 @param job_id: the job to poll for results
513 @type cl: luxi.Client
514 @param cl: the luxi client to use for communicating with the master;
515 if None, a new client will be created
522 prev_logmsg_serial = None
525 result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
528 # job not found, go away!
529 raise errors.JobLost("Job with id %s lost" % job_id)
531 # Split result, a tuple of (field values, log entries)
532 (job_info, log_entries) = result
533 (status, ) = job_info
536 for log_entry in log_entries:
537 (serial, timestamp, _, message) = log_entry
538 if callable(feedback_fn):
539 feedback_fn(log_entry[1:])
541 encoded = utils.SafeEncode(message)
542 print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), encoded)
543 prev_logmsg_serial = max(prev_logmsg_serial, serial)
545 # TODO: Handle canceled and archived jobs
546 elif status in (constants.JOB_STATUS_SUCCESS,
547 constants.JOB_STATUS_ERROR,
548 constants.JOB_STATUS_CANCELING,
549 constants.JOB_STATUS_CANCELED):
552 prev_job_info = job_info
554 jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
556 raise errors.JobLost("Job with id %s lost" % job_id)
558 status, opstatus, result = jobs[0]
559 if status == constants.JOB_STATUS_SUCCESS:
561 elif status in (constants.JOB_STATUS_CANCELING,
562 constants.JOB_STATUS_CANCELED):
563 raise errors.OpExecError("Job was canceled")
566 for idx, (status, msg) in enumerate(zip(opstatus, result)):
567 if status == constants.OP_STATUS_SUCCESS:
569 elif status == constants.OP_STATUS_ERROR:
571 raise errors.OpExecError("partial failure (opcode %d): %s" %
574 raise errors.OpExecError(str(msg))
575 # default failure mode
576 raise errors.OpExecError(result)
579 def SubmitOpCode(op, cl=None, feedback_fn=None):
580 """Legacy function to submit an opcode.
582 This is just a simple wrapper over the construction of the processor
583 instance. It should be extended to better handle feedback and
584 interaction functions.
590 job_id = SendJob([op], cl)
592 op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
597 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
598 """Wrapper around SubmitOpCode or SendJob.
600 This function will decide, based on the 'opts' parameter, whether to
601 submit and wait for the result of the opcode (and return it), or
602 whether to just send the job and print its identifier. It is used in
603 order to simplify the implementation of the '--submit' option.
606 if opts and opts.submit_only:
607 job_id = SendJob([op], cl=cl)
608 raise JobSubmittedException(job_id)
610 return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
614 # TODO: Cache object?
616 client = luxi.Client()
617 except luxi.NoMasterError:
618 master, myself = ssconf.GetMasterAndMyself()
620 raise errors.OpPrereqError("This is not the master node, please connect"
621 " to node '%s' and rerun the command" %
628 def FormatError(err):
629 """Return a formatted error message for a given error.
631 This function takes an exception instance and returns a tuple
632 consisting of two values: first, the recommended exit code, and
633 second, a string describing the error message (not
640 if isinstance(err, errors.ConfigurationError):
641 txt = "Corrupt configuration file: %s" % msg
643 obuf.write(txt + "\n")
644 obuf.write("Aborting.")
646 elif isinstance(err, errors.HooksAbort):
647 obuf.write("Failure: hooks execution failed:\n")
648 for node, script, out in err.args[0]:
650 obuf.write(" node: %s, script: %s, output: %s\n" %
653 obuf.write(" node: %s, script: %s (no output)\n" %
655 elif isinstance(err, errors.HooksFailure):
656 obuf.write("Failure: hooks general failure: %s" % msg)
657 elif isinstance(err, errors.ResolverError):
658 this_host = utils.HostInfo.SysName()
659 if err.args[0] == this_host:
660 msg = "Failure: can't resolve my own hostname ('%s')"
662 msg = "Failure: can't resolve hostname '%s'"
663 obuf.write(msg % err.args[0])
664 elif isinstance(err, errors.OpPrereqError):
665 obuf.write("Failure: prerequisites not met for this"
666 " operation:\n%s" % msg)
667 elif isinstance(err, errors.OpExecError):
668 obuf.write("Failure: command execution error:\n%s" % msg)
669 elif isinstance(err, errors.TagError):
670 obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
671 elif isinstance(err, errors.JobQueueDrainError):
672 obuf.write("Failure: the job queue is marked for drain and doesn't"
673 " accept new requests\n")
674 elif isinstance(err, errors.JobQueueFull):
675 obuf.write("Failure: the job queue is full and doesn't accept new"
676 " job submissions until old jobs are archived\n")
677 elif isinstance(err, errors.TypeEnforcementError):
678 obuf.write("Parameter Error: %s" % msg)
679 elif isinstance(err, errors.GenericError):
680 obuf.write("Unhandled Ganeti error: %s" % msg)
681 elif isinstance(err, luxi.NoMasterError):
682 obuf.write("Cannot communicate with the master daemon.\nIs it running"
683 " and listening for connections?")
684 elif isinstance(err, luxi.TimeoutError):
685 obuf.write("Timeout while talking to the master daemon. Error:\n"
687 elif isinstance(err, luxi.ProtocolError):
688 obuf.write("Unhandled protocol error while talking to the master daemon:\n"
690 elif isinstance(err, JobSubmittedException):
691 obuf.write("JobID: %s\n" % err.args[0])
694 obuf.write("Unhandled exception: %s" % msg)
695 return retcode, obuf.getvalue().rstrip('\n')
698 def GenericMain(commands, override=None, aliases=None):
699 """Generic main function for all the gnt-* commands.
702 - commands: a dictionary with a special structure, see the design doc
703 for command line handling.
704 - override: if not None, we expect a dictionary with keys that will
705 override command line options; this can be used to pass
706 options from the scripts to generic functions
707 - aliases: dictionary with command aliases {'alias': 'target, ...}
710 # save the program name and the entire command line for later logging
712 binary = os.path.basename(sys.argv[0]) or sys.argv[0]
713 if len(sys.argv) >= 2:
714 binary += " " + sys.argv[1]
715 old_cmdline = " ".join(sys.argv[2:])
719 binary = "<unknown program>"
725 func, options, args = _ParseArgs(sys.argv, commands, aliases)
726 if func is None: # parse error
729 if override is not None:
730 for key, val in override.iteritems():
731 setattr(options, key, val)
733 utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
734 stderr_logging=True, program=binary)
736 utils.debug = options.debug
739 logging.info("run with arguments '%s'", old_cmdline)
741 logging.info("run with no arguments")
744 result = func(options, args)
745 except (errors.GenericError, luxi.ProtocolError,
746 JobSubmittedException), err:
747 result, err_msg = FormatError(err)
748 logging.exception("Error durring command processing")
754 def GenerateTable(headers, fields, separator, data,
755 numfields=None, unitfields=None,
757 """Prints a table with headers and different fields.
760 @param headers: dictionary mapping field names to headers for
763 @param fields: the field names corresponding to each row in
765 @param separator: the separator to be used; if this is None,
766 the default 'smart' algorithm is used which computes optimal
767 field width, otherwise just the separator is used between
770 @param data: a list of lists, each sublist being one row to be output
771 @type numfields: list
772 @param numfields: a list with the fields that hold numeric
773 values and thus should be right-aligned
774 @type unitfields: list
775 @param unitfields: a list with the fields that hold numeric
776 values that should be formatted with the units field
777 @type units: string or None
778 @param units: the units we should use for formatting, or None for
779 automatic choice (human-readable for non-separator usage, otherwise
780 megabytes); this is a one-letter string
789 if numfields is None:
791 if unitfields is None:
794 numfields = utils.FieldSet(*numfields)
795 unitfields = utils.FieldSet(*unitfields)
799 if headers and field not in headers:
800 # TODO: handle better unknown fields (either revert to old
801 # style of raising exception, or deal more intelligently with
803 headers[field] = field
804 if separator is not None:
805 format_fields.append("%s")
806 elif numfields.Matches(field):
807 format_fields.append("%*s")
809 format_fields.append("%-*s")
811 if separator is None:
812 mlens = [0 for name in fields]
813 format = ' '.join(format_fields)
815 format = separator.replace("%", "%%").join(format_fields)
820 for idx, val in enumerate(row):
821 if unitfields.Matches(fields[idx]):
827 val = row[idx] = utils.FormatUnit(val, units)
828 val = row[idx] = str(val)
829 if separator is None:
830 mlens[idx] = max(mlens[idx], len(val))
835 for idx, name in enumerate(fields):
837 if separator is None:
838 mlens[idx] = max(mlens[idx], len(hdr))
839 args.append(mlens[idx])
841 result.append(format % tuple(args))
846 line = ['-' for _ in fields]
847 for idx in xrange(len(fields)):
848 if separator is None:
849 args.append(mlens[idx])
850 args.append(line[idx])
851 result.append(format % tuple(args))
856 def FormatTimestamp(ts):
857 """Formats a given timestamp.
860 @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
863 @return: a string with the formatted timestamp
866 if not isinstance (ts, (tuple, list)) or len(ts) != 2:
869 return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
872 def ParseTimespec(value):
873 """Parse a time specification.
875 The following suffixed will be recognized:
883 Without any suffix, the value will be taken to be in seconds.
888 raise errors.OpPrereqError("Empty time specification passed")
896 if value[-1] not in suffix_map:
900 raise errors.OpPrereqError("Invalid time specification '%s'" % value)
902 multiplier = suffix_map[value[-1]]
904 if not value: # no data left after stripping the suffix
905 raise errors.OpPrereqError("Invalid time specification (only"
908 value = int(value) * multiplier
910 raise errors.OpPrereqError("Invalid time specification '%s'" % value)
914 def GetOnlineNodes(nodes, cl=None, nowarn=False):
915 """Returns the names of online nodes.
917 This function will also log a warning on stderr with the names of
920 @param nodes: if not empty, use only this subset of nodes (minus the
922 @param cl: if not None, luxi client to use
923 @type nowarn: boolean
924 @param nowarn: by default, this function will output a note with the
925 offline nodes that are skipped; if this parameter is True the
926 note is not displayed
932 result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
934 offline = [row[0] for row in result if row[1]]
935 if offline and not nowarn:
936 ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline))
937 return [row[0] for row in result if not row[1]]
940 def _ToStream(stream, txt, *args):
941 """Write a message to a stream, bypassing the logging system
943 @type stream: file object
944 @param stream: the file to which we should write
946 @param txt: the message
951 stream.write(txt % args)
958 def ToStdout(txt, *args):
959 """Write a message to stdout only, bypassing the logging system
961 This is just a wrapper over _ToStream.
964 @param txt: the message
967 _ToStream(sys.stdout, txt, *args)
970 def ToStderr(txt, *args):
971 """Write a message to stderr only, bypassing the logging system
973 This is just a wrapper over _ToStream.
976 @param txt: the message
979 _ToStream(sys.stderr, txt, *args)
982 class JobExecutor(object):
983 """Class which manages the submission and execution of multiple jobs.
985 Note that instances of this class should not be reused between
989 def __init__(self, cl=None, verbose=True):
994 self.verbose = verbose
996 def QueueJob(self, name, *ops):
997 """Submit a job for execution.
1000 @param name: a description of the job, will be used in WaitJobSet
1002 job_id = SendJob(ops, cl=self.cl)
1003 self.queue.append((job_id, name))
1005 def GetResults(self):
1006 """Wait for and return the results of all jobs.
1009 @return: list of tuples (success, job results), in the same order
1010 as the submitted jobs; if a job has failed, instead of the result
1011 there will be the error message
1016 ToStdout("Submitted jobs %s", ", ".join(row[0] for row in self.queue))
1017 for jid, name in self.queue:
1019 ToStdout("Waiting for job %s for %s...", jid, name)
1021 job_result = PollJob(jid, cl=self.cl)
1023 except (errors.GenericError, luxi.ProtocolError), err:
1024 _, job_result = FormatError(err)
1026 # the error message will always be shown, verbose or not
1027 ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
1029 results.append((success, job_result))
1032 def WaitOrShow(self, wait):
1033 """Wait for job results or only print the job IDs.
1036 @param wait: whether to wait or not
1040 return self.GetResults()
1042 for jid, name in self.queue:
1043 ToStdout("%s: %s", jid, name)