Split conditions in worker pool
author Michael Hanselmann <hansmi@google.com>
Tue, 22 Jul 2008 08:17:52 +0000 (08:17 +0000)
committer Michael Hanselmann <hansmi@google.com>
Tue, 22 Jul 2008 08:17:52 +0000 (08:17 +0000)
This patch splits the single threading.Condition object used in the
worker pool for synchronization into three condition objects that all
share one underlying threading.Lock.

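- worker_to_pool: Notified by a worker when it takes a task from the
  queue or clears its current task; the pool waits on it while tasks
  are pending or running
- pool_to_worker: Notified by the pool to wake up a single idle worker
  (new task added) or all workers
- pool_to_pool: Used for synchronization in Quiesce; notified when
  quiescing ends so that adding tasks, which blocks in the meantime,
  can continue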

Reviewed-by: ultrotter

lib/workerpool.py

index 0075bd7..b7c4d0a 100644 (file)
@@ -95,7 +95,7 @@ class BaseWorker(threading.Thread, object):
             logging.debug("Worker %s: waiting for tasks", self.worker_id)
 
             # wait() releases the lock and sleeps until notified
-            pool._lock.wait()
+            pool._pool_to_worker.wait()
 
             logging.debug("Worker %s: notified while waiting", self.worker_id)
 
@@ -111,7 +111,7 @@ class BaseWorker(threading.Thread, object):
           try:
             self._current_task = pool._tasks.popleft()
           finally:
-            pool._lock.notifyAll()
+            pool._worker_to_pool.notifyAll()
         finally:
           pool._lock.release()
 
@@ -131,7 +131,7 @@ class BaseWorker(threading.Thread, object):
         try:
           if self._current_task:
             self._current_task = None
-            pool._lock.notifyAll()
+            pool._worker_to_pool.notifyAll()
         finally:
           pool._lock.release()
 
@@ -165,7 +165,10 @@ class WorkerPool(object):
 
     """
     # Some of these variables are accessed by BaseWorker
-    self._lock = threading.Condition(threading.Lock())
+    self._lock = threading.Lock()
+    self._pool_to_pool = threading.Condition(self._lock)
+    self._pool_to_worker = threading.Condition(self._lock)
+    self._worker_to_pool = threading.Condition(self._lock)
     self._worker_class = worker_class
     self._last_worker_id = 0
     self._workers = []
@@ -193,11 +196,13 @@ class WorkerPool(object):
     try:
       # Don't add new tasks while we're quiescing
       while self._quiescing:
-        self._lock.wait()
+        self._pool_to_pool.wait()
 
       # Add task to internal queue
       self._tasks.append(args)
-      self._lock.notify()
+
+      # Wake one idling worker up
+      self._pool_to_worker.notify()
     finally:
       self._lock.release()
 
@@ -236,13 +241,13 @@ class WorkerPool(object):
 
       # Wait while there are tasks pending or running
       while self._tasks or self._HasRunningTasksUnlocked():
-        self._lock.wait()
+        self._worker_to_pool.wait()
 
     finally:
       self._quiescing = False
 
       # Make sure AddTasks continues in case it was waiting
-      self._lock.notifyAll()
+      self._pool_to_pool.notifyAll()
 
       self._lock.release()
 
@@ -277,7 +282,7 @@ class WorkerPool(object):
       self._termworkers += termworkers
 
       # Notify workers that something has changed
-      self._lock.notifyAll()
+      self._pool_to_worker.notifyAll()
 
       # Join all terminating workers
       self._lock.release()