import logging
import threading
-from ganeti import errors
-from ganeti import utils
-
class BaseWorker(threading.Thread, object):
"""Base worker class for worker pools.
def __init__(self, pool, worker_id):
"""Constructor for BaseWorker thread.
- Args:
- - pool: Parent worker pool
- - worker_id: Identifier for this worker
+ @param pool: the parent worker pool
+ @param worker_id: identifier for this worker
"""
super(BaseWorker, self).__init__()
self.pool = pool
self.worker_id = worker_id
-
- # Also used by WorkerPool
self._current_task = None
def ShouldTerminate(self):
"""
return self.pool.ShouldWorkerTerminate(self)
+ def _HasRunningTaskUnlocked(self):
+ """Returns whether this worker is currently running a task.
+
+ """
+ return (self._current_task is not None)
+
+ def HasRunningTask(self):
+ """Returns whether this worker is currently running a task.
+
+ """
+ self.pool._lock.acquire()
+ try:
+ return self._HasRunningTaskUnlocked()
+ finally:
+ self.pool._lock.release()
+
def run(self):
"""Main thread function.
"""
pool = self.pool
- assert self._current_task is None
+ assert not self.HasRunningTask()
while True:
try:
# We only wait if there's no task for us.
if not pool._tasks:
+ logging.debug("Worker %s: waiting for tasks", self.worker_id)
+
# wait() releases the lock and sleeps until notified
- pool._lock.wait()
+ pool._pool_to_worker.wait()
+
+ logging.debug("Worker %s: notified while waiting", self.worker_id)
# Were we woken up in order to terminate?
if pool._ShouldWorkerTerminateUnlocked(self):
try:
self._current_task = pool._tasks.popleft()
finally:
- pool._lock.notifyAll()
+ pool._worker_to_pool.notifyAll()
finally:
pool._lock.release()
# Run the actual task
try:
+ logging.debug("Worker %s: starting task %r",
+ self.worker_id, self._current_task)
self.RunTask(*self._current_task)
+ logging.debug("Worker %s: done with task %r",
+ self.worker_id, self._current_task)
except:
logging.error("Worker %s: Caught unhandled exception",
self.worker_id, exc_info=True)
finally:
- self._current_task = None
-
# Notify pool
pool._lock.acquire()
try:
- pool._lock.notifyAll()
+ if self._current_task:
+ self._current_task = None
+ pool._worker_to_pool.notifyAll()
finally:
pool._lock.release()
+ logging.debug("Worker %s: terminates", self.worker_id)
+
def RunTask(self, *args):
"""Function called to start a task.
+ This needs to be implemented by child classes.
+
"""
raise NotImplementedError()
This class is thread-safe.
- Tasks are guaranteed to be started in the order in which they're added to the
- pool. Due to the nature of threading, they're not guaranteed to finish in the
- same order.
+ Tasks are guaranteed to be started in the order in which they're
+ added to the pool. Due to the nature of threading, they're not
+ guaranteed to finish in the same order.
"""
def __init__(self, num_workers, worker_class):
"""Constructor for worker pool.
- Args:
- - num_workers: Number of workers to be started (dynamic resizing is not
- yet implemented)
- - worker_class: Class to be instantiated for workers; should derive from
- BaseWorker
+ @param num_workers: number of workers to be started
+ (dynamic resizing is not yet implemented)
+ @param worker_class: the class to be instantiated for workers;
+ should derive from L{BaseWorker}
"""
# Some of these variables are accessed by BaseWorker
- self._lock = threading.Condition(threading.Lock())
+ self._lock = threading.Lock()
+ self._pool_to_pool = threading.Condition(self._lock)
+ self._pool_to_worker = threading.Condition(self._lock)
+ self._worker_to_pool = threading.Condition(self._lock)
self._worker_class = worker_class
self._last_worker_id = 0
self._workers = []
def AddTask(self, *args):
"""Adds a task to the queue.
- Args:
- - *args: Arguments passed to BaseWorker.RunTask
+ @param args: arguments passed to L{BaseWorker.RunTask}
"""
self._lock.acquire()
try:
# Don't add new tasks while we're quiescing
while self._quiescing:
- self._lock.wait()
+ self._pool_to_pool.wait()
# Add task to internal queue
self._tasks.append(args)
- self._lock.notify()
+
+ # Wake one idling worker up
+ self._pool_to_worker.notify()
finally:
self._lock.release()
"""
self._lock.acquire()
try:
- return self._ShouldWorkerTerminateUnlocked(self)
+ return self._ShouldWorkerTerminateUnlocked(worker)
finally:
self._lock.release()
"""
for worker in self._workers + self._termworkers:
- if worker._current_task is not None:
+ if worker._HasRunningTaskUnlocked():
return True
return False
# Wait while there are tasks pending or running
while self._tasks or self._HasRunningTasksUnlocked():
- self._lock.wait()
+ self._worker_to_pool.wait()
finally:
self._quiescing = False
# Make sure AddTasks continues in case it was waiting
- self._lock.notifyAll()
+ self._pool_to_pool.notifyAll()
self._lock.release()
def _NewWorkerIdUnlocked(self):
+ """Return an identifier for a new worker.
+
+ """
self._last_worker_id += 1
return self._last_worker_id
self._termworkers += termworkers
# Notify workers that something has changed
- self._lock.notifyAll()
+ self._pool_to_worker.notifyAll()
# Join all terminating workers
self._lock.release()
try:
for worker in termworkers:
+ logging.debug("Waiting for thread %s", worker.getName())
worker.join()
finally:
self._lock.acquire()
elif current_count < num_workers:
# Create (num_workers - current_count) new workers
- for i in xrange(num_workers - current_count):
+ for _ in xrange(num_workers - current_count):
worker = self._worker_class(self, self._NewWorkerIdUnlocked())
self._workers.append(worker)
worker.start()
def Resize(self, num_workers):
"""Changes the number of workers in the pool.
- Args:
- - num_workers: New number of workers
+ @param num_workers: the new number of workers
"""
self._lock.acquire()