121 |
121 |
This is what we use to track the user-submitted jobs.
|
122 |
122 |
|
123 |
123 |
"""
|
|
124 |
def __new__(cls, *args, **kwargs):
|
|
125 |
obj = object.__new__(cls, *args, **kwargs)
|
|
126 |
# Condition to wait for changes
|
|
127 |
obj.change = threading.Condition()
|
|
128 |
return obj
|
|
129 |
|
124 |
130 |
def __init__(self, queue, job_id, ops):
|
125 |
131 |
if not ops:
|
126 |
132 |
# TODO
|
... | ... | |
204 |
210 |
finally:
|
205 |
211 |
queue.release()
|
206 |
212 |
|
207 |
|
result = proc.ExecOpCode(input_opcode, op.Log)
|
|
213 |
def _Log(*args):
|
|
214 |
op.Log(*args)
|
|
215 |
|
|
216 |
job.change.acquire()
|
|
217 |
try:
|
|
218 |
job.change.notifyAll()
|
|
219 |
finally:
|
|
220 |
job.change.release()
|
|
221 |
|
|
222 |
result = proc.ExecOpCode(input_opcode, _Log)
|
208 |
223 |
|
209 |
224 |
queue.acquire()
|
210 |
225 |
try:
|
... | ... | |
516 |
531 |
self._WriteAndReplicateFileUnlocked(filename, data)
|
517 |
532 |
self._CleanCacheUnlocked([job.id])
|
518 |
533 |
|
|
534 |
# Notify waiters about potential changes
|
|
535 |
job.change.acquire()
|
|
536 |
try:
|
|
537 |
job.change.notifyAll()
|
|
538 |
finally:
|
|
539 |
job.change.release()
|
|
540 |
|
519 |
541 |
def _CleanCacheUnlocked(self, exclude):
|
520 |
542 |
"""Clean the memory cache.
|
521 |
543 |
|
... | ... | |
536 |
558 |
except KeyError:
|
537 |
559 |
pass
|
538 |
560 |
|
|
561 |
@_RequireOpenQueue
|
|
562 |
def WaitForJobChanges(self, job_id, fields, previous):
|
|
563 |
logging.debug("Waiting for changes in job %s", job_id)
|
|
564 |
|
|
565 |
while True:
|
|
566 |
self.acquire()
|
|
567 |
try:
|
|
568 |
job = self._LoadJobUnlocked(job_id)
|
|
569 |
if not job:
|
|
570 |
logging.debug("Job %s not found", job_id)
|
|
571 |
new_state = None
|
|
572 |
break
|
|
573 |
|
|
574 |
new_state = self._GetJobInfoUnlocked(job, fields)
|
|
575 |
finally:
|
|
576 |
self.release()
|
|
577 |
|
|
578 |
# Serializing and deserializing data can cause type changes (e.g. from
|
|
579 |
# tuple to list) or precision loss. We're doing it here so that we get
|
|
580 |
# the same modifications as the data received from the client. Without
|
|
581 |
# this, the comparison afterwards might fail without the data being
|
|
582 |
# significantly different.
|
|
583 |
new_state = serializer.LoadJson(serializer.DumpJson(new_state))
|
|
584 |
|
|
585 |
if previous != new_state:
|
|
586 |
break
|
|
587 |
|
|
588 |
job.change.acquire()
|
|
589 |
try:
|
|
590 |
job.change.wait()
|
|
591 |
finally:
|
|
592 |
job.change.release()
|
|
593 |
|
|
594 |
logging.debug("Job %s changed", job_id)
|
|
595 |
|
|
596 |
return new_state
|
|
597 |
|
539 |
598 |
@utils.LockedMethod
|
540 |
599 |
@_RequireOpenQueue
|
541 |
600 |
def CancelJob(self, job_id):
|