Add KVM hypervisor code
[ganeti-local] / lib / workerpool.py
index 63d85cc..b895b2c 100644 (file)
@@ -27,9 +27,6 @@ import collections
 import logging
 import threading
 
-from ganeti import errors
-from ganeti import utils
-
 
 class BaseWorker(threading.Thread, object):
   """Base worker class for worker pools.
@@ -48,8 +45,6 @@ class BaseWorker(threading.Thread, object):
     super(BaseWorker, self).__init__()
     self.pool = pool
     self.worker_id = worker_id
-
-    # Also used by WorkerPool
     self._current_task = None
 
   def ShouldTerminate(self):
@@ -58,6 +53,22 @@ class BaseWorker(threading.Thread, object):
     """
     return self.pool.ShouldWorkerTerminate(self)
 
+  def _HasRunningTaskUnlocked(self):
+    """Returns whether this worker is currently running a task.
+
+    """
+    return (self._current_task is not None)
+
+  def HasRunningTask(self):
+    """Returns whether this worker is currently running a task.
+
+    """
+    self.pool._lock.acquire()
+    try:
+      return self._HasRunningTaskUnlocked()
+    finally:
+      self.pool._lock.release()
+
   def run(self):
     """Main thread function.
 
@@ -66,7 +77,7 @@ class BaseWorker(threading.Thread, object):
     """
     pool = self.pool
 
-    assert self._current_task is None
+    assert not self.HasRunningTask()
 
     while True:
       try:
@@ -78,8 +89,12 @@ class BaseWorker(threading.Thread, object):
 
           # We only wait if there's no task for us.
           if not pool._tasks:
+            logging.debug("Worker %s: waiting for tasks", self.worker_id)
+
             # wait() releases the lock and sleeps until notified
-            pool._lock.wait()
+            pool._pool_to_worker.wait()
+
+            logging.debug("Worker %s: notified while waiting", self.worker_id)
 
             # Were we woken up in order to terminate?
             if pool._ShouldWorkerTerminateUnlocked(self):
@@ -93,26 +108,32 @@ class BaseWorker(threading.Thread, object):
           try:
             self._current_task = pool._tasks.popleft()
           finally:
-            pool._lock.notifyAll()
+            pool._worker_to_pool.notifyAll()
         finally:
           pool._lock.release()
 
         # Run the actual task
         try:
+          logging.debug("Worker %s: starting task %r",
+                        self.worker_id, self._current_task)
           self.RunTask(*self._current_task)
+          logging.debug("Worker %s: done with task %r",
+                        self.worker_id, self._current_task)
         except:
           logging.error("Worker %s: Caught unhandled exception",
                         self.worker_id, exc_info=True)
       finally:
-        self._current_task = None
-
         # Notify pool
         pool._lock.acquire()
         try:
-          pool._lock.notifyAll()
+          if self._current_task:
+            self._current_task = None
+            pool._worker_to_pool.notifyAll()
         finally:
           pool._lock.release()
 
+    logging.debug("Worker %s: terminates", self.worker_id)
+
   def RunTask(self, *args):
     """Function called to start a task.
 
@@ -141,7 +162,10 @@ class WorkerPool(object):
 
     """
     # Some of these variables are accessed by BaseWorker
-    self._lock = threading.Condition(threading.Lock())
+    self._lock = threading.Lock()
+    self._pool_to_pool = threading.Condition(self._lock)
+    self._pool_to_worker = threading.Condition(self._lock)
+    self._worker_to_pool = threading.Condition(self._lock)
     self._worker_class = worker_class
     self._last_worker_id = 0
     self._workers = []
@@ -169,11 +193,13 @@ class WorkerPool(object):
     try:
       # Don't add new tasks while we're quiescing
       while self._quiescing:
-        self._lock.wait()
+        self._pool_to_pool.wait()
 
       # Add task to internal queue
       self._tasks.append(args)
-      self._lock.notify()
+
+      # Wake one idling worker up
+      self._pool_to_worker.notify()
     finally:
       self._lock.release()
 
@@ -198,7 +224,7 @@ class WorkerPool(object):
 
     """
     for worker in self._workers + self._termworkers:
-      if worker._current_task is not None:
+      if worker._HasRunningTaskUnlocked():
         return True
     return False
 
@@ -212,13 +238,13 @@ class WorkerPool(object):
 
       # Wait while there are tasks pending or running
       while self._tasks or self._HasRunningTasksUnlocked():
-        self._lock.wait()
+        self._worker_to_pool.wait()
 
     finally:
       self._quiescing = False
 
       # Make sure AddTasks continues in case it was waiting
-      self._lock.notifyAll()
+      self._pool_to_pool.notifyAll()
 
       self._lock.release()
 
@@ -253,12 +279,13 @@ class WorkerPool(object):
       self._termworkers += termworkers
 
       # Notify workers that something has changed
-      self._lock.notifyAll()
+      self._pool_to_worker.notifyAll()
 
       # Join all terminating workers
       self._lock.release()
       try:
         for worker in termworkers:
+          logging.debug("Waiting for thread %s", worker.getName())
           worker.join()
       finally:
         self._lock.acquire()