+from ganeti import compat
+from ganeti import query
+
+
+_EXCLUSIVE_TEXT = "exclusive"  # NOTE(review): presumably the label for exclusive mode; confirm
+_SHARED_TEXT = "shared"  # NOTE(review): presumably the label for shared mode; confirm
+_DELETED_TEXT = "deleted"  # NOTE(review): presumably the label for a deleted lock; confirm
+
+_DEFAULT_PRIORITY = 0  # NOTE(review): presumably used when no priority is given; confirm
+
+
def ssynchronized(mylock, shared=0):
  """Shared Synchronization decorator.

  Calls the function holding the given lock, either in exclusive or shared
  mode. It requires the passed lock to be a SharedLock (or support its
  semantics).

  The lock is acquired before every call of the decorated function and
  released once the call is over, no matter whether it returned or raised.
  The wrapper keeps the name and docstring of the decorated function.

  @type mylock: lockable object or string
  @param mylock: lock to acquire or class member name of the lock to acquire;
      a name is resolved on the first positional argument ("self") each time
      the decorated function is called
  @type shared: integer
  @param shared: handed over unchanged as the C{shared} keyword argument of
      the lock's C{acquire} method (default 0)
  @return: decorator adding the acquire/release logic to a function

  """
  # Imported here so this function does not rely on the module's import
  # section
  import functools

  def wrap(fn):
    # Keep __name__/__doc__ of the decorated function on the wrapper
    @functools.wraps(fn)
    def sync_function(*args, **kwargs):
      if isinstance(mylock, basestring):
        assert args, "cannot ssynchronize on non-class method: self not found"
        # args[0] is "self"
        lock = getattr(args[0], mylock)
      else:
        lock = mylock
      # Acquire outside of "try": if acquiring fails there is nothing to
      # release
      lock.acquire(shared=shared)
      try:
        return fn(*args, **kwargs)
      finally:
        lock.release()
    return sync_function
  return wrap
+
+
+class _SingleNotifyPipeConditionWaiter(object):
+ """Helper class for SingleNotifyPipeCondition
+
+ """
+ __slots__ = [
+ "_fd",
+ "_poller",
+ ]
+
+ def __init__(self, poller, fd):
+ """Constructor for _SingleNotifyPipeConditionWaiter
+
+ @type poller: select.poll
+ @param poller: Poller object
+ @type fd: int
+ @param fd: File descriptor to wait for
+
+ """
+ object.__init__(self)
+ self._poller = poller
+ self._fd = fd
+
+ def __call__(self, timeout):
+ """Wait for something to happen on the pipe.
+
+ @type timeout: float or None
+ @param timeout: Timeout for waiting (can be None)
+
+ """
+ running_timeout = utils.RunningTimeout(timeout, True)
+
+ while True:
+ remaining_time = running_timeout.Remaining()
+
+ if remaining_time is not None:
+ if remaining_time < 0.0:
+ break
+
+ # Our calculation uses seconds, poll() wants milliseconds
+ remaining_time *= 1000
+
+ try:
+ result = self._poller.poll(remaining_time)
+ except EnvironmentError, err:
+ if err.errno != errno.EINTR:
+ raise
+ result = None
+
+ # Check whether we were notified
+ if result and result[0][0] == self._fd:
+ break
+
+
+class _BaseCondition(object):
+ """Base class containing common code for conditions.
+
+ Some of this code is taken from python's threading module.
+
+ """
+ __slots__ = [
+ "_lock",
+ "acquire",
+ "release",
+ "_is_owned",
+ "_acquire_restore",
+ "_release_save",
+ ]
+
+ def __init__(self, lock):
+ """Constructor for _BaseCondition.
+
+ @type lock: threading.Lock
+ @param lock: condition base lock
+
+ """
+ object.__init__(self)
+
+ try:
+ self._release_save = lock._release_save
+ except AttributeError:
+ self._release_save = self._base_release_save
+ try:
+ self._acquire_restore = lock._acquire_restore
+ except AttributeError:
+ self._acquire_restore = self._base_acquire_restore
+ try:
+ self._is_owned = lock.is_owned
+ except AttributeError:
+ self._is_owned = self._base_is_owned
+
+ self._lock = lock
+
+ # Export the lock's acquire() and release() methods
+ self.acquire = lock.acquire
+ self.release = lock.release
+
+ def _base_is_owned(self):
+ """Check whether lock is owned by current thread.
+
+ """
+ if self._lock.acquire(0):
+ self._lock.release()
+ return False
+ return True
+
+ def _base_release_save(self):
+ self._lock.release()
+
+ def _base_acquire_restore(self, _):
+ self._lock.acquire()
+
+ def _check_owned(self):
+ """Raise an exception if the current thread doesn't own the lock.
+
+ """
+ if not self._is_owned():
+ raise RuntimeError("cannot work with un-aquired lock")
+
+
+class SingleNotifyPipeCondition(_BaseCondition):
+ """Condition which can only be notified once.
+
+ This condition class uses pipes and poll, internally, to be able to wait for
+ notification with a timeout, without resorting to polling. It is almost
+ compatible with Python's threading.Condition, with the following differences:
+ - notifyAll can only be called once, and no wait can happen after that
+ - notify is not supported, only notifyAll
+
+ """
+
+ __slots__ = [
+ "_poller",
+ "_read_fd",
+ "_write_fd",
+ "_nwaiters",
+ "_notified",
+ ]
+
+ _waiter_class = _SingleNotifyPipeConditionWaiter