+
+def _Repeat(fn):
+ """Decorator for executing a function many times"""
+ def wrapper(*args, **kwargs):
+ for i in ITERATIONS:
+ fn(*args, **kwargs)
+ return wrapper
+
+
+class _ThreadedTestCase(unittest.TestCase):
+ """Test class that supports adding/waiting on threads"""
+ def setUp(self):
+ unittest.TestCase.setUp(self)
+ self.done = Queue.Queue(0)
+ self.threads = []
+
+ def _addThread(self, *args, **kwargs):
+ """Create and remember a new thread"""
+ t = threading.Thread(*args, **kwargs)
+ self.threads.append(t)
+ t.start()
+ return t
+
+ def _waitThreads(self):
+ """Wait for all our threads to finish"""
+ for t in self.threads:
+ t.join(60)
+ self.failIf(t.isAlive())
+ self.threads = []
+
+
+class _ConditionTestCase(_ThreadedTestCase):
+ """Common test case for conditions"""
+
+ def setUp(self, cls):
+ _ThreadedTestCase.setUp(self)
+ self.lock = threading.Lock()
+ self.cond = cls(self.lock)
+
+ def _testAcquireRelease(self):
+ self.assert_(not self.cond._is_owned())
+ self.assertRaises(RuntimeError, self.cond.wait)
+ self.assertRaises(RuntimeError, self.cond.notifyAll)
+
+ self.cond.acquire()
+ self.assert_(self.cond._is_owned())
+ self.cond.notifyAll()
+ self.assert_(self.cond._is_owned())
+ self.cond.release()
+
+ self.assert_(not self.cond._is_owned())
+ self.assertRaises(RuntimeError, self.cond.wait)
+ self.assertRaises(RuntimeError, self.cond.notifyAll)
+
+ def _testNotification(self):
+ def _NotifyAll():
+ self.done.put("NE")
+ self.cond.acquire()
+ self.done.put("NA")
+ self.cond.notifyAll()
+ self.done.put("NN")
+ self.cond.release()
+
+ self.cond.acquire()
+ self._addThread(target=_NotifyAll)
+ self.assertEqual(self.done.get(True, 1), "NE")
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+ self.cond.wait()
+ self.assertEqual(self.done.get(True, 1), "NA")
+ self.assertEqual(self.done.get(True, 1), "NN")
+ self.assert_(self.cond._is_owned())
+ self.cond.release()
+ self.assert_(not self.cond._is_owned())
+
+
+class TestSingleNotifyPipeCondition(_ConditionTestCase):
+ """SingleNotifyPipeCondition tests"""
+
+ def setUp(self):
+ _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
+
+ def testAcquireRelease(self):
+ self._testAcquireRelease()
+
+ def testNotification(self):
+ self._testNotification()
+
+ def testWaitReuse(self):
+ self.cond.acquire()
+ self.cond.wait(0)
+ self.cond.wait(0.1)
+ self.cond.release()
+
+ def testNoNotifyReuse(self):
+ self.cond.acquire()
+ self.cond.notifyAll()
+ self.assertRaises(RuntimeError, self.cond.wait)
+ self.assertRaises(RuntimeError, self.cond.notifyAll)
+ self.cond.release()
+
+
+class TestPipeCondition(_ConditionTestCase):
+ """PipeCondition tests"""
+
+ def setUp(self):
+ _ConditionTestCase.setUp(self, locking.PipeCondition)
+
+ def testAcquireRelease(self):
+ self._testAcquireRelease()
+
+ def testNotification(self):
+ self._testNotification()
+
+ def _TestWait(self, fn):
+ self._addThread(target=fn)
+ self._addThread(target=fn)
+ self._addThread(target=fn)
+
+ # Wait for threads to be waiting
+ self.assertEqual(self.done.get(True, 1), "A")
+ self.assertEqual(self.done.get(True, 1), "A")
+ self.assertEqual(self.done.get(True, 1), "A")
+
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+ self.cond.acquire()
+ self.assertEqual(self.cond._nwaiters, 3)
+ # This new thread can"t acquire the lock, and thus call wait, before we
+ # release it
+ self._addThread(target=fn)
+ self.cond.notifyAll()
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+ self.cond.release()
+
+ # We should now get 3 W and 1 A (for the new thread) in whatever order
+ w = 0
+ a = 0
+ for i in range(4):
+ got = self.done.get(True, 1)
+ if got == "W":
+ w += 1
+ elif got == "A":
+ a += 1
+ else:
+ self.fail("Got %s on the done queue" % got)
+
+ self.assertEqual(w, 3)
+ self.assertEqual(a, 1)
+
+ self.cond.acquire()
+ self.cond.notifyAll()
+ self.cond.release()
+ self._waitThreads()
+ self.assertEqual(self.done.get_nowait(), "W")
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+ def testBlockingWait(self):
+ def _BlockingWait():
+ self.cond.acquire()
+ self.done.put("A")
+ self.cond.wait()
+ self.cond.release()
+ self.done.put("W")
+
+ self._TestWait(_BlockingWait)
+
+ def testLongTimeoutWait(self):
+ def _Helper():
+ self.cond.acquire()
+ self.done.put("A")
+ self.cond.wait(15.0)
+ self.cond.release()
+ self.done.put("W")
+
+ self._TestWait(_Helper)
+
+ def _TimeoutWait(self, timeout, check):
+ self.cond.acquire()
+ self.cond.wait(timeout)
+ self.cond.release()
+ self.done.put(check)
+
+ def testShortTimeoutWait(self):
+ self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
+ self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
+ self._waitThreads()
+ self.assertEqual(self.done.get_nowait(), "T1")
+ self.assertEqual(self.done.get_nowait(), "T1")
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+ def testZeroTimeoutWait(self):
+ self._addThread(target=self._TimeoutWait, args=(0, "T0"))
+ self._addThread(target=self._TimeoutWait, args=(0, "T0"))
+ self._addThread(target=self._TimeoutWait, args=(0, "T0"))
+ self._waitThreads()
+ self.assertEqual(self.done.get_nowait(), "T0")
+ self.assertEqual(self.done.get_nowait(), "T0")
+ self.assertEqual(self.done.get_nowait(), "T0")
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
+
+class TestSharedLock(_ThreadedTestCase):