+
+
+class JobExecutor(object):
+ """Class which manages the submission and execution of multiple jobs.
+
+ Note that instances of this class should not be reused between
+ GetResults() calls.
+
+ """
+ def __init__(self, cl=None, verbose=True):
+ self.queue = []
+ if cl is None:
+ cl = GetClient()
+ self.cl = cl
+ self.verbose = verbose
+ self.jobs = []
+
+ def QueueJob(self, name, *ops):
+ """Record a job for later submit.
+
+ @type name: string
+ @param name: a description of the job, will be used in WaitJobSet
+ """
+ self.queue.append((name, ops))
+
+
+ def SubmitPending(self):
+ """Submit all pending jobs.
+
+ """
+ results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
+ for ((status, data), (name, _)) in zip(results, self.queue):
+ self.jobs.append((status, data, name))
+
+ def GetResults(self):
+ """Wait for and return the results of all jobs.
+
+ @rtype: list
+ @return: list of tuples (success, job results), in the same order
+ as the submitted jobs; if a job has failed, instead of the result
+ there will be the error message
+
+ """
+ if not self.jobs:
+ self.SubmitPending()
+ results = []
+ if self.verbose:
+ ok_jobs = [row[1] for row in self.jobs if row[0]]
+ if ok_jobs:
+ ToStdout("Submitted jobs %s", ", ".join(ok_jobs))
+ for submit_status, jid, name in self.jobs:
+ if not submit_status:
+ ToStderr("Failed to submit job for %s: %s", name, jid)
+ results.append((False, jid))
+ continue
+ if self.verbose:
+ ToStdout("Waiting for job %s for %s...", jid, name)
+ try:
+ job_result = PollJob(jid, cl=self.cl)
+ success = True
+ except (errors.GenericError, luxi.ProtocolError), err:
+ _, job_result = FormatError(err)
+ success = False
+ # the error message will always be shown, verbose or not
+ ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
+
+ results.append((success, job_result))
+ return results
+
+ def WaitOrShow(self, wait):
+ """Wait for job results or only print the job IDs.
+
+ @type wait: boolean
+ @param wait: whether to wait or not
+
+ """
+ if wait:
+ return self.GetResults()
+ else:
+ if not self.jobs:
+ self.SubmitPending()
+ for status, result, name in self.jobs:
+ if status:
+ ToStdout("%s: %s", result, name)
+ else:
+ ToStderr("Failure for %s: %s", name, result)