Merge branch 'stable-2.6'
[ganeti-local] / lib / workerpool.py
index 1838d97..8db03c7 100644 (file)
@@ -59,7 +59,7 @@ class BaseWorker(threading.Thread, object):
   Users of a worker pool must override RunTask in a subclass.
 
   """
-  # pylint: disable-msg=W0212
+  # pylint: disable=W0212
   def __init__(self, pool, worker_id):
     """Constructor for BaseWorker thread.
 
@@ -69,8 +69,11 @@ class BaseWorker(threading.Thread, object):
     """
     super(BaseWorker, self).__init__(name=worker_id)
     self.pool = pool
+    self._worker_id = worker_id
     self._current_task = None
 
+    assert self.getName() == worker_id
+
   def ShouldTerminate(self):
     """Returns whether this worker should terminate.
 
@@ -100,6 +103,23 @@ class BaseWorker(threading.Thread, object):
     finally:
       self.pool._lock.release()
 
+  def SetTaskName(self, taskname):
+    """Sets the name of the current task.
+
+    Should only be called from within L{RunTask}.
+
+    @type taskname: string
+    @param taskname: Task's name
+
+    """
+    if taskname:
+      name = "%s/%s" % (self._worker_id, taskname)
+    else:
+      name = self._worker_id
+
+    # Set thread name
+    self.setName(name)
+
   def _HasRunningTaskUnlocked(self):
     """Returns whether this worker is currently running a task.
 
@@ -147,7 +167,11 @@ class BaseWorker(threading.Thread, object):
           # Run the actual task
           assert defer is None
           logging.debug("Starting task %r, priority %s", args, priority)
-          self.RunTask(*args) # pylint: disable-msg=W0142
+          assert self.getName() == self._worker_id
+          try:
+            self.RunTask(*args) # pylint: disable=W0142
+          finally:
+            self.SetTaskName(None)
           logging.debug("Done with task %r, priority %s", args, priority)
         except DeferTask, err:
           defer = err
@@ -156,11 +180,11 @@ class BaseWorker(threading.Thread, object):
             # Use same priority
             defer.priority = priority
 
-          logging.debug("Deferring task %r, new priority %s", defer.priority)
+          logging.debug("Deferring task %r, new priority %s",
+                        args, defer.priority)
 
           assert self._HasRunningTaskUnlocked()
-
-        except: # pylint: disable-msg=W0702
+        except: # pylint: disable=W0702
           logging.exception("Caught unhandled exception")
 
         assert self._HasRunningTaskUnlocked()
@@ -222,6 +246,7 @@ class WorkerPool(object):
     self._last_worker_id = 0
     self._workers = []
     self._quiescing = False
+    self._active = True
 
     # Terminating workers
     self._termworkers = []
@@ -316,6 +341,28 @@ class WorkerPool(object):
     finally:
       self._lock.release()
 
+  def SetActive(self, active):
+    """Enable/disable processing of tasks.
+
+    This is different from L{Quiesce} in the sense that this function just
+    changes an internal flag and doesn't wait for the queue to be empty. Tasks
+    already being processed continue normally, but no new tasks will be
+    started. New tasks can still be added.
+
+    @type active: bool
+    @param active: Whether tasks should be processed
+
+    """
+    self._lock.acquire()
+    try:
+      self._active = active
+
+      if active:
+        # Tell all workers to continue processing
+        self._pool_to_worker.notifyAll()
+    finally:
+      self._lock.release()
+
   def _WaitForTaskUnlocked(self, worker):
     """Waits for a task for a worker.
 
@@ -327,21 +374,22 @@ class WorkerPool(object):
       return _TERMINATE
 
     # We only wait if there's no task for us.
-    if not self._tasks:
+    if not (self._active and self._tasks):
       logging.debug("Waiting for tasks")
 
-      # wait() releases the lock and sleeps until notified
-      self._pool_to_worker.wait()
+      while True:
+        # wait() releases the lock and sleeps until notified
+        self._pool_to_worker.wait()
 
-      logging.debug("Notified while waiting")
+        logging.debug("Notified while waiting")
 
-      # Were we woken up in order to terminate?
-      if self._ShouldWorkerTerminateUnlocked(worker):
-        return _TERMINATE
+        # Were we woken up in order to terminate?
+        if self._ShouldWorkerTerminateUnlocked(worker):
+          return _TERMINATE
 
-      if not self._tasks:
-        # Spurious notification, ignore
-        return None
+        # Just loop if pool is not processing tasks at this time
+        if self._active and self._tasks:
+          break
 
     # Get task from queue and tell pool about it
     try:
@@ -360,10 +408,20 @@ class WorkerPool(object):
 
     """
     for worker in self._workers + self._termworkers:
-      if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
+      if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212
         return True
     return False
 
+  def HasRunningTasks(self):
+    """Checks whether there's at least one task running.
+
+    """
+    self._lock.acquire()
+    try:
+      return self._HasRunningTasksUnlocked()
+    finally:
+      self._lock.release()
+
   def Quiesce(self):
     """Waits until the task queue is empty.