#
#
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
import threading
import time
import errno
+import weakref
+import logging
from ganeti import errors
from ganeti import utils
from ganeti import compat
+_EXCLUSIVE_TEXT = "exclusive"
+_SHARED_TEXT = "shared"
+
+
def ssynchronized(mylock, shared=0):
"""Shared Synchronization decorator.
"""
__slots__ = [
- "_nwaiters",
+ "_waiters",
"_single_condition",
]
"""
_BaseCondition.__init__(self, lock)
- self._nwaiters = 0
+ self._waiters = set()
self._single_condition = self._single_condition_class(self._lock)
def wait(self, timeout=None):
# Keep local reference to the pipe. It could be replaced by another thread
# notifying while we're waiting.
- my_condition = self._single_condition
+ cond = self._single_condition
- assert self._nwaiters >= 0
- self._nwaiters += 1
+ self._waiters.add(threading.currentThread())
try:
- my_condition.wait(timeout)
+ cond.wait(timeout)
finally:
- assert self._nwaiters > 0
- self._nwaiters -= 1
+ self._check_owned()
+ self._waiters.remove(threading.currentThread())
def notifyAll(self): # pylint: disable-msg=C0103
"""Notify all currently waiting threads.
self._single_condition.notifyAll()
self._single_condition = self._single_condition_class(self._lock)
+ def get_waiting(self):
+ """Returns a list of all waiting threads.
+
+ """
+ self._check_owned()
+
+ return self._waiters
+
def has_waiting(self):
"""Returns whether there are active waiters.
"""
self._check_owned()
- return bool(self._nwaiters)
+ return bool(self._waiters)
class SharedLock(object):
"""Implements a shared lock.
- Multiple threads can acquire the lock in a shared way, calling
- acquire_shared(). In order to acquire the lock in an exclusive way threads
- can call acquire_exclusive().
+ Multiple threads can acquire the lock in a shared way by calling
+ C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
+ threads can call C{acquire(shared=0)}.
The lock prevents starvation but does not guarantee that threads will acquire
the shared lock in the order they queued for it, just that they will
eventually do so.
+ @type name: string
+ @ivar name: the name of the lock
+
"""
__slots__ = [
+ "__weakref__",
"__active_shr_c",
"__inactive_shr_c",
"__deleted",
"__lock",
"__pending",
"__shr",
+ "name",
]
__condition_class = PipeCondition
- def __init__(self):
+ def __init__(self, name, monitor=None):
"""Construct a new SharedLock.
+ @param name: the name of the lock
+ @type monitor: L{LockMonitor}
+ @param monitor: Lock monitor with which to register
+
"""
object.__init__(self)
+ self.name = name
+
# Internal lock
self.__lock = threading.Lock()
# is this lock in the deleted state?
self.__deleted = False
+ # Register with lock monitor
+ if monitor:
+ monitor.RegisterLock(self)
+
+ def GetInfo(self, fields):
+ """Retrieves information for querying locks.
+
+ @type fields: list of strings
+ @param fields: List of fields to return
+
+ """
+ self.__lock.acquire()
+ try:
+ info = []
+
+ # Note: to avoid unintentional race conditions, no references to
+ # modifiable objects should be returned unless they were created in this
+ # function.
+ for fname in fields:
+ if fname == "name":
+ info.append(self.name)
+ elif fname == "mode":
+ if self.__deleted:
+ info.append("deleted")
+ assert not (self.__exc or self.__shr)
+ elif self.__exc:
+ info.append(_EXCLUSIVE_TEXT)
+ elif self.__shr:
+ info.append(_SHARED_TEXT)
+ else:
+ info.append(None)
+ elif fname == "owner":
+ if self.__exc:
+ owner = [self.__exc]
+ else:
+ owner = self.__shr
+
+ if owner:
+ assert not self.__deleted
+ info.append([i.getName() for i in owner])
+ else:
+ info.append(None)
+ elif fname == "pending":
+ data = []
+
+ for cond in self.__pending:
+ if cond in (self.__active_shr_c, self.__inactive_shr_c):
+ mode = _SHARED_TEXT
+ else:
+ mode = _EXCLUSIVE_TEXT
+
+ # This function should be fast as it runs with the lock held. Hence
+ # not using utils.NiceSort.
+ data.append((mode, sorted([i.getName()
+ for i in cond.get_waiting()])))
+
+ info.append(data)
+ else:
+ raise errors.OpExecError("Invalid query field '%s'" % fname)
+
+ return info
+ finally:
+ self.__lock.release()
+
def __check_deleted(self):
"""Raises an exception if the lock has been deleted.
"""
if self.__deleted:
- raise errors.LockError("Deleted lock")
+ raise errors.LockError("Deleted lock %s" % self.name)
def __is_sharer(self):
"""Is the current thread sharing the lock at this time?
self.__check_deleted()
# We cannot acquire the lock if we already have it
- assert not self.__is_owned(), "double acquire() on a non-recursive lock"
+ assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
+ " %s" % self.name)
# Check whether someone else holds the lock or there are pending acquires.
if not self.__pending and self.__can_acquire(shared):
self.__deleted = True
self.__exc = None
+ assert not (self.__exc or self.__shr), "Found owner during deletion"
+
# Notify all acquires. They'll throw an error.
while self.__pending:
self.__pending.pop().notifyAll()
All the locks needed in the same set must be acquired together, though.
+ @type name: string
+ @ivar name: the name of the lockset
+
"""
- def __init__(self, members=None):
+ def __init__(self, members, name, monitor=None):
"""Constructs a new LockSet.
@type members: list of strings
@param members: initial members of the set
+ @type monitor: L{LockMonitor}
+ @param monitor: Lock monitor with which to register member locks
"""
+ assert members is not None, "members parameter is not a list"
+ self.name = name
+
+ # Lock monitor
+ self.__monitor = monitor
+
# Used internally to guarantee coherency.
- self.__lock = SharedLock()
+ self.__lock = SharedLock(name)
# The lockdict indexes the relationship name -> lock
# The order-of-locking is implied by the alphabetical order of names
self.__lockdict = {}
- if members is not None:
- for name in members:
- self.__lockdict[name] = SharedLock()
+ for mname in members:
+ self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
+ monitor=monitor)
# The owner dict contains the set of locks each thread owns. For
# performance each thread can access its own key without a global lock on
# will be trouble.
self.__owners = {}
+ def _GetLockName(self, mname):
+ """Returns the name for a member lock.
+
+ """
+ return "%s/%s" % (self.name, mname)
+
def _is_owned(self):
"""Is the current thread a current level owner?"""
return threading.currentThread() in self.__owners
assert timeout is None or timeout >= 0.0
# Check we don't already own locks at this level
- assert not self._is_owned(), "Cannot acquire locks in the same set twice"
+ assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
+ " (lockset %s)" % self.name)
# We need to keep track of how long we spent waiting for a lock. The
# timeout passed to this function is over all lock acquires.
# element is not there anymore.
continue
- raise errors.LockError("Non-existing lock in set (%s)" % lname)
+ raise errors.LockError("Non-existing lock %s in set %s" %
+ (lname, self.name))
acquire_list.append((lname, lock))
# particular element is not there anymore.
continue
- raise errors.LockError("Non-existing lock in set (%s)" % lname)
+ raise errors.LockError("Non-existing lock %s in set %s" %
+ (lname, self.name))
if not acq_success:
# Couldn't get lock or timeout occurred
if timeout is None:
# This shouldn't happen as SharedLock.acquire(timeout=None) is
# blocking.
- raise errors.LockError("Failed to get lock %s" % lname)
+ raise errors.LockError("Failed to get lock %s (set %s)" %
+ (lname, self.name))
raise _AcquireTimeout()
(defaults to all the locks acquired at that level).
"""
- assert self._is_owned(), "release() on lock set while not owner"
+ assert self._is_owned(), ("release() on lock set %s while not owner" %
+ self.name)
# Support passing in a single resource to release rather than many
if isinstance(names, basestring):
else:
names = set(names)
assert self._list_owned().issuperset(names), (
- "release() on unheld resources %s" %
- names.difference(self._list_owned()))
+ "release() on unheld resources %s (set %s)" %
+ (names.difference(self._list_owned()), self.name))
# First of all let's release the "all elements" lock, if set.
# After this 'add' can work again
"""
# Check we don't already own locks at this level
assert not self._is_owned() or self.__lock._is_owned(shared=0), \
- "Cannot add locks if the set is only partially owned, or shared"
+ ("Cannot add locks if the set %s is only partially owned, or shared" %
+ self.name)
# Support passing in a single resource to add rather than many
if isinstance(names, basestring):
# This must be an explicit raise, not an assert, because assert is
# turned off when using optimization, and this can happen because of
# concurrency even if the user doesn't want it.
- raise errors.LockError("duplicate add() (%s)" % invalid_names)
+ raise errors.LockError("duplicate add(%s) on lockset %s" %
+ (invalid_names, self.name))
for lockname in names:
- lock = SharedLock()
+ lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
if acquired:
lock.acquire(shared=shared)
# to delete. The ownership must also be exclusive, but that will be checked
# by the lock itself.
assert not self._is_owned() or self._list_owned().issuperset(names), (
- "remove() on acquired lockset while not owning all elements")
+ "remove() on acquired lockset %s while not owning all elements" %
+ self.name)
removed = []
removed.append(lname)
except (KeyError, errors.LockError):
# This cannot happen if we were already holding it, verify:
- assert not self._is_owned(), "remove failed while holding lockset"
+ assert not self._is_owned(), ("remove failed while holding lockset %s"
+ % self.name)
else:
# If no LockError was raised we are the ones who deleted the lock.
# This means we can safely remove it from lockdict, as any further or
self.__class__._instance = self
+ self._monitor = LockMonitor()
+
# The keyring contains all the locks, at their level and in the correct
# locking order.
self.__keyring = {
- LEVEL_CLUSTER: LockSet([BGL]),
- LEVEL_NODE: LockSet(nodes),
- LEVEL_INSTANCE: LockSet(instances),
- }
+ LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
+ LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
+ LEVEL_INSTANCE: LockSet(instances, "instances",
+ monitor=self._monitor),
+ }
+
+ def QueryLocks(self, fields, sync):
+ """Queries information from all locks.
+
+ See L{LockMonitor.QueryLocks}.
+
+ """
+ return self._monitor.QueryLocks(fields, sync)
def _names(self, level):
"""List the lock names at the given level.
"Cannot remove locks at a level while not owning it or"
" owning some at a greater one")
return self.__keyring[level].remove(names)
+
+
+class LockMonitor(object):
+ _LOCK_ATTR = "_lock"
+
+ def __init__(self):
+ """Initializes this class.
+
+ """
+ self._lock = SharedLock("LockMonitor")
+
+ # Tracked locks. Weak references are used to avoid issues with circular
+ # references and deletion.
+ self._locks = weakref.WeakKeyDictionary()
+
+ @ssynchronized(_LOCK_ATTR)
+ def RegisterLock(self, lock):
+ """Registers a new lock.
+
+ """
+ logging.debug("Registering lock %s", lock.name)
+ assert lock not in self._locks, "Duplicate lock registration"
+ assert not compat.any(lock.name == i.name for i in self._locks.keys()), \
+ "Found duplicate lock name"
+ self._locks[lock] = None
+
+ @ssynchronized(_LOCK_ATTR)
+ def _GetLockInfo(self, fields):
+ """Get information from all locks while the monitor lock is held.
+
+ """
+ result = {}
+
+ for lock in self._locks.keys():
+ assert lock.name not in result, "Found duplicate lock name"
+ result[lock.name] = lock.GetInfo(fields)
+
+ return result
+
+ def QueryLocks(self, fields, sync):
+ """Queries information from all locks.
+
+ @type fields: list of strings
+ @param fields: List of fields to return
+ @type sync: boolean
+ @param sync: Whether to operate in synchronous mode
+
+ """
+ if sync:
+ raise NotImplementedError("Synchronous queries are not implemented")
+
+ # Get all data without sorting
+ result = self._GetLockInfo(fields)
+
+ # Sort by name
+ return [result[name] for name in utils.NiceSort(result.keys())]