X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/76094e37ce7478f4e035e049898ad509fb6c0252..90469357969d0550410a59e733661feece6b45de:/lib/workerpool.py diff --git a/lib/workerpool.py b/lib/workerpool.py index 63d85cc..54b3fb7 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -27,9 +27,6 @@ import collections import logging import threading -from ganeti import errors -from ganeti import utils - class BaseWorker(threading.Thread, object): """Base worker class for worker pools. @@ -40,16 +37,13 @@ class BaseWorker(threading.Thread, object): def __init__(self, pool, worker_id): """Constructor for BaseWorker thread. - Args: - - pool: Parent worker pool - - worker_id: Identifier for this worker + @param pool: the parent worker pool + @param worker_id: identifier for this worker """ super(BaseWorker, self).__init__() self.pool = pool self.worker_id = worker_id - - # Also used by WorkerPool self._current_task = None def ShouldTerminate(self): @@ -58,6 +52,22 @@ class BaseWorker(threading.Thread, object): """ return self.pool.ShouldWorkerTerminate(self) + def _HasRunningTaskUnlocked(self): + """Returns whether this worker is currently running a task. + + """ + return (self._current_task is not None) + + def HasRunningTask(self): + """Returns whether this worker is currently running a task. + + """ + self.pool._lock.acquire() + try: + return self._HasRunningTaskUnlocked() + finally: + self.pool._lock.release() + def run(self): """Main thread function. @@ -66,7 +76,7 @@ class BaseWorker(threading.Thread, object): """ pool = self.pool - assert self._current_task is None + assert not self.HasRunningTask() while True: try: @@ -78,8 +88,12 @@ class BaseWorker(threading.Thread, object): # We only wait if there's no task for us. if not pool._tasks: + logging.debug("Worker %s: waiting for tasks", self.worker_id) + # wait() releases the lock and sleeps until notified - pool._lock.wait() + pool._pool_to_worker.wait() + + logging.debug("Worker %s: notified while waiting", self.worker_id) # Were we woken up in order to terminate? if pool._ShouldWorkerTerminateUnlocked(self): @@ -93,29 +107,37 @@ class BaseWorker(threading.Thread, object): try: self._current_task = pool._tasks.popleft() finally: - pool._lock.notifyAll() + pool._worker_to_pool.notifyAll() finally: pool._lock.release() # Run the actual task try: + logging.debug("Worker %s: starting task %r", + self.worker_id, self._current_task) self.RunTask(*self._current_task) + logging.debug("Worker %s: done with task %r", + self.worker_id, self._current_task) except: logging.error("Worker %s: Caught unhandled exception", self.worker_id, exc_info=True) finally: - self._current_task = None - # Notify pool pool._lock.acquire() try: - pool._lock.notifyAll() + if self._current_task: + self._current_task = None + pool._worker_to_pool.notifyAll() finally: pool._lock.release() + logging.debug("Worker %s: terminates", self.worker_id) + def RunTask(self, *args): """Function called to start a task. + This needs to be implemented by child classes. + """ raise NotImplementedError() @@ -125,23 +147,25 @@ class WorkerPool(object): This class is thread-safe. - Tasks are guaranteed to be started in the order in which they're added to the - pool. Due to the nature of threading, they're not guaranteed to finish in the - same order. + Tasks are guaranteed to be started in the order in which they're + added to the pool. Due to the nature of threading, they're not + guaranteed to finish in the same order. """ def __init__(self, num_workers, worker_class): """Constructor for worker pool. - Args: - - num_workers: Number of workers to be started (dynamic resizing is not - yet implemented) - - worker_class: Class to be instantiated for workers; should derive from - BaseWorker + @param num_workers: number of workers to be started + (dynamic resizing is not yet implemented) + @param worker_class: the class to be instantiated for workers; + should derive from L{BaseWorker} """ # Some of these variables are accessed by BaseWorker - self._lock = threading.Condition(threading.Lock()) + self._lock = threading.Lock() + self._pool_to_pool = threading.Condition(self._lock) + self._pool_to_worker = threading.Condition(self._lock) + self._worker_to_pool = threading.Condition(self._lock) self._worker_class = worker_class self._last_worker_id = 0 self._workers = [] @@ -161,19 +185,20 @@ class WorkerPool(object): def AddTask(self, *args): """Adds a task to the queue. - Args: - - *args: Arguments passed to BaseWorker.RunTask + @param args: arguments passed to L{BaseWorker.RunTask} """ self._lock.acquire() try: # Don't add new tasks while we're quiescing while self._quiescing: - self._lock.wait() + self._pool_to_pool.wait() # Add task to internal queue self._tasks.append(args) - self._lock.notify() + + # Wake one idling worker up + self._pool_to_worker.notify() finally: self._lock.release() @@ -189,7 +214,7 @@ class WorkerPool(object): """ self._lock.acquire() try: - return self._ShouldWorkerTerminateUnlocked(self) + return self._ShouldWorkerTerminateUnlocked(worker) finally: self._lock.release() @@ -198,7 +223,7 @@ class WorkerPool(object): """ for worker in self._workers + self._termworkers: - if worker._current_task is not None: + if worker._HasRunningTaskUnlocked(): return True return False @@ -212,17 +237,20 @@ class WorkerPool(object): # Wait while there are tasks pending or running while self._tasks or self._HasRunningTasksUnlocked(): - self._lock.wait() + self._worker_to_pool.wait() finally: self._quiescing = False # Make sure AddTasks continues in case it was waiting - self._lock.notifyAll() + self._pool_to_pool.notifyAll() self._lock.release() def _NewWorkerIdUnlocked(self): + """Return an identifier for a new worker. + + """ self._last_worker_id += 1 return self._last_worker_id @@ -253,12 +281,13 @@ class WorkerPool(object): self._termworkers += termworkers # Notify workers that something has changed - self._lock.notifyAll() + self._pool_to_worker.notifyAll() # Join all terminating workers self._lock.release() try: for worker in termworkers: + logging.debug("Waiting for thread %s", worker.getName()) worker.join() finally: self._lock.acquire() @@ -276,7 +305,7 @@ class WorkerPool(object): elif current_count < num_workers: # Create (num_workers - current_count) new workers - for i in xrange(num_workers - current_count): + for _ in xrange(num_workers - current_count): worker = self._worker_class(self, self._NewWorkerIdUnlocked()) self._workers.append(worker) worker.start() @@ -284,8 +313,7 @@ class WorkerPool(object): def Resize(self, num_workers): """Changes the number of workers in the pool. - Args: - - num_workers: New number of workers + @param num_workers: the new number of workers """ self._lock.acquire()