Revision 27caa993
b/lib/workerpool.py | ||
---|---|---|
246 | 246 |
self._last_worker_id = 0 |
247 | 247 |
self._workers = [] |
248 | 248 |
self._quiescing = False |
249 |
self._active = True |
|
249 | 250 |
|
250 | 251 |
# Terminating workers |
251 | 252 |
self._termworkers = [] |
... | ... | |
340 | 341 |
finally: |
341 | 342 |
self._lock.release() |
342 | 343 |
|
344 |
def SetActive(self, active): |
|
345 |
"""Enable/disable processing of tasks. |
|
346 |
|
|
347 |
This is different from L{Quiesce} in the sense that this function just |
|
348 |
changes an internal flag and doesn't wait for the queue to be empty. Tasks |
|
349 |
already being processed continue normally, but no new tasks will be |
|
350 |
started. New tasks can still be added. |
|
351 |
|
|
352 |
@type active: bool |
|
353 |
@param active: Whether tasks should be processed |
|
354 |
|
|
355 |
""" |
|
356 |
self._lock.acquire() |
|
357 |
try: |
|
358 |
self._active = active |
|
359 |
|
|
360 |
if active: |
|
361 |
# Tell all workers to continue processing |
|
362 |
self._pool_to_worker.notifyAll() |
|
363 |
finally: |
|
364 |
self._lock.release() |
|
365 |
|
|
343 | 366 |
def _WaitForTaskUnlocked(self, worker): |
344 | 367 |
"""Waits for a task for a worker. |
345 | 368 |
|
... | ... | |
351 | 374 |
return _TERMINATE |
352 | 375 |
|
353 | 376 |
# We only wait if there's no task for us. |
354 |
if not self._tasks:
|
|
377 |
if not (self._active and self._tasks):
|
|
355 | 378 |
logging.debug("Waiting for tasks") |
356 | 379 |
|
357 | 380 |
while True: |
... | ... | |
364 | 387 |
if self._ShouldWorkerTerminateUnlocked(worker): |
365 | 388 |
return _TERMINATE |
366 | 389 |
|
367 |
if self._tasks: |
|
390 |
# Just loop if pool is not processing tasks at this time |
|
391 |
if self._active and self._tasks: |
|
368 | 392 |
break |
369 | 393 |
|
370 | 394 |
# Get task from queue and tell pool about it |
b/test/ganeti.workerpool_unittest.py | ||
---|---|---|
170 | 170 |
wp.TerminateWorkers() |
171 | 171 |
self._CheckWorkerCount(wp, 0) |
172 | 172 |
|
173 |
def testActive(self): |
|
174 |
ctx = CountingContext() |
|
175 |
wp = workerpool.WorkerPool("TestActive", 5, CountingBaseWorker) |
|
176 |
try: |
|
177 |
self._CheckWorkerCount(wp, 5) |
|
178 |
self.assertTrue(wp._active) |
|
179 |
|
|
180 |
# Process some tasks |
|
181 |
for _ in range(10): |
|
182 |
wp.AddTask((ctx, None)) |
|
183 |
|
|
184 |
wp.Quiesce() |
|
185 |
self._CheckNoTasks(wp) |
|
186 |
self.assertEquals(ctx.GetDoneTasks(), 10) |
|
187 |
|
|
188 |
# Repeat a few times |
|
189 |
for count in range(10): |
|
190 |
# Deactivate pool |
|
191 |
wp.SetActive(False) |
|
192 |
self._CheckNoTasks(wp) |
|
193 |
|
|
194 |
# Queue some more tasks |
|
195 |
for _ in range(10): |
|
196 |
wp.AddTask((ctx, None)) |
|
197 |
|
|
198 |
for _ in range(5): |
|
199 |
# Short delays to give other threads a chance to cause breakage |
|
200 |
time.sleep(.01) |
|
201 |
wp.AddTask((ctx, "Hello world %s" % 999)) |
|
202 |
self.assertFalse(wp._active) |
|
203 |
|
|
204 |
self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15)) |
|
205 |
|
|
206 |
# Start processing again |
|
207 |
wp.SetActive(True) |
|
208 |
self.assertTrue(wp._active) |
|
209 |
|
|
210 |
# Wait for tasks to finish |
|
211 |
wp.Quiesce() |
|
212 |
self._CheckNoTasks(wp) |
|
213 |
self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15) + 15) |
|
214 |
|
|
215 |
self._CheckWorkerCount(wp, 5) |
|
216 |
finally: |
|
217 |
wp.TerminateWorkers() |
|
218 |
self._CheckWorkerCount(wp, 0) |
|
219 |
|
|
173 | 220 |
def testChecksum(self): |
174 | 221 |
# Tests whether all tasks are run and, since we're only using a single |
175 | 222 |
# thread, whether everything is started in order. |
Also available in: Unified diff