+ def _base_release_save(self):
+ self._lock.release()
+
+ def _base_acquire_restore(self, _):
+ self._lock.acquire()
+
+ def _check_owned(self):
+ """Raise an exception if the current thread doesn't own the lock.
+
+ """
+ if not self._is_owned():
+ raise RuntimeError("cannot work with un-aquired lock")
+
+
+class SingleNotifyPipeCondition(_BaseCondition):
+ """Condition which can only be notified once.
+
+ This condition class uses pipes and poll, internally, to be able to wait for
+ notification with a timeout, without resorting to polling. It is almost
+ compatible with Python's threading.Condition, with the following differences:
+ - notifyAll can only be called once, and no wait can happen after that
+ - notify is not supported, only notifyAll
+
+ """
+
+ __slots__ = [
+ "_poller",
+ "_read_fd",
+ "_write_fd",
+ "_nwaiters",
+ "_notified",
+ ]
+
+ _waiter_class = _SingleNotifyPipeConditionWaiter
+
+ def __init__(self, lock):
+ """Constructor for SingleNotifyPipeCondition
+
+ """
+ _BaseCondition.__init__(self, lock)
+ self._nwaiters = 0
+ self._notified = False
+ self._read_fd = None
+ self._write_fd = None
+ self._poller = None
+
+ def _check_unnotified(self):
+ """Throws an exception if already notified.
+
+ """
+ if self._notified:
+ raise RuntimeError("cannot use already notified condition")
+
+ def _Cleanup(self):
+ """Cleanup open file descriptors, if any.
+
+ """
+ if self._read_fd is not None:
+ os.close(self._read_fd)
+ self._read_fd = None
+
+ if self._write_fd is not None:
+ os.close(self._write_fd)
+ self._write_fd = None
+ self._poller = None
+
+ def wait(self, timeout):
+ """Wait for a notification.
+
+ @type timeout: float or None
+ @param timeout: Waiting timeout (can be None)
+
+ """
+ self._check_owned()
+ self._check_unnotified()
+
+ self._nwaiters += 1
+ try:
+ if self._poller is None:
+ (self._read_fd, self._write_fd) = os.pipe()
+ self._poller = select.poll()
+ self._poller.register(self._read_fd, select.POLLHUP)
+
+ wait_fn = self._waiter_class(self._poller, self._read_fd)
+ state = self._release_save()
+ try:
+ # Wait for notification
+ wait_fn(timeout)
+ finally:
+ # Re-acquire lock
+ self._acquire_restore(state)
+ finally:
+ self._nwaiters -= 1
+ if self._nwaiters == 0:
+ self._Cleanup()
+
+ def notifyAll(self): # pylint: disable=C0103
+ """Close the writing side of the pipe to notify all waiters.
+
+ """
+ self._check_owned()
+ self._check_unnotified()
+ self._notified = True
+ if self._write_fd is not None:
+ os.close(self._write_fd)
+ self._write_fd = None
+
+
+class PipeCondition(_BaseCondition):
+ """Group-only non-polling condition with counters.
+
+ This condition class uses pipes and poll, internally, to be able to wait for
+ notification with a timeout, without resorting to polling. It is almost
+ compatible with Python's threading.Condition, but only supports notifyAll and
+ non-recursive locks. As an additional features it's able to report whether
+ there are any waiting threads.
+
+ """
+ __slots__ = [
+ "_waiters",
+ "_single_condition",
+ ]
+
+ _single_condition_class = SingleNotifyPipeCondition
+
+ def __init__(self, lock):
+ """Initializes this class.
+
+ """
+ _BaseCondition.__init__(self, lock)
+ self._waiters = set()
+ self._single_condition = self._single_condition_class(self._lock)
+
+ def wait(self, timeout):
+ """Wait for a notification.
+
+ @type timeout: float or None
+ @param timeout: Waiting timeout (can be None)
+
+ """
+ self._check_owned()
+
+ # Keep local reference to the pipe. It could be replaced by another thread
+ # notifying while we're waiting.
+ cond = self._single_condition
+
+ self._waiters.add(threading.currentThread())
+ try:
+ cond.wait(timeout)
+ finally:
+ self._check_owned()
+ self._waiters.remove(threading.currentThread())
+
+ def notifyAll(self): # pylint: disable=C0103
+ """Notify all currently waiting threads.
+
+ """
+ self._check_owned()
+ self._single_condition.notifyAll()
+ self._single_condition = self._single_condition_class(self._lock)
+
+ def get_waiting(self):
+ """Returns a list of all waiting threads.
+
+ """
+ self._check_owned()
+
+ return self._waiters
+
+ def has_waiting(self):
+ """Returns whether there are active waiters.
+
+ """
+ self._check_owned()
+
+ return bool(self._waiters)
+
+ def __repr__(self):
+ return ("<%s.%s waiters=%s at %#x>" %
+ (self.__class__.__module__, self.__class__.__name__,
+ self._waiters, id(self)))
+
+
+class _PipeConditionWithMode(PipeCondition):
+ __slots__ = [
+ "shared",
+ ]
+
+ def __init__(self, lock, shared):
+ """Initializes this class.
+
+ """
+ self.shared = shared
+ PipeCondition.__init__(self, lock)
+
+
+class SharedLock(object):