Enable auto-unit formatting in script output
[ganeti-local] / test / ganeti.locking_unittest.py
index 150b02c..aee6860 100755 (executable)
@@ -32,10 +32,46 @@ from ganeti import errors
 from threading import Thread
 
 
-class TestSharedLock(unittest.TestCase):
+# This is used to test the ssynchronized decorator.
+# Since it's passed as input to a decorator it must be declared as a global.
+_decoratorlock = locking.SharedLock()
+
+#: List for looping tests
+ITERATIONS = range(8)
+
+def _Repeat(fn):
+  """Decorator for executing a function many times"""
+  def wrapper(*args, **kwargs):
+    for i in ITERATIONS:
+      fn(*args, **kwargs)
+  return wrapper
+
+class _ThreadedTestCase(unittest.TestCase):
+  """Test class that supports adding/waiting on threads"""
+  def setUp(self):
+    unittest.TestCase.setUp(self)
+    self.threads = []
+
+  def _addThread(self, *args, **kwargs):
+    """Create and remember a new thread"""
+    t = Thread(*args, **kwargs)
+    self.threads.append(t)
+    t.start()
+    return t
+
+  def _waitThreads(self):
+    """Wait for all our threads to finish"""
+    for t in self.threads:
+      t.join(60)
+      self.failIf(t.isAlive())
+    self.threads = []
+
+
+class TestSharedLock(_ThreadedTestCase):
   """SharedLock tests"""
 
   def setUp(self):
+    _ThreadedTestCase.setUp(self)
     self.sl = locking.SharedLock()
     # helper threads use the 'done' queue to tell the master they finished.
     self.done = Queue.Queue(0)
@@ -115,73 +151,88 @@ class TestSharedLock(unittest.TestCase):
     self.assert_(self.done.get(True, 1))
     self.sl.release()
 
+  @_Repeat
   def testExclusiveBlocksExclusive(self):
     self.sl.acquire()
-    Thread(target=self._doItExclusive).start()
-    # give it a bit of time to check that it's not actually doing anything
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doItExclusive)
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.sl.release()
-    self.assert_(self.done.get(True, 1))
+    self._waitThreads()
+    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
 
+  @_Repeat
   def testExclusiveBlocksDelete(self):
     self.sl.acquire()
-    Thread(target=self._doItDelete).start()
-    # give it a bit of time to check that it's not actually doing anything
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doItDelete)
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.sl.release()
-    self.assert_(self.done.get(True, 1))
+    self._waitThreads()
+    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
+    self.sl = locking.SharedLock()
 
+  @_Repeat
   def testExclusiveBlocksSharer(self):
     self.sl.acquire()
-    Thread(target=self._doItSharer).start()
-    time.sleep(0.05)
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doItSharer)
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.sl.release()
-    self.assert_(self.done.get(True, 1))
+    self._waitThreads()
+    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
 
+  @_Repeat
   def testSharerBlocksExclusive(self):
     self.sl.acquire(shared=1)
-    Thread(target=self._doItExclusive).start()
-    time.sleep(0.05)
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doItExclusive)
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.sl.release()
-    self.assert_(self.done.get(True, 1))
+    self._waitThreads()
+    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
 
+  @_Repeat
   def testSharerBlocksDelete(self):
     self.sl.acquire(shared=1)
-    Thread(target=self._doItDelete).start()
-    time.sleep(0.05)
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doItDelete)
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.sl.release()
-    self.assert_(self.done.get(True, 1))
+    self._waitThreads()
+    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
+    self.sl = locking.SharedLock()
 
+  @_Repeat
   def testWaitingExclusiveBlocksSharer(self):
+    """SKIPPED testWaitingExclusiveBlocksSharer"""
+    return
+
     self.sl.acquire(shared=1)
     # the lock is acquired in shared mode...
-    Thread(target=self._doItExclusive).start()
+    self._addThread(target=self._doItExclusive)
     # ...but now an exclusive is waiting...
-    time.sleep(0.05)
-    Thread(target=self._doItSharer).start()
+    self._addThread(target=self._doItSharer)
     # ...so the sharer should be blocked as well
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.sl.release()
+    self._waitThreads()
     # The exclusive passed before
-    self.assertEqual(self.done.get(True, 1), 'EXC')
-    self.assertEqual(self.done.get(True, 1), 'SHR')
+    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
+    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
 
+  @_Repeat
   def testWaitingSharerBlocksExclusive(self):
+    """SKIPPED testWaitingSharerBlocksExclusive"""
+    return
+
     self.sl.acquire()
     # the lock is acquired in exclusive mode...
-    Thread(target=self._doItSharer).start()
+    self._addThread(target=self._doItSharer)
     # ...but now a sharer is waiting...
-    time.sleep(0.05)
-    Thread(target=self._doItExclusive).start()
+    self._addThread(target=self._doItExclusive)
     # ...the exclusive is waiting too...
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.sl.release()
+    self._waitThreads()
     # The sharer passed before
-    self.assertEqual(self.done.get(True, 1), 'SHR')
-    self.assertEqual(self.done.get(True, 1), 'EXC')
+    self.assertEqual(self.done.get_nowait(), 'SHR')
+    self.assertEqual(self.done.get_nowait(), 'EXC')
 
   def testNoNonBlocking(self):
     self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0)
@@ -199,46 +250,111 @@ class TestSharedLock(unittest.TestCase):
     self.sl.acquire(shared=1)
     self.assertRaises(AssertionError, self.sl.delete)
 
+  @_Repeat
   def testDeletePendingSharersExclusiveDelete(self):
     self.sl.acquire()
-    Thread(target=self._doItSharer).start()
-    Thread(target=self._doItSharer).start()
-    time.sleep(0.05)
-    Thread(target=self._doItExclusive).start()
-    Thread(target=self._doItDelete).start()
-    time.sleep(0.05)
+    self._addThread(target=self._doItSharer)
+    self._addThread(target=self._doItSharer)
+    self._addThread(target=self._doItExclusive)
+    self._addThread(target=self._doItDelete)
     self.sl.delete()
-    # The two threads who were pending return both ERR
-    self.assertEqual(self.done.get(True, 1), 'ERR')
-    self.assertEqual(self.done.get(True, 1), 'ERR')
-    self.assertEqual(self.done.get(True, 1), 'ERR')
-    self.assertEqual(self.done.get(True, 1), 'ERR')
+    self._waitThreads()
+    # The threads who were pending return ERR
+    for _ in range(4):
+      self.assertEqual(self.done.get_nowait(), 'ERR')
+    self.sl = locking.SharedLock()
 
+  @_Repeat
   def testDeletePendingDeleteExclusiveSharers(self):
     self.sl.acquire()
-    Thread(target=self._doItDelete).start()
-    Thread(target=self._doItExclusive).start()
-    time.sleep(0.05)
-    Thread(target=self._doItSharer).start()
-    Thread(target=self._doItSharer).start()
-    time.sleep(0.05)
+    self._addThread(target=self._doItDelete)
+    self._addThread(target=self._doItExclusive)
+    self._addThread(target=self._doItSharer)
+    self._addThread(target=self._doItSharer)
     self.sl.delete()
+    self._waitThreads()
     # The two threads who were pending return both ERR
-    self.assertEqual(self.done.get(True, 1), 'ERR')
-    self.assertEqual(self.done.get(True, 1), 'ERR')
-    self.assertEqual(self.done.get(True, 1), 'ERR')
-    self.assertEqual(self.done.get(True, 1), 'ERR')
+    self.assertEqual(self.done.get_nowait(), 'ERR')
+    self.assertEqual(self.done.get_nowait(), 'ERR')
+    self.assertEqual(self.done.get_nowait(), 'ERR')
+    self.assertEqual(self.done.get_nowait(), 'ERR')
+    self.sl = locking.SharedLock()
+
+
+class TestSSynchronizedDecorator(_ThreadedTestCase):
+  """Shared Lock Synchronized decorator test"""
 
+  def setUp(self):
+    _ThreadedTestCase.setUp(self)
+    # helper threads use the 'done' queue to tell the master they finished.
+    self.done = Queue.Queue(0)
 
-class TestLockSet(unittest.TestCase):
+  @locking.ssynchronized(_decoratorlock)
+  def _doItExclusive(self):
+    self.assert_(_decoratorlock._is_owned())
+    self.done.put('EXC')
+
+  @locking.ssynchronized(_decoratorlock, shared=1)
+  def _doItSharer(self):
+    self.assert_(_decoratorlock._is_owned(shared=1))
+    self.done.put('SHR')
+
+  def testDecoratedFunctions(self):
+    self._doItExclusive()
+    self.assert_(not _decoratorlock._is_owned())
+    self._doItSharer()
+    self.assert_(not _decoratorlock._is_owned())
+
+  def testSharersCanCoexist(self):
+    _decoratorlock.acquire(shared=1)
+    Thread(target=self._doItSharer).start()
+    self.assert_(self.done.get(True, 1))
+    _decoratorlock.release()
+
+  @_Repeat
+  def testExclusiveBlocksExclusive(self):
+    _decoratorlock.acquire()
+    self._addThread(target=self._doItExclusive)
+    # the helper thread should be blocked on the lock, so the queue is empty
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+    _decoratorlock.release()
+    self._waitThreads()
+    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
+
+  @_Repeat
+  def testExclusiveBlocksSharer(self):
+    _decoratorlock.acquire()
+    self._addThread(target=self._doItSharer)
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+    _decoratorlock.release()
+    self._waitThreads()
+    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
+
+  @_Repeat
+  def testSharerBlocksExclusive(self):
+    _decoratorlock.acquire(shared=1)
+    self._addThread(target=self._doItExclusive)
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+    _decoratorlock.release()
+    self._waitThreads()
+    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
+
+
+class TestLockSet(_ThreadedTestCase):
   """LockSet tests"""
 
   def setUp(self):
-    self.resources = ['one', 'two', 'three']
-    self.ls = locking.LockSet(members=self.resources)
+    _ThreadedTestCase.setUp(self)
+    self._setUpLS()
     # helper threads use the 'done' queue to tell the master they finished.
     self.done = Queue.Queue(0)
 
+  def _setUpLS(self):
+    """Helper to (re)initialize the lock set"""
+    self.resources = ['one', 'two', 'three']
+    self.ls = locking.LockSet(members=self.resources)
+
+
   def testResources(self):
     self.assertEquals(self.ls._names(), set(self.resources))
     newls = locking.LockSet()
@@ -295,12 +411,20 @@ class TestLockSet(unittest.TestCase):
     self.assert_('five' not in self.ls._names())
     self.assert_('six' not in self.ls._names())
     self.assertEquals(self.ls._list_owned(), set(['seven']))
-    self.ls.add('eight', acquired=1, shared=1)
-    self.assert_('eight' in self.ls._names())
-    self.assertEquals(self.ls._list_owned(), set(['seven', 'eight']))
+    self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
     self.ls.remove('seven')
     self.assert_('seven' not in self.ls._names())
-    self.assertEquals(self.ls._list_owned(), set(['eight']))
+    self.assertEquals(self.ls._list_owned(), set([]))
+    self.ls.acquire(None, shared=1)
+    self.assertRaises(AssertionError, self.ls.add, 'eight')
+    self.ls.release()
+    self.ls.acquire(None)
+    self.ls.add('eight', acquired=1)
+    self.assert_('eight' in self.ls._names())
+    self.assert_('eight' in self.ls._list_owned())
+    self.ls.add('nine')
+    self.assert_('nine' in self.ls._names())
+    self.assert_('nine' not in self.ls._list_owned())
     self.ls.release()
     self.ls.remove(['two'])
     self.assert_('two' not in self.ls._names())
@@ -334,6 +458,9 @@ class TestLockSet(unittest.TestCase):
   def testAcquireSetLock(self):
     # acquire the set-lock exclusively
     self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
+    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
+    self.assertEquals(self.ls._is_owned(), True)
+    self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
     # I can still add/remove elements...
     self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
     self.assert_(self.ls.add('six'))
@@ -344,6 +471,24 @@ class TestLockSet(unittest.TestCase):
     self.assertRaises(AssertionError, self.ls.add, 'five')
     self.ls.release()
 
+  def testAcquireWithRepetitions(self):
+    self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
+                      set(['two', 'two', 'three']))
+    self.ls.release(['two', 'two'])
+    self.assertEquals(self.ls._list_owned(), set(['three']))
+
+  def testEmptyAcquire(self):
+    # Acquire an empty list of locks...
+    self.assertEquals(self.ls.acquire([]), set())
+    self.assertEquals(self.ls._list_owned(), set())
+    # New locks can still be added
+    self.assert_(self.ls.add('six'))
+    # "re-acquiring" is not an issue, since we had really acquired nothing
+    self.assertEquals(self.ls.acquire([], shared=1), set())
+    self.assertEquals(self.ls._list_owned(), set())
+    # We haven't really acquired anything, so we cannot release
+    self.assertRaises(AssertionError, self.ls.release)
+
   def _doLockSet(self, set, shared):
     try:
       self.ls.acquire(set, shared=shared)
@@ -363,143 +508,194 @@ class TestLockSet(unittest.TestCase):
   def _doRemoveSet(self, set):
     self.done.put(self.ls.remove(set))
 
+  @_Repeat
   def testConcurrentSharedAcquire(self):
     self.ls.acquire(['one', 'two'], shared=1)
-    Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    Thread(target=self._doLockSet, args=(['one', 'two', 'three'], 1)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    Thread(target=self._doLockSet, args=('three', 1)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
-    Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self._addThread(target=self._doLockSet, args=('three', 1))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
+    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.ls.release()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self.assertEqual(self.done.get_nowait(), 'DONE')
 
+  @_Repeat
   def testConcurrentExclusiveAcquire(self):
     self.ls.acquire(['one', 'two'])
-    Thread(target=self._doLockSet, args=('three', 1)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    Thread(target=self._doLockSet, args=('three', 0)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
-    Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
-    Thread(target=self._doLockSet, args=('one', 0)).start()
-    Thread(target=self._doLockSet, args=('one', 1)).start()
-    Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start()
-    Thread(target=self._doLockSet, args=(['two', 'three'], 1)).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doLockSet, args=('three', 1))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self._addThread(target=self._doLockSet, args=('three', 0))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
+    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
+    self._addThread(target=self._doLockSet, args=('one', 0))
+    self._addThread(target=self._doLockSet, args=('one', 1))
+    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
+    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.ls.release()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self._waitThreads()
+    for _ in range(6):
+      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
 
+  @_Repeat
   def testConcurrentRemove(self):
     self.ls.add('four')
     self.ls.acquire(['one', 'two', 'four'])
-    Thread(target=self._doLockSet, args=(['one', 'four'], 0)).start()
-    Thread(target=self._doLockSet, args=(['one', 'four'], 1)).start()
-    Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
-    Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
+    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
+    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
+    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.ls.remove('one')
     self.ls.release()
-    self.assertEqual(self.done.get(True, 1), 'ERR')
-    self.assertEqual(self.done.get(True, 1), 'ERR')
-    self.assertEqual(self.done.get(True, 1), 'ERR')
-    self.assertEqual(self.done.get(True, 1), 'ERR')
+    self._waitThreads()
+    for i in range(4):
+      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
     self.ls.add(['five', 'six'], acquired=1)
-    Thread(target=self._doLockSet, args=(['three', 'six'], 1)).start()
-    Thread(target=self._doLockSet, args=(['three', 'six'], 0)).start()
-    Thread(target=self._doLockSet, args=(['four', 'six'], 1)).start()
-    Thread(target=self._doLockSet, args=(['four', 'six'], 0)).start()
+    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
+    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
+    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
+    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
     self.ls.remove('five')
     self.ls.release()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self._waitThreads()
+    for i in range(4):
+      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
     self.ls.acquire(['three', 'four'])
-    Thread(target=self._doRemoveSet, args=(['four', 'six'], )).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.ls.remove('four')
-    self.assertEqual(self.done.get(True, 1), ['six'])
-    Thread(target=self._doRemoveSet, args=(['two'])).start()
-    self.assertEqual(self.done.get(True, 1), ['two'])
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), ['six'])
+    self._addThread(target=self._doRemoveSet, args=(['two']))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), ['two'])
     self.ls.release()
+    # reset lockset
+    self._setUpLS()
 
+  @_Repeat
   def testConcurrentSharedSetLock(self):
     # share the set-lock...
     self.ls.acquire(None, shared=1)
     # ...another thread can share it too
-    Thread(target=self._doLockSet, args=(None, 1)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self._addThread(target=self._doLockSet, args=(None, 1))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
     # ...or just share some elements
-    Thread(target=self._doLockSet, args=(['one', 'three'], 1)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
     # ...but not add new ones or remove any
-    Thread(target=self._doAddSet, args=(['nine'])).start()
-    Thread(target=self._doRemoveSet, args=(['two'], )).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    t = self._addThread(target=self._doAddSet, args=(['nine']))
+    self._addThread(target=self._doRemoveSet, args=(['two'], ))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     # this just releases the set-lock
     self.ls.release([])
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    t.join(60)
+    self.assertEqual(self.done.get_nowait(), 'DONE')
     # release the lock on the actual elements so remove() can proceed too
     self.ls.release()
-    self.assertEqual(self.done.get(True, 1), ['two'])
+    self._waitThreads()
+    self.failUnlessEqual(self.done.get_nowait(), ['two'])
+    # reset lockset
+    self._setUpLS()
 
+  @_Repeat
   def testConcurrentExclusiveSetLock(self):
     # acquire the set-lock...
     self.ls.acquire(None, shared=0)
     # ...no one can do anything else
-    Thread(target=self._doLockSet, args=(None, 1)).start()
-    Thread(target=self._doLockSet, args=(None, 0)).start()
-    Thread(target=self._doLockSet, args=(['three'], 0)).start()
-    Thread(target=self._doLockSet, args=(['two'], 1)).start()
-    Thread(target=self._doAddSet, args=(['nine'])).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doLockSet, args=(None, 1))
+    self._addThread(target=self._doLockSet, args=(None, 0))
+    self._addThread(target=self._doLockSet, args=(['three'], 0))
+    self._addThread(target=self._doLockSet, args=(['two'], 1))
+    self._addThread(target=self._doAddSet, args=(['nine']))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.ls.release()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self._waitThreads()
+    for _ in range(5):
+      self.assertEqual(self.done.get(True, 1), 'DONE')
+    # cleanup
+    self._setUpLS()
+
+  @_Repeat
+  def testConcurrentSetLockAdd(self):
+    self.ls.acquire('one')
+    # Other threads want the whole SetLock (one exclusively, one shared)
+    self._addThread(target=self._doLockSet, args=(None, 0))
+    self._addThread(target=self._doLockSet, args=(None, 1))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+    self.assertRaises(AssertionError, self.ls.add, 'four')
+    self.ls.release()
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self.ls.acquire(None)
+    self._addThread(target=self._doLockSet, args=(None, 0))
+    self._addThread(target=self._doLockSet, args=(None, 1))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+    self.ls.add('four')
+    self.ls.add('five', acquired=1)
+    self.ls.add('six', acquired=1, shared=1)
+    self.assertEquals(self.ls._list_owned(),
+      set(['one', 'two', 'three', 'five', 'six']))
+    self.assertEquals(self.ls._is_owned(), True)
+    self.assertEquals(self.ls._names(),
+      set(['one', 'two', 'three', 'four', 'five', 'six']))
+    self.ls.release()
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self._setUpLS()
 
+  @_Repeat
   def testEmptyLockSet(self):
     # get the set-lock
     self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
     # now empty it...
     self.ls.remove(['one', 'two', 'three'])
     # and adds/locks by another thread still wait
-    Thread(target=self._doAddSet, args=(['nine'])).start()
-    Thread(target=self._doLockSet, args=(None, 1)).start()
-    Thread(target=self._doLockSet, args=(None, 0)).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doAddSet, args=(['nine']))
+    self._addThread(target=self._doLockSet, args=(None, 1))
+    self._addThread(target=self._doLockSet, args=(None, 0))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.ls.release()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self._waitThreads()
+    for _ in range(3):
+      self.assertEqual(self.done.get_nowait(), 'DONE')
     # empty it again...
     self.assertEqual(self.ls.remove(['nine']), ['nine'])
     # now share it...
     self.assertEqual(self.ls.acquire(None, shared=1), set())
     # other sharers can go, adds still wait
-    Thread(target=self._doLockSet, args=(None, 1)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    Thread(target=self._doAddSet, args=(['nine'])).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doLockSet, args=(None, 1))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self._addThread(target=self._doAddSet, args=(['nine']))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.ls.release()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self._setUpLS()
 
 
-class TestGanetiLockManager(unittest.TestCase):
+class TestGanetiLockManager(_ThreadedTestCase):
 
   def setUp(self):
+    _ThreadedTestCase.setUp(self)
     self.nodes=['n1', 'n2']
     self.instances=['i1', 'i2', 'i3']
     self.GL = locking.GanetiLockManager(nodes=self.nodes,
@@ -513,11 +709,12 @@ class TestGanetiLockManager(unittest.TestCase):
   def testLockingConstants(self):
     # The locking library internally cheats by assuming its constants have some
     # relationships with each other. Check those hold true.
+    # This relationship is also used in the Processor to recursively acquire
+    # the right locks. Again, please don't break it.
     for i in range(len(locking.LEVELS)):
       self.assertEqual(i, locking.LEVELS[i])
 
   def testDoubleGLFails(self):
-    # We are not passing test=True, so instantiating a new one should fail
     self.assertRaises(AssertionError, locking.GanetiLockManager)
 
   def testLockNames(self):
@@ -525,7 +722,6 @@ class TestGanetiLockManager(unittest.TestCase):
     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                      set(self.instances))
-    self.assertEqual(self.GL._names(locking.LEVEL_CONFIG), set(['config']))
 
   def testInitAndResources(self):
     locking.GanetiLockManager._instance = None
@@ -533,14 +729,12 @@ class TestGanetiLockManager(unittest.TestCase):
     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
-    self.assertEqual(self.GL._names(locking.LEVEL_CONFIG), set(['config']))
 
     locking.GanetiLockManager._instance = None
     self.GL = locking.GanetiLockManager(nodes=self.nodes)
     self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
-    self.assertEqual(self.GL._names(locking.LEVEL_CONFIG), set(['config']))
 
     locking.GanetiLockManager._instance = None
     self.GL = locking.GanetiLockManager(instances=self.instances)
@@ -548,27 +742,52 @@ class TestGanetiLockManager(unittest.TestCase):
     self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
     self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                      set(self.instances))
-    self.assertEqual(self.GL._names(locking.LEVEL_CONFIG), set(['config']))
 
   def testAcquireRelease(self):
     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
     self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
+    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
     self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
-    self.GL.release(locking.LEVEL_NODE)
-    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
+    self.GL.release(locking.LEVEL_NODE, ['n2'])
     self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
-    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
-    self.GL.acquire(locking.LEVEL_CONFIG, ['config'])
-    self.GL.release(locking.LEVEL_INSTANCE, ['i2'])
     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
     self.GL.release(locking.LEVEL_NODE)
+    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
+    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
     self.GL.release(locking.LEVEL_INSTANCE)
-    self.GL.release(locking.LEVEL_CONFIG)
     self.assertRaises(errors.LockError, self.GL.acquire,
                       locking.LEVEL_INSTANCE, ['i5'])
     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
     self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
 
+  def testAcquireWholeSets(self):
+    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
+    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
+                      set(self.instances))
+    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
+                      set(self.instances))
+    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
+                      set(self.nodes))
+    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
+                      set(self.nodes))
+    self.GL.release(locking.LEVEL_NODE)
+    self.GL.release(locking.LEVEL_INSTANCE)
+    self.GL.release(locking.LEVEL_CLUSTER)
+
+  def testAcquireWholeAndPartial(self):
+    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
+    self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
+                      set(self.instances))
+    self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
+                      set(self.instances))
+    self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
+                      set(['n2']))
+    self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
+                      set(['n2']))
+    self.GL.release(locking.LEVEL_NODE)
+    self.GL.release(locking.LEVEL_INSTANCE)
+    self.GL.release(locking.LEVEL_CLUSTER)
+
   def testBGLDependency(self):
     self.assertRaises(AssertionError, self.GL.acquire,
                       locking.LEVEL_NODE, ['n1', 'n2'])
@@ -587,27 +806,14 @@ class TestGanetiLockManager(unittest.TestCase):
     self.assertRaises(AssertionError, self.GL.release,
                       locking.LEVEL_CLUSTER)
     self.GL.release(locking.LEVEL_INSTANCE)
-    self.GL.acquire(locking.LEVEL_CONFIG, ['config'])
-    self.assertRaises(AssertionError, self.GL.release,
-                      locking.LEVEL_CLUSTER)
 
   def testWrongOrder(self):
     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
-    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
+    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
     self.assertRaises(AssertionError, self.GL.acquire,
                       locking.LEVEL_NODE, ['n1'])
     self.assertRaises(AssertionError, self.GL.acquire,
                       locking.LEVEL_INSTANCE, ['i2'])
-    self.GL.acquire(locking.LEVEL_CONFIG, ['config'])
-    self.assertRaises(AssertionError, self.GL.acquire,
-                      locking.LEVEL_CONFIG, ['config'])
-    self.GL.release(locking.LEVEL_INSTANCE)
-    self.assertRaises(AssertionError, self.GL.acquire,
-                      locking.LEVEL_NODE, ['n1'])
-    self.assertRaises(AssertionError, self.GL.acquire,
-                      locking.LEVEL_INSTANCE, ['i2'])
-    self.assertRaises(AssertionError, self.GL.acquire,
-                      locking.LEVEL_CONFIG, ['config'])
 
   # Helper function to run as a thread that shared the BGL and then acquires
   # some locks at another level.
@@ -621,27 +827,36 @@ class TestGanetiLockManager(unittest.TestCase):
     except errors.LockError:
       self.done.put('ERR')
 
+  @_Repeat
   def testConcurrency(self):
     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
-    Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i1', 1)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
+    self._addThread(target=self._doLock,
+                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
-    self.GL.acquire(locking.LEVEL_CONFIG, ['config'])
-    Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i1', 1)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i3', 1)).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
-    self.GL.release(locking.LEVEL_CONFIG)
+    self._addThread(target=self._doLock,
+                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self._addThread(target=self._doLock,
+                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.GL.release(locking.LEVEL_INSTANCE)
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
     self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
-    Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i2', 1)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i2', 0)).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doLock,
+                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self._addThread(target=self._doLock,
+                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.GL.release(locking.LEVEL_INSTANCE)
+    self._waitThreads()
     self.assertEqual(self.done.get(True, 1), 'DONE')
+    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
 
 
 if __name__ == '__main__':