Revision 84e344d4
b/lib/locking.py | ||
---|---|---|
20 | 20 |
|
21 | 21 |
"""Module implementing the Ganeti locking code.""" |
22 | 22 |
|
23 |
# pylint: disable-msg=W0613,W0201 |
|
24 |
|
|
25 | 23 |
import threading |
26 |
# Wouldn't it be better to define LockingError in the locking module? |
|
27 |
# Well, for now that's how the rest of the code does it... |
|
24 |
|
|
28 | 25 |
from ganeti import errors |
29 | 26 |
from ganeti import utils |
30 | 27 |
|
... | ... | |
48 | 45 |
return wrap |
49 | 46 |
|
50 | 47 |
|
51 |
class SharedLock: |
|
48 |
class _CountingCondition(object): |
|
49 |
"""Wrapper for Python's built-in threading.Condition class. |
|
50 |
|
|
51 |
This wrapper keeps a count of active waiters. We can't access the internal |
|
52 |
"__waiters" attribute of threading.Condition because it's not thread-safe. |
|
53 |
|
|
54 |
""" |
|
55 |
__slots__ = [ |
|
56 |
"_cond", |
|
57 |
"_nwaiters", |
|
58 |
] |
|
59 |
|
|
60 |
def __init__(self, lock): |
|
61 |
"""Initializes this class. |
|
62 |
|
|
63 |
""" |
|
64 |
object.__init__(self) |
|
65 |
self._cond = threading.Condition(lock=lock) |
|
66 |
self._nwaiters = 0 |
|
67 |
|
|
68 |
def notifyAll(self): |
|
69 |
"""Notifies the condition. |
|
70 |
|
|
71 |
""" |
|
72 |
return self._cond.notifyAll() |
|
73 |
|
|
74 |
def wait(self, timeout=None): |
|
75 |
"""Waits for the condition to be notified. |
|
76 |
|
|
77 |
@type timeout: float or None |
|
78 |
@param timeout: Timeout in seconds |
|
79 |
|
|
80 |
""" |
|
81 |
assert self._nwaiters >= 0 |
|
82 |
|
|
83 |
self._nwaiters += 1 |
|
84 |
try: |
|
85 |
return self._cond.wait(timeout=timeout) |
|
86 |
finally: |
|
87 |
self._nwaiters -= 1 |
|
88 |
|
|
89 |
def has_waiting(self): |
|
90 |
"""Returns whether there are active waiters. |
|
91 |
|
|
92 |
""" |
|
93 |
return bool(self._nwaiters) |
|
94 |
|
|
95 |
|
|
96 |
class SharedLock(object): |
|
52 | 97 |
"""Implements a shared lock. |
53 | 98 |
|
54 | 99 |
Multiple threads can acquire the lock in a shared way, calling |
... | ... | |
60 | 105 |
eventually do so. |
61 | 106 |
|
62 | 107 |
""" |
108 |
__slots__ = [ |
|
109 |
"__active_shr_c", |
|
110 |
"__inactive_shr_c", |
|
111 |
"__deleted", |
|
112 |
"__exc", |
|
113 |
"__lock", |
|
114 |
"__pending", |
|
115 |
"__shr", |
|
116 |
] |
|
117 |
|
|
118 |
__condition_class = _CountingCondition |
|
119 |
|
|
63 | 120 |
def __init__(self): |
64 |
"""Construct a new SharedLock""" |
|
65 |
# we have two conditions, c_shr and c_exc, sharing the same lock. |
|
121 |
"""Construct a new SharedLock. |
|
122 |
|
|
123 |
""" |
|
124 |
object.__init__(self) |
|
125 |
|
|
126 |
# Internal lock |
|
66 | 127 |
self.__lock = threading.Lock() |
67 |
self.__turn_shr = threading.Condition(self.__lock) |
|
68 |
self.__turn_exc = threading.Condition(self.__lock) |
|
69 | 128 |
|
70 |
# current lock holders |
|
129 |
# Queue containing waiting acquires |
|
130 |
self.__pending = [] |
|
131 |
|
|
132 |
# Active and inactive conditions for shared locks |
|
133 |
self.__active_shr_c = self.__condition_class(self.__lock) |
|
134 |
self.__inactive_shr_c = self.__condition_class(self.__lock) |
|
135 |
|
|
136 |
# Current lock holders |
|
71 | 137 |
self.__shr = set() |
72 | 138 |
self.__exc = None |
73 | 139 |
|
74 |
# lock waiters |
|
75 |
self.__nwait_exc = 0 |
|
76 |
self.__nwait_shr = 0 |
|
77 |
self.__npass_shr = 0 |
|
78 |
|
|
79 | 140 |
# is this lock in the deleted state? |
80 | 141 |
self.__deleted = False |
81 | 142 |
|
143 |
def __check_deleted(self): |
|
144 |
"""Raises an exception if the lock has been deleted. |
|
145 |
|
|
146 |
""" |
|
147 |
if self.__deleted: |
|
148 |
raise errors.LockError("Deleted lock") |
|
149 |
|
|
82 | 150 |
def __is_sharer(self): |
83 |
"""Is the current thread sharing the lock at this time?""" |
|
151 |
"""Is the current thread sharing the lock at this time? |
|
152 |
|
|
153 |
""" |
|
84 | 154 |
return threading.currentThread() in self.__shr |
85 | 155 |
|
86 | 156 |
def __is_exclusive(self): |
87 |
"""Is the current thread holding the lock exclusively at this time?""" |
|
157 |
"""Is the current thread holding the lock exclusively at this time? |
|
158 |
|
|
159 |
""" |
|
88 | 160 |
return threading.currentThread() == self.__exc |
89 | 161 |
|
90 | 162 |
def __is_owned(self, shared=-1): |
... | ... | |
112 | 184 |
""" |
113 | 185 |
self.__lock.acquire() |
114 | 186 |
try: |
115 |
result = self.__is_owned(shared=shared)
|
|
187 |
return self.__is_owned(shared=shared)
|
|
116 | 188 |
finally: |
117 | 189 |
self.__lock.release() |
118 | 190 |
|
119 |
return result |
|
120 |
|
|
121 |
def __wait(self, c): |
|
122 |
"""Wait on the given condition, and raise an exception if the current lock |
|
123 |
is declared deleted in the meantime. |
|
191 |
def _count_pending(self): |
|
192 |
"""Returns the number of pending acquires. |
|
124 | 193 |
|
125 |
@param c: the condition to wait on
|
|
194 |
@rtype: int
|
|
126 | 195 |
|
127 | 196 |
""" |
128 |
c.wait() |
|
129 |
if self.__deleted: |
|
130 |
raise errors.LockError('deleted lock') |
|
197 |
self.__lock.acquire() |
|
198 |
try: |
|
199 |
return len(self.__pending) |
|
200 |
finally: |
|
201 |
self.__lock.release() |
|
131 | 202 |
|
132 |
def __exclusive_acquire(self):
|
|
133 |
"""Acquire the lock exclusively.
|
|
203 |
def __do_acquire(self, shared):
|
|
204 |
"""Actually acquire the lock.
|
|
134 | 205 |
|
135 |
This is a private function that presumes you are already holding the |
|
136 |
internal lock. It's defined separately to avoid code duplication between |
|
137 |
acquire() and delete() |
|
206 |
""" |
|
207 |
if shared: |
|
208 |
self.__shr.add(threading.currentThread()) |
|
209 |
else: |
|
210 |
self.__exc = threading.currentThread() |
|
211 |
|
|
212 |
def __can_acquire(self, shared): |
|
213 |
"""Determine whether lock can be acquired. |
|
138 | 214 |
|
139 | 215 |
""" |
140 |
self.__nwait_exc += 1 |
|
141 |
try: |
|
142 |
# This is to save ourselves from a nasty race condition that could |
|
143 |
# theoretically make the sharers starve. |
|
144 |
if self.__nwait_shr > 0 or self.__nwait_exc > 1: |
|
145 |
self.__wait(self.__turn_exc) |
|
216 |
if shared: |
|
217 |
return self.__exc is None |
|
218 |
else: |
|
219 |
return len(self.__shr) == 0 and self.__exc is None |
|
146 | 220 |
|
147 |
while len(self.__shr) > 0 or self.__exc is not None:
|
|
148 |
self.__wait(self.__turn_exc)
|
|
221 |
def __is_on_top(self, cond):
|
|
222 |
"""Checks whether the passed condition is on top of the queue.
|
|
149 | 223 |
|
150 |
self.__exc = threading.currentThread() |
|
151 |
finally: |
|
152 |
self.__nwait_exc -= 1 |
|
224 |
The caller must make sure the queue isn't empty. |
|
153 | 225 |
|
154 |
assert self.__npass_shr == 0, "SharedLock: internal fairness violation" |
|
226 |
""" |
|
227 |
return self.__pending[0] == cond |
|
155 | 228 |
|
156 |
def __shared_acquire(self):
|
|
157 |
"""Acquire the lock in shared mode
|
|
229 |
def __acquire_unlocked(self, shared=0, timeout=None):
|
|
230 |
"""Acquire a shared lock.
|
|
158 | 231 |
|
159 |
This is a private function that presumes you are already holding the |
|
160 |
internal lock. |
|
232 |
@param shared: whether to acquire in shared mode; by default an |
|
233 |
exclusive lock will be acquired |
|
234 |
@param timeout: maximum waiting time before giving up |
|
161 | 235 |
|
162 | 236 |
""" |
163 |
self.__nwait_shr += 1 |
|
164 |
try: |
|
165 |
wait = False |
|
166 |
# If there is an exclusive holder waiting we have to wait. |
|
167 |
# We'll only do this once, though, when we start waiting for |
|
168 |
# the lock. Then we'll just wait while there are no |
|
169 |
# exclusive holders. |
|
170 |
if self.__nwait_exc > 0: |
|
171 |
# TODO: if !blocking... |
|
172 |
wait = True |
|
173 |
self.__wait(self.__turn_shr) |
|
174 |
|
|
175 |
while self.__exc is not None: |
|
176 |
wait = True |
|
177 |
# TODO: if !blocking... |
|
178 |
self.__wait(self.__turn_shr) |
|
237 |
self.__check_deleted() |
|
179 | 238 |
|
180 |
self.__shr.add(threading.currentThread()) |
|
239 |
# We cannot acquire the lock if we already have it |
|
240 |
assert not self.__is_owned(), "double acquire() on a non-recursive lock" |
|
241 |
|
|
242 |
# Check whether someone else holds the lock or there are pending acquires. |
|
243 |
if not self.__pending and self.__can_acquire(shared): |
|
244 |
# Apparently not, can acquire lock directly. |
|
245 |
self.__do_acquire(shared) |
|
246 |
return True |
|
181 | 247 |
|
182 |
# If we were waiting note that we passed |
|
183 |
if wait: |
|
184 |
self.__npass_shr -= 1 |
|
248 |
if shared: |
|
249 |
wait_condition = self.__active_shr_c |
|
185 | 250 |
|
251 |
# Check if we're not yet in the queue |
|
252 |
if wait_condition not in self.__pending: |
|
253 |
self.__pending.append(wait_condition) |
|
254 |
else: |
|
255 |
wait_condition = self.__condition_class(self.__lock) |
|
256 |
# Always add to queue |
|
257 |
self.__pending.append(wait_condition) |
|
258 |
|
|
259 |
try: |
|
260 |
# Wait until we become the topmost acquire in the queue or the timeout |
|
261 |
# expires. |
|
262 |
while not (self.__is_on_top(wait_condition) and |
|
263 |
self.__can_acquire(shared)): |
|
264 |
# Wait for notification |
|
265 |
wait_condition.wait(timeout) |
|
266 |
self.__check_deleted() |
|
267 |
|
|
268 |
# A lot of code assumes blocking acquires always succeed. Loop |
|
269 |
# internally for that case. |
|
270 |
if timeout is not None: |
|
271 |
break |
|
272 |
|
|
273 |
if self.__is_on_top(wait_condition) and self.__can_acquire(shared): |
|
274 |
self.__do_acquire(shared) |
|
275 |
return True |
|
186 | 276 |
finally: |
187 |
self.__nwait_shr -= 1 |
|
277 |
# Remove condition from queue if there are no more waiters |
|
278 |
if not wait_condition.has_waiting() and not self.__deleted: |
|
279 |
self.__pending.remove(wait_condition) |
|
188 | 280 |
|
189 |
assert self.__npass_shr >= 0, "Internal fairness condition weirdness"
|
|
281 |
return False
|
|
190 | 282 |
|
191 |
def acquire(self, blocking=1, shared=0):
|
|
283 |
def acquire(self, shared=0, timeout=None):
|
|
192 | 284 |
"""Acquire a shared lock. |
193 | 285 |
|
286 |
@type shared: int |
|
194 | 287 |
@param shared: whether to acquire in shared mode; by default an |
195 | 288 |
exclusive lock will be acquired |
196 |
@param blocking: whether to block while trying to acquire or to
|
|
197 |
operate in try-lock mode (this locking mode is not supported yet)
|
|
289 |
@type timeout: float
|
|
290 |
@param timeout: maximum waiting time before giving up
|
|
198 | 291 |
|
199 | 292 |
""" |
200 |
if not blocking: |
|
201 |
# We don't have non-blocking mode for now |
|
202 |
raise NotImplementedError |
|
203 |
|
|
204 | 293 |
self.__lock.acquire() |
205 | 294 |
try: |
206 |
if self.__deleted: |
|
207 |
raise errors.LockError('deleted lock') |
|
208 |
|
|
209 |
# We cannot acquire the lock if we already have it |
|
210 |
assert not self.__is_owned(), "double acquire() on a non-recursive lock" |
|
211 |
assert self.__npass_shr >= 0, "Internal fairness condition weirdness" |
|
212 |
|
|
213 |
if shared: |
|
214 |
self.__shared_acquire() |
|
215 |
else: |
|
216 |
# TODO: if !blocking... |
|
217 |
# (or modify __exclusive_acquire for non-blocking mode) |
|
218 |
self.__exclusive_acquire() |
|
219 |
|
|
295 |
return self.__acquire_unlocked(shared, timeout) |
|
220 | 296 |
finally: |
221 | 297 |
self.__lock.release() |
222 | 298 |
|
223 |
return True |
|
224 |
|
|
225 | 299 |
def release(self): |
226 | 300 |
"""Release a Shared Lock. |
227 | 301 |
|
... | ... | |
231 | 305 |
""" |
232 | 306 |
self.__lock.acquire() |
233 | 307 |
try: |
234 |
assert self.__npass_shr >= 0, "Internal fairness condition weirdness" |
|
308 |
assert self.__is_exclusive() or self.__is_sharer(), \ |
|
309 |
"Cannot release non-owned lock" |
|
310 |
|
|
235 | 311 |
# Autodetect release type |
236 | 312 |
if self.__is_exclusive(): |
237 | 313 |
self.__exc = None |
238 |
|
|
239 |
# An exclusive holder has just had the lock, time to put it in shared |
|
240 |
# mode if there are shared holders waiting. Otherwise wake up the next |
|
241 |
# exclusive holder. |
|
242 |
if self.__nwait_shr > 0: |
|
243 |
# Make sure at least the ones which were blocked pass. |
|
244 |
self.__npass_shr = self.__nwait_shr |
|
245 |
self.__turn_shr.notifyAll() |
|
246 |
elif self.__nwait_exc > 0: |
|
247 |
self.__turn_exc.notify() |
|
248 |
|
|
249 |
elif self.__is_sharer(): |
|
314 |
else: |
|
250 | 315 |
self.__shr.remove(threading.currentThread()) |
251 | 316 |
|
252 |
# If there are shared holders waiting (and not just scheduled to pass) |
|
253 |
# there *must* be an exclusive holder waiting as well; otherwise what |
|
254 |
# were they waiting for? |
|
255 |
assert (self.__nwait_exc > 0 or |
|
256 |
self.__npass_shr == self.__nwait_shr), \ |
|
257 |
"Lock sharers waiting while no exclusive is queueing" |
|
258 |
|
|
259 |
# If there are no more shared holders either in or scheduled to pass, |
|
260 |
# and some exclusive holders are waiting let's wake one up. |
|
261 |
if (len(self.__shr) == 0 and |
|
262 |
self.__nwait_exc > 0 and |
|
263 |
not self.__npass_shr > 0): |
|
264 |
self.__turn_exc.notify() |
|
317 |
# Notify topmost condition in queue |
|
318 |
if self.__pending: |
|
319 |
first_condition = self.__pending[0] |
|
320 |
first_condition.notifyAll() |
|
265 | 321 |
|
266 |
else: |
|
267 |
assert False, "Cannot release non-owned lock" |
|
322 |
if first_condition == self.__active_shr_c: |
|
323 |
self.__active_shr_c = self.__inactive_shr_c |
|
324 |
self.__inactive_shr_c = first_condition |
|
268 | 325 |
|
269 | 326 |
finally: |
270 | 327 |
self.__lock.release() |
271 | 328 |
|
272 |
def delete(self, blocking=1):
|
|
329 |
def delete(self, timeout=None):
|
|
273 | 330 |
"""Delete a Shared Lock. |
274 | 331 |
|
275 | 332 |
This operation will declare the lock for removal. First the lock will be |
276 | 333 |
acquired in exclusive mode if you don't already own it, then the lock |
277 | 334 |
will be put in a state where any future and pending acquire() fail. |
278 | 335 |
|
279 |
@param blocking: whether to block while trying to acquire or to |
|
280 |
operate in try-lock mode. this locking mode is not supported |
|
281 |
yet unless you are already holding exclusively the lock. |
|
336 |
@type timeout: float |
|
337 |
@param timeout: maximum waiting time before giving up |
|
282 | 338 |
|
283 | 339 |
""" |
284 | 340 |
self.__lock.acquire() |
285 | 341 |
try: |
286 |
assert not self.__is_sharer(), "cannot delete() a lock while sharing it" |
|
342 |
assert not self.__is_sharer(), "Cannot delete() a lock while sharing it" |
|
343 |
|
|
344 |
self.__check_deleted() |
|
287 | 345 |
|
288 |
if self.__deleted:
|
|
289 |
raise errors.LockError('deleted lock')
|
|
346 |
# The caller is allowed to hold the lock exclusively already.
|
|
347 |
acquired = self.__is_exclusive()
|
|
290 | 348 |
|
291 |
if not self.__is_exclusive(): |
|
292 |
if not blocking: |
|
293 |
# We don't have non-blocking mode for now |
|
294 |
raise NotImplementedError |
|
295 |
self.__exclusive_acquire() |
|
349 |
if not acquired: |
|
350 |
acquired = self.__acquire_unlocked(timeout) |
|
351 |
|
|
352 |
if acquired: |
|
353 |
self.__deleted = True |
|
354 |
self.__exc = None |
|
296 | 355 |
|
297 |
self.__deleted = True |
|
298 |
self.__exc = None |
|
299 |
# Wake up everybody, they will fail acquiring the lock and |
|
300 |
# raise an exception instead. |
|
301 |
self.__turn_exc.notifyAll() |
|
302 |
self.__turn_shr.notifyAll() |
|
356 |
# Notify all acquires. They'll throw an error. |
|
357 |
while self.__pending: |
|
358 |
self.__pending.pop().notifyAll() |
|
303 | 359 |
|
360 |
return acquired |
|
304 | 361 |
finally: |
305 | 362 |
self.__lock.release() |
306 | 363 |
|
b/test/ganeti.locking_unittest.py | ||
---|---|---|
26 | 26 |
import unittest |
27 | 27 |
import time |
28 | 28 |
import Queue |
29 |
import threading |
|
29 | 30 |
|
30 | 31 |
from ganeti import locking |
31 | 32 |
from ganeti import errors |
32 |
from threading import Thread |
|
33 | 33 |
|
34 | 34 |
|
35 | 35 |
# This is used to test the ssynchronize decorator. |
... | ... | |
39 | 39 |
#: List for looping tests |
40 | 40 |
ITERATIONS = range(8) |
41 | 41 |
|
42 |
|
|
42 | 43 |
def _Repeat(fn): |
43 | 44 |
"""Decorator for executing a function many times""" |
44 | 45 |
def wrapper(*args, **kwargs): |
... | ... | |
46 | 47 |
fn(*args, **kwargs) |
47 | 48 |
return wrapper |
48 | 49 |
|
50 |
|
|
49 | 51 |
class _ThreadedTestCase(unittest.TestCase): |
50 | 52 |
"""Test class that supports adding/waiting on threads""" |
51 | 53 |
def setUp(self): |
... | ... | |
54 | 56 |
|
55 | 57 |
def _addThread(self, *args, **kwargs): |
56 | 58 |
"""Create and remember a new thread""" |
57 |
t = Thread(*args, **kwargs) |
|
59 |
t = threading.Thread(*args, **kwargs)
|
|
58 | 60 |
self.threads.append(t) |
59 | 61 |
t.start() |
60 | 62 |
return t |
... | ... | |
147 | 149 |
|
148 | 150 |
def testSharersCanCoexist(self): |
149 | 151 |
self.sl.acquire(shared=1) |
150 |
Thread(target=self._doItSharer).start() |
|
152 |
threading.Thread(target=self._doItSharer).start()
|
|
151 | 153 |
self.assert_(self.done.get(True, 1)) |
152 | 154 |
self.sl.release() |
153 | 155 |
|
... | ... | |
234 | 236 |
self.assertEqual(self.done.get_nowait(), 'SHR') |
235 | 237 |
self.assertEqual(self.done.get_nowait(), 'EXC') |
236 | 238 |
|
237 |
def testNoNonBlocking(self): |
|
238 |
self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0) |
|
239 |
self.assertRaises(NotImplementedError, self.sl.delete, blocking=0) |
|
240 |
self.sl.acquire() |
|
241 |
self.sl.delete(blocking=0) # Fine, because the lock is already acquired |
|
242 |
|
|
243 | 239 |
def testDelete(self): |
244 | 240 |
self.sl.delete() |
245 | 241 |
self.assertRaises(errors.LockError, self.sl.acquire) |
... | ... | |
280 | 276 |
self.assertEqual(self.done.get_nowait(), 'ERR') |
281 | 277 |
self.sl = locking.SharedLock() |
282 | 278 |
|
279 |
@_Repeat |
|
280 |
def testExclusiveAcquireTimeout(self): |
|
281 |
def _LockExclusive(wait): |
|
282 |
self.sl.acquire(shared=0) |
|
283 |
self.done.put("A: start sleep") |
|
284 |
time.sleep(wait) |
|
285 |
self.done.put("A: end sleep") |
|
286 |
self.sl.release() |
|
287 |
|
|
288 |
for shared in [0, 1]: |
|
289 |
# Start thread to hold lock for 20 ms |
|
290 |
self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, )) |
|
291 |
|
|
292 |
# Wait up to 100 ms to get lock |
|
293 |
self.failUnless(self.sl.acquire(shared=shared, timeout=0.1)) |
|
294 |
self.done.put("got 2nd") |
|
295 |
self.sl.release() |
|
296 |
|
|
297 |
self._waitThreads() |
|
298 |
|
|
299 |
self.assertEqual(self.done.get_nowait(), "A: start sleep") |
|
300 |
self.assertEqual(self.done.get_nowait(), "A: end sleep") |
|
301 |
self.assertEqual(self.done.get_nowait(), "got 2nd") |
|
302 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
|
303 |
|
|
304 |
@_Repeat |
|
305 |
def testAcquireExpiringTimeout(self): |
|
306 |
def _AcquireWithTimeout(shared, timeout): |
|
307 |
if not self.sl.acquire(shared=shared, timeout=timeout): |
|
308 |
self.done.put("timeout") |
|
309 |
|
|
310 |
for shared in [0, 1]: |
|
311 |
# Lock exclusively |
|
312 |
self.sl.acquire() |
|
313 |
|
|
314 |
# Start shared acquires with timeout between 0 and 20 ms |
|
315 |
for i in xrange(11): |
|
316 |
self._addThread(target=_AcquireWithTimeout, |
|
317 |
args=(shared, i * 2.0 / 1000.0)) |
|
318 |
|
|
319 |
# Wait for threads to finish (makes sure the acquire timeout expires |
|
320 |
# before releasing the lock) |
|
321 |
self._waitThreads() |
|
322 |
|
|
323 |
# Release lock |
|
324 |
self.sl.release() |
|
325 |
|
|
326 |
for _ in xrange(11): |
|
327 |
self.assertEqual(self.done.get_nowait(), "timeout") |
|
328 |
|
|
329 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
|
330 |
|
|
331 |
@_Repeat |
|
332 |
def testSharedSkipExclusiveAcquires(self): |
|
333 |
# Tests whether shared acquires jump in front of exclusive acquires in the |
|
334 |
# queue. |
|
335 |
|
|
336 |
# Get exclusive lock while we fill the queue |
|
337 |
self.sl.acquire() |
|
338 |
|
|
339 |
def _Acquire(shared, name): |
|
340 |
if not self.sl.acquire(shared=shared): |
|
341 |
return |
|
342 |
|
|
343 |
self.done.put(name) |
|
344 |
self.sl.release() |
|
345 |
|
|
346 |
# Start shared acquires |
|
347 |
for _ in xrange(5): |
|
348 |
self._addThread(target=_Acquire, args=(1, "shared A")) |
|
349 |
|
|
350 |
# Start exclusive acquires |
|
351 |
for _ in xrange(3): |
|
352 |
self._addThread(target=_Acquire, args=(0, "exclusive B")) |
|
353 |
|
|
354 |
# More shared acquires |
|
355 |
for _ in xrange(5): |
|
356 |
self._addThread(target=_Acquire, args=(1, "shared C")) |
|
357 |
|
|
358 |
# More exclusive acquires |
|
359 |
for _ in xrange(3): |
|
360 |
self._addThread(target=_Acquire, args=(0, "exclusive D")) |
|
361 |
|
|
362 |
# Expect 6 pending exclusive acquires and 1 for all shared acquires |
|
363 |
# together |
|
364 |
self.assertEqual(self.sl._count_pending(), 7) |
|
365 |
|
|
366 |
# Release exclusive lock and wait |
|
367 |
self.sl.release() |
|
368 |
|
|
369 |
self._waitThreads() |
|
370 |
|
|
371 |
# Check sequence |
|
372 |
for _ in xrange(10): |
|
373 |
# Shared locks aren't guaranteed to be notified in order, but they'll be |
|
374 |
# first |
|
375 |
self.assert_(self.done.get_nowait() in ("shared A", "shared C")) |
|
376 |
|
|
377 |
for _ in xrange(3): |
|
378 |
self.assertEqual(self.done.get_nowait(), "exclusive B") |
|
379 |
|
|
380 |
for _ in xrange(3): |
|
381 |
self.assertEqual(self.done.get_nowait(), "exclusive D") |
|
382 |
|
|
383 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
|
384 |
|
|
385 |
@_Repeat |
|
386 |
def testMixedAcquireTimeout(self): |
|
387 |
sync = threading.Condition() |
|
388 |
|
|
389 |
def _AcquireShared(ev): |
|
390 |
if not self.sl.acquire(shared=1, timeout=None): |
|
391 |
return |
|
392 |
|
|
393 |
self.done.put("shared") |
|
394 |
|
|
395 |
# Notify main thread |
|
396 |
ev.set() |
|
397 |
|
|
398 |
# Wait for notification |
|
399 |
sync.acquire() |
|
400 |
try: |
|
401 |
sync.wait() |
|
402 |
finally: |
|
403 |
sync.release() |
|
404 |
|
|
405 |
# Release lock |
|
406 |
self.sl.release() |
|
407 |
|
|
408 |
acquires = [] |
|
409 |
for _ in xrange(3): |
|
410 |
ev = threading.Event() |
|
411 |
self._addThread(target=_AcquireShared, args=(ev, )) |
|
412 |
acquires.append(ev) |
|
413 |
|
|
414 |
# Wait for all acquires to finish |
|
415 |
for i in acquires: |
|
416 |
i.wait() |
|
417 |
|
|
418 |
self.assertEqual(self.sl._count_pending(), 0) |
|
419 |
|
|
420 |
# Try to get exclusive lock |
|
421 |
self.failIf(self.sl.acquire(shared=0, timeout=0.02)) |
|
422 |
|
|
423 |
# Acquire exclusive without timeout |
|
424 |
exclsync = threading.Condition() |
|
425 |
exclev = threading.Event() |
|
426 |
|
|
427 |
def _AcquireExclusive(): |
|
428 |
if not self.sl.acquire(shared=0): |
|
429 |
return |
|
430 |
|
|
431 |
self.done.put("exclusive") |
|
432 |
|
|
433 |
# Notify main thread |
|
434 |
exclev.set() |
|
435 |
|
|
436 |
exclsync.acquire() |
|
437 |
try: |
|
438 |
exclsync.wait() |
|
439 |
finally: |
|
440 |
exclsync.release() |
|
441 |
|
|
442 |
self.sl.release() |
|
443 |
|
|
444 |
self._addThread(target=_AcquireExclusive) |
|
445 |
|
|
446 |
# Try to get exclusive lock |
|
447 |
self.failIf(self.sl.acquire(shared=0, timeout=0.02)) |
|
448 |
|
|
449 |
# Make all shared holders release their locks |
|
450 |
sync.acquire() |
|
451 |
try: |
|
452 |
sync.notifyAll() |
|
453 |
finally: |
|
454 |
sync.release() |
|
455 |
|
|
456 |
# Wait for exclusive acquire to succeed |
|
457 |
exclev.wait() |
|
458 |
|
|
459 |
self.assertEqual(self.sl._count_pending(), 0) |
|
460 |
|
|
461 |
# Try to get exclusive lock |
|
462 |
self.failIf(self.sl.acquire(shared=0, timeout=0.02)) |
|
463 |
|
|
464 |
def _AcquireSharedSimple(): |
|
465 |
if self.sl.acquire(shared=1, timeout=None): |
|
466 |
self.done.put("shared2") |
|
467 |
self.sl.release() |
|
468 |
|
|
469 |
for _ in xrange(10): |
|
470 |
self._addThread(target=_AcquireSharedSimple) |
|
471 |
|
|
472 |
# Tell exclusive lock to release |
|
473 |
exclsync.acquire() |
|
474 |
try: |
|
475 |
exclsync.notifyAll() |
|
476 |
finally: |
|
477 |
exclsync.release() |
|
478 |
|
|
479 |
# Wait for everything to finish |
|
480 |
self._waitThreads() |
|
481 |
|
|
482 |
self.assertEqual(self.sl._count_pending(), 0) |
|
483 |
|
|
484 |
# Check sequence |
|
485 |
for _ in xrange(3): |
|
486 |
self.assertEqual(self.done.get_nowait(), "shared") |
|
487 |
|
|
488 |
self.assertEqual(self.done.get_nowait(), "exclusive") |
|
489 |
|
|
490 |
for _ in xrange(10): |
|
491 |
self.assertEqual(self.done.get_nowait(), "shared2") |
|
492 |
|
|
493 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
|
494 |
|
|
283 | 495 |
|
284 | 496 |
class TestSSynchronizedDecorator(_ThreadedTestCase): |
285 | 497 |
"""Shared Lock Synchronized decorator test""" |
... | ... | |
307 | 519 |
|
308 | 520 |
def testSharersCanCoexist(self): |
309 | 521 |
_decoratorlock.acquire(shared=1) |
310 |
Thread(target=self._doItSharer).start() |
|
522 |
threading.Thread(target=self._doItSharer).start()
|
|
311 | 523 |
self.assert_(self.done.get(True, 1)) |
312 | 524 |
_decoratorlock.release() |
313 | 525 |
|
... | ... | |
354 | 566 |
self.resources = ['one', 'two', 'three'] |
355 | 567 |
self.ls = locking.LockSet(members=self.resources) |
356 | 568 |
|
357 |
|
|
358 | 569 |
def testResources(self): |
359 | 570 |
self.assertEquals(self.ls._names(), set(self.resources)) |
360 | 571 |
newls = locking.LockSet() |
... | ... | |
489 | 700 |
# We haven't really acquired anything, so we cannot release |
490 | 701 |
self.assertRaises(AssertionError, self.ls.release) |
491 | 702 |
|
492 |
def _doLockSet(self, set, shared):
|
|
703 |
def _doLockSet(self, names, shared):
|
|
493 | 704 |
try: |
494 |
self.ls.acquire(set, shared=shared)
|
|
705 |
self.ls.acquire(names, shared=shared)
|
|
495 | 706 |
self.done.put('DONE') |
496 | 707 |
self.ls.release() |
497 | 708 |
except errors.LockError: |
498 | 709 |
self.done.put('ERR') |
499 | 710 |
|
500 |
def _doAddSet(self, set):
|
|
711 |
def _doAddSet(self, names):
|
|
501 | 712 |
try: |
502 |
self.ls.add(set, acquired=1)
|
|
713 |
self.ls.add(names, acquired=1)
|
|
503 | 714 |
self.done.put('DONE') |
504 | 715 |
self.ls.release() |
505 | 716 |
except errors.LockError: |
506 | 717 |
self.done.put('ERR') |
507 | 718 |
|
508 |
def _doRemoveSet(self, set):
|
|
509 |
self.done.put(self.ls.remove(set))
|
|
719 |
def _doRemoveSet(self, names):
|
|
720 |
self.done.put(self.ls.remove(names))
|
|
510 | 721 |
|
511 | 722 |
@_Repeat |
512 | 723 |
def testConcurrentSharedAcquire(self): |
... | ... | |
537 | 748 |
self._addThread(target=self._doLockSet, args=('three', 0)) |
538 | 749 |
self._waitThreads() |
539 | 750 |
self.assertEqual(self.done.get_nowait(), 'DONE') |
751 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
|
540 | 752 |
self._addThread(target=self._doLockSet, args=(['one', 'two'], 0)) |
541 | 753 |
self._addThread(target=self._doLockSet, args=(['one', 'two'], 1)) |
542 | 754 |
self._addThread(target=self._doLockSet, args=('one', 0)) |
Also available in: Unified diff