X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/7f93570a6d0a9dbc12fdf958ffb488a3544bf6f1..4a78c361a6de3bcbf98f02abfe41ae3b11de2b00:/lib/locking.py diff --git a/lib/locking.py b/lib/locking.py index 59ab7db..ea044d5 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc. +# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -20,7 +20,7 @@ """Module implementing the Ganeti locking code.""" -# pylint: disable-msg=W0212 +# pylint: disable=W0212 # W0212 since e.g. LockSet methods use (a lot) the internals of # SharedLock @@ -28,12 +28,24 @@ import os import select import threading -import time import errno +import weakref +import logging +import heapq +import itertools +import time from ganeti import errors from ganeti import utils from ganeti import compat +from ganeti import query + + +_EXCLUSIVE_TEXT = "exclusive" +_SHARED_TEXT = "shared" +_DELETED_TEXT = "deleted" + +_DEFAULT_PRIORITY = 0 def ssynchronized(mylock, shared=0): @@ -64,59 +76,6 @@ def ssynchronized(mylock, shared=0): return wrap -class RunningTimeout(object): - """Class to calculate remaining timeout when doing several operations. - - """ - __slots__ = [ - "_allow_negative", - "_start_time", - "_time_fn", - "_timeout", - ] - - def __init__(self, timeout, allow_negative, _time_fn=time.time): - """Initializes this class. - - @type timeout: float - @param timeout: Timeout duration - @type allow_negative: bool - @param allow_negative: Whether to return values below zero - @param _time_fn: Time function for unittests - - """ - object.__init__(self) - - if timeout is not None and timeout < 0.0: - raise ValueError("Timeout must not be negative") - - self._timeout = timeout - self._allow_negative = allow_negative - self._time_fn = _time_fn - - self._start_time = None - - def Remaining(self): - """Returns the remaining timeout. - - """ - if self._timeout is None: - return None - - # Get start time on first calculation - if self._start_time is None: - self._start_time = self._time_fn() - - # Calculate remaining time - remaining_timeout = self._start_time + self._timeout - self._time_fn() - - if not self._allow_negative: - # Ensure timeout is always >= 0 - return max(0.0, remaining_timeout) - - return remaining_timeout - - class _SingleNotifyPipeConditionWaiter(object): """Helper class for SingleNotifyPipeCondition @@ -146,7 +105,7 @@ class _SingleNotifyPipeConditionWaiter(object): @param timeout: Timeout for waiting (can be None) """ - running_timeout = RunningTimeout(timeout, True) + running_timeout = utils.RunningTimeout(timeout, True) while True: remaining_time = running_timeout.Remaining() @@ -203,7 +162,7 @@ class _BaseCondition(object): except AttributeError: self._acquire_restore = self._base_acquire_restore try: - self._is_owned = lock._is_owned + self._is_owned = lock.is_owned except AttributeError: self._is_owned = self._base_is_owned @@ -288,7 +247,7 @@ class SingleNotifyPipeCondition(_BaseCondition): self._write_fd = None self._poller = None - def wait(self, timeout=None): + def wait(self, timeout): """Wait for a notification. @type timeout: float or None @@ -318,7 +277,7 @@ class SingleNotifyPipeCondition(_BaseCondition): if self._nwaiters == 0: self._Cleanup() - def notifyAll(self): # pylint: disable-msg=C0103 + def notifyAll(self): # pylint: disable=C0103 """Close the writing side of the pipe to notify all waiters. """ @@ -341,7 +300,7 @@ class PipeCondition(_BaseCondition): """ __slots__ = [ - "_nwaiters", + "_waiters", "_single_condition", ] @@ -352,10 +311,10 @@ class PipeCondition(_BaseCondition): """ _BaseCondition.__init__(self, lock) - self._nwaiters = 0 + self._waiters = set() self._single_condition = self._single_condition_class(self._lock) - def wait(self, timeout=None): + def wait(self, timeout): """Wait for a notification. @type timeout: float or None @@ -366,17 +325,16 @@ class PipeCondition(_BaseCondition): # Keep local reference to the pipe. It could be replaced by another thread # notifying while we're waiting. - my_condition = self._single_condition + cond = self._single_condition - assert self._nwaiters >= 0 - self._nwaiters += 1 + self._waiters.add(threading.currentThread()) try: - my_condition.wait(timeout) + cond.wait(timeout) finally: - assert self._nwaiters > 0 - self._nwaiters -= 1 + self._check_owned() + self._waiters.remove(threading.currentThread()) - def notifyAll(self): # pylint: disable-msg=C0103 + def notifyAll(self): # pylint: disable=C0103 """Notify all currently waiting threads. """ @@ -384,62 +342,97 @@ class PipeCondition(_BaseCondition): self._single_condition.notifyAll() self._single_condition = self._single_condition_class(self._lock) + def get_waiting(self): + """Returns a list of all waiting threads. + + """ + self._check_owned() + + return self._waiters + def has_waiting(self): """Returns whether there are active waiters. """ self._check_owned() - return bool(self._nwaiters) + return bool(self._waiters) + + def __repr__(self): + return ("<%s.%s waiters=%s at %#x>" % + (self.__class__.__module__, self.__class__.__name__, + self._waiters, id(self))) + + +class _PipeConditionWithMode(PipeCondition): + __slots__ = [ + "shared", + ] + + def __init__(self, lock, shared): + """Initializes this class. + + """ + self.shared = shared + PipeCondition.__init__(self, lock) class SharedLock(object): """Implements a shared lock. - Multiple threads can acquire the lock in a shared way, calling - acquire_shared(). In order to acquire the lock in an exclusive way threads - can call acquire_exclusive(). + Multiple threads can acquire the lock in a shared way by calling + C{acquire(shared=1)}. In order to acquire the lock in an exclusive way + threads can call C{acquire(shared=0)}. - The lock prevents starvation but does not guarantee that threads will acquire - the shared lock in the order they queued for it, just that they will - eventually do so. + Notes on data structures: C{__pending} contains a priority queue (heapq) of + all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2), + ...]}. Each per-priority queue contains a normal in-order list of conditions + to be notified when the lock can be acquired. Shared locks are grouped + together by priority and the condition for them is stored in + C{__pending_shared} if it already exists. C{__pending_by_prio} keeps + references for the per-priority queues indexed by priority for faster access. @type name: string @ivar name: the name of the lock """ __slots__ = [ - "__active_shr_c", - "__inactive_shr_c", + "__weakref__", "__deleted", "__exc", "__lock", "__pending", + "__pending_by_prio", + "__pending_shared", "__shr", + "__time_fn", "name", ] - __condition_class = PipeCondition + __condition_class = _PipeConditionWithMode - def __init__(self, name): + def __init__(self, name, monitor=None, _time_fn=time.time): """Construct a new SharedLock. @param name: the name of the lock + @type monitor: L{LockMonitor} + @param monitor: Lock monitor with which to register """ object.__init__(self) self.name = name + # Used for unittesting + self.__time_fn = _time_fn + # Internal lock self.__lock = threading.Lock() # Queue containing waiting acquires self.__pending = [] - - # Active and inactive conditions for shared locks - self.__active_shr_c = self.__condition_class(self.__lock) - self.__inactive_shr_c = self.__condition_class(self.__lock) + self.__pending_by_prio = {} + self.__pending_shared = {} # Current lock holders self.__shr = set() @@ -448,6 +441,73 @@ class SharedLock(object): # is this lock in the deleted state? self.__deleted = False + # Register with lock monitor + if monitor: + logging.debug("Adding lock %s to monitor", name) + monitor.RegisterLock(self) + + def __repr__(self): + return ("<%s.%s name=%s at %#x>" % + (self.__class__.__module__, self.__class__.__name__, + self.name, id(self))) + + def GetLockInfo(self, requested): + """Retrieves information for querying locks. + + @type requested: set + @param requested: Requested information, see C{query.LQ_*} + + """ + self.__lock.acquire() + try: + # Note: to avoid unintentional race conditions, no references to + # modifiable objects should be returned unless they were created in this + # function. + mode = None + owner_names = None + + if query.LQ_MODE in requested: + if self.__deleted: + mode = _DELETED_TEXT + assert not (self.__exc or self.__shr) + elif self.__exc: + mode = _EXCLUSIVE_TEXT + elif self.__shr: + mode = _SHARED_TEXT + + # Current owner(s) are wanted + if query.LQ_OWNER in requested: + if self.__exc: + owner = [self.__exc] + else: + owner = self.__shr + + if owner: + assert not self.__deleted + owner_names = [i.getName() for i in owner] + + # Pending acquires are wanted + if query.LQ_PENDING in requested: + pending = [] + + # Sorting instead of copying and using heaq functions for simplicity + for (_, prioqueue) in sorted(self.__pending): + for cond in prioqueue: + if cond.shared: + pendmode = _SHARED_TEXT + else: + pendmode = _EXCLUSIVE_TEXT + + # List of names will be sorted in L{query._GetLockPending} + pending.append((pendmode, [i.getName() + for i in cond.get_waiting()])) + else: + pending = None + + return [(self.name, mode, owner_names, pending)] + finally: + self.__lock.release() + def __check_deleted(self): """Raises an exception if the lock has been deleted. @@ -481,7 +541,7 @@ class SharedLock(object): else: return self.__is_exclusive() - def _is_owned(self, shared=-1): + def is_owned(self, shared=-1): """Is the current thread somehow owning the lock at this time? @param shared: @@ -496,6 +556,10 @@ class SharedLock(object): finally: self.__lock.release() + #: Necessary to remain compatible with threading.Condition, which tries to + #: retrieve a locks' "_is_owned" attribute + _is_owned = is_owned + def _count_pending(self): """Returns the number of pending acquires. @@ -504,7 +568,25 @@ class SharedLock(object): """ self.__lock.acquire() try: - return len(self.__pending) + return sum(len(prioqueue) for (_, prioqueue) in self.__pending) + finally: + self.__lock.release() + + def _check_empty(self): + """Checks whether there are any pending acquires. + + @rtype: bool + + """ + self.__lock.acquire() + try: + # Order is important: __find_first_pending_queue modifies __pending + (_, prioqueue) = self.__find_first_pending_queue() + + return not (prioqueue or + self.__pending or + self.__pending_by_prio or + self.__pending_shared) finally: self.__lock.release() @@ -526,20 +608,43 @@ class SharedLock(object): else: return len(self.__shr) == 0 and self.__exc is None + def __find_first_pending_queue(self): + """Tries to find the topmost queued entry with pending acquires. + + Removes empty entries while going through the list. + + """ + while self.__pending: + (priority, prioqueue) = self.__pending[0] + + if prioqueue: + return (priority, prioqueue) + + # Remove empty queue + heapq.heappop(self.__pending) + del self.__pending_by_prio[priority] + assert priority not in self.__pending_shared + + return (None, None) + def __is_on_top(self, cond): """Checks whether the passed condition is on top of the queue. The caller must make sure the queue isn't empty. """ - return self.__pending[0] == cond + (_, prioqueue) = self.__find_first_pending_queue() + + return cond == prioqueue[0] - def __acquire_unlocked(self, shared, timeout): + def __acquire_unlocked(self, shared, timeout, priority): """Acquire a shared lock. @param shared: whether to acquire in shared mode; by default an exclusive lock will be acquired @param timeout: maximum waiting time before giving up + @type priority: integer + @param priority: Priority for acquiring lock """ self.__check_deleted() @@ -548,48 +653,76 @@ class SharedLock(object): assert not self.__is_owned(), ("double acquire() on a non-recursive lock" " %s" % self.name) + # Remove empty entries from queue + self.__find_first_pending_queue() + # Check whether someone else holds the lock or there are pending acquires. if not self.__pending and self.__can_acquire(shared): # Apparently not, can acquire lock directly. self.__do_acquire(shared) return True - if shared: - wait_condition = self.__active_shr_c + prioqueue = self.__pending_by_prio.get(priority, None) - # Check if we're not yet in the queue - if wait_condition not in self.__pending: - self.__pending.append(wait_condition) + if shared: + # Try to re-use condition for shared acquire + wait_condition = self.__pending_shared.get(priority, None) + assert (wait_condition is None or + (wait_condition.shared and wait_condition in prioqueue)) else: - wait_condition = self.__condition_class(self.__lock) - # Always add to queue - self.__pending.append(wait_condition) + wait_condition = None + + if wait_condition is None: + if prioqueue is None: + assert priority not in self.__pending_by_prio + + prioqueue = [] + heapq.heappush(self.__pending, (priority, prioqueue)) + self.__pending_by_prio[priority] = prioqueue + + wait_condition = self.__condition_class(self.__lock, shared) + prioqueue.append(wait_condition) + + if shared: + # Keep reference for further shared acquires on same priority. This is + # better than trying to find it in the list of pending acquires. + assert priority not in self.__pending_shared + self.__pending_shared[priority] = wait_condition + + wait_start = self.__time_fn() + acquired = False try: # Wait until we become the topmost acquire in the queue or the timeout # expires. - while not (self.__is_on_top(wait_condition) and - self.__can_acquire(shared)): - # Wait for notification - wait_condition.wait(timeout) - self.__check_deleted() + while True: + if self.__is_on_top(wait_condition) and self.__can_acquire(shared): + self.__do_acquire(shared) + acquired = True + break - # A lot of code assumes blocking acquires always succeed. Loop - # internally for that case. - if timeout is not None: + # A lot of code assumes blocking acquires always succeed, therefore we + # can never return False for a blocking acquire + if (timeout is not None and + utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)): break - if self.__is_on_top(wait_condition) and self.__can_acquire(shared): - self.__do_acquire(shared) - return True + # Wait for notification + wait_condition.wait(timeout) + self.__check_deleted() finally: # Remove condition from queue if there are no more waiters - if not wait_condition.has_waiting() and not self.__deleted: - self.__pending.remove(wait_condition) + if not wait_condition.has_waiting(): + prioqueue.remove(wait_condition) + if wait_condition.shared: + # Remove from list of shared acquires if it wasn't while releasing + # (e.g. on lock deletion) + self.__pending_shared.pop(priority, None) - return False + return acquired - def acquire(self, shared=0, timeout=None, test_notify=None): + def acquire(self, shared=0, timeout=None, priority=None, + test_notify=None): """Acquire a shared lock. @type shared: integer (0/1) used as a boolean @@ -597,17 +730,64 @@ class SharedLock(object): exclusive lock will be acquired @type timeout: float @param timeout: maximum waiting time before giving up + @type priority: integer + @param priority: Priority for acquiring lock @type test_notify: callable or None @param test_notify: Special callback function for unittesting """ + if priority is None: + priority = _DEFAULT_PRIORITY + self.__lock.acquire() try: # We already got the lock, notify now if __debug__ and callable(test_notify): test_notify() - return self.__acquire_unlocked(shared, timeout) + return self.__acquire_unlocked(shared, timeout, priority) + finally: + self.__lock.release() + + def downgrade(self): + """Changes the lock mode from exclusive to shared. + + Pending acquires in shared mode on the same priority will go ahead. + + """ + self.__lock.acquire() + try: + assert self.__is_owned(), "Lock must be owned" + + if self.__is_exclusive(): + # Do nothing if the lock is already acquired in shared mode + self.__exc = None + self.__do_acquire(1) + + # Important: pending shared acquires should only jump ahead if there + # was a transition from exclusive to shared, otherwise an owner of a + # shared lock can keep calling this function to push incoming shared + # acquires + (priority, prioqueue) = self.__find_first_pending_queue() + if prioqueue: + # Is there a pending shared acquire on this priority? + cond = self.__pending_shared.pop(priority, None) + if cond: + assert cond.shared + assert cond in prioqueue + + # Ensure shared acquire is on top of queue + if len(prioqueue) > 1: + prioqueue.remove(cond) + prioqueue.insert(0, cond) + + # Notify + cond.notifyAll() + + assert not self.__is_exclusive() + assert self.__is_sharer() + + return True finally: self.__lock.release() @@ -626,22 +806,42 @@ class SharedLock(object): # Autodetect release type if self.__is_exclusive(): self.__exc = None + notify = True else: self.__shr.remove(threading.currentThread()) + notify = not self.__shr - # Notify topmost condition in queue - if self.__pending: - first_condition = self.__pending[0] - first_condition.notifyAll() + # Notify topmost condition in queue if there are no owners left (for + # shared locks) + if notify: + self.__notify_topmost() + finally: + self.__lock.release() - if first_condition == self.__active_shr_c: - self.__active_shr_c = self.__inactive_shr_c - self.__inactive_shr_c = first_condition + def __notify_topmost(self): + """Notifies topmost condition in queue of pending acquires. + """ + (priority, prioqueue) = self.__find_first_pending_queue() + if prioqueue: + cond = prioqueue[0] + cond.notifyAll() + if cond.shared: + # Prevent further shared acquires from sneaking in while waiters are + # notified + self.__pending_shared.pop(priority, None) + + def _notify_topmost(self): + """Exported version of L{__notify_topmost}. + + """ + self.__lock.acquire() + try: + return self.__notify_topmost() finally: self.__lock.release() - def delete(self, timeout=None): + def delete(self, timeout=None, priority=None): """Delete a Shared Lock. This operation will declare the lock for removal. First the lock will be @@ -650,8 +850,13 @@ class SharedLock(object): @type timeout: float @param timeout: maximum waiting time before giving up + @type priority: integer + @param priority: Priority for acquiring lock """ + if priority is None: + priority = _DEFAULT_PRIORITY + self.__lock.acquire() try: assert not self.__is_sharer(), "Cannot delete() a lock while sharing it" @@ -662,18 +867,23 @@ class SharedLock(object): acquired = self.__is_exclusive() if not acquired: - acquired = self.__acquire_unlocked(0, timeout) + acquired = self.__acquire_unlocked(0, timeout, priority) + if acquired: assert self.__is_exclusive() and not self.__is_sharer(), \ "Lock wasn't acquired in exclusive mode" - if acquired: self.__deleted = True self.__exc = None + assert not (self.__exc or self.__shr), "Found owner during deletion" + # Notify all acquires. They'll throw an error. - while self.__pending: - self.__pending.pop().notifyAll() + for (_, prioqueue) in self.__pending: + for cond in prioqueue: + cond.notifyAll() + + assert self.__deleted return acquired finally: @@ -713,25 +923,31 @@ class LockSet: @ivar name: the name of the lockset """ - def __init__(self, members, name): + def __init__(self, members, name, monitor=None): """Constructs a new LockSet. @type members: list of strings @param members: initial members of the set + @type monitor: L{LockMonitor} + @param monitor: Lock monitor with which to register member locks """ assert members is not None, "members parameter is not a list" self.name = name - # Used internally to guarantee coherency. - self.__lock = SharedLock(name) + # Lock monitor + self.__monitor = monitor + + # Used internally to guarantee coherency + self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor) # The lockdict indexes the relationship name -> lock # The order-of-locking is implied by the alphabetical order of names self.__lockdict = {} for mname in members: - self.__lockdict[mname] = SharedLock("%s/%s" % (name, mname)) + self.__lockdict[mname] = SharedLock(self._GetLockName(mname), + monitor=monitor) # The owner dict contains the set of locks each thread owns. For # performance each thread can access its own key without a global lock on @@ -742,17 +958,74 @@ class LockSet: # will be trouble. self.__owners = {} - def _is_owned(self): - """Is the current thread a current level owner?""" + def _GetLockName(self, mname): + """Returns the name for a member lock. + + """ + return "%s/%s" % (self.name, mname) + + def _get_lock(self): + """Returns the lockset-internal lock. + + """ + return self.__lock + + def _get_lockdict(self): + """Returns the lockset-internal lock dictionary. + + Accessing this structure is only safe in single-thread usage or when the + lockset-internal lock is held. + + """ + return self.__lockdict + + def is_owned(self): + """Is the current thread a current level owner? + + @note: Use L{check_owned} to check if a specific lock is held + + """ return threading.currentThread() in self.__owners + def check_owned(self, names, shared=-1): + """Check if locks are owned in a specific mode. + + @type names: sequence or string + @param names: Lock names (or a single lock name) + @param shared: See L{SharedLock.is_owned} + @rtype: bool + @note: Use L{is_owned} to check if the current thread holds I{any} lock and + L{list_owned} to get the names of all owned locks + + """ + if isinstance(names, basestring): + names = [names] + + # Avoid check if no locks are owned anyway + if names and self.is_owned(): + candidates = [] + + # Gather references to all locks (in case they're deleted in the meantime) + for lname in names: + try: + lock = self.__lockdict[lname] + except KeyError: + raise errors.LockError("Non-existing lock '%s' in set '%s' (it may" + " have been removed)" % (lname, self.name)) + else: + candidates.append(lock) + + return compat.all(lock.is_owned(shared=shared) for lock in candidates) + else: + return False + def _add_owned(self, name=None): """Note the current thread owns the given lock""" if name is None: - if not self._is_owned(): + if not self.is_owned(): self.__owners[threading.currentThread()] = set() else: - if self._is_owned(): + if self.is_owned(): self.__owners[threading.currentThread()].add(name) else: self.__owners[threading.currentThread()] = set([name]) @@ -760,29 +1033,29 @@ class LockSet: def _del_owned(self, name=None): """Note the current thread owns the given lock""" - assert not (name is None and self.__lock._is_owned()), \ + assert not (name is None and self.__lock.is_owned()), \ "Cannot hold internal lock when deleting owner status" if name is not None: self.__owners[threading.currentThread()].remove(name) # Only remove the key if we don't hold the set-lock as well - if (not self.__lock._is_owned() and + if (not self.__lock.is_owned() and not self.__owners[threading.currentThread()]): del self.__owners[threading.currentThread()] - def _list_owned(self): + def list_owned(self): """Get the set of resource names owned by the current thread""" - if self._is_owned(): + if self.is_owned(): return self.__owners[threading.currentThread()].copy() else: return set() def _release_and_delete_owned(self): """Release and delete all resources owned by the current thread""" - for lname in self._list_owned(): + for lname in self.list_owned(): lock = self.__lockdict[lname] - if lock._is_owned(): + if lock.is_owned(): lock.release() self._del_owned(name=lname) @@ -804,7 +1077,7 @@ class LockSet: # If we don't already own the set-level lock acquired # we'll get it and note we need to release it later. release_lock = False - if not self.__lock._is_owned(): + if not self.__lock.is_owned(): release_lock = True self.__lock.acquire(shared=1) try: @@ -814,7 +1087,8 @@ class LockSet: self.__lock.release() return set(result) - def acquire(self, names, timeout=None, shared=0, test_notify=None): + def acquire(self, names, timeout=None, shared=0, priority=None, + test_notify=None): """Acquire a set of resource locks. @type names: list of strings (or string) @@ -825,6 +1099,8 @@ class LockSet: exclusive lock will be acquired @type timeout: float or None @param timeout: Maximum time to acquire all locks + @type priority: integer + @param priority: Priority for acquiring locks @type test_notify: callable or None @param test_notify: Special callback function for unittesting @@ -838,12 +1114,15 @@ class LockSet: assert timeout is None or timeout >= 0.0 # Check we don't already own locks at this level - assert not self._is_owned(), ("Cannot acquire locks in the same set twice" - " (lockset %s)" % self.name) + assert not self.is_owned(), ("Cannot acquire locks in the same set twice" + " (lockset %s)" % self.name) + + if priority is None: + priority = _DEFAULT_PRIORITY # We need to keep track of how long we spent waiting for a lock. The # timeout passed to this function is over all lock acquires. - running_timeout = RunningTimeout(timeout, False) + running_timeout = utils.RunningTimeout(timeout, False) try: if names is not None: @@ -851,7 +1130,7 @@ class LockSet: if isinstance(names, basestring): names = [names] - return self.__acquire_inner(names, False, shared, + return self.__acquire_inner(names, False, shared, priority, running_timeout.Remaining, test_notify) else: @@ -860,18 +1139,18 @@ class LockSet: # Some of them may then be deleted later, but we'll cope with this. # # We'd like to acquire this lock in a shared way, as it's nice if - # everybody else can use the instances at the same time. If are + # everybody else can use the instances at the same time. If we are # acquiring them exclusively though they won't be able to do this # anyway, though, so we'll get the list lock exclusively as well in # order to be able to do add() on the set while owning it. - if not self.__lock.acquire(shared=shared, + if not self.__lock.acquire(shared=shared, priority=priority, timeout=running_timeout.Remaining()): raise _AcquireTimeout() try: # note we own the set-lock self._add_owned() - return self.__acquire_inner(self.__names(), True, shared, + return self.__acquire_inner(self.__names(), True, shared, priority, running_timeout.Remaining, test_notify) except: # We shouldn't have problems adding the lock to the owners list, but @@ -884,13 +1163,15 @@ class LockSet: except _AcquireTimeout: return None - def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify): + def __acquire_inner(self, names, want_all, shared, priority, + timeout_fn, test_notify): """Inner logic for acquiring a number of locks. @param names: Names of the locks to be acquired @param want_all: Whether all locks in the set should be acquired @param shared: Whether to acquire in shared mode @param timeout_fn: Function returning remaining timeout + @param priority: Priority for acquiring locks @param test_notify: Special callback function for unittesting """ @@ -909,8 +1190,8 @@ class LockSet: # element is not there anymore. continue - raise errors.LockError("Non-existing lock %s in set %s" % - (lname, self.name)) + raise errors.LockError("Non-existing lock %s in set %s (it may have" + " been removed)" % (lname, self.name)) acquire_list.append((lname, lock)) @@ -934,6 +1215,7 @@ class LockSet: try: # raises LockError if the lock was deleted acq_success = lock.acquire(shared=shared, timeout=timeout, + priority=priority, test_notify=test_notify_fn) except errors.LockError: if want_all: @@ -941,8 +1223,8 @@ class LockSet: # particular element is not there anymore. continue - raise errors.LockError("Non-existing lock %s in set %s" % - (lname, self.name)) + raise errors.LockError("Non-existing lock %s in set %s (it may" + " have been removed)" % (lname, self.name)) if not acq_success: # Couldn't get lock or timeout occurred @@ -963,7 +1245,7 @@ class LockSet: # We shouldn't have problems adding the lock to the owners list, but # if we did we'll try to release this lock and re-raise exception. # Of course something is going to be really wrong after this. - if lock._is_owned(): + if lock.is_owned(): lock.release() raise @@ -974,6 +1256,42 @@ class LockSet: return acquired + def downgrade(self, names=None): + """Downgrade a set of resource locks from exclusive to shared mode. + + The locks must have been acquired in exclusive mode. + + """ + assert self.is_owned(), ("downgrade on lockset %s while not owning any" + " lock" % self.name) + + # Support passing in a single resource to downgrade rather than many + if isinstance(names, basestring): + names = [names] + + owned = self.list_owned() + + if names is None: + names = owned + else: + names = set(names) + assert owned.issuperset(names), \ + ("downgrade() on unheld resources %s (set %s)" % + (names.difference(owned), self.name)) + + for lockname in names: + self.__lockdict[lockname].downgrade() + + # Do we own the lockset in exclusive mode? + if self.__lock.is_owned(shared=0): + # Have all locks been downgraded? + if not compat.any(lock.is_owned(shared=0) + for lock in self.__lockdict.values()): + self.__lock.downgrade() + assert self.__lock.is_owned(shared=1) + + return True + def release(self, names=None): """Release a set of resource locks, at the same level. @@ -985,24 +1303,24 @@ class LockSet: (defaults to all the locks acquired at that level). """ - assert self._is_owned(), ("release() on lock set %s while not owner" % - self.name) + assert self.is_owned(), ("release() on lock set %s while not owner" % + self.name) # Support passing in a single resource to release rather than many if isinstance(names, basestring): names = [names] if names is None: - names = self._list_owned() + names = self.list_owned() else: names = set(names) - assert self._list_owned().issuperset(names), ( + assert self.list_owned().issuperset(names), ( "release() on unheld resources %s (set %s)" % - (names.difference(self._list_owned()), self.name)) + (names.difference(self.list_owned()), self.name)) # First of all let's release the "all elements" lock, if set. # After this 'add' can work again - if self.__lock._is_owned(): + if self.__lock.is_owned(): self.__lock.release() self._del_owned() @@ -1024,7 +1342,7 @@ class LockSet: """ # Check we don't already own locks at this level - assert not self._is_owned() or self.__lock._is_owned(shared=0), \ + assert not self.is_owned() or self.__lock.is_owned(shared=0), \ ("Cannot add locks if the set %s is only partially owned, or shared" % self.name) @@ -1035,7 +1353,7 @@ class LockSet: # If we don't already own the set-level lock acquired in an exclusive way # we'll get it and note we need to release it later. release_lock = False - if not self.__lock._is_owned(): + if not self.__lock.is_owned(): release_lock = True self.__lock.acquire() @@ -1049,9 +1367,11 @@ class LockSet: (invalid_names, self.name)) for lockname in names: - lock = SharedLock("%s/%s" % (self.name, lockname)) + lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor) if acquired: + # No need for priority or timeout here as this lock has just been + # created lock.acquire(shared=shared) # now the lock cannot be deleted, we have it! try: @@ -1096,7 +1416,7 @@ class LockSet: # If we own any subset of this lock it must be a superset of what we want # to delete. The ownership must also be exclusive, but that will be checked # by the lock itself. - assert not self._is_owned() or self._list_owned().issuperset(names), ( + assert not self.is_owned() or self.list_owned().issuperset(names), ( "remove() on acquired lockset %s while not owning all elements" % self.name) @@ -1113,8 +1433,8 @@ class LockSet: removed.append(lname) except (KeyError, errors.LockError): # This cannot happen if we were already holding it, verify: - assert not self._is_owned(), ("remove failed while holding lockset %s" - % self.name) + assert not self.is_owned(), ("remove failed while holding lockset %s" % + self.name) else: # If no LockError was raised we are the ones who deleted the lock. # This means we can safely remove it from lockdict, as any further or @@ -1125,7 +1445,7 @@ class LockSet: # it's the job of the one who actually deleted it. del self.__lockdict[lname] # And let's remove it from our private list if we owned it. - if self._is_owned(): + if self.is_owned(): self._del_owned(name=lname) return removed @@ -1142,23 +1462,37 @@ class LockSet: # the same time. LEVEL_CLUSTER = 0 LEVEL_INSTANCE = 1 -LEVEL_NODE = 2 - -LEVELS = [LEVEL_CLUSTER, - LEVEL_INSTANCE, - LEVEL_NODE] +LEVEL_NODEGROUP = 2 +LEVEL_NODE = 3 +LEVEL_NODE_RES = 4 + +LEVELS = [ + LEVEL_CLUSTER, + LEVEL_INSTANCE, + LEVEL_NODEGROUP, + LEVEL_NODE, + LEVEL_NODE_RES, + ] # Lock levels which are modifiable -LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE] - +LEVELS_MOD = frozenset([ + LEVEL_NODE_RES, + LEVEL_NODE, + LEVEL_NODEGROUP, + LEVEL_INSTANCE, + ]) + +#: Lock level names (make sure to use singular form) LEVEL_NAMES = { LEVEL_CLUSTER: "cluster", LEVEL_INSTANCE: "instance", + LEVEL_NODEGROUP: "nodegroup", LEVEL_NODE: "node", + LEVEL_NODE_RES: "node-res", } # Constant for the big ganeti lock -BGL = 'BGL' +BGL = "BGL" class GanetiLockManager: @@ -1172,13 +1506,14 @@ class GanetiLockManager: """ _instance = None - def __init__(self, nodes=None, instances=None): + def __init__(self, nodes, nodegroups, instances): """Constructs a new GanetiLockManager object. There should be only a GanetiLockManager object at any time, so this function raises an error if this is not the case. @param nodes: list of node names + @param nodegroups: list of nodegroup uuids @param instances: list of instance names """ @@ -1187,13 +1522,37 @@ class GanetiLockManager: self.__class__._instance = self + self._monitor = LockMonitor() + # The keyring contains all the locks, at their level and in the correct # locking order. self.__keyring = { - LEVEL_CLUSTER: LockSet([BGL], "bgl lockset"), - LEVEL_NODE: LockSet(nodes, "nodes lockset"), - LEVEL_INSTANCE: LockSet(instances, "instances lockset"), - } + LEVEL_CLUSTER: LockSet([BGL], "cluster", monitor=self._monitor), + LEVEL_NODE: LockSet(nodes, "node", monitor=self._monitor), + LEVEL_NODE_RES: LockSet(nodes, "node-res", monitor=self._monitor), + LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroup", monitor=self._monitor), + LEVEL_INSTANCE: LockSet(instances, "instance", + monitor=self._monitor), + } + + assert compat.all(ls.name == LEVEL_NAMES[level] + for (level, ls) in self.__keyring.items()) + + def AddToLockMonitor(self, provider): + """Registers a new lock with the monitor. + + See L{LockMonitor.RegisterLock}. + + """ + return self._monitor.RegisterLock(provider) + + def QueryLocks(self, fields): + """Queries information from all locks. + + See L{LockMonitor.QueryLocks}. + + """ + return self._monitor.QueryLocks(fields) def _names(self, level): """List the lock names at the given level. @@ -1206,19 +1565,25 @@ class GanetiLockManager: assert level in LEVELS, "Invalid locking level %s" % level return self.__keyring[level]._names() - def _is_owned(self, level): + def is_owned(self, level): """Check whether we are owning locks at the given level """ - return self.__keyring[level]._is_owned() - - is_owned = _is_owned + return self.__keyring[level].is_owned() - def _list_owned(self, level): + def list_owned(self, level): """Get the set of owned locks at the given level """ - return self.__keyring[level]._list_owned() + return self.__keyring[level].list_owned() + + def check_owned(self, level, names, shared=-1): + """Check if locks at a certain level are owned in a specific mode. + + @see: L{LockSet.check_owned} + + """ + return self.__keyring[level].check_owned(names, shared=shared) def _upper_owned(self, level): """Check that we don't own any lock at a level greater than the given one. @@ -1226,18 +1591,18 @@ class GanetiLockManager: """ # This way of checking only works if LEVELS[i] = i, which we check for in # the test cases. - return compat.any((self._is_owned(l) for l in LEVELS[level + 1:])) + return compat.any((self.is_owned(l) for l in LEVELS[level + 1:])) - def _BGL_owned(self): # pylint: disable-msg=C0103 + def _BGL_owned(self): # pylint: disable=C0103 """Check if the current thread owns the BGL. Both an exclusive or a shared acquisition work. """ - return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned() + return BGL in self.__keyring[LEVEL_CLUSTER].list_owned() @staticmethod - def _contains_BGL(level, names): # pylint: disable-msg=C0103 + def _contains_BGL(level, names): # pylint: disable=C0103 """Check if the level contains the BGL. Check if acting on the given level and set of names will change @@ -1246,7 +1611,7 @@ class GanetiLockManager: """ return level == LEVEL_CLUSTER and (names is None or BGL in names) - def acquire(self, level, names, timeout=None, shared=0): + def acquire(self, level, names, timeout=None, shared=0, priority=None): """Acquire a set of resource locks, at the same level. @type level: member of locking.LEVELS @@ -1259,6 +1624,8 @@ class GanetiLockManager: an exclusive lock will be acquired @type timeout: float @param timeout: Maximum time to acquire all locks + @type priority: integer + @param priority: Priority for acquiring lock """ assert level in LEVELS, "Invalid locking level %s" % level @@ -1277,7 +1644,24 @@ class GanetiLockManager: " while owning some at a greater one") # Acquire the locks in the set. - return self.__keyring[level].acquire(names, shared=shared, timeout=timeout) + return self.__keyring[level].acquire(names, shared=shared, timeout=timeout, + priority=priority) + + def downgrade(self, level, names=None): + """Downgrade a set of resource locks from exclusive to shared mode. + + You must have acquired the locks in exclusive mode. + + @type level: member of locking.LEVELS + @param level: the level at which the locks shall be downgraded + @type names: list of strings, or None + @param names: the names of the locks which shall be downgraded + (defaults to all the locks acquired at the level) + + """ + assert level in LEVELS, "Invalid locking level %s" % level + + return self.__keyring[level].downgrade(names=names) def release(self, level, names=None): """Release a set of resource locks, at the same level. @@ -1297,7 +1681,7 @@ class GanetiLockManager: not self._upper_owned(LEVEL_CLUSTER)), ( "Cannot release the Big Ganeti Lock while holding something" " at upper levels (%r)" % - (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i)) + (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self.list_owned(i)) for i in self.__keyring.keys()]), )) # Release will complain if we don't own the locks already @@ -1342,7 +1726,104 @@ class GanetiLockManager: # Check we either own the level or don't own anything from here # up. LockSet.remove() will check the case in which we don't own # all the needed resources, or we have a shared ownership. - assert self._is_owned(level) or not self._upper_owned(level), ( + assert self.is_owned(level) or not self._upper_owned(level), ( "Cannot remove locks at a level while not owning it or" " owning some at a greater one") return self.__keyring[level].remove(names) + + +def _MonitorSortKey((item, idx, num)): + """Sorting key function. + + Sort by name, registration order and then order of information. This provides + a stable sort order over different providers, even if they return the same + name. + + """ + (name, _, _, _) = item + + return (utils.NiceSortKey(name), num, idx) + + +class LockMonitor(object): + _LOCK_ATTR = "_lock" + + def __init__(self): + """Initializes this class. + + """ + self._lock = SharedLock("LockMonitor") + + # Counter for stable sorting + self._counter = itertools.count(0) + + # Tracked locks. Weak references are used to avoid issues with circular + # references and deletion. + self._locks = weakref.WeakKeyDictionary() + + @ssynchronized(_LOCK_ATTR) + def RegisterLock(self, provider): + """Registers a new lock. + + @param provider: Object with a callable method named C{GetLockInfo}, taking + a single C{set} containing the requested information items + @note: It would be nicer to only receive the function generating the + requested information but, as it turns out, weak references to bound + methods (e.g. C{self.GetLockInfo}) are tricky; there are several + workarounds, but none of the ones I found works properly in combination + with a standard C{WeakKeyDictionary} + + """ + assert provider not in self._locks, "Duplicate registration" + + # There used to be a check for duplicate names here. As it turned out, when + # a lock is re-created with the same name in a very short timeframe, the + # previous instance might not yet be removed from the weakref dictionary. + # By keeping track of the order of incoming registrations, a stable sort + # ordering can still be guaranteed. + + self._locks[provider] = self._counter.next() + + def _GetLockInfo(self, requested): + """Get information from all locks. + + """ + # Must hold lock while getting consistent list of tracked items + self._lock.acquire(shared=1) + try: + items = self._locks.items() + finally: + self._lock.release() + + return [(info, idx, num) + for (provider, num) in items + for (idx, info) in enumerate(provider.GetLockInfo(requested))] + + def _Query(self, fields): + """Queries information from all locks. + + @type fields: list of strings + @param fields: List of fields to return + + """ + qobj = query.Query(query.LOCK_FIELDS, fields) + + # Get all data with internal lock held and then sort by name and incoming + # order + lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()), + key=_MonitorSortKey) + + # Extract lock information and build query data + return (qobj, query.LockQueryData(map(compat.fst, lockinfo))) + + def QueryLocks(self, fields): + """Queries information from all locks. + + @type fields: list of strings + @param fields: List of fields to return + + """ + (qobj, ctx) = self._Query(fields) + + # Prepare query response + return query.GetQueryResponse(qobj, ctx)