X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/7ee7c0c7e4cba67496a261bc0890bbe8d02cf7a0..c8fcde472922e4ee664d904e0bf1a583f1d5040d:/test/ganeti.locking_unittest.py diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index 331c31d..aee6860 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -32,10 +32,46 @@ from ganeti import errors from threading import Thread -class TestSharedLock(unittest.TestCase): +# This is used to test the ssynchronize decorator. +# Since it's passed as input to a decorator it must be declared as a global. +_decoratorlock = locking.SharedLock() + +#: List for looping tests +ITERATIONS = range(8) + +def _Repeat(fn): + """Decorator for executing a function many times""" + def wrapper(*args, **kwargs): + for i in ITERATIONS: + fn(*args, **kwargs) + return wrapper + +class _ThreadedTestCase(unittest.TestCase): + """Test class that supports adding/waiting on threads""" + def setUp(self): + unittest.TestCase.setUp(self) + self.threads = [] + + def _addThread(self, *args, **kwargs): + """Create and remember a new thread""" + t = Thread(*args, **kwargs) + self.threads.append(t) + t.start() + return t + + def _waitThreads(self): + """Wait for all our threads to finish""" + for t in self.threads: + t.join(60) + self.failIf(t.isAlive()) + self.threads = [] + + +class TestSharedLock(_ThreadedTestCase): """SharedLock tests""" def setUp(self): + _ThreadedTestCase.setUp(self) self.sl = locking.SharedLock() # helper threads use the 'done' queue to tell the master they finished. self.done = Queue.Queue(0) @@ -115,73 +151,88 @@ class TestSharedLock(unittest.TestCase): self.assert_(self.done.get(True, 1)) self.sl.release() + @_Repeat def testExclusiveBlocksExclusive(self): self.sl.acquire() - Thread(target=self._doItExclusive).start() - # give it a bit of time to check that it's not actually doing anything - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItExclusive) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'EXC') + @_Repeat def testExclusiveBlocksDelete(self): self.sl.acquire() - Thread(target=self._doItDelete).start() - # give it a bit of time to check that it's not actually doing anything - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItDelete) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'DEL') + self.sl = locking.SharedLock() + @_Repeat def testExclusiveBlocksSharer(self): self.sl.acquire() - Thread(target=self._doItSharer).start() - time.sleep(0.05) - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItSharer) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'SHR') + @_Repeat def testSharerBlocksExclusive(self): self.sl.acquire(shared=1) - Thread(target=self._doItExclusive).start() - time.sleep(0.05) - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItExclusive) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'EXC') + @_Repeat def testSharerBlocksDelete(self): self.sl.acquire(shared=1) - Thread(target=self._doItDelete).start() - time.sleep(0.05) - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItDelete) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'DEL') + self.sl = locking.SharedLock() + @_Repeat def testWaitingExclusiveBlocksSharer(self): + """SKIPPED testWaitingExclusiveBlockSharer""" + return + self.sl.acquire(shared=1) # the lock is acquired in shared mode... - Thread(target=self._doItExclusive).start() + self._addThread(target=self._doItExclusive) # ...but now an exclusive is waiting... - time.sleep(0.05) - Thread(target=self._doItSharer).start() + self._addThread(target=self._doItSharer) # ...so the sharer should be blocked as well - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() + self._waitThreads() # The exclusive passed before - self.assertEqual(self.done.get(True, 1), 'EXC') - self.assertEqual(self.done.get(True, 1), 'SHR') + self.failUnlessEqual(self.done.get_nowait(), 'EXC') + self.failUnlessEqual(self.done.get_nowait(), 'SHR') + @_Repeat def testWaitingSharerBlocksExclusive(self): + """SKIPPED testWaitingSharerBlocksExclusive""" + return + self.sl.acquire() # the lock is acquired in exclusive mode... - Thread(target=self._doItSharer).start() + self._addThread(target=self._doItSharer) # ...but now a sharer is waiting... - time.sleep(0.05) - Thread(target=self._doItExclusive).start() + self._addThread(target=self._doItExclusive) # ...the exclusive is waiting too... - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() + self._waitThreads() # The sharer passed before - self.assertEqual(self.done.get(True, 1), 'SHR') - self.assertEqual(self.done.get(True, 1), 'EXC') + self.assertEqual(self.done.get_nowait(), 'SHR') + self.assertEqual(self.done.get_nowait(), 'EXC') def testNoNonBlocking(self): self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0) @@ -199,57 +250,122 @@ class TestSharedLock(unittest.TestCase): self.sl.acquire(shared=1) self.assertRaises(AssertionError, self.sl.delete) + @_Repeat def testDeletePendingSharersExclusiveDelete(self): self.sl.acquire() - Thread(target=self._doItSharer).start() - Thread(target=self._doItSharer).start() - time.sleep(0.05) - Thread(target=self._doItExclusive).start() - Thread(target=self._doItDelete).start() - time.sleep(0.05) + self._addThread(target=self._doItSharer) + self._addThread(target=self._doItSharer) + self._addThread(target=self._doItExclusive) + self._addThread(target=self._doItDelete) self.sl.delete() - # The two threads who were pending return both ERR - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') + self._waitThreads() + # The threads who were pending return ERR + for _ in range(4): + self.assertEqual(self.done.get_nowait(), 'ERR') + self.sl = locking.SharedLock() + @_Repeat def testDeletePendingDeleteExclusiveSharers(self): self.sl.acquire() - Thread(target=self._doItDelete).start() - Thread(target=self._doItExclusive).start() - time.sleep(0.05) - Thread(target=self._doItSharer).start() - Thread(target=self._doItSharer).start() - time.sleep(0.05) + self._addThread(target=self._doItDelete) + self._addThread(target=self._doItExclusive) + self._addThread(target=self._doItSharer) + self._addThread(target=self._doItSharer) self.sl.delete() + self._waitThreads() # The two threads who were pending return both ERR - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') + self.assertEqual(self.done.get_nowait(), 'ERR') + self.assertEqual(self.done.get_nowait(), 'ERR') + self.assertEqual(self.done.get_nowait(), 'ERR') + self.assertEqual(self.done.get_nowait(), 'ERR') + self.sl = locking.SharedLock() + + +class TestSSynchronizedDecorator(_ThreadedTestCase): + """Shared Lock Synchronized decorator test""" + + def setUp(self): + _ThreadedTestCase.setUp(self) + # helper threads use the 'done' queue to tell the master they finished. + self.done = Queue.Queue(0) + + @locking.ssynchronized(_decoratorlock) + def _doItExclusive(self): + self.assert_(_decoratorlock._is_owned()) + self.done.put('EXC') + + @locking.ssynchronized(_decoratorlock, shared=1) + def _doItSharer(self): + self.assert_(_decoratorlock._is_owned(shared=1)) + self.done.put('SHR') + + def testDecoratedFunctions(self): + self._doItExclusive() + self.assert_(not _decoratorlock._is_owned()) + self._doItSharer() + self.assert_(not _decoratorlock._is_owned()) + + def testSharersCanCoexist(self): + _decoratorlock.acquire(shared=1) + Thread(target=self._doItSharer).start() + self.assert_(self.done.get(True, 1)) + _decoratorlock.release() + + @_Repeat + def testExclusiveBlocksExclusive(self): + _decoratorlock.acquire() + self._addThread(target=self._doItExclusive) + # give it a bit of time to check that it's not actually doing anything + self.assertRaises(Queue.Empty, self.done.get_nowait) + _decoratorlock.release() + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'EXC') + + @_Repeat + def testExclusiveBlocksSharer(self): + _decoratorlock.acquire() + self._addThread(target=self._doItSharer) + self.assertRaises(Queue.Empty, self.done.get_nowait) + _decoratorlock.release() + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'SHR') + + @_Repeat + def testSharerBlocksExclusive(self): + _decoratorlock.acquire(shared=1) + self._addThread(target=self._doItExclusive) + self.assertRaises(Queue.Empty, self.done.get_nowait) + _decoratorlock.release() + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'EXC') -class TestLockSet(unittest.TestCase): +class TestLockSet(_ThreadedTestCase): """LockSet tests""" def setUp(self): - self.resources = ['one', 'two', 'three'] - self.ls = locking.LockSet(self.resources) + _ThreadedTestCase.setUp(self) + self._setUpLS() # helper threads use the 'done' queue to tell the master they finished. self.done = Queue.Queue(0) + def _setUpLS(self): + """Helper to (re)initialize the lock set""" + self.resources = ['one', 'two', 'three'] + self.ls = locking.LockSet(members=self.resources) + + def testResources(self): self.assertEquals(self.ls._names(), set(self.resources)) newls = locking.LockSet() self.assertEquals(newls._names(), set()) def testAcquireRelease(self): - self.ls.acquire('one') + self.assert_(self.ls.acquire('one')) self.assertEquals(self.ls._list_owned(), set(['one'])) self.ls.release() self.assertEquals(self.ls._list_owned(), set()) - self.ls.acquire(['one']) + self.assertEquals(self.ls.acquire(['one']), set(['one'])) self.assertEquals(self.ls._list_owned(), set(['one'])) self.ls.release() self.assertEquals(self.ls._list_owned(), set()) @@ -261,7 +377,7 @@ class TestLockSet(unittest.TestCase): self.assertEquals(self.ls._list_owned(), set(['two'])) self.ls.release() self.assertEquals(self.ls._list_owned(), set()) - self.ls.acquire(['one', 'three']) + self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three'])) self.assertEquals(self.ls._list_owned(), set(['one', 'three'])) self.ls.release() self.assertEquals(self.ls._list_owned(), set()) @@ -291,32 +407,41 @@ class TestLockSet(unittest.TestCase): self.assert_('six' in self.ls._names()) self.assert_('seven' in self.ls._names()) self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven'])) - self.ls.remove(['five', 'six']) + self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six']) self.assert_('five' not in self.ls._names()) self.assert_('six' not in self.ls._names()) self.assertEquals(self.ls._list_owned(), set(['seven'])) - self.ls.add('eight', acquired=1, shared=1) - self.assert_('eight' in self.ls._names()) - self.assertEquals(self.ls._list_owned(), set(['seven', 'eight'])) + self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1) self.ls.remove('seven') self.assert_('seven' not in self.ls._names()) - self.assertEquals(self.ls._list_owned(), set(['eight'])) + self.assertEquals(self.ls._list_owned(), set([])) + self.ls.acquire(None, shared=1) + self.assertRaises(AssertionError, self.ls.add, 'eight') + self.ls.release() + self.ls.acquire(None) + self.ls.add('eight', acquired=1) + self.assert_('eight' in self.ls._names()) + self.assert_('eight' in self.ls._list_owned()) + self.ls.add('nine') + self.assert_('nine' in self.ls._names()) + self.assert_('nine' not in self.ls._list_owned()) self.ls.release() self.ls.remove(['two']) self.assert_('two' not in self.ls._names()) self.ls.acquire('three') - self.ls.remove(['three']) + self.assertEquals(self.ls.remove(['three']), ['three']) self.assert_('three' not in self.ls._names()) - self.assertEquals(self.ls.remove('three'), ['three']) - self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['three', 'six']) + self.assertEquals(self.ls.remove('three'), []) + self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one']) self.assert_('one' not in self.ls._names()) def testRemoveNonBlocking(self): self.assertRaises(NotImplementedError, self.ls.remove, 'one', blocking=0) self.ls.acquire('one') - self.assertEquals(self.ls.remove('one', blocking=0), []) + self.assertEquals(self.ls.remove('one', blocking=0), ['one']) self.ls.acquire(['two', 'three']) - self.assertEquals(self.ls.remove(['two', 'three'], blocking=0), []) + self.assertEquals(self.ls.remove(['two', 'three'], blocking=0), + ['two', 'three']) def testNoDoubleAdd(self): self.assertRaises(errors.LockError, self.ls.add, 'two') @@ -330,6 +455,40 @@ class TestLockSet(unittest.TestCase): # Cannot remove 'three' as we are sharing it self.assertRaises(AssertionError, self.ls.remove, 'three') + def testAcquireSetLock(self): + # acquire the set-lock exclusively + self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three'])) + self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three'])) + self.assertEquals(self.ls._is_owned(), True) + self.assertEquals(self.ls._names(), set(['one', 'two', 'three'])) + # I can still add/remove elements... + self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three']) + self.assert_(self.ls.add('six')) + self.ls.release() + # share the set-lock + self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six'])) + # adding new elements is not possible + self.assertRaises(AssertionError, self.ls.add, 'five') + self.ls.release() + + def testAcquireWithRepetitions(self): + self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1), + set(['two', 'two', 'three'])) + self.ls.release(['two', 'two']) + self.assertEquals(self.ls._list_owned(), set(['three'])) + + def testEmptyAcquire(self): + # Acquire an empty list of locks... + self.assertEquals(self.ls.acquire([]), set()) + self.assertEquals(self.ls._list_owned(), set()) + # New locks can still be addded + self.assert_(self.ls.add('six')) + # "re-acquiring" is not an issue, since we had really acquired nothing + self.assertEquals(self.ls.acquire([], shared=1), set()) + self.assertEquals(self.ls._list_owned(), set()) + # We haven't really acquired anything, so we cannot release + self.assertRaises(AssertionError, self.ls.release) + def _doLockSet(self, set, shared): try: self.ls.acquire(set, shared=shared) @@ -338,83 +497,205 @@ class TestLockSet(unittest.TestCase): except errors.LockError: self.done.put('ERR') + def _doAddSet(self, set): + try: + self.ls.add(set, acquired=1) + self.done.put('DONE') + self.ls.release() + except errors.LockError: + self.done.put('ERR') + def _doRemoveSet(self, set): self.done.put(self.ls.remove(set)) + @_Repeat def testConcurrentSharedAcquire(self): self.ls.acquire(['one', 'two'], shared=1) - Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLockSet, args=(['one', 'two', 'three'], 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLockSet, args=('three', 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start() - Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doLockSet, args=(['one', 'two'], 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLockSet, args=('three', 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLockSet, args=(['one', 'two'], 0)) + self._addThread(target=self._doLockSet, args=(['two', 'three'], 0)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.release() - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self.assertEqual(self.done.get_nowait(), 'DONE') + @_Repeat def testConcurrentExclusiveAcquire(self): self.ls.acquire(['one', 'two']) - Thread(target=self._doLockSet, args=('three', 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLockSet, args=('three', 0)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start() - Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start() - Thread(target=self._doLockSet, args=('one', 0)).start() - Thread(target=self._doLockSet, args=('one', 1)).start() - Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start() - Thread(target=self._doLockSet, args=(['two', 'three'], 1)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doLockSet, args=('three', 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLockSet, args=('three', 0)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLockSet, args=(['one', 'two'], 0)) + self._addThread(target=self._doLockSet, args=(['one', 'two'], 1)) + self._addThread(target=self._doLockSet, args=('one', 0)) + self._addThread(target=self._doLockSet, args=('one', 1)) + self._addThread(target=self._doLockSet, args=(['two', 'three'], 0)) + self._addThread(target=self._doLockSet, args=(['two', 'three'], 1)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.release() - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + for _ in range(6): + self.failUnlessEqual(self.done.get_nowait(), 'DONE') + @_Repeat def testConcurrentRemove(self): self.ls.add('four') self.ls.acquire(['one', 'two', 'four']) - Thread(target=self._doLockSet, args=(['one', 'four'], 0)).start() - Thread(target=self._doLockSet, args=(['one', 'four'], 1)).start() - Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start() - Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doLockSet, args=(['one', 'four'], 0)) + self._addThread(target=self._doLockSet, args=(['one', 'four'], 1)) + self._addThread(target=self._doLockSet, args=(['one', 'two'], 0)) + self._addThread(target=self._doLockSet, args=(['one', 'two'], 1)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.remove('one') self.ls.release() - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') + self._waitThreads() + for i in range(4): + self.failUnlessEqual(self.done.get_nowait(), 'ERR') self.ls.add(['five', 'six'], acquired=1) - Thread(target=self._doLockSet, args=(['three', 'six'], 1)).start() - Thread(target=self._doLockSet, args=(['three', 'six'], 0)).start() - Thread(target=self._doLockSet, args=(['four', 'six'], 1)).start() - Thread(target=self._doLockSet, args=(['four', 'six'], 0)).start() + self._addThread(target=self._doLockSet, args=(['three', 'six'], 1)) + self._addThread(target=self._doLockSet, args=(['three', 'six'], 0)) + self._addThread(target=self._doLockSet, args=(['four', 'six'], 1)) + self._addThread(target=self._doLockSet, args=(['four', 'six'], 0)) self.ls.remove('five') self.ls.release() - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + for i in range(4): + self.failUnlessEqual(self.done.get_nowait(), 'DONE') self.ls.acquire(['three', 'four']) - Thread(target=self._doRemoveSet, args=(['four', 'six'], )).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doRemoveSet, args=(['four', 'six'], )) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.remove('four') - self.assertEqual(self.done.get(True, 1), ['four']) - Thread(target=self._doRemoveSet, args=(['two'])).start() - self.assertEqual(self.done.get(True, 1), []) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), ['six']) + self._addThread(target=self._doRemoveSet, args=(['two'])) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), ['two']) + self.ls.release() + # reset lockset + self._setUpLS() + + @_Repeat + def testConcurrentSharedSetLock(self): + # share the set-lock... + self.ls.acquire(None, shared=1) + # ...another thread can share it too + self._addThread(target=self._doLockSet, args=(None, 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + # ...or just share some elements + self._addThread(target=self._doLockSet, args=(['one', 'three'], 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + # ...but not add new ones or remove any + t = self._addThread(target=self._doAddSet, args=(['nine'])) + self._addThread(target=self._doRemoveSet, args=(['two'], )) + self.assertRaises(Queue.Empty, self.done.get_nowait) + # this just releases the set-lock + self.ls.release([]) + t.join(60) + self.assertEqual(self.done.get_nowait(), 'DONE') + # release the lock on the actual elements so remove() can proceed too + self.ls.release() + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), ['two']) + # reset lockset + self._setUpLS() + + @_Repeat + def testConcurrentExclusiveSetLock(self): + # acquire the set-lock... + self.ls.acquire(None, shared=0) + # ...no one can do anything else + self._addThread(target=self._doLockSet, args=(None, 1)) + self._addThread(target=self._doLockSet, args=(None, 0)) + self._addThread(target=self._doLockSet, args=(['three'], 0)) + self._addThread(target=self._doLockSet, args=(['two'], 1)) + self._addThread(target=self._doAddSet, args=(['nine'])) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.release() + self._waitThreads() + for _ in range(5): + self.assertEqual(self.done.get(True, 1), 'DONE') + # cleanup + self._setUpLS() + + @_Repeat + def testConcurrentSetLockAdd(self): + self.ls.acquire('one') + # Another thread wants the whole SetLock + self._addThread(target=self._doLockSet, args=(None, 0)) + self._addThread(target=self._doLockSet, args=(None, 1)) + self.assertRaises(Queue.Empty, self.done.get_nowait) + self.assertRaises(AssertionError, self.ls.add, 'four') + self.ls.release() + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self.assertEqual(self.done.get_nowait(), 'DONE') + self.ls.acquire(None) + self._addThread(target=self._doLockSet, args=(None, 0)) + self._addThread(target=self._doLockSet, args=(None, 1)) + self.assertRaises(Queue.Empty, self.done.get_nowait) + self.ls.add('four') + self.ls.add('five', acquired=1) + self.ls.add('six', acquired=1, shared=1) + self.assertEquals(self.ls._list_owned(), + set(['one', 'two', 'three', 'five', 'six'])) + self.assertEquals(self.ls._is_owned(), True) + self.assertEquals(self.ls._names(), + set(['one', 'two', 'three', 'four', 'five', 'six'])) + self.ls.release() + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self.assertEqual(self.done.get_nowait(), 'DONE') + self._setUpLS() + + @_Repeat + def testEmptyLockSet(self): + # get the set-lock + self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three'])) + # now empty it... + self.ls.remove(['one', 'two', 'three']) + # and adds/locks by another thread still wait + self._addThread(target=self._doAddSet, args=(['nine'])) + self._addThread(target=self._doLockSet, args=(None, 1)) + self._addThread(target=self._doLockSet, args=(None, 0)) + self.assertRaises(Queue.Empty, self.done.get_nowait) + self.ls.release() + self._waitThreads() + for _ in range(3): + self.assertEqual(self.done.get_nowait(), 'DONE') + # empty it again... + self.assertEqual(self.ls.remove(['nine']), ['nine']) + # now share it... + self.assertEqual(self.ls.acquire(None, shared=1), set()) + # other sharers can go, adds still wait + self._addThread(target=self._doLockSet, args=(None, 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doAddSet, args=(['nine'])) + self.assertRaises(Queue.Empty, self.done.get_nowait) + self.ls.release() + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._setUpLS() -class TestGanetiLockManager(unittest.TestCase): +class TestGanetiLockManager(_ThreadedTestCase): def setUp(self): + _ThreadedTestCase.setUp(self) self.nodes=['n1', 'n2'] self.instances=['i1', 'i2', 'i3'] self.GL = locking.GanetiLockManager(nodes=self.nodes, @@ -428,18 +709,19 @@ class TestGanetiLockManager(unittest.TestCase): def testLockingConstants(self): # The locking library internally cheats by assuming its constants have some # relationships with each other. Check those hold true. + # This relationship is also used in the Processor to recursively acquire + # the right locks. Again, please don't break it. for i in range(len(locking.LEVELS)): self.assertEqual(i, locking.LEVELS[i]) def testDoubleGLFails(self): - # We are not passing test=True, so instantiating a new one should fail self.assertRaises(AssertionError, locking.GanetiLockManager) def testLockNames(self): self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes)) - self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(self.instances)) - self.assertEqual(self.GL._names(locking.LEVEL_CONFIG), set(['config'])) + self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), + set(self.instances)) def testInitAndResources(self): locking.GanetiLockManager._instance = None @@ -447,41 +729,65 @@ class TestGanetiLockManager(unittest.TestCase): self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) self.assertEqual(self.GL._names(locking.LEVEL_NODE), set()) self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set()) - self.assertEqual(self.GL._names(locking.LEVEL_CONFIG), set(['config'])) locking.GanetiLockManager._instance = None self.GL = locking.GanetiLockManager(nodes=self.nodes) self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes)) self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set()) - self.assertEqual(self.GL._names(locking.LEVEL_CONFIG), set(['config'])) locking.GanetiLockManager._instance = None self.GL = locking.GanetiLockManager(instances=self.instances) self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) self.assertEqual(self.GL._names(locking.LEVEL_NODE), set()) - self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(self.instances)) - self.assertEqual(self.GL._names(locking.LEVEL_CONFIG), set(['config'])) + self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), + set(self.instances)) def testAcquireRelease(self): self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL'])) + self.GL.acquire(locking.LEVEL_INSTANCE, ['i1']) self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1) - self.GL.release(locking.LEVEL_NODE) - self.GL.acquire(locking.LEVEL_NODE, ['n1']) + self.GL.release(locking.LEVEL_NODE, ['n2']) self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1'])) - self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2']) - self.GL.acquire(locking.LEVEL_CONFIG, ['config']) - self.GL.release(locking.LEVEL_INSTANCE, ['i2']) self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1'])) self.GL.release(locking.LEVEL_NODE) + self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set()) + self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1'])) self.GL.release(locking.LEVEL_INSTANCE) - self.GL.release(locking.LEVEL_CONFIG) self.assertRaises(errors.LockError, self.GL.acquire, locking.LEVEL_INSTANCE, ['i5']) self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1) self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3'])) + def testAcquireWholeSets(self): + self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) + self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None), + set(self.instances)) + self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), + set(self.instances)) + self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1), + set(self.nodes)) + self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), + set(self.nodes)) + self.GL.release(locking.LEVEL_NODE) + self.GL.release(locking.LEVEL_INSTANCE) + self.GL.release(locking.LEVEL_CLUSTER) + + def testAcquireWholeAndPartial(self): + self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) + self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None), + set(self.instances)) + self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), + set(self.instances)) + self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1), + set(['n2'])) + self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), + set(['n2'])) + self.GL.release(locking.LEVEL_NODE) + self.GL.release(locking.LEVEL_INSTANCE) + self.GL.release(locking.LEVEL_CLUSTER) + def testBGLDependency(self): self.assertRaises(AssertionError, self.GL.acquire, locking.LEVEL_NODE, ['n1', 'n2']) @@ -500,27 +806,14 @@ class TestGanetiLockManager(unittest.TestCase): self.assertRaises(AssertionError, self.GL.release, locking.LEVEL_CLUSTER) self.GL.release(locking.LEVEL_INSTANCE) - self.GL.acquire(locking.LEVEL_CONFIG, ['config']) - self.assertRaises(AssertionError, self.GL.release, - locking.LEVEL_CLUSTER) def testWrongOrder(self): self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) - self.GL.acquire(locking.LEVEL_INSTANCE, ['i3']) + self.GL.acquire(locking.LEVEL_NODE, ['n2']) self.assertRaises(AssertionError, self.GL.acquire, locking.LEVEL_NODE, ['n1']) self.assertRaises(AssertionError, self.GL.acquire, locking.LEVEL_INSTANCE, ['i2']) - self.GL.acquire(locking.LEVEL_CONFIG, ['config']) - self.assertRaises(AssertionError, self.GL.acquire, - locking.LEVEL_CONFIG, ['config']) - self.GL.release(locking.LEVEL_INSTANCE) - self.assertRaises(AssertionError, self.GL.acquire, - locking.LEVEL_NODE, ['n1']) - self.assertRaises(AssertionError, self.GL.acquire, - locking.LEVEL_INSTANCE, ['i2']) - self.assertRaises(AssertionError, self.GL.acquire, - locking.LEVEL_CONFIG, ['config']) # Helper function to run as a thread that shared the BGL and then acquires # some locks at another level. @@ -534,27 +827,36 @@ class TestGanetiLockManager(unittest.TestCase): except errors.LockError: self.done.put('ERR') + @_Repeat def testConcurrency(self): self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) - Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i1', 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - self.GL.acquire(locking.LEVEL_NODE, ['n1']) + self._addThread(target=self._doLock, + args=(locking.LEVEL_INSTANCE, 'i1', 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') self.GL.acquire(locking.LEVEL_INSTANCE, ['i3']) - self.GL.acquire(locking.LEVEL_CONFIG, ['config']) - Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i1', 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i3', 1)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) - self.GL.release(locking.LEVEL_CONFIG) + self._addThread(target=self._doLock, + args=(locking.LEVEL_INSTANCE, 'i1', 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLock, + args=(locking.LEVEL_INSTANCE, 'i3', 1)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.GL.release(locking.LEVEL_INSTANCE) - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1) - Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i2', 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i2', 0)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doLock, + args=(locking.LEVEL_INSTANCE, 'i2', 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLock, + args=(locking.LEVEL_INSTANCE, 'i2', 0)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.GL.release(locking.LEVEL_INSTANCE) + self._waitThreads() self.assertEqual(self.done.get(True, 1), 'DONE') + self.GL.release(locking.LEVEL_CLUSTER, ['BGL']) if __name__ == '__main__':