X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/05ad571c12aeb86e88dbf31296b5b462e488c48f..18ffc0fe13aa3c58fdd6926551ebe4569fc0125f:/lib/locking.py diff --git a/lib/locking.py b/lib/locking.py index f901490..3575085 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -28,18 +28,23 @@ import os import select import threading -import time import errno import weakref import logging +import heapq +import operator from ganeti import errors from ganeti import utils from ganeti import compat +from ganeti import query _EXCLUSIVE_TEXT = "exclusive" _SHARED_TEXT = "shared" +_DELETED_TEXT = "deleted" + +_DEFAULT_PRIORITY = 0 def ssynchronized(mylock, shared=0): @@ -70,59 +75,6 @@ def ssynchronized(mylock, shared=0): return wrap -class RunningTimeout(object): - """Class to calculate remaining timeout when doing several operations. - - """ - __slots__ = [ - "_allow_negative", - "_start_time", - "_time_fn", - "_timeout", - ] - - def __init__(self, timeout, allow_negative, _time_fn=time.time): - """Initializes this class. - - @type timeout: float - @param timeout: Timeout duration - @type allow_negative: bool - @param allow_negative: Whether to return values below zero - @param _time_fn: Time function for unittests - - """ - object.__init__(self) - - if timeout is not None and timeout < 0.0: - raise ValueError("Timeout must not be negative") - - self._timeout = timeout - self._allow_negative = allow_negative - self._time_fn = _time_fn - - self._start_time = None - - def Remaining(self): - """Returns the remaining timeout. - - """ - if self._timeout is None: - return None - - # Get start time on first calculation - if self._start_time is None: - self._start_time = self._time_fn() - - # Calculate remaining time - remaining_timeout = self._start_time + self._timeout - self._time_fn() - - if not self._allow_negative: - # Ensure timeout is always >= 0 - return max(0.0, remaining_timeout) - - return remaining_timeout - - class _SingleNotifyPipeConditionWaiter(object): """Helper class for SingleNotifyPipeCondition @@ -152,7 +104,7 @@ class _SingleNotifyPipeConditionWaiter(object): @param timeout: Timeout for waiting (can be None) """ - running_timeout = RunningTimeout(timeout, True) + running_timeout = utils.RunningTimeout(timeout, True) while True: remaining_time = running_timeout.Remaining() @@ -406,6 +358,19 @@ class PipeCondition(_BaseCondition): return bool(self._waiters) +class _PipeConditionWithMode(PipeCondition): + __slots__ = [ + "shared", + ] + + def __init__(self, lock, shared): + """Initializes this class. + + """ + self.shared = shared + PipeCondition.__init__(self, lock) + + class SharedLock(object): """Implements a shared lock. @@ -413,9 +378,13 @@ class SharedLock(object): C{acquire(shared=1)}. In order to acquire the lock in an exclusive way threads can call C{acquire(shared=0)}. - The lock prevents starvation but does not guarantee that threads will acquire - the shared lock in the order they queued for it, just that they will - eventually do so. + Notes on data structures: C{__pending} contains a priority queue (heapq) of + all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2), + ...]}. Each per-priority queue contains a normal in-order list of conditions + to be notified when the lock can be acquired. Shared locks are grouped + together by priority and the condition for them is stored in + C{__pending_shared} if it already exists. C{__pending_by_prio} keeps + references for the per-priority queues indexed by priority for faster access. @type name: string @ivar name: the name of the lock @@ -423,17 +392,17 @@ class SharedLock(object): """ __slots__ = [ "__weakref__", - "__active_shr_c", - "__inactive_shr_c", "__deleted", "__exc", "__lock", "__pending", + "__pending_by_prio", + "__pending_shared", "__shr", "name", ] - __condition_class = PipeCondition + __condition_class = _PipeConditionWithMode def __init__(self, name, monitor=None): """Construct a new SharedLock. @@ -452,10 +421,8 @@ class SharedLock(object): # Queue containing waiting acquires self.__pending = [] - - # Active and inactive conditions for shared locks - self.__active_shr_c = self.__condition_class(self.__lock) - self.__inactive_shr_c = self.__condition_class(self.__lock) + self.__pending_by_prio = {} + self.__pending_shared = {} # Current lock holders self.__shr = set() @@ -468,63 +435,60 @@ class SharedLock(object): if monitor: monitor.RegisterLock(self) - def GetInfo(self, fields): + def GetInfo(self, requested): """Retrieves information for querying locks. - @type fields: list of strings - @param fields: List of fields to return + @type requested: set + @param requested: Requested information, see C{query.LQ_*} """ self.__lock.acquire() try: - info = [] - # Note: to avoid unintentional race conditions, no references to # modifiable objects should be returned unless they were created in this # function. - for fname in fields: - if fname == "name": - info.append(self.name) - elif fname == "mode": - if self.__deleted: - info.append("deleted") - assert not (self.__exc or self.__shr) - elif self.__exc: - info.append(_EXCLUSIVE_TEXT) - elif self.__shr: - info.append(_SHARED_TEXT) - else: - info.append(None) - elif fname == "owner": - if self.__exc: - owner = [self.__exc] - else: - owner = self.__shr - - if owner: - assert not self.__deleted - info.append([i.getName() for i in owner]) - else: - info.append(None) - elif fname == "pending": - data = [] - - for cond in self.__pending: - if cond in (self.__active_shr_c, self.__inactive_shr_c): - mode = _SHARED_TEXT - else: - mode = _EXCLUSIVE_TEXT + mode = None + owner_names = None + + if query.LQ_MODE in requested: + if self.__deleted: + mode = _DELETED_TEXT + assert not (self.__exc or self.__shr) + elif self.__exc: + mode = _EXCLUSIVE_TEXT + elif self.__shr: + mode = _SHARED_TEXT + + # Current owner(s) are wanted + if query.LQ_OWNER in requested: + if self.__exc: + owner = [self.__exc] + else: + owner = self.__shr - # This function should be fast as it runs with the lock held. Hence - # not using utils.NiceSort. - data.append((mode, sorted([i.getName() - for i in cond.get_waiting()]))) + if owner: + assert not self.__deleted + owner_names = [i.getName() for i in owner] - info.append(data) - else: - raise errors.OpExecError("Invalid query field '%s'" % fname) + # Pending acquires are wanted + if query.LQ_PENDING in requested: + pending = [] + + # Sorting instead of copying and using heaq functions for simplicity + for (_, prioqueue) in sorted(self.__pending): + for cond in prioqueue: + if cond.shared: + pendmode = _SHARED_TEXT + else: + pendmode = _EXCLUSIVE_TEXT + + # List of names will be sorted in L{query._GetLockPending} + pending.append((pendmode, [i.getName() + for i in cond.get_waiting()])) + else: + pending = None - return info + return (self.name, mode, owner_names, pending) finally: self.__lock.release() @@ -584,7 +548,23 @@ class SharedLock(object): """ self.__lock.acquire() try: - return len(self.__pending) + return sum(len(prioqueue) for (_, prioqueue) in self.__pending) + finally: + self.__lock.release() + + def _check_empty(self): + """Checks whether there are any pending acquires. + + @rtype: bool + + """ + self.__lock.acquire() + try: + # Order is important: __find_first_pending_queue modifies __pending + return not (self.__find_first_pending_queue() or + self.__pending or + self.__pending_by_prio or + self.__pending_shared) finally: self.__lock.release() @@ -606,20 +586,42 @@ class SharedLock(object): else: return len(self.__shr) == 0 and self.__exc is None + def __find_first_pending_queue(self): + """Tries to find the topmost queued entry with pending acquires. + + Removes empty entries while going through the list. + + """ + while self.__pending: + (priority, prioqueue) = self.__pending[0] + + if not prioqueue: + heapq.heappop(self.__pending) + del self.__pending_by_prio[priority] + assert priority not in self.__pending_shared + continue + + if prioqueue: + return prioqueue + + return None + def __is_on_top(self, cond): """Checks whether the passed condition is on top of the queue. The caller must make sure the queue isn't empty. """ - return self.__pending[0] == cond + return cond == self.__find_first_pending_queue()[0] - def __acquire_unlocked(self, shared, timeout): + def __acquire_unlocked(self, shared, timeout, priority): """Acquire a shared lock. @param shared: whether to acquire in shared mode; by default an exclusive lock will be acquired @param timeout: maximum waiting time before giving up + @type priority: integer + @param priority: Priority for acquiring lock """ self.__check_deleted() @@ -628,26 +630,46 @@ class SharedLock(object): assert not self.__is_owned(), ("double acquire() on a non-recursive lock" " %s" % self.name) + # Remove empty entries from queue + self.__find_first_pending_queue() + # Check whether someone else holds the lock or there are pending acquires. if not self.__pending and self.__can_acquire(shared): # Apparently not, can acquire lock directly. self.__do_acquire(shared) return True - if shared: - wait_condition = self.__active_shr_c + prioqueue = self.__pending_by_prio.get(priority, None) - # Check if we're not yet in the queue - if wait_condition not in self.__pending: - self.__pending.append(wait_condition) + if shared: + # Try to re-use condition for shared acquire + wait_condition = self.__pending_shared.get(priority, None) + assert (wait_condition is None or + (wait_condition.shared and wait_condition in prioqueue)) else: - wait_condition = self.__condition_class(self.__lock) - # Always add to queue - self.__pending.append(wait_condition) + wait_condition = None + + if wait_condition is None: + if prioqueue is None: + assert priority not in self.__pending_by_prio + + prioqueue = [] + heapq.heappush(self.__pending, (priority, prioqueue)) + self.__pending_by_prio[priority] = prioqueue + + wait_condition = self.__condition_class(self.__lock, shared) + prioqueue.append(wait_condition) + + if shared: + # Keep reference for further shared acquires on same priority. This is + # better than trying to find it in the list of pending acquires. + assert priority not in self.__pending_shared + self.__pending_shared[priority] = wait_condition try: # Wait until we become the topmost acquire in the queue or the timeout # expires. + # TODO: Decrease timeout with spurious notifications while not (self.__is_on_top(wait_condition) and self.__can_acquire(shared)): # Wait for notification @@ -664,12 +686,15 @@ class SharedLock(object): return True finally: # Remove condition from queue if there are no more waiters - if not wait_condition.has_waiting() and not self.__deleted: - self.__pending.remove(wait_condition) + if not wait_condition.has_waiting(): + prioqueue.remove(wait_condition) + if wait_condition.shared: + del self.__pending_shared[priority] return False - def acquire(self, shared=0, timeout=None, test_notify=None): + def acquire(self, shared=0, timeout=None, priority=None, + test_notify=None): """Acquire a shared lock. @type shared: integer (0/1) used as a boolean @@ -677,17 +702,22 @@ class SharedLock(object): exclusive lock will be acquired @type timeout: float @param timeout: maximum waiting time before giving up + @type priority: integer + @param priority: Priority for acquiring lock @type test_notify: callable or None @param test_notify: Special callback function for unittesting """ + if priority is None: + priority = _DEFAULT_PRIORITY + self.__lock.acquire() try: # We already got the lock, notify now if __debug__ and callable(test_notify): test_notify() - return self.__acquire_unlocked(shared, timeout) + return self.__acquire_unlocked(shared, timeout, priority) finally: self.__lock.release() @@ -710,18 +740,14 @@ class SharedLock(object): self.__shr.remove(threading.currentThread()) # Notify topmost condition in queue - if self.__pending: - first_condition = self.__pending[0] - first_condition.notifyAll() - - if first_condition == self.__active_shr_c: - self.__active_shr_c = self.__inactive_shr_c - self.__inactive_shr_c = first_condition + prioqueue = self.__find_first_pending_queue() + if prioqueue: + prioqueue[0].notifyAll() finally: self.__lock.release() - def delete(self, timeout=None): + def delete(self, timeout=None, priority=None): """Delete a Shared Lock. This operation will declare the lock for removal. First the lock will be @@ -730,8 +756,13 @@ class SharedLock(object): @type timeout: float @param timeout: maximum waiting time before giving up + @type priority: integer + @param priority: Priority for acquiring lock """ + if priority is None: + priority = _DEFAULT_PRIORITY + self.__lock.acquire() try: assert not self.__is_sharer(), "Cannot delete() a lock while sharing it" @@ -742,7 +773,7 @@ class SharedLock(object): acquired = self.__is_exclusive() if not acquired: - acquired = self.__acquire_unlocked(0, timeout) + acquired = self.__acquire_unlocked(0, timeout, priority) assert self.__is_exclusive() and not self.__is_sharer(), \ "Lock wasn't acquired in exclusive mode" @@ -754,8 +785,11 @@ class SharedLock(object): assert not (self.__exc or self.__shr), "Found owner during deletion" # Notify all acquires. They'll throw an error. - while self.__pending: - self.__pending.pop().notifyAll() + for (_, prioqueue) in self.__pending: + for cond in prioqueue: + cond.notifyAll() + + assert self.__deleted return acquired finally: @@ -908,7 +942,8 @@ class LockSet: self.__lock.release() return set(result) - def acquire(self, names, timeout=None, shared=0, test_notify=None): + def acquire(self, names, timeout=None, shared=0, priority=None, + test_notify=None): """Acquire a set of resource locks. @type names: list of strings (or string) @@ -919,6 +954,8 @@ class LockSet: exclusive lock will be acquired @type timeout: float or None @param timeout: Maximum time to acquire all locks + @type priority: integer + @param priority: Priority for acquiring locks @type test_notify: callable or None @param test_notify: Special callback function for unittesting @@ -935,9 +972,12 @@ class LockSet: assert not self._is_owned(), ("Cannot acquire locks in the same set twice" " (lockset %s)" % self.name) + if priority is None: + priority = _DEFAULT_PRIORITY + # We need to keep track of how long we spent waiting for a lock. The # timeout passed to this function is over all lock acquires. - running_timeout = RunningTimeout(timeout, False) + running_timeout = utils.RunningTimeout(timeout, False) try: if names is not None: @@ -945,7 +985,7 @@ class LockSet: if isinstance(names, basestring): names = [names] - return self.__acquire_inner(names, False, shared, + return self.__acquire_inner(names, False, shared, priority, running_timeout.Remaining, test_notify) else: @@ -954,18 +994,18 @@ class LockSet: # Some of them may then be deleted later, but we'll cope with this. # # We'd like to acquire this lock in a shared way, as it's nice if - # everybody else can use the instances at the same time. If are + # everybody else can use the instances at the same time. If we are # acquiring them exclusively though they won't be able to do this # anyway, though, so we'll get the list lock exclusively as well in # order to be able to do add() on the set while owning it. - if not self.__lock.acquire(shared=shared, + if not self.__lock.acquire(shared=shared, priority=priority, timeout=running_timeout.Remaining()): raise _AcquireTimeout() try: # note we own the set-lock self._add_owned() - return self.__acquire_inner(self.__names(), True, shared, + return self.__acquire_inner(self.__names(), True, shared, priority, running_timeout.Remaining, test_notify) except: # We shouldn't have problems adding the lock to the owners list, but @@ -978,13 +1018,15 @@ class LockSet: except _AcquireTimeout: return None - def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify): + def __acquire_inner(self, names, want_all, shared, priority, + timeout_fn, test_notify): """Inner logic for acquiring a number of locks. @param names: Names of the locks to be acquired @param want_all: Whether all locks in the set should be acquired @param shared: Whether to acquire in shared mode @param timeout_fn: Function returning remaining timeout + @param priority: Priority for acquiring locks @param test_notify: Special callback function for unittesting """ @@ -1003,8 +1045,8 @@ class LockSet: # element is not there anymore. continue - raise errors.LockError("Non-existing lock %s in set %s" % - (lname, self.name)) + raise errors.LockError("Non-existing lock %s in set %s (it may have" + " been removed)" % (lname, self.name)) acquire_list.append((lname, lock)) @@ -1028,6 +1070,7 @@ class LockSet: try: # raises LockError if the lock was deleted acq_success = lock.acquire(shared=shared, timeout=timeout, + priority=priority, test_notify=test_notify_fn) except errors.LockError: if want_all: @@ -1035,8 +1078,8 @@ class LockSet: # particular element is not there anymore. continue - raise errors.LockError("Non-existing lock %s in set %s" % - (lname, self.name)) + raise errors.LockError("Non-existing lock %s in set %s (it may" + " have been removed)" % (lname, self.name)) if not acq_success: # Couldn't get lock or timeout occurred @@ -1146,6 +1189,8 @@ class LockSet: lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor) if acquired: + # No need for priority or timeout here as this lock has just been + # created lock.acquire(shared=shared) # now the lock cannot be deleted, we have it! try: @@ -1236,18 +1281,21 @@ class LockSet: # the same time. LEVEL_CLUSTER = 0 LEVEL_INSTANCE = 1 -LEVEL_NODE = 2 +LEVEL_NODEGROUP = 2 +LEVEL_NODE = 3 LEVELS = [LEVEL_CLUSTER, LEVEL_INSTANCE, + LEVEL_NODEGROUP, LEVEL_NODE] # Lock levels which are modifiable -LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE] +LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE] LEVEL_NAMES = { LEVEL_CLUSTER: "cluster", LEVEL_INSTANCE: "instance", + LEVEL_NODEGROUP: "nodegroup", LEVEL_NODE: "node", } @@ -1266,13 +1314,14 @@ class GanetiLockManager: """ _instance = None - def __init__(self, nodes=None, instances=None): + def __init__(self, nodes, nodegroups, instances): """Constructs a new GanetiLockManager object. There should be only a GanetiLockManager object at any time, so this function raises an error if this is not the case. @param nodes: list of node names + @param nodegroups: list of nodegroup uuids @param instances: list of instance names """ @@ -1288,17 +1337,26 @@ class GanetiLockManager: self.__keyring = { LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor), LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor), + LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=self._monitor), LEVEL_INSTANCE: LockSet(instances, "instances", monitor=self._monitor), } - def QueryLocks(self, fields, sync): + def QueryLocks(self, fields): """Queries information from all locks. See L{LockMonitor.QueryLocks}. """ - return self._monitor.QueryLocks(fields, sync) + return self._monitor.QueryLocks(fields) + + def OldStyleQueryLocks(self, fields): + """Queries information from all locks, returning old-style data. + + See L{LockMonitor.OldStyleQueryLocks}. + + """ + return self._monitor.OldStyleQueryLocks(fields) def _names(self, level): """List the lock names at the given level. @@ -1351,7 +1409,7 @@ class GanetiLockManager: """ return level == LEVEL_CLUSTER and (names is None or BGL in names) - def acquire(self, level, names, timeout=None, shared=0): + def acquire(self, level, names, timeout=None, shared=0, priority=None): """Acquire a set of resource locks, at the same level. @type level: member of locking.LEVELS @@ -1364,6 +1422,8 @@ class GanetiLockManager: an exclusive lock will be acquired @type timeout: float @param timeout: Maximum time to acquire all locks + @type priority: integer + @param priority: Priority for acquiring lock """ assert level in LEVELS, "Invalid locking level %s" % level @@ -1382,7 +1442,8 @@ class GanetiLockManager: " while owning some at a greater one") # Acquire the locks in the set. - return self.__keyring[level].acquire(names, shared=shared, timeout=timeout) + return self.__keyring[level].acquire(names, shared=shared, timeout=timeout, + priority=priority) def release(self, level, names=None): """Release a set of resource locks, at the same level. @@ -1478,32 +1539,46 @@ class LockMonitor(object): self._locks[lock] = None @ssynchronized(_LOCK_ATTR) - def _GetLockInfo(self, fields): + def _GetLockInfo(self, requested): """Get information from all locks while the monitor lock is held. """ - result = {} + return [lock.GetInfo(requested) for lock in self._locks.keys()] + + def _Query(self, fields): + """Queries information from all locks. + + @type fields: list of strings + @param fields: List of fields to return + + """ + qobj = query.Query(query.LOCK_FIELDS, fields) - for lock in self._locks.keys(): - assert lock.name not in result, "Found duplicate lock name" - result[lock.name] = lock.GetInfo(fields) + # Get all data and sort by name + lockinfo = utils.NiceSort(self._GetLockInfo(qobj.RequestedData()), + key=operator.itemgetter(0)) - return result + return (qobj, query.LockQueryData(lockinfo)) - def QueryLocks(self, fields, sync): + def QueryLocks(self, fields): """Queries information from all locks. @type fields: list of strings @param fields: List of fields to return - @type sync: boolean - @param sync: Whether to operate in synchronous mode """ - if sync: - raise NotImplementedError("Synchronous queries are not implemented") + (qobj, ctx) = self._Query(fields) + + # Prepare query response + return query.GetQueryResponse(qobj, ctx) - # Get all data without sorting - result = self._GetLockInfo(fields) + def OldStyleQueryLocks(self, fields): + """Queries information from all locks, returning old-style data. + + @type fields: list of strings + @param fields: List of fields to return + + """ + (qobj, ctx) = self._Query(fields) - # Sort by name - return [result[name] for name in utils.NiceSort(result.keys())] + return qobj.OldStyleQuery(ctx)