def ssynchronized(lock, shared=0):
    """Shared Synchronization decorator.

    Calls the function holding the given lock, either in exclusive or shared
    mode. It requires the passed lock to be a SharedLock (or support its
    semantics).

    The lock is acquired via C{lock.acquire(shared=shared)} before each call
    of the decorated function and released via C{lock.release()} afterwards,
    also when the function raises. The returned wrapper keeps the name and
    docstring of the decorated function.

    @param lock: Lock object providing C{acquire(shared=...)} and
        C{release()}
    @param shared: Value passed as C{shared} to the lock's C{acquire}
    @return: Decorator turning a function into a lock-holding function

    """
    # Imported locally so the decorator does not depend on the module's
    # import block
    import functools

    def wrap(fn):
        # functools.wraps copies __name__/__doc__ so that decorated
        # functions stay recognizable in tracebacks and documentation
        @functools.wraps(fn)
        def sync_function(*args, **kwargs):
            lock.acquire(shared=shared)
            try:
                return fn(*args, **kwargs)
            finally:
                # Always give the lock back, even if fn raised
                lock.release()
        return sync_function
    return wrap
+
+
class _SingleNotifyPipeConditionWaiter(object):
    """Helper class for SingleNotifyPipeCondition.

    Instances are callables which block in C{poll()} until an event for the
    watched file descriptor is reported or the given timeout has expired.

    """
    __slots__ = [
        "_fd",
        "_poller",
    ]

    def __init__(self, poller, fd):
        """Constructor for _SingleNotifyPipeConditionWaiter

        @type poller: select.poll
        @param poller: Poller object
        @type fd: int
        @param fd: File descriptor to wait for

        """
        object.__init__(self)
        self._poller = poller
        self._fd = fd

    def __call__(self, timeout):
        """Wait for something to happen on the pipe.

        Returns once the first entry of a poll result refers to the watched
        file descriptor, or once the timeout has been used up. Polls which
        are interrupted (C{EINTR}) are retried with the remaining time; any
        other C{EnvironmentError} is re-raised.

        @type timeout: float or None
        @param timeout: Timeout for waiting in seconds (can be None, in
            which case the call blocks until notified)

        """
        start_time = time.time()
        remaining_time = timeout

        while timeout is None or remaining_time > 0:
            if remaining_time is None:
                poll_timeout = None
            else:
                # Our calculation uses seconds, poll() wants milliseconds
                poll_timeout = remaining_time * 1000

            try:
                result = self._poller.poll(poll_timeout)
            except EnvironmentError as err:
                if err.errno != errno.EINTR:
                    raise
                # Interrupted system call; treat like "nothing happened"
                result = None

            # Check whether we were notified
            if result and result[0][0] == self._fd:
                break

            # Re-calculate timeout if necessary
            if timeout is not None:
                remaining_time = start_time + timeout - time.time()
+
+
class _BaseCondition(object):
    """Common functionality shared by the condition classes.

    Some of this code is taken from python's threading module.

    """
    __slots__ = [
        "_lock",
        "acquire",
        "release",
    ]

    def __init__(self, lock):
        """Constructor for _BaseCondition.

        @type lock: threading.Lock
        @param lock: condition base lock

        """
        object.__init__(self)

        # Recursive locks are not supported; they are recognized by the
        # presence of these helper methods
        for attr_name in ("_acquire_restore", "_release_save"):
            assert not hasattr(lock, attr_name)

        self._lock = lock

        # The condition exposes the lock's own acquire() and release()
        self.acquire = lock.acquire
        self.release = lock.release

    def _is_owned(self):
        """Check whether the lock is currently held.

        A non-blocking acquire is attempted. If it succeeds the lock was
        free; it is released again right away and False is returned.
        Otherwise True is returned.

        """
        got_lock = self._lock.acquire(0)
        if not got_lock:
            return True

        self._lock.release()
        return False

    def _check_owned(self):
        """Raise RuntimeError if the lock is not held.

        """
        if self._is_owned():
            return

        raise RuntimeError("cannot work with un-aquired lock")
+
+
class SingleNotifyPipeCondition(_BaseCondition):
    """Pipe-based condition which can be notified only once.

    Waiters poll the reading side of a pipe and the notifier closes its
    writing side, which makes it possible to wait for a notification with a
    timeout without resorting to polling. The class is almost compatible
    with Python's threading.Condition, with the following differences:
      - notifyAll can only be called once, and no wait can happen after that
      - notify is not supported, only notifyAll

    """

    __slots__ = _BaseCondition.__slots__ + [
        "_poller",
        "_read_fd",
        "_write_fd",
        "_nwaiters",
        "_notified",
    ]

    _waiter_class = _SingleNotifyPipeConditionWaiter

    def __init__(self, lock):
        """Constructor for SingleNotifyPipeCondition

        @param lock: Lock handed on to the _BaseCondition constructor

        """
        _BaseCondition.__init__(self, lock)
        self._notified = False
        self._nwaiters = 0
        # Pipe and poller only exist while there are waiters
        self._poller = None
        self._read_fd = None
        self._write_fd = None

    def _check_unnotified(self):
        """Raise RuntimeError if the condition was notified before.

        """
        if not self._notified:
            return

        raise RuntimeError("cannot use already notified condition")

    def _Cleanup(self):
        """Close whichever pipe descriptors are still open and drop the poller.

        """
        for fd_attr in ("_read_fd", "_write_fd"):
            fd = getattr(self, fd_attr)
            if fd is not None:
                os.close(fd)
                setattr(self, fd_attr, None)

        self._poller = None

    def wait(self, timeout=None):
        """Wait for a notification.

        The lock must be held and the condition must not have been notified
        yet, otherwise RuntimeError is raised. The lock is released while
        waiting and re-acquired before returning.

        @type timeout: float or None
        @param timeout: Waiting timeout (can be None)

        """
        self._check_owned()
        self._check_unnotified()

        self._nwaiters += 1
        try:
            if self._poller is None:
                # No pipe set up yet, create it together with its poller
                self._read_fd, self._write_fd = os.pipe()
                self._poller = select.poll()
                self._poller.register(self._read_fd, select.POLLHUP)

            waiter = self._waiter_class(self._poller, self._read_fd)

            # Give up the lock while blocking on the pipe ...
            self.release()
            try:
                waiter(timeout)
            finally:
                # ... and always take it back afterwards
                self.acquire()
        finally:
            self._nwaiters -= 1
            if not self._nwaiters:
                # Last waiter is gone, release the descriptors
                self._Cleanup()

    def notifyAll(self):
        """Close the writing side of the pipe to notify all waiters.

        The lock must be held and the condition must not have been notified
        yet, otherwise RuntimeError is raised.

        """
        self._check_owned()
        self._check_unnotified()
        self._notified = True

        write_fd = self._write_fd
        if write_fd is None:
            # Nobody ever waited, hence there is no pipe to close
            return

        os.close(write_fd)
        self._write_fd = None
+
+
class PipeCondition(_BaseCondition):
    """Group-only non-polling condition with counters.

    Notifications are delegated to a single-notify condition, which is
    replaced by a fresh instance on every notifyAll. Like that class, this
    one uses pipes and poll internally to wait with a timeout without
    resorting to polling. It is almost compatible with Python's
    threading.Condition, but only supports notifyAll and non-recursive
    locks. As an additional feature it is able to report whether there are
    any waiting threads.

    """
    __slots__ = _BaseCondition.__slots__ + [
        "_nwaiters",
        "_single_condition",
    ]

    _single_condition_class = SingleNotifyPipeCondition

    def __init__(self, lock):
        """Initializes this class.

        @param lock: Lock handed on to the _BaseCondition constructor and to
            the single-notify conditions created by this object

        """
        _BaseCondition.__init__(self, lock)
        self._single_condition = self._single_condition_class(self._lock)
        self._nwaiters = 0

    def wait(self, timeout=None):
        """Wait for a notification.

        @type timeout: float or None
        @param timeout: Waiting timeout (can be None)

        """
        self._check_owned()

        # A notifying thread replaces self._single_condition while we are
        # waiting, hence work on the instance which is current right now.
        current = self._single_condition

        assert self._nwaiters >= 0
        self._nwaiters += 1
        try:
            current.wait(timeout)
        finally:
            assert self._nwaiters > 0
            self._nwaiters -= 1

    def notifyAll(self):
        """Notify all currently waiting threads.

        """
        self._check_owned()

        notified = self._single_condition
        notified.notifyAll()

        # The notified condition is used up; later waiters get a new one
        self._single_condition = self._single_condition_class(self._lock)

    def has_waiting(self):
        """Returns whether there are active waiters.

        """
        self._check_owned()

        return self._nwaiters != 0
+
+
class _CountingCondition(object):
    """Waiter-counting wrapper around threading.Condition.

    The number of threads currently inside wait() is tracked by this class
    itself. We can't access the internal "__waiters" attribute of
    threading.Condition because it's not thread-safe.

    """
    __slots__ = [
        "_cond",
        "_nwaiters",
    ]

    def __init__(self, lock):
        """Initializes this class.

        @param lock: Lock passed to threading.Condition

        """
        object.__init__(self)
        self._nwaiters = 0
        self._cond = threading.Condition(lock=lock)

    def notifyAll(self):
        """Notifies the condition.

        @return: Whatever the wrapped condition's notifyAll returns

        """
        cond = self._cond
        return cond.notifyAll()

    def wait(self, timeout=None):
        """Waits for the condition to be notified.

        @type timeout: float or None
        @param timeout: Waiting timeout (can be None)
        @return: Whatever the wrapped condition's wait returns

        """
        assert self._nwaiters >= 0

        self._nwaiters += 1
        try:
            result = self._cond.wait(timeout=timeout)
        finally:
            # Counted as waiter only for the duration of the wait
            self._nwaiters -= 1

        return result

    def has_waiting(self):
        """Returns whether there are active waiters.

        """
        return self._nwaiters != 0
+
+
+class SharedLock(object):