#
#
-# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
"""Module implementing the Ganeti locking code."""
-# pylint: disable-msg=W0212
+# pylint: disable=W0212
# W0212 since e.g. LockSet methods use (a lot) the internals of
# SharedLock
import weakref
import logging
import heapq
-import operator
import itertools
from ganeti import errors
except AttributeError:
self._acquire_restore = self._base_acquire_restore
try:
- self._is_owned = lock._is_owned
+ self._is_owned = lock.is_owned
except AttributeError:
self._is_owned = self._base_is_owned
self._write_fd = None
self._poller = None
- def wait(self, timeout=None):
+ def wait(self, timeout):
"""Wait for a notification.
@type timeout: float or None
if self._nwaiters == 0:
self._Cleanup()
- def notifyAll(self): # pylint: disable-msg=C0103
+ def notifyAll(self): # pylint: disable=C0103
"""Close the writing side of the pipe to notify all waiters.
"""
self._waiters = set()
self._single_condition = self._single_condition_class(self._lock)
- def wait(self, timeout=None):
+ def wait(self, timeout):
"""Wait for a notification.
@type timeout: float or None
self._check_owned()
self._waiters.remove(threading.currentThread())
- def notifyAll(self): # pylint: disable-msg=C0103
+ def notifyAll(self): # pylint: disable=C0103
"""Notify all currently waiting threads.
"""
return bool(self._waiters)
+ def __repr__(self):
+ return ("<%s.%s waiters=%s at %#x>" %
+ (self.__class__.__module__, self.__class__.__name__,
+ self._waiters, id(self)))
+
class _PipeConditionWithMode(PipeCondition):
__slots__ = [
# Register with lock monitor
if monitor:
+ logging.debug("Adding lock %s to monitor", name)
monitor.RegisterLock(self)
- def GetInfo(self, requested):
+ def __repr__(self):
+ return ("<%s.%s name=%s at %#x>" %
+ (self.__class__.__module__, self.__class__.__name__,
+ self.name, id(self)))
+
+ def GetLockInfo(self, requested):
"""Retrieves information for querying locks.
@type requested: set
else:
pending = None
- return (self.name, mode, owner_names, pending)
+ return [(self.name, mode, owner_names, pending)]
finally:
self.__lock.release()
else:
return self.__is_exclusive()
- def _is_owned(self, shared=-1):
+ def is_owned(self, shared=-1):
"""Is the current thread somehow owning the lock at this time?
@param shared:
finally:
self.__lock.release()
+ #: Necessary to remain compatible with threading.Condition, which tries to
+ #: retrieve a lock's "_is_owned" attribute
+ _is_owned = is_owned
+
def _count_pending(self):
"""Returns the number of pending acquires.
self.__lock.acquire()
try:
# Order is important: __find_first_pending_queue modifies __pending
- return not (self.__find_first_pending_queue() or
+ (_, prioqueue) = self.__find_first_pending_queue()
+
+ return not (prioqueue or
self.__pending or
self.__pending_by_prio or
self.__pending_shared)
while self.__pending:
(priority, prioqueue) = self.__pending[0]
- if not prioqueue:
- heapq.heappop(self.__pending)
- del self.__pending_by_prio[priority]
- assert priority not in self.__pending_shared
- continue
-
if prioqueue:
- return prioqueue
+ return (priority, prioqueue)
- return None
+ # Remove empty queue
+ heapq.heappop(self.__pending)
+ del self.__pending_by_prio[priority]
+ assert priority not in self.__pending_shared
+
+ return (None, None)
def __is_on_top(self, cond):
"""Checks whether the passed condition is on top of the queue.
The caller must make sure the queue isn't empty.
"""
- return cond == self.__find_first_pending_queue()[0]
+ (_, prioqueue) = self.__find_first_pending_queue()
+
+ return cond == prioqueue[0]
def __acquire_unlocked(self, shared, timeout, priority):
"""Acquire a shared lock.
if not wait_condition.has_waiting():
prioqueue.remove(wait_condition)
if wait_condition.shared:
- del self.__pending_shared[priority]
+ # Remove from list of shared acquires if it wasn't removed while releasing
+ # (e.g. on lock deletion)
+ self.__pending_shared.pop(priority, None)
return False
finally:
self.__lock.release()
+ def downgrade(self):
+ """Changes the lock mode from exclusive to shared.
+
+ Pending acquires in shared mode on the same priority will go ahead.
+
+ """
+ self.__lock.acquire()
+ try:
+ assert self.__is_owned(), "Lock must be owned"
+
+ if self.__is_exclusive():
+ # Do nothing if the lock is already acquired in shared mode
+ self.__exc = None
+ self.__do_acquire(1)
+
+ # Important: pending shared acquires should only jump ahead if there
+ # was a transition from exclusive to shared, otherwise an owner of a
+ # shared lock can keep calling this function to push incoming shared
+ # acquires
+ (priority, prioqueue) = self.__find_first_pending_queue()
+ if prioqueue:
+ # Is there a pending shared acquire on this priority?
+ cond = self.__pending_shared.pop(priority, None)
+ if cond:
+ assert cond.shared
+ assert cond in prioqueue
+
+ # Ensure shared acquire is on top of queue
+ if len(prioqueue) > 1:
+ prioqueue.remove(cond)
+ prioqueue.insert(0, cond)
+
+ # Notify
+ cond.notifyAll()
+
+ assert not self.__is_exclusive()
+ assert self.__is_sharer()
+
+ return True
+ finally:
+ self.__lock.release()
+
def release(self):
"""Release a Shared Lock.
self.__shr.remove(threading.currentThread())
# Notify topmost condition in queue
- prioqueue = self.__find_first_pending_queue()
+ (priority, prioqueue) = self.__find_first_pending_queue()
if prioqueue:
- prioqueue[0].notifyAll()
+ cond = prioqueue[0]
+ cond.notifyAll()
+ if cond.shared:
+ # Prevent further shared acquires from sneaking in while waiters are
+ # notified
+ self.__pending_shared.pop(priority, None)
finally:
self.__lock.release()
# Lock monitor
self.__monitor = monitor
- # Used internally to guarantee coherency.
- self.__lock = SharedLock(name)
+ # Used internally to guarantee coherency
+ self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)
# The lockdict indexes the relationship name -> lock
# The order-of-locking is implied by the alphabetical order of names
"""
return "%s/%s" % (self.name, mname)
- def _is_owned(self):
- """Is the current thread a current level owner?"""
+ def _get_lock(self):
+ """Returns the lockset-internal lock.
+
+ """
+ return self.__lock
+
+ def _get_lockdict(self):
+ """Returns the lockset-internal lock dictionary.
+
+ Accessing this structure is only safe in single-thread usage or when the
+ lockset-internal lock is held.
+
+ """
+ return self.__lockdict
+
+ def is_owned(self):
+ """Is the current thread a current level owner?
+
+ @note: Use L{check_owned} to check if a specific lock is held
+
+ """
return threading.currentThread() in self.__owners
+ def check_owned(self, names, shared=-1):
+ """Check if locks are owned in a specific mode.
+
+ @type names: sequence or string
+ @param names: Lock names (or a single lock name)
+ @param shared: See L{SharedLock.is_owned}
+ @rtype: bool
+ @note: Use L{is_owned} to check if the current thread holds I{any} lock and
+ L{list_owned} to get the names of all owned locks
+
+ """
+ if isinstance(names, basestring):
+ names = [names]
+
+ # Avoid check if no locks are owned anyway
+ if names and self.is_owned():
+ candidates = []
+
+ # Gather references to all locks (in case they're deleted in the meantime)
+ for lname in names:
+ try:
+ lock = self.__lockdict[lname]
+ except KeyError:
+ raise errors.LockError("Non-existing lock '%s' in set '%s' (it may"
+ " have been removed)" % (lname, self.name))
+ else:
+ candidates.append(lock)
+
+ return compat.all(lock.is_owned(shared=shared) for lock in candidates)
+ else:
+ return False
+
def _add_owned(self, name=None):
"""Note the current thread owns the given lock"""
if name is None:
- if not self._is_owned():
+ if not self.is_owned():
self.__owners[threading.currentThread()] = set()
else:
- if self._is_owned():
+ if self.is_owned():
self.__owners[threading.currentThread()].add(name)
else:
self.__owners[threading.currentThread()] = set([name])
def _del_owned(self, name=None):
"""Note the current thread owns the given lock"""
- assert not (name is None and self.__lock._is_owned()), \
+ assert not (name is None and self.__lock.is_owned()), \
"Cannot hold internal lock when deleting owner status"
if name is not None:
self.__owners[threading.currentThread()].remove(name)
# Only remove the key if we don't hold the set-lock as well
- if (not self.__lock._is_owned() and
+ if (not self.__lock.is_owned() and
not self.__owners[threading.currentThread()]):
del self.__owners[threading.currentThread()]
- def _list_owned(self):
+ def list_owned(self):
"""Get the set of resource names owned by the current thread"""
- if self._is_owned():
+ if self.is_owned():
return self.__owners[threading.currentThread()].copy()
else:
return set()
def _release_and_delete_owned(self):
"""Release and delete all resources owned by the current thread"""
- for lname in self._list_owned():
+ for lname in self.list_owned():
lock = self.__lockdict[lname]
- if lock._is_owned():
+ if lock.is_owned():
lock.release()
self._del_owned(name=lname)
# If we don't already own the set-level lock acquired
# we'll get it and note we need to release it later.
release_lock = False
- if not self.__lock._is_owned():
+ if not self.__lock.is_owned():
release_lock = True
self.__lock.acquire(shared=1)
try:
assert timeout is None or timeout >= 0.0
# Check we don't already own locks at this level
- assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
- " (lockset %s)" % self.name)
+ assert not self.is_owned(), ("Cannot acquire locks in the same set twice"
+ " (lockset %s)" % self.name)
if priority is None:
priority = _DEFAULT_PRIORITY
# We shouldn't have problems adding the lock to the owners list, but
# if we did we'll try to release this lock and re-raise exception.
# Of course something is going to be really wrong after this.
- if lock._is_owned():
+ if lock.is_owned():
lock.release()
raise
return acquired
+ def downgrade(self, names=None):
+ """Downgrade a set of resource locks from exclusive to shared mode.
+
+ The locks must have been acquired in exclusive mode.
+
+ """
+ assert self.is_owned(), ("downgrade on lockset %s while not owning any"
+ " lock" % self.name)
+
+ # Support passing in a single resource to downgrade rather than many
+ if isinstance(names, basestring):
+ names = [names]
+
+ owned = self.list_owned()
+
+ if names is None:
+ names = owned
+ else:
+ names = set(names)
+ assert owned.issuperset(names), \
+ ("downgrade() on unheld resources %s (set %s)" %
+ (names.difference(owned), self.name))
+
+ for lockname in names:
+ self.__lockdict[lockname].downgrade()
+
+ # Do we own the lockset in exclusive mode?
+ if self.__lock.is_owned(shared=0):
+ # Have all locks been downgraded?
+ if not compat.any(lock.is_owned(shared=0)
+ for lock in self.__lockdict.values()):
+ self.__lock.downgrade()
+ assert self.__lock.is_owned(shared=1)
+
+ return True
+
def release(self, names=None):
"""Release a set of resource locks, at the same level.
(defaults to all the locks acquired at that level).
"""
- assert self._is_owned(), ("release() on lock set %s while not owner" %
- self.name)
+ assert self.is_owned(), ("release() on lock set %s while not owner" %
+ self.name)
# Support passing in a single resource to release rather than many
if isinstance(names, basestring):
names = [names]
if names is None:
- names = self._list_owned()
+ names = self.list_owned()
else:
names = set(names)
- assert self._list_owned().issuperset(names), (
+ assert self.list_owned().issuperset(names), (
"release() on unheld resources %s (set %s)" %
- (names.difference(self._list_owned()), self.name))
+ (names.difference(self.list_owned()), self.name))
# First of all let's release the "all elements" lock, if set.
# After this 'add' can work again
- if self.__lock._is_owned():
+ if self.__lock.is_owned():
self.__lock.release()
self._del_owned()
"""
# Check we don't already own locks at this level
- assert not self._is_owned() or self.__lock._is_owned(shared=0), \
+ assert not self.is_owned() or self.__lock.is_owned(shared=0), \
("Cannot add locks if the set %s is only partially owned, or shared" %
self.name)
# If we don't already own the set-level lock acquired in an exclusive way
# we'll get it and note we need to release it later.
release_lock = False
- if not self.__lock._is_owned():
+ if not self.__lock.is_owned():
release_lock = True
self.__lock.acquire()
# If we own any subset of this lock it must be a superset of what we want
# to delete. The ownership must also be exclusive, but that will be checked
# by the lock itself.
- assert not self._is_owned() or self._list_owned().issuperset(names), (
+ assert not self.is_owned() or self.list_owned().issuperset(names), (
"remove() on acquired lockset %s while not owning all elements" %
self.name)
removed.append(lname)
except (KeyError, errors.LockError):
# This cannot happen if we were already holding it, verify:
- assert not self._is_owned(), ("remove failed while holding lockset %s"
- % self.name)
+ assert not self.is_owned(), ("remove failed while holding lockset %s" %
+ self.name)
else:
# If no LockError was raised we are the ones who deleted the lock.
# This means we can safely remove it from lockdict, as any further or
# it's the job of the one who actually deleted it.
del self.__lockdict[lname]
# And let's remove it from our private list if we owned it.
- if self._is_owned():
+ if self.is_owned():
self._del_owned(name=lname)
return removed
LEVEL_INSTANCE = 1
LEVEL_NODEGROUP = 2
LEVEL_NODE = 3
+LEVEL_NODE_RES = 4
-LEVELS = [LEVEL_CLUSTER,
- LEVEL_INSTANCE,
- LEVEL_NODEGROUP,
- LEVEL_NODE]
+LEVELS = [
+ LEVEL_CLUSTER,
+ LEVEL_INSTANCE,
+ LEVEL_NODEGROUP,
+ LEVEL_NODE,
+ LEVEL_NODE_RES,
+ ]
# Lock levels which are modifiable
-LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE]
-
+LEVELS_MOD = frozenset([
+ LEVEL_NODE_RES,
+ LEVEL_NODE,
+ LEVEL_NODEGROUP,
+ LEVEL_INSTANCE,
+ ])
+
+#: Lock level names (make sure to use singular form)
LEVEL_NAMES = {
LEVEL_CLUSTER: "cluster",
LEVEL_INSTANCE: "instance",
LEVEL_NODEGROUP: "nodegroup",
LEVEL_NODE: "node",
+ LEVEL_NODE_RES: "node-res",
}
# Constant for the big ganeti lock
-BGL = 'BGL'
+BGL = "BGL"
class GanetiLockManager:
# The keyring contains all the locks, at their level and in the correct
# locking order.
self.__keyring = {
- LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
- LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
- LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=self._monitor),
- LEVEL_INSTANCE: LockSet(instances, "instances",
+ LEVEL_CLUSTER: LockSet([BGL], "cluster", monitor=self._monitor),
+ LEVEL_NODE: LockSet(nodes, "node", monitor=self._monitor),
+ LEVEL_NODE_RES: LockSet(nodes, "node-res", monitor=self._monitor),
+ LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroup", monitor=self._monitor),
+ LEVEL_INSTANCE: LockSet(instances, "instance",
monitor=self._monitor),
}
+ assert compat.all(ls.name == LEVEL_NAMES[level]
+ for (level, ls) in self.__keyring.items())
+
+ def AddToLockMonitor(self, provider):
+ """Registers a new lock with the monitor.
+
+ See L{LockMonitor.RegisterLock}.
+
+ """
+ return self._monitor.RegisterLock(provider)
+
def QueryLocks(self, fields):
"""Queries information from all locks.
assert level in LEVELS, "Invalid locking level %s" % level
return self.__keyring[level]._names()
- def _is_owned(self, level):
+ def is_owned(self, level):
"""Check whether we are owning locks at the given level
"""
- return self.__keyring[level]._is_owned()
-
- is_owned = _is_owned
+ return self.__keyring[level].is_owned()
- def _list_owned(self, level):
+ def list_owned(self, level):
"""Get the set of owned locks at the given level
"""
- return self.__keyring[level]._list_owned()
+ return self.__keyring[level].list_owned()
+
+ def check_owned(self, level, names, shared=-1):
+ """Check if locks at a certain level are owned in a specific mode.
+
+ @see: L{LockSet.check_owned}
+
+ """
+ return self.__keyring[level].check_owned(names, shared=shared)
def _upper_owned(self, level):
"""Check that we don't own any lock at a level greater than the given one.
"""
# This way of checking only works if LEVELS[i] = i, which we check for in
# the test cases.
- return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
+ return compat.any((self.is_owned(l) for l in LEVELS[level + 1:]))
- def _BGL_owned(self): # pylint: disable-msg=C0103
+ def _BGL_owned(self): # pylint: disable=C0103
"""Check if the current thread owns the BGL.
Both an exclusive or a shared acquisition work.
"""
- return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
+ return BGL in self.__keyring[LEVEL_CLUSTER].list_owned()
@staticmethod
- def _contains_BGL(level, names): # pylint: disable-msg=C0103
+ def _contains_BGL(level, names): # pylint: disable=C0103
"""Check if the level contains the BGL.
Check if acting on the given level and set of names will change
return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
priority=priority)
+ def downgrade(self, level, names=None):
+ """Downgrade a set of resource locks from exclusive to shared mode.
+
+ You must have acquired the locks in exclusive mode.
+
+ @type level: member of locking.LEVELS
+ @param level: the level at which the locks shall be downgraded
+ @type names: list of strings, or None
+ @param names: the names of the locks which shall be downgraded
+ (defaults to all the locks acquired at the level)
+
+ """
+ assert level in LEVELS, "Invalid locking level %s" % level
+
+ return self.__keyring[level].downgrade(names=names)
+
def release(self, level, names=None):
"""Release a set of resource locks, at the same level.
not self._upper_owned(LEVEL_CLUSTER)), (
"Cannot release the Big Ganeti Lock while holding something"
" at upper levels (%r)" %
- (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
+ (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self.list_owned(i))
for i in self.__keyring.keys()]), ))
# Release will complain if we don't own the locks already
# Check we either own the level or don't own anything from here
# up. LockSet.remove() will check the case in which we don't own
# all the needed resources, or we have a shared ownership.
- assert self._is_owned(level) or not self._upper_owned(level), (
+ assert self.is_owned(level) or not self._upper_owned(level), (
"Cannot remove locks at a level while not owning it or"
" owning some at a greater one")
return self.__keyring[level].remove(names)
-def _MonitorSortKey((num, item)):
+def _MonitorSortKey((item, idx, num)):
"""Sorting key function.
- Sort by name, then by incoming order.
+ Sort by name, registration order and then order of information. This provides
+ a stable sort order over different providers, even if they return the same
+ name.
"""
(name, _, _, _) = item
- return (utils.NiceSortKey(name), num)
+ return (utils.NiceSortKey(name), num, idx)
class LockMonitor(object):
self._locks = weakref.WeakKeyDictionary()
@ssynchronized(_LOCK_ATTR)
- def RegisterLock(self, lock):
+ def RegisterLock(self, provider):
"""Registers a new lock.
+ @param provider: Object with a callable method named C{GetLockInfo}, taking
+ a single C{set} containing the requested information items
+ @note: It would be nicer to only receive the function generating the
+ requested information but, as it turns out, weak references to bound
+ methods (e.g. C{self.GetLockInfo}) are tricky; there are several
+ workarounds, but none of the ones I found works properly in combination
+ with a standard C{WeakKeyDictionary}
+
"""
- logging.debug("Registering lock %s", lock.name)
- assert lock not in self._locks, "Duplicate lock registration"
+ assert provider not in self._locks, "Duplicate registration"
# There used to be a check for duplicate names here. As it turned out, when
# a lock is re-created with the same name in a very short timeframe, the
# By keeping track of the order of incoming registrations, a stable sort
# ordering can still be guaranteed.
- self._locks[lock] = self._counter.next()
+ self._locks[provider] = self._counter.next()
- @ssynchronized(_LOCK_ATTR)
def _GetLockInfo(self, requested):
- """Get information from all locks while the monitor lock is held.
+ """Get information from all locks.
"""
- return [(num, lock.GetInfo(requested)) for lock, num in self._locks.items()]
+ # Must hold lock while getting consistent list of tracked items
+ self._lock.acquire(shared=1)
+ try:
+ items = self._locks.items()
+ finally:
+ self._lock.release()
+
+ return [(info, idx, num)
+ for (provider, num) in items
+ for (idx, info) in enumerate(provider.GetLockInfo(requested))]
def _Query(self, fields):
"""Queries information from all locks.
key=_MonitorSortKey)
# Extract lock information and build query data
- return (qobj, query.LockQueryData(map(operator.itemgetter(1), lockinfo)))
+ return (qobj, query.LockQueryData(map(compat.fst, lockinfo)))
def QueryLocks(self, fields):
"""Queries information from all locks.