X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/e6c200d61554b9cd06c203a8800e94941de82a6a..944bf54895c1d4491c6d06ad464aa6e97844c366:/lib/locking.py diff --git a/lib/locking.py b/lib/locking.py index 216c205..647e14f 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -26,6 +26,26 @@ import threading # Wouldn't it be better to define LockingError in the locking module? # Well, for now that's how the rest of the code does it... from ganeti import errors +from ganeti import utils + + +def ssynchronized(lock, shared=0): + """Shared Synchronization decorator. + + Calls the function holding the given lock, either in exclusive or shared + mode. It requires the passed lock to be a SharedLock (or support its + semantics). + + """ + def wrap(fn): + def sync_function(*args, **kwargs): + lock.acquire(shared=shared) + try: + return fn(*args, **kwargs) + finally: + lock.release() + return sync_function + return wrap class SharedLock: @@ -54,6 +74,7 @@ class SharedLock: # lock waiters self.__nwait_exc = 0 self.__nwait_shr = 0 + self.__npass_shr = 0 # is this lock in the deleted state? self.__deleted = False @@ -83,27 +104,25 @@ class SharedLock: def _is_owned(self, shared=-1): """Is the current thread somehow owning the lock at this time? - Args: - shared: - < 0: check for any type of ownership (default) - 0: check for exclusive ownership - > 0: check for shared ownership + @param shared: + - < 0: check for any type of ownership (default) + - 0: check for exclusive ownership + - > 0: check for shared ownership """ self.__lock.acquire() try: - result = self.__is_owned(shared) + result = self.__is_owned(shared=shared) finally: self.__lock.release() return result - def __wait(self,c): + def __wait(self, c): """Wait on the given condition, and raise an exception if the current lock is declared deleted in the meantime. - Args: - c: condition to wait on + @param c: the condition to wait on """ c.wait() @@ -132,15 +151,15 @@ class SharedLock: finally: self.__nwait_exc -= 1 + assert self.__npass_shr == 0, "SharedLock: internal fairness violation" def acquire(self, blocking=1, shared=0): """Acquire a shared lock. - Args: - shared: whether to acquire in shared mode. By default an exclusive lock - will be acquired. - blocking: whether to block while trying to acquire or to operate in try-lock mode. - this locking mode is not supported yet. + @param shared: whether to acquire in shared mode; by default an + exclusive lock will be acquired + @param blocking: whether to block while trying to acquire or to + operate in try-lock mode (this locking mode is not supported yet) """ if not blocking: @@ -154,25 +173,36 @@ class SharedLock: # We cannot acquire the lock if we already have it assert not self.__is_owned(), "double acquire() on a non-recursive lock" + assert self.__npass_shr >= 0, "Internal fairness condition weirdness" if shared: self.__nwait_shr += 1 try: - # If there is an exclusive holder waiting we have to wait. We'll - # only do this once, though, when we start waiting for the lock. Then - # we'll just wait while there are no exclusive holders. + wait = False + # If there is an exclusive holder waiting we have to wait. + # We'll only do this once, though, when we start waiting for + # the lock. Then we'll just wait while there are no + # exclusive holders. if self.__nwait_exc > 0: # TODO: if !blocking... + wait = True self.__wait(self.__turn_shr) while self.__exc is not None: + wait = True # TODO: if !blocking... 
self.__wait(self.__turn_shr) self.__shr.add(threading.currentThread()) + + # If we were waiting note that we passed + if wait: + self.__npass_shr -= 1 + finally: self.__nwait_shr -= 1 + assert self.__npass_shr >= 0, "Internal fairness condition weirdness" else: # TODO: if !blocking... # (or modify __exclusive_acquire for non-blocking mode) @@ -192,6 +222,7 @@ class SharedLock: """ self.__lock.acquire() try: + assert self.__npass_shr >= 0, "Internal fairness condition weirdness" # Autodetect release type if self.__is_exclusive(): self.__exc = None @@ -200,21 +231,27 @@ class SharedLock: # mode if there are shared holders waiting. Otherwise wake up the next # exclusive holder. if self.__nwait_shr > 0: + # Make sure at least the ones which were blocked pass. + self.__npass_shr = self.__nwait_shr self.__turn_shr.notifyAll() elif self.__nwait_exc > 0: - self.__turn_exc.notify() + self.__turn_exc.notify() elif self.__is_sharer(): self.__shr.remove(threading.currentThread()) - # If there are shared holders waiting there *must* be an exclusive holder - # waiting as well; otherwise what were they waiting for? - assert (self.__nwait_shr == 0 or self.__nwait_exc > 0, - "Lock sharers waiting while no exclusive is queueing") - - # If there are no more shared holders and some exclusive holders are - # waiting let's wake one up. - if len(self.__shr) == 0 and self.__nwait_exc > 0: + # If there are shared holders waiting (and not just scheduled to pass) + # there *must* be an exclusive holder waiting as well; otherwise what + # were they waiting for? + assert (self.__nwait_exc > 0 or + self.__npass_shr == self.__nwait_shr), \ + "Lock sharers waiting while no exclusive is queueing" + + # If there are no more shared holders either in or scheduled to pass, + # and some exclusive holders are waiting let's wake one up. + if (len(self.__shr) == 0 and + self.__nwait_exc > 0 and + not self.__npass_shr > 0): self.__turn_exc.notify() else: @@ -230,10 +267,9 @@ class SharedLock: acquired in exclusive mode if you don't already own it, then the lock will be put in a state where any future and pending acquire() fail. - Args: - blocking: whether to block while trying to acquire or to operate in - try-lock mode. this locking mode is not supported yet unless - you are already holding exclusively the lock. + @param blocking: whether to block while trying to acquire or to + operate in try-lock mode. this locking mode is not supported + yet unless you are already holding exclusively the lock. """ self.__lock.acquire() @@ -260,6 +296,11 @@ class SharedLock: self.__lock.release() +# Whenever we want to acquire a full LockSet we pass None as the value +# to acquire. Hide this behing this nicely named constant. +ALL_SET = None + + class LockSet: """Implements a set of locks. @@ -274,8 +315,7 @@ class LockSet: def __init__(self, members=None): """Constructs a new LockSet. - Args: - members: initial members of the set + @param members: initial members of the set """ # Used internally to guarantee coherency. 
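A minimal usage sketch for the new ssynchronized decorator and SharedLock (the
state dictionary and function names below are illustrative only, assuming the
module is imported as ganeti.locking as elsewhere in the tree):

    from ganeti import locking

    _state = {"nodes": []}
    _state_lock = locking.SharedLock()

    # Readers may run concurrently: the lock is taken in shared mode.
    @locking.ssynchronized(_state_lock, shared=1)
    def get_nodes():
      return list(_state["nodes"])

    # Writers acquire the lock exclusively and wait for all sharers to leave.
    @locking.ssynchronized(_state_lock)
    def add_node(name):
      _state["nodes"].append(name)

An exclusive acquire blocks until every sharer has released; the __npass_shr
counter introduced above makes sure sharers that were already queued get
through before the next exclusive waiter is woken up.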
@@ -302,18 +342,27 @@ class LockSet: """Is the current thread a current level owner?""" return threading.currentThread() in self.__owners - def _add_owned(self, name): + def _add_owned(self, name=None): """Note the current thread owns the given lock""" - if self._is_owned(): - self.__owners[threading.currentThread()].add(name) + if name is None: + if not self._is_owned(): + self.__owners[threading.currentThread()] = set() else: - self.__owners[threading.currentThread()] = set([name]) + if self._is_owned(): + self.__owners[threading.currentThread()].add(name) + else: + self.__owners[threading.currentThread()] = set([name]) + - def _del_owned(self, name): + def _del_owned(self, name=None): """Note the current thread owns the given lock""" - self.__owners[threading.currentThread()].remove(name) - if not self.__owners[threading.currentThread()]: + if name is not None: + self.__owners[threading.currentThread()].remove(name) + + # Only remove the key if we don't hold the set-lock as well + if (not self.__lock._is_owned() and + not self.__owners[threading.currentThread()]): del self.__owners[threading.currentThread()] def _list_owned(self): @@ -330,38 +379,42 @@ class LockSet: result after releasing the lock. """ - return set(self.__lockdict.keys()) + return self.__lockdict.keys() def _names(self): """Return a copy of the current set of elements. Used only for debugging purposes. + """ - self.__lock.acquire(shared=1) + # If we don't already own the set-level lock acquired + # we'll get it and note we need to release it later. + release_lock = False + if not self.__lock._is_owned(): + release_lock = True + self.__lock.acquire(shared=1) try: result = self.__names() finally: - self.__lock.release() - return result + if release_lock: + self.__lock.release() + return set(result) def acquire(self, names, blocking=1, shared=0): """Acquire a set of resource locks. - Args: - names: the names of the locks which shall be acquired. - (special lock names, or instance/node names) - shared: whether to acquire in shared mode. By default an exclusive lock - will be acquired. - blocking: whether to block while trying to acquire or to operate in try-lock mode. - this locking mode is not supported yet. + @param names: the names of the locks which shall be acquired + (special lock names, or instance/node names) + @param shared: whether to acquire in shared mode; by default an + exclusive lock will be acquired + @param blocking: whether to block while trying to acquire or to + operate in try-lock mode (this locking mode is not supported yet) - Returns: - True: when all the locks are successfully acquired + @return: True when all the locks are successfully acquired - Raises: - errors.LockError: when any lock we try to acquire has been deleted - before we succeed. In this case none of the locks requested will be - acquired. + @raise errors.LockError: when any lock we try to acquire has + been deleted before we succeed. In this case none of the + locks requested will be acquired. """ if not blocking: @@ -371,48 +424,89 @@ class LockSet: # Check we don't already own locks at this level assert not self._is_owned(), "Cannot acquire locks in the same set twice" - # Support passing in a single resource to acquire rather than many - if isinstance(names, basestring): - names = [names] - else: - names.sort() + if names is None: + # If no names are given acquire the whole set by not letting new names + # being added before we release, and getting the current list of names. 
+ # Some of them may then be deleted later, but we'll cope with this. + # + # We'd like to acquire this lock in a shared way, as it's nice if + # everybody else can use the instances at the same time. If are acquiring + # them exclusively though they won't be able to do this anyway, though, + # so we'll get the list lock exclusively as well in order to be able to + # do add() on the set while owning it. + self.__lock.acquire(shared=shared) + try: + # note we own the set-lock + self._add_owned() + names = self.__names() + except: + # We shouldn't have problems adding the lock to the owners list, but + # if we did we'll try to release this lock and re-raise exception. + # Of course something is going to be really wrong, after this. + self.__lock.release() + raise - acquire_list = [] - # First we look the locks up on __lockdict. We have no way of being sure - # they will still be there after, but this makes it a lot faster should - # just one of them be the already wrong - try: - for lname in names: - lock = self.__lockdict[lname] # raises KeyError if the lock is not there - acquire_list.append((lname, lock)) - except (KeyError): - raise errors.LockError('non-existing lock in set (%s)' % lname) - - # Now acquire_list contains a sorted list of resources and locks we want. - # In order to get them we loop on this (private) list and acquire() them. - # We gave no real guarantee they will still exist till this is done but - # .acquire() itself is safe and will alert us if the lock gets deleted. try: + # Support passing in a single resource to acquire rather than many + if isinstance(names, basestring): + names = [names] + else: + names = sorted(names) + + acquire_list = [] + # First we look the locks up on __lockdict. We have no way of being sure + # they will still be there after, but this makes it a lot faster should + # just one of them be the already wrong + for lname in utils.UniqueSequence(names): + try: + lock = self.__lockdict[lname] # raises KeyError if lock is not there + acquire_list.append((lname, lock)) + except (KeyError): + if self.__lock._is_owned(): + # We are acquiring all the set, it doesn't matter if this + # particular element is not there anymore. + continue + else: + raise errors.LockError('non-existing lock in set (%s)' % lname) + + # This will hold the locknames we effectively acquired. + acquired = set() + # Now acquire_list contains a sorted list of resources and locks we want. + # In order to get them we loop on this (private) list and acquire() them. + # We gave no real guarantee they will still exist till this is done but + # .acquire() itself is safe and will alert us if the lock gets deleted. for (lname, lock) in acquire_list: - lock.acquire(shared=shared) # raises LockError if the lock is deleted try: + lock.acquire(shared=shared) # raises LockError if the lock is deleted # now the lock cannot be deleted, we have it! - self._add_owned(lname) + self._add_owned(name=lname) + acquired.add(lname) + except (errors.LockError): + if self.__lock._is_owned(): + # We are acquiring all the set, it doesn't matter if this + # particular element is not there anymore. + continue + else: + name_fail = lname + for lname in self._list_owned(): + self.__lockdict[lname].release() + self._del_owned(name=lname) + raise errors.LockError('non-existing lock in set (%s)' % name_fail) except: # We shouldn't have problems adding the lock to the owners list, but # if we did we'll try to release this lock and re-raise exception. # Of course something is going to be really wrong, after this. 
- lock.release() - raise + if lock._is_owned(): + lock.release() + raise - except (errors.LockError): - name_fail = lname - for lname in self._list_owned(): - self.__lockdict[lname].release() - self._del_owned(lname) - raise errors.LockError('non-existing lock in set (%s)' % name_fail) + except: + # If something went wrong and we had the set-lock let's release it... + if self.__lock._is_owned(): + self.__lock.release() + raise - return True + return acquired def release(self, names=None): """Release a set of resource locks, at the same level. @@ -420,12 +514,10 @@ class LockSet: You must have acquired the locks, either in shared or in exclusive mode, before releasing them. - Args: - names: the names of the locks which shall be released. - (defaults to all the locks acquired at that level). + @param names: the names of the locks which shall be released + (defaults to all the locks acquired at that level). """ - assert self._is_owned(), "release() on lock set while not owner" # Support passing in a single resource to release rather than many @@ -440,30 +532,43 @@ class LockSet: "release() on unheld resources %s" % names.difference(self._list_owned())) + # First of all let's release the "all elements" lock, if set. + # After this 'add' can work again + if self.__lock._is_owned(): + self.__lock.release() + self._del_owned() + for lockname in names: # If we are sure the lock doesn't leave __lockdict without being # exclusively held we can do this... self.__lockdict[lockname].release() - self._del_owned(lockname) + self._del_owned(name=lockname) def add(self, names, acquired=0, shared=0): """Add a new set of elements to the set - Args: - names: names of the new elements to add - acquired: pre-acquire the new resource? - shared: is the pre-acquisition shared? + @param names: names of the new elements to add + @param acquired: pre-acquire the new resource? + @param shared: is the pre-acquisition shared? """ + # Check we don't already own locks at this level + assert not self._is_owned() or self.__lock._is_owned(shared=0), \ + "Cannot add locks if the set is only partially owned, or shared" + # Support passing in a single resource to add rather than many if isinstance(names, basestring): names = [names] - # Acquire the internal lock in an exclusive way, so there cannot be a - # conflicting add() - self.__lock.acquire() + # If we don't already own the set-level lock acquired in an exclusive way + # we'll get it and note we need to release it later. + release_lock = False + if not self.__lock._is_owned(): + release_lock = True + self.__lock.acquire() + try: - invalid_names = self.__names().intersection(names) + invalid_names = set(self.__names()).intersection(names) if invalid_names: # This must be an explicit raise, not an assert, because assert is # turned off when using optimization, and this can happen because of @@ -477,7 +582,7 @@ class LockSet: lock.acquire(shared=shared) # now the lock cannot be deleted, we have it! try: - self._add_owned(lockname) + self._add_owned(name=lockname) except: # We shouldn't have problems adding the lock to the owners list, # but if we did we'll try to release this lock and re-raise @@ -491,7 +596,9 @@ class LockSet: self.__lockdict[lockname] = lock finally: - self.__lock.release() + # Only release __lock if we were not holding it previously. + if release_lock: + self.__lock.release() return True @@ -501,15 +608,14 @@ class LockSet: You can either not hold anything in the lockset or already hold a superset of the elements you want to delete, exclusively. 
- Args: - names: names of the resource to remove. - blocking: whether to block while trying to acquire or to operate in - try-lock mode. this locking mode is not supported yet unless - you are already holding exclusively the locks. + @param names: names of the resource to remove. + @param blocking: whether to block while trying to acquire or to + operate in try-lock mode (this locking mode is not supported + yet unless you are already holding exclusively the locks) - Returns: - A list of lock which we failed to delete. The list is always empty if we - were holding all the locks exclusively. + @return:: a list of locks which we removed; the list is always + equal to the names list if we were holding all the locks + exclusively """ if not blocking and not self._is_owned(): @@ -526,7 +632,7 @@ class LockSet: assert not self._is_owned() or self._list_owned().issuperset(names), ( "remove() on acquired lockset while not owning all elements") - delete_failed=[] + removed = [] for lname in names: # Calling delete() acquires the lock exclusively if we don't already own @@ -536,8 +642,8 @@ class LockSet: # everything we want to delete, or we hold none. try: self.__lockdict[lname].delete() + removed.append(lname) except (KeyError, errors.LockError): - delete_failed.append(lname) # This cannot happen if we were already holding it, verify: assert not self._is_owned(), "remove failed while holding lockset" else: @@ -551,7 +657,210 @@ class LockSet: del self.__lockdict[lname] # And let's remove it from our private list if we owned it. if self._is_owned(): - self._del_owned(lname) + self._del_owned(name=lname) + + return removed + + +# Locking levels, must be acquired in increasing order. +# Current rules are: +# - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be +# acquired before performing any operation, either in shared or in exclusive +# mode. acquiring the BGL in exclusive mode is discouraged and should be +# avoided. +# - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks. +# If you need more than one node, or more than one instance, acquire them at +# the same time. +LEVEL_CLUSTER = 0 +LEVEL_INSTANCE = 1 +LEVEL_NODE = 2 + +LEVELS = [LEVEL_CLUSTER, + LEVEL_INSTANCE, + LEVEL_NODE] + +# Lock levels which are modifiable +LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE] + +# Constant for the big ganeti lock +BGL = 'BGL' + + +class GanetiLockManager: + """The Ganeti Locking Library - return delete_failed + The purpouse of this small library is to manage locking for ganeti clusters + in a central place, while at the same time doing dynamic checks against + possible deadlocks. It will also make it easier to transition to a different + lock type should we migrate away from python threads. + """ + _instance = None + + def __init__(self, nodes=None, instances=None): + """Constructs a new GanetiLockManager object. + + There should be only a GanetiLockManager object at any time, so this + function raises an error if this is not the case. + + @param nodes: list of node names + @param instances: list of instance names + + """ + assert self.__class__._instance is None, \ + "double GanetiLockManager instance" + + self.__class__._instance = self + + # The keyring contains all the locks, at their level and in the correct + # locking order. + self.__keyring = { + LEVEL_CLUSTER: LockSet([BGL]), + LEVEL_NODE: LockSet(nodes), + LEVEL_INSTANCE: LockSet(instances), + } + + def _names(self, level): + """List the lock names at the given level. 
+
+    This can be used for debugging/testing purposes.
+
+    @param level: the level whose list of locks to get
+
+    """
+    assert level in LEVELS, "Invalid locking level %s" % level
+    return self.__keyring[level]._names()
+
+  def _is_owned(self, level):
+    """Check whether we are owning locks at the given level
+
+    """
+    return self.__keyring[level]._is_owned()
+
+  is_owned = _is_owned
+
+  def _list_owned(self, level):
+    """Get the set of owned locks at the given level
+
+    """
+    return self.__keyring[level]._list_owned()
+
+  def _upper_owned(self, level):
+    """Check that we don't own any lock at a level greater than the given one.
+
+    """
+    # This way of checking only works if LEVELS[i] = i, which we check for in
+    # the test cases.
+    return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
+
+  def _BGL_owned(self):
+    """Check if the current thread owns the BGL.
+
+    Both an exclusive or a shared acquisition work.
+
+    """
+    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
+
+  def _contains_BGL(self, level, names):
+    """Check if the level contains the BGL.
+
+    Check if acting on the given level and set of names will change
+    the status of the Big Ganeti Lock.
+
+    """
+    return level == LEVEL_CLUSTER and (names is None or BGL in names)
+
+  def acquire(self, level, names, blocking=1, shared=0):
+    """Acquire a set of resource locks, at the same level.
+
+    @param level: the level at which the locks shall be acquired;
+      it must be a member of LEVELS.
+    @param names: the names of the locks which shall be acquired
+      (special lock names, or instance/node names)
+    @param shared: whether to acquire in shared mode; by default
+      an exclusive lock will be acquired
+    @param blocking: whether to block while trying to acquire or to
+      operate in try-lock mode (this locking mode is not supported yet)
+
+    """
+    assert level in LEVELS, "Invalid locking level %s" % level
+
+    # Check that we are either acquiring the Big Ganeti Lock or we already own
+    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
+    # so even if we've migrated we need to at least share the BGL to be
+    # compatible with them. Of course if we own the BGL exclusively there's no
+    # point in acquiring any other lock, unless perhaps we are half way through
+    # the migration of the current opcode.
+    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
+            "You must own the Big Ganeti Lock before acquiring any other")
+
+    # Check we don't own locks at the same or upper levels.
+    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
+           " while owning some at a greater one")
+
+    # Acquire the locks in the set.
+    return self.__keyring[level].acquire(names, shared=shared,
+                                         blocking=blocking)
+
+  def release(self, level, names=None):
+    """Release a set of resource locks, at the same level.
+
+    You must have acquired the locks, either in shared or in exclusive
+    mode, before releasing them.
+
+    @param level: the level at which the locks shall be released;
+      it must be a member of LEVELS
+    @param names: the names of the locks which shall be released
+      (defaults to all the locks acquired at that level)
+
+    """
+    assert level in LEVELS, "Invalid locking level %s" % level
+    assert (not self._contains_BGL(level, names) or
+            not self._upper_owned(LEVEL_CLUSTER)), (
+           "Cannot release the Big Ganeti Lock while holding something"
+           " at upper levels")
+
+    # Release will complain if we don't own the locks already
+    return self.__keyring[level].release(names)
+
+  def add(self, level, names, acquired=0, shared=0):
+    """Add locks at the specified level.
+
+    @param level: the level at which the locks shall be added;
+      it must be a member of LEVELS_MOD.
+    @param names: names of the locks to acquire
+    @param acquired: whether to acquire the newly added locks
+    @param shared: whether the acquisition will be shared
+
+    """
+    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
+    assert self._BGL_owned(), ("You must own the BGL before performing other"
+                               " operations")
+    assert not self._upper_owned(level), ("Cannot add locks at a level"
+                                          " while owning some at a greater one")
+    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
+
+  def remove(self, level, names, blocking=1):
+    """Remove locks from the specified level.
+
+    You must either already own the locks you are trying to remove
+    exclusively or not own any lock at an upper level.
+
+    @param level: the level at which the locks shall be removed;
+      it must be a member of LEVELS_MOD
+    @param names: the names of the locks which shall be removed
+      (special lock names, or instance/node names)
+    @param blocking: whether to block while trying to operate in
+      try-lock mode (this locking mode is not supported yet)
+
+    """
+    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
+    assert self._BGL_owned(), ("You must own the BGL before performing other"
+                               " operations")
+    # Check we either own the level or don't own anything from here
+    # up. LockSet.remove() will check the case in which we don't own
+    # all the needed resources, or we have a shared ownership.
+    assert self._is_owned(level) or not self._upper_owned(level), (
+      "Cannot remove locks at a level while not owning it or"
+      " owning some at a greater one")
+    return self.__keyring[level].remove(names, blocking=blocking)
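A sketch of the calling pattern the assertions above enforce, with made-up
node and instance names:

    from ganeti import locking

    # Only one GanetiLockManager may exist in a process.
    glm = locking.GanetiLockManager(nodes=["node1", "node2"],
                                    instances=["inst1"])

    # The BGL must be held (shared is enough) before touching other levels,
    # and levels must be acquired in increasing order.
    glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL], shared=1)
    glm.acquire(locking.LEVEL_INSTANCE, ["inst1"])
    glm.acquire(locking.LEVEL_NODE, ["node1"], shared=1)

    # Release in the opposite order; the BGL can only be dropped once nothing
    # is held at the instance and node levels.
    glm.release(locking.LEVEL_NODE)
    glm.release(locking.LEVEL_INSTANCE)
    glm.release(locking.LEVEL_CLUSTER)

Passing locking.ALL_SET instead of an explicit name list acquires every lock
currently present at that level.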