Modify cli.JobExecutor to use SubmitManyJobs
authorIustin Pop <iustin@google.com>
Fri, 22 May 2009 10:25:31 +0000 (12:25 +0200)
committerIustin Pop <iustin@google.com>
Sun, 19 Jul 2009 11:53:21 +0000 (13:53 +0200)
This patch changes the generic "multiple job executor" to use the many
jobs submit model, which automatically makes all its users use the new
model.

This makes, for example, startup/shutdown of a full cluster much more
logical (all the submitted job IDs are visible fast, and then waiting
for them proceeds normally).

Signed-off-by: Iustin Pop <iustin@google.com>
Reviewed-by: Guido Trotter <ultrotter@google.com>
(cherry picked from commit 23b4b983afc9b9e81d558f06e4e0cde53703e575)

lib/cli.py

index 03c2ab1..cfa64f2 100644 (file)
@@ -994,15 +994,24 @@ class JobExecutor(object):
       cl = GetClient()
     self.cl = cl
     self.verbose = verbose
+    self.jobs = []
 
   def QueueJob(self, name, *ops):
-    """Submit a job for execution.
+    """Record a job for later submit.
 
     @type name: string
     @param name: a description of the job, will be used in WaitJobSet
     """
-    job_id = SendJob(ops, cl=self.cl)
-    self.queue.append((job_id, name))
+    self.queue.append((name, ops))
+
+
+  def SubmitPending(self):
+    """Submit all pending jobs.
+
+    """
+    results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
+    for ((status, data), (name, _)) in zip(results, self.queue):
+      self.jobs.append((status, data, name))
 
   def GetResults(self):
     """Wait for and return the results of all jobs.
@@ -1013,10 +1022,18 @@ class JobExecutor(object):
         there will be the error message
 
     """
+    if not self.jobs:
+      self.SubmitPending()
     results = []
     if self.verbose:
-      ToStdout("Submitted jobs %s", ", ".join(row[0] for row in self.queue))
-    for jid, name in self.queue:
+      ok_jobs = [row[1] for row in self.jobs if row[0]]
+      if ok_jobs:
+        ToStdout("Submitted jobs %s", ", ".join(ok_jobs))
+    for submit_status, jid, name in self.jobs:
+      if not submit_status:
+        ToStderr("Failed to submit job for %s: %s", name, jid)
+        results.append((False, jid))
+        continue
       if self.verbose:
         ToStdout("Waiting for job %s for %s...", jid, name)
       try:
@@ -1041,5 +1058,10 @@ class JobExecutor(object):
     if wait:
       return self.GetResults()
     else:
-      for jid, name in self.queue:
-        ToStdout("%s: %s", jid, name)
+      if not self.jobs:
+        self.SubmitPending()
+      for status, result, name in self.jobs:
+        if status:
+          ToStdout("%s: %s", result, name)
+        else:
+          ToStderr("Failure for %s: %s", name, result)