X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/20699809e8c9ecdcc4a518d5eed659d6b847b9ef..397b7844524ffe0c512f44d37c0c5a059cc485c8:/lib/locking.py

diff --git a/lib/locking.py b/lib/locking.py
index 5b7414d..7a23af6 100644
--- a/lib/locking.py
+++ b/lib/locking.py
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -33,6 +33,7 @@ import weakref
 import logging
 import heapq
 import itertools
+import time
 
 from ganeti import errors
 from ganeti import utils
@@ -46,6 +47,21 @@ _DELETED_TEXT = "deleted"
 
 _DEFAULT_PRIORITY = 0
 
+#: Minimum timeout required to consider scheduling a pending acquisition
+#: (seconds)
+_LOCK_ACQUIRE_MIN_TIMEOUT = (1.0 / 1000)
+
+# Internal lock acquisition modes for L{LockSet}
+(_LS_ACQUIRE_EXACT,
+ _LS_ACQUIRE_ALL,
+ _LS_ACQUIRE_OPPORTUNISTIC) = range(1, 4)
+
+_LS_ACQUIRE_MODES = compat.UniqueFrozenset([
+  _LS_ACQUIRE_EXACT,
+  _LS_ACQUIRE_ALL,
+  _LS_ACQUIRE_OPPORTUNISTIC,
+  ])
+
 
 def ssynchronized(mylock, shared=0):
   """Shared Synchronization decorator.
@@ -161,7 +177,7 @@ class _BaseCondition(object):
     except AttributeError:
       self._acquire_restore = self._base_acquire_restore
     try:
-      self._is_owned = lock._is_owned
+      self._is_owned = lock.is_owned
     except AttributeError:
       self._is_owned = self._base_is_owned
 
@@ -404,12 +420,13 @@ class SharedLock(object):
     "__pending_by_prio",
     "__pending_shared",
     "__shr",
+    "__time_fn",
     "name",
     ]
 
   __condition_class = _PipeConditionWithMode
 
-  def __init__(self, name, monitor=None):
+  def __init__(self, name, monitor=None, _time_fn=time.time):
     """Construct a new SharedLock.
 
     @param name: the name of the lock
@@ -421,6 +438,9 @@ class SharedLock(object):
 
     self.name = name
 
+    # Used for unittesting
+    self.__time_fn = _time_fn
+
     # Internal lock
     self.__lock = threading.Lock()
 
@@ -536,7 +556,7 @@ class SharedLock(object):
     else:
       return self.__is_exclusive()
 
-  def _is_owned(self, shared=-1):
+  def is_owned(self, shared=-1):
     """Is the current thread somehow owning the lock at this time?
 
     @param shared:
@@ -551,7 +571,9 @@ class SharedLock(object):
     finally:
       self.__lock.release()
 
-  is_owned = _is_owned
+  #: Necessary to remain compatible with threading.Condition, which tries to
+  #: retrieve a lock's "_is_owned" attribute
+  _is_owned = is_owned
 
   def _count_pending(self):
     """Returns the number of pending acquires.
@@ -655,6 +677,12 @@ class SharedLock(object):
       self.__do_acquire(shared)
       return True
 
+    # The lock couldn't be acquired right away, so if a timeout is given and is
+    # considered too short, return right away as scheduling a pending
+    # acquisition is quite expensive
+    if timeout is not None and timeout < _LOCK_ACQUIRE_MIN_TIMEOUT:
+      return False
+
     prioqueue = self.__pending_by_prio.get(priority, None)
 
     if shared:
@@ -682,24 +710,27 @@ class SharedLock(object):
       assert priority not in self.__pending_shared
       self.__pending_shared[priority] = wait_condition
 
+    wait_start = self.__time_fn()
+    acquired = False
+
     try:
       # Wait until we become the topmost acquire in the queue or the timeout
       # expires.
-      # TODO: Decrease timeout with spurious notifications
-      while not (self.__is_on_top(wait_condition) and
-                 self.__can_acquire(shared)):
-        # Wait for notification
-        wait_condition.wait(timeout)
-        self.__check_deleted()
+      while True:
+        if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
+          self.__do_acquire(shared)
+          acquired = True
+          break
 
-        # A lot of code assumes blocking acquires always succeed. Loop
-        # internally for that case.
-        if timeout is not None:
+        # A lot of code assumes blocking acquires always succeed, therefore we
+        # can never return False for a blocking acquire
+        if (timeout is not None and
+            utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
          break
 
-      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
-        self.__do_acquire(shared)
-        return True
+        # Wait for notification
+        wait_condition.wait(timeout)
+        self.__check_deleted()
     finally:
       # Remove condition from queue if there are no more waiters
       if not wait_condition.has_waiting():
@@ -709,7 +740,7 @@ class SharedLock(object):
         # (e.g. on lock deletion)
         self.__pending_shared.pop(priority, None)
 
-    return False
+    return acquired
 
   def acquire(self, shared=0, timeout=None, priority=None,
               test_notify=None):
@@ -796,19 +827,38 @@ class SharedLock(object):
       # Autodetect release type
       if self.__is_exclusive():
         self.__exc = None
+        notify = True
       else:
         self.__shr.remove(threading.currentThread())
+        notify = not self.__shr
 
-      # Notify topmost condition in queue
-      (priority, prioqueue) = self.__find_first_pending_queue()
-
-      if prioqueue:
-        cond = prioqueue[0]
-        cond.notifyAll()
-
-        if cond.shared:
-          # Prevent further shared acquires from sneaking in while waiters are
-          # notified
-          self.__pending_shared.pop(priority, None)
+      # Notify topmost condition in queue if there are no owners left (for
+      # shared locks)
+      if notify:
+        self.__notify_topmost()
+    finally:
+      self.__lock.release()
+
+  def __notify_topmost(self):
+    """Notifies topmost condition in queue of pending acquires.
+
+    """
+    (priority, prioqueue) = self.__find_first_pending_queue()
+    if prioqueue:
+      cond = prioqueue[0]
+      cond.notifyAll()
+      if cond.shared:
+        # Prevent further shared acquires from sneaking in while waiters are
+        # notified
+        self.__pending_shared.pop(priority, None)
+
+  def _notify_topmost(self):
+    """Exported version of L{__notify_topmost}.
+
+    """
+    self.__lock.acquire()
+    try:
+      return self.__notify_topmost()
     finally:
       self.__lock.release()
 
@@ -840,10 +890,10 @@ class SharedLock(object):
       if not acquired:
         acquired = self.__acquire_unlocked(0, timeout, priority)
 
+      if acquired:
         assert self.__is_exclusive() and not self.__is_sharer(), \
           "Lock wasn't acquired in exclusive mode"
 
-      if acquired:
         self.__deleted = True
         self.__exc = None
 
@@ -874,6 +924,52 @@ class SharedLock(object):
 ALL_SET = None
 
 
+def _TimeoutZero():
+  """Returns the number zero.
+
+  """
+  return 0
+
+
+def _GetLsAcquireModeAndTimeouts(want_all, timeout, opportunistic):
+  """Determines modes and timeouts for L{LockSet.acquire}.
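+
+  A minimal sketch of how the returned values are meant to be used (the
+  variable names below are illustrative only)::
+
+    # Exact (non-opportunistic) acquisition of specific names with an
+    # overall timeout of ten seconds
+    (mode, ls_timeout_fn, timeout_fn) = \
+      _GetLsAcquireModeAndTimeouts(False, 10.0, False)
+    assert mode == _LS_ACQUIRE_EXACT
+    assert ls_timeout_fn is None
+    # timeout_fn now returns however much of the ten seconds remains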
+
+  @type want_all: boolean
+  @param want_all: Whether all locks in set should be acquired
+  @param timeout: Timeout in seconds or C{None}
+  @param opportunistic: Whether locks should be acquired opportunistically
+  @rtype: tuple
+  @return: Tuple containing mode to be passed to L{LockSet.__acquire_inner}
+    (one of L{_LS_ACQUIRE_MODES}), a function to calculate timeout for
+    acquiring the lockset-internal lock (might be C{None}) and a function to
+    calculate the timeout for acquiring individual locks
+
+  """
+  # Short circuit when no running timeout is needed
+  if opportunistic and not want_all:
+    assert timeout is None, "Got timeout for an opportunistic acquisition"
+    return (_LS_ACQUIRE_OPPORTUNISTIC, None, _TimeoutZero)
+
+  # We need to keep track of how long we spent waiting for a lock. The
+  # timeout passed to this function is over all lock acquisitions.
+  running_timeout = utils.RunningTimeout(timeout, False)
+
+  if want_all:
+    mode = _LS_ACQUIRE_ALL
+    ls_timeout_fn = running_timeout.Remaining
+  else:
+    mode = _LS_ACQUIRE_EXACT
+    ls_timeout_fn = None
+
+  if opportunistic:
+    mode = _LS_ACQUIRE_OPPORTUNISTIC
+    timeout_fn = _TimeoutZero
+  else:
+    timeout_fn = running_timeout.Remaining
+
+  return (mode, ls_timeout_fn, timeout_fn)
+
+
 class _AcquireTimeout(Exception):
   """Internal exception to abort an acquire on a timeout.
 
@@ -950,17 +1046,65 @@ class LockSet:
     """
     return self.__lockdict
 
-  def _is_owned(self):
-    """Is the current thread a current level owner?"""
+  def is_owned(self):
+    """Is the current thread a current level owner?
+
+    @note: Use L{check_owned} to check if a specific lock is held
+
+    """
     return threading.currentThread() in self.__owners
 
+  def check_owned(self, names, shared=-1):
+    """Check if locks are owned in a specific mode.
+
+    @type names: sequence or string
+    @param names: Lock names (or a single lock name)
+    @param shared: See L{SharedLock.is_owned}
+    @rtype: bool
+    @note: Use L{is_owned} to check if the current thread holds I{any} lock and
+      L{list_owned} to get the names of all owned locks
+
+    """
+    if isinstance(names, basestring):
+      names = [names]
+
+    # Avoid check if no locks are owned anyway
+    if names and self.is_owned():
+      candidates = []
+
+      # Gather references to all locks (in case they're deleted in the meantime)
+      for lname in names:
+        try:
+          lock = self.__lockdict[lname]
+        except KeyError:
+          raise errors.LockError("Non-existing lock '%s' in set '%s' (it may"
+                                 " have been removed)" % (lname, self.name))
+        else:
+          candidates.append(lock)
+
+      return compat.all(lock.is_owned(shared=shared) for lock in candidates)
+    else:
+      return False
+
+  def owning_all(self):
+    """Checks whether current thread owns internal lock.
+
+    Holding the internal lock is equivalent with holding all locks in the set
+    (the opposite does not necessarily hold as it cannot be easily
+    determined). L{add} and L{remove} require the internal lock.
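+
+    A minimal sketch, assuming a L{LockSet} instance C{lockset}::
+
+      if lockset.owning_all():
+        # Holding the internal lock implies holding every lock in the set
+        ...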
+
+    @rtype: boolean
+
+    """
+    return self.__lock.is_owned()
+
   def _add_owned(self, name=None):
     """Note the current thread owns the given lock"""
     if name is None:
-      if not self._is_owned():
+      if not self.is_owned():
         self.__owners[threading.currentThread()] = set()
     else:
-      if self._is_owned():
+      if self.is_owned():
         self.__owners[threading.currentThread()].add(name)
       else:
         self.__owners[threading.currentThread()] = set([name])
@@ -968,29 +1112,29 @@ class LockSet:
   def _del_owned(self, name=None):
     """Note the current thread no longer owns the given lock"""
 
-    assert not (name is None and self.__lock._is_owned()), \
+    assert not (name is None and self.__lock.is_owned()), \
      "Cannot hold internal lock when deleting owner status"
 
     if name is not None:
       self.__owners[threading.currentThread()].remove(name)
 
     # Only remove the key if we don't hold the set-lock as well
-    if (not self.__lock._is_owned() and
-        not self.__owners[threading.currentThread()]):
+    if not (self.__lock.is_owned() or
+            self.__owners[threading.currentThread()]):
       del self.__owners[threading.currentThread()]
 
-  def _list_owned(self):
+  def list_owned(self):
     """Get the set of resource names owned by the current thread"""
-    if self._is_owned():
+    if self.is_owned():
       return self.__owners[threading.currentThread()].copy()
     else:
      return set()
 
   def _release_and_delete_owned(self):
     """Release and delete all resources owned by the current thread"""
-    for lname in self._list_owned():
+    for lname in self.list_owned():
       lock = self.__lockdict[lname]
-      if lock._is_owned():
+      if lock.is_owned():
         lock.release()
       self._del_owned(name=lname)
 
@@ -1012,7 +1156,7 @@ class LockSet:
     # If we don't already own the set-level lock, acquire it
     # and note we need to release it later.
     release_lock = False
-    if not self.__lock._is_owned():
+    if not self.__lock.is_owned():
       release_lock = True
       self.__lock.acquire(shared=1)
     try:
@@ -1023,9 +1167,12 @@ class LockSet:
     return set(result)
 
   def acquire(self, names, timeout=None, shared=0, priority=None,
-              test_notify=None):
+              opportunistic=False, test_notify=None):
     """Acquire a set of resource locks.
 
+    @note: When acquiring locks opportunistically, any number of locks might
+      actually be acquired, even zero.
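+
+    A short usage sketch of an opportunistic acquisition (the instance lock
+    names are assumed for illustration)::
+
+      # Lock whichever of the two instances is free right now
+      got = lockset.acquire(["inst1", "inst2"], shared=1, opportunistic=True)
+      # "got" is the set of names actually acquired and may be empty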
+
     @type names: list of strings (or string)
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
     @param shared: whether to acquire in shared mode; by default an exclusive
         lock will be acquired
     @type timeout: float or None
-    @param timeout: Maximum time to acquire all locks
+    @param timeout: Maximum time to acquire all locks; for opportunistic
+      acquisitions, a timeout can only be given when C{names} is C{None}, in
+      which case it is exclusively used for acquiring the L{LockSet}-internal
+      lock; opportunistic acquisitions don't use a timeout for acquiring
+      individual locks
     @type priority: integer
     @param priority: Priority for acquiring locks
+    @type opportunistic: boolean
+    @param opportunistic: Acquire locks opportunistically; use the return value
+      to determine which locks were actually acquired
     @type test_notify: callable or None
     @param test_notify: Special callback function for unittesting
 
@@ -1049,26 +1203,32 @@ class LockSet:
     assert timeout is None or timeout >= 0.0
 
     # Check we don't already own locks at this level
-    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
-                                  " (lockset %s)" % self.name)
+    assert not self.is_owned(), ("Cannot acquire locks in the same set twice"
+                                 " (lockset %s)" % self.name)
 
     if priority is None:
       priority = _DEFAULT_PRIORITY
 
-    # We need to keep track of how long we spent waiting for a lock. The
-    # timeout passed to this function is over all lock acquires.
-    running_timeout = utils.RunningTimeout(timeout, False)
-
     try:
       if names is not None:
+        assert timeout is None or not opportunistic, \
+          ("Opportunistic acquisitions can only use a timeout if no"
+           " names are given; see docstring for details")
+
         # Support passing in a single resource to acquire rather than many
         if isinstance(names, basestring):
          names = [names]
 
-        return self.__acquire_inner(names, False, shared, priority,
-                                    running_timeout.Remaining, test_notify)
+        (mode, _, timeout_fn) = \
+          _GetLsAcquireModeAndTimeouts(False, timeout, opportunistic)
 
+        return self.__acquire_inner(names, mode, shared, priority,
+                                    timeout_fn, test_notify)
       else:
+        (mode, ls_timeout_fn, timeout_fn) = \
+          _GetLsAcquireModeAndTimeouts(True, timeout, opportunistic)
+
         # If no names are given acquire the whole set by not letting new names
         # be added before we release, and getting the current list of names.
         # Some of them may then be deleted later, but we'll cope with this.
@@ -1079,14 +1239,15 @@ class LockSet:
         # anyway, though, so we'll get the list lock exclusively as well in
         # order to be able to do add() on the set while owning it.
         if not self.__lock.acquire(shared=shared, priority=priority,
-                                   timeout=running_timeout.Remaining()):
+                                   timeout=ls_timeout_fn()):
           raise _AcquireTimeout()
+
         try:
           # note we own the set-lock
          self._add_owned()
 
-          return self.__acquire_inner(self.__names(), True, shared, priority,
-                                      running_timeout.Remaining, test_notify)
+          return self.__acquire_inner(self.__names(), mode, shared,
+                                      priority, timeout_fn, test_notify)
         except:
           # We shouldn't have problems adding the lock to the owners list, but
           # if we did we'll try to release this lock and re-raise exception.
@@ -1098,37 +1259,50 @@ class LockSet:
     except _AcquireTimeout:
       return None
 
-  def __acquire_inner(self, names, want_all, shared, priority,
+  def __acquire_inner(self, names, mode, shared, priority,
                       timeout_fn, test_notify):
     """Inner logic for acquiring a number of locks.
+
+    Acquisition modes:
+
+      - C{_LS_ACQUIRE_ALL}: C{names} contains names of all locks in set, but
+        deleted locks can be ignored as the whole set is being acquired with
+        its internal lock held
+      - C{_LS_ACQUIRE_EXACT}: The names listed in C{names} must be acquired;
+        timeouts and deleted locks are fatal
+      - C{_LS_ACQUIRE_OPPORTUNISTIC}: C{names} lists names of locks (potentially
+        all within the set) which should be acquired opportunistically, that is
+        failures are ignored
+
     @param names: Names of the locks to be acquired
-    @param want_all: Whether all locks in the set should be acquired
+    @param mode: Lock acquisition mode (one of L{_LS_ACQUIRE_MODES})
     @param shared: Whether to acquire in shared mode
-    @param timeout_fn: Function returning remaining timeout
+    @param timeout_fn: Function returning remaining timeout (C{None} for
+      opportunistic acquisitions)
     @param priority: Priority for acquiring locks
     @param test_notify: Special callback function for unittesting
 
     """
+    assert mode in _LS_ACQUIRE_MODES
+
     acquire_list = []
 
     # First we look the locks up on __lockdict. We have no way of being sure
     # they will still be there after, but this makes it a lot faster should
     # just one of them be already wrong. Using a sorted sequence to prevent
     # deadlocks.
-    for lname in sorted(utils.UniqueSequence(names)):
+    for lname in sorted(frozenset(names)):
       try:
         lock = self.__lockdict[lname] # raises KeyError if lock is not there
       except KeyError:
-        if want_all:
-          # We are acquiring all the set, it doesn't matter if this particular
-          # element is not there anymore.
-          continue
-
-        raise errors.LockError("Non-existing lock %s in set %s (it may have"
-                               " been removed)" % (lname, self.name))
-
-      acquire_list.append((lname, lock))
+        # We are acquiring the whole set, it doesn't matter if this particular
+        # element is not there anymore. If, however, only certain names should
+        # be acquired, not finding a lock is an error.
+        if mode == _LS_ACQUIRE_EXACT:
+          raise errors.LockError("Lock '%s' not found in set '%s' (it may have"
+                                 " been removed)" % (lname, self.name))
+      else:
+        acquire_list.append((lname, lock))
 
     # This will hold the locknames we effectively acquired.
     acquired = set()
 
@@ -1153,16 +1327,20 @@ class LockSet:
                                      priority=priority,
                                      test_notify=test_notify_fn)
         except errors.LockError:
-          if want_all:
-            # We are acquiring all the set, it doesn't matter if this
+          if mode in (_LS_ACQUIRE_ALL, _LS_ACQUIRE_OPPORTUNISTIC):
+            # We are acquiring the whole set, it doesn't matter if this
             # particular element is not there anymore.
             continue
 
-          raise errors.LockError("Non-existing lock %s in set %s (it may"
-                                 " have been removed)" % (lname, self.name))
+          raise errors.LockError("Lock '%s' not found in set '%s' (it may have"
+                                 " been removed)" % (lname, self.name))
 
         if not acq_success:
           # Couldn't get lock or timeout occurred
+          if mode == _LS_ACQUIRE_OPPORTUNISTIC:
+            # Ignore timeouts on opportunistic acquisitions
+            continue
+
          if timeout is None:
             # This shouldn't happen as SharedLock.acquire(timeout=None) is
             # blocking.
@@ -1180,7 +1358,7 @@ class LockSet:
           # We shouldn't have problems adding the lock to the owners list, but
           # if we did we'll try to release this lock and re-raise exception.
           # Of course something is going to be really wrong after this.
-          if lock._is_owned():
+          if lock.is_owned():
             lock.release()
           raise
 
@@ -1197,14 +1375,14 @@ class LockSet:
     The locks must have been acquired in exclusive mode.
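+
+    A short sketch of the intended use (the lock name is assumed)::
+
+      ls.acquire("inst1", shared=0)
+      # ... do work requiring exclusive access ...
+      ls.downgrade(names="inst1")
+      # "inst1" is now held in shared mode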
""" - assert self._is_owned(), ("downgrade on lockset %s while not owning any" - " lock" % self.name) + assert self.is_owned(), ("downgrade on lockset %s while not owning any" + " lock" % self.name) # Support passing in a single resource to downgrade rather than many if isinstance(names, basestring): names = [names] - owned = self._list_owned() + owned = self.list_owned() if names is None: names = owned @@ -1218,12 +1396,12 @@ class LockSet: self.__lockdict[lockname].downgrade() # Do we own the lockset in exclusive mode? - if self.__lock._is_owned(shared=0): + if self.__lock.is_owned(shared=0): # Have all locks been downgraded? - if not compat.any(lock._is_owned(shared=0) + if not compat.any(lock.is_owned(shared=0) for lock in self.__lockdict.values()): self.__lock.downgrade() - assert self.__lock._is_owned(shared=1) + assert self.__lock.is_owned(shared=1) return True @@ -1238,24 +1416,24 @@ class LockSet: (defaults to all the locks acquired at that level). """ - assert self._is_owned(), ("release() on lock set %s while not owner" % - self.name) + assert self.is_owned(), ("release() on lock set %s while not owner" % + self.name) # Support passing in a single resource to release rather than many if isinstance(names, basestring): names = [names] if names is None: - names = self._list_owned() + names = self.list_owned() else: names = set(names) - assert self._list_owned().issuperset(names), ( + assert self.list_owned().issuperset(names), ( "release() on unheld resources %s (set %s)" % - (names.difference(self._list_owned()), self.name)) + (names.difference(self.list_owned()), self.name)) # First of all let's release the "all elements" lock, if set. # After this 'add' can work again - if self.__lock._is_owned(): + if self.__lock.is_owned(): self.__lock.release() self._del_owned() @@ -1277,7 +1455,7 @@ class LockSet: """ # Check we don't already own locks at this level - assert not self._is_owned() or self.__lock._is_owned(shared=0), \ + assert not self.is_owned() or self.__lock.is_owned(shared=0), \ ("Cannot add locks if the set %s is only partially owned, or shared" % self.name) @@ -1288,7 +1466,7 @@ class LockSet: # If we don't already own the set-level lock acquired in an exclusive way # we'll get it and note we need to release it later. release_lock = False - if not self.__lock._is_owned(): + if not self.__lock.is_owned(): release_lock = True self.__lock.acquire() @@ -1351,7 +1529,7 @@ class LockSet: # If we own any subset of this lock it must be a superset of what we want # to delete. The ownership must also be exclusive, but that will be checked # by the lock itself. - assert not self._is_owned() or self._list_owned().issuperset(names), ( + assert not self.is_owned() or self.list_owned().issuperset(names), ( "remove() on acquired lockset %s while not owning all elements" % self.name) @@ -1368,8 +1546,8 @@ class LockSet: removed.append(lname) except (KeyError, errors.LockError): # This cannot happen if we were already holding it, verify: - assert not self._is_owned(), ("remove failed while holding lockset %s" - % self.name) + assert not self.is_owned(), ("remove failed while holding lockset %s" % + self.name) else: # If no LockError was raised we are the ones who deleted the lock. # This means we can safely remove it from lockdict, as any further or @@ -1380,43 +1558,71 @@ class LockSet: # it's the job of the one who actually deleted it. del self.__lockdict[lname] # And let's remove it from our private list if we owned it. 
-        if self._is_owned():
+        if self.is_owned():
           self._del_owned(name=lname)
 
     return removed
 
 
-# Locking levels, must be acquired in increasing order.
-# Current rules are:
-# - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
-#   acquired before performing any operation, either in shared or in exclusive
-#   mode. acquiring the BGL in exclusive mode is discouraged and should be
-#   avoided.
-# - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
-#   If you need more than one node, or more than one instance, acquire them at
-#   the same time.
-LEVEL_CLUSTER = 0
-LEVEL_INSTANCE = 1
-LEVEL_NODEGROUP = 2
-LEVEL_NODE = 3
-
-LEVELS = [LEVEL_CLUSTER,
-          LEVEL_INSTANCE,
-          LEVEL_NODEGROUP,
-          LEVEL_NODE]
+# Locking levels, must be acquired in increasing order. Current rules are:
+# - At level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
+#   acquired before performing any operation, either in shared or exclusive
+#   mode. Acquiring the BGL in exclusive mode is discouraged and should be
+#   avoided.
+# - At levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks. If
+#   you need more than one node, or more than one instance, acquire them at the
+#   same time.
+# - LEVEL_NODE_RES is for node resources and should be used by operations with
+#   possibly high impact on the node's disks.
+# - LEVEL_NODE_ALLOC blocks instance allocations for the whole cluster
+#   ("NAL" is the only lock at this level). It should be acquired in shared
+#   mode when an opcode blocks all or a significant amount of a cluster's
+#   locks. Opcodes doing instance allocations should acquire it in exclusive
+#   mode. Once the set of acquired locks for an opcode has been reduced to the
+#   working set, the NAL should be released as well to allow allocations to
+#   proceed.
+(LEVEL_CLUSTER,
+ LEVEL_INSTANCE,
+ LEVEL_NODE_ALLOC,
+ LEVEL_NODEGROUP,
+ LEVEL_NODE,
+ LEVEL_NODE_RES,
+ LEVEL_NETWORK) = range(0, 7)
+
+LEVELS = [
+  LEVEL_CLUSTER,
+  LEVEL_INSTANCE,
+  LEVEL_NODE_ALLOC,
+  LEVEL_NODEGROUP,
+  LEVEL_NODE,
+  LEVEL_NODE_RES,
+  LEVEL_NETWORK,
+  ]
 
 # Lock levels which are modifiable
-LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE]
-
+LEVELS_MOD = compat.UniqueFrozenset([
+  LEVEL_NODE_RES,
+  LEVEL_NODE,
+  LEVEL_NODEGROUP,
+  LEVEL_INSTANCE,
+  LEVEL_NETWORK,
+  ])
+
+#: Lock level names (make sure to use singular form)
 LEVEL_NAMES = {
   LEVEL_CLUSTER: "cluster",
   LEVEL_INSTANCE: "instance",
+  LEVEL_NODE_ALLOC: "node-alloc",
   LEVEL_NODEGROUP: "nodegroup",
   LEVEL_NODE: "node",
+  LEVEL_NODE_RES: "node-res",
+  LEVEL_NETWORK: "network",
   }
 
 # Constant for the big ganeti lock
-BGL = 'BGL'
+BGL = "BGL"
+
+#: Node allocation lock
+NAL = "NAL"
 
 
 class GanetiLockManager:
@@ -1430,15 +1636,15 @@ class GanetiLockManager:
   """
   _instance = None
 
-  def __init__(self, nodes, nodegroups, instances):
+  def __init__(self, node_uuids, nodegroups, instance_names, networks):
     """Constructs a new GanetiLockManager object.
 
     There should be only one GanetiLockManager object at any time, so this
     function raises an error if this is not the case.
 
-    @param nodes: list of node names
+    @param node_uuids: list of node UUIDs
     @param nodegroups: list of nodegroup uuids
-    @param instances: list of instance names
+    @param instance_names: list of instance names
 
     """
     assert self.__class__._instance is None, \
@@ -1451,13 +1657,20 @@ class GanetiLockManager:
 
     # The keyring contains all the locks, at their level and in the correct
     # locking order.
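+    # Each level maps to a L{LockSet}; the set names must match LEVEL_NAMES
+    # (the assertion following the mapping verifies this).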
     self.__keyring = {
-      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
-      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
-      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=self._monitor),
-      LEVEL_INSTANCE: LockSet(instances, "instances",
+      LEVEL_CLUSTER: LockSet([BGL], "cluster", monitor=self._monitor),
+      LEVEL_NODE: LockSet(node_uuids, "node", monitor=self._monitor),
+      LEVEL_NODE_RES: LockSet(node_uuids, "node-res", monitor=self._monitor),
+      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroup", monitor=self._monitor),
+      LEVEL_INSTANCE: LockSet(instance_names, "instance",
                               monitor=self._monitor),
+      LEVEL_NETWORK: LockSet(networks, "network", monitor=self._monitor),
+      LEVEL_NODE_ALLOC: LockSet([NAL], "node-alloc", monitor=self._monitor),
       }
 
+    assert compat.all(ls.name == LEVEL_NAMES[level]
+                      for (level, ls) in self.__keyring.items()), \
+      "Keyring name mismatch"
+
   def AddToLockMonitor(self, provider):
     """Registers a new lock with the monitor.
 
@@ -1474,14 +1687,6 @@ class GanetiLockManager:
     """
     return self._monitor.QueryLocks(fields)
 
-  def OldStyleQueryLocks(self, fields):
-    """Queries information from all locks, returning old-style data.
-
-    See L{LockMonitor.OldStyleQueryLocks}.
-
-    """
-    return self._monitor.OldStyleQueryLocks(fields)
-
   def _names(self, level):
     """List the lock names at the given level.
 
@@ -1493,21 +1698,33 @@ class GanetiLockManager:
     assert level in LEVELS, "Invalid locking level %s" % level
     return self.__keyring[level]._names()
 
-  def _is_owned(self, level):
+  def is_owned(self, level):
     """Check whether we own locks at the given level
 
     """
-    return self.__keyring[level]._is_owned()
+    return self.__keyring[level].is_owned()
 
-  is_owned = _is_owned
-
-  def _list_owned(self, level):
+  def list_owned(self, level):
     """Get the set of owned locks at the given level
 
     """
-    return self.__keyring[level]._list_owned()
+    return self.__keyring[level].list_owned()
+
+  def check_owned(self, level, names, shared=-1):
+    """Check if locks at a certain level are owned in a specific mode.
+
+    @see: L{LockSet.check_owned}
+
+    """
+    return self.__keyring[level].check_owned(names, shared=shared)
 
-  list_owned = _list_owned
+  def owning_all(self, level):
+    """Checks whether current thread owns all locks at a certain level.
+
+    @see: L{LockSet.owning_all}
+
+    """
+    return self.__keyring[level].owning_all()
 
   def _upper_owned(self, level):
     """Check that we don't own any lock at a level greater than the given one.
@@ -1515,7 +1732,7 @@ class GanetiLockManager:
     """
     # This way of checking only works if LEVELS[i] = i, which we check for in
     # the test cases.
-    return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
+    return compat.any((self.is_owned(l) for l in LEVELS[level + 1:]))
 
   def _BGL_owned(self): # pylint: disable=C0103
     """Check if the current thread owns the BGL.
@@ -1523,7 +1740,7 @@ class GanetiLockManager:
     Either an exclusive or a shared acquisition works.
 
     """
-    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
+    return BGL in self.__keyring[LEVEL_CLUSTER].list_owned()
 
   @staticmethod
   def _contains_BGL(level, names): # pylint: disable=C0103
@@ -1535,7 +1752,8 @@ class GanetiLockManager:
     """
     return level == LEVEL_CLUSTER and (names is None or BGL in names)
 
-  def acquire(self, level, names, timeout=None, shared=0, priority=None):
+  def acquire(self, level, names, timeout=None, shared=0, priority=None,
+              opportunistic=False):
     """Acquire a set of resource locks, at the same level.
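+
+    A minimal ordering sketch (the instance name is assumed; the BGL always
+    comes first, as the assertions below enforce)::
+
+      lm.acquire(LEVEL_CLUSTER, [BGL], shared=1)
+      lm.acquire(LEVEL_INSTANCE, ["inst1"], shared=1)
+      # ... work on the instance ...
+      lm.release(LEVEL_INSTANCE, ["inst1"])
+      lm.release(LEVEL_CLUSTER, [BGL])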
 
     @type level: member of locking.LEVELS
     @param level: the level at which the locks shall be acquired
     @type names: list of strings (or string)
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
     @type shared: integer (0/1) used as a boolean
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
     @type timeout: float
     @param timeout: Maximum time to acquire all locks
     @type priority: integer
     @param priority: Priority for acquiring lock
+    @type opportunistic: boolean
+    @param opportunistic: Acquire locks opportunistically; use the return value
+      to determine which locks were actually acquired
 
     """
     assert level in LEVELS, "Invalid locking level %s" % level
@@ -1561,15 +1782,16 @@ class GanetiLockManager:
     # point in acquiring any other lock, unless perhaps we are half way through
     # the migration of the current opcode.
     assert (self._contains_BGL(level, names) or self._BGL_owned()), (
-            "You must own the Big Ganeti Lock before acquiring any other")
+      "You must own the Big Ganeti Lock before acquiring any other")
 
     # Check we don't own locks at the same or upper levels.
     assert not self._upper_owned(level), ("Cannot acquire locks at a level"
-                                           " while owning some at a greater one")
+                                          " while owning some at a greater one")
 
     # Acquire the locks in the set.
     return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
-                                         priority=priority)
+                                         priority=priority,
+                                         opportunistic=opportunistic)
 
   def downgrade(self, level, names=None):
     """Downgrade a set of resource locks from exclusive to shared mode.
@@ -1603,10 +1825,10 @@ class GanetiLockManager:
     assert level in LEVELS, "Invalid locking level %s" % level
     assert (not self._contains_BGL(level, names) or
             not self._upper_owned(LEVEL_CLUSTER)), (
-            "Cannot release the Big Ganeti Lock while holding something"
-            " at upper levels (%r)" %
-            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
-                              for i in self.__keyring.keys()]), ))
+      "Cannot release the Big Ganeti Lock while holding something"
+      " at upper levels (%r)" %
+      (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self.list_owned(i))
+                        for i in self.__keyring.keys()]), ))
 
     # Release will complain if we don't own the locks already
     return self.__keyring[level].release(names)
@@ -1626,9 +1848,9 @@ class GanetiLockManager:
     """
     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
     assert self._BGL_owned(), ("You must own the BGL before performing other"
-                                " operations")
+                               " operations")
     assert not self._upper_owned(level), ("Cannot add locks at a level"
-                                           " while owning some at a greater one")
+                                          " while owning some at a greater one")
     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
 
   def remove(self, level, names):
@@ -1646,11 +1868,11 @@ class GanetiLockManager:
     """
     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
     assert self._BGL_owned(), ("You must own the BGL before performing other"
-                                " operations")
+                               " operations")
     # Check we either own the level or don't own anything from here
     # up. LockSet.remove() will check the case in which we don't own
     # all the needed resources, or we have a shared ownership.
-    assert self._is_owned(level) or not self._upper_owned(level), (
+    assert self.is_owned(level) or not self._upper_owned(level), (
       "Cannot remove locks at a level while not owning it or"
      " owning some at a greater one")
     return self.__keyring[level].remove(names)
@@ -1751,14 +1973,3 @@ class LockMonitor(object):
 
     # Prepare query response
     return query.GetQueryResponse(qobj, ctx)
-
-  def OldStyleQueryLocks(self, fields):
-    """Queries information from all locks, returning old-style data.
-
-    @type fields: list of strings
-    @param fields: List of fields to return
-
-    """
-    (qobj, ctx) = self._Query(fields)
-
-    return qobj.OldStyleQuery(ctx)