X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/b10b9d740788b32fc3521aa086590f066495dc38..c8fcde472922e4ee664d904e0bf1a583f1d5040d:/test/ganeti.locking_unittest.py diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index 195b27f..aee6860 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -36,11 +36,42 @@ from threading import Thread # Since it's passed as input to a decorator it must be declared as a global. _decoratorlock = locking.SharedLock() +#: List for looping tests +ITERATIONS = range(8) + +def _Repeat(fn): + """Decorator for executing a function many times""" + def wrapper(*args, **kwargs): + for i in ITERATIONS: + fn(*args, **kwargs) + return wrapper + +class _ThreadedTestCase(unittest.TestCase): + """Test class that supports adding/waiting on threads""" + def setUp(self): + unittest.TestCase.setUp(self) + self.threads = [] + + def _addThread(self, *args, **kwargs): + """Create and remember a new thread""" + t = Thread(*args, **kwargs) + self.threads.append(t) + t.start() + return t + + def _waitThreads(self): + """Wait for all our threads to finish""" + for t in self.threads: + t.join(60) + self.failIf(t.isAlive()) + self.threads = [] -class TestSharedLock(unittest.TestCase): + +class TestSharedLock(_ThreadedTestCase): """SharedLock tests""" def setUp(self): + _ThreadedTestCase.setUp(self) self.sl = locking.SharedLock() # helper threads use the 'done' queue to tell the master they finished. self.done = Queue.Queue(0) @@ -120,73 +151,88 @@ class TestSharedLock(unittest.TestCase): self.assert_(self.done.get(True, 1)) self.sl.release() + @_Repeat def testExclusiveBlocksExclusive(self): self.sl.acquire() - Thread(target=self._doItExclusive).start() - # give it a bit of time to check that it's not actually doing anything - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItExclusive) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'EXC') + @_Repeat def testExclusiveBlocksDelete(self): self.sl.acquire() - Thread(target=self._doItDelete).start() - # give it a bit of time to check that it's not actually doing anything - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItDelete) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'DEL') + self.sl = locking.SharedLock() + @_Repeat def testExclusiveBlocksSharer(self): self.sl.acquire() - Thread(target=self._doItSharer).start() - time.sleep(0.05) - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItSharer) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'SHR') + @_Repeat def testSharerBlocksExclusive(self): self.sl.acquire(shared=1) - Thread(target=self._doItExclusive).start() - time.sleep(0.05) - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItExclusive) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'EXC') + @_Repeat def testSharerBlocksDelete(self): self.sl.acquire(shared=1) - Thread(target=self._doItDelete).start() - time.sleep(0.05) - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItDelete) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'DEL') + self.sl = locking.SharedLock() + @_Repeat def testWaitingExclusiveBlocksSharer(self): + """SKIPPED testWaitingExclusiveBlockSharer""" + return + self.sl.acquire(shared=1) # the lock is acquired in shared mode... - Thread(target=self._doItExclusive).start() + self._addThread(target=self._doItExclusive) # ...but now an exclusive is waiting... - time.sleep(0.05) - Thread(target=self._doItSharer).start() + self._addThread(target=self._doItSharer) # ...so the sharer should be blocked as well - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() + self._waitThreads() # The exclusive passed before - self.assertEqual(self.done.get(True, 1), 'EXC') - self.assertEqual(self.done.get(True, 1), 'SHR') + self.failUnlessEqual(self.done.get_nowait(), 'EXC') + self.failUnlessEqual(self.done.get_nowait(), 'SHR') + @_Repeat def testWaitingSharerBlocksExclusive(self): + """SKIPPED testWaitingSharerBlocksExclusive""" + return + self.sl.acquire() # the lock is acquired in exclusive mode... - Thread(target=self._doItSharer).start() + self._addThread(target=self._doItSharer) # ...but now a sharer is waiting... - time.sleep(0.05) - Thread(target=self._doItExclusive).start() + self._addThread(target=self._doItExclusive) # ...the exclusive is waiting too... - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() + self._waitThreads() # The sharer passed before - self.assertEqual(self.done.get(True, 1), 'SHR') - self.assertEqual(self.done.get(True, 1), 'EXC') + self.assertEqual(self.done.get_nowait(), 'SHR') + self.assertEqual(self.done.get_nowait(), 'EXC') def testNoNonBlocking(self): self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0) @@ -204,41 +250,42 @@ class TestSharedLock(unittest.TestCase): self.sl.acquire(shared=1) self.assertRaises(AssertionError, self.sl.delete) + @_Repeat def testDeletePendingSharersExclusiveDelete(self): self.sl.acquire() - Thread(target=self._doItSharer).start() - Thread(target=self._doItSharer).start() - time.sleep(0.05) - Thread(target=self._doItExclusive).start() - Thread(target=self._doItDelete).start() - time.sleep(0.05) + self._addThread(target=self._doItSharer) + self._addThread(target=self._doItSharer) + self._addThread(target=self._doItExclusive) + self._addThread(target=self._doItDelete) self.sl.delete() - # The two threads who were pending return both ERR - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') + self._waitThreads() + # The threads who were pending return ERR + for _ in range(4): + self.assertEqual(self.done.get_nowait(), 'ERR') + self.sl = locking.SharedLock() + @_Repeat def testDeletePendingDeleteExclusiveSharers(self): self.sl.acquire() - Thread(target=self._doItDelete).start() - Thread(target=self._doItExclusive).start() - time.sleep(0.05) - Thread(target=self._doItSharer).start() - Thread(target=self._doItSharer).start() - time.sleep(0.05) + self._addThread(target=self._doItDelete) + self._addThread(target=self._doItExclusive) + self._addThread(target=self._doItSharer) + self._addThread(target=self._doItSharer) self.sl.delete() + self._waitThreads() # The two threads who were pending return both ERR - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') + self.assertEqual(self.done.get_nowait(), 'ERR') + self.assertEqual(self.done.get_nowait(), 'ERR') + self.assertEqual(self.done.get_nowait(), 'ERR') + self.assertEqual(self.done.get_nowait(), 'ERR') + self.sl = locking.SharedLock() -class TestSSynchronizedDecorator(unittest.TestCase): +class TestSSynchronizedDecorator(_ThreadedTestCase): """Shared Lock Synchronized decorator test""" def setUp(self): + _ThreadedTestCase.setUp(self) # helper threads use the 'done' queue to tell the master they finished. self.done = Queue.Queue(0) @@ -264,40 +311,50 @@ class TestSSynchronizedDecorator(unittest.TestCase): self.assert_(self.done.get(True, 1)) _decoratorlock.release() + @_Repeat def testExclusiveBlocksExclusive(self): _decoratorlock.acquire() - Thread(target=self._doItExclusive).start() + self._addThread(target=self._doItExclusive) # give it a bit of time to check that it's not actually doing anything - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self.assertRaises(Queue.Empty, self.done.get_nowait) _decoratorlock.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'EXC') + @_Repeat def testExclusiveBlocksSharer(self): _decoratorlock.acquire() - Thread(target=self._doItSharer).start() - time.sleep(0.05) - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItSharer) + self.assertRaises(Queue.Empty, self.done.get_nowait) _decoratorlock.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'SHR') + @_Repeat def testSharerBlocksExclusive(self): _decoratorlock.acquire(shared=1) - Thread(target=self._doItExclusive).start() - time.sleep(0.05) - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItExclusive) + self.assertRaises(Queue.Empty, self.done.get_nowait) _decoratorlock.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'EXC') -class TestLockSet(unittest.TestCase): +class TestLockSet(_ThreadedTestCase): """LockSet tests""" def setUp(self): - self.resources = ['one', 'two', 'three'] - self.ls = locking.LockSet(members=self.resources) + _ThreadedTestCase.setUp(self) + self._setUpLS() # helper threads use the 'done' queue to tell the master they finished. self.done = Queue.Queue(0) + def _setUpLS(self): + """Helper to (re)initialize the lock set""" + self.resources = ['one', 'two', 'three'] + self.ls = locking.LockSet(members=self.resources) + + def testResources(self): self.assertEquals(self.ls._names(), set(self.resources)) newls = locking.LockSet() @@ -354,12 +411,20 @@ class TestLockSet(unittest.TestCase): self.assert_('five' not in self.ls._names()) self.assert_('six' not in self.ls._names()) self.assertEquals(self.ls._list_owned(), set(['seven'])) - self.ls.add('eight', acquired=1, shared=1) - self.assert_('eight' in self.ls._names()) - self.assertEquals(self.ls._list_owned(), set(['seven', 'eight'])) + self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1) self.ls.remove('seven') self.assert_('seven' not in self.ls._names()) - self.assertEquals(self.ls._list_owned(), set(['eight'])) + self.assertEquals(self.ls._list_owned(), set([])) + self.ls.acquire(None, shared=1) + self.assertRaises(AssertionError, self.ls.add, 'eight') + self.ls.release() + self.ls.acquire(None) + self.ls.add('eight', acquired=1) + self.assert_('eight' in self.ls._names()) + self.assert_('eight' in self.ls._list_owned()) + self.ls.add('nine') + self.assert_('nine' in self.ls._names()) + self.assert_('nine' not in self.ls._list_owned()) self.ls.release() self.ls.remove(['two']) self.assert_('two' not in self.ls._names()) @@ -393,6 +458,9 @@ class TestLockSet(unittest.TestCase): def testAcquireSetLock(self): # acquire the set-lock exclusively self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three'])) + self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three'])) + self.assertEquals(self.ls._is_owned(), True) + self.assertEquals(self.ls._names(), set(['one', 'two', 'three'])) # I can still add/remove elements... self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three']) self.assert_(self.ls.add('six')) @@ -403,6 +471,12 @@ class TestLockSet(unittest.TestCase): self.assertRaises(AssertionError, self.ls.add, 'five') self.ls.release() + def testAcquireWithRepetitions(self): + self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1), + set(['two', 'two', 'three'])) + self.ls.release(['two', 'two']) + self.assertEquals(self.ls._list_owned(), set(['three'])) + def testEmptyAcquire(self): # Acquire an empty list of locks... self.assertEquals(self.ls.acquire([]), set()) @@ -434,143 +508,194 @@ class TestLockSet(unittest.TestCase): def _doRemoveSet(self, set): self.done.put(self.ls.remove(set)) + @_Repeat def testConcurrentSharedAcquire(self): self.ls.acquire(['one', 'two'], shared=1) - Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLockSet, args=(['one', 'two', 'three'], 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLockSet, args=('three', 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start() - Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doLockSet, args=(['one', 'two'], 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLockSet, args=('three', 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLockSet, args=(['one', 'two'], 0)) + self._addThread(target=self._doLockSet, args=(['two', 'three'], 0)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.release() - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self.assertEqual(self.done.get_nowait(), 'DONE') + @_Repeat def testConcurrentExclusiveAcquire(self): self.ls.acquire(['one', 'two']) - Thread(target=self._doLockSet, args=('three', 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLockSet, args=('three', 0)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start() - Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start() - Thread(target=self._doLockSet, args=('one', 0)).start() - Thread(target=self._doLockSet, args=('one', 1)).start() - Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start() - Thread(target=self._doLockSet, args=(['two', 'three'], 1)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doLockSet, args=('three', 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLockSet, args=('three', 0)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLockSet, args=(['one', 'two'], 0)) + self._addThread(target=self._doLockSet, args=(['one', 'two'], 1)) + self._addThread(target=self._doLockSet, args=('one', 0)) + self._addThread(target=self._doLockSet, args=('one', 1)) + self._addThread(target=self._doLockSet, args=(['two', 'three'], 0)) + self._addThread(target=self._doLockSet, args=(['two', 'three'], 1)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.release() - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + for _ in range(6): + self.failUnlessEqual(self.done.get_nowait(), 'DONE') + @_Repeat def testConcurrentRemove(self): self.ls.add('four') self.ls.acquire(['one', 'two', 'four']) - Thread(target=self._doLockSet, args=(['one', 'four'], 0)).start() - Thread(target=self._doLockSet, args=(['one', 'four'], 1)).start() - Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start() - Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doLockSet, args=(['one', 'four'], 0)) + self._addThread(target=self._doLockSet, args=(['one', 'four'], 1)) + self._addThread(target=self._doLockSet, args=(['one', 'two'], 0)) + self._addThread(target=self._doLockSet, args=(['one', 'two'], 1)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.remove('one') self.ls.release() - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') + self._waitThreads() + for i in range(4): + self.failUnlessEqual(self.done.get_nowait(), 'ERR') self.ls.add(['five', 'six'], acquired=1) - Thread(target=self._doLockSet, args=(['three', 'six'], 1)).start() - Thread(target=self._doLockSet, args=(['three', 'six'], 0)).start() - Thread(target=self._doLockSet, args=(['four', 'six'], 1)).start() - Thread(target=self._doLockSet, args=(['four', 'six'], 0)).start() + self._addThread(target=self._doLockSet, args=(['three', 'six'], 1)) + self._addThread(target=self._doLockSet, args=(['three', 'six'], 0)) + self._addThread(target=self._doLockSet, args=(['four', 'six'], 1)) + self._addThread(target=self._doLockSet, args=(['four', 'six'], 0)) self.ls.remove('five') self.ls.release() - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + for i in range(4): + self.failUnlessEqual(self.done.get_nowait(), 'DONE') self.ls.acquire(['three', 'four']) - Thread(target=self._doRemoveSet, args=(['four', 'six'], )).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doRemoveSet, args=(['four', 'six'], )) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.remove('four') - self.assertEqual(self.done.get(True, 1), ['six']) - Thread(target=self._doRemoveSet, args=(['two'])).start() - self.assertEqual(self.done.get(True, 1), ['two']) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), ['six']) + self._addThread(target=self._doRemoveSet, args=(['two'])) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), ['two']) self.ls.release() + # reset lockset + self._setUpLS() + @_Repeat def testConcurrentSharedSetLock(self): # share the set-lock... self.ls.acquire(None, shared=1) # ...another thread can share it too - Thread(target=self._doLockSet, args=(None, 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') + self._addThread(target=self._doLockSet, args=(None, 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') # ...or just share some elements - Thread(target=self._doLockSet, args=(['one', 'three'], 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') + self._addThread(target=self._doLockSet, args=(['one', 'three'], 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') # ...but not add new ones or remove any - Thread(target=self._doAddSet, args=(['nine'])).start() - Thread(target=self._doRemoveSet, args=(['two'], )).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + t = self._addThread(target=self._doAddSet, args=(['nine'])) + self._addThread(target=self._doRemoveSet, args=(['two'], )) + self.assertRaises(Queue.Empty, self.done.get_nowait) # this just releases the set-lock self.ls.release([]) - self.assertEqual(self.done.get(True, 1), 'DONE') + t.join(60) + self.assertEqual(self.done.get_nowait(), 'DONE') # release the lock on the actual elements so remove() can proceed too self.ls.release() - self.assertEqual(self.done.get(True, 1), ['two']) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), ['two']) + # reset lockset + self._setUpLS() + @_Repeat def testConcurrentExclusiveSetLock(self): # acquire the set-lock... self.ls.acquire(None, shared=0) # ...no one can do anything else - Thread(target=self._doLockSet, args=(None, 1)).start() - Thread(target=self._doLockSet, args=(None, 0)).start() - Thread(target=self._doLockSet, args=(['three'], 0)).start() - Thread(target=self._doLockSet, args=(['two'], 1)).start() - Thread(target=self._doAddSet, args=(['nine'])).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doLockSet, args=(None, 1)) + self._addThread(target=self._doLockSet, args=(None, 0)) + self._addThread(target=self._doLockSet, args=(['three'], 0)) + self._addThread(target=self._doLockSet, args=(['two'], 1)) + self._addThread(target=self._doAddSet, args=(['nine'])) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.release() - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + for _ in range(5): + self.assertEqual(self.done.get(True, 1), 'DONE') + # cleanup + self._setUpLS() + + @_Repeat + def testConcurrentSetLockAdd(self): + self.ls.acquire('one') + # Another thread wants the whole SetLock + self._addThread(target=self._doLockSet, args=(None, 0)) + self._addThread(target=self._doLockSet, args=(None, 1)) + self.assertRaises(Queue.Empty, self.done.get_nowait) + self.assertRaises(AssertionError, self.ls.add, 'four') + self.ls.release() + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self.assertEqual(self.done.get_nowait(), 'DONE') + self.ls.acquire(None) + self._addThread(target=self._doLockSet, args=(None, 0)) + self._addThread(target=self._doLockSet, args=(None, 1)) + self.assertRaises(Queue.Empty, self.done.get_nowait) + self.ls.add('four') + self.ls.add('five', acquired=1) + self.ls.add('six', acquired=1, shared=1) + self.assertEquals(self.ls._list_owned(), + set(['one', 'two', 'three', 'five', 'six'])) + self.assertEquals(self.ls._is_owned(), True) + self.assertEquals(self.ls._names(), + set(['one', 'two', 'three', 'four', 'five', 'six'])) + self.ls.release() + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self.assertEqual(self.done.get_nowait(), 'DONE') + self._setUpLS() + @_Repeat def testEmptyLockSet(self): # get the set-lock self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three'])) # now empty it... self.ls.remove(['one', 'two', 'three']) # and adds/locks by another thread still wait - Thread(target=self._doAddSet, args=(['nine'])).start() - Thread(target=self._doLockSet, args=(None, 1)).start() - Thread(target=self._doLockSet, args=(None, 0)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doAddSet, args=(['nine'])) + self._addThread(target=self._doLockSet, args=(None, 1)) + self._addThread(target=self._doLockSet, args=(None, 0)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.release() - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + for _ in range(3): + self.assertEqual(self.done.get_nowait(), 'DONE') # empty it again... self.assertEqual(self.ls.remove(['nine']), ['nine']) # now share it... self.assertEqual(self.ls.acquire(None, shared=1), set()) # other sharers can go, adds still wait - Thread(target=self._doLockSet, args=(None, 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doAddSet, args=(['nine'])).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doLockSet, args=(None, 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doAddSet, args=(['nine'])) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.release() - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._setUpLS() -class TestGanetiLockManager(unittest.TestCase): +class TestGanetiLockManager(_ThreadedTestCase): def setUp(self): + _ThreadedTestCase.setUp(self) self.nodes=['n1', 'n2'] self.instances=['i1', 'i2', 'i3'] self.GL = locking.GanetiLockManager(nodes=self.nodes, @@ -621,20 +746,48 @@ class TestGanetiLockManager(unittest.TestCase): def testAcquireRelease(self): self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL'])) + self.GL.acquire(locking.LEVEL_INSTANCE, ['i1']) self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1) - self.GL.release(locking.LEVEL_NODE) - self.GL.acquire(locking.LEVEL_NODE, ['n1']) + self.GL.release(locking.LEVEL_NODE, ['n2']) self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1'])) - self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2']) - self.GL.release(locking.LEVEL_INSTANCE, ['i2']) self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1'])) self.GL.release(locking.LEVEL_NODE) + self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set()) + self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1'])) self.GL.release(locking.LEVEL_INSTANCE) self.assertRaises(errors.LockError, self.GL.acquire, locking.LEVEL_INSTANCE, ['i5']) self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1) self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3'])) + def testAcquireWholeSets(self): + self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) + self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None), + set(self.instances)) + self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), + set(self.instances)) + self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1), + set(self.nodes)) + self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), + set(self.nodes)) + self.GL.release(locking.LEVEL_NODE) + self.GL.release(locking.LEVEL_INSTANCE) + self.GL.release(locking.LEVEL_CLUSTER) + + def testAcquireWholeAndPartial(self): + self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) + self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None), + set(self.instances)) + self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), + set(self.instances)) + self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1), + set(['n2'])) + self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), + set(['n2'])) + self.GL.release(locking.LEVEL_NODE) + self.GL.release(locking.LEVEL_INSTANCE) + self.GL.release(locking.LEVEL_CLUSTER) + def testBGLDependency(self): self.assertRaises(AssertionError, self.GL.acquire, locking.LEVEL_NODE, ['n1', 'n2']) @@ -656,7 +809,7 @@ class TestGanetiLockManager(unittest.TestCase): def testWrongOrder(self): self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) - self.GL.acquire(locking.LEVEL_INSTANCE, ['i3']) + self.GL.acquire(locking.LEVEL_NODE, ['n2']) self.assertRaises(AssertionError, self.GL.acquire, locking.LEVEL_NODE, ['n1']) self.assertRaises(AssertionError, self.GL.acquire, @@ -674,25 +827,36 @@ class TestGanetiLockManager(unittest.TestCase): except errors.LockError: self.done.put('ERR') + @_Repeat def testConcurrency(self): self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) - Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i1', 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - self.GL.acquire(locking.LEVEL_NODE, ['n1']) + self._addThread(target=self._doLock, + args=(locking.LEVEL_INSTANCE, 'i1', 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') self.GL.acquire(locking.LEVEL_INSTANCE, ['i3']) - Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i1', 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i3', 1)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doLock, + args=(locking.LEVEL_INSTANCE, 'i1', 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLock, + args=(locking.LEVEL_INSTANCE, 'i3', 1)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.GL.release(locking.LEVEL_INSTANCE) - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1) - Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i2', 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i2', 0)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doLock, + args=(locking.LEVEL_INSTANCE, 'i2', 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLock, + args=(locking.LEVEL_INSTANCE, 'i2', 0)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.GL.release(locking.LEVEL_INSTANCE) + self._waitThreads() self.assertEqual(self.done.get(True, 1), 'DONE') + self.GL.release(locking.LEVEL_CLUSTER, ['BGL']) if __name__ == '__main__':