Users of a worker pool must override RunTask in a subclass.
"""
- # pylint: disable-msg=W0212
+ # pylint: disable=W0212
def __init__(self, pool, worker_id):
"""Constructor for BaseWorker thread.
"""
super(BaseWorker, self).__init__(name=worker_id)
self.pool = pool
+ self._worker_id = worker_id
self._current_task = None
+ assert self.getName() == worker_id
+
def ShouldTerminate(self):
"""Returns whether this worker should terminate.
finally:
self.pool._lock.release()
+ def SetTaskName(self, taskname):
+ """Sets the name of the current task.
+
+ Should only be called from within L{RunTask}.
+
+ @type taskname: string
+ @param taskname: Task's name
+
+ """
+ if taskname:
+ name = "%s/%s" % (self._worker_id, taskname)
+ else:
+ name = self._worker_id
+
+ # Set thread name
+ self.setName(name)
+
def _HasRunningTaskUnlocked(self):
"""Returns whether this worker is currently running a task.
# Run the actual task
assert defer is None
logging.debug("Starting task %r, priority %s", args, priority)
- self.RunTask(*args) # pylint: disable-msg=W0142
+ assert self.getName() == self._worker_id
+ try:
+ self.RunTask(*args) # pylint: disable=W0142
+ finally:
+ self.SetTaskName(None)
logging.debug("Done with task %r, priority %s", args, priority)
except DeferTask, err:
defer = err
# Use same priority
defer.priority = priority
- logging.debug("Deferring task %r, new priority %s", defer.priority)
+ logging.debug("Deferring task %r, new priority %s",
+ args, defer.priority)
assert self._HasRunningTaskUnlocked()
-
- except: # pylint: disable-msg=W0702
+ except: # pylint: disable=W0702
logging.exception("Caught unhandled exception")
assert self._HasRunningTaskUnlocked()
self._last_worker_id = 0
self._workers = []
self._quiescing = False
+ self._active = True
# Terminating workers
self._termworkers = []
finally:
self._lock.release()
+ def SetActive(self, active):
+ """Enable/disable processing of tasks.
+
+ This is different from L{Quiesce} in the sense that this function just
+ changes an internal flag and doesn't wait for the queue to be empty. Tasks
+ already being processed continue normally, but no new tasks will be
+ started. New tasks can still be added.
+
+ @type active: bool
+ @param active: Whether tasks should be processed
+
+ """
+ self._lock.acquire()
+ try:
+ self._active = active
+
+ if active:
+ # Tell all workers to continue processing
+ self._pool_to_worker.notifyAll()
+ finally:
+ self._lock.release()
+
def _WaitForTaskUnlocked(self, worker):
"""Waits for a task for a worker.
return _TERMINATE
# We only wait if there's no task for us.
- if not self._tasks:
+ if not (self._active and self._tasks):
logging.debug("Waiting for tasks")
- # wait() releases the lock and sleeps until notified
- self._pool_to_worker.wait()
+ while True:
+ # wait() releases the lock and sleeps until notified
+ self._pool_to_worker.wait()
- logging.debug("Notified while waiting")
+ logging.debug("Notified while waiting")
- # Were we woken up in order to terminate?
- if self._ShouldWorkerTerminateUnlocked(worker):
- return _TERMINATE
+ # Were we woken up in order to terminate?
+ if self._ShouldWorkerTerminateUnlocked(worker):
+ return _TERMINATE
- if not self._tasks:
- # Spurious notification, ignore
- return None
+ # Just loop if pool is not processing tasks at this time
+ if self._active and self._tasks:
+ break
# Get task from queue and tell pool about it
try:
"""
for worker in self._workers + self._termworkers:
- if worker._HasRunningTaskUnlocked(): # pylint: disable-msg=W0212
+ if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212
return True
return False
+ def HasRunningTasks(self):
+ """Checks whether there's at least one task running.
+
+ """
+ self._lock.acquire()
+ try:
+ return self._HasRunningTasksUnlocked()
+ finally:
+ self._lock.release()
+
def Quiesce(self):
"""Waits until the task queue is empty.