Revision c69c45a7 lib/workerpool.py

b/lib/workerpool.py
370 370
    @param worker: Worker thread
371 371

  
372 372
    """
373
    if self._ShouldWorkerTerminateUnlocked(worker):
374
      return _TERMINATE
375

  
376
    # We only wait if there's no task for us.
377
    if not (self._active and self._tasks):
378
      logging.debug("Waiting for tasks")
373
    while True:
374
      if self._ShouldWorkerTerminateUnlocked(worker):
375
        return _TERMINATE
379 376

  
380
      while True:
381
        # wait() releases the lock and sleeps until notified
382
        self._pool_to_worker.wait()
377
      # If there's a pending task, return it immediately
378
      if self._active and self._tasks:
379
        # Get task from queue and tell pool about it
380
        try:
381
          task = heapq.heappop(self._tasks)
382
        finally:
383
          self._worker_to_pool.notifyAll()
383 384

  
384
        logging.debug("Notified while waiting")
385
        return task
385 386

  
386
        # Were we woken up in order to terminate?
387
        if self._ShouldWorkerTerminateUnlocked(worker):
388
          return _TERMINATE
387
      logging.debug("Waiting for tasks")
389 388

  
390
        # Just loop if pool is not processing tasks at this time
391
        if self._active and self._tasks:
392
          break
389
      # wait() releases the lock and sleeps until notified
390
      self._pool_to_worker.wait()
393 391

  
394
    # Get task from queue and tell pool about it
395
    try:
396
      return heapq.heappop(self._tasks)
397
    finally:
398
      self._worker_to_pool.notifyAll()
392
      logging.debug("Notified while waiting")
399 393

  
400 394
  def _ShouldWorkerTerminateUnlocked(self, worker):
401 395
    """Returns whether a worker should terminate.

Also available in: Unified diff