Revision 21c5ad52
b/lib/workerpool.py | ||
---|---|---|
30 | 30 |
from ganeti import compat |
31 | 31 |
|
32 | 32 |
|
33 |
_TERMINATE = object() |
|
34 |
|
|
35 |
|
|
33 | 36 |
class BaseWorker(threading.Thread, object): |
34 | 37 |
"""Base worker class for worker pools. |
35 | 38 |
|
... | ... | |
82 | 85 |
|
83 | 86 |
while True: |
84 | 87 |
try: |
85 |
# We wait on lock to be told either terminate or do a task.
|
|
88 |
# Wait on lock to be told either to terminate or to do a task
|
|
86 | 89 |
pool._lock.acquire() |
87 | 90 |
try: |
88 |
if pool._ShouldWorkerTerminateUnlocked(self): |
|
89 |
break |
|
90 |
|
|
91 |
# We only wait if there's no task for us. |
|
92 |
if not pool._tasks: |
|
93 |
logging.debug("Waiting for tasks") |
|
94 |
|
|
95 |
# wait() releases the lock and sleeps until notified |
|
96 |
pool._pool_to_worker.wait() |
|
91 |
task = pool._WaitForTaskUnlocked(self) |
|
97 | 92 |
|
98 |
logging.debug("Notified while waiting") |
|
93 |
if task is _TERMINATE: |
|
94 |
# Told to terminate |
|
95 |
break |
|
99 | 96 |
|
100 |
# Were we woken up in order to terminate?
|
|
101 |
if pool._ShouldWorkerTerminateUnlocked(self):
|
|
102 |
break
|
|
97 |
if task is None:
|
|
98 |
# Spurious notification, ignore
|
|
99 |
continue
|
|
103 | 100 |
|
104 |
if not pool._tasks: |
|
105 |
# Spurious notification, ignore |
|
106 |
continue |
|
101 |
self._current_task = task |
|
107 | 102 |
|
108 |
# Get task from queue and tell pool about it |
|
109 |
try: |
|
110 |
self._current_task = pool._tasks.popleft() |
|
111 |
finally: |
|
112 |
pool._worker_to_pool.notifyAll() |
|
103 |
assert self._HasRunningTaskUnlocked() |
|
113 | 104 |
finally: |
114 | 105 |
pool._lock.release() |
115 | 106 |
|
... | ... | |
229 | 220 |
finally: |
230 | 221 |
self._lock.release() |
231 | 222 |
|
223 |
def _WaitForTaskUnlocked(self, worker): |
|
224 |
"""Waits for a task for a worker. |
|
225 |
|
|
226 |
@type worker: L{BaseWorker} |
|
227 |
@param worker: Worker thread |
|
228 |
|
|
229 |
""" |
|
230 |
if self._ShouldWorkerTerminateUnlocked(worker): |
|
231 |
return _TERMINATE |
|
232 |
|
|
233 |
# We only wait if there's no task for us. |
|
234 |
if not self._tasks: |
|
235 |
logging.debug("Waiting for tasks") |
|
236 |
|
|
237 |
# wait() releases the lock and sleeps until notified |
|
238 |
self._pool_to_worker.wait() |
|
239 |
|
|
240 |
logging.debug("Notified while waiting") |
|
241 |
|
|
242 |
# Were we woken up in order to terminate? |
|
243 |
if self._ShouldWorkerTerminateUnlocked(worker): |
|
244 |
return _TERMINATE |
|
245 |
|
|
246 |
if not self._tasks: |
|
247 |
# Spurious notification, ignore |
|
248 |
return None |
|
249 |
|
|
250 |
# Get task from queue and tell pool about it |
|
251 |
try: |
|
252 |
return self._tasks.popleft() |
|
253 |
finally: |
|
254 |
self._worker_to_pool.notifyAll() |
|
255 |
|
|
232 | 256 |
def _ShouldWorkerTerminateUnlocked(self, worker): |
233 | 257 |
"""Returns whether a worker should terminate. |
234 | 258 |
|
Also available in: Unified diff