" not one of '%s' as required" %
(dep_job_id, status, utils.CommaJoin(dep_status)))
- @locking.ssynchronized(_LOCK)
+ def _RemoveEmptyWaitersUnlocked(self):
+ """Remove all jobs without actual waiters.
+
+ """
+ for job_id in [job_id for (job_id, waiters) in self._waiters.items()
+ if not waiters]:
+ del self._waiters[job_id]
+
def NotifyWaiters(self, job_id):
"""Notifies all jobs waiting for a certain job ID.
+ @important: Do not call until L{CheckAndRegister} returned a status other
+ than L{self.WAIT} for C{job_id}, or behaviour is undefined
@type job_id: string
@param job_id: Job ID
"""
assert ht.TString(job_id)
- jobs = self._waiters.pop(job_id, None)
+ self._lock.acquire()
+ try:
+ self._RemoveEmptyWaitersUnlocked()
+
+ jobs = self._waiters.pop(job_id, None)
+ finally:
+ self._lock.release()
+
if jobs:
# Re-add jobs to workerpool
logging.debug("Re-adding %s jobs which were waiting for job %s",
len(jobs), job_id)
self._enqueue_fn(jobs)
- # Remove all jobs without actual waiters
- for job_id in [job_id for (job_id, waiters) in self._waiters.items()
- if not waiters]:
- del self._waiters[job_id]
-
def _RequireOpenQueue(fn):
"""Decorator for "public" functions.
return result
def _Enqueue(self, jobs):
+ self.assertFalse(self.jdm._lock.is_owned(),
+ msg=("Must not own manager lock while re-adding jobs"
+ " (potential deadlock)"))
self._queue.append(jobs)
def testNotFinalizedThenCancel(self):