Revision 008b92fa test/ganeti.locking_unittest.py
b/test/ganeti.locking_unittest.py | ||
---|---|---|
450 | 450 |
|
451 | 451 |
@_Repeat |
452 | 452 |
def testExclusiveAcquireTimeout(self): |
453 |
def _LockExclusive(wait): |
|
454 |
self.sl.acquire(shared=0) |
|
455 |
self.done.put("A: start sleep") |
|
456 |
time.sleep(wait) |
|
457 |
self.done.put("A: end sleep") |
|
458 |
self.sl.release() |
|
459 |
|
|
460 | 453 |
for shared in [0, 1]: |
461 |
# Start thread to hold lock for 20 ms |
|
462 |
self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, )) |
|
454 |
on_queue = threading.Event() |
|
455 |
release_exclusive = threading.Event() |
|
456 |
|
|
457 |
def _LockExclusive(): |
|
458 |
self.sl.acquire(shared=0, test_notify=on_queue.set) |
|
459 |
self.done.put("A: start wait") |
|
460 |
release_exclusive.wait() |
|
461 |
self.done.put("A: end wait") |
|
462 |
self.sl.release() |
|
463 |
|
|
464 |
# Start thread to hold lock in exclusive mode |
|
465 |
self._addThread(target=_LockExclusive) |
|
463 | 466 |
|
464 |
# Wait for sleep to begin |
|
465 |
self.assertEqual(self.done.get(), "A: start sleep") |
|
467 |
# Wait for wait to begin |
|
468 |
self.assertEqual(self.done.get(timeout=60), "A: start wait") |
|
469 |
|
|
470 |
# Wait up to 60s to get lock, but release exclusive lock as soon as we're |
|
471 |
# on the queue |
|
472 |
self.failUnless(self.sl.acquire(shared=shared, timeout=60, |
|
473 |
test_notify=release_exclusive.set)) |
|
466 | 474 |
|
467 |
# Wait up to 100 ms to get lock |
|
468 |
self.failUnless(self.sl.acquire(shared=shared, timeout=0.1)) |
|
469 | 475 |
self.done.put("got 2nd") |
470 | 476 |
self.sl.release() |
471 | 477 |
|
472 | 478 |
self._waitThreads() |
473 | 479 |
|
474 |
self.assertEqual(self.done.get_nowait(), "A: end sleep")
|
|
480 |
self.assertEqual(self.done.get_nowait(), "A: end wait")
|
|
475 | 481 |
self.assertEqual(self.done.get_nowait(), "got 2nd") |
476 | 482 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
477 | 483 |
|
... | ... | |
507 | 513 |
# Tests whether shared acquires jump in front of exclusive acquires in the |
508 | 514 |
# queue. |
509 | 515 |
|
510 |
# Get exclusive lock while we fill the queue |
|
511 |
self.sl.acquire() |
|
516 |
def _Acquire(shared, name, notify_ev, wait_ev): |
|
517 |
if notify_ev: |
|
518 |
notify_fn = notify_ev.set |
|
519 |
else: |
|
520 |
notify_fn = None |
|
512 | 521 |
|
513 |
def _Acquire(shared, name): |
|
514 |
if not self.sl.acquire(shared=shared): |
|
522 |
if wait_ev: |
|
523 |
wait_ev.wait() |
|
524 |
|
|
525 |
if not self.sl.acquire(shared=shared, test_notify=notify_fn): |
|
515 | 526 |
return |
516 | 527 |
|
517 | 528 |
self.done.put(name) |
518 | 529 |
self.sl.release() |
519 | 530 |
|
520 |
# Start shared acquires |
|
521 |
for _ in xrange(5): |
|
522 |
self._addThread(target=_Acquire, args=(1, "shared A")) |
|
531 |
# Get exclusive lock while we fill the queue |
|
532 |
self.sl.acquire() |
|
523 | 533 |
|
524 |
# Start exclusive acquires |
|
525 |
for _ in xrange(3): |
|
526 |
self._addThread(target=_Acquire, args=(0, "exclusive B")) |
|
534 |
shrcnt1 = 5 |
|
535 |
shrcnt2 = 7 |
|
536 |
shrcnt3 = 9 |
|
537 |
shrcnt4 = 2 |
|
527 | 538 |
|
528 |
# More shared acquires |
|
529 |
for _ in xrange(5): |
|
530 |
self._addThread(target=_Acquire, args=(1, "shared C")) |
|
539 |
# Add acquires using threading.Event for synchronization. They'll be |
|
540 |
# acquired exactly in the order defined in this list. |
|
541 |
acquires = (shrcnt1 * [(1, "shared 1")] + |
|
542 |
3 * [(0, "exclusive 1")] + |
|
543 |
shrcnt2 * [(1, "shared 2")] + |
|
544 |
shrcnt3 * [(1, "shared 3")] + |
|
545 |
shrcnt4 * [(1, "shared 4")] + |
|
546 |
3 * [(0, "exclusive 2")]) |
|
531 | 547 |
|
532 |
# More exclusive acquires |
|
533 |
for _ in xrange(3): |
|
534 |
self._addThread(target=_Acquire, args=(0, "exclusive D")) |
|
548 |
ev_cur = None |
|
549 |
ev_prev = None |
|
550 |
|
|
551 |
for args in acquires: |
|
552 |
ev_cur = threading.Event() |
|
553 |
self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev)) |
|
554 |
ev_prev = ev_cur |
|
555 |
|
|
556 |
# Wait for last acquire to start |
|
557 |
ev_prev.wait() |
|
535 | 558 |
|
536 | 559 |
# Expect 6 pending exclusive acquires and 1 for all shared acquires |
537 |
# together. There's no way to wait for SharedLock.acquire to start |
|
538 |
# its work. Hence the timeout of 2 seconds. |
|
539 |
pending = 0 |
|
540 |
end_time = time.time() + 2.0 |
|
541 |
while time.time() < end_time: |
|
542 |
pending = self.sl._count_pending() |
|
543 |
self.assert_(pending >= 0 and pending <= 7) |
|
544 |
if pending == 7: |
|
545 |
break |
|
546 |
time.sleep(0.05) |
|
547 |
self.assertEqual(pending, 7) |
|
560 |
# together |
|
561 |
self.assertEqual(self.sl._count_pending(), 7) |
|
548 | 562 |
|
549 | 563 |
# Release exclusive lock and wait |
550 | 564 |
self.sl.release() |
... | ... | |
552 | 566 |
self._waitThreads() |
553 | 567 |
|
554 | 568 |
# Check sequence |
555 |
shr_a = 0 |
|
556 |
shr_c = 0 |
|
557 |
for _ in xrange(10): |
|
569 |
for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4): |
|
558 | 570 |
# Shared locks aren't guaranteed to be notified in order, but they'll be |
559 | 571 |
# first |
560 | 572 |
tmp = self.done.get_nowait() |
561 |
if tmp == "shared A": |
|
562 |
shr_a += 1 |
|
563 |
elif tmp == "shared C": |
|
564 |
shr_c += 1 |
|
565 |
self.assertEqual(shr_a, 5) |
|
566 |
self.assertEqual(shr_c, 5) |
|
573 |
if tmp == "shared 1": |
|
574 |
shrcnt1 -= 1 |
|
575 |
elif tmp == "shared 2": |
|
576 |
shrcnt2 -= 1 |
|
577 |
elif tmp == "shared 3": |
|
578 |
shrcnt3 -= 1 |
|
579 |
elif tmp == "shared 4": |
|
580 |
shrcnt4 -= 1 |
|
581 |
self.assertEqual(shrcnt1, 0) |
|
582 |
self.assertEqual(shrcnt2, 0) |
|
583 |
self.assertEqual(shrcnt3, 0) |
|
584 |
self.assertEqual(shrcnt3, 0) |
|
567 | 585 |
|
568 | 586 |
for _ in xrange(3): |
569 |
self.assertEqual(self.done.get_nowait(), "exclusive B")
|
|
587 |
self.assertEqual(self.done.get_nowait(), "exclusive 1")
|
|
570 | 588 |
|
571 | 589 |
for _ in xrange(3): |
572 |
self.assertEqual(self.done.get_nowait(), "exclusive D")
|
|
590 |
self.assertEqual(self.done.get_nowait(), "exclusive 2")
|
|
573 | 591 |
|
574 | 592 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
575 | 593 |
|
Also available in: Unified diff