Cancel a not-yet-started job.
+
+``/2/jobs/[job_id]/wait``
++++++++++++++++++++++++++
+
+``GET``
+~~~~~~~
+
+Waits up to 10 seconds for changes on a job. The request body is a dict
+with the following parameters:
+
+``fields``
+ The job fields on which to watch for changes.
+
+``previous_job_info``
+ Previously received field values or None if not yet available.
+
+``previous_log_serial``
+ Highest log serial number received so far or None if not yet
+ available.
+
+Returns None if no changes have been detected; otherwise returns a dict
+with two keys, ``job_info`` and ``log_entries``.
+
+
``/2/nodes``
++++++++++++
DEF_CTMO = 10
DEF_RWTO = 60
+# Default and maximum timeout (in seconds) for a single WaitForJobChangeOnce
+# call
+WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
+
class ProtocolError(Exception):
"""Denotes an error in the server communication"""
return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
def WaitForJobChangeOnce(self, job_id, fields,
- prev_job_info, prev_log_serial):
- timeout = (DEF_RWTO - 1) / 2
+ prev_job_info, prev_log_serial,
+ timeout=WFJC_TIMEOUT):
+ """Waits for changes on a job, issuing a single request.
+
+ @param job_id: Job ID
+ @type fields: list
+ @param fields: List of field names to be observed
+ @type prev_job_info: None or list
+ @param prev_job_info: Previously received job information
+ @type prev_log_serial: None or int/long
+ @param prev_log_serial: Highest log serial number previously received
+ @type timeout: int/float
+ @param timeout: Timeout in seconds; must not be negative (values larger
+ than L{WFJC_TIMEOUT} will be capped to that value)
+ @return: The result of the C{REQ_WAIT_FOR_JOB_CHANGE} call, passed through
+ unchanged
+
+ """
+ # Note: this check only runs when assertions are enabled
+ assert timeout >= 0, "Timeout can not be negative"
+ # The timeout sent with the request never exceeds WFJC_TIMEOUT, whatever
+ # the caller asked for
return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
(job_id, fields, prev_job_info,
- prev_log_serial, timeout))
+ prev_log_serial,
+ min(WFJC_TIMEOUT, timeout)))
def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
while True:
"/2/jobs": rlib2.R_2_jobs,
re.compile(r'/2/jobs/(%s)$' % job_id_pattern):
rlib2.R_2_jobs_id,
+ re.compile(r'/2/jobs/(%s)/wait$' % job_id_pattern):
+ rlib2.R_2_jobs_id_wait,
"/2/tags": rlib2.R_2_tags,
"/2/info": rlib2.R_2_info,
"R": _NR_REGULAR,
}
+# Timeout (in seconds) for /2/jobs/[job_id]/wait. Gives the job up to 10
+# seconds to change.
+_WFJC_TIMEOUT = 10
+
class R_version(baserlib.R_Generic):
"""/version resource.
return result
+class R_2_jobs_id_wait(baserlib.R_Generic):
+ """/2/jobs/[job_id]/wait resource.
+
+ """
+ # WaitForJobChange provides access to sensitive information and blocks
+ # machine resources (it's a blocking RAPI call), hence restricting access.
+ GET_ACCESS = [rapi.RAPI_ACCESS_WRITE]
+
+ def GET(self):
+ """Waits for job changes.
+
+ Reads the C{fields}, C{previous_job_info} and C{previous_log_serial}
+ body parameters, checks their types and issues a single
+ C{WaitForJobChangeOnce} call with a timeout of L{_WFJC_TIMEOUT}.
+
+ @return: None if the job did not change, otherwise a dict with the keys
+ C{job_info} and C{log_entries}
+ @raise http.HttpBadRequest: if a body parameter has an unexpected type
+ @raise http.HttpNotFound: if the wait call returns an empty result
+
+ """
+ # The job ID is the first item captured from the request path
+ job_id = self.items[0]
+
+ # No default is passed for "fields"; the two "previous_*" parameters
+ # default to None
+ # NOTE(review): presumably getBodyParameter rejects a request without
+ # "fields" - confirm against baserlib
+ fields = self.getBodyParameter("fields")
+ prev_job_info = self.getBodyParameter("previous_job_info", None)
+ prev_log_serial = self.getBodyParameter("previous_log_serial", None)
+
+ if not isinstance(fields, list):
+ raise http.HttpBadRequest("The 'fields' parameter should be a list")
+
+ if not (prev_job_info is None or isinstance(prev_job_info, list)):
+ raise http.HttpBadRequest("The 'previous_job_info' parameter should"
+ " be a list")
+
+ if not (prev_log_serial is None or
+ isinstance(prev_log_serial, (int, long))):
+ raise http.HttpBadRequest("The 'previous_log_serial' parameter should"
+ " be a number")
+
+ client = baserlib.GetClient()
+ result = client.WaitForJobChangeOnce(job_id, fields,
+ prev_job_info, prev_log_serial,
+ timeout=_WFJC_TIMEOUT)
+ # An empty result is reported to the caller as "not found"
+ # NOTE(review): presumably this means the job does not exist - confirm
+ if not result:
+ raise http.HttpNotFound()
+
+ if result == constants.JOB_NOTCHANGED:
+ # No changes
+ return None
+
+ # Any other result is unpacked as a (job_info, log_entries) pair
+ (job_info, log_entries) = result
+
+ return {
+ "job_info": job_info,
+ "log_entries": log_entries,
+ }
+
+
class R_2_nodes(baserlib.R_Generic):
"""/2/nodes resource.