from ganeti import utils
def ssynchronized(lock, shared=0):
    """Shared Synchronization decorator.

    Calls the function holding the given lock, either in exclusive or shared
    mode. It requires the passed lock to be a SharedLock (or support its
    semantics).

    Args:
      lock: object exposing acquire(shared=...) and release()
      shared: passed through unchanged to lock.acquire(); the default (0)
              requests the lock in exclusive mode

    Returns:
      a decorator. The function it produces forwards all positional and
      keyword arguments to the wrapped callable and returns its result; any
      exception raised by the callable propagates after the lock is released.

    """
    def wrap(fn):
        def sync_function(*args, **kwargs):
            lock.acquire(shared=shared)
            try:
                return fn(*args, **kwargs)
            finally:
                # Executed on normal return and on exception alike, so the
                # lock is never left held by this wrapper.
                lock.release()

        # Keep the wrapped callable recognisable in tracebacks and help()
        # output. Done by hand so that no extra module is needed, and
        # tolerant of callables which lack (or protect) these attributes.
        for attr in ("__name__", "__doc__"):
            try:
                setattr(sync_function, attr, getattr(fn, attr))
            except AttributeError:
                pass
        return sync_function
    return wrap
+
+
class SharedLock:
"""Implements a shared lock.
# lock waiters
self.__nwait_exc = 0
self.__nwait_shr = 0
+ self.__npass_shr = 0
# is this lock in the deleted state?
self.__deleted = False
"""
self.__lock.acquire()
try:
- result = self.__is_owned(shared)
+ result = self.__is_owned(shared=shared)
finally:
self.__lock.release()
return result
- def __wait(self,c):
+ def __wait(self, c):
"""Wait on the given condition, and raise an exception if the current lock
is declared deleted in the meantime.
finally:
self.__nwait_exc -= 1
+ assert self.__npass_shr == 0, "SharedLock: internal fairness violation"
def acquire(self, blocking=1, shared=0):
"""Acquire a shared lock.
Args:
shared: whether to acquire in shared mode. By default an exclusive lock
will be acquired.
- blocking: whether to block while trying to acquire or to operate in try-lock mode.
- this locking mode is not supported yet.
+ blocking: whether to block while trying to acquire or to operate in
+ try-lock mode. This locking mode is not supported yet.
"""
if not blocking:
# We cannot acquire the lock if we already have it
assert not self.__is_owned(), "double acquire() on a non-recursive lock"
+ assert self.__npass_shr >= 0, "Internal fairness condition weirdness"
if shared:
self.__nwait_shr += 1
try:
+ wait = False
# If there is an exclusive holder waiting we have to wait. We'll
# only do this once, though, when we start waiting for the lock. Then
# we'll just keep waiting for as long as the lock is held exclusively.
if self.__nwait_exc > 0:
# TODO: if !blocking...
+ wait = True
self.__wait(self.__turn_shr)
while self.__exc is not None:
+ wait = True
# TODO: if !blocking...
self.__wait(self.__turn_shr)
self.__shr.add(threading.currentThread())
+
+ # If we were waiting note that we passed
+ if wait:
+ self.__npass_shr -= 1
+
finally:
self.__nwait_shr -= 1
+ assert self.__npass_shr >= 0, "Internal fairness condition weirdness"
else:
# TODO: if !blocking...
# (or modify __exclusive_acquire for non-blocking mode)
"""
self.__lock.acquire()
try:
+ assert self.__npass_shr >= 0, "Internal fairness condition weirdness"
# Autodetect release type
if self.__is_exclusive():
self.__exc = None
# mode if there are shared holders waiting. Otherwise wake up the next
# exclusive holder.
if self.__nwait_shr > 0:
+ # Make sure at least the ones which were blocked pass.
+ self.__npass_shr = self.__nwait_shr
self.__turn_shr.notifyAll()
elif self.__nwait_exc > 0:
- self.__turn_exc.notify()
+ self.__turn_exc.notify()
elif self.__is_sharer():
self.__shr.remove(threading.currentThread())
- # If there are shared holders waiting there *must* be an exclusive holder
- # waiting as well; otherwise what were they waiting for?
- assert (self.__nwait_shr == 0 or self.__nwait_exc > 0,
- "Lock sharers waiting while no exclusive is queueing")
-
- # If there are no more shared holders and some exclusive holders are
- # waiting let's wake one up.
- if len(self.__shr) == 0 and self.__nwait_exc > 0:
+ # If there are shared holders waiting (and not just scheduled to pass)
+ # there *must* be an exclusive holder waiting as well; otherwise what
+ # were they waiting for?
+ assert (self.__nwait_exc > 0 or self.__npass_shr == self.__nwait_shr), \
+ "Lock sharers waiting while no exclusive is queueing"
+
+ # If there are no more shared holders either in or scheduled to pass,
+ # and some exclusive holders are waiting let's wake one up.
+ if (len(self.__shr) == 0 and
+ self.__nwait_exc > 0 and
+ not self.__npass_shr > 0):
self.__turn_exc.notify()
else:
"""Is the current thread a current level owner?"""
return threading.currentThread() in self.__owners
- def _add_owned(self, name):
+ def _add_owned(self, name=None):
"""Note the current thread owns the given lock.

If name is None only make sure the current thread has an entry in the
owners dict (an empty set is created when it has none); otherwise add name
to the thread's set, creating the set first if needed.

"""
- if self._is_owned():
- self.__owners[threading.currentThread()].add(name)
+ if name is None:
+ if not self._is_owned():
+ self.__owners[threading.currentThread()] = set()
else:
- self.__owners[threading.currentThread()] = set([name])
+ if self._is_owned():
+ self.__owners[threading.currentThread()].add(name)
+ else:
+ self.__owners[threading.currentThread()] = set([name])
+
- def _del_owned(self, name):
+ def _del_owned(self, name=None):
"""Note the current thread no longer owns the given lock.

If name is None nothing is removed from the thread's set. In either case
the thread's entry in the owners dict is deleted once its set is empty and
the set-lock (self.__lock) is not owned.

"""
- self.__owners[threading.currentThread()].remove(name)
- if not self.__owners[threading.currentThread()]:
+ if name is not None:
+ self.__owners[threading.currentThread()].remove(name)
+
+ # Only remove the key if we don't hold the set-lock as well
+ if (not self.__lock._is_owned() and
+ not self.__owners[threading.currentThread()]):
del self.__owners[threading.currentThread()]
def _list_owned(self):
"""Return a copy of the current set of elements.
Used only for debugging purposes.
+
"""
self.__lock.acquire(shared=1)
try:
(special lock names, or instance/node names)
shared: whether to acquire in shared mode. By default an exclusive lock
will be acquired.
- blocking: whether to block while trying to acquire or to operate in try-lock mode.
- this locking mode is not supported yet.
+ blocking: whether to block while trying to acquire or to operate in
+ try-lock mode. This locking mode is not supported yet.
Returns:
True: when all the locks are successfully acquired
# so we'll get the list lock exclusively as well in order to be able to
# do add() on the set while owning it.
self.__lock.acquire(shared=shared)
+ try:
+ # note we own the set-lock
+ self._add_owned()
+ names = self.__names()
+ except:
+ # We shouldn't have problems adding the lock to the owners list, but
+ # if we did we'll try to release this lock and re-raise exception.
+ # Of course something is going to be really wrong, after this.
+ self.__lock.release()
+ raise
try:
# Support passing in a single resource to acquire rather than many
if isinstance(names, basestring):
names = [names]
else:
- if names is None:
- names = self.__names()
names.sort()
acquire_list = []
# just one of them be the already wrong
for lname in names:
try:
- lock = self.__lockdict[lname] # raises KeyError if the lock is not there
+ lock = self.__lockdict[lname] # raises KeyError if lock is not there
acquire_list.append((lname, lock))
except (KeyError):
if self.__lock._is_owned():
for (lname, lock) in acquire_list:
try:
lock.acquire(shared=shared) # raises LockError if the lock is deleted
- try:
- # now the lock cannot be deleted, we have it!
- self._add_owned(lname)
- acquired.add(lname)
- except:
- # We shouldn't have problems adding the lock to the owners list, but
- # if we did we'll try to release this lock and re-raise exception.
- # Of course something is going to be really wrong, after this.
- lock.release()
- raise
-
+ # now the lock cannot be deleted, we have it!
+ self._add_owned(name=lname)
+ acquired.add(lname)
except (errors.LockError):
if self.__lock._is_owned():
# We are acquiring all the set, it doesn't matter if this particular
name_fail = lname
for lname in self._list_owned():
self.__lockdict[lname].release()
- self._del_owned(lname)
+ self._del_owned(name=lname)
raise errors.LockError('non-existing lock in set (%s)' % name_fail)
+ except:
+ # We shouldn't have problems adding the lock to the owners list, but
+ # if we did we'll try to release this lock and re-raise exception.
+ # Of course something is going to be really wrong, after this.
+ if lock._is_owned():
+ lock.release()
+ raise
except:
# If something went wrong and we had the set-lock let's release it...
(defaults to all the locks acquired at that level).
"""
-
assert self._is_owned(), "release() on lock set while not owner"
# Support passing in a single resource to release rather than many
# After this 'add' can work again
if self.__lock._is_owned():
self.__lock.release()
+ self._del_owned()
for lockname in names:
# If we are sure the lock doesn't leave __lockdict without being
# exclusively held we can do this...
self.__lockdict[lockname].release()
- self._del_owned(lockname)
+ self._del_owned(name=lockname)
def add(self, names, acquired=0, shared=0):
"""Add a new set of elements to the set
lock.acquire(shared=shared)
# now the lock cannot be deleted, we have it!
try:
- self._add_owned(lockname)
+ self._add_owned(name=lockname)
except:
# We shouldn't have problems adding the lock to the owners list,
# but if we did we'll try to release this lock and re-raise
del self.__lockdict[lname]
# And let's remove it from our private list if we owned it.
if self._is_owned():
- self._del_owned(lname)
+ self._del_owned(name=lname)
return removed
# - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
# If you need more than one node, or more than one instance, acquire them at
# the same time.
-# - level LEVEL_CONFIG contains the configuration lock, which you must acquire
-# before reading or changing the config file.
LEVEL_CLUSTER = 0
-LEVEL_NODE = 1
-LEVEL_INSTANCE = 2
-LEVEL_CONFIG = 3
+LEVEL_INSTANCE = 1
+LEVEL_NODE = 2
LEVELS = [LEVEL_CLUSTER,
- LEVEL_NODE,
LEVEL_INSTANCE,
- LEVEL_CONFIG]
+ LEVEL_NODE]
# Lock levels which are modifiable
LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
-# Constant for the big ganeti lock and config lock
+# Constant for the big ganeti lock
BGL = 'BGL'
-CONFIG = 'config'
class GanetiLockManager:
def __init__(self, nodes=None, instances=None):
"""Constructs a new GanetiLockManager object.
- There should be only a
- GanetiLockManager object at any time, so this function raises an error if this
- is not the case.
+ There should be only one GanetiLockManager object at any time, so this
+ function raises an error if this is not the case.
Args:
nodes: list of node names
LEVEL_CLUSTER: LockSet([BGL]),
LEVEL_NODE: LockSet(nodes),
LEVEL_INSTANCE: LockSet(instances),
- LEVEL_CONFIG: LockSet([CONFIG]),
}
def _names(self, level):
(special lock names, or instance/node names)
shared: whether to acquire in shared mode. By default an exclusive lock
will be acquired.
- blocking: whether to block while trying to acquire or to operate in try-lock mode.
- this locking mode is not supported yet.
+ blocking: whether to block while trying to acquire or to operate in
+ try-lock mode. This locking mode is not supported yet.
"""
assert level in LEVELS, "Invalid locking level %s" % level
"You must own the Big Ganeti Lock before acquiring any other")
# Check we don't own locks at the same or upper levels.
- assert not self._upper_owned(level), ("Cannot acquire locks at a level"
+ assert not self._upper_owned(level), ("Cannot acquire locks at a level"
" while owning some at a greater one")
# Acquire the locks in the set.
assert self._is_owned(level) or not self._upper_owned(level), (
"Cannot remove locks at a level while not owning it or"
" owning some at a greater one")
- return self.__keyring[level].remove(names, blocking)
-
+ return self.__keyring[level].remove(names, blocking=blocking)