Add RPC call to wait for job changes
authorMichael Hanselmann <hansmi@google.com>
Mon, 11 Aug 2008 16:27:45 +0000 (16:27 +0000)
committerMichael Hanselmann <hansmi@google.com>
Mon, 11 Aug 2008 16:27:45 +0000 (16:27 +0000)
This way clients can react faster to status or message changes and
don't have to poll anymore.

Reviewed-by: ultrotter

daemons/ganeti-masterd
lib/jqueue.py
lib/luxi.py

index f240c62..6400e2f 100755 (executable)
@@ -217,6 +217,10 @@ class ClientOps:
       job_id = args
       return queue.ArchiveJob(job_id)
 
+    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
+      (job_id, fields, previous) = args
+      return queue.WaitForJobChanges(job_id, fields, previous)
+
     elif method == luxi.REQ_QUERY_JOBS:
       (job_ids, fields) = args
       return queue.QueryJobs(job_ids, fields)
index 9f9709a..d97f10e 100644 (file)
@@ -121,6 +121,12 @@ class _QueuedJob(object):
   This is what we use to track the user-submitted jobs.
 
   """
+  def __new__(cls, *args, **kwargs):
+    obj = object.__new__(cls, *args, **kwargs)
+    # Condition to wait for changes
+    obj.change = threading.Condition()
+    return obj
+
   def __init__(self, queue, job_id, ops):
     if not ops:
       # TODO
@@ -204,7 +210,16 @@ class _JobQueueWorker(workerpool.BaseWorker):
             finally:
               queue.release()
 
-            result = proc.ExecOpCode(input_opcode, op.Log)
+            def _Log(*args):
+              op.Log(*args)
+
+              job.change.acquire()
+              try:
+                job.change.notifyAll()
+              finally:
+                job.change.release()
+
+            result = proc.ExecOpCode(input_opcode, _Log)
 
             queue.acquire()
             try:
@@ -516,6 +531,13 @@ class JobQueue(object):
     self._WriteAndReplicateFileUnlocked(filename, data)
     self._CleanCacheUnlocked([job.id])
 
+    # Notify waiters about potential changes
+    job.change.acquire()
+    try:
+      job.change.notifyAll()
+    finally:
+      job.change.release()
+
   def _CleanCacheUnlocked(self, exclude):
     """Clean the memory cache.
 
@@ -536,6 +558,43 @@ class JobQueue(object):
         except KeyError:
           pass
 
+  @_RequireOpenQueue
+  def WaitForJobChanges(self, job_id, fields, previous):
+    logging.debug("Waiting for changes in job %s", job_id)
+
+    while True:
+      self.acquire()
+      try:
+        job = self._LoadJobUnlocked(job_id)
+        if not job:
+          logging.debug("Job %s not found", job_id)
+          new_state = None
+          break
+
+        new_state = self._GetJobInfoUnlocked(job, fields)
+      finally:
+        self.release()
+
+      # Serializing and deserializing data can cause type changes (e.g. from
+      # tuple to list) or precision loss. We're doing it here so that we get
+      # the same modifications as the data received from the client. Without
+      # this, the comparison afterwards might fail without the data being
+      # significantly different.
+      new_state = serializer.LoadJson(serializer.DumpJson(new_state))
+
+      if previous != new_state:
+        break
+
+      job.change.acquire()
+      try:
+        job.change.wait()
+      finally:
+        job.change.release()
+
+    logging.debug("Job %s changed", job_id)
+
+    return new_state
+
   @utils.LockedMethod
   @_RequireOpenQueue
   def CancelJob(self, job_id):
index a35adad..6aa4ee9 100644 (file)
@@ -44,6 +44,7 @@ KEY_SUCCESS = "success"
 KEY_RESULT = "result"
 
 REQ_SUBMIT_JOB = "SubmitJob"
+REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
 REQ_CANCEL_JOB = "CancelJob"
 REQ_ARCHIVE_JOB = "ArchiveJob"
 REQ_QUERY_JOBS = "QueryJobs"
@@ -288,6 +289,10 @@ class Client(object):
   def ArchiveJob(self, job_id):
     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
 
+  def WaitForJobChange(self, job_id, fields, previous):
+    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
+                           (job_id, fields, previous))
+
   def QueryJobs(self, job_ids, fields):
     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))