The function in is simplified in its structure and duplicated checks
have been merged.
Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Bernardo Dal Seno <bdalseno@google.com>
@param worker: Worker thread
"""
@param worker: Worker thread
"""
- if self._ShouldWorkerTerminateUnlocked(worker):
- return _TERMINATE
-
- # We only wait if there's no task for us.
- if not (self._active and self._tasks):
- logging.debug("Waiting for tasks")
+ while True:
+ if self._ShouldWorkerTerminateUnlocked(worker):
+ return _TERMINATE
- while True:
- # wait() releases the lock and sleeps until notified
- self._pool_to_worker.wait()
+ # If there's a pending task, return it immediately
+ if self._active and self._tasks:
+ # Get task from queue and tell pool about it
+ try:
+ task = heapq.heappop(self._tasks)
+ finally:
+ self._worker_to_pool.notifyAll()
- logging.debug("Notified while waiting")
- # Were we woken up in order to terminate?
- if self._ShouldWorkerTerminateUnlocked(worker):
- return _TERMINATE
+ logging.debug("Waiting for tasks")
- # Just loop if pool is not processing tasks at this time
- if self._active and self._tasks:
- break
+ # wait() releases the lock and sleeps until notified
+ self._pool_to_worker.wait()
- # Get task from queue and tell pool about it
- try:
- return heapq.heappop(self._tasks)
- finally:
- self._worker_to_pool.notifyAll()
+ logging.debug("Notified while waiting")
def _ShouldWorkerTerminateUnlocked(self, worker):
"""Returns whether a worker should terminate.
def _ShouldWorkerTerminateUnlocked(self, worker):
"""Returns whether a worker should terminate.