X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/7ee7c0c7e4cba67496a261bc0890bbe8d02cf7a0..4a89c54a18904f71afc93d6af639c02114fdeabc:/lib/locking.py diff --git a/lib/locking.py b/lib/locking.py index 41519eb..1e4f1d1 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -29,6 +29,25 @@ from ganeti import errors from ganeti import utils +def ssynchronized(lock, shared=0): + """Shared Synchronization decorator. + + Calls the function holding the given lock, either in exclusive or shared + mode. It requires the passed lock to be a SharedLock (or support its + semantics). + + """ + def wrap(fn): + def sync_function(*args, **kwargs): + lock.acquire(shared=shared) + try: + return fn(*args, **kwargs) + finally: + lock.release() + return sync_function + return wrap + + class SharedLock: """Implements a shared lock. @@ -55,6 +74,7 @@ class SharedLock: # lock waiters self.__nwait_exc = 0 self.__nwait_shr = 0 + self.__npass_shr = 0 # is this lock in the deleted state? self.__deleted = False @@ -84,27 +104,25 @@ class SharedLock: def _is_owned(self, shared=-1): """Is the current thread somehow owning the lock at this time? - Args: - shared: - < 0: check for any type of ownership (default) - 0: check for exclusive ownership - > 0: check for shared ownership + @param shared: + - < 0: check for any type of ownership (default) + - 0: check for exclusive ownership + - > 0: check for shared ownership """ self.__lock.acquire() try: - result = self.__is_owned(shared) + result = self.__is_owned(shared=shared) finally: self.__lock.release() return result - def __wait(self,c): + def __wait(self, c): """Wait on the given condition, and raise an exception if the current lock is declared deleted in the meantime. - Args: - c: condition to wait on + @param c: the condition to wait on """ c.wait() @@ -133,15 +151,15 @@ class SharedLock: finally: self.__nwait_exc -= 1 + assert self.__npass_shr == 0, "SharedLock: internal fairness violation" def acquire(self, blocking=1, shared=0): """Acquire a shared lock. - Args: - shared: whether to acquire in shared mode. By default an exclusive lock - will be acquired. - blocking: whether to block while trying to acquire or to operate in try-lock mode. - this locking mode is not supported yet. + @param shared: whether to acquire in shared mode; by default an + exclusive lock will be acquired + @param blocking: whether to block while trying to acquire or to + operate in try-lock mode (this locking mode is not supported yet) """ if not blocking: @@ -155,25 +173,35 @@ class SharedLock: # We cannot acquire the lock if we already have it assert not self.__is_owned(), "double acquire() on a non-recursive lock" + assert self.__npass_shr >= 0, "Internal fairness condition weirdness" if shared: self.__nwait_shr += 1 try: + wait = False # If there is an exclusive holder waiting we have to wait. We'll # only do this once, though, when we start waiting for the lock. Then # we'll just wait while there are no exclusive holders. if self.__nwait_exc > 0: # TODO: if !blocking... + wait = True self.__wait(self.__turn_shr) while self.__exc is not None: + wait = True # TODO: if !blocking... self.__wait(self.__turn_shr) self.__shr.add(threading.currentThread()) + + # If we were waiting note that we passed + if wait: + self.__npass_shr -= 1 + finally: self.__nwait_shr -= 1 + assert self.__npass_shr >= 0, "Internal fairness condition weirdness" else: # TODO: if !blocking... # (or modify __exclusive_acquire for non-blocking mode) @@ -193,6 +221,7 @@ class SharedLock: """ self.__lock.acquire() try: + assert self.__npass_shr >= 0, "Internal fairness condition weirdness" # Autodetect release type if self.__is_exclusive(): self.__exc = None @@ -201,21 +230,26 @@ class SharedLock: # mode if there are shared holders waiting. Otherwise wake up the next # exclusive holder. if self.__nwait_shr > 0: + # Make sure at least the ones which were blocked pass. + self.__npass_shr = self.__nwait_shr self.__turn_shr.notifyAll() elif self.__nwait_exc > 0: - self.__turn_exc.notify() + self.__turn_exc.notify() elif self.__is_sharer(): self.__shr.remove(threading.currentThread()) - # If there are shared holders waiting there *must* be an exclusive holder - # waiting as well; otherwise what were they waiting for? - assert (self.__nwait_shr == 0 or self.__nwait_exc > 0, - "Lock sharers waiting while no exclusive is queueing") - - # If there are no more shared holders and some exclusive holders are - # waiting let's wake one up. - if len(self.__shr) == 0 and self.__nwait_exc > 0: + # If there are shared holders waiting (and not just scheduled to pass) + # there *must* be an exclusive holder waiting as well; otherwise what + # were they waiting for? + assert (self.__nwait_exc > 0 or self.__npass_shr == self.__nwait_shr), \ + "Lock sharers waiting while no exclusive is queueing" + + # If there are no more shared holders either in or scheduled to pass, + # and some exclusive holders are waiting let's wake one up. + if (len(self.__shr) == 0 and + self.__nwait_exc > 0 and + not self.__npass_shr > 0): self.__turn_exc.notify() else: @@ -231,10 +265,9 @@ class SharedLock: acquired in exclusive mode if you don't already own it, then the lock will be put in a state where any future and pending acquire() fail. - Args: - blocking: whether to block while trying to acquire or to operate in - try-lock mode. this locking mode is not supported yet unless - you are already holding exclusively the lock. + @param blocking: whether to block while trying to acquire or to + operate in try-lock mode. this locking mode is not supported + yet unless you are already holding exclusively the lock. """ self.__lock.acquire() @@ -261,6 +294,11 @@ class SharedLock: self.__lock.release() +# Whenever we want to acquire a full LockSet we pass None as the value to acquire. +# Hide this behing this nicely named constant. +ALL_SET = None + + class LockSet: """Implements a set of locks. @@ -275,8 +313,7 @@ class LockSet: def __init__(self, members=None): """Constructs a new LockSet. - Args: - members: initial members of the set + @param members: initial members of the set """ # Used internally to guarantee coherency. @@ -303,18 +340,27 @@ class LockSet: """Is the current thread a current level owner?""" return threading.currentThread() in self.__owners - def _add_owned(self, name): + def _add_owned(self, name=None): """Note the current thread owns the given lock""" - if self._is_owned(): - self.__owners[threading.currentThread()].add(name) + if name is None: + if not self._is_owned(): + self.__owners[threading.currentThread()] = set() else: - self.__owners[threading.currentThread()] = set([name]) + if self._is_owned(): + self.__owners[threading.currentThread()].add(name) + else: + self.__owners[threading.currentThread()] = set([name]) + - def _del_owned(self, name): + def _del_owned(self, name=None): """Note the current thread owns the given lock""" - self.__owners[threading.currentThread()].remove(name) - if not self.__owners[threading.currentThread()]: + if name is not None: + self.__owners[threading.currentThread()].remove(name) + + # Only remove the key if we don't hold the set-lock as well + if (not self.__lock._is_owned() and + not self.__owners[threading.currentThread()]): del self.__owners[threading.currentThread()] def _list_owned(self): @@ -331,38 +377,42 @@ class LockSet: result after releasing the lock. """ - return set(self.__lockdict.keys()) + return self.__lockdict.keys() def _names(self): """Return a copy of the current set of elements. Used only for debugging purposes. + """ - self.__lock.acquire(shared=1) + # If we don't already own the set-level lock acquired + # we'll get it and note we need to release it later. + release_lock = False + if not self.__lock._is_owned(): + release_lock = True + self.__lock.acquire(shared=1) try: result = self.__names() finally: - self.__lock.release() - return result + if release_lock: + self.__lock.release() + return set(result) def acquire(self, names, blocking=1, shared=0): """Acquire a set of resource locks. - Args: - names: the names of the locks which shall be acquired. - (special lock names, or instance/node names) - shared: whether to acquire in shared mode. By default an exclusive lock - will be acquired. - blocking: whether to block while trying to acquire or to operate in try-lock mode. - this locking mode is not supported yet. + @param names: the names of the locks which shall be acquired + (special lock names, or instance/node names) + @param shared: whether to acquire in shared mode; by default an + exclusive lock will be acquired + @param blocking: whether to block while trying to acquire or to + operate in try-lock mode (this locking mode is not supported yet) - Returns: - True: when all the locks are successfully acquired + @return: True when all the locks are successfully acquired - Raises: - errors.LockError: when any lock we try to acquire has been deleted - before we succeed. In this case none of the locks requested will be - acquired. + @raise errors.LockError: when any lock we try to acquire has + been deleted before we succeed. In this case none of the + locks requested will be acquired. """ if not blocking: @@ -372,48 +422,89 @@ class LockSet: # Check we don't already own locks at this level assert not self._is_owned(), "Cannot acquire locks in the same set twice" - # Support passing in a single resource to acquire rather than many - if isinstance(names, basestring): - names = [names] - else: - names.sort() + if names is None: + # If no names are given acquire the whole set by not letting new names + # being added before we release, and getting the current list of names. + # Some of them may then be deleted later, but we'll cope with this. + # + # We'd like to acquire this lock in a shared way, as it's nice if + # everybody else can use the instances at the same time. If are acquiring + # them exclusively though they won't be able to do this anyway, though, + # so we'll get the list lock exclusively as well in order to be able to + # do add() on the set while owning it. + self.__lock.acquire(shared=shared) + try: + # note we own the set-lock + self._add_owned() + names = self.__names() + except: + # We shouldn't have problems adding the lock to the owners list, but + # if we did we'll try to release this lock and re-raise exception. + # Of course something is going to be really wrong, after this. + self.__lock.release() + raise - acquire_list = [] - # First we look the locks up on __lockdict. We have no way of being sure - # they will still be there after, but this makes it a lot faster should - # just one of them be the already wrong - try: - for lname in names: - lock = self.__lockdict[lname] # raises KeyError if the lock is not there - acquire_list.append((lname, lock)) - except (KeyError): - raise errors.LockError('non-existing lock in set (%s)' % lname) - - # Now acquire_list contains a sorted list of resources and locks we want. - # In order to get them we loop on this (private) list and acquire() them. - # We gave no real guarantee they will still exist till this is done but - # .acquire() itself is safe and will alert us if the lock gets deleted. try: + # Support passing in a single resource to acquire rather than many + if isinstance(names, basestring): + names = [names] + else: + names.sort() + + acquire_list = [] + # First we look the locks up on __lockdict. We have no way of being sure + # they will still be there after, but this makes it a lot faster should + # just one of them be the already wrong + for lname in utils.UniqueSequence(names): + try: + lock = self.__lockdict[lname] # raises KeyError if lock is not there + acquire_list.append((lname, lock)) + except (KeyError): + if self.__lock._is_owned(): + # We are acquiring all the set, it doesn't matter if this particular + # element is not there anymore. + continue + else: + raise errors.LockError('non-existing lock in set (%s)' % lname) + + # This will hold the locknames we effectively acquired. + acquired = set() + # Now acquire_list contains a sorted list of resources and locks we want. + # In order to get them we loop on this (private) list and acquire() them. + # We gave no real guarantee they will still exist till this is done but + # .acquire() itself is safe and will alert us if the lock gets deleted. for (lname, lock) in acquire_list: - lock.acquire(shared=shared) # raises LockError if the lock is deleted try: + lock.acquire(shared=shared) # raises LockError if the lock is deleted # now the lock cannot be deleted, we have it! - self._add_owned(lname) + self._add_owned(name=lname) + acquired.add(lname) + except (errors.LockError): + if self.__lock._is_owned(): + # We are acquiring all the set, it doesn't matter if this particular + # element is not there anymore. + continue + else: + name_fail = lname + for lname in self._list_owned(): + self.__lockdict[lname].release() + self._del_owned(name=lname) + raise errors.LockError('non-existing lock in set (%s)' % name_fail) except: # We shouldn't have problems adding the lock to the owners list, but # if we did we'll try to release this lock and re-raise exception. # Of course something is going to be really wrong, after this. - lock.release() - raise + if lock._is_owned(): + lock.release() + raise - except (errors.LockError): - name_fail = lname - for lname in self._list_owned(): - self.__lockdict[lname].release() - self._del_owned(lname) - raise errors.LockError('non-existing lock in set (%s)' % name_fail) + except: + # If something went wrong and we had the set-lock let's release it... + if self.__lock._is_owned(): + self.__lock.release() + raise - return True + return acquired def release(self, names=None): """Release a set of resource locks, at the same level. @@ -421,12 +512,10 @@ class LockSet: You must have acquired the locks, either in shared or in exclusive mode, before releasing them. - Args: - names: the names of the locks which shall be released. - (defaults to all the locks acquired at that level). + @param names: the names of the locks which shall be released + (defaults to all the locks acquired at that level). """ - assert self._is_owned(), "release() on lock set while not owner" # Support passing in a single resource to release rather than many @@ -441,30 +530,43 @@ class LockSet: "release() on unheld resources %s" % names.difference(self._list_owned())) + # First of all let's release the "all elements" lock, if set. + # After this 'add' can work again + if self.__lock._is_owned(): + self.__lock.release() + self._del_owned() + for lockname in names: # If we are sure the lock doesn't leave __lockdict without being # exclusively held we can do this... self.__lockdict[lockname].release() - self._del_owned(lockname) + self._del_owned(name=lockname) def add(self, names, acquired=0, shared=0): """Add a new set of elements to the set - Args: - names: names of the new elements to add - acquired: pre-acquire the new resource? - shared: is the pre-acquisition shared? + @param names: names of the new elements to add + @param acquired: pre-acquire the new resource? + @param shared: is the pre-acquisition shared? """ + # Check we don't already own locks at this level + assert not self._is_owned() or self.__lock._is_owned(shared=0), \ + "Cannot add locks if the set is only partially owned, or shared" + # Support passing in a single resource to add rather than many if isinstance(names, basestring): names = [names] - # Acquire the internal lock in an exclusive way, so there cannot be a - # conflicting add() - self.__lock.acquire() + # If we don't already own the set-level lock acquired in an exclusive way + # we'll get it and note we need to release it later. + release_lock = False + if not self.__lock._is_owned(): + release_lock = True + self.__lock.acquire() + try: - invalid_names = self.__names().intersection(names) + invalid_names = set(self.__names()).intersection(names) if invalid_names: # This must be an explicit raise, not an assert, because assert is # turned off when using optimization, and this can happen because of @@ -478,7 +580,7 @@ class LockSet: lock.acquire(shared=shared) # now the lock cannot be deleted, we have it! try: - self._add_owned(lockname) + self._add_owned(name=lockname) except: # We shouldn't have problems adding the lock to the owners list, # but if we did we'll try to release this lock and re-raise @@ -492,7 +594,9 @@ class LockSet: self.__lockdict[lockname] = lock finally: - self.__lock.release() + # Only release __lock if we were not holding it previously. + if release_lock: + self.__lock.release() return True @@ -502,15 +606,14 @@ class LockSet: You can either not hold anything in the lockset or already hold a superset of the elements you want to delete, exclusively. - Args: - names: names of the resource to remove. - blocking: whether to block while trying to acquire or to operate in - try-lock mode. this locking mode is not supported yet unless - you are already holding exclusively the locks. + @param names: names of the resource to remove. + @param blocking: whether to block while trying to acquire or to + operate in try-lock mode (this locking mode is not supported + yet unless you are already holding exclusively the locks) - Returns: - A list of lock which we failed to delete. The list is always empty if we - were holding all the locks exclusively. + @return:: a list of locks which we removed; the list is always + equal to the names list if we were holding all the locks + exclusively """ if not blocking and not self._is_owned(): @@ -527,7 +630,7 @@ class LockSet: assert not self._is_owned() or self._list_owned().issuperset(names), ( "remove() on acquired lockset while not owning all elements") - delete_failed=[] + removed = [] for lname in names: # Calling delete() acquires the lock exclusively if we don't already own @@ -537,8 +640,8 @@ class LockSet: # everything we want to delete, or we hold none. try: self.__lockdict[lname].delete() + removed.append(lname) except (KeyError, errors.LockError): - delete_failed.append(lname) # This cannot happen if we were already holding it, verify: assert not self._is_owned(), "remove failed while holding lockset" else: @@ -552,9 +655,9 @@ class LockSet: del self.__lockdict[lname] # And let's remove it from our private list if we owned it. if self._is_owned(): - self._del_owned(lname) + self._del_owned(name=lname) - return delete_failed + return removed # Locking levels, must be acquired in increasing order. @@ -566,24 +669,19 @@ class LockSet: # - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks. # If you need more than one node, or more than one instance, acquire them at # the same time. -# - level LEVEL_CONFIG contains the configuration lock, which you must acquire -# before reading or changing the config file. LEVEL_CLUSTER = 0 -LEVEL_NODE = 1 -LEVEL_INSTANCE = 2 -LEVEL_CONFIG = 3 +LEVEL_INSTANCE = 1 +LEVEL_NODE = 2 LEVELS = [LEVEL_CLUSTER, - LEVEL_NODE, LEVEL_INSTANCE, - LEVEL_CONFIG] + LEVEL_NODE] # Lock levels which are modifiable LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE] -# Constant for the big ganeti lock and config lock +# Constant for the big ganeti lock BGL = 'BGL' -CONFIG = 'config' class GanetiLockManager: @@ -600,16 +698,16 @@ class GanetiLockManager: def __init__(self, nodes=None, instances=None): """Constructs a new GanetiLockManager object. - There should be only a - GanetiLockManager object at any time, so this function raises an error if this - is not the case. + There should be only a GanetiLockManager object at any time, so this + function raises an error if this is not the case. - Args: - nodes: list of node names - instances: list of instance names + @param nodes: list of node names + @param instances: list of instance names """ - assert self.__class__._instance is None, "double GanetiLockManager instance" + assert self.__class__._instance is None, \ + "double GanetiLockManager instance" + self.__class__._instance = self # The keyring contains all the locks, at their level and in the correct @@ -618,15 +716,14 @@ class GanetiLockManager: LEVEL_CLUSTER: LockSet([BGL]), LEVEL_NODE: LockSet(nodes), LEVEL_INSTANCE: LockSet(instances), - LEVEL_CONFIG: LockSet([CONFIG]), } def _names(self, level): """List the lock names at the given level. - Used for debugging/testing purposes. - Args: - level: the level whose list of locks to get + This can be used for debugging/testing purposes. + + @param level: the level whose list of locks to get """ assert level in LEVELS, "Invalid locking level %s" % level @@ -638,6 +735,8 @@ class GanetiLockManager: """ return self.__keyring[level]._is_owned() + is_owned = _is_owned + def _list_owned(self, level): """Get the set of owned locks at the given level @@ -661,8 +760,10 @@ class GanetiLockManager: return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned() def _contains_BGL(self, level, names): - """Check if acting on the given level and set of names will change the - status of the Big Ganeti Lock. + """Check if the level contains the BGL. + + Check if acting on the given level and set of names will change + the status of the Big Ganeti Lock. """ return level == LEVEL_CLUSTER and (names is None or BGL in names) @@ -670,15 +771,14 @@ class GanetiLockManager: def acquire(self, level, names, blocking=1, shared=0): """Acquire a set of resource locks, at the same level. - Args: - level: the level at which the locks shall be acquired. - It must be a memmber of LEVELS. - names: the names of the locks which shall be acquired. - (special lock names, or instance/node names) - shared: whether to acquire in shared mode. By default an exclusive lock - will be acquired. - blocking: whether to block while trying to acquire or to operate in try-lock mode. - this locking mode is not supported yet. + @param level: the level at which the locks shall be acquired; + it must be a memmber of LEVELS. + @param names: the names of the locks which shall be acquired + (special lock names, or instance/node names) + @param shared: whether to acquire in shared mode; by default + an exclusive lock will be acquired + @param blocking: whether to block while trying to acquire or to + operate in try-lock mode (this locking mode is not supported yet) """ assert level in LEVELS, "Invalid locking level %s" % level @@ -693,7 +793,7 @@ class GanetiLockManager: "You must own the Big Ganeti Lock before acquiring any other") # Check we don't own locks at the same or upper levels. - assert not self._upper_owned(level), ("Cannot acquire locks at a level" + assert not self._upper_owned(level), ("Cannot acquire locks at a level" " while owning some at a greater one") # Acquire the locks in the set. @@ -703,14 +803,13 @@ class GanetiLockManager: def release(self, level, names=None): """Release a set of resource locks, at the same level. - You must have acquired the locks, either in shared or in exclusive mode, - before releasing them. + You must have acquired the locks, either in shared or in exclusive + mode, before releasing them. - Args: - level: the level at which the locks shall be released. - It must be a memmber of LEVELS. - names: the names of the locks which shall be released. - (defaults to all the locks acquired at that level). + @param level: the level at which the locks shall be released; + it must be a memmber of LEVELS + @param names: the names of the locks which shall be released + (defaults to all the locks acquired at that level) """ assert level in LEVELS, "Invalid locking level %s" % level @@ -725,12 +824,12 @@ class GanetiLockManager: def add(self, level, names, acquired=0, shared=0): """Add locks at the specified level. - Args: - level: the level at which the locks shall be added. - It must be a memmber of LEVELS_MOD. - names: names of the locks to acquire - acquired: whether to acquire the newly added locks - shared: whether the acquisition will be shared + @param level: the level at which the locks shall be added; + it must be a memmber of LEVELS_MOD. + @param names: names of the locks to acquire + @param acquired: whether to acquire the newly added locks + @param shared: whether the acquisition will be shared + """ assert level in LEVELS_MOD, "Invalid or immutable level %s" % level assert self._BGL_owned(), ("You must own the BGL before performing other" @@ -742,16 +841,15 @@ class GanetiLockManager: def remove(self, level, names, blocking=1): """Remove locks from the specified level. - You must either already own the locks you are trying to remove exclusively - or not own any lock at an upper level. + You must either already own the locks you are trying to remove + exclusively or not own any lock at an upper level. - Args: - level: the level at which the locks shall be removed. - It must be a memmber of LEVELS_MOD. - names: the names of the locks which shall be removed. - (special lock names, or instance/node names) - blocking: whether to block while trying to operate in try-lock mode. - this locking mode is not supported yet. + @param level: the level at which the locks shall be removed; + it must be a member of LEVELS_MOD + @param names: the names of the locks which shall be removed + (special lock names, or instance/node names) + @param blocking: whether to block while trying to operate in + try-lock mode (this locking mode is not supported yet) """ assert level in LEVELS_MOD, "Invalid or immutable level %s" % level @@ -763,5 +861,4 @@ class GanetiLockManager: assert self._is_owned(level) or not self._upper_owned(level), ( "Cannot remove locks at a level while not owning it or" " owning some at a greater one") - return self.__keyring[level].remove(names, blocking) - + return self.__keyring[level].remove(names, blocking=blocking)