X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/dbb11e8b7e14ec3028966a2479d42447bec153b2..845c79d8915138ae2d72ecaa3730ee410264282d:/lib/locking.py?ds=sidebyside diff --git a/lib/locking.py b/lib/locking.py index 0b419c7..f901490 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007 Google Inc. +# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -30,12 +30,18 @@ import select import threading import time import errno +import weakref +import logging from ganeti import errors from ganeti import utils from ganeti import compat +_EXCLUSIVE_TEXT = "exclusive" +_SHARED_TEXT = "shared" + + def ssynchronized(mylock, shared=0): """Shared Synchronization decorator. @@ -341,7 +347,7 @@ class PipeCondition(_BaseCondition): """ __slots__ = [ - "_nwaiters", + "_waiters", "_single_condition", ] @@ -352,7 +358,7 @@ class PipeCondition(_BaseCondition): """ _BaseCondition.__init__(self, lock) - self._nwaiters = 0 + self._waiters = set() self._single_condition = self._single_condition_class(self._lock) def wait(self, timeout=None): @@ -366,15 +372,14 @@ class PipeCondition(_BaseCondition): # Keep local reference to the pipe. It could be replaced by another thread # notifying while we're waiting. - my_condition = self._single_condition + cond = self._single_condition - assert self._nwaiters >= 0 - self._nwaiters += 1 + self._waiters.add(threading.currentThread()) try: - my_condition.wait(timeout) + cond.wait(timeout) finally: - assert self._nwaiters > 0 - self._nwaiters -= 1 + self._check_owned() + self._waiters.remove(threading.currentThread()) def notifyAll(self): # pylint: disable-msg=C0103 """Notify all currently waiting threads. @@ -384,28 +389,40 @@ class PipeCondition(_BaseCondition): self._single_condition.notifyAll() self._single_condition = self._single_condition_class(self._lock) + def get_waiting(self): + """Returns a list of all waiting threads. + + """ + self._check_owned() + + return self._waiters + def has_waiting(self): """Returns whether there are active waiters. """ self._check_owned() - return bool(self._nwaiters) + return bool(self._waiters) class SharedLock(object): """Implements a shared lock. - Multiple threads can acquire the lock in a shared way, calling - acquire_shared(). In order to acquire the lock in an exclusive way threads - can call acquire_exclusive(). + Multiple threads can acquire the lock in a shared way by calling + C{acquire(shared=1)}. In order to acquire the lock in an exclusive way + threads can call C{acquire(shared=0)}. The lock prevents starvation but does not guarantee that threads will acquire the shared lock in the order they queued for it, just that they will eventually do so. + @type name: string + @ivar name: the name of the lock + """ __slots__ = [ + "__weakref__", "__active_shr_c", "__inactive_shr_c", "__deleted", @@ -413,16 +430,23 @@ class SharedLock(object): "__lock", "__pending", "__shr", + "name", ] __condition_class = PipeCondition - def __init__(self): + def __init__(self, name, monitor=None): """Construct a new SharedLock. + @param name: the name of the lock + @type monitor: L{LockMonitor} + @param monitor: Lock monitor with which to register + """ object.__init__(self) + self.name = name + # Internal lock self.__lock = threading.Lock() @@ -440,12 +464,76 @@ class SharedLock(object): # is this lock in the deleted state? self.__deleted = False + # Register with lock monitor + if monitor: + monitor.RegisterLock(self) + + def GetInfo(self, fields): + """Retrieves information for querying locks. + + @type fields: list of strings + @param fields: List of fields to return + + """ + self.__lock.acquire() + try: + info = [] + + # Note: to avoid unintentional race conditions, no references to + # modifiable objects should be returned unless they were created in this + # function. + for fname in fields: + if fname == "name": + info.append(self.name) + elif fname == "mode": + if self.__deleted: + info.append("deleted") + assert not (self.__exc or self.__shr) + elif self.__exc: + info.append(_EXCLUSIVE_TEXT) + elif self.__shr: + info.append(_SHARED_TEXT) + else: + info.append(None) + elif fname == "owner": + if self.__exc: + owner = [self.__exc] + else: + owner = self.__shr + + if owner: + assert not self.__deleted + info.append([i.getName() for i in owner]) + else: + info.append(None) + elif fname == "pending": + data = [] + + for cond in self.__pending: + if cond in (self.__active_shr_c, self.__inactive_shr_c): + mode = _SHARED_TEXT + else: + mode = _EXCLUSIVE_TEXT + + # This function should be fast as it runs with the lock held. Hence + # not using utils.NiceSort. + data.append((mode, sorted([i.getName() + for i in cond.get_waiting()]))) + + info.append(data) + else: + raise errors.OpExecError("Invalid query field '%s'" % fname) + + return info + finally: + self.__lock.release() + def __check_deleted(self): """Raises an exception if the lock has been deleted. """ if self.__deleted: - raise errors.LockError("Deleted lock") + raise errors.LockError("Deleted lock %s" % self.name) def __is_sharer(self): """Is the current thread sharing the lock at this time? @@ -537,7 +625,8 @@ class SharedLock(object): self.__check_deleted() # We cannot acquire the lock if we already have it - assert not self.__is_owned(), "double acquire() on a non-recursive lock" + assert not self.__is_owned(), ("double acquire() on a non-recursive lock" + " %s" % self.name) # Check whether someone else holds the lock or there are pending acquires. if not self.__pending and self.__can_acquire(shared): @@ -662,6 +751,8 @@ class SharedLock(object): self.__deleted = True self.__exc = None + assert not (self.__exc or self.__shr), "Found owner during deletion" + # Notify all acquires. They'll throw an error. while self.__pending: self.__pending.pop().notifyAll() @@ -700,24 +791,35 @@ class LockSet: All the locks needed in the same set must be acquired together, though. + @type name: string + @ivar name: the name of the lockset + """ - def __init__(self, members=None): + def __init__(self, members, name, monitor=None): """Constructs a new LockSet. @type members: list of strings @param members: initial members of the set + @type monitor: L{LockMonitor} + @param monitor: Lock monitor with which to register member locks """ + assert members is not None, "members parameter is not a list" + self.name = name + + # Lock monitor + self.__monitor = monitor + # Used internally to guarantee coherency. - self.__lock = SharedLock() + self.__lock = SharedLock(name) # The lockdict indexes the relationship name -> lock # The order-of-locking is implied by the alphabetical order of names self.__lockdict = {} - if members is not None: - for name in members: - self.__lockdict[name] = SharedLock() + for mname in members: + self.__lockdict[mname] = SharedLock(self._GetLockName(mname), + monitor=monitor) # The owner dict contains the set of locks each thread owns. For # performance each thread can access its own key without a global lock on @@ -728,6 +830,12 @@ class LockSet: # will be trouble. self.__owners = {} + def _GetLockName(self, mname): + """Returns the name for a member lock. + + """ + return "%s/%s" % (self.name, mname) + def _is_owned(self): """Is the current thread a current level owner?""" return threading.currentThread() in self.__owners @@ -824,7 +932,8 @@ class LockSet: assert timeout is None or timeout >= 0.0 # Check we don't already own locks at this level - assert not self._is_owned(), "Cannot acquire locks in the same set twice" + assert not self._is_owned(), ("Cannot acquire locks in the same set twice" + " (lockset %s)" % self.name) # We need to keep track of how long we spent waiting for a lock. The # timeout passed to this function is over all lock acquires. @@ -894,7 +1003,8 @@ class LockSet: # element is not there anymore. continue - raise errors.LockError("Non-existing lock in set (%s)" % lname) + raise errors.LockError("Non-existing lock %s in set %s" % + (lname, self.name)) acquire_list.append((lname, lock)) @@ -925,14 +1035,16 @@ class LockSet: # particular element is not there anymore. continue - raise errors.LockError("Non-existing lock in set (%s)" % lname) + raise errors.LockError("Non-existing lock %s in set %s" % + (lname, self.name)) if not acq_success: # Couldn't get lock or timeout occurred if timeout is None: # This shouldn't happen as SharedLock.acquire(timeout=None) is # blocking. - raise errors.LockError("Failed to get lock %s" % lname) + raise errors.LockError("Failed to get lock %s (set %s)" % + (lname, self.name)) raise _AcquireTimeout() @@ -967,7 +1079,8 @@ class LockSet: (defaults to all the locks acquired at that level). """ - assert self._is_owned(), "release() on lock set while not owner" + assert self._is_owned(), ("release() on lock set %s while not owner" % + self.name) # Support passing in a single resource to release rather than many if isinstance(names, basestring): @@ -978,8 +1091,8 @@ class LockSet: else: names = set(names) assert self._list_owned().issuperset(names), ( - "release() on unheld resources %s" % - names.difference(self._list_owned())) + "release() on unheld resources %s (set %s)" % + (names.difference(self._list_owned()), self.name)) # First of all let's release the "all elements" lock, if set. # After this 'add' can work again @@ -1006,7 +1119,8 @@ class LockSet: """ # Check we don't already own locks at this level assert not self._is_owned() or self.__lock._is_owned(shared=0), \ - "Cannot add locks if the set is only partially owned, or shared" + ("Cannot add locks if the set %s is only partially owned, or shared" % + self.name) # Support passing in a single resource to add rather than many if isinstance(names, basestring): @@ -1025,10 +1139,11 @@ class LockSet: # This must be an explicit raise, not an assert, because assert is # turned off when using optimization, and this can happen because of # concurrency even if the user doesn't want it. - raise errors.LockError("duplicate add() (%s)" % invalid_names) + raise errors.LockError("duplicate add(%s) on lockset %s" % + (invalid_names, self.name)) for lockname in names: - lock = SharedLock() + lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor) if acquired: lock.acquire(shared=shared) @@ -1076,7 +1191,8 @@ class LockSet: # to delete. The ownership must also be exclusive, but that will be checked # by the lock itself. assert not self._is_owned() or self._list_owned().issuperset(names), ( - "remove() on acquired lockset while not owning all elements") + "remove() on acquired lockset %s while not owning all elements" % + self.name) removed = [] @@ -1091,7 +1207,8 @@ class LockSet: removed.append(lname) except (KeyError, errors.LockError): # This cannot happen if we were already holding it, verify: - assert not self._is_owned(), "remove failed while holding lockset" + assert not self._is_owned(), ("remove failed while holding lockset %s" + % self.name) else: # If no LockError was raised we are the ones who deleted the lock. # This means we can safely remove it from lockdict, as any further or @@ -1164,13 +1281,24 @@ class GanetiLockManager: self.__class__._instance = self + self._monitor = LockMonitor() + # The keyring contains all the locks, at their level and in the correct # locking order. self.__keyring = { - LEVEL_CLUSTER: LockSet([BGL]), - LEVEL_NODE: LockSet(nodes), - LEVEL_INSTANCE: LockSet(instances), - } + LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor), + LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor), + LEVEL_INSTANCE: LockSet(instances, "instances", + monitor=self._monitor), + } + + def QueryLocks(self, fields, sync): + """Queries information from all locks. + + See L{LockMonitor.QueryLocks}. + + """ + return self._monitor.QueryLocks(fields, sync) def _names(self, level): """List the lock names at the given level. @@ -1323,3 +1451,59 @@ class GanetiLockManager: "Cannot remove locks at a level while not owning it or" " owning some at a greater one") return self.__keyring[level].remove(names) + + +class LockMonitor(object): + _LOCK_ATTR = "_lock" + + def __init__(self): + """Initializes this class. + + """ + self._lock = SharedLock("LockMonitor") + + # Tracked locks. Weak references are used to avoid issues with circular + # references and deletion. + self._locks = weakref.WeakKeyDictionary() + + @ssynchronized(_LOCK_ATTR) + def RegisterLock(self, lock): + """Registers a new lock. + + """ + logging.debug("Registering lock %s", lock.name) + assert lock not in self._locks, "Duplicate lock registration" + assert not compat.any(lock.name == i.name for i in self._locks.keys()), \ + "Found duplicate lock name" + self._locks[lock] = None + + @ssynchronized(_LOCK_ATTR) + def _GetLockInfo(self, fields): + """Get information from all locks while the monitor lock is held. + + """ + result = {} + + for lock in self._locks.keys(): + assert lock.name not in result, "Found duplicate lock name" + result[lock.name] = lock.GetInfo(fields) + + return result + + def QueryLocks(self, fields, sync): + """Queries information from all locks. + + @type fields: list of strings + @param fields: List of fields to return + @type sync: boolean + @param sync: Whether to operate in synchronous mode + + """ + if sync: + raise NotImplementedError("Synchronous queries are not implemented") + + # Get all data without sorting + result = self._GetLockInfo(fields) + + # Sort by name + return [result[name] for name in utils.NiceSort(result.keys())]