# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
-"""Module implementing the Ganeti locking code."""
+# Disable "Invalid name ..." message
+# pylint: disable-msg=C0103
-# pylint: disable-msg=W0613,W0201
+"""Module implementing the Ganeti locking code."""
+import os
+import select
import threading
+import time
+import errno
+
+from ganeti import errors
+from ganeti import utils
+
+
+def ssynchronized(lock, shared=0):
+ """Shared Synchronization decorator.
+
+ Calls the function holding the given lock, either in exclusive or shared
+ mode. It requires the passed lock to be a SharedLock (or support its
+ semantics).
+
+ """
+ def wrap(fn):
+ def sync_function(*args, **kwargs):
+ lock.acquire(shared=shared)
+ try:
+ return fn(*args, **kwargs)
+ finally:
+ lock.release()
+ return sync_function
+ return wrap
+
+
+class _SingleNotifyPipeConditionWaiter(object):
+ """Helper class for SingleNotifyPipeCondition
+
+ """
+ __slots__ = [
+ "_fd",
+ "_poller",
+ ]
+
+ def __init__(self, poller, fd):
+ """Constructor for _SingleNotifyPipeConditionWaiter
+
+ @type poller: select.poll
+ @param poller: Poller object
+ @type fd: int
+ @param fd: File descriptor to wait for
+
+ """
+ object.__init__(self)
+ self._poller = poller
+ self._fd = fd
+
+ def __call__(self, timeout):
+ """Wait for something to happen on the pipe.
+
+ @type timeout: float or None
+ @param timeout: Timeout for waiting (can be None)
+
+ """
+ start_time = time.time()
+ remaining_time = timeout
+
+ while timeout is None or remaining_time > 0:
+ try:
+ result = self._poller.poll(remaining_time)
+ except EnvironmentError, err:
+ if err.errno != errno.EINTR:
+ raise
+ result = None
+
+ # Check whether we were notified
+ if result and result[0][0] == self._fd:
+ break
+
+ # Re-calculate timeout if necessary
+ if timeout is not None:
+ remaining_time = start_time + timeout - time.time()
+
+
+class _BaseCondition(object):
+ """Base class containing common code for conditions.
+
+ Some of this code is taken from python's threading module.
+
+ """
+ __slots__ = [
+ "_lock",
+ "acquire",
+ "release",
+ ]
+
+ def __init__(self, lock):
+ """Constructor for _BaseCondition.
+
+ @type lock: threading.Lock
+ @param lock: condition base lock
+
+ """
+ object.__init__(self)
+
+ # Recursive locks are not supported
+ assert not hasattr(lock, "_acquire_restore")
+ assert not hasattr(lock, "_release_save")
+
+ self._lock = lock
+
+ # Export the lock's acquire() and release() methods
+ self.acquire = lock.acquire
+ self.release = lock.release
+
+ def _is_owned(self):
+ """Check whether lock is owned by current thread.
+
+ """
+ if self._lock.acquire(0):
+ self._lock.release()
+ return False
+
+ return True
+
+ def _check_owned(self):
+ """Raise an exception if the current thread doesn't own the lock.
+
+ """
+ if not self._is_owned():
+ raise RuntimeError("cannot work with un-aquired lock")
+
+
+class SingleNotifyPipeCondition(_BaseCondition):
+ """Condition which can only be notified once.
+ This condition class uses pipes and poll, internally, to be able to wait for
+ notification with a timeout, without resorting to polling. It is almost
+ compatible with Python's threading.Condition, with the following differences:
+ - notifyAll can only be called once, and no wait can happen after that
+ - notify is not supported, only notifyAll
-class SharedLock:
+ """
+
+ __slots__ = _BaseCondition.__slots__ + [
+ "_poller",
+ "_read_fd",
+ "_write_fd",
+ "_nwaiters",
+ "_notified",
+ ]
+
+ _waiter_class = _SingleNotifyPipeConditionWaiter
+
+ def __init__(self, lock):
+ """Constructor for SingleNotifyPipeCondition
+
+ """
+ _BaseCondition.__init__(self, lock)
+ self._nwaiters = 0
+ self._notified = False
+ self._read_fd = None
+ self._write_fd = None
+ self._poller = None
+
+ def _check_unnotified(self):
+ """Throws an exception if already notified.
+
+ """
+ if self._notified:
+ raise RuntimeError("cannot use already notified condition")
+
+ def _Cleanup(self):
+ """Cleanup open file descriptors, if any.
+
+ """
+ if self._read_fd is not None:
+ os.close(self._read_fd)
+ self._read_fd = None
+
+ if self._write_fd is not None:
+ os.close(self._write_fd)
+ self._write_fd = None
+ self._poller = None
+
+ def wait(self, timeout=None):
+ """Wait for a notification.
+
+ @type timeout: float or None
+ @param timeout: Waiting timeout (can be None)
+
+ """
+ self._check_owned()
+ self._check_unnotified()
+
+ self._nwaiters += 1
+ try:
+ if self._poller is None:
+ (self._read_fd, self._write_fd) = os.pipe()
+ self._poller = select.poll()
+ self._poller.register(self._read_fd, select.POLLHUP)
+
+ wait_fn = self._waiter_class(self._poller, self._read_fd)
+ self.release()
+ try:
+ # Wait for notification
+ wait_fn(timeout)
+ finally:
+ # Re-acquire lock
+ self.acquire()
+ finally:
+ self._nwaiters -= 1
+ if self._nwaiters == 0:
+ self._Cleanup()
+
+ def notifyAll(self):
+ """Close the writing side of the pipe to notify all waiters.
+
+ """
+ self._check_owned()
+ self._check_unnotified()
+ self._notified = True
+ if self._write_fd is not None:
+ os.close(self._write_fd)
+ self._write_fd = None
+
+
+class PipeCondition(_BaseCondition):
+ """Group-only non-polling condition with counters.
+
+ This condition class uses pipes and poll, internally, to be able to wait for
+ notification with a timeout, without resorting to polling. It is almost
+ compatible with Python's threading.Condition, but only supports notifyAll and
+non-recursive locks. As an additional feature it's able to report whether
+ there are any waiting threads.
+
+ """
+ __slots__ = _BaseCondition.__slots__ + [
+ "_nwaiters",
+ "_single_condition",
+ ]
+
+ _single_condition_class = SingleNotifyPipeCondition
+
+ def __init__(self, lock):
+ """Initializes this class.
+
+ """
+ _BaseCondition.__init__(self, lock)
+ self._nwaiters = 0
+ self._single_condition = self._single_condition_class(self._lock)
+
+ def wait(self, timeout=None):
+ """Wait for a notification.
+
+ @type timeout: float or None
+ @param timeout: Waiting timeout (can be None)
+
+ """
+ self._check_owned()
+
+ # Keep local reference to the pipe. It could be replaced by another thread
+ # notifying while we're waiting.
+ my_condition = self._single_condition
+
+ assert self._nwaiters >= 0
+ self._nwaiters += 1
+ try:
+ my_condition.wait(timeout)
+ finally:
+ assert self._nwaiters > 0
+ self._nwaiters -= 1
+
+ def notifyAll(self):
+ """Notify all currently waiting threads.
+
+ """
+ self._check_owned()
+ self._single_condition.notifyAll()
+ self._single_condition = self._single_condition_class(self._lock)
+
+ def has_waiting(self):
+ """Returns whether there are active waiters.
+
+ """
+ self._check_owned()
+
+ return bool(self._nwaiters)
+
+
+class _CountingCondition(object):
+ """Wrapper for Python's built-in threading.Condition class.
+
+ This wrapper keeps a count of active waiters. We can't access the internal
+ "__waiters" attribute of threading.Condition because it's not thread-safe.
+
+ """
+ __slots__ = [
+ "_cond",
+ "_nwaiters",
+ ]
+
+ def __init__(self, lock):
+ """Initializes this class.
+
+ """
+ object.__init__(self)
+ self._cond = threading.Condition(lock=lock)
+ self._nwaiters = 0
+
+ def notifyAll(self):
+ """Notifies the condition.
+
+ """
+ return self._cond.notifyAll()
+
+ def wait(self, timeout=None):
+ """Waits for the condition to be notified.
+
+ @type timeout: float or None
+ @param timeout: Waiting timeout (can be None)
+
+ """
+ assert self._nwaiters >= 0
+
+ self._nwaiters += 1
+ try:
+ return self._cond.wait(timeout=timeout)
+ finally:
+ self._nwaiters -= 1
+
+ def has_waiting(self):
+ """Returns whether there are active waiters.
+
+ """
+ return bool(self._nwaiters)
+
+
+class SharedLock(object):
"""Implements a shared lock.
Multiple threads can acquire the lock in a shared way, calling
eventually do so.
"""
+ __slots__ = [
+ "__active_shr_c",
+ "__inactive_shr_c",
+ "__deleted",
+ "__exc",
+ "__lock",
+ "__pending",
+ "__shr",
+ ]
+
+ __condition_class = PipeCondition
+
def __init__(self):
- """Construct a new SharedLock"""
- # we have two conditions, c_shr and c_exc, sharing the same lock.
+ """Construct a new SharedLock.
+
+ """
+ object.__init__(self)
+
+ # Internal lock
self.__lock = threading.Lock()
- self.__turn_shr = threading.Condition(self.__lock)
- self.__turn_exc = threading.Condition(self.__lock)
- # current lock holders
+ # Queue containing waiting acquires
+ self.__pending = []
+
+ # Active and inactive conditions for shared locks
+ self.__active_shr_c = self.__condition_class(self.__lock)
+ self.__inactive_shr_c = self.__condition_class(self.__lock)
+
+ # Current lock holders
self.__shr = set()
self.__exc = None
- # lock waiters
- self.__nwait_exc = 0
- self.__nwait_shr = 0
+ # is this lock in the deleted state?
+ self.__deleted = False
+
+ def __check_deleted(self):
+ """Raises an exception if the lock has been deleted.
+
+ """
+ if self.__deleted:
+ raise errors.LockError("Deleted lock")
def __is_sharer(self):
- """Is the current thread sharing the lock at this time?"""
+ """Is the current thread sharing the lock at this time?
+
+ """
return threading.currentThread() in self.__shr
def __is_exclusive(self):
- """Is the current thread holding the lock exclusively at this time?"""
+ """Is the current thread holding the lock exclusively at this time?
+
+ """
return threading.currentThread() == self.__exc
def __is_owned(self, shared=-1):
def _is_owned(self, shared=-1):
"""Is the current thread somehow owning the lock at this time?
- Args:
- shared:
- < 0: check for any type of ownership (default)
- 0: check for exclusive ownership
- > 0: check for shared ownership
+ @param shared:
+ - < 0: check for any type of ownership (default)
+ - 0: check for exclusive ownership
+ - > 0: check for shared ownership
+
+ """
+ self.__lock.acquire()
+ try:
+ return self.__is_owned(shared=shared)
+ finally:
+ self.__lock.release()
+
+ def _count_pending(self):
+ """Returns the number of pending acquires.
+
+ @rtype: int
"""
self.__lock.acquire()
try:
- result = self.__is_owned(shared)
+ return len(self.__pending)
finally:
self.__lock.release()
- return result
+ def __do_acquire(self, shared):
+ """Actually acquire the lock.
+
+ """
+ if shared:
+ self.__shr.add(threading.currentThread())
+ else:
+ self.__exc = threading.currentThread()
+
+ def __can_acquire(self, shared):
+ """Determine whether lock can be acquired.
+
+ """
+ if shared:
+ return self.__exc is None
+ else:
+ return len(self.__shr) == 0 and self.__exc is None
+
+ def __is_on_top(self, cond):
+ """Checks whether the passed condition is on top of the queue.
+
+ The caller must make sure the queue isn't empty.
- def acquire(self, blocking=1, shared=0):
+ """
+ return self.__pending[0] == cond
+
+ def __acquire_unlocked(self, shared, timeout):
"""Acquire a shared lock.
- Args:
- shared: whether to acquire in shared mode. By default an exclusive lock
- will be acquired.
- blocking: whether to block while trying to acquire or to operate in try-lock mode.
- this locking mode is not supported yet.
+ @param shared: whether to acquire in shared mode; by default an
+ exclusive lock will be acquired
+ @param timeout: maximum waiting time before giving up
"""
- if not blocking:
- # We don't have non-blocking mode for now
- raise NotImplementedError
+ self.__check_deleted()
+
+ # We cannot acquire the lock if we already have it
+ assert not self.__is_owned(), "double acquire() on a non-recursive lock"
+
+ # Check whether someone else holds the lock or there are pending acquires.
+ if not self.__pending and self.__can_acquire(shared):
+ # Apparently not, can acquire lock directly.
+ self.__do_acquire(shared)
+ return True
+
+ if shared:
+ wait_condition = self.__active_shr_c
+
+ # Check if we're not yet in the queue
+ if wait_condition not in self.__pending:
+ self.__pending.append(wait_condition)
+ else:
+ wait_condition = self.__condition_class(self.__lock)
+ # Always add to queue
+ self.__pending.append(wait_condition)
- self.__lock.acquire()
try:
- # We cannot acquire the lock if we already have it
- assert not self.__is_owned(), "double acquire() on a non-recursive lock"
-
- if shared:
- self.__nwait_shr += 1
- try:
- # If there is an exclusive holder waiting we have to wait. We'll
- # only do this once, though, when we start waiting for the lock. Then
- # we'll just wait while there are no exclusive holders.
- if self.__nwait_exc > 0:
- # TODO: if !blocking...
- self.__turn_shr.wait()
-
- while self.__exc is not None:
- # TODO: if !blocking...
- self.__turn_shr.wait()
-
- self.__shr.add(threading.currentThread())
- finally:
- self.__nwait_shr -= 1
+ # Wait until we become the topmost acquire in the queue or the timeout
+ # expires.
+ while not (self.__is_on_top(wait_condition) and
+ self.__can_acquire(shared)):
+ # Wait for notification
+ wait_condition.wait(timeout)
+ self.__check_deleted()
- else:
- self.__nwait_exc += 1
- try:
- # This is to save ourselves from a nasty race condition that could
- # theoretically make the sharers starve.
- if self.__nwait_shr > 0 or self.__nwait_exc > 1:
- # TODO: if !blocking...
- self.__turn_exc.wait()
-
- while len(self.__shr) > 0 or self.__exc is not None:
- # TODO: if !blocking...
- self.__turn_exc.wait()
-
- self.__exc = threading.currentThread()
- finally:
- self.__nwait_exc -= 1
+ # A lot of code assumes blocking acquires always succeed. Loop
+ # internally for that case.
+ if timeout is not None:
+ break
+ if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
+ self.__do_acquire(shared)
+ return True
finally:
- self.__lock.release()
+ # Remove condition from queue if there are no more waiters
+ if not wait_condition.has_waiting() and not self.__deleted:
+ self.__pending.remove(wait_condition)
- return True
+ return False
+
+ def acquire(self, shared=0, timeout=None, test_notify=None):
+ """Acquire a shared lock.
+
+ @type shared: int
+ @param shared: whether to acquire in shared mode; by default an
+ exclusive lock will be acquired
+ @type timeout: float
+ @param timeout: maximum waiting time before giving up
+ @type test_notify: callable or None
+ @param test_notify: Special callback function for unittesting
+
+ """
+ self.__lock.acquire()
+ try:
+ # We already got the lock, notify now
+ if __debug__ and callable(test_notify):
+ test_notify()
+
+ return self.__acquire_unlocked(shared, timeout)
+ finally:
+ self.__lock.release()
def release(self):
"""Release a Shared Lock.
"""
self.__lock.acquire()
try:
+ assert self.__is_exclusive() or self.__is_sharer(), \
+ "Cannot release non-owned lock"
+
# Autodetect release type
if self.__is_exclusive():
self.__exc = None
+ else:
+ self.__shr.remove(threading.currentThread())
- # An exclusive holder has just had the lock, time to put it in shared
- # mode if there are shared holders waiting. Otherwise wake up the next
- # exclusive holder.
- if self.__nwait_shr > 0:
- self.__turn_shr.notifyAll()
- elif self.__nwait_exc > 0:
- self.__turn_exc.notify()
+ # Notify topmost condition in queue
+ if self.__pending:
+ first_condition = self.__pending[0]
+ first_condition.notifyAll()
- elif self.__is_sharer():
- self.__shr.remove(threading.currentThread())
+ if first_condition == self.__active_shr_c:
+ self.__active_shr_c = self.__inactive_shr_c
+ self.__inactive_shr_c = first_condition
- # If there are shared holders waiting there *must* be an exclusive holder
- # waiting as well; otherwise what were they waiting for?
- assert (self.__nwait_shr == 0 or self.__nwait_exc > 0,
- "Lock sharers waiting while no exclusive is queueing")
+ finally:
+ self.__lock.release()
+
+ def delete(self, timeout=None):
+ """Delete a Shared Lock.
- # If there are no more shared holders and some exclusive holders are
- # waiting let's wake one up.
- if len(self.__shr) == 0 and self.__nwait_exc > 0:
- self.__turn_exc.notify()
+ This operation will declare the lock for removal. First the lock will be
+ acquired in exclusive mode if you don't already own it, then the lock
+ will be put in a state where any future and pending acquire() fail.
+ @type timeout: float
+ @param timeout: maximum waiting time before giving up
+
+ """
+ self.__lock.acquire()
+ try:
+ assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
+
+ self.__check_deleted()
+
+ # The caller is allowed to hold the lock exclusively already.
+ acquired = self.__is_exclusive()
+
+ if not acquired:
+ acquired = self.__acquire_unlocked(0, timeout)
+
+ assert self.__is_exclusive() and not self.__is_sharer(), \
+ "Lock wasn't acquired in exclusive mode"
+
+ if acquired:
+ self.__deleted = True
+ self.__exc = None
+
+ # Notify all acquires. They'll throw an error.
+ while self.__pending:
+ self.__pending.pop().notifyAll()
+
+ return acquired
+ finally:
+ self.__lock.release()
+
+
+# Whenever we want to acquire a full LockSet we pass None as the value
+# to acquire. Hide this behind this nicely named constant.
+ALL_SET = None
+
+
+class _AcquireTimeout(Exception):
+ """Internal exception to abort an acquire on a timeout.
+
+ """
+
+
+class LockSet:
+ """Implements a set of locks.
+
+ This abstraction implements a set of shared locks for the same resource type,
+ distinguished by name. The user can lock a subset of the resources and the
+ LockSet will take care of acquiring the locks always in the same order, thus
+ preventing deadlock.
+
+ All the locks needed in the same set must be acquired together, though.
+
+ """
+ def __init__(self, members=None):
+ """Constructs a new LockSet.
+
+ @param members: initial members of the set
+
+ """
+ # Used internally to guarantee coherency.
+ self.__lock = SharedLock()
+
+ # The lockdict indexes the relationship name -> lock
+ # The order-of-locking is implied by the alphabetical order of names
+ self.__lockdict = {}
+
+ if members is not None:
+ for name in members:
+ self.__lockdict[name] = SharedLock()
+
+ # The owner dict contains the set of locks each thread owns. For
+ # performance each thread can access its own key without a global lock on
+ # this structure. It is paramount though that *no* other type of access is
+ # done to this structure (eg. no looping over its keys). *_owner helper
+ # function are defined to guarantee access is correct, but in general never
+ # do anything different than __owners[threading.currentThread()], or there
+ # will be trouble.
+ self.__owners = {}
+
+ def _is_owned(self):
+ """Is the current thread a current level owner?"""
+ return threading.currentThread() in self.__owners
+
+ def _add_owned(self, name=None):
+ """Note the current thread owns the given lock"""
+ if name is None:
+ if not self._is_owned():
+ self.__owners[threading.currentThread()] = set()
+ else:
+ if self._is_owned():
+ self.__owners[threading.currentThread()].add(name)
else:
- assert False, "Cannot release non-owned lock"
+ self.__owners[threading.currentThread()] = set([name])
+
+ def _del_owned(self, name=None):
+ """Note the current thread owns the given lock"""
+
+ if name is not None:
+ self.__owners[threading.currentThread()].remove(name)
+
+ # Only remove the key if we don't hold the set-lock as well
+ if (not self.__lock._is_owned() and
+ not self.__owners[threading.currentThread()]):
+ del self.__owners[threading.currentThread()]
+
+ def _list_owned(self):
+ """Get the set of resource names owned by the current thread"""
+ if self._is_owned():
+ return self.__owners[threading.currentThread()].copy()
+ else:
+ return set()
+
+ def _release_and_delete_owned(self):
+ """Release and delete all resources owned by the current thread"""
+ for lname in self._list_owned():
+ self.__lockdict[lname].release()
+ self._del_owned(name=lname)
+
+ def __names(self):
+ """Return the current set of names.
+
+ Only call this function while holding __lock and don't iterate on the
+ result after releasing the lock.
+
+ """
+ return self.__lockdict.keys()
+
+ def _names(self):
+ """Return a copy of the current set of elements.
+ Used only for debugging purposes.
+
+ """
+ # If we don't already own the set-level lock acquired
+ # we'll get it and note we need to release it later.
+ release_lock = False
+ if not self.__lock._is_owned():
+ release_lock = True
+ self.__lock.acquire(shared=1)
+ try:
+ result = self.__names()
finally:
+ if release_lock:
+ self.__lock.release()
+ return set(result)
+
+ def acquire(self, names, timeout=None, shared=0, test_notify=None):
+ """Acquire a set of resource locks.
+
+ @param names: the names of the locks which shall be acquired
+ (special lock names, or instance/node names)
+ @param shared: whether to acquire in shared mode; by default an
+ exclusive lock will be acquired
+ @type timeout: float or None
+ @param timeout: Maximum time to acquire all locks
+ @type test_notify: callable or None
+ @param test_notify: Special callback function for unittesting
+
+ @return: Set of all locks successfully acquired or None in case of timeout
+
+ @raise errors.LockError: when any lock we try to acquire has
+ been deleted before we succeed. In this case none of the
+ locks requested will be acquired.
+
+ """
+ assert timeout is None or timeout >= 0.0
+
+ # Check we don't already own locks at this level
+ assert not self._is_owned(), "Cannot acquire locks in the same set twice"
+
+ # We need to keep track of how long we spent waiting for a lock. The
+ # timeout passed to this function is over all lock acquires.
+ remaining_timeout = timeout
+ if timeout is None:
+ start = None
+ calc_remaining_timeout = lambda: None
+ else:
+ start = time.time()
+ calc_remaining_timeout = lambda: (start + timeout) - time.time()
+
+ want_all = names is None
+
+ if want_all:
+ # If no names are given acquire the whole set by not letting new names
+ # being added before we release, and getting the current list of names.
+ # Some of them may then be deleted later, but we'll cope with this.
+ #
+ # We'd like to acquire this lock in a shared way, as it's nice if
+    # everybody else can use the instances at the same time. If we are
+    # acquiring them exclusively they won't be able to do this anyway,
+ # so we'll get the list lock exclusively as well in order to be able to
+ # do add() on the set while owning it.
+ self.__lock.acquire(shared=shared, timeout=remaining_timeout)
+ try:
+ # note we own the set-lock
+ self._add_owned()
+ names = self.__names()
+ except:
+ # We shouldn't have problems adding the lock to the owners list, but
+ # if we did we'll try to release this lock and re-raise exception.
+ # Of course something is going to be really wrong, after this.
+ self.__lock.release()
+ raise
+
+ # Re-calculate timeout
+ remaining_timeout = calc_remaining_timeout()
+
+ try:
+ try:
+ # Support passing in a single resource to acquire rather than many
+ if isinstance(names, basestring):
+ names = [names]
+ else:
+ names = sorted(names)
+
+ acquire_list = []
+ # First we look the locks up on __lockdict. We have no way of being sure
+ # they will still be there after, but this makes it a lot faster should
+      # just one of them be missing already
+ for lname in utils.UniqueSequence(names):
+ try:
+ lock = self.__lockdict[lname] # raises KeyError if lock is not there
+ acquire_list.append((lname, lock))
+ except KeyError:
+ if want_all:
+ # We are acquiring all the set, it doesn't matter if this
+ # particular element is not there anymore.
+ continue
+ else:
+ raise errors.LockError("Non-existing lock in set (%s)" % lname)
+
+ # This will hold the locknames we effectively acquired.
+ acquired = set()
+
+ # Now acquire_list contains a sorted list of resources and locks we
+ # want. In order to get them we loop on this (private) list and
+ # acquire() them. We gave no real guarantee they will still exist till
+ # this is done but .acquire() itself is safe and will alert us if the
+ # lock gets deleted.
+ for (lname, lock) in acquire_list:
+ if __debug__ and callable(test_notify):
+ test_notify_fn = lambda: test_notify(lname)
+ else:
+ test_notify_fn = None
+
+ try:
+ if timeout is not None and remaining_timeout < 0:
+ raise _AcquireTimeout()
+
+ # raises LockError if the lock was deleted
+ if not lock.acquire(shared=shared, timeout=remaining_timeout,
+ test_notify=test_notify_fn):
+ # Couldn't get lock or timeout occurred
+ if timeout is None:
+ # This shouldn't happen as SharedLock.acquire(timeout=None) is
+ # blocking.
+ raise errors.LockError("Failed to get lock %s" % lname)
+
+ raise _AcquireTimeout()
+
+ # Re-calculate timeout
+ remaining_timeout = calc_remaining_timeout()
+
+ # now the lock cannot be deleted, we have it!
+ self._add_owned(name=lname)
+ acquired.add(lname)
+
+ except _AcquireTimeout:
+ # Release all acquired locks
+ self._release_and_delete_owned()
+ raise
+
+ except errors.LockError:
+ if want_all:
+ # We are acquiring all the set, it doesn't matter if this
+ # particular element is not there anymore.
+ continue
+
+ self._release_and_delete_owned()
+
+ raise errors.LockError("Non-existing lock in set (%s)" % lname)
+
+ except:
+ # We shouldn't have problems adding the lock to the owners list, but
+ # if we did we'll try to release this lock and re-raise exception.
+ # Of course something is going to be really wrong, after this.
+ if lock._is_owned():
+ lock.release()
+ raise
+
+ except:
+ # If something went wrong and we had the set-lock let's release it...
+ if want_all:
+ self.__lock.release()
+ raise
+
+ except _AcquireTimeout:
+ if want_all:
+ self._del_owned()
+
+ return None
+
+ return acquired
+
+ def release(self, names=None):
+ """Release a set of resource locks, at the same level.
+
+ You must have acquired the locks, either in shared or in exclusive mode,
+ before releasing them.
+
+ @param names: the names of the locks which shall be released
+ (defaults to all the locks acquired at that level).
+
+ """
+ assert self._is_owned(), "release() on lock set while not owner"
+
+ # Support passing in a single resource to release rather than many
+ if isinstance(names, basestring):
+ names = [names]
+
+ if names is None:
+ names = self._list_owned()
+ else:
+ names = set(names)
+ assert self._list_owned().issuperset(names), (
+ "release() on unheld resources %s" %
+ names.difference(self._list_owned()))
+
+ # First of all let's release the "all elements" lock, if set.
+ # After this 'add' can work again
+ if self.__lock._is_owned():
self.__lock.release()
+ self._del_owned()
+
+ for lockname in names:
+ # If we are sure the lock doesn't leave __lockdict without being
+ # exclusively held we can do this...
+ self.__lockdict[lockname].release()
+ self._del_owned(name=lockname)
+ def add(self, names, acquired=0, shared=0):
+ """Add a new set of elements to the set
+
+ @param names: names of the new elements to add
+ @param acquired: pre-acquire the new resource?
+ @param shared: is the pre-acquisition shared?
+
+ """
+ # Check we don't already own locks at this level
+ assert not self._is_owned() or self.__lock._is_owned(shared=0), \
+ "Cannot add locks if the set is only partially owned, or shared"
+
+ # Support passing in a single resource to add rather than many
+ if isinstance(names, basestring):
+ names = [names]
+
+ # If we don't already own the set-level lock acquired in an exclusive way
+ # we'll get it and note we need to release it later.
+ release_lock = False
+ if not self.__lock._is_owned():
+ release_lock = True
+ self.__lock.acquire()
+
+ try:
+ invalid_names = set(self.__names()).intersection(names)
+ if invalid_names:
+ # This must be an explicit raise, not an assert, because assert is
+ # turned off when using optimization, and this can happen because of
+ # concurrency even if the user doesn't want it.
+ raise errors.LockError("duplicate add() (%s)" % invalid_names)
+
+ for lockname in names:
+ lock = SharedLock()
+
+ if acquired:
+ lock.acquire(shared=shared)
+ # now the lock cannot be deleted, we have it!
+ try:
+ self._add_owned(name=lockname)
+ except:
+ # We shouldn't have problems adding the lock to the owners list,
+ # but if we did we'll try to release this lock and re-raise
+ # exception. Of course something is going to be really wrong,
+ # after this. On the other hand the lock hasn't been added to the
+ # __lockdict yet so no other threads should be pending on it. This
+ # release is just a safety measure.
+ lock.release()
+ raise
+
+ self.__lockdict[lockname] = lock
+
+ finally:
+ # Only release __lock if we were not holding it previously.
+ if release_lock:
+ self.__lock.release()
+
+ return True
+
+ def remove(self, names):
+ """Remove elements from the lock set.
+
+ You can either not hold anything in the lockset or already hold a superset
+ of the elements you want to delete, exclusively.
+
+ @param names: names of the resource to remove.
+
+ @return: a list of locks which we removed; the list is always
+ equal to the names list if we were holding all the locks
+ exclusively
+
+ """
+ # Support passing in a single resource to remove rather than many
+ if isinstance(names, basestring):
+ names = [names]
+
+ # If we own any subset of this lock it must be a superset of what we want
+ # to delete. The ownership must also be exclusive, but that will be checked
+ # by the lock itself.
+ assert not self._is_owned() or self._list_owned().issuperset(names), (
+ "remove() on acquired lockset while not owning all elements")
+
+ removed = []
+
+ for lname in names:
+ # Calling delete() acquires the lock exclusively if we don't already own
+ # it, and causes all pending and subsequent lock acquires to fail. It's
+ # fine to call it out of order because delete() also implies release(),
+ # and the assertion above guarantees that if we either already hold
+ # everything we want to delete, or we hold none.
+ try:
+ self.__lockdict[lname].delete()
+ removed.append(lname)
+ except (KeyError, errors.LockError):
+ # This cannot happen if we were already holding it, verify:
+ assert not self._is_owned(), "remove failed while holding lockset"
+ else:
+ # If no LockError was raised we are the ones who deleted the lock.
+ # This means we can safely remove it from lockdict, as any further or
+ # pending delete() or acquire() will fail (and nobody can have the lock
+ # since before our call to delete()).
+ #
+ # This is done in an else clause because if the exception was thrown
+ # it's the job of the one who actually deleted it.
+ del self.__lockdict[lname]
+ # And let's remove it from our private list if we owned it.
+ if self._is_owned():
+ self._del_owned(name=lname)
+
+ return removed
+
+
+# Locking levels, must be acquired in increasing order.
+# Current rules are:
+# - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
+# acquired before performing any operation, either in shared or in exclusive
+# mode. acquiring the BGL in exclusive mode is discouraged and should be
+# avoided.
+# - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
+# If you need more than one node, or more than one instance, acquire them at
+# the same time.
+LEVEL_CLUSTER = 0
+LEVEL_INSTANCE = 1
+LEVEL_NODE = 2
+
+LEVELS = [LEVEL_CLUSTER,
+ LEVEL_INSTANCE,
+ LEVEL_NODE]
+
+# Lock levels which are modifiable
+LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
+
+LEVEL_NAMES = {
+ LEVEL_CLUSTER: "cluster",
+ LEVEL_INSTANCE: "instance",
+ LEVEL_NODE: "node",
+ }
+
+# Constant for the big ganeti lock
+BGL = 'BGL'
+
+
+class GanetiLockManager:
+ """The Ganeti Locking Library
+
+ The purpose of this small library is to manage locking for ganeti clusters
+ in a central place, while at the same time doing dynamic checks against
+ possible deadlocks. It will also make it easier to transition to a different
+ lock type should we migrate away from python threads.
+
+ """
+ _instance = None
+
+ def __init__(self, nodes=None, instances=None):
+ """Constructs a new GanetiLockManager object.
+
+    There should be only one GanetiLockManager object at any time, so this
+ function raises an error if this is not the case.
+
+ @param nodes: list of node names
+ @param instances: list of instance names
+
+ """
+ assert self.__class__._instance is None, \
+ "double GanetiLockManager instance"
+
+ self.__class__._instance = self
+
+ # The keyring contains all the locks, at their level and in the correct
+ # locking order.
+ self.__keyring = {
+ LEVEL_CLUSTER: LockSet([BGL]),
+ LEVEL_NODE: LockSet(nodes),
+ LEVEL_INSTANCE: LockSet(instances),
+ }
+
+ def _names(self, level):
+ """List the lock names at the given level.
+
+ This can be used for debugging/testing purposes.
+
+ @param level: the level whose list of locks to get
+
+ """
+ assert level in LEVELS, "Invalid locking level %s" % level
+ return self.__keyring[level]._names()
+
+ def _is_owned(self, level):
+ """Check whether we are owning locks at the given level
+
+ """
+ return self.__keyring[level]._is_owned()
+
+ is_owned = _is_owned
+
+ def _list_owned(self, level):
+ """Get the set of owned locks at the given level
+
+ """
+ return self.__keyring[level]._list_owned()
+
+ def _upper_owned(self, level):
+ """Check that we don't own any lock at a level greater than the given one.
+
+ """
+ # This way of checking only works if LEVELS[i] = i, which we check for in
+ # the test cases.
+ return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
+
+ def _BGL_owned(self):
+ """Check if the current thread owns the BGL.
+
+ Both an exclusive or a shared acquisition work.
+
+ """
+ return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
+
+ def _contains_BGL(self, level, names):
+ """Check if the level contains the BGL.
+
+ Check if acting on the given level and set of names will change
+ the status of the Big Ganeti Lock.
+
+ """
+ return level == LEVEL_CLUSTER and (names is None or BGL in names)
+
+ def acquire(self, level, names, timeout=None, shared=0):
+ """Acquire a set of resource locks, at the same level.
+
+ @param level: the level at which the locks shall be acquired;
+ it must be a member of LEVELS.
+ @param names: the names of the locks which shall be acquired
+ (special lock names, or instance/node names)
+ @param shared: whether to acquire in shared mode; by default
+ an exclusive lock will be acquired
+ @type timeout: float
+ @param timeout: Maximum time to acquire all locks
+
+ """
+ assert level in LEVELS, "Invalid locking level %s" % level
+
+ # Check that we are either acquiring the Big Ganeti Lock or we already own
+ # it. Some "legacy" opcodes need to be sure they are run non-concurrently
+ # so even if we've migrated we need to at least share the BGL to be
+ # compatible with them. Of course if we own the BGL exclusively there's no
+ # point in acquiring any other lock, unless perhaps we are half way through
+ # the migration of the current opcode.
+ assert (self._contains_BGL(level, names) or self._BGL_owned()), (
+ "You must own the Big Ganeti Lock before acquiring any other")
+
+ # Check we don't own locks at the same or upper levels.
+ assert not self._upper_owned(level), ("Cannot acquire locks at a level"
+ " while owning some at a greater one")
+
+ # Acquire the locks in the set.
+ return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
+
+ def release(self, level, names=None):
+ """Release a set of resource locks, at the same level.
+
+ You must have acquired the locks, either in shared or in exclusive
+ mode, before releasing them.
+
+ @param level: the level at which the locks shall be released;
+ it must be a member of LEVELS
+ @param names: the names of the locks which shall be released
+ (defaults to all the locks acquired at that level)
+
+ """
+ assert level in LEVELS, "Invalid locking level %s" % level
+ assert (not self._contains_BGL(level, names) or
+ not self._upper_owned(LEVEL_CLUSTER)), (
+ "Cannot release the Big Ganeti Lock while holding something"
+ " at upper levels")
+
+ # Release will complain if we don't own the locks already
+ return self.__keyring[level].release(names)
+
+ def add(self, level, names, acquired=0, shared=0):
+ """Add locks at the specified level.
+
+ @param level: the level at which the locks shall be added;
+ it must be a member of LEVELS_MOD.
+ @param names: names of the locks to acquire
+ @param acquired: whether to acquire the newly added locks
+ @param shared: whether the acquisition will be shared
+
+ """
+ assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
+ assert self._BGL_owned(), ("You must own the BGL before performing other"
+ " operations")
+ assert not self._upper_owned(level), ("Cannot add locks at a level"
+ " while owning some at a greater one")
+ return self.__keyring[level].add(names, acquired=acquired, shared=shared)
+
+ def remove(self, level, names):
+ """Remove locks from the specified level.
+
+ You must either already own the locks you are trying to remove
+ exclusively or not own any lock at an upper level.
+
+ @param level: the level at which the locks shall be removed;
+ it must be a member of LEVELS_MOD
+ @param names: the names of the locks which shall be removed
+ (special lock names, or instance/node names)
+
+ """
+ assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
+ assert self._BGL_owned(), ("You must own the BGL before performing other"
+ " operations")
+ # Check we either own the level or don't own anything from here
+ # up. LockSet.remove() will check the case in which we don't own
+ # all the needed resources, or we have a shared ownership.
+ assert self._is_owned(level) or not self._upper_owned(level), (
+ "Cannot remove locks at a level while not owning it or"
+ " owning some at a greater one")
+ return self.__keyring[level].remove(names)