Merge branch 'stable-2.6'
[ganeti-local] / lib / workerpool.py
index 91f213b..8db03c7 100644 (file)
@@ -246,6 +246,7 @@ class WorkerPool(object):
     self._last_worker_id = 0
     self._workers = []
     self._quiescing = False
+    self._active = True
 
     # Terminating workers
     self._termworkers = []
@@ -340,6 +341,28 @@ class WorkerPool(object):
     finally:
       self._lock.release()
 
+  def SetActive(self, active):
+    """Enable/disable processing of tasks.
+
+    This is different from L{Quiesce} in the sense that this function just
+    changes an internal flag and doesn't wait for the queue to be empty. Tasks
+    already being processed continue normally, but no new tasks will be
+    started. New tasks can still be added.
+
+    @type active: bool
+    @param active: Whether tasks should be processed
+
+    """
+    self._lock.acquire()
+    try:
+      self._active = active
+
+      if active:
+        # Tell all workers to continue processing
+        self._pool_to_worker.notifyAll()
+    finally:
+      self._lock.release()
+
   def _WaitForTaskUnlocked(self, worker):
     """Waits for a task for a worker.
 
@@ -351,7 +374,7 @@ class WorkerPool(object):
       return _TERMINATE
 
     # We only wait if there's no task for us.
-    if not self._tasks:
+    if not (self._active and self._tasks):
       logging.debug("Waiting for tasks")
 
       while True:
@@ -364,7 +387,8 @@ class WorkerPool(object):
         if self._ShouldWorkerTerminateUnlocked(worker):
           return _TERMINATE
 
-        if self._tasks:
+        # Just loop if pool is not processing tasks at this time
+        if self._active and self._tasks:
           break
 
     # Get task from queue and tell pool about it
@@ -388,6 +412,16 @@ class WorkerPool(object):
         return True
     return False
 
+  def HasRunningTasks(self):
+    """Checks whether there's at least one task running.
+
+    """
+    self._lock.acquire()
+    try:
+      return self._HasRunningTasksUnlocked()
+    finally:
+      self._lock.release()
+
   def Quiesce(self):
     """Waits until the task queue is empty.