Revision 53b1d12b lib/workerpool.py
b/lib/workerpool.py | ||
---|---|---|
95 | 95 |
logging.debug("Worker %s: waiting for tasks", self.worker_id) |
96 | 96 |
|
97 | 97 |
# wait() releases the lock and sleeps until notified |
98 |
pool._lock.wait()
|
|
98 |
pool._pool_to_worker.wait()
|
|
99 | 99 |
|
100 | 100 |
logging.debug("Worker %s: notified while waiting", self.worker_id) |
101 | 101 |
|
... | ... | |
111 | 111 |
try: |
112 | 112 |
self._current_task = pool._tasks.popleft() |
113 | 113 |
finally: |
114 |
pool._lock.notifyAll()
|
|
114 |
pool._worker_to_pool.notifyAll()
|
|
115 | 115 |
finally: |
116 | 116 |
pool._lock.release() |
117 | 117 |
|
... | ... | |
131 | 131 |
try: |
132 | 132 |
if self._current_task: |
133 | 133 |
self._current_task = None |
134 |
pool._lock.notifyAll()
|
|
134 |
pool._worker_to_pool.notifyAll()
|
|
135 | 135 |
finally: |
136 | 136 |
pool._lock.release() |
137 | 137 |
|
... | ... | |
165 | 165 |
|
166 | 166 |
""" |
167 | 167 |
# Some of these variables are accessed by BaseWorker |
168 |
self._lock = threading.Condition(threading.Lock()) |
|
168 |
self._lock = threading.Lock() |
|
169 |
self._pool_to_pool = threading.Condition(self._lock) |
|
170 |
self._pool_to_worker = threading.Condition(self._lock) |
|
171 |
self._worker_to_pool = threading.Condition(self._lock) |
|
169 | 172 |
self._worker_class = worker_class |
170 | 173 |
self._last_worker_id = 0 |
171 | 174 |
self._workers = [] |
... | ... | |
193 | 196 |
try: |
194 | 197 |
# Don't add new tasks while we're quiescing |
195 | 198 |
while self._quiescing: |
196 |
self._lock.wait()
|
|
199 |
self._pool_to_pool.wait()
|
|
197 | 200 |
|
198 | 201 |
# Add task to internal queue |
199 | 202 |
self._tasks.append(args) |
200 |
self._lock.notify() |
|
203 |
|
|
204 |
# Wake one idling worker up |
|
205 |
self._pool_to_worker.notify() |
|
201 | 206 |
finally: |
202 | 207 |
self._lock.release() |
203 | 208 |
|
... | ... | |
236 | 241 |
|
237 | 242 |
# Wait while there are tasks pending or running |
238 | 243 |
while self._tasks or self._HasRunningTasksUnlocked(): |
239 |
self._lock.wait()
|
|
244 |
self._worker_to_pool.wait()
|
|
240 | 245 |
|
241 | 246 |
finally: |
242 | 247 |
self._quiescing = False |
243 | 248 |
|
244 | 249 |
# Make sure AddTasks continues in case it was waiting |
245 |
self._lock.notifyAll()
|
|
250 |
self._pool_to_pool.notifyAll()
|
|
246 | 251 |
|
247 | 252 |
self._lock.release() |
248 | 253 |
|
... | ... | |
277 | 282 |
self._termworkers += termworkers |
278 | 283 |
|
279 | 284 |
# Notify workers that something has changed |
280 |
self._lock.notifyAll()
|
|
285 |
self._pool_to_worker.notifyAll()
|
|
281 | 286 |
|
282 | 287 |
# Join all terminating workers |
283 | 288 |
self._lock.release() |
Also available in: Unified diff