X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/a66bd91b7074b62884839ebff79bce9e237b39f1..69b999879d380c6c8ea2e8871085ffe46a13779b:/lib/locking.py diff --git a/lib/locking.py b/lib/locking.py index f7a62d2..382a45c 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -18,6 +18,9 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 02110-1301, USA. +# Disable "Invalid name ..." message +# pylint: disable-msg=C0103 + """Module implementing the Ganeti locking code.""" import os @@ -49,21 +52,18 @@ def ssynchronized(lock, shared=0): return wrap -class _SingleActionPipeConditionWaiter(object): - """Callable helper class for _SingleActionPipeCondition. +class _SingleNotifyPipeConditionWaiter(object): + """Helper class for SingleNotifyPipeCondition """ __slots__ = [ - "_cond", "_fd", "_poller", ] - def __init__(self, cond, poller, fd): - """Initializes this class. + def __init__(self, poller, fd): + """Constructor for _SingleNotifyPipeConditionWaiter - @type cond: L{_SingleActionPipeCondition} - @param cond: Parent condition @type poller: select.poll @param poller: Poller object @type fd: int @@ -71,8 +71,6 @@ class _SingleActionPipeConditionWaiter(object): """ object.__init__(self) - - self._cond = cond self._poller = poller self._fd = fd @@ -103,105 +101,96 @@ class _SingleActionPipeConditionWaiter(object): remaining_time = start_time + timeout - time.time() -class _SingleActionPipeCondition(object): - """Wrapper around a pipe for usage inside conditions. - - This class contains a POSIX pipe(2) and a poller to poll it. The pipe is - always allocated when constructing the class. Extra care is taken to always - close the file descriptors. +class _BaseCondition(object): + """Base class containing common code for conditions. - An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for - notifications. - - Warning: This class is designed to be used as the underlying component of a - locking condition, but is not by itself thread safe, and needs to be - protected by an external lock. + Some of this code is taken from python's threading module. """ __slots__ = [ - "_poller", - "_read_fd", - "_write_fd", - "_nwaiters", + "_lock", + "acquire", + "release", ] - _waiter_class = _SingleActionPipeConditionWaiter + def __init__(self, lock): + """Constructor for _BaseCondition. - def __init__(self): - """Initializes this class. + @type lock: threading.Lock + @param lock: condition base lock """ object.__init__(self) - self._nwaiters = 0 - - # Just assume the unpacking is successful, otherwise error handling gets - # very complicated. - (self._read_fd, self._write_fd) = os.pipe() - try: - # The poller looks for closure of the write side - poller = select.poll() - poller.register(self._read_fd, select.POLLHUP) - - self._poller = poller - except: - if self._read_fd is not None: - os.close(self._read_fd) - if self._write_fd is not None: - os.close(self._write_fd) - raise + # Recursive locks are not supported + assert not hasattr(lock, "_acquire_restore") + assert not hasattr(lock, "_release_save") - # There should be no code here anymore, otherwise the pipe file descriptors - # may be not be cleaned up properly in case of errors. + self._lock = lock - def StartWaiting(self): - """Return function to wait for notification. + # Export the lock's acquire() and release() methods + self.acquire = lock.acquire + self.release = lock.release - @rtype: L{_SingleActionPipeConditionWaiter} - @return: Function to wait for notification + def _is_owned(self): + """Check whether lock is owned by current thread. """ - assert self._nwaiters >= 0 + if self._lock.acquire(0): + self._lock.release() + return False - if self._poller is None: - raise RuntimeError("Already cleaned up") + return True - # Create waiter function and increase number of waiters - wait_fn = self._waiter_class(self, self._poller, self._read_fd) - self._nwaiters += 1 - return wait_fn + def _check_owned(self): + """Raise an exception if the current thread doesn't own the lock. - def DoneWaiting(self): - """Decrement number of waiters and automatic cleanup. + """ + if not self._is_owned(): + raise RuntimeError("cannot work with un-aquired lock") - Must be called after waiting for a notification. - @rtype: bool - @return: Whether this was the last waiter +class SingleNotifyPipeCondition(_BaseCondition): + """Condition which can only be notified once. - """ - assert self._nwaiters > 0 + This condition class uses pipes and poll, internally, to be able to wait for + notification with a timeout, without resorting to polling. It is almost + compatible with Python's threading.Condition, with the following differences: + - notifyAll can only be called once, and no wait can happen after that + - notify is not supported, only notifyAll - self._nwaiters -= 1 + """ - if self._nwaiters == 0: - self._Cleanup() - return True + __slots__ = _BaseCondition.__slots__ + [ + "_poller", + "_read_fd", + "_write_fd", + "_nwaiters", + "_notified", + ] - return False + _waiter_class = _SingleNotifyPipeConditionWaiter - def notifyAll(self): - """Close the writing side of the pipe to notify all waiters. + def __init__(self, lock): + """Constructor for SingleNotifyPipeCondition """ - if self._write_fd is None: - raise RuntimeError("Can only notify once") - - os.close(self._write_fd) + _BaseCondition.__init__(self, lock) + self._nwaiters = 0 + self._notified = False + self._read_fd = None self._write_fd = None + self._poller = None + + def _check_unnotified(self): + """Throws an exception if already notified. + + """ + if self._notified: + raise RuntimeError("cannot use already notified condition") def _Cleanup(self): - """Close all file descriptors. + """Cleanup open file descriptors, if any. """ if self._read_fd is not None: @@ -211,19 +200,51 @@ class _SingleActionPipeCondition(object): if self._write_fd is not None: os.close(self._write_fd) self._write_fd = None - self._poller = None - def __del__(self): - """Called on object deletion. + def wait(self, timeout=None): + """Wait for a notification. + + @type timeout: float or None + @param timeout: Waiting timeout (can be None) + + """ + self._check_owned() + self._check_unnotified() + + self._nwaiters += 1 + try: + if self._poller is None: + (self._read_fd, self._write_fd) = os.pipe() + self._poller = select.poll() + self._poller.register(self._read_fd, select.POLLHUP) + + wait_fn = self._waiter_class(self._poller, self._read_fd) + self.release() + try: + # Wait for notification + wait_fn(timeout) + finally: + # Re-acquire lock + self.acquire() + finally: + self._nwaiters -= 1 + if self._nwaiters == 0: + self._Cleanup() - Ensure no file descriptors are left open. + def notifyAll(self): + """Close the writing side of the pipe to notify all waiters. """ - self._Cleanup() + self._check_owned() + self._check_unnotified() + self._notified = True + if self._write_fd is not None: + os.close(self._write_fd) + self._write_fd = None -class _PipeCondition(object): +class PipeCondition(_BaseCondition): """Group-only non-polling condition with counters. This condition class uses pipes and poll, internally, to be able to wait for @@ -233,51 +254,20 @@ class _PipeCondition(object): there are any waiting threads. """ - __slots__ = [ - "_lock", + __slots__ = _BaseCondition.__slots__ + [ "_nwaiters", - "_pipe", - "acquire", - "release", + "_single_condition", ] - _pipe_class = _SingleActionPipeCondition + _single_condition_class = SingleNotifyPipeCondition def __init__(self, lock): """Initializes this class. """ - object.__init__(self) - - # Recursive locks are not supported - assert not hasattr(lock, "_acquire_restore") - assert not hasattr(lock, "_release_save") - - self._lock = lock - - # Export the lock's acquire() and release() methods - self.acquire = lock.acquire - self.release = lock.release - + _BaseCondition.__init__(self, lock) self._nwaiters = 0 - self._pipe = None - - def _is_owned(self): - """Check whether lock is owned by current thread. - - """ - if self._lock.acquire(0): - self._lock.release() - return False - - return True - - def _check_owned(self): - """Raise an exception if the current thread doesn't own the lock. - - """ - if not self._is_owned(): - raise RuntimeError("cannot work with un-aquired lock") + self._single_condition = self._single_condition_class(self._lock) def wait(self, timeout=None): """Wait for a notification. @@ -288,32 +278,14 @@ class _PipeCondition(object): """ self._check_owned() - if not self._pipe: - self._pipe = self._pipe_class() - # Keep local reference to the pipe. It could be replaced by another thread # notifying while we're waiting. - pipe = self._pipe + my_condition = self._single_condition assert self._nwaiters >= 0 self._nwaiters += 1 try: - # Get function to wait on the pipe - wait_fn = pipe.StartWaiting() - try: - # Release lock while waiting - self.release() - try: - # Wait for notification - wait_fn(timeout) - finally: - # Re-acquire lock - self.acquire() - finally: - # Destroy pipe if this was the last waiter and the current pipe is - # still the same. The same pipe cannot be reused after cleanup. - if pipe.DoneWaiting() and pipe == self._pipe: - self._pipe = None + my_condition.wait(timeout) finally: assert self._nwaiters > 0 self._nwaiters -= 1 @@ -323,12 +295,8 @@ class _PipeCondition(object): """ self._check_owned() - - # Notify and forget pipe. A new one will be created on the next call to - # wait. - if self._pipe is not None: - self._pipe.notifyAll() - self._pipe = None + self._single_condition.notifyAll() + self._single_condition = self._single_condition_class(self._lock) def has_waiting(self): """Returns whether there are active waiters. @@ -369,7 +337,7 @@ class _CountingCondition(object): """Waits for the condition to be notified. @type timeout: float or None - @param timeout: Timeout in seconds + @param timeout: Waiting timeout (can be None) """ assert self._nwaiters >= 0 @@ -409,7 +377,7 @@ class SharedLock(object): "__shr", ] - __condition_class = _PipeCondition + __condition_class = PipeCondition def __init__(self): """Construct a new SharedLock. @@ -574,7 +542,7 @@ class SharedLock(object): return False - def acquire(self, shared=0, timeout=None): + def acquire(self, shared=0, timeout=None, test_notify=None): """Acquire a shared lock. @type shared: int @@ -582,10 +550,16 @@ class SharedLock(object): exclusive lock will be acquired @type timeout: float @param timeout: maximum waiting time before giving up + @type test_notify: callable or None + @param test_notify: Special callback function for unittesting """ self.__lock.acquire() try: + # We already got the lock, notify now + if __debug__ and callable(test_notify): + test_notify() + return self.__acquire_unlocked(shared, timeout) finally: self.__lock.release() @@ -664,6 +638,12 @@ class SharedLock(object): ALL_SET = None +class _AcquireTimeout(Exception): + """Internal exception to abort an acquire on a timeout. + + """ + + class LockSet: """Implements a set of locks. @@ -734,6 +714,12 @@ class LockSet: else: return set() + def _release_and_delete_owned(self): + """Release and delete all resources owned by the current thread""" + for lname in self._list_owned(): + self.__lockdict[lname].release() + self._del_owned(name=lname) + def __names(self): """Return the current set of names. @@ -762,30 +748,43 @@ class LockSet: self.__lock.release() return set(result) - def acquire(self, names, timeout=None, shared=0): + def acquire(self, names, timeout=None, shared=0, test_notify=None): """Acquire a set of resource locks. @param names: the names of the locks which shall be acquired (special lock names, or instance/node names) @param shared: whether to acquire in shared mode; by default an exclusive lock will be acquired - @type timeout: float + @type timeout: float or None @param timeout: Maximum time to acquire all locks + @type test_notify: callable or None + @param test_notify: Special callback function for unittesting - @return: True when all the locks are successfully acquired + @return: Set of all locks successfully acquired or None in case of timeout @raise errors.LockError: when any lock we try to acquire has been deleted before we succeed. In this case none of the locks requested will be acquired. """ - if timeout is not None: - raise NotImplementedError + assert timeout is None or timeout >= 0.0 # Check we don't already own locks at this level assert not self._is_owned(), "Cannot acquire locks in the same set twice" - if names is None: + # We need to keep track of how long we spent waiting for a lock. The + # timeout passed to this function is over all lock acquires. + remaining_timeout = timeout + if timeout is None: + start = None + calc_remaining_timeout = lambda: None + else: + start = time.time() + calc_remaining_timeout = lambda: (start + timeout) - time.time() + + want_all = names is None + + if want_all: # If no names are given acquire the whole set by not letting new names # being added before we release, and getting the current list of names. # Some of them may then be deleted later, but we'll cope with this. @@ -795,7 +794,7 @@ class LockSet: # them exclusively though they won't be able to do this anyway, though, # so we'll get the list lock exclusively as well in order to be able to # do add() on the set while owning it. - self.__lock.acquire(shared=shared) + self.__lock.acquire(shared=shared, timeout=remaining_timeout) try: # note we own the set-lock self._add_owned() @@ -807,65 +806,103 @@ class LockSet: self.__lock.release() raise + # Re-calculate timeout + remaining_timeout = calc_remaining_timeout() + try: - # Support passing in a single resource to acquire rather than many - if isinstance(names, basestring): - names = [names] - else: - names = sorted(names) - - acquire_list = [] - # First we look the locks up on __lockdict. We have no way of being sure - # they will still be there after, but this makes it a lot faster should - # just one of them be the already wrong - for lname in utils.UniqueSequence(names): - try: - lock = self.__lockdict[lname] # raises KeyError if lock is not there - acquire_list.append((lname, lock)) - except (KeyError): - if self.__lock._is_owned(): - # We are acquiring all the set, it doesn't matter if this - # particular element is not there anymore. - continue - else: - raise errors.LockError('non-existing lock in set (%s)' % lname) - - # This will hold the locknames we effectively acquired. - acquired = set() - # Now acquire_list contains a sorted list of resources and locks we want. - # In order to get them we loop on this (private) list and acquire() them. - # We gave no real guarantee they will still exist till this is done but - # .acquire() itself is safe and will alert us if the lock gets deleted. - for (lname, lock) in acquire_list: - try: - lock.acquire(shared=shared) # raises LockError if the lock is deleted - # now the lock cannot be deleted, we have it! - self._add_owned(name=lname) - acquired.add(lname) - except (errors.LockError): - if self.__lock._is_owned(): - # We are acquiring all the set, it doesn't matter if this - # particular element is not there anymore. - continue + try: + # Support passing in a single resource to acquire rather than many + if isinstance(names, basestring): + names = [names] + else: + names = sorted(names) + + acquire_list = [] + # First we look the locks up on __lockdict. We have no way of being sure + # they will still be there after, but this makes it a lot faster should + # just one of them be the already wrong + for lname in utils.UniqueSequence(names): + try: + lock = self.__lockdict[lname] # raises KeyError if lock is not there + acquire_list.append((lname, lock)) + except KeyError: + if want_all: + # We are acquiring all the set, it doesn't matter if this + # particular element is not there anymore. + continue + else: + raise errors.LockError("Non-existing lock in set (%s)" % lname) + + # This will hold the locknames we effectively acquired. + acquired = set() + + # Now acquire_list contains a sorted list of resources and locks we + # want. In order to get them we loop on this (private) list and + # acquire() them. We gave no real guarantee they will still exist till + # this is done but .acquire() itself is safe and will alert us if the + # lock gets deleted. + for (lname, lock) in acquire_list: + if __debug__ and callable(test_notify): + test_notify_fn = lambda: test_notify(lname) else: - name_fail = lname - for lname in self._list_owned(): - self.__lockdict[lname].release() - self._del_owned(name=lname) - raise errors.LockError('non-existing lock in set (%s)' % name_fail) - except: - # We shouldn't have problems adding the lock to the owners list, but - # if we did we'll try to release this lock and re-raise exception. - # Of course something is going to be really wrong, after this. - if lock._is_owned(): - lock.release() - raise + test_notify_fn = None - except: - # If something went wrong and we had the set-lock let's release it... - if self.__lock._is_owned(): - self.__lock.release() - raise + try: + if timeout is not None and remaining_timeout < 0: + raise _AcquireTimeout() + + # raises LockError if the lock was deleted + if not lock.acquire(shared=shared, timeout=remaining_timeout, + test_notify=test_notify_fn): + # Couldn't get lock or timeout occurred + if timeout is None: + # This shouldn't happen as SharedLock.acquire(timeout=None) is + # blocking. + raise errors.LockError("Failed to get lock %s" % lname) + + raise _AcquireTimeout() + + # Re-calculate timeout + remaining_timeout = calc_remaining_timeout() + + # now the lock cannot be deleted, we have it! + self._add_owned(name=lname) + acquired.add(lname) + + except _AcquireTimeout: + # Release all acquired locks + self._release_and_delete_owned() + raise + + except errors.LockError: + if want_all: + # We are acquiring all the set, it doesn't matter if this + # particular element is not there anymore. + continue + + self._release_and_delete_owned() + + raise errors.LockError("Non-existing lock in set (%s)" % lname) + + except: + # We shouldn't have problems adding the lock to the owners list, but + # if we did we'll try to release this lock and re-raise exception. + # Of course something is going to be really wrong, after this. + if lock._is_owned(): + lock.release() + raise + + except: + # If something went wrong and we had the set-lock let's release it... + if want_all: + self.__lock.release() + raise + + except _AcquireTimeout: + if want_all: + self._del_owned() + + return None return acquired @@ -971,7 +1008,7 @@ class LockSet: @param names: names of the resource to remove. - @return:: a list of locks which we removed; the list is always + @return: a list of locks which we removed; the list is always equal to the names list if we were holding all the locks exclusively