X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/52c47e4e8adce0406f08ad1992ce99c1b8f2f2b7..4a78c361a6de3bcbf98f02abfe41ae3b11de2b00:/lib/workerpool.py diff --git a/lib/workerpool.py b/lib/workerpool.py index 1838d97..8db03c7 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -59,7 +59,7 @@ class BaseWorker(threading.Thread, object): Users of a worker pool must override RunTask in a subclass. """ - # pylint: disable-msg=W0212 + # pylint: disable=W0212 def __init__(self, pool, worker_id): """Constructor for BaseWorker thread. @@ -69,8 +69,11 @@ class BaseWorker(threading.Thread, object): """ super(BaseWorker, self).__init__(name=worker_id) self.pool = pool + self._worker_id = worker_id self._current_task = None + assert self.getName() == worker_id + def ShouldTerminate(self): """Returns whether this worker should terminate. @@ -100,6 +103,23 @@ class BaseWorker(threading.Thread, object): finally: self.pool._lock.release() + def SetTaskName(self, taskname): + """Sets the name of the current task. + + Should only be called from within L{RunTask}. + + @type taskname: string + @param taskname: Task's name + + """ + if taskname: + name = "%s/%s" % (self._worker_id, taskname) + else: + name = self._worker_id + + # Set thread name + self.setName(name) + def _HasRunningTaskUnlocked(self): """Returns whether this worker is currently running a task. @@ -147,7 +167,11 @@ class BaseWorker(threading.Thread, object): # Run the actual task assert defer is None logging.debug("Starting task %r, priority %s", args, priority) - self.RunTask(*args) # pylint: disable-msg=W0142 + assert self.getName() == self._worker_id + try: + self.RunTask(*args) # pylint: disable=W0142 + finally: + self.SetTaskName(None) logging.debug("Done with task %r, priority %s", args, priority) except DeferTask, err: defer = err @@ -156,11 +180,11 @@ class BaseWorker(threading.Thread, object): # Use same priority defer.priority = priority - logging.debug("Deferring task %r, new priority %s", defer.priority) + logging.debug("Deferring task %r, new priority %s", + args, defer.priority) assert self._HasRunningTaskUnlocked() - - except: # pylint: disable-msg=W0702 + except: # pylint: disable=W0702 logging.exception("Caught unhandled exception") assert self._HasRunningTaskUnlocked() @@ -222,6 +246,7 @@ class WorkerPool(object): self._last_worker_id = 0 self._workers = [] self._quiescing = False + self._active = True # Terminating workers self._termworkers = [] @@ -316,6 +341,28 @@ class WorkerPool(object): finally: self._lock.release() + def SetActive(self, active): + """Enable/disable processing of tasks. + + This is different from L{Quiesce} in the sense that this function just + changes an internal flag and doesn't wait for the queue to be empty. Tasks + already being processed continue normally, but no new tasks will be + started. New tasks can still be added. + + @type active: bool + @param active: Whether tasks should be processed + + """ + self._lock.acquire() + try: + self._active = active + + if active: + # Tell all workers to continue processing + self._pool_to_worker.notifyAll() + finally: + self._lock.release() + def _WaitForTaskUnlocked(self, worker): """Waits for a task for a worker. @@ -327,21 +374,22 @@ class WorkerPool(object): return _TERMINATE # We only wait if there's no task for us. - if not self._tasks: + if not (self._active and self._tasks): logging.debug("Waiting for tasks") - # wait() releases the lock and sleeps until notified - self._pool_to_worker.wait() + while True: + # wait() releases the lock and sleeps until notified + self._pool_to_worker.wait() - logging.debug("Notified while waiting") + logging.debug("Notified while waiting") - # Were we woken up in order to terminate? - if self._ShouldWorkerTerminateUnlocked(worker): - return _TERMINATE + # Were we woken up in order to terminate? + if self._ShouldWorkerTerminateUnlocked(worker): + return _TERMINATE - if not self._tasks: - # Spurious notification, ignore - return None + # Just loop if pool is not processing tasks at this time + if self._active and self._tasks: + break # Get task from queue and tell pool about it try: @@ -360,10 +408,20 @@ class WorkerPool(object): """ for worker in self._workers + self._termworkers: - if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212 + if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212 return True return False + def HasRunningTasks(self): + """Checks whether there's at least one task running. + + """ + self._lock.acquire() + try: + return self._HasRunningTasksUnlocked() + finally: + self._lock.release() + def Quiesce(self): """Waits until the task queue is empty.