logging.debug("Worker %s: waiting for tasks", self.worker_id)
# wait() releases the lock and sleeps until notified
- pool._lock.wait()
+ pool._pool_to_worker.wait()
logging.debug("Worker %s: notified while waiting", self.worker_id)
try:
self._current_task = pool._tasks.popleft()
finally:
- pool._lock.notifyAll()
+ pool._worker_to_pool.notifyAll()
finally:
pool._lock.release()
try:
if self._current_task:
self._current_task = None
- pool._lock.notifyAll()
+ pool._worker_to_pool.notifyAll()
finally:
pool._lock.release()
"""
# Some of these variables are accessed by BaseWorker
- self._lock = threading.Condition(threading.Lock())
+ self._lock = threading.Lock()
+ self._pool_to_pool = threading.Condition(self._lock)
+ self._pool_to_worker = threading.Condition(self._lock)
+ self._worker_to_pool = threading.Condition(self._lock)
self._worker_class = worker_class
self._last_worker_id = 0
self._workers = []
try:
# Don't add new tasks while we're quiescing
while self._quiescing:
- self._lock.wait()
+ self._pool_to_pool.wait()
# Add task to internal queue
self._tasks.append(args)
- self._lock.notify()
+
+ # Wake one idling worker up
+ self._pool_to_worker.notify()
finally:
self._lock.release()
# Wait while there are tasks pending or running
while self._tasks or self._HasRunningTasksUnlocked():
- self._lock.wait()
+ self._worker_to_pool.wait()
finally:
self._quiescing = False
# Make sure AddTasks continues in case it was waiting
- self._lock.notifyAll()
+ self._pool_to_pool.notifyAll()
self._lock.release()
self._termworkers += termworkers
# Notify workers that something has changed
- self._lock.notifyAll()
+ self._pool_to_worker.notifyAll()
# Join all terminating workers
self._lock.release()