@param worker_id: identifier for this worker
"""
- super(BaseWorker, self).__init__()
+ super(BaseWorker, self).__init__(name=worker_id)
self.pool = pool
- self.worker_id = worker_id
self._current_task = None
def ShouldTerminate(self):
# We only wait if there's no task for us.
if not pool._tasks:
- logging.debug("Worker %s: waiting for tasks", self.worker_id)
+ logging.debug("Waiting for tasks")
# wait() releases the lock and sleeps until notified
pool._pool_to_worker.wait()
- logging.debug("Worker %s: notified while waiting", self.worker_id)
+ logging.debug("Notified while waiting")
# Were we woken up in order to terminate?
if pool._ShouldWorkerTerminateUnlocked(self):
# Run the actual task
try:
- logging.debug("Worker %s: starting task %r",
- self.worker_id, self._current_task)
+ logging.debug("Starting task %r", self._current_task)
self.RunTask(*self._current_task)
- logging.debug("Worker %s: done with task %r",
- self.worker_id, self._current_task)
+ logging.debug("Done with task %r", self._current_task)
except: # pylint: disable-msg=W0702
- logging.error("Worker %s: Caught unhandled exception",
- self.worker_id, exc_info=True)
+ logging.exception("Caught unhandled exception")
finally:
# Notify pool
pool._lock.acquire()
finally:
pool._lock.release()
- logging.debug("Worker %s: terminates", self.worker_id)
+ logging.debug("Terminates")
def RunTask(self, *args):
"""Function called to start a task.
guaranteed to finish in the same order.
"""
- def __init__(self, num_workers, worker_class):
+ def __init__(self, name, num_workers, worker_class):
"""Constructor for worker pool.
@param num_workers: number of workers to be started
self._pool_to_worker = threading.Condition(self._lock)
self._worker_to_pool = threading.Condition(self._lock)
self._worker_class = worker_class
+ self._name = name
self._last_worker_id = 0
self._workers = []
self._quiescing = False
# TODO: Implement dynamic resizing?
+ def _WaitWhileQuiescingUnlocked(self):
+   """Block the caller for as long as the pool is quiescing.
+
+   Returns immediately if the pool is not quiescing. The callers visible
+   in this change (AddTask and AddManyTasks) invoke this method with the
+   pool lock already acquired.
+
+   """
+   # NOTE(review): _pool_to_pool is presumably a condition variable bound to
+   # the pool lock, like _pool_to_worker; its definition is not visible here.
+   while True:
+     if not self._quiescing:
+       return
+     self._pool_to_pool.wait()
+
def AddTask(self, *args):
"""Adds a task to the queue.
"""
self._lock.acquire()
try:
- # Don't add new tasks while we're quiescing
- while self._quiescing:
- self._pool_to_pool.wait()
+ self._WaitWhileQuiescingUnlocked()
- # Add task to internal queue
self._tasks.append(args)
# Wake one idling worker up
finally:
self._lock.release()
+ def AddManyTasks(self, tasks):
+   """Add several tasks to the queue under a single lock acquisition.
+
+   @type tasks: list of tuples
+   @param tasks: list of args passed to L{BaseWorker.RunTask}; any
+     iterable of such tuples is accepted
+
+   """
+   # Materialize first so the number of tasks is known even when a one-shot
+   # iterable (e.g. a generator) is passed; otherwise extend() would consume
+   # it and no worker would be notified afterwards.
+   tasks = list(tasks)
+
+   self._lock.acquire()
+   try:
+     # Don't add new tasks while the pool is quiescing
+     self._WaitWhileQuiescingUnlocked()
+
+     self._tasks.extend(tasks)
+
+     # Wake up to one idle worker per added task
+     self._pool_to_worker.notify(len(tasks))
+   finally:
+     self._lock.release()
+
def _ShouldWorkerTerminateUnlocked(self, worker):
"""Returns whether a worker should terminate.
"""
self._last_worker_id += 1
- return self._last_worker_id
+
+ return "%s%d" % (self._name, self._last_worker_id)
def _ResizeUnlocked(self, num_workers):
"""Changes the number of workers.