4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module dealing with command line parsing"""
import copy
import logging
import os.path
import sys
import textwrap
import time
from cStringIO import StringIO
from optparse import (OptionParser, make_option, TitledHelpFormatter,
                      Option, OptionValueError)

from ganeti import utils
from ganeti import errors
from ganeti import constants
from ganeti import opcodes
from ganeti import luxi
from ganeti import ssconf
from ganeti import rpc
44 __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
45 "SubmitOpCode", "GetClient",
46 "cli_option", "ikv_option", "keyval_option",
47 "GenerateTable", "AskUser",
48 "ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
49 "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT", "SUBMIT_OPT",
50 "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
51 "FormatError", "SplitNodeOption", "SubmitOrSend",
52 "JobSubmittedException", "FormatTimestamp", "ParseTimespec",
53 "ToStderr", "ToStdout", "UsesRPC",
54 "GetOnlineNodes", "JobExecutor", "SYNC_OPT", "CONFIRM_OPT",
58 def _ExtractTagsObject(opts, args):
59 """Extract the tag type object.
61 Note that this function will modify its args parameter.
64 if not hasattr(opts, "tag_type"):
65 raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
67 if kind == constants.TAG_CLUSTER:
69 elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
71 raise errors.OpPrereqError("no arguments passed to the command")
75 raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
def _ExtendTags(opts, args):
    """Extend the args if a source file has been given.

    This function will extend the tags with the contents of the file
    passed in the 'tags_source' attribute of the opts parameter. A file
    named '-' will be replaced by stdin.

    @param opts: the parsed options; only the C{tags_source} attribute
        is read
    @type args: list
    @param args: the tag list; it is extended in place with one
        whitespace-stripped entry per line read from the source

    """
    fname = opts.tags_source
    if fname is None:
        # no tag source was requested, so there is nothing to add
        return
    if fname == "-":
        new_fh = sys.stdin
    else:
        new_fh = open(fname, "r")
    new_data = []
    try:
        # we don't use the nice 'new_data = [line.strip() for line in fh]'
        # because of python bug 1633941
        while True:
            line = new_fh.readline()
            if not line:
                break
            new_data.append(line.strip())
    finally:
        # only close what was opened here; stdin is not ours to close
        if new_fh is not sys.stdin:
            new_fh.close()
    args.extend(new_data)
108 def ListTags(opts, args):
109 """List the tags on a given object.
111 This is a generic implementation that knows how to deal with all
112 three cases of tag objects (cluster, node, instance). The opts
113 argument is expected to contain a tag_type field denoting what
114 object type we work on.
117 kind, name = _ExtractTagsObject(opts, args)
118 op = opcodes.OpGetTags(kind=kind, name=name)
119 result = SubmitOpCode(op)
120 result = list(result)
def AddTags(opts, args):
    """Add tags on a given object.

    This is a generic implementation that knows how to deal with all
    three cases of tag objects (cluster, node, instance). The opts
    argument is expected to contain a tag_type field denoting what
    object type we work on.

    @param opts: the parsed command line options
    @type args: list
    @param args: the positional arguments; modified in place while the
        target object is extracted and the tag source file is merged in
    @raise errors.OpPrereqError: if no tags are left to be added

    """
    kind, name = _ExtractTagsObject(opts, args)
    _ExtendTags(opts, args)
    # whatever remains in args after the two calls above is the tag list
    if not args:
        raise errors.OpPrereqError("No tags to be added")
    op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
    SubmitOpCode(op)
def RemoveTags(opts, args):
    """Remove tags from a given object.

    This is a generic implementation that knows how to deal with all
    three cases of tag objects (cluster, node, instance). The opts
    argument is expected to contain a tag_type field denoting what
    object type we work on.

    @param opts: the parsed command line options
    @type args: list
    @param args: the positional arguments; modified in place while the
        target object is extracted and the tag source file is merged in
    @raise errors.OpPrereqError: if no tags are left to be removed

    """
    kind, name = _ExtractTagsObject(opts, args)
    _ExtendTags(opts, args)
    # whatever remains in args after the two calls above is the tag list
    if not args:
        raise errors.OpPrereqError("No tags to be removed")
    op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
    SubmitOpCode(op)
# Boolean flag; GenericMain reads it back as options.debug
DEBUG_OPT = make_option("-d", "--debug", default=False,
                        action="store_true", dest="debug",
                        help="Turn debugging on")

# Boolean flag suppressing the header row of tabular output
NOHDR_OPT = make_option("--no-headers", default=False,
                        action="store_true", dest="no_headers",
                        help="Don't display column headers")

# Free-form field separator; None means no separator was requested
SEP_OPT = make_option("--separator", default=None,
                      action="store", dest="separator",
                      help="Separator between output fields"
                      " (defaults to one space)")

# One-letter unit selector, restricted by optparse to the listed choices
USEUNITS_OPT = make_option("--units", default=None,
                           dest="units", choices=('h', 'm', 'g', 't'),
                           help="Specify units for output (one of hmgt)")
177 FIELDS_OPT = make_option("-o", "--output", dest="output", action="store",
178 type="string", help="Comma separated list of"
# Boolean flag stored as options.force
FORCE_OPT = make_option("-f", "--force", dest="force", action="store_true",
                        default=False, help="Force the operation")

# Boolean flag stored as options.confirm
CONFIRM_OPT = make_option("--yes", dest="confirm", action="store_true",
                          default=False, help="Do not require confirmation")

# Name of a file with tags; stored as options.tags_source (None if unset)
TAG_SRC_OPT = make_option("--from", dest="tags_source",
                          default=None, help="File with tag names")

# Boolean flag stored as options.submit_only
SUBMIT_OPT = make_option("--submit", dest="submit_only",
                         default=False, action="store_true",
                         help="Submit the job and return the job ID, but"
                         " don't wait for the job to finish")

# Boolean flag stored as options.do_locking
SYNC_OPT = make_option("--sync", dest="do_locking",
                       default=False, action="store_true",
                       help="Grab locks while doing the queries"
                       " in order to ensure more consistent results")
# Argument-count specifications are plain integers: a negative value means
# "exactly that many arguments", a non-negative one "at least that many";
# this is the convention the command line parser checks nargs against.


def ARGS_FIXED(val):
    """Macro-like function denoting a fixed number of arguments"""
    return -val


def ARGS_ATLEAST(val):
    """Macro-like function denoting a minimum number of arguments"""
    return val


# NOTE(review): presumably None; "no arguments" has to be distinguishable
# from ARGS_ATLEAST(0), which shares the integer 0 with ARGS_FIXED(0).
# Confirm against the parser's handling of nargs.
ARGS_NONE = None
ARGS_ONE = ARGS_FIXED(1)
ARGS_ANY = ARGS_ATLEAST(0)
def check_unit(option, opt, value):
    """OptParsers custom converter for units.

    @param option: the option instance being processed (unused)
    @param opt: the option string, used in the error message
    @param value: the raw option value to convert
    @return: whatever utils.ParseUnit returns for the value
    @raise OptionValueError: if utils.ParseUnit rejects the value

    """
    try:
        return utils.ParseUnit(value)
    except errors.UnitParseError:
        # fetched via sys.exc_info() so the clause does not depend on a
        # version-specific "except ... , name" / "except ... as name" form
        err = sys.exc_info()[1]
        raise OptionValueError("option %s: %s" % (opt, err))
class CliOption(Option):
    """Custom option class for optparse.

    Extends the stock option types with "unit", whose values are
    converted by check_unit.

    """
    TYPES = Option.TYPES + ("unit",)
    # work on a copy so the checker table of the base class stays untouched
    TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
    TYPE_CHECKER["unit"] = check_unit
236 def _SplitKeyVal(opt, data):
237 """Convert a KeyVal string into a dict.
239 This function will convert a key=val[,...] string into a dict. Empty
240 values will be converted specially: keys which have the prefix 'no_'
241 will have the value=False and the prefix stripped, the others will
245 @param opt: a string holding the option name for which we process the
246 data, used in building error messages
248 @param data: a string of the format key=val,key=val,...
250 @return: {key=val, key=val}
251 @raises errors.ParameterError: if there are duplicate keys
257 for elem in data.split(","):
259 key, val = elem.split("=", 1)
261 if elem.startswith(NO_PREFIX):
262 key, val = elem[len(NO_PREFIX):], False
263 elif elem.startswith(UN_PREFIX):
264 key, val = elem[len(UN_PREFIX):], None
266 key, val = elem, True
268 raise errors.ParameterError("Duplicate key '%s' in option %s" %
274 def check_ident_key_val(option, opt, value):
275 """Custom parser for the IdentKeyVal option type.
281 ident, rest = value.split(":", 1)
282 kv_dict = _SplitKeyVal(opt, rest)
283 retval = (ident, kv_dict)
class IdentKeyValOption(Option):
    """Custom option class for ident:key=val,key=val options.

    This will store the parsed values as a tuple (ident, {key: val}). As
    such, multiple uses of this option via action=append is possible.

    """
    TYPES = Option.TYPES + ("identkeyval",)
    # work on a copy so the checker table of the base class stays untouched
    TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
    TYPE_CHECKER["identkeyval"] = check_ident_key_val
def check_key_val(option, opt, value):
    """Custom parser for the KeyVal option type.

    @param option: the option instance being processed (unused)
    @param opt: the option string, passed on for use in error messages
    @param value: the raw option value
    @return: the result of _SplitKeyVal for the value

    """
    return _SplitKeyVal(opt, value)
class KeyValOption(Option):
    """Custom option class for key=val,key=val options.

    This will store the parsed values as a dict {key: val}.

    """
    TYPES = Option.TYPES + ("keyval",)
    # work on a copy so the checker table of the base class stays untouched
    TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
    TYPE_CHECKER["keyval"] = check_key_val
# optparse.py sets make_option, so we do it for our own option class, too
cli_option = CliOption
ikv_option = IdentKeyValOption
keyval_option = KeyValOption
323 def _ParseArgs(argv, commands, aliases):
324 """Parser for the command line arguments.
326 This function parses the arguements and returns the function which
327 must be executed together with its (modified) arguments.
329 @param argv: the command line
330 @param commands: dictionary with special contents, see the design
331 doc for cmdline handling
332 @param aliases: dictionary with command aliases {'alias': 'target, ...}
338 binary = argv[0].split("/")[-1]
340 if len(argv) > 1 and argv[1] == "--version":
341 print "%s (ganeti) %s" % (binary, constants.RELEASE_VERSION)
342 # Quit right away. That way we don't have to care about this special
343 # argument. optparse.py does it the same.
346 if len(argv) < 2 or not (argv[1] in commands or
348 # let's do a nice thing
349 sortedcmds = commands.keys()
351 print ("Usage: %(bin)s {command} [options...] [argument...]"
352 "\n%(bin)s <command> --help to see details, or"
353 " man %(bin)s\n" % {"bin": binary})
354 # compute the max line length for cmd + usage
355 mlen = max([len(" %s" % cmd) for cmd in commands])
356 mlen = min(60, mlen) # should not get here...
357 # and format a nice command list
359 for cmd in sortedcmds:
360 cmdstr = " %s" % (cmd,)
361 help_text = commands[cmd][4]
362 help_lines = textwrap.wrap(help_text, 79-3-mlen)
363 print "%-*s - %s" % (mlen, cmdstr, help_lines.pop(0))
364 for line in help_lines:
365 print "%-*s %s" % (mlen, "", line)
367 return None, None, None
369 # get command, unalias it, and look it up in commands
373 raise errors.ProgrammerError("Alias '%s' overrides an existing"
376 if aliases[cmd] not in commands:
377 raise errors.ProgrammerError("Alias '%s' maps to non-existing"
378 " command '%s'" % (cmd, aliases[cmd]))
382 func, nargs, parser_opts, usage, description = commands[cmd]
383 parser = OptionParser(option_list=parser_opts,
384 description=description,
385 formatter=TitledHelpFormatter(),
386 usage="%%prog %s %s" % (cmd, usage))
387 parser.disable_interspersed_args()
388 options, args = parser.parse_args()
391 print >> sys.stderr, ("Error: Command %s expects no arguments" % cmd)
392 return None, None, None
393 elif nargs < 0 and len(args) != -nargs:
394 print >> sys.stderr, ("Error: Command %s expects %d argument(s)" %
396 return None, None, None
397 elif nargs >= 0 and len(args) < nargs:
398 print >> sys.stderr, ("Error: Command %s expects at least %d argument(s)" %
400 return None, None, None
402 return func, options, args
def SplitNodeOption(value):
    """Splits the value of a --node option.

    @param value: the option value; may be None or empty
    @return: the two parts obtained by splitting at the first ':' when
        the value contains one, otherwise the value paired with None

    """
    if value and ':' in value:
        return value.split(':', 1)
    else:
        # NOTE(review): fallback shape chosen so callers can always unpack
        # two items; confirm against the callers of this function
        return (value, None)
416 def wrapper(*args, **kwargs):
419 return fn(*args, **kwargs)
425 def AskUser(text, choices=None):
426 """Ask the user a question.
428 @param text: the question to ask
430 @param choices: list with elements tuples (input_char, return_value,
431 description); if not given, it will default to: [('y', True,
432 'Perform the operation'), ('n', False, 'Do no do the operation')];
433 note that the '?' char is reserved for help
435 @return: one of the return values from the choices list; if input is
436 not possible (i.e. not running with a tty, we return the last
441 choices = [('y', True, 'Perform the operation'),
442 ('n', False, 'Do not perform the operation')]
443 if not choices or not isinstance(choices, list):
444 raise errors.ProgrammerError("Invalid choiches argument to AskUser")
445 for entry in choices:
446 if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
447 raise errors.ProgrammerError("Invalid choiches element to AskUser")
449 answer = choices[-1][1]
451 for line in text.splitlines():
452 new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
453 text = "\n".join(new_text)
455 f = file("/dev/tty", "a+")
459 chars = [entry[0] for entry in choices]
460 chars[-1] = "[%s]" % chars[-1]
462 maps = dict([(entry[0], entry[1]) for entry in choices])
466 f.write("/".join(chars))
468 line = f.readline(2).strip().lower()
473 for entry in choices:
474 f.write(" %s - %s\n" % (entry[0], entry[2]))
class JobSubmittedException(Exception):
    """Job was submitted, client should exit.

    This exception has one argument, the ID of the job that was
    submitted. The handler should print this ID.

    This is not an error, just a structured way to exit from clients.

    """
def SendJob(ops, cl=None):
    """Function to submit an opcode without waiting for the results.

    @type ops: list
    @param ops: list of opcodes
    @type cl: luxi.Client
    @param cl: the luxi client to use for communicating with the master;
               if None, a new client will be created
    @return: the job ID, as returned by the client's SubmitJob call

    """
    if cl is None:
        cl = GetClient()

    job_id = cl.SubmitJob(ops)

    return job_id
511 def PollJob(job_id, cl=None, feedback_fn=None):
512 """Function to poll for the result of a job.
514 @type job_id: job identified
515 @param job_id: the job to poll for results
516 @type cl: luxi.Client
517 @param cl: the luxi client to use for communicating with the master;
518 if None, a new client will be created
525 prev_logmsg_serial = None
528 result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
531 # job not found, go away!
532 raise errors.JobLost("Job with id %s lost" % job_id)
534 # Split result, a tuple of (field values, log entries)
535 (job_info, log_entries) = result
536 (status, ) = job_info
539 for log_entry in log_entries:
540 (serial, timestamp, _, message) = log_entry
541 if callable(feedback_fn):
542 feedback_fn(log_entry[1:])
544 encoded = utils.SafeEncode(message)
545 print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), encoded)
546 prev_logmsg_serial = max(prev_logmsg_serial, serial)
548 # TODO: Handle canceled and archived jobs
549 elif status in (constants.JOB_STATUS_SUCCESS,
550 constants.JOB_STATUS_ERROR,
551 constants.JOB_STATUS_CANCELING,
552 constants.JOB_STATUS_CANCELED):
555 prev_job_info = job_info
557 jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
559 raise errors.JobLost("Job with id %s lost" % job_id)
561 status, opstatus, result = jobs[0]
562 if status == constants.JOB_STATUS_SUCCESS:
564 elif status in (constants.JOB_STATUS_CANCELING,
565 constants.JOB_STATUS_CANCELED):
566 raise errors.OpExecError("Job was canceled")
569 for idx, (status, msg) in enumerate(zip(opstatus, result)):
570 if status == constants.OP_STATUS_SUCCESS:
572 elif status == constants.OP_STATUS_ERROR:
574 raise errors.OpExecError("partial failure (opcode %d): %s" %
577 raise errors.OpExecError(str(msg))
578 # default failure mode
579 raise errors.OpExecError(result)
582 def SubmitOpCode(op, cl=None, feedback_fn=None):
583 """Legacy function to submit an opcode.
585 This is just a simple wrapper over the construction of the processor
586 instance. It should be extended to better handle feedback and
587 interaction functions.
593 job_id = SendJob([op], cl)
595 op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
    """Wrapper around SubmitOpCode or SendJob.

    This function will decide, based on the 'opts' parameter, whether to
    submit and wait for the result of the opcode (and return it), or
    whether to just send the job and print its identifier. It is used in
    order to simplify the implementation of the '--submit' option.

    @param op: the opcode to submit
    @param opts: the parsed options; a false value or a false
        C{submit_only} attribute selects the submit-and-wait path
    @param cl: passed through to SendJob/SubmitOpCode
    @param feedback_fn: passed through to SubmitOpCode
    @return: the result of SubmitOpCode on the submit-and-wait path
    @raise JobSubmittedException: carrying the job ID, on the
        submit-only path

    """
    if opts and opts.submit_only:
        job_id = SendJob([op], cl=cl)
        # not an error: this is how the job ID is handed to the caller
        raise JobSubmittedException(job_id)

    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
617 # TODO: Cache object?
619 client = luxi.Client()
620 except luxi.NoMasterError:
621 master, myself = ssconf.GetMasterAndMyself()
623 raise errors.OpPrereqError("This is not the master node, please connect"
624 " to node '%s' and rerun the command" %
631 def FormatError(err):
632 """Return a formatted error message for a given error.
634 This function takes an exception instance and returns a tuple
635 consisting of two values: first, the recommended exit code, and
636 second, a string describing the error message (not
643 if isinstance(err, errors.ConfigurationError):
644 txt = "Corrupt configuration file: %s" % msg
646 obuf.write(txt + "\n")
647 obuf.write("Aborting.")
649 elif isinstance(err, errors.HooksAbort):
650 obuf.write("Failure: hooks execution failed:\n")
651 for node, script, out in err.args[0]:
653 obuf.write(" node: %s, script: %s, output: %s\n" %
656 obuf.write(" node: %s, script: %s (no output)\n" %
658 elif isinstance(err, errors.HooksFailure):
659 obuf.write("Failure: hooks general failure: %s" % msg)
660 elif isinstance(err, errors.ResolverError):
661 this_host = utils.HostInfo.SysName()
662 if err.args[0] == this_host:
663 msg = "Failure: can't resolve my own hostname ('%s')"
665 msg = "Failure: can't resolve hostname '%s'"
666 obuf.write(msg % err.args[0])
667 elif isinstance(err, errors.OpPrereqError):
668 obuf.write("Failure: prerequisites not met for this"
669 " operation:\n%s" % msg)
670 elif isinstance(err, errors.OpExecError):
671 obuf.write("Failure: command execution error:\n%s" % msg)
672 elif isinstance(err, errors.TagError):
673 obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
674 elif isinstance(err, errors.JobQueueDrainError):
675 obuf.write("Failure: the job queue is marked for drain and doesn't"
676 " accept new requests\n")
677 elif isinstance(err, errors.JobQueueFull):
678 obuf.write("Failure: the job queue is full and doesn't accept new"
679 " job submissions until old jobs are archived\n")
680 elif isinstance(err, errors.TypeEnforcementError):
681 obuf.write("Parameter Error: %s" % msg)
682 elif isinstance(err, errors.GenericError):
683 obuf.write("Unhandled Ganeti error: %s" % msg)
684 elif isinstance(err, luxi.NoMasterError):
685 obuf.write("Cannot communicate with the master daemon.\nIs it running"
686 " and listening for connections?")
687 elif isinstance(err, luxi.TimeoutError):
688 obuf.write("Timeout while talking to the master daemon. Error:\n"
690 elif isinstance(err, luxi.ProtocolError):
691 obuf.write("Unhandled protocol error while talking to the master daemon:\n"
693 elif isinstance(err, JobSubmittedException):
694 obuf.write("JobID: %s\n" % err.args[0])
697 obuf.write("Unhandled exception: %s" % msg)
698 return retcode, obuf.getvalue().rstrip('\n')
701 def GenericMain(commands, override=None, aliases=None):
702 """Generic main function for all the gnt-* commands.
705 - commands: a dictionary with a special structure, see the design doc
706 for command line handling.
707 - override: if not None, we expect a dictionary with keys that will
708 override command line options; this can be used to pass
709 options from the scripts to generic functions
710 - aliases: dictionary with command aliases {'alias': 'target, ...}
713 # save the program name and the entire command line for later logging
715 binary = os.path.basename(sys.argv[0]) or sys.argv[0]
716 if len(sys.argv) >= 2:
717 binary += " " + sys.argv[1]
718 old_cmdline = " ".join(sys.argv[2:])
722 binary = "<unknown program>"
728 func, options, args = _ParseArgs(sys.argv, commands, aliases)
729 if func is None: # parse error
732 if override is not None:
733 for key, val in override.iteritems():
734 setattr(options, key, val)
736 utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
737 stderr_logging=True, program=binary)
739 utils.debug = options.debug
742 logging.info("run with arguments '%s'", old_cmdline)
744 logging.info("run with no arguments")
747 result = func(options, args)
748 except (errors.GenericError, luxi.ProtocolError,
749 JobSubmittedException), err:
750 result, err_msg = FormatError(err)
751 logging.exception("Error durring command processing")
757 def GenerateTable(headers, fields, separator, data,
758 numfields=None, unitfields=None,
760 """Prints a table with headers and different fields.
763 @param headers: dictionary mapping field names to headers for
766 @param fields: the field names corresponding to each row in
768 @param separator: the separator to be used; if this is None,
769 the default 'smart' algorithm is used which computes optimal
770 field width, otherwise just the separator is used between
773 @param data: a list of lists, each sublist being one row to be output
774 @type numfields: list
775 @param numfields: a list with the fields that hold numeric
776 values and thus should be right-aligned
777 @type unitfields: list
778 @param unitfields: a list with the fields that hold numeric
779 values that should be formatted with the units field
780 @type units: string or None
781 @param units: the units we should use for formatting, or None for
782 automatic choice (human-readable for non-separator usage, otherwise
783 megabytes); this is a one-letter string
792 if numfields is None:
794 if unitfields is None:
797 numfields = utils.FieldSet(*numfields)
798 unitfields = utils.FieldSet(*unitfields)
802 if headers and field not in headers:
803 # TODO: handle better unknown fields (either revert to old
804 # style of raising exception, or deal more intelligently with
806 headers[field] = field
807 if separator is not None:
808 format_fields.append("%s")
809 elif numfields.Matches(field):
810 format_fields.append("%*s")
812 format_fields.append("%-*s")
814 if separator is None:
815 mlens = [0 for name in fields]
816 format = ' '.join(format_fields)
818 format = separator.replace("%", "%%").join(format_fields)
823 for idx, val in enumerate(row):
824 if unitfields.Matches(fields[idx]):
830 val = row[idx] = utils.FormatUnit(val, units)
831 val = row[idx] = str(val)
832 if separator is None:
833 mlens[idx] = max(mlens[idx], len(val))
838 for idx, name in enumerate(fields):
840 if separator is None:
841 mlens[idx] = max(mlens[idx], len(hdr))
842 args.append(mlens[idx])
844 result.append(format % tuple(args))
849 line = ['-' for _ in fields]
850 for idx in xrange(len(fields)):
851 if separator is None:
852 args.append(mlens[idx])
853 args.append(line[idx])
854 result.append(format % tuple(args))
def FormatTimestamp(ts):
    """Formats a given timestamp.

    @type ts: timeval
    @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

    @rtype: string
    @return: a string with the formatted timestamp, in local time, as
        C{YYYY-MM-DD HH:MM:SS.uuuuuu}

    """
    if not isinstance(ts, (tuple, list)) or len(ts) != 2:
        # NOTE(review): placeholder for malformed input; confirm that
        # callers expect '?' here
        return '?'
    sec, usec = ts
    # spelled out instead of "%F %T", which are not among the directives
    # Python documents as available on every platform
    formatted = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(sec))
    return formatted + ".%06d" % usec
875 def ParseTimespec(value):
876 """Parse a time specification.
878 The following suffixed will be recognized:
886 Without any suffix, the value will be taken to be in seconds.
891 raise errors.OpPrereqError("Empty time specification passed")
899 if value[-1] not in suffix_map:
903 raise errors.OpPrereqError("Invalid time specification '%s'" % value)
905 multiplier = suffix_map[value[-1]]
907 if not value: # no data left after stripping the suffix
908 raise errors.OpPrereqError("Invalid time specification (only"
911 value = int(value) * multiplier
913 raise errors.OpPrereqError("Invalid time specification '%s'" % value)
917 def GetOnlineNodes(nodes, cl=None, nowarn=False):
918 """Returns the names of online nodes.
920 This function will also log a warning on stderr with the names of
923 @param nodes: if not empty, use only this subset of nodes (minus the
925 @param cl: if not None, luxi client to use
926 @type nowarn: boolean
927 @param nowarn: by default, this function will output a note with the
928 offline nodes that are skipped; if this parameter is True the
929 note is not displayed
935 result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
937 offline = [row[0] for row in result if row[1]]
938 if offline and not nowarn:
939 ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline))
940 return [row[0] for row in result if not row[1]]
def _ToStream(stream, txt, *args):
    """Write a message to a stream, bypassing the logging system

    @type stream: file object
    @param stream: the file to which we should write
    @type txt: str
    @param txt: the message
    @param args: optional values interpolated into txt with the %
        operator; when none are given txt is written unchanged

    """
    if args:
        stream.write(txt % args)
    else:
        # callers also pass already-formatted text, which may contain a
        # literal '%' and must not be interpolated again
        stream.write(txt)
    # NOTE(review): callers pass one message per call without a trailing
    # newline, so one is appended and the stream flushed; confirm
    stream.write('\n')
    stream.flush()
def ToStdout(txt, *args):
    """Write a message to stdout only, bypassing the logging system

    This is just a wrapper over _ToStream.

    @type txt: str
    @param txt: the message
    @param args: passed through unchanged to _ToStream

    """
    _ToStream(sys.stdout, txt, *args)
def ToStderr(txt, *args):
    """Write a message to stderr only, bypassing the logging system

    This is just a wrapper over _ToStream.

    @type txt: str
    @param txt: the message
    @param args: passed through unchanged to _ToStream

    """
    _ToStream(sys.stderr, txt, *args)
985 class JobExecutor(object):
986 """Class which manages the submission and execution of multiple jobs.
988 Note that instances of this class should not be reused between
992 def __init__(self, cl=None, verbose=True):
997 self.verbose = verbose
1000 def QueueJob(self, name, *ops):
1001 """Record a job for later submit.
1004 @param name: a description of the job, will be used in WaitJobSet
1006 self.queue.append((name, ops))
1009 def SubmitPending(self):
1010 """Submit all pending jobs.
1013 results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
1014 for ((status, data), (name, _)) in zip(results, self.queue):
1015 self.jobs.append((status, data, name))
1017 def GetResults(self):
1018 """Wait for and return the results of all jobs.
1021 @return: list of tuples (success, job results), in the same order
1022 as the submitted jobs; if a job has failed, instead of the result
1023 there will be the error message
1027 self.SubmitPending()
1030 ok_jobs = [row[1] for row in self.jobs if row[0]]
1032 ToStdout("Submitted jobs %s", ", ".join(ok_jobs))
1033 for submit_status, jid, name in self.jobs:
1034 if not submit_status:
1035 ToStderr("Failed to submit job for %s: %s", name, jid)
1036 results.append((False, jid))
1039 ToStdout("Waiting for job %s for %s...", jid, name)
1041 job_result = PollJob(jid, cl=self.cl)
1043 except (errors.GenericError, luxi.ProtocolError), err:
1044 _, job_result = FormatError(err)
1046 # the error message will always be shown, verbose or not
1047 ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
1049 results.append((success, job_result))
1052 def WaitOrShow(self, wait):
1053 """Wait for job results or only print the job IDs.
1056 @param wait: whether to wait or not
1060 return self.GetResults()
1063 self.SubmitPending()
1064 for status, result, name in self.jobs:
1066 ToStdout("%s: %s", result, name)
1068 ToStderr("Failure for %s: %s", name, result)