From: Guido Trotter
Date: Thu, 28 Feb 2008 15:06:19 +0000 (+0000)
Subject: LockSet implementation and unit tests
X-Git-Tag: v2.0.0alpha0~663
X-Git-Url: https://code.grnet.gr/git/ganeti-local/commitdiff_plain/aaae9bc0033f02780f90332eec82db3ed018d9b9?hp=f3b100e1e1d1ac760d8e13d60c278b09f5d03254

LockSet implementation and unit tests

A LockSet represents locking for a set of resources of the same type. A
thread can acquire multiple resources at the same time, and release some or
all of them, but cannot acquire more resources incrementally at different
times without releasing all of them in between.

Internally a LockSet uses a SharedLock for each resource to be able to grant
both exclusive and shared acquisition. It also supports safe addition and
removal of resources at runtime. Acquisitions are ordered alphabetically to
guarantee that they are deadlock-free. A lot of assumptions about how the
code interacts are made in order to guarantee both safety and speed; to
document all of them the code features pretty lengthy comments.

The test suite tries to catch the most common interactions but cannot really
test tight race conditions, for which we still need to rely on human
checking.

This is the second basic building block for the Ganeti Lock Manager.
Instance and Node locks will be put in LockSets to manage their acquisition
and release.

Reviewed-by: imsnah
---
diff --git a/lib/locking.py b/lib/locking.py
index b164ff3..15c33e7 100644
--- a/lib/locking.py
+++ b/lib/locking.py
@@ -259,3 +259,294 @@ class SharedLock:
     finally:
       self.__lock.release()
 
+
+class LockSet:
+  """Implements a set of locks.
+
+  This abstraction implements a set of shared locks for the same resource type,
+  distinguished by name. The user can lock a subset of the resources and the
+  LockSet will take care of acquiring the locks always in the same order, thus
+  preventing deadlock.
+
+  All the locks needed in the same set must be acquired together, though.
+
+  """
+  def __init__(self, members=None):
+    """Constructs a new LockSet.
+
+    Args:
+      members: initial members of the set
+
+    """
+    # Used internally to guarantee coherency.
+    self.__lock = SharedLock()
+
+    # The lockdict indexes the relationship name -> lock
+    # The order-of-locking is implied by the alphabetical order of names
+    self.__lockdict = {}
+
+    if members is not None:
+      for name in members:
+        self.__lockdict[name] = SharedLock()
+
+    # The owner dict contains the set of locks each thread owns. For
+    # performance each thread can access its own key without a global lock on
+    # this structure. It is paramount though that *no* other type of access is
+    # done to this structure (eg. no looping over its keys). *_owned helper
+    # functions are defined to guarantee access is correct, but in general never
+    # do anything different than __owners[threading.currentThread()], or there
+    # will be trouble.
+    self.__owners = {}
+
+  def _is_owned(self):
+    """Is the current thread a current level owner?"""
+    return threading.currentThread() in self.__owners
+
+  def _add_owned(self, name):
+    """Note the current thread owns the given lock"""
+    if self._is_owned():
+      self.__owners[threading.currentThread()].add(name)
+    else:
+      self.__owners[threading.currentThread()] = set([name])
+
+  def _del_owned(self, name):
+    """Note the current thread no longer owns the given lock"""
+    self.__owners[threading.currentThread()].remove(name)
+
+    if not self.__owners[threading.currentThread()]:
+      del self.__owners[threading.currentThread()]
+
+  def _list_owned(self):
+    """Get the set of resource names owned by the current thread"""
+    if self._is_owned():
+      return self.__owners[threading.currentThread()].copy()
+    else:
+      return set()
+
+  def __names(self):
+    """Return the current set of names.
+
+    Only call this function while holding __lock and don't iterate on the
+    result after releasing the lock.
+
+    """
+    return set(self.__lockdict.keys())
+
+  def _names(self):
+    """Return a copy of the current set of elements.
+
+    Used only for debugging purposes.
+    """
+    self.__lock.acquire(shared=1)
+    try:
+      result = self.__names()
+    finally:
+      self.__lock.release()
+    return result
+
+  def acquire(self, names, blocking=1, shared=0):
+    """Acquire a set of resource locks.
+
+    Args:
+      names: the names of the locks which shall be acquired.
+        (special lock names, or instance/node names)
+      shared: whether to acquire in shared mode. By default an exclusive lock
+        will be acquired.
+      blocking: whether to block while trying to acquire or to operate in
+        try-lock mode. this locking mode is not supported yet.
+
+    Returns:
+      True: when all the locks are successfully acquired
+
+    Raises:
+      errors.LockError: when any lock we try to acquire has been deleted
+      before we succeed. In this case none of the locks requested will be
+      acquired.
+
+    """
+    if not blocking:
+      # We don't have non-blocking mode for now
+      raise NotImplementedError
+
+    # Check we don't already own locks at this level
+    assert not self._is_owned(), "Cannot acquire locks in the same set twice"
+
+    # Support passing in a single resource to acquire rather than many. Any
+    # other iterable is copied into a new sorted list, so that the caller's
+    # argument is never modified (and sets or tuples work as well).
+    if isinstance(names, basestring):
+      names = [names]
+    else:
+      names = sorted(names)
+
+    # Now names contains a sorted list of resources whose lock we want to
+    # acquire. In order to get them we loop on this (private) list and look
+    # them up in __lockdict. Since we have no lock held on lockdict we have no
+    # guarantees on their presence, and they may even disappear after we looked
+    # them up. This is fine though as .acquire() itself is safe and will alert
+    # us if the lock gets deleted.
+
+    try:
+      for lname in names:
+        lock = self.__lockdict[lname] # raises KeyError if the lock is not there
+        lock.acquire(shared=shared) # raises LockError if the lock is deleted
+        try:
+          # now the lock cannot be deleted, we have it!
+          self._add_owned(lname)
+        except:
+          # We shouldn't have problems adding the lock to the owners list, but
+          # if we did we'll try to release this lock and re-raise exception.
+          # Of course something is going to be really wrong, after this.
+          lock.release()
+          raise
+
+    except (KeyError, errors.LockError):
+      name_fail = lname
+      for lname in self._list_owned():
+        self.__lockdict[lname].release()
+        self._del_owned(lname)
+      raise errors.LockError('non-existing lock in set (%s)' % name_fail)
+
+    return True
+
+  def release(self, names=None):
+    """Release a set of resource locks, at the same level.
+
+    You must have acquired the locks, either in shared or in exclusive mode,
+    before releasing them.
+
+    Args:
+      names: the names of the locks which shall be released.
+        (defaults to all the locks acquired at that level).
+
+    """
+
+    assert self._is_owned(), "release() on lock set while not owner"
+
+    # Support passing in a single resource to release rather than many
+    if isinstance(names, basestring):
+      names = [names]
+
+    if names is None:
+      names = self._list_owned()
+    else:
+      names = set(names)
+      assert self._list_owned().issuperset(names), (
+        "release() on unheld resources %s" %
+        names.difference(self._list_owned()))
+
+    for lockname in names:
+      # If we are sure the lock doesn't leave __lockdict without being
+      # exclusively held we can do this...
+      self.__lockdict[lockname].release()
+      self._del_owned(lockname)
+
+  def add(self, names, acquired=0, shared=0):
+    """Add a new set of elements to the set
+
+    Args:
+      names: names of the new elements to add
+      acquired: pre-acquire the new resource?
+      shared: is the pre-acquisition shared?
+
+    """
+    # Support passing in a single resource to add rather than many
+    if isinstance(names, basestring):
+      names = [names]
+
+    # Acquire the internal lock in an exclusive way, so there cannot be a
+    # conflicting add()
+    self.__lock.acquire()
+    try:
+      invalid_names = self.__names().intersection(names)
+      if invalid_names:
+        # This must be an explicit raise, not an assert, because assert is
+        # turned off when using optimization, and this can happen because of
+        # concurrency even if the user doesn't want it.
+        raise errors.LockError("duplicate add() (%s)" % invalid_names)
+
+      for lockname in names:
+        lock = SharedLock()
+
+        if acquired:
+          lock.acquire(shared=shared)
+          # now the lock cannot be deleted, we have it!
+          try:
+            self._add_owned(lockname)
+          except:
+            # We shouldn't have problems adding the lock to the owners list,
+            # but if we did we'll try to release this lock and re-raise
+            # exception. Of course something is going to be really wrong,
+            # after this. On the other hand the lock hasn't been added to the
+            # __lockdict yet so no other threads should be pending on it. This
+            # release is just a safety measure.
+            lock.release()
+            raise
+
+        self.__lockdict[lockname] = lock
+
+    finally:
+      self.__lock.release()
+
+    return True
+
+  def remove(self, names, blocking=1):
+    """Remove elements from the lock set.
+
+    You can either not hold anything in the lockset or already hold a superset
+    of the elements you want to delete, exclusively.
+
+    Args:
+      names: names of the resource to remove.
+      blocking: whether to block while trying to acquire or to operate in
+        try-lock mode. this locking mode is not supported yet unless
+        you are already holding exclusively the locks.
+
+    Returns:
+      A list of locks which we failed to delete. The list is always empty if we
+      were holding all the locks exclusively.
+
+    """
+    if not blocking and not self._is_owned():
+      # We don't have non-blocking mode for now
+      raise NotImplementedError
+
+    # Support passing in a single resource to remove rather than many
+    if isinstance(names, basestring):
+      names = [names]
+
+    # If we own any subset of this lock it must be a superset of what we want
+    # to delete. The ownership must also be exclusive, but that will be checked
+    # by the lock itself.
+    assert not self._is_owned() or self._list_owned().issuperset(names), (
+      "remove() on acquired lockset while not owning all elements")
+
+    delete_failed = []
+
+    for lname in names:
+      # Calling delete() acquires the lock exclusively if we don't already own
+      # it, and causes all pending and subsequent lock acquires to fail. It's
+      # fine to call it out of order because delete() also implies release(),
+      # and the assertion above guarantees that we either already hold
+      # everything we want to delete, or we hold none.
+      try:
+        self.__lockdict[lname].delete()
+      except (KeyError, errors.LockError):
+        delete_failed.append(lname)
+        # This cannot happen if we were already holding it, verify:
+        assert not self._is_owned(), "remove failed while holding lockset"
+      else:
+        # If no LockError was raised we are the ones who deleted the lock.
+        # This means we can safely remove it from lockdict, as any further or
+        # pending delete() or acquire() will fail (and nobody can have the lock
+        # since before our call to delete()).
+        #
+        # This is done in an else clause because if the exception was thrown
+        # it's the job of the one who actually deleted it.
+        del self.__lockdict[lname]
+        # And let's remove it from our private list if we owned it.
+        if self._is_owned():
+          self._del_owned(lname)
+
+    return delete_failed
+
diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py
index 75d9e94..4aaf5a5 100755
--- a/test/ganeti.locking_unittest.py
+++ b/test/ganeti.locking_unittest.py
@@ -230,6 +230,192 @@ class TestSharedLock(unittest.TestCase):
     self.assertEqual(self.done.get(True, 1), 'ERR')
 
 
+class TestLockSet(unittest.TestCase):
+  """LockSet tests"""
+
+  def setUp(self):
+    self.resources = ['one', 'two', 'three']
+    self.ls = locking.LockSet(self.resources)
+    # helper threads use the 'done' queue to tell the master they finished.
+    self.done = Queue.Queue(0)
+
+  def testResources(self):
+    self.assertEquals(self.ls._names(), set(self.resources))
+    newls = locking.LockSet()
+    self.assertEquals(newls._names(), set())
+
+  def testAcquireRelease(self):
+    self.ls.acquire('one')
+    self.assertEquals(self.ls._list_owned(), set(['one']))
+    self.ls.release()
+    self.assertEquals(self.ls._list_owned(), set())
+    self.ls.acquire(['one'])
+    self.assertEquals(self.ls._list_owned(), set(['one']))
+    self.ls.release()
+    self.assertEquals(self.ls._list_owned(), set())
+    self.ls.acquire(['one', 'two', 'three'])
+    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
+    self.ls.release('one')
+    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
+    self.ls.release(['three'])
+    self.assertEquals(self.ls._list_owned(), set(['two']))
+    self.ls.release()
+    self.assertEquals(self.ls._list_owned(), set())
+    self.ls.acquire(['one', 'three'])
+    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
+    self.ls.release()
+    self.assertEquals(self.ls._list_owned(), set())
+
+  def testAcquireDoesNotModifyNames(self):
+    names = ['three', 'one']
+    self.ls.acquire(names)
+    self.assertEquals(names, ['three', 'one'])
+    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
+    self.ls.release()
+    self.ls.acquire(('two', 'one'))
+    self.assertEquals(self.ls._list_owned(), set(['one', 'two']))
+    self.ls.release()
+
+  def testNoDoubleAcquire(self):
+    self.ls.acquire('one')
+    self.assertRaises(AssertionError, self.ls.acquire, 'one')
+    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
+    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
+    self.ls.release()
+    self.ls.acquire(['one', 'three'])
+    self.ls.release('one')
+    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
+    self.ls.release('three')
+
+  def testNoWrongRelease(self):
+    self.assertRaises(AssertionError, self.ls.release)
+    self.ls.acquire('one')
+    self.assertRaises(AssertionError, self.ls.release, 'two')
+
+  def testAddRemove(self):
+    self.ls.add('four')
+    self.assertEquals(self.ls._list_owned(), set())
+    self.assert_('four' in self.ls._names())
+    self.ls.add(['five', 'six', 'seven'], acquired=1)
+    self.assert_('five' in self.ls._names())
+    self.assert_('six' in self.ls._names())
+    self.assert_('seven' in self.ls._names())
+    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
+    self.ls.remove(['five', 'six'])
+    self.assert_('five' not in self.ls._names())
+    self.assert_('six' not in self.ls._names())
+    self.assertEquals(self.ls._list_owned(), set(['seven']))
+    self.ls.add('eight', acquired=1, shared=1)
+    self.assert_('eight' in self.ls._names())
+    self.assertEquals(self.ls._list_owned(), set(['seven', 'eight']))
+    self.ls.remove('seven')
+    self.assert_('seven' not in self.ls._names())
+    self.assertEquals(self.ls._list_owned(), set(['eight']))
+    self.ls.release()
+    self.ls.remove(['two'])
+    self.assert_('two' not in self.ls._names())
+    self.ls.acquire('three')
+    self.ls.remove(['three'])
+    self.assert_('three' not in self.ls._names())
+    self.assertEquals(self.ls.remove('three'), ['three'])
+    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['three', 'six'])
+    self.assert_('one' not in self.ls._names())
+
+  def testRemoveNonBlocking(self):
+    self.assertRaises(NotImplementedError, self.ls.remove, 'one', blocking=0)
+    self.ls.acquire('one')
+    self.assertEquals(self.ls.remove('one', blocking=0), [])
+    self.ls.acquire(['two', 'three'])
+    self.assertEquals(self.ls.remove(['two', 'three'], blocking=0), [])
+
+  def testNoDoubleAdd(self):
+    self.assertRaises(errors.LockError, self.ls.add, 'two')
+    self.ls.add('four')
+    self.assertRaises(errors.LockError, self.ls.add, 'four')
+
+  def testNoWrongRemoves(self):
+    self.ls.acquire(['one', 'three'], shared=1)
+    # Cannot remove 'two' while holding something which is not a superset
+    self.assertRaises(AssertionError, self.ls.remove, 'two')
+    # Cannot remove 'three' as we are sharing it
+    self.assertRaises(AssertionError, self.ls.remove, 'three')
+
+  def _doLockSet(self, names, shared):
+    try:
+      self.ls.acquire(names, shared=shared)
+      self.done.put('DONE')
+      self.ls.release()
+    except errors.LockError:
+      self.done.put('ERR')
+
+  def _doRemoveSet(self, names):
+    self.done.put(self.ls.remove(names))
+
+  def testConcurrentSharedAcquire(self):
+    self.ls.acquire(['one', 'two'], shared=1)
+    Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
+    self.assertEqual(self.done.get(True, 1), 'DONE')
+    Thread(target=self._doLockSet, args=(['one', 'two', 'three'], 1)).start()
+    self.assertEqual(self.done.get(True, 1), 'DONE')
+    Thread(target=self._doLockSet, args=('three', 1)).start()
+    self.assertEqual(self.done.get(True, 1), 'DONE')
+    Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
+    Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start()
+    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self.ls.release()
+    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self.assertEqual(self.done.get(True, 1), 'DONE')
+
+  def testConcurrentExclusiveAcquire(self):
+    self.ls.acquire(['one', 'two'])
+    Thread(target=self._doLockSet, args=('three', 1)).start()
+    self.assertEqual(self.done.get(True, 1), 'DONE')
+    Thread(target=self._doLockSet, args=('three', 0)).start()
+    self.assertEqual(self.done.get(True, 1), 'DONE')
+    Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
+    Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
+    Thread(target=self._doLockSet, args=('one', 0)).start()
+    Thread(target=self._doLockSet, args=('one', 1)).start()
+    Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start()
+    Thread(target=self._doLockSet, args=(['two', 'three'], 1)).start()
+    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self.ls.release()
+    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self.assertEqual(self.done.get(True, 1), 'DONE')
+
+  def testConcurrentRemove(self):
+    self.ls.add('four')
+    self.ls.acquire(['one', 'two', 'four'])
+    Thread(target=self._doLockSet, args=(['one', 'four'], 0)).start()
+    Thread(target=self._doLockSet, args=(['one', 'four'], 1)).start()
+    Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
+    Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
+    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self.ls.remove('one')
+    self.ls.release()
+    self.assertEqual(self.done.get(True, 1), 'ERR')
+    self.assertEqual(self.done.get(True, 1), 'ERR')
+    self.assertEqual(self.done.get(True, 1), 'ERR')
+    self.assertEqual(self.done.get(True, 1), 'ERR')
+    self.ls.add(['five', 'six'], acquired=1)
+    Thread(target=self._doLockSet, args=(['three', 'six'], 1)).start()
+    Thread(target=self._doLockSet, args=(['three', 'six'], 0)).start()
+    Thread(target=self._doLockSet, args=(['four', 'six'], 1)).start()
+    Thread(target=self._doLockSet, args=(['four', 'six'], 0)).start()
+    self.ls.remove('five')
+    self.ls.release()
+    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self.ls.acquire(['three', 'four'])
+    Thread(target=self._doRemoveSet, args=(['four', 'six'], )).start()
+    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self.ls.remove('four')
+    self.assertEqual(self.done.get(True, 1), ['four'])
+    Thread(target=self._doRemoveSet, args=(['two'], )).start()
+    self.assertEqual(self.done.get(True, 1), [])
+    self.ls.release()
+
+
 if __name__ == '__main__':
   unittest.main()
   #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)