Revision 5aab242c test/ganeti.locking_unittest.py
b/test/ganeti.locking_unittest.py | ||
---|---|---|
48 | 48 |
return wrapper |
49 | 49 |
|
50 | 50 |
|
51 |
def SafeSleep(duration): |
|
52 |
start = time.time() |
|
53 |
while True: |
|
54 |
delay = start + duration - time.time() |
|
55 |
if delay <= 0.0: |
|
56 |
break |
|
57 |
time.sleep(delay) |
|
58 |
|
|
59 |
|
|
51 | 60 |
class _ThreadedTestCase(unittest.TestCase): |
52 | 61 |
"""Test class that supports adding/waiting on threads""" |
53 | 62 |
def setUp(self): |
... | ... | |
966 | 975 |
self.failUnlessEqual(self.done.get_nowait(), 'DONE') |
967 | 976 |
|
968 | 977 |
@_Repeat |
978 |
def testSimpleAcquireTimeoutExpiring(self): |
|
979 |
names = sorted(self.ls._names()) |
|
980 |
self.assert_(len(names) >= 3) |
|
981 |
|
|
982 |
# Get name of first lock |
|
983 |
first = names[0] |
|
984 |
|
|
985 |
# Get name of last lock |
|
986 |
last = names.pop() |
|
987 |
|
|
988 |
checks = [ |
|
989 |
# Block first and try to lock it again |
|
990 |
(first, first), |
|
991 |
|
|
992 |
# Block last and try to lock all locks |
|
993 |
(None, first), |
|
994 |
|
|
995 |
# Block last and try to lock it again |
|
996 |
(last, last), |
|
997 |
] |
|
998 |
|
|
999 |
for (wanted, block) in checks: |
|
1000 |
# Lock in exclusive mode |
|
1001 |
self.assert_(self.ls.acquire(block, shared=0)) |
|
1002 |
|
|
1003 |
def _AcquireOne(): |
|
1004 |
# Try to get the same lock again with a timeout (should never succeed) |
|
1005 |
if self.ls.acquire(wanted, timeout=0.1, shared=0): |
|
1006 |
self.done.put("acquired") |
|
1007 |
self.ls.release() |
|
1008 |
else: |
|
1009 |
self.assert_(not self.ls._list_owned()) |
|
1010 |
self.assert_(not self.ls._is_owned()) |
|
1011 |
self.done.put("not acquired") |
|
1012 |
|
|
1013 |
self._addThread(target=_AcquireOne) |
|
1014 |
|
|
1015 |
# Wait for timeout in thread to expire |
|
1016 |
self._waitThreads() |
|
1017 |
|
|
1018 |
# Release exclusive lock again |
|
1019 |
self.ls.release() |
|
1020 |
|
|
1021 |
self.assertEqual(self.done.get_nowait(), "not acquired") |
|
1022 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
|
1023 |
|
|
1024 |
@_Repeat |
|
1025 |
def testDelayedAndExpiringLockAcquire(self): |
|
1026 |
self._setUpLS() |
|
1027 |
self.ls.add(['five', 'six', 'seven', 'eight', 'nine']) |
|
1028 |
|
|
1029 |
for expire in (False, True): |
|
1030 |
names = sorted(self.ls._names()) |
|
1031 |
self.assertEqual(len(names), 8) |
|
1032 |
|
|
1033 |
lock_ev = dict([(i, threading.Event()) for i in names]) |
|
1034 |
|
|
1035 |
# Lock all in exclusive mode |
|
1036 |
self.assert_(self.ls.acquire(names, shared=0)) |
|
1037 |
|
|
1038 |
if expire: |
|
1039 |
# We'll wait at least 300ms per lock |
|
1040 |
lockwait = len(names) * [0.3] |
|
1041 |
|
|
1042 |
# Fail if we can't acquire all locks in 400ms. There are 8 locks, so |
|
1043 |
# this gives us up to 2.4s to fail. |
|
1044 |
lockall_timeout = 0.4 |
|
1045 |
else: |
|
1046 |
# This should finish rather quickly |
|
1047 |
lockwait = None |
|
1048 |
lockall_timeout = len(names) * 5.0 |
|
1049 |
|
|
1050 |
def _LockAll(): |
|
1051 |
def acquire_notification(name): |
|
1052 |
if not expire: |
|
1053 |
self.done.put("getting %s" % name) |
|
1054 |
|
|
1055 |
# Kick next lock |
|
1056 |
lock_ev[name].set() |
|
1057 |
|
|
1058 |
if self.ls.acquire(names, shared=0, timeout=lockall_timeout, |
|
1059 |
test_notify=acquire_notification): |
|
1060 |
self.done.put("got all") |
|
1061 |
self.ls.release() |
|
1062 |
else: |
|
1063 |
self.done.put("timeout on all") |
|
1064 |
|
|
1065 |
# Notify all locks |
|
1066 |
for ev in lock_ev.values(): |
|
1067 |
ev.set() |
|
1068 |
|
|
1069 |
t = self._addThread(target=_LockAll) |
|
1070 |
|
|
1071 |
for idx, name in enumerate(names): |
|
1072 |
# Wait for actual acquire on this lock to start |
|
1073 |
lock_ev[name].wait(10.0) |
|
1074 |
|
|
1075 |
if expire and t.isAlive(): |
|
1076 |
# Wait some time after getting the notification to make sure the lock |
|
1077 |
# acquire will expire |
|
1078 |
SafeSleep(lockwait[idx]) |
|
1079 |
|
|
1080 |
self.ls.release(names=name) |
|
1081 |
|
|
1082 |
self.assert_(not self.ls._list_owned()) |
|
1083 |
|
|
1084 |
self._waitThreads() |
|
1085 |
|
|
1086 |
if expire: |
|
1087 |
# Not checking which locks were actually acquired. Doing so would be |
|
1088 |
# too timing-dependant. |
|
1089 |
self.assertEqual(self.done.get_nowait(), "timeout on all") |
|
1090 |
else: |
|
1091 |
for i in names: |
|
1092 |
self.assertEqual(self.done.get_nowait(), "getting %s" % i) |
|
1093 |
self.assertEqual(self.done.get_nowait(), "got all") |
|
1094 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
|
1095 |
|
|
1096 |
@_Repeat |
|
969 | 1097 |
def testConcurrentRemove(self): |
970 | 1098 |
self.ls.add('four') |
971 | 1099 |
self.ls.acquire(['one', 'two', 'four']) |
Also available in: Unified diff