X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/7ee7c0c7e4cba67496a261bc0890bbe8d02cf7a0..cd67e3376f62b8ab278d4d7905ad65208070130b:/test/ganeti.locking_unittest.py diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index 331c31d..2706a72 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -1,7 +1,7 @@ #!/usr/bin/python # -# Copyright (C) 2006, 2007 Google Inc. +# Copyright (C) 2006, 2007, 2010 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -16,7 +16,7 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA -# 0.0510-1301, USA. +# 02110-1301, USA. """Script for unittesting the locking module""" @@ -26,40 +26,268 @@ import os import unittest import time import Queue +import threading +import random +import gc +import itertools +from ganeti import constants from ganeti import locking from ganeti import errors -from threading import Thread +from ganeti import utils +from ganeti import compat +from ganeti import objects +from ganeti import query +import testutils -class TestSharedLock(unittest.TestCase): - """SharedLock tests""" +# This is used to test the ssynchronize decorator. +# Since it's passed as input to a decorator it must be declared as a global. +_decoratorlock = locking.SharedLock("decorator lock") + +#: List for looping tests +ITERATIONS = range(8) + + +def _Repeat(fn): + """Decorator for executing a function many times""" + def wrapper(*args, **kwargs): + for i in ITERATIONS: + fn(*args, **kwargs) + return wrapper + + +def SafeSleep(duration): + start = time.time() + while True: + delay = start + duration - time.time() + if delay <= 0.0: + break + time.sleep(delay) + + +class _ThreadedTestCase(unittest.TestCase): + """Test class that supports adding/waiting on threads""" def setUp(self): - self.sl = locking.SharedLock() - # helper threads use the 'done' queue to tell the master they finished. + unittest.TestCase.setUp(self) self.done = Queue.Queue(0) + self.threads = [] + + def _addThread(self, *args, **kwargs): + """Create and remember a new thread""" + t = threading.Thread(*args, **kwargs) + self.threads.append(t) + t.start() + return t + + def _waitThreads(self): + """Wait for all our threads to finish""" + for t in self.threads: + t.join(60) + self.failIf(t.isAlive()) + self.threads = [] + + +class _ConditionTestCase(_ThreadedTestCase): + """Common test case for conditions""" + + def setUp(self, cls): + _ThreadedTestCase.setUp(self) + self.lock = threading.Lock() + self.cond = cls(self.lock) + + def _testAcquireRelease(self): + self.assertFalse(self.cond._is_owned()) + self.assertRaises(RuntimeError, self.cond.wait, None) + self.assertRaises(RuntimeError, self.cond.notifyAll) + + self.cond.acquire() + self.assert_(self.cond._is_owned()) + self.cond.notifyAll() + self.assert_(self.cond._is_owned()) + self.cond.release() + + self.assertFalse(self.cond._is_owned()) + self.assertRaises(RuntimeError, self.cond.wait, None) + self.assertRaises(RuntimeError, self.cond.notifyAll) + + def _testNotification(self): + def _NotifyAll(): + self.done.put("NE") + self.cond.acquire() + self.done.put("NA") + self.cond.notifyAll() + self.done.put("NN") + self.cond.release() + + self.cond.acquire() + self._addThread(target=_NotifyAll) + self.assertEqual(self.done.get(True, 1), "NE") + self.assertRaises(Queue.Empty, self.done.get_nowait) + self.cond.wait(None) + self.assertEqual(self.done.get(True, 1), "NA") + self.assertEqual(self.done.get(True, 1), "NN") + self.assert_(self.cond._is_owned()) + self.cond.release() + self.assertFalse(self.cond._is_owned()) + + +class TestSingleNotifyPipeCondition(_ConditionTestCase): + """SingleNotifyPipeCondition tests""" + + def setUp(self): + _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition) + + def testAcquireRelease(self): + self._testAcquireRelease() + + def testNotification(self): + self._testNotification() + + def testWaitReuse(self): + self.cond.acquire() + self.cond.wait(0) + self.cond.wait(0.1) + self.cond.release() + + def testNoNotifyReuse(self): + self.cond.acquire() + self.cond.notifyAll() + self.assertRaises(RuntimeError, self.cond.wait, None) + self.assertRaises(RuntimeError, self.cond.notifyAll) + self.cond.release() + + +class TestPipeCondition(_ConditionTestCase): + """PipeCondition tests""" + + def setUp(self): + _ConditionTestCase.setUp(self, locking.PipeCondition) + + def testAcquireRelease(self): + self._testAcquireRelease() + + def testNotification(self): + self._testNotification() + + def _TestWait(self, fn): + threads = [ + self._addThread(target=fn), + self._addThread(target=fn), + self._addThread(target=fn), + ] + + # Wait for threads to be waiting + for _ in threads: + self.assertEqual(self.done.get(True, 1), "A") + + self.assertRaises(Queue.Empty, self.done.get_nowait) + + self.cond.acquire() + self.assertEqual(len(self.cond._waiters), 3) + self.assertEqual(self.cond._waiters, set(threads)) + # This new thread can't acquire the lock, and thus call wait, before we + # release it + self._addThread(target=fn) + self.cond.notifyAll() + self.assertRaises(Queue.Empty, self.done.get_nowait) + self.cond.release() + + # We should now get 3 W and 1 A (for the new thread) in whatever order + w = 0 + a = 0 + for i in range(4): + got = self.done.get(True, 1) + if got == "W": + w += 1 + elif got == "A": + a += 1 + else: + self.fail("Got %s on the done queue" % got) + + self.assertEqual(w, 3) + self.assertEqual(a, 1) + + self.cond.acquire() + self.cond.notifyAll() + self.cond.release() + self._waitThreads() + self.assertEqual(self.done.get_nowait(), "W") + self.assertRaises(Queue.Empty, self.done.get_nowait) + + def testBlockingWait(self): + def _BlockingWait(): + self.cond.acquire() + self.done.put("A") + self.cond.wait(None) + self.cond.release() + self.done.put("W") + + self._TestWait(_BlockingWait) + + def testLongTimeoutWait(self): + def _Helper(): + self.cond.acquire() + self.done.put("A") + self.cond.wait(15.0) + self.cond.release() + self.done.put("W") + + self._TestWait(_Helper) + + def _TimeoutWait(self, timeout, check): + self.cond.acquire() + self.cond.wait(timeout) + self.cond.release() + self.done.put(check) + + def testShortTimeoutWait(self): + self._addThread(target=self._TimeoutWait, args=(0.1, "T1")) + self._addThread(target=self._TimeoutWait, args=(0.1, "T1")) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), "T1") + self.assertEqual(self.done.get_nowait(), "T1") + self.assertRaises(Queue.Empty, self.done.get_nowait) + + def testZeroTimeoutWait(self): + self._addThread(target=self._TimeoutWait, args=(0, "T0")) + self._addThread(target=self._TimeoutWait, args=(0, "T0")) + self._addThread(target=self._TimeoutWait, args=(0, "T0")) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), "T0") + self.assertEqual(self.done.get_nowait(), "T0") + self.assertEqual(self.done.get_nowait(), "T0") + self.assertRaises(Queue.Empty, self.done.get_nowait) + + +class TestSharedLock(_ThreadedTestCase): + """SharedLock tests""" + + def setUp(self): + _ThreadedTestCase.setUp(self) + self.sl = locking.SharedLock("TestSharedLock") def testSequenceAndOwnership(self): - self.assert_(not self.sl._is_owned()) + self.assertFalse(self.sl.is_owned()) self.sl.acquire(shared=1) - self.assert_(self.sl._is_owned()) - self.assert_(self.sl._is_owned(shared=1)) - self.assert_(not self.sl._is_owned(shared=0)) + self.assert_(self.sl.is_owned()) + self.assert_(self.sl.is_owned(shared=1)) + self.assertFalse(self.sl.is_owned(shared=0)) self.sl.release() - self.assert_(not self.sl._is_owned()) + self.assertFalse(self.sl.is_owned()) self.sl.acquire() - self.assert_(self.sl._is_owned()) - self.assert_(not self.sl._is_owned(shared=1)) - self.assert_(self.sl._is_owned(shared=0)) + self.assert_(self.sl.is_owned()) + self.assertFalse(self.sl.is_owned(shared=1)) + self.assert_(self.sl.is_owned(shared=0)) self.sl.release() - self.assert_(not self.sl._is_owned()) + self.assertFalse(self.sl.is_owned()) self.sl.acquire(shared=1) - self.assert_(self.sl._is_owned()) - self.assert_(self.sl._is_owned(shared=1)) - self.assert_(not self.sl._is_owned(shared=0)) + self.assert_(self.sl.is_owned()) + self.assert_(self.sl.is_owned(shared=1)) + self.assertFalse(self.sl.is_owned(shared=0)) self.sl.release() - self.assert_(not self.sl._is_owned()) + self.assertFalse(self.sl.is_owned()) def testBooleanValue(self): # semaphores are supposed to return a true value on a successful acquire @@ -111,83 +339,92 @@ class TestSharedLock(unittest.TestCase): def testSharersCanCoexist(self): self.sl.acquire(shared=1) - Thread(target=self._doItSharer).start() + threading.Thread(target=self._doItSharer).start() self.assert_(self.done.get(True, 1)) self.sl.release() + @_Repeat def testExclusiveBlocksExclusive(self): self.sl.acquire() - Thread(target=self._doItExclusive).start() - # give it a bit of time to check that it's not actually doing anything - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItExclusive) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'EXC') + @_Repeat def testExclusiveBlocksDelete(self): self.sl.acquire() - Thread(target=self._doItDelete).start() - # give it a bit of time to check that it's not actually doing anything - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItDelete) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'DEL') + self.sl = locking.SharedLock(self.sl.name) + @_Repeat def testExclusiveBlocksSharer(self): self.sl.acquire() - Thread(target=self._doItSharer).start() - time.sleep(0.05) - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItSharer) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'SHR') + @_Repeat def testSharerBlocksExclusive(self): self.sl.acquire(shared=1) - Thread(target=self._doItExclusive).start() - time.sleep(0.05) - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItExclusive) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'EXC') + @_Repeat def testSharerBlocksDelete(self): self.sl.acquire(shared=1) - Thread(target=self._doItDelete).start() - time.sleep(0.05) - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doItDelete) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() - self.assert_(self.done.get(True, 1)) + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'DEL') + self.sl = locking.SharedLock(self.sl.name) + @_Repeat def testWaitingExclusiveBlocksSharer(self): + """SKIPPED testWaitingExclusiveBlockSharer""" + return + self.sl.acquire(shared=1) # the lock is acquired in shared mode... - Thread(target=self._doItExclusive).start() + self._addThread(target=self._doItExclusive) # ...but now an exclusive is waiting... - time.sleep(0.05) - Thread(target=self._doItSharer).start() + self._addThread(target=self._doItSharer) # ...so the sharer should be blocked as well - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() + self._waitThreads() # The exclusive passed before - self.assertEqual(self.done.get(True, 1), 'EXC') - self.assertEqual(self.done.get(True, 1), 'SHR') + self.failUnlessEqual(self.done.get_nowait(), 'EXC') + self.failUnlessEqual(self.done.get_nowait(), 'SHR') + @_Repeat def testWaitingSharerBlocksExclusive(self): + """SKIPPED testWaitingSharerBlocksExclusive""" + return + self.sl.acquire() # the lock is acquired in exclusive mode... - Thread(target=self._doItSharer).start() + self._addThread(target=self._doItSharer) # ...but now a sharer is waiting... - time.sleep(0.05) - Thread(target=self._doItExclusive).start() + self._addThread(target=self._doItExclusive) # ...the exclusive is waiting too... - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.sl.release() + self._waitThreads() # The sharer passed before - self.assertEqual(self.done.get(True, 1), 'SHR') - self.assertEqual(self.done.get(True, 1), 'EXC') - - def testNoNonBlocking(self): - self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0) - self.assertRaises(NotImplementedError, self.sl.delete, blocking=0) - self.sl.acquire() - self.sl.delete(blocking=0) # Fine, because the lock is already acquired + self.assertEqual(self.done.get_nowait(), 'SHR') + self.assertEqual(self.done.get_nowait(), 'EXC') def testDelete(self): self.sl.delete() @@ -195,76 +432,766 @@ class TestSharedLock(unittest.TestCase): self.assertRaises(errors.LockError, self.sl.acquire, shared=1) self.assertRaises(errors.LockError, self.sl.delete) + def testDeleteTimeout(self): + self.assertTrue(self.sl.delete(timeout=60)) + + def testDeleteTimeoutFail(self): + ready = threading.Event() + finish = threading.Event() + + def fn(): + self.sl.acquire(shared=0) + ready.set() + + finish.wait() + self.sl.release() + + self._addThread(target=fn) + ready.wait() + + # Test if deleting a lock owned in exclusive mode by another thread fails + # to delete when a timeout is used + self.assertFalse(self.sl.delete(timeout=0.02)) + + finish.set() + self._waitThreads() + + self.assertTrue(self.sl.delete()) + self.assertRaises(errors.LockError, self.sl.acquire) + def testNoDeleteIfSharer(self): self.sl.acquire(shared=1) self.assertRaises(AssertionError, self.sl.delete) + @_Repeat def testDeletePendingSharersExclusiveDelete(self): self.sl.acquire() - Thread(target=self._doItSharer).start() - Thread(target=self._doItSharer).start() - time.sleep(0.05) - Thread(target=self._doItExclusive).start() - Thread(target=self._doItDelete).start() - time.sleep(0.05) + self._addThread(target=self._doItSharer) + self._addThread(target=self._doItSharer) + self._addThread(target=self._doItExclusive) + self._addThread(target=self._doItDelete) self.sl.delete() - # The two threads who were pending return both ERR - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') + self._waitThreads() + # The threads who were pending return ERR + for _ in range(4): + self.assertEqual(self.done.get_nowait(), 'ERR') + self.sl = locking.SharedLock(self.sl.name) + @_Repeat def testDeletePendingDeleteExclusiveSharers(self): self.sl.acquire() - Thread(target=self._doItDelete).start() - Thread(target=self._doItExclusive).start() - time.sleep(0.05) - Thread(target=self._doItSharer).start() - Thread(target=self._doItSharer).start() - time.sleep(0.05) + self._addThread(target=self._doItDelete) + self._addThread(target=self._doItExclusive) + self._addThread(target=self._doItSharer) + self._addThread(target=self._doItSharer) self.sl.delete() + self._waitThreads() # The two threads who were pending return both ERR - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') + self.assertEqual(self.done.get_nowait(), 'ERR') + self.assertEqual(self.done.get_nowait(), 'ERR') + self.assertEqual(self.done.get_nowait(), 'ERR') + self.assertEqual(self.done.get_nowait(), 'ERR') + self.sl = locking.SharedLock(self.sl.name) + + @_Repeat + def testExclusiveAcquireTimeout(self): + for shared in [0, 1]: + on_queue = threading.Event() + release_exclusive = threading.Event() + + def _LockExclusive(): + self.sl.acquire(shared=0, test_notify=on_queue.set) + self.done.put("A: start wait") + release_exclusive.wait() + self.done.put("A: end wait") + self.sl.release() + + # Start thread to hold lock in exclusive mode + self._addThread(target=_LockExclusive) + + # Wait for wait to begin + self.assertEqual(self.done.get(timeout=60), "A: start wait") + + # Wait up to 60s to get lock, but release exclusive lock as soon as we're + # on the queue + self.failUnless(self.sl.acquire(shared=shared, timeout=60, + test_notify=release_exclusive.set)) + + self.done.put("got 2nd") + self.sl.release() + + self._waitThreads() + + self.assertEqual(self.done.get_nowait(), "A: end wait") + self.assertEqual(self.done.get_nowait(), "got 2nd") + self.assertRaises(Queue.Empty, self.done.get_nowait) + + @_Repeat + def testAcquireExpiringTimeout(self): + def _AcquireWithTimeout(shared, timeout): + if not self.sl.acquire(shared=shared, timeout=timeout): + self.done.put("timeout") + + for shared in [0, 1]: + # Lock exclusively + self.sl.acquire() + + # Start shared acquires with timeout between 0 and 20 ms + for i in range(11): + self._addThread(target=_AcquireWithTimeout, + args=(shared, i * 2.0 / 1000.0)) + + # Wait for threads to finish (makes sure the acquire timeout expires + # before releasing the lock) + self._waitThreads() + + # Release lock + self.sl.release() + + for _ in range(11): + self.assertEqual(self.done.get_nowait(), "timeout") + + self.assertRaises(Queue.Empty, self.done.get_nowait) + + @_Repeat + def testSharedSkipExclusiveAcquires(self): + # Tests whether shared acquires jump in front of exclusive acquires in the + # queue. + + def _Acquire(shared, name, notify_ev, wait_ev): + if notify_ev: + notify_fn = notify_ev.set + else: + notify_fn = None + + if wait_ev: + wait_ev.wait() + + if not self.sl.acquire(shared=shared, test_notify=notify_fn): + return + + self.done.put(name) + self.sl.release() + + # Get exclusive lock while we fill the queue + self.sl.acquire() + + shrcnt1 = 5 + shrcnt2 = 7 + shrcnt3 = 9 + shrcnt4 = 2 + + # Add acquires using threading.Event for synchronization. They'll be + # acquired exactly in the order defined in this list. + acquires = (shrcnt1 * [(1, "shared 1")] + + 3 * [(0, "exclusive 1")] + + shrcnt2 * [(1, "shared 2")] + + shrcnt3 * [(1, "shared 3")] + + shrcnt4 * [(1, "shared 4")] + + 3 * [(0, "exclusive 2")]) + + ev_cur = None + ev_prev = None + + for args in acquires: + ev_cur = threading.Event() + self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev)) + ev_prev = ev_cur + + # Wait for last acquire to start + ev_prev.wait() + + # Expect 6 pending exclusive acquires and 1 for all shared acquires + # together + self.assertEqual(self.sl._count_pending(), 7) + + # Release exclusive lock and wait + self.sl.release() + + self._waitThreads() + + # Check sequence + for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4): + # Shared locks aren't guaranteed to be notified in order, but they'll be + # first + tmp = self.done.get_nowait() + if tmp == "shared 1": + shrcnt1 -= 1 + elif tmp == "shared 2": + shrcnt2 -= 1 + elif tmp == "shared 3": + shrcnt3 -= 1 + elif tmp == "shared 4": + shrcnt4 -= 1 + self.assertEqual(shrcnt1, 0) + self.assertEqual(shrcnt2, 0) + self.assertEqual(shrcnt3, 0) + self.assertEqual(shrcnt3, 0) + + for _ in range(3): + self.assertEqual(self.done.get_nowait(), "exclusive 1") + + for _ in range(3): + self.assertEqual(self.done.get_nowait(), "exclusive 2") + + self.assertRaises(Queue.Empty, self.done.get_nowait) + + def testIllegalDowngrade(self): + # Not yet acquired + self.assertRaises(AssertionError, self.sl.downgrade) + + # Acquire in shared mode, downgrade should be no-op + self.assertTrue(self.sl.acquire(shared=1)) + self.assertTrue(self.sl.is_owned(shared=1)) + self.assertTrue(self.sl.downgrade()) + self.assertTrue(self.sl.is_owned(shared=1)) + self.sl.release() + + def testDowngrade(self): + self.assertTrue(self.sl.acquire()) + self.assertTrue(self.sl.is_owned(shared=0)) + self.assertTrue(self.sl.downgrade()) + self.assertTrue(self.sl.is_owned(shared=1)) + self.sl.release() + + @_Repeat + def testDowngradeJumpsAheadOfExclusive(self): + def _KeepExclusive(ev_got, ev_downgrade, ev_release): + self.assertTrue(self.sl.acquire()) + self.assertTrue(self.sl.is_owned(shared=0)) + ev_got.set() + ev_downgrade.wait() + self.assertTrue(self.sl.is_owned(shared=0)) + self.assertTrue(self.sl.downgrade()) + self.assertTrue(self.sl.is_owned(shared=1)) + ev_release.wait() + self.assertTrue(self.sl.is_owned(shared=1)) + self.sl.release() + + def _KeepExclusive2(ev_started, ev_release): + self.assertTrue(self.sl.acquire(test_notify=ev_started.set)) + self.assertTrue(self.sl.is_owned(shared=0)) + ev_release.wait() + self.assertTrue(self.sl.is_owned(shared=0)) + self.sl.release() + + def _KeepShared(ev_started, ev_got, ev_release): + self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set)) + self.assertTrue(self.sl.is_owned(shared=1)) + ev_got.set() + ev_release.wait() + self.assertTrue(self.sl.is_owned(shared=1)) + self.sl.release() + + # Acquire lock in exclusive mode + ev_got_excl1 = threading.Event() + ev_downgrade_excl1 = threading.Event() + ev_release_excl1 = threading.Event() + th_excl1 = self._addThread(target=_KeepExclusive, + args=(ev_got_excl1, ev_downgrade_excl1, + ev_release_excl1)) + ev_got_excl1.wait() + + # Start a second exclusive acquire + ev_started_excl2 = threading.Event() + ev_release_excl2 = threading.Event() + th_excl2 = self._addThread(target=_KeepExclusive2, + args=(ev_started_excl2, ev_release_excl2)) + ev_started_excl2.wait() + + # Start shared acquires, will jump ahead of second exclusive acquire when + # first exclusive acquire downgrades + ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)] + ev_release_shared = threading.Event() + + th_shared = [self._addThread(target=_KeepShared, + args=(ev_started, ev_got, ev_release_shared)) + for (ev_started, ev_got) in ev_shared] + + # Wait for all shared acquires to start + for (ev, _) in ev_shared: + ev.wait() + + # Check lock information + self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])), + [(self.sl.name, "exclusive", [th_excl1.getName()], None)]) + [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING])) + self.assertEqual([(pendmode, sorted(waiting)) + for (pendmode, waiting) in pending], + [("exclusive", [th_excl2.getName()]), + ("shared", sorted(th.getName() for th in th_shared))]) + + # Shared acquires won't start until the exclusive lock is downgraded + ev_downgrade_excl1.set() + + # Wait for all shared acquires to be successful + for (_, ev) in ev_shared: + ev.wait() + + # Check lock information again + self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, + query.LQ_PENDING])), + [(self.sl.name, "shared", None, + [("exclusive", [th_excl2.getName()])])]) + [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER])) + self.assertEqual(set(owner), set([th_excl1.getName()] + + [th.getName() for th in th_shared])) + + ev_release_excl1.set() + ev_release_excl2.set() + ev_release_shared.set() + + self._waitThreads() + + self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER, + query.LQ_PENDING])), + [(self.sl.name, None, None, [])]) + + @_Repeat + def testMixedAcquireTimeout(self): + sync = threading.Event() + + def _AcquireShared(ev): + if not self.sl.acquire(shared=1, timeout=None): + return + + self.done.put("shared") + + # Notify main thread + ev.set() + + # Wait for notification from main thread + sync.wait() + + # Release lock + self.sl.release() + + acquires = [] + for _ in range(3): + ev = threading.Event() + self._addThread(target=_AcquireShared, args=(ev, )) + acquires.append(ev) + + # Wait for all acquires to finish + for i in acquires: + i.wait() + + self.assertEqual(self.sl._count_pending(), 0) + + # Try to get exclusive lock + self.failIf(self.sl.acquire(shared=0, timeout=0.02)) + + # Acquire exclusive without timeout + exclsync = threading.Event() + exclev = threading.Event() + + def _AcquireExclusive(): + if not self.sl.acquire(shared=0): + return + + self.done.put("exclusive") + + # Notify main thread + exclev.set() + + # Wait for notification from main thread + exclsync.wait() + + self.sl.release() + + self._addThread(target=_AcquireExclusive) + + # Try to get exclusive lock + self.failIf(self.sl.acquire(shared=0, timeout=0.02)) + + # Make all shared holders release their locks + sync.set() + + # Wait for exclusive acquire to succeed + exclev.wait() + + self.assertEqual(self.sl._count_pending(), 0) + + # Try to get exclusive lock + self.failIf(self.sl.acquire(shared=0, timeout=0.02)) + + def _AcquireSharedSimple(): + if self.sl.acquire(shared=1, timeout=None): + self.done.put("shared2") + self.sl.release() + + for _ in range(10): + self._addThread(target=_AcquireSharedSimple) + + # Tell exclusive lock to release + exclsync.set() + + # Wait for everything to finish + self._waitThreads() + + self.assertEqual(self.sl._count_pending(), 0) + + # Check sequence + for _ in range(3): + self.assertEqual(self.done.get_nowait(), "shared") + + self.assertEqual(self.done.get_nowait(), "exclusive") + + for _ in range(10): + self.assertEqual(self.done.get_nowait(), "shared2") + + self.assertRaises(Queue.Empty, self.done.get_nowait) + + def testPriority(self): + # Acquire in exclusive mode + self.assert_(self.sl.acquire(shared=0)) + + # Queue acquires + def _Acquire(prev, next, shared, priority, result): + prev.wait() + self.sl.acquire(shared=shared, priority=priority, test_notify=next.set) + try: + self.done.put(result) + finally: + self.sl.release() + + counter = itertools.count(0) + priorities = range(-20, 30) + first = threading.Event() + prev = first + + # Data structure: + # { + # priority: + # [(shared/exclusive, set(acquire names), set(pending threads)), + # (shared/exclusive, ...), + # ..., + # ], + # } + perprio = {} + + # References shared acquire per priority in L{perprio}. Data structure: + # { + # priority: (shared=1, set(acquire names), set(pending threads)), + # } + prioshared = {} + + for seed in [4979, 9523, 14902, 32440]: + # Use a deterministic random generator + rnd = random.Random(seed) + for priority in [rnd.choice(priorities) for _ in range(30)]: + modes = [0, 1] + rnd.shuffle(modes) + for shared in modes: + # Unique name + acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority) + + ev = threading.Event() + thread = self._addThread(target=_Acquire, + args=(prev, ev, shared, priority, acqname)) + prev = ev + + # Record expected aqcuire, see above for structure + data = (shared, set([acqname]), set([thread])) + priolist = perprio.setdefault(priority, []) + if shared: + priosh = prioshared.get(priority, None) + if priosh: + # Shared acquires are merged + for i, j in zip(priosh[1:], data[1:]): + i.update(j) + assert data[0] == priosh[0] + else: + prioshared[priority] = data + priolist.append(data) + else: + priolist.append(data) + + # Start all acquires and wait for them + first.set() + prev.wait() + + # Check lock information + self.assertEqual(self.sl.GetLockInfo(set()), + [(self.sl.name, None, None, None)]) + self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])), + [(self.sl.name, "exclusive", + [threading.currentThread().getName()], None)]) + + self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])), + perprio) + + # Let threads acquire the lock + self.sl.release() + + # Wait for everything to finish + self._waitThreads() + + self.assert_(self.sl._check_empty()) + + # Check acquires by priority + for acquires in [perprio[i] for i in sorted(perprio.keys())]: + for (_, names, _) in acquires: + # For shared acquires, the set will contain 1..n entries. For exclusive + # acquires only one. + while names: + names.remove(self.done.get_nowait()) + self.assertFalse(compat.any(names for (_, names, _) in acquires)) + + self.assertRaises(Queue.Empty, self.done.get_nowait) + + def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio): + self.assertEqual(name, self.sl.name) + self.assert_(mode is None) + self.assert_(owner is None) + + self.assertEqual([(pendmode, sorted(waiting)) + for (pendmode, waiting) in pending], + [(["exclusive", "shared"][int(bool(shared))], + sorted(t.getName() for t in threads)) + for acquires in [perprio[i] + for i in sorted(perprio.keys())] + for (shared, _, threads) in acquires]) + + class _FakeTimeForSpuriousNotifications: + def __init__(self, now, check_end): + self.now = now + self.check_end = check_end + + # Deterministic random number generator + self.rnd = random.Random(15086) + + def time(self): + # Advance time if the random number generator thinks so (this is to test + # multiple notifications without advancing the time) + if self.rnd.random() < 0.3: + self.now += self.rnd.random() + + self.check_end(self.now) + + return self.now + + @_Repeat + def testAcquireTimeoutWithSpuriousNotifications(self): + ready = threading.Event() + locked = threading.Event() + req = Queue.Queue(0) + + epoch = 4000.0 + timeout = 60.0 + + def check_end(now): + self.assertFalse(locked.isSet()) + + # If we waited long enough (in virtual time), tell main thread to release + # lock, otherwise tell it to notify once more + req.put(now < (epoch + (timeout * 0.8))) + + time_fn = self._FakeTimeForSpuriousNotifications(epoch, check_end).time + + sl = locking.SharedLock("test", _time_fn=time_fn) + + # Acquire in exclusive mode + sl.acquire(shared=0) + + def fn(): + self.assertTrue(sl.acquire(shared=0, timeout=timeout, + test_notify=ready.set)) + locked.set() + sl.release() + self.done.put("success") + + # Start acquire with timeout and wait for it to be ready + self._addThread(target=fn) + ready.wait() + + # The separate thread is now waiting to acquire the lock, so start sending + # spurious notifications. + + # Wait for separate thread to ask for another notification + count = 0 + while req.get(): + # After sending the notification, the lock will take a short amount of + # time to notice and to retrieve the current time + sl._notify_topmost() + count += 1 + + self.assertTrue(count > 100, "Not enough notifications were sent") + + self.assertFalse(locked.isSet()) + + # Some notifications have been sent, now actually release the lock + sl.release() + + # Wait for lock to be acquired + locked.wait() + + self._waitThreads() + + self.assertEqual(self.done.get_nowait(), "success") + self.assertRaises(Queue.Empty, self.done.get_nowait) + + +class TestSharedLockInCondition(_ThreadedTestCase): + """SharedLock as a condition lock tests""" + + def setUp(self): + _ThreadedTestCase.setUp(self) + self.sl = locking.SharedLock("TestSharedLockInCondition") + self.setCondition() + + def setCondition(self): + self.cond = threading.Condition(self.sl) + + def testKeepMode(self): + self.cond.acquire(shared=1) + self.assert_(self.sl.is_owned(shared=1)) + self.cond.wait(0) + self.assert_(self.sl.is_owned(shared=1)) + self.cond.release() + self.cond.acquire(shared=0) + self.assert_(self.sl.is_owned(shared=0)) + self.cond.wait(0) + self.assert_(self.sl.is_owned(shared=0)) + self.cond.release() + +class TestSharedLockInPipeCondition(TestSharedLockInCondition): + """SharedLock as a pipe condition lock tests""" -class TestLockSet(unittest.TestCase): + def setCondition(self): + self.cond = locking.PipeCondition(self.sl) + + +class TestSSynchronizedDecorator(_ThreadedTestCase): + """Shared Lock Synchronized decorator test""" + + def setUp(self): + _ThreadedTestCase.setUp(self) + + @locking.ssynchronized(_decoratorlock) + def _doItExclusive(self): + self.assert_(_decoratorlock.is_owned()) + self.done.put('EXC') + + @locking.ssynchronized(_decoratorlock, shared=1) + def _doItSharer(self): + self.assert_(_decoratorlock.is_owned(shared=1)) + self.done.put('SHR') + + def testDecoratedFunctions(self): + self._doItExclusive() + self.assertFalse(_decoratorlock.is_owned()) + self._doItSharer() + self.assertFalse(_decoratorlock.is_owned()) + + def testSharersCanCoexist(self): + _decoratorlock.acquire(shared=1) + threading.Thread(target=self._doItSharer).start() + self.assert_(self.done.get(True, 1)) + _decoratorlock.release() + + @_Repeat + def testExclusiveBlocksExclusive(self): + _decoratorlock.acquire() + self._addThread(target=self._doItExclusive) + # give it a bit of time to check that it's not actually doing anything + self.assertRaises(Queue.Empty, self.done.get_nowait) + _decoratorlock.release() + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'EXC') + + @_Repeat + def testExclusiveBlocksSharer(self): + _decoratorlock.acquire() + self._addThread(target=self._doItSharer) + self.assertRaises(Queue.Empty, self.done.get_nowait) + _decoratorlock.release() + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'SHR') + + @_Repeat + def testSharerBlocksExclusive(self): + _decoratorlock.acquire(shared=1) + self._addThread(target=self._doItExclusive) + self.assertRaises(Queue.Empty, self.done.get_nowait) + _decoratorlock.release() + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), 'EXC') + + +class TestLockSet(_ThreadedTestCase): """LockSet tests""" def setUp(self): + _ThreadedTestCase.setUp(self) + self._setUpLS() + + def _setUpLS(self): + """Helper to (re)initialize the lock set""" self.resources = ['one', 'two', 'three'] - self.ls = locking.LockSet(self.resources) - # helper threads use the 'done' queue to tell the master they finished. - self.done = Queue.Queue(0) + self.ls = locking.LockSet(self.resources, "TestLockSet") def testResources(self): self.assertEquals(self.ls._names(), set(self.resources)) - newls = locking.LockSet() + newls = locking.LockSet([], "TestLockSet.testResources") self.assertEquals(newls._names(), set()) + def testCheckOwnedUnknown(self): + self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one")) + for shared in [-1, 0, 1, 6378, 24255]: + self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one", + shared=shared)) + + def testCheckOwnedUnknownWhileHolding(self): + self.assertFalse(self.ls.check_owned([])) + self.ls.acquire("one", shared=1) + self.assertRaises(errors.LockError, self.ls.check_owned, "nonexist") + self.assertTrue(self.ls.check_owned("one", shared=1)) + self.assertFalse(self.ls.check_owned("one", shared=0)) + self.assertFalse(self.ls.check_owned(["one", "two"])) + self.assertRaises(errors.LockError, self.ls.check_owned, + ["one", "nonexist"]) + self.assertRaises(errors.LockError, self.ls.check_owned, "") + self.ls.release() + self.assertFalse(self.ls.check_owned([])) + self.assertFalse(self.ls.check_owned("one")) + def testAcquireRelease(self): - self.ls.acquire('one') - self.assertEquals(self.ls._list_owned(), set(['one'])) + self.assertFalse(self.ls.check_owned(self.ls._names())) + self.assert_(self.ls.acquire('one')) + self.assertEquals(self.ls.list_owned(), set(['one'])) + self.assertTrue(self.ls.check_owned("one")) + self.assertTrue(self.ls.check_owned("one", shared=0)) + self.assertFalse(self.ls.check_owned("one", shared=1)) self.ls.release() - self.assertEquals(self.ls._list_owned(), set()) - self.ls.acquire(['one']) - self.assertEquals(self.ls._list_owned(), set(['one'])) + self.assertEquals(self.ls.list_owned(), set()) + self.assertFalse(self.ls.check_owned(self.ls._names())) + self.assertEquals(self.ls.acquire(['one']), set(['one'])) + self.assertEquals(self.ls.list_owned(), set(['one'])) self.ls.release() - self.assertEquals(self.ls._list_owned(), set()) + self.assertEquals(self.ls.list_owned(), set()) self.ls.acquire(['one', 'two', 'three']) - self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three'])) + self.assertEquals(self.ls.list_owned(), set(['one', 'two', 'three'])) + self.assertTrue(self.ls.check_owned(self.ls._names())) + self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0)) + self.assertFalse(self.ls.check_owned(self.ls._names(), shared=1)) self.ls.release('one') - self.assertEquals(self.ls._list_owned(), set(['two', 'three'])) + self.assertFalse(self.ls.check_owned(["one"])) + self.assertTrue(self.ls.check_owned(["two", "three"])) + self.assertTrue(self.ls.check_owned(["two", "three"], shared=0)) + self.assertFalse(self.ls.check_owned(["two", "three"], shared=1)) + self.assertEquals(self.ls.list_owned(), set(['two', 'three'])) self.ls.release(['three']) - self.assertEquals(self.ls._list_owned(), set(['two'])) + self.assertEquals(self.ls.list_owned(), set(['two'])) self.ls.release() - self.assertEquals(self.ls._list_owned(), set()) - self.ls.acquire(['one', 'three']) - self.assertEquals(self.ls._list_owned(), set(['one', 'three'])) + self.assertEquals(self.ls.list_owned(), set()) + self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three'])) + self.assertEquals(self.ls.list_owned(), set(['one', 'three'])) self.ls.release() - self.assertEquals(self.ls._list_owned(), set()) + self.assertEquals(self.ls.list_owned(), set()) + for name in self.ls._names(): + self.assertFalse(self.ls.check_owned(name)) def testNoDoubleAcquire(self): self.ls.acquire('one') @@ -284,39 +1211,47 @@ class TestLockSet(unittest.TestCase): def testAddRemove(self): self.ls.add('four') - self.assertEquals(self.ls._list_owned(), set()) + self.assertEquals(self.ls.list_owned(), set()) self.assert_('four' in self.ls._names()) self.ls.add(['five', 'six', 'seven'], acquired=1) self.assert_('five' in self.ls._names()) self.assert_('six' in self.ls._names()) self.assert_('seven' in self.ls._names()) - self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven'])) - self.ls.remove(['five', 'six']) + self.assertEquals(self.ls.list_owned(), set(['five', 'six', 'seven'])) + self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six']) self.assert_('five' not in self.ls._names()) self.assert_('six' not in self.ls._names()) - self.assertEquals(self.ls._list_owned(), set(['seven'])) - self.ls.add('eight', acquired=1, shared=1) - self.assert_('eight' in self.ls._names()) - self.assertEquals(self.ls._list_owned(), set(['seven', 'eight'])) + self.assertEquals(self.ls.list_owned(), set(['seven'])) + self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1) self.ls.remove('seven') self.assert_('seven' not in self.ls._names()) - self.assertEquals(self.ls._list_owned(), set(['eight'])) + self.assertEquals(self.ls.list_owned(), set([])) + self.ls.acquire(None, shared=1) + self.assertRaises(AssertionError, self.ls.add, 'eight') + self.ls.release() + self.ls.acquire(None) + self.ls.add('eight', acquired=1) + self.assert_('eight' in self.ls._names()) + self.assert_('eight' in self.ls.list_owned()) + self.ls.add('nine') + self.assert_('nine' in self.ls._names()) + self.assert_('nine' not in self.ls.list_owned()) self.ls.release() self.ls.remove(['two']) self.assert_('two' not in self.ls._names()) self.ls.acquire('three') - self.ls.remove(['three']) + self.assertEquals(self.ls.remove(['three']), ['three']) self.assert_('three' not in self.ls._names()) - self.assertEquals(self.ls.remove('three'), ['three']) - self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['three', 'six']) + self.assertEquals(self.ls.remove('three'), []) + self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one']) self.assert_('one' not in self.ls._names()) def testRemoveNonBlocking(self): - self.assertRaises(NotImplementedError, self.ls.remove, 'one', blocking=0) self.ls.acquire('one') - self.assertEquals(self.ls.remove('one', blocking=0), []) + self.assertEquals(self.ls.remove('one'), ['one']) self.ls.acquire(['two', 'three']) - self.assertEquals(self.ls.remove(['two', 'three'], blocking=0), []) + self.assertEquals(self.ls.remove(['two', 'three']), + ['two', 'three']) def testNoDoubleAdd(self): self.assertRaises(errors.LockError, self.ls.add, 'two') @@ -330,96 +1265,505 @@ class TestLockSet(unittest.TestCase): # Cannot remove 'three' as we are sharing it self.assertRaises(AssertionError, self.ls.remove, 'three') - def _doLockSet(self, set, shared): + def testAcquireSetLock(self): + # acquire the set-lock exclusively + self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three'])) + self.assertEquals(self.ls.list_owned(), set(['one', 'two', 'three'])) + self.assertEquals(self.ls.is_owned(), True) + self.assertEquals(self.ls._names(), set(['one', 'two', 'three'])) + # I can still add/remove elements... + self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three']) + self.assert_(self.ls.add('six')) + self.ls.release() + # share the set-lock + self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six'])) + # adding new elements is not possible + self.assertRaises(AssertionError, self.ls.add, 'five') + self.ls.release() + + def testAcquireWithRepetitions(self): + self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1), + set(['two', 'two', 'three'])) + self.ls.release(['two', 'two']) + self.assertEquals(self.ls.list_owned(), set(['three'])) + + def testEmptyAcquire(self): + # Acquire an empty list of locks... + self.assertEquals(self.ls.acquire([]), set()) + self.assertEquals(self.ls.list_owned(), set()) + # New locks can still be addded + self.assert_(self.ls.add('six')) + # "re-acquiring" is not an issue, since we had really acquired nothing + self.assertEquals(self.ls.acquire([], shared=1), set()) + self.assertEquals(self.ls.list_owned(), set()) + # We haven't really acquired anything, so we cannot release + self.assertRaises(AssertionError, self.ls.release) + + def _doLockSet(self, names, shared): try: - self.ls.acquire(set, shared=shared) + self.ls.acquire(names, shared=shared) self.done.put('DONE') self.ls.release() except errors.LockError: self.done.put('ERR') - def _doRemoveSet(self, set): - self.done.put(self.ls.remove(set)) + def _doAddSet(self, names): + try: + self.ls.add(names, acquired=1) + self.done.put('DONE') + self.ls.release() + except errors.LockError: + self.done.put('ERR') + def _doRemoveSet(self, names): + self.done.put(self.ls.remove(names)) + + @_Repeat def testConcurrentSharedAcquire(self): self.ls.acquire(['one', 'two'], shared=1) - Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLockSet, args=(['one', 'two', 'three'], 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLockSet, args=('three', 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start() - Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doLockSet, args=(['one', 'two'], 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLockSet, args=('three', 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLockSet, args=(['one', 'two'], 0)) + self._addThread(target=self._doLockSet, args=(['two', 'three'], 0)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.release() - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self.assertEqual(self.done.get_nowait(), 'DONE') + @_Repeat def testConcurrentExclusiveAcquire(self): self.ls.acquire(['one', 'two']) - Thread(target=self._doLockSet, args=('three', 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLockSet, args=('three', 0)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start() - Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start() - Thread(target=self._doLockSet, args=('one', 0)).start() - Thread(target=self._doLockSet, args=('one', 1)).start() - Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start() - Thread(target=self._doLockSet, args=(['two', 'three'], 1)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doLockSet, args=('three', 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLockSet, args=('three', 0)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self.assertRaises(Queue.Empty, self.done.get_nowait) + self._addThread(target=self._doLockSet, args=(['one', 'two'], 0)) + self._addThread(target=self._doLockSet, args=(['one', 'two'], 1)) + self._addThread(target=self._doLockSet, args=('one', 0)) + self._addThread(target=self._doLockSet, args=('one', 1)) + self._addThread(target=self._doLockSet, args=(['two', 'three'], 0)) + self._addThread(target=self._doLockSet, args=(['two', 'three'], 1)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.release() - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + for _ in range(6): + self.failUnlessEqual(self.done.get_nowait(), 'DONE') + + @_Repeat + def testSimpleAcquireTimeoutExpiring(self): + names = sorted(self.ls._names()) + self.assert_(len(names) >= 3) + + # Get name of first lock + first = names[0] + + # Get name of last lock + last = names.pop() + + checks = [ + # Block first and try to lock it again + (first, first), + + # Block last and try to lock all locks + (None, first), + + # Block last and try to lock it again + (last, last), + ] + + for (wanted, block) in checks: + # Lock in exclusive mode + self.assert_(self.ls.acquire(block, shared=0)) + + def _AcquireOne(): + # Try to get the same lock again with a timeout (should never succeed) + acquired = self.ls.acquire(wanted, timeout=0.1, shared=0) + if acquired: + self.done.put("acquired") + self.ls.release() + else: + self.assert_(acquired is None) + self.assertFalse(self.ls.list_owned()) + self.assertFalse(self.ls.is_owned()) + self.done.put("not acquired") + + self._addThread(target=_AcquireOne) + + # Wait for timeout in thread to expire + self._waitThreads() + + # Release exclusive lock again + self.ls.release() + + self.assertEqual(self.done.get_nowait(), "not acquired") + self.assertRaises(Queue.Empty, self.done.get_nowait) + + @_Repeat + def testDelayedAndExpiringLockAcquire(self): + self._setUpLS() + self.ls.add(['five', 'six', 'seven', 'eight', 'nine']) + + for expire in (False, True): + names = sorted(self.ls._names()) + self.assertEqual(len(names), 8) + + lock_ev = dict([(i, threading.Event()) for i in names]) + + # Lock all in exclusive mode + self.assert_(self.ls.acquire(names, shared=0)) + + if expire: + # We'll wait at least 300ms per lock + lockwait = len(names) * [0.3] + + # Fail if we can't acquire all locks in 400ms. There are 8 locks, so + # this gives us up to 2.4s to fail. + lockall_timeout = 0.4 + else: + # This should finish rather quickly + lockwait = None + lockall_timeout = len(names) * 5.0 + + def _LockAll(): + def acquire_notification(name): + if not expire: + self.done.put("getting %s" % name) + + # Kick next lock + lock_ev[name].set() + + if self.ls.acquire(names, shared=0, timeout=lockall_timeout, + test_notify=acquire_notification): + self.done.put("got all") + self.ls.release() + else: + self.done.put("timeout on all") + + # Notify all locks + for ev in lock_ev.values(): + ev.set() + + t = self._addThread(target=_LockAll) + + for idx, name in enumerate(names): + # Wait for actual acquire on this lock to start + lock_ev[name].wait(10.0) + if expire and t.isAlive(): + # Wait some time after getting the notification to make sure the lock + # acquire will expire + SafeSleep(lockwait[idx]) + + self.ls.release(names=name) + + self.assertFalse(self.ls.list_owned()) + + self._waitThreads() + + if expire: + # Not checking which locks were actually acquired. Doing so would be + # too timing-dependant. + self.assertEqual(self.done.get_nowait(), "timeout on all") + else: + for i in names: + self.assertEqual(self.done.get_nowait(), "getting %s" % i) + self.assertEqual(self.done.get_nowait(), "got all") + self.assertRaises(Queue.Empty, self.done.get_nowait) + + @_Repeat def testConcurrentRemove(self): self.ls.add('four') self.ls.acquire(['one', 'two', 'four']) - Thread(target=self._doLockSet, args=(['one', 'four'], 0)).start() - Thread(target=self._doLockSet, args=(['one', 'four'], 1)).start() - Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start() - Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doLockSet, args=(['one', 'four'], 0)) + self._addThread(target=self._doLockSet, args=(['one', 'four'], 1)) + self._addThread(target=self._doLockSet, args=(['one', 'two'], 0)) + self._addThread(target=self._doLockSet, args=(['one', 'two'], 1)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.remove('one') self.ls.release() - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') - self.assertEqual(self.done.get(True, 1), 'ERR') + self._waitThreads() + for i in range(4): + self.failUnlessEqual(self.done.get_nowait(), 'ERR') self.ls.add(['five', 'six'], acquired=1) - Thread(target=self._doLockSet, args=(['three', 'six'], 1)).start() - Thread(target=self._doLockSet, args=(['three', 'six'], 0)).start() - Thread(target=self._doLockSet, args=(['four', 'six'], 1)).start() - Thread(target=self._doLockSet, args=(['four', 'six'], 0)).start() + self._addThread(target=self._doLockSet, args=(['three', 'six'], 1)) + self._addThread(target=self._doLockSet, args=(['three', 'six'], 0)) + self._addThread(target=self._doLockSet, args=(['four', 'six'], 1)) + self._addThread(target=self._doLockSet, args=(['four', 'six'], 0)) self.ls.remove('five') self.ls.release() - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + for i in range(4): + self.failUnlessEqual(self.done.get_nowait(), 'DONE') self.ls.acquire(['three', 'four']) - Thread(target=self._doRemoveSet, args=(['four', 'six'], )).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doRemoveSet, args=(['four', 'six'], )) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.remove('four') - self.assertEqual(self.done.get(True, 1), ['four']) - Thread(target=self._doRemoveSet, args=(['two'])).start() - self.assertEqual(self.done.get(True, 1), []) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), ['six']) + self._addThread(target=self._doRemoveSet, args=(['two'])) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), ['two']) + self.ls.release() + # reset lockset + self._setUpLS() + + @_Repeat + def testConcurrentSharedSetLock(self): + # share the set-lock... + self.ls.acquire(None, shared=1) + # ...another thread can share it too + self._addThread(target=self._doLockSet, args=(None, 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + # ...or just share some elements + self._addThread(target=self._doLockSet, args=(['one', 'three'], 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + # ...but not add new ones or remove any + t = self._addThread(target=self._doAddSet, args=(['nine'])) + self._addThread(target=self._doRemoveSet, args=(['two'], )) + self.assertRaises(Queue.Empty, self.done.get_nowait) + # this just releases the set-lock + self.ls.release([]) + t.join(60) + self.assertEqual(self.done.get_nowait(), 'DONE') + # release the lock on the actual elements so remove() can proceed too + self.ls.release() + self._waitThreads() + self.failUnlessEqual(self.done.get_nowait(), ['two']) + # reset lockset + self._setUpLS() + + @_Repeat + def testConcurrentExclusiveSetLock(self): + # acquire the set-lock... + self.ls.acquire(None, shared=0) + # ...no one can do anything else + self._addThread(target=self._doLockSet, args=(None, 1)) + self._addThread(target=self._doLockSet, args=(None, 0)) + self._addThread(target=self._doLockSet, args=(['three'], 0)) + self._addThread(target=self._doLockSet, args=(['two'], 1)) + self._addThread(target=self._doAddSet, args=(['nine'])) + self.assertRaises(Queue.Empty, self.done.get_nowait) + self.ls.release() + self._waitThreads() + for _ in range(5): + self.assertEqual(self.done.get(True, 1), 'DONE') + # cleanup + self._setUpLS() + + @_Repeat + def testConcurrentSetLockAdd(self): + self.ls.acquire('one') + # Another thread wants the whole SetLock + self._addThread(target=self._doLockSet, args=(None, 0)) + self._addThread(target=self._doLockSet, args=(None, 1)) + self.assertRaises(Queue.Empty, self.done.get_nowait) + self.assertRaises(AssertionError, self.ls.add, 'four') + self.ls.release() + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self.assertEqual(self.done.get_nowait(), 'DONE') + self.ls.acquire(None) + self._addThread(target=self._doLockSet, args=(None, 0)) + self._addThread(target=self._doLockSet, args=(None, 1)) + self.assertRaises(Queue.Empty, self.done.get_nowait) + self.ls.add('four') + self.ls.add('five', acquired=1) + self.ls.add('six', acquired=1, shared=1) + self.assertEquals(self.ls.list_owned(), + set(['one', 'two', 'three', 'five', 'six'])) + self.assertEquals(self.ls.is_owned(), True) + self.assertEquals(self.ls._names(), + set(['one', 'two', 'three', 'four', 'five', 'six'])) + self.ls.release() + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self.assertEqual(self.done.get_nowait(), 'DONE') + self._setUpLS() + + @_Repeat + def testEmptyLockSet(self): + # get the set-lock + self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three'])) + # now empty it... + self.ls.remove(['one', 'two', 'three']) + # and adds/locks by another thread still wait + self._addThread(target=self._doAddSet, args=(['nine'])) + self._addThread(target=self._doLockSet, args=(None, 1)) + self._addThread(target=self._doLockSet, args=(None, 0)) + self.assertRaises(Queue.Empty, self.done.get_nowait) + self.ls.release() + self._waitThreads() + for _ in range(3): + self.assertEqual(self.done.get_nowait(), 'DONE') + # empty it again... + self.assertEqual(self.ls.remove(['nine']), ['nine']) + # now share it... + self.assertEqual(self.ls.acquire(None, shared=1), set()) + # other sharers can go, adds still wait + self._addThread(target=self._doLockSet, args=(None, 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doAddSet, args=(['nine'])) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.ls.release() + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._setUpLS() + + def testAcquireWithNamesDowngrade(self): + self.assertEquals(self.ls.acquire("two", shared=0), set(["two"])) + self.assertTrue(self.ls.is_owned()) + self.assertFalse(self.ls._get_lock().is_owned()) + self.ls.release() + self.assertFalse(self.ls.is_owned()) + self.assertFalse(self.ls._get_lock().is_owned()) + # Can't downgrade after releasing + self.assertRaises(AssertionError, self.ls.downgrade, "two") + + def testDowngrade(self): + # Not owning anything, must raise an exception + self.assertFalse(self.ls.is_owned()) + self.assertRaises(AssertionError, self.ls.downgrade) + + self.assertFalse(compat.any(i.is_owned() + for i in self.ls._get_lockdict().values())) + self.assertFalse(self.ls.check_owned(self.ls._names())) + for name in self.ls._names(): + self.assertFalse(self.ls.check_owned(name)) + + self.assertEquals(self.ls.acquire(None, shared=0), + set(["one", "two", "three"])) + self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock") + + self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0)) + for name in self.ls._names(): + self.assertTrue(self.ls.check_owned(name)) + self.assertTrue(self.ls.check_owned(name, shared=0)) + self.assertFalse(self.ls.check_owned(name, shared=1)) + + self.assertTrue(self.ls._get_lock().is_owned(shared=0)) + self.assertTrue(compat.all(i.is_owned(shared=0) + for i in self.ls._get_lockdict().values())) + + # Start downgrading locks + self.assertTrue(self.ls.downgrade(names=["one"])) + self.assertTrue(self.ls._get_lock().is_owned(shared=0)) + self.assertTrue(compat.all(lock.is_owned(shared=[0, 1][int(name == "one")]) + for name, lock in + self.ls._get_lockdict().items())) + + self.assertFalse(self.ls.check_owned("one", shared=0)) + self.assertTrue(self.ls.check_owned("one", shared=1)) + self.assertTrue(self.ls.check_owned("two", shared=0)) + self.assertTrue(self.ls.check_owned("three", shared=0)) + + # Downgrade second lock + self.assertTrue(self.ls.downgrade(names="two")) + self.assertTrue(self.ls._get_lock().is_owned(shared=0)) + should_share = lambda name: [0, 1][int(name in ("one", "two"))] + self.assertTrue(compat.all(lock.is_owned(shared=should_share(name)) + for name, lock in + self.ls._get_lockdict().items())) + + self.assertFalse(self.ls.check_owned("one", shared=0)) + self.assertTrue(self.ls.check_owned("one", shared=1)) + self.assertFalse(self.ls.check_owned("two", shared=0)) + self.assertTrue(self.ls.check_owned("two", shared=1)) + self.assertTrue(self.ls.check_owned("three", shared=0)) + + # Downgrading the last exclusive lock to shared must downgrade the + # lockset-internal lock too + self.assertTrue(self.ls.downgrade(names="three")) + self.assertTrue(self.ls._get_lock().is_owned(shared=1)) + self.assertTrue(compat.all(i.is_owned(shared=1) + for i in self.ls._get_lockdict().values())) + + # Verify owned locks + for name in self.ls._names(): + self.assertTrue(self.ls.check_owned(name, shared=1)) + + # Downgrading a shared lock must be a no-op + self.assertTrue(self.ls.downgrade(names=["one", "three"])) + self.assertTrue(self.ls._get_lock().is_owned(shared=1)) + self.assertTrue(compat.all(i.is_owned(shared=1) + for i in self.ls._get_lockdict().values())) + + self.ls.release() + + def testPriority(self): + def _Acquire(prev, next, name, priority, success_fn): + prev.wait() + self.assert_(self.ls.acquire(name, shared=0, + priority=priority, + test_notify=lambda _: next.set())) + try: + success_fn() + finally: + self.ls.release() + + # Get all in exclusive mode + self.assert_(self.ls.acquire(locking.ALL_SET, shared=0)) + + done_two = Queue.Queue(0) + + first = threading.Event() + prev = first + + acquires = [("one", prio, self.done) for prio in range(1, 33)] + acquires.extend([("two", prio, done_two) for prio in range(1, 33)]) + + # Use a deterministic random generator + random.Random(741).shuffle(acquires) + + for (name, prio, done) in acquires: + ev = threading.Event() + self._addThread(target=_Acquire, + args=(prev, ev, name, prio, + compat.partial(done.put, "Prio%s" % prio))) + prev = ev + + # Start acquires + first.set() + + # Wait for last acquire to start + prev.wait() + + # Let threads acquire locks + self.ls.release() + + # Wait for threads to finish + self._waitThreads() + + for i in range(1, 33): + self.assertEqual(self.done.get_nowait(), "Prio%s" % i) + self.assertEqual(done_two.get_nowait(), "Prio%s" % i) + + self.assertRaises(Queue.Empty, self.done.get_nowait) + self.assertRaises(Queue.Empty, done_two.get_nowait) -class TestGanetiLockManager(unittest.TestCase): +class TestGanetiLockManager(_ThreadedTestCase): def setUp(self): + _ThreadedTestCase.setUp(self) self.nodes=['n1', 'n2'] + self.nodegroups=['g1', 'g2'] self.instances=['i1', 'i2', 'i3'] - self.GL = locking.GanetiLockManager(nodes=self.nodes, - instances=self.instances) - self.done = Queue.Queue(0) + self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, + self.instances) def tearDown(self): # Don't try this at home... @@ -428,65 +1772,110 @@ class TestGanetiLockManager(unittest.TestCase): def testLockingConstants(self): # The locking library internally cheats by assuming its constants have some # relationships with each other. Check those hold true. + # This relationship is also used in the Processor to recursively acquire + # the right locks. Again, please don't break it. for i in range(len(locking.LEVELS)): self.assertEqual(i, locking.LEVELS[i]) def testDoubleGLFails(self): - # We are not passing test=True, so instantiating a new one should fail - self.assertRaises(AssertionError, locking.GanetiLockManager) + self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], []) def testLockNames(self): self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes)) - self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(self.instances)) - self.assertEqual(self.GL._names(locking.LEVEL_CONFIG), set(['config'])) + self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), + set(self.nodegroups)) + self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), + set(self.instances)) def testInitAndResources(self): locking.GanetiLockManager._instance = None - self.GL = locking.GanetiLockManager() + self.GL = locking.GanetiLockManager([], [], []) self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) self.assertEqual(self.GL._names(locking.LEVEL_NODE), set()) + self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set()) self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set()) - self.assertEqual(self.GL._names(locking.LEVEL_CONFIG), set(['config'])) locking.GanetiLockManager._instance = None - self.GL = locking.GanetiLockManager(nodes=self.nodes) + self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, []) self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes)) + self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), + set(self.nodegroups)) self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set()) - self.assertEqual(self.GL._names(locking.LEVEL_CONFIG), set(['config'])) locking.GanetiLockManager._instance = None - self.GL = locking.GanetiLockManager(instances=self.instances) + self.GL = locking.GanetiLockManager([], [], self.instances) self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) self.assertEqual(self.GL._names(locking.LEVEL_NODE), set()) - self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(self.instances)) - self.assertEqual(self.GL._names(locking.LEVEL_CONFIG), set(['config'])) + self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set()) + self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), + set(self.instances)) def testAcquireRelease(self): self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) - self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL'])) + self.assertEquals(self.GL.list_owned(locking.LEVEL_CLUSTER), set(['BGL'])) + self.GL.acquire(locking.LEVEL_INSTANCE, ['i1']) + self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2']) self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1) + self.assertTrue(self.GL.check_owned(locking.LEVEL_NODE, ["n1", "n2"], + shared=1)) + self.assertFalse(self.GL.check_owned(locking.LEVEL_INSTANCE, ["i1", "i3"])) + self.GL.release(locking.LEVEL_NODE, ['n2']) + self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set(['n1'])) + self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(['g2'])) + self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i1'])) self.GL.release(locking.LEVEL_NODE) - self.GL.acquire(locking.LEVEL_NODE, ['n1']) - self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1'])) - self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2']) - self.GL.acquire(locking.LEVEL_CONFIG, ['config']) - self.GL.release(locking.LEVEL_INSTANCE, ['i2']) - self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1'])) - self.GL.release(locking.LEVEL_NODE) + self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set()) + self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(['g2'])) + self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i1'])) + self.GL.release(locking.LEVEL_NODEGROUP) self.GL.release(locking.LEVEL_INSTANCE) - self.GL.release(locking.LEVEL_CONFIG) self.assertRaises(errors.LockError, self.GL.acquire, locking.LEVEL_INSTANCE, ['i5']) self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1) - self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3'])) + self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i3'])) + + def testAcquireWholeSets(self): + self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) + self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None), + set(self.instances)) + self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), + set(self.instances)) + self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None), + set(self.nodegroups)) + self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), + set(self.nodegroups)) + self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1), + set(self.nodes)) + self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), + set(self.nodes)) + self.GL.release(locking.LEVEL_NODE) + self.GL.release(locking.LEVEL_NODEGROUP) + self.GL.release(locking.LEVEL_INSTANCE) + self.GL.release(locking.LEVEL_CLUSTER) + + def testAcquireWholeAndPartial(self): + self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) + self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None), + set(self.instances)) + self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), + set(self.instances)) + self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1), + set(['n2'])) + self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), + set(['n2'])) + self.GL.release(locking.LEVEL_NODE) + self.GL.release(locking.LEVEL_INSTANCE) + self.GL.release(locking.LEVEL_CLUSTER) def testBGLDependency(self): self.assertRaises(AssertionError, self.GL.acquire, locking.LEVEL_NODE, ['n1', 'n2']) self.assertRaises(AssertionError, self.GL.acquire, locking.LEVEL_INSTANCE, ['i3']) + self.assertRaises(AssertionError, self.GL.acquire, + locking.LEVEL_NODEGROUP, ['g1']) self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) self.GL.acquire(locking.LEVEL_NODE, ['n1']) self.assertRaises(AssertionError, self.GL.release, @@ -500,27 +1889,42 @@ class TestGanetiLockManager(unittest.TestCase): self.assertRaises(AssertionError, self.GL.release, locking.LEVEL_CLUSTER) self.GL.release(locking.LEVEL_INSTANCE) - self.GL.acquire(locking.LEVEL_CONFIG, ['config']) + self.GL.acquire(locking.LEVEL_NODEGROUP, None) + self.GL.release(locking.LEVEL_NODEGROUP, ['g1']) + self.assertRaises(AssertionError, self.GL.release, + locking.LEVEL_CLUSTER, ['BGL']) self.assertRaises(AssertionError, self.GL.release, locking.LEVEL_CLUSTER) + self.GL.release(locking.LEVEL_NODEGROUP) + self.GL.release(locking.LEVEL_CLUSTER) def testWrongOrder(self): self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) - self.GL.acquire(locking.LEVEL_INSTANCE, ['i3']) + self.GL.acquire(locking.LEVEL_NODE, ['n2']) self.assertRaises(AssertionError, self.GL.acquire, locking.LEVEL_NODE, ['n1']) self.assertRaises(AssertionError, self.GL.acquire, - locking.LEVEL_INSTANCE, ['i2']) - self.GL.acquire(locking.LEVEL_CONFIG, ['config']) - self.assertRaises(AssertionError, self.GL.acquire, - locking.LEVEL_CONFIG, ['config']) - self.GL.release(locking.LEVEL_INSTANCE) - self.assertRaises(AssertionError, self.GL.acquire, - locking.LEVEL_NODE, ['n1']) + locking.LEVEL_NODEGROUP, ['g1']) self.assertRaises(AssertionError, self.GL.acquire, locking.LEVEL_INSTANCE, ['i2']) - self.assertRaises(AssertionError, self.GL.acquire, - locking.LEVEL_CONFIG, ['config']) + + def testModifiableLevels(self): + self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER, + ['BGL2']) + self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL']) + self.GL.add(locking.LEVEL_INSTANCE, ['i4']) + self.GL.remove(locking.LEVEL_INSTANCE, ['i3']) + self.GL.remove(locking.LEVEL_INSTANCE, ['i1']) + self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4'])) + self.GL.add(locking.LEVEL_NODE, ['n3']) + self.GL.remove(locking.LEVEL_NODE, ['n1']) + self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3'])) + self.GL.add(locking.LEVEL_NODEGROUP, ['g3']) + self.GL.remove(locking.LEVEL_NODEGROUP, ['g2']) + self.GL.remove(locking.LEVEL_NODEGROUP, ['g1']) + self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3'])) + self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER, + ['BGL2']) # Helper function to run as a thread that shared the BGL and then acquires # some locks at another level. @@ -534,30 +1938,471 @@ class TestGanetiLockManager(unittest.TestCase): except errors.LockError: self.done.put('ERR') + @_Repeat def testConcurrency(self): self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) - Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i1', 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - self.GL.acquire(locking.LEVEL_NODE, ['n1']) + self._addThread(target=self._doLock, + args=(locking.LEVEL_INSTANCE, 'i1', 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') self.GL.acquire(locking.LEVEL_INSTANCE, ['i3']) - self.GL.acquire(locking.LEVEL_CONFIG, ['config']) - Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i1', 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i3', 1)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) - self.GL.release(locking.LEVEL_CONFIG) + self._addThread(target=self._doLock, + args=(locking.LEVEL_INSTANCE, 'i1', 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLock, + args=(locking.LEVEL_INSTANCE, 'i3', 1)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.GL.release(locking.LEVEL_INSTANCE) - self.assertEqual(self.done.get(True, 1), 'DONE') + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1) - Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i2', 1)).start() - self.assertEqual(self.done.get(True, 1), 'DONE') - Thread(target=self._doLock, args=(locking.LEVEL_INSTANCE, 'i2', 0)).start() - self.assertRaises(Queue.Empty, self.done.get, True, 0.2) + self._addThread(target=self._doLock, + args=(locking.LEVEL_INSTANCE, 'i2', 1)) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'DONE') + self._addThread(target=self._doLock, + args=(locking.LEVEL_INSTANCE, 'i2', 0)) + self.assertRaises(Queue.Empty, self.done.get_nowait) self.GL.release(locking.LEVEL_INSTANCE) + self._waitThreads() self.assertEqual(self.done.get(True, 1), 'DONE') + self.GL.release(locking.LEVEL_CLUSTER, ['BGL']) + + +class TestLockMonitor(_ThreadedTestCase): + def setUp(self): + _ThreadedTestCase.setUp(self) + self.lm = locking.LockMonitor() + + def testSingleThread(self): + locks = [] + + for i in range(100): + name = "TestLock%s" % i + locks.append(locking.SharedLock(name, monitor=self.lm)) + + self.assertEqual(len(self.lm._locks), len(locks)) + result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"])) + self.assertEqual(len(result.fields), 1) + self.assertEqual(len(result.data), 100) + + # Delete all locks + del locks[:] + + # The garbage collector might needs some time + def _CheckLocks(): + if self.lm._locks: + raise utils.RetryAgain() + + utils.Retry(_CheckLocks, 0.1, 30.0) + + self.assertFalse(self.lm._locks) + + def testMultiThread(self): + locks = [] + + def _CreateLock(prev, next, name): + prev.wait() + locks.append(locking.SharedLock(name, monitor=self.lm)) + if next: + next.set() + + expnames = [] + + first = threading.Event() + prev = first + + # Use a deterministic random generator + for i in random.Random(4263).sample(range(100), 33): + name = "MtTestLock%s" % i + expnames.append(name) + + ev = threading.Event() + self._addThread(target=_CreateLock, args=(prev, ev, name)) + prev = ev + + # Add locks + first.set() + self._waitThreads() + + # Check order in which locks were added + self.assertEqual([i.name for i in locks], expnames) + + # Check query result + result = self.lm.QueryLocks(["name", "mode", "owner", "pending"]) + self.assert_(isinstance(result, dict)) + response = objects.QueryResponse.FromDict(result) + self.assertEqual(response.data, + [[(constants.RS_NORMAL, name), + (constants.RS_NORMAL, None), + (constants.RS_NORMAL, None), + (constants.RS_NORMAL, [])] + for name in utils.NiceSort(expnames)]) + self.assertEqual(len(response.fields), 4) + self.assertEqual(["name", "mode", "owner", "pending"], + [fdef.name for fdef in response.fields]) + + # Test exclusive acquire + for tlock in locks[::4]: + tlock.acquire(shared=0) + try: + def _GetExpResult(name): + if tlock.name == name: + return [(constants.RS_NORMAL, name), + (constants.RS_NORMAL, "exclusive"), + (constants.RS_NORMAL, + [threading.currentThread().getName()]), + (constants.RS_NORMAL, [])] + return [(constants.RS_NORMAL, name), + (constants.RS_NORMAL, None), + (constants.RS_NORMAL, None), + (constants.RS_NORMAL, [])] + + result = self.lm.QueryLocks(["name", "mode", "owner", "pending"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, + [_GetExpResult(name) + for name in utils.NiceSort(expnames)]) + finally: + tlock.release() + + # Test shared acquire + def _Acquire(lock, shared, ev, notify): + lock.acquire(shared=shared) + try: + notify.set() + ev.wait() + finally: + lock.release() + + for tlock1 in locks[::11]: + for tlock2 in locks[::-15]: + if tlock2 == tlock1: + # Avoid deadlocks + continue + + for tlock3 in locks[::10]: + if tlock3 in (tlock2, tlock1): + # Avoid deadlocks + continue + + releaseev = threading.Event() + + # Acquire locks + acquireev = [] + tthreads1 = [] + for i in range(3): + ev = threading.Event() + tthreads1.append(self._addThread(target=_Acquire, + args=(tlock1, 1, releaseev, ev))) + acquireev.append(ev) + + ev = threading.Event() + tthread2 = self._addThread(target=_Acquire, + args=(tlock2, 1, releaseev, ev)) + acquireev.append(ev) + + ev = threading.Event() + tthread3 = self._addThread(target=_Acquire, + args=(tlock3, 0, releaseev, ev)) + acquireev.append(ev) + + # Wait for all locks to be acquired + for i in acquireev: + i.wait() + + # Check query result + result = self.lm.QueryLocks(["name", "mode", "owner"]) + response = objects.QueryResponse.FromDict(result) + for (name, mode, owner) in response.data: + (name_status, name_value) = name + (owner_status, owner_value) = owner + + self.assertEqual(name_status, constants.RS_NORMAL) + self.assertEqual(owner_status, constants.RS_NORMAL) + + if name_value == tlock1.name: + self.assertEqual(mode, (constants.RS_NORMAL, "shared")) + self.assertEqual(set(owner_value), + set(i.getName() for i in tthreads1)) + continue + + if name_value == tlock2.name: + self.assertEqual(mode, (constants.RS_NORMAL, "shared")) + self.assertEqual(owner_value, [tthread2.getName()]) + continue + + if name_value == tlock3.name: + self.assertEqual(mode, (constants.RS_NORMAL, "exclusive")) + self.assertEqual(owner_value, [tthread3.getName()]) + continue + + self.assert_(name_value in expnames) + self.assertEqual(mode, (constants.RS_NORMAL, None)) + self.assert_(owner_value is None) + + # Release locks again + releaseev.set() + + self._waitThreads() + + result = self.lm.QueryLocks(["name", "mode", "owner"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, + [[(constants.RS_NORMAL, name), + (constants.RS_NORMAL, None), + (constants.RS_NORMAL, None)] + for name in utils.NiceSort(expnames)]) + + def testDelete(self): + lock = locking.SharedLock("TestLock", monitor=self.lm) + + self.assertEqual(len(self.lm._locks), 1) + result = self.lm.QueryLocks(["name", "mode", "owner"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, + [[(constants.RS_NORMAL, lock.name), + (constants.RS_NORMAL, None), + (constants.RS_NORMAL, None)]]) + + lock.delete() + + result = self.lm.QueryLocks(["name", "mode", "owner"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, + [[(constants.RS_NORMAL, lock.name), + (constants.RS_NORMAL, "deleted"), + (constants.RS_NORMAL, None)]]) + self.assertEqual(len(self.lm._locks), 1) + + def testPending(self): + def _Acquire(lock, shared, prev, next): + prev.wait() + + lock.acquire(shared=shared, test_notify=next.set) + try: + pass + finally: + lock.release() + + lock = locking.SharedLock("ExcLock", monitor=self.lm) + + for shared in [0, 1]: + lock.acquire() + try: + self.assertEqual(len(self.lm._locks), 1) + result = self.lm.QueryLocks(["name", "mode", "owner"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, + [[(constants.RS_NORMAL, lock.name), + (constants.RS_NORMAL, "exclusive"), + (constants.RS_NORMAL, + [threading.currentThread().getName()])]]) + + threads = [] + + first = threading.Event() + prev = first + + for i in range(5): + ev = threading.Event() + threads.append(self._addThread(target=_Acquire, + args=(lock, shared, prev, ev))) + prev = ev + + # Start acquires + first.set() + + # Wait for last acquire to start waiting + prev.wait() + + # NOTE: This works only because QueryLocks will acquire the + # lock-internal lock again and won't be able to get the information + # until it has the lock. By then the acquire should be registered in + # SharedLock.__pending (otherwise it's a bug). + + # All acquires are waiting now + if shared: + pending = [("shared", utils.NiceSort(t.getName() for t in threads))] + else: + pending = [("exclusive", [t.getName()]) for t in threads] + + result = self.lm.QueryLocks(["name", "mode", "owner", "pending"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, + [[(constants.RS_NORMAL, lock.name), + (constants.RS_NORMAL, "exclusive"), + (constants.RS_NORMAL, + [threading.currentThread().getName()]), + (constants.RS_NORMAL, pending)]]) + + self.assertEqual(len(self.lm._locks), 1) + finally: + lock.release() + + self._waitThreads() + + # No pending acquires + result = self.lm.QueryLocks(["name", "mode", "owner", "pending"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, + [[(constants.RS_NORMAL, lock.name), + (constants.RS_NORMAL, None), + (constants.RS_NORMAL, None), + (constants.RS_NORMAL, [])]]) + + self.assertEqual(len(self.lm._locks), 1) + + def testDeleteAndRecreate(self): + lname = "TestLock101923193" + + # Create some locks with the same name and keep all references + locks = [locking.SharedLock(lname, monitor=self.lm) + for _ in range(5)] + + self.assertEqual(len(self.lm._locks), len(locks)) + + result = self.lm.QueryLocks(["name", "mode", "owner"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, + [[(constants.RS_NORMAL, lname), + (constants.RS_NORMAL, None), + (constants.RS_NORMAL, None)]] * 5) + + locks[2].delete() + + # Check information order + result = self.lm.QueryLocks(["name", "mode", "owner"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, + [[(constants.RS_NORMAL, lname), + (constants.RS_NORMAL, None), + (constants.RS_NORMAL, None)]] * 2 + + [[(constants.RS_NORMAL, lname), + (constants.RS_NORMAL, "deleted"), + (constants.RS_NORMAL, None)]] + + [[(constants.RS_NORMAL, lname), + (constants.RS_NORMAL, None), + (constants.RS_NORMAL, None)]] * 2) + + locks[1].acquire(shared=0) + + last_status = [ + [(constants.RS_NORMAL, lname), + (constants.RS_NORMAL, None), + (constants.RS_NORMAL, None)], + [(constants.RS_NORMAL, lname), + (constants.RS_NORMAL, "exclusive"), + (constants.RS_NORMAL, [threading.currentThread().getName()])], + [(constants.RS_NORMAL, lname), + (constants.RS_NORMAL, "deleted"), + (constants.RS_NORMAL, None)], + [(constants.RS_NORMAL, lname), + (constants.RS_NORMAL, None), + (constants.RS_NORMAL, None)], + [(constants.RS_NORMAL, lname), + (constants.RS_NORMAL, None), + (constants.RS_NORMAL, None)], + ] + + # Check information order + result = self.lm.QueryLocks(["name", "mode", "owner"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status) + + self.assertEqual(len(set(self.lm._locks.values())), len(locks)) + self.assertEqual(len(self.lm._locks), len(locks)) + + # Check lock deletion + for idx in range(len(locks)): + del locks[0] + assert gc.isenabled() + gc.collect() + self.assertEqual(len(self.lm._locks), len(locks)) + result = self.lm.QueryLocks(["name", "mode", "owner"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, + last_status[idx + 1:]) + + # All locks should have been deleted + assert not locks + self.assertFalse(self.lm._locks) + + result = self.lm.QueryLocks(["name", "mode", "owner"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, []) + + class _FakeLock: + def __init__(self): + self._info = [] + + def AddResult(self, *args): + self._info.append(args) + + def CountPending(self): + return len(self._info) + + def GetLockInfo(self, requested): + (exp_requested, result) = self._info.pop(0) + + if exp_requested != requested: + raise Exception("Requested information (%s) does not match" + " expectations (%s)" % (requested, exp_requested)) + + return result + + def testMultipleResults(self): + fl1 = self._FakeLock() + fl2 = self._FakeLock() + + self.lm.RegisterLock(fl1) + self.lm.RegisterLock(fl2) + + # Empty information + for i in [fl1, fl2]: + i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), []) + result = self.lm.QueryLocks(["name", "mode", "owner"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, []) + for i in [fl1, fl2]: + self.assertEqual(i.CountPending(), 0) + + # Check ordering + for fn in [lambda x: x, reversed, sorted]: + fl1.AddResult(set(), list(fn([ + ("aaa", None, None, None), + ("bbb", None, None, None), + ]))) + fl2.AddResult(set(), []) + result = self.lm.QueryLocks(["name"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, [ + [(constants.RS_NORMAL, "aaa")], + [(constants.RS_NORMAL, "bbb")], + ]) + for i in [fl1, fl2]: + self.assertEqual(i.CountPending(), 0) + + for fn2 in [lambda x: x, reversed, sorted]: + fl1.AddResult(set([query.LQ_MODE]), list(fn([ + # Same name, but different information + ("aaa", "mode0", None, None), + ("aaa", "mode1", None, None), + ("aaa", "mode2", None, None), + ("aaa", "mode3", None, None), + ]))) + fl2.AddResult(set([query.LQ_MODE]), [ + ("zzz", "end", None, None), + ("000", "start", None, None), + ] + list(fn2([ + ("aaa", "b200", None, None), + ("aaa", "b300", None, None), + ]))) + result = self.lm.QueryLocks(["name", "mode"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, [ + [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")], + ] + list(fn([ + # Name is the same, so order must be equal to incoming order + [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")], + [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")], + [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")], + [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")], + ])) + list(fn2([ + [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")], + [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")], + ])) + [ + [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")], + ]) + for i in [fl1, fl2]: + self.assertEqual(i.CountPending(), 0) if __name__ == '__main__': - unittest.main() - #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock) - #unittest.TextTestRunner(verbosity=2).run(suite) + testutils.GanetiTestProgram()