+ self._waitThreads()
+ for _ in range(5):
+ self.assertEqual(self.done.get(True, 1), 'DONE')
+ # cleanup
+ self._setUpLS()
+
@_Repeat
def testConcurrentSetLockAdd(self):
    """Exercise add() while other threads wait for the whole set-lock.

    Phase one: this thread owns only 'one'; two helper threads request the
    whole set and must not report completion, and add() is expected to
    raise AssertionError.  Phase two: this thread owns the whole set, the
    helpers are again kept waiting, and add() must succeed, with the names
    added as ``acquired`` showing up among the owned locks.
    """
    # Arguments handed to _doLockSet (defined outside this excerpt); the
    # second element is presumably its shared flag -- TODO confirm.
    whole_set_requests = ((None, 0), (None, 1))

    # --- Phase one: only a single lock is held -------------------------
    self.ls.acquire('one')
    # Other threads want the whole SetLock
    for request in whole_set_requests:
        self._addThread(target=self._doLockSet, args=request)
    # Nobody may have reported completion yet.
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.assertRaises(AssertionError, self.ls.add, 'four')
    self.ls.release()
    self._waitThreads()
    for _ in whole_set_requests:
        self.assertEqual(self.done.get_nowait(), 'DONE')

    # --- Phase two: the whole set is held ------------------------------
    self.ls.acquire(None)
    for request in whole_set_requests:
        self._addThread(target=self._doLockSet, args=request)
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.add('four')
    self.ls.add('five', acquired=1)
    self.ls.add('six', acquired=1, shared=1)
    # 'four' was added without acquired=1, so it is known but not owned.
    self.assertEqual(self.ls._list_owned(),
                     set(['one', 'two', 'three', 'five', 'six']))
    self.assertEqual(self.ls._is_owned(), True)
    self.assertEqual(self.ls._names(),
                     set(['one', 'two', 'three', 'four', 'five', 'six']))
    self.ls.release()
    self._waitThreads()
    for _ in whole_set_requests:
        self.assertEqual(self.done.get_nowait(), 'DONE')
    # cleanup
    self._setUpLS()
+
@_Repeat
def testEmptyLockSet(self):
    """Check adds and whole-set locks against a lock set that is empty.

    The set is emptied while held exclusively and, later, while held
    shared; in both cases a concurrent add must not complete until the
    set-lock is released, whereas a concurrent sharer may proceed when the
    set is only held shared.
    """
    # get the set-lock
    self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
    # now empty it...
    self.ls.remove(['one', 'two', 'three'])
    # and adds/locks by another thread still wait
    # NOTE: args is an explicit one-element tuple, so the thread target is
    # called with the bare string 'nine' (assuming _addThread forwards
    # ``args`` to threading.Thread unchanged -- it is defined elsewhere).
    self._addThread(target=self._doAddSet, args=('nine',))
    self._addThread(target=self._doLockSet, args=(None, 1))
    self._addThread(target=self._doLockSet, args=(None, 0))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    # One 'DONE' per helper thread started above.
    for _ in range(3):
        self.assertEqual(self.done.get_nowait(), 'DONE')

    # empty it again...
    self.assertEqual(self.ls.remove(['nine']), ['nine'])
    # now share it...
    self.assertEqual(self.ls.acquire(None, shared=1), set())
    # other sharers can go, adds still wait
    self._addThread(target=self._doLockSet, args=(None, 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    self._addThread(target=self._doAddSet, args=('nine',))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.ls.release()
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # cleanup
    self._setUpLS()
+
+
class TestGanetiLockManager(_ThreadedTestCase):
    """Tests for locking.GanetiLockManager."""

    def setUp(self):
        """Build a manager holding two nodes and three instances."""
        _ThreadedTestCase.setUp(self)
        self.nodes = ['n1', 'n2']
        self.instances = ['i1', 'i2', 'i3']
        self.GL = locking.GanetiLockManager(nodes=self.nodes,
                                            instances=self.instances)
        # Queue on which helper threads report 'DONE' or 'ERR'.
        self.done = Queue.Queue(0)

    def tearDown(self):
        """Drop the recorded manager instance so setUp can create another."""
        # Don't try this at home...
        locking.GanetiLockManager._instance = None

    def testLockingConstants(self):
        """Each entry of locking.LEVELS must equal its own index."""
        # The locking library internally cheats by assuming its constants
        # have some relationships with each other. Check those hold true.
        # This relationship is also used in the Processor to recursively
        # acquire the right locks. Again, please don't break it.
        for index, level in enumerate(locking.LEVELS):
            self.assertEqual(index, level)

    def testDoubleGLFails(self):
        """Creating a second manager while one exists must assert."""
        self.assertRaises(AssertionError, locking.GanetiLockManager)

    def testLockNames(self):
        """Every level reports the names given at construction time."""
        names = self.GL._names
        self.assertEqual(names(locking.LEVEL_CLUSTER), set(['BGL']))
        self.assertEqual(names(locking.LEVEL_NODE), set(self.nodes))
        self.assertEqual(names(locking.LEVEL_INSTANCE),
                         set(self.instances))

    def testInitAndResources(self):
        """Omitted constructor arguments yield empty node/instance levels."""
        # (constructor kwargs, expected node names, expected instance names)
        scenarios = (
            ({}, set(), set()),
            ({'nodes': self.nodes}, set(self.nodes), set()),
            ({'instances': self.instances}, set(), set(self.instances)),
        )
        for kwargs, want_nodes, want_instances in scenarios:
            # Forget the previous manager before creating the next one.
            locking.GanetiLockManager._instance = None
            self.GL = locking.GanetiLockManager(**kwargs)
            self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER),
                             set(['BGL']))
            self.assertEqual(self.GL._names(locking.LEVEL_NODE), want_nodes)
            self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                             want_instances)

    def testAcquireRelease(self):
        """Acquire and release at several levels, checking ownership."""
        owned = self.GL._list_owned
        self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
        self.assertEqual(owned(locking.LEVEL_CLUSTER), set(['BGL']))
        self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
        self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
        # Partial release at the node level leaves the other node owned.
        self.GL.release(locking.LEVEL_NODE, ['n2'])
        self.assertEqual(owned(locking.LEVEL_NODE), set(['n1']))
        self.assertEqual(owned(locking.LEVEL_INSTANCE), set(['i1']))
        # Releasing without names drops everything owned at that level.
        self.GL.release(locking.LEVEL_NODE)
        self.assertEqual(owned(locking.LEVEL_NODE), set())
        self.assertEqual(owned(locking.LEVEL_INSTANCE), set(['i1']))
        self.GL.release(locking.LEVEL_INSTANCE)
        # 'i5' is not one of the instances the manager was created with.
        self.assertRaises(errors.LockError, self.GL.acquire,
                          locking.LEVEL_INSTANCE, ['i5'])
        self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
        self.assertEqual(owned(locking.LEVEL_INSTANCE), set(['i3']))

    def testAcquireWholeSets(self):
        """Acquiring with names=None returns and owns the whole level."""
        owned = self.GL._list_owned
        self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
        self.assertEqual(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                         set(self.instances))
        self.assertEqual(owned(locking.LEVEL_INSTANCE), set(self.instances))
        self.assertEqual(
            self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
            set(self.nodes))
        self.assertEqual(owned(locking.LEVEL_NODE), set(self.nodes))
        self.GL.release(locking.LEVEL_NODE)
        self.GL.release(locking.LEVEL_INSTANCE)
        self.GL.release(locking.LEVEL_CLUSTER)

    def testAcquireWholeAndPartial(self):
        """Mix a whole-level acquire with a named acquire at another level."""
        owned = self.GL._list_owned
        self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
        self.assertEqual(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                         set(self.instances))
        self.assertEqual(owned(locking.LEVEL_INSTANCE), set(self.instances))
        self.assertEqual(
            self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
            set(['n2']))
        self.assertEqual(owned(locking.LEVEL_NODE), set(['n2']))
        self.GL.release(locking.LEVEL_NODE)
        self.GL.release(locking.LEVEL_INSTANCE)
        self.GL.release(locking.LEVEL_CLUSTER)

    def testBGLDependency(self):
        """Other levels need the BGL; it cannot be released before them."""
        # Without the BGL nothing else can be acquired.
        self.assertRaises(AssertionError, self.GL.acquire,
                          locking.LEVEL_NODE, ['n1', 'n2'])
        self.assertRaises(AssertionError, self.GL.acquire,
                          locking.LEVEL_INSTANCE, ['i3'])
        self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
        # While a node lock is held the BGL must stay.
        self.GL.acquire(locking.LEVEL_NODE, ['n1'])
        self.assertRaises(AssertionError, self.GL.release,
                          locking.LEVEL_CLUSTER, ['BGL'])
        self.assertRaises(AssertionError, self.GL.release,
                          locking.LEVEL_CLUSTER)
        self.GL.release(locking.LEVEL_NODE)
        # The same holds while instance locks are held.
        self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
        self.assertRaises(AssertionError, self.GL.release,
                          locking.LEVEL_CLUSTER, ['BGL'])
        self.assertRaises(AssertionError, self.GL.release,
                          locking.LEVEL_CLUSTER)
        self.GL.release(locking.LEVEL_INSTANCE)

    def testWrongOrder(self):
        """With a node lock held, further node/instance acquires assert."""
        self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
        self.GL.acquire(locking.LEVEL_NODE, ['n2'])
        self.assertRaises(AssertionError, self.GL.acquire,
                          locking.LEVEL_NODE, ['n1'])
        self.assertRaises(AssertionError, self.GL.acquire,
                          locking.LEVEL_INSTANCE, ['i2'])

    def _doLock(self, level, names, shared):
        """Thread helper: share the BGL, then lock ``names`` at ``level``.

        Puts 'DONE' on ``self.done`` once both acquisitions succeeded and
        'ERR' if a locking.errors.LockError is raised along the way.
        Whatever was acquired is released again, even on failure, so a
        failing helper thread does not keep the BGL (or the level locks)
        held.

        Args:
          level: locking level at which ``names`` are acquired
          names: lock name(s) passed straight to GanetiLockManager.acquire
          shared: shared flag passed to the acquire at ``level``
        """
        try:
            self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
            try:
                self.GL.acquire(level, names, shared=shared)
                try:
                    self.done.put('DONE')
                finally:
                    self.GL.release(level)
            finally:
                # Always runs after the level locks are gone, which is the
                # order the manager insists on for releasing the BGL.
                self.GL.release(locking.LEVEL_CLUSTER)
        except errors.LockError:
            self.done.put('ERR')

    @_Repeat
    def testConcurrency(self):
        """Check which instance-level requests from other threads block.

        With the BGL shared by this thread, helper threads asking for an
        instance this thread does not hold, or holds shared (when they ask
        shared too), finish at once; a request for an exclusively held
        instance, or an exclusive request for a shared-held one, reports
        nothing until this thread releases the instance level.
        """
        def start_locker(name, shared):
            # Run _doLock in a helper thread for one instance name.
            self._addThread(target=self._doLock,
                            args=(locking.LEVEL_INSTANCE, name, shared))

        self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)

        # Nothing held at the instance level: the helper goes through.
        start_locker('i1', 1)
        self._waitThreads()
        self.assertEqual(self.done.get_nowait(), 'DONE')

        # 'i3' held exclusively: 'i1' is unaffected, 'i3' has to wait.
        self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
        start_locker('i1', 1)
        self._waitThreads()
        self.assertEqual(self.done.get_nowait(), 'DONE')
        start_locker('i3', 1)
        self.assertRaises(Queue.Empty, self.done.get_nowait)
        self.GL.release(locking.LEVEL_INSTANCE)
        self._waitThreads()
        self.assertEqual(self.done.get_nowait(), 'DONE')

        # 'i2' held shared: another sharer passes, an exclusive one waits.
        self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
        start_locker('i2', 1)
        self._waitThreads()
        self.assertEqual(self.done.get_nowait(), 'DONE')
        start_locker('i2', 0)
        self.assertRaises(Queue.Empty, self.done.get_nowait)
        self.GL.release(locking.LEVEL_INSTANCE)
        self._waitThreads()
        self.assertEqual(self.done.get(True, 1), 'DONE')
        self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])