4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module dealing with command line parsing"""
31 from cStringIO import StringIO
33 from ganeti import utils
34 from ganeti import errors
35 from ganeti import constants
36 from ganeti import opcodes
37 from ganeti import luxi
38 from ganeti import ssconf
39 from ganeti import rpc
41 from optparse import (OptionParser, make_option, TitledHelpFormatter,
42 Option, OptionValueError)
45 __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
46 "SubmitOpCode", "GetClient",
47 "cli_option", "ikv_option", "keyval_option",
48 "GenerateTable", "AskUser",
49 "ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
50 "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT", "SUBMIT_OPT",
51 "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
52 "FormatError", "SplitNodeOption", "SubmitOrSend",
53 "JobSubmittedException", "FormatTimestamp", "ParseTimespec",
54 "ToStderr", "ToStdout", "UsesRPC",
55 "GetOnlineNodes", "JobExecutor", "SYNC_OPT",
def _ExtractTagsObject(opts, args):
  """Extract the tag type object.

  Note that this function will modify its args parameter.

  @param opts: the parsed command line options; must have a
      C{tag_type} attribute
  @type args: list
  @param args: the positional arguments; for node and instance tags
      the first element (the object name) is removed from the list
  @rtype: tuple
  @return: a C{(kind, name)} pair
  @raise errors.ProgrammerError: if C{opts} has no C{tag_type}
      attribute or the tag type is not handled
  @raise errors.OpPrereqError: if an object name is required but
      C{args} is empty

  """
  if not hasattr(opts, "tag_type"):
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
  kind = opts.tag_type
  if kind == constants.TAG_CLUSTER:
    # NOTE(review): cluster tags have no object name; upstream Ganeti
    # returns the kind in both slots -- confirm against the full file
    return kind, kind
  if kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
    if not args:
      raise errors.OpPrereqError("no arguments passed to the command")
    return kind, args.pop(0)
  raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
def _ExtendTags(opts, args):
  """Extend the args if a source file has been given.

  This function will extend the tags with the contents of the file
  passed in the 'tags_source' attribute of the opts parameter. A file
  named '-' will be replaced by stdin.

  @param opts: the parsed command line options; C{opts.tags_source}
      is either None (nothing to do) or a file name
  @type args: list
  @param args: the list of tags; extended in place with one entry per
      line of the source, with surrounding whitespace stripped

  """
  fname = opts.tags_source
  if fname is None:
    return
  from_stdin = (fname == "-")
  if from_stdin:
    new_fh = sys.stdin
  else:
    new_fh = open(fname, "r")
  new_data = []
  try:
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
    # because of python bug 1633941
    while True:
      line = new_fh.readline()
      if not line:
        break
      new_data.append(line.strip())
  finally:
    # only close the handle we opened ourselves; stdin belongs to the
    # process and must stay usable for the rest of the command
    if not from_stdin:
      new_fh.close()
  args.extend(new_data)
def ListTags(opts, args):
  """List the tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  @param opts: the parsed command line options
  @type args: list
  @param args: the positional arguments; passed to
      L{_ExtractTagsObject}, which may consume the first element

  """
  kind, name = _ExtractTagsObject(opts, args)
  op = opcodes.OpGetTags(kind=kind, name=name)
  # NOTE(review): the output loop was not visible in the reviewed text;
  # one sorted tag per line matches upstream Ganeti -- confirm
  for tag in sorted(SubmitOpCode(op)):
    ToStdout(tag)
def AddTags(opts, args):
  """Add tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  @param opts: the parsed command line options
  @type args: list
  @param args: the positional arguments: the object name (where one is
      needed) followed by the tags; modified in place
  @raise errors.OpPrereqError: if no tags are left to add

  """
  kind, name = _ExtractTagsObject(opts, args)
  # after the name has been consumed, pull in tags from --from (if any)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be added")
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
  SubmitOpCode(op)
def RemoveTags(opts, args):
  """Remove tags from a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  @param opts: the parsed command line options
  @type args: list
  @param args: the positional arguments: the object name (where one is
      needed) followed by the tags; modified in place
  @raise errors.OpPrereqError: if no tags are left to remove

  """
  kind, name = _ExtractTagsObject(opts, args)
  # after the name has been consumed, pull in tags from --from (if any)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be removed")
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
  SubmitOpCode(op)
# Shared option definitions, reused by the individual gnt-* scripts when
# they build their per-command option lists.

DEBUG_OPT = make_option("-d", "--debug", default=False,
                        action="store_true",
                        help="Turn debugging on")

NOHDR_OPT = make_option("--no-headers", default=False,
                        action="store_true", dest="no_headers",
                        help="Don't display column headers")

SEP_OPT = make_option("--separator", default=None,
                      action="store", dest="separator",
                      help="Separator between output fields"
                      " (defaults to one space)")

USEUNITS_OPT = make_option("--units", default=None,
                           dest="units", choices=('h', 'm', 'g', 't'),
                           help="Specify units for output (one of hmgt)")

# NOTE(review): the tail of this definition was not visible in the
# reviewed text; help suffix and metavar follow upstream Ganeti -- confirm
FIELDS_OPT = make_option("-o", "--output", dest="output", action="store",
                         type="string", help="Comma separated list of"
                         " output fields",
                         metavar="FIELDS")

FORCE_OPT = make_option("-f", "--force", dest="force", action="store_true",
                        default=False, help="Force the operation")

TAG_SRC_OPT = make_option("--from", dest="tags_source",
                          default=None, help="File with tag names")

SUBMIT_OPT = make_option("--submit", dest="submit_only",
                         default=False, action="store_true",
                         help="Submit the job and return the job ID, but"
                         " don't wait for the job to finish")

SYNC_OPT = make_option("--sync", dest="do_locking",
                       default=False, action="store_true",
                       help="Grab locks while doing the queries"
                       " in order to ensure more consistent results")
def ARGS_FIXED(val):
  """Macro-like function denoting a fixed number of arguments"""
  # a fixed count is encoded as a negative number; the argument parser
  # compares len(args) against -nargs for negative values
  return -val


def ARGS_ATLEAST(val):
  """Macro-like function denoting a minimum number of arguments"""
  # a minimum count is encoded as the non-negative number itself
  return val


# None means "the command takes no arguments"
ARGS_NONE = None
ARGS_ONE = ARGS_FIXED(1)
ARGS_ANY = ARGS_ATLEAST(0)
216 def check_unit(option, opt, value):
217 """OptParsers custom converter for units.
221 return utils.ParseUnit(value)
222 except errors.UnitParseError, err:
223 raise OptionValueError("option %s: %s" % (opt, err))
class CliOption(Option):
  """Custom option class for optparse.

  Adds the C{unit} option type, whose values are converted by
  L{check_unit}.

  """
  TYPES = Option.TYPES + ("unit",)
  # copy the checker table so the stock Option class is left untouched
  TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
  TYPE_CHECKER["unit"] = check_unit
def _SplitKeyVal(opt, data):
  """Convert a KeyVal string into a dict.

  This function will convert a key=val[,...] string into a dict. Empty
  values will be converted specially: keys which have the prefix 'no_'
  will have the value=False and the prefix stripped, keys which have
  the prefix '-' will have value=None and the prefix stripped, the
  others will have value=True.

  @type opt: string
  @param opt: a string holding the option name for which we process the
      data, used in building error messages
  @type data: string
  @param data: a string of the format key=val,key=val,...
  @rtype: dict
  @return: {key=val, key=val}; an empty dict for empty input
  @raises errors.ParameterError: if there are duplicate keys

  """
  NO_PREFIX = "no_"
  # NOTE(review): prefix value as in upstream Ganeti -- confirm
  UN_PREFIX = "-"
  kv_dict = {}
  if not data:
    # "".split(",") yields [""], which would create a bogus empty key
    return kv_dict
  for elem in data.split(","):
    if "=" in elem:
      # split on the first '=' only, so values may contain '='
      key, val = elem.split("=", 1)
    elif elem.startswith(NO_PREFIX):
      key, val = elem[len(NO_PREFIX):], False
    elif elem.startswith(UN_PREFIX):
      key, val = elem[len(UN_PREFIX):], None
    else:
      key, val = elem, True
    if key in kv_dict:
      raise errors.ParameterError("Duplicate key '%s' in option %s" %
                                  (key, opt))
    kv_dict[key] = val
  return kv_dict
def check_ident_key_val(option, opt, value):
  """Custom parser for the IdentKeyVal option type.

  @param option: the optparse option being processed (unused)
  @param opt: the option string as seen on the command line, used in
      error messages
  @param value: the raw option value, either C{ident} or
      C{ident:key=val,...}
  @rtype: tuple
  @return: C{(ident, {key: val})}; the dict is empty when the value
      contains no colon

  """
  if ":" not in value:
    return (value, {})
  # only the first colon separates the identifier from the parameters
  ident, rest = value.split(":", 1)
  return (ident, _SplitKeyVal(opt, rest))
class IdentKeyValOption(Option):
  """Custom option class for ident:key=val,key=val options.

  This will store the parsed values as a tuple (ident, {key: val}). As
  such, multiple uses of this option via action=append is possible.

  """
  TYPES = Option.TYPES + ("identkeyval",)
  # copy the checker table so the stock Option class is left untouched
  TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
def check_key_val(option, opt, value):
  """Custom parser for the KeyVal option type.

  @param option: the optparse option being processed (unused)
  @param opt: the option string as seen on the command line, used in
      error messages
  @param value: the raw C{key=val,...} option value
  @return: the result of L{_SplitKeyVal} for C{value}

  """
  return _SplitKeyVal(opt, value)
class KeyValOption(Option):
  """Custom option class for key=val,key=val options.

  This will store the parsed values as a dict {key: val}.

  """
  TYPES = Option.TYPES + ("keyval",)
  # copy the checker table so the stock Option class is left untouched
  TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
  TYPE_CHECKER["keyval"] = check_key_val
# optparse.py sets make_option, so we do it for our own option class, too
cli_option = CliOption
ikv_option = IdentKeyValOption
keyval_option = KeyValOption
322 def _ParseArgs(argv, commands, aliases):
323 """Parser for the command line arguments.
325 This function parses the arguments and returns the function which
326 must be executed together with its (modified) arguments.
328 @param argv: the command line
329 @param commands: dictionary with special contents, see the design
330 doc for cmdline handling
331 @param aliases: dictionary with command aliases {'alias': 'target', ...}
337 binary = argv[0].split("/")[-1]
339 if len(argv) > 1 and argv[1] == "--version":
340 ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
341 # Quit right away. That way we don't have to care about this special
342 # argument. optparse.py does it the same.
345 if len(argv) < 2 or not (argv[1] in commands or
347 # let's do a nice thing
348 sortedcmds = commands.keys()
351 ToStdout("Usage: %s {command} [options...] [argument...]", binary)
352 ToStdout("%s <command> --help to see details, or man %s", binary, binary)
355 # compute the max line length for cmd + usage
356 mlen = max([len(" %s" % cmd) for cmd in commands])
357 mlen = min(60, mlen) # should not get here...
359 # and format a nice command list
360 ToStdout("Commands:")
361 for cmd in sortedcmds:
362 cmdstr = " %s" % (cmd,)
363 help_text = commands[cmd][4]
364 help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
365 ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
366 for line in help_lines:
367 ToStdout("%-*s %s", mlen, "", line)
371 return None, None, None
373 # get command, unalias it, and look it up in commands
377 raise errors.ProgrammerError("Alias '%s' overrides an existing"
380 if aliases[cmd] not in commands:
381 raise errors.ProgrammerError("Alias '%s' maps to non-existing"
382 " command '%s'" % (cmd, aliases[cmd]))
386 func, nargs, parser_opts, usage, description = commands[cmd]
387 parser = OptionParser(option_list=parser_opts,
388 description=description,
389 formatter=TitledHelpFormatter(),
390 usage="%%prog %s %s" % (cmd, usage))
391 parser.disable_interspersed_args()
392 options, args = parser.parse_args()
395 ToStderr("Error: Command %s expects no arguments", cmd)
396 return None, None, None
397 elif nargs < 0 and len(args) != -nargs:
398 ToStderr("Error: Command %s expects %d argument(s)", cmd, -nargs)
399 return None, None, None
400 elif nargs >= 0 and len(args) < nargs:
401 ToStderr("Error: Command %s expects at least %d argument(s)", cmd, nargs)
402 return None, None, None
404 return func, options, args
def SplitNodeOption(value):
  """Splits the value of a --node option.

  @param value: the option value, either C{node} or C{node:rest}; may
      be None or empty
  @return: a two-element sequence; the list C{[node, rest]} (split at
      the first colon) when the value contains a colon, otherwise the
      tuple C{(value, None)}

  """
  if value and ':' in value:
    return value.split(':', 1)
  return (value, None)
418 def wrapper(*args, **kwargs):
421 return fn(*args, **kwargs)
427 def AskUser(text, choices=None):
428 """Ask the user a question.
430 @param text: the question to ask
432 @param choices: list with elements tuples (input_char, return_value,
433 description); if not given, it will default to: [('y', True,
434 'Perform the operation'), ('n', False, 'Do not perform the operation')];
435 note that the '?' char is reserved for help
437 @return: one of the return values from the choices list; if input is
438 not possible (i.e. not running with a tty, we return the last
443 choices = [('y', True, 'Perform the operation'),
444 ('n', False, 'Do not perform the operation')]
445 if not choices or not isinstance(choices, list):
446 raise errors.ProgrammerError("Invalid choices argument to AskUser")
447 for entry in choices:
448 if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
449 raise errors.ProgrammerError("Invalid choices element to AskUser")
451 answer = choices[-1][1]
453 for line in text.splitlines():
454 new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
455 text = "\n".join(new_text)
457 f = file("/dev/tty", "a+")
461 chars = [entry[0] for entry in choices]
462 chars[-1] = "[%s]" % chars[-1]
464 maps = dict([(entry[0], entry[1]) for entry in choices])
468 f.write("/".join(chars))
470 line = f.readline(2).strip().lower()
475 for entry in choices:
476 f.write(" %s - %s\n" % (entry[0], entry[2]))
class JobSubmittedException(Exception):
  """Job was submitted, client should exit.

  This exception has one argument, the ID of the job that was
  submitted. The handler should print this ID.

  This is not an error, just a structured way to exit from clients.

  """
def SendJob(ops, cl=None):
  """Function to submit an opcode without waiting for the results.

  @type ops: list
  @param ops: list of opcodes
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created
  @return: the job ID returned by the client's C{SubmitJob}

  """
  if cl is None:
    cl = GetClient()
  return cl.SubmitJob(ops)
513 def PollJob(job_id, cl=None, feedback_fn=None):
514 """Function to poll for the result of a job.
516 @type job_id: job identifier
517 @param job_id: the job to poll for results
518 @type cl: luxi.Client
519 @param cl: the luxi client to use for communicating with the master;
520 if None, a new client will be created
527 prev_logmsg_serial = None
530 result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
533 # job not found, go away!
534 raise errors.JobLost("Job with id %s lost" % job_id)
536 # Split result, a tuple of (field values, log entries)
537 (job_info, log_entries) = result
538 (status, ) = job_info
541 for log_entry in log_entries:
542 (serial, timestamp, _, message) = log_entry
543 if callable(feedback_fn):
544 feedback_fn(log_entry[1:])
546 encoded = utils.SafeEncode(message)
547 ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
548 prev_logmsg_serial = max(prev_logmsg_serial, serial)
550 # TODO: Handle canceled and archived jobs
551 elif status in (constants.JOB_STATUS_SUCCESS,
552 constants.JOB_STATUS_ERROR,
553 constants.JOB_STATUS_CANCELING,
554 constants.JOB_STATUS_CANCELED):
557 prev_job_info = job_info
559 jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
561 raise errors.JobLost("Job with id %s lost" % job_id)
563 status, opstatus, result = jobs[0]
564 if status == constants.JOB_STATUS_SUCCESS:
566 elif status in (constants.JOB_STATUS_CANCELING,
567 constants.JOB_STATUS_CANCELED):
568 raise errors.OpExecError("Job was canceled")
571 for idx, (status, msg) in enumerate(zip(opstatus, result)):
572 if status == constants.OP_STATUS_SUCCESS:
574 elif status == constants.OP_STATUS_ERROR:
576 raise errors.OpExecError("partial failure (opcode %d): %s" %
579 raise errors.OpExecError(str(msg))
580 # default failure mode
581 raise errors.OpExecError(result)
def SubmitOpCode(op, cl=None, feedback_fn=None):
  """Legacy function to submit an opcode.

  This is just a simple wrapper over the construction of the processor
  instance. It should be extended to better handle feedback and
  interaction functions.

  @param op: the opcode to submit, as a single-opcode job
  @type cl: luxi.Client
  @param cl: the luxi client to use; if None, a new client will be
      created
  @param feedback_fn: passed through to L{PollJob}
  @return: the result of the submitted opcode

  """
  if cl is None:
    cl = GetClient()

  job_id = SendJob([op], cl)
  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)

  # NOTE(review): the return statement was not visible in the reviewed
  # text; returning the only opcode's result matches upstream -- confirm
  return op_results[0]
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  This function will decide, based on the 'opts' parameter, whether to
  submit and wait for the result of the opcode (and return it), or
  whether to just send the job and print its identifier. It is used in
  order to simplify the implementation of the '--submit' option.

  @param op: the opcode to execute
  @param opts: the parsed command line options; may be None, otherwise
      its C{submit_only} attribute selects the mode
  @param cl: the luxi client to use, passed through unchanged
  @param feedback_fn: passed through to L{SubmitOpCode}
  @return: the result of L{SubmitOpCode} when waiting for the job
  @raise JobSubmittedException: carrying the job ID, when only
      submitting

  """
  if opts and opts.submit_only:
    # not an error: the exception is the structured way of telling the
    # caller to print the job ID and exit
    raise JobSubmittedException(SendJob([op], cl=cl))
  return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
619 # TODO: Cache object?
621 client = luxi.Client()
622 except luxi.NoMasterError:
623 master, myself = ssconf.GetMasterAndMyself()
625 raise errors.OpPrereqError("This is not the master node, please connect"
626 " to node '%s' and rerun the command" %
633 def FormatError(err):
634 """Return a formatted error message for a given error.
636 This function takes an exception instance and returns a tuple
637 consisting of two values: first, the recommended exit code, and
638 second, a string describing the error message (not
645 if isinstance(err, errors.ConfigurationError):
646 txt = "Corrupt configuration file: %s" % msg
648 obuf.write(txt + "\n")
649 obuf.write("Aborting.")
651 elif isinstance(err, errors.HooksAbort):
652 obuf.write("Failure: hooks execution failed:\n")
653 for node, script, out in err.args[0]:
655 obuf.write(" node: %s, script: %s, output: %s\n" %
658 obuf.write(" node: %s, script: %s (no output)\n" %
660 elif isinstance(err, errors.HooksFailure):
661 obuf.write("Failure: hooks general failure: %s" % msg)
662 elif isinstance(err, errors.ResolverError):
663 this_host = utils.HostInfo.SysName()
664 if err.args[0] == this_host:
665 msg = "Failure: can't resolve my own hostname ('%s')"
667 msg = "Failure: can't resolve hostname '%s'"
668 obuf.write(msg % err.args[0])
669 elif isinstance(err, errors.OpPrereqError):
670 obuf.write("Failure: prerequisites not met for this"
671 " operation:\n%s" % msg)
672 elif isinstance(err, errors.OpExecError):
673 obuf.write("Failure: command execution error:\n%s" % msg)
674 elif isinstance(err, errors.TagError):
675 obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
676 elif isinstance(err, errors.JobQueueDrainError):
677 obuf.write("Failure: the job queue is marked for drain and doesn't"
678 " accept new requests\n")
679 elif isinstance(err, errors.JobQueueFull):
680 obuf.write("Failure: the job queue is full and doesn't accept new"
681 " job submissions until old jobs are archived\n")
682 elif isinstance(err, errors.TypeEnforcementError):
683 obuf.write("Parameter Error: %s" % msg)
684 elif isinstance(err, errors.ParameterError):
685 obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
686 elif isinstance(err, errors.GenericError):
687 obuf.write("Unhandled Ganeti error: %s" % msg)
688 elif isinstance(err, luxi.NoMasterError):
689 obuf.write("Cannot communicate with the master daemon.\nIs it running"
690 " and listening for connections?")
691 elif isinstance(err, luxi.TimeoutError):
692 obuf.write("Timeout while talking to the master daemon. Error:\n"
694 elif isinstance(err, luxi.ProtocolError):
695 obuf.write("Unhandled protocol error while talking to the master daemon:\n"
697 elif isinstance(err, JobSubmittedException):
698 obuf.write("JobID: %s\n" % err.args[0])
701 obuf.write("Unhandled exception: %s" % msg)
702 return retcode, obuf.getvalue().rstrip('\n')
705 def GenericMain(commands, override=None, aliases=None):
706 """Generic main function for all the gnt-* commands.
709 - commands: a dictionary with a special structure, see the design doc
710 for command line handling.
711 - override: if not None, we expect a dictionary with keys that will
712 override command line options; this can be used to pass
713 options from the scripts to generic functions
714 - aliases: dictionary with command aliases {'alias': 'target', ...}
717 # save the program name and the entire command line for later logging
719 binary = os.path.basename(sys.argv[0]) or sys.argv[0]
720 if len(sys.argv) >= 2:
721 binary += " " + sys.argv[1]
722 old_cmdline = " ".join(sys.argv[2:])
726 binary = "<unknown program>"
732 func, options, args = _ParseArgs(sys.argv, commands, aliases)
733 if func is None: # parse error
736 if override is not None:
737 for key, val in override.iteritems():
738 setattr(options, key, val)
740 utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
741 stderr_logging=True, program=binary)
744 logging.info("run with arguments '%s'", old_cmdline)
746 logging.info("run with no arguments")
749 result = func(options, args)
750 except (errors.GenericError, luxi.ProtocolError,
751 JobSubmittedException), err:
752 result, err_msg = FormatError(err)
753 logging.exception("Error during command processing")
759 def GenerateTable(headers, fields, separator, data,
760 numfields=None, unitfields=None,
762 """Prints a table with headers and different fields.
765 @param headers: dictionary mapping field names to headers for
768 @param fields: the field names corresponding to each row in
770 @param separator: the separator to be used; if this is None,
771 the default 'smart' algorithm is used which computes optimal
772 field width, otherwise just the separator is used between
775 @param data: a list of lists, each sublist being one row to be output
776 @type numfields: list
777 @param numfields: a list with the fields that hold numeric
778 values and thus should be right-aligned
779 @type unitfields: list
780 @param unitfields: a list with the fields that hold numeric
781 values that should be formatted with the units field
782 @type units: string or None
783 @param units: the units we should use for formatting, or None for
784 automatic choice (human-readable for non-separator usage, otherwise
785 megabytes); this is a one-letter string
794 if numfields is None:
796 if unitfields is None:
799 numfields = utils.FieldSet(*numfields)
800 unitfields = utils.FieldSet(*unitfields)
804 if headers and field not in headers:
805 # TODO: handle better unknown fields (either revert to old
806 # style of raising exception, or deal more intelligently with
808 headers[field] = field
809 if separator is not None:
810 format_fields.append("%s")
811 elif numfields.Matches(field):
812 format_fields.append("%*s")
814 format_fields.append("%-*s")
816 if separator is None:
817 mlens = [0 for name in fields]
818 format = ' '.join(format_fields)
820 format = separator.replace("%", "%%").join(format_fields)
825 for idx, val in enumerate(row):
826 if unitfields.Matches(fields[idx]):
832 val = row[idx] = utils.FormatUnit(val, units)
833 val = row[idx] = str(val)
834 if separator is None:
835 mlens[idx] = max(mlens[idx], len(val))
840 for idx, name in enumerate(fields):
842 if separator is None:
843 mlens[idx] = max(mlens[idx], len(hdr))
844 args.append(mlens[idx])
846 result.append(format % tuple(args))
851 line = ['-' for _ in fields]
852 for idx in xrange(len(fields)):
853 if separator is None:
854 args.append(mlens[idx])
855 args.append(line[idx])
856 result.append(format % tuple(args))
def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: a string with the formatted timestamp (local time, with
      microseconds), or C{'?'} if C{ts} is not a two-element
      tuple/list

  """
  if not isinstance(ts, (tuple, list)) or len(ts) != 2:
    # NOTE(review): placeholder value as in upstream Ganeti -- confirm
    return '?'
  sec, usec = ts
  # "%Y-%m-%d %H:%M:%S" is the portable spelling of "%F %T", which is
  # only understood by some C libraries
  stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(sec))
  return stamp + ".%06d" % usec
def ParseTimespec(value):
  """Parse a time specification.

  The following suffixes will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: days
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  @param value: the time specification; converted with C{str()} first
  @rtype: int
  @return: the number of seconds
  @raise errors.OpPrereqError: if the specification is empty, consists
      only of a suffix, or its numeric part is not an integer

  """
  value = str(value)
  if not value:
    raise errors.OpPrereqError("Empty time specification passed")
  suffix_map = {
    's': 1,
    'm': 60,
    'h': 3600,
    'd': 86400,
    'w': 604800,
    }
  suffix = value[-1]
  if suffix not in suffix_map:
    number, multiplier = value, 1
  else:
    number, multiplier = value[:-1], suffix_map[suffix]
    if not number: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)")
  try:
    return int(number) * multiplier
  except ValueError:
    # report what the user typed, suffix included
    raise errors.OpPrereqError("Invalid time specification '%s'" % value)
def GetOnlineNodes(nodes, cl=None, nowarn=False):
  """Returns the names of online nodes.

  This function will also log a note on stderr with the names of
  the offline nodes that were skipped.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed
  @rtype: list
  @return: the names of the nodes not marked offline, in query order

  """
  if cl is None:
    cl = GetClient()

  # NOTE(review): the last keyword argument was not visible in the
  # reviewed text; use_locking=False matches upstream Ganeti -- confirm
  result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
                         use_locking=False)
  # each row is (name, offline); split in one pass so the result only
  # needs to be iterated once
  online = []
  offline = []
  for row in result:
    if row[1]:
      offline.append(row[0])
    else:
      online.append(row[0])
  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s", ", ".join(offline))
  return online
def _ToStream(stream, txt, *args):
  """Write a message to a stream, bypassing the logging system

  @type stream: file object
  @param stream: the file to which we should write
  @type txt: str
  @param txt: the message; used as a %-format string only when
      C{args} are given
  @param args: optional values to interpolate into C{txt}

  """
  if args:
    stream.write(txt % args)
  else:
    # no arguments: write verbatim, so a literal '%' in the message is
    # not misread as a format directive
    stream.write(txt)
  stream.write('\n')
  stream.flush()
def ToStdout(txt, *args):
  """Write a message to stdout only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message
  @param args: optional values to interpolate into C{txt}

  """
  _ToStream(sys.stdout, txt, *args)
def ToStderr(txt, *args):
  """Write a message to stderr only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message
  @param args: optional values to interpolate into C{txt}

  """
  _ToStream(sys.stderr, txt, *args)
987 class JobExecutor(object):
988 """Class which manages the submission and execution of multiple jobs.
990 Note that instances of this class should not be reused between
994 def __init__(self, cl=None, verbose=True):
999 self.verbose = verbose
1002 def QueueJob(self, name, *ops):
1003 """Record a job for later submit.
1006 @param name: a description of the job, will be used in WaitJobSet
1008 self.queue.append((name, ops))
1010 def SubmitPending(self):
1011 """Submit all pending jobs.
1014 results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
1015 for ((status, data), (name, _)) in zip(results, self.queue):
1016 self.jobs.append((status, data, name))
1018 def GetResults(self):
1019 """Wait for and return the results of all jobs.
1022 @return: list of tuples (success, job results), in the same order
1023 as the submitted jobs; if a job has failed, instead of the result
1024 there will be the error message
1028 self.SubmitPending()
1031 ok_jobs = [row[1] for row in self.jobs if row[0]]
1033 ToStdout("Submitted jobs %s", ", ".join(ok_jobs))
1034 for submit_status, jid, name in self.jobs:
1035 if not submit_status:
1036 ToStderr("Failed to submit job for %s: %s", name, jid)
1037 results.append((False, jid))
1040 ToStdout("Waiting for job %s for %s...", jid, name)
1042 job_result = PollJob(jid, cl=self.cl)
1044 except (errors.GenericError, luxi.ProtocolError), err:
1045 _, job_result = FormatError(err)
1047 # the error message will always be shown, verbose or not
1048 ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
1050 results.append((success, job_result))
1053 def WaitOrShow(self, wait):
1054 """Wait for job results or only print the job IDs.
1057 @param wait: whether to wait or not
1061 return self.GetResults()
1064 self.SubmitPending()
1065 for status, result, name in self.jobs:
1067 ToStdout("%s: %s", result, name)
1069 ToStderr("Failure for %s: %s", name, result)