This patch alters the WaitForJobChanges luxi-RPC call to have a
configurable timeout, so that the call behaves nicely with long jobs
that have no update.
We do this by adding a timeout parameter in the RPC call, and returning
a special constant when the timeout is reached without an update. The
luxi client will repeatedly call the WaitForJobChanges until it gets a
real change. The timeout is hardcoded as half the RWTO value.
The patch also removes an unused variable (new_state) from the
WaitForJobChanges method.
Reviewed-by: imsnah,ultrotter
return queue.ArchiveJob(job_id)
elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
return queue.ArchiveJob(job_id)
elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
- (job_id, fields, prev_job_info, prev_log_serial) = args
+ (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
return queue.WaitForJobChanges(job_id, fields, prev_job_info,
return queue.WaitForJobChanges(job_id, fields, prev_job_info,
+ prev_log_serial, timeout)
elif method == luxi.REQ_QUERY_JOBS:
(job_ids, fields) = args
elif method == luxi.REQ_QUERY_JOBS:
(job_ids, fields) = args
+# unchanged job return
+JOB_NOTCHANGED = "nochange"
+
# Job status
JOB_STATUS_QUEUED = "queued"
JOB_STATUS_RUNNING = "running"
# Job status
JOB_STATUS_QUEUED = "queued"
JOB_STATUS_RUNNING = "running"
@utils.LockedMethod
@_RequireOpenQueue
@utils.LockedMethod
@_RequireOpenQueue
- def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial):
+ def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
+ timeout):
"""Waits for changes in a job.
@type job_id: string
"""Waits for changes in a job.
@type job_id: string
@param prev_job_info: Last job information returned
@type prev_log_serial: int
@param prev_log_serial: Last job message serial number
@param prev_job_info: Last job information returned
@type prev_log_serial: int
@param prev_log_serial: Last job message serial number
+ @type timeout: float
+ @param timeout: maximum time to wait
"""
logging.debug("Waiting for changes in job %s", job_id)
"""
logging.debug("Waiting for changes in job %s", job_id)
+ end_time = time.time() + timeout
+ delta_time = end_time - time.time()
+ if delta_time < 0:
+ return constants.JOB_NOTCHANGED
+
job = self._LoadJobUnlocked(job_id)
if not job:
logging.debug("Job %s not found", job_id)
job = self._LoadJobUnlocked(job_id)
if not job:
logging.debug("Job %s not found", job_id)
break
status = job.CalcStatus()
break
status = job.CalcStatus()
logging.debug("Waiting again")
# Release the queue lock while waiting
logging.debug("Waiting again")
# Release the queue lock while waiting
+ job.change.wait(delta_time)
logging.debug("Job %s changed", job_id)
logging.debug("Job %s changed", job_id)
return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
- return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
- (job_id, fields, prev_job_info, prev_log_serial))
+ timeout = (DEF_RWTO - 1) / 2
+ while True:
+ result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
+ (job_id, fields, prev_job_info,
+ prev_log_serial, timeout))
+ if result != constants.JOB_NOTCHANGED:
+ break
+ return result
def QueryJobs(self, job_ids, fields):
return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
def QueryJobs(self, job_ids, fields):
return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))