cli.JobExecutor: poll jobs in execution order
authorIustin Pop <iustin@google.com>
Thu, 11 Mar 2010 12:35:38 +0000 (13:35 +0100)
committerIustin Pop <iustin@google.com>
Thu, 11 Mar 2010 15:12:06 +0000 (16:12 +0100)
… rather than submission order. The results are still returned in the
submission order, and for this we needed to track internally the index
of the submission.

Signed-off-by: Iustin Pop <iustin@google.com>
Reviewed-by: Guido Trotter <ultrotter@google.com>

lib/cli.py

index 5f8a9ed..6ccf4d9 100644 (file)
@@ -1813,8 +1813,31 @@ class JobExecutor(object):
 
     """
     results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
-    for ((status, data), (name, _)) in zip(results, self.queue):
-      self.jobs.append((status, data, name))
+    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
+                                                            self.queue)):
+      self.jobs.append((idx, status, data, name))
+
+  def _ChooseJob(self):
+    """Choose a non-waiting/queued job to poll next.
+
+    """
+    assert self.jobs, "_ChooseJob called with empty job list"
+
+    result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
+    assert result
+
+    for job_data, status in zip(self.jobs, result):
+      if status[0] in (constants.JOB_STATUS_QUEUED,
+                    constants.JOB_STATUS_WAITLOCK,
+                    constants.JOB_STATUS_CANCELING):
+        # job is still waiting
+        continue
+      # good candidate found
+      self.jobs.remove(job_data)
+      return job_data
+
+    # no job found
+    return self.jobs.pop(0)
 
   def GetResults(self):
     """Wait for and return the results of all jobs.
@@ -1829,16 +1852,19 @@ class JobExecutor(object):
       self.SubmitPending()
     results = []
     if self.verbose:
-      ok_jobs = [row[1] for row in self.jobs if row[0]]
+      ok_jobs = [row[2] for row in self.jobs if row[1]]
       if ok_jobs:
         ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
-    for submit_status, jid, name in self.jobs:
-      if not submit_status:
+
+    # first, remove any non-submitted jobs
+    self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
+    for idx, _, jid, name in failures:
         ToStderr("Failed to submit job for %s: %s", name, jid)
-        results.append((False, jid))
-        continue
-      if self.verbose:
-        ToStdout("Waiting for job %s for %s...", jid, name)
+        results.append((idx, False, jid))
+
+    while self.jobs:
+      (idx, _, jid, name) = self._ChooseJob()
+      ToStdout("Waiting for job %s for %s...", jid, name)
       try:
         job_result = PollJob(jid, cl=self.cl)
         success = True
@@ -1848,7 +1874,12 @@ class JobExecutor(object):
         # the error message will always be shown, verbose or not
         ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
 
-      results.append((success, job_result))
+      results.append((idx, success, job_result))
+
+    # sort based on the index, then drop it
+    results.sort()
+    results = [i[1:] for i in results]
+
     return results
 
   def WaitOrShow(self, wait):