X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/42a999d1a3bbf9911dd2de149b02ad5354dd8261..0329617acee35cf4f5299e3d431fd8e0e7fb4cbe:/test/ganeti.locking_unittest.py diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index 41ec354..aee6860 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -36,11 +36,42 @@ from threading import Thread # Since it's passed as input to a decorator it must be declared as a global. _decoratorlock = locking.SharedLock() +#: List for looping tests +ITERATIONS = range(8) + +def _Repeat(fn): + """Decorator for executing a function many times""" + def wrapper(*args, **kwargs): + for i in ITERATIONS: + fn(*args, **kwargs) + return wrapper + +class _ThreadedTestCase(unittest.TestCase): + """Test class that supports adding/waiting on threads""" + def setUp(self): + unittest.TestCase.setUp(self) + self.threads = [] + + def _addThread(self, *args, **kwargs): + """Create and remember a new thread""" + t = Thread(*args, **kwargs) + self.threads.append(t) + t.start() + return t + + def _waitThreads(self): + """Wait for all our threads to finish""" + for t in self.threads: + t.join(60) + self.failIf(t.isAlive()) + self.threads = [] + -class TestSharedLock(unittest.TestCase): +class TestSharedLock(_ThreadedTestCase): """SharedLock tests""" def setUp(self): + _ThreadedTestCase.setUp(self) self.sl = locking.SharedLock() # helper threads use the 'done' queue to tell the master they finished. self.done = Queue.Queue(0) @@ -120,73 +151,88 @@ class TestSharedLock(unittest.TestCase): self.assert_(self.done.get(True, 1)) self.sl.release() + @_Repeat def testExclusiveBlocksExclusive(self): self.sl.acquire() - Thread(target=self._doItExclusive).start() - # give it a bit of time to check that it's not actually doing anything - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItExclusive) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'EXC') + @_Repeat def testExclusiveBlocksDelete(self): self.sl.acquire() - Thread(target=self._doItDelete).start() - # give it a bit of time to check that it's not actually doing anything - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItDelete) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'DEL') + self.sl = locking.SharedLock() + @_Repeat def testExclusiveBlocksSharer(self): self.sl.acquire() - Thread(target=self._doItSharer).start() - time.sleep(0.05) - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItSharer) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'SHR') + @_Repeat def testSharerBlocksExclusive(self): self.sl.acquire(shared=1) - Thread(target=self._doItExclusive).start() - time.sleep(0.05) - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItExclusive) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'EXC') + @_Repeat def testSharerBlocksDelete(self): self.sl.acquire(shared=1) - Thread(target=self._doItDelete).start() - time.sleep(0.05) - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItDelete) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'DEL') + self.sl = locking.SharedLock() + @_Repeat def testWaitingExclusiveBlocksSharer(self): + """SKIPPED testWaitingExclusiveBlockSharer""" + return + self.sl.acquire(shared=1) # the lock is acquired in shared mode... - Thread(target=self._doItExclusive).start() + self._addThread(target=self._doItExclusive) # ...but now an exclusive is waiting... - time.sleep(0.05) - Thread(target=self._doItSharer).start() + self._addThread(target=self._doItSharer) # ...so the sharer should be blocked as well - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() + self._waitThreads() # The exclusive passed before - self.assertEqual(self.done.get(True, 1), 'EXC') - self.assertEqual(self.done.get(True, 1), 'SHR') + self.failUnlessEqual(self.done.get_nowait(), 'EXC') + self.failUnlessEqual(self.done.get_nowait(), 'SHR') + @_Repeat def testWaitingSharerBlocksExclusive(self): + """SKIPPED testWaitingSharerBlocksExclusive""" + return + self.sl.acquire() # the lock is acquired in exclusive mode... - Thread(target=self._doItSharer).start() + self._addThread(target=self._doItSharer) # ...but now a sharer is waiting... - time.sleep(0.05) - Thread(target=self._doItExclusive).start() + self._addThread(target=self._doItExclusive) # ...the exclusive is waiting too... - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() + self._waitThreads() # The sharer passed before - self.assertEqual(self.done.get(True, 1), 'SHR') - self.assertEqual(self.done.get(True, 1), 'EXC') + self.assertEqual(self.done.get_nowait(), 'SHR') + self.assertEqual(self.done.get_nowait(), 'EXC') def testNoNonBlocking(self): self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0) @@ -204,41 +250,42 @@ class TestSharedLock(unittest.TestCase): self.sl.acquire(shared=1) self.assertRaises(AssertionError, self.sl.delete) + @_Repeat def testDeletePendingSharersExclusiveDelete(self): self.sl.acquire() - Thread(target=self._doItSharer).start() - Thread(target=self._doItSharer).start() - time.sleep(0.05) - Thread(target=self._doItExclusive).start() - Thread(target=self._doItDelete).start() - time.sleep(0.05) + self._addThread(target=self._doItSharer) + self._addThread(target=self._doItSharer) + self._addThread(target=self._doItExclusive) + self._addThread(target=self._doItDelete) self.sl.delete() - # The two threads who were pending return both ERR - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') + self._waitThreads() + # The threads who were pending return ERR + for _ in range(4): + self.assertEqual(self.done.get_nowait(), 'ERR') + self.sl = locking.SharedLock() + @_Repeat def testDeletePendingDeleteExclusiveSharers(self): self.sl.acquire() - Thread(target=self._doItDelete).start() - Thread(target=self._doItExclusive).start() - time.sleep(0.05) - Thread(target=self._doItSharer).start() - Thread(target=self._doItSharer).start() - time.sleep(0.05) + self._addThread(target=self._doItDelete) + self._addThread(target=self._doItExclusive) + self._addThread(target=self._doItSharer) + self._addThread(target=self._doItSharer) self.sl.delete() + self._waitThreads() # The two threads who were pending return both ERR - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') + self.assertEqual(self.done.get_nowait(), 'ERR') + self.assertEqual(self.done.get_nowait(), 'ERR') + self.assertEqual(self.done.get_nowait(), 'ERR') + self.assertEqual(self.done.get_nowait(), 'ERR') + self.sl = locking.SharedLock() -class TestSSynchronizedDecorator(unittest.TestCase): +class TestSSynchronizedDecorator(_ThreadedTestCase): """Shared Lock Synchronized decorator test""" def setUp(self): + _ThreadedTestCase.setUp(self) # helper threads use the 'done' queue to tell the master they finished. self.done = Queue.Queue(0) @@ -264,40 +311,50 @@ class TestSSynchronizedDecorator(unittest.TestCase): self.assert_(self.done.get(True, 1)) _decoratorlock.release() + @_Repeat def testExclusiveBlocksExclusive(self): _decoratorlock.acquire() - Thread(target=self._doItExclusive).start() + self._addThread(target=self._doItExclusive) # give it a bit of time to check that it's not actually doing anything - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self.assertRaises(Queue.Empty, self.done.get_nowait) _decoratorlock.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'EXC') + @_Repeat def testExclusiveBlocksSharer(self): _decoratorlock.acquire() - Thread(target=self._doItSharer).start() - time.sleep(0.05) - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItSharer) + self.assertRaises(Queue.Empty, self.done.get_nowait) _decoratorlock.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'SHR') + @_Repeat def testSharerBlocksExclusive(self): _decoratorlock.acquire(shared=1) - Thread(target=self._doItExclusive).start() - time.sleep(0.05) - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItExclusive) + self.assertRaises(Queue.Empty, self.done.get_nowait) _decoratorlock.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'EXC') -class TestLockSet(unittest.TestCase): +class TestLockSet(_ThreadedTestCase): """LockSet tests""" def setUp(self): - self.resources = ['one', 'two', 'three'] - self.ls = locking.LockSet(members=self.resources) + _ThreadedTestCase.setUp(self) + self._setUpLS() # helper threads use the 'done' queue to tell the master they finished. self.done = Queue.Queue(0) + def _setUpLS(self): + """Helper to (re)initialize the lock set""" + self.resources = ['one', 'two', 'three'] + self.ls = locking.LockSet(members=self.resources) + + def testResources(self): self.assertEquals(self.ls._names(), set(self.resources)) newls = locking.LockSet() @@ -354,12 +411,20 @@ class TestLockSet(unittest.TestCase): self.assert_('five' not in self.ls._names()) self.assert_('six' not in self.ls._names()) self.assertEquals(self.ls._list_owned(), set(['seven'])) - self.ls.add('eight', acquired=1, shared=1) - self.assert_('eight' in self.ls._names()) - self.assertEquals(self.ls._list_owned(), set(['seven', 'eight'])) + self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1) self.ls.remove('seven') self.assert_('seven' not in self.ls._names()) - self.assertEquals(self.ls._list_owned(), set(['eight'])) + self.assertEquals(self.ls._list_owned(), set([])) + self.ls.acquire(None, shared=1) + self.assertRaises(AssertionError, self.ls.add, 'eight') + self.ls.release() + self.ls.acquire(None) + self.ls.add('eight', acquired=1) + self.assert_('eight' in self.ls._names()) + self.assert_('eight' in self.ls._list_owned()) + self.ls.add('nine') + self.assert_('nine' in self.ls._names()) + self.assert_('nine' not in self.ls._list_owned()) self.ls.release() self.ls.remove(['two']) self.assert_('two' not in self.ls._names()) @@ -393,6 +458,9 @@ class TestLockSet(unittest.TestCase): def testAcquireSetLock(self): # acquire the set-lock exclusively self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three'])) + self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three'])) + self.assertEquals(self.ls._is_owned(), True) + self.assertEquals(self.ls._names(), set(['one', 'two', 'three'])) # I can still add/remove elements... self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three']) self.assert_(self.ls.add('six')) @@ -403,6 +471,24 @@ class TestLockSet(unittest.TestCase): self.assertRaises(AssertionError, self.ls.add, 'five') self.ls.release() + def testAcquireWithRepetitions(self): + self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1), + set(['two', 'two', 'three'])) + self.ls.release(['two', 'two']) + self.assertEquals(self.ls._list_owned(), set(['three'])) + + def testEmptyAcquire(self): + # Acquire an empty list of locks... + self.assertEquals(self.ls.acquire([]), set()) + self.assertEquals(self.ls._list_owned(), set()) + # New locks can still be addded + self.assert_(self.ls.add('six')) + # "re-acquiring" is not an issue, since we had really acquired nothing + self.assertEquals(self.ls.acquire([], shared=1), set()) + self.assertEquals(self.ls._list_owned(), set()) + # We haven't really acquired anything, so we cannot release + self.assertRaises(AssertionError, self.ls.release) + def _doLockSet(self, set, shared): try: self.ls.acquire(set, shared=shared) @@ -422,143 +508,194 @@ class TestLockSet(unittest.TestCase): def _doRemoveSet(self, set): self.done.put(self.ls.remove(set)) + @_Repeat def testConcurrentSharedAcquire(self): self.ls.acquire(['one', 'two'], shared=1) - Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLockSet, args=(['one', 'two', 'three'], 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLockSet, args=('three', 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start() - Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doLockSet, args=(['one', 'two'], 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLockSet, args=('three', 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLockSet, args=(['one', 'two'], 0)) + self._addThread(target=self._doLockSet, args=(['two', 'three'], 0)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.release() - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self.assertEqual(self.done.get_nowait(), 'DONE') + @_Repeat def testConcurrentExclusiveAcquire(self): self.ls.acquire(['one', 'two']) - Thread(target=self._doLockSet, args=('three', 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLockSet, args=('three', 0)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start() - Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start() - Thread(target=self._doLockSet, args=('one', 0)).start() - Thread(target=self._doLockSet, args=('one', 1)).start() - Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start() - Thread(target=self._doLockSet, args=(['two', 'three'], 1)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doLockSet, args=('three', 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLockSet, args=('three', 0)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLockSet, args=(['one', 'two'], 0)) + self._addThread(target=self._doLockSet, args=(['one', 'two'], 1)) + self._addThread(target=self._doLockSet, args=('one', 0)) + self._addThread(target=self._doLockSet, args=('one', 1)) + self._addThread(target=self._doLockSet, args=(['two', 'three'], 0)) + self._addThread(target=self._doLockSet, args=(['two', 'three'], 1)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.release() - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + for _ in range(6): + self.failUnlessEqual(self.done.get_nowait(), 'DONE') + @_Repeat def testConcurrentRemove(self): self.ls.add('four') self.ls.acquire(['one', 'two', 'four']) - Thread(target=self._doLockSet, args=(['one', 'four'], 0)).start() - Thread(target=self._doLockSet, args=(['one', 'four'], 1)).start() - Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start() - Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doLockSet, args=(['one', 'four'], 0)) + self._addThread(target=self._doLockSet, args=(['one', 'four'], 1)) + self._addThread(target=self._doLockSet, args=(['one', 'two'], 0)) + self._addThread(target=self._doLockSet, args=(['one', 'two'], 1)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.remove('one') self.ls.release() - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') + self._waitThreads() + for i in range(4): + self.failUnlessEqual(self.done.get_nowait(), 'ERR') self.ls.add(['five', 'six'], acquired=1) - Thread(target=self._doLockSet, args=(['three', 'six'], 1)).start() - Thread(target=self._doLockSet, args=(['three', 'six'], 0)).start() - Thread(target=self._doLockSet, args=(['four', 'six'], 1)).start() - Thread(target=self._doLockSet, args=(['four', 'six'], 0)).start() + self._addThread(target=self._doLockSet, args=(['three', 'six'], 1)) + self._addThread(target=self._doLockSet, args=(['three', 'six'], 0)) + self._addThread(target=self._doLockSet, args=(['four', 'six'], 1)) + self._addThread(target=self._doLockSet, args=(['four', 'six'], 0)) self.ls.remove('five') self.ls.release() - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + for i in range(4): + self.failUnlessEqual(self.done.get_nowait(), 'DONE') self.ls.acquire(['three', 'four']) - Thread(target=self._doRemoveSet, args=(['four', 'six'], )).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doRemoveSet, args=(['four', 'six'], )) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.remove('four') - self.assertEqual(self.done.get(True, 1), ['six']) - Thread(target=self._doRemoveSet, args=(['two'])).start() - self.assertEqual(self.done.get(True, 1), ['two']) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), ['six']) + self._addThread(target=self._doRemoveSet, args=(['two'])) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), ['two']) self.ls.release() + # reset lockset + self._setUpLS() + @_Repeat def testConcurrentSharedSetLock(self): # share the set-lock... self.ls.acquire(None, shared=1) # ...another thread can share it too - Thread(target=self._doLockSet, args=(None, 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') + self._addThread(target=self._doLockSet, args=(None, 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') # ...or just share some elements - Thread(target=self._doLockSet, args=(['one', 'three'], 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') + self._addThread(target=self._doLockSet, args=(['one', 'three'], 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') # ...but not add new ones or remove any - Thread(target=self._doAddSet, args=(['nine'])).start() - Thread(target=self._doRemoveSet, args=(['two'], )).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + t = self._addThread(target=self._doAddSet, args=(['nine'])) + self._addThread(target=self._doRemoveSet, args=(['two'], )) + self.assertRaises(Queue.Empty, self.done.get_nowait) # this just releases the set-lock self.ls.release([]) - self.assertEqual(self.done.get(True, 1), 'DONE') + t.join(60) + self.assertEqual(self.done.get_nowait(), 'DONE') # release the lock on the actual elements so remove() can proceed too self.ls.release() - self.assertEqual(self.done.get(True, 1), ['two']) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), ['two']) + # reset lockset + self._setUpLS() + @_Repeat def testConcurrentExclusiveSetLock(self): # acquire the set-lock... self.ls.acquire(None, shared=0) # ...no one can do anything else - Thread(target=self._doLockSet, args=(None, 1)).start() - Thread(target=self._doLockSet, args=(None, 0)).start() - Thread(target=self._doLockSet, args=(['three'], 0)).start() - Thread(target=self._doLockSet, args=(['two'], 1)).start() - Thread(target=self._doAddSet, args=(['nine'])).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doLockSet, args=(None, 1)) + self._addThread(target=self._doLockSet, args=(None, 0)) + self._addThread(target=self._doLockSet, args=(['three'], 0)) + self._addThread(target=self._doLockSet, args=(['two'], 1)) + self._addThread(target=self._doAddSet, args=(['nine'])) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.release() - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + for _ in range(5): + self.assertEqual(self.done.get(True, 1), 'DONE') + # cleanup + self._setUpLS() + + @_Repeat + def testConcurrentSetLockAdd(self): + self.ls.acquire('one') + # Another thread wants the whole SetLock + self._addThread(target=self._doLockSet, args=(None, 0)) + self._addThread(target=self._doLockSet, args=(None, 1)) + self.assertRaises(Queue.Empty, self.done.get_nowait) + self.assertRaises(AssertionError, self.ls.add, 'four') + self.ls.release() + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self.assertEqual(self.done.get_nowait(), 'DONE') + self.ls.acquire(None) + self._addThread(target=self._doLockSet, args=(None, 0)) + self._addThread(target=self._doLockSet, args=(None, 1)) + self.assertRaises(Queue.Empty, self.done.get_nowait) + self.ls.add('four') + self.ls.add('five', acquired=1) + self.ls.add('six', acquired=1, shared=1) + self.assertEquals(self.ls._list_owned(), + set(['one', 'two', 'three', 'five', 'six'])) + self.assertEquals(self.ls._is_owned(), True) + self.assertEquals(self.ls._names(), + set(['one', 'two', 'three', 'four', 'five', 'six'])) + self.ls.release() + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self.assertEqual(self.done.get_nowait(), 'DONE') + self._setUpLS() + @_Repeat def testEmptyLockSet(self): # get the set-lock self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three'])) # now empty it... self.ls.remove(['one', 'two', 'three']) # and adds/locks by another thread still wait - Thread(target=self._doAddSet, args=(['nine'])).start() - Thread(target=self._doLockSet, args=(None, 1)).start() - Thread(target=self._doLockSet, args=(None, 0)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doAddSet, args=(['nine'])) + self._addThread(target=self._doLockSet, args=(None, 1)) + self._addThread(target=self._doLockSet, args=(None, 0)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.release() - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + for _ in range(3): + self.assertEqual(self.done.get_nowait(), 'DONE') # empty it again... self.assertEqual(self.ls.remove(['nine']), ['nine']) # now share it... self.assertEqual(self.ls.acquire(None, shared=1), set()) # other sharers can go, adds still wait - Thread(target=self._doLockSet, args=(None, 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doAddSet, args=(['nine'])).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doLockSet, args=(None, 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doAddSet, args=(['nine'])) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.release() - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._setUpLS() -class TestGanetiLockManager(unittest.TestCase): +class TestGanetiLockManager(_ThreadedTestCase): def setUp(self): + _ThreadedTestCase.setUp(self) self.nodes=['n1', 'n2'] self.instances=['i1', 'i2', 'i3'] self.GL = locking.GanetiLockManager(nodes=self.nodes, @@ -572,6 +709,8 @@ class TestGanetiLockManager(unittest.TestCase): def testLockingConstants(self): # The locking library internally cheats by assuming its constants have some # relationships with each other. Check those hold true. + # This relationship is also used in the Processor to recursively acquire + # the right locks. Again, please don't break it. for i in range(len(locking.LEVELS)): self.assertEqual(i, locking.LEVELS[i]) @@ -583,7 +722,6 @@ class TestGanetiLockManager(unittest.TestCase): self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes)) self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(self.instances)) - self.assertEqual(self.GL._names(locking.LEVEL_CONFIG), set(['config'])) def testInitAndResources(self): locking.GanetiLockManager._instance = None @@ -591,14 +729,12 @@ class TestGanetiLockManager(unittest.TestCase): self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) self.assertEqual(self.GL._names(locking.LEVEL_NODE), set()) self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set()) - self.assertEqual(self.GL._names(locking.LEVEL_CONFIG), set(['config'])) locking.GanetiLockManager._instance = None self.GL = locking.GanetiLockManager(nodes=self.nodes) self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes)) self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set()) - self.assertEqual(self.GL._names(locking.LEVEL_CONFIG), set(['config'])) locking.GanetiLockManager._instance = None self.GL = locking.GanetiLockManager(instances=self.instances) @@ -606,27 +742,52 @@ class TestGanetiLockManager(unittest.TestCase): self.assertEqual(self.GL._names(locking.LEVEL_NODE), set()) self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(self.instances)) - self.assertEqual(self.GL._names(locking.LEVEL_CONFIG), set(['config'])) def testAcquireRelease(self): self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL'])) + self.GL.acquire(locking.LEVEL_INSTANCE, ['i1']) self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1) - self.GL.release(locking.LEVEL_NODE) - self.GL.acquire(locking.LEVEL_NODE, ['n1']) + self.GL.release(locking.LEVEL_NODE, ['n2']) self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1'])) - self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2']) - self.GL.acquire(locking.LEVEL_CONFIG, ['config']) - self.GL.release(locking.LEVEL_INSTANCE, ['i2']) self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1'])) self.GL.release(locking.LEVEL_NODE) + self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set()) + self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1'])) self.GL.release(locking.LEVEL_INSTANCE) - self.GL.release(locking.LEVEL_CONFIG) self.assertRaises(errors.LockError, self.GL.acquire, locking.LEVEL_INSTANCE, ['i5']) self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1) self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3'])) + def testAcquireWholeSets(self): + self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) + self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None), + set(self.instances)) + self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), + set(self.instances)) + self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1), + set(self.nodes)) + self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), + set(self.nodes)) + self.GL.release(locking.LEVEL_NODE) + self.GL.release(locking.LEVEL_INSTANCE) + self.GL.release(locking.LEVEL_CLUSTER) + + def testAcquireWholeAndPartial(self): + self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) + self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None), + set(self.instances)) + self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), + set(self.instances)) + self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1), + set(['n2'])) + self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), + set(['n2'])) + self.GL.release(locking.LEVEL_NODE) + self.GL.release(locking.LEVEL_INSTANCE) + self.GL.release(locking.LEVEL_CLUSTER) + def testBGLDependency(self): self.assertRaises(AssertionError, self.GL.acquire, locking.LEVEL_NODE, ['n1', 'n2']) @@ -645,27 +806,14 @@ class TestGanetiLockManager(unittest.TestCase): self.assertRaises(AssertionError, self.GL.release, locking.LEVEL_CLUSTER) self.GL.release(locking.LEVEL_INSTANCE) - self.GL.acquire(locking.LEVEL_CONFIG, ['config']) - self.assertRaises(AssertionError, self.GL.release, - locking.LEVEL_CLUSTER) def testWrongOrder(self): self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) - self.GL.acquire(locking.LEVEL_INSTANCE, ['i3']) - self.assertRaises(AssertionError, self.GL.acquire, - locking.LEVEL_NODE, ['n1']) - self.assertRaises(AssertionError, self.GL.acquire, - locking.LEVEL_INSTANCE, ['i2']) - self.GL.acquire(locking.LEVEL_CONFIG, ['config']) - self.assertRaises(AssertionError, self.GL.acquire, - locking.LEVEL_CONFIG, ['config']) - self.GL.release(locking.LEVEL_INSTANCE) + self.GL.acquire(locking.LEVEL_NODE, ['n2']) self.assertRaises(AssertionError, self.GL.acquire, locking.LEVEL_NODE, ['n1']) self.assertRaises(AssertionError, self.GL.acquire, locking.LEVEL_INSTANCE, ['i2']) - self.assertRaises(AssertionError, self.GL.acquire, - locking.LEVEL_CONFIG, ['config']) # Helper function to run as a thread that shared the BGL and then acquires # some locks at another level. @@ -679,27 +827,36 @@ class TestGanetiLockManager(unittest.TestCase): except errors.LockError: self.done.put('ERR') + @_Repeat def testConcurrency(self): self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) - Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i1', 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - self.GL.acquire(locking.LEVEL_NODE, ['n1']) + self._addThread(target=self._doLock, + args=(locking.LEVEL_INSTANCE, 'i1', 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') self.GL.acquire(locking.LEVEL_INSTANCE, ['i3']) - self.GL.acquire(locking.LEVEL_CONFIG, ['config']) - Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i1', 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i3', 1)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) - self.GL.release(locking.LEVEL_CONFIG) + self._addThread(target=self._doLock, + args=(locking.LEVEL_INSTANCE, 'i1', 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLock, + args=(locking.LEVEL_INSTANCE, 'i3', 1)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.GL.release(locking.LEVEL_INSTANCE) - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1) - Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i2', 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i2', 0)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doLock, + args=(locking.LEVEL_INSTANCE, 'i2', 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLock, + args=(locking.LEVEL_INSTANCE, 'i2', 0)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.GL.release(locking.LEVEL_INSTANCE) + self._waitThreads() self.assertEqual(self.done.get(True, 1), 'DONE') + self.GL.release(locking.LEVEL_CLUSTER, ['BGL']) if __name__ == '__main__':