Revision dfe57c22 lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
121 | 121 |
This is what we use to track the user-submitted jobs. |
122 | 122 |
|
123 | 123 |
""" |
124 |
def __new__(cls, *args, **kwargs): |
|
125 |
obj = object.__new__(cls, *args, **kwargs) |
|
126 |
# Condition to wait for changes |
|
127 |
obj.change = threading.Condition() |
|
128 |
return obj |
|
129 |
|
|
124 | 130 |
def __init__(self, queue, job_id, ops): |
125 | 131 |
if not ops: |
126 | 132 |
# TODO |
... | ... | |
204 | 210 |
finally: |
205 | 211 |
queue.release() |
206 | 212 |
|
207 |
result = proc.ExecOpCode(input_opcode, op.Log) |
|
213 |
def _Log(*args): |
|
214 |
op.Log(*args) |
|
215 |
|
|
216 |
job.change.acquire() |
|
217 |
try: |
|
218 |
job.change.notifyAll() |
|
219 |
finally: |
|
220 |
job.change.release() |
|
221 |
|
|
222 |
result = proc.ExecOpCode(input_opcode, _Log) |
|
208 | 223 |
|
209 | 224 |
queue.acquire() |
210 | 225 |
try: |
... | ... | |
516 | 531 |
self._WriteAndReplicateFileUnlocked(filename, data) |
517 | 532 |
self._CleanCacheUnlocked([job.id]) |
518 | 533 |
|
534 |
# Notify waiters about potential changes |
|
535 |
job.change.acquire() |
|
536 |
try: |
|
537 |
job.change.notifyAll() |
|
538 |
finally: |
|
539 |
job.change.release() |
|
540 |
|
|
519 | 541 |
def _CleanCacheUnlocked(self, exclude): |
520 | 542 |
"""Clean the memory cache. |
521 | 543 |
|
... | ... | |
536 | 558 |
except KeyError: |
537 | 559 |
pass |
538 | 560 |
|
561 |
@_RequireOpenQueue |
|
562 |
def WaitForJobChanges(self, job_id, fields, previous): |
|
563 |
logging.debug("Waiting for changes in job %s", job_id) |
|
564 |
|
|
565 |
while True: |
|
566 |
self.acquire() |
|
567 |
try: |
|
568 |
job = self._LoadJobUnlocked(job_id) |
|
569 |
if not job: |
|
570 |
logging.debug("Job %s not found", job_id) |
|
571 |
new_state = None |
|
572 |
break |
|
573 |
|
|
574 |
new_state = self._GetJobInfoUnlocked(job, fields) |
|
575 |
finally: |
|
576 |
self.release() |
|
577 |
|
|
578 |
# Serializing and deserializing data can cause type changes (e.g. from |
|
579 |
# tuple to list) or precision loss. We're doing it here so that we get |
|
580 |
# the same modifications as the data received from the client. Without |
|
581 |
# this, the comparison afterwards might fail without the data being |
|
582 |
# significantly different. |
|
583 |
new_state = serializer.LoadJson(serializer.DumpJson(new_state)) |
|
584 |
|
|
585 |
if previous != new_state: |
|
586 |
break |
|
587 |
|
|
588 |
job.change.acquire() |
|
589 |
try: |
|
590 |
job.change.wait() |
|
591 |
finally: |
|
592 |
job.change.release() |
|
593 |
|
|
594 |
logging.debug("Job %s changed", job_id) |
|
595 |
|
|
596 |
return new_state |
|
597 |
|
|
539 | 598 |
@utils.LockedMethod |
540 | 599 |
@_RequireOpenQueue |
541 | 600 |
def CancelJob(self, job_id): |
Also available in: Unified diff