RAPI: Allow waiting for job changes
author Michael Hanselmann <hansmi@google.com>
Tue, 4 May 2010 17:02:54 +0000 (19:02 +0200)
committer Michael Hanselmann <hansmi@google.com>
Tue, 11 May 2010 14:31:37 +0000 (16:31 +0200)
Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>

doc/rapi.rst
lib/luxi.py
lib/rapi/connector.py
lib/rapi/rlib2.py

index a8a0dc2..21ec481 100644 (file)
@@ -651,6 +651,30 @@ while by a resource we refer to an instance's disk, or NIC, etc.
 
 Cancel a not-yet-started job.
 
+
+``/2/jobs/[job_id]/wait``
++++++++++++++++++++++++++
+
+``GET``
+~~~~~~~
+
+Waits for changes on a job. Takes the following body parameters in a
+dict:
+
+``fields``
+  The job fields on which to watch for changes.
+
+``previous_job_info``
+  Previously received field values or None if not yet available.
+
+``previous_log_serial``
+  Highest log serial number received so far or None if not yet
+  available.
+
+Returns None if no changes have been detected; otherwise returns a dict
+with two keys, ``job_info`` and ``log_entries``.
+
+
 ``/2/nodes``
 ++++++++++++
 
index a34a237..c967f8e 100644 (file)
@@ -64,6 +64,9 @@ REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
 DEF_CTMO = 10
 DEF_RWTO = 60
 
+# WaitForJobChange timeout
+WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
+
 
 class ProtocolError(Exception):
   """Denotes an error in the server communication"""
@@ -373,11 +376,27 @@ class Client(object):
     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
 
   def WaitForJobChangeOnce(self, job_id, fields,
-                           prev_job_info, prev_log_serial):
-    timeout = (DEF_RWTO - 1) / 2
+                           prev_job_info, prev_log_serial,
+                           timeout=WFJC_TIMEOUT):
+    """Waits for changes on a job.
+
+    @param job_id: Job ID
+    @type fields: list
+    @param fields: List of field names to be observed
+    @type prev_job_info: None or list
+    @param prev_job_info: Previously received job information
+    @type prev_log_serial: None or int/long
+    @param prev_log_serial: Highest log serial number previously received
+    @type timeout: int/float
+    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
+                    be capped to that value)
+
+    """
+    assert timeout >= 0, "Timeout can not be negative"
     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                            (job_id, fields, prev_job_info,
-                            prev_log_serial, timeout))
+                            prev_log_serial,
+                            min(WFJC_TIMEOUT, timeout)))
 
   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
     while True:
index d14d4d6..8e81464 100644 (file)
@@ -209,6 +209,8 @@ def GetHandlers(node_name_pattern, instance_name_pattern, job_id_pattern):
     "/2/jobs": rlib2.R_2_jobs,
     re.compile(r'/2/jobs/(%s)$' % job_id_pattern):
       rlib2.R_2_jobs_id,
+    re.compile(r'/2/jobs/(%s)/wait$' % job_id_pattern):
+      rlib2.R_2_jobs_id_wait,
 
     "/2/tags": rlib2.R_2_tags,
     "/2/info": rlib2.R_2_info,
index 09e883d..2dffc52 100644 (file)
@@ -83,6 +83,9 @@ _NR_MAP = {
   "R": _NR_REGULAR,
   }
 
+# Timeout for /2/jobs/[job_id]/wait. Gives job up to 10 seconds to change.
+_WFJC_TIMEOUT = 10
+
 
 class R_version(baserlib.R_Generic):
   """/version resource.
@@ -211,6 +214,55 @@ class R_2_jobs_id(baserlib.R_Generic):
     return result
 
 
+class R_2_jobs_id_wait(baserlib.R_Generic):
+  """/2/jobs/[job_id]/wait resource.
+
+  """
+  # WaitForJobChange provides access to sensitive information and blocks
+  # machine resources (it's a blocking RAPI call), hence restricting access.
+  GET_ACCESS = [rapi.RAPI_ACCESS_WRITE]
+
+  def GET(self):
+    """Waits for job changes.
+
+    """
+    job_id = self.items[0]
+
+    fields = self.getBodyParameter("fields")
+    prev_job_info = self.getBodyParameter("previous_job_info", None)
+    prev_log_serial = self.getBodyParameter("previous_log_serial", None)
+
+    if not isinstance(fields, list):
+      raise http.HttpBadRequest("The 'fields' parameter should be a list")
+
+    if not (prev_job_info is None or isinstance(prev_job_info, list)):
+      raise http.HttpBadRequest("The 'previous_job_info' parameter should"
+                                " be a list")
+
+    if not (prev_log_serial is None or
+            isinstance(prev_log_serial, (int, long))):
+      raise http.HttpBadRequest("The 'previous_log_serial' parameter should"
+                                " be a number")
+
+    client = baserlib.GetClient()
+    result = client.WaitForJobChangeOnce(job_id, fields,
+                                         prev_job_info, prev_log_serial,
+                                         timeout=_WFJC_TIMEOUT)
+    if not result:
+      raise http.HttpNotFound()
+
+    if result == constants.JOB_NOTCHANGED:
+      # No changes
+      return None
+
+    (job_info, log_entries) = result
+
+    return {
+      "job_info": job_info,
+      "log_entries": log_entries,
+      }
+
+
 class R_2_nodes(baserlib.R_Generic):
   """/2/nodes resource.