4 # Copyright (C) 2006, 2007, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the locking module"""
34 from ganeti import constants
35 from ganeti import locking
36 from ganeti import errors
37 from ganeti import utils
38 from ganeti import compat
39 from ganeti import objects
40 from ganeti import query
45 # This is used to test the ssynchronize decorator.
46 # Since it's passed as input to a decorator it must be declared as a global.
47 _decoratorlock = locking.SharedLock("decorator lock")
49 #: List for looping tests
54 """Decorator for executing a function many times"""
55 def wrapper(*args, **kwargs):
61 def SafeSleep(duration):
64 delay = start + duration - time.time()
70 class _ThreadedTestCase(unittest.TestCase):
71 """Test class that supports adding/waiting on threads"""
73 unittest.TestCase.setUp(self)
74 self.done = Queue.Queue(0)
77 def _addThread(self, *args, **kwargs):
78 """Create and remember a new thread"""
79 t = threading.Thread(*args, **kwargs)
80 self.threads.append(t)
84 def _waitThreads(self):
85 """Wait for all our threads to finish"""
86 for t in self.threads:
88 self.failIf(t.isAlive())
92 class _ConditionTestCase(_ThreadedTestCase):
93 """Common test case for conditions"""
96 _ThreadedTestCase.setUp(self)
97 self.lock = threading.Lock()
98 self.cond = cls(self.lock)
100 def _testAcquireRelease(self):
101 self.assertFalse(self.cond._is_owned())
102 self.assertRaises(RuntimeError, self.cond.wait, None)
103 self.assertRaises(RuntimeError, self.cond.notifyAll)
106 self.assert_(self.cond._is_owned())
107 self.cond.notifyAll()
108 self.assert_(self.cond._is_owned())
111 self.assertFalse(self.cond._is_owned())
112 self.assertRaises(RuntimeError, self.cond.wait, None)
113 self.assertRaises(RuntimeError, self.cond.notifyAll)
115 def _testNotification(self):
120 self.cond.notifyAll()
125 self._addThread(target=_NotifyAll)
126 self.assertEqual(self.done.get(True, 1), "NE")
127 self.assertRaises(Queue.Empty, self.done.get_nowait)
129 self.assertEqual(self.done.get(True, 1), "NA")
130 self.assertEqual(self.done.get(True, 1), "NN")
131 self.assert_(self.cond._is_owned())
133 self.assertFalse(self.cond._is_owned())
136 class TestSingleNotifyPipeCondition(_ConditionTestCase):
137 """SingleNotifyPipeCondition tests"""
140 _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
142 def testAcquireRelease(self):
143 self._testAcquireRelease()
145 def testNotification(self):
146 self._testNotification()
148 def testWaitReuse(self):
154 def testNoNotifyReuse(self):
156 self.cond.notifyAll()
157 self.assertRaises(RuntimeError, self.cond.wait, None)
158 self.assertRaises(RuntimeError, self.cond.notifyAll)
162 class TestPipeCondition(_ConditionTestCase):
163 """PipeCondition tests"""
166 _ConditionTestCase.setUp(self, locking.PipeCondition)
168 def testAcquireRelease(self):
169 self._testAcquireRelease()
171 def testNotification(self):
172 self._testNotification()
174 def _TestWait(self, fn):
176 self._addThread(target=fn),
177 self._addThread(target=fn),
178 self._addThread(target=fn),
181 # Wait for threads to be waiting
183 self.assertEqual(self.done.get(True, 1), "A")
185 self.assertRaises(Queue.Empty, self.done.get_nowait)
188 self.assertEqual(len(self.cond._waiters), 3)
189 self.assertEqual(self.cond._waiters, set(threads))
190 # This new thread can't acquire the lock, and thus call wait, before we
192 self._addThread(target=fn)
193 self.cond.notifyAll()
194 self.assertRaises(Queue.Empty, self.done.get_nowait)
197 # We should now get 3 W and 1 A (for the new thread) in whatever order
201 got = self.done.get(True, 1)
207 self.fail("Got %s on the done queue" % got)
209 self.assertEqual(w, 3)
210 self.assertEqual(a, 1)
213 self.cond.notifyAll()
216 self.assertEqual(self.done.get_nowait(), "W")
217 self.assertRaises(Queue.Empty, self.done.get_nowait)
219 def testBlockingWait(self):
227 self._TestWait(_BlockingWait)
229 def testLongTimeoutWait(self):
237 self._TestWait(_Helper)
239 def _TimeoutWait(self, timeout, check):
241 self.cond.wait(timeout)
245 def testShortTimeoutWait(self):
246 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
247 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
249 self.assertEqual(self.done.get_nowait(), "T1")
250 self.assertEqual(self.done.get_nowait(), "T1")
251 self.assertRaises(Queue.Empty, self.done.get_nowait)
253 def testZeroTimeoutWait(self):
254 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
255 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
256 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
258 self.assertEqual(self.done.get_nowait(), "T0")
259 self.assertEqual(self.done.get_nowait(), "T0")
260 self.assertEqual(self.done.get_nowait(), "T0")
261 self.assertRaises(Queue.Empty, self.done.get_nowait)
264 class TestSharedLock(_ThreadedTestCase):
265 """SharedLock tests"""
268 _ThreadedTestCase.setUp(self)
269 self.sl = locking.SharedLock("TestSharedLock")
271 def testSequenceAndOwnership(self):
272 self.assertFalse(self.sl.is_owned())
273 self.sl.acquire(shared=1)
274 self.assert_(self.sl.is_owned())
275 self.assert_(self.sl.is_owned(shared=1))
276 self.assertFalse(self.sl.is_owned(shared=0))
278 self.assertFalse(self.sl.is_owned())
280 self.assert_(self.sl.is_owned())
281 self.assertFalse(self.sl.is_owned(shared=1))
282 self.assert_(self.sl.is_owned(shared=0))
284 self.assertFalse(self.sl.is_owned())
285 self.sl.acquire(shared=1)
286 self.assert_(self.sl.is_owned())
287 self.assert_(self.sl.is_owned(shared=1))
288 self.assertFalse(self.sl.is_owned(shared=0))
290 self.assertFalse(self.sl.is_owned())
292 def testBooleanValue(self):
293 # semaphores are supposed to return a true value on a successful acquire
294 self.assert_(self.sl.acquire(shared=1))
296 self.assert_(self.sl.acquire())
299 def testDoubleLockingStoE(self):
300 self.sl.acquire(shared=1)
301 self.assertRaises(AssertionError, self.sl.acquire)
303 def testDoubleLockingEtoS(self):
305 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
307 def testDoubleLockingStoS(self):
308 self.sl.acquire(shared=1)
309 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
311 def testDoubleLockingEtoE(self):
313 self.assertRaises(AssertionError, self.sl.acquire)
315 # helper functions: called in a separate thread they acquire the lock, send
316 # their identifier on the done queue, then release it.
317 def _doItSharer(self):
319 self.sl.acquire(shared=1)
322 except errors.LockError:
325 def _doItExclusive(self):
330 except errors.LockError:
333 def _doItDelete(self):
337 except errors.LockError:
340 def testSharersCanCoexist(self):
341 self.sl.acquire(shared=1)
342 threading.Thread(target=self._doItSharer).start()
343 self.assert_(self.done.get(True, 1))
347 def testExclusiveBlocksExclusive(self):
349 self._addThread(target=self._doItExclusive)
350 self.assertRaises(Queue.Empty, self.done.get_nowait)
353 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
356 def testExclusiveBlocksDelete(self):
358 self._addThread(target=self._doItDelete)
359 self.assertRaises(Queue.Empty, self.done.get_nowait)
362 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
363 self.sl = locking.SharedLock(self.sl.name)
366 def testExclusiveBlocksSharer(self):
368 self._addThread(target=self._doItSharer)
369 self.assertRaises(Queue.Empty, self.done.get_nowait)
372 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
375 def testSharerBlocksExclusive(self):
376 self.sl.acquire(shared=1)
377 self._addThread(target=self._doItExclusive)
378 self.assertRaises(Queue.Empty, self.done.get_nowait)
381 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
384 def testSharerBlocksDelete(self):
385 self.sl.acquire(shared=1)
386 self._addThread(target=self._doItDelete)
387 self.assertRaises(Queue.Empty, self.done.get_nowait)
390 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
391 self.sl = locking.SharedLock(self.sl.name)
394 def testWaitingExclusiveBlocksSharer(self):
395 """SKIPPED testWaitingExclusiveBlockSharer"""
398 self.sl.acquire(shared=1)
399 # the lock is acquired in shared mode...
400 self._addThread(target=self._doItExclusive)
401 # ...but now an exclusive is waiting...
402 self._addThread(target=self._doItSharer)
403 # ...so the sharer should be blocked as well
404 self.assertRaises(Queue.Empty, self.done.get_nowait)
407 # The exclusive passed before
408 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
409 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
412 def testWaitingSharerBlocksExclusive(self):
413 """SKIPPED testWaitingSharerBlocksExclusive"""
417 # the lock is acquired in exclusive mode...
418 self._addThread(target=self._doItSharer)
419 # ...but now a sharer is waiting...
420 self._addThread(target=self._doItExclusive)
421 # ...the exclusive is waiting too...
422 self.assertRaises(Queue.Empty, self.done.get_nowait)
425 # The sharer passed before
426 self.assertEqual(self.done.get_nowait(), 'SHR')
427 self.assertEqual(self.done.get_nowait(), 'EXC')
429 def testDelete(self):
431 self.assertRaises(errors.LockError, self.sl.acquire)
432 self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
433 self.assertRaises(errors.LockError, self.sl.delete)
435 def testDeleteTimeout(self):
436 self.assertTrue(self.sl.delete(timeout=60))
438 def testDeleteTimeoutFail(self):
439 ready = threading.Event()
440 finish = threading.Event()
443 self.sl.acquire(shared=0)
449 self._addThread(target=fn)
452 # Test if deleting a lock owned in exclusive mode by another thread fails
453 # to delete when a timeout is used
454 self.assertFalse(self.sl.delete(timeout=0.02))
459 self.assertTrue(self.sl.delete())
460 self.assertRaises(errors.LockError, self.sl.acquire)
462 def testNoDeleteIfSharer(self):
463 self.sl.acquire(shared=1)
464 self.assertRaises(AssertionError, self.sl.delete)
467 def testDeletePendingSharersExclusiveDelete(self):
469 self._addThread(target=self._doItSharer)
470 self._addThread(target=self._doItSharer)
471 self._addThread(target=self._doItExclusive)
472 self._addThread(target=self._doItDelete)
475 # The threads who were pending return ERR
477 self.assertEqual(self.done.get_nowait(), 'ERR')
478 self.sl = locking.SharedLock(self.sl.name)
481 def testDeletePendingDeleteExclusiveSharers(self):
483 self._addThread(target=self._doItDelete)
484 self._addThread(target=self._doItExclusive)
485 self._addThread(target=self._doItSharer)
486 self._addThread(target=self._doItSharer)
489 # The two threads who were pending return both ERR
490 self.assertEqual(self.done.get_nowait(), 'ERR')
491 self.assertEqual(self.done.get_nowait(), 'ERR')
492 self.assertEqual(self.done.get_nowait(), 'ERR')
493 self.assertEqual(self.done.get_nowait(), 'ERR')
494 self.sl = locking.SharedLock(self.sl.name)
497 def testExclusiveAcquireTimeout(self):
498 for shared in [0, 1]:
499 on_queue = threading.Event()
500 release_exclusive = threading.Event()
502 def _LockExclusive():
503 self.sl.acquire(shared=0, test_notify=on_queue.set)
504 self.done.put("A: start wait")
505 release_exclusive.wait()
506 self.done.put("A: end wait")
509 # Start thread to hold lock in exclusive mode
510 self._addThread(target=_LockExclusive)
512 # Wait for wait to begin
513 self.assertEqual(self.done.get(timeout=60), "A: start wait")
515 # Wait up to 60s to get lock, but release exclusive lock as soon as we're
517 self.failUnless(self.sl.acquire(shared=shared, timeout=60,
518 test_notify=release_exclusive.set))
520 self.done.put("got 2nd")
525 self.assertEqual(self.done.get_nowait(), "A: end wait")
526 self.assertEqual(self.done.get_nowait(), "got 2nd")
527 self.assertRaises(Queue.Empty, self.done.get_nowait)
530 def testAcquireExpiringTimeout(self):
531 def _AcquireWithTimeout(shared, timeout):
532 if not self.sl.acquire(shared=shared, timeout=timeout):
533 self.done.put("timeout")
535 for shared in [0, 1]:
539 # Start shared acquires with timeout between 0 and 20 ms
541 self._addThread(target=_AcquireWithTimeout,
542 args=(shared, i * 2.0 / 1000.0))
544 # Wait for threads to finish (makes sure the acquire timeout expires
545 # before releasing the lock)
552 self.assertEqual(self.done.get_nowait(), "timeout")
554 self.assertRaises(Queue.Empty, self.done.get_nowait)
557 def testSharedSkipExclusiveAcquires(self):
558 # Tests whether shared acquires jump in front of exclusive acquires in the
561 def _Acquire(shared, name, notify_ev, wait_ev):
563 notify_fn = notify_ev.set
570 if not self.sl.acquire(shared=shared, test_notify=notify_fn):
576 # Get exclusive lock while we fill the queue
584 # Add acquires using threading.Event for synchronization. They'll be
585 # acquired exactly in the order defined in this list.
586 acquires = (shrcnt1 * [(1, "shared 1")] +
587 3 * [(0, "exclusive 1")] +
588 shrcnt2 * [(1, "shared 2")] +
589 shrcnt3 * [(1, "shared 3")] +
590 shrcnt4 * [(1, "shared 4")] +
591 3 * [(0, "exclusive 2")])
596 for args in acquires:
597 ev_cur = threading.Event()
598 self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
601 # Wait for last acquire to start
604 # Expect 6 pending exclusive acquires and 1 for all shared acquires
606 self.assertEqual(self.sl._count_pending(), 7)
608 # Release exclusive lock and wait
614 for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
615 # Shared locks aren't guaranteed to be notified in order, but they'll be
617 tmp = self.done.get_nowait()
618 if tmp == "shared 1":
620 elif tmp == "shared 2":
622 elif tmp == "shared 3":
624 elif tmp == "shared 4":
626 self.assertEqual(shrcnt1, 0)
627 self.assertEqual(shrcnt2, 0)
628 self.assertEqual(shrcnt3, 0)
629 self.assertEqual(shrcnt3, 0)
632 self.assertEqual(self.done.get_nowait(), "exclusive 1")
635 self.assertEqual(self.done.get_nowait(), "exclusive 2")
637 self.assertRaises(Queue.Empty, self.done.get_nowait)
639 def testIllegalDowngrade(self):
641 self.assertRaises(AssertionError, self.sl.downgrade)
643 # Acquire in shared mode, downgrade should be no-op
644 self.assertTrue(self.sl.acquire(shared=1))
645 self.assertTrue(self.sl.is_owned(shared=1))
646 self.assertTrue(self.sl.downgrade())
647 self.assertTrue(self.sl.is_owned(shared=1))
650 def testDowngrade(self):
651 self.assertTrue(self.sl.acquire())
652 self.assertTrue(self.sl.is_owned(shared=0))
653 self.assertTrue(self.sl.downgrade())
654 self.assertTrue(self.sl.is_owned(shared=1))
658 def testDowngradeJumpsAheadOfExclusive(self):
659 def _KeepExclusive(ev_got, ev_downgrade, ev_release):
660 self.assertTrue(self.sl.acquire())
661 self.assertTrue(self.sl.is_owned(shared=0))
664 self.assertTrue(self.sl.is_owned(shared=0))
665 self.assertTrue(self.sl.downgrade())
666 self.assertTrue(self.sl.is_owned(shared=1))
668 self.assertTrue(self.sl.is_owned(shared=1))
671 def _KeepExclusive2(ev_started, ev_release):
672 self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
673 self.assertTrue(self.sl.is_owned(shared=0))
675 self.assertTrue(self.sl.is_owned(shared=0))
678 def _KeepShared(ev_started, ev_got, ev_release):
679 self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
680 self.assertTrue(self.sl.is_owned(shared=1))
683 self.assertTrue(self.sl.is_owned(shared=1))
686 # Acquire lock in exclusive mode
687 ev_got_excl1 = threading.Event()
688 ev_downgrade_excl1 = threading.Event()
689 ev_release_excl1 = threading.Event()
690 th_excl1 = self._addThread(target=_KeepExclusive,
691 args=(ev_got_excl1, ev_downgrade_excl1,
695 # Start a second exclusive acquire
696 ev_started_excl2 = threading.Event()
697 ev_release_excl2 = threading.Event()
698 th_excl2 = self._addThread(target=_KeepExclusive2,
699 args=(ev_started_excl2, ev_release_excl2))
700 ev_started_excl2.wait()
702 # Start shared acquires, will jump ahead of second exclusive acquire when
703 # first exclusive acquire downgrades
704 ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
705 ev_release_shared = threading.Event()
707 th_shared = [self._addThread(target=_KeepShared,
708 args=(ev_started, ev_got, ev_release_shared))
709 for (ev_started, ev_got) in ev_shared]
711 # Wait for all shared acquires to start
712 for (ev, _) in ev_shared:
715 # Check lock information
716 self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
717 [(self.sl.name, "exclusive", [th_excl1.getName()], None)])
718 [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING]))
719 self.assertEqual([(pendmode, sorted(waiting))
720 for (pendmode, waiting) in pending],
721 [("exclusive", [th_excl2.getName()]),
722 ("shared", sorted(th.getName() for th in th_shared))])
724 # Shared acquires won't start until the exclusive lock is downgraded
725 ev_downgrade_excl1.set()
727 # Wait for all shared acquires to be successful
728 for (_, ev) in ev_shared:
731 # Check lock information again
732 self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE,
734 [(self.sl.name, "shared", None,
735 [("exclusive", [th_excl2.getName()])])])
736 [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER]))
737 self.assertEqual(set(owner), set([th_excl1.getName()] +
738 [th.getName() for th in th_shared]))
740 ev_release_excl1.set()
741 ev_release_excl2.set()
742 ev_release_shared.set()
746 self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER,
748 [(self.sl.name, None, None, [])])
751 def testMixedAcquireTimeout(self):
752 sync = threading.Event()
754 def _AcquireShared(ev):
755 if not self.sl.acquire(shared=1, timeout=None):
758 self.done.put("shared")
763 # Wait for notification from main thread
771 ev = threading.Event()
772 self._addThread(target=_AcquireShared, args=(ev, ))
775 # Wait for all acquires to finish
779 self.assertEqual(self.sl._count_pending(), 0)
781 # Try to get exclusive lock
782 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
784 # Acquire exclusive without timeout
785 exclsync = threading.Event()
786 exclev = threading.Event()
788 def _AcquireExclusive():
789 if not self.sl.acquire(shared=0):
792 self.done.put("exclusive")
797 # Wait for notification from main thread
802 self._addThread(target=_AcquireExclusive)
804 # Try to get exclusive lock
805 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
807 # Make all shared holders release their locks
810 # Wait for exclusive acquire to succeed
813 self.assertEqual(self.sl._count_pending(), 0)
815 # Try to get exclusive lock
816 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
818 def _AcquireSharedSimple():
819 if self.sl.acquire(shared=1, timeout=None):
820 self.done.put("shared2")
824 self._addThread(target=_AcquireSharedSimple)
826 # Tell exclusive lock to release
829 # Wait for everything to finish
832 self.assertEqual(self.sl._count_pending(), 0)
836 self.assertEqual(self.done.get_nowait(), "shared")
838 self.assertEqual(self.done.get_nowait(), "exclusive")
841 self.assertEqual(self.done.get_nowait(), "shared2")
843 self.assertRaises(Queue.Empty, self.done.get_nowait)
845 def testPriority(self):
846 # Acquire in exclusive mode
847 self.assert_(self.sl.acquire(shared=0))
850 def _Acquire(prev, next, shared, priority, result):
852 self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
854 self.done.put(result)
858 counter = itertools.count(0)
859 priorities = range(-20, 30)
860 first = threading.Event()
866 # [(shared/exclusive, set(acquire names), set(pending threads)),
867 # (shared/exclusive, ...),
873 # References shared acquire per priority in L{perprio}. Data structure:
875 # priority: (shared=1, set(acquire names), set(pending threads)),
879 for seed in [4979, 9523, 14902, 32440]:
880 # Use a deterministic random generator
881 rnd = random.Random(seed)
882 for priority in [rnd.choice(priorities) for _ in range(30)]:
887 acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
889 ev = threading.Event()
890 thread = self._addThread(target=_Acquire,
891 args=(prev, ev, shared, priority, acqname))
894 # Record expected aqcuire, see above for structure
895 data = (shared, set([acqname]), set([thread]))
896 priolist = perprio.setdefault(priority, [])
898 priosh = prioshared.get(priority, None)
900 # Shared acquires are merged
901 for i, j in zip(priosh[1:], data[1:]):
903 assert data[0] == priosh[0]
905 prioshared[priority] = data
906 priolist.append(data)
908 priolist.append(data)
910 # Start all acquires and wait for them
914 # Check lock information
915 self.assertEqual(self.sl.GetLockInfo(set()),
916 [(self.sl.name, None, None, None)])
917 self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
918 [(self.sl.name, "exclusive",
919 [threading.currentThread().getName()], None)])
921 self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])),
924 # Let threads acquire the lock
927 # Wait for everything to finish
930 self.assert_(self.sl._check_empty())
932 # Check acquires by priority
933 for acquires in [perprio[i] for i in sorted(perprio.keys())]:
934 for (_, names, _) in acquires:
935 # For shared acquires, the set will contain 1..n entries. For exclusive
938 names.remove(self.done.get_nowait())
939 self.assertFalse(compat.any(names for (_, names, _) in acquires))
941 self.assertRaises(Queue.Empty, self.done.get_nowait)
943 def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio):
944 self.assertEqual(name, self.sl.name)
945 self.assert_(mode is None)
946 self.assert_(owner is None)
948 self.assertEqual([(pendmode, sorted(waiting))
949 for (pendmode, waiting) in pending],
950 [(["exclusive", "shared"][int(bool(shared))],
951 sorted(t.getName() for t in threads))
952 for acquires in [perprio[i]
953 for i in sorted(perprio.keys())]
954 for (shared, _, threads) in acquires])
956 class _FakeTimeForSpuriousNotifications:
957 def __init__(self, now, check_end):
959 self.check_end = check_end
961 # Deterministic random number generator
962 self.rnd = random.Random(15086)
965 # Advance time if the random number generator thinks so (this is to test
966 # multiple notifications without advancing the time)
967 if self.rnd.random() < 0.3:
968 self.now += self.rnd.random()
970 self.check_end(self.now)
975 def testAcquireTimeoutWithSpuriousNotifications(self):
976 ready = threading.Event()
977 locked = threading.Event()
984 self.assertFalse(locked.isSet())
986 # If we waited long enough (in virtual time), tell main thread to release
987 # lock, otherwise tell it to notify once more
988 req.put(now < (epoch + (timeout * 0.8)))
990 time_fn = self._FakeTimeForSpuriousNotifications(epoch, check_end).time
992 sl = locking.SharedLock("test", _time_fn=time_fn)
994 # Acquire in exclusive mode
998 self.assertTrue(sl.acquire(shared=0, timeout=timeout,
999 test_notify=ready.set))
1002 self.done.put("success")
1004 # Start acquire with timeout and wait for it to be ready
1005 self._addThread(target=fn)
1008 # The separate thread is now waiting to acquire the lock, so start sending
1009 # spurious notifications.
1011 # Wait for separate thread to ask for another notification
1014 # After sending the notification, the lock will take a short amount of
1015 # time to notice and to retrieve the current time
1016 sl._notify_topmost()
1019 self.assertTrue(count > 100, "Not enough notifications were sent")
1021 self.assertFalse(locked.isSet())
1023 # Some notifications have been sent, now actually release the lock
1026 # Wait for lock to be acquired
1031 self.assertEqual(self.done.get_nowait(), "success")
1032 self.assertRaises(Queue.Empty, self.done.get_nowait)
1035 class TestSharedLockInCondition(_ThreadedTestCase):
1036 """SharedLock as a condition lock tests"""
1039 _ThreadedTestCase.setUp(self)
1040 self.sl = locking.SharedLock("TestSharedLockInCondition")
1043 def setCondition(self):
1044 self.cond = threading.Condition(self.sl)
1046 def testKeepMode(self):
1047 self.cond.acquire(shared=1)
1048 self.assert_(self.sl.is_owned(shared=1))
1050 self.assert_(self.sl.is_owned(shared=1))
1052 self.cond.acquire(shared=0)
1053 self.assert_(self.sl.is_owned(shared=0))
1055 self.assert_(self.sl.is_owned(shared=0))
1059 class TestSharedLockInPipeCondition(TestSharedLockInCondition):
1060 """SharedLock as a pipe condition lock tests"""
1062 def setCondition(self):
1063 self.cond = locking.PipeCondition(self.sl)
1066 class TestSSynchronizedDecorator(_ThreadedTestCase):
1067 """Shared Lock Synchronized decorator test"""
1070 _ThreadedTestCase.setUp(self)
1072 @locking.ssynchronized(_decoratorlock)
1073 def _doItExclusive(self):
1074 self.assert_(_decoratorlock.is_owned())
1075 self.done.put('EXC')
1077 @locking.ssynchronized(_decoratorlock, shared=1)
1078 def _doItSharer(self):
1079 self.assert_(_decoratorlock.is_owned(shared=1))
1080 self.done.put('SHR')
1082 def testDecoratedFunctions(self):
1083 self._doItExclusive()
1084 self.assertFalse(_decoratorlock.is_owned())
1086 self.assertFalse(_decoratorlock.is_owned())
1088 def testSharersCanCoexist(self):
1089 _decoratorlock.acquire(shared=1)
1090 threading.Thread(target=self._doItSharer).start()
1091 self.assert_(self.done.get(True, 1))
1092 _decoratorlock.release()
1095 def testExclusiveBlocksExclusive(self):
1096 _decoratorlock.acquire()
1097 self._addThread(target=self._doItExclusive)
1098 # give it a bit of time to check that it's not actually doing anything
1099 self.assertRaises(Queue.Empty, self.done.get_nowait)
1100 _decoratorlock.release()
1102 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
1105 def testExclusiveBlocksSharer(self):
1106 _decoratorlock.acquire()
1107 self._addThread(target=self._doItSharer)
1108 self.assertRaises(Queue.Empty, self.done.get_nowait)
1109 _decoratorlock.release()
1111 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
1114 def testSharerBlocksExclusive(self):
1115 _decoratorlock.acquire(shared=1)
1116 self._addThread(target=self._doItExclusive)
1117 self.assertRaises(Queue.Empty, self.done.get_nowait)
1118 _decoratorlock.release()
1120 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
1123 class TestLockSet(_ThreadedTestCase):
1127 _ThreadedTestCase.setUp(self)
1131 """Helper to (re)initialize the lock set"""
1132 self.resources = ['one', 'two', 'three']
1133 self.ls = locking.LockSet(self.resources, "TestLockSet")
1135 def testResources(self):
1136 self.assertEquals(self.ls._names(), set(self.resources))
1137 newls = locking.LockSet([], "TestLockSet.testResources")
1138 self.assertEquals(newls._names(), set())
1140 def testCheckOwnedUnknown(self):
1141 self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one"))
1142 for shared in [-1, 0, 1, 6378, 24255]:
1143 self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one",
1146 def testCheckOwnedUnknownWhileHolding(self):
1147 self.assertFalse(self.ls.check_owned([]))
1148 self.ls.acquire("one", shared=1)
1149 self.assertRaises(errors.LockError, self.ls.check_owned, "nonexist")
1150 self.assertTrue(self.ls.check_owned("one", shared=1))
1151 self.assertFalse(self.ls.check_owned("one", shared=0))
1152 self.assertFalse(self.ls.check_owned(["one", "two"]))
1153 self.assertRaises(errors.LockError, self.ls.check_owned,
1154 ["one", "nonexist"])
1155 self.assertRaises(errors.LockError, self.ls.check_owned, "")
1157 self.assertFalse(self.ls.check_owned([]))
1158 self.assertFalse(self.ls.check_owned("one"))
1160 def testAcquireRelease(self):
1161 self.assertFalse(self.ls.check_owned(self.ls._names()))
1162 self.assert_(self.ls.acquire('one'))
1163 self.assertEquals(self.ls.list_owned(), set(['one']))
1164 self.assertTrue(self.ls.check_owned("one"))
1165 self.assertTrue(self.ls.check_owned("one", shared=0))
1166 self.assertFalse(self.ls.check_owned("one", shared=1))
1168 self.assertEquals(self.ls.list_owned(), set())
1169 self.assertFalse(self.ls.check_owned(self.ls._names()))
1170 self.assertEquals(self.ls.acquire(['one']), set(['one']))
1171 self.assertEquals(self.ls.list_owned(), set(['one']))
1173 self.assertEquals(self.ls.list_owned(), set())
1174 self.ls.acquire(['one', 'two', 'three'])
1175 self.assertEquals(self.ls.list_owned(), set(['one', 'two', 'three']))
1176 self.assertTrue(self.ls.check_owned(self.ls._names()))
1177 self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
1178 self.assertFalse(self.ls.check_owned(self.ls._names(), shared=1))
1179 self.ls.release('one')
1180 self.assertFalse(self.ls.check_owned(["one"]))
1181 self.assertTrue(self.ls.check_owned(["two", "three"]))
1182 self.assertTrue(self.ls.check_owned(["two", "three"], shared=0))
1183 self.assertFalse(self.ls.check_owned(["two", "three"], shared=1))
1184 self.assertEquals(self.ls.list_owned(), set(['two', 'three']))
1185 self.ls.release(['three'])
1186 self.assertEquals(self.ls.list_owned(), set(['two']))
1188 self.assertEquals(self.ls.list_owned(), set())
1189 self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
1190 self.assertEquals(self.ls.list_owned(), set(['one', 'three']))
1192 self.assertEquals(self.ls.list_owned(), set())
1193 for name in self.ls._names():
1194 self.assertFalse(self.ls.check_owned(name))
1196 def testNoDoubleAcquire(self):
1197 self.ls.acquire('one')
1198 self.assertRaises(AssertionError, self.ls.acquire, 'one')
1199 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
1200 self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
1202 self.ls.acquire(['one', 'three'])
1203 self.ls.release('one')
1204 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
1205 self.ls.release('three')
1207 def testNoWrongRelease(self):
1208 self.assertRaises(AssertionError, self.ls.release)
1209 self.ls.acquire('one')
1210 self.assertRaises(AssertionError, self.ls.release, 'two')
1212 def testAddRemove(self):
1214 self.assertEquals(self.ls.list_owned(), set())
1215 self.assert_('four' in self.ls._names())
1216 self.ls.add(['five', 'six', 'seven'], acquired=1)
1217 self.assert_('five' in self.ls._names())
1218 self.assert_('six' in self.ls._names())
1219 self.assert_('seven' in self.ls._names())
1220 self.assertEquals(self.ls.list_owned(), set(['five', 'six', 'seven']))
1221 self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
1222 self.assert_('five' not in self.ls._names())
1223 self.assert_('six' not in self.ls._names())
1224 self.assertEquals(self.ls.list_owned(), set(['seven']))
1225 self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
1226 self.ls.remove('seven')
1227 self.assert_('seven' not in self.ls._names())
1228 self.assertEquals(self.ls.list_owned(), set([]))
1229 self.ls.acquire(None, shared=1)
1230 self.assertRaises(AssertionError, self.ls.add, 'eight')
1232 self.ls.acquire(None)
1233 self.ls.add('eight', acquired=1)
1234 self.assert_('eight' in self.ls._names())
1235 self.assert_('eight' in self.ls.list_owned())
1237 self.assert_('nine' in self.ls._names())
1238 self.assert_('nine' not in self.ls.list_owned())
1240 self.ls.remove(['two'])
1241 self.assert_('two' not in self.ls._names())
1242 self.ls.acquire('three')
1243 self.assertEquals(self.ls.remove(['three']), ['three'])
1244 self.assert_('three' not in self.ls._names())
1245 self.assertEquals(self.ls.remove('three'), [])
1246 self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
1247 self.assert_('one' not in self.ls._names())
1249 def testRemoveNonBlocking(self):
1250 self.ls.acquire('one')
1251 self.assertEquals(self.ls.remove('one'), ['one'])
1252 self.ls.acquire(['two', 'three'])
1253 self.assertEquals(self.ls.remove(['two', 'three']),
1256 def testNoDoubleAdd(self):
1257 self.assertRaises(errors.LockError, self.ls.add, 'two')
1259 self.assertRaises(errors.LockError, self.ls.add, 'four')
1261 def testNoWrongRemoves(self):
1262 self.ls.acquire(['one', 'three'], shared=1)
1263 # Cannot remove 'two' while holding something which is not a superset
1264 self.assertRaises(AssertionError, self.ls.remove, 'two')
1265 # Cannot remove 'three' as we are sharing it
1266 self.assertRaises(AssertionError, self.ls.remove, 'three')
1268 def testAcquireSetLock(self):
1269 # acquire the set-lock exclusively
1270 self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
1271 self.assertEquals(self.ls.list_owned(), set(['one', 'two', 'three']))
1272 self.assertEquals(self.ls.is_owned(), True)
1273 self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
1274 # I can still add/remove elements...
1275 self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
1276 self.assert_(self.ls.add('six'))
1278 # share the set-lock
1279 self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
1280 # adding new elements is not possible
1281 self.assertRaises(AssertionError, self.ls.add, 'five')
1284 def testAcquireWithRepetitions(self):
1285 self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
1286 set(['two', 'two', 'three']))
1287 self.ls.release(['two', 'two'])
1288 self.assertEquals(self.ls.list_owned(), set(['three']))
1290 def testEmptyAcquire(self):
1291 # Acquire an empty list of locks...
1292 self.assertEquals(self.ls.acquire([]), set())
1293 self.assertEquals(self.ls.list_owned(), set())
1294 # New locks can still be addded
1295 self.assert_(self.ls.add('six'))
1296 # "re-acquiring" is not an issue, since we had really acquired nothing
1297 self.assertEquals(self.ls.acquire([], shared=1), set())
1298 self.assertEquals(self.ls.list_owned(), set())
1299 # We haven't really acquired anything, so we cannot release
1300 self.assertRaises(AssertionError, self.ls.release)
1302 def _doLockSet(self, names, shared):
1304 self.ls.acquire(names, shared=shared)
1305 self.done.put('DONE')
1307 except errors.LockError:
1308 self.done.put('ERR')
1310 def _doAddSet(self, names):
1312 self.ls.add(names, acquired=1)
1313 self.done.put('DONE')
1315 except errors.LockError:
1316 self.done.put('ERR')
1318 def _doRemoveSet(self, names):
1319 self.done.put(self.ls.remove(names))
1322 def testConcurrentSharedAcquire(self):
1323 self.ls.acquire(['one', 'two'], shared=1)
1324 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1326 self.assertEqual(self.done.get_nowait(), 'DONE')
1327 self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
1329 self.assertEqual(self.done.get_nowait(), 'DONE')
1330 self._addThread(target=self._doLockSet, args=('three', 1))
1332 self.assertEqual(self.done.get_nowait(), 'DONE')
1333 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1334 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1335 self.assertRaises(Queue.Empty, self.done.get_nowait)
1338 self.assertEqual(self.done.get_nowait(), 'DONE')
1339 self.assertEqual(self.done.get_nowait(), 'DONE')
1342 def testConcurrentExclusiveAcquire(self):
1343 self.ls.acquire(['one', 'two'])
1344 self._addThread(target=self._doLockSet, args=('three', 1))
1346 self.assertEqual(self.done.get_nowait(), 'DONE')
1347 self._addThread(target=self._doLockSet, args=('three', 0))
1349 self.assertEqual(self.done.get_nowait(), 'DONE')
1350 self.assertRaises(Queue.Empty, self.done.get_nowait)
1351 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1352 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1353 self._addThread(target=self._doLockSet, args=('one', 0))
1354 self._addThread(target=self._doLockSet, args=('one', 1))
1355 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1356 self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
1357 self.assertRaises(Queue.Empty, self.done.get_nowait)
1361 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1364 def testSimpleAcquireTimeoutExpiring(self):
1365 names = sorted(self.ls._names())
1366 self.assert_(len(names) >= 3)
1368 # Get name of first lock
1371 # Get name of last lock
1375 # Block first and try to lock it again
1378 # Block last and try to lock all locks
1381 # Block last and try to lock it again
1385 for (wanted, block) in checks:
1386 # Lock in exclusive mode
1387 self.assert_(self.ls.acquire(block, shared=0))
1390 # Try to get the same lock again with a timeout (should never succeed)
1391 acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1393 self.done.put("acquired")
1396 self.assert_(acquired is None)
1397 self.assertFalse(self.ls.list_owned())
1398 self.assertFalse(self.ls.is_owned())
1399 self.done.put("not acquired")
1401 self._addThread(target=_AcquireOne)
1403 # Wait for timeout in thread to expire
1406 # Release exclusive lock again
1409 self.assertEqual(self.done.get_nowait(), "not acquired")
1410 self.assertRaises(Queue.Empty, self.done.get_nowait)
1413 def testDelayedAndExpiringLockAcquire(self):
1415 self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1417 for expire in (False, True):
1418 names = sorted(self.ls._names())
1419 self.assertEqual(len(names), 8)
1421 lock_ev = dict([(i, threading.Event()) for i in names])
1423 # Lock all in exclusive mode
1424 self.assert_(self.ls.acquire(names, shared=0))
1427 # We'll wait at least 300ms per lock
1428 lockwait = len(names) * [0.3]
1430 # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1431 # this gives us up to 2.4s to fail.
1432 lockall_timeout = 0.4
1434 # This should finish rather quickly
1436 lockall_timeout = len(names) * 5.0
1439 def acquire_notification(name):
1441 self.done.put("getting %s" % name)
1446 if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1447 test_notify=acquire_notification):
1448 self.done.put("got all")
1451 self.done.put("timeout on all")
1454 for ev in lock_ev.values():
1457 t = self._addThread(target=_LockAll)
1459 for idx, name in enumerate(names):
1460 # Wait for actual acquire on this lock to start
1461 lock_ev[name].wait(10.0)
1463 if expire and t.isAlive():
1464 # Wait some time after getting the notification to make sure the lock
1465 # acquire will expire
1466 SafeSleep(lockwait[idx])
1468 self.ls.release(names=name)
1470 self.assertFalse(self.ls.list_owned())
1475 # Not checking which locks were actually acquired. Doing so would be
1476 # too timing-dependant.
1477 self.assertEqual(self.done.get_nowait(), "timeout on all")
1480 self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1481 self.assertEqual(self.done.get_nowait(), "got all")
1482 self.assertRaises(Queue.Empty, self.done.get_nowait)
1485 def testConcurrentRemove(self):
1487 self.ls.acquire(['one', 'two', 'four'])
1488 self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1489 self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1490 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1491 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1492 self.assertRaises(Queue.Empty, self.done.get_nowait)
1493 self.ls.remove('one')
1497 self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1498 self.ls.add(['five', 'six'], acquired=1)
1499 self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1500 self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1501 self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1502 self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1503 self.ls.remove('five')
1507 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1508 self.ls.acquire(['three', 'four'])
1509 self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1510 self.assertRaises(Queue.Empty, self.done.get_nowait)
1511 self.ls.remove('four')
1513 self.assertEqual(self.done.get_nowait(), ['six'])
1514 self._addThread(target=self._doRemoveSet, args=(['two']))
1516 self.assertEqual(self.done.get_nowait(), ['two'])
1522 def testConcurrentSharedSetLock(self):
1523 # share the set-lock...
1524 self.ls.acquire(None, shared=1)
1525 # ...another thread can share it too
1526 self._addThread(target=self._doLockSet, args=(None, 1))
1528 self.assertEqual(self.done.get_nowait(), 'DONE')
1529 # ...or just share some elements
1530 self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1532 self.assertEqual(self.done.get_nowait(), 'DONE')
1533 # ...but not add new ones or remove any
1534 t = self._addThread(target=self._doAddSet, args=(['nine']))
1535 self._addThread(target=self._doRemoveSet, args=(['two'], ))
1536 self.assertRaises(Queue.Empty, self.done.get_nowait)
1537 # this just releases the set-lock
1540 self.assertEqual(self.done.get_nowait(), 'DONE')
1541 # release the lock on the actual elements so remove() can proceed too
1544 self.failUnlessEqual(self.done.get_nowait(), ['two'])
1549 def testConcurrentExclusiveSetLock(self):
1550 # acquire the set-lock...
1551 self.ls.acquire(None, shared=0)
1552 # ...no one can do anything else
1553 self._addThread(target=self._doLockSet, args=(None, 1))
1554 self._addThread(target=self._doLockSet, args=(None, 0))
1555 self._addThread(target=self._doLockSet, args=(['three'], 0))
1556 self._addThread(target=self._doLockSet, args=(['two'], 1))
1557 self._addThread(target=self._doAddSet, args=(['nine']))
1558 self.assertRaises(Queue.Empty, self.done.get_nowait)
1562 self.assertEqual(self.done.get(True, 1), 'DONE')
1567 def testConcurrentSetLockAdd(self):
1568 self.ls.acquire('one')
1569 # Another thread wants the whole SetLock
1570 self._addThread(target=self._doLockSet, args=(None, 0))
1571 self._addThread(target=self._doLockSet, args=(None, 1))
1572 self.assertRaises(Queue.Empty, self.done.get_nowait)
1573 self.assertRaises(AssertionError, self.ls.add, 'four')
1576 self.assertEqual(self.done.get_nowait(), 'DONE')
1577 self.assertEqual(self.done.get_nowait(), 'DONE')
1578 self.ls.acquire(None)
1579 self._addThread(target=self._doLockSet, args=(None, 0))
1580 self._addThread(target=self._doLockSet, args=(None, 1))
1581 self.assertRaises(Queue.Empty, self.done.get_nowait)
1583 self.ls.add('five', acquired=1)
1584 self.ls.add('six', acquired=1, shared=1)
1585 self.assertEquals(self.ls.list_owned(),
1586 set(['one', 'two', 'three', 'five', 'six']))
1587 self.assertEquals(self.ls.is_owned(), True)
1588 self.assertEquals(self.ls._names(),
1589 set(['one', 'two', 'three', 'four', 'five', 'six']))
1592 self.assertEqual(self.done.get_nowait(), 'DONE')
1593 self.assertEqual(self.done.get_nowait(), 'DONE')
1597 def testEmptyLockSet(self):
1599 self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1601 self.ls.remove(['one', 'two', 'three'])
1602 # and adds/locks by another thread still wait
1603 self._addThread(target=self._doAddSet, args=(['nine']))
1604 self._addThread(target=self._doLockSet, args=(None, 1))
1605 self._addThread(target=self._doLockSet, args=(None, 0))
1606 self.assertRaises(Queue.Empty, self.done.get_nowait)
1610 self.assertEqual(self.done.get_nowait(), 'DONE')
1612 self.assertEqual(self.ls.remove(['nine']), ['nine'])
1614 self.assertEqual(self.ls.acquire(None, shared=1), set())
1615 # other sharers can go, adds still wait
1616 self._addThread(target=self._doLockSet, args=(None, 1))
1618 self.assertEqual(self.done.get_nowait(), 'DONE')
1619 self._addThread(target=self._doAddSet, args=(['nine']))
1620 self.assertRaises(Queue.Empty, self.done.get_nowait)
1623 self.assertEqual(self.done.get_nowait(), 'DONE')
1626 def testAcquireWithNamesDowngrade(self):
1627 self.assertEquals(self.ls.acquire("two", shared=0), set(["two"]))
1628 self.assertTrue(self.ls.is_owned())
1629 self.assertFalse(self.ls._get_lock().is_owned())
1631 self.assertFalse(self.ls.is_owned())
1632 self.assertFalse(self.ls._get_lock().is_owned())
1633 # Can't downgrade after releasing
1634 self.assertRaises(AssertionError, self.ls.downgrade, "two")
1636 def testDowngrade(self):
1637 # Not owning anything, must raise an exception
1638 self.assertFalse(self.ls.is_owned())
1639 self.assertRaises(AssertionError, self.ls.downgrade)
1641 self.assertFalse(compat.any(i.is_owned()
1642 for i in self.ls._get_lockdict().values()))
1643 self.assertFalse(self.ls.check_owned(self.ls._names()))
1644 for name in self.ls._names():
1645 self.assertFalse(self.ls.check_owned(name))
1647 self.assertEquals(self.ls.acquire(None, shared=0),
1648 set(["one", "two", "three"]))
1649 self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")
1651 self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
1652 for name in self.ls._names():
1653 self.assertTrue(self.ls.check_owned(name))
1654 self.assertTrue(self.ls.check_owned(name, shared=0))
1655 self.assertFalse(self.ls.check_owned(name, shared=1))
1657 self.assertTrue(self.ls._get_lock().is_owned(shared=0))
1658 self.assertTrue(compat.all(i.is_owned(shared=0)
1659 for i in self.ls._get_lockdict().values()))
1661 # Start downgrading locks
1662 self.assertTrue(self.ls.downgrade(names=["one"]))
1663 self.assertTrue(self.ls._get_lock().is_owned(shared=0))
1664 self.assertTrue(compat.all(lock.is_owned(shared=[0, 1][int(name == "one")])
1666 self.ls._get_lockdict().items()))
1668 self.assertFalse(self.ls.check_owned("one", shared=0))
1669 self.assertTrue(self.ls.check_owned("one", shared=1))
1670 self.assertTrue(self.ls.check_owned("two", shared=0))
1671 self.assertTrue(self.ls.check_owned("three", shared=0))
1673 # Downgrade second lock
1674 self.assertTrue(self.ls.downgrade(names="two"))
1675 self.assertTrue(self.ls._get_lock().is_owned(shared=0))
1676 should_share = lambda name: [0, 1][int(name in ("one", "two"))]
1677 self.assertTrue(compat.all(lock.is_owned(shared=should_share(name))
1679 self.ls._get_lockdict().items()))
1681 self.assertFalse(self.ls.check_owned("one", shared=0))
1682 self.assertTrue(self.ls.check_owned("one", shared=1))
1683 self.assertFalse(self.ls.check_owned("two", shared=0))
1684 self.assertTrue(self.ls.check_owned("two", shared=1))
1685 self.assertTrue(self.ls.check_owned("three", shared=0))
1687 # Downgrading the last exclusive lock to shared must downgrade the
1688 # lockset-internal lock too
1689 self.assertTrue(self.ls.downgrade(names="three"))
1690 self.assertTrue(self.ls._get_lock().is_owned(shared=1))
1691 self.assertTrue(compat.all(i.is_owned(shared=1)
1692 for i in self.ls._get_lockdict().values()))
1694 # Verify owned locks
1695 for name in self.ls._names():
1696 self.assertTrue(self.ls.check_owned(name, shared=1))
1698 # Downgrading a shared lock must be a no-op
1699 self.assertTrue(self.ls.downgrade(names=["one", "three"]))
1700 self.assertTrue(self.ls._get_lock().is_owned(shared=1))
1701 self.assertTrue(compat.all(i.is_owned(shared=1)
1702 for i in self.ls._get_lockdict().values()))
1706 def testPriority(self):
1707 def _Acquire(prev, next, name, priority, success_fn):
1709 self.assert_(self.ls.acquire(name, shared=0,
1711 test_notify=lambda _: next.set()))
1717 # Get all in exclusive mode
1718 self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1720 done_two = Queue.Queue(0)
1722 first = threading.Event()
1725 acquires = [("one", prio, self.done) for prio in range(1, 33)]
1726 acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1728 # Use a deterministic random generator
1729 random.Random(741).shuffle(acquires)
1731 for (name, prio, done) in acquires:
1732 ev = threading.Event()
1733 self._addThread(target=_Acquire,
1734 args=(prev, ev, name, prio,
1735 compat.partial(done.put, "Prio%s" % prio)))
1741 # Wait for last acquire to start
1744 # Let threads acquire locks
1747 # Wait for threads to finish
1750 for i in range(1, 33):
1751 self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1752 self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1754 self.assertRaises(Queue.Empty, self.done.get_nowait)
1755 self.assertRaises(Queue.Empty, done_two.get_nowait)
1758 class TestGanetiLockManager(_ThreadedTestCase):
1761 _ThreadedTestCase.setUp(self)
1762 self.nodes=['n1', 'n2']
1763 self.nodegroups=['g1', 'g2']
1764 self.instances=['i1', 'i2', 'i3']
1765 self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
1769 # Don't try this at home...
1770 locking.GanetiLockManager._instance = None
1772 def testLockingConstants(self):
1773 # The locking library internally cheats by assuming its constants have some
1774 # relationships with each other. Check those hold true.
1775 # This relationship is also used in the Processor to recursively acquire
1776 # the right locks. Again, please don't break it.
1777 for i in range(len(locking.LEVELS)):
1778 self.assertEqual(i, locking.LEVELS[i])
1780 def testDoubleGLFails(self):
1781 self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])
1783 def testLockNames(self):
1784 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1785 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1786 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1787 set(self.nodegroups))
1788 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1789 set(self.instances))
1791 def testInitAndResources(self):
1792 locking.GanetiLockManager._instance = None
1793 self.GL = locking.GanetiLockManager([], [], [])
1794 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1795 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1796 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1797 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1799 locking.GanetiLockManager._instance = None
1800 self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [])
1801 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1802 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1803 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1804 set(self.nodegroups))
1805 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1807 locking.GanetiLockManager._instance = None
1808 self.GL = locking.GanetiLockManager([], [], self.instances)
1809 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1810 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1811 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1812 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1813 set(self.instances))
1815 def testAcquireRelease(self):
1816 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1817 self.assertEquals(self.GL.list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1818 self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1819 self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
1820 self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1821 self.assertTrue(self.GL.check_owned(locking.LEVEL_NODE, ["n1", "n2"],
1823 self.assertFalse(self.GL.check_owned(locking.LEVEL_INSTANCE, ["i1", "i3"]))
1824 self.GL.release(locking.LEVEL_NODE, ['n2'])
1825 self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set(['n1']))
1826 self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1827 self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1828 self.GL.release(locking.LEVEL_NODE)
1829 self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set())
1830 self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1831 self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1832 self.GL.release(locking.LEVEL_NODEGROUP)
1833 self.GL.release(locking.LEVEL_INSTANCE)
1834 self.assertRaises(errors.LockError, self.GL.acquire,
1835 locking.LEVEL_INSTANCE, ['i5'])
1836 self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1837 self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1839 def testAcquireWholeSets(self):
1840 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1841 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1842 set(self.instances))
1843 self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
1844 set(self.instances))
1845 self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
1846 set(self.nodegroups))
1847 self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP),
1848 set(self.nodegroups))
1849 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1851 self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
1853 self.GL.release(locking.LEVEL_NODE)
1854 self.GL.release(locking.LEVEL_NODEGROUP)
1855 self.GL.release(locking.LEVEL_INSTANCE)
1856 self.GL.release(locking.LEVEL_CLUSTER)
1858 def testAcquireWholeAndPartial(self):
1859 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1860 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1861 set(self.instances))
1862 self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
1863 set(self.instances))
1864 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1866 self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
1868 self.GL.release(locking.LEVEL_NODE)
1869 self.GL.release(locking.LEVEL_INSTANCE)
1870 self.GL.release(locking.LEVEL_CLUSTER)
1872 def testBGLDependency(self):
1873 self.assertRaises(AssertionError, self.GL.acquire,
1874 locking.LEVEL_NODE, ['n1', 'n2'])
1875 self.assertRaises(AssertionError, self.GL.acquire,
1876 locking.LEVEL_INSTANCE, ['i3'])
1877 self.assertRaises(AssertionError, self.GL.acquire,
1878 locking.LEVEL_NODEGROUP, ['g1'])
1879 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1880 self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1881 self.assertRaises(AssertionError, self.GL.release,
1882 locking.LEVEL_CLUSTER, ['BGL'])
1883 self.assertRaises(AssertionError, self.GL.release,
1884 locking.LEVEL_CLUSTER)
1885 self.GL.release(locking.LEVEL_NODE)
1886 self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1887 self.assertRaises(AssertionError, self.GL.release,
1888 locking.LEVEL_CLUSTER, ['BGL'])
1889 self.assertRaises(AssertionError, self.GL.release,
1890 locking.LEVEL_CLUSTER)
1891 self.GL.release(locking.LEVEL_INSTANCE)
1892 self.GL.acquire(locking.LEVEL_NODEGROUP, None)
1893 self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
1894 self.assertRaises(AssertionError, self.GL.release,
1895 locking.LEVEL_CLUSTER, ['BGL'])
1896 self.assertRaises(AssertionError, self.GL.release,
1897 locking.LEVEL_CLUSTER)
1898 self.GL.release(locking.LEVEL_NODEGROUP)
1899 self.GL.release(locking.LEVEL_CLUSTER)
1901 def testWrongOrder(self):
1902 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1903 self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1904 self.assertRaises(AssertionError, self.GL.acquire,
1905 locking.LEVEL_NODE, ['n1'])
1906 self.assertRaises(AssertionError, self.GL.acquire,
1907 locking.LEVEL_NODEGROUP, ['g1'])
1908 self.assertRaises(AssertionError, self.GL.acquire,
1909 locking.LEVEL_INSTANCE, ['i2'])
1911 def testModifiableLevels(self):
1912 self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
1914 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
1915 self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
1916 self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
1917 self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
1918 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
1919 self.GL.add(locking.LEVEL_NODE, ['n3'])
1920 self.GL.remove(locking.LEVEL_NODE, ['n1'])
1921 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
1922 self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
1923 self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
1924 self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
1925 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
1926 self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
1929 # Helper function to run as a thread that shared the BGL and then acquires
1930 # some locks at another level.
1931 def _doLock(self, level, names, shared):
1933 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1934 self.GL.acquire(level, names, shared=shared)
1935 self.done.put('DONE')
1936 self.GL.release(level)
1937 self.GL.release(locking.LEVEL_CLUSTER)
1938 except errors.LockError:
1939 self.done.put('ERR')
1942 def testConcurrency(self):
1943 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1944 self._addThread(target=self._doLock,
1945 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1947 self.assertEqual(self.done.get_nowait(), 'DONE')
1948 self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1949 self._addThread(target=self._doLock,
1950 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1952 self.assertEqual(self.done.get_nowait(), 'DONE')
1953 self._addThread(target=self._doLock,
1954 args=(locking.LEVEL_INSTANCE, 'i3', 1))
1955 self.assertRaises(Queue.Empty, self.done.get_nowait)
1956 self.GL.release(locking.LEVEL_INSTANCE)
1958 self.assertEqual(self.done.get_nowait(), 'DONE')
1959 self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1960 self._addThread(target=self._doLock,
1961 args=(locking.LEVEL_INSTANCE, 'i2', 1))
1963 self.assertEqual(self.done.get_nowait(), 'DONE')
1964 self._addThread(target=self._doLock,
1965 args=(locking.LEVEL_INSTANCE, 'i2', 0))
1966 self.assertRaises(Queue.Empty, self.done.get_nowait)
1967 self.GL.release(locking.LEVEL_INSTANCE)
1969 self.assertEqual(self.done.get(True, 1), 'DONE')
1970 self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1973 class TestLockMonitor(_ThreadedTestCase):
1975 _ThreadedTestCase.setUp(self)
1976 self.lm = locking.LockMonitor()
1978 def testSingleThread(self):
1981 for i in range(100):
1982 name = "TestLock%s" % i
1983 locks.append(locking.SharedLock(name, monitor=self.lm))
1985 self.assertEqual(len(self.lm._locks), len(locks))
1986 result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
1987 self.assertEqual(len(result.fields), 1)
1988 self.assertEqual(len(result.data), 100)
1993 # The garbage collector might needs some time
1996 raise utils.RetryAgain()
1998 utils.Retry(_CheckLocks, 0.1, 30.0)
2000 self.assertFalse(self.lm._locks)
2002 def testMultiThread(self):
2005 def _CreateLock(prev, next, name):
2007 locks.append(locking.SharedLock(name, monitor=self.lm))
2013 first = threading.Event()
2016 # Use a deterministic random generator
2017 for i in random.Random(4263).sample(range(100), 33):
2018 name = "MtTestLock%s" % i
2019 expnames.append(name)
2021 ev = threading.Event()
2022 self._addThread(target=_CreateLock, args=(prev, ev, name))
2029 # Check order in which locks were added
2030 self.assertEqual([i.name for i in locks], expnames)
2032 # Check query result
2033 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2034 self.assert_(isinstance(result, dict))
2035 response = objects.QueryResponse.FromDict(result)
2036 self.assertEqual(response.data,
2037 [[(constants.RS_NORMAL, name),
2038 (constants.RS_NORMAL, None),
2039 (constants.RS_NORMAL, None),
2040 (constants.RS_NORMAL, [])]
2041 for name in utils.NiceSort(expnames)])
2042 self.assertEqual(len(response.fields), 4)
2043 self.assertEqual(["name", "mode", "owner", "pending"],
2044 [fdef.name for fdef in response.fields])
2046 # Test exclusive acquire
2047 for tlock in locks[::4]:
2048 tlock.acquire(shared=0)
2050 def _GetExpResult(name):
2051 if tlock.name == name:
2052 return [(constants.RS_NORMAL, name),
2053 (constants.RS_NORMAL, "exclusive"),
2054 (constants.RS_NORMAL,
2055 [threading.currentThread().getName()]),
2056 (constants.RS_NORMAL, [])]
2057 return [(constants.RS_NORMAL, name),
2058 (constants.RS_NORMAL, None),
2059 (constants.RS_NORMAL, None),
2060 (constants.RS_NORMAL, [])]
2062 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2063 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2064 [_GetExpResult(name)
2065 for name in utils.NiceSort(expnames)])
2069 # Test shared acquire
2070 def _Acquire(lock, shared, ev, notify):
2071 lock.acquire(shared=shared)
2078 for tlock1 in locks[::11]:
2079 for tlock2 in locks[::-15]:
2080 if tlock2 == tlock1:
2084 for tlock3 in locks[::10]:
2085 if tlock3 in (tlock2, tlock1):
2089 releaseev = threading.Event()
2095 ev = threading.Event()
2096 tthreads1.append(self._addThread(target=_Acquire,
2097 args=(tlock1, 1, releaseev, ev)))
2098 acquireev.append(ev)
2100 ev = threading.Event()
2101 tthread2 = self._addThread(target=_Acquire,
2102 args=(tlock2, 1, releaseev, ev))
2103 acquireev.append(ev)
2105 ev = threading.Event()
2106 tthread3 = self._addThread(target=_Acquire,
2107 args=(tlock3, 0, releaseev, ev))
2108 acquireev.append(ev)
2110 # Wait for all locks to be acquired
2114 # Check query result
2115 result = self.lm.QueryLocks(["name", "mode", "owner"])
2116 response = objects.QueryResponse.FromDict(result)
2117 for (name, mode, owner) in response.data:
2118 (name_status, name_value) = name
2119 (owner_status, owner_value) = owner
2121 self.assertEqual(name_status, constants.RS_NORMAL)
2122 self.assertEqual(owner_status, constants.RS_NORMAL)
2124 if name_value == tlock1.name:
2125 self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
2126 self.assertEqual(set(owner_value),
2127 set(i.getName() for i in tthreads1))
2130 if name_value == tlock2.name:
2131 self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
2132 self.assertEqual(owner_value, [tthread2.getName()])
2135 if name_value == tlock3.name:
2136 self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
2137 self.assertEqual(owner_value, [tthread3.getName()])
2140 self.assert_(name_value in expnames)
2141 self.assertEqual(mode, (constants.RS_NORMAL, None))
2142 self.assert_(owner_value is None)
2144 # Release locks again
2149 result = self.lm.QueryLocks(["name", "mode", "owner"])
2150 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2151 [[(constants.RS_NORMAL, name),
2152 (constants.RS_NORMAL, None),
2153 (constants.RS_NORMAL, None)]
2154 for name in utils.NiceSort(expnames)])
2156 def testDelete(self):
2157 lock = locking.SharedLock("TestLock", monitor=self.lm)
2159 self.assertEqual(len(self.lm._locks), 1)
2160 result = self.lm.QueryLocks(["name", "mode", "owner"])
2161 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2162 [[(constants.RS_NORMAL, lock.name),
2163 (constants.RS_NORMAL, None),
2164 (constants.RS_NORMAL, None)]])
2168 result = self.lm.QueryLocks(["name", "mode", "owner"])
2169 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2170 [[(constants.RS_NORMAL, lock.name),
2171 (constants.RS_NORMAL, "deleted"),
2172 (constants.RS_NORMAL, None)]])
2173 self.assertEqual(len(self.lm._locks), 1)
2175 def testPending(self):
2176 def _Acquire(lock, shared, prev, next):
2179 lock.acquire(shared=shared, test_notify=next.set)
2185 lock = locking.SharedLock("ExcLock", monitor=self.lm)
2187 for shared in [0, 1]:
2190 self.assertEqual(len(self.lm._locks), 1)
2191 result = self.lm.QueryLocks(["name", "mode", "owner"])
2192 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2193 [[(constants.RS_NORMAL, lock.name),
2194 (constants.RS_NORMAL, "exclusive"),
2195 (constants.RS_NORMAL,
2196 [threading.currentThread().getName()])]])
2200 first = threading.Event()
2204 ev = threading.Event()
2205 threads.append(self._addThread(target=_Acquire,
2206 args=(lock, shared, prev, ev)))
2212 # Wait for last acquire to start waiting
2215 # NOTE: This works only because QueryLocks will acquire the
2216 # lock-internal lock again and won't be able to get the information
2217 # until it has the lock. By then the acquire should be registered in
2218 # SharedLock.__pending (otherwise it's a bug).
2220 # All acquires are waiting now
2222 pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
2224 pending = [("exclusive", [t.getName()]) for t in threads]
2226 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2227 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2228 [[(constants.RS_NORMAL, lock.name),
2229 (constants.RS_NORMAL, "exclusive"),
2230 (constants.RS_NORMAL,
2231 [threading.currentThread().getName()]),
2232 (constants.RS_NORMAL, pending)]])
2234 self.assertEqual(len(self.lm._locks), 1)
2240 # No pending acquires
2241 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2242 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2243 [[(constants.RS_NORMAL, lock.name),
2244 (constants.RS_NORMAL, None),
2245 (constants.RS_NORMAL, None),
2246 (constants.RS_NORMAL, [])]])
2248 self.assertEqual(len(self.lm._locks), 1)
2250 def testDeleteAndRecreate(self):
2251 lname = "TestLock101923193"
2253 # Create some locks with the same name and keep all references
2254 locks = [locking.SharedLock(lname, monitor=self.lm)
2257 self.assertEqual(len(self.lm._locks), len(locks))
2259 result = self.lm.QueryLocks(["name", "mode", "owner"])
2260 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2261 [[(constants.RS_NORMAL, lname),
2262 (constants.RS_NORMAL, None),
2263 (constants.RS_NORMAL, None)]] * 5)
2267 # Check information order
2268 result = self.lm.QueryLocks(["name", "mode", "owner"])
2269 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2270 [[(constants.RS_NORMAL, lname),
2271 (constants.RS_NORMAL, None),
2272 (constants.RS_NORMAL, None)]] * 2 +
2273 [[(constants.RS_NORMAL, lname),
2274 (constants.RS_NORMAL, "deleted"),
2275 (constants.RS_NORMAL, None)]] +
2276 [[(constants.RS_NORMAL, lname),
2277 (constants.RS_NORMAL, None),
2278 (constants.RS_NORMAL, None)]] * 2)
2280 locks[1].acquire(shared=0)
2283 [(constants.RS_NORMAL, lname),
2284 (constants.RS_NORMAL, None),
2285 (constants.RS_NORMAL, None)],
2286 [(constants.RS_NORMAL, lname),
2287 (constants.RS_NORMAL, "exclusive"),
2288 (constants.RS_NORMAL, [threading.currentThread().getName()])],
2289 [(constants.RS_NORMAL, lname),
2290 (constants.RS_NORMAL, "deleted"),
2291 (constants.RS_NORMAL, None)],
2292 [(constants.RS_NORMAL, lname),
2293 (constants.RS_NORMAL, None),
2294 (constants.RS_NORMAL, None)],
2295 [(constants.RS_NORMAL, lname),
2296 (constants.RS_NORMAL, None),
2297 (constants.RS_NORMAL, None)],
2300 # Check information order
2301 result = self.lm.QueryLocks(["name", "mode", "owner"])
2302 self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)
2304 self.assertEqual(len(set(self.lm._locks.values())), len(locks))
2305 self.assertEqual(len(self.lm._locks), len(locks))
2307 # Check lock deletion
2308 for idx in range(len(locks)):
2310 assert gc.isenabled()
2312 self.assertEqual(len(self.lm._locks), len(locks))
2313 result = self.lm.QueryLocks(["name", "mode", "owner"])
2314 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2315 last_status[idx + 1:])
2317 # All locks should have been deleted
2319 self.assertFalse(self.lm._locks)
2321 result = self.lm.QueryLocks(["name", "mode", "owner"])
2322 self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
2328 def AddResult(self, *args):
2329 self._info.append(args)
2331 def CountPending(self):
2332 return len(self._info)
2334 def GetLockInfo(self, requested):
2335 (exp_requested, result) = self._info.pop(0)
2337 if exp_requested != requested:
2338 raise Exception("Requested information (%s) does not match"
2339 " expectations (%s)" % (requested, exp_requested))
2343 def testMultipleResults(self):
2344 fl1 = self._FakeLock()
2345 fl2 = self._FakeLock()
2347 self.lm.RegisterLock(fl1)
2348 self.lm.RegisterLock(fl2)
2351 for i in [fl1, fl2]:
2352 i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), [])
2353 result = self.lm.QueryLocks(["name", "mode", "owner"])
2354 self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
2355 for i in [fl1, fl2]:
2356 self.assertEqual(i.CountPending(), 0)
2359 for fn in [lambda x: x, reversed, sorted]:
2360 fl1.AddResult(set(), list(fn([
2361 ("aaa", None, None, None),
2362 ("bbb", None, None, None),
2364 fl2.AddResult(set(), [])
2365 result = self.lm.QueryLocks(["name"])
2366 self.assertEqual(objects.QueryResponse.FromDict(result).data, [
2367 [(constants.RS_NORMAL, "aaa")],
2368 [(constants.RS_NORMAL, "bbb")],
2370 for i in [fl1, fl2]:
2371 self.assertEqual(i.CountPending(), 0)
2373 for fn2 in [lambda x: x, reversed, sorted]:
2374 fl1.AddResult(set([query.LQ_MODE]), list(fn([
2375 # Same name, but different information
2376 ("aaa", "mode0", None, None),
2377 ("aaa", "mode1", None, None),
2378 ("aaa", "mode2", None, None),
2379 ("aaa", "mode3", None, None),
2381 fl2.AddResult(set([query.LQ_MODE]), [
2382 ("zzz", "end", None, None),
2383 ("000", "start", None, None),
2385 ("aaa", "b200", None, None),
2386 ("aaa", "b300", None, None),
2388 result = self.lm.QueryLocks(["name", "mode"])
2389 self.assertEqual(objects.QueryResponse.FromDict(result).data, [
2390 [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")],
2392 # Name is the same, so order must be equal to incoming order
2393 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")],
2394 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")],
2395 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")],
2396 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")],
2398 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")],
2399 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")],
2401 [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")],
2403 for i in [fl1, fl2]:
2404 self.assertEqual(i.CountPending(), 0)
2407 if __name__ == "__main__":
2408 testutils.GanetiTestProgram()