Implement modification of the drained flag
[ganeti-local] / lib / workerpool.py
index 0075bd7..54b3fb7 100644 (file)
@@ -27,9 +27,6 @@ import collections
 import logging
 import threading
 
-from ganeti import errors
-from ganeti import utils
-
 
 class BaseWorker(threading.Thread, object):
   """Base worker class for worker pools.
@@ -40,9 +37,8 @@ class BaseWorker(threading.Thread, object):
   def __init__(self, pool, worker_id):
     """Constructor for BaseWorker thread.
 
-    Args:
-    - pool: Parent worker pool
-    - worker_id: Identifier for this worker
+    @param pool: the parent worker pool
+    @param worker_id: identifier for this worker
 
     """
     super(BaseWorker, self).__init__()
@@ -95,7 +91,7 @@ class BaseWorker(threading.Thread, object):
             logging.debug("Worker %s: waiting for tasks", self.worker_id)
 
             # wait() releases the lock and sleeps until notified
-            pool._lock.wait()
+            pool._pool_to_worker.wait()
 
             logging.debug("Worker %s: notified while waiting", self.worker_id)
 
@@ -111,7 +107,7 @@ class BaseWorker(threading.Thread, object):
           try:
             self._current_task = pool._tasks.popleft()
           finally:
-            pool._lock.notifyAll()
+            pool._worker_to_pool.notifyAll()
         finally:
           pool._lock.release()
 
@@ -131,7 +127,7 @@ class BaseWorker(threading.Thread, object):
         try:
           if self._current_task:
             self._current_task = None
-            pool._lock.notifyAll()
+            pool._worker_to_pool.notifyAll()
         finally:
           pool._lock.release()
 
@@ -140,6 +136,8 @@ class BaseWorker(threading.Thread, object):
   def RunTask(self, *args):
     """Function called to start a task.
 
+    This needs to be implemented by child classes.
+
     """
     raise NotImplementedError()
 
@@ -149,23 +147,25 @@ class WorkerPool(object):
 
   This class is thread-safe.
 
-  Tasks are guaranteed to be started in the order in which they're added to the
-  pool. Due to the nature of threading, they're not guaranteed to finish in the
-  same order.
+  Tasks are guaranteed to be started in the order in which they're
+  added to the pool. Due to the nature of threading, they're not
+  guaranteed to finish in the same order.
 
   """
   def __init__(self, num_workers, worker_class):
     """Constructor for worker pool.
 
-    Args:
-    - num_workers: Number of workers to be started (dynamic resizing is not
-                   yet implemented)
-    - worker_class: Class to be instantiated for workers; should derive from
-                    BaseWorker
+    @param num_workers: number of workers to be started
+        (dynamic resizing is not yet implemented)
+    @param worker_class: the class to be instantiated for workers;
+        should derive from L{BaseWorker}
 
     """
     # Some of these variables are accessed by BaseWorker
-    self._lock = threading.Condition(threading.Lock())
+    self._lock = threading.Lock()
+    self._pool_to_pool = threading.Condition(self._lock)
+    self._pool_to_worker = threading.Condition(self._lock)
+    self._worker_to_pool = threading.Condition(self._lock)
     self._worker_class = worker_class
     self._last_worker_id = 0
     self._workers = []
@@ -185,19 +185,20 @@ class WorkerPool(object):
   def AddTask(self, *args):
     """Adds a task to the queue.
 
-    Args:
-    - *args: Arguments passed to BaseWorker.RunTask
+    @param args: arguments passed to L{BaseWorker.RunTask}
 
     """
     self._lock.acquire()
     try:
       # Don't add new tasks while we're quiescing
       while self._quiescing:
-        self._lock.wait()
+        self._pool_to_pool.wait()
 
       # Add task to internal queue
       self._tasks.append(args)
-      self._lock.notify()
+
+      # Wake one idling worker up
+      self._pool_to_worker.notify()
     finally:
       self._lock.release()
 
@@ -213,7 +214,7 @@ class WorkerPool(object):
     """
     self._lock.acquire()
     try:
-      return self._ShouldWorkerTerminateUnlocked(self)
+      return self._ShouldWorkerTerminateUnlocked(worker)
     finally:
       self._lock.release()
 
@@ -236,17 +237,20 @@ class WorkerPool(object):
 
       # Wait while there are tasks pending or running
       while self._tasks or self._HasRunningTasksUnlocked():
-        self._lock.wait()
+        self._worker_to_pool.wait()
 
     finally:
       self._quiescing = False
 
       # Make sure AddTasks continues in case it was waiting
-      self._lock.notifyAll()
+      self._pool_to_pool.notifyAll()
 
       self._lock.release()
 
   def _NewWorkerIdUnlocked(self):
+    """Return an identifier for a new worker.
+
+    """
     self._last_worker_id += 1
     return self._last_worker_id
 
@@ -277,12 +281,13 @@ class WorkerPool(object):
       self._termworkers += termworkers
 
       # Notify workers that something has changed
-      self._lock.notifyAll()
+      self._pool_to_worker.notifyAll()
 
       # Join all terminating workers
       self._lock.release()
       try:
         for worker in termworkers:
+          logging.debug("Waiting for thread %s", worker.getName())
           worker.join()
       finally:
         self._lock.acquire()
@@ -300,7 +305,7 @@ class WorkerPool(object):
 
     elif current_count < num_workers:
       # Create (num_workers - current_count) new workers
-      for i in xrange(num_workers - current_count):
+      for _ in xrange(num_workers - current_count):
         worker = self._worker_class(self, self._NewWorkerIdUnlocked())
         self._workers.append(worker)
         worker.start()
@@ -308,8 +313,7 @@ class WorkerPool(object):
   def Resize(self, num_workers):
     """Changes the number of workers in the pool.
 
-    Args:
-    - num_workers: New number of workers
+    @param num_workers: the new number of workers
 
     """
     self._lock.acquire()