Revision 48dabc6a lib/locking.py

b/lib/locking.py
223 223
    self._Cleanup()
224 224

  
225 225

  
226
class _PipeCondition(object):
227
  """Group-only non-polling condition with counters.
228

  
229
  This condition class uses pipes and poll, internally, to be able to wait for
230
  notification with a timeout, without resorting to polling. It is almost
231
  compatible with Python's threading.Condition, but only supports notifyAll and
232
  non-recursive locks. As an additional features it's able to report whether
233
  there are any waiting threads.
234

  
235
  """
236
  __slots__ = [
237
    "_lock",
238
    "_nwaiters",
239
    "_pipe",
240
    "acquire",
241
    "release",
242
    ]
243

  
244
  _pipe_class = _SingleActionPipeCondition
245

  
246
  def __init__(self, lock):
247
    """Initializes this class.
248

  
249
    """
250
    object.__init__(self)
251

  
252
    # Recursive locks are not supported
253
    assert not hasattr(lock, "_acquire_restore")
254
    assert not hasattr(lock, "_release_save")
255

  
256
    self._lock = lock
257

  
258
    # Export the lock's acquire() and release() methods
259
    self.acquire = lock.acquire
260
    self.release = lock.release
261

  
262
    self._nwaiters = 0
263
    self._pipe = None
264

  
265
  def _is_owned(self):
266
    """Check whether lock is owned by current thread.
267

  
268
    """
269
    if self._lock.acquire(0):
270
      self._lock.release()
271
      return False
272

  
273
    return True
274

  
275
  def _check_owned(self):
276
    """Raise an exception if the current thread doesn't own the lock.
277

  
278
    """
279
    if not self._is_owned():
280
      raise RuntimeError("cannot work with un-aquired lock")
281

  
282
  def wait(self, timeout=None):
283
    """Wait for a notification.
284

  
285
    @type timeout: float or None
286
    @param timeout: Waiting timeout (can be None)
287

  
288
    """
289
    self._check_owned()
290

  
291
    if not self._pipe:
292
      self._pipe = self._pipe_class()
293

  
294
    # Keep local reference to the pipe. It could be replaced by another thread
295
    # notifying while we're waiting.
296
    pipe = self._pipe
297

  
298
    assert self._nwaiters >= 0
299
    self._nwaiters += 1
300
    try:
301
      # Get function to wait on the pipe
302
      wait_fn = pipe.StartWaiting()
303
      try:
304
        # Release lock while waiting
305
        self.release()
306
        try:
307
          # Wait for notification
308
          wait_fn(timeout)
309
        finally:
310
          # Re-acquire lock
311
          self.acquire()
312
      finally:
313
        # Destroy pipe if this was the last waiter and the current pipe is
314
        # still the same. The same pipe cannot be reused after cleanup.
315
        if pipe.DoneWaiting() and pipe == self._pipe:
316
          self._pipe = None
317
    finally:
318
      assert self._nwaiters > 0
319
      self._nwaiters -= 1
320

  
321
  def notifyAll(self):
322
    """Notify all currently waiting threads.
323

  
324
    """
325
    self._check_owned()
326

  
327
    # Notify and forget pipe. A new one will be created on the next call to
328
    # wait.
329
    if self._pipe is not None:
330
      self._pipe.notifyAll()
331
      self._pipe = None
332

  
333
  def has_waiting(self):
334
    """Returns whether there are active waiters.
335

  
336
    """
337
    self._check_owned()
338

  
339
    return bool(self._nwaiters)
340

  
341

  
226 342
class _CountingCondition(object):
227 343
  """Wrapper for Python's built-in threading.Condition class.
228 344

  

Also available in: Unified diff