# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
-# Disable "Invalid name ..." message
-# pylint: disable-msg=C0103
-
"""Module implementing the Ganeti locking code."""
+# pylint: disable-msg=W0212
+
+# W0212 since e.g. LockSet methods use (a lot) the internals of
+# SharedLock
+
import os
import select
import threading
return wrap
+class RunningTimeout(object):
+ """Class to calculate remaining timeout when doing several operations.
+
+ One overall timeout is shared by a sequence of operations: before each
+ operation the caller asks L{Remaining} how much of the timeout is left.
+ The clock is started by the first call to L{Remaining}, not when the
+ object is created.
+
+ """
+ # Instances only ever carry these four attributes
+ __slots__ = [
+ "_allow_negative",
+ "_start_time",
+ "_time_fn",
+ "_timeout",
+ ]
+
+ def __init__(self, timeout, allow_negative, _time_fn=time.time):
+ """Initializes this class.
+
+ @type timeout: float or None
+ @param timeout: Timeout duration (same unit as the values returned by
+ C{_time_fn}, i.e. seconds with the default C{time.time}); C{None}
+ means there is no timeout
+ @type allow_negative: bool
+ @param allow_negative: Whether to return values below zero
+ @param _time_fn: Time function for unittests
+ @raise ValueError: If C{timeout} is given and is below zero
+
+ """
+ object.__init__(self)
+
+ # None is a valid value (no timeout), only real negative values are refused
+ if timeout is not None and timeout < 0.0:
+ raise ValueError("Timeout must not be negative")
+
+ self._timeout = timeout
+ self._allow_negative = allow_negative
+ self._time_fn = _time_fn
+
+ # Set lazily by the first call to Remaining()
+ self._start_time = None
+
+ def Remaining(self):
+ """Returns the remaining timeout.
+
+ The first call records the start time; every call (including the first)
+ then returns the configured timeout minus the time elapsed since then.
+
+ @rtype: float or None
+ @return: C{None} if no timeout was configured, otherwise the time left;
+ once the timeout has expired the value is negative if negative values
+ were allowed at construction and C{0.0} otherwise
+
+ """
+ # Without a timeout there is nothing to calculate (and no clock to start)
+ if self._timeout is None:
+ return None
+
+ # Get start time on first calculation
+ if self._start_time is None:
+ self._start_time = self._time_fn()
+
+ # Calculate remaining time
+ remaining_timeout = self._start_time + self._timeout - self._time_fn()
+
+ if not self._allow_negative:
+ # Ensure timeout is always >= 0
+ return max(0.0, remaining_timeout)
+
+ return remaining_timeout
+
+
class _SingleNotifyPipeConditionWaiter(object):
"""Helper class for SingleNotifyPipeCondition
@param timeout: Timeout for waiting (can be None)
"""
- start_time = time.time()
- remaining_time = timeout
+ running_timeout = RunningTimeout(timeout, True)
+
+ while True:
+ remaining_time = running_timeout.Remaining()
+
+ if remaining_time is not None:
+ if remaining_time < 0.0:
+ break
+
+ # Our calculation uses seconds, poll() wants milliseconds
+ remaining_time *= 1000
- while timeout is None or remaining_time > 0:
try:
result = self._poller.poll(remaining_time)
except EnvironmentError, err:
if result and result[0][0] == self._fd:
break
- # Re-calculate timeout if necessary
- if timeout is not None:
- remaining_time = start_time + timeout - time.time()
-
class _BaseCondition(object):
"""Base class containing common code for conditions.
"""
- __slots__ = _BaseCondition.__slots__ + [
+ __slots__ = [
"_poller",
"_read_fd",
"_write_fd",
if self._nwaiters == 0:
self._Cleanup()
- def notifyAll(self):
+ def notifyAll(self): # pylint: disable-msg=C0103
"""Close the writing side of the pipe to notify all waiters.
"""
there are any waiting threads.
"""
- __slots__ = _BaseCondition.__slots__ + [
+ __slots__ = [
"_nwaiters",
"_single_condition",
]
assert self._nwaiters > 0
self._nwaiters -= 1
- def notifyAll(self):
+ def notifyAll(self): # pylint: disable-msg=C0103
"""Notify all currently waiting threads.
"""
self._cond = threading.Condition(lock=lock)
self._nwaiters = 0
- def notifyAll(self):
+ def notifyAll(self): # pylint: disable-msg=C0103
"""Notifies the condition.
"""
def acquire(self, shared=0, timeout=None, test_notify=None):
"""Acquire a shared lock.
- @type shared: int
+ @type shared: integer (0/1) used as a boolean
@param shared: whether to acquire in shared mode; by default an
exclusive lock will be acquired
@type timeout: float
def __init__(self, members=None):
"""Constructs a new LockSet.
+ @type members: list of strings
@param members: initial members of the set
"""
def _del_owned(self, name=None):
"""Note the current thread no longer owns the given lock"""
+ assert not (name is None and self.__lock._is_owned()), \
+ "Cannot hold internal lock when deleting owner status"
+
if name is not None:
self.__owners[threading.currentThread()].remove(name)
def _release_and_delete_owned(self):
"""Release and delete all resources owned by the current thread"""
for lname in self._list_owned():
- self.__lockdict[lname].release()
+ lock = self.__lockdict[lname]
+ if lock._is_owned():
+ lock.release()
self._del_owned(name=lname)
def __names(self):
def acquire(self, names, timeout=None, shared=0, test_notify=None):
"""Acquire a set of resource locks.
+ @type names: list of strings (or string)
@param names: the names of the locks which shall be acquired
(special lock names, or instance/node names)
+ @type shared: integer (0/1) used as a boolean
@param shared: whether to acquire in shared mode; by default an
exclusive lock will be acquired
@type timeout: float or None
# We need to keep track of how long we spent waiting for a lock. The
# timeout passed to this function is over all lock acquires.
- remaining_timeout = timeout
- if timeout is None:
- start = None
- calc_remaining_timeout = lambda: None
- else:
- start = time.time()
- calc_remaining_timeout = lambda: (start + timeout) - time.time()
-
- want_all = names is None
-
- if want_all:
- # If no names are given acquire the whole set by not letting new names
- # being added before we release, and getting the current list of names.
- # Some of them may then be deleted later, but we'll cope with this.
- #
- # We'd like to acquire this lock in a shared way, as it's nice if
- # everybody else can use the instances at the same time. If are acquiring
- # them exclusively though they won't be able to do this anyway, though,
- # so we'll get the list lock exclusively as well in order to be able to
- # do add() on the set while owning it.
- self.__lock.acquire(shared=shared, timeout=remaining_timeout)
- try:
- # note we own the set-lock
- self._add_owned()
- names = self.__names()
- except:
- # We shouldn't have problems adding the lock to the owners list, but
- # if we did we'll try to release this lock and re-raise exception.
- # Of course something is going to be really wrong, after this.
- self.__lock.release()
- raise
-
- # Re-calculate timeout
- remaining_timeout = calc_remaining_timeout()
+ running_timeout = RunningTimeout(timeout, False)
try:
- try:
+ if names is not None:
# Support passing in a single resource to acquire rather than many
if isinstance(names, basestring):
names = [names]
- else:
- names = sorted(names)
- acquire_list = []
- # First we look the locks up on __lockdict. We have no way of being sure
- # they will still be there after, but this makes it a lot faster should
- # just one of them be the already wrong
- for lname in utils.UniqueSequence(names):
- try:
- lock = self.__lockdict[lname] # raises KeyError if lock is not there
- acquire_list.append((lname, lock))
- except KeyError:
- if want_all:
- # We are acquiring all the set, it doesn't matter if this
- # particular element is not there anymore.
- continue
- else:
- raise errors.LockError("Non-existing lock in set (%s)" % lname)
-
- # This will hold the locknames we effectively acquired.
- acquired = set()
-
- # Now acquire_list contains a sorted list of resources and locks we
- # want. In order to get them we loop on this (private) list and
- # acquire() them. We gave no real guarantee they will still exist till
- # this is done but .acquire() itself is safe and will alert us if the
- # lock gets deleted.
- for (lname, lock) in acquire_list:
- if __debug__ and callable(test_notify):
- test_notify_fn = lambda: test_notify(lname)
- else:
- test_notify_fn = None
+ return self.__acquire_inner(names, False, shared,
+ running_timeout.Remaining, test_notify)
- try:
- if timeout is not None and remaining_timeout < 0:
- raise _AcquireTimeout()
-
- # raises LockError if the lock was deleted
- if not lock.acquire(shared=shared, timeout=remaining_timeout,
- test_notify=test_notify_fn):
- # Couldn't get lock or timeout occurred
- if timeout is None:
- # This shouldn't happen as SharedLock.acquire(timeout=None) is
- # blocking.
- raise errors.LockError("Failed to get lock %s" % lname)
-
- raise _AcquireTimeout()
-
- # Re-calculate timeout
- remaining_timeout = calc_remaining_timeout()
-
- # now the lock cannot be deleted, we have it!
- self._add_owned(name=lname)
- acquired.add(lname)
-
- except _AcquireTimeout:
- # Release all acquired locks
- self._release_and_delete_owned()
- raise
+ else:
+ # If no names are given, acquire the whole set by preventing new names
+ # from being added before we release, and by getting the current list
+ # of names. Some of them may then be deleted later, but we'll cope with
+ # this.
+ #
+ # We'd like to acquire this lock in a shared way, as it's nice if
+ # everybody else can use the instances at the same time. If we are
+ # acquiring them exclusively, though, they won't be able to do this
+ # anyway, so we'll get the list lock exclusively as well in order to be
+ # able to do add() on the set while owning it.
+ if not self.__lock.acquire(shared=shared,
+ timeout=running_timeout.Remaining()):
+ raise _AcquireTimeout()
+ try:
+ # note we own the set-lock
+ self._add_owned()
+
+ return self.__acquire_inner(self.__names(), True, shared,
+ running_timeout.Remaining, test_notify)
+ except:
+ # We shouldn't have problems adding the lock to the owners list, but
+ # if we did we'll try to release this lock and re-raise exception.
+ # Of course something is going to be really wrong, after this.
+ self.__lock.release()
+ self._del_owned()
+ raise
- except errors.LockError:
- if want_all:
- # We are acquiring all the set, it doesn't matter if this
- # particular element is not there anymore.
- continue
+ except _AcquireTimeout:
+ return None
- self._release_and_delete_owned()
+ def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
+ """Inner logic for acquiring a number of locks.
- raise errors.LockError("Non-existing lock in set (%s)" % lname)
+ @param names: Names of the locks to be acquired
+ @param want_all: Whether all locks in the set should be acquired
+ @param shared: Whether to acquire in shared mode
+ @param timeout_fn: Function returning remaining timeout
+ @param test_notify: Special callback function for unittesting
- except:
- # We shouldn't have problems adding the lock to the owners list, but
- # if we did we'll try to release this lock and re-raise exception.
- # Of course something is going to be really wrong, after this.
- if lock._is_owned():
- lock.release()
- raise
+ """
+ acquire_list = []
- except:
- # If something went wrong and we had the set-lock let's release it...
+ # First we look the locks up on __lockdict. We have no way of being sure
+ # they will still be there after, but this makes it a lot faster should
+ # just one of them already be wrong. Using a sorted sequence to prevent
+ # deadlocks.
+ for lname in sorted(utils.UniqueSequence(names)):
+ try:
+ lock = self.__lockdict[lname] # raises KeyError if lock is not there
+ except KeyError:
if want_all:
- self.__lock.release()
- raise
+ # We are acquiring all the set, it doesn't matter if this particular
+ # element is not there anymore.
+ continue
- except _AcquireTimeout:
- if want_all:
- self._del_owned()
+ raise errors.LockError("Non-existing lock in set (%s)" % lname)
- return None
+ acquire_list.append((lname, lock))
+
+ # This will hold the locknames we effectively acquired.
+ acquired = set()
+
+ try:
+ # Now acquire_list contains a sorted list of resources and locks we
+ # want. In order to get them we loop on this (private) list and
+ # acquire() them. We have no real guarantee they will still exist till
+ # this is done but .acquire() itself is safe and will alert us if the
+ # lock gets deleted.
+ for (lname, lock) in acquire_list:
+ if __debug__ and callable(test_notify):
+ test_notify_fn = lambda: test_notify(lname)
+ else:
+ test_notify_fn = None
+
+ timeout = timeout_fn()
+
+ try:
+ # raises LockError if the lock was deleted
+ acq_success = lock.acquire(shared=shared, timeout=timeout,
+ test_notify=test_notify_fn)
+ except errors.LockError:
+ if want_all:
+ # We are acquiring all the set, it doesn't matter if this
+ # particular element is not there anymore.
+ continue
+
+ raise errors.LockError("Non-existing lock in set (%s)" % lname)
+
+ if not acq_success:
+ # Couldn't get lock or timeout occurred
+ if timeout is None:
+ # This shouldn't happen as SharedLock.acquire(timeout=None) is
+ # blocking.
+ raise errors.LockError("Failed to get lock %s" % lname)
+
+ raise _AcquireTimeout()
+
+ try:
+ # now the lock cannot be deleted, we have it!
+ self._add_owned(name=lname)
+ acquired.add(lname)
+
+ except:
+ # We shouldn't have problems adding the lock to the owners list, but
+ # if we did we'll try to release this lock and re-raise exception.
+ # Of course something is going to be really wrong after this.
+ if lock._is_owned():
+ lock.release()
+ raise
+
+ except:
+ # Release all owned locks
+ self._release_and_delete_owned()
+ raise
return acquired
You must have acquired the locks, either in shared or in exclusive mode,
before releasing them.
+ @type names: list of strings, or None
@param names: the names of the locks which shall be released
(defaults to all the locks acquired at that level).
def add(self, names, acquired=0, shared=0):
"""Add a new set of elements to the set
+ @type names: list of strings
@param names: names of the new elements to add
+ @type acquired: integer (0/1) used as a boolean
@param acquired: pre-acquire the new resource?
+ @type shared: integer (0/1) used as a boolean
@param shared: is the pre-acquisition shared?
"""
You can either not hold anything in the lockset or already hold a superset
of the elements you want to delete, exclusively.
+ @type names: list of strings
@param names: names of the resource to remove.
@return: a list of locks which we removed; the list is always
# the test cases.
return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
- def _BGL_owned(self):
+ def _BGL_owned(self): # pylint: disable-msg=C0103
"""Check if the current thread owns the BGL.
Either an exclusive or a shared acquisition works.
"""
return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
- def _contains_BGL(self, level, names):
+ @staticmethod
+ def _contains_BGL(level, names): # pylint: disable-msg=C0103
"""Check if the level contains the BGL.
Check if acting on the given level and set of names will change
def acquire(self, level, names, timeout=None, shared=0):
"""Acquire a set of resource locks, at the same level.
- @param level: the level at which the locks shall be acquired;
- it must be a member of LEVELS.
+ @type level: member of locking.LEVELS
+ @param level: the level at which the locks shall be acquired
+ @type names: list of strings (or string)
@param names: the names of the locks which shall be acquired
(special lock names, or instance/node names)
+ @type shared: integer (0/1) used as a boolean
@param shared: whether to acquire in shared mode; by default
an exclusive lock will be acquired
@type timeout: float
You must have acquired the locks, either in shared or in exclusive
mode, before releasing them.
- @param level: the level at which the locks shall be released;
- it must be a member of LEVELS
+ @type level: member of locking.LEVELS
+ @param level: the level at which the locks shall be released
+ @type names: list of strings, or None
@param names: the names of the locks which shall be released
(defaults to all the locks acquired at that level)
assert (not self._contains_BGL(level, names) or
not self._upper_owned(LEVEL_CLUSTER)), (
"Cannot release the Big Ganeti Lock while holding something"
- " at upper levels")
+ " at upper levels (%r)" %
+ (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
+ for i in self.__keyring.keys()]), ))
# Release will complain if we don't own the locks already
return self.__keyring[level].release(names)
def add(self, level, names, acquired=0, shared=0):
"""Add locks at the specified level.
- @param level: the level at which the locks shall be added;
- it must be a member of LEVELS_MOD.
+ @type level: member of locking.LEVELS_MOD
+ @param level: the level at which the locks shall be added
+ @type names: list of strings
@param names: names of the locks to acquire
+ @type acquired: integer (0/1) used as a boolean
@param acquired: whether to acquire the newly added locks
+ @type shared: integer (0/1) used as a boolean
@param shared: whether the acquisition will be shared
"""
You must either already own the locks you are trying to remove
exclusively or not own any lock at an upper level.
- @param level: the level at which the locks shall be removed;
- it must be a member of LEVELS_MOD
+ @type level: member of locking.LEVELS_MOD
+ @param level: the level at which the locks shall be removed
+ @type names: list of strings
@param names: the names of the locks which shall be removed
(special lock names, or instance/node names)