Revision 53b1d12b lib/workerpool.py

b/lib/workerpool.py
95 95
            logging.debug("Worker %s: waiting for tasks", self.worker_id)
96 96

  
97 97
            # wait() releases the lock and sleeps until notified
98
            pool._lock.wait()
98
            pool._pool_to_worker.wait()
99 99

  
100 100
            logging.debug("Worker %s: notified while waiting", self.worker_id)
101 101

  
......
111 111
          try:
112 112
            self._current_task = pool._tasks.popleft()
113 113
          finally:
114
            pool._lock.notifyAll()
114
            pool._worker_to_pool.notifyAll()
115 115
        finally:
116 116
          pool._lock.release()
117 117

  
......
131 131
        try:
132 132
          if self._current_task:
133 133
            self._current_task = None
134
            pool._lock.notifyAll()
134
            pool._worker_to_pool.notifyAll()
135 135
        finally:
136 136
          pool._lock.release()
137 137

  
......
165 165

  
166 166
    """
167 167
    # Some of these variables are accessed by BaseWorker
168
    self._lock = threading.Condition(threading.Lock())
168
    self._lock = threading.Lock()
169
    self._pool_to_pool = threading.Condition(self._lock)
170
    self._pool_to_worker = threading.Condition(self._lock)
171
    self._worker_to_pool = threading.Condition(self._lock)
169 172
    self._worker_class = worker_class
170 173
    self._last_worker_id = 0
171 174
    self._workers = []
......
193 196
    try:
194 197
      # Don't add new tasks while we're quiescing
195 198
      while self._quiescing:
196
        self._lock.wait()
199
        self._pool_to_pool.wait()
197 200

  
198 201
      # Add task to internal queue
199 202
      self._tasks.append(args)
200
      self._lock.notify()
203

  
204
      # Wake one idling worker up
205
      self._pool_to_worker.notify()
201 206
    finally:
202 207
      self._lock.release()
203 208

  
......
236 241

  
237 242
      # Wait while there are tasks pending or running
238 243
      while self._tasks or self._HasRunningTasksUnlocked():
239
        self._lock.wait()
244
        self._worker_to_pool.wait()
240 245

  
241 246
    finally:
242 247
      self._quiescing = False
243 248

  
244 249
      # Make sure AddTasks continues in case it was waiting
245
      self._lock.notifyAll()
250
      self._pool_to_pool.notifyAll()
246 251

  
247 252
      self._lock.release()
248 253

  
......
277 282
      self._termworkers += termworkers
278 283

  
279 284
      # Notify workers that something has changed
280
      self._lock.notifyAll()
285
      self._pool_to_worker.notifyAll()
281 286

  
282 287
      # Join all terminating workers
283 288
      self._lock.release()

Also available in: Unified diff