Revision 887c7aa6 test/ganeti.locking_unittest.py
b/test/ganeti.locking_unittest.py | ||
---|---|---|
28 | 28 |
import Queue |
29 | 29 |
import threading |
30 | 30 |
import random |
31 |
import itertools |
|
31 | 32 |
|
32 | 33 |
from ganeti import locking |
33 | 34 |
from ganeti import errors |
34 | 35 |
from ganeti import utils |
36 |
from ganeti import compat |
|
35 | 37 |
|
36 | 38 |
import testutils |
37 | 39 |
|
... | ... | |
701 | 703 |
|
702 | 704 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
703 | 705 |
|
706 |
def testPriority(self): |
|
707 |
# Acquire in exclusive mode |
|
708 |
self.assert_(self.sl.acquire(shared=0)) |
|
709 |
|
|
710 |
# Queue acquires |
|
711 |
def _Acquire(prev, next, shared, priority, result): |
|
712 |
prev.wait() |
|
713 |
self.sl.acquire(shared=shared, priority=priority, test_notify=next.set) |
|
714 |
try: |
|
715 |
self.done.put(result) |
|
716 |
finally: |
|
717 |
self.sl.release() |
|
718 |
|
|
719 |
counter = itertools.count(0) |
|
720 |
priorities = range(-20, 30) |
|
721 |
first = threading.Event() |
|
722 |
prev = first |
|
723 |
|
|
724 |
# Data structure: |
|
725 |
# { |
|
726 |
# priority: |
|
727 |
# [(shared/exclusive, set(acquire names), set(pending threads)), |
|
728 |
# (shared/exclusive, ...), |
|
729 |
# ..., |
|
730 |
# ], |
|
731 |
# } |
|
732 |
perprio = {} |
|
733 |
|
|
734 |
# References shared acquire per priority in L{perprio}. Data structure: |
|
735 |
# { |
|
736 |
# priority: (shared=1, set(acquire names), set(pending threads)), |
|
737 |
# } |
|
738 |
prioshared = {} |
|
739 |
|
|
740 |
for seed in [4979, 9523, 14902, 32440]: |
|
741 |
# Use a deterministic random generator |
|
742 |
rnd = random.Random(seed) |
|
743 |
for priority in [rnd.choice(priorities) for _ in range(30)]: |
|
744 |
modes = [0, 1] |
|
745 |
rnd.shuffle(modes) |
|
746 |
for shared in modes: |
|
747 |
# Unique name |
|
748 |
acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority) |
|
749 |
|
|
750 |
ev = threading.Event() |
|
751 |
thread = self._addThread(target=_Acquire, |
|
752 |
args=(prev, ev, shared, priority, acqname)) |
|
753 |
prev = ev |
|
754 |
|
|
755 |
# Record expected aqcuire, see above for structure |
|
756 |
data = (shared, set([acqname]), set([thread])) |
|
757 |
priolist = perprio.setdefault(priority, []) |
|
758 |
if shared: |
|
759 |
priosh = prioshared.get(priority, None) |
|
760 |
if priosh: |
|
761 |
# Shared acquires are merged |
|
762 |
for i, j in zip(priosh[1:], data[1:]): |
|
763 |
i.update(j) |
|
764 |
assert data[0] == priosh[0] |
|
765 |
else: |
|
766 |
prioshared[priority] = data |
|
767 |
priolist.append(data) |
|
768 |
else: |
|
769 |
priolist.append(data) |
|
770 |
|
|
771 |
# Start all acquires and wait for them |
|
772 |
first.set() |
|
773 |
prev.wait() |
|
774 |
|
|
775 |
# Check lock information |
|
776 |
self.assertEqual(self.sl.GetInfo(["name"]), [self.sl.name]) |
|
777 |
self.assertEqual(self.sl.GetInfo(["mode", "owner"]), |
|
778 |
["exclusive", [threading.currentThread().getName()]]) |
|
779 |
self.assertEqual(self.sl.GetInfo(["name", "pending"]), |
|
780 |
[self.sl.name, |
|
781 |
[(["exclusive", "shared"][int(bool(shared))], |
|
782 |
sorted([t.getName() for t in threads])) |
|
783 |
for acquires in [perprio[i] |
|
784 |
for i in sorted(perprio.keys())] |
|
785 |
for (shared, _, threads) in acquires]]) |
|
786 |
|
|
787 |
# Let threads acquire the lock |
|
788 |
self.sl.release() |
|
789 |
|
|
790 |
# Wait for everything to finish |
|
791 |
self._waitThreads() |
|
792 |
|
|
793 |
self.assert_(self.sl._check_empty()) |
|
794 |
|
|
795 |
# Check acquires by priority |
|
796 |
for acquires in [perprio[i] for i in sorted(perprio.keys())]: |
|
797 |
for (_, names, _) in acquires: |
|
798 |
# For shared acquires, the set will contain 1..n entries. For exclusive |
|
799 |
# acquires only one. |
|
800 |
while names: |
|
801 |
names.remove(self.done.get_nowait()) |
|
802 |
self.assertFalse(compat.any(names for (_, names, _) in acquires)) |
|
803 |
|
|
804 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
|
805 |
|
|
704 | 806 |
|
705 | 807 |
class TestSharedLockInCondition(_ThreadedTestCase): |
706 | 808 |
"""SharedLock as a condition lock tests""" |
... | ... | |
1259 | 1361 |
self.assertEqual(self.done.get_nowait(), 'DONE') |
1260 | 1362 |
self._setUpLS() |
1261 | 1363 |
|
1364 |
def testPriority(self): |
|
1365 |
def _Acquire(prev, next, name, priority, success_fn): |
|
1366 |
prev.wait() |
|
1367 |
self.assert_(self.ls.acquire(name, shared=0, |
|
1368 |
priority=priority, |
|
1369 |
test_notify=lambda _: next.set())) |
|
1370 |
try: |
|
1371 |
success_fn() |
|
1372 |
finally: |
|
1373 |
self.ls.release() |
|
1374 |
|
|
1375 |
# Get all in exclusive mode |
|
1376 |
self.assert_(self.ls.acquire(locking.ALL_SET, shared=0)) |
|
1377 |
|
|
1378 |
done_two = Queue.Queue(0) |
|
1379 |
|
|
1380 |
first = threading.Event() |
|
1381 |
prev = first |
|
1382 |
|
|
1383 |
acquires = [("one", prio, self.done) for prio in range(1, 33)] |
|
1384 |
acquires.extend([("two", prio, done_two) for prio in range(1, 33)]) |
|
1385 |
|
|
1386 |
# Use a deterministic random generator |
|
1387 |
random.Random(741).shuffle(acquires) |
|
1388 |
|
|
1389 |
for (name, prio, done) in acquires: |
|
1390 |
ev = threading.Event() |
|
1391 |
self._addThread(target=_Acquire, |
|
1392 |
args=(prev, ev, name, prio, |
|
1393 |
compat.partial(done.put, "Prio%s" % prio))) |
|
1394 |
prev = ev |
|
1395 |
|
|
1396 |
# Start acquires |
|
1397 |
first.set() |
|
1398 |
|
|
1399 |
# Wait for last acquire to start |
|
1400 |
prev.wait() |
|
1401 |
|
|
1402 |
# Let threads acquire locks |
|
1403 |
self.ls.release() |
|
1404 |
|
|
1405 |
# Wait for threads to finish |
|
1406 |
self._waitThreads() |
|
1407 |
|
|
1408 |
for i in range(1, 33): |
|
1409 |
self.assertEqual(self.done.get_nowait(), "Prio%s" % i) |
|
1410 |
self.assertEqual(done_two.get_nowait(), "Prio%s" % i) |
|
1411 |
|
|
1412 |
self.assertRaises(Queue.Empty, self.done.get_nowait) |
|
1413 |
self.assertRaises(Queue.Empty, done_two.get_nowait) |
|
1414 |
|
|
1262 | 1415 |
|
1263 | 1416 |
class TestGanetiLockManager(_ThreadedTestCase): |
1264 | 1417 |
|
Also available in: Unified diff