return queue.ArchiveJob(job_id)
elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
- (job_id, fields, prev_job_info, prev_log_serial) = args
+ (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
return queue.WaitForJobChanges(job_id, fields, prev_job_info,
- prev_log_serial)
+ prev_log_serial, timeout)
elif method == luxi.REQ_QUERY_JOBS:
(job_ids, fields) = args
JOB_ID_TEMPLATE = r"\d+"
+# Value returned when waiting for a job change times out without any change
+JOB_NOTCHANGED = "nochange"
+
# Job status
JOB_STATUS_QUEUED = "queued"
JOB_STATUS_RUNNING = "running"
@utils.LockedMethod
@_RequireOpenQueue
- def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial):
+ def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
+ timeout):
"""Waits for changes in a job.
@type job_id: string
@param prev_job_info: Last job information returned
@type prev_log_serial: int
@param prev_log_serial: Last job message serial number
+ @type timeout: float
+ @param timeout: maximum time to wait
"""
logging.debug("Waiting for changes in job %s", job_id)
-
+ end_time = time.time() + timeout
while True:
+ delta_time = end_time - time.time()
+ if delta_time < 0:
+ return constants.JOB_NOTCHANGED
+
job = self._LoadJobUnlocked(job_id)
if not job:
logging.debug("Job %s not found", job_id)
- new_state = None
break
status = job.CalcStatus()
logging.debug("Waiting again")
# Release the queue lock while waiting
- job.change.wait()
+ job.change.wait(delta_time)
logging.debug("Job %s changed", job_id)
return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
- return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
- (job_id, fields, prev_job_info, prev_log_serial))
+ timeout = (DEF_RWTO - 1) / 2
+ while True:
+ result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
+ (job_id, fields, prev_job_info,
+ prev_log_serial, timeout))
+ if result != constants.JOB_NOTCHANGED:
+ break
+ return result
def QueryJobs(self, job_ids, fields):
return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))