44 |
44 |
"""
|
45 |
45 |
super(BaseWorker, self).__init__(name=worker_id)
|
46 |
46 |
self.pool = pool
|
47 |
|
self.worker_id = worker_id
|
48 |
47 |
self._current_task = None
|
49 |
48 |
|
50 |
49 |
def ShouldTerminate(self):
|
... | ... | |
89 |
88 |
|
90 |
89 |
# We only wait if there's no task for us.
|
91 |
90 |
if not pool._tasks:
|
92 |
|
logging.debug("Worker %s: waiting for tasks", self.worker_id)
|
|
91 |
logging.debug("Waiting for tasks")
|
93 |
92 |
|
94 |
93 |
# wait() releases the lock and sleeps until notified
|
95 |
94 |
pool._pool_to_worker.wait()
|
96 |
95 |
|
97 |
|
logging.debug("Worker %s: notified while waiting", self.worker_id)
|
|
96 |
logging.debug("Notified while waiting")
|
98 |
97 |
|
99 |
98 |
# Were we woken up in order to terminate?
|
100 |
99 |
if pool._ShouldWorkerTerminateUnlocked(self):
|
... | ... | |
114 |
113 |
|
115 |
114 |
# Run the actual task
|
116 |
115 |
try:
|
117 |
|
logging.debug("Worker %s: starting task %r",
|
118 |
|
self.worker_id, self._current_task)
|
|
116 |
logging.debug("Starting task %r", self._current_task)
|
119 |
117 |
self.RunTask(*self._current_task)
|
120 |
|
logging.debug("Worker %s: done with task %r",
|
121 |
|
self.worker_id, self._current_task)
|
|
118 |
logging.debug("Done with task %r", self._current_task)
|
122 |
119 |
except: # pylint: disable-msg=W0702
|
123 |
|
logging.error("Worker %s: Caught unhandled exception",
|
124 |
|
self.worker_id, exc_info=True)
|
|
120 |
logging.exception("Caught unhandled exception")
|
125 |
121 |
finally:
|
126 |
122 |
# Notify pool
|
127 |
123 |
pool._lock.acquire()
|
... | ... | |
132 |
128 |
finally:
|
133 |
129 |
pool._lock.release()
|
134 |
130 |
|
135 |
|
logging.debug("Worker %s: terminates", self.worker_id)
|
|
131 |
logging.debug("Terminates")
|
136 |
132 |
|
137 |
133 |
def RunTask(self, *args):
|
138 |
134 |
"""Function called to start a task.
|