Make WaitForJobChanges deal with long jobs
author Iustin Pop <iustin@google.com>
Fri, 29 Aug 2008 13:42:23 +0000 (13:42 +0000)
committer Iustin Pop <iustin@google.com>
Fri, 29 Aug 2008 13:42:23 +0000 (13:42 +0000)
This patch changes the WaitForJobChanges luxi RPC call to take a
configurable timeout, so that the call behaves well with long-running
jobs that go for a long time without any update.

We do this by adding a timeout parameter to the RPC call and returning
a special constant (JOB_NOTCHANGED) when the timeout is reached without
an update. The luxi client repeatedly calls WaitForJobChanges until it
gets a real change. The timeout is hardcoded to (DEF_RWTO - 1) / 2,
i.e. just under half the DEF_RWTO value.

The patch also removes an unused variable (new_state) from the
WaitForJobChanges method.

Reviewed-by: imsnah,ultrotter

daemons/ganeti-masterd
lib/constants.py
lib/jqueue.py
lib/luxi.py

index d6ef907..6df6aec 100755 (executable)
@@ -217,9 +217,9 @@ class ClientOps:
       return queue.ArchiveJob(job_id)
 
     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
-      (job_id, fields, prev_job_info, prev_log_serial) = args
+      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
-                                     prev_log_serial)
+                                     prev_log_serial, timeout)
 
     elif method == luxi.REQ_QUERY_JOBS:
       (job_ids, fields) = args
index 2d1ea8f..08e5ba0 100644 (file)
@@ -279,6 +279,9 @@ JOB_QUEUE_ARCHIVE_DIR = QUEUE_DIR + "/archive"
 
 JOB_ID_TEMPLATE = r"\d+"
 
+# returned by WaitForJobChanges when the timeout expires without a job change
+JOB_NOTCHANGED = "nochange"
+
 # Job status
 JOB_STATUS_QUEUED = "queued"
 JOB_STATUS_RUNNING = "running"
index 517e4b5..edc69b6 100644 (file)
@@ -558,7 +558,8 @@ class JobQueue(object):
 
   @utils.LockedMethod
   @_RequireOpenQueue
-  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial):
+  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
+                        timeout):
     """Waits for changes in a job.
 
     @type job_id: string
@@ -569,15 +570,20 @@ class JobQueue(object):
     @param prev_job_info: Last job information returned
     @type prev_log_serial: int
     @param prev_log_serial: Last job message serial number
+    @type timeout: float
+    @param timeout: maximum time to wait
 
     """
     logging.debug("Waiting for changes in job %s", job_id)
-
+    end_time = time.time() + timeout
     while True:
+      delta_time = end_time - time.time()
+      if delta_time < 0:
+        return constants.JOB_NOTCHANGED
+
       job = self._LoadJobUnlocked(job_id)
       if not job:
         logging.debug("Job %s not found", job_id)
-        new_state = None
         break
 
       status = job.CalcStatus()
@@ -605,7 +611,7 @@ class JobQueue(object):
       logging.debug("Waiting again")
 
       # Release the queue lock while waiting
-      job.change.wait()
+      job.change.wait(delta_time)
 
     logging.debug("Job %s changed", job_id)
 
index 5c48dd5..f04aee7 100644 (file)
@@ -290,8 +290,14 @@ class Client(object):
     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
 
   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
-    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
-                           (job_id, fields, prev_job_info, prev_log_serial))
+    timeout = (DEF_RWTO - 1) / 2
+    while True:
+      result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
+                               (job_id, fields, prev_job_info,
+                                prev_log_serial, timeout))
+      if result != constants.JOB_NOTCHANGED:
+        break
+    return result
 
   def QueryJobs(self, job_ids, fields):
     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))