X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/2419060dfa640186dc22cb461a21d0eaf0b32307..69b999879d380c6c8ea2e8871085ffe46a13779b:/lib/locking.py diff --git a/lib/locking.py b/lib/locking.py index 1e14f65..382a45c 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -18,6 +18,9 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 02110-1301, USA. +# Disable "Invalid name ..." message +# pylint: disable-msg=C0103 + """Module implementing the Ganeti locking code.""" import os @@ -49,21 +52,18 @@ def ssynchronized(lock, shared=0): return wrap -class _SingleActionPipeConditionWaiter(object): - """Callable helper class for _SingleActionPipeCondition. +class _SingleNotifyPipeConditionWaiter(object): + """Helper class for SingleNotifyPipeCondition """ __slots__ = [ - "_cond", "_fd", "_poller", ] - def __init__(self, cond, poller, fd): - """Initializes this class. + def __init__(self, poller, fd): + """Constructor for _SingleNotifyPipeConditionWaiter - @type cond: L{_SingleActionPipeCondition} - @param cond: Parent condition @type poller: select.poll @param poller: Poller object @type fd: int @@ -71,8 +71,6 @@ class _SingleActionPipeConditionWaiter(object): """ object.__init__(self) - - self._cond = cond self._poller = poller self._fd = fd @@ -118,7 +116,7 @@ class _BaseCondition(object): def __init__(self, lock): """Constructor for _BaseCondition. - @type lock: L{threading.Lock} + @type lock: threading.Lock @param lock: condition base lock """ @@ -152,127 +150,101 @@ class _BaseCondition(object): raise RuntimeError("cannot work with un-aquired lock") -class _SingleActionPipeCondition(object): - """Wrapper around a pipe for usage inside conditions. +class SingleNotifyPipeCondition(_BaseCondition): + """Condition which can only be notified once. - This class contains a POSIX pipe(2) and a poller to poll it. The pipe is - always allocated when constructing the class. Extra care is taken to always - close the file descriptors. - - An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for - notifications. - - Warning: This class is designed to be used as the underlying component of a - locking condition, but is not by itself thread safe, and needs to be - protected by an external lock. + This condition class uses pipes and poll, internally, to be able to wait for + notification with a timeout, without resorting to polling. It is almost + compatible with Python's threading.Condition, with the following differences: + - notifyAll can only be called once, and no wait can happen after that + - notify is not supported, only notifyAll """ - __slots__ = [ + + __slots__ = _BaseCondition.__slots__ + [ "_poller", "_read_fd", "_write_fd", "_nwaiters", + "_notified", ] - _waiter_class = _SingleActionPipeConditionWaiter + _waiter_class = _SingleNotifyPipeConditionWaiter - def __init__(self): - """Initializes this class. + def __init__(self, lock): + """Constructor for SingleNotifyPipeCondition """ - object.__init__(self) - + _BaseCondition.__init__(self, lock) self._nwaiters = 0 + self._notified = False + self._read_fd = None + self._write_fd = None + self._poller = None - # Just assume the unpacking is successful, otherwise error handling gets - # very complicated. - (self._read_fd, self._write_fd) = os.pipe() - try: - # The poller looks for closure of the write side - poller = select.poll() - poller.register(self._read_fd, select.POLLHUP) - - self._poller = poller - except: - if self._read_fd is not None: - os.close(self._read_fd) - if self._write_fd is not None: - os.close(self._write_fd) - raise - - # There should be no code here anymore, otherwise the pipe file descriptors - # may be not be cleaned up properly in case of errors. - - def StartWaiting(self): - """Return function to wait for notification. - - @rtype: L{_SingleActionPipeConditionWaiter} - @return: Function to wait for notification + def _check_unnotified(self): + """Throws an exception if already notified. """ - assert self._nwaiters >= 0 + if self._notified: + raise RuntimeError("cannot use already notified condition") - if self._poller is None: - raise RuntimeError("Already cleaned up") + def _Cleanup(self): + """Cleanup open file descriptors, if any. - # Create waiter function and increase number of waiters - wait_fn = self._waiter_class(self, self._poller, self._read_fd) - self._nwaiters += 1 - return wait_fn + """ + if self._read_fd is not None: + os.close(self._read_fd) + self._read_fd = None - def DoneWaiting(self): - """Decrement number of waiters and automatic cleanup. + if self._write_fd is not None: + os.close(self._write_fd) + self._write_fd = None + self._poller = None - Must be called after waiting for a notification. + def wait(self, timeout=None): + """Wait for a notification. - @rtype: bool - @return: Whether this was the last waiter + @type timeout: float or None + @param timeout: Waiting timeout (can be None) """ - assert self._nwaiters > 0 - - self._nwaiters -= 1 + self._check_owned() + self._check_unnotified() - if self._nwaiters == 0: - self._Cleanup() - return True + self._nwaiters += 1 + try: + if self._poller is None: + (self._read_fd, self._write_fd) = os.pipe() + self._poller = select.poll() + self._poller.register(self._read_fd, select.POLLHUP) - return False + wait_fn = self._waiter_class(self._poller, self._read_fd) + self.release() + try: + # Wait for notification + wait_fn(timeout) + finally: + # Re-acquire lock + self.acquire() + finally: + self._nwaiters -= 1 + if self._nwaiters == 0: + self._Cleanup() def notifyAll(self): """Close the writing side of the pipe to notify all waiters. """ - if self._write_fd is None: - raise RuntimeError("Can only notify once") - - os.close(self._write_fd) - self._write_fd = None - - def _Cleanup(self): - """Close all file descriptors. - - """ - if self._read_fd is not None: - os.close(self._read_fd) - self._read_fd = None - + self._check_owned() + self._check_unnotified() + self._notified = True if self._write_fd is not None: os.close(self._write_fd) self._write_fd = None - self._poller = None - - def __del__(self): - """Called on object deletion. - - Ensure no file descriptors are left open. - - """ - self._Cleanup() - -class _PipeCondition(_BaseCondition): +class PipeCondition(_BaseCondition): """Group-only non-polling condition with counters. This condition class uses pipes and poll, internally, to be able to wait for @@ -284,10 +256,10 @@ class _PipeCondition(_BaseCondition): """ __slots__ = _BaseCondition.__slots__ + [ "_nwaiters", - "_pipe", + "_single_condition", ] - _pipe_class = _SingleActionPipeCondition + _single_condition_class = SingleNotifyPipeCondition def __init__(self, lock): """Initializes this class. @@ -295,7 +267,7 @@ class _PipeCondition(_BaseCondition): """ _BaseCondition.__init__(self, lock) self._nwaiters = 0 - self._pipe = None + self._single_condition = self._single_condition_class(self._lock) def wait(self, timeout=None): """Wait for a notification. @@ -306,32 +278,14 @@ class _PipeCondition(_BaseCondition): """ self._check_owned() - if not self._pipe: - self._pipe = self._pipe_class() - # Keep local reference to the pipe. It could be replaced by another thread # notifying while we're waiting. - pipe = self._pipe + my_condition = self._single_condition assert self._nwaiters >= 0 self._nwaiters += 1 try: - # Get function to wait on the pipe - wait_fn = pipe.StartWaiting() - try: - # Release lock while waiting - self.release() - try: - # Wait for notification - wait_fn(timeout) - finally: - # Re-acquire lock - self.acquire() - finally: - # Destroy pipe if this was the last waiter and the current pipe is - # still the same. The same pipe cannot be reused after cleanup. - if pipe.DoneWaiting() and pipe == self._pipe: - self._pipe = None + my_condition.wait(timeout) finally: assert self._nwaiters > 0 self._nwaiters -= 1 @@ -341,12 +295,8 @@ class _PipeCondition(_BaseCondition): """ self._check_owned() - - # Notify and forget pipe. A new one will be created on the next call to - # wait. - if self._pipe is not None: - self._pipe.notifyAll() - self._pipe = None + self._single_condition.notifyAll() + self._single_condition = self._single_condition_class(self._lock) def has_waiting(self): """Returns whether there are active waiters. @@ -387,7 +337,7 @@ class _CountingCondition(object): """Waits for the condition to be notified. @type timeout: float or None - @param timeout: Timeout in seconds + @param timeout: Waiting timeout (can be None) """ assert self._nwaiters >= 0 @@ -427,7 +377,7 @@ class SharedLock(object): "__shr", ] - __condition_class = _PipeCondition + __condition_class = PipeCondition def __init__(self): """Construct a new SharedLock. @@ -592,7 +542,7 @@ class SharedLock(object): return False - def acquire(self, shared=0, timeout=None): + def acquire(self, shared=0, timeout=None, test_notify=None): """Acquire a shared lock. @type shared: int @@ -600,10 +550,16 @@ class SharedLock(object): exclusive lock will be acquired @type timeout: float @param timeout: maximum waiting time before giving up + @type test_notify: callable or None + @param test_notify: Special callback function for unittesting """ self.__lock.acquire() try: + # We already got the lock, notify now + if __debug__ and callable(test_notify): + test_notify() + return self.__acquire_unlocked(shared, timeout) finally: self.__lock.release() @@ -682,6 +638,12 @@ class SharedLock(object): ALL_SET = None +class _AcquireTimeout(Exception): + """Internal exception to abort an acquire on a timeout. + + """ + + class LockSet: """Implements a set of locks. @@ -752,6 +714,12 @@ class LockSet: else: return set() + def _release_and_delete_owned(self): + """Release and delete all resources owned by the current thread""" + for lname in self._list_owned(): + self.__lockdict[lname].release() + self._del_owned(name=lname) + def __names(self): """Return the current set of names. @@ -780,30 +748,43 @@ class LockSet: self.__lock.release() return set(result) - def acquire(self, names, timeout=None, shared=0): + def acquire(self, names, timeout=None, shared=0, test_notify=None): """Acquire a set of resource locks. @param names: the names of the locks which shall be acquired (special lock names, or instance/node names) @param shared: whether to acquire in shared mode; by default an exclusive lock will be acquired - @type timeout: float + @type timeout: float or None @param timeout: Maximum time to acquire all locks + @type test_notify: callable or None + @param test_notify: Special callback function for unittesting - @return: True when all the locks are successfully acquired + @return: Set of all locks successfully acquired or None in case of timeout @raise errors.LockError: when any lock we try to acquire has been deleted before we succeed. In this case none of the locks requested will be acquired. """ - if timeout is not None: - raise NotImplementedError + assert timeout is None or timeout >= 0.0 # Check we don't already own locks at this level assert not self._is_owned(), "Cannot acquire locks in the same set twice" - if names is None: + # We need to keep track of how long we spent waiting for a lock. The + # timeout passed to this function is over all lock acquires. + remaining_timeout = timeout + if timeout is None: + start = None + calc_remaining_timeout = lambda: None + else: + start = time.time() + calc_remaining_timeout = lambda: (start + timeout) - time.time() + + want_all = names is None + + if want_all: # If no names are given acquire the whole set by not letting new names # being added before we release, and getting the current list of names. # Some of them may then be deleted later, but we'll cope with this. @@ -813,7 +794,7 @@ class LockSet: # them exclusively though they won't be able to do this anyway, though, # so we'll get the list lock exclusively as well in order to be able to # do add() on the set while owning it. - self.__lock.acquire(shared=shared) + self.__lock.acquire(shared=shared, timeout=remaining_timeout) try: # note we own the set-lock self._add_owned() @@ -825,65 +806,103 @@ class LockSet: self.__lock.release() raise + # Re-calculate timeout + remaining_timeout = calc_remaining_timeout() + try: - # Support passing in a single resource to acquire rather than many - if isinstance(names, basestring): - names = [names] - else: - names = sorted(names) - - acquire_list = [] - # First we look the locks up on __lockdict. We have no way of being sure - # they will still be there after, but this makes it a lot faster should - # just one of them be the already wrong - for lname in utils.UniqueSequence(names): - try: - lock = self.__lockdict[lname] # raises KeyError if lock is not there - acquire_list.append((lname, lock)) - except (KeyError): - if self.__lock._is_owned(): - # We are acquiring all the set, it doesn't matter if this - # particular element is not there anymore. - continue - else: - raise errors.LockError('non-existing lock in set (%s)' % lname) - - # This will hold the locknames we effectively acquired. - acquired = set() - # Now acquire_list contains a sorted list of resources and locks we want. - # In order to get them we loop on this (private) list and acquire() them. - # We gave no real guarantee they will still exist till this is done but - # .acquire() itself is safe and will alert us if the lock gets deleted. - for (lname, lock) in acquire_list: - try: - lock.acquire(shared=shared) # raises LockError if the lock is deleted - # now the lock cannot be deleted, we have it! - self._add_owned(name=lname) - acquired.add(lname) - except (errors.LockError): - if self.__lock._is_owned(): - # We are acquiring all the set, it doesn't matter if this - # particular element is not there anymore. - continue + try: + # Support passing in a single resource to acquire rather than many + if isinstance(names, basestring): + names = [names] + else: + names = sorted(names) + + acquire_list = [] + # First we look the locks up on __lockdict. We have no way of being sure + # they will still be there after, but this makes it a lot faster should + # just one of them be the already wrong + for lname in utils.UniqueSequence(names): + try: + lock = self.__lockdict[lname] # raises KeyError if lock is not there + acquire_list.append((lname, lock)) + except KeyError: + if want_all: + # We are acquiring all the set, it doesn't matter if this + # particular element is not there anymore. + continue + else: + raise errors.LockError("Non-existing lock in set (%s)" % lname) + + # This will hold the locknames we effectively acquired. + acquired = set() + + # Now acquire_list contains a sorted list of resources and locks we + # want. In order to get them we loop on this (private) list and + # acquire() them. We gave no real guarantee they will still exist till + # this is done but .acquire() itself is safe and will alert us if the + # lock gets deleted. + for (lname, lock) in acquire_list: + if __debug__ and callable(test_notify): + test_notify_fn = lambda: test_notify(lname) else: - name_fail = lname - for lname in self._list_owned(): - self.__lockdict[lname].release() - self._del_owned(name=lname) - raise errors.LockError('non-existing lock in set (%s)' % name_fail) - except: - # We shouldn't have problems adding the lock to the owners list, but - # if we did we'll try to release this lock and re-raise exception. - # Of course something is going to be really wrong, after this. - if lock._is_owned(): - lock.release() - raise + test_notify_fn = None - except: - # If something went wrong and we had the set-lock let's release it... - if self.__lock._is_owned(): - self.__lock.release() - raise + try: + if timeout is not None and remaining_timeout < 0: + raise _AcquireTimeout() + + # raises LockError if the lock was deleted + if not lock.acquire(shared=shared, timeout=remaining_timeout, + test_notify=test_notify_fn): + # Couldn't get lock or timeout occurred + if timeout is None: + # This shouldn't happen as SharedLock.acquire(timeout=None) is + # blocking. + raise errors.LockError("Failed to get lock %s" % lname) + + raise _AcquireTimeout() + + # Re-calculate timeout + remaining_timeout = calc_remaining_timeout() + + # now the lock cannot be deleted, we have it! + self._add_owned(name=lname) + acquired.add(lname) + + except _AcquireTimeout: + # Release all acquired locks + self._release_and_delete_owned() + raise + + except errors.LockError: + if want_all: + # We are acquiring all the set, it doesn't matter if this + # particular element is not there anymore. + continue + + self._release_and_delete_owned() + + raise errors.LockError("Non-existing lock in set (%s)" % lname) + + except: + # We shouldn't have problems adding the lock to the owners list, but + # if we did we'll try to release this lock and re-raise exception. + # Of course something is going to be really wrong, after this. + if lock._is_owned(): + lock.release() + raise + + except: + # If something went wrong and we had the set-lock let's release it... + if want_all: + self.__lock.release() + raise + + except _AcquireTimeout: + if want_all: + self._del_owned() + + return None return acquired @@ -989,7 +1008,7 @@ class LockSet: @param names: names of the resource to remove. - @return:: a list of locks which we removed; the list is always + @return: a list of locks which we removed; the list is always equal to the names list if we were holding all the locks exclusively