X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/e1ea54e95bfcf22a4214cfd87ce96d8726a15bf1..4a78c361a6de3bcbf98f02abfe41ae3b11de2b00:/lib/workerpool.py diff --git a/lib/workerpool.py b/lib/workerpool.py index d276b18..8db03c7 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -59,7 +59,7 @@ class BaseWorker(threading.Thread, object): Users of a worker pool must override RunTask in a subclass. """ - # pylint: disable-msg=W0212 + # pylint: disable=W0212 def __init__(self, pool, worker_id): """Constructor for BaseWorker thread. @@ -169,7 +169,7 @@ class BaseWorker(threading.Thread, object): logging.debug("Starting task %r, priority %s", args, priority) assert self.getName() == self._worker_id try: - self.RunTask(*args) # pylint: disable-msg=W0142 + self.RunTask(*args) # pylint: disable=W0142 finally: self.SetTaskName(None) logging.debug("Done with task %r, priority %s", args, priority) @@ -184,7 +184,7 @@ class BaseWorker(threading.Thread, object): args, defer.priority) assert self._HasRunningTaskUnlocked() - except: # pylint: disable-msg=W0702 + except: # pylint: disable=W0702 logging.exception("Caught unhandled exception") assert self._HasRunningTaskUnlocked() @@ -246,6 +246,7 @@ class WorkerPool(object): self._last_worker_id = 0 self._workers = [] self._quiescing = False + self._active = True # Terminating workers self._termworkers = [] @@ -340,6 +341,28 @@ class WorkerPool(object): finally: self._lock.release() + def SetActive(self, active): + """Enable/disable processing of tasks. + + This is different from L{Quiesce} in the sense that this function just + changes an internal flag and doesn't wait for the queue to be empty. Tasks + already being processed continue normally, but no new tasks will be + started. New tasks can still be added. + + @type active: bool + @param active: Whether tasks should be processed + + """ + self._lock.acquire() + try: + self._active = active + + if active: + # Tell all workers to continue processing + self._pool_to_worker.notifyAll() + finally: + self._lock.release() + def _WaitForTaskUnlocked(self, worker): """Waits for a task for a worker. @@ -351,21 +374,22 @@ class WorkerPool(object): return _TERMINATE # We only wait if there's no task for us. - if not self._tasks: + if not (self._active and self._tasks): logging.debug("Waiting for tasks") - # wait() releases the lock and sleeps until notified - self._pool_to_worker.wait() + while True: + # wait() releases the lock and sleeps until notified + self._pool_to_worker.wait() - logging.debug("Notified while waiting") + logging.debug("Notified while waiting") - # Were we woken up in order to terminate? - if self._ShouldWorkerTerminateUnlocked(worker): - return _TERMINATE + # Were we woken up in order to terminate? + if self._ShouldWorkerTerminateUnlocked(worker): + return _TERMINATE - if not self._tasks: - # Spurious notification, ignore - return None + # Just loop if pool is not processing tasks at this time + if self._active and self._tasks: + break # Get task from queue and tell pool about it try: @@ -384,10 +408,20 @@ class WorkerPool(object): """ for worker in self._workers + self._termworkers: - if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212 + if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212 return True return False + def HasRunningTasks(self): + """Checks whether there's at least one task running. + + """ + self._lock.acquire() + try: + return self._HasRunningTasksUnlocked() + finally: + self._lock.release() + def Quiesce(self): """Waits until the task queue is empty.