Revision 84e344d4 test/ganeti.locking_unittest.py
b/test/ganeti.locking_unittest.py | ||
---|---|---|
26 | 26 |
import unittest |
27 | 27 |
import time |
28 | 28 |
import Queue |
29 |
import threading |
|
29 | 30 |
|
30 | 31 |
from ganeti import locking |
31 | 32 |
from ganeti import errors |
32 |
from threading import Thread |
|
33 | 33 |
|
34 | 34 |
|
35 | 35 |
# This is used to test the ssynchronize decorator. |
... | ... | |
39 | 39 |
#: List for looping tests |
40 | 40 |
ITERATIONS = range(8) |
41 | 41 |
|
42 |
|
|
42 | 43 |
def _Repeat(fn): |
43 | 44 |
"""Decorator for executing a function many times""" |
44 | 45 |
def wrapper(*args, **kwargs): |
... | ... | |
46 | 47 |
fn(*args, **kwargs) |
47 | 48 |
return wrapper |
48 | 49 |
|
50 |
|
|
49 | 51 |
class _ThreadedTestCase(unittest.TestCase): |
50 | 52 |
"""Test class that supports adding/waiting on threads""" |
51 | 53 |
def setUp(self): |
... | ... | |
54 | 56 |
|
55 | 57 |
def _addThread(self, *args, **kwargs): |
56 | 58 |
"""Create and remember a new thread""" |
57 |
t = Thread(*args, **kwargs) |
|
59 |
t = threading.Thread(*args, **kwargs)
|
|
58 | 60 |
self.threads.append(t) |
59 | 61 |
t.start() |
60 | 62 |
return t |
... | ... | |
147 | 149 |
|
148 | 150 |
def testSharersCanCoexist(self): |
149 | 151 |
self.sl.acquire(shared=1) |
150 |
Thread(target=self._doItSharer).start() |
|
152 |
threading.Thread(target=self._doItSharer).start()
|
|
151 | 153 |
self.assert_(self.done.get(True, 1)) |
152 | 154 |
self.sl.release() |
153 | 155 |
|
... | ... | |
234 | 236 |
self.assertEqual(self.done.get_nowait(), 'SHR') |
235 | 237 |
self.assertEqual(self.done.get_nowait(), 'EXC') |
236 | 238 |
|
237 |
def testNoNonBlocking(self): |
|
238 |
self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0) |
|
239 |
self.assertRaises(NotImplementedError, self.sl.delete, blocking=0) |
|
240 |
self.sl.acquire() |
|
241 |
self.sl.delete(blocking=0) # Fine, because the lock is already acquired |
|
242 |
|
|
243 | 239 |
def testDelete(self): |
244 | 240 |
self.sl.delete() |
245 | 241 |
self.assertRaises(errors.LockError, self.sl.acquire) |
... | ... | |
280 | 276 |
self.assertEqual(self.done.get_nowait(), 'ERR') |
281 | 277 |
self.sl = locking.SharedLock() |
282 | 278 |
|
279 |
@_Repeat |
|
280 |
def testExclusiveAcquireTimeout(self): |
|
281 |
def _LockExclusive(wait): |
|
282 |
self.sl.acquire(shared=0) |
|
283 |
self.done.put("A: start sleep") |
|
284 |
time.sleep(wait) |
|
285 |
self.done.put("A: end sleep") |
|
286 |
self.sl.release() |
|
287 |
|
|
288 |
for shared in [0, 1]: |
|
289 |
# Start thread to hold lock for 20 ms |
|
290 |
self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, )) |
|
291 |
|
|
292 |
# Wait up to 100 ms to get lock |
|
293 |
self.failUnless(self.sl.acquire(shared=shared, timeout=0.1)) |
|
294 |
self.done.put("got 2nd") |
|
295 |
self.sl.release() |
|
296 |
|
|
297 |
self._waitThreads() |
|
298 |
|
|
299 |
self.assertEqual(self.done.get_nowait(), "A: start sleep") |
|
300 |
self.assertEqual(self.done.get_nowait(), "A: end sleep") |
|
301 |
self.assertEqual(self.done.get_nowait(), "got 2nd") |
|
302 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
|
303 |
|
|
304 |
@_Repeat |
|
305 |
def testAcquireExpiringTimeout(self): |
|
306 |
def _AcquireWithTimeout(shared, timeout): |
|
307 |
if not self.sl.acquire(shared=shared, timeout=timeout): |
|
308 |
self.done.put("timeout") |
|
309 |
|
|
310 |
for shared in [0, 1]: |
|
311 |
# Lock exclusively |
|
312 |
self.sl.acquire() |
|
313 |
|
|
314 |
# Start shared acquires with timeout between 0 and 20 ms |
|
315 |
for i in xrange(11): |
|
316 |
self._addThread(target=_AcquireWithTimeout, |
|
317 |
args=(shared, i * 2.0 / 1000.0)) |
|
318 |
|
|
319 |
# Wait for threads to finish (makes sure the acquire timeout expires |
|
320 |
# before releasing the lock) |
|
321 |
self._waitThreads() |
|
322 |
|
|
323 |
# Release lock |
|
324 |
self.sl.release() |
|
325 |
|
|
326 |
for _ in xrange(11): |
|
327 |
self.assertEqual(self.done.get_nowait(), "timeout") |
|
328 |
|
|
329 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
|
330 |
|
|
331 |
@_Repeat |
|
332 |
def testSharedSkipExclusiveAcquires(self): |
|
333 |
# Tests whether shared acquires jump in front of exclusive acquires in the |
|
334 |
# queue. |
|
335 |
|
|
336 |
# Get exclusive lock while we fill the queue |
|
337 |
self.sl.acquire() |
|
338 |
|
|
339 |
def _Acquire(shared, name): |
|
340 |
if not self.sl.acquire(shared=shared): |
|
341 |
return |
|
342 |
|
|
343 |
self.done.put(name) |
|
344 |
self.sl.release() |
|
345 |
|
|
346 |
# Start shared acquires |
|
347 |
for _ in xrange(5): |
|
348 |
self._addThread(target=_Acquire, args=(1, "shared A")) |
|
349 |
|
|
350 |
# Start exclusive acquires |
|
351 |
for _ in xrange(3): |
|
352 |
self._addThread(target=_Acquire, args=(0, "exclusive B")) |
|
353 |
|
|
354 |
# More shared acquires |
|
355 |
for _ in xrange(5): |
|
356 |
self._addThread(target=_Acquire, args=(1, "shared C")) |
|
357 |
|
|
358 |
# More exclusive acquires |
|
359 |
for _ in xrange(3): |
|
360 |
self._addThread(target=_Acquire, args=(0, "exclusive D")) |
|
361 |
|
|
362 |
# Expect 6 pending exclusive acquires and 1 for all shared acquires |
|
363 |
# together |
|
364 |
self.assertEqual(self.sl._count_pending(), 7) |
|
365 |
|
|
366 |
# Release exclusive lock and wait |
|
367 |
self.sl.release() |
|
368 |
|
|
369 |
self._waitThreads() |
|
370 |
|
|
371 |
# Check sequence |
|
372 |
for _ in xrange(10): |
|
373 |
# Shared locks aren't guaranteed to be notified in order, but they'll be |
|
374 |
# first |
|
375 |
self.assert_(self.done.get_nowait() in ("shared A", "shared C")) |
|
376 |
|
|
377 |
for _ in xrange(3): |
|
378 |
self.assertEqual(self.done.get_nowait(), "exclusive B") |
|
379 |
|
|
380 |
for _ in xrange(3): |
|
381 |
self.assertEqual(self.done.get_nowait(), "exclusive D") |
|
382 |
|
|
383 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
|
384 |
|
|
385 |
@_Repeat |
|
386 |
def testMixedAcquireTimeout(self): |
|
387 |
sync = threading.Condition() |
|
388 |
|
|
389 |
def _AcquireShared(ev): |
|
390 |
if not self.sl.acquire(shared=1, timeout=None): |
|
391 |
return |
|
392 |
|
|
393 |
self.done.put("shared") |
|
394 |
|
|
395 |
# Notify main thread |
|
396 |
ev.set() |
|
397 |
|
|
398 |
# Wait for notification |
|
399 |
sync.acquire() |
|
400 |
try: |
|
401 |
sync.wait() |
|
402 |
finally: |
|
403 |
sync.release() |
|
404 |
|
|
405 |
# Release lock |
|
406 |
self.sl.release() |
|
407 |
|
|
408 |
acquires = [] |
|
409 |
for _ in xrange(3): |
|
410 |
ev = threading.Event() |
|
411 |
self._addThread(target=_AcquireShared, args=(ev, )) |
|
412 |
acquires.append(ev) |
|
413 |
|
|
414 |
# Wait for all acquires to finish |
|
415 |
for i in acquires: |
|
416 |
i.wait() |
|
417 |
|
|
418 |
self.assertEqual(self.sl._count_pending(), 0) |
|
419 |
|
|
420 |
# Try to get exclusive lock |
|
421 |
self.failIf(self.sl.acquire(shared=0, timeout=0.02)) |
|
422 |
|
|
423 |
# Acquire exclusive without timeout |
|
424 |
exclsync = threading.Condition() |
|
425 |
exclev = threading.Event() |
|
426 |
|
|
427 |
def _AcquireExclusive(): |
|
428 |
if not self.sl.acquire(shared=0): |
|
429 |
return |
|
430 |
|
|
431 |
self.done.put("exclusive") |
|
432 |
|
|
433 |
# Notify main thread |
|
434 |
exclev.set() |
|
435 |
|
|
436 |
exclsync.acquire() |
|
437 |
try: |
|
438 |
exclsync.wait() |
|
439 |
finally: |
|
440 |
exclsync.release() |
|
441 |
|
|
442 |
self.sl.release() |
|
443 |
|
|
444 |
self._addThread(target=_AcquireExclusive) |
|
445 |
|
|
446 |
# Try to get exclusive lock |
|
447 |
self.failIf(self.sl.acquire(shared=0, timeout=0.02)) |
|
448 |
|
|
449 |
# Make all shared holders release their locks |
|
450 |
sync.acquire() |
|
451 |
try: |
|
452 |
sync.notifyAll() |
|
453 |
finally: |
|
454 |
sync.release() |
|
455 |
|
|
456 |
# Wait for exclusive acquire to succeed |
|
457 |
exclev.wait() |
|
458 |
|
|
459 |
self.assertEqual(self.sl._count_pending(), 0) |
|
460 |
|
|
461 |
# Try to get exclusive lock |
|
462 |
self.failIf(self.sl.acquire(shared=0, timeout=0.02)) |
|
463 |
|
|
464 |
def _AcquireSharedSimple(): |
|
465 |
if self.sl.acquire(shared=1, timeout=None): |
|
466 |
self.done.put("shared2") |
|
467 |
self.sl.release() |
|
468 |
|
|
469 |
for _ in xrange(10): |
|
470 |
self._addThread(target=_AcquireSharedSimple) |
|
471 |
|
|
472 |
# Tell exclusive lock to release |
|
473 |
exclsync.acquire() |
|
474 |
try: |
|
475 |
exclsync.notifyAll() |
|
476 |
finally: |
|
477 |
exclsync.release() |
|
478 |
|
|
479 |
# Wait for everything to finish |
|
480 |
self._waitThreads() |
|
481 |
|
|
482 |
self.assertEqual(self.sl._count_pending(), 0) |
|
483 |
|
|
484 |
# Check sequence |
|
485 |
for _ in xrange(3): |
|
486 |
self.assertEqual(self.done.get_nowait(), "shared") |
|
487 |
|
|
488 |
self.assertEqual(self.done.get_nowait(), "exclusive") |
|
489 |
|
|
490 |
for _ in xrange(10): |
|
491 |
self.assertEqual(self.done.get_nowait(), "shared2") |
|
492 |
|
|
493 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
|
494 |
|
|
283 | 495 |
|
284 | 496 |
class TestSSynchronizedDecorator(_ThreadedTestCase): |
285 | 497 |
"""Shared Lock Synchronized decorator test""" |
... | ... | |
307 | 519 |
|
308 | 520 |
def testSharersCanCoexist(self): |
309 | 521 |
_decoratorlock.acquire(shared=1) |
310 |
Thread(target=self._doItSharer).start() |
|
522 |
threading.Thread(target=self._doItSharer).start()
|
|
311 | 523 |
self.assert_(self.done.get(True, 1)) |
312 | 524 |
_decoratorlock.release() |
313 | 525 |
|
... | ... | |
354 | 566 |
self.resources = ['one', 'two', 'three'] |
355 | 567 |
self.ls = locking.LockSet(members=self.resources) |
356 | 568 |
|
357 |
|
|
358 | 569 |
def testResources(self): |
359 | 570 |
self.assertEquals(self.ls._names(), set(self.resources)) |
360 | 571 |
newls = locking.LockSet() |
... | ... | |
489 | 700 |
# We haven't really acquired anything, so we cannot release |
490 | 701 |
self.assertRaises(AssertionError, self.ls.release) |
491 | 702 |
|
492 |
def _doLockSet(self, set, shared):
|
|
703 |
def _doLockSet(self, names, shared):
|
|
493 | 704 |
try: |
494 |
self.ls.acquire(set, shared=shared)
|
|
705 |
self.ls.acquire(names, shared=shared)
|
|
495 | 706 |
self.done.put('DONE') |
496 | 707 |
self.ls.release() |
497 | 708 |
except errors.LockError: |
498 | 709 |
self.done.put('ERR') |
499 | 710 |
|
500 |
def _doAddSet(self, set):
|
|
711 |
def _doAddSet(self, names):
|
|
501 | 712 |
try: |
502 |
self.ls.add(set, acquired=1)
|
|
713 |
self.ls.add(names, acquired=1)
|
|
503 | 714 |
self.done.put('DONE') |
504 | 715 |
self.ls.release() |
505 | 716 |
except errors.LockError: |
506 | 717 |
self.done.put('ERR') |
507 | 718 |
|
508 |
def _doRemoveSet(self, set):
|
|
509 |
self.done.put(self.ls.remove(set))
|
|
719 |
def _doRemoveSet(self, names):
|
|
720 |
self.done.put(self.ls.remove(names))
|
|
510 | 721 |
|
511 | 722 |
@_Repeat |
512 | 723 |
def testConcurrentSharedAcquire(self): |
... | ... | |
537 | 748 |
self._addThread(target=self._doLockSet, args=('three', 0)) |
538 | 749 |
self._waitThreads() |
539 | 750 |
self.assertEqual(self.done.get_nowait(), 'DONE') |
751 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
|
540 | 752 |
self._addThread(target=self._doLockSet, args=(['one', 'two'], 0)) |
541 | 753 |
self._addThread(target=self._doLockSet, args=(['one', 'two'], 1)) |
542 | 754 |
self._addThread(target=self._doLockSet, args=('one', 0)) |
Also available in: Unified diff