Revision b3558df1 lib/workerpool.py
b/lib/workerpool.py | ||
---|---|---|
48 | 48 |
super(BaseWorker, self).__init__() |
49 | 49 |
self.pool = pool |
50 | 50 |
self.worker_id = worker_id |
51 |
|
|
52 |
# Also used by WorkerPool |
|
53 | 51 |
self._current_task = None |
54 | 52 |
|
55 | 53 |
def ShouldTerminate(self): |
... | ... | |
58 | 56 |
""" |
59 | 57 |
return self.pool.ShouldWorkerTerminate(self) |
60 | 58 |
|
59 |
def _HasRunningTaskUnlocked(self): |
|
60 |
"""Returns whether this worker is currently running a task. |
|
61 |
|
|
62 |
""" |
|
63 |
return (self._current_task is not None) |
|
64 |
|
|
65 |
def HasRunningTask(self): |
|
66 |
"""Returns whether this worker is currently running a task. |
|
67 |
|
|
68 |
""" |
|
69 |
self.pool._lock.acquire() |
|
70 |
try: |
|
71 |
return self._HasRunningTaskUnlocked() |
|
72 |
finally: |
|
73 |
self.pool._lock.release() |
|
74 |
|
|
61 | 75 |
def run(self): |
62 | 76 |
"""Main thread function. |
63 | 77 |
|
... | ... | |
66 | 80 |
""" |
67 | 81 |
pool = self.pool |
68 | 82 |
|
69 |
assert self._current_task is None
|
|
83 |
assert not self.HasRunningTask()
|
|
70 | 84 |
|
71 | 85 |
while True: |
72 | 86 |
try: |
... | ... | |
78 | 92 |
|
79 | 93 |
# We only wait if there's no task for us. |
80 | 94 |
if not pool._tasks: |
95 |
logging.debug("Worker %s: waiting for tasks", self.worker_id) |
|
96 |
|
|
81 | 97 |
# wait() releases the lock and sleeps until notified |
82 | 98 |
pool._lock.wait() |
83 | 99 |
|
100 |
logging.debug("Worker %s: notified while waiting", self.worker_id) |
|
101 |
|
|
84 | 102 |
# Were we woken up in order to terminate? |
85 | 103 |
if pool._ShouldWorkerTerminateUnlocked(self): |
86 | 104 |
break |
... | ... | |
99 | 117 |
|
100 | 118 |
# Run the actual task |
101 | 119 |
try: |
120 |
logging.debug("Worker %s: starting task %r", |
|
121 |
self.worker_id, self._current_task) |
|
102 | 122 |
self.RunTask(*self._current_task) |
123 |
logging.debug("Worker %s: done with task %r", |
|
124 |
self.worker_id, self._current_task) |
|
103 | 125 |
except: |
104 | 126 |
logging.error("Worker %s: Caught unhandled exception", |
105 | 127 |
self.worker_id, exc_info=True) |
106 | 128 |
finally: |
107 |
self._current_task = None |
|
108 |
|
|
109 | 129 |
# Notify pool |
110 | 130 |
pool._lock.acquire() |
111 | 131 |
try: |
112 |
pool._lock.notifyAll() |
|
132 |
if self._current_task: |
|
133 |
self._current_task = None |
|
134 |
pool._lock.notifyAll() |
|
113 | 135 |
finally: |
114 | 136 |
pool._lock.release() |
115 | 137 |
|
138 |
logging.debug("Worker %s: terminates", self.worker_id) |
|
139 |
|
|
116 | 140 |
def RunTask(self, *args): |
117 | 141 |
"""Function called to start a task. |
118 | 142 |
|
... | ... | |
198 | 222 |
|
199 | 223 |
""" |
200 | 224 |
for worker in self._workers + self._termworkers: |
201 |
if worker._current_task is not None:
|
|
225 |
if worker._HasRunningTaskUnlocked():
|
|
202 | 226 |
return True |
203 | 227 |
return False |
204 | 228 |
|
Also available in: Unified diff