job_id = args
return queue.ArchiveJob(job_id)
+ elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
+ (job_id, fields, previous) = args
+ return queue.WaitForJobChanges(job_id, fields, previous)
+
elif method == luxi.REQ_QUERY_JOBS:
(job_ids, fields) = args
return queue.QueryJobs(job_ids, fields)
This is what we use to track the user-submitted jobs.
"""
+ def __new__(cls, *args, **kwargs):
+ obj = object.__new__(cls, *args, **kwargs)
+ # Condition to wait for changes
+ obj.change = threading.Condition()
+ return obj
+
def __init__(self, queue, job_id, ops):
if not ops:
# TODO
finally:
queue.release()
- result = proc.ExecOpCode(input_opcode, op.Log)
+ def _Log(*args):
+ op.Log(*args)
+
+ job.change.acquire()
+ try:
+ job.change.notifyAll()
+ finally:
+ job.change.release()
+
+ result = proc.ExecOpCode(input_opcode, _Log)
queue.acquire()
try:
self._WriteAndReplicateFileUnlocked(filename, data)
self._CleanCacheUnlocked([job.id])
+ # Notify waiters about potential changes
+ job.change.acquire()
+ try:
+ job.change.notifyAll()
+ finally:
+ job.change.release()
+
def _CleanCacheUnlocked(self, exclude):
"""Clean the memory cache.
except KeyError:
pass
+ @_RequireOpenQueue
+ def WaitForJobChanges(self, job_id, fields, previous):
+ logging.debug("Waiting for changes in job %s", job_id)
+
+ while True:
+ self.acquire()
+ try:
+ job = self._LoadJobUnlocked(job_id)
+ if not job:
+ logging.debug("Job %s not found", job_id)
+ new_state = None
+ break
+
+ new_state = self._GetJobInfoUnlocked(job, fields)
+ finally:
+ self.release()
+
+ # Serializing and deserializing data can cause type changes (e.g. from
+ # tuple to list) or precision loss. We're doing it here so that we get
+ # the same modifications as the data received from the client. Without
+ # this, the comparison afterwards might fail without the data being
+ # significantly different.
+ new_state = serializer.LoadJson(serializer.DumpJson(new_state))
+
+ if previous != new_state:
+ break
+
+ job.change.acquire()
+ try:
+ job.change.wait()
+ finally:
+ job.change.release()
+
+ logging.debug("Job %s changed", job_id)
+
+ return new_state
+
@utils.LockedMethod
@_RequireOpenQueue
def CancelJob(self, job_id):
KEY_RESULT = "result"
REQ_SUBMIT_JOB = "SubmitJob"
+REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_QUERY_JOBS = "QueryJobs"
def ArchiveJob(self, job_id):
return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
+ def WaitForJobChange(self, job_id, fields, previous):
+ return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
+ (job_id, fields, previous))
+
def QueryJobs(self, job_ids, fields):
return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))