Revision 27caa993 lib/workerpool.py
b/lib/workerpool.py | ||
---|---|---|
246 | 246 |
self._last_worker_id = 0 |
247 | 247 |
self._workers = [] |
248 | 248 |
self._quiescing = False |
249 |
self._active = True |
|
249 | 250 |
|
250 | 251 |
# Terminating workers |
251 | 252 |
self._termworkers = [] |
... | ... | |
340 | 341 |
finally: |
341 | 342 |
self._lock.release() |
342 | 343 |
|
344 |
def SetActive(self, active): |
|
345 |
"""Enable/disable processing of tasks. |
|
346 |
|
|
347 |
This is different from L{Quiesce} in the sense that this function just |
|
348 |
changes an internal flag and doesn't wait for the queue to be empty. Tasks |
|
349 |
already being processed continue normally, but no new tasks will be |
|
350 |
started. New tasks can still be added. |
|
351 |
|
|
352 |
@type active: bool |
|
353 |
@param active: Whether tasks should be processed |
|
354 |
|
|
355 |
""" |
|
356 |
self._lock.acquire() |
|
357 |
try: |
|
358 |
self._active = active |
|
359 |
|
|
360 |
if active: |
|
361 |
# Tell all workers to continue processing |
|
362 |
self._pool_to_worker.notifyAll() |
|
363 |
finally: |
|
364 |
self._lock.release() |
|
365 |
|
|
343 | 366 |
def _WaitForTaskUnlocked(self, worker): |
344 | 367 |
"""Waits for a task for a worker. |
345 | 368 |
|
... | ... | |
351 | 374 |
return _TERMINATE |
352 | 375 |
|
353 | 376 |
# We only wait if there's no task for us. |
354 |
if not self._tasks:
|
|
377 |
if not (self._active and self._tasks):
|
|
355 | 378 |
logging.debug("Waiting for tasks") |
356 | 379 |
|
357 | 380 |
while True: |
... | ... | |
364 | 387 |
if self._ShouldWorkerTerminateUnlocked(worker): |
365 | 388 |
return _TERMINATE |
366 | 389 |
|
367 |
if self._tasks: |
|
390 |
# Just loop if pool is not processing tasks at this time |
|
391 |
if self._active and self._tasks: |
|
368 | 392 |
break |
369 | 393 |
|
370 | 394 |
# Get task from queue and tell pool about it |
Also available in: Unified diff