Revision 21c5ad52

b/lib/workerpool.py
30 30
from ganeti import compat
31 31

  
32 32

  
33
_TERMINATE = object()
34

  
35

  
33 36
class BaseWorker(threading.Thread, object):
34 37
  """Base worker class for worker pools.
35 38

  
......
82 85

  
83 86
    while True:
84 87
      try:
85
        # We wait on lock to be told either terminate or do a task.
88
        # Wait on lock to be told either to terminate or to do a task
86 89
        pool._lock.acquire()
87 90
        try:
88
          if pool._ShouldWorkerTerminateUnlocked(self):
89
            break
90

  
91
          # We only wait if there's no task for us.
92
          if not pool._tasks:
93
            logging.debug("Waiting for tasks")
94

  
95
            # wait() releases the lock and sleeps until notified
96
            pool._pool_to_worker.wait()
91
          task = pool._WaitForTaskUnlocked(self)
97 92

  
98
            logging.debug("Notified while waiting")
93
          if task is _TERMINATE:
94
            # Told to terminate
95
            break
99 96

  
100
            # Were we woken up in order to terminate?
101
            if pool._ShouldWorkerTerminateUnlocked(self):
102
              break
97
          if task is None:
98
            # Spurious notification, ignore
99
            continue
103 100

  
104
            if not pool._tasks:
105
              # Spurious notification, ignore
106
              continue
101
          self._current_task = task
107 102

  
108
          # Get task from queue and tell pool about it
109
          try:
110
            self._current_task = pool._tasks.popleft()
111
          finally:
112
            pool._worker_to_pool.notifyAll()
103
          assert self._HasRunningTaskUnlocked()
113 104
        finally:
114 105
          pool._lock.release()
115 106

  
......
229 220
    finally:
230 221
      self._lock.release()
231 222

  
223
  def _WaitForTaskUnlocked(self, worker):
224
    """Waits for a task for a worker.
225

  
226
    @type worker: L{BaseWorker}
227
    @param worker: Worker thread
228

  
229
    """
230
    if self._ShouldWorkerTerminateUnlocked(worker):
231
      return _TERMINATE
232

  
233
    # We only wait if there's no task for us.
234
    if not self._tasks:
235
      logging.debug("Waiting for tasks")
236

  
237
      # wait() releases the lock and sleeps until notified
238
      self._pool_to_worker.wait()
239

  
240
      logging.debug("Notified while waiting")
241

  
242
      # Were we woken up in order to terminate?
243
      if self._ShouldWorkerTerminateUnlocked(worker):
244
        return _TERMINATE
245

  
246
      if not self._tasks:
247
        # Spurious notification, ignore
248
        return None
249

  
250
    # Get task from queue and tell pool about it
251
    try:
252
      return self._tasks.popleft()
253
    finally:
254
      self._worker_to_pool.notifyAll()
255

  
232 256
  def _ShouldWorkerTerminateUnlocked(self, worker):
233 257
    """Returns whether a worker should terminate.
234 258

  

Also available in: Unified diff