workerpool: Don't notify if there was no task
[ganeti-local] / lib / workerpool.py
index 63d85cc..0075bd7 100644 (file)
@@ -48,8 +48,6 @@ class BaseWorker(threading.Thread, object):
     super(BaseWorker, self).__init__()
     self.pool = pool
     self.worker_id = worker_id
-
-    # Also used by WorkerPool
     self._current_task = None
 
   def ShouldTerminate(self):
@@ -58,6 +56,22 @@ class BaseWorker(threading.Thread, object):
     """
     return self.pool.ShouldWorkerTerminate(self)
 
+  def _HasRunningTaskUnlocked(self):
+    """Returns whether this worker is currently running a task.
+
+    """
+    return (self._current_task is not None)
+
+  def HasRunningTask(self):
+    """Returns whether this worker is currently running a task.
+
+    """
+    self.pool._lock.acquire()
+    try:
+      return self._HasRunningTaskUnlocked()
+    finally:
+      self.pool._lock.release()
+
   def run(self):
     """Main thread function.
 
@@ -66,7 +80,7 @@ class BaseWorker(threading.Thread, object):
     """
     pool = self.pool
 
-    assert self._current_task is None
+    assert not self.HasRunningTask()
 
     while True:
       try:
@@ -78,9 +92,13 @@ class BaseWorker(threading.Thread, object):
 
           # We only wait if there's no task for us.
           if not pool._tasks:
+            logging.debug("Worker %s: waiting for tasks", self.worker_id)
+
             # wait() releases the lock and sleeps until notified
             pool._lock.wait()
 
+            logging.debug("Worker %s: notified while waiting", self.worker_id)
+
             # Were we woken up in order to terminate?
             if pool._ShouldWorkerTerminateUnlocked(self):
               break
@@ -99,20 +117,26 @@ class BaseWorker(threading.Thread, object):
 
         # Run the actual task
         try:
+          logging.debug("Worker %s: starting task %r",
+                        self.worker_id, self._current_task)
           self.RunTask(*self._current_task)
+          logging.debug("Worker %s: done with task %r",
+                        self.worker_id, self._current_task)
         except:
           logging.error("Worker %s: Caught unhandled exception",
                         self.worker_id, exc_info=True)
       finally:
-        self._current_task = None
-
         # Notify pool
         pool._lock.acquire()
         try:
-          pool._lock.notifyAll()
+          if self._current_task:
+            self._current_task = None
+            pool._lock.notifyAll()
         finally:
           pool._lock.release()
 
+    logging.debug("Worker %s: terminates", self.worker_id)
+
   def RunTask(self, *args):
     """Function called to start a task.
 
@@ -198,7 +222,7 @@ class WorkerPool(object):
 
     """
     for worker in self._workers + self._termworkers:
-      if worker._current_task is not None:
+      if worker._HasRunningTaskUnlocked():
         return True
     return False