self._last_worker_id = 0
self._workers = []
self._quiescing = False
+ self._active = True
# Terminating workers
self._termworkers = []
finally:
self._lock.release()
+ def SetActive(self, active):
+ """Enable/disable processing of tasks.
+
+ This is different from L{Quiesce} in the sense that this function just
+ changes an internal flag and doesn't wait for the queue to be empty. Tasks
+ already being processed continue normally, but no new tasks will be
+ started. New tasks can still be added.
+
+ @type active: bool
+ @param active: Whether tasks should be processed
+
+ """
+ self._lock.acquire()
+ try:
+ self._active = active
+
+ if active:
+ # Tell all workers to continue processing
+ self._pool_to_worker.notifyAll()
+ finally:
+ self._lock.release()
+
def _WaitForTaskUnlocked(self, worker):
"""Waits for a task for a worker.
return _TERMINATE
# We only wait if there's no task for us.
- if not self._tasks:
+ if not (self._active and self._tasks):
logging.debug("Waiting for tasks")
while True:
if self._ShouldWorkerTerminateUnlocked(worker):
return _TERMINATE
- if self._tasks:
+ # Just loop if pool is not processing tasks at this time
+ if self._active and self._tasks:
break
# Get task from queue and tell pool about it
return True
return False
+ def HasRunningTasks(self):
+ """Checks whether there's at least one task running.
+
+ """
+ self._lock.acquire()
+ try:
+ return self._HasRunningTasksUnlocked()
+ finally:
+ self._lock.release()
+
def Quiesce(self):
"""Waits until the task queue is empty.