4 # Copyright (C) 2006, 2007, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the locking module"""
34 from ganeti import constants
35 from ganeti import locking
36 from ganeti import errors
37 from ganeti import utils
38 from ganeti import compat
39 from ganeti import objects
40 from ganeti import query
45 # This is used to test the ssynchronize decorator.
46 # Since it's passed as input to a decorator it must be declared as a global.
47 _decoratorlock = locking.SharedLock("decorator lock")
49 #: List for looping tests
54 """Decorator for executing a function many times"""
55 def wrapper(*args, **kwargs):
61 def SafeSleep(duration):
64 delay = start + duration - time.time()
70 class _ThreadedTestCase(unittest.TestCase):
71 """Test class that supports adding/waiting on threads"""
73 unittest.TestCase.setUp(self)
74 self.done = Queue.Queue(0)
77 def _addThread(self, *args, **kwargs):
78 """Create and remember a new thread"""
79 t = threading.Thread(*args, **kwargs)
80 self.threads.append(t)
84 def _waitThreads(self):
85 """Wait for all our threads to finish"""
86 for t in self.threads:
88 self.failIf(t.isAlive())
92 class _ConditionTestCase(_ThreadedTestCase):
93 """Common test case for conditions"""
96 _ThreadedTestCase.setUp(self)
97 self.lock = threading.Lock()
98 self.cond = cls(self.lock)
100 def _testAcquireRelease(self):
101 self.assertFalse(self.cond._is_owned())
102 self.assertRaises(RuntimeError, self.cond.wait, None)
103 self.assertRaises(RuntimeError, self.cond.notifyAll)
106 self.assert_(self.cond._is_owned())
107 self.cond.notifyAll()
108 self.assert_(self.cond._is_owned())
111 self.assertFalse(self.cond._is_owned())
112 self.assertRaises(RuntimeError, self.cond.wait, None)
113 self.assertRaises(RuntimeError, self.cond.notifyAll)
115 def _testNotification(self):
120 self.cond.notifyAll()
125 self._addThread(target=_NotifyAll)
126 self.assertEqual(self.done.get(True, 1), "NE")
127 self.assertRaises(Queue.Empty, self.done.get_nowait)
129 self.assertEqual(self.done.get(True, 1), "NA")
130 self.assertEqual(self.done.get(True, 1), "NN")
131 self.assert_(self.cond._is_owned())
133 self.assertFalse(self.cond._is_owned())
136 class TestSingleNotifyPipeCondition(_ConditionTestCase):
137 """SingleNotifyPipeCondition tests"""
140 _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
142 def testAcquireRelease(self):
143 self._testAcquireRelease()
145 def testNotification(self):
146 self._testNotification()
148 def testWaitReuse(self):
154 def testNoNotifyReuse(self):
156 self.cond.notifyAll()
157 self.assertRaises(RuntimeError, self.cond.wait, None)
158 self.assertRaises(RuntimeError, self.cond.notifyAll)
162 class TestPipeCondition(_ConditionTestCase):
163 """PipeCondition tests"""
166 _ConditionTestCase.setUp(self, locking.PipeCondition)
168 def testAcquireRelease(self):
169 self._testAcquireRelease()
171 def testNotification(self):
172 self._testNotification()
174 def _TestWait(self, fn):
176 self._addThread(target=fn),
177 self._addThread(target=fn),
178 self._addThread(target=fn),
181 # Wait for threads to be waiting
183 self.assertEqual(self.done.get(True, 1), "A")
185 self.assertRaises(Queue.Empty, self.done.get_nowait)
188 self.assertEqual(len(self.cond._waiters), 3)
189 self.assertEqual(self.cond._waiters, set(threads))
191 self.assertTrue(repr(self.cond).startswith("<"))
192 self.assertTrue("waiters=" in repr(self.cond))
194 # This new thread can't acquire the lock, and thus call wait, before we
196 self._addThread(target=fn)
197 self.cond.notifyAll()
198 self.assertRaises(Queue.Empty, self.done.get_nowait)
201 # We should now get 3 W and 1 A (for the new thread) in whatever order
205 got = self.done.get(True, 1)
211 self.fail("Got %s on the done queue" % got)
213 self.assertEqual(w, 3)
214 self.assertEqual(a, 1)
217 self.cond.notifyAll()
220 self.assertEqual(self.done.get_nowait(), "W")
221 self.assertRaises(Queue.Empty, self.done.get_nowait)
223 def testBlockingWait(self):
231 self._TestWait(_BlockingWait)
233 def testLongTimeoutWait(self):
241 self._TestWait(_Helper)
243 def _TimeoutWait(self, timeout, check):
245 self.cond.wait(timeout)
249 def testShortTimeoutWait(self):
250 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
251 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
253 self.assertEqual(self.done.get_nowait(), "T1")
254 self.assertEqual(self.done.get_nowait(), "T1")
255 self.assertRaises(Queue.Empty, self.done.get_nowait)
257 def testZeroTimeoutWait(self):
258 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
259 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
260 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
262 self.assertEqual(self.done.get_nowait(), "T0")
263 self.assertEqual(self.done.get_nowait(), "T0")
264 self.assertEqual(self.done.get_nowait(), "T0")
265 self.assertRaises(Queue.Empty, self.done.get_nowait)
268 class TestSharedLock(_ThreadedTestCase):
269 """SharedLock tests"""
272 _ThreadedTestCase.setUp(self)
273 self.sl = locking.SharedLock("TestSharedLock")
275 self.assertTrue(repr(self.sl).startswith("<"))
276 self.assertTrue("name=TestSharedLock" in repr(self.sl))
278 def testSequenceAndOwnership(self):
279 self.assertFalse(self.sl.is_owned())
280 self.sl.acquire(shared=1)
281 self.assert_(self.sl.is_owned())
282 self.assert_(self.sl.is_owned(shared=1))
283 self.assertFalse(self.sl.is_owned(shared=0))
285 self.assertFalse(self.sl.is_owned())
287 self.assert_(self.sl.is_owned())
288 self.assertFalse(self.sl.is_owned(shared=1))
289 self.assert_(self.sl.is_owned(shared=0))
291 self.assertFalse(self.sl.is_owned())
292 self.sl.acquire(shared=1)
293 self.assert_(self.sl.is_owned())
294 self.assert_(self.sl.is_owned(shared=1))
295 self.assertFalse(self.sl.is_owned(shared=0))
297 self.assertFalse(self.sl.is_owned())
299 def testBooleanValue(self):
300 # semaphores are supposed to return a true value on a successful acquire
301 self.assert_(self.sl.acquire(shared=1))
303 self.assert_(self.sl.acquire())
306 def testDoubleLockingStoE(self):
307 self.sl.acquire(shared=1)
308 self.assertRaises(AssertionError, self.sl.acquire)
310 def testDoubleLockingEtoS(self):
312 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
314 def testDoubleLockingStoS(self):
315 self.sl.acquire(shared=1)
316 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
318 def testDoubleLockingEtoE(self):
320 self.assertRaises(AssertionError, self.sl.acquire)
322 # helper functions: called in a separate thread they acquire the lock, send
323 # their identifier on the done queue, then release it.
324 def _doItSharer(self):
326 self.sl.acquire(shared=1)
329 except errors.LockError:
332 def _doItExclusive(self):
337 except errors.LockError:
340 def _doItDelete(self):
344 except errors.LockError:
347 def testSharersCanCoexist(self):
348 self.sl.acquire(shared=1)
349 threading.Thread(target=self._doItSharer).start()
350 self.assert_(self.done.get(True, 1))
354 def testExclusiveBlocksExclusive(self):
356 self._addThread(target=self._doItExclusive)
357 self.assertRaises(Queue.Empty, self.done.get_nowait)
360 self.failUnlessEqual(self.done.get_nowait(), "EXC")
363 def testExclusiveBlocksDelete(self):
365 self._addThread(target=self._doItDelete)
366 self.assertRaises(Queue.Empty, self.done.get_nowait)
369 self.failUnlessEqual(self.done.get_nowait(), "DEL")
370 self.sl = locking.SharedLock(self.sl.name)
373 def testExclusiveBlocksSharer(self):
375 self._addThread(target=self._doItSharer)
376 self.assertRaises(Queue.Empty, self.done.get_nowait)
379 self.failUnlessEqual(self.done.get_nowait(), "SHR")
382 def testSharerBlocksExclusive(self):
383 self.sl.acquire(shared=1)
384 self._addThread(target=self._doItExclusive)
385 self.assertRaises(Queue.Empty, self.done.get_nowait)
388 self.failUnlessEqual(self.done.get_nowait(), "EXC")
391 def testSharerBlocksDelete(self):
392 self.sl.acquire(shared=1)
393 self._addThread(target=self._doItDelete)
394 self.assertRaises(Queue.Empty, self.done.get_nowait)
397 self.failUnlessEqual(self.done.get_nowait(), "DEL")
398 self.sl = locking.SharedLock(self.sl.name)
401 def testWaitingExclusiveBlocksSharer(self):
402 """SKIPPED testWaitingExclusiveBlockSharer"""
405 self.sl.acquire(shared=1)
406 # the lock is acquired in shared mode...
407 self._addThread(target=self._doItExclusive)
408 # ...but now an exclusive is waiting...
409 self._addThread(target=self._doItSharer)
410 # ...so the sharer should be blocked as well
411 self.assertRaises(Queue.Empty, self.done.get_nowait)
414 # The exclusive passed before
415 self.failUnlessEqual(self.done.get_nowait(), "EXC")
416 self.failUnlessEqual(self.done.get_nowait(), "SHR")
419 def testWaitingSharerBlocksExclusive(self):
420 """SKIPPED testWaitingSharerBlocksExclusive"""
424 # the lock is acquired in exclusive mode...
425 self._addThread(target=self._doItSharer)
426 # ...but now a sharer is waiting...
427 self._addThread(target=self._doItExclusive)
428 # ...the exclusive is waiting too...
429 self.assertRaises(Queue.Empty, self.done.get_nowait)
432 # The sharer passed before
433 self.assertEqual(self.done.get_nowait(), "SHR")
434 self.assertEqual(self.done.get_nowait(), "EXC")
436 def testDelete(self):
438 self.assertRaises(errors.LockError, self.sl.acquire)
439 self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
440 self.assertRaises(errors.LockError, self.sl.delete)
442 def testDeleteTimeout(self):
443 self.assertTrue(self.sl.delete(timeout=60))
445 def testDeleteTimeoutFail(self):
446 ready = threading.Event()
447 finish = threading.Event()
450 self.sl.acquire(shared=0)
456 self._addThread(target=fn)
459 # Test if deleting a lock owned in exclusive mode by another thread fails
460 # to delete when a timeout is used
461 self.assertFalse(self.sl.delete(timeout=0.02))
466 self.assertTrue(self.sl.delete())
467 self.assertRaises(errors.LockError, self.sl.acquire)
469 def testNoDeleteIfSharer(self):
470 self.sl.acquire(shared=1)
471 self.assertRaises(AssertionError, self.sl.delete)
474 def testDeletePendingSharersExclusiveDelete(self):
476 self._addThread(target=self._doItSharer)
477 self._addThread(target=self._doItSharer)
478 self._addThread(target=self._doItExclusive)
479 self._addThread(target=self._doItDelete)
482 # The threads who were pending return ERR
484 self.assertEqual(self.done.get_nowait(), "ERR")
485 self.sl = locking.SharedLock(self.sl.name)
488 def testDeletePendingDeleteExclusiveSharers(self):
490 self._addThread(target=self._doItDelete)
491 self._addThread(target=self._doItExclusive)
492 self._addThread(target=self._doItSharer)
493 self._addThread(target=self._doItSharer)
496 # The two threads who were pending return both ERR
497 self.assertEqual(self.done.get_nowait(), "ERR")
498 self.assertEqual(self.done.get_nowait(), "ERR")
499 self.assertEqual(self.done.get_nowait(), "ERR")
500 self.assertEqual(self.done.get_nowait(), "ERR")
501 self.sl = locking.SharedLock(self.sl.name)
504 def testExclusiveAcquireTimeout(self):
505 for shared in [0, 1]:
506 on_queue = threading.Event()
507 release_exclusive = threading.Event()
509 def _LockExclusive():
510 self.sl.acquire(shared=0, test_notify=on_queue.set)
511 self.done.put("A: start wait")
512 release_exclusive.wait()
513 self.done.put("A: end wait")
516 # Start thread to hold lock in exclusive mode
517 self._addThread(target=_LockExclusive)
519 # Wait for wait to begin
520 self.assertEqual(self.done.get(timeout=60), "A: start wait")
522 # Wait up to 60s to get lock, but release exclusive lock as soon as we're
524 self.failUnless(self.sl.acquire(shared=shared, timeout=60,
525 test_notify=release_exclusive.set))
527 self.done.put("got 2nd")
532 self.assertEqual(self.done.get_nowait(), "A: end wait")
533 self.assertEqual(self.done.get_nowait(), "got 2nd")
534 self.assertRaises(Queue.Empty, self.done.get_nowait)
537 def testAcquireExpiringTimeout(self):
538 def _AcquireWithTimeout(shared, timeout):
539 if not self.sl.acquire(shared=shared, timeout=timeout):
540 self.done.put("timeout")
542 for shared in [0, 1]:
546 # Start shared acquires with timeout between 0 and 20 ms
548 self._addThread(target=_AcquireWithTimeout,
549 args=(shared, i * 2.0 / 1000.0))
551 # Wait for threads to finish (makes sure the acquire timeout expires
552 # before releasing the lock)
559 self.assertEqual(self.done.get_nowait(), "timeout")
561 self.assertRaises(Queue.Empty, self.done.get_nowait)
564 def testSharedSkipExclusiveAcquires(self):
565 # Tests whether shared acquires jump in front of exclusive acquires in the
568 def _Acquire(shared, name, notify_ev, wait_ev):
570 notify_fn = notify_ev.set
577 if not self.sl.acquire(shared=shared, test_notify=notify_fn):
583 # Get exclusive lock while we fill the queue
591 # Add acquires using threading.Event for synchronization. They'll be
592 # acquired exactly in the order defined in this list.
593 acquires = (shrcnt1 * [(1, "shared 1")] +
594 3 * [(0, "exclusive 1")] +
595 shrcnt2 * [(1, "shared 2")] +
596 shrcnt3 * [(1, "shared 3")] +
597 shrcnt4 * [(1, "shared 4")] +
598 3 * [(0, "exclusive 2")])
603 for args in acquires:
604 ev_cur = threading.Event()
605 self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
608 # Wait for last acquire to start
611 # Expect 6 pending exclusive acquires and 1 for all shared acquires
613 self.assertEqual(self.sl._count_pending(), 7)
615 # Release exclusive lock and wait
621 for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
622 # Shared locks aren't guaranteed to be notified in order, but they'll be
624 tmp = self.done.get_nowait()
625 if tmp == "shared 1":
627 elif tmp == "shared 2":
629 elif tmp == "shared 3":
631 elif tmp == "shared 4":
633 self.assertEqual(shrcnt1, 0)
634 self.assertEqual(shrcnt2, 0)
635 self.assertEqual(shrcnt3, 0)
636 self.assertEqual(shrcnt3, 0)
639 self.assertEqual(self.done.get_nowait(), "exclusive 1")
642 self.assertEqual(self.done.get_nowait(), "exclusive 2")
644 self.assertRaises(Queue.Empty, self.done.get_nowait)
646 def testIllegalDowngrade(self):
648 self.assertRaises(AssertionError, self.sl.downgrade)
650 # Acquire in shared mode, downgrade should be no-op
651 self.assertTrue(self.sl.acquire(shared=1))
652 self.assertTrue(self.sl.is_owned(shared=1))
653 self.assertTrue(self.sl.downgrade())
654 self.assertTrue(self.sl.is_owned(shared=1))
657 def testDowngrade(self):
658 self.assertTrue(self.sl.acquire())
659 self.assertTrue(self.sl.is_owned(shared=0))
660 self.assertTrue(self.sl.downgrade())
661 self.assertTrue(self.sl.is_owned(shared=1))
665 def testDowngradeJumpsAheadOfExclusive(self):
666 def _KeepExclusive(ev_got, ev_downgrade, ev_release):
667 self.assertTrue(self.sl.acquire())
668 self.assertTrue(self.sl.is_owned(shared=0))
671 self.assertTrue(self.sl.is_owned(shared=0))
672 self.assertTrue(self.sl.downgrade())
673 self.assertTrue(self.sl.is_owned(shared=1))
675 self.assertTrue(self.sl.is_owned(shared=1))
678 def _KeepExclusive2(ev_started, ev_release):
679 self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
680 self.assertTrue(self.sl.is_owned(shared=0))
682 self.assertTrue(self.sl.is_owned(shared=0))
685 def _KeepShared(ev_started, ev_got, ev_release):
686 self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
687 self.assertTrue(self.sl.is_owned(shared=1))
690 self.assertTrue(self.sl.is_owned(shared=1))
693 # Acquire lock in exclusive mode
694 ev_got_excl1 = threading.Event()
695 ev_downgrade_excl1 = threading.Event()
696 ev_release_excl1 = threading.Event()
697 th_excl1 = self._addThread(target=_KeepExclusive,
698 args=(ev_got_excl1, ev_downgrade_excl1,
702 # Start a second exclusive acquire
703 ev_started_excl2 = threading.Event()
704 ev_release_excl2 = threading.Event()
705 th_excl2 = self._addThread(target=_KeepExclusive2,
706 args=(ev_started_excl2, ev_release_excl2))
707 ev_started_excl2.wait()
709 # Start shared acquires, will jump ahead of second exclusive acquire when
710 # first exclusive acquire downgrades
711 ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
712 ev_release_shared = threading.Event()
714 th_shared = [self._addThread(target=_KeepShared,
715 args=(ev_started, ev_got, ev_release_shared))
716 for (ev_started, ev_got) in ev_shared]
718 # Wait for all shared acquires to start
719 for (ev, _) in ev_shared:
722 # Check lock information
723 self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
724 [(self.sl.name, "exclusive", [th_excl1.getName()], None)])
725 [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING]))
726 self.assertEqual([(pendmode, sorted(waiting))
727 for (pendmode, waiting) in pending],
728 [("exclusive", [th_excl2.getName()]),
729 ("shared", sorted(th.getName() for th in th_shared))])
731 # Shared acquires won't start until the exclusive lock is downgraded
732 ev_downgrade_excl1.set()
734 # Wait for all shared acquires to be successful
735 for (_, ev) in ev_shared:
738 # Check lock information again
739 self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE,
741 [(self.sl.name, "shared", None,
742 [("exclusive", [th_excl2.getName()])])])
743 [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER]))
744 self.assertEqual(set(owner), set([th_excl1.getName()] +
745 [th.getName() for th in th_shared]))
747 ev_release_excl1.set()
748 ev_release_excl2.set()
749 ev_release_shared.set()
753 self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER,
755 [(self.sl.name, None, None, [])])
758 def testMixedAcquireTimeout(self):
759 sync = threading.Event()
761 def _AcquireShared(ev):
762 if not self.sl.acquire(shared=1, timeout=None):
765 self.done.put("shared")
770 # Wait for notification from main thread
778 ev = threading.Event()
779 self._addThread(target=_AcquireShared, args=(ev, ))
782 # Wait for all acquires to finish
786 self.assertEqual(self.sl._count_pending(), 0)
788 # Try to get exclusive lock
789 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
791 # Acquire exclusive without timeout
792 exclsync = threading.Event()
793 exclev = threading.Event()
795 def _AcquireExclusive():
796 if not self.sl.acquire(shared=0):
799 self.done.put("exclusive")
804 # Wait for notification from main thread
809 self._addThread(target=_AcquireExclusive)
811 # Try to get exclusive lock
812 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
814 # Make all shared holders release their locks
817 # Wait for exclusive acquire to succeed
820 self.assertEqual(self.sl._count_pending(), 0)
822 # Try to get exclusive lock
823 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
825 def _AcquireSharedSimple():
826 if self.sl.acquire(shared=1, timeout=None):
827 self.done.put("shared2")
831 self._addThread(target=_AcquireSharedSimple)
833 # Tell exclusive lock to release
836 # Wait for everything to finish
839 self.assertEqual(self.sl._count_pending(), 0)
843 self.assertEqual(self.done.get_nowait(), "shared")
845 self.assertEqual(self.done.get_nowait(), "exclusive")
848 self.assertEqual(self.done.get_nowait(), "shared2")
850 self.assertRaises(Queue.Empty, self.done.get_nowait)
852 def testPriority(self):
853 # Acquire in exclusive mode
854 self.assert_(self.sl.acquire(shared=0))
857 def _Acquire(prev, next, shared, priority, result):
859 self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
861 self.done.put(result)
865 counter = itertools.count(0)
866 priorities = range(-20, 30)
867 first = threading.Event()
873 # [(shared/exclusive, set(acquire names), set(pending threads)),
874 # (shared/exclusive, ...),
880 # References shared acquire per priority in L{perprio}. Data structure:
882 # priority: (shared=1, set(acquire names), set(pending threads)),
886 for seed in [4979, 9523, 14902, 32440]:
887 # Use a deterministic random generator
888 rnd = random.Random(seed)
889 for priority in [rnd.choice(priorities) for _ in range(30)]:
894 acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
896 ev = threading.Event()
897 thread = self._addThread(target=_Acquire,
898 args=(prev, ev, shared, priority, acqname))
901 # Record expected aqcuire, see above for structure
902 data = (shared, set([acqname]), set([thread]))
903 priolist = perprio.setdefault(priority, [])
905 priosh = prioshared.get(priority, None)
907 # Shared acquires are merged
908 for i, j in zip(priosh[1:], data[1:]):
910 assert data[0] == priosh[0]
912 prioshared[priority] = data
913 priolist.append(data)
915 priolist.append(data)
917 # Start all acquires and wait for them
921 # Check lock information
922 self.assertEqual(self.sl.GetLockInfo(set()),
923 [(self.sl.name, None, None, None)])
924 self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
925 [(self.sl.name, "exclusive",
926 [threading.currentThread().getName()], None)])
928 self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])),
931 # Let threads acquire the lock
934 # Wait for everything to finish
937 self.assert_(self.sl._check_empty())
939 # Check acquires by priority
940 for acquires in [perprio[i] for i in sorted(perprio.keys())]:
941 for (_, names, _) in acquires:
942 # For shared acquires, the set will contain 1..n entries. For exclusive
945 names.remove(self.done.get_nowait())
946 self.assertFalse(compat.any(names for (_, names, _) in acquires))
948 self.assertRaises(Queue.Empty, self.done.get_nowait)
950 def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio):
951 self.assertEqual(name, self.sl.name)
952 self.assert_(mode is None)
953 self.assert_(owner is None)
955 self.assertEqual([(pendmode, sorted(waiting))
956 for (pendmode, waiting) in pending],
957 [(["exclusive", "shared"][int(bool(shared))],
958 sorted(t.getName() for t in threads))
959 for acquires in [perprio[i]
960 for i in sorted(perprio.keys())]
961 for (shared, _, threads) in acquires])
963 class _FakeTimeForSpuriousNotifications:
964 def __init__(self, now, check_end):
966 self.check_end = check_end
968 # Deterministic random number generator
969 self.rnd = random.Random(15086)
972 # Advance time if the random number generator thinks so (this is to test
973 # multiple notifications without advancing the time)
974 if self.rnd.random() < 0.3:
975 self.now += self.rnd.random()
977 self.check_end(self.now)
982 def testAcquireTimeoutWithSpuriousNotifications(self):
983 ready = threading.Event()
984 locked = threading.Event()
991 self.assertFalse(locked.isSet())
993 # If we waited long enough (in virtual time), tell main thread to release
994 # lock, otherwise tell it to notify once more
995 req.put(now < (epoch + (timeout * 0.8)))
997 time_fn = self._FakeTimeForSpuriousNotifications(epoch, check_end).time
999 sl = locking.SharedLock("test", _time_fn=time_fn)
1001 # Acquire in exclusive mode
1002 sl.acquire(shared=0)
1005 self.assertTrue(sl.acquire(shared=0, timeout=timeout,
1006 test_notify=ready.set))
1009 self.done.put("success")
1011 # Start acquire with timeout and wait for it to be ready
1012 self._addThread(target=fn)
1015 # The separate thread is now waiting to acquire the lock, so start sending
1016 # spurious notifications.
1018 # Wait for separate thread to ask for another notification
1021 # After sending the notification, the lock will take a short amount of
1022 # time to notice and to retrieve the current time
1023 sl._notify_topmost()
1026 self.assertTrue(count > 100, "Not enough notifications were sent")
1028 self.assertFalse(locked.isSet())
1030 # Some notifications have been sent, now actually release the lock
1033 # Wait for lock to be acquired
1038 self.assertEqual(self.done.get_nowait(), "success")
1039 self.assertRaises(Queue.Empty, self.done.get_nowait)
1042 class TestSharedLockInCondition(_ThreadedTestCase):
1043 """SharedLock as a condition lock tests"""
1046 _ThreadedTestCase.setUp(self)
1047 self.sl = locking.SharedLock("TestSharedLockInCondition")
1050 def setCondition(self):
1051 self.cond = threading.Condition(self.sl)
1053 def testKeepMode(self):
1054 self.cond.acquire(shared=1)
1055 self.assert_(self.sl.is_owned(shared=1))
1057 self.assert_(self.sl.is_owned(shared=1))
1059 self.cond.acquire(shared=0)
1060 self.assert_(self.sl.is_owned(shared=0))
1062 self.assert_(self.sl.is_owned(shared=0))
1066 class TestSharedLockInPipeCondition(TestSharedLockInCondition):
1067 """SharedLock as a pipe condition lock tests"""
1069 def setCondition(self):
1070 self.cond = locking.PipeCondition(self.sl)
1073 class TestSSynchronizedDecorator(_ThreadedTestCase):
1074 """Shared Lock Synchronized decorator test"""
1077 _ThreadedTestCase.setUp(self)
1079 @locking.ssynchronized(_decoratorlock)
1080 def _doItExclusive(self):
1081 self.assert_(_decoratorlock.is_owned())
1082 self.done.put("EXC")
1084 @locking.ssynchronized(_decoratorlock, shared=1)
1085 def _doItSharer(self):
1086 self.assert_(_decoratorlock.is_owned(shared=1))
1087 self.done.put("SHR")
1089 def testDecoratedFunctions(self):
1090 self._doItExclusive()
1091 self.assertFalse(_decoratorlock.is_owned())
1093 self.assertFalse(_decoratorlock.is_owned())
1095 def testSharersCanCoexist(self):
1096 _decoratorlock.acquire(shared=1)
1097 threading.Thread(target=self._doItSharer).start()
1098 self.assert_(self.done.get(True, 1))
1099 _decoratorlock.release()
1102 def testExclusiveBlocksExclusive(self):
1103 _decoratorlock.acquire()
1104 self._addThread(target=self._doItExclusive)
1105 # give it a bit of time to check that it's not actually doing anything
1106 self.assertRaises(Queue.Empty, self.done.get_nowait)
1107 _decoratorlock.release()
1109 self.failUnlessEqual(self.done.get_nowait(), "EXC")
1112 def testExclusiveBlocksSharer(self):
1113 _decoratorlock.acquire()
1114 self._addThread(target=self._doItSharer)
1115 self.assertRaises(Queue.Empty, self.done.get_nowait)
1116 _decoratorlock.release()
1118 self.failUnlessEqual(self.done.get_nowait(), "SHR")
1121 def testSharerBlocksExclusive(self):
1122 _decoratorlock.acquire(shared=1)
1123 self._addThread(target=self._doItExclusive)
1124 self.assertRaises(Queue.Empty, self.done.get_nowait)
1125 _decoratorlock.release()
1127 self.failUnlessEqual(self.done.get_nowait(), "EXC")
1130 class TestLockSet(_ThreadedTestCase):
1134 _ThreadedTestCase.setUp(self)
1138 """Helper to (re)initialize the lock set"""
1139 self.resources = ["one", "two", "three"]
1140 self.ls = locking.LockSet(self.resources, "TestLockSet")
1142 def testResources(self):
1143 self.assertEquals(self.ls._names(), set(self.resources))
1144 newls = locking.LockSet([], "TestLockSet.testResources")
1145 self.assertEquals(newls._names(), set())
1147 def testCheckOwnedUnknown(self):
1148 self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one"))
1149 for shared in [-1, 0, 1, 6378, 24255]:
1150 self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one",
1153 def testCheckOwnedUnknownWhileHolding(self):
1154 self.assertFalse(self.ls.check_owned([]))
1155 self.ls.acquire("one", shared=1)
1156 self.assertRaises(errors.LockError, self.ls.check_owned, "nonexist")
1157 self.assertTrue(self.ls.check_owned("one", shared=1))
1158 self.assertFalse(self.ls.check_owned("one", shared=0))
1159 self.assertFalse(self.ls.check_owned(["one", "two"]))
1160 self.assertRaises(errors.LockError, self.ls.check_owned,
1161 ["one", "nonexist"])
1162 self.assertRaises(errors.LockError, self.ls.check_owned, "")
1164 self.assertFalse(self.ls.check_owned([]))
1165 self.assertFalse(self.ls.check_owned("one"))
1167 def testAcquireRelease(self):
1168 self.assertFalse(self.ls.check_owned(self.ls._names()))
1169 self.assert_(self.ls.acquire("one"))
1170 self.assertEquals(self.ls.list_owned(), set(["one"]))
1171 self.assertTrue(self.ls.check_owned("one"))
1172 self.assertTrue(self.ls.check_owned("one", shared=0))
1173 self.assertFalse(self.ls.check_owned("one", shared=1))
1175 self.assertEquals(self.ls.list_owned(), set())
1176 self.assertFalse(self.ls.check_owned(self.ls._names()))
1177 self.assertEquals(self.ls.acquire(["one"]), set(["one"]))
1178 self.assertEquals(self.ls.list_owned(), set(["one"]))
1180 self.assertEquals(self.ls.list_owned(), set())
1181 self.ls.acquire(["one", "two", "three"])
1182 self.assertEquals(self.ls.list_owned(), set(["one", "two", "three"]))
1183 self.assertTrue(self.ls.check_owned(self.ls._names()))
1184 self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
1185 self.assertFalse(self.ls.check_owned(self.ls._names(), shared=1))
1186 self.ls.release("one")
1187 self.assertFalse(self.ls.check_owned(["one"]))
1188 self.assertTrue(self.ls.check_owned(["two", "three"]))
1189 self.assertTrue(self.ls.check_owned(["two", "three"], shared=0))
1190 self.assertFalse(self.ls.check_owned(["two", "three"], shared=1))
1191 self.assertEquals(self.ls.list_owned(), set(["two", "three"]))
1192 self.ls.release(["three"])
1193 self.assertEquals(self.ls.list_owned(), set(["two"]))
1195 self.assertEquals(self.ls.list_owned(), set())
1196 self.assertEquals(self.ls.acquire(["one", "three"]), set(["one", "three"]))
1197 self.assertEquals(self.ls.list_owned(), set(["one", "three"]))
1199 self.assertEquals(self.ls.list_owned(), set())
1200 for name in self.ls._names():
1201 self.assertFalse(self.ls.check_owned(name))
1203 def testNoDoubleAcquire(self):
1204 self.ls.acquire("one")
1205 self.assertRaises(AssertionError, self.ls.acquire, "one")
1206 self.assertRaises(AssertionError, self.ls.acquire, ["two"])
1207 self.assertRaises(AssertionError, self.ls.acquire, ["two", "three"])
1209 self.ls.acquire(["one", "three"])
1210 self.ls.release("one")
1211 self.assertRaises(AssertionError, self.ls.acquire, ["two"])
1212 self.ls.release("three")
1214 def testNoWrongRelease(self):
1215 self.assertRaises(AssertionError, self.ls.release)
1216 self.ls.acquire("one")
1217 self.assertRaises(AssertionError, self.ls.release, "two")
1219 def testAddRemove(self):
1221 self.assertEquals(self.ls.list_owned(), set())
1222 self.assert_("four" in self.ls._names())
1223 self.ls.add(["five", "six", "seven"], acquired=1)
1224 self.assert_("five" in self.ls._names())
1225 self.assert_("six" in self.ls._names())
1226 self.assert_("seven" in self.ls._names())
1227 self.assertEquals(self.ls.list_owned(), set(["five", "six", "seven"]))
1228 self.assertEquals(self.ls.remove(["five", "six"]), ["five", "six"])
1229 self.assert_("five" not in self.ls._names())
1230 self.assert_("six" not in self.ls._names())
1231 self.assertEquals(self.ls.list_owned(), set(["seven"]))
1232 self.assertRaises(AssertionError, self.ls.add, "eight", acquired=1)
1233 self.ls.remove("seven")
1234 self.assert_("seven" not in self.ls._names())
1235 self.assertEquals(self.ls.list_owned(), set([]))
1236 self.ls.acquire(None, shared=1)
1237 self.assertRaises(AssertionError, self.ls.add, "eight")
1239 self.ls.acquire(None)
1240 self.ls.add("eight", acquired=1)
1241 self.assert_("eight" in self.ls._names())
1242 self.assert_("eight" in self.ls.list_owned())
1244 self.assert_("nine" in self.ls._names())
1245 self.assert_("nine" not in self.ls.list_owned())
1247 self.ls.remove(["two"])
1248 self.assert_("two" not in self.ls._names())
1249 self.ls.acquire("three")
1250 self.assertEquals(self.ls.remove(["three"]), ["three"])
1251 self.assert_("three" not in self.ls._names())
1252 self.assertEquals(self.ls.remove("three"), [])
1253 self.assertEquals(self.ls.remove(["one", "three", "six"]), ["one"])
1254 self.assert_("one" not in self.ls._names())
1256 def testRemoveNonBlocking(self):
1257 self.ls.acquire("one")
1258 self.assertEquals(self.ls.remove("one"), ["one"])
1259 self.ls.acquire(["two", "three"])
1260 self.assertEquals(self.ls.remove(["two", "three"]),
1263 def testNoDoubleAdd(self):
1264 self.assertRaises(errors.LockError, self.ls.add, "two")
1266 self.assertRaises(errors.LockError, self.ls.add, "four")
1268 def testNoWrongRemoves(self):
1269 self.ls.acquire(["one", "three"], shared=1)
1270 # Cannot remove "two" while holding something which is not a superset
1271 self.assertRaises(AssertionError, self.ls.remove, "two")
1272 # Cannot remove "three" as we are sharing it
1273 self.assertRaises(AssertionError, self.ls.remove, "three")
1275 def testAcquireSetLock(self):
1276 # acquire the set-lock exclusively
1277 self.assertEquals(self.ls.acquire(None), set(["one", "two", "three"]))
1278 self.assertEquals(self.ls.list_owned(), set(["one", "two", "three"]))
1279 self.assertEquals(self.ls.is_owned(), True)
1280 self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
1281 # I can still add/remove elements...
1282 self.assertEquals(self.ls.remove(["two", "three"]), ["two", "three"])
1283 self.assert_(self.ls.add("six"))
1285 # share the set-lock
1286 self.assertEquals(self.ls.acquire(None, shared=1), set(["one", "six"]))
1287 # adding new elements is not possible
1288 self.assertRaises(AssertionError, self.ls.add, "five")
1291 def testAcquireWithRepetitions(self):
1292 self.assertEquals(self.ls.acquire(["two", "two", "three"], shared=1),
1293 set(["two", "two", "three"]))
1294 self.ls.release(["two", "two"])
1295 self.assertEquals(self.ls.list_owned(), set(["three"]))
1297 def testEmptyAcquire(self):
1298 # Acquire an empty list of locks...
1299 self.assertEquals(self.ls.acquire([]), set())
1300 self.assertEquals(self.ls.list_owned(), set())
1301 # New locks can still be addded
1302 self.assert_(self.ls.add("six"))
1303 # "re-acquiring" is not an issue, since we had really acquired nothing
1304 self.assertEquals(self.ls.acquire([], shared=1), set())
1305 self.assertEquals(self.ls.list_owned(), set())
1306 # We haven't really acquired anything, so we cannot release
1307 self.assertRaises(AssertionError, self.ls.release)
1309 def _doLockSet(self, names, shared):
1311 self.ls.acquire(names, shared=shared)
1312 self.done.put("DONE")
1314 except errors.LockError:
1315 self.done.put("ERR")
1317 def _doAddSet(self, names):
1319 self.ls.add(names, acquired=1)
1320 self.done.put("DONE")
1322 except errors.LockError:
1323 self.done.put("ERR")
1325 def _doRemoveSet(self, names):
1326 self.done.put(self.ls.remove(names))
1329 def testConcurrentSharedAcquire(self):
1330 self.ls.acquire(["one", "two"], shared=1)
1331 self._addThread(target=self._doLockSet, args=(["one", "two"], 1))
1333 self.assertEqual(self.done.get_nowait(), "DONE")
1334 self._addThread(target=self._doLockSet, args=(["one", "two", "three"], 1))
1336 self.assertEqual(self.done.get_nowait(), "DONE")
1337 self._addThread(target=self._doLockSet, args=("three", 1))
1339 self.assertEqual(self.done.get_nowait(), "DONE")
1340 self._addThread(target=self._doLockSet, args=(["one", "two"], 0))
1341 self._addThread(target=self._doLockSet, args=(["two", "three"], 0))
1342 self.assertRaises(Queue.Empty, self.done.get_nowait)
1345 self.assertEqual(self.done.get_nowait(), "DONE")
1346 self.assertEqual(self.done.get_nowait(), "DONE")
1349 def testConcurrentExclusiveAcquire(self):
1350 self.ls.acquire(["one", "two"])
1351 self._addThread(target=self._doLockSet, args=("three", 1))
1353 self.assertEqual(self.done.get_nowait(), "DONE")
1354 self._addThread(target=self._doLockSet, args=("three", 0))
1356 self.assertEqual(self.done.get_nowait(), "DONE")
1357 self.assertRaises(Queue.Empty, self.done.get_nowait)
1358 self._addThread(target=self._doLockSet, args=(["one", "two"], 0))
1359 self._addThread(target=self._doLockSet, args=(["one", "two"], 1))
1360 self._addThread(target=self._doLockSet, args=("one", 0))
1361 self._addThread(target=self._doLockSet, args=("one", 1))
1362 self._addThread(target=self._doLockSet, args=(["two", "three"], 0))
1363 self._addThread(target=self._doLockSet, args=(["two", "three"], 1))
1364 self.assertRaises(Queue.Empty, self.done.get_nowait)
1368 self.failUnlessEqual(self.done.get_nowait(), "DONE")
1371 def testSimpleAcquireTimeoutExpiring(self):
1372 names = sorted(self.ls._names())
1373 self.assert_(len(names) >= 3)
1375 # Get name of first lock
1378 # Get name of last lock
1382 # Block first and try to lock it again
1385 # Block last and try to lock all locks
1388 # Block last and try to lock it again
1392 for (wanted, block) in checks:
1393 # Lock in exclusive mode
1394 self.assert_(self.ls.acquire(block, shared=0))
1397 # Try to get the same lock again with a timeout (should never succeed)
1398 acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1400 self.done.put("acquired")
1403 self.assert_(acquired is None)
1404 self.assertFalse(self.ls.list_owned())
1405 self.assertFalse(self.ls.is_owned())
1406 self.done.put("not acquired")
1408 self._addThread(target=_AcquireOne)
1410 # Wait for timeout in thread to expire
1413 # Release exclusive lock again
1416 self.assertEqual(self.done.get_nowait(), "not acquired")
1417 self.assertRaises(Queue.Empty, self.done.get_nowait)
1420 def testDelayedAndExpiringLockAcquire(self):
1422 self.ls.add(["five", "six", "seven", "eight", "nine"])
1424 for expire in (False, True):
1425 names = sorted(self.ls._names())
1426 self.assertEqual(len(names), 8)
1428 lock_ev = dict([(i, threading.Event()) for i in names])
1430 # Lock all in exclusive mode
1431 self.assert_(self.ls.acquire(names, shared=0))
1434 # We'll wait at least 300ms per lock
1435 lockwait = len(names) * [0.3]
1437 # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1438 # this gives us up to 2.4s to fail.
1439 lockall_timeout = 0.4
1441 # This should finish rather quickly
1443 lockall_timeout = len(names) * 5.0
1446 def acquire_notification(name):
1448 self.done.put("getting %s" % name)
1453 if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1454 test_notify=acquire_notification):
1455 self.done.put("got all")
1458 self.done.put("timeout on all")
1461 for ev in lock_ev.values():
1464 t = self._addThread(target=_LockAll)
1466 for idx, name in enumerate(names):
1467 # Wait for actual acquire on this lock to start
1468 lock_ev[name].wait(10.0)
1470 if expire and t.isAlive():
1471 # Wait some time after getting the notification to make sure the lock
1472 # acquire will expire
1473 SafeSleep(lockwait[idx])
1475 self.ls.release(names=name)
1477 self.assertFalse(self.ls.list_owned())
1482 # Not checking which locks were actually acquired. Doing so would be
1483 # too timing-dependant.
1484 self.assertEqual(self.done.get_nowait(), "timeout on all")
1487 self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1488 self.assertEqual(self.done.get_nowait(), "got all")
1489 self.assertRaises(Queue.Empty, self.done.get_nowait)
1492 def testConcurrentRemove(self):
1494 self.ls.acquire(["one", "two", "four"])
1495 self._addThread(target=self._doLockSet, args=(["one", "four"], 0))
1496 self._addThread(target=self._doLockSet, args=(["one", "four"], 1))
1497 self._addThread(target=self._doLockSet, args=(["one", "two"], 0))
1498 self._addThread(target=self._doLockSet, args=(["one", "two"], 1))
1499 self.assertRaises(Queue.Empty, self.done.get_nowait)
1500 self.ls.remove("one")
1504 self.failUnlessEqual(self.done.get_nowait(), "ERR")
1505 self.ls.add(["five", "six"], acquired=1)
1506 self._addThread(target=self._doLockSet, args=(["three", "six"], 1))
1507 self._addThread(target=self._doLockSet, args=(["three", "six"], 0))
1508 self._addThread(target=self._doLockSet, args=(["four", "six"], 1))
1509 self._addThread(target=self._doLockSet, args=(["four", "six"], 0))
1510 self.ls.remove("five")
1514 self.failUnlessEqual(self.done.get_nowait(), "DONE")
1515 self.ls.acquire(["three", "four"])
1516 self._addThread(target=self._doRemoveSet, args=(["four", "six"], ))
1517 self.assertRaises(Queue.Empty, self.done.get_nowait)
1518 self.ls.remove("four")
1520 self.assertEqual(self.done.get_nowait(), ["six"])
1521 self._addThread(target=self._doRemoveSet, args=(["two"]))
1523 self.assertEqual(self.done.get_nowait(), ["two"])
1529 def testConcurrentSharedSetLock(self):
1530 # share the set-lock...
1531 self.ls.acquire(None, shared=1)
1532 # ...another thread can share it too
1533 self._addThread(target=self._doLockSet, args=(None, 1))
1535 self.assertEqual(self.done.get_nowait(), "DONE")
1536 # ...or just share some elements
1537 self._addThread(target=self._doLockSet, args=(["one", "three"], 1))
1539 self.assertEqual(self.done.get_nowait(), "DONE")
1540 # ...but not add new ones or remove any
1541 t = self._addThread(target=self._doAddSet, args=(["nine"]))
1542 self._addThread(target=self._doRemoveSet, args=(["two"], ))
1543 self.assertRaises(Queue.Empty, self.done.get_nowait)
1544 # this just releases the set-lock
1547 self.assertEqual(self.done.get_nowait(), "DONE")
1548 # release the lock on the actual elements so remove() can proceed too
1551 self.failUnlessEqual(self.done.get_nowait(), ["two"])
1556 def testConcurrentExclusiveSetLock(self):
1557 # acquire the set-lock...
1558 self.ls.acquire(None, shared=0)
1559 # ...no one can do anything else
1560 self._addThread(target=self._doLockSet, args=(None, 1))
1561 self._addThread(target=self._doLockSet, args=(None, 0))
1562 self._addThread(target=self._doLockSet, args=(["three"], 0))
1563 self._addThread(target=self._doLockSet, args=(["two"], 1))
1564 self._addThread(target=self._doAddSet, args=(["nine"]))
1565 self.assertRaises(Queue.Empty, self.done.get_nowait)
1569 self.assertEqual(self.done.get(True, 1), "DONE")
1574 def testConcurrentSetLockAdd(self):
1575 self.ls.acquire("one")
1576 # Another thread wants the whole SetLock
1577 self._addThread(target=self._doLockSet, args=(None, 0))
1578 self._addThread(target=self._doLockSet, args=(None, 1))
1579 self.assertRaises(Queue.Empty, self.done.get_nowait)
1580 self.assertRaises(AssertionError, self.ls.add, "four")
1583 self.assertEqual(self.done.get_nowait(), "DONE")
1584 self.assertEqual(self.done.get_nowait(), "DONE")
1585 self.ls.acquire(None)
1586 self._addThread(target=self._doLockSet, args=(None, 0))
1587 self._addThread(target=self._doLockSet, args=(None, 1))
1588 self.assertRaises(Queue.Empty, self.done.get_nowait)
1590 self.ls.add("five", acquired=1)
1591 self.ls.add("six", acquired=1, shared=1)
1592 self.assertEquals(self.ls.list_owned(),
1593 set(["one", "two", "three", "five", "six"]))
1594 self.assertEquals(self.ls.is_owned(), True)
1595 self.assertEquals(self.ls._names(),
1596 set(["one", "two", "three", "four", "five", "six"]))
1599 self.assertEqual(self.done.get_nowait(), "DONE")
1600 self.assertEqual(self.done.get_nowait(), "DONE")
1604 def testEmptyLockSet(self):
1606 self.assertEqual(self.ls.acquire(None), set(["one", "two", "three"]))
1608 self.ls.remove(["one", "two", "three"])
1609 self.assertFalse(self.ls._names())
1610 # and adds/locks by another thread still wait
1611 self._addThread(target=self._doAddSet, args=(["nine"]))
1612 self._addThread(target=self._doLockSet, args=(None, 1))
1613 self._addThread(target=self._doLockSet, args=(None, 0))
1614 self.assertRaises(Queue.Empty, self.done.get_nowait)
1618 self.assertEqual(self.done.get_nowait(), "DONE")
1620 self.assertEqual(self.ls.remove(["nine"]), ["nine"])
1622 self.assertEqual(self.ls.acquire(None, shared=1), set())
1623 # other sharers can go, adds still wait
1624 self._addThread(target=self._doLockSet, args=(None, 1))
1626 self.assertEqual(self.done.get_nowait(), "DONE")
1627 self._addThread(target=self._doAddSet, args=(["nine"]))
1628 self.assertRaises(Queue.Empty, self.done.get_nowait)
1631 self.assertEqual(self.done.get_nowait(), "DONE")
1634 def testAcquireWithNamesDowngrade(self):
1635 self.assertEquals(self.ls.acquire("two", shared=0), set(["two"]))
1636 self.assertTrue(self.ls.is_owned())
1637 self.assertFalse(self.ls._get_lock().is_owned())
1639 self.assertFalse(self.ls.is_owned())
1640 self.assertFalse(self.ls._get_lock().is_owned())
1641 # Can't downgrade after releasing
1642 self.assertRaises(AssertionError, self.ls.downgrade, "two")
1644 def testDowngrade(self):
1645 # Not owning anything, must raise an exception
1646 self.assertFalse(self.ls.is_owned())
1647 self.assertRaises(AssertionError, self.ls.downgrade)
1649 self.assertFalse(compat.any(i.is_owned()
1650 for i in self.ls._get_lockdict().values()))
1651 self.assertFalse(self.ls.check_owned(self.ls._names()))
1652 for name in self.ls._names():
1653 self.assertFalse(self.ls.check_owned(name))
1655 self.assertEquals(self.ls.acquire(None, shared=0),
1656 set(["one", "two", "three"]))
1657 self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")
1659 self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
1660 for name in self.ls._names():
1661 self.assertTrue(self.ls.check_owned(name))
1662 self.assertTrue(self.ls.check_owned(name, shared=0))
1663 self.assertFalse(self.ls.check_owned(name, shared=1))
1665 self.assertTrue(self.ls._get_lock().is_owned(shared=0))
1666 self.assertTrue(compat.all(i.is_owned(shared=0)
1667 for i in self.ls._get_lockdict().values()))
1669 # Start downgrading locks
1670 self.assertTrue(self.ls.downgrade(names=["one"]))
1671 self.assertTrue(self.ls._get_lock().is_owned(shared=0))
1672 self.assertTrue(compat.all(lock.is_owned(shared=[0, 1][int(name == "one")])
1674 self.ls._get_lockdict().items()))
1676 self.assertFalse(self.ls.check_owned("one", shared=0))
1677 self.assertTrue(self.ls.check_owned("one", shared=1))
1678 self.assertTrue(self.ls.check_owned("two", shared=0))
1679 self.assertTrue(self.ls.check_owned("three", shared=0))
1681 # Downgrade second lock
1682 self.assertTrue(self.ls.downgrade(names="two"))
1683 self.assertTrue(self.ls._get_lock().is_owned(shared=0))
1684 should_share = lambda name: [0, 1][int(name in ("one", "two"))]
1685 self.assertTrue(compat.all(lock.is_owned(shared=should_share(name))
1687 self.ls._get_lockdict().items()))
1689 self.assertFalse(self.ls.check_owned("one", shared=0))
1690 self.assertTrue(self.ls.check_owned("one", shared=1))
1691 self.assertFalse(self.ls.check_owned("two", shared=0))
1692 self.assertTrue(self.ls.check_owned("two", shared=1))
1693 self.assertTrue(self.ls.check_owned("three", shared=0))
1695 # Downgrading the last exclusive lock to shared must downgrade the
1696 # lockset-internal lock too
1697 self.assertTrue(self.ls.downgrade(names="three"))
1698 self.assertTrue(self.ls._get_lock().is_owned(shared=1))
1699 self.assertTrue(compat.all(i.is_owned(shared=1)
1700 for i in self.ls._get_lockdict().values()))
1702 # Verify owned locks
1703 for name in self.ls._names():
1704 self.assertTrue(self.ls.check_owned(name, shared=1))
1706 # Downgrading a shared lock must be a no-op
1707 self.assertTrue(self.ls.downgrade(names=["one", "three"]))
1708 self.assertTrue(self.ls._get_lock().is_owned(shared=1))
1709 self.assertTrue(compat.all(i.is_owned(shared=1)
1710 for i in self.ls._get_lockdict().values()))
1714 def testDowngradeEverything(self):
1715 self.assertEqual(self.ls.acquire(locking.ALL_SET, shared=0),
1716 set(["one", "two", "three"]))
1717 self.assertTrue(self.ls.owning_all())
1719 # Ensure all locks are now owned in exclusive mode
1720 for name in self.ls._names():
1721 self.assertTrue(self.ls.check_owned(name, shared=0))
1723 # Downgrade everything
1724 self.assertTrue(self.ls.downgrade())
1726 # Ensure all locks are now owned in shared mode
1727 for name in self.ls._names():
1728 self.assertTrue(self.ls.check_owned(name, shared=1))
1730 self.assertTrue(self.ls.owning_all())
1732 def testPriority(self):
1733 def _Acquire(prev, next, name, priority, success_fn):
1735 self.assert_(self.ls.acquire(name, shared=0,
1737 test_notify=lambda _: next.set()))
1743 # Get all in exclusive mode
1744 self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1746 done_two = Queue.Queue(0)
1748 first = threading.Event()
1751 acquires = [("one", prio, self.done) for prio in range(1, 33)]
1752 acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1754 # Use a deterministic random generator
1755 random.Random(741).shuffle(acquires)
1757 for (name, prio, done) in acquires:
1758 ev = threading.Event()
1759 self._addThread(target=_Acquire,
1760 args=(prev, ev, name, prio,
1761 compat.partial(done.put, "Prio%s" % prio)))
1767 # Wait for last acquire to start
1770 # Let threads acquire locks
1773 # Wait for threads to finish
1776 for i in range(1, 33):
1777 self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1778 self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1780 self.assertRaises(Queue.Empty, self.done.get_nowait)
1781 self.assertRaises(Queue.Empty, done_two.get_nowait)
1784 class TestGanetiLockManager(_ThreadedTestCase):
1786 _ThreadedTestCase.setUp(self)
1787 self.nodes = ["n1", "n2"]
1788 self.nodegroups = ["g1", "g2"]
1789 self.instances = ["i1", "i2", "i3"]
1790 self.networks = ["net1", "net2", "net3"]
1791 self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
1792 self.instances, self.networks)
1795 # Don't try this at home...
1796 locking.GanetiLockManager._instance = None
1798 def testLockingConstants(self):
1799 # The locking library internally cheats by assuming its constants have some
1800 # relationships with each other. Check those hold true.
1801 # This relationship is also used in the Processor to recursively acquire
1802 # the right locks. Again, please don't break it.
1803 for i in range(len(locking.LEVELS)):
1804 self.assertEqual(i, locking.LEVELS[i])
1806 def testDoubleGLFails(self):
1807 self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [], [])
1809 def testLockNames(self):
1810 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
1811 self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
1812 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1813 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1814 set(self.nodegroups))
1815 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1816 set(self.instances))
1817 self.assertEqual(self.GL._names(locking.LEVEL_NETWORK),
1820 def testInitAndResources(self):
1821 locking.GanetiLockManager._instance = None
1822 self.GL = locking.GanetiLockManager([], [], [], [])
1823 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
1824 self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
1825 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1826 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1827 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1828 self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), set())
1830 locking.GanetiLockManager._instance = None
1831 self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [], [])
1832 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
1833 self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
1834 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1835 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1836 set(self.nodegroups))
1837 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1838 self.assertEqual(self.GL._names(locking.LEVEL_NETWORK), set())
1840 locking.GanetiLockManager._instance = None
1841 self.GL = locking.GanetiLockManager([], [], self.instances, [])
1842 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
1843 self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
1844 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1845 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1846 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1847 set(self.instances))
1849 locking.GanetiLockManager._instance = None
1850 self.GL = locking.GanetiLockManager([], [], [], self.networks)
1851 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(["BGL"]))
1852 self.assertEqual(self.GL._names(locking.LEVEL_NODE_ALLOC), set(["NAL"]))
1853 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1854 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1855 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1856 self.assertEqual(self.GL._names(locking.LEVEL_NETWORK),
1859 def testAcquireRelease(self):
1860 self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
1861 self.assertEquals(self.GL.list_owned(locking.LEVEL_CLUSTER), set(["BGL"]))
1862 self.GL.acquire(locking.LEVEL_INSTANCE, ["i1"])
1863 self.GL.acquire(locking.LEVEL_NODEGROUP, ["g2"])
1864 self.GL.acquire(locking.LEVEL_NODE, ["n1", "n2"], shared=1)
1865 self.assertTrue(self.GL.check_owned(locking.LEVEL_NODE, ["n1", "n2"],
1867 self.assertFalse(self.GL.check_owned(locking.LEVEL_INSTANCE, ["i1", "i3"]))
1868 self.GL.release(locking.LEVEL_NODE, ["n2"])
1869 self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set(["n1"]))
1870 self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(["g2"]))
1871 self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i1"]))
1872 self.GL.release(locking.LEVEL_NODE)
1873 self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set())
1874 self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(["g2"]))
1875 self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i1"]))
1876 self.GL.release(locking.LEVEL_NODEGROUP)
1877 self.GL.release(locking.LEVEL_INSTANCE)
1878 self.assertRaises(errors.LockError, self.GL.acquire,
1879 locking.LEVEL_INSTANCE, ["i5"])
1880 self.GL.acquire(locking.LEVEL_INSTANCE, ["i3"], shared=1)
1881 self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(["i3"]))
1883 def testAcquireWholeSets(self):
1884 self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
1885 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1886 set(self.instances))
1887 self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
1888 set(self.instances))
1889 self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
1890 set(self.nodegroups))
1891 self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP),
1892 set(self.nodegroups))
1893 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1895 self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
1897 self.assertTrue(self.GL.owning_all(locking.LEVEL_INSTANCE))
1898 self.assertTrue(self.GL.owning_all(locking.LEVEL_NODEGROUP))
1899 self.assertTrue(self.GL.owning_all(locking.LEVEL_NODE))
1900 self.GL.release(locking.LEVEL_NODE)
1901 self.GL.release(locking.LEVEL_NODEGROUP)
1902 self.GL.release(locking.LEVEL_INSTANCE)
1903 self.GL.release(locking.LEVEL_CLUSTER)
1905 def testAcquireWholeAndPartial(self):
1906 self.assertFalse(self.GL.owning_all(locking.LEVEL_INSTANCE))
1907 self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
1908 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1909 set(self.instances))
1910 self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
1911 set(self.instances))
1912 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ["n2"], shared=1),
1914 self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
1916 self.assertTrue(self.GL.owning_all(locking.LEVEL_INSTANCE))
1917 self.assertFalse(self.GL.owning_all(locking.LEVEL_NODE))
1918 self.GL.release(locking.LEVEL_NODE)
1919 self.GL.release(locking.LEVEL_INSTANCE)
1920 self.GL.release(locking.LEVEL_CLUSTER)
1922 def testBGLDependency(self):
1923 self.assertRaises(AssertionError, self.GL.acquire,
1924 locking.LEVEL_NODE, ["n1", "n2"])
1925 self.assertRaises(AssertionError, self.GL.acquire,
1926 locking.LEVEL_INSTANCE, ["i3"])
1927 self.assertRaises(AssertionError, self.GL.acquire,
1928 locking.LEVEL_NODEGROUP, ["g1"])
1929 self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
1930 self.GL.acquire(locking.LEVEL_NODE, ["n1"])
1931 self.assertRaises(AssertionError, self.GL.release,
1932 locking.LEVEL_CLUSTER, ["BGL"])
1933 self.assertRaises(AssertionError, self.GL.release,
1934 locking.LEVEL_CLUSTER)
1935 self.GL.release(locking.LEVEL_NODE)
1936 self.GL.acquire(locking.LEVEL_INSTANCE, ["i1", "i2"])
1937 self.assertRaises(AssertionError, self.GL.release,
1938 locking.LEVEL_CLUSTER, ["BGL"])
1939 self.assertRaises(AssertionError, self.GL.release,
1940 locking.LEVEL_CLUSTER)
1941 self.GL.release(locking.LEVEL_INSTANCE)
1942 self.GL.acquire(locking.LEVEL_NODEGROUP, None)
1943 self.GL.release(locking.LEVEL_NODEGROUP, ["g1"])
1944 self.assertRaises(AssertionError, self.GL.release,
1945 locking.LEVEL_CLUSTER, ["BGL"])
1946 self.assertRaises(AssertionError, self.GL.release,
1947 locking.LEVEL_CLUSTER)
1948 self.GL.release(locking.LEVEL_NODEGROUP)
1949 self.GL.release(locking.LEVEL_CLUSTER)
1951 def testWrongOrder(self):
1952 self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
1953 self.GL.acquire(locking.LEVEL_NODE, ["n2"])
1954 self.assertRaises(AssertionError, self.GL.acquire,
1955 locking.LEVEL_NODE, ["n1"])
1956 self.assertRaises(AssertionError, self.GL.acquire,
1957 locking.LEVEL_NODEGROUP, ["g1"])
1958 self.assertRaises(AssertionError, self.GL.acquire,
1959 locking.LEVEL_INSTANCE, ["i2"])
1961 def testModifiableLevels(self):
1962 self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
1964 self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_NODE_ALLOC,
1966 self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"])
1967 self.GL.add(locking.LEVEL_INSTANCE, ["i4"])
1968 self.GL.remove(locking.LEVEL_INSTANCE, ["i3"])
1969 self.GL.remove(locking.LEVEL_INSTANCE, ["i1"])
1970 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(["i2", "i4"]))
1971 self.GL.add(locking.LEVEL_NODE, ["n3"])
1972 self.GL.remove(locking.LEVEL_NODE, ["n1"])
1973 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(["n2", "n3"]))
1974 self.GL.add(locking.LEVEL_NODEGROUP, ["g3"])
1975 self.GL.remove(locking.LEVEL_NODEGROUP, ["g2"])
1976 self.GL.remove(locking.LEVEL_NODEGROUP, ["g1"])
1977 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(["g3"]))
1978 self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
1981 # Helper function to run as a thread that shared the BGL and then acquires
1982 # some locks at another level.
1983 def _doLock(self, level, names, shared):
1985 self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
1986 self.GL.acquire(level, names, shared=shared)
1987 self.done.put("DONE")
1988 self.GL.release(level)
1989 self.GL.release(locking.LEVEL_CLUSTER)
1990 except errors.LockError:
1991 self.done.put("ERR")
1994 def testConcurrency(self):
1995 self.GL.acquire(locking.LEVEL_CLUSTER, ["BGL"], shared=1)
1996 self._addThread(target=self._doLock,
1997 args=(locking.LEVEL_INSTANCE, "i1", 1))
1999 self.assertEqual(self.done.get_nowait(), "DONE")
2000 self.GL.acquire(locking.LEVEL_INSTANCE, ["i3"])
2001 self._addThread(target=self._doLock,
2002 args=(locking.LEVEL_INSTANCE, "i1", 1))
2004 self.assertEqual(self.done.get_nowait(), "DONE")
2005 self._addThread(target=self._doLock,
2006 args=(locking.LEVEL_INSTANCE, "i3", 1))
2007 self.assertRaises(Queue.Empty, self.done.get_nowait)
2008 self.GL.release(locking.LEVEL_INSTANCE)
2010 self.assertEqual(self.done.get_nowait(), "DONE")
2011 self.GL.acquire(locking.LEVEL_INSTANCE, ["i2"], shared=1)
2012 self._addThread(target=self._doLock,
2013 args=(locking.LEVEL_INSTANCE, "i2", 1))
2015 self.assertEqual(self.done.get_nowait(), "DONE")
2016 self._addThread(target=self._doLock,
2017 args=(locking.LEVEL_INSTANCE, "i2", 0))
2018 self.assertRaises(Queue.Empty, self.done.get_nowait)
2019 self.GL.release(locking.LEVEL_INSTANCE)
2021 self.assertEqual(self.done.get(True, 1), "DONE")
2022 self.GL.release(locking.LEVEL_CLUSTER, ["BGL"])
2025 class TestLockMonitor(_ThreadedTestCase):
2027 _ThreadedTestCase.setUp(self)
2028 self.lm = locking.LockMonitor()
2030 def testSingleThread(self):
2033 for i in range(100):
2034 name = "TestLock%s" % i
2035 locks.append(locking.SharedLock(name, monitor=self.lm))
2037 self.assertEqual(len(self.lm._locks), len(locks))
2038 result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
2039 self.assertEqual(len(result.fields), 1)
2040 self.assertEqual(len(result.data), 100)
2045 # The garbage collector might needs some time
2048 raise utils.RetryAgain()
2050 utils.Retry(_CheckLocks, 0.1, 30.0)
2052 self.assertFalse(self.lm._locks)
2054 def testMultiThread(self):
2057 def _CreateLock(prev, next, name):
2059 locks.append(locking.SharedLock(name, monitor=self.lm))
2065 first = threading.Event()
2068 # Use a deterministic random generator
2069 for i in random.Random(4263).sample(range(100), 33):
2070 name = "MtTestLock%s" % i
2071 expnames.append(name)
2073 ev = threading.Event()
2074 self._addThread(target=_CreateLock, args=(prev, ev, name))
2081 # Check order in which locks were added
2082 self.assertEqual([i.name for i in locks], expnames)
2084 # Check query result
2085 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2086 self.assert_(isinstance(result, dict))
2087 response = objects.QueryResponse.FromDict(result)
2088 self.assertEqual(response.data,
2089 [[(constants.RS_NORMAL, name),
2090 (constants.RS_NORMAL, None),
2091 (constants.RS_NORMAL, None),
2092 (constants.RS_NORMAL, [])]
2093 for name in utils.NiceSort(expnames)])
2094 self.assertEqual(len(response.fields), 4)
2095 self.assertEqual(["name", "mode", "owner", "pending"],
2096 [fdef.name for fdef in response.fields])
2098 # Test exclusive acquire
2099 for tlock in locks[::4]:
2100 tlock.acquire(shared=0)
2102 def _GetExpResult(name):
2103 if tlock.name == name:
2104 return [(constants.RS_NORMAL, name),
2105 (constants.RS_NORMAL, "exclusive"),
2106 (constants.RS_NORMAL,
2107 [threading.currentThread().getName()]),
2108 (constants.RS_NORMAL, [])]
2109 return [(constants.RS_NORMAL, name),
2110 (constants.RS_NORMAL, None),
2111 (constants.RS_NORMAL, None),
2112 (constants.RS_NORMAL, [])]
2114 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2115 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2116 [_GetExpResult(name)
2117 for name in utils.NiceSort(expnames)])
2121 # Test shared acquire
2122 def _Acquire(lock, shared, ev, notify):
2123 lock.acquire(shared=shared)
2130 for tlock1 in locks[::11]:
2131 for tlock2 in locks[::-15]:
2132 if tlock2 == tlock1:
2136 for tlock3 in locks[::10]:
2137 if tlock3 in (tlock2, tlock1):
2141 releaseev = threading.Event()
2147 ev = threading.Event()
2148 tthreads1.append(self._addThread(target=_Acquire,
2149 args=(tlock1, 1, releaseev, ev)))
2150 acquireev.append(ev)
2152 ev = threading.Event()
2153 tthread2 = self._addThread(target=_Acquire,
2154 args=(tlock2, 1, releaseev, ev))
2155 acquireev.append(ev)
2157 ev = threading.Event()
2158 tthread3 = self._addThread(target=_Acquire,
2159 args=(tlock3, 0, releaseev, ev))
2160 acquireev.append(ev)
2162 # Wait for all locks to be acquired
2166 # Check query result
2167 result = self.lm.QueryLocks(["name", "mode", "owner"])
2168 response = objects.QueryResponse.FromDict(result)
2169 for (name, mode, owner) in response.data:
2170 (name_status, name_value) = name
2171 (owner_status, owner_value) = owner
2173 self.assertEqual(name_status, constants.RS_NORMAL)
2174 self.assertEqual(owner_status, constants.RS_NORMAL)
2176 if name_value == tlock1.name:
2177 self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
2178 self.assertEqual(set(owner_value),
2179 set(i.getName() for i in tthreads1))
2182 if name_value == tlock2.name:
2183 self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
2184 self.assertEqual(owner_value, [tthread2.getName()])
2187 if name_value == tlock3.name:
2188 self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
2189 self.assertEqual(owner_value, [tthread3.getName()])
2192 self.assert_(name_value in expnames)
2193 self.assertEqual(mode, (constants.RS_NORMAL, None))
2194 self.assert_(owner_value is None)
2196 # Release locks again
2201 result = self.lm.QueryLocks(["name", "mode", "owner"])
2202 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2203 [[(constants.RS_NORMAL, name),
2204 (constants.RS_NORMAL, None),
2205 (constants.RS_NORMAL, None)]
2206 for name in utils.NiceSort(expnames)])
2208 def testDelete(self):
2209 lock = locking.SharedLock("TestLock", monitor=self.lm)
2211 self.assertEqual(len(self.lm._locks), 1)
2212 result = self.lm.QueryLocks(["name", "mode", "owner"])
2213 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2214 [[(constants.RS_NORMAL, lock.name),
2215 (constants.RS_NORMAL, None),
2216 (constants.RS_NORMAL, None)]])
2220 result = self.lm.QueryLocks(["name", "mode", "owner"])
2221 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2222 [[(constants.RS_NORMAL, lock.name),
2223 (constants.RS_NORMAL, "deleted"),
2224 (constants.RS_NORMAL, None)]])
2225 self.assertEqual(len(self.lm._locks), 1)
2227 def testPending(self):
2228 def _Acquire(lock, shared, prev, next):
2231 lock.acquire(shared=shared, test_notify=next.set)
2237 lock = locking.SharedLock("ExcLock", monitor=self.lm)
2239 for shared in [0, 1]:
2242 self.assertEqual(len(self.lm._locks), 1)
2243 result = self.lm.QueryLocks(["name", "mode", "owner"])
2244 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2245 [[(constants.RS_NORMAL, lock.name),
2246 (constants.RS_NORMAL, "exclusive"),
2247 (constants.RS_NORMAL,
2248 [threading.currentThread().getName()])]])
2252 first = threading.Event()
2256 ev = threading.Event()
2257 threads.append(self._addThread(target=_Acquire,
2258 args=(lock, shared, prev, ev)))
2264 # Wait for last acquire to start waiting
2267 # NOTE: This works only because QueryLocks will acquire the
2268 # lock-internal lock again and won't be able to get the information
2269 # until it has the lock. By then the acquire should be registered in
2270 # SharedLock.__pending (otherwise it's a bug).
2272 # All acquires are waiting now
2274 pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
2276 pending = [("exclusive", [t.getName()]) for t in threads]
2278 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2279 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2280 [[(constants.RS_NORMAL, lock.name),
2281 (constants.RS_NORMAL, "exclusive"),
2282 (constants.RS_NORMAL,
2283 [threading.currentThread().getName()]),
2284 (constants.RS_NORMAL, pending)]])
2286 self.assertEqual(len(self.lm._locks), 1)
2292 # No pending acquires
2293 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2294 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2295 [[(constants.RS_NORMAL, lock.name),
2296 (constants.RS_NORMAL, None),
2297 (constants.RS_NORMAL, None),
2298 (constants.RS_NORMAL, [])]])
2300 self.assertEqual(len(self.lm._locks), 1)
2302 def testDeleteAndRecreate(self):
2303 lname = "TestLock101923193"
2305 # Create some locks with the same name and keep all references
2306 locks = [locking.SharedLock(lname, monitor=self.lm)
2309 self.assertEqual(len(self.lm._locks), len(locks))
2311 result = self.lm.QueryLocks(["name", "mode", "owner"])
2312 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2313 [[(constants.RS_NORMAL, lname),
2314 (constants.RS_NORMAL, None),
2315 (constants.RS_NORMAL, None)]] * 5)
2319 # Check information order
2320 result = self.lm.QueryLocks(["name", "mode", "owner"])
2321 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2322 [[(constants.RS_NORMAL, lname),
2323 (constants.RS_NORMAL, None),
2324 (constants.RS_NORMAL, None)]] * 2 +
2325 [[(constants.RS_NORMAL, lname),
2326 (constants.RS_NORMAL, "deleted"),
2327 (constants.RS_NORMAL, None)]] +
2328 [[(constants.RS_NORMAL, lname),
2329 (constants.RS_NORMAL, None),
2330 (constants.RS_NORMAL, None)]] * 2)
2332 locks[1].acquire(shared=0)
2335 [(constants.RS_NORMAL, lname),
2336 (constants.RS_NORMAL, None),
2337 (constants.RS_NORMAL, None)],
2338 [(constants.RS_NORMAL, lname),
2339 (constants.RS_NORMAL, "exclusive"),
2340 (constants.RS_NORMAL, [threading.currentThread().getName()])],
2341 [(constants.RS_NORMAL, lname),
2342 (constants.RS_NORMAL, "deleted"),
2343 (constants.RS_NORMAL, None)],
2344 [(constants.RS_NORMAL, lname),
2345 (constants.RS_NORMAL, None),
2346 (constants.RS_NORMAL, None)],
2347 [(constants.RS_NORMAL, lname),
2348 (constants.RS_NORMAL, None),
2349 (constants.RS_NORMAL, None)],
2352 # Check information order
2353 result = self.lm.QueryLocks(["name", "mode", "owner"])
2354 self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)
2356 self.assertEqual(len(set(self.lm._locks.values())), len(locks))
2357 self.assertEqual(len(self.lm._locks), len(locks))
2359 # Check lock deletion
2360 for idx in range(len(locks)):
2362 assert gc.isenabled()
2364 self.assertEqual(len(self.lm._locks), len(locks))
2365 result = self.lm.QueryLocks(["name", "mode", "owner"])
2366 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2367 last_status[idx + 1:])
2369 # All locks should have been deleted
2371 self.assertFalse(self.lm._locks)
2373 result = self.lm.QueryLocks(["name", "mode", "owner"])
2374 self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
2380 def AddResult(self, *args):
2381 self._info.append(args)
2383 def CountPending(self):
2384 return len(self._info)
2386 def GetLockInfo(self, requested):
2387 (exp_requested, result) = self._info.pop(0)
2389 if exp_requested != requested:
2390 raise Exception("Requested information (%s) does not match"
2391 " expectations (%s)" % (requested, exp_requested))
2395 def testMultipleResults(self):
2396 fl1 = self._FakeLock()
2397 fl2 = self._FakeLock()
2399 self.lm.RegisterLock(fl1)
2400 self.lm.RegisterLock(fl2)
2403 for i in [fl1, fl2]:
2404 i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), [])
2405 result = self.lm.QueryLocks(["name", "mode", "owner"])
2406 self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
2407 for i in [fl1, fl2]:
2408 self.assertEqual(i.CountPending(), 0)
2411 for fn in [lambda x: x, reversed, sorted]:
2412 fl1.AddResult(set(), list(fn([
2413 ("aaa", None, None, None),
2414 ("bbb", None, None, None),
2416 fl2.AddResult(set(), [])
2417 result = self.lm.QueryLocks(["name"])
2418 self.assertEqual(objects.QueryResponse.FromDict(result).data, [
2419 [(constants.RS_NORMAL, "aaa")],
2420 [(constants.RS_NORMAL, "bbb")],
2422 for i in [fl1, fl2]:
2423 self.assertEqual(i.CountPending(), 0)
2425 for fn2 in [lambda x: x, reversed, sorted]:
2426 fl1.AddResult(set([query.LQ_MODE]), list(fn([
2427 # Same name, but different information
2428 ("aaa", "mode0", None, None),
2429 ("aaa", "mode1", None, None),
2430 ("aaa", "mode2", None, None),
2431 ("aaa", "mode3", None, None),
2433 fl2.AddResult(set([query.LQ_MODE]), [
2434 ("zzz", "end", None, None),
2435 ("000", "start", None, None),
2437 ("aaa", "b200", None, None),
2438 ("aaa", "b300", None, None),
2440 result = self.lm.QueryLocks(["name", "mode"])
2441 self.assertEqual(objects.QueryResponse.FromDict(result).data, [
2442 [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")],
2444 # Name is the same, so order must be equal to incoming order
2445 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")],
2446 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")],
2447 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")],
2448 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")],
2450 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")],
2451 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")],
2453 [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")],
2455 for i in [fl1, fl2]:
2456 self.assertEqual(i.CountPending(), 0)
2459 if __name__ == "__main__":
2460 testutils.GanetiTestProgram()