X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/e4e353579c973418be791fc2acac1fb64ba80f40..0ef4d576dc2fc077b01d727f829ebcc616eb15c9:/test/ganeti.locking_unittest.py diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index b17e7e8..494994c 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -16,7 +16,7 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA -# 0.0510-1301, USA. +# 02110-1301, USA. """Script for unittesting the locking module""" @@ -99,7 +99,7 @@ class _ConditionTestCase(_ThreadedTestCase): def _testAcquireRelease(self): self.assertFalse(self.cond._is_owned()) - self.assertRaises(RuntimeError, self.cond.wait) + self.assertRaises(RuntimeError, self.cond.wait, None) self.assertRaises(RuntimeError, self.cond.notifyAll) self.cond.acquire() @@ -109,7 +109,7 @@ class _ConditionTestCase(_ThreadedTestCase): self.cond.release() self.assertFalse(self.cond._is_owned()) - self.assertRaises(RuntimeError, self.cond.wait) + self.assertRaises(RuntimeError, self.cond.wait, None) self.assertRaises(RuntimeError, self.cond.notifyAll) def _testNotification(self): @@ -125,7 +125,7 @@ class _ConditionTestCase(_ThreadedTestCase): self._addThread(target=_NotifyAll) self.assertEqual(self.done.get(True, 1), "NE") self.assertRaises(Queue.Empty, self.done.get_nowait) - self.cond.wait() + self.cond.wait(None) self.assertEqual(self.done.get(True, 1), "NA") self.assertEqual(self.done.get(True, 1), "NN") self.assert_(self.cond._is_owned()) @@ -154,7 +154,7 @@ class TestSingleNotifyPipeCondition(_ConditionTestCase): def testNoNotifyReuse(self): self.cond.acquire() self.cond.notifyAll() - self.assertRaises(RuntimeError, self.cond.wait) + self.assertRaises(RuntimeError, self.cond.wait, None) self.assertRaises(RuntimeError, self.cond.notifyAll) self.cond.release() @@ -220,7 +220,7 @@ class TestPipeCondition(_ConditionTestCase): def _BlockingWait(): self.cond.acquire() self.done.put("A") - self.cond.wait() + self.cond.wait(None) self.cond.release() self.done.put("W") @@ -269,25 +269,25 @@ class TestSharedLock(_ThreadedTestCase): self.sl = locking.SharedLock("TestSharedLock") def testSequenceAndOwnership(self): - self.assertFalse(self.sl._is_owned()) + self.assertFalse(self.sl.is_owned()) self.sl.acquire(shared=1) - self.assert_(self.sl._is_owned()) - self.assert_(self.sl._is_owned(shared=1)) - self.assertFalse(self.sl._is_owned(shared=0)) + self.assert_(self.sl.is_owned()) + self.assert_(self.sl.is_owned(shared=1)) + self.assertFalse(self.sl.is_owned(shared=0)) self.sl.release() - self.assertFalse(self.sl._is_owned()) + self.assertFalse(self.sl.is_owned()) self.sl.acquire() - self.assert_(self.sl._is_owned()) - self.assertFalse(self.sl._is_owned(shared=1)) - self.assert_(self.sl._is_owned(shared=0)) + self.assert_(self.sl.is_owned()) + self.assertFalse(self.sl.is_owned(shared=1)) + self.assert_(self.sl.is_owned(shared=0)) self.sl.release() - self.assertFalse(self.sl._is_owned()) + self.assertFalse(self.sl.is_owned()) self.sl.acquire(shared=1) - self.assert_(self.sl._is_owned()) - self.assert_(self.sl._is_owned(shared=1)) - self.assertFalse(self.sl._is_owned(shared=0)) + self.assert_(self.sl.is_owned()) + self.assert_(self.sl.is_owned(shared=1)) + self.assertFalse(self.sl.is_owned(shared=0)) self.sl.release() - self.assertFalse(self.sl._is_owned()) + self.assertFalse(self.sl.is_owned()) def testBooleanValue(self): # semaphores are supposed to return a true value on a successful acquire @@ -433,7 +433,31 @@ class TestSharedLock(_ThreadedTestCase): self.assertRaises(errors.LockError, self.sl.delete) def testDeleteTimeout(self): - self.sl.delete(timeout=60) + self.assertTrue(self.sl.delete(timeout=60)) + + def testDeleteTimeoutFail(self): + ready = threading.Event() + finish = threading.Event() + + def fn(): + self.sl.acquire(shared=0) + ready.set() + + finish.wait() + self.sl.release() + + self._addThread(target=fn) + ready.wait() + + # Test if deleting a lock owned in exclusive mode by another thread fails + # to delete when a timeout is used + self.assertFalse(self.sl.delete(timeout=0.02)) + + finish.set() + self._waitThreads() + + self.assertTrue(self.sl.delete()) + self.assertRaises(errors.LockError, self.sl.acquire) def testNoDeleteIfSharer(self): self.sl.acquire(shared=1) @@ -612,6 +636,117 @@ class TestSharedLock(_ThreadedTestCase): self.assertRaises(Queue.Empty, self.done.get_nowait) + def testIllegalDowngrade(self): + # Not yet acquired + self.assertRaises(AssertionError, self.sl.downgrade) + + # Acquire in shared mode, downgrade should be no-op + self.assertTrue(self.sl.acquire(shared=1)) + self.assertTrue(self.sl.is_owned(shared=1)) + self.assertTrue(self.sl.downgrade()) + self.assertTrue(self.sl.is_owned(shared=1)) + self.sl.release() + + def testDowngrade(self): + self.assertTrue(self.sl.acquire()) + self.assertTrue(self.sl.is_owned(shared=0)) + self.assertTrue(self.sl.downgrade()) + self.assertTrue(self.sl.is_owned(shared=1)) + self.sl.release() + + @_Repeat + def testDowngradeJumpsAheadOfExclusive(self): + def _KeepExclusive(ev_got, ev_downgrade, ev_release): + self.assertTrue(self.sl.acquire()) + self.assertTrue(self.sl.is_owned(shared=0)) + ev_got.set() + ev_downgrade.wait() + self.assertTrue(self.sl.is_owned(shared=0)) + self.assertTrue(self.sl.downgrade()) + self.assertTrue(self.sl.is_owned(shared=1)) + ev_release.wait() + self.assertTrue(self.sl.is_owned(shared=1)) + self.sl.release() + + def _KeepExclusive2(ev_started, ev_release): + self.assertTrue(self.sl.acquire(test_notify=ev_started.set)) + self.assertTrue(self.sl.is_owned(shared=0)) + ev_release.wait() + self.assertTrue(self.sl.is_owned(shared=0)) + self.sl.release() + + def _KeepShared(ev_started, ev_got, ev_release): + self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set)) + self.assertTrue(self.sl.is_owned(shared=1)) + ev_got.set() + ev_release.wait() + self.assertTrue(self.sl.is_owned(shared=1)) + self.sl.release() + + # Acquire lock in exclusive mode + ev_got_excl1 = threading.Event() + ev_downgrade_excl1 = threading.Event() + ev_release_excl1 = threading.Event() + th_excl1 = self._addThread(target=_KeepExclusive, + args=(ev_got_excl1, ev_downgrade_excl1, + ev_release_excl1)) + ev_got_excl1.wait() + + # Start a second exclusive acquire + ev_started_excl2 = threading.Event() + ev_release_excl2 = threading.Event() + th_excl2 = self._addThread(target=_KeepExclusive2, + args=(ev_started_excl2, ev_release_excl2)) + ev_started_excl2.wait() + + # Start shared acquires, will jump ahead of second exclusive acquire when + # first exclusive acquire downgrades + ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)] + ev_release_shared = threading.Event() + + th_shared = [self._addThread(target=_KeepShared, + args=(ev_started, ev_got, ev_release_shared)) + for (ev_started, ev_got) in ev_shared] + + # Wait for all shared acquires to start + for (ev, _) in ev_shared: + ev.wait() + + # Check lock information + self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])), + [(self.sl.name, "exclusive", [th_excl1.getName()], None)]) + [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING])) + self.assertEqual([(pendmode, sorted(waiting)) + for (pendmode, waiting) in pending], + [("exclusive", [th_excl2.getName()]), + ("shared", sorted(th.getName() for th in th_shared))]) + + # Shared acquires won't start until the exclusive lock is downgraded + ev_downgrade_excl1.set() + + # Wait for all shared acquires to be successful + for (_, ev) in ev_shared: + ev.wait() + + # Check lock information again + self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, + query.LQ_PENDING])), + [(self.sl.name, "shared", None, + [("exclusive", [th_excl2.getName()])])]) + [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER])) + self.assertEqual(set(owner), set([th_excl1.getName()] + + [th.getName() for th in th_shared])) + + ev_release_excl1.set() + ev_release_excl2.set() + ev_release_shared.set() + + self._waitThreads() + + self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER, + query.LQ_PENDING])), + [(self.sl.name, None, None, [])]) + @_Repeat def testMixedAcquireTimeout(self): sync = threading.Event() @@ -777,12 +912,14 @@ class TestSharedLock(_ThreadedTestCase): prev.wait() # Check lock information - self.assertEqual(self.sl.GetInfo(set()), (self.sl.name, None, None, None)) - self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])), - (self.sl.name, "exclusive", - [threading.currentThread().getName()], None)) + self.assertEqual(self.sl.GetLockInfo(set()), + [(self.sl.name, None, None, None)]) + self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])), + [(self.sl.name, "exclusive", + [threading.currentThread().getName()], None)]) - self._VerifyPrioPending(self.sl.GetInfo(set([query.LQ_PENDING])), perprio) + self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])), + perprio) # Let threads acquire the lock self.sl.release() @@ -803,7 +940,7 @@ class TestSharedLock(_ThreadedTestCase): self.assertRaises(Queue.Empty, self.done.get_nowait) - def _VerifyPrioPending(self, (name, mode, owner, pending), perprio): + def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio): self.assertEqual(name, self.sl.name) self.assert_(mode is None) self.assert_(owner is None) @@ -816,6 +953,84 @@ class TestSharedLock(_ThreadedTestCase): for i in sorted(perprio.keys())] for (shared, _, threads) in acquires]) + class _FakeTimeForSpuriousNotifications: + def __init__(self, now, check_end): + self.now = now + self.check_end = check_end + + # Deterministic random number generator + self.rnd = random.Random(15086) + + def time(self): + # Advance time if the random number generator thinks so (this is to test + # multiple notifications without advancing the time) + if self.rnd.random() < 0.3: + self.now += self.rnd.random() + + self.check_end(self.now) + + return self.now + + @_Repeat + def testAcquireTimeoutWithSpuriousNotifications(self): + ready = threading.Event() + locked = threading.Event() + req = Queue.Queue(0) + + epoch = 4000.0 + timeout = 60.0 + + def check_end(now): + self.assertFalse(locked.isSet()) + + # If we waited long enough (in virtual time), tell main thread to release + # lock, otherwise tell it to notify once more + req.put(now < (epoch + (timeout * 0.8))) + + time_fn = self._FakeTimeForSpuriousNotifications(epoch, check_end).time + + sl = locking.SharedLock("test", _time_fn=time_fn) + + # Acquire in exclusive mode + sl.acquire(shared=0) + + def fn(): + self.assertTrue(sl.acquire(shared=0, timeout=timeout, + test_notify=ready.set)) + locked.set() + sl.release() + self.done.put("success") + + # Start acquire with timeout and wait for it to be ready + self._addThread(target=fn) + ready.wait() + + # The separate thread is now waiting to acquire the lock, so start sending + # spurious notifications. + + # Wait for separate thread to ask for another notification + count = 0 + while req.get(): + # After sending the notification, the lock will take a short amount of + # time to notice and to retrieve the current time + sl._notify_topmost() + count += 1 + + self.assertTrue(count > 100, "Not enough notifications were sent") + + self.assertFalse(locked.isSet()) + + # Some notifications have been sent, now actually release the lock + sl.release() + + # Wait for lock to be acquired + locked.wait() + + self._waitThreads() + + self.assertEqual(self.done.get_nowait(), "success") + self.assertRaises(Queue.Empty, self.done.get_nowait) + class TestSharedLockInCondition(_ThreadedTestCase): """SharedLock as a condition lock tests""" @@ -830,14 +1045,14 @@ class TestSharedLockInCondition(_ThreadedTestCase): def testKeepMode(self): self.cond.acquire(shared=1) - self.assert_(self.sl._is_owned(shared=1)) + self.assert_(self.sl.is_owned(shared=1)) self.cond.wait(0) - self.assert_(self.sl._is_owned(shared=1)) + self.assert_(self.sl.is_owned(shared=1)) self.cond.release() self.cond.acquire(shared=0) - self.assert_(self.sl._is_owned(shared=0)) + self.assert_(self.sl.is_owned(shared=0)) self.cond.wait(0) - self.assert_(self.sl._is_owned(shared=0)) + self.assert_(self.sl.is_owned(shared=0)) self.cond.release() @@ -856,19 +1071,19 @@ class TestSSynchronizedDecorator(_ThreadedTestCase): @locking.ssynchronized(_decoratorlock) def _doItExclusive(self): - self.assert_(_decoratorlock._is_owned()) + self.assert_(_decoratorlock.is_owned()) self.done.put('EXC') @locking.ssynchronized(_decoratorlock, shared=1) def _doItSharer(self): - self.assert_(_decoratorlock._is_owned(shared=1)) + self.assert_(_decoratorlock.is_owned(shared=1)) self.done.put('SHR') def testDecoratedFunctions(self): self._doItExclusive() - self.assertFalse(_decoratorlock._is_owned()) + self.assertFalse(_decoratorlock.is_owned()) self._doItSharer() - self.assertFalse(_decoratorlock._is_owned()) + self.assertFalse(_decoratorlock.is_owned()) def testSharersCanCoexist(self): _decoratorlock.acquire(shared=1) @@ -922,27 +1137,61 @@ class TestLockSet(_ThreadedTestCase): newls = locking.LockSet([], "TestLockSet.testResources") self.assertEquals(newls._names(), set()) + def testCheckOwnedUnknown(self): + self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one")) + for shared in [-1, 0, 1, 6378, 24255]: + self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one", + shared=shared)) + + def testCheckOwnedUnknownWhileHolding(self): + self.assertFalse(self.ls.check_owned([])) + self.ls.acquire("one", shared=1) + self.assertRaises(errors.LockError, self.ls.check_owned, "nonexist") + self.assertTrue(self.ls.check_owned("one", shared=1)) + self.assertFalse(self.ls.check_owned("one", shared=0)) + self.assertFalse(self.ls.check_owned(["one", "two"])) + self.assertRaises(errors.LockError, self.ls.check_owned, + ["one", "nonexist"]) + self.assertRaises(errors.LockError, self.ls.check_owned, "") + self.ls.release() + self.assertFalse(self.ls.check_owned([])) + self.assertFalse(self.ls.check_owned("one")) + def testAcquireRelease(self): + self.assertFalse(self.ls.check_owned(self.ls._names())) self.assert_(self.ls.acquire('one')) - self.assertEquals(self.ls._list_owned(), set(['one'])) + self.assertEquals(self.ls.list_owned(), set(['one'])) + self.assertTrue(self.ls.check_owned("one")) + self.assertTrue(self.ls.check_owned("one", shared=0)) + self.assertFalse(self.ls.check_owned("one", shared=1)) self.ls.release() - self.assertEquals(self.ls._list_owned(), set()) + self.assertEquals(self.ls.list_owned(), set()) + self.assertFalse(self.ls.check_owned(self.ls._names())) self.assertEquals(self.ls.acquire(['one']), set(['one'])) - self.assertEquals(self.ls._list_owned(), set(['one'])) + self.assertEquals(self.ls.list_owned(), set(['one'])) self.ls.release() - self.assertEquals(self.ls._list_owned(), set()) + self.assertEquals(self.ls.list_owned(), set()) self.ls.acquire(['one', 'two', 'three']) - self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three'])) + self.assertEquals(self.ls.list_owned(), set(['one', 'two', 'three'])) + self.assertTrue(self.ls.check_owned(self.ls._names())) + self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0)) + self.assertFalse(self.ls.check_owned(self.ls._names(), shared=1)) self.ls.release('one') - self.assertEquals(self.ls._list_owned(), set(['two', 'three'])) + self.assertFalse(self.ls.check_owned(["one"])) + self.assertTrue(self.ls.check_owned(["two", "three"])) + self.assertTrue(self.ls.check_owned(["two", "three"], shared=0)) + self.assertFalse(self.ls.check_owned(["two", "three"], shared=1)) + self.assertEquals(self.ls.list_owned(), set(['two', 'three'])) self.ls.release(['three']) - self.assertEquals(self.ls._list_owned(), set(['two'])) + self.assertEquals(self.ls.list_owned(), set(['two'])) self.ls.release() - self.assertEquals(self.ls._list_owned(), set()) + self.assertEquals(self.ls.list_owned(), set()) self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three'])) - self.assertEquals(self.ls._list_owned(), set(['one', 'three'])) + self.assertEquals(self.ls.list_owned(), set(['one', 'three'])) self.ls.release() - self.assertEquals(self.ls._list_owned(), set()) + self.assertEquals(self.ls.list_owned(), set()) + for name in self.ls._names(): + self.assertFalse(self.ls.check_owned(name)) def testNoDoubleAcquire(self): self.ls.acquire('one') @@ -962,31 +1211,31 @@ class TestLockSet(_ThreadedTestCase): def testAddRemove(self): self.ls.add('four') - self.assertEquals(self.ls._list_owned(), set()) + self.assertEquals(self.ls.list_owned(), set()) self.assert_('four' in self.ls._names()) self.ls.add(['five', 'six', 'seven'], acquired=1) self.assert_('five' in self.ls._names()) self.assert_('six' in self.ls._names()) self.assert_('seven' in self.ls._names()) - self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven'])) + self.assertEquals(self.ls.list_owned(), set(['five', 'six', 'seven'])) self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six']) self.assert_('five' not in self.ls._names()) self.assert_('six' not in self.ls._names()) - self.assertEquals(self.ls._list_owned(), set(['seven'])) + self.assertEquals(self.ls.list_owned(), set(['seven'])) self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1) self.ls.remove('seven') self.assert_('seven' not in self.ls._names()) - self.assertEquals(self.ls._list_owned(), set([])) + self.assertEquals(self.ls.list_owned(), set([])) self.ls.acquire(None, shared=1) self.assertRaises(AssertionError, self.ls.add, 'eight') self.ls.release() self.ls.acquire(None) self.ls.add('eight', acquired=1) self.assert_('eight' in self.ls._names()) - self.assert_('eight' in self.ls._list_owned()) + self.assert_('eight' in self.ls.list_owned()) self.ls.add('nine') self.assert_('nine' in self.ls._names()) - self.assert_('nine' not in self.ls._list_owned()) + self.assert_('nine' not in self.ls.list_owned()) self.ls.release() self.ls.remove(['two']) self.assert_('two' not in self.ls._names()) @@ -1019,8 +1268,8 @@ class TestLockSet(_ThreadedTestCase): def testAcquireSetLock(self): # acquire the set-lock exclusively self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three'])) - self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three'])) - self.assertEquals(self.ls._is_owned(), True) + self.assertEquals(self.ls.list_owned(), set(['one', 'two', 'three'])) + self.assertEquals(self.ls.is_owned(), True) self.assertEquals(self.ls._names(), set(['one', 'two', 'three'])) # I can still add/remove elements... self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three']) @@ -1036,17 +1285,17 @@ class TestLockSet(_ThreadedTestCase): self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1), set(['two', 'two', 'three'])) self.ls.release(['two', 'two']) - self.assertEquals(self.ls._list_owned(), set(['three'])) + self.assertEquals(self.ls.list_owned(), set(['three'])) def testEmptyAcquire(self): # Acquire an empty list of locks... self.assertEquals(self.ls.acquire([]), set()) - self.assertEquals(self.ls._list_owned(), set()) + self.assertEquals(self.ls.list_owned(), set()) # New locks can still be addded self.assert_(self.ls.add('six')) # "re-acquiring" is not an issue, since we had really acquired nothing self.assertEquals(self.ls.acquire([], shared=1), set()) - self.assertEquals(self.ls._list_owned(), set()) + self.assertEquals(self.ls.list_owned(), set()) # We haven't really acquired anything, so we cannot release self.assertRaises(AssertionError, self.ls.release) @@ -1145,8 +1394,8 @@ class TestLockSet(_ThreadedTestCase): self.ls.release() else: self.assert_(acquired is None) - self.assertFalse(self.ls._list_owned()) - self.assertFalse(self.ls._is_owned()) + self.assertFalse(self.ls.list_owned()) + self.assertFalse(self.ls.is_owned()) self.done.put("not acquired") self._addThread(target=_AcquireOne) @@ -1218,7 +1467,7 @@ class TestLockSet(_ThreadedTestCase): self.ls.release(names=name) - self.assertFalse(self.ls._list_owned()) + self.assertFalse(self.ls.list_owned()) self._waitThreads() @@ -1333,9 +1582,9 @@ class TestLockSet(_ThreadedTestCase): self.ls.add('four') self.ls.add('five', acquired=1) self.ls.add('six', acquired=1, shared=1) - self.assertEquals(self.ls._list_owned(), + self.assertEquals(self.ls.list_owned(), set(['one', 'two', 'three', 'five', 'six'])) - self.assertEquals(self.ls._is_owned(), True) + self.assertEquals(self.ls.is_owned(), True) self.assertEquals(self.ls._names(), set(['one', 'two', 'three', 'four', 'five', 'six'])) self.ls.release() @@ -1374,6 +1623,101 @@ class TestLockSet(_ThreadedTestCase): self.assertEqual(self.done.get_nowait(), 'DONE') self._setUpLS() + def testAcquireWithNamesDowngrade(self): + self.assertEquals(self.ls.acquire("two", shared=0), set(["two"])) + self.assertTrue(self.ls.is_owned()) + self.assertFalse(self.ls._get_lock().is_owned()) + self.ls.release() + self.assertFalse(self.ls.is_owned()) + self.assertFalse(self.ls._get_lock().is_owned()) + # Can't downgrade after releasing + self.assertRaises(AssertionError, self.ls.downgrade, "two") + + def testDowngrade(self): + # Not owning anything, must raise an exception + self.assertFalse(self.ls.is_owned()) + self.assertRaises(AssertionError, self.ls.downgrade) + + self.assertFalse(compat.any(i.is_owned() + for i in self.ls._get_lockdict().values())) + self.assertFalse(self.ls.check_owned(self.ls._names())) + for name in self.ls._names(): + self.assertFalse(self.ls.check_owned(name)) + + self.assertEquals(self.ls.acquire(None, shared=0), + set(["one", "two", "three"])) + self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock") + + self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0)) + for name in self.ls._names(): + self.assertTrue(self.ls.check_owned(name)) + self.assertTrue(self.ls.check_owned(name, shared=0)) + self.assertFalse(self.ls.check_owned(name, shared=1)) + + self.assertTrue(self.ls._get_lock().is_owned(shared=0)) + self.assertTrue(compat.all(i.is_owned(shared=0) + for i in self.ls._get_lockdict().values())) + + # Start downgrading locks + self.assertTrue(self.ls.downgrade(names=["one"])) + self.assertTrue(self.ls._get_lock().is_owned(shared=0)) + self.assertTrue(compat.all(lock.is_owned(shared=[0, 1][int(name == "one")]) + for name, lock in + self.ls._get_lockdict().items())) + + self.assertFalse(self.ls.check_owned("one", shared=0)) + self.assertTrue(self.ls.check_owned("one", shared=1)) + self.assertTrue(self.ls.check_owned("two", shared=0)) + self.assertTrue(self.ls.check_owned("three", shared=0)) + + # Downgrade second lock + self.assertTrue(self.ls.downgrade(names="two")) + self.assertTrue(self.ls._get_lock().is_owned(shared=0)) + should_share = lambda name: [0, 1][int(name in ("one", "two"))] + self.assertTrue(compat.all(lock.is_owned(shared=should_share(name)) + for name, lock in + self.ls._get_lockdict().items())) + + self.assertFalse(self.ls.check_owned("one", shared=0)) + self.assertTrue(self.ls.check_owned("one", shared=1)) + self.assertFalse(self.ls.check_owned("two", shared=0)) + self.assertTrue(self.ls.check_owned("two", shared=1)) + self.assertTrue(self.ls.check_owned("three", shared=0)) + + # Downgrading the last exclusive lock to shared must downgrade the + # lockset-internal lock too + self.assertTrue(self.ls.downgrade(names="three")) + self.assertTrue(self.ls._get_lock().is_owned(shared=1)) + self.assertTrue(compat.all(i.is_owned(shared=1) + for i in self.ls._get_lockdict().values())) + + # Verify owned locks + for name in self.ls._names(): + self.assertTrue(self.ls.check_owned(name, shared=1)) + + # Downgrading a shared lock must be a no-op + self.assertTrue(self.ls.downgrade(names=["one", "three"])) + self.assertTrue(self.ls._get_lock().is_owned(shared=1)) + self.assertTrue(compat.all(i.is_owned(shared=1) + for i in self.ls._get_lockdict().values())) + + self.ls.release() + + def testDowngradeEverything(self): + self.assertEqual(self.ls.acquire(locking.ALL_SET, shared=0), + set(["one", "two", "three"])) + + # Ensure all locks are now owned in exclusive mode + for name in self.ls._names(): + self.assertTrue(self.ls.check_owned(name, shared=0)) + + # Downgrade everything + self.assertTrue(self.ls.downgrade()) + + # Ensure all locks are now owned in shared mode + for name in self.ls._names(): + self.assertTrue(self.ls.check_owned(name, shared=1)) + def testPriority(self): def _Acquire(prev, next, name, priority, success_fn): prev.wait() @@ -1433,8 +1777,9 @@ class TestGanetiLockManager(_ThreadedTestCase): self.nodes=['n1', 'n2'] self.nodegroups=['g1', 'g2'] self.instances=['i1', 'i2', 'i3'] + self.networks=['net1', 'net2', 'net3'] self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, - self.instances) + self.instances, self.networks) def tearDown(self): # Don't try this at home... @@ -1449,7 +1794,7 @@ class TestGanetiLockManager(_ThreadedTestCase): self.assertEqual(i, locking.LEVELS[i]) def testDoubleGLFails(self): - self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], []) + self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [], []) def testLockNames(self): self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) @@ -1458,65 +1803,81 @@ class TestGanetiLockManager(_ThreadedTestCase): set(self.nodegroups)) self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(self.instances)) + self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), + set(self.networks)) def testInitAndResources(self): locking.GanetiLockManager._instance = None - self.GL = locking.GanetiLockManager([], [], []) + self.GL = locking.GanetiLockManager([], [], [], []) self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) self.assertEqual(self.GL._names(locking.LEVEL_NODE), set()) self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set()) self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set()) + self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), set()) locking.GanetiLockManager._instance = None - self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, []) + self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [], []) self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes)) self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(self.nodegroups)) self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set()) + self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), set()) locking.GanetiLockManager._instance = None - self.GL = locking.GanetiLockManager([], [], self.instances) + self.GL = locking.GanetiLockManager([], [], self.instances, []) self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) self.assertEqual(self.GL._names(locking.LEVEL_NODE), set()) self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set()) self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(self.instances)) + locking.GanetiLockManager._instance = None + self.GL = locking.GanetiLockManager([], [], [], self.networks) + self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL'])) + self.assertEqual(self.GL._names(locking.LEVEL_NODE), set()) + self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set()) + self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set()) + self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), + set(self.networks)) + def testAcquireRelease(self): self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) - self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL'])) + self.assertEquals(self.GL.list_owned(locking.LEVEL_CLUSTER), set(['BGL'])) self.GL.acquire(locking.LEVEL_INSTANCE, ['i1']) self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2']) self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1) + self.assertTrue(self.GL.check_owned(locking.LEVEL_NODE, ["n1", "n2"], + shared=1)) + self.assertFalse(self.GL.check_owned(locking.LEVEL_INSTANCE, ["i1", "i3"])) self.GL.release(locking.LEVEL_NODE, ['n2']) - self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1'])) - self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2'])) - self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1'])) + self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set(['n1'])) + self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(['g2'])) + self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i1'])) self.GL.release(locking.LEVEL_NODE) - self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set()) - self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2'])) - self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1'])) + self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set()) + self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(['g2'])) + self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i1'])) self.GL.release(locking.LEVEL_NODEGROUP) self.GL.release(locking.LEVEL_INSTANCE) self.assertRaises(errors.LockError, self.GL.acquire, locking.LEVEL_INSTANCE, ['i5']) self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1) - self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3'])) + self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i3'])) def testAcquireWholeSets(self): self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None), set(self.instances)) - self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), + self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(self.instances)) self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None), set(self.nodegroups)) - self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), + self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(self.nodegroups)) self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1), set(self.nodes)) - self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), + self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set(self.nodes)) self.GL.release(locking.LEVEL_NODE) self.GL.release(locking.LEVEL_NODEGROUP) @@ -1527,11 +1888,11 @@ class TestGanetiLockManager(_ThreadedTestCase): self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1) self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None), set(self.instances)) - self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), + self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(self.instances)) self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1), set(['n2'])) - self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), + self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set(['n2'])) self.GL.release(locking.LEVEL_NODE) self.GL.release(locking.LEVEL_INSTANCE) @@ -1989,6 +2350,88 @@ class TestLockMonitor(_ThreadedTestCase): result = self.lm.QueryLocks(["name", "mode", "owner"]) self.assertEqual(objects.QueryResponse.FromDict(result).data, []) + class _FakeLock: + def __init__(self): + self._info = [] + + def AddResult(self, *args): + self._info.append(args) + + def CountPending(self): + return len(self._info) + + def GetLockInfo(self, requested): + (exp_requested, result) = self._info.pop(0) + + if exp_requested != requested: + raise Exception("Requested information (%s) does not match" + " expectations (%s)" % (requested, exp_requested)) -if __name__ == '__main__': + return result + + def testMultipleResults(self): + fl1 = self._FakeLock() + fl2 = self._FakeLock() + + self.lm.RegisterLock(fl1) + self.lm.RegisterLock(fl2) + + # Empty information + for i in [fl1, fl2]: + i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), []) + result = self.lm.QueryLocks(["name", "mode", "owner"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, []) + for i in [fl1, fl2]: + self.assertEqual(i.CountPending(), 0) + + # Check ordering + for fn in [lambda x: x, reversed, sorted]: + fl1.AddResult(set(), list(fn([ + ("aaa", None, None, None), + ("bbb", None, None, None), + ]))) + fl2.AddResult(set(), []) + result = self.lm.QueryLocks(["name"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, [ + [(constants.RS_NORMAL, "aaa")], + [(constants.RS_NORMAL, "bbb")], + ]) + for i in [fl1, fl2]: + self.assertEqual(i.CountPending(), 0) + + for fn2 in [lambda x: x, reversed, sorted]: + fl1.AddResult(set([query.LQ_MODE]), list(fn([ + # Same name, but different information + ("aaa", "mode0", None, None), + ("aaa", "mode1", None, None), + ("aaa", "mode2", None, None), + ("aaa", "mode3", None, None), + ]))) + fl2.AddResult(set([query.LQ_MODE]), [ + ("zzz", "end", None, None), + ("000", "start", None, None), + ] + list(fn2([ + ("aaa", "b200", None, None), + ("aaa", "b300", None, None), + ]))) + result = self.lm.QueryLocks(["name", "mode"]) + self.assertEqual(objects.QueryResponse.FromDict(result).data, [ + [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")], + ] + list(fn([ + # Name is the same, so order must be equal to incoming order + [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")], + [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")], + [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")], + [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")], + ])) + list(fn2([ + [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")], + [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")], + ])) + [ + [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")], + ]) + for i in [fl1, fl2]: + self.assertEqual(i.CountPending(), 0) + + +if __name__ == "__main__": testutils.GanetiTestProgram()