#
#
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
"""Module implementing the Ganeti locking code."""
+# pylint: disable-msg=W0212
+
+# W0212 since e.g. LockSet methods use (a lot) the internals of
+# SharedLock
+
import os
import select
import threading
-import time
import errno
+import weakref
+import logging
+import heapq
+import itertools
from ganeti import errors
from ganeti import utils
+from ganeti import compat
+from ganeti import query
+
+_EXCLUSIVE_TEXT = "exclusive"
+_SHARED_TEXT = "shared"
+_DELETED_TEXT = "deleted"
-def ssynchronized(lock, shared=0):
+_DEFAULT_PRIORITY = 0
+
+
+def ssynchronized(mylock, shared=0):
"""Shared Synchronization decorator.
Calls the function holding the given lock, either in exclusive or shared
mode. It requires the passed lock to be a SharedLock (or support its
semantics).
+ @type mylock: lockable object or string
+ @param mylock: lock to acquire or class member name of the lock to acquire
+
"""
def wrap(fn):
def sync_function(*args, **kwargs):
+ if isinstance(mylock, basestring):
+ assert args, "cannot ssynchronize on non-class method: self not found"
+ # args[0] is "self"
+ lock = getattr(args[0], mylock)
+ else:
+ lock = mylock
lock.acquire(shared=shared)
try:
return fn(*args, **kwargs)
return wrap
-class _SingleActionPipeConditionWaiter(object):
- """Callable helper class for _SingleActionPipeCondition.
+class _SingleNotifyPipeConditionWaiter(object):
+ """Helper class for SingleNotifyPipeCondition
"""
__slots__ = [
- "_cond",
"_fd",
"_poller",
]
- def __init__(self, cond, poller, fd):
- """Initializes this class.
+ def __init__(self, poller, fd):
+ """Constructor for _SingleNotifyPipeConditionWaiter
- @type cond: L{_SingleActionPipeCondition}
- @param cond: Parent condition
@type poller: select.poll
@param poller: Poller object
@type fd: int
"""
object.__init__(self)
-
- self._cond = cond
self._poller = poller
self._fd = fd
@param timeout: Timeout for waiting (can be None)
"""
- start_time = time.time()
- remaining_time = timeout
+ running_timeout = utils.RunningTimeout(timeout, True)
+
+ while True:
+ remaining_time = running_timeout.Remaining()
+
+ if remaining_time is not None:
+ if remaining_time < 0.0:
+ break
+
+ # Our calculation uses seconds, poll() wants milliseconds
+ remaining_time *= 1000
- while timeout is None or remaining_time > 0:
try:
result = self._poller.poll(remaining_time)
except EnvironmentError, err:
if result and result[0][0] == self._fd:
break
- # Re-calculate timeout if necessary
- if timeout is not None:
- remaining_time = start_time + timeout - time.time()
-
-
-class _SingleActionPipeCondition(object):
- """Wrapper around a pipe for usage inside conditions.
- This class contains a POSIX pipe(2) and a poller to poll it. The pipe is
- always allocated when constructing the class. Extra care is taken to always
- close the file descriptors.
+class _BaseCondition(object):
+ """Base class containing common code for conditions.
- An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for
- notifications.
-
- Warning: This class is designed to be used as the underlying component of a
- locking condition, but is not by itself thread safe, and needs to be
- protected by an external lock.
+ Some of this code is taken from python's threading module.
"""
__slots__ = [
- "_poller",
- "_read_fd",
- "_write_fd",
- "_nwaiters",
+ "_lock",
+ "acquire",
+ "release",
+ "_is_owned",
+ "_acquire_restore",
+ "_release_save",
]
- _waiter_class = _SingleActionPipeConditionWaiter
+ def __init__(self, lock):
+ """Constructor for _BaseCondition.
- def __init__(self):
- """Initializes this class.
+ @type lock: threading.Lock
+ @param lock: condition base lock
"""
object.__init__(self)
- self._nwaiters = 0
-
- # Just assume the unpacking is successful, otherwise error handling gets
- # very complicated.
- (self._read_fd, self._write_fd) = os.pipe()
try:
- # The poller looks for closure of the write side
- poller = select.poll()
- poller.register(self._read_fd, select.POLLHUP)
-
- self._poller = poller
- except:
- if self._read_fd is not None:
- os.close(self._read_fd)
- if self._write_fd is not None:
- os.close(self._write_fd)
- raise
+ self._release_save = lock._release_save
+ except AttributeError:
+ self._release_save = self._base_release_save
+ try:
+ self._acquire_restore = lock._acquire_restore
+ except AttributeError:
+ self._acquire_restore = self._base_acquire_restore
+ try:
+ self._is_owned = lock._is_owned
+ except AttributeError:
+ self._is_owned = self._base_is_owned
- # There should be no code here anymore, otherwise the pipe file descriptors
- # may be not be cleaned up properly in case of errors.
+ self._lock = lock
- def StartWaiting(self):
- """Return function to wait for notification.
+ # Export the lock's acquire() and release() methods
+ self.acquire = lock.acquire
+ self.release = lock.release
- @rtype: L{_SingleActionPipeConditionWaiter}
- @return: Function to wait for notification
+ def _base_is_owned(self):
+ """Check whether lock is owned by current thread.
"""
- assert self._nwaiters >= 0
+ if self._lock.acquire(0):
+ self._lock.release()
+ return False
+ return True
- if self._poller is None:
- raise RuntimeError("Already cleaned up")
+ def _base_release_save(self):
+ self._lock.release()
- # Create waiter function and increase number of waiters
- wait_fn = self._waiter_class(self, self._poller, self._read_fd)
- self._nwaiters += 1
- return wait_fn
+ def _base_acquire_restore(self, _):
+ self._lock.acquire()
+
+ def _check_owned(self):
+ """Raise an exception if the current thread doesn't own the lock.
- def DoneWaiting(self):
- """Decrement number of waiters and automatic cleanup.
+ """
+ if not self._is_owned():
+ raise RuntimeError("cannot work with un-aquired lock")
- Must be called after waiting for a notification.
- @rtype: bool
- @return: Whether this was the last waiter
+class SingleNotifyPipeCondition(_BaseCondition):
+ """Condition which can only be notified once.
- """
- assert self._nwaiters > 0
+ This condition class uses pipes and poll, internally, to be able to wait for
+ notification with a timeout, without resorting to polling. It is almost
+ compatible with Python's threading.Condition, with the following differences:
+ - notifyAll can only be called once, and no wait can happen after that
+ - notify is not supported, only notifyAll
- self._nwaiters -= 1
+ """
- if self._nwaiters == 0:
- self._Cleanup()
- return True
+ __slots__ = [
+ "_poller",
+ "_read_fd",
+ "_write_fd",
+ "_nwaiters",
+ "_notified",
+ ]
- return False
+ _waiter_class = _SingleNotifyPipeConditionWaiter
- def notifyAll(self):
- """Close the writing side of the pipe to notify all waiters.
+ def __init__(self, lock):
+ """Constructor for SingleNotifyPipeCondition
"""
- if self._write_fd is None:
- raise RuntimeError("Can only notify once")
-
- os.close(self._write_fd)
+ _BaseCondition.__init__(self, lock)
+ self._nwaiters = 0
+ self._notified = False
+ self._read_fd = None
self._write_fd = None
+ self._poller = None
+
+ def _check_unnotified(self):
+ """Throws an exception if already notified.
+
+ """
+ if self._notified:
+ raise RuntimeError("cannot use already notified condition")
def _Cleanup(self):
- """Close all file descriptors.
+ """Cleanup open file descriptors, if any.
"""
if self._read_fd is not None:
if self._write_fd is not None:
os.close(self._write_fd)
self._write_fd = None
-
self._poller = None
- def __del__(self):
- """Called on object deletion.
+ def wait(self, timeout):
+ """Wait for a notification.
- Ensure no file descriptors are left open.
+ @type timeout: float or None
+ @param timeout: Waiting timeout (can be None)
"""
- self._Cleanup()
+ self._check_owned()
+ self._check_unnotified()
+
+ self._nwaiters += 1
+ try:
+ if self._poller is None:
+ (self._read_fd, self._write_fd) = os.pipe()
+ self._poller = select.poll()
+ self._poller.register(self._read_fd, select.POLLHUP)
+
+ wait_fn = self._waiter_class(self._poller, self._read_fd)
+ state = self._release_save()
+ try:
+ # Wait for notification
+ wait_fn(timeout)
+ finally:
+ # Re-acquire lock
+ self._acquire_restore(state)
+ finally:
+ self._nwaiters -= 1
+ if self._nwaiters == 0:
+ self._Cleanup()
+
+ def notifyAll(self): # pylint: disable-msg=C0103
+ """Close the writing side of the pipe to notify all waiters.
+
+ """
+ self._check_owned()
+ self._check_unnotified()
+ self._notified = True
+ if self._write_fd is not None:
+ os.close(self._write_fd)
+ self._write_fd = None
-class _PipeCondition(object):
+class PipeCondition(_BaseCondition):
"""Group-only non-polling condition with counters.
This condition class uses pipes and poll, internally, to be able to wait for
"""
__slots__ = [
- "_lock",
- "_nwaiters",
- "_pipe",
- "acquire",
- "release",
+ "_waiters",
+ "_single_condition",
]
- _pipe_class = _SingleActionPipeCondition
+ _single_condition_class = SingleNotifyPipeCondition
def __init__(self, lock):
"""Initializes this class.
"""
- object.__init__(self)
-
- # Recursive locks are not supported
- assert not hasattr(lock, "_acquire_restore")
- assert not hasattr(lock, "_release_save")
-
- self._lock = lock
-
- # Export the lock's acquire() and release() methods
- self.acquire = lock.acquire
- self.release = lock.release
-
- self._nwaiters = 0
- self._pipe = None
-
- def _is_owned(self):
- """Check whether lock is owned by current thread.
-
- """
- if self._lock.acquire(0):
- self._lock.release()
- return False
-
- return True
-
- def _check_owned(self):
- """Raise an exception if the current thread doesn't own the lock.
-
- """
- if not self._is_owned():
- raise RuntimeError("cannot work with un-aquired lock")
+ _BaseCondition.__init__(self, lock)
+ self._waiters = set()
+ self._single_condition = self._single_condition_class(self._lock)
- def wait(self, timeout=None):
+ def wait(self, timeout):
"""Wait for a notification.
@type timeout: float or None
"""
self._check_owned()
- if not self._pipe:
- self._pipe = self._pipe_class()
-
# Keep local reference to the condition. It could be replaced by another
# thread notifying while we're waiting.
- pipe = self._pipe
+ cond = self._single_condition
- assert self._nwaiters >= 0
- self._nwaiters += 1
+ self._waiters.add(threading.currentThread())
try:
- # Get function to wait on the pipe
- wait_fn = pipe.StartWaiting()
- try:
- # Release lock while waiting
- self.release()
- try:
- # Wait for notification
- wait_fn(timeout)
- finally:
- # Re-acquire lock
- self.acquire()
- finally:
- # Destroy pipe if this was the last waiter and the current pipe is
- # still the same. The same pipe cannot be reused after cleanup.
- if pipe.DoneWaiting() and pipe == self._pipe:
- self._pipe = None
+ cond.wait(timeout)
finally:
- assert self._nwaiters > 0
- self._nwaiters -= 1
+ self._check_owned()
+ self._waiters.remove(threading.currentThread())
- def notifyAll(self):
+ def notifyAll(self): # pylint: disable-msg=C0103
"""Notify all currently waiting threads.
"""
self._check_owned()
+ self._single_condition.notifyAll()
+ self._single_condition = self._single_condition_class(self._lock)
+
+ def get_waiting(self):
+ """Returns the set of all waiting threads.
+
+ """
+ self._check_owned()
- # Notify and forget pipe. A new one will be created on the next call to
- # wait.
- if self._pipe is not None:
- self._pipe.notifyAll()
- self._pipe = None
+ return self._waiters
def has_waiting(self):
"""Returns whether there are active waiters.
"""
self._check_owned()
- return bool(self._nwaiters)
+ return bool(self._waiters)
-class _CountingCondition(object):
- """Wrapper for Python's built-in threading.Condition class.
-
- This wrapper keeps a count of active waiters. We can't access the internal
- "__waiters" attribute of threading.Condition because it's not thread-safe.
-
- """
+class _PipeConditionWithMode(PipeCondition):
__slots__ = [
- "_cond",
- "_nwaiters",
+ "shared",
]
- def __init__(self, lock):
+ def __init__(self, lock, shared):
"""Initializes this class.
"""
- object.__init__(self)
- self._cond = threading.Condition(lock=lock)
- self._nwaiters = 0
-
- def notifyAll(self):
- """Notifies the condition.
-
- """
- return self._cond.notifyAll()
-
- def wait(self, timeout=None):
- """Waits for the condition to be notified.
-
- @type timeout: float or None
- @param timeout: Timeout in seconds
-
- """
- assert self._nwaiters >= 0
-
- self._nwaiters += 1
- try:
- return self._cond.wait(timeout=timeout)
- finally:
- self._nwaiters -= 1
-
- def has_waiting(self):
- """Returns whether there are active waiters.
-
- """
- return bool(self._nwaiters)
+ self.shared = shared
+ PipeCondition.__init__(self, lock)
class SharedLock(object):
"""Implements a shared lock.
- Multiple threads can acquire the lock in a shared way, calling
- acquire_shared(). In order to acquire the lock in an exclusive way threads
- can call acquire_exclusive().
+ Multiple threads can acquire the lock in a shared way by calling
+ C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
+ threads can call C{acquire(shared=0)}.
+
+ Notes on data structures: C{__pending} contains a priority queue (heapq) of
+ all pending acquires: C{[(priority1, prioqueue1), (priority2, prioqueue2),
+ ...]}. Each per-priority queue contains a normal in-order list of conditions
+ to be notified when the lock can be acquired. Shared locks are grouped
+ together by priority and the condition for them is stored in
+ C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
+ references for the per-priority queues indexed by priority for faster access.
- The lock prevents starvation but does not guarantee that threads will acquire
- the shared lock in the order they queued for it, just that they will
- eventually do so.
+ @type name: string
+ @ivar name: the name of the lock
"""
__slots__ = [
- "__active_shr_c",
- "__inactive_shr_c",
+ "__weakref__",
"__deleted",
"__exc",
"__lock",
"__pending",
+ "__pending_by_prio",
+ "__pending_shared",
"__shr",
+ "name",
]
- __condition_class = _PipeCondition
+ __condition_class = _PipeConditionWithMode
- def __init__(self):
+ def __init__(self, name, monitor=None):
"""Construct a new SharedLock.
+ @param name: the name of the lock
+ @type monitor: L{LockMonitor}
+ @param monitor: Lock monitor with which to register
+
"""
object.__init__(self)
+ self.name = name
+
# Internal lock
self.__lock = threading.Lock()
# Queue containing waiting acquires
self.__pending = []
-
- # Active and inactive conditions for shared locks
- self.__active_shr_c = self.__condition_class(self.__lock)
- self.__inactive_shr_c = self.__condition_class(self.__lock)
+ self.__pending_by_prio = {}
+ self.__pending_shared = {}
# Current lock holders
self.__shr = set()
# is this lock in the deleted state?
self.__deleted = False
+ # Register with lock monitor
+ if monitor:
+ logging.debug("Adding lock %s to monitor", name)
+ monitor.RegisterLock(self)
+
+ def GetLockInfo(self, requested):
+ """Retrieves information for querying locks.
+
+ @type requested: set
+ @param requested: Requested information, see C{query.LQ_*}
+
+ """
+ self.__lock.acquire()
+ try:
+ # Note: to avoid unintentional race conditions, no references to
+ # modifiable objects should be returned unless they were created in this
+ # function.
+ mode = None
+ owner_names = None
+
+ if query.LQ_MODE in requested:
+ if self.__deleted:
+ mode = _DELETED_TEXT
+ assert not (self.__exc or self.__shr)
+ elif self.__exc:
+ mode = _EXCLUSIVE_TEXT
+ elif self.__shr:
+ mode = _SHARED_TEXT
+
+ # Current owner(s) are wanted
+ if query.LQ_OWNER in requested:
+ if self.__exc:
+ owner = [self.__exc]
+ else:
+ owner = self.__shr
+
+ if owner:
+ assert not self.__deleted
+ owner_names = [i.getName() for i in owner]
+
+ # Pending acquires are wanted
+ if query.LQ_PENDING in requested:
+ pending = []
+
+ # Sorting instead of copying and using heapq functions for simplicity
+ for (_, prioqueue) in sorted(self.__pending):
+ for cond in prioqueue:
+ if cond.shared:
+ pendmode = _SHARED_TEXT
+ else:
+ pendmode = _EXCLUSIVE_TEXT
+
+ # List of names will be sorted in L{query._GetLockPending}
+ pending.append((pendmode, [i.getName()
+ for i in cond.get_waiting()]))
+ else:
+ pending = None
+
+ return [(self.name, mode, owner_names, pending)]
+ finally:
+ self.__lock.release()
+
def __check_deleted(self):
"""Raises an exception if the lock has been deleted.
"""
if self.__deleted:
- raise errors.LockError("Deleted lock")
+ raise errors.LockError("Deleted lock %s" % self.name)
def __is_sharer(self):
"""Is the current thread sharing the lock at this time?
finally:
self.__lock.release()
+ is_owned = _is_owned
+
def _count_pending(self):
"""Returns the number of pending acquires.
"""
self.__lock.acquire()
try:
- return len(self.__pending)
+ return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
+ finally:
+ self.__lock.release()
+
+ def _check_empty(self):
+ """Checks whether there are any pending acquires.
+
+ @rtype: bool
+
+ """
+ self.__lock.acquire()
+ try:
+ # Order is important: __find_first_pending_queue modifies __pending
+ (_, prioqueue) = self.__find_first_pending_queue()
+
+ return not (prioqueue or
+ self.__pending or
+ self.__pending_by_prio or
+ self.__pending_shared)
finally:
self.__lock.release()
else:
return len(self.__shr) == 0 and self.__exc is None
+ def __find_first_pending_queue(self):
+ """Tries to find the topmost queued entry with pending acquires.
+
+ Removes empty entries while going through the list.
+
+ """
+ while self.__pending:
+ (priority, prioqueue) = self.__pending[0]
+
+ if prioqueue:
+ return (priority, prioqueue)
+
+ # Remove empty queue
+ heapq.heappop(self.__pending)
+ del self.__pending_by_prio[priority]
+ assert priority not in self.__pending_shared
+
+ return (None, None)
+
def __is_on_top(self, cond):
"""Checks whether the passed condition is on top of the queue.
The caller must make sure the queue isn't empty.
"""
- return self.__pending[0] == cond
+ (_, prioqueue) = self.__find_first_pending_queue()
+
+ return cond == prioqueue[0]
- def __acquire_unlocked(self, shared, timeout):
+ def __acquire_unlocked(self, shared, timeout, priority):
"""Acquire a shared lock.
@param shared: whether to acquire in shared mode; by default an
exclusive lock will be acquired
@param timeout: maximum waiting time before giving up
+ @type priority: integer
+ @param priority: Priority for acquiring lock
"""
self.__check_deleted()
# We cannot acquire the lock if we already have it
- assert not self.__is_owned(), "double acquire() on a non-recursive lock"
+ assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
+ " %s" % self.name)
+
+ # Remove empty entries from queue
+ self.__find_first_pending_queue()
# Check whether someone else holds the lock or there are pending acquires.
if not self.__pending and self.__can_acquire(shared):
self.__do_acquire(shared)
return True
- if shared:
- wait_condition = self.__active_shr_c
+ prioqueue = self.__pending_by_prio.get(priority, None)
- # Check if we're not yet in the queue
- if wait_condition not in self.__pending:
- self.__pending.append(wait_condition)
+ if shared:
+ # Try to re-use condition for shared acquire
+ wait_condition = self.__pending_shared.get(priority, None)
+ assert (wait_condition is None or
+ (wait_condition.shared and wait_condition in prioqueue))
else:
- wait_condition = self.__condition_class(self.__lock)
- # Always add to queue
- self.__pending.append(wait_condition)
+ wait_condition = None
+
+ if wait_condition is None:
+ if prioqueue is None:
+ assert priority not in self.__pending_by_prio
+
+ prioqueue = []
+ heapq.heappush(self.__pending, (priority, prioqueue))
+ self.__pending_by_prio[priority] = prioqueue
+
+ wait_condition = self.__condition_class(self.__lock, shared)
+ prioqueue.append(wait_condition)
+
+ if shared:
+ # Keep reference for further shared acquires on same priority. This is
+ # better than trying to find it in the list of pending acquires.
+ assert priority not in self.__pending_shared
+ self.__pending_shared[priority] = wait_condition
try:
# Wait until we become the topmost acquire in the queue or the timeout
# expires.
+ # TODO: Decrease timeout with spurious notifications
while not (self.__is_on_top(wait_condition) and
self.__can_acquire(shared)):
# Wait for notification
return True
finally:
# Remove condition from queue if there are no more waiters
- if not wait_condition.has_waiting() and not self.__deleted:
- self.__pending.remove(wait_condition)
+ if not wait_condition.has_waiting():
+ prioqueue.remove(wait_condition)
+ if wait_condition.shared:
+ # Remove from list of shared acquires if it wasn't while releasing
+ # (e.g. on lock deletion)
+ self.__pending_shared.pop(priority, None)
return False
- def acquire(self, shared=0, timeout=None):
+ def acquire(self, shared=0, timeout=None, priority=None,
+ test_notify=None):
"""Acquire a shared lock.
- @type shared: int
+ @type shared: integer (0/1) used as a boolean
@param shared: whether to acquire in shared mode; by default an
exclusive lock will be acquired
@type timeout: float
@param timeout: maximum waiting time before giving up
+ @type priority: integer
+ @param priority: Priority for acquiring lock
+ @type test_notify: callable or None
+ @param test_notify: Special callback function for unittesting
+
+ """
+ if priority is None:
+ priority = _DEFAULT_PRIORITY
+
+ self.__lock.acquire()
+ try:
+ # We already got the lock, notify now
+ if __debug__ and callable(test_notify):
+ test_notify()
+
+ return self.__acquire_unlocked(shared, timeout, priority)
+ finally:
+ self.__lock.release()
+
+ def downgrade(self):
+ """Changes the lock mode from exclusive to shared.
+
+ Pending acquires in shared mode on the same priority will go ahead.
"""
self.__lock.acquire()
try:
- return self.__acquire_unlocked(shared, timeout)
+ assert self.__is_owned(), "Lock must be owned"
+
+ if self.__is_exclusive():
+ # Do nothing if the lock is already acquired in shared mode
+ self.__exc = None
+ self.__do_acquire(1)
+
+ # Important: pending shared acquires should only jump ahead if there
+ # was a transition from exclusive to shared, otherwise an owner of a
+ # shared lock can keep calling this function to push incoming shared
+ # acquires
+ (priority, prioqueue) = self.__find_first_pending_queue()
+ if prioqueue:
+ # Is there a pending shared acquire on this priority?
+ cond = self.__pending_shared.pop(priority, None)
+ if cond:
+ assert cond.shared
+ assert cond in prioqueue
+
+ # Ensure shared acquire is on top of queue
+ if len(prioqueue) > 1:
+ prioqueue.remove(cond)
+ prioqueue.insert(0, cond)
+
+ # Notify
+ cond.notifyAll()
+
+ assert not self.__is_exclusive()
+ assert self.__is_sharer()
+
+ return True
finally:
self.__lock.release()
self.__shr.remove(threading.currentThread())
# Notify topmost condition in queue
- if self.__pending:
- first_condition = self.__pending[0]
- first_condition.notifyAll()
-
- if first_condition == self.__active_shr_c:
- self.__active_shr_c = self.__inactive_shr_c
- self.__inactive_shr_c = first_condition
+ (priority, prioqueue) = self.__find_first_pending_queue()
+ if prioqueue:
+ cond = prioqueue[0]
+ cond.notifyAll()
+ if cond.shared:
+ # Prevent further shared acquires from sneaking in while waiters are
+ # notified
+ self.__pending_shared.pop(priority, None)
finally:
self.__lock.release()
- def delete(self, timeout=None):
+ def delete(self, timeout=None, priority=None):
"""Delete a Shared Lock.
This operation will declare the lock for removal. First the lock will be
@type timeout: float
@param timeout: maximum waiting time before giving up
+ @type priority: integer
+ @param priority: Priority for acquiring lock
"""
+ if priority is None:
+ priority = _DEFAULT_PRIORITY
+
self.__lock.acquire()
try:
assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
acquired = self.__is_exclusive()
if not acquired:
- acquired = self.__acquire_unlocked(0, timeout)
+ acquired = self.__acquire_unlocked(0, timeout, priority)
assert self.__is_exclusive() and not self.__is_sharer(), \
"Lock wasn't acquired in exclusive mode"
self.__deleted = True
self.__exc = None
+ assert not (self.__exc or self.__shr), "Found owner during deletion"
+
# Notify all acquires. They'll throw an error.
- while self.__pending:
- self.__pending.pop().notifyAll()
+ for (_, prioqueue) in self.__pending:
+ for cond in prioqueue:
+ cond.notifyAll()
+
+ assert self.__deleted
return acquired
finally:
self.__lock.release()
+ def _release_save(self):
+ shared = self.__is_sharer()
+ self.release()
+ return shared
+
+ def _acquire_restore(self, shared):
+ self.acquire(shared=shared)
+
# Whenever we want to acquire a full LockSet we pass None as the value
# to acquire. Hide this behind this nicely named constant.
ALL_SET = None
+class _AcquireTimeout(Exception):
+ """Internal exception to abort an acquire on a timeout.
+
+ """
+
+
class LockSet:
"""Implements a set of locks.
All the locks needed in the same set must be acquired together, though.
+ @type name: string
+ @ivar name: the name of the lockset
+
"""
- def __init__(self, members=None):
+ def __init__(self, members, name, monitor=None):
"""Constructs a new LockSet.
+ @type members: list of strings
@param members: initial members of the set
+ @type monitor: L{LockMonitor}
+ @param monitor: Lock monitor with which to register member locks
"""
- # Used internally to guarantee coherency.
- self.__lock = SharedLock()
+ assert members is not None, "members parameter is not a list"
+ self.name = name
+
+ # Lock monitor
+ self.__monitor = monitor
+
+ # Used internally to guarantee coherency
+ self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)
# The lockdict indexes the relationship name -> lock
# The order-of-locking is implied by the alphabetical order of names
self.__lockdict = {}
- if members is not None:
- for name in members:
- self.__lockdict[name] = SharedLock()
+ for mname in members:
+ self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
+ monitor=monitor)
# The owner dict contains the set of locks each thread owns. For
# performance each thread can access its own key without a global lock on
# will be trouble.
self.__owners = {}
+ def _GetLockName(self, mname):
+ """Returns the name for a member lock.
+
+ """
+ return "%s/%s" % (self.name, mname)
+
+ def _get_lock(self):
+ """Returns the lockset-internal lock.
+
+ """
+ return self.__lock
+
+ def _get_lockdict(self):
+ """Returns the lockset-internal lock dictionary.
+
+ Accessing this structure is only safe in single-thread usage or when the
+ lockset-internal lock is held.
+
+ """
+ return self.__lockdict
+
def _is_owned(self):
"""Is the current thread a current level owner?"""
return threading.currentThread() in self.__owners
def _del_owned(self, name=None):
"""Note the current thread no longer owns the given lock"""
+ assert not (name is None and self.__lock._is_owned()), \
+ "Cannot hold internal lock when deleting owner status"
+
if name is not None:
self.__owners[threading.currentThread()].remove(name)
else:
return set()
+ def _release_and_delete_owned(self):
+ """Release and delete all resources owned by the current thread"""
+ for lname in self._list_owned():
+ lock = self.__lockdict[lname]
+ if lock._is_owned():
+ lock.release()
+ self._del_owned(name=lname)
+
def __names(self):
"""Return the current set of names.
self.__lock.release()
return set(result)
- def acquire(self, names, timeout=None, shared=0):
+ def acquire(self, names, timeout=None, shared=0, priority=None,
+ test_notify=None):
"""Acquire a set of resource locks.
+ @type names: list of strings (or string)
@param names: the names of the locks which shall be acquired
(special lock names, or instance/node names)
+ @type shared: integer (0/1) used as a boolean
@param shared: whether to acquire in shared mode; by default an
exclusive lock will be acquired
- @type timeout: float
+ @type timeout: float or None
@param timeout: Maximum time to acquire all locks
+ @type priority: integer
+ @param priority: Priority for acquiring locks
+ @type test_notify: callable or None
+ @param test_notify: Special callback function for unittesting
- @return: True when all the locks are successfully acquired
+ @return: Set of all locks successfully acquired or None in case of timeout
@raise errors.LockError: when any lock we try to acquire has
been deleted before we succeed. In this case none of the
locks requested will be acquired.
"""
- if timeout is not None:
- raise NotImplementedError
+ assert timeout is None or timeout >= 0.0
# Check we don't already own locks at this level
- assert not self._is_owned(), "Cannot acquire locks in the same set twice"
+ assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
+ " (lockset %s)" % self.name)
- if names is None:
- # If no names are given acquire the whole set by not letting new names
- # being added before we release, and getting the current list of names.
- # Some of them may then be deleted later, but we'll cope with this.
- #
- # We'd like to acquire this lock in a shared way, as it's nice if
- # everybody else can use the instances at the same time. If are acquiring
- # them exclusively though they won't be able to do this anyway, though,
- # so we'll get the list lock exclusively as well in order to be able to
- # do add() on the set while owning it.
- self.__lock.acquire(shared=shared)
- try:
- # note we own the set-lock
- self._add_owned()
- names = self.__names()
- except:
- # We shouldn't have problems adding the lock to the owners list, but
- # if we did we'll try to release this lock and re-raise exception.
- # Of course something is going to be really wrong, after this.
- self.__lock.release()
- raise
+ if priority is None:
+ priority = _DEFAULT_PRIORITY
+
+ # We need to keep track of how long we spent waiting for a lock. The
+ # timeout passed to this function is over all lock acquires.
+ running_timeout = utils.RunningTimeout(timeout, False)
try:
- # Support passing in a single resource to acquire rather than many
- if isinstance(names, basestring):
- names = [names]
+ if names is not None:
+ # Support passing in a single resource to acquire rather than many
+ if isinstance(names, basestring):
+ names = [names]
+
+ return self.__acquire_inner(names, False, shared, priority,
+ running_timeout.Remaining, test_notify)
+
else:
- names = sorted(names)
+        # If no names are given acquire the whole set by not letting new names
+        # be added before we release, and getting the current list of names.
+        # Some of them may then be deleted later, but we'll cope with this.
+        #
+        # We'd like to acquire this lock in a shared way, as it's nice if
+        # everybody else can use the instances at the same time. If we are
+        # acquiring them exclusively, though, they won't be able to do this
+        # anyway, so we'll get the list lock exclusively as well in order to
+        # be able to do add() on the set while owning it.
+ if not self.__lock.acquire(shared=shared, priority=priority,
+ timeout=running_timeout.Remaining()):
+ raise _AcquireTimeout()
+ try:
+ # note we own the set-lock
+ self._add_owned()
+
+ return self.__acquire_inner(self.__names(), True, shared, priority,
+ running_timeout.Remaining, test_notify)
+ except:
+ # We shouldn't have problems adding the lock to the owners list, but
+ # if we did we'll try to release this lock and re-raise exception.
+ # Of course something is going to be really wrong, after this.
+ self.__lock.release()
+ self._del_owned()
+ raise
+
+ except _AcquireTimeout:
+ return None
+
+ def __acquire_inner(self, names, want_all, shared, priority,
+ timeout_fn, test_notify):
+ """Inner logic for acquiring a number of locks.
+
+ @param names: Names of the locks to be acquired
+ @param want_all: Whether all locks in the set should be acquired
+ @param shared: Whether to acquire in shared mode
+ @param timeout_fn: Function returning remaining timeout
+ @param priority: Priority for acquiring locks
+ @param test_notify: Special callback function for unittesting
+
+ """
+ acquire_list = []
+
+    # First we look the locks up on __lockdict. We have no way of being sure
+    # they will still be there after, but this makes it a lot faster should
+    # just one of them already be wrong. Using a sorted sequence to prevent
+    # deadlocks.
+ for lname in sorted(utils.UniqueSequence(names)):
+ try:
+ lock = self.__lockdict[lname] # raises KeyError if lock is not there
+ except KeyError:
+ if want_all:
+ # We are acquiring all the set, it doesn't matter if this particular
+ # element is not there anymore.
+ continue
+
+ raise errors.LockError("Non-existing lock %s in set %s (it may have"
+ " been removed)" % (lname, self.name))
+
+ acquire_list.append((lname, lock))
+
+ # This will hold the locknames we effectively acquired.
+ acquired = set()
+
+ try:
+      # Now acquire_list contains a sorted list of resources and locks we
+      # want. In order to get them we loop on this (private) list and
+      # acquire() them. We have no real guarantee they will still exist till
+      # this is done but .acquire() itself is safe and will alert us if the
+      # lock gets deleted.
+ for (lname, lock) in acquire_list:
+ if __debug__ and callable(test_notify):
+ test_notify_fn = lambda: test_notify(lname)
+ else:
+ test_notify_fn = None
+
+ timeout = timeout_fn()
- acquire_list = []
- # First we look the locks up on __lockdict. We have no way of being sure
- # they will still be there after, but this makes it a lot faster should
- # just one of them be the already wrong
- for lname in utils.UniqueSequence(names):
try:
- lock = self.__lockdict[lname] # raises KeyError if lock is not there
- acquire_list.append((lname, lock))
- except (KeyError):
- if self.__lock._is_owned():
+ # raises LockError if the lock was deleted
+ acq_success = lock.acquire(shared=shared, timeout=timeout,
+ priority=priority,
+ test_notify=test_notify_fn)
+ except errors.LockError:
+ if want_all:
# We are acquiring all the set, it doesn't matter if this
# particular element is not there anymore.
continue
- else:
- raise errors.LockError('non-existing lock in set (%s)' % lname)
-
- # This will hold the locknames we effectively acquired.
- acquired = set()
- # Now acquire_list contains a sorted list of resources and locks we want.
- # In order to get them we loop on this (private) list and acquire() them.
- # We gave no real guarantee they will still exist till this is done but
- # .acquire() itself is safe and will alert us if the lock gets deleted.
- for (lname, lock) in acquire_list:
+
+ raise errors.LockError("Non-existing lock %s in set %s (it may"
+ " have been removed)" % (lname, self.name))
+
+ if not acq_success:
+ # Couldn't get lock or timeout occurred
+ if timeout is None:
+ # This shouldn't happen as SharedLock.acquire(timeout=None) is
+ # blocking.
+ raise errors.LockError("Failed to get lock %s (set %s)" %
+ (lname, self.name))
+
+ raise _AcquireTimeout()
+
try:
- lock.acquire(shared=shared) # raises LockError if the lock is deleted
# now the lock cannot be deleted, we have it!
self._add_owned(name=lname)
acquired.add(lname)
- except (errors.LockError):
- if self.__lock._is_owned():
- # We are acquiring all the set, it doesn't matter if this
- # particular element is not there anymore.
- continue
- else:
- name_fail = lname
- for lname in self._list_owned():
- self.__lockdict[lname].release()
- self._del_owned(name=lname)
- raise errors.LockError('non-existing lock in set (%s)' % name_fail)
+
except:
# We shouldn't have problems adding the lock to the owners list, but
# if we did we'll try to release this lock and re-raise exception.
- # Of course something is going to be really wrong, after this.
+ # Of course something is going to be really wrong after this.
if lock._is_owned():
lock.release()
raise
except:
- # If something went wrong and we had the set-lock let's release it...
- if self.__lock._is_owned():
- self.__lock.release()
+ # Release all owned locks
+ self._release_and_delete_owned()
raise
return acquired
+  def downgrade(self, names=None):
+    """Downgrade a set of resource locks from exclusive to shared mode.
+
+    The locks must have been acquired in exclusive mode.
+
+    @type names: list of strings (or string), or None
+    @param names: the names of the locks which shall be downgraded
+        (defaults to all the locks owned in this set)
+    @rtype: boolean
+    @return: always True
+
+    """
+    assert self._is_owned(), ("downgrade on lockset %s while not owning any"
+                              " lock" % self.name)
+
+    # Support passing in a single resource to downgrade rather than many
+    if isinstance(names, basestring):
+      names = [names]
+
+    owned = self._list_owned()
+
+    if names is None:
+      names = owned
+    else:
+      names = set(names)
+      assert owned.issuperset(names), \
+        ("downgrade() on unheld resources %s (set %s)" %
+         (names.difference(owned), self.name))
+
+    for lockname in names:
+      self.__lockdict[lockname].downgrade()
+
+    # Do we own the lockset in exclusive mode?
+    if self.__lock._is_owned(shared=0):
+      # Have all locks been downgraded? The lockset-internal lock is only
+      # downgraded once no lock in the dictionary is held exclusively anymore
+      if not compat.any(lock._is_owned(shared=0)
+                        for lock in self.__lockdict.values()):
+        self.__lock.downgrade()
+        assert self.__lock._is_owned(shared=1)
+
+    return True
+
def release(self, names=None):
"""Release a set of resource locks, at the same level.
You must have acquired the locks, either in shared or in exclusive mode,
before releasing them.
+ @type names: list of strings, or None
@param names: the names of the locks which shall be released
(defaults to all the locks acquired at that level).
"""
- assert self._is_owned(), "release() on lock set while not owner"
+ assert self._is_owned(), ("release() on lock set %s while not owner" %
+ self.name)
# Support passing in a single resource to release rather than many
if isinstance(names, basestring):
else:
names = set(names)
assert self._list_owned().issuperset(names), (
- "release() on unheld resources %s" %
- names.difference(self._list_owned()))
+ "release() on unheld resources %s (set %s)" %
+ (names.difference(self._list_owned()), self.name))
# First of all let's release the "all elements" lock, if set.
# After this 'add' can work again
def add(self, names, acquired=0, shared=0):
"""Add a new set of elements to the set
+ @type names: list of strings
@param names: names of the new elements to add
+ @type acquired: integer (0/1) used as a boolean
@param acquired: pre-acquire the new resource?
+ @type shared: integer (0/1) used as a boolean
@param shared: is the pre-acquisition shared?
"""
# Check we don't already own locks at this level
assert not self._is_owned() or self.__lock._is_owned(shared=0), \
- "Cannot add locks if the set is only partially owned, or shared"
+ ("Cannot add locks if the set %s is only partially owned, or shared" %
+ self.name)
# Support passing in a single resource to add rather than many
if isinstance(names, basestring):
# This must be an explicit raise, not an assert, because assert is
# turned off when using optimization, and this can happen because of
# concurrency even if the user doesn't want it.
- raise errors.LockError("duplicate add() (%s)" % invalid_names)
+ raise errors.LockError("duplicate add(%s) on lockset %s" %
+ (invalid_names, self.name))
for lockname in names:
- lock = SharedLock()
+ lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
if acquired:
+ # No need for priority or timeout here as this lock has just been
+ # created
lock.acquire(shared=shared)
# now the lock cannot be deleted, we have it!
try:
You can either not hold anything in the lockset or already hold a superset
of the elements you want to delete, exclusively.
+ @type names: list of strings
@param names: names of the resource to remove.
- @return:: a list of locks which we removed; the list is always
+ @return: a list of locks which we removed; the list is always
equal to the names list if we were holding all the locks
exclusively
# to delete. The ownership must also be exclusive, but that will be checked
# by the lock itself.
assert not self._is_owned() or self._list_owned().issuperset(names), (
- "remove() on acquired lockset while not owning all elements")
+ "remove() on acquired lockset %s while not owning all elements" %
+ self.name)
removed = []
removed.append(lname)
except (KeyError, errors.LockError):
# This cannot happen if we were already holding it, verify:
- assert not self._is_owned(), "remove failed while holding lockset"
+ assert not self._is_owned(), ("remove failed while holding lockset %s"
+ % self.name)
else:
# If no LockError was raised we are the ones who deleted the lock.
# This means we can safely remove it from lockdict, as any further or
# the same time.
LEVEL_CLUSTER = 0
LEVEL_INSTANCE = 1
-LEVEL_NODE = 2
+LEVEL_NODEGROUP = 2
+LEVEL_NODE = 3
LEVELS = [LEVEL_CLUSTER,
LEVEL_INSTANCE,
+ LEVEL_NODEGROUP,
LEVEL_NODE]
# Lock levels which are modifiable
-LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
+LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE]
LEVEL_NAMES = {
LEVEL_CLUSTER: "cluster",
LEVEL_INSTANCE: "instance",
+ LEVEL_NODEGROUP: "nodegroup",
LEVEL_NODE: "node",
}
"""
_instance = None
- def __init__(self, nodes=None, instances=None):
+ def __init__(self, nodes, nodegroups, instances):
"""Constructs a new GanetiLockManager object.
There should be only a GanetiLockManager object at any time, so this
function raises an error if this is not the case.
@param nodes: list of node names
+ @param nodegroups: list of nodegroup uuids
@param instances: list of instance names
"""
self.__class__._instance = self
+ self._monitor = LockMonitor()
+
# The keyring contains all the locks, at their level and in the correct
# locking order.
self.__keyring = {
- LEVEL_CLUSTER: LockSet([BGL]),
- LEVEL_NODE: LockSet(nodes),
- LEVEL_INSTANCE: LockSet(instances),
- }
+ LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
+ LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
+ LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=self._monitor),
+ LEVEL_INSTANCE: LockSet(instances, "instances",
+ monitor=self._monitor),
+ }
+
+  def AddToLockMonitor(self, provider):
+    """Registers a new lock with the monitor.
+
+    See L{LockMonitor.RegisterLock}.
+
+    @param provider: Lock information provider, passed unchanged to
+        L{LockMonitor.RegisterLock}
+    @return: Whatever L{LockMonitor.RegisterLock} returns
+
+    """
+    return self._monitor.RegisterLock(provider)
+
+  def QueryLocks(self, fields):
+    """Queries information from all locks.
+
+    See L{LockMonitor.QueryLocks}.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+    @return: Whatever L{LockMonitor.QueryLocks} returns
+
+    """
+    return self._monitor.QueryLocks(fields)
+
+  def OldStyleQueryLocks(self, fields):
+    """Queries information from all locks, returning old-style data.
+
+    See L{LockMonitor.OldStyleQueryLocks}.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+    @return: Whatever L{LockMonitor.OldStyleQueryLocks} returns
+
+    """
+    return self._monitor.OldStyleQueryLocks(fields)
def _names(self, level):
"""List the lock names at the given level.
"""
return self.__keyring[level]._list_owned()
+ list_owned = _list_owned
+
def _upper_owned(self, level):
"""Check that we don't own any lock at a level greater than the given one.
"""
# This way of checking only works if LEVELS[i] = i, which we check for in
# the test cases.
- return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
+ return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
- def _BGL_owned(self):
+ def _BGL_owned(self): # pylint: disable-msg=C0103
"""Check if the current thread owns the BGL.
Both an exclusive or a shared acquisition work.
"""
return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
- def _contains_BGL(self, level, names):
+ @staticmethod
+ def _contains_BGL(level, names): # pylint: disable-msg=C0103
"""Check if the level contains the BGL.
Check if acting on the given level and set of names will change
"""
return level == LEVEL_CLUSTER and (names is None or BGL in names)
- def acquire(self, level, names, timeout=None, shared=0):
+ def acquire(self, level, names, timeout=None, shared=0, priority=None):
"""Acquire a set of resource locks, at the same level.
- @param level: the level at which the locks shall be acquired;
- it must be a member of LEVELS.
+ @type level: member of locking.LEVELS
+ @param level: the level at which the locks shall be acquired
+ @type names: list of strings (or string)
@param names: the names of the locks which shall be acquired
(special lock names, or instance/node names)
+ @type shared: integer (0/1) used as a boolean
@param shared: whether to acquire in shared mode; by default
an exclusive lock will be acquired
@type timeout: float
@param timeout: Maximum time to acquire all locks
+ @type priority: integer
+ @param priority: Priority for acquiring lock
"""
assert level in LEVELS, "Invalid locking level %s" % level
" while owning some at a greater one")
# Acquire the locks in the set.
- return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
+ return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
+ priority=priority)
+
+  def downgrade(self, level, names=None):
+    """Downgrade a set of resource locks from exclusive to shared mode.
+
+    You must have acquired the locks in exclusive mode.
+
+    @type level: member of locking.LEVELS
+    @param level: the level at which the locks shall be downgraded
+    @type names: list of strings, or None
+    @param names: the names of the locks which shall be downgraded
+        (defaults to all the locks acquired at the level)
+    @return: the result of the downgrade call on the lockset of the given
+        level
+
+    """
+    assert level in LEVELS, "Invalid locking level %s" % level
+
+    # Delegate to the lockset stored in the keyring for this level
+    return self.__keyring[level].downgrade(names=names)
def release(self, level, names=None):
"""Release a set of resource locks, at the same level.
You must have acquired the locks, either in shared or in exclusive
mode, before releasing them.
- @param level: the level at which the locks shall be released;
- it must be a member of LEVELS
+ @type level: member of locking.LEVELS
+ @param level: the level at which the locks shall be released
+ @type names: list of strings, or None
@param names: the names of the locks which shall be released
(defaults to all the locks acquired at that level)
assert (not self._contains_BGL(level, names) or
not self._upper_owned(LEVEL_CLUSTER)), (
"Cannot release the Big Ganeti Lock while holding something"
- " at upper levels")
+ " at upper levels (%r)" %
+ (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
+ for i in self.__keyring.keys()]), ))
# Release will complain if we don't own the locks already
return self.__keyring[level].release(names)
def add(self, level, names, acquired=0, shared=0):
"""Add locks at the specified level.
- @param level: the level at which the locks shall be added;
- it must be a member of LEVELS_MOD.
+ @type level: member of locking.LEVELS_MOD
+ @param level: the level at which the locks shall be added
+ @type names: list of strings
@param names: names of the locks to acquire
+ @type acquired: integer (0/1) used as a boolean
@param acquired: whether to acquire the newly added locks
+ @type shared: integer (0/1) used as a boolean
@param shared: whether the acquisition will be shared
"""
You must either already own the locks you are trying to remove
exclusively or not own any lock at an upper level.
- @param level: the level at which the locks shall be removed;
- it must be a member of LEVELS_MOD
+ @type level: member of locking.LEVELS_MOD
+ @param level: the level at which the locks shall be removed
+ @type names: list of strings
@param names: the names of the locks which shall be removed
(special lock names, or instance/node names)
"Cannot remove locks at a level while not owning it or"
" owning some at a greater one")
return self.__keyring[level].remove(names)
+
+
+def _MonitorSortKey((item, idx, num)):
+  """Sorting key function.
+
+  Sort by name, registration order and then order of information. This provides
+  a stable sort order over different providers, even if they return the same
+  name.
+
+  The single argument is a C{(item, idx, num)} tuple:
+
+    - C{item}: lock information 4-tuple; only its first element (the name) is
+      used here
+    - C{idx}: position of the information within its provider's result
+    - C{num}: registration counter value of the provider
+
+  @rtype: tuple
+  @return: Key of the form C{(utils.NiceSortKey(name), num, idx)}
+
+  """
+  # NOTE: unpacking a tuple in the parameter list is Python 2-only syntax
+  (name, _, _, _) = item
+
+  return (utils.NiceSortKey(name), num, idx)
+
+
+class LockMonitor(object):
+  """Registry of lock information providers which can be queried.
+
+  Providers are registered through L{RegisterLock} and tracked using weak
+  references. The query methods collect the C{GetLockInfo} results of all
+  tracked providers and sort them by name and registration order.
+
+  """
+  # Name of the attribute holding the internal lock; given to ssynchronized,
+  # which looks the lock up on the instance when the decorated method is called
+  _LOCK_ATTR = "_lock"
+
+  def __init__(self):
+    """Initializes this class.
+
+    """
+    self._lock = SharedLock("LockMonitor")
+
+    # Counter for stable sorting
+    self._counter = itertools.count(0)
+
+    # Tracked locks. Weak references are used to avoid issues with circular
+    # references and deletion.
+    self._locks = weakref.WeakKeyDictionary()
+
+  @ssynchronized(_LOCK_ATTR)
+  def RegisterLock(self, provider):
+    """Registers a new lock.
+
+    The provider is stored together with the next value of the registration
+    counter, which is later used for stable sorting.
+
+    @param provider: Object with a callable method named C{GetLockInfo}, taking
+        a single C{set} containing the requested information items
+    @note: It would be nicer to only receive the function generating the
+        requested information but, as it turns out, weak references to bound
+        methods (e.g. C{self.GetLockInfo}) are tricky; there are several
+        workarounds, but none of the ones I found works properly in combination
+        with a standard C{WeakKeyDictionary}
+
+    """
+    assert provider not in self._locks, "Duplicate registration"
+
+    # There used to be a check for duplicate names here. As it turned out, when
+    # a lock is re-created with the same name in a very short timeframe, the
+    # previous instance might not yet be removed from the weakref dictionary.
+    # By keeping track of the order of incoming registrations, a stable sort
+    # ordering can still be guaranteed.
+
+    self._locks[provider] = self._counter.next()
+
+  def _GetLockInfo(self, requested):
+    """Get information from all locks.
+
+    @param requested: Requested information items, passed unchanged to the
+        C{GetLockInfo} method of every tracked provider
+    @rtype: list of tuples
+    @return: List of C{(info, idx, num)} tuples, where C{info} is one entry
+        returned by a provider, C{idx} its position within that provider's
+        result and C{num} the provider's registration counter value
+
+    """
+    # Must hold lock while getting consistent list of tracked items
+    self._lock.acquire(shared=1)
+    try:
+      items = self._locks.items()
+    finally:
+      self._lock.release()
+
+    # The providers are queried after the internal lock has been released
+    return [(info, idx, num)
+            for (provider, num) in items
+            for (idx, info) in enumerate(provider.GetLockInfo(requested))]
+
+  def _Query(self, fields):
+    """Queries information from all locks.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+    @rtype: tuple
+    @return: Tuple of the query object and the query data built from the
+        sorted lock information
+
+    """
+    qobj = query.Query(query.LOCK_FIELDS, fields)
+
+    # Get all data (the internal lock is only held while listing the tracked
+    # providers) and then sort by name and incoming order
+    lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()),
+                      key=_MonitorSortKey)
+
+    # Extract lock information and build query data
+    return (qobj, query.LockQueryData(map(compat.fst, lockinfo)))
+
+  def QueryLocks(self, fields):
+    """Queries information from all locks.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+    @return: Result of C{query.GetQueryResponse} for the query object and
+        data built by L{_Query}
+
+    """
+    (qobj, ctx) = self._Query(fields)
+
+    # Prepare query response
+    return query.GetQueryResponse(qobj, ctx)
+
+  def OldStyleQueryLocks(self, fields):
+    """Queries information from all locks, returning old-style data.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+    @return: Result of the query object's C{OldStyleQuery} method for the
+        data built by L{_Query}
+
+    """
+    (qobj, ctx) = self._Query(fields)
+
+    return qobj.OldStyleQuery(ctx)