workerpool: Don't notify if there was no task
author Michael Hanselmann <hansmi@google.com>
Tue, 8 Jul 2008 15:03:50 +0000 (15:03 +0000)
committer Michael Hanselmann <hansmi@google.com>
Tue, 8 Jul 2008 15:03:50 +0000 (15:03 +0000)
Workers have to notify their pool when they finish a task so that
the WorkerPool.Quiesce function works. This is done in the finally:
clause so that the pool is notified even in case of an exception.
However, previously we notified on every run, even if there was no
task, thereby creating a sort of endless loop of notifications. In a
future patch we should split the single condition object into several
to produce fewer spurious notifications.

While we're at it, this patch also adds two new functions to
BaseWorker for querying whether it is currently running a task, and
uses one of them in the WorkerPool instead of reading the internal
variable directly.

Reviewed-by: iustinp

lib/workerpool.py

index 63d85cc..0075bd7 100644 (file)
@@ -48,8 +48,6 @@ class BaseWorker(threading.Thread, object):
     super(BaseWorker, self).__init__()
     self.pool = pool
     self.worker_id = worker_id
-
-    # Also used by WorkerPool
     self._current_task = None
 
   def ShouldTerminate(self):
@@ -58,6 +56,22 @@ class BaseWorker(threading.Thread, object):
     """
     return self.pool.ShouldWorkerTerminate(self)
 
+  def _HasRunningTaskUnlocked(self):
+    """Returns whether this worker is currently running a task.
+
+    """
+    return (self._current_task is not None)
+
+  def HasRunningTask(self):
+    """Returns whether this worker is currently running a task.
+
+    """
+    self.pool._lock.acquire()
+    try:
+      return self._HasRunningTaskUnlocked()
+    finally:
+      self.pool._lock.release()
+
   def run(self):
     """Main thread function.
 
@@ -66,7 +80,7 @@ class BaseWorker(threading.Thread, object):
     """
     pool = self.pool
 
-    assert self._current_task is None
+    assert not self.HasRunningTask()
 
     while True:
       try:
@@ -78,9 +92,13 @@ class BaseWorker(threading.Thread, object):
 
           # We only wait if there's no task for us.
           if not pool._tasks:
+            logging.debug("Worker %s: waiting for tasks", self.worker_id)
+
             # wait() releases the lock and sleeps until notified
             pool._lock.wait()
 
+            logging.debug("Worker %s: notified while waiting", self.worker_id)
+
             # Were we woken up in order to terminate?
             if pool._ShouldWorkerTerminateUnlocked(self):
               break
@@ -99,20 +117,26 @@ class BaseWorker(threading.Thread, object):
 
         # Run the actual task
         try:
+          logging.debug("Worker %s: starting task %r",
+                        self.worker_id, self._current_task)
           self.RunTask(*self._current_task)
+          logging.debug("Worker %s: done with task %r",
+                        self.worker_id, self._current_task)
         except:
           logging.error("Worker %s: Caught unhandled exception",
                         self.worker_id, exc_info=True)
       finally:
-        self._current_task = None
-
         # Notify pool
         pool._lock.acquire()
         try:
-          pool._lock.notifyAll()
+          if self._current_task:
+            self._current_task = None
+            pool._lock.notifyAll()
         finally:
           pool._lock.release()
 
+    logging.debug("Worker %s: terminates", self.worker_id)
+
   def RunTask(self, *args):
     """Function called to start a task.
 
@@ -198,7 +222,7 @@ class WorkerPool(object):
 
     """
     for worker in self._workers + self._termworkers:
-      if worker._current_task is not None:
+      if worker._HasRunningTaskUnlocked():
         return True
     return False