X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/b2dabfd6457eda98987ff13f9c1e928c31a46866..fc8a6b8f3334c1352caafcede1e31d801fc146ba:/lib/locking.py diff --git a/lib/locking.py b/lib/locking.py index 25a8bf1..9fb6507 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -20,7 +20,10 @@ """Module implementing the Ganeti locking code.""" -# pylint: disable-msg=W0613,W0201 +# pylint: disable-msg=W0212 + +# W0212 since e.g. LockSet methods use (a lot) the internals of +# SharedLock import threading # Wouldn't it be better to define LockingError in the locking module? @@ -29,6 +32,25 @@ from ganeti import errors from ganeti import utils +def ssynchronized(lock, shared=0): + """Shared Synchronization decorator. + + Calls the function holding the given lock, either in exclusive or shared + mode. It requires the passed lock to be a SharedLock (or support its + semantics). + + """ + def wrap(fn): + def sync_function(*args, **kwargs): + lock.acquire(shared=shared) + try: + return fn(*args, **kwargs) + finally: + lock.release() + return sync_function + return wrap + + class SharedLock: """Implements a shared lock. @@ -55,6 +77,7 @@ class SharedLock: # lock waiters self.__nwait_exc = 0 self.__nwait_shr = 0 + self.__npass_shr = 0 # is this lock in the deleted state? self.__deleted = False @@ -84,11 +107,10 @@ class SharedLock: def _is_owned(self, shared=-1): """Is the current thread somehow owning the lock at this time? - Args: - shared: - < 0: check for any type of ownership (default) - 0: check for exclusive ownership - > 0: check for shared ownership + @param shared: + - < 0: check for any type of ownership (default) + - 0: check for exclusive ownership + - > 0: check for shared ownership """ self.__lock.acquire() @@ -103,8 +125,7 @@ class SharedLock: """Wait on the given condition, and raise an exception if the current lock is declared deleted in the meantime. - Args: - c: condition to wait on + @param c: the condition to wait on """ c.wait() @@ -133,14 +154,15 @@ class SharedLock: finally: self.__nwait_exc -= 1 + assert self.__npass_shr == 0, "SharedLock: internal fairness violation" + def acquire(self, blocking=1, shared=0): """Acquire a shared lock. - Args: - shared: whether to acquire in shared mode. By default an exclusive lock - will be acquired. - blocking: whether to block while trying to acquire or to operate in try-lock mode. - this locking mode is not supported yet. + @param shared: whether to acquire in shared mode; by default an + exclusive lock will be acquired + @param blocking: whether to block while trying to acquire or to + operate in try-lock mode (this locking mode is not supported yet) """ if not blocking: @@ -154,25 +176,36 @@ class SharedLock: # We cannot acquire the lock if we already have it assert not self.__is_owned(), "double acquire() on a non-recursive lock" + assert self.__npass_shr >= 0, "Internal fairness condition weirdness" if shared: self.__nwait_shr += 1 try: - # If there is an exclusive holder waiting we have to wait. We'll - # only do this once, though, when we start waiting for the lock. Then - # we'll just wait while there are no exclusive holders. + wait = False + # If there is an exclusive holder waiting we have to wait. + # We'll only do this once, though, when we start waiting for + # the lock. Then we'll just wait while there are no + # exclusive holders. if self.__nwait_exc > 0: # TODO: if !blocking... + wait = True self.__wait(self.__turn_shr) while self.__exc is not None: + wait = True # TODO: if !blocking... self.__wait(self.__turn_shr) self.__shr.add(threading.currentThread()) + + # If we were waiting note that we passed + if wait: + self.__npass_shr -= 1 + finally: self.__nwait_shr -= 1 + assert self.__npass_shr >= 0, "Internal fairness condition weirdness" else: # TODO: if !blocking... # (or modify __exclusive_acquire for non-blocking mode) @@ -192,6 +225,7 @@ class SharedLock: """ self.__lock.acquire() try: + assert self.__npass_shr >= 0, "Internal fairness condition weirdness" # Autodetect release type if self.__is_exclusive(): self.__exc = None @@ -200,16 +234,27 @@ class SharedLock: # mode if there are shared holders waiting. Otherwise wake up the next # exclusive holder. if self.__nwait_shr > 0: + # Make sure at least the ones which were blocked pass. + self.__npass_shr = self.__nwait_shr self.__turn_shr.notifyAll() elif self.__nwait_exc > 0: - self.__turn_exc.notify() + self.__turn_exc.notify() elif self.__is_sharer(): self.__shr.remove(threading.currentThread()) - # If there are no more shared holders and some exclusive holders are - # waiting let's wake one up. - if len(self.__shr) == 0 and self.__nwait_exc > 0: + # If there are shared holders waiting (and not just scheduled to pass) + # there *must* be an exclusive holder waiting as well; otherwise what + # were they waiting for? + assert (self.__nwait_exc > 0 or + self.__npass_shr == self.__nwait_shr), \ + "Lock sharers waiting while no exclusive is queueing" + + # If there are no more shared holders either in or scheduled to pass, + # and some exclusive holders are waiting let's wake one up. + if (len(self.__shr) == 0 and + self.__nwait_exc > 0 and + not self.__npass_shr > 0): self.__turn_exc.notify() else: @@ -225,10 +270,9 @@ class SharedLock: acquired in exclusive mode if you don't already own it, then the lock will be put in a state where any future and pending acquire() fail. - Args: - blocking: whether to block while trying to acquire or to operate in - try-lock mode. this locking mode is not supported yet unless - you are already holding exclusively the lock. + @param blocking: whether to block while trying to acquire or to + operate in try-lock mode. this locking mode is not supported + yet unless you are already holding exclusively the lock. """ self.__lock.acquire() @@ -255,6 +299,11 @@ class SharedLock: self.__lock.release() +# Whenever we want to acquire a full LockSet we pass None as the value +# to acquire. Hide this behind this nicely named constant. +ALL_SET = None + + class LockSet: """Implements a set of locks. @@ -269,8 +318,7 @@ class LockSet: def __init__(self, members=None): """Constructs a new LockSet. - Args: - members: initial members of the set + @param members: initial members of the set """ # Used internally to guarantee coherency. @@ -342,31 +390,34 @@ class LockSet: Used only for debugging purposes. """ - self.__lock.acquire(shared=1) + # If we don't already own the set-level lock acquired + # we'll get it and note we need to release it later. + release_lock = False + if not self.__lock._is_owned(): + release_lock = True + self.__lock.acquire(shared=1) try: result = self.__names() finally: - self.__lock.release() + if release_lock: + self.__lock.release() return set(result) def acquire(self, names, blocking=1, shared=0): """Acquire a set of resource locks. - Args: - names: the names of the locks which shall be acquired. - (special lock names, or instance/node names) - shared: whether to acquire in shared mode. By default an exclusive lock - will be acquired. - blocking: whether to block while trying to acquire or to operate in try-lock mode. - this locking mode is not supported yet. + @param names: the names of the locks which shall be acquired + (special lock names, or instance/node names) + @param shared: whether to acquire in shared mode; by default an + exclusive lock will be acquired + @param blocking: whether to block while trying to acquire or to + operate in try-lock mode (this locking mode is not supported yet) - Returns: - True: when all the locks are successfully acquired + @return: True when all the locks are successfully acquired - Raises: - errors.LockError: when any lock we try to acquire has been deleted - before we succeed. In this case none of the locks requested will be - acquired. + @raise errors.LockError: when any lock we try to acquire has + been deleted before we succeed. In this case none of the + locks requested will be acquired. """ if not blocking: @@ -403,20 +454,20 @@ class LockSet: if isinstance(names, basestring): names = [names] else: - names.sort() + names = sorted(names) acquire_list = [] # First we look the locks up on __lockdict. We have no way of being sure # they will still be there after, but this makes it a lot faster should # just one of them be the already wrong - for lname in names: + for lname in utils.UniqueSequence(names): try: - lock = self.__lockdict[lname] # raises KeyError if the lock is not there + lock = self.__lockdict[lname] # raises KeyError if lock is not there acquire_list.append((lname, lock)) except (KeyError): if self.__lock._is_owned(): - # We are acquiring all the set, it doesn't matter if this particular - # element is not there anymore. + # We are acquiring all the set, it doesn't matter if this + # particular element is not there anymore. continue else: raise errors.LockError('non-existing lock in set (%s)' % lname) @@ -435,8 +486,8 @@ class LockSet: acquired.add(lname) except (errors.LockError): if self.__lock._is_owned(): - # We are acquiring all the set, it doesn't matter if this particular - # element is not there anymore. + # We are acquiring all the set, it doesn't matter if this + # particular element is not there anymore. continue else: name_fail = lname @@ -450,7 +501,7 @@ class LockSet: # Of course something is going to be really wrong, after this. if lock._is_owned(): lock.release() - raise + raise except: # If something went wrong and we had the set-lock let's release it... @@ -466,9 +517,8 @@ class LockSet: You must have acquired the locks, either in shared or in exclusive mode, before releasing them. - Args: - names: the names of the locks which shall be released. - (defaults to all the locks acquired at that level). + @param names: the names of the locks which shall be released + (defaults to all the locks acquired at that level). """ assert self._is_owned(), "release() on lock set while not owner" @@ -500,21 +550,20 @@ class LockSet: def add(self, names, acquired=0, shared=0): """Add a new set of elements to the set - Args: - names: names of the new elements to add - acquired: pre-acquire the new resource? - shared: is the pre-acquisition shared? + @param names: names of the new elements to add + @param acquired: pre-acquire the new resource? + @param shared: is the pre-acquisition shared? """ - - assert not self.__lock._is_owned(shared=1), ( - "Cannot add new elements while sharing the set-lock") + # Check we don't already own locks at this level + assert not self._is_owned() or self.__lock._is_owned(shared=0), \ + "Cannot add locks if the set is only partially owned, or shared" # Support passing in a single resource to add rather than many if isinstance(names, basestring): names = [names] - # If we don't already own the set-level lock acquire it in an exclusive way + # If we don't already own the set-level lock acquired in an exclusive way # we'll get it and note we need to release it later. release_lock = False if not self.__lock._is_owned(): @@ -562,15 +611,14 @@ class LockSet: You can either not hold anything in the lockset or already hold a superset of the elements you want to delete, exclusively. - Args: - names: names of the resource to remove. - blocking: whether to block while trying to acquire or to operate in - try-lock mode. this locking mode is not supported yet unless - you are already holding exclusively the locks. + @param names: names of the resource to remove. + @param blocking: whether to block while trying to acquire or to + operate in try-lock mode (this locking mode is not supported + yet unless you are already holding exclusively the locks) - Returns: - A list of lock which we removed. The list is always equal to the names - list if we were holding all the locks exclusively. + @return:: a list of locks which we removed; the list is always + equal to the names list if we were holding all the locks + exclusively """ if not blocking and not self._is_owned(): @@ -626,30 +674,25 @@ class LockSet: # - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks. # If you need more than one node, or more than one instance, acquire them at # the same time. -# - level LEVEL_CONFIG contains the configuration lock, which you must acquire -# before reading or changing the config file. LEVEL_CLUSTER = 0 -LEVEL_NODE = 1 -LEVEL_INSTANCE = 2 -LEVEL_CONFIG = 3 +LEVEL_INSTANCE = 1 +LEVEL_NODE = 2 LEVELS = [LEVEL_CLUSTER, - LEVEL_NODE, LEVEL_INSTANCE, - LEVEL_CONFIG] + LEVEL_NODE] # Lock levels which are modifiable LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE] -# Constant for the big ganeti lock and config lock +# Constant for the big ganeti lock BGL = 'BGL' -CONFIG = 'config' class GanetiLockManager: """The Ganeti Locking Library - The purpouse of this small library is to manage locking for ganeti clusters + The purpose of this small library is to manage locking for ganeti clusters in a central place, while at the same time doing dynamic checks against possible deadlocks. It will also make it easier to transition to a different lock type should we migrate away from python threads. @@ -660,16 +703,16 @@ class GanetiLockManager: def __init__(self, nodes=None, instances=None): """Constructs a new GanetiLockManager object. - There should be only a - GanetiLockManager object at any time, so this function raises an error if this - is not the case. + There should be only a GanetiLockManager object at any time, so this + function raises an error if this is not the case. - Args: - nodes: list of node names - instances: list of instance names + @param nodes: list of node names + @param instances: list of instance names """ - assert self.__class__._instance is None, "double GanetiLockManager instance" + assert self.__class__._instance is None, \ + "double GanetiLockManager instance" + self.__class__._instance = self # The keyring contains all the locks, at their level and in the correct @@ -678,15 +721,14 @@ class GanetiLockManager: LEVEL_CLUSTER: LockSet([BGL]), LEVEL_NODE: LockSet(nodes), LEVEL_INSTANCE: LockSet(instances), - LEVEL_CONFIG: LockSet([CONFIG]), } def _names(self, level): """List the lock names at the given level. - Used for debugging/testing purposes. - Args: - level: the level whose list of locks to get + This can be used for debugging/testing purposes. + + @param level: the level whose list of locks to get """ assert level in LEVELS, "Invalid locking level %s" % level @@ -698,6 +740,8 @@ class GanetiLockManager: """ return self.__keyring[level]._is_owned() + is_owned = _is_owned + def _list_owned(self, level): """Get the set of owned locks at the given level @@ -712,7 +756,7 @@ class GanetiLockManager: # the test cases. return utils.any((self._is_owned(l) for l in LEVELS[level + 1:])) - def _BGL_owned(self): + def _BGL_owned(self): # pylint: disable-msg=C0103 """Check if the current thread owns the BGL. Both an exclusive or a shared acquisition work. @@ -720,9 +764,12 @@ class GanetiLockManager: """ return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned() - def _contains_BGL(self, level, names): - """Check if acting on the given level and set of names will change the - status of the Big Ganeti Lock. + @staticmethod + def _contains_BGL(level, names): # pylint: disable-msg=C0103 + """Check if the level contains the BGL. + + Check if acting on the given level and set of names will change + the status of the Big Ganeti Lock. """ return level == LEVEL_CLUSTER and (names is None or BGL in names) @@ -730,15 +777,14 @@ class GanetiLockManager: def acquire(self, level, names, blocking=1, shared=0): """Acquire a set of resource locks, at the same level. - Args: - level: the level at which the locks shall be acquired. - It must be a memmber of LEVELS. - names: the names of the locks which shall be acquired. - (special lock names, or instance/node names) - shared: whether to acquire in shared mode. By default an exclusive lock - will be acquired. - blocking: whether to block while trying to acquire or to operate in try-lock mode. - this locking mode is not supported yet. + @param level: the level at which the locks shall be acquired; + it must be a member of LEVELS. + @param names: the names of the locks which shall be acquired + (special lock names, or instance/node names) + @param shared: whether to acquire in shared mode; by default + an exclusive lock will be acquired + @param blocking: whether to block while trying to acquire or to + operate in try-lock mode (this locking mode is not supported yet) """ assert level in LEVELS, "Invalid locking level %s" % level @@ -753,7 +799,7 @@ class GanetiLockManager: "You must own the Big Ganeti Lock before acquiring any other") # Check we don't own locks at the same or upper levels. - assert not self._upper_owned(level), ("Cannot acquire locks at a level" + assert not self._upper_owned(level), ("Cannot acquire locks at a level" " while owning some at a greater one") # Acquire the locks in the set. @@ -763,14 +809,13 @@ class GanetiLockManager: def release(self, level, names=None): """Release a set of resource locks, at the same level. - You must have acquired the locks, either in shared or in exclusive mode, - before releasing them. + You must have acquired the locks, either in shared or in exclusive + mode, before releasing them. - Args: - level: the level at which the locks shall be released. - It must be a memmber of LEVELS. - names: the names of the locks which shall be released. - (defaults to all the locks acquired at that level). + @param level: the level at which the locks shall be released; + it must be a member of LEVELS + @param names: the names of the locks which shall be released + (defaults to all the locks acquired at that level) """ assert level in LEVELS, "Invalid locking level %s" % level @@ -785,12 +830,12 @@ class GanetiLockManager: def add(self, level, names, acquired=0, shared=0): """Add locks at the specified level. - Args: - level: the level at which the locks shall be added. - It must be a memmber of LEVELS_MOD. - names: names of the locks to acquire - acquired: whether to acquire the newly added locks - shared: whether the acquisition will be shared + @param level: the level at which the locks shall be added; + it must be a member of LEVELS_MOD. + @param names: names of the locks to acquire + @param acquired: whether to acquire the newly added locks + @param shared: whether the acquisition will be shared + """ assert level in LEVELS_MOD, "Invalid or immutable level %s" % level assert self._BGL_owned(), ("You must own the BGL before performing other" @@ -802,24 +847,23 @@ class GanetiLockManager: def remove(self, level, names, blocking=1): """Remove locks from the specified level. - You must either already own the locks you are trying to remove exclusively - or not own any lock at an upper level. + You must either already own the locks you are trying to remove + exclusively or not own any lock at an upper level. - Args: - level: the level at which the locks shall be removed. - It must be a memmber of LEVELS_MOD. - names: the names of the locks which shall be removed. - (special lock names, or instance/node names) - blocking: whether to block while trying to operate in try-lock mode. - this locking mode is not supported yet. + @param level: the level at which the locks shall be removed; + it must be a member of LEVELS_MOD + @param names: the names of the locks which shall be removed + (special lock names, or instance/node names) + @param blocking: whether to block while trying to operate in + try-lock mode (this locking mode is not supported yet) """ assert level in LEVELS_MOD, "Invalid or immutable level %s" % level assert self._BGL_owned(), ("You must own the BGL before performing other" " operations") - # Check we either own the level or don't own anything from here up. - # LockSet.remove() will check the case in which we don't own all the needed - # resources, or we have a shared ownership. + # Check we either own the level or don't own anything from here + # up. LockSet.remove() will check the case in which we don't own + # all the needed resources, or we have a shared ownership. assert self._is_owned(level) or not self._upper_owned(level), ( "Cannot remove locks at a level while not owning it or" " owning some at a greater one")