X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/48dabc6ae13316c7ea99cd5382dc32e25f7d599c..6e7f0cd9e9ead56c60527647423422e384c20cbb:/lib/locking.py diff --git a/lib/locking.py b/lib/locking.py index 901fdac..57d2600 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007 Google Inc. +# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -20,26 +20,53 @@ """Module implementing the Ganeti locking code.""" +# pylint: disable-msg=W0212 + +# W0212 since e.g. LockSet methods use (a lot) the internals of +# SharedLock + import os import select import threading -import time import errno +import weakref +import logging +import heapq +import operator +import itertools from ganeti import errors from ganeti import utils +from ganeti import compat +from ganeti import query + + +_EXCLUSIVE_TEXT = "exclusive" +_SHARED_TEXT = "shared" +_DELETED_TEXT = "deleted" + +_DEFAULT_PRIORITY = 0 -def ssynchronized(lock, shared=0): +def ssynchronized(mylock, shared=0): """Shared Synchronization decorator. Calls the function holding the given lock, either in exclusive or shared mode. It requires the passed lock to be a SharedLock (or support its semantics). + @type mylock: lockable object or string + @param mylock: lock to acquire or class member name of the lock to acquire + """ def wrap(fn): def sync_function(*args, **kwargs): + if isinstance(mylock, basestring): + assert args, "cannot ssynchronize on non-class method: self not found" + # args[0] is "self" + lock = getattr(args[0], mylock) + else: + lock = mylock lock.acquire(shared=shared) try: return fn(*args, **kwargs) @@ -49,21 +76,18 @@ def ssynchronized(lock, shared=0): return wrap -class _SingleActionPipeConditionWaiter(object): - """Callable helper class for _SingleActionPipeCondition. +class _SingleNotifyPipeConditionWaiter(object): + """Helper class for SingleNotifyPipeCondition """ __slots__ = [ - "_cond", "_fd", "_poller", ] - def __init__(self, cond, poller, fd): - """Initializes this class. + def __init__(self, poller, fd): + """Constructor for _SingleNotifyPipeConditionWaiter - @type cond: L{_SingleActionPipeCondition} - @param cond: Parent condition @type poller: select.poll @param poller: Poller object @type fd: int @@ -71,8 +95,6 @@ class _SingleActionPipeConditionWaiter(object): """ object.__init__(self) - - self._cond = cond self._poller = poller self._fd = fd @@ -83,10 +105,18 @@ class _SingleActionPipeConditionWaiter(object): @param timeout: Timeout for waiting (can be None) """ - start_time = time.time() - remaining_time = timeout + running_timeout = utils.RunningTimeout(timeout, True) + + while True: + remaining_time = running_timeout.Remaining() + + if remaining_time is not None: + if remaining_time < 0.0: + break + + # Our calculation uses seconds, poll() wants milliseconds + remaining_time *= 1000 - while timeout is None or remaining_time > 0: try: result = self._poller.poll(remaining_time) except EnvironmentError, err: @@ -98,110 +128,114 @@ class _SingleActionPipeConditionWaiter(object): if result and result[0][0] == self._fd: break - # Re-calculate timeout if necessary - if timeout is not None: - remaining_time = start_time + timeout - time.time() - - -class _SingleActionPipeCondition(object): - """Wrapper around a pipe for usage inside conditions. - This class contains a POSIX pipe(2) and a poller to poll it. The pipe is - always allocated when constructing the class. Extra care is taken to always - close the file descriptors. +class _BaseCondition(object): + """Base class containing common code for conditions. - An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for - notifications. - - Warning: This class is designed to be used as the underlying component of a - locking condition, but is not by itself thread safe, and needs to be - protected by an external lock. + Some of this code is taken from python's threading module. """ __slots__ = [ - "_poller", - "_read_fd", - "_write_fd", - "_nwaiters", + "_lock", + "acquire", + "release", + "_is_owned", + "_acquire_restore", + "_release_save", ] - _waiter_class = _SingleActionPipeConditionWaiter + def __init__(self, lock): + """Constructor for _BaseCondition. - def __init__(self): - """Initializes this class. + @type lock: threading.Lock + @param lock: condition base lock """ object.__init__(self) - self._nwaiters = 0 - - # Just assume the unpacking is successful, otherwise error handling gets - # very complicated. - (self._read_fd, self._write_fd) = os.pipe() try: - # The poller looks for closure of the write side - poller = select.poll() - poller.register(self._read_fd, select.POLLHUP) - - self._poller = poller - except: - if self._read_fd is not None: - os.close(self._read_fd) - if self._write_fd is not None: - os.close(self._write_fd) - raise + self._release_save = lock._release_save + except AttributeError: + self._release_save = self._base_release_save + try: + self._acquire_restore = lock._acquire_restore + except AttributeError: + self._acquire_restore = self._base_acquire_restore + try: + self._is_owned = lock._is_owned + except AttributeError: + self._is_owned = self._base_is_owned - # There should be no code here anymore, otherwise the pipe file descriptors - # may be not be cleaned up properly in case of errors. + self._lock = lock - def StartWaiting(self): - """Return function to wait for notification. + # Export the lock's acquire() and release() methods + self.acquire = lock.acquire + self.release = lock.release - @rtype: L{_SingleActionPipeConditionWaiter} - @return: Function to wait for notification + def _base_is_owned(self): + """Check whether lock is owned by current thread. """ - assert self._nwaiters >= 0 + if self._lock.acquire(0): + self._lock.release() + return False + return True - if self._poller is None: - raise RuntimeError("Already cleaned up") + def _base_release_save(self): + self._lock.release() - # Create waiter function and increase number of waiters - wait_fn = self._waiter_class(self, self._poller, self._read_fd) - self._nwaiters += 1 - return wait_fn + def _base_acquire_restore(self, _): + self._lock.acquire() - def DoneWaiting(self): - """Decrement number of waiters and automatic cleanup. + def _check_owned(self): + """Raise an exception if the current thread doesn't own the lock. - Must be called after waiting for a notification. + """ + if not self._is_owned(): + raise RuntimeError("cannot work with un-aquired lock") - @rtype: bool - @return: Whether this was the last waiter - """ - assert self._nwaiters > 0 +class SingleNotifyPipeCondition(_BaseCondition): + """Condition which can only be notified once. - self._nwaiters -= 1 + This condition class uses pipes and poll, internally, to be able to wait for + notification with a timeout, without resorting to polling. It is almost + compatible with Python's threading.Condition, with the following differences: + - notifyAll can only be called once, and no wait can happen after that + - notify is not supported, only notifyAll - if self._nwaiters == 0: - self._Cleanup() - return True + """ - return False + __slots__ = [ + "_poller", + "_read_fd", + "_write_fd", + "_nwaiters", + "_notified", + ] - def notifyAll(self): - """Close the writing side of the pipe to notify all waiters. + _waiter_class = _SingleNotifyPipeConditionWaiter - """ - if self._write_fd is None: - raise RuntimeError("Can only notify once") + def __init__(self, lock): + """Constructor for SingleNotifyPipeCondition - os.close(self._write_fd) + """ + _BaseCondition.__init__(self, lock) + self._nwaiters = 0 + self._notified = False + self._read_fd = None self._write_fd = None + self._poller = None + + def _check_unnotified(self): + """Throws an exception if already notified. + + """ + if self._notified: + raise RuntimeError("cannot use already notified condition") def _Cleanup(self): - """Close all file descriptors. + """Cleanup open file descriptors, if any. """ if self._read_fd is not None: @@ -211,19 +245,51 @@ class _SingleActionPipeCondition(object): if self._write_fd is not None: os.close(self._write_fd) self._write_fd = None - self._poller = None - def __del__(self): - """Called on object deletion. + def wait(self, timeout=None): + """Wait for a notification. + + @type timeout: float or None + @param timeout: Waiting timeout (can be None) + + """ + self._check_owned() + self._check_unnotified() + + self._nwaiters += 1 + try: + if self._poller is None: + (self._read_fd, self._write_fd) = os.pipe() + self._poller = select.poll() + self._poller.register(self._read_fd, select.POLLHUP) + + wait_fn = self._waiter_class(self._poller, self._read_fd) + state = self._release_save() + try: + # Wait for notification + wait_fn(timeout) + finally: + # Re-acquire lock + self._acquire_restore(state) + finally: + self._nwaiters -= 1 + if self._nwaiters == 0: + self._Cleanup() - Ensure no file descriptors are left open. + def notifyAll(self): # pylint: disable-msg=C0103 + """Close the writing side of the pipe to notify all waiters. """ - self._Cleanup() + self._check_owned() + self._check_unnotified() + self._notified = True + if self._write_fd is not None: + os.close(self._write_fd) + self._write_fd = None -class _PipeCondition(object): +class PipeCondition(_BaseCondition): """Group-only non-polling condition with counters. This condition class uses pipes and poll, internally, to be able to wait for @@ -234,50 +300,19 @@ class _PipeCondition(object): """ __slots__ = [ - "_lock", - "_nwaiters", - "_pipe", - "acquire", - "release", + "_waiters", + "_single_condition", ] - _pipe_class = _SingleActionPipeCondition + _single_condition_class = SingleNotifyPipeCondition def __init__(self, lock): """Initializes this class. """ - object.__init__(self) - - # Recursive locks are not supported - assert not hasattr(lock, "_acquire_restore") - assert not hasattr(lock, "_release_save") - - self._lock = lock - - # Export the lock's acquire() and release() methods - self.acquire = lock.acquire - self.release = lock.release - - self._nwaiters = 0 - self._pipe = None - - def _is_owned(self): - """Check whether lock is owned by current thread. - - """ - if self._lock.acquire(0): - self._lock.release() - return False - - return True - - def _check_owned(self): - """Raise an exception if the current thread doesn't own the lock. - - """ - if not self._is_owned(): - raise RuntimeError("cannot work with un-aquired lock") + _BaseCondition.__init__(self, lock) + self._waiters = set() + self._single_condition = self._single_condition_class(self._lock) def wait(self, timeout=None): """Wait for a notification. @@ -288,47 +323,32 @@ class _PipeCondition(object): """ self._check_owned() - if not self._pipe: - self._pipe = self._pipe_class() - # Keep local reference to the pipe. It could be replaced by another thread # notifying while we're waiting. - pipe = self._pipe + cond = self._single_condition - assert self._nwaiters >= 0 - self._nwaiters += 1 + self._waiters.add(threading.currentThread()) try: - # Get function to wait on the pipe - wait_fn = pipe.StartWaiting() - try: - # Release lock while waiting - self.release() - try: - # Wait for notification - wait_fn(timeout) - finally: - # Re-acquire lock - self.acquire() - finally: - # Destroy pipe if this was the last waiter and the current pipe is - # still the same. The same pipe cannot be reused after cleanup. - if pipe.DoneWaiting() and pipe == self._pipe: - self._pipe = None + cond.wait(timeout) finally: - assert self._nwaiters > 0 - self._nwaiters -= 1 + self._check_owned() + self._waiters.remove(threading.currentThread()) - def notifyAll(self): + def notifyAll(self): # pylint: disable-msg=C0103 """Notify all currently waiting threads. """ self._check_owned() + self._single_condition.notifyAll() + self._single_condition = self._single_condition_class(self._lock) + + def get_waiting(self): + """Returns a list of all waiting threads. + + """ + self._check_owned() - # Notify and forget pipe. A new one will be created on the next call to - # wait. - if self._pipe is not None: - self._pipe.notifyAll() - self._pipe = None + return self._waiters def has_waiting(self): """Returns whether there are active waiters. @@ -336,96 +356,74 @@ class _PipeCondition(object): """ self._check_owned() - return bool(self._nwaiters) - + return bool(self._waiters) -class _CountingCondition(object): - """Wrapper for Python's built-in threading.Condition class. - This wrapper keeps a count of active waiters. We can't access the internal - "__waiters" attribute of threading.Condition because it's not thread-safe. - - """ +class _PipeConditionWithMode(PipeCondition): __slots__ = [ - "_cond", - "_nwaiters", + "shared", ] - def __init__(self, lock): + def __init__(self, lock, shared): """Initializes this class. """ - object.__init__(self) - self._cond = threading.Condition(lock=lock) - self._nwaiters = 0 - - def notifyAll(self): - """Notifies the condition. - - """ - return self._cond.notifyAll() - - def wait(self, timeout=None): - """Waits for the condition to be notified. - - @type timeout: float or None - @param timeout: Timeout in seconds - - """ - assert self._nwaiters >= 0 - - self._nwaiters += 1 - try: - return self._cond.wait(timeout=timeout) - finally: - self._nwaiters -= 1 - - def has_waiting(self): - """Returns whether there are active waiters. - - """ - return bool(self._nwaiters) + self.shared = shared + PipeCondition.__init__(self, lock) class SharedLock(object): """Implements a shared lock. - Multiple threads can acquire the lock in a shared way, calling - acquire_shared(). In order to acquire the lock in an exclusive way threads - can call acquire_exclusive(). + Multiple threads can acquire the lock in a shared way by calling + C{acquire(shared=1)}. In order to acquire the lock in an exclusive way + threads can call C{acquire(shared=0)}. + + Notes on data structures: C{__pending} contains a priority queue (heapq) of + all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2), + ...]}. Each per-priority queue contains a normal in-order list of conditions + to be notified when the lock can be acquired. Shared locks are grouped + together by priority and the condition for them is stored in + C{__pending_shared} if it already exists. C{__pending_by_prio} keeps + references for the per-priority queues indexed by priority for faster access. - The lock prevents starvation but does not guarantee that threads will acquire - the shared lock in the order they queued for it, just that they will - eventually do so. + @type name: string + @ivar name: the name of the lock """ __slots__ = [ - "__active_shr_c", - "__inactive_shr_c", + "__weakref__", "__deleted", "__exc", "__lock", "__pending", + "__pending_by_prio", + "__pending_shared", "__shr", + "name", ] - __condition_class = _CountingCondition + __condition_class = _PipeConditionWithMode - def __init__(self): + def __init__(self, name, monitor=None): """Construct a new SharedLock. + @param name: the name of the lock + @type monitor: L{LockMonitor} + @param monitor: Lock monitor with which to register + """ object.__init__(self) + self.name = name + # Internal lock self.__lock = threading.Lock() # Queue containing waiting acquires self.__pending = [] - - # Active and inactive conditions for shared locks - self.__active_shr_c = self.__condition_class(self.__lock) - self.__inactive_shr_c = self.__condition_class(self.__lock) + self.__pending_by_prio = {} + self.__pending_shared = {} # Current lock holders self.__shr = set() @@ -434,12 +432,73 @@ class SharedLock(object): # is this lock in the deleted state? self.__deleted = False + # Register with lock monitor + if monitor: + monitor.RegisterLock(self) + + def GetInfo(self, requested): + """Retrieves information for querying locks. + + @type requested: set + @param requested: Requested information, see C{query.LQ_*} + + """ + self.__lock.acquire() + try: + # Note: to avoid unintentional race conditions, no references to + # modifiable objects should be returned unless they were created in this + # function. + mode = None + owner_names = None + + if query.LQ_MODE in requested: + if self.__deleted: + mode = _DELETED_TEXT + assert not (self.__exc or self.__shr) + elif self.__exc: + mode = _EXCLUSIVE_TEXT + elif self.__shr: + mode = _SHARED_TEXT + + # Current owner(s) are wanted + if query.LQ_OWNER in requested: + if self.__exc: + owner = [self.__exc] + else: + owner = self.__shr + + if owner: + assert not self.__deleted + owner_names = [i.getName() for i in owner] + + # Pending acquires are wanted + if query.LQ_PENDING in requested: + pending = [] + + # Sorting instead of copying and using heaq functions for simplicity + for (_, prioqueue) in sorted(self.__pending): + for cond in prioqueue: + if cond.shared: + pendmode = _SHARED_TEXT + else: + pendmode = _EXCLUSIVE_TEXT + + # List of names will be sorted in L{query._GetLockPending} + pending.append((pendmode, [i.getName() + for i in cond.get_waiting()])) + else: + pending = None + + return (self.name, mode, owner_names, pending) + finally: + self.__lock.release() + def __check_deleted(self): """Raises an exception if the lock has been deleted. """ if self.__deleted: - raise errors.LockError("Deleted lock") + raise errors.LockError("Deleted lock %s" % self.name) def __is_sharer(self): """Is the current thread sharing the lock at this time? @@ -490,7 +549,23 @@ class SharedLock(object): """ self.__lock.acquire() try: - return len(self.__pending) + return sum(len(prioqueue) for (_, prioqueue) in self.__pending) + finally: + self.__lock.release() + + def _check_empty(self): + """Checks whether there are any pending acquires. + + @rtype: bool + + """ + self.__lock.acquire() + try: + # Order is important: __find_first_pending_queue modifies __pending + return not (self.__find_first_pending_queue() or + self.__pending or + self.__pending_by_prio or + self.__pending_shared) finally: self.__lock.release() @@ -512,26 +587,52 @@ class SharedLock(object): else: return len(self.__shr) == 0 and self.__exc is None + def __find_first_pending_queue(self): + """Tries to find the topmost queued entry with pending acquires. + + Removes empty entries while going through the list. + + """ + while self.__pending: + (priority, prioqueue) = self.__pending[0] + + if not prioqueue: + heapq.heappop(self.__pending) + del self.__pending_by_prio[priority] + assert priority not in self.__pending_shared + continue + + if prioqueue: + return prioqueue + + return None + def __is_on_top(self, cond): """Checks whether the passed condition is on top of the queue. The caller must make sure the queue isn't empty. """ - return self.__pending[0] == cond + return cond == self.__find_first_pending_queue()[0] - def __acquire_unlocked(self, shared=0, timeout=None): + def __acquire_unlocked(self, shared, timeout, priority): """Acquire a shared lock. @param shared: whether to acquire in shared mode; by default an exclusive lock will be acquired @param timeout: maximum waiting time before giving up + @type priority: integer + @param priority: Priority for acquiring lock """ self.__check_deleted() # We cannot acquire the lock if we already have it - assert not self.__is_owned(), "double acquire() on a non-recursive lock" + assert not self.__is_owned(), ("double acquire() on a non-recursive lock" + " %s" % self.name) + + # Remove empty entries from queue + self.__find_first_pending_queue() # Check whether someone else holds the lock or there are pending acquires. if not self.__pending and self.__can_acquire(shared): @@ -539,20 +640,37 @@ class SharedLock(object): self.__do_acquire(shared) return True - if shared: - wait_condition = self.__active_shr_c + prioqueue = self.__pending_by_prio.get(priority, None) - # Check if we're not yet in the queue - if wait_condition not in self.__pending: - self.__pending.append(wait_condition) + if shared: + # Try to re-use condition for shared acquire + wait_condition = self.__pending_shared.get(priority, None) + assert (wait_condition is None or + (wait_condition.shared and wait_condition in prioqueue)) else: - wait_condition = self.__condition_class(self.__lock) - # Always add to queue - self.__pending.append(wait_condition) + wait_condition = None + + if wait_condition is None: + if prioqueue is None: + assert priority not in self.__pending_by_prio + + prioqueue = [] + heapq.heappush(self.__pending, (priority, prioqueue)) + self.__pending_by_prio[priority] = prioqueue + + wait_condition = self.__condition_class(self.__lock, shared) + prioqueue.append(wait_condition) + + if shared: + # Keep reference for further shared acquires on same priority. This is + # better than trying to find it in the list of pending acquires. + assert priority not in self.__pending_shared + self.__pending_shared[priority] = wait_condition try: # Wait until we become the topmost acquire in the queue or the timeout # expires. + # TODO: Decrease timeout with spurious notifications while not (self.__is_on_top(wait_condition) and self.__can_acquire(shared)): # Wait for notification @@ -569,24 +687,38 @@ class SharedLock(object): return True finally: # Remove condition from queue if there are no more waiters - if not wait_condition.has_waiting() and not self.__deleted: - self.__pending.remove(wait_condition) + if not wait_condition.has_waiting(): + prioqueue.remove(wait_condition) + if wait_condition.shared: + del self.__pending_shared[priority] return False - def acquire(self, shared=0, timeout=None): + def acquire(self, shared=0, timeout=None, priority=None, + test_notify=None): """Acquire a shared lock. - @type shared: int + @type shared: integer (0/1) used as a boolean @param shared: whether to acquire in shared mode; by default an exclusive lock will be acquired @type timeout: float @param timeout: maximum waiting time before giving up + @type priority: integer + @param priority: Priority for acquiring lock + @type test_notify: callable or None + @param test_notify: Special callback function for unittesting """ + if priority is None: + priority = _DEFAULT_PRIORITY + self.__lock.acquire() try: - return self.__acquire_unlocked(shared, timeout) + # We already got the lock, notify now + if __debug__ and callable(test_notify): + test_notify() + + return self.__acquire_unlocked(shared, timeout, priority) finally: self.__lock.release() @@ -609,18 +741,14 @@ class SharedLock(object): self.__shr.remove(threading.currentThread()) # Notify topmost condition in queue - if self.__pending: - first_condition = self.__pending[0] - first_condition.notifyAll() - - if first_condition == self.__active_shr_c: - self.__active_shr_c = self.__inactive_shr_c - self.__inactive_shr_c = first_condition + prioqueue = self.__find_first_pending_queue() + if prioqueue: + prioqueue[0].notifyAll() finally: self.__lock.release() - def delete(self, timeout=None): + def delete(self, timeout=None, priority=None): """Delete a Shared Lock. This operation will declare the lock for removal. First the lock will be @@ -629,8 +757,13 @@ class SharedLock(object): @type timeout: float @param timeout: maximum waiting time before giving up + @type priority: integer + @param priority: Priority for acquiring lock """ + if priority is None: + priority = _DEFAULT_PRIORITY + self.__lock.acquire() try: assert not self.__is_sharer(), "Cannot delete() a lock while sharing it" @@ -641,26 +774,48 @@ class SharedLock(object): acquired = self.__is_exclusive() if not acquired: - acquired = self.__acquire_unlocked(timeout) + acquired = self.__acquire_unlocked(0, timeout, priority) + + assert self.__is_exclusive() and not self.__is_sharer(), \ + "Lock wasn't acquired in exclusive mode" if acquired: self.__deleted = True self.__exc = None + assert not (self.__exc or self.__shr), "Found owner during deletion" + # Notify all acquires. They'll throw an error. - while self.__pending: - self.__pending.pop().notifyAll() + for (_, prioqueue) in self.__pending: + for cond in prioqueue: + cond.notifyAll() + + assert self.__deleted return acquired finally: self.__lock.release() + def _release_save(self): + shared = self.__is_sharer() + self.release() + return shared + + def _acquire_restore(self, shared): + self.acquire(shared=shared) + # Whenever we want to acquire a full LockSet we pass None as the value # to acquire. Hide this behind this nicely named constant. ALL_SET = None +class _AcquireTimeout(Exception): + """Internal exception to abort an acquire on a timeout. + + """ + + class LockSet: """Implements a set of locks. @@ -671,23 +826,35 @@ class LockSet: All the locks needed in the same set must be acquired together, though. + @type name: string + @ivar name: the name of the lockset + """ - def __init__(self, members=None): + def __init__(self, members, name, monitor=None): """Constructs a new LockSet. + @type members: list of strings @param members: initial members of the set + @type monitor: L{LockMonitor} + @param monitor: Lock monitor with which to register member locks """ + assert members is not None, "members parameter is not a list" + self.name = name + + # Lock monitor + self.__monitor = monitor + # Used internally to guarantee coherency. - self.__lock = SharedLock() + self.__lock = SharedLock(name) # The lockdict indexes the relationship name -> lock # The order-of-locking is implied by the alphabetical order of names self.__lockdict = {} - if members is not None: - for name in members: - self.__lockdict[name] = SharedLock() + for mname in members: + self.__lockdict[mname] = SharedLock(self._GetLockName(mname), + monitor=monitor) # The owner dict contains the set of locks each thread owns. For # performance each thread can access its own key without a global lock on @@ -698,6 +865,12 @@ class LockSet: # will be trouble. self.__owners = {} + def _GetLockName(self, mname): + """Returns the name for a member lock. + + """ + return "%s/%s" % (self.name, mname) + def _is_owned(self): """Is the current thread a current level owner?""" return threading.currentThread() in self.__owners @@ -716,6 +889,9 @@ class LockSet: def _del_owned(self, name=None): """Note the current thread owns the given lock""" + assert not (name is None and self.__lock._is_owned()), \ + "Cannot hold internal lock when deleting owner status" + if name is not None: self.__owners[threading.currentThread()].remove(name) @@ -731,6 +907,14 @@ class LockSet: else: return set() + def _release_and_delete_owned(self): + """Release and delete all resources owned by the current thread""" + for lname in self._list_owned(): + lock = self.__lockdict[lname] + if lock._is_owned(): + lock.release() + self._del_owned(name=lname) + def __names(self): """Return the current set of names. @@ -759,110 +943,171 @@ class LockSet: self.__lock.release() return set(result) - def acquire(self, names, blocking=1, shared=0): + def acquire(self, names, timeout=None, shared=0, priority=None, + test_notify=None): """Acquire a set of resource locks. + @type names: list of strings (or string) @param names: the names of the locks which shall be acquired (special lock names, or instance/node names) + @type shared: integer (0/1) used as a boolean @param shared: whether to acquire in shared mode; by default an exclusive lock will be acquired - @param blocking: whether to block while trying to acquire or to - operate in try-lock mode (this locking mode is not supported yet) + @type timeout: float or None + @param timeout: Maximum time to acquire all locks + @type priority: integer + @param priority: Priority for acquiring locks + @type test_notify: callable or None + @param test_notify: Special callback function for unittesting - @return: True when all the locks are successfully acquired + @return: Set of all locks successfully acquired or None in case of timeout @raise errors.LockError: when any lock we try to acquire has been deleted before we succeed. In this case none of the locks requested will be acquired. """ - if not blocking: - # We don't have non-blocking mode for now - raise NotImplementedError + assert timeout is None or timeout >= 0.0 # Check we don't already own locks at this level - assert not self._is_owned(), "Cannot acquire locks in the same set twice" + assert not self._is_owned(), ("Cannot acquire locks in the same set twice" + " (lockset %s)" % self.name) - if names is None: - # If no names are given acquire the whole set by not letting new names - # being added before we release, and getting the current list of names. - # Some of them may then be deleted later, but we'll cope with this. - # - # We'd like to acquire this lock in a shared way, as it's nice if - # everybody else can use the instances at the same time. If are acquiring - # them exclusively though they won't be able to do this anyway, though, - # so we'll get the list lock exclusively as well in order to be able to - # do add() on the set while owning it. - self.__lock.acquire(shared=shared) - try: - # note we own the set-lock - self._add_owned() - names = self.__names() - except: - # We shouldn't have problems adding the lock to the owners list, but - # if we did we'll try to release this lock and re-raise exception. - # Of course something is going to be really wrong, after this. - self.__lock.release() - raise + if priority is None: + priority = _DEFAULT_PRIORITY + + # We need to keep track of how long we spent waiting for a lock. The + # timeout passed to this function is over all lock acquires. + running_timeout = utils.RunningTimeout(timeout, False) try: - # Support passing in a single resource to acquire rather than many - if isinstance(names, basestring): - names = [names] + if names is not None: + # Support passing in a single resource to acquire rather than many + if isinstance(names, basestring): + names = [names] + + return self.__acquire_inner(names, False, shared, priority, + running_timeout.Remaining, test_notify) + else: - names = sorted(names) + # If no names are given acquire the whole set by not letting new names + # being added before we release, and getting the current list of names. + # Some of them may then be deleted later, but we'll cope with this. + # + # We'd like to acquire this lock in a shared way, as it's nice if + # everybody else can use the instances at the same time. If we are + # acquiring them exclusively though they won't be able to do this + # anyway, though, so we'll get the list lock exclusively as well in + # order to be able to do add() on the set while owning it. + if not self.__lock.acquire(shared=shared, priority=priority, + timeout=running_timeout.Remaining()): + raise _AcquireTimeout() + try: + # note we own the set-lock + self._add_owned() + + return self.__acquire_inner(self.__names(), True, shared, priority, + running_timeout.Remaining, test_notify) + except: + # We shouldn't have problems adding the lock to the owners list, but + # if we did we'll try to release this lock and re-raise exception. + # Of course something is going to be really wrong, after this. + self.__lock.release() + self._del_owned() + raise + + except _AcquireTimeout: + return None + + def __acquire_inner(self, names, want_all, shared, priority, + timeout_fn, test_notify): + """Inner logic for acquiring a number of locks. + + @param names: Names of the locks to be acquired + @param want_all: Whether all locks in the set should be acquired + @param shared: Whether to acquire in shared mode + @param timeout_fn: Function returning remaining timeout + @param priority: Priority for acquiring locks + @param test_notify: Special callback function for unittesting + + """ + acquire_list = [] + + # First we look the locks up on __lockdict. We have no way of being sure + # they will still be there after, but this makes it a lot faster should + # just one of them be the already wrong. Using a sorted sequence to prevent + # deadlocks. + for lname in sorted(utils.UniqueSequence(names)): + try: + lock = self.__lockdict[lname] # raises KeyError if lock is not there + except KeyError: + if want_all: + # We are acquiring all the set, it doesn't matter if this particular + # element is not there anymore. + continue + + raise errors.LockError("Non-existing lock %s in set %s (it may have" + " been removed)" % (lname, self.name)) + + acquire_list.append((lname, lock)) + + # This will hold the locknames we effectively acquired. + acquired = set() + + try: + # Now acquire_list contains a sorted list of resources and locks we + # want. In order to get them we loop on this (private) list and + # acquire() them. We gave no real guarantee they will still exist till + # this is done but .acquire() itself is safe and will alert us if the + # lock gets deleted. + for (lname, lock) in acquire_list: + if __debug__ and callable(test_notify): + test_notify_fn = lambda: test_notify(lname) + else: + test_notify_fn = None + + timeout = timeout_fn() - acquire_list = [] - # First we look the locks up on __lockdict. We have no way of being sure - # they will still be there after, but this makes it a lot faster should - # just one of them be the already wrong - for lname in utils.UniqueSequence(names): try: - lock = self.__lockdict[lname] # raises KeyError if lock is not there - acquire_list.append((lname, lock)) - except (KeyError): - if self.__lock._is_owned(): + # raises LockError if the lock was deleted + acq_success = lock.acquire(shared=shared, timeout=timeout, + priority=priority, + test_notify=test_notify_fn) + except errors.LockError: + if want_all: # We are acquiring all the set, it doesn't matter if this # particular element is not there anymore. continue - else: - raise errors.LockError('non-existing lock in set (%s)' % lname) - - # This will hold the locknames we effectively acquired. - acquired = set() - # Now acquire_list contains a sorted list of resources and locks we want. - # In order to get them we loop on this (private) list and acquire() them. - # We gave no real guarantee they will still exist till this is done but - # .acquire() itself is safe and will alert us if the lock gets deleted. - for (lname, lock) in acquire_list: + + raise errors.LockError("Non-existing lock %s in set %s (it may" + " have been removed)" % (lname, self.name)) + + if not acq_success: + # Couldn't get lock or timeout occurred + if timeout is None: + # This shouldn't happen as SharedLock.acquire(timeout=None) is + # blocking. + raise errors.LockError("Failed to get lock %s (set %s)" % + (lname, self.name)) + + raise _AcquireTimeout() + try: - lock.acquire(shared=shared) # raises LockError if the lock is deleted # now the lock cannot be deleted, we have it! self._add_owned(name=lname) acquired.add(lname) - except (errors.LockError): - if self.__lock._is_owned(): - # We are acquiring all the set, it doesn't matter if this - # particular element is not there anymore. - continue - else: - name_fail = lname - for lname in self._list_owned(): - self.__lockdict[lname].release() - self._del_owned(name=lname) - raise errors.LockError('non-existing lock in set (%s)' % name_fail) + except: # We shouldn't have problems adding the lock to the owners list, but # if we did we'll try to release this lock and re-raise exception. - # Of course something is going to be really wrong, after this. + # Of course something is going to be really wrong after this. if lock._is_owned(): lock.release() raise except: - # If something went wrong and we had the set-lock let's release it... - if self.__lock._is_owned(): - self.__lock.release() + # Release all owned locks + self._release_and_delete_owned() raise return acquired @@ -873,11 +1118,13 @@ class LockSet: You must have acquired the locks, either in shared or in exclusive mode, before releasing them. + @type names: list of strings, or None @param names: the names of the locks which shall be released (defaults to all the locks acquired at that level). """ - assert self._is_owned(), "release() on lock set while not owner" + assert self._is_owned(), ("release() on lock set %s while not owner" % + self.name) # Support passing in a single resource to release rather than many if isinstance(names, basestring): @@ -888,8 +1135,8 @@ class LockSet: else: names = set(names) assert self._list_owned().issuperset(names), ( - "release() on unheld resources %s" % - names.difference(self._list_owned())) + "release() on unheld resources %s (set %s)" % + (names.difference(self._list_owned()), self.name)) # First of all let's release the "all elements" lock, if set. # After this 'add' can work again @@ -906,14 +1153,18 @@ class LockSet: def add(self, names, acquired=0, shared=0): """Add a new set of elements to the set + @type names: list of strings @param names: names of the new elements to add + @type acquired: integer (0/1) used as a boolean @param acquired: pre-acquire the new resource? + @type shared: integer (0/1) used as a boolean @param shared: is the pre-acquisition shared? """ # Check we don't already own locks at this level assert not self._is_owned() or self.__lock._is_owned(shared=0), \ - "Cannot add locks if the set is only partially owned, or shared" + ("Cannot add locks if the set %s is only partially owned, or shared" % + self.name) # Support passing in a single resource to add rather than many if isinstance(names, basestring): @@ -932,12 +1183,15 @@ class LockSet: # This must be an explicit raise, not an assert, because assert is # turned off when using optimization, and this can happen because of # concurrency even if the user doesn't want it. - raise errors.LockError("duplicate add() (%s)" % invalid_names) + raise errors.LockError("duplicate add(%s) on lockset %s" % + (invalid_names, self.name)) for lockname in names: - lock = SharedLock() + lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor) if acquired: + # No need for priority or timeout here as this lock has just been + # created lock.acquire(shared=shared) # now the lock cannot be deleted, we have it! try: @@ -961,26 +1215,20 @@ class LockSet: return True - def remove(self, names, blocking=1): + def remove(self, names): """Remove elements from the lock set. You can either not hold anything in the lockset or already hold a superset of the elements you want to delete, exclusively. + @type names: list of strings @param names: names of the resource to remove. - @param blocking: whether to block while trying to acquire or to - operate in try-lock mode (this locking mode is not supported - yet unless you are already holding exclusively the locks) - @return:: a list of locks which we removed; the list is always + @return: a list of locks which we removed; the list is always equal to the names list if we were holding all the locks exclusively """ - if not blocking and not self._is_owned(): - # We don't have non-blocking mode for now - raise NotImplementedError - # Support passing in a single resource to remove rather than many if isinstance(names, basestring): names = [names] @@ -989,7 +1237,8 @@ class LockSet: # to delete. The ownership must also be exclusive, but that will be checked # by the lock itself. assert not self._is_owned() or self._list_owned().issuperset(names), ( - "remove() on acquired lockset while not owning all elements") + "remove() on acquired lockset %s while not owning all elements" % + self.name) removed = [] @@ -1004,7 +1253,8 @@ class LockSet: removed.append(lname) except (KeyError, errors.LockError): # This cannot happen if we were already holding it, verify: - assert not self._is_owned(), "remove failed while holding lockset" + assert not self._is_owned(), ("remove failed while holding lockset %s" + % self.name) else: # If no LockError was raised we are the ones who deleted the lock. # This means we can safely remove it from lockdict, as any further or @@ -1032,18 +1282,21 @@ class LockSet: # the same time. LEVEL_CLUSTER = 0 LEVEL_INSTANCE = 1 -LEVEL_NODE = 2 +LEVEL_NODEGROUP = 2 +LEVEL_NODE = 3 LEVELS = [LEVEL_CLUSTER, LEVEL_INSTANCE, + LEVEL_NODEGROUP, LEVEL_NODE] # Lock levels which are modifiable -LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE] +LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE] LEVEL_NAMES = { LEVEL_CLUSTER: "cluster", LEVEL_INSTANCE: "instance", + LEVEL_NODEGROUP: "nodegroup", LEVEL_NODE: "node", } @@ -1062,13 +1315,14 @@ class GanetiLockManager: """ _instance = None - def __init__(self, nodes=None, instances=None): + def __init__(self, nodes, nodegroups, instances): """Constructs a new GanetiLockManager object. There should be only a GanetiLockManager object at any time, so this function raises an error if this is not the case. @param nodes: list of node names + @param nodegroups: list of nodegroup uuids @param instances: list of instance names """ @@ -1077,13 +1331,33 @@ class GanetiLockManager: self.__class__._instance = self + self._monitor = LockMonitor() + # The keyring contains all the locks, at their level and in the correct # locking order. self.__keyring = { - LEVEL_CLUSTER: LockSet([BGL]), - LEVEL_NODE: LockSet(nodes), - LEVEL_INSTANCE: LockSet(instances), - } + LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor), + LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor), + LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=self._monitor), + LEVEL_INSTANCE: LockSet(instances, "instances", + monitor=self._monitor), + } + + def QueryLocks(self, fields): + """Queries information from all locks. + + See L{LockMonitor.QueryLocks}. + + """ + return self._monitor.QueryLocks(fields) + + def OldStyleQueryLocks(self, fields): + """Queries information from all locks, returning old-style data. + + See L{LockMonitor.OldStyleQueryLocks}. + + """ + return self._monitor.OldStyleQueryLocks(fields) def _names(self, level): """List the lock names at the given level. @@ -1116,9 +1390,9 @@ class GanetiLockManager: """ # This way of checking only works if LEVELS[i] = i, which we check for in # the test cases. - return utils.any((self._is_owned(l) for l in LEVELS[level + 1:])) + return compat.any((self._is_owned(l) for l in LEVELS[level + 1:])) - def _BGL_owned(self): + def _BGL_owned(self): # pylint: disable-msg=C0103 """Check if the current thread owns the BGL. Both an exclusive or a shared acquisition work. @@ -1126,7 +1400,8 @@ class GanetiLockManager: """ return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned() - def _contains_BGL(self, level, names): + @staticmethod + def _contains_BGL(level, names): # pylint: disable-msg=C0103 """Check if the level contains the BGL. Check if acting on the given level and set of names will change @@ -1135,17 +1410,21 @@ class GanetiLockManager: """ return level == LEVEL_CLUSTER and (names is None or BGL in names) - def acquire(self, level, names, blocking=1, shared=0): + def acquire(self, level, names, timeout=None, shared=0, priority=None): """Acquire a set of resource locks, at the same level. - @param level: the level at which the locks shall be acquired; - it must be a member of LEVELS. + @type level: member of locking.LEVELS + @param level: the level at which the locks shall be acquired + @type names: list of strings (or string) @param names: the names of the locks which shall be acquired (special lock names, or instance/node names) + @type shared: integer (0/1) used as a boolean @param shared: whether to acquire in shared mode; by default an exclusive lock will be acquired - @param blocking: whether to block while trying to acquire or to - operate in try-lock mode (this locking mode is not supported yet) + @type timeout: float + @param timeout: Maximum time to acquire all locks + @type priority: integer + @param priority: Priority for acquiring lock """ assert level in LEVELS, "Invalid locking level %s" % level @@ -1164,8 +1443,8 @@ class GanetiLockManager: " while owning some at a greater one") # Acquire the locks in the set. - return self.__keyring[level].acquire(names, shared=shared, - blocking=blocking) + return self.__keyring[level].acquire(names, shared=shared, timeout=timeout, + priority=priority) def release(self, level, names=None): """Release a set of resource locks, at the same level. @@ -1173,8 +1452,9 @@ class GanetiLockManager: You must have acquired the locks, either in shared or in exclusive mode, before releasing them. - @param level: the level at which the locks shall be released; - it must be a member of LEVELS + @type level: member of locking.LEVELS + @param level: the level at which the locks shall be released + @type names: list of strings, or None @param names: the names of the locks which shall be released (defaults to all the locks acquired at that level) @@ -1183,7 +1463,9 @@ class GanetiLockManager: assert (not self._contains_BGL(level, names) or not self._upper_owned(LEVEL_CLUSTER)), ( "Cannot release the Big Ganeti Lock while holding something" - " at upper levels") + " at upper levels (%r)" % + (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i)) + for i in self.__keyring.keys()]), )) # Release will complain if we don't own the locks already return self.__keyring[level].release(names) @@ -1191,10 +1473,13 @@ class GanetiLockManager: def add(self, level, names, acquired=0, shared=0): """Add locks at the specified level. - @param level: the level at which the locks shall be added; - it must be a member of LEVELS_MOD. + @type level: member of locking.LEVELS_MOD + @param level: the level at which the locks shall be added + @type names: list of strings @param names: names of the locks to acquire + @type acquired: integer (0/1) used as a boolean @param acquired: whether to acquire the newly added locks + @type shared: integer (0/1) used as a boolean @param shared: whether the acquisition will be shared """ @@ -1205,18 +1490,17 @@ class GanetiLockManager: " while owning some at a greater one") return self.__keyring[level].add(names, acquired=acquired, shared=shared) - def remove(self, level, names, blocking=1): + def remove(self, level, names): """Remove locks from the specified level. You must either already own the locks you are trying to remove exclusively or not own any lock at an upper level. - @param level: the level at which the locks shall be removed; - it must be a member of LEVELS_MOD + @type level: member of locking.LEVELS_MOD + @param level: the level at which the locks shall be removed + @type names: list of strings @param names: the names of the locks which shall be removed (special lock names, or instance/node names) - @param blocking: whether to block while trying to operate in - try-lock mode (this locking mode is not supported yet) """ assert level in LEVELS_MOD, "Invalid or immutable level %s" % level @@ -1228,4 +1512,95 @@ class GanetiLockManager: assert self._is_owned(level) or not self._upper_owned(level), ( "Cannot remove locks at a level while not owning it or" " owning some at a greater one") - return self.__keyring[level].remove(names, blocking=blocking) + return self.__keyring[level].remove(names) + + +def _MonitorSortKey((num, item)): + """Sorting key function. + + Sort by name, then by incoming order. + + """ + (name, _, _, _) = item + + return (utils.NiceSortKey(name), num) + + +class LockMonitor(object): + _LOCK_ATTR = "_lock" + + def __init__(self): + """Initializes this class. + + """ + self._lock = SharedLock("LockMonitor") + + # Counter for stable sorting + self._counter = itertools.count(0) + + # Tracked locks. Weak references are used to avoid issues with circular + # references and deletion. + self._locks = weakref.WeakKeyDictionary() + + @ssynchronized(_LOCK_ATTR) + def RegisterLock(self, lock): + """Registers a new lock. + + """ + logging.debug("Registering lock %s", lock.name) + assert lock not in self._locks, "Duplicate lock registration" + + # There used to be a check for duplicate names here. As it turned out, when + # a lock is re-created with the same name in a very short timeframe, the + # previous instance might not yet be removed from the weakref dictionary. + # By keeping track of the order of incoming registrations, a stable sort + # ordering can still be guaranteed. + + self._locks[lock] = self._counter.next() + + @ssynchronized(_LOCK_ATTR) + def _GetLockInfo(self, requested): + """Get information from all locks while the monitor lock is held. + + """ + return [(num, lock.GetInfo(requested)) for lock, num in self._locks.items()] + + def _Query(self, fields): + """Queries information from all locks. + + @type fields: list of strings + @param fields: List of fields to return + + """ + qobj = query.Query(query.LOCK_FIELDS, fields) + + # Get all data with internal lock held and then sort by name and incoming + # order + lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()), + key=_MonitorSortKey) + + # Extract lock information and build query data + return (qobj, query.LockQueryData(map(operator.itemgetter(1), lockinfo))) + + def QueryLocks(self, fields): + """Queries information from all locks. + + @type fields: list of strings + @param fields: List of fields to return + + """ + (qobj, ctx) = self._Query(fields) + + # Prepare query response + return query.GetQueryResponse(qobj, ctx) + + def OldStyleQueryLocks(self, fields): + """Queries information from all locks, returning old-style data. + + @type fields: list of strings + @param fields: List of fields to return + + """ + (qobj, ctx) = self._Query(fields) + + return qobj.OldStyleQuery(ctx)