queue.release()
+def _EvaluateJobProcessorResult(depmgr, job, result):
+ """Looks at a result from L{_JobProcessor} for a job.
+
+ To be used in a L{_JobQueueWorker}.
+
+ @param depmgr: Dependency manager; its C{NotifyWaiters} method is called
+ with the job's ID when the result is C{_JobProcessor.FINISHED}
+ @param job: Job the result belongs to; only C{job.id} and
+ C{job.CalcPriority} are used here
+ @param result: Status returned by the job processor, compared against
+ C{_JobProcessor.FINISHED}, C{_JobProcessor.DEFER} and
+ C{_JobProcessor.WAITDEP}
+ @raise workerpool.DeferTask: If the result is C{_JobProcessor.DEFER};
+ carries the priority returned by C{job.CalcPriority()}
+ @raise errors.ProgrammerError: If the result is none of the statuses
+ listed above
+
+ """
+ if result == _JobProcessor.FINISHED:
+ # Notify waiting jobs
+ depmgr.NotifyWaiters(job.id)
+
+ elif result == _JobProcessor.DEFER:
+ # Schedule again
+ raise workerpool.DeferTask(priority=job.CalcPriority())
+
+ elif result == _JobProcessor.WAITDEP:
+ # No-op, dependency manager will re-schedule
+ pass
+
+ else:
+ raise errors.ProgrammerError("Job processor returned unknown status %s" %
+ (result, ))
+
+
class _JobQueueWorker(workerpool.BaseWorker):
"""The actual job workers.
wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
proc.ExecOpCode)
- result = _JobProcessor(queue, wrap_execop_fn, job)()
-
- if result == _JobProcessor.FINISHED:
- # Notify waiting jobs
- queue.depmgr.NotifyWaiters(job.id)
-
- elif result == _JobProcessor.DEFER:
- # Schedule again
- raise workerpool.DeferTask(priority=job.CalcPriority())
-
- elif result == _JobProcessor.WAITDEP:
- # No-op, dependency manager will re-schedule
- pass
-
- else:
- raise errors.ProgrammerError("Job processor returned unknown status %s" %
- (result, ))
+ _EvaluateJobProcessorResult(queue.depmgr, job,
+ _JobProcessor(queue, wrap_execop_fn, job)())
@staticmethod
def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
" not one of '%s' as required" %
(dep_job_id, status, utils.CommaJoin(dep_status)))
- @locking.ssynchronized(_LOCK)
+ def _RemoveEmptyWaitersUnlocked(self):
+ """Remove all jobs without actual waiters.
+
+ Every entry of C{self._waiters} whose waiter collection is empty is
+ deleted. The affected job IDs are collected into a list first, so the
+ mapping is not modified while it is being iterated over.
+
+ This method does no locking itself; L{NotifyWaiters} acquires
+ C{self._lock} before calling it.
+
+ """
+ for job_id in [job_id for (job_id, waiters) in self._waiters.items()
+ if not waiters]:
+ del self._waiters[job_id]
+
 def NotifyWaiters(self, job_id):
 """Notifies all jobs waiting for a certain job ID.
+ @attention: Do not call until L{CheckAndRegister} returned a status other
+ than C{WAITDEP} for C{job_id}, or behaviour is undefined
 @type job_id: string
 @param job_id: Job ID
 """
 assert ht.TString(job_id)
- jobs = self._waiters.pop(job_id, None)
+ # Empty entries are pruned and this job's waiters are taken out while
+ # holding the lock; the lock is released (also on error) before the
+ # waiting jobs are handed to the enqueue function below
+ self._lock.acquire()
+ try:
+ self._RemoveEmptyWaitersUnlocked()
+
+ jobs = self._waiters.pop(job_id, None)
+ finally:
+ self._lock.release()
+
 if jobs:
 # Re-add jobs to workerpool
 logging.debug("Re-adding %s jobs which were waiting for job %s",
 len(jobs), job_id)
 self._enqueue_fn(jobs)
- # Remove all jobs without actual waiters
- for job_id in [job_id for (job_id, waiters) in self._waiters.items()
- if not waiters]:
- del self._waiters[job_id]
-
def _RequireOpenQueue(fn):
"""Decorator for "public" functions.
assert job.writable, "Can't update read-only job"
filename = self._GetJobPath(job.id)
- data = serializer.DumpJson(job.Serialize(), indent=False)
+ data = serializer.DumpJson(job.Serialize())
logging.debug("Writing job %s to %s", job.id, filename)
self._UpdateJobQueueFile(filename, data, replicate)