super(BaseWorker, self).__init__()
self.pool = pool
self.worker_id = worker_id
-
- # Also used by WorkerPool
self._current_task = None
def ShouldTerminate(self):
"""
return self.pool.ShouldWorkerTerminate(self)
+ def _HasRunningTaskUnlocked(self):
+ """Returns whether this worker is currently running a task.
+
+ """
+ return (self._current_task is not None)
+
+ def HasRunningTask(self):
+ """Returns whether this worker is currently running a task.
+
+ """
+ self.pool._lock.acquire()
+ try:
+ return self._HasRunningTaskUnlocked()
+ finally:
+ self.pool._lock.release()
+
def run(self):
"""Main thread function.
"""
pool = self.pool
- assert self._current_task is None
+ assert not self.HasRunningTask()
while True:
try:
# We only wait if there's no task for us.
if not pool._tasks:
+ logging.debug("Worker %s: waiting for tasks", self.worker_id)
+
# wait() releases the lock and sleeps until notified
pool._lock.wait()
+ logging.debug("Worker %s: notified while waiting", self.worker_id)
+
# Were we woken up in order to terminate?
if pool._ShouldWorkerTerminateUnlocked(self):
break
# Run the actual task
try:
+ logging.debug("Worker %s: starting task %r",
+ self.worker_id, self._current_task)
self.RunTask(*self._current_task)
+ logging.debug("Worker %s: done with task %r",
+ self.worker_id, self._current_task)
except:
logging.error("Worker %s: Caught unhandled exception",
self.worker_id, exc_info=True)
finally:
- self._current_task = None
-
# Notify pool
pool._lock.acquire()
try:
- pool._lock.notifyAll()
+ if self._current_task:
+ self._current_task = None
+ pool._lock.notifyAll()
finally:
pool._lock.release()
+ logging.debug("Worker %s: terminates", self.worker_id)
+
def RunTask(self, *args):
"""Function called to start a task.
"""
for worker in self._workers + self._termworkers:
- if worker._current_task is not None:
+ if worker._HasRunningTaskUnlocked():
return True
return False