X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/f1501b3f3bc8c6253e80d1b94aac2c4f46c25800..f12e173649f0538e18f45fefdd9606a362ddab12:/test/ganeti.locking_unittest.py diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index 9483625..273e817 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -1,7 +1,7 @@ #!/usr/bin/python # -# Copyright (C) 2006, 2007 Google Inc. +# Copyright (C) 2006, 2007, 2010 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -27,14 +27,18 @@ import unittest import time import Queue import threading +import random from ganeti import locking from ganeti import errors +from ganeti import utils + +import testutils # This is used to test the ssynchronize decorator. # Since it's passed as input to a decorator it must be declared as a global. -_decoratorlock = locking.SharedLock() +_decoratorlock = locking.SharedLock("decorator lock") #: List for looping tests ITERATIONS = range(8) @@ -48,6 +52,15 @@ def _Repeat(fn): return wrapper +def SafeSleep(duration): + start = time.time() + while True: + delay = start + duration - time.time() + if delay <= 0.0: + break + time.sleep(delay) + + class _ThreadedTestCase(unittest.TestCase): """Test class that supports adding/waiting on threads""" def setUp(self): @@ -79,7 +92,7 @@ class _ConditionTestCase(_ThreadedTestCase): self.cond = cls(self.lock) def _testAcquireRelease(self): - self.assert_(not self.cond._is_owned()) + self.assertFalse(self.cond._is_owned()) self.assertRaises(RuntimeError, self.cond.wait) self.assertRaises(RuntimeError, self.cond.notifyAll) @@ -89,7 +102,7 @@ class _ConditionTestCase(_ThreadedTestCase): self.assert_(self.cond._is_owned()) self.cond.release() - self.assert_(not self.cond._is_owned()) + self.assertFalse(self.cond._is_owned()) self.assertRaises(RuntimeError, self.cond.wait) self.assertRaises(RuntimeError, self.cond.notifyAll) @@ -111,7 +124,7 @@ class _ConditionTestCase(_ThreadedTestCase): self.assertEqual(self.done.get(True, 1), "NN") self.assert_(self.cond._is_owned()) self.cond.release() - self.assert_(not self.cond._is_owned()) + self.assertFalse(self.cond._is_owned()) class TestSingleNotifyPipeCondition(_ConditionTestCase): @@ -153,20 +166,22 @@ class TestPipeCondition(_ConditionTestCase): self._testNotification() def _TestWait(self, fn): - self._addThread(target=fn) - self._addThread(target=fn) - self._addThread(target=fn) + threads = [ + self._addThread(target=fn), + self._addThread(target=fn), + self._addThread(target=fn), + ] # Wait for threads to be waiting - self.assertEqual(self.done.get(True, 1), "A") - self.assertEqual(self.done.get(True, 1), "A") - self.assertEqual(self.done.get(True, 1), "A") + for _ in threads: + self.assertEqual(self.done.get(True, 1), "A") self.assertRaises(Queue.Empty, self.done.get_nowait) self.cond.acquire() - self.assertEqual(self.cond._nwaiters, 3) - # This new thread can"t acquire the lock, and thus call wait, before we + self.assertEqual(len(self.cond._waiters), 3) + self.assertEqual(self.cond._waiters, set(threads)) + # This new thread can't acquire the lock, and thus call wait, before we # release it self._addThread(target=fn) self.cond.notifyAll() @@ -245,28 +260,28 @@ class TestSharedLock(_ThreadedTestCase): def setUp(self): _ThreadedTestCase.setUp(self) - self.sl = locking.SharedLock() + self.sl = locking.SharedLock("TestSharedLock") def testSequenceAndOwnership(self): - self.assert_(not self.sl._is_owned()) + self.assertFalse(self.sl._is_owned()) self.sl.acquire(shared=1) self.assert_(self.sl._is_owned()) self.assert_(self.sl._is_owned(shared=1)) - self.assert_(not self.sl._is_owned(shared=0)) + self.assertFalse(self.sl._is_owned(shared=0)) self.sl.release() - self.assert_(not self.sl._is_owned()) + self.assertFalse(self.sl._is_owned()) self.sl.acquire() self.assert_(self.sl._is_owned()) - self.assert_(not self.sl._is_owned(shared=1)) + self.assertFalse(self.sl._is_owned(shared=1)) self.assert_(self.sl._is_owned(shared=0)) self.sl.release() - self.assert_(not self.sl._is_owned()) + self.assertFalse(self.sl._is_owned()) self.sl.acquire(shared=1) self.assert_(self.sl._is_owned()) self.assert_(self.sl._is_owned(shared=1)) - self.assert_(not self.sl._is_owned(shared=0)) + self.assertFalse(self.sl._is_owned(shared=0)) self.sl.release() - self.assert_(not self.sl._is_owned()) + self.assertFalse(self.sl._is_owned()) def testBooleanValue(self): # semaphores are supposed to return a true value on a successful acquire @@ -339,7 +354,7 @@ class TestSharedLock(_ThreadedTestCase): self.sl.release() self._waitThreads() self.failUnlessEqual(self.done.get_nowait(), 'DEL') - self.sl = locking.SharedLock() + self.sl = locking.SharedLock(self.sl.name) @_Repeat def testExclusiveBlocksSharer(self): @@ -367,7 +382,7 @@ class TestSharedLock(_ThreadedTestCase): self.sl.release() self._waitThreads() self.failUnlessEqual(self.done.get_nowait(), 'DEL') - self.sl = locking.SharedLock() + self.sl = locking.SharedLock(self.sl.name) @_Repeat def testWaitingExclusiveBlocksSharer(self): @@ -430,7 +445,7 @@ class TestSharedLock(_ThreadedTestCase): # The threads who were pending return ERR for _ in range(4): self.assertEqual(self.done.get_nowait(), 'ERR') - self.sl = locking.SharedLock() + self.sl = locking.SharedLock(self.sl.name) @_Repeat def testDeletePendingDeleteExclusiveSharers(self): @@ -446,7 +461,7 @@ class TestSharedLock(_ThreadedTestCase): self.assertEqual(self.done.get_nowait(), 'ERR') self.assertEqual(self.done.get_nowait(), 'ERR') self.assertEqual(self.done.get_nowait(), 'ERR') - self.sl = locking.SharedLock() + self.sl = locking.SharedLock(self.sl.name) @_Repeat def testExclusiveAcquireTimeout(self): @@ -593,7 +608,7 @@ class TestSharedLock(_ThreadedTestCase): @_Repeat def testMixedAcquireTimeout(self): - sync = threading.Condition() + sync = threading.Event() def _AcquireShared(ev): if not self.sl.acquire(shared=1, timeout=None): @@ -604,12 +619,8 @@ class TestSharedLock(_ThreadedTestCase): # Notify main thread ev.set() - # Wait for notification - sync.acquire() - try: - sync.wait() - finally: - sync.release() + # Wait for notification from main thread + sync.wait() # Release lock self.sl.release() @@ -630,7 +641,7 @@ class TestSharedLock(_ThreadedTestCase): self.failIf(self.sl.acquire(shared=0, timeout=0.02)) # Acquire exclusive without timeout - exclsync = threading.Condition() + exclsync = threading.Event() exclev = threading.Event() def _AcquireExclusive(): @@ -642,11 +653,8 @@ class TestSharedLock(_ThreadedTestCase): # Notify main thread exclev.set() - exclsync.acquire() - try: - exclsync.wait() - finally: - exclsync.release() + # Wait for notification from main thread + exclsync.wait() self.sl.release() @@ -656,11 +664,7 @@ class TestSharedLock(_ThreadedTestCase): self.failIf(self.sl.acquire(shared=0, timeout=0.02)) # Make all shared holders release their locks - sync.acquire() - try: - sync.notifyAll() - finally: - sync.release() + sync.set() # Wait for exclusive acquire to succeed exclev.wait() @@ -679,11 +683,7 @@ class TestSharedLock(_ThreadedTestCase): self._addThread(target=_AcquireSharedSimple) # Tell exclusive lock to release - exclsync.acquire() - try: - exclsync.notifyAll() - finally: - exclsync.release() + exclsync.set() # Wait for everything to finish self._waitThreads() @@ -702,6 +702,37 @@ class TestSharedLock(_ThreadedTestCase): self.assertRaises(Queue.Empty, self.done.get_nowait) +class TestSharedLockInCondition(_ThreadedTestCase): + """SharedLock as a condition lock tests""" + + def setUp(self): + _ThreadedTestCase.setUp(self) + self.sl = locking.SharedLock("TestSharedLockInCondition") + self.setCondition() + + def setCondition(self): + self.cond = threading.Condition(self.sl) + + def testKeepMode(self): + self.cond.acquire(shared=1) + self.assert_(self.sl._is_owned(shared=1)) + self.cond.wait(0) + self.assert_(self.sl._is_owned(shared=1)) + self.cond.release() + self.cond.acquire(shared=0) + self.assert_(self.sl._is_owned(shared=0)) + self.cond.wait(0) + self.assert_(self.sl._is_owned(shared=0)) + self.cond.release() + + +class TestSharedLockInPipeCondition(TestSharedLockInCondition): + """SharedLock as a pipe condition lock tests""" + + def setCondition(self): + self.cond = locking.PipeCondition(self.sl) + + class TestSSynchronizedDecorator(_ThreadedTestCase): """Shared Lock Synchronized decorator test""" @@ -720,9 +751,9 @@ class TestSSynchronizedDecorator(_ThreadedTestCase): def testDecoratedFunctions(self): self._doItExclusive() - self.assert_(not _decoratorlock._is_owned()) + self.assertFalse(_decoratorlock._is_owned()) self._doItSharer() - self.assert_(not _decoratorlock._is_owned()) + self.assertFalse(_decoratorlock._is_owned()) def testSharersCanCoexist(self): _decoratorlock.acquire(shared=1) @@ -769,11 +800,11 @@ class TestLockSet(_ThreadedTestCase): def _setUpLS(self): """Helper to (re)initialize the lock set""" self.resources = ['one', 'two', 'three'] - self.ls = locking.LockSet(members=self.resources) + self.ls = locking.LockSet(self.resources, "TestLockSet") def testResources(self): self.assertEquals(self.ls._names(), set(self.resources)) - newls = locking.LockSet() + newls = locking.LockSet([], "TestLockSet.testResources") self.assertEquals(newls._names(), set()) def testAcquireRelease(self): @@ -966,6 +997,127 @@ class TestLockSet(_ThreadedTestCase): self.failUnlessEqual(self.done.get_nowait(), 'DONE') @_Repeat + def testSimpleAcquireTimeoutExpiring(self): + names = sorted(self.ls._names()) + self.assert_(len(names) >= 3) + + # Get name of first lock + first = names[0] + + # Get name of last lock + last = names.pop() + + checks = [ + # Block first and try to lock it again + (first, first), + + # Block last and try to lock all locks + (None, first), + + # Block last and try to lock it again + (last, last), + ] + + for (wanted, block) in checks: + # Lock in exclusive mode + self.assert_(self.ls.acquire(block, shared=0)) + + def _AcquireOne(): + # Try to get the same lock again with a timeout (should never succeed) + acquired = self.ls.acquire(wanted, timeout=0.1, shared=0) + if acquired: + self.done.put("acquired") + self.ls.release() + else: + self.assert_(acquired is None) + self.assertFalse(self.ls._list_owned()) + self.assertFalse(self.ls._is_owned()) + self.done.put("not acquired") + + self._addThread(target=_AcquireOne) + + # Wait for timeout in thread to expire + self._waitThreads() + + # Release exclusive lock again + self.ls.release() + + self.assertEqual(self.done.get_nowait(), "not acquired") + self.assertRaises(Queue.Empty, self.done.get_nowait) + + @_Repeat + def testDelayedAndExpiringLockAcquire(self): + self._setUpLS() + self.ls.add(['five', 'six', 'seven', 'eight', 'nine']) + + for expire in (False, True): + names = sorted(self.ls._names()) + self.assertEqual(len(names), 8) + + lock_ev = dict([(i, threading.Event()) for i in names]) + + # Lock all in exclusive mode + self.assert_(self.ls.acquire(names, shared=0)) + + if expire: + # We'll wait at least 300ms per lock + lockwait = len(names) * [0.3] + + # Fail if we can't acquire all locks in 400ms. There are 8 locks, so + # this gives us up to 2.4s to fail. + lockall_timeout = 0.4 + else: + # This should finish rather quickly + lockwait = None + lockall_timeout = len(names) * 5.0 + + def _LockAll(): + def acquire_notification(name): + if not expire: + self.done.put("getting %s" % name) + + # Kick next lock + lock_ev[name].set() + + if self.ls.acquire(names, shared=0, timeout=lockall_timeout, + test_notify=acquire_notification): + self.done.put("got all") + self.ls.release() + else: + self.done.put("timeout on all") + + # Notify all locks + for ev in lock_ev.values(): + ev.set() + + t = self._addThread(target=_LockAll) + + for idx, name in enumerate(names): + # Wait for actual acquire on this lock to start + lock_ev[name].wait(10.0) + + if expire and t.isAlive(): + # Wait some time after getting the notification to make sure the lock + # acquire will expire + SafeSleep(lockwait[idx]) + + self.ls.release(names=name) + + self.assertFalse(self.ls._list_owned()) + + self._waitThreads() + + if expire: + # Not checking which locks were actually acquired. Doing so would be + # too timing-dependant. + self.assertEqual(self.done.get_nowait(), "timeout on all") + else: + for i in names: + self.assertEqual(self.done.get_nowait(), "getting %s" % i) + self.assertEqual(self.done.get_nowait(), "got all") + self.assertRaises(Queue.Empty, self.done.get_nowait) + + @_Repeat def testConcurrentRemove(self): self.ls.add('four') self.ls.acquire(['one', 'two', 'four']) @@ -1140,19 +1292,19 @@ class TestGanetiLockManager(_ThreadedTestCase): def testInitAndResources(self): locking.GanetiLockManager._instance = None - self.GL = locking.GanetiLockManager() + self.GL = locking.GanetiLockManager([], []) self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) self.assertEqual(self.GL._names(locking.LEVEL_NODE), set()) self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set()) locking.GanetiLockManager._instance = None - self.GL = locking.GanetiLockManager(nodes=self.nodes) + self.GL = locking.GanetiLockManager(self.nodes, []) self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes)) self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set()) locking.GanetiLockManager._instance = None - self.GL = locking.GanetiLockManager(instances=self.instances) + self.GL = locking.GanetiLockManager([], self.instances) self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) self.assertEqual(self.GL._names(locking.LEVEL_NODE), set()) self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), @@ -1274,7 +1426,247 @@ class TestGanetiLockManager(_ThreadedTestCase): self.GL.release(locking.LEVEL_CLUSTER, ['BGL']) +class TestLockMonitor(_ThreadedTestCase): + def setUp(self): + _ThreadedTestCase.setUp(self) + self.lm = locking.LockMonitor() + + def testSingleThread(self): + locks = [] + + for i in range(100): + name = "TestLock%s" % i + locks.append(locking.SharedLock(name, monitor=self.lm)) + + self.assertEqual(len(self.lm._locks), len(locks)) + + self.assertEqual(len(self.lm.QueryLocks(["name"], False)), + 100) + + # Delete all locks + del locks[:] + + # The garbage collector might needs some time + def _CheckLocks(): + if self.lm._locks: + raise utils.RetryAgain() + + utils.Retry(_CheckLocks, 0.1, 30.0) + + self.assertFalse(self.lm._locks) + + def testMultiThread(self): + locks = [] + + def _CreateLock(prev, next, name): + prev.wait() + locks.append(locking.SharedLock(name, monitor=self.lm)) + if next: + next.set() + + expnames = [] + + first = threading.Event() + prev = first + + # Use a deterministic random generator + for i in random.Random(4263).sample(range(100), 33): + name = "MtTestLock%s" % i + expnames.append(name) + + ev = threading.Event() + self._addThread(target=_CreateLock, args=(prev, ev, name)) + prev = ev + + # Add locks + first.set() + self._waitThreads() + + # Check order in which locks were added + self.assertEqual([i.name for i in locks], expnames) + + # Sync queries are not supported + self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True) + + # Check query result + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"], + False), + [[name, None, None, []] + for name in utils.NiceSort(expnames)]) + + # Test exclusive acquire + for tlock in locks[::4]: + tlock.acquire(shared=0) + try: + def _GetExpResult(name): + if tlock.name == name: + return [name, "exclusive", [threading.currentThread().getName()], + []] + return [name, None, None, []] + + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", + "pending"], False), + [_GetExpResult(name) + for name in utils.NiceSort(expnames)]) + finally: + tlock.release() + + # Test shared acquire + def _Acquire(lock, shared, ev, notify): + lock.acquire(shared=shared) + try: + notify.set() + ev.wait() + finally: + lock.release() + + for tlock1 in locks[::11]: + for tlock2 in locks[::-15]: + if tlock2 == tlock1: + # Avoid deadlocks + continue + + for tlock3 in locks[::10]: + if tlock3 in (tlock2, tlock1): + # Avoid deadlocks + continue + + releaseev = threading.Event() + + # Acquire locks + acquireev = [] + tthreads1 = [] + for i in range(3): + ev = threading.Event() + tthreads1.append(self._addThread(target=_Acquire, + args=(tlock1, 1, releaseev, ev))) + acquireev.append(ev) + + ev = threading.Event() + tthread2 = self._addThread(target=_Acquire, + args=(tlock2, 1, releaseev, ev)) + acquireev.append(ev) + + ev = threading.Event() + tthread3 = self._addThread(target=_Acquire, + args=(tlock3, 0, releaseev, ev)) + acquireev.append(ev) + + # Wait for all locks to be acquired + for i in acquireev: + i.wait() + + # Check query result + for (name, mode, owner) in self.lm.QueryLocks(["name", "mode", + "owner"], False): + if name == tlock1.name: + self.assertEqual(mode, "shared") + self.assertEqual(set(owner), set(i.getName() for i in tthreads1)) + continue + + if name == tlock2.name: + self.assertEqual(mode, "shared") + self.assertEqual(owner, [tthread2.getName()]) + continue + + if name == tlock3.name: + self.assertEqual(mode, "exclusive") + self.assertEqual(owner, [tthread3.getName()]) + continue + + self.assert_(name in expnames) + self.assert_(mode is None) + self.assert_(owner is None) + + # Release locks again + releaseev.set() + + self._waitThreads() + + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False), + [[name, None, None] + for name in utils.NiceSort(expnames)]) + + def testDelete(self): + lock = locking.SharedLock("TestLock", monitor=self.lm) + + self.assertEqual(len(self.lm._locks), 1) + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False), + [[lock.name, None, None]]) + + lock.delete() + + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False), + [[lock.name, "deleted", None]]) + self.assertEqual(len(self.lm._locks), 1) + + def testPending(self): + def _Acquire(lock, shared, prev, next): + prev.wait() + + lock.acquire(shared=shared, test_notify=next.set) + try: + pass + finally: + lock.release() + + lock = locking.SharedLock("ExcLock", monitor=self.lm) + + for shared in [0, 1]: + lock.acquire() + try: + self.assertEqual(len(self.lm._locks), 1) + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False), + [[lock.name, "exclusive", + [threading.currentThread().getName()]]]) + + threads = [] + + first = threading.Event() + prev = first + + for i in range(5): + ev = threading.Event() + threads.append(self._addThread(target=_Acquire, + args=(lock, shared, prev, ev))) + prev = ev + + # Start acquires + first.set() + + # Wait for last acquire to start waiting + prev.wait() + + # NOTE: This works only because QueryLocks will acquire the + # lock-internal lock again and won't be able to get the information + # until it has the lock. By then the acquire should be registered in + # SharedLock.__pending (otherwise it's a bug). + + # All acquires are waiting now + if shared: + pending = [("shared", sorted([t.getName() for t in threads]))] + else: + pending = [("exclusive", [t.getName()]) for t in threads] + + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", + "pending"], False), + [[lock.name, "exclusive", + [threading.currentThread().getName()], + pending]]) + + self.assertEqual(len(self.lm._locks), 1) + finally: + lock.release() + + self._waitThreads() + + # No pending acquires + self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"], + False), + [[lock.name, None, None, []]]) + + self.assertEqual(len(self.lm._locks), 1) + + if __name__ == '__main__': - unittest.main() - #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock) - #unittest.TextTestRunner(verbosity=2).run(suite) + testutils.GanetiTestProgram()