# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
+# Disable "Invalid name ..." message
+# pylint: disable-msg=C0103
+
"""Module implementing the Ganeti locking code."""
import os
return wrap
-class _SingleActionPipeConditionWaiter(object):
- """Callable helper class for _SingleActionPipeCondition.
+class _SingleNotifyPipeConditionWaiter(object):
+ """Helper class for SingleNotifyPipeCondition
"""
__slots__ = [
- "_cond",
"_fd",
"_poller",
]
- def __init__(self, cond, poller, fd):
- """Initializes this class.
+ def __init__(self, poller, fd):
+ """Constructor for _SingleNotifyPipeConditionWaiter
- @type cond: L{_SingleActionPipeCondition}
- @param cond: Parent condition
@type poller: select.poll
@param poller: Poller object
@type fd: int
"""
object.__init__(self)
-
- self._cond = cond
self._poller = poller
self._fd = fd
remaining_time = start_time + timeout - time.time()
-class _SingleActionPipeCondition(object):
- """Wrapper around a pipe for usage inside conditions.
-
- This class contains a POSIX pipe(2) and a poller to poll it. The pipe is
- always allocated when constructing the class. Extra care is taken to always
- close the file descriptors.
+class _BaseCondition(object):
+ """Base class containing common code for conditions.
- An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for
- notifications.
-
- Warning: This class is designed to be used as the underlying component of a
- locking condition, but is not by itself thread safe, and needs to be
- protected by an external lock.
+ Some of this code is taken from python's threading module.
"""
__slots__ = [
- "_poller",
- "_read_fd",
- "_write_fd",
- "_nwaiters",
+ "_lock",
+ "acquire",
+ "release",
]
- _waiter_class = _SingleActionPipeConditionWaiter
+ def __init__(self, lock):
+ """Constructor for _BaseCondition.
- def __init__(self):
- """Initializes this class.
+ @type lock: threading.Lock
+ @param lock: condition base lock
"""
object.__init__(self)
- self._nwaiters = 0
-
- # Just assume the unpacking is successful, otherwise error handling gets
- # very complicated.
- (self._read_fd, self._write_fd) = os.pipe()
- try:
- # The poller looks for closure of the write side
- poller = select.poll()
- poller.register(self._read_fd, select.POLLHUP)
-
- self._poller = poller
- except:
- if self._read_fd is not None:
- os.close(self._read_fd)
- if self._write_fd is not None:
- os.close(self._write_fd)
- raise
+ # Recursive locks are not supported
+ assert not hasattr(lock, "_acquire_restore")
+ assert not hasattr(lock, "_release_save")
- # There should be no code here anymore, otherwise the pipe file descriptors
- # may be not be cleaned up properly in case of errors.
+ self._lock = lock
- def StartWaiting(self):
- """Return function to wait for notification.
+ # Export the lock's acquire() and release() methods
+ self.acquire = lock.acquire
+ self.release = lock.release
- @rtype: L{_SingleActionPipeConditionWaiter}
- @return: Function to wait for notification
+ def _is_owned(self):
+ """Check whether lock is owned by current thread.
+
+ The check is a non-blocking acquire: if it succeeds the lock was free, so
+ it is released again and False is returned. Note that a failed attempt only
+ shows that the lock is held, not which thread holds it.
+
+ @rtype: bool
+ @return: False if the lock could be acquired (and was released again),
+ True otherwise
+
"""
- assert self._nwaiters >= 0
+ # Try-lock; success means nobody was holding the lock
+ if self._lock.acquire(0):
+ self._lock.release()
+ return False
- if self._poller is None:
- raise RuntimeError("Already cleaned up")
+ return True
- # Create waiter function and increase number of waiters
- wait_fn = self._waiter_class(self, self._poller, self._read_fd)
- self._nwaiters += 1
- return wait_fn
+ def _check_owned(self):
+ """Raise an exception if the current thread doesn't own the lock.
+
+ @raise RuntimeError: if L{_is_owned} reports the lock as not held
+
- def DoneWaiting(self):
- """Decrement number of waiters and automatic cleanup.
+ """
+ if not self._is_owned():
+ raise RuntimeError("cannot work with un-aquired lock")
- Must be called after waiting for a notification.
- @rtype: bool
- @return: Whether this was the last waiter
+class SingleNotifyPipeCondition(_BaseCondition):
+ """Condition which can only be notified once.
- """
- assert self._nwaiters > 0
+ This condition class uses pipes and poll, internally, to be able to wait for
+ notification with a timeout, without resorting to polling. It is almost
+ compatible with Python's threading.Condition, with the following differences:
+ - notifyAll can only be called once, and no wait can happen after that
+ - notify is not supported, only notifyAll
- self._nwaiters -= 1
+ """
- if self._nwaiters == 0:
- self._Cleanup()
- return True
+ __slots__ = _BaseCondition.__slots__ + [
+ "_poller",
+ "_read_fd",
+ "_write_fd",
+ "_nwaiters",
+ "_notified",
+ ]
- return False
+ _waiter_class = _SingleNotifyPipeConditionWaiter
- def notifyAll(self):
- """Close the writing side of the pipe to notify all waiters.
+ def __init__(self, lock):
+ """Constructor for SingleNotifyPipeCondition
"""
- if self._write_fd is None:
- raise RuntimeError("Can only notify once")
-
- os.close(self._write_fd)
+ _BaseCondition.__init__(self, lock)
+ self._nwaiters = 0
+ self._notified = False
+ self._read_fd = None
self._write_fd = None
+ self._poller = None
+
+ def _check_unnotified(self):
+ """Raise an exception if this condition was already notified.
+
+ The C{_notified} flag is set by L{notifyAll}; once it is set, neither
+ waiting on nor notifying this condition is accepted anymore.
+
+ @raise RuntimeError: if the condition has been notified before
+
+ """
+ if self._notified:
+ raise RuntimeError("cannot use already notified condition")
def _Cleanup(self):
- """Close all file descriptors.
+ """Cleanup open file descriptors, if any.
"""
if self._read_fd is not None:
if self._write_fd is not None:
os.close(self._write_fd)
self._write_fd = None
-
self._poller = None
- def __del__(self):
- """Called on object deletion.
+ def wait(self, timeout=None):
+ """Wait for a notification.
+
+ The caller must hold the condition's lock and the condition must not have
+ been notified yet. The lock is released while waiting and re-acquired
+ before returning.
+
+ @type timeout: float or None
+ @param timeout: Waiting timeout (can be None)
+ @raise RuntimeError: if the lock is not held or the condition was already
+ notified
+
+ """
+ self._check_owned()
+ self._check_unnotified()
- Ensure no file descriptors are left open.
+ self._nwaiters += 1
+ try:
+ # Pipe and poller are created lazily by the first waiter
+ if self._poller is None:
+ (self._read_fd, self._write_fd) = os.pipe()
+ self._poller = select.poll()
+ self._poller.register(self._read_fd, select.POLLHUP)
+
+ wait_fn = self._waiter_class(self._poller, self._read_fd)
+ # Release lock while waiting
+ self.release()
+ try:
+ # Wait for notification
+ wait_fn(timeout)
+ finally:
+ # Re-acquire lock
+ self.acquire()
+ finally:
+ # The last waiter to leave cleans up the file descriptors
+ self._nwaiters -= 1
+ if self._nwaiters == 0:
+ self._Cleanup()
+
+ def notifyAll(self):
+ """Close the writing side of the pipe to notify all waiters.
+
+ Can only be called once per condition and only while holding the lock. If
+ no pipe is currently open (it is only created by L{wait}), just the
+ notified flag is set.
+
+ @raise RuntimeError: if the lock is not held or the condition was already
+ notified
+
"""
- self._Cleanup()
+ self._check_owned()
+ self._check_unnotified()
+ self._notified = True
+ if self._write_fd is not None:
+ os.close(self._write_fd)
+ self._write_fd = None
-class _PipeCondition(object):
+class PipeCondition(_BaseCondition):
"""Group-only non-polling condition with counters.
This condition class uses pipes and poll, internally, to be able to wait for
there are any waiting threads.
"""
- __slots__ = [
- "_lock",
+ __slots__ = _BaseCondition.__slots__ + [
"_nwaiters",
- "_pipe",
- "acquire",
- "release",
+ "_single_condition",
]
- _pipe_class = _SingleActionPipeCondition
+ _single_condition_class = SingleNotifyPipeCondition
def __init__(self, lock):
"""Initializes this class.
"""
- object.__init__(self)
-
- # Recursive locks are not supported
- assert not hasattr(lock, "_acquire_restore")
- assert not hasattr(lock, "_release_save")
-
- self._lock = lock
-
- # Export the lock's acquire() and release() methods
- self.acquire = lock.acquire
- self.release = lock.release
-
+ _BaseCondition.__init__(self, lock)
self._nwaiters = 0
- self._pipe = None
-
- def _is_owned(self):
- """Check whether lock is owned by current thread.
-
- """
- if self._lock.acquire(0):
- self._lock.release()
- return False
-
- return True
-
- def _check_owned(self):
- """Raise an exception if the current thread doesn't own the lock.
-
- """
- if not self._is_owned():
- raise RuntimeError("cannot work with un-aquired lock")
+ self._single_condition = self._single_condition_class(self._lock)
def wait(self, timeout=None):
"""Wait for a notification.
"""
self._check_owned()
- if not self._pipe:
- self._pipe = self._pipe_class()
-
# Keep local reference to the pipe. It could be replaced by another thread
# notifying while we're waiting.
- pipe = self._pipe
+ my_condition = self._single_condition
assert self._nwaiters >= 0
self._nwaiters += 1
try:
- # Get function to wait on the pipe
- wait_fn = pipe.StartWaiting()
- try:
- # Release lock while waiting
- self.release()
- try:
- # Wait for notification
- wait_fn(timeout)
- finally:
- # Re-acquire lock
- self.acquire()
- finally:
- # Destroy pipe if this was the last waiter and the current pipe is
- # still the same. The same pipe cannot be reused after cleanup.
- if pipe.DoneWaiting() and pipe == self._pipe:
- self._pipe = None
+ my_condition.wait(timeout)
finally:
assert self._nwaiters > 0
self._nwaiters -= 1
"""
self._check_owned()
-
- # Notify and forget pipe. A new one will be created on the next call to
- # wait.
- if self._pipe is not None:
- self._pipe.notifyAll()
- self._pipe = None
+ self._single_condition.notifyAll()
+ self._single_condition = self._single_condition_class(self._lock)
def has_waiting(self):
"""Returns whether there are active waiters.
"""Waits for the condition to be notified.
@type timeout: float or None
- @param timeout: Timeout in seconds
+ @param timeout: Waiting timeout (can be None)
"""
assert self._nwaiters >= 0
"__shr",
]
- __condition_class = _CountingCondition
+ __condition_class = PipeCondition
def __init__(self):
"""Construct a new SharedLock.
"""
return self.__pending[0] == cond
- def __acquire_unlocked(self, shared=0, timeout=None):
+ def __acquire_unlocked(self, shared, timeout):
"""Acquire a shared lock.
@param shared: whether to acquire in shared mode; by default an
return False
- def acquire(self, shared=0, timeout=None):
+ def acquire(self, shared=0, timeout=None, test_notify=None):
"""Acquire a shared lock.
@type shared: int
exclusive lock will be acquired
@type timeout: float
@param timeout: maximum waiting time before giving up
+ @type test_notify: callable or None
+ @param test_notify: Special callback function for unittesting
"""
self.__lock.acquire()
try:
+ # We already got the lock, notify now
+ if __debug__ and callable(test_notify):
+ test_notify()
+
return self.__acquire_unlocked(shared, timeout)
finally:
self.__lock.release()
acquired = self.__is_exclusive()
if not acquired:
- acquired = self.__acquire_unlocked(timeout)
+ acquired = self.__acquire_unlocked(0, timeout)
+
+ assert self.__is_exclusive() and not self.__is_sharer(), \
+ "Lock wasn't acquired in exclusive mode"
if acquired:
self.__deleted = True
ALL_SET = None
+class _AcquireTimeout(Exception):
+ """Internal exception to abort an acquire on a timeout.
+
+ Raised inside L{LockSet.acquire} when the remaining timeout has run out or
+ a single lock could not be obtained in time. It is caught again within that
+ method, which then returns None instead of propagating it to the caller.
+
+ """
+
+
class LockSet:
"""Implements a set of locks.
else:
return set()
+ def _release_and_delete_owned(self):
+ """Release and delete all resources owned by the current thread.
+
+ For every name returned by L{_list_owned} the corresponding lock is
+ released and the name is passed to L{_del_owned}. L{acquire} calls this to
+ roll back a partially completed acquisition.
+
+ """
+ for lname in self._list_owned():
+ self.__lockdict[lname].release()
+ self._del_owned(name=lname)
+
def __names(self):
"""Return the current set of names.
self.__lock.release()
return set(result)
- def acquire(self, names, blocking=1, shared=0):
+ def acquire(self, names, timeout=None, shared=0, test_notify=None):
"""Acquire a set of resource locks.
@param names: the names of the locks which shall be acquired
(special lock names, or instance/node names)
@param shared: whether to acquire in shared mode; by default an
exclusive lock will be acquired
- @param blocking: whether to block while trying to acquire or to
- operate in try-lock mode (this locking mode is not supported yet)
+ @type timeout: float or None
+ @param timeout: Maximum time to acquire all locks
+ @type test_notify: callable or None
+ @param test_notify: Special callback function for unittesting
- @return: True when all the locks are successfully acquired
+ @return: Set of all locks successfully acquired or None in case of timeout
@raise errors.LockError: when any lock we try to acquire has
been deleted before we succeed. In this case none of the
locks requested will be acquired.
"""
- if not blocking:
- # We don't have non-blocking mode for now
- raise NotImplementedError
+ assert timeout is None or timeout >= 0.0
# Check we don't already own locks at this level
assert not self._is_owned(), "Cannot acquire locks in the same set twice"
- if names is None:
+ # We need to keep track of how long we spent waiting for a lock. The
+ # timeout passed to this function is over all lock acquires.
+ remaining_timeout = timeout
+ if timeout is None:
+ start = None
+ calc_remaining_timeout = lambda: None
+ else:
+ start = time.time()
+ calc_remaining_timeout = lambda: (start + timeout) - time.time()
+
+ want_all = names is None
+
+ if want_all:
# If no names are given acquire the whole set by not letting new names
# being added before we release, and getting the current list of names.
# Some of them may then be deleted later, but we'll cope with this.
# them exclusively though they won't be able to do this anyway, though,
# so we'll get the list lock exclusively as well in order to be able to
# do add() on the set while owning it.
- self.__lock.acquire(shared=shared)
+ self.__lock.acquire(shared=shared, timeout=remaining_timeout)
try:
# note we own the set-lock
self._add_owned()
self.__lock.release()
raise
+ # Re-calculate timeout
+ remaining_timeout = calc_remaining_timeout()
+
try:
- # Support passing in a single resource to acquire rather than many
- if isinstance(names, basestring):
- names = [names]
- else:
- names = sorted(names)
-
- acquire_list = []
- # First we look the locks up on __lockdict. We have no way of being sure
- # they will still be there after, but this makes it a lot faster should
- # just one of them be the already wrong
- for lname in utils.UniqueSequence(names):
- try:
- lock = self.__lockdict[lname] # raises KeyError if lock is not there
- acquire_list.append((lname, lock))
- except (KeyError):
- if self.__lock._is_owned():
- # We are acquiring all the set, it doesn't matter if this
- # particular element is not there anymore.
- continue
- else:
- raise errors.LockError('non-existing lock in set (%s)' % lname)
-
- # This will hold the locknames we effectively acquired.
- acquired = set()
- # Now acquire_list contains a sorted list of resources and locks we want.
- # In order to get them we loop on this (private) list and acquire() them.
- # We gave no real guarantee they will still exist till this is done but
- # .acquire() itself is safe and will alert us if the lock gets deleted.
- for (lname, lock) in acquire_list:
- try:
- lock.acquire(shared=shared) # raises LockError if the lock is deleted
- # now the lock cannot be deleted, we have it!
- self._add_owned(name=lname)
- acquired.add(lname)
- except (errors.LockError):
- if self.__lock._is_owned():
- # We are acquiring all the set, it doesn't matter if this
- # particular element is not there anymore.
- continue
+ try:
+ # Support passing in a single resource to acquire rather than many
+ if isinstance(names, basestring):
+ names = [names]
+ else:
+ names = sorted(names)
+
+ acquire_list = []
+ # First we look the locks up on __lockdict. We have no way of being sure
+ # they will still be there after, but this makes it a lot faster should
+ # just one of them be the already wrong
+ for lname in utils.UniqueSequence(names):
+ try:
+ lock = self.__lockdict[lname] # raises KeyError if lock is not there
+ acquire_list.append((lname, lock))
+ except KeyError:
+ if want_all:
+ # We are acquiring all the set, it doesn't matter if this
+ # particular element is not there anymore.
+ continue
+ else:
+ raise errors.LockError("Non-existing lock in set (%s)" % lname)
+
+ # This will hold the locknames we effectively acquired.
+ acquired = set()
+
+ # Now acquire_list contains a sorted list of resources and locks we
+ # want. In order to get them we loop on this (private) list and
+ # acquire() them. We gave no real guarantee they will still exist till
+ # this is done but .acquire() itself is safe and will alert us if the
+ # lock gets deleted.
+ for (lname, lock) in acquire_list:
+ if __debug__ and callable(test_notify):
+ test_notify_fn = lambda: test_notify(lname)
else:
- name_fail = lname
- for lname in self._list_owned():
- self.__lockdict[lname].release()
- self._del_owned(name=lname)
- raise errors.LockError('non-existing lock in set (%s)' % name_fail)
- except:
- # We shouldn't have problems adding the lock to the owners list, but
- # if we did we'll try to release this lock and re-raise exception.
- # Of course something is going to be really wrong, after this.
- if lock._is_owned():
- lock.release()
- raise
+ test_notify_fn = None
- except:
- # If something went wrong and we had the set-lock let's release it...
- if self.__lock._is_owned():
- self.__lock.release()
- raise
+ try:
+ if timeout is not None and remaining_timeout < 0:
+ raise _AcquireTimeout()
+
+ # raises LockError if the lock was deleted
+ if not lock.acquire(shared=shared, timeout=remaining_timeout,
+ test_notify=test_notify_fn):
+ # Couldn't get lock or timeout occurred
+ if timeout is None:
+ # This shouldn't happen as SharedLock.acquire(timeout=None) is
+ # blocking.
+ raise errors.LockError("Failed to get lock %s" % lname)
+
+ raise _AcquireTimeout()
+
+ # Re-calculate timeout
+ remaining_timeout = calc_remaining_timeout()
+
+ # now the lock cannot be deleted, we have it!
+ self._add_owned(name=lname)
+ acquired.add(lname)
+
+ except _AcquireTimeout:
+ # Release all acquired locks
+ self._release_and_delete_owned()
+ raise
+
+ except errors.LockError:
+ if want_all:
+ # We are acquiring all the set, it doesn't matter if this
+ # particular element is not there anymore.
+ continue
+
+ self._release_and_delete_owned()
+
+ raise errors.LockError("Non-existing lock in set (%s)" % lname)
+
+ except:
+ # We shouldn't have problems adding the lock to the owners list, but
+ # if we did we'll try to release this lock and re-raise exception.
+ # Of course something is going to be really wrong, after this.
+ if lock._is_owned():
+ lock.release()
+ raise
+
+ except:
+ # If something went wrong and we had the set-lock let's release it...
+ if want_all:
+ self.__lock.release()
+ raise
+
+ except _AcquireTimeout:
+ if want_all:
+ self._del_owned()
+
+ return None
return acquired
return True
- def remove(self, names, blocking=1):
+ def remove(self, names):
"""Remove elements from the lock set.
You can either not hold anything in the lockset or already hold a superset
of the elements you want to delete, exclusively.
@param names: names of the resource to remove.
- @param blocking: whether to block while trying to acquire or to
- operate in try-lock mode (this locking mode is not supported
- yet unless you are already holding exclusively the locks)
- @return:: a list of locks which we removed; the list is always
+ @return: a list of locks which we removed; the list is always
equal to the names list if we were holding all the locks
exclusively
"""
- if not blocking and not self._is_owned():
- # We don't have non-blocking mode for now
- raise NotImplementedError
-
# Support passing in a single resource to remove rather than many
if isinstance(names, basestring):
names = [names]
"""
return level == LEVEL_CLUSTER and (names is None or BGL in names)
- def acquire(self, level, names, blocking=1, shared=0):
+ def acquire(self, level, names, timeout=None, shared=0):
"""Acquire a set of resource locks, at the same level.
@param level: the level at which the locks shall be acquired;
(special lock names, or instance/node names)
@param shared: whether to acquire in shared mode; by default
an exclusive lock will be acquired
- @param blocking: whether to block while trying to acquire or to
- operate in try-lock mode (this locking mode is not supported yet)
+ @type timeout: float or None
+ @param timeout: Maximum time to acquire all locks
"""
assert level in LEVELS, "Invalid locking level %s" % level
" while owning some at a greater one")
# Acquire the locks in the set.
- return self.__keyring[level].acquire(names, shared=shared,
- blocking=blocking)
+ return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
def release(self, level, names=None):
"""Release a set of resource locks, at the same level.
" while owning some at a greater one")
return self.__keyring[level].add(names, acquired=acquired, shared=shared)
- def remove(self, level, names, blocking=1):
+ def remove(self, level, names):
"""Remove locks from the specified level.
You must either already own the locks you are trying to remove
it must be a member of LEVELS_MOD
@param names: the names of the locks which shall be removed
(special lock names, or instance/node names)
- @param blocking: whether to block while trying to operate in
- try-lock mode (this locking mode is not supported yet)
"""
assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
assert self._is_owned(level) or not self._upper_owned(level), (
"Cannot remove locks at a level while not owning it or"
" owning some at a greater one")
- return self.__keyring[level].remove(names, blocking=blocking)
+ return self.__keyring[level].remove(names)