self.assertRaises(errors.LockError, self.sl.delete)
def testDeleteTimeout(self):
- self.sl.delete(timeout=60)
+ # delete() with a timeout is expected to return a true value here
+ self.assertTrue(self.sl.delete(timeout=60))
+
+ def testDeleteTimeoutFail(self):
+ # delete() with a short timeout must return a false value while another
+ # thread holds the lock exclusively; after that thread is told to release
+ # the lock, delete() must succeed and a later acquire must raise LockError
+ ready = threading.Event()
+ finish = threading.Event()
+
+ def fn():
+ # Helper thread: take the lock exclusively, tell the main thread, then
+ # keep holding the lock until the main thread sets "finish"
+ self.sl.acquire(shared=0)
+ ready.set()
+
+ finish.wait()
+ self.sl.release()
+
+ self._addThread(target=fn)
+ # Block until the helper thread has acquired the lock
+ ready.wait()
+
+ # Deleting with a timeout while another thread holds the lock in exclusive
+ # mode must fail
+ self.assertFalse(self.sl.delete(timeout=0.02))
+
+ # Allow the helper thread to release the lock
+ # NOTE(review): _waitThreads presumably joins the helper thread, so the
+ # lock should be free afterwards -- confirm in _ThreadedTestCase
+ finish.set()
+ self._waitThreads()
+
+ # Deletion without timeout must now succeed and the deleted lock must
+ # refuse further acquires
+ self.assertTrue(self.sl.delete())
+ self.assertRaises(errors.LockError, self.sl.acquire)
def testNoDeleteIfSharer(self):
self.sl.acquire(shared=1)
for i in sorted(perprio.keys())]
for (shared, _, threads) in acquires])
+ class _FakeTimeForSpuriousNotifications:
+ # Fake clock for tests: time() returns a virtual timestamp which only
+ # moves forward on some calls and reports every queried timestamp to the
+ # "check_end" callback
+ def __init__(self, now, check_end):
+ # now: initial virtual timestamp
+ # check_end: callable invoked with the current virtual timestamp on
+ # every time() call
+ self.now = now
+ self.check_end = check_end
+
+ # Deterministic random number generator
+ self.rnd = random.Random(15086)
+
+ def time(self):
+ # Returns the current virtual timestamp, after possibly advancing it
+
+ # Advance time if the random number generator thinks so (this is to test
+ # multiple notifications without advancing the time)
+ if self.rnd.random() < 0.3:
+ self.now += self.rnd.random()
+
+ # Let the test decide, based on the virtual time, how to proceed
+ self.check_end(self.now)
+
+ return self.now
+
+ @_Repeat
+ def testAcquireTimeoutWithSpuriousNotifications(self):
+ # An acquire with timeout must keep waiting while notifications are sent
+ # with the lock still held, and must succeed once the lock is actually
+ # released. The lock reads time from a fake clock passed as "_time_fn".
+ ready = threading.Event()
+ locked = threading.Event()
+ req = Queue.Queue(0)
+
+ # Virtual start time and timeout used for the waiting acquire
+ epoch = 4000.0
+ timeout = 60.0
+
+ def check_end(now):
+ # Called by the fake clock on every time query; the waiting thread must
+ # not have acquired the lock at that point
+ self.assertFalse(locked.isSet())
+
+ # If we waited long enough (in virtual time), tell main thread to release
+ # lock, otherwise tell it to notify once more
+ # (True means "notify again", False ends the notification loop below)
+ req.put(now < (epoch + (timeout * 0.8)))
+
+ time_fn = self._FakeTimeForSpuriousNotifications(epoch, check_end).time
+
+ sl = locking.SharedLock("test", _time_fn=time_fn)
+
+ # Acquire in exclusive mode
+ sl.acquire(shared=0)
+
+ def fn():
+ # Waiter thread: the acquire must succeed despite the notifications
+ # NOTE(review): test_notify is presumably invoked by SharedLock once the
+ # thread is about to wait -- confirm in locking.SharedLock.acquire
+ self.assertTrue(sl.acquire(shared=0, timeout=timeout,
+ test_notify=ready.set))
+ locked.set()
+ sl.release()
+ self.done.put("success")
+
+ # Start acquire with timeout and wait for it to be ready
+ self._addThread(target=fn)
+ ready.wait()
+
+ # The separate thread is now waiting to acquire the lock, so start sending
+ # spurious notifications.
+
+ # Wait for separate thread to ask for another notification
+ count = 0
+ while req.get():
+ # After sending the notification, the lock will take a short amount of
+ # time to notice and to retrieve the current time
+ sl._notify_topmost()
+ count += 1
+
+ # Guard against the test passing without having exercised the waiter
+ self.assertTrue(count > 100, "Not enough notifications were sent")
+
+ # The lock is still held by this thread, so the waiter must not own it
+ self.assertFalse(locked.isSet())
+
+ # Some notifications have been sent, now actually release the lock
+ sl.release()
+
+ # Wait for lock to be acquired
+ locked.wait()
+
+ self._waitThreads()
+
+ # The waiter must have reported success exactly once
+ self.assertEqual(self.done.get_nowait(), "success")
+ self.assertRaises(Queue.Empty, self.done.get_nowait)
+
class TestSharedLockInCondition(_ThreadedTestCase):
"""SharedLock as a condition lock tests"""
newls = locking.LockSet([], "TestLockSet.testResources")
self.assertEquals(newls._names(), set())
+ def testCheckOwnedUnknown(self):
+ # check_owned must return a false value for a name that is not owned,
+ # both without a "shared" argument and for a range of "shared" values
+ self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one"))
+ for shared in [-1, 0, 1, 6378, 24255]:
+ self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one",
+ shared=shared))
+
+ def testCheckOwnedUnknownWhileHolding(self):
+ # Exercises check_owned before, while and after holding lock "one" in
+ # shared mode
+
+ # An empty list of names is reported as not owned
+ self.assertFalse(self.ls.check_owned([]))
+ self.ls.acquire("one", shared=1)
+ # While holding a lock, these names must raise LockError
+ self.assertRaises(errors.LockError, self.ls.check_owned, "nonexist")
+ # The held lock is owned in shared mode only
+ self.assertTrue(self.ls.check_owned("one", shared=1))
+ self.assertFalse(self.ls.check_owned("one", shared=0))
+ # NOTE(review): "two" is presumably an existing but unheld lock of the
+ # fixture, hence a false value instead of LockError -- confirm in setUp
+ self.assertFalse(self.ls.check_owned(["one", "two"]))
+ self.assertRaises(errors.LockError, self.ls.check_owned,
+ ["one", "nonexist"])
+ self.assertRaises(errors.LockError, self.ls.check_owned, "")
+ self.ls.release()
+ # After releasing, nothing is reported as owned anymore
+ self.assertFalse(self.ls.check_owned([]))
+ self.assertFalse(self.ls.check_owned("one"))
+
def testAcquireRelease(self):
+ self.assertFalse(self.ls.check_owned(self.ls._names()))
self.assert_(self.ls.acquire('one'))
self.assertEquals(self.ls.list_owned(), set(['one']))
+ self.assertTrue(self.ls.check_owned("one"))
+ self.assertTrue(self.ls.check_owned("one", shared=0))
+ self.assertFalse(self.ls.check_owned("one", shared=1))
self.ls.release()
self.assertEquals(self.ls.list_owned(), set())
+ self.assertFalse(self.ls.check_owned(self.ls._names()))
self.assertEquals(self.ls.acquire(['one']), set(['one']))
self.assertEquals(self.ls.list_owned(), set(['one']))
self.ls.release()
self.assertEquals(self.ls.list_owned(), set())
self.ls.acquire(['one', 'two', 'three'])
self.assertEquals(self.ls.list_owned(), set(['one', 'two', 'three']))
+ self.assertTrue(self.ls.check_owned(self.ls._names()))
+ self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
+ self.assertFalse(self.ls.check_owned(self.ls._names(), shared=1))
self.ls.release('one')
+ self.assertFalse(self.ls.check_owned(["one"]))
+ self.assertTrue(self.ls.check_owned(["two", "three"]))
+ self.assertTrue(self.ls.check_owned(["two", "three"], shared=0))
+ self.assertFalse(self.ls.check_owned(["two", "three"], shared=1))
self.assertEquals(self.ls.list_owned(), set(['two', 'three']))
self.ls.release(['three'])
self.assertEquals(self.ls.list_owned(), set(['two']))
self.assertEquals(self.ls.list_owned(), set(['one', 'three']))
self.ls.release()
self.assertEquals(self.ls.list_owned(), set())
+ for name in self.ls._names():
+ self.assertFalse(self.ls.check_owned(name))
def testNoDoubleAcquire(self):
self.ls.acquire('one')
self.assertFalse(compat.any(i.is_owned()
for i in self.ls._get_lockdict().values()))
+ self.assertFalse(self.ls.check_owned(self.ls._names()))
+ for name in self.ls._names():
+ self.assertFalse(self.ls.check_owned(name))
self.assertEquals(self.ls.acquire(None, shared=0),
set(["one", "two", "three"]))
self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")
+ self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
+ for name in self.ls._names():
+ self.assertTrue(self.ls.check_owned(name))
+ self.assertTrue(self.ls.check_owned(name, shared=0))
+ self.assertFalse(self.ls.check_owned(name, shared=1))
+
self.assertTrue(self.ls._get_lock().is_owned(shared=0))
self.assertTrue(compat.all(i.is_owned(shared=0)
for i in self.ls._get_lockdict().values()))
for name, lock in
self.ls._get_lockdict().items()))
+ self.assertFalse(self.ls.check_owned("one", shared=0))
+ self.assertTrue(self.ls.check_owned("one", shared=1))
+ self.assertTrue(self.ls.check_owned("two", shared=0))
+ self.assertTrue(self.ls.check_owned("three", shared=0))
+
+ # Downgrade second lock
self.assertTrue(self.ls.downgrade(names="two"))
self.assertTrue(self.ls._get_lock().is_owned(shared=0))
should_share = lambda name: [0, 1][int(name in ("one", "two"))]
for name, lock in
self.ls._get_lockdict().items()))
+ self.assertFalse(self.ls.check_owned("one", shared=0))
+ self.assertTrue(self.ls.check_owned("one", shared=1))
+ self.assertFalse(self.ls.check_owned("two", shared=0))
+ self.assertTrue(self.ls.check_owned("two", shared=1))
+ self.assertTrue(self.ls.check_owned("three", shared=0))
+
# Downgrading the last exclusive lock to shared must downgrade the
# lockset-internal lock too
self.assertTrue(self.ls.downgrade(names="three"))
self.assertTrue(compat.all(i.is_owned(shared=1)
for i in self.ls._get_lockdict().values()))
+ # Verify owned locks
+ for name in self.ls._names():
+ self.assertTrue(self.ls.check_owned(name, shared=1))
+
# Downgrading a shared lock must be a no-op
self.assertTrue(self.ls.downgrade(names=["one", "three"]))
self.assertTrue(self.ls._get_lock().is_owned(shared=1))
self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
+ self.assertTrue(self.GL.check_owned(locking.LEVEL_NODE, ["n1", "n2"],
+ shared=1))
+ self.assertFalse(self.GL.check_owned(locking.LEVEL_INSTANCE, ["i1", "i3"]))
self.GL.release(locking.LEVEL_NODE, ['n2'])
self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set(['n1']))
self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(['g2']))