LUClusterSetParams: When ipolicy is updated warn for new violations
[ganeti-local] / lib / workerpool.py
index 4736be5..8db03c7 100644 (file)
@@ -246,6 +246,7 @@ class WorkerPool(object):
     self._last_worker_id = 0
     self._workers = []
     self._quiescing = False
+    self._active = True
 
     # Terminating workers
     self._termworkers = []
@@ -340,6 +341,28 @@ class WorkerPool(object):
     finally:
       self._lock.release()
 
+  def SetActive(self, active):
+    """Enable/disable processing of tasks.
+
+    This is different from L{Quiesce} in the sense that this function just
+    changes an internal flag and doesn't wait for the queue to be empty. Tasks
+    already being processed continue normally, but no new tasks will be
+    started. New tasks can still be added.
+
+    @type active: bool
+    @param active: Whether tasks should be processed
+
+    """
+    self._lock.acquire()
+    try:
+      self._active = active
+
+      if active:
+        # Tell all workers to continue processing
+        self._pool_to_worker.notifyAll()
+    finally:
+      self._lock.release()
+
   def _WaitForTaskUnlocked(self, worker):
     """Waits for a task for a worker.
 
@@ -351,21 +374,22 @@ class WorkerPool(object):
       return _TERMINATE
 
     # We only wait if there's no task for us.
-    if not self._tasks:
+    if not (self._active and self._tasks):
       logging.debug("Waiting for tasks")
 
-      # wait() releases the lock and sleeps until notified
-      self._pool_to_worker.wait()
+      while True:
+        # wait() releases the lock and sleeps until notified
+        self._pool_to_worker.wait()
 
-      logging.debug("Notified while waiting")
+        logging.debug("Notified while waiting")
 
-      # Were we woken up in order to terminate?
-      if self._ShouldWorkerTerminateUnlocked(worker):
-        return _TERMINATE
+        # Were we woken up in order to terminate?
+        if self._ShouldWorkerTerminateUnlocked(worker):
+          return _TERMINATE
 
-      if not self._tasks:
-        # Spurious notification, ignore
-        return None
+        # Just loop if pool is not processing tasks at this time
+        if self._active and self._tasks:
+          break
 
     # Get task from queue and tell pool about it
     try:
@@ -388,6 +412,16 @@ class WorkerPool(object):
         return True
     return False
 
+  def HasRunningTasks(self):
+    """Checks whether there's at least one task running.
+
+    """
+    self._lock.acquire()
+    try:
+      return self._HasRunningTasksUnlocked()
+    finally:
+      self._lock.release()
+
   def Quiesce(self):
     """Waits until the task queue is empty.