import os
import select
import threading
-import time
import errno
+import weakref
+import logging
+import heapq
from ganeti import errors
from ganeti import utils
from ganeti import compat
+# Mode names reported by SharedLock.GetInfo for the "mode" and "pending"
+# fields
+_EXCLUSIVE_TEXT = "exclusive"
+_SHARED_TEXT = "shared"
+
+# Priority used when callers pass priority=None. Pending acquires are kept
+# in a heap (heapq is a min-heap), so lower values are served first.
+_DEFAULT_PRIORITY = 0
+
+
def ssynchronized(mylock, shared=0):
"""Shared Synchronization decorator.
return wrap
-class RunningTimeout(object):
- """Class to calculate remaining timeout when doing several operations.
-
- """
- __slots__ = [
- "_allow_negative",
- "_start_time",
- "_time_fn",
- "_timeout",
- ]
-
- def __init__(self, timeout, allow_negative, _time_fn=time.time):
- """Initializes this class.
-
- @type timeout: float
- @param timeout: Timeout duration
- @type allow_negative: bool
- @param allow_negative: Whether to return values below zero
- @param _time_fn: Time function for unittests
-
- """
- object.__init__(self)
-
- if timeout is not None and timeout < 0.0:
- raise ValueError("Timeout must not be negative")
-
- self._timeout = timeout
- self._allow_negative = allow_negative
- self._time_fn = _time_fn
-
- self._start_time = None
-
- def Remaining(self):
- """Returns the remaining timeout.
-
- """
- if self._timeout is None:
- return None
-
- # Get start time on first calculation
- if self._start_time is None:
- self._start_time = self._time_fn()
-
- # Calculate remaining time
- remaining_timeout = self._start_time + self._timeout - self._time_fn()
-
- if not self._allow_negative:
- # Ensure timeout is always >= 0
- return max(0.0, remaining_timeout)
-
- return remaining_timeout
-
-
class _SingleNotifyPipeConditionWaiter(object):
"""Helper class for SingleNotifyPipeCondition
@param timeout: Timeout for waiting (can be None)
"""
- running_timeout = RunningTimeout(timeout, True)
+ running_timeout = utils.RunningTimeout(timeout, True)
while True:
remaining_time = running_timeout.Remaining()
"""
__slots__ = [
- "_nwaiters",
+ "_waiters",
"_single_condition",
]
"""
_BaseCondition.__init__(self, lock)
- self._nwaiters = 0
+ self._waiters = set()
self._single_condition = self._single_condition_class(self._lock)
def wait(self, timeout=None):
# Keep local reference to the pipe. It could be replaced by another thread
# notifying while we're waiting.
- my_condition = self._single_condition
+ cond = self._single_condition
- assert self._nwaiters >= 0
- self._nwaiters += 1
+ self._waiters.add(threading.currentThread())
try:
- my_condition.wait(timeout)
+ cond.wait(timeout)
finally:
- assert self._nwaiters > 0
- self._nwaiters -= 1
+ self._check_owned()
+ self._waiters.remove(threading.currentThread())
def notifyAll(self): # pylint: disable-msg=C0103
"""Notify all currently waiting threads.
self._single_condition.notifyAll()
self._single_condition = self._single_condition_class(self._lock)
+  def get_waiting(self):
+    """Returns a snapshot of all threads currently waiting.
+
+    The caller is expected to own the condition's lock; C{_check_owned} is
+    called to verify this. A copy is returned so that the caller neither
+    sees nor can cause later changes to the internal set of waiters, which
+    threads enter and leave in L{wait}.
+
+    @rtype: set
+    @return: Threads currently blocked in L{wait}
+
+    """
+    self._check_owned()
+
+    # Copy instead of handing out the internal, mutable set
+    return set(self._waiters)
+
def has_waiting(self):
"""Returns whether there are active waiters.
"""
self._check_owned()
- return bool(self._nwaiters)
+ # _waiters holds the threads currently inside wait() (added before
+ # waiting, removed in its finally clause)
+ return bool(self._waiters)
+
+
+class _PipeConditionWithMode(PipeCondition):
+ """L{PipeCondition} which also records the acquire mode of its waiters.
+
+ Used by L{SharedLock} as the condition class for pending acquires; the
+ C{shared} attribute tells shared from exclusive acquires.
+
+ """
+ __slots__ = [
+ "shared",
+ ]
+
+ def __init__(self, lock, shared):
+ """Initializes this class.
+
+ @param lock: Lock passed on to L{PipeCondition}
+ @param shared: Whether the waiters on this condition acquire in shared
+ mode
+
+ """
+ self.shared = shared
+ PipeCondition.__init__(self, lock)
class SharedLock(object):
"""Implements a shared lock.
- Multiple threads can acquire the lock in a shared way, calling
- acquire_shared(). In order to acquire the lock in an exclusive way threads
- can call acquire_exclusive().
+ Multiple threads can acquire the lock in a shared way by calling
+ C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
+ threads can call C{acquire(shared=0)}.
- The lock prevents starvation but does not guarantee that threads will acquire
- the shared lock in the order they queued for it, just that they will
- eventually do so.
+ Notes on data structures: C{__pending} contains a priority queue (heapq) of
+ all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
+ ...]}. Each per-priority queue contains a normal in-order list of conditions
+ to be notified when the lock can be acquired. Shared locks are grouped
+ together by priority and the condition for them is stored in
+ C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
+ references for the per-priority queues indexed by priority for faster access.
@type name: string
@ivar name: the name of the lock
"""
__slots__ = [
- "__active_shr_c",
- "__inactive_shr_c",
+ "__weakref__",
"__deleted",
"__exc",
"__lock",
"__pending",
+ "__pending_by_prio",
+ "__pending_shared",
"__shr",
"name",
]
- __condition_class = PipeCondition
+ __condition_class = _PipeConditionWithMode
- def __init__(self, name):
+ def __init__(self, name, monitor=None):
"""Construct a new SharedLock.
@param name: the name of the lock
+ @type monitor: L{LockMonitor}
+ @param monitor: Lock monitor with which to register
"""
object.__init__(self)
# Queue containing waiting acquires
self.__pending = []
-
- # Active and inactive conditions for shared locks
- self.__active_shr_c = self.__condition_class(self.__lock)
- self.__inactive_shr_c = self.__condition_class(self.__lock)
+ self.__pending_by_prio = {}
+ self.__pending_shared = {}
# Current lock holders
self.__shr = set()
# is this lock in the deleted state?
self.__deleted = False
+ # Register with lock monitor
+ if monitor:
+ monitor.RegisterLock(self)
+
+  def GetInfo(self, fields):
+    """Retrieves information for querying locks.
+
+    All values are computed while the internal lock is held, so together
+    they describe a single state of this lock.
+
+    @type fields: list of strings
+    @param fields: List of fields to return; supported are C{name},
+      C{mode}, C{owner} and C{pending}
+    @rtype: list
+    @return: One value per requested field, in the order requested
+    @raise errors.OpExecError: If an unknown field is requested
+
+    """
+    self.__lock.acquire()
+    try:
+      info = []
+
+      # Note: to avoid unintentional race conditions, no references to
+      # modifiable objects should be returned unless they were created in this
+      # function.
+      for fname in fields:
+        if fname == "name":
+          info.append(self.name)
+        elif fname == "mode":
+          if self.__deleted:
+            info.append("deleted")
+            assert not (self.__exc or self.__shr)
+          elif self.__exc:
+            info.append(_EXCLUSIVE_TEXT)
+          elif self.__shr:
+            info.append(_SHARED_TEXT)
+          else:
+            info.append(None)
+        elif fname == "owner":
+          if self.__exc:
+            owner = [self.__exc]
+          else:
+            owner = self.__shr
+
+          if owner:
+            assert not self.__deleted
+            # Shared owners live in a set whose iteration order is arbitrary;
+            # sort the names (as done for "pending") for deterministic output
+            info.append(sorted(i.getName() for i in owner))
+          else:
+            info.append(None)
+        elif fname == "pending":
+          data = []
+
+          # Sorting instead of copying and using heapq functions for
+          # simplicity; yields the per-priority queues in ascending priority
+          for (_, prioqueue) in sorted(self.__pending):
+            for cond in prioqueue:
+              if cond.shared:
+                mode = _SHARED_TEXT
+              else:
+                mode = _EXCLUSIVE_TEXT
+
+              # This function should be fast as it runs with the lock held.
+              # Hence not using utils.NiceSort.
+              data.append((mode, sorted(i.getName()
+                                        for i in cond.get_waiting())))
+
+          info.append(data)
+        else:
+          raise errors.OpExecError("Invalid query field '%s'" % fname)
+
+      return info
+    finally:
+      self.__lock.release()
+
def __check_deleted(self):
"""Raises an exception if the lock has been deleted.
"""
self.__lock.acquire()
try:
- return len(self.__pending)
+ return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
+ finally:
+ self.__lock.release()
+
+  def _check_empty(self):
+    """Checks whether there are any pending acquires.
+
+    Exhausted per-priority queues are pruned before the bookkeeping
+    structures are inspected.
+
+    @rtype: bool
+    @return: C{True} if nothing is pending, C{False} otherwise
+
+    """
+    self.__lock.acquire()
+    try:
+      # Must run first: __find_first_pending_queue drops exhausted queues
+      # from __pending and __pending_by_prio
+      if self.__find_first_pending_queue():
+        return False
+
+      return not (self.__pending or
+                  self.__pending_by_prio or
+                  self.__pending_shared)
     finally:
       self.__lock.release()
else:
return len(self.__shr) == 0 and self.__exc is None
+  def __find_first_pending_queue(self):
+    """Tries to find the topmost queued entry with pending acquires.
+
+    Removes empty entries while going through the list. As C{__pending} is
+    a heap, the entry inspected first is the one with the lowest priority
+    value.
+
+    @rtype: list or None
+    @return: The first non-empty per-priority queue (the live list, not a
+      copy), or C{None} if there are no pending acquires
+
+    """
+    while self.__pending:
+      (priority, prioqueue) = self.__pending[0]
+
+      if prioqueue:
+        return prioqueue
+
+      # Queue is exhausted; drop it from both the heap and the index
+      heapq.heappop(self.__pending)
+      del self.__pending_by_prio[priority]
+      assert priority not in self.__pending_shared
+
+    return None
+
def __is_on_top(self, cond):
"""Checks whether the passed condition is on top of the queue.
The caller must make sure the queue isn't empty.
"""
- return self.__pending[0] == cond
+ # The first entry of the first non-empty per-priority queue is the next
+ # acquire to be served; the lookup also prunes exhausted queues
+ return cond == self.__find_first_pending_queue()[0]
- def __acquire_unlocked(self, shared, timeout):
+ def __acquire_unlocked(self, shared, timeout, priority):
"""Acquire a shared lock.
@param shared: whether to acquire in shared mode; by default an
exclusive lock will be acquired
@param timeout: maximum waiting time before giving up
+ @type priority: integer
+ @param priority: Priority for acquiring lock
"""
self.__check_deleted()
assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
" %s" % self.name)
+ # Remove empty entries from queue
+ self.__find_first_pending_queue()
+
# Check whether someone else holds the lock or there are pending acquires.
if not self.__pending and self.__can_acquire(shared):
# Apparently not, can acquire lock directly.
self.__do_acquire(shared)
return True
- if shared:
- wait_condition = self.__active_shr_c
+ prioqueue = self.__pending_by_prio.get(priority, None)
- # Check if we're not yet in the queue
- if wait_condition not in self.__pending:
- self.__pending.append(wait_condition)
+ if shared:
+ # Try to re-use condition for shared acquire
+ wait_condition = self.__pending_shared.get(priority, None)
+ assert (wait_condition is None or
+ (wait_condition.shared and wait_condition in prioqueue))
else:
- wait_condition = self.__condition_class(self.__lock)
- # Always add to queue
- self.__pending.append(wait_condition)
+ wait_condition = None
+
+ if wait_condition is None:
+ if prioqueue is None:
+ assert priority not in self.__pending_by_prio
+
+ prioqueue = []
+ heapq.heappush(self.__pending, (priority, prioqueue))
+ self.__pending_by_prio[priority] = prioqueue
+
+ wait_condition = self.__condition_class(self.__lock, shared)
+ prioqueue.append(wait_condition)
+
+ if shared:
+ # Keep reference for further shared acquires on same priority. This is
+ # better than trying to find it in the list of pending acquires.
+ assert priority not in self.__pending_shared
+ self.__pending_shared[priority] = wait_condition
try:
# Wait until we become the topmost acquire in the queue or the timeout
# expires.
+ # TODO: Decrease timeout with spurious notifications
while not (self.__is_on_top(wait_condition) and
self.__can_acquire(shared)):
# Wait for notification
return True
finally:
# Remove condition from queue if there are no more waiters
- if not wait_condition.has_waiting() and not self.__deleted:
- self.__pending.remove(wait_condition)
+ if not wait_condition.has_waiting():
+ prioqueue.remove(wait_condition)
+ if wait_condition.shared:
+ del self.__pending_shared[priority]
return False
- def acquire(self, shared=0, timeout=None, test_notify=None):
+ def acquire(self, shared=0, timeout=None, priority=None,
+ test_notify=None):
"""Acquire a shared lock.
@type shared: integer (0/1) used as a boolean
exclusive lock will be acquired
@type timeout: float
@param timeout: maximum waiting time before giving up
+ @type priority: integer
+ @param priority: Priority for acquiring lock
@type test_notify: callable or None
@param test_notify: Special callback function for unittesting
"""
+ if priority is None:
+ priority = _DEFAULT_PRIORITY
+
self.__lock.acquire()
try:
# We already got the lock, notify now
if __debug__ and callable(test_notify):
test_notify()
- return self.__acquire_unlocked(shared, timeout)
+ return self.__acquire_unlocked(shared, timeout, priority)
finally:
self.__lock.release()
self.__shr.remove(threading.currentThread())
# Notify topmost condition in queue
- if self.__pending:
- first_condition = self.__pending[0]
- first_condition.notifyAll()
-
- if first_condition == self.__active_shr_c:
- self.__active_shr_c = self.__inactive_shr_c
- self.__inactive_shr_c = first_condition
+ prioqueue = self.__find_first_pending_queue()
+ if prioqueue:
+ prioqueue[0].notifyAll()
finally:
self.__lock.release()
- def delete(self, timeout=None):
+ def delete(self, timeout=None, priority=None):
"""Delete a Shared Lock.
This operation will declare the lock for removal. First the lock will be
@type timeout: float
@param timeout: maximum waiting time before giving up
+ @type priority: integer
+ @param priority: Priority for acquiring lock
"""
+ if priority is None:
+ priority = _DEFAULT_PRIORITY
+
self.__lock.acquire()
try:
assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
acquired = self.__is_exclusive()
if not acquired:
- acquired = self.__acquire_unlocked(0, timeout)
+ acquired = self.__acquire_unlocked(0, timeout, priority)
assert self.__is_exclusive() and not self.__is_sharer(), \
"Lock wasn't acquired in exclusive mode"
self.__deleted = True
self.__exc = None
+ assert not (self.__exc or self.__shr), "Found owner during deletion"
+
# Notify all acquires. They'll throw an error.
- while self.__pending:
- self.__pending.pop().notifyAll()
+ for (_, prioqueue) in self.__pending:
+ for cond in prioqueue:
+ cond.notifyAll()
+
+ assert self.__deleted
return acquired
finally:
@ivar name: the name of the lockset
"""
- def __init__(self, members, name):
+ def __init__(self, members, name, monitor=None):
"""Constructs a new LockSet.
@type members: list of strings
@param members: initial members of the set
+ @type monitor: L{LockMonitor}
+ @param monitor: Lock monitor with which to register member locks
"""
assert members is not None, "members parameter is not a list"
self.name = name
+ # Lock monitor
+ self.__monitor = monitor
+
# Used internally to guarantee coherency.
self.__lock = SharedLock(name)
self.__lockdict = {}
for mname in members:
- self.__lockdict[mname] = SharedLock(self._GetLockName(mname))
+ self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
+ monitor=monitor)
# The owner dict contains the set of locks each thread owns. For
# performance each thread can access its own key without a global lock on
self.__lock.release()
return set(result)
- def acquire(self, names, timeout=None, shared=0, test_notify=None):
+ def acquire(self, names, timeout=None, shared=0, priority=None,
+ test_notify=None):
"""Acquire a set of resource locks.
@type names: list of strings (or string)
exclusive lock will be acquired
@type timeout: float or None
@param timeout: Maximum time to acquire all locks
+ @type priority: integer
+ @param priority: Priority for acquiring locks
@type test_notify: callable or None
@param test_notify: Special callback function for unittesting
assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
" (lockset %s)" % self.name)
+ if priority is None:
+ priority = _DEFAULT_PRIORITY
+
# We need to keep track of how long we spent waiting for a lock. The
# timeout passed to this function is over all lock acquires.
- running_timeout = RunningTimeout(timeout, False)
+ running_timeout = utils.RunningTimeout(timeout, False)
try:
if names is not None:
if isinstance(names, basestring):
names = [names]
- return self.__acquire_inner(names, False, shared,
+ return self.__acquire_inner(names, False, shared, priority,
running_timeout.Remaining, test_notify)
else:
# Some of them may then be deleted later, but we'll cope with this.
#
# We'd like to acquire this lock in a shared way, as it's nice if
- # everybody else can use the instances at the same time. If are
+ # everybody else can use the instances at the same time. If we are
# acquiring them exclusively though they won't be able to do this
# anyway, though, so we'll get the list lock exclusively as well in
# order to be able to do add() on the set while owning it.
- if not self.__lock.acquire(shared=shared,
+ if not self.__lock.acquire(shared=shared, priority=priority,
timeout=running_timeout.Remaining()):
raise _AcquireTimeout()
try:
# note we own the set-lock
self._add_owned()
- return self.__acquire_inner(self.__names(), True, shared,
+ return self.__acquire_inner(self.__names(), True, shared, priority,
running_timeout.Remaining, test_notify)
except:
# We shouldn't have problems adding the lock to the owners list, but
except _AcquireTimeout:
return None
- def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
+ def __acquire_inner(self, names, want_all, shared, priority,
+ timeout_fn, test_notify):
"""Inner logic for acquiring a number of locks.
@param names: Names of the locks to be acquired
@param want_all: Whether all locks in the set should be acquired
@param shared: Whether to acquire in shared mode
@param timeout_fn: Function returning remaining timeout
+ @param priority: Priority for acquiring locks
@param test_notify: Special callback function for unittesting
"""
# element is not there anymore.
continue
- raise errors.LockError("Non-existing lock %s in set %s" %
- (lname, self.name))
+ raise errors.LockError("Non-existing lock %s in set %s (it may have"
+ " been removed)" % (lname, self.name))
acquire_list.append((lname, lock))
try:
# raises LockError if the lock was deleted
acq_success = lock.acquire(shared=shared, timeout=timeout,
+ priority=priority,
test_notify=test_notify_fn)
except errors.LockError:
if want_all:
# particular element is not there anymore.
continue
- raise errors.LockError("Non-existing lock %s in set %s" %
- (lname, self.name))
+ raise errors.LockError("Non-existing lock %s in set %s (it may"
+ " have been removed)" % (lname, self.name))
if not acq_success:
# Couldn't get lock or timeout occurred
(invalid_names, self.name))
for lockname in names:
- lock = SharedLock(self._GetLockName(lockname))
+ lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
if acquired:
+ # No need for priority or timeout here as this lock has just been
+ # created
lock.acquire(shared=shared)
# now the lock cannot be deleted, we have it!
try:
# the same time.
LEVEL_CLUSTER = 0
LEVEL_INSTANCE = 1
-LEVEL_NODE = 2
+# Node group locks sit between instance and node locks; the node level is
+# renumbered to 3 to make room
+LEVEL_NODEGROUP = 2
+LEVEL_NODE = 3
LEVELS = [LEVEL_CLUSTER,
LEVEL_INSTANCE,
+ LEVEL_NODEGROUP,
LEVEL_NODE]
# Lock levels which are modifiable
-LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
+LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE]
LEVEL_NAMES = {
LEVEL_CLUSTER: "cluster",
LEVEL_INSTANCE: "instance",
+ LEVEL_NODEGROUP: "nodegroup",
LEVEL_NODE: "node",
}
"""
_instance = None
- def __init__(self, nodes=None, instances=None):
+ def __init__(self, nodes, nodegroups, instances):
"""Constructs a new GanetiLockManager object.
There should be only a GanetiLockManager object at any time, so this
function raises an error if this is not the case.
@param nodes: list of node names
+ @param nodegroups: list of nodegroup uuids
@param instances: list of instance names
"""
self.__class__._instance = self
+ self._monitor = LockMonitor()
+
# The keyring contains all the locks, at their level and in the correct
# locking order.
self.__keyring = {
- LEVEL_CLUSTER: LockSet([BGL], "bgl lockset"),
- LEVEL_NODE: LockSet(nodes, "nodes lockset"),
- LEVEL_INSTANCE: LockSet(instances, "instances lockset"),
- }
+ LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
+ LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
+ LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=self._monitor),
+ LEVEL_INSTANCE: LockSet(instances, "instances",
+ monitor=self._monitor),
+ }
+
+  def QueryLocks(self, fields, sync):
+    """Queries information from all locks.
+
+    Delegates to the lock monitor created together with this manager.
+
+    See L{LockMonitor.QueryLocks}.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+    @type sync: boolean
+    @param sync: Whether to operate in synchronous mode
+
+    """
+    monitor = self._monitor
+    return monitor.QueryLocks(fields, sync)
def _names(self, level):
"""List the lock names at the given level.
"""
return level == LEVEL_CLUSTER and (names is None or BGL in names)
- def acquire(self, level, names, timeout=None, shared=0):
+ def acquire(self, level, names, timeout=None, shared=0, priority=None):
"""Acquire a set of resource locks, at the same level.
@type level: member of locking.LEVELS
an exclusive lock will be acquired
@type timeout: float
@param timeout: Maximum time to acquire all locks
+ @type priority: integer
+ @param priority: Priority for acquiring lock
"""
assert level in LEVELS, "Invalid locking level %s" % level
" while owning some at a greater one")
# Acquire the locks in the set.
- return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
+ return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
+ priority=priority)
def release(self, level, names=None):
"""Release a set of resource locks, at the same level.
"Cannot remove locks at a level while not owning it or"
" owning some at a greater one")
return self.__keyring[level].remove(names)
+
+
+class LockMonitor(object):
+  """Registry of locks which can be queried for their state.
+
+  Locks add themselves through L{RegisterLock}; L{QueryLocks} gathers the
+  requested fields from every lock still registered.
+
+  """
+  _LOCK_ATTR = "_lock"
+
+  def __init__(self):
+    """Initializes this class.
+
+    """
+    self._lock = SharedLock("LockMonitor")
+
+    # Tracked locks, used as keys only (values are always None). Weak
+    # references are used to avoid issues with circular references and
+    # deletion: a lock which is garbage collected drops out of this mapping.
+    self._locks = weakref.WeakKeyDictionary()
+
+  @ssynchronized(_LOCK_ATTR)
+  def RegisterLock(self, lock):
+    """Registers a new lock.
+
+    Duplicate registrations and duplicate names are only detected when
+    assertions are enabled.
+
+    @type lock: L{SharedLock}
+    @param lock: Lock to track
+
+    """
+    logging.debug("Registering lock %s", lock.name)
+    assert lock not in self._locks, "Duplicate lock registration"
+    assert not compat.any(lock.name == known.name
+                          for known in self._locks.keys()), \
+      "Found duplicate lock name"
+    self._locks[lock] = None
+
+  @ssynchronized(_LOCK_ATTR)
+  def _GetLockInfo(self, fields):
+    """Get information from all locks while the monitor lock is held.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+    @rtype: dict
+    @return: Lock name mapped to the result of that lock's C{GetInfo}
+
+    """
+    by_name = {}
+
+    for tracked in self._locks.keys():
+      assert tracked.name not in by_name, "Found duplicate lock name"
+      by_name[tracked.name] = tracked.GetInfo(fields)
+
+    return by_name
+
+  def QueryLocks(self, fields, sync):
+    """Queries information from all locks.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+    @type sync: boolean
+    @param sync: Whether to operate in synchronous mode
+    @rtype: list
+    @return: Per-lock field values, ordered by lock name using
+      C{utils.NiceSort}
+    @raise NotImplementedError: If C{sync} is requested
+
+    """
+    if sync:
+      raise NotImplementedError("Synchronous queries are not implemented")
+
+    # Collect everything under the monitor lock, sort outside of it
+    per_lock = self._GetLockInfo(fields)
+    ordered_names = utils.NiceSort(per_lock.keys())
+
+    return [per_lock[lname] for lname in ordered_names]