X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/5e0a6daf01e7cb3f1d420fee35c3006b20bfbc5e..05b35f15d87b1c367e62e97c39cbb0b402b0d9c3:/lib/locking.py diff --git a/lib/locking.py b/lib/locking.py index a5ce5fa..3e25917 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -20,6 +20,11 @@ """Module implementing the Ganeti locking code.""" +# pylint: disable-msg=W0212 + +# W0212 since e.g. LockSet methods use (a lot) the internals of +# SharedLock + import os import select import threading @@ -28,6 +33,7 @@ import errno from ganeti import errors from ganeti import utils +from ganeti import compat def ssynchronized(lock, shared=0): @@ -49,21 +55,71 @@ def ssynchronized(lock, shared=0): return wrap -class _SingleActionPipeConditionWaiter(object): - """Callable helper class for _SingleActionPipeCondition. +class RunningTimeout(object): + """Class to calculate remaining timeout when doing several operations. + + """ + __slots__ = [ + "_allow_negative", + "_start_time", + "_time_fn", + "_timeout", + ] + + def __init__(self, timeout, allow_negative, _time_fn=time.time): + """Initializes this class. + + @type timeout: float + @param timeout: Timeout duration + @type allow_negative: bool + @param allow_negative: Whether to return values below zero + @param _time_fn: Time function for unittests + + """ + object.__init__(self) + + if timeout is not None and timeout < 0.0: + raise ValueError("Timeout must not be negative") + + self._timeout = timeout + self._allow_negative = allow_negative + self._time_fn = _time_fn + + self._start_time = None + + def Remaining(self): + """Returns the remaining timeout. + + """ + if self._timeout is None: + return None + + # Get start time on first calculation + if self._start_time is None: + self._start_time = self._time_fn() + + # Calculate remaining time + remaining_timeout = self._start_time + self._timeout - self._time_fn() + + if not self._allow_negative: + # Ensure timeout is always >= 0 + return max(0.0, remaining_timeout) + + return remaining_timeout + + +class _SingleNotifyPipeConditionWaiter(object): + """Helper class for SingleNotifyPipeCondition """ __slots__ = [ - "_cond", "_fd", "_poller", ] - def __init__(self, cond, poller, fd): - """Initializes this class. + def __init__(self, poller, fd): + """Constructor for _SingleNotifyPipeConditionWaiter - @type cond: L{_SingleActionPipeCondition} - @param cond: Parent condition @type poller: select.poll @param poller: Poller object @type fd: int @@ -71,8 +127,6 @@ class _SingleActionPipeConditionWaiter(object): """ object.__init__(self) - - self._cond = cond self._poller = poller self._fd = fd @@ -83,10 +137,18 @@ class _SingleActionPipeConditionWaiter(object): @param timeout: Timeout for waiting (can be None) """ - start_time = time.time() - remaining_time = timeout + running_timeout = RunningTimeout(timeout, True) + + while True: + remaining_time = running_timeout.Remaining() + + if remaining_time is not None: + if remaining_time < 0.0: + break + + # Our calculation uses seconds, poll() wants milliseconds + remaining_time *= 1000 - while timeout is None or remaining_time > 0: try: result = self._poller.poll(remaining_time) except EnvironmentError, err: @@ -98,110 +160,97 @@ class _SingleActionPipeConditionWaiter(object): if result and result[0][0] == self._fd: break - # Re-calculate timeout if necessary - if timeout is not None: - remaining_time = start_time + timeout - time.time() - -class _SingleActionPipeCondition(object): - """Wrapper around a pipe for usage inside conditions. +class _BaseCondition(object): + """Base class containing common code for conditions. - This class contains a POSIX pipe(2) and a poller to poll it. The pipe is - always allocated when constructing the class. Extra care is taken to always - close the file descriptors. - - An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for - notifications. - - Warning: This class is designed to be used as the underlying component of a - locking condition, but is not by itself thread safe, and needs to be - protected by an external lock. + Some of this code is taken from python's threading module. """ __slots__ = [ - "_poller", - "_read_fd", - "_write_fd", - "_nwaiters", + "_lock", + "acquire", + "release", ] - _waiter_class = _SingleActionPipeConditionWaiter + def __init__(self, lock): + """Constructor for _BaseCondition. - def __init__(self): - """Initializes this class. + @type lock: threading.Lock + @param lock: condition base lock """ object.__init__(self) - self._nwaiters = 0 - - # Just assume the unpacking is successful, otherwise error handling gets - # very complicated. - (self._read_fd, self._write_fd) = os.pipe() - try: - # The poller looks for closure of the write side - poller = select.poll() - poller.register(self._read_fd, select.POLLHUP) - - self._poller = poller - except: - if self._read_fd is not None: - os.close(self._read_fd) - if self._write_fd is not None: - os.close(self._write_fd) - raise + # Recursive locks are not supported + assert not hasattr(lock, "_acquire_restore") + assert not hasattr(lock, "_release_save") - # There should be no code here anymore, otherwise the pipe file descriptors - # may be not be cleaned up properly in case of errors. + self._lock = lock - def StartWaiting(self): - """Return function to wait for notification. + # Export the lock's acquire() and release() methods + self.acquire = lock.acquire + self.release = lock.release - @rtype: L{_SingleActionPipeConditionWaiter} - @return: Function to wait for notification + def _is_owned(self): + """Check whether lock is owned by current thread. """ - assert self._nwaiters >= 0 + if self._lock.acquire(0): + self._lock.release() + return False - if self._poller is None: - raise RuntimeError("Already cleaned up") + return True - # Create waiter function and increase number of waiters - wait_fn = self._waiter_class(self, self._poller, self._read_fd) - self._nwaiters += 1 - return wait_fn + def _check_owned(self): + """Raise an exception if the current thread doesn't own the lock. - def DoneWaiting(self): - """Decrement number of waiters and automatic cleanup. + """ + if not self._is_owned(): + raise RuntimeError("cannot work with un-aquired lock") - Must be called after waiting for a notification. - @rtype: bool - @return: Whether this was the last waiter +class SingleNotifyPipeCondition(_BaseCondition): + """Condition which can only be notified once. - """ - assert self._nwaiters > 0 + This condition class uses pipes and poll, internally, to be able to wait for + notification with a timeout, without resorting to polling. It is almost + compatible with Python's threading.Condition, with the following differences: + - notifyAll can only be called once, and no wait can happen after that + - notify is not supported, only notifyAll - self._nwaiters -= 1 + """ - if self._nwaiters == 0: - self._Cleanup() - return True + __slots__ = [ + "_poller", + "_read_fd", + "_write_fd", + "_nwaiters", + "_notified", + ] - return False + _waiter_class = _SingleNotifyPipeConditionWaiter - def notifyAll(self): - """Close the writing side of the pipe to notify all waiters. + def __init__(self, lock): + """Constructor for SingleNotifyPipeCondition """ - if self._write_fd is None: - raise RuntimeError("Can only notify once") - - os.close(self._write_fd) + _BaseCondition.__init__(self, lock) + self._nwaiters = 0 + self._notified = False + self._read_fd = None self._write_fd = None + self._poller = None + + def _check_unnotified(self): + """Throws an exception if already notified. + + """ + if self._notified: + raise RuntimeError("cannot use already notified condition") def _Cleanup(self): - """Close all file descriptors. + """Cleanup open file descriptors, if any. """ if self._read_fd is not None: @@ -211,19 +260,51 @@ class _SingleActionPipeCondition(object): if self._write_fd is not None: os.close(self._write_fd) self._write_fd = None - self._poller = None - def __del__(self): - """Called on object deletion. + def wait(self, timeout=None): + """Wait for a notification. - Ensure no file descriptors are left open. + @type timeout: float or None + @param timeout: Waiting timeout (can be None) """ - self._Cleanup() + self._check_owned() + self._check_unnotified() + self._nwaiters += 1 + try: + if self._poller is None: + (self._read_fd, self._write_fd) = os.pipe() + self._poller = select.poll() + self._poller.register(self._read_fd, select.POLLHUP) -class _PipeCondition(object): + wait_fn = self._waiter_class(self._poller, self._read_fd) + self.release() + try: + # Wait for notification + wait_fn(timeout) + finally: + # Re-acquire lock + self.acquire() + finally: + self._nwaiters -= 1 + if self._nwaiters == 0: + self._Cleanup() + + def notifyAll(self): # pylint: disable-msg=C0103 + """Close the writing side of the pipe to notify all waiters. + + """ + self._check_owned() + self._check_unnotified() + self._notified = True + if self._write_fd is not None: + os.close(self._write_fd) + self._write_fd = None + + +class PipeCondition(_BaseCondition): """Group-only non-polling condition with counters. This condition class uses pipes and poll, internally, to be able to wait for @@ -234,50 +315,19 @@ class _PipeCondition(object): """ __slots__ = [ - "_lock", "_nwaiters", - "_pipe", - "acquire", - "release", + "_single_condition", ] - _pipe_class = _SingleActionPipeCondition + _single_condition_class = SingleNotifyPipeCondition def __init__(self, lock): """Initializes this class. """ - object.__init__(self) - - # Recursive locks are not supported - assert not hasattr(lock, "_acquire_restore") - assert not hasattr(lock, "_release_save") - - self._lock = lock - - # Export the lock's acquire() and release() methods - self.acquire = lock.acquire - self.release = lock.release - + _BaseCondition.__init__(self, lock) self._nwaiters = 0 - self._pipe = None - - def _is_owned(self): - """Check whether lock is owned by current thread. - - """ - if self._lock.acquire(0): - self._lock.release() - return False - - return True - - def _check_owned(self): - """Raise an exception if the current thread doesn't own the lock. - - """ - if not self._is_owned(): - raise RuntimeError("cannot work with un-aquired lock") + self._single_condition = self._single_condition_class(self._lock) def wait(self, timeout=None): """Wait for a notification. @@ -288,47 +338,25 @@ class _PipeCondition(object): """ self._check_owned() - if not self._pipe: - self._pipe = self._pipe_class() - # Keep local reference to the pipe. It could be replaced by another thread # notifying while we're waiting. - pipe = self._pipe + my_condition = self._single_condition assert self._nwaiters >= 0 self._nwaiters += 1 try: - # Get function to wait on the pipe - wait_fn = pipe.StartWaiting() - try: - # Release lock while waiting - self.release() - try: - # Wait for notification - wait_fn(timeout) - finally: - # Re-acquire lock - self.acquire() - finally: - # Destroy pipe if this was the last waiter and the current pipe is - # still the same. The same pipe cannot be reused after cleanup. - if pipe.DoneWaiting() and pipe == self._pipe: - self._pipe = None + my_condition.wait(timeout) finally: assert self._nwaiters > 0 self._nwaiters -= 1 - def notifyAll(self): + def notifyAll(self): # pylint: disable-msg=C0103 """Notify all currently waiting threads. """ self._check_owned() - - # Notify and forget pipe. A new one will be created on the next call to - # wait. - if self._pipe is not None: - self._pipe.notifyAll() - self._pipe = None + self._single_condition.notifyAll() + self._single_condition = self._single_condition_class(self._lock) def has_waiting(self): """Returns whether there are active waiters. @@ -359,7 +387,7 @@ class _CountingCondition(object): self._cond = threading.Condition(lock=lock) self._nwaiters = 0 - def notifyAll(self): + def notifyAll(self): # pylint: disable-msg=C0103 """Notifies the condition. """ @@ -369,7 +397,7 @@ class _CountingCondition(object): """Waits for the condition to be notified. @type timeout: float or None - @param timeout: Timeout in seconds + @param timeout: Waiting timeout (can be None) """ assert self._nwaiters >= 0 @@ -409,7 +437,7 @@ class SharedLock(object): "__shr", ] - __condition_class = _PipeCondition + __condition_class = PipeCondition def __init__(self): """Construct a new SharedLock. @@ -520,7 +548,7 @@ class SharedLock(object): """ return self.__pending[0] == cond - def __acquire_unlocked(self, shared=0, timeout=None): + def __acquire_unlocked(self, shared, timeout): """Acquire a shared lock. @param shared: whether to acquire in shared mode; by default an @@ -574,18 +602,24 @@ class SharedLock(object): return False - def acquire(self, shared=0, timeout=None): + def acquire(self, shared=0, timeout=None, test_notify=None): """Acquire a shared lock. - @type shared: int + @type shared: integer (0/1) used as a boolean @param shared: whether to acquire in shared mode; by default an exclusive lock will be acquired @type timeout: float @param timeout: maximum waiting time before giving up + @type test_notify: callable or None + @param test_notify: Special callback function for unittesting """ self.__lock.acquire() try: + # We already got the lock, notify now + if __debug__ and callable(test_notify): + test_notify() + return self.__acquire_unlocked(shared, timeout) finally: self.__lock.release() @@ -641,7 +675,10 @@ class SharedLock(object): acquired = self.__is_exclusive() if not acquired: - acquired = self.__acquire_unlocked(timeout) + acquired = self.__acquire_unlocked(0, timeout) + + assert self.__is_exclusive() and not self.__is_sharer(), \ + "Lock wasn't acquired in exclusive mode" if acquired: self.__deleted = True @@ -661,6 +698,12 @@ class SharedLock(object): ALL_SET = None +class _AcquireTimeout(Exception): + """Internal exception to abort an acquire on a timeout. + + """ + + class LockSet: """Implements a set of locks. @@ -675,6 +718,7 @@ class LockSet: def __init__(self, members=None): """Constructs a new LockSet. + @type members: list of strings @param members: initial members of the set """ @@ -716,6 +760,9 @@ class LockSet: def _del_owned(self, name=None): """Note the current thread owns the given lock""" + assert not (name is None and self.__lock._is_owned()), \ + "Cannot hold internal lock when deleting owner status" + if name is not None: self.__owners[threading.currentThread()].remove(name) @@ -731,6 +778,14 @@ class LockSet: else: return set() + def _release_and_delete_owned(self): + """Release and delete all resources owned by the current thread""" + for lname in self._list_owned(): + lock = self.__lockdict[lname] + if lock._is_owned(): + lock.release() + self._del_owned(name=lname) + def __names(self): """Return the current set of names. @@ -759,109 +814,158 @@ class LockSet: self.__lock.release() return set(result) - def acquire(self, names, timeout=None, shared=0): + def acquire(self, names, timeout=None, shared=0, test_notify=None): """Acquire a set of resource locks. + @type names: list of strings (or string) @param names: the names of the locks which shall be acquired (special lock names, or instance/node names) + @type shared: integer (0/1) used as a boolean @param shared: whether to acquire in shared mode; by default an exclusive lock will be acquired - @type timeout: float + @type timeout: float or None @param timeout: Maximum time to acquire all locks + @type test_notify: callable or None + @param test_notify: Special callback function for unittesting - @return: True when all the locks are successfully acquired + @return: Set of all locks successfully acquired or None in case of timeout @raise errors.LockError: when any lock we try to acquire has been deleted before we succeed. In this case none of the locks requested will be acquired. """ - if timeout is not None: - raise NotImplementedError + assert timeout is None or timeout >= 0.0 # Check we don't already own locks at this level assert not self._is_owned(), "Cannot acquire locks in the same set twice" - if names is None: - # If no names are given acquire the whole set by not letting new names - # being added before we release, and getting the current list of names. - # Some of them may then be deleted later, but we'll cope with this. - # - # We'd like to acquire this lock in a shared way, as it's nice if - # everybody else can use the instances at the same time. If are acquiring - # them exclusively though they won't be able to do this anyway, though, - # so we'll get the list lock exclusively as well in order to be able to - # do add() on the set while owning it. - self.__lock.acquire(shared=shared) - try: - # note we own the set-lock - self._add_owned() - names = self.__names() - except: - # We shouldn't have problems adding the lock to the owners list, but - # if we did we'll try to release this lock and re-raise exception. - # Of course something is going to be really wrong, after this. - self.__lock.release() - raise + # We need to keep track of how long we spent waiting for a lock. The + # timeout passed to this function is over all lock acquires. + running_timeout = RunningTimeout(timeout, False) try: - # Support passing in a single resource to acquire rather than many - if isinstance(names, basestring): - names = [names] + if names is not None: + # Support passing in a single resource to acquire rather than many + if isinstance(names, basestring): + names = [names] + + return self.__acquire_inner(names, False, shared, + running_timeout.Remaining, test_notify) + else: - names = sorted(names) + # If no names are given acquire the whole set by not letting new names + # being added before we release, and getting the current list of names. + # Some of them may then be deleted later, but we'll cope with this. + # + # We'd like to acquire this lock in a shared way, as it's nice if + # everybody else can use the instances at the same time. If are + # acquiring them exclusively though they won't be able to do this + # anyway, though, so we'll get the list lock exclusively as well in + # order to be able to do add() on the set while owning it. + if not self.__lock.acquire(shared=shared, + timeout=running_timeout.Remaining()): + raise _AcquireTimeout() + try: + # note we own the set-lock + self._add_owned() + + return self.__acquire_inner(self.__names(), True, shared, + running_timeout.Remaining, test_notify) + except: + # We shouldn't have problems adding the lock to the owners list, but + # if we did we'll try to release this lock and re-raise exception. + # Of course something is going to be really wrong, after this. + self.__lock.release() + self._del_owned() + raise + + except _AcquireTimeout: + return None + + def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify): + """Inner logic for acquiring a number of locks. + + @param names: Names of the locks to be acquired + @param want_all: Whether all locks in the set should be acquired + @param shared: Whether to acquire in shared mode + @param timeout_fn: Function returning remaining timeout + @param test_notify: Special callback function for unittesting + + """ + acquire_list = [] + + # First we look the locks up on __lockdict. We have no way of being sure + # they will still be there after, but this makes it a lot faster should + # just one of them be the already wrong. Using a sorted sequence to prevent + # deadlocks. + for lname in sorted(utils.UniqueSequence(names)): + try: + lock = self.__lockdict[lname] # raises KeyError if lock is not there + except KeyError: + if want_all: + # We are acquiring all the set, it doesn't matter if this particular + # element is not there anymore. + continue + + raise errors.LockError("Non-existing lock in set (%s)" % lname) + + acquire_list.append((lname, lock)) + + # This will hold the locknames we effectively acquired. + acquired = set() + + try: + # Now acquire_list contains a sorted list of resources and locks we + # want. In order to get them we loop on this (private) list and + # acquire() them. We gave no real guarantee they will still exist till + # this is done but .acquire() itself is safe and will alert us if the + # lock gets deleted. + for (lname, lock) in acquire_list: + if __debug__ and callable(test_notify): + test_notify_fn = lambda: test_notify(lname) + else: + test_notify_fn = None + + timeout = timeout_fn() - acquire_list = [] - # First we look the locks up on __lockdict. We have no way of being sure - # they will still be there after, but this makes it a lot faster should - # just one of them be the already wrong - for lname in utils.UniqueSequence(names): try: - lock = self.__lockdict[lname] # raises KeyError if lock is not there - acquire_list.append((lname, lock)) - except (KeyError): - if self.__lock._is_owned(): + # raises LockError if the lock was deleted + acq_success = lock.acquire(shared=shared, timeout=timeout, + test_notify=test_notify_fn) + except errors.LockError: + if want_all: # We are acquiring all the set, it doesn't matter if this # particular element is not there anymore. continue - else: - raise errors.LockError('non-existing lock in set (%s)' % lname) - - # This will hold the locknames we effectively acquired. - acquired = set() - # Now acquire_list contains a sorted list of resources and locks we want. - # In order to get them we loop on this (private) list and acquire() them. - # We gave no real guarantee they will still exist till this is done but - # .acquire() itself is safe and will alert us if the lock gets deleted. - for (lname, lock) in acquire_list: + + raise errors.LockError("Non-existing lock in set (%s)" % lname) + + if not acq_success: + # Couldn't get lock or timeout occurred + if timeout is None: + # This shouldn't happen as SharedLock.acquire(timeout=None) is + # blocking. + raise errors.LockError("Failed to get lock %s" % lname) + + raise _AcquireTimeout() + try: - lock.acquire(shared=shared) # raises LockError if the lock is deleted # now the lock cannot be deleted, we have it! self._add_owned(name=lname) acquired.add(lname) - except (errors.LockError): - if self.__lock._is_owned(): - # We are acquiring all the set, it doesn't matter if this - # particular element is not there anymore. - continue - else: - name_fail = lname - for lname in self._list_owned(): - self.__lockdict[lname].release() - self._del_owned(name=lname) - raise errors.LockError('non-existing lock in set (%s)' % name_fail) + except: # We shouldn't have problems adding the lock to the owners list, but # if we did we'll try to release this lock and re-raise exception. - # Of course something is going to be really wrong, after this. + # Of course something is going to be really wrong after this. if lock._is_owned(): lock.release() raise except: - # If something went wrong and we had the set-lock let's release it... - if self.__lock._is_owned(): - self.__lock.release() + # Release all owned locks + self._release_and_delete_owned() raise return acquired @@ -872,6 +976,7 @@ class LockSet: You must have acquired the locks, either in shared or in exclusive mode, before releasing them. + @type names: list of strings, or None @param names: the names of the locks which shall be released (defaults to all the locks acquired at that level). @@ -905,8 +1010,11 @@ class LockSet: def add(self, names, acquired=0, shared=0): """Add a new set of elements to the set + @type names: list of strings @param names: names of the new elements to add + @type acquired: integer (0/1) used as a boolean @param acquired: pre-acquire the new resource? + @type shared: integer (0/1) used as a boolean @param shared: is the pre-acquisition shared? """ @@ -966,9 +1074,10 @@ class LockSet: You can either not hold anything in the lockset or already hold a superset of the elements you want to delete, exclusively. + @type names: list of strings @param names: names of the resource to remove. - @return:: a list of locks which we removed; the list is always + @return: a list of locks which we removed; the list is always equal to the names list if we were holding all the locks exclusively @@ -1108,9 +1217,9 @@ class GanetiLockManager: """ # This way of checking only works if LEVELS[i] = i, which we check for in # the test cases. - return utils.any((self._is_owned(l) for l in LEVELS[level + 1:])) + return compat.any((self._is_owned(l) for l in LEVELS[level + 1:])) - def _BGL_owned(self): + def _BGL_owned(self): # pylint: disable-msg=C0103 """Check if the current thread owns the BGL. Both an exclusive or a shared acquisition work. @@ -1118,7 +1227,8 @@ class GanetiLockManager: """ return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned() - def _contains_BGL(self, level, names): + @staticmethod + def _contains_BGL(level, names): # pylint: disable-msg=C0103 """Check if the level contains the BGL. Check if acting on the given level and set of names will change @@ -1130,10 +1240,12 @@ class GanetiLockManager: def acquire(self, level, names, timeout=None, shared=0): """Acquire a set of resource locks, at the same level. - @param level: the level at which the locks shall be acquired; - it must be a member of LEVELS. + @type level: member of locking.LEVELS + @param level: the level at which the locks shall be acquired + @type names: list of strings (or string) @param names: the names of the locks which shall be acquired (special lock names, or instance/node names) + @type shared: integer (0/1) used as a boolean @param shared: whether to acquire in shared mode; by default an exclusive lock will be acquired @type timeout: float @@ -1164,8 +1276,9 @@ class GanetiLockManager: You must have acquired the locks, either in shared or in exclusive mode, before releasing them. - @param level: the level at which the locks shall be released; - it must be a member of LEVELS + @type level: member of locking.LEVELS + @param level: the level at which the locks shall be released + @type names: list of strings, or None @param names: the names of the locks which shall be released (defaults to all the locks acquired at that level) @@ -1174,7 +1287,9 @@ class GanetiLockManager: assert (not self._contains_BGL(level, names) or not self._upper_owned(LEVEL_CLUSTER)), ( "Cannot release the Big Ganeti Lock while holding something" - " at upper levels") + " at upper levels (%r)" % + (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i)) + for i in self.__keyring.keys()]), )) # Release will complain if we don't own the locks already return self.__keyring[level].release(names) @@ -1182,10 +1297,13 @@ class GanetiLockManager: def add(self, level, names, acquired=0, shared=0): """Add locks at the specified level. - @param level: the level at which the locks shall be added; - it must be a member of LEVELS_MOD. + @type level: member of locking.LEVELS_MOD + @param level: the level at which the locks shall be added + @type names: list of strings @param names: names of the locks to acquire + @type acquired: integer (0/1) used as a boolean @param acquired: whether to acquire the newly added locks + @type shared: integer (0/1) used as a boolean @param shared: whether the acquisition will be shared """ @@ -1202,8 +1320,9 @@ class GanetiLockManager: You must either already own the locks you are trying to remove exclusively or not own any lock at an upper level. - @param level: the level at which the locks shall be removed; - it must be a member of LEVELS_MOD + @type level: member of locking.LEVELS_MOD + @param level: the level at which the locks shall be removed + @type names: list of strings @param names: the names of the locks which shall be removed (special lock names, or instance/node names)