Replace all xrange() with range()
[ganeti-local] / test / ganeti.locking_unittest.py
index 94843fe..9483625 100755 (executable)
@@ -26,24 +26,226 @@ import os
 import unittest
 import time
 import Queue
+import threading
 
 from ganeti import locking
 from ganeti import errors
-from threading import Thread
 
 
 # This is used to test the ssynchronize decorator.
 # Since it's passed as input to a decorator it must be declared as a global.
 _decoratorlock = locking.SharedLock()
 
+#: List for looping tests
+ITERATIONS = range(8)
 
-class TestSharedLock(unittest.TestCase):
+
+def _Repeat(fn):
+  """Decorator calling fn once per ITERATIONS entry; results are discarded"""
+  def wrapper(*args, **kwargs):
+    for i in ITERATIONS:
+      fn(*args, **kwargs)
+  return wrapper
+
+
+class _ThreadedTestCase(unittest.TestCase):
+  """Test base class tracking helper threads and providing a 'done' queue"""
+  def setUp(self):
+    unittest.TestCase.setUp(self)
+    self.done = Queue.Queue(0)
+    self.threads = []
+
+  def _addThread(self, *args, **kwargs):
+    """Create, remember and start a thread; args go to threading.Thread"""
+    t = threading.Thread(*args, **kwargs)
+    self.threads.append(t)
+    t.start()
+    return t
+
+  def _waitThreads(self):
+    """Join all threads (60s each), fail if one is still alive, then reset"""
+    for t in self.threads:
+      t.join(60)
+      self.failIf(t.isAlive())
+    self.threads = []
+
+
+class _ConditionTestCase(_ThreadedTestCase):
+  """Common test case for conditions"""
+
+  def setUp(self, cls):
+    _ThreadedTestCase.setUp(self)
+    self.lock = threading.Lock()
+    self.cond = cls(self.lock)
+
+  def _testAcquireRelease(self):
+    self.assert_(not self.cond._is_owned())
+    self.assertRaises(RuntimeError, self.cond.wait)
+    self.assertRaises(RuntimeError, self.cond.notifyAll)
+
+    self.cond.acquire()
+    self.assert_(self.cond._is_owned())
+    self.cond.notifyAll()
+    self.assert_(self.cond._is_owned())
+    self.cond.release()
+
+    self.assert_(not self.cond._is_owned())
+    self.assertRaises(RuntimeError, self.cond.wait)
+    self.assertRaises(RuntimeError, self.cond.notifyAll)
+
+  def _testNotification(self):
+    def _NotifyAll():
+      self.done.put("NE")
+      self.cond.acquire()
+      self.done.put("NA")
+      self.cond.notifyAll()
+      self.done.put("NN")
+      self.cond.release()
+
+    self.cond.acquire()
+    self._addThread(target=_NotifyAll)
+    self.assertEqual(self.done.get(True, 1), "NE")
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+    self.cond.wait()
+    self.assertEqual(self.done.get(True, 1), "NA")
+    self.assertEqual(self.done.get(True, 1), "NN")
+    self.assert_(self.cond._is_owned())
+    self.cond.release()
+    self.assert_(not self.cond._is_owned())
+
+
+class TestSingleNotifyPipeCondition(_ConditionTestCase):
+  """SingleNotifyPipeCondition tests"""
+
+  def setUp(self):
+    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
+
+  def testAcquireRelease(self):
+    self._testAcquireRelease()
+
+  def testNotification(self):
+    self._testNotification()
+
+  def testWaitReuse(self):
+    self.cond.acquire()
+    self.cond.wait(0)
+    self.cond.wait(0.1)
+    self.cond.release()
+
+  def testNoNotifyReuse(self):
+    self.cond.acquire()
+    self.cond.notifyAll()
+    self.assertRaises(RuntimeError, self.cond.wait)
+    self.assertRaises(RuntimeError, self.cond.notifyAll)
+    self.cond.release()
+
+
+class TestPipeCondition(_ConditionTestCase):
+  """PipeCondition tests"""
+
+  def setUp(self):
+    _ConditionTestCase.setUp(self, locking.PipeCondition)
+
+  def testAcquireRelease(self):
+    self._testAcquireRelease()
+
+  def testNotification(self):
+    self._testNotification()
+
+  def _TestWait(self, fn):
+    self._addThread(target=fn)
+    self._addThread(target=fn)
+    self._addThread(target=fn)
+
+    # Wait for threads to be waiting
+    self.assertEqual(self.done.get(True, 1), "A")
+    self.assertEqual(self.done.get(True, 1), "A")
+    self.assertEqual(self.done.get(True, 1), "A")
+
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+    self.cond.acquire()
+    self.assertEqual(self.cond._nwaiters, 3)
+    # This new thread can't acquire the lock, and thus call wait, before we
+    # release it
+    self._addThread(target=fn)
+    self.cond.notifyAll()
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+    self.cond.release()
+
+    # We should now get 3 W and 1 A (for the new thread) in whatever order
+    w = 0
+    a = 0
+    for i in range(4):
+      got = self.done.get(True, 1)
+      if got == "W":
+        w += 1
+      elif got == "A":
+        a += 1
+      else:
+        self.fail("Got %s on the done queue" % got)
+
+    self.assertEqual(w, 3)
+    self.assertEqual(a, 1)
+
+    self.cond.acquire()
+    self.cond.notifyAll()
+    self.cond.release()
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), "W")
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+  def testBlockingWait(self):
+    def _BlockingWait():
+      self.cond.acquire()
+      self.done.put("A")
+      self.cond.wait()
+      self.cond.release()
+      self.done.put("W")
+
+    self._TestWait(_BlockingWait)
+
+  def testLongTimeoutWait(self):
+    def _Helper():
+      self.cond.acquire()
+      self.done.put("A")
+      self.cond.wait(15.0)
+      self.cond.release()
+      self.done.put("W")
+
+    self._TestWait(_Helper)
+
+  def _TimeoutWait(self, timeout, check):
+    self.cond.acquire()
+    self.cond.wait(timeout)
+    self.cond.release()
+    self.done.put(check)
+
+  def testShortTimeoutWait(self):
+    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
+    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), "T1")
+    self.assertEqual(self.done.get_nowait(), "T1")
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+  def testZeroTimeoutWait(self):
+    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
+    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
+    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), "T0")
+    self.assertEqual(self.done.get_nowait(), "T0")
+    self.assertEqual(self.done.get_nowait(), "T0")
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+
+class TestSharedLock(_ThreadedTestCase):
   """SharedLock tests"""
 
   def setUp(self):
+    _ThreadedTestCase.setUp(self)
     self.sl = locking.SharedLock()
-    # helper threads use the 'done' queue to tell the master they finished.
-    self.done = Queue.Queue(0)
 
   def testSequenceAndOwnership(self):
     self.assert_(not self.sl._is_owned())
@@ -116,83 +318,92 @@ class TestSharedLock(unittest.TestCase):
 
   def testSharersCanCoexist(self):
     self.sl.acquire(shared=1)
-    Thread(target=self._doItSharer).start()
+    threading.Thread(target=self._doItSharer).start()
     self.assert_(self.done.get(True, 1))
     self.sl.release()
 
+  @_Repeat
   def testExclusiveBlocksExclusive(self):
     self.sl.acquire()
-    Thread(target=self._doItExclusive).start()
-    # give it a bit of time to check that it's not actually doing anything
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doItExclusive)
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.sl.release()
-    self.assert_(self.done.get(True, 1))
+    self._waitThreads()
+    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
 
+  @_Repeat
   def testExclusiveBlocksDelete(self):
     self.sl.acquire()
-    Thread(target=self._doItDelete).start()
-    # give it a bit of time to check that it's not actually doing anything
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doItDelete)
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.sl.release()
-    self.assert_(self.done.get(True, 1))
+    self._waitThreads()
+    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
+    self.sl = locking.SharedLock()
 
+  @_Repeat
   def testExclusiveBlocksSharer(self):
     self.sl.acquire()
-    Thread(target=self._doItSharer).start()
-    time.sleep(0.05)
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doItSharer)
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.sl.release()
-    self.assert_(self.done.get(True, 1))
+    self._waitThreads()
+    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
 
+  @_Repeat
   def testSharerBlocksExclusive(self):
     self.sl.acquire(shared=1)
-    Thread(target=self._doItExclusive).start()
-    time.sleep(0.05)
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doItExclusive)
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.sl.release()
-    self.assert_(self.done.get(True, 1))
+    self._waitThreads()
+    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
 
+  @_Repeat
   def testSharerBlocksDelete(self):
     self.sl.acquire(shared=1)
-    Thread(target=self._doItDelete).start()
-    time.sleep(0.05)
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doItDelete)
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.sl.release()
-    self.assert_(self.done.get(True, 1))
+    self._waitThreads()
+    self.failUnlessEqual(self.done.get_nowait(), 'DEL')
+    self.sl = locking.SharedLock()
 
+  @_Repeat
   def testWaitingExclusiveBlocksSharer(self):
+    """SKIPPED testWaitingExclusiveBlocksSharer"""
+    return
+
     self.sl.acquire(shared=1)
     # the lock is acquired in shared mode...
-    Thread(target=self._doItExclusive).start()
+    self._addThread(target=self._doItExclusive)
     # ...but now an exclusive is waiting...
-    time.sleep(0.05)
-    Thread(target=self._doItSharer).start()
+    self._addThread(target=self._doItSharer)
     # ...so the sharer should be blocked as well
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.sl.release()
+    self._waitThreads()
     # The exclusive passed before
-    self.assertEqual(self.done.get(True, 1), 'EXC')
-    self.assertEqual(self.done.get(True, 1), 'SHR')
+    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
+    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
 
+  @_Repeat
   def testWaitingSharerBlocksExclusive(self):
+    """SKIPPED testWaitingSharerBlocksExclusive"""
+    return
+
     self.sl.acquire()
     # the lock is acquired in exclusive mode...
-    Thread(target=self._doItSharer).start()
+    self._addThread(target=self._doItSharer)
     # ...but now a sharer is waiting...
-    time.sleep(0.05)
-    Thread(target=self._doItExclusive).start()
+    self._addThread(target=self._doItExclusive)
     # ...the exclusive is waiting too...
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.sl.release()
+    self._waitThreads()
     # The sharer passed before
-    self.assertEqual(self.done.get(True, 1), 'SHR')
-    self.assertEqual(self.done.get(True, 1), 'EXC')
-
-  def testNoNonBlocking(self):
-    self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0)
-    self.assertRaises(NotImplementedError, self.sl.delete, blocking=0)
-    self.sl.acquire()
-    self.sl.delete(blocking=0) # Fine, because the lock is already acquired
+    self.assertEqual(self.done.get_nowait(), 'SHR')
+    self.assertEqual(self.done.get_nowait(), 'EXC')
 
   def testDelete(self):
     self.sl.delete()
@@ -200,47 +411,302 @@ class TestSharedLock(unittest.TestCase):
     self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
     self.assertRaises(errors.LockError, self.sl.delete)
 
+  def testDeleteTimeout(self):
+    self.sl.delete(timeout=60)
+
   def testNoDeleteIfSharer(self):
     self.sl.acquire(shared=1)
     self.assertRaises(AssertionError, self.sl.delete)
 
+  @_Repeat
   def testDeletePendingSharersExclusiveDelete(self):
     self.sl.acquire()
-    Thread(target=self._doItSharer).start()
-    Thread(target=self._doItSharer).start()
-    time.sleep(0.05)
-    Thread(target=self._doItExclusive).start()
-    Thread(target=self._doItDelete).start()
-    time.sleep(0.05)
+    self._addThread(target=self._doItSharer)
+    self._addThread(target=self._doItSharer)
+    self._addThread(target=self._doItExclusive)
+    self._addThread(target=self._doItDelete)
     self.sl.delete()
-    # The two threads who were pending return both ERR
-    self.assertEqual(self.done.get(True, 1), 'ERR')
-    self.assertEqual(self.done.get(True, 1), 'ERR')
-    self.assertEqual(self.done.get(True, 1), 'ERR')
-    self.assertEqual(self.done.get(True, 1), 'ERR')
+    self._waitThreads()
+    # The threads who were pending return ERR
+    for _ in range(4):
+      self.assertEqual(self.done.get_nowait(), 'ERR')
+    self.sl = locking.SharedLock()
 
+  @_Repeat
   def testDeletePendingDeleteExclusiveSharers(self):
     self.sl.acquire()
-    Thread(target=self._doItDelete).start()
-    Thread(target=self._doItExclusive).start()
-    time.sleep(0.05)
-    Thread(target=self._doItSharer).start()
-    Thread(target=self._doItSharer).start()
-    time.sleep(0.05)
+    self._addThread(target=self._doItDelete)
+    self._addThread(target=self._doItExclusive)
+    self._addThread(target=self._doItSharer)
+    self._addThread(target=self._doItSharer)
     self.sl.delete()
+    self._waitThreads()
     # The two threads who were pending return both ERR
-    self.assertEqual(self.done.get(True, 1), 'ERR')
-    self.assertEqual(self.done.get(True, 1), 'ERR')
-    self.assertEqual(self.done.get(True, 1), 'ERR')
-    self.assertEqual(self.done.get(True, 1), 'ERR')
+    self.assertEqual(self.done.get_nowait(), 'ERR')
+    self.assertEqual(self.done.get_nowait(), 'ERR')
+    self.assertEqual(self.done.get_nowait(), 'ERR')
+    self.assertEqual(self.done.get_nowait(), 'ERR')
+    self.sl = locking.SharedLock()
+
+  @_Repeat
+  def testExclusiveAcquireTimeout(self):
+    for shared in [0, 1]:
+      on_queue = threading.Event()
+      release_exclusive = threading.Event()
+
+      def _LockExclusive():
+        self.sl.acquire(shared=0, test_notify=on_queue.set)
+        self.done.put("A: start wait")
+        release_exclusive.wait()
+        self.done.put("A: end wait")
+        self.sl.release()
+
+      # Start thread to hold lock in exclusive mode
+      self._addThread(target=_LockExclusive)
+
+      # Wait for wait to begin
+      self.assertEqual(self.done.get(timeout=60), "A: start wait")
+
+      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
+      # on the queue
+      self.failUnless(self.sl.acquire(shared=shared, timeout=60,
+                                      test_notify=release_exclusive.set))
+
+      self.done.put("got 2nd")
+      self.sl.release()
+
+      self._waitThreads()
+
+      self.assertEqual(self.done.get_nowait(), "A: end wait")
+      self.assertEqual(self.done.get_nowait(), "got 2nd")
+      self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+  @_Repeat
+  def testAcquireExpiringTimeout(self):
+    def _AcquireWithTimeout(shared, timeout):
+      if not self.sl.acquire(shared=shared, timeout=timeout):
+        self.done.put("timeout")
+
+    for shared in [0, 1]:
+      # Lock exclusively
+      self.sl.acquire()
+
+      # Start acquires in the current mode with timeouts between 0 and 20 ms
+      for i in range(11):
+        self._addThread(target=_AcquireWithTimeout,
+                        args=(shared, i * 2.0 / 1000.0))
+
+      # Wait for threads to finish (makes sure the acquire timeout expires
+      # before releasing the lock)
+      self._waitThreads()
+
+      # Release lock
+      self.sl.release()
+
+      for _ in range(11):
+        self.assertEqual(self.done.get_nowait(), "timeout")
+
+      self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+  @_Repeat
+  def testSharedSkipExclusiveAcquires(self):
+    # Tests whether shared acquires jump in front of exclusive acquires in the
+    # queue.
+
+    def _Acquire(shared, name, notify_ev, wait_ev):
+      if notify_ev:
+        notify_fn = notify_ev.set
+      else:
+        notify_fn = None
+
+      if wait_ev:
+        wait_ev.wait()
+
+      if not self.sl.acquire(shared=shared, test_notify=notify_fn):
+        return
+
+      self.done.put(name)
+      self.sl.release()
+
+    # Get exclusive lock while we fill the queue
+    self.sl.acquire()
+
+    shrcnt1 = 5
+    shrcnt2 = 7
+    shrcnt3 = 9
+    shrcnt4 = 2
+
+    # Add acquires using threading.Event for synchronization. They'll be
+    # acquired exactly in the order defined in this list.
+    acquires = (shrcnt1 * [(1, "shared 1")] +
+                3 * [(0, "exclusive 1")] +
+                shrcnt2 * [(1, "shared 2")] +
+                shrcnt3 * [(1, "shared 3")] +
+                shrcnt4 * [(1, "shared 4")] +
+                3 * [(0, "exclusive 2")])
+
+    ev_cur = None
+    ev_prev = None
+
+    for args in acquires:
+      ev_cur = threading.Event()
+      self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
+      ev_prev = ev_cur
+
+    # Wait for last acquire to start
+    ev_prev.wait()
+
+    # Expect 6 pending exclusive acquires and 1 for all shared acquires
+    # together
+    self.assertEqual(self.sl._count_pending(), 7)
+
+    # Release exclusive lock and wait
+    self.sl.release()
+
+    self._waitThreads()
+
+    # Check sequence
+    for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
+      # Shared locks aren't guaranteed to be notified in order, but they'll be
+      # first
+      tmp = self.done.get_nowait()
+      if tmp == "shared 1":
+        shrcnt1 -= 1
+      elif tmp == "shared 2":
+        shrcnt2 -= 1
+      elif tmp == "shared 3":
+        shrcnt3 -= 1
+      elif tmp == "shared 4":
+        shrcnt4 -= 1
+    self.assertEqual(shrcnt1, 0)
+    self.assertEqual(shrcnt2, 0)
+    self.assertEqual(shrcnt3, 0)
+    self.assertEqual(shrcnt4, 0)
+
+    for _ in range(3):
+      self.assertEqual(self.done.get_nowait(), "exclusive 1")
+
+    for _ in range(3):
+      self.assertEqual(self.done.get_nowait(), "exclusive 2")
+
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+  @_Repeat
+  def testMixedAcquireTimeout(self):
+    sync = threading.Condition()
+
+    def _AcquireShared(ev):
+      if not self.sl.acquire(shared=1, timeout=None):
+        return
+
+      self.done.put("shared")
+
+      # Notify main thread
+      ev.set()
+
+      # Wait for notification
+      sync.acquire()
+      try:
+        sync.wait()
+      finally:
+        sync.release()
+
+      # Release lock
+      self.sl.release()
+
+    acquires = []
+    for _ in range(3):
+      ev = threading.Event()
+      self._addThread(target=_AcquireShared, args=(ev, ))
+      acquires.append(ev)
 
+    # Wait for all acquires to finish
+    for i in acquires:
+      i.wait()
 
-class TestSSynchronizedDecorator(unittest.TestCase):
+    self.assertEqual(self.sl._count_pending(), 0)
+
+    # Try to get exclusive lock
+    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
+
+    # Acquire exclusive without timeout
+    exclsync = threading.Condition()
+    exclev = threading.Event()
+
+    def _AcquireExclusive():
+      if not self.sl.acquire(shared=0):
+        return
+
+      self.done.put("exclusive")
+
+      # Notify main thread
+      exclev.set()
+
+      exclsync.acquire()
+      try:
+        exclsync.wait()
+      finally:
+        exclsync.release()
+
+      self.sl.release()
+
+    self._addThread(target=_AcquireExclusive)
+
+    # Try to get exclusive lock
+    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
+
+    # Make all shared holders release their locks
+    sync.acquire()
+    try:
+      sync.notifyAll()
+    finally:
+      sync.release()
+
+    # Wait for exclusive acquire to succeed
+    exclev.wait()
+
+    self.assertEqual(self.sl._count_pending(), 0)
+
+    # Try to get exclusive lock
+    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
+
+    def _AcquireSharedSimple():
+      if self.sl.acquire(shared=1, timeout=None):
+        self.done.put("shared2")
+        self.sl.release()
+
+    for _ in range(10):
+      self._addThread(target=_AcquireSharedSimple)
+
+    # Tell exclusive lock to release
+    exclsync.acquire()
+    try:
+      exclsync.notifyAll()
+    finally:
+      exclsync.release()
+
+    # Wait for everything to finish
+    self._waitThreads()
+
+    self.assertEqual(self.sl._count_pending(), 0)
+
+    # Check sequence
+    for _ in range(3):
+      self.assertEqual(self.done.get_nowait(), "shared")
+
+    self.assertEqual(self.done.get_nowait(), "exclusive")
+
+    for _ in range(10):
+      self.assertEqual(self.done.get_nowait(), "shared2")
+
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+
+class TestSSynchronizedDecorator(_ThreadedTestCase):
   """Shared Lock Synchronized decorator test"""
 
   def setUp(self):
-    # helper threads use the 'done' queue to tell the master they finished.
-    self.done = Queue.Queue(0)
+    _ThreadedTestCase.setUp(self)
 
   @locking.ssynchronized(_decoratorlock)
   def _doItExclusive(self):
@@ -260,43 +726,50 @@ class TestSSynchronizedDecorator(unittest.TestCase):
 
   def testSharersCanCoexist(self):
     _decoratorlock.acquire(shared=1)
-    Thread(target=self._doItSharer).start()
+    threading.Thread(target=self._doItSharer).start()
     self.assert_(self.done.get(True, 1))
     _decoratorlock.release()
 
+  @_Repeat
   def testExclusiveBlocksExclusive(self):
     _decoratorlock.acquire()
-    Thread(target=self._doItExclusive).start()
+    self._addThread(target=self._doItExclusive)
     # give it a bit of time to check that it's not actually doing anything
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     _decoratorlock.release()
-    self.assert_(self.done.get(True, 1))
+    self._waitThreads()
+    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
 
+  @_Repeat
   def testExclusiveBlocksSharer(self):
     _decoratorlock.acquire()
-    Thread(target=self._doItSharer).start()
-    time.sleep(0.05)
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doItSharer)
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     _decoratorlock.release()
-    self.assert_(self.done.get(True, 1))
+    self._waitThreads()
+    self.failUnlessEqual(self.done.get_nowait(), 'SHR')
 
+  @_Repeat
   def testSharerBlocksExclusive(self):
     _decoratorlock.acquire(shared=1)
-    Thread(target=self._doItExclusive).start()
-    time.sleep(0.05)
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doItExclusive)
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     _decoratorlock.release()
-    self.assert_(self.done.get(True, 1))
+    self._waitThreads()
+    self.failUnlessEqual(self.done.get_nowait(), 'EXC')
 
 
-class TestLockSet(unittest.TestCase):
+class TestLockSet(_ThreadedTestCase):
   """LockSet tests"""
 
   def setUp(self):
+    _ThreadedTestCase.setUp(self)
+    self._setUpLS()
+
+  def _setUpLS(self):
+    """Helper to (re)initialize the lock set"""
     self.resources = ['one', 'two', 'three']
     self.ls = locking.LockSet(members=self.resources)
-    # helper threads use the 'done' queue to tell the master they finished.
-    self.done = Queue.Queue(0)
 
   def testResources(self):
     self.assertEquals(self.ls._names(), set(self.resources))
@@ -379,11 +852,10 @@ class TestLockSet(unittest.TestCase):
     self.assert_('one' not in self.ls._names())
 
   def testRemoveNonBlocking(self):
-    self.assertRaises(NotImplementedError, self.ls.remove, 'one', blocking=0)
     self.ls.acquire('one')
-    self.assertEquals(self.ls.remove('one', blocking=0), ['one'])
+    self.assertEquals(self.ls.remove('one'), ['one'])
     self.ls.acquire(['two', 'three'])
-    self.assertEquals(self.ls.remove(['two', 'three'], blocking=0),
+    self.assertEquals(self.ls.remove(['two', 'three']),
                       ['two', 'three'])
 
   def testNoDoubleAdd(self):
@@ -432,146 +904,165 @@ class TestLockSet(unittest.TestCase):
     # We haven't really acquired anything, so we cannot release
     self.assertRaises(AssertionError, self.ls.release)
 
-  def _doLockSet(self, set, shared):
+  def _doLockSet(self, names, shared):
     try:
-      self.ls.acquire(set, shared=shared)
+      self.ls.acquire(names, shared=shared)
       self.done.put('DONE')
       self.ls.release()
     except errors.LockError:
       self.done.put('ERR')
 
-  def _doAddSet(self, set):
+  def _doAddSet(self, names):
     try:
-      self.ls.add(set, acquired=1)
+      self.ls.add(names, acquired=1)
       self.done.put('DONE')
       self.ls.release()
     except errors.LockError:
       self.done.put('ERR')
 
-  def _doRemoveSet(self, set):
-    self.done.put(self.ls.remove(set))
+  def _doRemoveSet(self, names):
+    self.done.put(self.ls.remove(names))
 
+  @_Repeat
   def testConcurrentSharedAcquire(self):
     self.ls.acquire(['one', 'two'], shared=1)
-    Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    Thread(target=self._doLockSet, args=(['one', 'two', 'three'], 1)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    Thread(target=self._doLockSet, args=('three', 1)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
-    Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self._addThread(target=self._doLockSet, args=('three', 1))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
+    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.ls.release()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self.assertEqual(self.done.get_nowait(), 'DONE')
 
+  @_Repeat
   def testConcurrentExclusiveAcquire(self):
     self.ls.acquire(['one', 'two'])
-    Thread(target=self._doLockSet, args=('three', 1)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    Thread(target=self._doLockSet, args=('three', 0)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
-    Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
-    Thread(target=self._doLockSet, args=('one', 0)).start()
-    Thread(target=self._doLockSet, args=('one', 1)).start()
-    Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start()
-    Thread(target=self._doLockSet, args=(['two', 'three'], 1)).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doLockSet, args=('three', 1))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self._addThread(target=self._doLockSet, args=('three', 0))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
+    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
+    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
+    self._addThread(target=self._doLockSet, args=('one', 0))
+    self._addThread(target=self._doLockSet, args=('one', 1))
+    self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
+    self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.ls.release()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self._waitThreads()
+    for _ in range(6):
+      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
 
+  @_Repeat
   def testConcurrentRemove(self):
     self.ls.add('four')
     self.ls.acquire(['one', 'two', 'four'])
-    Thread(target=self._doLockSet, args=(['one', 'four'], 0)).start()
-    Thread(target=self._doLockSet, args=(['one', 'four'], 1)).start()
-    Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
-    Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
+    self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
+    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
+    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.ls.remove('one')
     self.ls.release()
-    self.assertEqual(self.done.get(True, 1), 'ERR')
-    self.assertEqual(self.done.get(True, 1), 'ERR')
-    self.assertEqual(self.done.get(True, 1), 'ERR')
-    self.assertEqual(self.done.get(True, 1), 'ERR')
+    self._waitThreads()
+    for i in range(4):
+      self.failUnlessEqual(self.done.get_nowait(), 'ERR')
     self.ls.add(['five', 'six'], acquired=1)
-    Thread(target=self._doLockSet, args=(['three', 'six'], 1)).start()
-    Thread(target=self._doLockSet, args=(['three', 'six'], 0)).start()
-    Thread(target=self._doLockSet, args=(['four', 'six'], 1)).start()
-    Thread(target=self._doLockSet, args=(['four', 'six'], 0)).start()
+    self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
+    self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
+    self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
+    self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
     self.ls.remove('five')
     self.ls.release()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self._waitThreads()
+    for i in range(4):
+      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
     self.ls.acquire(['three', 'four'])
-    Thread(target=self._doRemoveSet, args=(['four', 'six'], )).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     self.ls.remove('four')
-    self.assertEqual(self.done.get(True, 1), ['six'])
-    Thread(target=self._doRemoveSet, args=(['two'])).start()
-    self.assertEqual(self.done.get(True, 1), ['two'])
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), ['six'])
+    self._addThread(target=self._doRemoveSet, args=(['two'], ))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), ['two'])
     self.ls.release()
+    # reset lockset
+    self._setUpLS()
 
+  @_Repeat
   def testConcurrentSharedSetLock(self):
     # share the set-lock...
     self.ls.acquire(None, shared=1)
     # ...another thread can share it too
-    Thread(target=self._doLockSet, args=(None, 1)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self._addThread(target=self._doLockSet, args=(None, 1))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
     # ...or just share some elements
-    Thread(target=self._doLockSet, args=(['one', 'three'], 1)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
     # ...but not add new ones or remove any
-    Thread(target=self._doAddSet, args=(['nine'])).start()
-    Thread(target=self._doRemoveSet, args=(['two'], )).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    t = self._addThread(target=self._doAddSet, args=(['nine'], ))
+    self._addThread(target=self._doRemoveSet, args=(['two'], ))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)
     # this just releases the set-lock
     self.ls.release([])
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    t.join(60)
+    self.assertEqual(self.done.get_nowait(), 'DONE')
     # release the lock on the actual elements so remove() can proceed too
     self.ls.release()
-    self.assertEqual(self.done.get(True, 1), ['two'])
+    self._waitThreads()
+    self.failUnlessEqual(self.done.get_nowait(), ['two'])
+    # reset lockset
+    self._setUpLS()
 
+  @_Repeat  # runs the whole scenario once per entry in ITERATIONS
   def testConcurrentExclusiveSetLock(self):
     # acquire the set-lock...
     self.ls.acquire(None, shared=0)
     # ...no one can do anything else
-    Thread(target=self._doLockSet, args=(None, 1)).start()
-    Thread(target=self._doLockSet, args=(None, 0)).start()
-    Thread(target=self._doLockSet, args=(['three'], 0)).start()
-    Thread(target=self._doLockSet, args=(['two'], 1)).start()
-    Thread(target=self._doAddSet, args=(['nine'])).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doLockSet, args=(None, 1))
+    self._addThread(target=self._doLockSet, args=(None, 0))
+    self._addThread(target=self._doLockSet, args=(['three'], 0))
+    self._addThread(target=self._doLockSet, args=(['two'], 1))
+    self._addThread(target=self._doAddSet, args=(['nine']))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)  # non-blocking probe: no worker may have reported yet
     self.ls.release()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self._waitThreads()  # all five workers are joined here, so their results are already queued
+    for _ in range(5):
+      self.assertEqual(self.done.get_nowait(), 'DONE')  # no timeout needed after the join; matches the sibling tests
+    # cleanup: reset the lockset (via _setUpLS) before the next _Repeat iteration
+    self._setUpLS()
 
+  @_Repeat  # runs the whole scenario once per entry in ITERATIONS
   def testConcurrentSetLockAdd(self):
     self.ls.acquire('one')
     # Another thread wants the whole SetLock
-    Thread(target=self._doLockSet, args=(None, 0)).start()
-    Thread(target=self._doLockSet, args=(None, 1)).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doLockSet, args=(None, 0))
+    self._addThread(target=self._doLockSet, args=(None, 1))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)  # non-blocking probe: neither worker may have reported yet
     self.assertRaises(AssertionError, self.ls.add, 'four')
     self.ls.release()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self._waitThreads()  # joins both workers; fails if one is still alive
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self.assertEqual(self.done.get_nowait(), 'DONE')
     self.ls.acquire(None)
-    Thread(target=self._doLockSet, args=(None, 0)).start()
-    Thread(target=self._doLockSet, args=(None, 1)).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doLockSet, args=(None, 0))
+    self._addThread(target=self._doLockSet, args=(None, 1))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)  # still nothing reported while this thread holds the set-lock
     self.ls.add('four')
     self.ls.add('five', acquired=1)
     self.ls.add('six', acquired=1, shared=1)
@@ -581,44 +1072,50 @@ class TestLockSet(unittest.TestCase):
     self.assertEquals(self.ls._names(),
       set(['one', 'two', 'three', 'four', 'five', 'six']))
     self.ls.release()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self._setUpLS()  # presumably rebuilds self.ls for the next _Repeat iteration -- helper is defined outside this hunk
 
+  @_Repeat  # runs the whole scenario once per entry in ITERATIONS
   def testEmptyLockSet(self):
     # get the set-lock
     self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
     # now empty it...
     self.ls.remove(['one', 'two', 'three'])
     # and adds/locks by another thread still wait
-    Thread(target=self._doAddSet, args=(['nine'])).start()
-    Thread(target=self._doLockSet, args=(None, 1)).start()
-    Thread(target=self._doLockSet, args=(None, 0)).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doAddSet, args=(['nine']))
+    self._addThread(target=self._doLockSet, args=(None, 1))
+    self._addThread(target=self._doLockSet, args=(None, 0))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)  # non-blocking probe: no worker may have reported yet
     self.ls.release()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self._waitThreads()  # joins all three workers; fails if one is still alive
+    for _ in range(3):
+      self.assertEqual(self.done.get_nowait(), 'DONE')  # one result per joined worker
     # empty it again...
     self.assertEqual(self.ls.remove(['nine']), ['nine'])
     # now share it...
     self.assertEqual(self.ls.acquire(None, shared=1), set())
     # other sharers can go, adds still wait
-    Thread(target=self._doLockSet, args=(None, 1)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    Thread(target=self._doAddSet, args=(['nine'])).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doLockSet, args=(None, 1))
+    self._waitThreads()  # the sharer must finish while the set-lock is still held shared
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self._addThread(target=self._doAddSet, args=(['nine']))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)  # the add has not reported at this point
     self.ls.release()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self._setUpLS()  # presumably rebuilds self.ls for the next _Repeat iteration -- helper is defined outside this hunk
 
 
-class TestGanetiLockManager(unittest.TestCase):
+class TestGanetiLockManager(_ThreadedTestCase):
 
   def setUp(self):
+    _ThreadedTestCase.setUp(self)  # base class creates self.done (result queue) and self.threads
     self.nodes=['n1', 'n2']
     self.instances=['i1', 'i2', 'i3']
     self.GL = locking.GanetiLockManager(nodes=self.nodes,
                                         instances=self.instances)
-    self.done = Queue.Queue(0)
 
   def tearDown(self):
     # Don't try this at home...
@@ -745,24 +1242,36 @@ class TestGanetiLockManager(unittest.TestCase):
     except errors.LockError:
       self.done.put('ERR')
 
+  @_Repeat  # runs the whole scenario once per entry in ITERATIONS
   def testConcurrency(self):
     self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
-    Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i1', 1)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self._addThread(target=self._doLock,
+                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
+    self._waitThreads()  # joins every started thread; fails if one is still alive
+    self.assertEqual(self.done.get_nowait(), 'DONE')
     self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
-    Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i1', 1)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i3', 1)).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doLock,
+                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self._addThread(target=self._doLock,
+                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)  # non-blocking probe: the i3 worker has not reported yet
     self.GL.release(locking.LEVEL_INSTANCE)
-    self.assertEqual(self.done.get(True, 1), 'DONE')
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
     self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
-    Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i2', 1)).start()
-    self.assertEqual(self.done.get(True, 1), 'DONE')
-    Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i2', 0)).start()
-    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
+    self._addThread(target=self._doLock,
+                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
+    self._waitThreads()
+    self.assertEqual(self.done.get_nowait(), 'DONE')
+    self._addThread(target=self._doLock,
+                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
+    self.assertRaises(Queue.Empty, self.done.get_nowait)  # non-blocking probe: the exclusive i2 request has not reported yet
     self.GL.release(locking.LEVEL_INSTANCE)
+    self._waitThreads()  # worker is joined, so the result checked on the next line is already queued
     self.assertEqual(self.done.get(True, 1), 'DONE')
+    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])  # drop the BGL taken on the first line; presumably needed so the next _Repeat iteration can re-acquire it
 
 
 if __name__ == '__main__':