self._testNotification()
def _TestWait(self, fn):
- self._addThread(target=fn)
- self._addThread(target=fn)
- self._addThread(target=fn)
+ threads = [
+ self._addThread(target=fn),
+ self._addThread(target=fn),
+ self._addThread(target=fn),
+ ]
# Wait for threads to be waiting
- self.assertEqual(self.done.get(True, 1), "A")
- self.assertEqual(self.done.get(True, 1), "A")
- self.assertEqual(self.done.get(True, 1), "A")
+ for _ in threads:
+ self.assertEqual(self.done.get(True, 1), "A")
self.assertRaises(Queue.Empty, self.done.get_nowait)
self.cond.acquire()
- self.assertEqual(self.cond._nwaiters, 3)
- # This new thread can"t acquire the lock, and thus call wait, before we
+ self.assertEqual(len(self.cond._waiters), 3)
+ self.assertEqual(self.cond._waiters, set(threads))
+ # This new thread can't acquire the lock, and thus call wait, before we
# release it
self._addThread(target=fn)
self.cond.notifyAll()
self.assertEqual(len(self.lm._locks), len(locks))
+ self.assertEqual(len(self.lm.QueryLocks(["name"], False)),
+ 100)
+
# Delete all locks
del locks[:]
self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True)
# Check query result
- self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
- [[name, None, None] for name in utils.NiceSort(expnames)])
+ self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
+ False),
+ [[name, None, None, []]
+ for name in utils.NiceSort(expnames)])
# Test exclusive acquire
for tlock in locks[::4]:
try:
def _GetExpResult(name):
if tlock.name == name:
- return [name, "exclusive", [threading.currentThread().getName()]]
- return [name, None, None]
+ return [name, "exclusive", [threading.currentThread().getName()],
+ []]
+ return [name, None, None, []]
- self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
+ self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
+ "pending"], False),
[_GetExpResult(name)
for name in utils.NiceSort(expnames)])
finally:
[[lock.name, "deleted", None]])
self.assertEqual(len(self.lm._locks), 1)
+ def testPending(self):  # Checks the "pending" field of QueryLocks while threads queue up on a held lock
+ def _Acquire(lock, shared, prev, next):  # Thread body: wait on prev, acquire the lock, release it again
+ prev.wait()  # Do not start acquiring until the "prev" event has been set
+
+ lock.acquire(shared=shared, test_notify=next.set)  # NOTE(review): next.set is passed as test_notify; presumably called once this acquire starts waiting - confirm in SharedLock.acquire
+ try:
+ pass  # Nothing to do while holding the lock
+ finally:
+ lock.release()
+
+ lock = locking.SharedLock("ExcLock", monitor=self.lm)  # Lock is created with self.lm as its monitor
+
+ for shared in [0, 1]:  # shared=0 and shared=1 are passed through to the waiters' acquire(shared=...)
+ lock.acquire()  # Taken by the current thread; the query below expects mode "exclusive" with this thread as owner
+ try:
+ self.assertEqual(len(self.lm._locks), 1)
+ self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
+ [[lock.name, "exclusive",
+ [threading.currentThread().getName()]]])
+
+ threads = []
+
+ first = threading.Event()
+ prev = first  # Head of the event chain; set below to start the first thread
+
+ for i in range(5):  # Five threads, each waiting on the previous event and handing its own event on as "next"
+ ev = threading.Event()
+ threads.append(self._addThread(target=_Acquire,
+ args=(lock, shared, prev, ev)))
+ prev = ev  # After the loop, prev is the event belonging to the last thread
+
+ # Start acquires
+ first.set()
+
+ # Wait for last acquire to start waiting
+ prev.wait()
+
+ # NOTE: This works only because QueryLocks will acquire the
+ # lock-internal lock again and won't be able to get the information
+ # until it has the lock. By then the acquire should be registered in
+ # SharedLock.__pending (otherwise it's a bug).
+
+ # All acquires are waiting now
+ if shared:
+ pending = [("shared", sorted([t.getName() for t in threads]))]  # Expected: a single "shared" entry holding all thread names, sorted
+ else:
+ pending = [("exclusive", [t.getName()]) for t in threads]  # Expected: one "exclusive" entry per thread, in thread-creation order
+
+ self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
+ "pending"], False),
+ [[lock.name, "exclusive",
+ [threading.currentThread().getName()],
+ pending]])
+
+ self.assertEqual(len(self.lm._locks), 1)
+ finally:
+ lock.release()  # Give up the current thread's hold on the lock
+
+ self._waitThreads()  # NOTE(review): helper defined outside this excerpt; presumably waits for the threads started above - confirm
+
+ # No pending acquires
+ self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
+ False),
+ [[lock.name, None, None, []]])
+
+ self.assertEqual(len(self.lm._locks), 1)
+
if __name__ == '__main__':
testutils.GanetiTestProgram()