# Wouldn't it be better to define LockingError in the locking module?
# Well, for now that's how the rest of the code does it...
from ganeti import errors
+from ganeti import utils
+
+
+def ssynchronized(lock, shared=0):
+ """Shared Synchronization decorator.
+
+ Calls the function holding the given lock, either in exclusive or shared
+ mode. It requires the passed lock to be a SharedLock (or support its
+ semantics).
+
+ @param lock: the lock to hold around each call; it must accept
+ acquire(shared=...) and release()
+ @param shared: passed through as the shared argument of lock.acquire();
+ the default of 0 requests exclusive mode
+ @return: a decorator which wraps a function so that the lock is taken
+ before the call and released afterwards
+
+ """
+ def wrap(fn):
+ def sync_function(*args, **kwargs):
+ lock.acquire(shared=shared)
+ try:
+ return fn(*args, **kwargs)
+ finally:
+ # Always give the lock back, even if fn raised.
+ lock.release()
+ return sync_function
+ return wrap
class SharedLock:
# lock waiters
self.__nwait_exc = 0
self.__nwait_shr = 0
+ self.__npass_shr = 0
# is this lock in the deleted state?
self.__deleted = False
def _is_owned(self, shared=-1):
"""Is the current thread somehow owning the lock at this time?
- Args:
- shared:
- < 0: check for any type of ownership (default)
- 0: check for exclusive ownership
- > 0: check for shared ownership
+ @param shared:
+ - < 0: check for any type of ownership (default)
+ - 0: check for exclusive ownership
+ - > 0: check for shared ownership
"""
self.__lock.acquire()
try:
- result = self.__is_owned(shared)
+ result = self.__is_owned(shared=shared)
finally:
self.__lock.release()
return result
- def __wait(self,c):
+ def __wait(self, c):
"""Wait on the given condition, and raise an exception if the current lock
is declared deleted in the meantime.
- Args:
- c: condition to wait on
+ @param c: the condition to wait on
"""
c.wait()
finally:
self.__nwait_exc -= 1
+ assert self.__npass_shr == 0, "SharedLock: internal fairness violation"
def acquire(self, blocking=1, shared=0):
"""Acquire a shared lock.
- Args:
- shared: whether to acquire in shared mode. By default an exclusive lock
- will be acquired.
- blocking: whether to block while trying to acquire or to operate in try-lock mode.
- this locking mode is not supported yet.
+ @param shared: whether to acquire in shared mode; by default an
+ exclusive lock will be acquired
+ @param blocking: whether to block while trying to acquire or to
+ operate in try-lock mode (this locking mode is not supported yet)
"""
if not blocking:
# We cannot acquire the lock if we already have it
assert not self.__is_owned(), "double acquire() on a non-recursive lock"
+ assert self.__npass_shr >= 0, "Internal fairness condition weirdness"
if shared:
self.__nwait_shr += 1
try:
- # If there is an exclusive holder waiting we have to wait. We'll
- # only do this once, though, when we start waiting for the lock. Then
- # we'll just wait while there are no exclusive holders.
+ wait = False
+ # If there is an exclusive holder waiting we have to wait.
+ # We'll only do this once, though, when we start waiting for
+ # the lock. Then we'll just wait while there are no
+ # exclusive holders.
if self.__nwait_exc > 0:
# TODO: if !blocking...
+ wait = True
self.__wait(self.__turn_shr)
while self.__exc is not None:
+ wait = True
# TODO: if !blocking...
self.__wait(self.__turn_shr)
self.__shr.add(threading.currentThread())
+
+ # If we were waiting note that we passed
+ if wait:
+ self.__npass_shr -= 1
+
finally:
self.__nwait_shr -= 1
+ assert self.__npass_shr >= 0, "Internal fairness condition weirdness"
else:
# TODO: if !blocking...
# (or modify __exclusive_acquire for non-blocking mode)
"""
self.__lock.acquire()
try:
+ assert self.__npass_shr >= 0, "Internal fairness condition weirdness"
# Autodetect release type
if self.__is_exclusive():
self.__exc = None
# mode if there are shared holders waiting. Otherwise wake up the next
# exclusive holder.
if self.__nwait_shr > 0:
+ # Make sure at least the ones which were blocked pass.
+ self.__npass_shr = self.__nwait_shr
self.__turn_shr.notifyAll()
elif self.__nwait_exc > 0:
- self.__turn_exc.notify()
+ self.__turn_exc.notify()
elif self.__is_sharer():
self.__shr.remove(threading.currentThread())
- # If there are shared holders waiting there *must* be an exclusive holder
- # waiting as well; otherwise what were they waiting for?
- assert (self.__nwait_shr == 0 or self.__nwait_exc > 0,
- "Lock sharers waiting while no exclusive is queueing")
-
- # If there are no more shared holders and some exclusive holders are
- # waiting let's wake one up.
- if len(self.__shr) == 0 and self.__nwait_exc > 0:
+ # If there are shared holders waiting (and not just scheduled to pass)
+ # there *must* be an exclusive holder waiting as well; otherwise what
+ # were they waiting for?
+ assert (self.__nwait_exc > 0 or
+ self.__npass_shr == self.__nwait_shr), \
+ "Lock sharers waiting while no exclusive is queueing"
+
+ # If there are no more shared holders either in or scheduled to pass,
+ # and some exclusive holders are waiting let's wake one up.
+ if (len(self.__shr) == 0 and
+ self.__nwait_exc > 0 and
+ not self.__npass_shr > 0):
self.__turn_exc.notify()
else:
acquired in exclusive mode if you don't already own it, then the lock
will be put in a state where any future and pending acquire() fail.
- Args:
- blocking: whether to block while trying to acquire or to operate in
- try-lock mode. this locking mode is not supported yet unless
- you are already holding exclusively the lock.
+ @param blocking: whether to block while trying to acquire or to
+ operate in try-lock mode. this locking mode is not supported
+ yet unless you are already holding exclusively the lock.
"""
self.__lock.acquire()
self.__lock.release()
+# Whenever we want to acquire a full LockSet we pass None as the value
+# to acquire. Hide this behind this nicely named constant.
+ALL_SET = None
+
+
class LockSet:
"""Implements a set of locks.
def __init__(self, members=None):
"""Constructs a new LockSet.
- Args:
- members: initial members of the set
+ @param members: initial members of the set
"""
# Used internally to guarantee coherency.
"""Is the current thread a current level owner?"""
return threading.currentThread() in self.__owners
- def _add_owned(self, name):
+ def _add_owned(self, name=None):
"""Note the current thread owns the given lock"""
- if self._is_owned():
- self.__owners[threading.currentThread()].add(name)
+ if name is None:
+ if not self._is_owned():
+ self.__owners[threading.currentThread()] = set()
else:
- self.__owners[threading.currentThread()] = set([name])
+ if self._is_owned():
+ self.__owners[threading.currentThread()].add(name)
+ else:
+ self.__owners[threading.currentThread()] = set([name])
+
- def _del_owned(self, name):
+ def _del_owned(self, name=None):
"""Note the current thread owns the given lock"""
- self.__owners[threading.currentThread()].remove(name)
- if not self.__owners[threading.currentThread()]:
+ if name is not None:
+ self.__owners[threading.currentThread()].remove(name)
+
+ # Only remove the key if we don't hold the set-lock as well
+ if (not self.__lock._is_owned() and
+ not self.__owners[threading.currentThread()]):
del self.__owners[threading.currentThread()]
def _list_owned(self):
result after releasing the lock.
"""
- return set(self.__lockdict.keys())
+ return self.__lockdict.keys()
def _names(self):
"""Return a copy of the current set of elements.
Used only for debugging purposes.
+
"""
- self.__lock.acquire(shared=1)
+ # If we don't already own the set-level lock acquired
+ # we'll get it and note we need to release it later.
+ release_lock = False
+ if not self.__lock._is_owned():
+ release_lock = True
+ self.__lock.acquire(shared=1)
try:
result = self.__names()
finally:
- self.__lock.release()
- return result
+ if release_lock:
+ self.__lock.release()
+ return set(result)
def acquire(self, names, blocking=1, shared=0):
"""Acquire a set of resource locks.
- Args:
- names: the names of the locks which shall be acquired.
- (special lock names, or instance/node names)
- shared: whether to acquire in shared mode. By default an exclusive lock
- will be acquired.
- blocking: whether to block while trying to acquire or to operate in try-lock mode.
- this locking mode is not supported yet.
+ @param names: the names of the locks which shall be acquired
+ (special lock names, or instance/node names)
+ @param shared: whether to acquire in shared mode; by default an
+ exclusive lock will be acquired
+ @param blocking: whether to block while trying to acquire or to
+ operate in try-lock mode (this locking mode is not supported yet)
- Returns:
- True: when all the locks are successfully acquired
+ @return: True when all the locks are successfully acquired
- Raises:
- errors.LockError: when any lock we try to acquire has been deleted
- before we succeed. In this case none of the locks requested will be
- acquired.
+ @raise errors.LockError: when any lock we try to acquire has
+ been deleted before we succeed. In this case none of the
+ locks requested will be acquired.
"""
if not blocking:
# Check we don't already own locks at this level
assert not self._is_owned(), "Cannot acquire locks in the same set twice"
- # Support passing in a single resource to acquire rather than many
- if isinstance(names, basestring):
- names = [names]
- else:
- names.sort()
+ if names is None:
+ # If no names are given acquire the whole set by not letting new names
+ # being added before we release, and getting the current list of names.
+ # Some of them may then be deleted later, but we'll cope with this.
+ #
+ # We'd like to acquire this lock in a shared way, as it's nice if
+ # everybody else can use the instances at the same time. If we are
+ # acquiring them exclusively, though, they won't be able to do this
+ # anyway, so we'll get the list lock exclusively as well in order to be
+ # able to do add() on the set while owning it.
+ self.__lock.acquire(shared=shared)
+ try:
+ # note we own the set-lock
+ self._add_owned()
+ names = self.__names()
+ except:
+ # We shouldn't have problems adding the lock to the owners list, but
+ # if we did we'll try to release this lock and re-raise exception.
+ # Of course something is going to be really wrong, after this.
+ self.__lock.release()
+ raise
- acquire_list = []
- # First we look the locks up on __lockdict. We have no way of being sure
- # they will still be there after, but this makes it a lot faster should
- # just one of them be the already wrong
- try:
- for lname in names:
- lock = self.__lockdict[lname] # raises KeyError if the lock is not there
- acquire_list.append((lname, lock))
- except (KeyError):
- raise errors.LockError('non-existing lock in set (%s)' % lname)
-
- # Now acquire_list contains a sorted list of resources and locks we want.
- # In order to get them we loop on this (private) list and acquire() them.
- # We gave no real guarantee they will still exist till this is done but
- # .acquire() itself is safe and will alert us if the lock gets deleted.
try:
+ # Support passing in a single resource to acquire rather than many
+ if isinstance(names, basestring):
+ names = [names]
+ else:
+ names = sorted(names)
+
+ acquire_list = []
+ # First we look the locks up on __lockdict. We have no way of being sure
+ # they will still be there after, but this makes it a lot faster should
+ # just one of them already be wrong
+ for lname in utils.UniqueSequence(names):
+ try:
+ lock = self.__lockdict[lname] # raises KeyError if lock is not there
+ acquire_list.append((lname, lock))
+ except (KeyError):
+ if self.__lock._is_owned():
+ # We are acquiring all the set, it doesn't matter if this
+ # particular element is not there anymore.
+ continue
+ else:
+ raise errors.LockError('non-existing lock in set (%s)' % lname)
+
+ # This will hold the locknames we effectively acquired.
+ acquired = set()
+ # Now acquire_list contains a sorted list of resources and locks we want.
+ # In order to get them we loop on this (private) list and acquire() them.
+ # We gave no real guarantee they will still exist till this is done but
+ # .acquire() itself is safe and will alert us if the lock gets deleted.
for (lname, lock) in acquire_list:
- lock.acquire(shared=shared) # raises LockError if the lock is deleted
try:
+ lock.acquire(shared=shared) # raises LockError if the lock is deleted
# now the lock cannot be deleted, we have it!
- self._add_owned(lname)
+ self._add_owned(name=lname)
+ acquired.add(lname)
+ except (errors.LockError):
+ if self.__lock._is_owned():
+ # We are acquiring all the set, it doesn't matter if this
+ # particular element is not there anymore.
+ continue
+ else:
+ name_fail = lname
+ for lname in self._list_owned():
+ self.__lockdict[lname].release()
+ self._del_owned(name=lname)
+ raise errors.LockError('non-existing lock in set (%s)' % name_fail)
except:
# We shouldn't have problems adding the lock to the owners list, but
# if we did we'll try to release this lock and re-raise exception.
# Of course something is going to be really wrong, after this.
- lock.release()
- raise
+ if lock._is_owned():
+ lock.release()
+ raise
- except (errors.LockError):
- name_fail = lname
- for lname in self._list_owned():
- self.__lockdict[lname].release()
- self._del_owned(lname)
- raise errors.LockError('non-existing lock in set (%s)' % name_fail)
+ except:
+ # If something went wrong and we had the set-lock let's release it...
+ if self.__lock._is_owned():
+ self.__lock.release()
+ raise
- return True
+ return acquired
def release(self, names=None):
"""Release a set of resource locks, at the same level.
You must have acquired the locks, either in shared or in exclusive mode,
before releasing them.
- Args:
- names: the names of the locks which shall be released.
- (defaults to all the locks acquired at that level).
+ @param names: the names of the locks which shall be released
+ (defaults to all the locks acquired at that level).
"""
-
assert self._is_owned(), "release() on lock set while not owner"
# Support passing in a single resource to release rather than many
"release() on unheld resources %s" %
names.difference(self._list_owned()))
+ # First of all let's release the "all elements" lock, if set.
+ # After this 'add' can work again
+ if self.__lock._is_owned():
+ self.__lock.release()
+ self._del_owned()
+
for lockname in names:
# If we are sure the lock doesn't leave __lockdict without being
# exclusively held we can do this...
self.__lockdict[lockname].release()
- self._del_owned(lockname)
+ self._del_owned(name=lockname)
def add(self, names, acquired=0, shared=0):
"""Add a new set of elements to the set
- Args:
- names: names of the new elements to add
- acquired: pre-acquire the new resource?
- shared: is the pre-acquisition shared?
+ @param names: names of the new elements to add
+ @param acquired: pre-acquire the new resource?
+ @param shared: is the pre-acquisition shared?
"""
+ # Check we don't already own locks at this level
+ assert not self._is_owned() or self.__lock._is_owned(shared=0), \
+ "Cannot add locks if the set is only partially owned, or shared"
+
# Support passing in a single resource to add rather than many
if isinstance(names, basestring):
names = [names]
- # Acquire the internal lock in an exclusive way, so there cannot be a
- # conflicting add()
- self.__lock.acquire()
+ # If we don't already own the set-level lock acquired in an exclusive way
+ # we'll get it and note we need to release it later.
+ release_lock = False
+ if not self.__lock._is_owned():
+ release_lock = True
+ self.__lock.acquire()
+
try:
- invalid_names = self.__names().intersection(names)
+ invalid_names = set(self.__names()).intersection(names)
if invalid_names:
# This must be an explicit raise, not an assert, because assert is
# turned off when using optimization, and this can happen because of
lock.acquire(shared=shared)
# now the lock cannot be deleted, we have it!
try:
- self._add_owned(lockname)
+ self._add_owned(name=lockname)
except:
# We shouldn't have problems adding the lock to the owners list,
# but if we did we'll try to release this lock and re-raise
self.__lockdict[lockname] = lock
finally:
- self.__lock.release()
+ # Only release __lock if we were not holding it previously.
+ if release_lock:
+ self.__lock.release()
return True
You can either not hold anything in the lockset or already hold a superset
of the elements you want to delete, exclusively.
- Args:
- names: names of the resource to remove.
- blocking: whether to block while trying to acquire or to operate in
- try-lock mode. this locking mode is not supported yet unless
- you are already holding exclusively the locks.
+ @param names: names of the resource to remove.
+ @param blocking: whether to block while trying to acquire or to
+ operate in try-lock mode (this locking mode is not supported
+ yet unless you are already holding exclusively the locks)
- Returns:
- A list of lock which we failed to delete. The list is always empty if we
- were holding all the locks exclusively.
+ @return: a list of locks which we removed; the list is always
+ equal to the names list if we were holding all the locks
+ exclusively
"""
if not blocking and not self._is_owned():
assert not self._is_owned() or self._list_owned().issuperset(names), (
"remove() on acquired lockset while not owning all elements")
- delete_failed=[]
+ removed = []
for lname in names:
# Calling delete() acquires the lock exclusively if we don't already own
# everything we want to delete, or we hold none.
try:
self.__lockdict[lname].delete()
+ removed.append(lname)
except (KeyError, errors.LockError):
- delete_failed.append(lname)
# This cannot happen if we were already holding it, verify:
assert not self._is_owned(), "remove failed while holding lockset"
else:
del self.__lockdict[lname]
# And let's remove it from our private list if we owned it.
if self._is_owned():
- self._del_owned(lname)
+ self._del_owned(name=lname)
+
+ return removed
+
+
+# Locking levels, must be acquired in increasing order.
+# Current rules are:
+# - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
+# acquired before performing any operation, either in shared or in exclusive
+# mode. acquiring the BGL in exclusive mode is discouraged and should be
+# avoided.
+# - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
+# If you need more than one node, or more than one instance, acquire them at
+# the same time.
+LEVEL_CLUSTER = 0
+LEVEL_INSTANCE = 1
+LEVEL_NODE = 2
+
+LEVELS = [LEVEL_CLUSTER,
+ LEVEL_INSTANCE,
+ LEVEL_NODE]
+
+# Lock levels which are modifiable
+LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
+
+# Constant for the big ganeti lock
+BGL = 'BGL'
+
+
+class GanetiLockManager:
+ """The Ganeti Locking Library
- return delete_failed
+ The purpose of this small library is to manage locking for ganeti clusters
+ in a central place, while at the same time doing dynamic checks against
+ possible deadlocks. It will also make it easier to transition to a different
+ lock type should we migrate away from python threads.
+ """
+ _instance = None
+
+ def __init__(self, nodes=None, instances=None):
+ """Constructs a new GanetiLockManager object.
+
+ There should be only one GanetiLockManager object at any time. This is
+ enforced with an assertion on the class-level _instance attribute, so
+ the check is only active when assertions are enabled.
+
+ @param nodes: list of node names
+ @param instances: list of instance names
+
+ """
+ assert self.__class__._instance is None, \
+ "double GanetiLockManager instance"
+
+ # Remember this object as the one and only manager.
+ self.__class__._instance = self
+
+ # The keyring maps each locking level to the LockSet holding the locks
+ # of that level. The cluster level only ever contains the BGL.
+ self.__keyring = {
+ LEVEL_CLUSTER: LockSet([BGL]),
+ LEVEL_NODE: LockSet(nodes),
+ LEVEL_INSTANCE: LockSet(instances),
+ }
+
+ def _names(self, level):
+ """List the lock names at the given level.
+
+ This can be used for debugging/testing purposes.
+
+ @param level: the level whose list of locks to get; it must be a
+ member of LEVELS
+ @return: the result of the level's LockSet._names(), which is a set
+ of the names currently in that LockSet
+
+ """
+ assert level in LEVELS, "Invalid locking level %s" % level
+ return self.__keyring[level]._names()
+
+ def _is_owned(self, level):
+ """Check whether we are owning locks at the given level
+
+ @param level: the level to check; it is used directly as a key into
+ the keyring, without the LEVELS assertion other methods perform
+ @return: the result of the level's LockSet._is_owned(), i.e. whether
+ the current thread is recorded as an owner in that set
+
+ """
+ return self.__keyring[level]._is_owned()
+
+ # Public alias for the same check.
+ is_owned = _is_owned
+
+ def _list_owned(self, level):
+ """Get the set of owned locks at the given level
+
+ @param level: the level to query; it is used directly as a key into
+ the keyring
+ @return: whatever the level's LockSet._list_owned() returns; callers
+ treat it as a set of lock names (NOTE: that method's body is
+ not visible here -- confirm)
+
+ """
+ return self.__keyring[level]._list_owned()
+
+ def _upper_owned(self, level):
+ """Check whether we own any lock at a level greater than the given one.
+
+ @param level: the reference level; only the levels after it in LEVELS
+ are inspected
+ @return: the result of utils.any() over the per-level ownership checks
+ (presumably true as soon as one upper level is owned -- confirm
+ against utils.any)
+
+ """
+ # This way of checking only works if LEVELS[i] = i, which we check for in
+ # the test cases.
+ return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
+
+ def _BGL_owned(self):
+ """Check if the current thread owns the BGL.
+
+ Both an exclusive or a shared acquisition work.
+
+ @return: whether the BGL name is among the locks listed as owned at
+ the cluster level
+
+ """
+ return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
+
+ def _contains_BGL(self, level, names) :
+ """Check if the level contains the BGL.
+
+ Check if acting on the given level and set of names will change
+ the status of the Big Ganeti Lock.
+
+ @param level: the level being acted upon
+ @param names: the lock names being acted upon; None stands for the
+ whole level and therefore includes the BGL at the cluster level
+ @return: True only for the cluster level, and only if names is None
+ or contains the BGL
+
+ """
+ return level == LEVEL_CLUSTER and (names is None or BGL in names)
+
+ def acquire(self, level, names, blocking=1, shared=0):
+ """Acquire a set of resource locks, at the same level.
+
+ @param level: the level at which the locks shall be acquired;
+ it must be a member of LEVELS.
+ @param names: the names of the locks which shall be acquired
+ (special lock names, or instance/node names)
+ @param shared: whether to acquire in shared mode; by default
+ an exclusive lock will be acquired
+ @param blocking: whether to block while trying to acquire or to
+ operate in try-lock mode (this locking mode is not supported yet)
+ @return: the result of the level's LockSet.acquire()
+
+ """
+ assert level in LEVELS, "Invalid locking level %s" % level
+
+ # Check that we are either acquiring the Big Ganeti Lock or we already own
+ # it. Some "legacy" opcodes need to be sure they are run non-concurrently
+ # so even if we've migrated we need to at least share the BGL to be
+ # compatible with them. Of course if we own the BGL exclusively there's no
+ # point in acquiring any other lock, unless perhaps we are half way through
+ # the migration of the current opcode.
+ assert (self._contains_BGL(level, names) or self._BGL_owned()), (
+ "You must own the Big Ganeti Lock before acquiring any other")
+
+ # Check we don't own locks at upper levels. Ownership at the same level
+ # is not checked here; LockSet.acquire() asserts on that itself.
+ assert not self._upper_owned(level), ("Cannot acquire locks at a level"
+ " while owning some at a greater one")
+
+ # Acquire the locks in the set.
+ return self.__keyring[level].acquire(names, shared=shared,
+ blocking=blocking)
+
+ def release(self, level, names=None):
+ """Release a set of resource locks, at the same level.
+
+ You must have acquired the locks, either in shared or in exclusive
+ mode, before releasing them.
+
+ @param level: the level at which the locks shall be released;
+ it must be a member of LEVELS
+ @param names: the names of the locks which shall be released
+ (defaults to all the locks acquired at that level)
+ @return: the result of the level's LockSet.release()
+
+ """
+ assert level in LEVELS, "Invalid locking level %s" % level
+ # The BGL must be the last lock to go: refuse to release it while locks
+ # at any level above the cluster level are still owned.
+ assert (not self._contains_BGL(level, names) or
+ not self._upper_owned(LEVEL_CLUSTER)), (
+ "Cannot release the Big Ganeti Lock while holding something"
+ " at upper levels")
+
+ # Release will complain if we don't own the locks already
+ return self.__keyring[level].release(names)
+
+ def add(self, level, names, acquired=0, shared=0):
+ """Add locks at the specified level.
+
+ @param level: the level at which the locks shall be added;
+ it must be a member of LEVELS_MOD.
+ @param names: names of the locks to add
+ @param acquired: whether to acquire the newly added locks
+ @param shared: whether the acquisition will be shared
+ @return: the result of the level's LockSet.add()
+
+ """
+ assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
+ assert self._BGL_owned(), ("You must own the BGL before performing other"
+ " operations")
+ assert not self._upper_owned(level), ("Cannot add locks at a level"
+ " while owning some at a greater one")
+ return self.__keyring[level].add(names, acquired=acquired, shared=shared)
+
+ def remove(self, level, names, blocking=1):
+ """Remove locks from the specified level.
+
+ You must either already own the locks you are trying to remove
+ exclusively or not own any lock at an upper level.
+
+ @param level: the level at which the locks shall be removed;
+ it must be a member of LEVELS_MOD
+ @param names: the names of the locks which shall be removed
+ (special lock names, or instance/node names)
+ @param blocking: whether to block while trying to acquire or to
+ operate in try-lock mode (this locking mode is not supported yet)
+ @return: the result of the level's LockSet.remove(), a list of the
+ lock names which were removed
+
+ """
+ assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
+ assert self._BGL_owned(), ("You must own the BGL before performing other"
+ " operations")
+ # Check we either own the level or don't own anything from here
+ # up. LockSet.remove() will check the case in which we don't own
+ # all the needed resources, or we have a shared ownership.
+ assert self._is_owned(level) or not self._upper_owned(level), (
+ "Cannot remove locks at a level while not owning it or"
+ " owning some at a greater one")
+ return self.__keyring[level].remove(names, blocking=blocking)