X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/7100c2fa74bd70b34ee43472ac8585562ebf77a0..d24bc00093906f86c8f054557b4fe6869cb75885:/lib/locking.py?ds=sidebyside diff --git a/lib/locking.py b/lib/locking.py index 9304cf9..3575085 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -28,19 +28,21 @@ import os import select import threading -import time import errno import weakref import logging import heapq +import operator from ganeti import errors from ganeti import utils from ganeti import compat +from ganeti import query _EXCLUSIVE_TEXT = "exclusive" _SHARED_TEXT = "shared" +_DELETED_TEXT = "deleted" _DEFAULT_PRIORITY = 0 @@ -73,59 +75,6 @@ def ssynchronized(mylock, shared=0): return wrap -class RunningTimeout(object): - """Class to calculate remaining timeout when doing several operations. - - """ - __slots__ = [ - "_allow_negative", - "_start_time", - "_time_fn", - "_timeout", - ] - - def __init__(self, timeout, allow_negative, _time_fn=time.time): - """Initializes this class. - - @type timeout: float - @param timeout: Timeout duration - @type allow_negative: bool - @param allow_negative: Whether to return values below zero - @param _time_fn: Time function for unittests - - """ - object.__init__(self) - - if timeout is not None and timeout < 0.0: - raise ValueError("Timeout must not be negative") - - self._timeout = timeout - self._allow_negative = allow_negative - self._time_fn = _time_fn - - self._start_time = None - - def Remaining(self): - """Returns the remaining timeout. - - """ - if self._timeout is None: - return None - - # Get start time on first calculation - if self._start_time is None: - self._start_time = self._time_fn() - - # Calculate remaining time - remaining_timeout = self._start_time + self._timeout - self._time_fn() - - if not self._allow_negative: - # Ensure timeout is always >= 0 - return max(0.0, remaining_timeout) - - return remaining_timeout - - class _SingleNotifyPipeConditionWaiter(object): """Helper class for SingleNotifyPipeCondition @@ -155,7 +104,7 @@ class _SingleNotifyPipeConditionWaiter(object): @param timeout: Timeout for waiting (can be None) """ - running_timeout = RunningTimeout(timeout, True) + running_timeout = utils.RunningTimeout(timeout, True) while True: remaining_time = running_timeout.Remaining() @@ -486,65 +435,60 @@ class SharedLock(object): if monitor: monitor.RegisterLock(self) - def GetInfo(self, fields): + def GetInfo(self, requested): """Retrieves information for querying locks. - @type fields: list of strings - @param fields: List of fields to return + @type requested: set + @param requested: Requested information, see C{query.LQ_*} """ self.__lock.acquire() try: - info = [] - # Note: to avoid unintentional race conditions, no references to # modifiable objects should be returned unless they were created in this # function. - for fname in fields: - if fname == "name": - info.append(self.name) - elif fname == "mode": - if self.__deleted: - info.append("deleted") - assert not (self.__exc or self.__shr) - elif self.__exc: - info.append(_EXCLUSIVE_TEXT) - elif self.__shr: - info.append(_SHARED_TEXT) - else: - info.append(None) - elif fname == "owner": - if self.__exc: - owner = [self.__exc] - else: - owner = self.__shr - - if owner: - assert not self.__deleted - info.append([i.getName() for i in owner]) - else: - info.append(None) - elif fname == "pending": - data = [] - - # Sorting instead of copying and using heaq functions for simplicity - for (_, prioqueue) in sorted(self.__pending): - for cond in prioqueue: - if cond.shared: - mode = _SHARED_TEXT - else: - mode = _EXCLUSIVE_TEXT - - # This function should be fast as it runs with the lock held. - # Hence not using utils.NiceSort. - data.append((mode, sorted(i.getName() - for i in cond.get_waiting()))) - - info.append(data) + mode = None + owner_names = None + + if query.LQ_MODE in requested: + if self.__deleted: + mode = _DELETED_TEXT + assert not (self.__exc or self.__shr) + elif self.__exc: + mode = _EXCLUSIVE_TEXT + elif self.__shr: + mode = _SHARED_TEXT + + # Current owner(s) are wanted + if query.LQ_OWNER in requested: + if self.__exc: + owner = [self.__exc] else: - raise errors.OpExecError("Invalid query field '%s'" % fname) + owner = self.__shr + + if owner: + assert not self.__deleted + owner_names = [i.getName() for i in owner] - return info + # Pending acquires are wanted + if query.LQ_PENDING in requested: + pending = [] + + # Sorting instead of copying and using heaq functions for simplicity + for (_, prioqueue) in sorted(self.__pending): + for cond in prioqueue: + if cond.shared: + pendmode = _SHARED_TEXT + else: + pendmode = _EXCLUSIVE_TEXT + + # List of names will be sorted in L{query._GetLockPending} + pending.append((pendmode, [i.getName() + for i in cond.get_waiting()])) + else: + pending = None + + return (self.name, mode, owner_names, pending) finally: self.__lock.release() @@ -1033,7 +977,7 @@ class LockSet: # We need to keep track of how long we spent waiting for a lock. The # timeout passed to this function is over all lock acquires. - running_timeout = RunningTimeout(timeout, False) + running_timeout = utils.RunningTimeout(timeout, False) try: if names is not None: @@ -1101,8 +1045,8 @@ class LockSet: # element is not there anymore. continue - raise errors.LockError("Non-existing lock %s in set %s" % - (lname, self.name)) + raise errors.LockError("Non-existing lock %s in set %s (it may have" + " been removed)" % (lname, self.name)) acquire_list.append((lname, lock)) @@ -1134,8 +1078,8 @@ class LockSet: # particular element is not there anymore. continue - raise errors.LockError("Non-existing lock %s in set %s" % - (lname, self.name)) + raise errors.LockError("Non-existing lock %s in set %s (it may" + " have been removed)" % (lname, self.name)) if not acq_success: # Couldn't get lock or timeout occurred @@ -1337,18 +1281,21 @@ class LockSet: # the same time. LEVEL_CLUSTER = 0 LEVEL_INSTANCE = 1 -LEVEL_NODE = 2 +LEVEL_NODEGROUP = 2 +LEVEL_NODE = 3 LEVELS = [LEVEL_CLUSTER, LEVEL_INSTANCE, + LEVEL_NODEGROUP, LEVEL_NODE] # Lock levels which are modifiable -LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE] +LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE] LEVEL_NAMES = { LEVEL_CLUSTER: "cluster", LEVEL_INSTANCE: "instance", + LEVEL_NODEGROUP: "nodegroup", LEVEL_NODE: "node", } @@ -1367,13 +1314,14 @@ class GanetiLockManager: """ _instance = None - def __init__(self, nodes=None, instances=None): + def __init__(self, nodes, nodegroups, instances): """Constructs a new GanetiLockManager object. There should be only a GanetiLockManager object at any time, so this function raises an error if this is not the case. @param nodes: list of node names + @param nodegroups: list of nodegroup uuids @param instances: list of instance names """ @@ -1389,17 +1337,26 @@ class GanetiLockManager: self.__keyring = { LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor), LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor), + LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=self._monitor), LEVEL_INSTANCE: LockSet(instances, "instances", monitor=self._monitor), } - def QueryLocks(self, fields, sync): + def QueryLocks(self, fields): """Queries information from all locks. See L{LockMonitor.QueryLocks}. """ - return self._monitor.QueryLocks(fields, sync) + return self._monitor.QueryLocks(fields) + + def OldStyleQueryLocks(self, fields): + """Queries information from all locks, returning old-style data. + + See L{LockMonitor.OldStyleQueryLocks}. + + """ + return self._monitor.OldStyleQueryLocks(fields) def _names(self, level): """List the lock names at the given level. @@ -1452,7 +1409,7 @@ class GanetiLockManager: """ return level == LEVEL_CLUSTER and (names is None or BGL in names) - def acquire(self, level, names, timeout=None, shared=0): + def acquire(self, level, names, timeout=None, shared=0, priority=None): """Acquire a set of resource locks, at the same level. @type level: member of locking.LEVELS @@ -1465,6 +1422,8 @@ class GanetiLockManager: an exclusive lock will be acquired @type timeout: float @param timeout: Maximum time to acquire all locks + @type priority: integer + @param priority: Priority for acquiring lock """ assert level in LEVELS, "Invalid locking level %s" % level @@ -1483,7 +1442,8 @@ class GanetiLockManager: " while owning some at a greater one") # Acquire the locks in the set. - return self.__keyring[level].acquire(names, shared=shared, timeout=timeout) + return self.__keyring[level].acquire(names, shared=shared, timeout=timeout, + priority=priority) def release(self, level, names=None): """Release a set of resource locks, at the same level. @@ -1579,32 +1539,46 @@ class LockMonitor(object): self._locks[lock] = None @ssynchronized(_LOCK_ATTR) - def _GetLockInfo(self, fields): + def _GetLockInfo(self, requested): """Get information from all locks while the monitor lock is held. """ - result = {} + return [lock.GetInfo(requested) for lock in self._locks.keys()] - for lock in self._locks.keys(): - assert lock.name not in result, "Found duplicate lock name" - result[lock.name] = lock.GetInfo(fields) + def _Query(self, fields): + """Queries information from all locks. - return result + @type fields: list of strings + @param fields: List of fields to return + + """ + qobj = query.Query(query.LOCK_FIELDS, fields) + + # Get all data and sort by name + lockinfo = utils.NiceSort(self._GetLockInfo(qobj.RequestedData()), + key=operator.itemgetter(0)) + + return (qobj, query.LockQueryData(lockinfo)) - def QueryLocks(self, fields, sync): + def QueryLocks(self, fields): """Queries information from all locks. @type fields: list of strings @param fields: List of fields to return - @type sync: boolean - @param sync: Whether to operate in synchronous mode """ - if sync: - raise NotImplementedError("Synchronous queries are not implemented") + (qobj, ctx) = self._Query(fields) - # Get all data without sorting - result = self._GetLockInfo(fields) + # Prepare query response + return query.GetQueryResponse(qobj, ctx) + + def OldStyleQueryLocks(self, fields): + """Queries information from all locks, returning old-style data. + + @type fields: list of strings + @param fields: List of fields to return + + """ + (qobj, ctx) = self._Query(fields) - # Sort by name - return [result[name] for name in utils.NiceSort(result.keys())] + return qobj.OldStyleQuery(ctx)