4 # Copyright (C) 2006, 2007, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the locking module"""
34 from ganeti import constants
35 from ganeti import locking
36 from ganeti import errors
37 from ganeti import utils
38 from ganeti import compat
39 from ganeti import objects
40 from ganeti import query
45 # This is used to test the ssynchronized decorator.
46 # Since it's passed as input to a decorator it must be declared as a global.
47 _decoratorlock = locking.SharedLock("decorator lock")
49 #: List for looping tests
54 """Decorator for executing a function many times"""
55 def wrapper(*args, **kwargs):
61 def SafeSleep(duration):
64 delay = start + duration - time.time()
70 class _ThreadedTestCase(unittest.TestCase):
71 """Test class that supports adding/waiting on threads"""
73 unittest.TestCase.setUp(self)
74 self.done = Queue.Queue(0)
77 def _addThread(self, *args, **kwargs):
78 """Create and remember a new thread"""
79 t = threading.Thread(*args, **kwargs)
80 self.threads.append(t)
84 def _waitThreads(self):
85 """Wait for all our threads to finish"""
86 for t in self.threads:
88 self.failIf(t.isAlive())
92 class _ConditionTestCase(_ThreadedTestCase):
93 """Common test case for conditions"""
96 _ThreadedTestCase.setUp(self)
97 self.lock = threading.Lock()
98 self.cond = cls(self.lock)
100 def _testAcquireRelease(self):
101 self.assertFalse(self.cond._is_owned())
102 self.assertRaises(RuntimeError, self.cond.wait, None)
103 self.assertRaises(RuntimeError, self.cond.notifyAll)
106 self.assert_(self.cond._is_owned())
107 self.cond.notifyAll()
108 self.assert_(self.cond._is_owned())
111 self.assertFalse(self.cond._is_owned())
112 self.assertRaises(RuntimeError, self.cond.wait, None)
113 self.assertRaises(RuntimeError, self.cond.notifyAll)
115 def _testNotification(self):
120 self.cond.notifyAll()
125 self._addThread(target=_NotifyAll)
126 self.assertEqual(self.done.get(True, 1), "NE")
127 self.assertRaises(Queue.Empty, self.done.get_nowait)
129 self.assertEqual(self.done.get(True, 1), "NA")
130 self.assertEqual(self.done.get(True, 1), "NN")
131 self.assert_(self.cond._is_owned())
133 self.assertFalse(self.cond._is_owned())
136 class TestSingleNotifyPipeCondition(_ConditionTestCase):
137 """SingleNotifyPipeCondition tests"""
140 _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
142 def testAcquireRelease(self):
143 self._testAcquireRelease()
145 def testNotification(self):
146 self._testNotification()
148 def testWaitReuse(self):
154 def testNoNotifyReuse(self):
156 self.cond.notifyAll()
157 self.assertRaises(RuntimeError, self.cond.wait, None)
158 self.assertRaises(RuntimeError, self.cond.notifyAll)
162 class TestPipeCondition(_ConditionTestCase):
163 """PipeCondition tests"""
166 _ConditionTestCase.setUp(self, locking.PipeCondition)
168 def testAcquireRelease(self):
169 self._testAcquireRelease()
171 def testNotification(self):
172 self._testNotification()
174 def _TestWait(self, fn):
176 self._addThread(target=fn),
177 self._addThread(target=fn),
178 self._addThread(target=fn),
181 # Wait for threads to be waiting
183 self.assertEqual(self.done.get(True, 1), "A")
185 self.assertRaises(Queue.Empty, self.done.get_nowait)
188 self.assertEqual(len(self.cond._waiters), 3)
189 self.assertEqual(self.cond._waiters, set(threads))
190 # This new thread can't acquire the lock, and thus call wait, before we
192 self._addThread(target=fn)
193 self.cond.notifyAll()
194 self.assertRaises(Queue.Empty, self.done.get_nowait)
197 # We should now get 3 W and 1 A (for the new thread) in whatever order
201 got = self.done.get(True, 1)
207 self.fail("Got %s on the done queue" % got)
209 self.assertEqual(w, 3)
210 self.assertEqual(a, 1)
213 self.cond.notifyAll()
216 self.assertEqual(self.done.get_nowait(), "W")
217 self.assertRaises(Queue.Empty, self.done.get_nowait)
219 def testBlockingWait(self):
227 self._TestWait(_BlockingWait)
229 def testLongTimeoutWait(self):
237 self._TestWait(_Helper)
239 def _TimeoutWait(self, timeout, check):
241 self.cond.wait(timeout)
245 def testShortTimeoutWait(self):
246 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
247 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
249 self.assertEqual(self.done.get_nowait(), "T1")
250 self.assertEqual(self.done.get_nowait(), "T1")
251 self.assertRaises(Queue.Empty, self.done.get_nowait)
253 def testZeroTimeoutWait(self):
254 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
255 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
256 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
258 self.assertEqual(self.done.get_nowait(), "T0")
259 self.assertEqual(self.done.get_nowait(), "T0")
260 self.assertEqual(self.done.get_nowait(), "T0")
261 self.assertRaises(Queue.Empty, self.done.get_nowait)
264 class TestSharedLock(_ThreadedTestCase):
265 """SharedLock tests"""
268 _ThreadedTestCase.setUp(self)
269 self.sl = locking.SharedLock("TestSharedLock")
271 def testSequenceAndOwnership(self):
272 self.assertFalse(self.sl._is_owned())
273 self.sl.acquire(shared=1)
274 self.assert_(self.sl._is_owned())
275 self.assert_(self.sl._is_owned(shared=1))
276 self.assertFalse(self.sl._is_owned(shared=0))
278 self.assertFalse(self.sl._is_owned())
280 self.assert_(self.sl._is_owned())
281 self.assertFalse(self.sl._is_owned(shared=1))
282 self.assert_(self.sl._is_owned(shared=0))
284 self.assertFalse(self.sl._is_owned())
285 self.sl.acquire(shared=1)
286 self.assert_(self.sl._is_owned())
287 self.assert_(self.sl._is_owned(shared=1))
288 self.assertFalse(self.sl._is_owned(shared=0))
290 self.assertFalse(self.sl._is_owned())
292 def testBooleanValue(self):
293 # semaphores are supposed to return a true value on a successful acquire
294 self.assert_(self.sl.acquire(shared=1))
296 self.assert_(self.sl.acquire())
299 def testDoubleLockingStoE(self):
300 self.sl.acquire(shared=1)
301 self.assertRaises(AssertionError, self.sl.acquire)
303 def testDoubleLockingEtoS(self):
305 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
307 def testDoubleLockingStoS(self):
308 self.sl.acquire(shared=1)
309 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
311 def testDoubleLockingEtoE(self):
313 self.assertRaises(AssertionError, self.sl.acquire)
315 # helper functions: called in a separate thread they acquire the lock, send
316 # their identifier on the done queue, then release it.
317 def _doItSharer(self):
319 self.sl.acquire(shared=1)
322 except errors.LockError:
325 def _doItExclusive(self):
330 except errors.LockError:
333 def _doItDelete(self):
337 except errors.LockError:
340 def testSharersCanCoexist(self):
341 self.sl.acquire(shared=1)
342 threading.Thread(target=self._doItSharer).start()
343 self.assert_(self.done.get(True, 1))
347 def testExclusiveBlocksExclusive(self):
349 self._addThread(target=self._doItExclusive)
350 self.assertRaises(Queue.Empty, self.done.get_nowait)
353 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
356 def testExclusiveBlocksDelete(self):
358 self._addThread(target=self._doItDelete)
359 self.assertRaises(Queue.Empty, self.done.get_nowait)
362 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
363 self.sl = locking.SharedLock(self.sl.name)
366 def testExclusiveBlocksSharer(self):
368 self._addThread(target=self._doItSharer)
369 self.assertRaises(Queue.Empty, self.done.get_nowait)
372 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
375 def testSharerBlocksExclusive(self):
376 self.sl.acquire(shared=1)
377 self._addThread(target=self._doItExclusive)
378 self.assertRaises(Queue.Empty, self.done.get_nowait)
381 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
384 def testSharerBlocksDelete(self):
385 self.sl.acquire(shared=1)
386 self._addThread(target=self._doItDelete)
387 self.assertRaises(Queue.Empty, self.done.get_nowait)
390 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
391 self.sl = locking.SharedLock(self.sl.name)
394 def testWaitingExclusiveBlocksSharer(self):
395 """SKIPPED testWaitingExclusiveBlockSharer"""
398 self.sl.acquire(shared=1)
399 # the lock is acquired in shared mode...
400 self._addThread(target=self._doItExclusive)
401 # ...but now an exclusive is waiting...
402 self._addThread(target=self._doItSharer)
403 # ...so the sharer should be blocked as well
404 self.assertRaises(Queue.Empty, self.done.get_nowait)
407 # The exclusive passed before
408 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
409 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
412 def testWaitingSharerBlocksExclusive(self):
413 """SKIPPED testWaitingSharerBlocksExclusive"""
417 # the lock is acquired in exclusive mode...
418 self._addThread(target=self._doItSharer)
419 # ...but now a sharer is waiting...
420 self._addThread(target=self._doItExclusive)
421 # ...the exclusive is waiting too...
422 self.assertRaises(Queue.Empty, self.done.get_nowait)
425 # The sharer passed before
426 self.assertEqual(self.done.get_nowait(), 'SHR')
427 self.assertEqual(self.done.get_nowait(), 'EXC')
429 def testDelete(self):
431 self.assertRaises(errors.LockError, self.sl.acquire)
432 self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
433 self.assertRaises(errors.LockError, self.sl.delete)
435 def testDeleteTimeout(self):
436 self.sl.delete(timeout=60)
438 def testNoDeleteIfSharer(self):
439 self.sl.acquire(shared=1)
440 self.assertRaises(AssertionError, self.sl.delete)
443 def testDeletePendingSharersExclusiveDelete(self):
445 self._addThread(target=self._doItSharer)
446 self._addThread(target=self._doItSharer)
447 self._addThread(target=self._doItExclusive)
448 self._addThread(target=self._doItDelete)
451 # The threads who were pending return ERR
453 self.assertEqual(self.done.get_nowait(), 'ERR')
454 self.sl = locking.SharedLock(self.sl.name)
457 def testDeletePendingDeleteExclusiveSharers(self):
459 self._addThread(target=self._doItDelete)
460 self._addThread(target=self._doItExclusive)
461 self._addThread(target=self._doItSharer)
462 self._addThread(target=self._doItSharer)
465 # All four threads that were pending return ERR
466 self.assertEqual(self.done.get_nowait(), 'ERR')
467 self.assertEqual(self.done.get_nowait(), 'ERR')
468 self.assertEqual(self.done.get_nowait(), 'ERR')
469 self.assertEqual(self.done.get_nowait(), 'ERR')
470 self.sl = locking.SharedLock(self.sl.name)
473 def testExclusiveAcquireTimeout(self):
474 for shared in [0, 1]:
475 on_queue = threading.Event()
476 release_exclusive = threading.Event()
478 def _LockExclusive():
479 self.sl.acquire(shared=0, test_notify=on_queue.set)
480 self.done.put("A: start wait")
481 release_exclusive.wait()
482 self.done.put("A: end wait")
485 # Start thread to hold lock in exclusive mode
486 self._addThread(target=_LockExclusive)
488 # Wait for wait to begin
489 self.assertEqual(self.done.get(timeout=60), "A: start wait")
491 # Wait up to 60s to get lock, but release exclusive lock as soon as we're
493 self.failUnless(self.sl.acquire(shared=shared, timeout=60,
494 test_notify=release_exclusive.set))
496 self.done.put("got 2nd")
501 self.assertEqual(self.done.get_nowait(), "A: end wait")
502 self.assertEqual(self.done.get_nowait(), "got 2nd")
503 self.assertRaises(Queue.Empty, self.done.get_nowait)
506 def testAcquireExpiringTimeout(self):
507 def _AcquireWithTimeout(shared, timeout):
508 if not self.sl.acquire(shared=shared, timeout=timeout):
509 self.done.put("timeout")
511 for shared in [0, 1]:
515 # Start shared acquires with timeout between 0 and 20 ms
517 self._addThread(target=_AcquireWithTimeout,
518 args=(shared, i * 2.0 / 1000.0))
520 # Wait for threads to finish (makes sure the acquire timeout expires
521 # before releasing the lock)
528 self.assertEqual(self.done.get_nowait(), "timeout")
530 self.assertRaises(Queue.Empty, self.done.get_nowait)
533 def testSharedSkipExclusiveAcquires(self):
534 # Tests whether shared acquires jump in front of exclusive acquires in the
537 def _Acquire(shared, name, notify_ev, wait_ev):
539 notify_fn = notify_ev.set
546 if not self.sl.acquire(shared=shared, test_notify=notify_fn):
552 # Get exclusive lock while we fill the queue
560 # Add acquires using threading.Event for synchronization. They'll be
561 # acquired exactly in the order defined in this list.
562 acquires = (shrcnt1 * [(1, "shared 1")] +
563 3 * [(0, "exclusive 1")] +
564 shrcnt2 * [(1, "shared 2")] +
565 shrcnt3 * [(1, "shared 3")] +
566 shrcnt4 * [(1, "shared 4")] +
567 3 * [(0, "exclusive 2")])
572 for args in acquires:
573 ev_cur = threading.Event()
574 self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
577 # Wait for last acquire to start
580 # Expect 6 pending exclusive acquires and 1 for all shared acquires
582 self.assertEqual(self.sl._count_pending(), 7)
584 # Release exclusive lock and wait
590 for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
591 # Shared locks aren't guaranteed to be notified in order, but they'll be
593 tmp = self.done.get_nowait()
594 if tmp == "shared 1":
596 elif tmp == "shared 2":
598 elif tmp == "shared 3":
600 elif tmp == "shared 4":
602 self.assertEqual(shrcnt1, 0)
603 self.assertEqual(shrcnt2, 0)
604 self.assertEqual(shrcnt3, 0)
605 self.assertEqual(shrcnt3, 0)
608 self.assertEqual(self.done.get_nowait(), "exclusive 1")
611 self.assertEqual(self.done.get_nowait(), "exclusive 2")
613 self.assertRaises(Queue.Empty, self.done.get_nowait)
615 def testIllegalDowngrade(self):
617 self.assertRaises(AssertionError, self.sl.downgrade)
619 # Acquire in shared mode, downgrade should be no-op
620 self.assertTrue(self.sl.acquire(shared=1))
621 self.assertTrue(self.sl._is_owned(shared=1))
622 self.assertTrue(self.sl.downgrade())
623 self.assertTrue(self.sl._is_owned(shared=1))
626 def testDowngrade(self):
627 self.assertTrue(self.sl.acquire())
628 self.assertTrue(self.sl._is_owned(shared=0))
629 self.assertTrue(self.sl.downgrade())
630 self.assertTrue(self.sl._is_owned(shared=1))
634 def testDowngradeJumpsAheadOfExclusive(self):
635 def _KeepExclusive(ev_got, ev_downgrade, ev_release):
636 self.assertTrue(self.sl.acquire())
637 self.assertTrue(self.sl._is_owned(shared=0))
640 self.assertTrue(self.sl._is_owned(shared=0))
641 self.assertTrue(self.sl.downgrade())
642 self.assertTrue(self.sl._is_owned(shared=1))
644 self.assertTrue(self.sl._is_owned(shared=1))
647 def _KeepExclusive2(ev_started, ev_release):
648 self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
649 self.assertTrue(self.sl._is_owned(shared=0))
651 self.assertTrue(self.sl._is_owned(shared=0))
654 def _KeepShared(ev_started, ev_got, ev_release):
655 self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
656 self.assertTrue(self.sl._is_owned(shared=1))
659 self.assertTrue(self.sl._is_owned(shared=1))
662 # Acquire lock in exclusive mode
663 ev_got_excl1 = threading.Event()
664 ev_downgrade_excl1 = threading.Event()
665 ev_release_excl1 = threading.Event()
666 th_excl1 = self._addThread(target=_KeepExclusive,
667 args=(ev_got_excl1, ev_downgrade_excl1,
671 # Start a second exclusive acquire
672 ev_started_excl2 = threading.Event()
673 ev_release_excl2 = threading.Event()
674 th_excl2 = self._addThread(target=_KeepExclusive2,
675 args=(ev_started_excl2, ev_release_excl2))
676 ev_started_excl2.wait()
678 # Start shared acquires, will jump ahead of second exclusive acquire when
679 # first exclusive acquire downgrades
680 ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
681 ev_release_shared = threading.Event()
683 th_shared = [self._addThread(target=_KeepShared,
684 args=(ev_started, ev_got, ev_release_shared))
685 for (ev_started, ev_got) in ev_shared]
687 # Wait for all shared acquires to start
688 for (ev, _) in ev_shared:
691 # Check lock information
692 self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
693 (self.sl.name, "exclusive", [th_excl1.getName()], None))
694 (_, _, _, pending) = self.sl.GetInfo(set([query.LQ_PENDING]))
695 self.assertEqual([(pendmode, sorted(waiting))
696 for (pendmode, waiting) in pending],
697 [("exclusive", [th_excl2.getName()]),
698 ("shared", sorted(th.getName() for th in th_shared))])
700 # Shared acquires won't start until the exclusive lock is downgraded
701 ev_downgrade_excl1.set()
703 # Wait for all shared acquires to be successful
704 for (_, ev) in ev_shared:
707 # Check lock information again
708 self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_PENDING])),
709 (self.sl.name, "shared", None,
710 [("exclusive", [th_excl2.getName()])]))
711 (_, _, owner, _) = self.sl.GetInfo(set([query.LQ_OWNER]))
712 self.assertEqual(set(owner), set([th_excl1.getName()] +
713 [th.getName() for th in th_shared]))
715 ev_release_excl1.set()
716 ev_release_excl2.set()
717 ev_release_shared.set()
721 self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER,
723 (self.sl.name, None, None, []))
726 def testMixedAcquireTimeout(self):
727 sync = threading.Event()
729 def _AcquireShared(ev):
730 if not self.sl.acquire(shared=1, timeout=None):
733 self.done.put("shared")
738 # Wait for notification from main thread
746 ev = threading.Event()
747 self._addThread(target=_AcquireShared, args=(ev, ))
750 # Wait for all acquires to finish
754 self.assertEqual(self.sl._count_pending(), 0)
756 # Try to get exclusive lock
757 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
759 # Acquire exclusive without timeout
760 exclsync = threading.Event()
761 exclev = threading.Event()
763 def _AcquireExclusive():
764 if not self.sl.acquire(shared=0):
767 self.done.put("exclusive")
772 # Wait for notification from main thread
777 self._addThread(target=_AcquireExclusive)
779 # Try to get exclusive lock
780 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
782 # Make all shared holders release their locks
785 # Wait for exclusive acquire to succeed
788 self.assertEqual(self.sl._count_pending(), 0)
790 # Try to get exclusive lock
791 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
793 def _AcquireSharedSimple():
794 if self.sl.acquire(shared=1, timeout=None):
795 self.done.put("shared2")
799 self._addThread(target=_AcquireSharedSimple)
801 # Tell exclusive lock to release
804 # Wait for everything to finish
807 self.assertEqual(self.sl._count_pending(), 0)
811 self.assertEqual(self.done.get_nowait(), "shared")
813 self.assertEqual(self.done.get_nowait(), "exclusive")
816 self.assertEqual(self.done.get_nowait(), "shared2")
818 self.assertRaises(Queue.Empty, self.done.get_nowait)
820 def testPriority(self):
821 # Acquire in exclusive mode
822 self.assert_(self.sl.acquire(shared=0))
825 def _Acquire(prev, next, shared, priority, result):
827 self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
829 self.done.put(result)
833 counter = itertools.count(0)
834 priorities = range(-20, 30)
835 first = threading.Event()
841 # [(shared/exclusive, set(acquire names), set(pending threads)),
842 # (shared/exclusive, ...),
848 # References shared acquire per priority in L{perprio}. Data structure:
850 # priority: (shared=1, set(acquire names), set(pending threads)),
854 for seed in [4979, 9523, 14902, 32440]:
855 # Use a deterministic random generator
856 rnd = random.Random(seed)
857 for priority in [rnd.choice(priorities) for _ in range(30)]:
862 acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
864 ev = threading.Event()
865 thread = self._addThread(target=_Acquire,
866 args=(prev, ev, shared, priority, acqname))
869 # Record expected acquire, see above for structure
870 data = (shared, set([acqname]), set([thread]))
871 priolist = perprio.setdefault(priority, [])
873 priosh = prioshared.get(priority, None)
875 # Shared acquires are merged
876 for i, j in zip(priosh[1:], data[1:]):
878 assert data[0] == priosh[0]
880 prioshared[priority] = data
881 priolist.append(data)
883 priolist.append(data)
885 # Start all acquires and wait for them
889 # Check lock information
890 self.assertEqual(self.sl.GetInfo(set()), (self.sl.name, None, None, None))
891 self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
892 (self.sl.name, "exclusive",
893 [threading.currentThread().getName()], None))
895 self._VerifyPrioPending(self.sl.GetInfo(set([query.LQ_PENDING])), perprio)
897 # Let threads acquire the lock
900 # Wait for everything to finish
903 self.assert_(self.sl._check_empty())
905 # Check acquires by priority
906 for acquires in [perprio[i] for i in sorted(perprio.keys())]:
907 for (_, names, _) in acquires:
908 # For shared acquires, the set will contain 1..n entries. For exclusive
911 names.remove(self.done.get_nowait())
912 self.assertFalse(compat.any(names for (_, names, _) in acquires))
914 self.assertRaises(Queue.Empty, self.done.get_nowait)
916 def _VerifyPrioPending(self, (name, mode, owner, pending), perprio):
917 self.assertEqual(name, self.sl.name)
918 self.assert_(mode is None)
919 self.assert_(owner is None)
921 self.assertEqual([(pendmode, sorted(waiting))
922 for (pendmode, waiting) in pending],
923 [(["exclusive", "shared"][int(bool(shared))],
924 sorted(t.getName() for t in threads))
925 for acquires in [perprio[i]
926 for i in sorted(perprio.keys())]
927 for (shared, _, threads) in acquires])
930 class TestSharedLockInCondition(_ThreadedTestCase):
931 """SharedLock as a condition lock tests"""
934 _ThreadedTestCase.setUp(self)
935 self.sl = locking.SharedLock("TestSharedLockInCondition")
938 def setCondition(self):
939 self.cond = threading.Condition(self.sl)
941 def testKeepMode(self):
942 self.cond.acquire(shared=1)
943 self.assert_(self.sl._is_owned(shared=1))
945 self.assert_(self.sl._is_owned(shared=1))
947 self.cond.acquire(shared=0)
948 self.assert_(self.sl._is_owned(shared=0))
950 self.assert_(self.sl._is_owned(shared=0))
954 class TestSharedLockInPipeCondition(TestSharedLockInCondition):
955 """SharedLock as a pipe condition lock tests"""
957 def setCondition(self):
958 self.cond = locking.PipeCondition(self.sl)
961 class TestSSynchronizedDecorator(_ThreadedTestCase):
962 """Shared Lock Synchronized decorator test"""
965 _ThreadedTestCase.setUp(self)
967 @locking.ssynchronized(_decoratorlock)
968 def _doItExclusive(self):
969 self.assert_(_decoratorlock._is_owned())
972 @locking.ssynchronized(_decoratorlock, shared=1)
973 def _doItSharer(self):
974 self.assert_(_decoratorlock._is_owned(shared=1))
977 def testDecoratedFunctions(self):
978 self._doItExclusive()
979 self.assertFalse(_decoratorlock._is_owned())
981 self.assertFalse(_decoratorlock._is_owned())
983 def testSharersCanCoexist(self):
984 _decoratorlock.acquire(shared=1)
985 threading.Thread(target=self._doItSharer).start()
986 self.assert_(self.done.get(True, 1))
987 _decoratorlock.release()
990 def testExclusiveBlocksExclusive(self):
991 _decoratorlock.acquire()
992 self._addThread(target=self._doItExclusive)
993 # give it a bit of time to check that it's not actually doing anything
994 self.assertRaises(Queue.Empty, self.done.get_nowait)
995 _decoratorlock.release()
997 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
1000 def testExclusiveBlocksSharer(self):
1001 _decoratorlock.acquire()
1002 self._addThread(target=self._doItSharer)
1003 self.assertRaises(Queue.Empty, self.done.get_nowait)
1004 _decoratorlock.release()
1006 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
1009 def testSharerBlocksExclusive(self):
1010 _decoratorlock.acquire(shared=1)
1011 self._addThread(target=self._doItExclusive)
1012 self.assertRaises(Queue.Empty, self.done.get_nowait)
1013 _decoratorlock.release()
1015 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
1018 class TestLockSet(_ThreadedTestCase):
1022 _ThreadedTestCase.setUp(self)
1026 """Helper to (re)initialize the lock set"""
1027 self.resources = ['one', 'two', 'three']
1028 self.ls = locking.LockSet(self.resources, "TestLockSet")
1030 def testResources(self):
1031 self.assertEquals(self.ls._names(), set(self.resources))
1032 newls = locking.LockSet([], "TestLockSet.testResources")
1033 self.assertEquals(newls._names(), set())
1035 def testAcquireRelease(self):
1036 self.assert_(self.ls.acquire('one'))
1037 self.assertEquals(self.ls._list_owned(), set(['one']))
1039 self.assertEquals(self.ls._list_owned(), set())
1040 self.assertEquals(self.ls.acquire(['one']), set(['one']))
1041 self.assertEquals(self.ls._list_owned(), set(['one']))
1043 self.assertEquals(self.ls._list_owned(), set())
1044 self.ls.acquire(['one', 'two', 'three'])
1045 self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1046 self.ls.release('one')
1047 self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
1048 self.ls.release(['three'])
1049 self.assertEquals(self.ls._list_owned(), set(['two']))
1051 self.assertEquals(self.ls._list_owned(), set())
1052 self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
1053 self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
1055 self.assertEquals(self.ls._list_owned(), set())
1057 def testNoDoubleAcquire(self):
1058 self.ls.acquire('one')
1059 self.assertRaises(AssertionError, self.ls.acquire, 'one')
1060 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
1061 self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
1063 self.ls.acquire(['one', 'three'])
1064 self.ls.release('one')
1065 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
1066 self.ls.release('three')
1068 def testNoWrongRelease(self):
1069 self.assertRaises(AssertionError, self.ls.release)
1070 self.ls.acquire('one')
1071 self.assertRaises(AssertionError, self.ls.release, 'two')
1073 def testAddRemove(self):
1075 self.assertEquals(self.ls._list_owned(), set())
1076 self.assert_('four' in self.ls._names())
1077 self.ls.add(['five', 'six', 'seven'], acquired=1)
1078 self.assert_('five' in self.ls._names())
1079 self.assert_('six' in self.ls._names())
1080 self.assert_('seven' in self.ls._names())
1081 self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
1082 self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
1083 self.assert_('five' not in self.ls._names())
1084 self.assert_('six' not in self.ls._names())
1085 self.assertEquals(self.ls._list_owned(), set(['seven']))
1086 self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
1087 self.ls.remove('seven')
1088 self.assert_('seven' not in self.ls._names())
1089 self.assertEquals(self.ls._list_owned(), set([]))
1090 self.ls.acquire(None, shared=1)
1091 self.assertRaises(AssertionError, self.ls.add, 'eight')
1093 self.ls.acquire(None)
1094 self.ls.add('eight', acquired=1)
1095 self.assert_('eight' in self.ls._names())
1096 self.assert_('eight' in self.ls._list_owned())
1098 self.assert_('nine' in self.ls._names())
1099 self.assert_('nine' not in self.ls._list_owned())
1101 self.ls.remove(['two'])
1102 self.assert_('two' not in self.ls._names())
1103 self.ls.acquire('three')
1104 self.assertEquals(self.ls.remove(['three']), ['three'])
1105 self.assert_('three' not in self.ls._names())
1106 self.assertEquals(self.ls.remove('three'), [])
1107 self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
1108 self.assert_('one' not in self.ls._names())
1110 def testRemoveNonBlocking(self):
1111 self.ls.acquire('one')
1112 self.assertEquals(self.ls.remove('one'), ['one'])
1113 self.ls.acquire(['two', 'three'])
1114 self.assertEquals(self.ls.remove(['two', 'three']),
1117 def testNoDoubleAdd(self):
1118 self.assertRaises(errors.LockError, self.ls.add, 'two')
1120 self.assertRaises(errors.LockError, self.ls.add, 'four')
1122 def testNoWrongRemoves(self):
1123 self.ls.acquire(['one', 'three'], shared=1)
1124 # Cannot remove 'two' while holding something which is not a superset
1125 self.assertRaises(AssertionError, self.ls.remove, 'two')
1126 # Cannot remove 'three' as we are sharing it
1127 self.assertRaises(AssertionError, self.ls.remove, 'three')
1129 def testAcquireSetLock(self):
1130 # acquire the set-lock exclusively
1131 self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
1132 self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1133 self.assertEquals(self.ls._is_owned(), True)
1134 self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
1135 # I can still add/remove elements...
1136 self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
1137 self.assert_(self.ls.add('six'))
1139 # share the set-lock
1140 self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
1141 # adding new elements is not possible
1142 self.assertRaises(AssertionError, self.ls.add, 'five')
1145 def testAcquireWithRepetitions(self):
1146 self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
1147 set(['two', 'two', 'three']))
1148 self.ls.release(['two', 'two'])
1149 self.assertEquals(self.ls._list_owned(), set(['three']))
1151 def testEmptyAcquire(self):
1152 # Acquire an empty list of locks...
1153 self.assertEquals(self.ls.acquire([]), set())
1154 self.assertEquals(self.ls._list_owned(), set())
1155 # New locks can still be added
1156 self.assert_(self.ls.add('six'))
1157 # "re-acquiring" is not an issue, since we had really acquired nothing
1158 self.assertEquals(self.ls.acquire([], shared=1), set())
1159 self.assertEquals(self.ls._list_owned(), set())
1160 # We haven't really acquired anything, so we cannot release
1161 self.assertRaises(AssertionError, self.ls.release)
1163 def _doLockSet(self, names, shared):
1165 self.ls.acquire(names, shared=shared)
1166 self.done.put('DONE')
1168 except errors.LockError:
1169 self.done.put('ERR')
1171 def _doAddSet(self, names):
1173 self.ls.add(names, acquired=1)
1174 self.done.put('DONE')
1176 except errors.LockError:
1177 self.done.put('ERR')
1179 def _doRemoveSet(self, names):
1180 self.done.put(self.ls.remove(names))
1183 def testConcurrentSharedAcquire(self):
1184 self.ls.acquire(['one', 'two'], shared=1)
1185 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1187 self.assertEqual(self.done.get_nowait(), 'DONE')
1188 self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
1190 self.assertEqual(self.done.get_nowait(), 'DONE')
1191 self._addThread(target=self._doLockSet, args=('three', 1))
1193 self.assertEqual(self.done.get_nowait(), 'DONE')
1194 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1195 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1196 self.assertRaises(Queue.Empty, self.done.get_nowait)
1199 self.assertEqual(self.done.get_nowait(), 'DONE')
1200 self.assertEqual(self.done.get_nowait(), 'DONE')
1203 def testConcurrentExclusiveAcquire(self):
# This thread owns 'one' and 'two' exclusively; 'three' is not held here, so
# helper threads that only want 'three' are expected to report 'DONE'.
1204 self.ls.acquire(['one', 'two'])
1205 self._addThread(target=self._doLockSet, args=('three', 1))
1207 self.assertEqual(self.done.get_nowait(), 'DONE')
1208 self._addThread(target=self._doLockSet, args=('three', 0))
1210 self.assertEqual(self.done.get_nowait(), 'DONE')
1211 self.assertRaises(Queue.Empty, self.done.get_nowait)
# Every request below involves 'one' or 'two' (in either mode), so none of
# them may have reported a result yet.
1212 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1213 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1214 self._addThread(target=self._doLockSet, args=('one', 0))
1215 self._addThread(target=self._doLockSet, args=('one', 1))
1216 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1217 self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
1218 self.assertRaises(Queue.Empty, self.done.get_nowait)
1222 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1225 def testSimpleAcquireTimeoutExpiring(self):
# Checks that an exclusive acquire with a short timeout on names that are
# already held exclusively yields None and leaves nothing owned.
1226 names = sorted(self.ls._names())
1227 self.assert_(len(names) >= 3)
1229 # Get name of first lock
1232 # Get name of last lock
1236 # Block first and try to lock it again
1239 # Block last and try to lock all locks
1242 # Block last and try to lock it again
# Each check pairs the names to request ("wanted") with the names that are
# locked beforehand ("block").
1246 for (wanted, block) in checks:
1247 # Lock in exclusive mode
1248 self.assert_(self.ls.acquire(block, shared=0))
1251 # Try to get the same lock again with a timeout (should never succeed)
1252 acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1254 self.done.put("acquired")
1257 self.assert_(acquired is None)
1258 self.assertFalse(self.ls._list_owned())
1259 self.assertFalse(self.ls._is_owned())
1260 self.done.put("not acquired")
1262 self._addThread(target=_AcquireOne)
1264 # Wait for timeout in thread to expire
1267 # Release exclusive lock again
# Exactly one result, "not acquired", is expected per check.
1270 self.assertEqual(self.done.get_nowait(), "not acquired")
1271 self.assertRaises(Queue.Empty, self.done.get_nowait)
1274 def testDelayedAndExpiringLockAcquire(self):
# Exercises acquiring all names with a timeout while this thread releases
# them one by one, once with the timeout expiring and once without.
1276 self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1278 for expire in (False, True):
1279 names = sorted(self.ls._names())
1280 self.assertEqual(len(names), 8)
# One event per lock name, waited on further down before that name is
# released.
1282 lock_ev = dict([(i, threading.Event()) for i in names])
1284 # Lock all in exclusive mode
1285 self.assert_(self.ls.acquire(names, shared=0))
1288 # We'll wait at least 300ms per lock
1289 lockwait = len(names) * [0.3]
1291 # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1292 # this gives us up to 2.4s to fail.
1293 lockall_timeout = 0.4
1295 # This should finish rather quickly
1297 lockall_timeout = len(names) * 5.0
1300 def acquire_notification(name):
1302 self.done.put("getting %s" % name)
1307 if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1308 test_notify=acquire_notification):
1309 self.done.put("got all")
1312 self.done.put("timeout on all")
1315 for ev in lock_ev.values():
1318 t = self._addThread(target=_LockAll)
1320 for idx, name in enumerate(names):
1321 # Wait for actual acquire on this lock to start
1322 lock_ev[name].wait(10.0)
1324 if expire and t.isAlive():
1325 # Wait some time after getting the notification to make sure the lock
1326 # acquire will expire
1327 SafeSleep(lockwait[idx])
1329 self.ls.release(names=name)
1331 self.assertFalse(self.ls._list_owned())
1336 # Not checking which locks were actually acquired. Doing so would be
1337 # too timing-dependent.
1338 self.assertEqual(self.done.get_nowait(), "timeout on all")
1341 self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1342 self.assertEqual(self.done.get_nowait(), "got all")
1343 self.assertRaises(Queue.Empty, self.done.get_nowait)
1346 def testConcurrentRemove(self):
# Checks how pending acquires and removes behave when names are removed
# from the lock set while other threads are waiting on them.
1348 self.ls.acquire(['one', 'two', 'four'])
1349 self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1350 self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1351 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1352 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1353 self.assertRaises(Queue.Empty, self.done.get_nowait)
# All waiters above asked for 'one'; removing it is expected to make them
# report 'ERR'.
1354 self.ls.remove('one')
1358 self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1359 self.ls.add(['five', 'six'], acquired=1)
1360 self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1361 self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1362 self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1363 self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
# None of the waiters above asked for 'five', so removing it must not make
# them fail.
1364 self.ls.remove('five')
1368 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1369 self.ls.acquire(['three', 'four'])
1370 self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1371 self.assertRaises(Queue.Empty, self.done.get_nowait)
# 'four' is removed here first, so the helper thread is expected to report
# having removed only 'six'.
1372 self.ls.remove('four')
1374 self.assertEqual(self.done.get_nowait(), ['six'])
# NOTE(review): (['two']) is the plain list ['two'], not a 1-tuple (there is
# no trailing comma), so the thread target receives the string 'two' rather
# than ['two']; compare the (['four', 'six'], ) call a few lines up.
1375 self._addThread(target=self._doRemoveSet, args=(['two']))
1377 self.assertEqual(self.done.get_nowait(), ['two'])
1383 def testConcurrentSharedSetLock(self):
1384 # share the set-lock...
1385 self.ls.acquire(None, shared=1)
1386 # ...another thread can share it too
1387 self._addThread(target=self._doLockSet, args=(None, 1))
1389 self.assertEqual(self.done.get_nowait(), 'DONE')
1390 # ...or just share some elements
1391 self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1393 self.assertEqual(self.done.get_nowait(), 'DONE')
1394 # ...but not add new ones or remove any
# NOTE(review): (['nine']) is the plain list ['nine'], not a 1-tuple (no
# trailing comma), so _doAddSet receives the string 'nine'; the next line
# uses the tuple form (['two'], ).
1395 t = self._addThread(target=self._doAddSet, args=(['nine']))
1396 self._addThread(target=self._doRemoveSet, args=(['two'], ))
1397 self.assertRaises(Queue.Empty, self.done.get_nowait)
1398 # this just releases the set-lock
# The first result expected afterwards is the 'DONE' posted by _doAddSet.
1401 self.assertEqual(self.done.get_nowait(), 'DONE')
1402 # release the lock on the actual elements so remove() can proceed too
1405 self.failUnlessEqual(self.done.get_nowait(), ['two'])
1410 def testConcurrentExclusiveSetLock(self):
1411 # acquire the set-lock...
1412 self.ls.acquire(None, shared=0)
1413 # ...no one can do anything else
1414 self._addThread(target=self._doLockSet, args=(None, 1))
1415 self._addThread(target=self._doLockSet, args=(None, 0))
1416 self._addThread(target=self._doLockSet, args=(['three'], 0))
1417 self._addThread(target=self._doLockSet, args=(['two'], 1))
# NOTE(review): (['nine']) is the plain list ['nine'], not a 1-tuple (no
# trailing comma), so _doAddSet receives the string 'nine'.
1418 self._addThread(target=self._doAddSet, args=(['nine']))
1419 self.assertRaises(Queue.Empty, self.done.get_nowait)
# Blocking get with a one second timeout instead of get_nowait().
1423 self.assertEqual(self.done.get(True, 1), 'DONE')
1428 def testConcurrentSetLockAdd(self):
# Covers add() while other threads are waiting for the whole set.
1429 self.ls.acquire('one')
1430 # Another thread wants the whole SetLock
1431 self._addThread(target=self._doLockSet, args=(None, 0))
1432 self._addThread(target=self._doLockSet, args=(None, 1))
1433 self.assertRaises(Queue.Empty, self.done.get_nowait)
# With only 'one' owned by this thread, add() is expected to trip an
# assertion.
1434 self.assertRaises(AssertionError, self.ls.add, 'four')
1437 self.assertEqual(self.done.get_nowait(), 'DONE')
1438 self.assertEqual(self.done.get_nowait(), 'DONE')
# Now own the whole set; the helper threads have to wait again.
1439 self.ls.acquire(None)
1440 self._addThread(target=self._doLockSet, args=(None, 0))
1441 self._addThread(target=self._doLockSet, args=(None, 1))
1442 self.assertRaises(Queue.Empty, self.done.get_nowait)
# Names added with acquired=1 show up as owned; 'four' is expected among
# the known names but not among the owned ones.
1444 self.ls.add('five', acquired=1)
1445 self.ls.add('six', acquired=1, shared=1)
1446 self.assertEquals(self.ls._list_owned(),
1447 set(['one', 'two', 'three', 'five', 'six']))
1448 self.assertEquals(self.ls._is_owned(), True)
1449 self.assertEquals(self.ls._names(),
1450 set(['one', 'two', 'three', 'four', 'five', 'six']))
1453 self.assertEqual(self.done.get_nowait(), 'DONE')
1454 self.assertEqual(self.done.get_nowait(), 'DONE')
1458 def testEmptyLockSet(self):
# Acquire the whole set, then remove every name from it.
1460 self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1462 self.ls.remove(['one', 'two', 'three'])
1463 # and adds/locks by another thread still wait
# NOTE(review): (['nine']) is the plain list ['nine'], not a 1-tuple (no
# trailing comma), so _doAddSet receives the string 'nine'.
1464 self._addThread(target=self._doAddSet, args=(['nine']))
1465 self._addThread(target=self._doLockSet, args=(None, 1))
1466 self._addThread(target=self._doLockSet, args=(None, 0))
1467 self.assertRaises(Queue.Empty, self.done.get_nowait)
1471 self.assertEqual(self.done.get_nowait(), 'DONE')
1473 self.assertEqual(self.ls.remove(['nine']), ['nine'])
# Acquiring the whole (now empty) set in shared mode yields an empty set.
1475 self.assertEqual(self.ls.acquire(None, shared=1), set())
1476 # other sharers can go, adds still wait
1477 self._addThread(target=self._doLockSet, args=(None, 1))
1479 self.assertEqual(self.done.get_nowait(), 'DONE')
# NOTE(review): same list-instead-of-tuple form of args as above.
1480 self._addThread(target=self._doAddSet, args=(['nine']))
1481 self.assertRaises(Queue.Empty, self.done.get_nowait)
1484 self.assertEqual(self.done.get_nowait(), 'DONE')
1487 def testAcquireWithNamesDowngrade(self):
# Acquiring a single name makes the lock set owned, while the lock returned
# by _get_lock() must not be owned by this thread.
1488 self.assertEquals(self.ls.acquire("two", shared=0), set(["two"]))
1489 self.assertTrue(self.ls._is_owned())
1490 self.assertFalse(self.ls._get_lock()._is_owned())
1492 self.assertFalse(self.ls._is_owned())
1493 self.assertFalse(self.ls._get_lock()._is_owned())
1494 # Can't downgrade after releasing
1495 self.assertRaises(AssertionError, self.ls.downgrade, "two")
1497 def testDowngrade(self):
1498 # Not owning anything, must raise an exception
1499 self.assertFalse(self.ls._is_owned())
1500 self.assertRaises(AssertionError, self.ls.downgrade)
1502 self.assertFalse(compat.any(i._is_owned()
1503 for i in self.ls._get_lockdict().values()))
# Take the whole set exclusively; downgrading an unknown name must fail.
1505 self.assertEquals(self.ls.acquire(None, shared=0),
1506 set(["one", "two", "three"]))
1507 self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")
1509 self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
1510 self.assertTrue(compat.all(i._is_owned(shared=0)
1511 for i in self.ls._get_lockdict().values()))
1513 # Start downgrading locks
1514 self.assertTrue(self.ls.downgrade(names=["one"]))
1515 self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
# Only "one" has been downgraded so far, so it is the only lock expected to
# be owned in shared mode (index 1 of [0, 1]).
1516 self.assertTrue(compat.all(lock._is_owned(shared=[0, 1][int(name == "one")])
1518 self.ls._get_lockdict().items()))
1520 self.assertTrue(self.ls.downgrade(names="two"))
1521 self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
# Maps a lock name to the mode it should be owned in by now: 1 (shared) for
# "one" and "two", 0 (exclusive) otherwise.
1522 should_share = lambda name: [0, 1][int(name in ("one", "two"))]
1523 self.assertTrue(compat.all(lock._is_owned(shared=should_share(name))
1525 self.ls._get_lockdict().items()))
1527 # Downgrading the last exclusive lock to shared must downgrade the
1528 # lockset-internal lock too
1529 self.assertTrue(self.ls.downgrade(names="three"))
1530 self.assertTrue(self.ls._get_lock()._is_owned(shared=1))
1531 self.assertTrue(compat.all(i._is_owned(shared=1)
1532 for i in self.ls._get_lockdict().values()))
1534 # Downgrading a shared lock must be a no-op
1535 self.assertTrue(self.ls.downgrade(names=["one", "three"]))
1536 self.assertTrue(self.ls._get_lock()._is_owned(shared=1))
1537 self.assertTrue(compat.all(i._is_owned(shared=1)
1538 for i in self.ls._get_lockdict().values()))
1542 def testPriority(self):
# Queues exclusive acquires with priorities 1..32 on both "one" and "two" in
# a shuffled order and expects the results to arrive sorted by priority.
1543 def _Acquire(prev, next, name, priority, success_fn):
1545 self.assert_(self.ls.acquire(name, shared=0,
1547 test_notify=lambda _: next.set()))
1553 # Get all in exclusive mode
1554 self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
# Results for "two" go to a separate queue; those for "one" use self.done.
1556 done_two = Queue.Queue(0)
1558 first = threading.Event()
1561 acquires = [("one", prio, self.done) for prio in range(1, 33)]
1562 acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1564 # Use a deterministic random generator
1565 random.Random(741).shuffle(acquires)
1567 for (name, prio, done) in acquires:
1568 ev = threading.Event()
1569 self._addThread(target=_Acquire,
1570 args=(prev, ev, name, prio,
1571 compat.partial(done.put, "Prio%s" % prio)))
1577 # Wait for last acquire to start
1580 # Let threads acquire locks
1583 # Wait for threads to finish
# Both queues must deliver "Prio1" .. "Prio32" in ascending order.
1586 for i in range(1, 33):
1587 self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1588 self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1590 self.assertRaises(Queue.Empty, self.done.get_nowait)
1591 self.assertRaises(Queue.Empty, done_two.get_nowait)
# Tests for locking.GanetiLockManager, built over two nodes, two node groups
# and three instances.
1594 class TestGanetiLockManager(_ThreadedTestCase):
1597 _ThreadedTestCase.setUp(self)
# Lock names handed to the manager, one list per level.
1598 self.nodes=['n1', 'n2']
1599 self.nodegroups=['g1', 'g2']
1600 self.instances=['i1', 'i2', 'i3']
1601 self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
1605 # Don't try this at home...
# Presumably clears the remembered manager so that another one can be
# constructed later -- TODO confirm against locking.GanetiLockManager.
1606 locking.GanetiLockManager._instance = None
def testLockingConstants(self):
  # The locking library internally cheats by assuming its constants have some
  # relationships with each other. Check those hold true.
  # This relationship is also used in the Processor to recursively acquire
  # the right locks. Again, please don't break it.
  # Concretely: every entry of locking.LEVELS must equal its own index.
  for (idx, level) in enumerate(locking.LEVELS):
    self.assertEqual(idx, level)
def testDoubleGLFails(self):
  # The fixture already holds a GanetiLockManager in self.GL; constructing
  # another one must trip an assertion.
  ctor_args = ([], [], [])
  self.assertRaises(AssertionError, locking.GanetiLockManager, *ctor_args)
def testLockNames(self):
  # Each level must report exactly the names the manager was created with;
  # the cluster level is expected to contain just 'BGL'.
  expected_names = [
    (locking.LEVEL_CLUSTER, set(['BGL'])),
    (locking.LEVEL_NODE, set(self.nodes)),
    (locking.LEVEL_NODEGROUP, set(self.nodegroups)),
    (locking.LEVEL_INSTANCE, set(self.instances)),
    ]
  for (level, names) in expected_names:
    self.assertEqual(self.GL._names(level), names)
def testInitAndResources(self):
  # Rebuild the manager with different combinations of initial names and
  # verify what every level reports afterwards.
  setups = [
    ([], [], []),
    (self.nodes, self.nodegroups, []),
    ([], [], self.instances),
    ]
  for (nodes, nodegroups, instances) in setups:
    # Forget the previous manager before constructing the next one
    locking.GanetiLockManager._instance = None
    self.GL = locking.GanetiLockManager(nodes, nodegroups, instances)
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(nodes))
    self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(nodegroups))
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(instances))
def testAcquireRelease(self):
  # Walks through acquiring and releasing locks on every level, checking the
  # owned names after each step. Uses assertEqual rather than the deprecated
  # assertEquals alias, matching the other tests in this class.
  def _Owned(level):
    return self.GL._list_owned(level)

  self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
  self.assertEqual(_Owned(locking.LEVEL_CLUSTER), set(['BGL']))
  self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
  self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
  self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)

  # Releasing a single node leaves the other one owned
  self.GL.release(locking.LEVEL_NODE, ['n2'])
  self.assertEqual(_Owned(locking.LEVEL_NODE), set(['n1']))
  self.assertEqual(_Owned(locking.LEVEL_NODEGROUP), set(['g2']))
  self.assertEqual(_Owned(locking.LEVEL_INSTANCE), set(['i1']))

  # Releasing the whole node level must not touch the other levels
  self.GL.release(locking.LEVEL_NODE)
  self.assertEqual(_Owned(locking.LEVEL_NODE), set())
  self.assertEqual(_Owned(locking.LEVEL_NODEGROUP), set(['g2']))
  self.assertEqual(_Owned(locking.LEVEL_INSTANCE), set(['i1']))
  self.GL.release(locking.LEVEL_NODEGROUP)
  self.GL.release(locking.LEVEL_INSTANCE)

  # 'i5' is not one of the fixture's instances, so acquiring it must fail
  self.assertRaises(errors.LockError, self.GL.acquire,
                    locking.LEVEL_INSTANCE, ['i5'])
  self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
  self.assertEqual(_Owned(locking.LEVEL_INSTANCE), set(['i3']))
1672 def testAcquireWholeSets(self):
# Passing None as the names acquires a whole level; acquire() is expected to
# return the set of names, which must then show up as owned.
1673 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1674 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1675 set(self.instances))
1676 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1677 set(self.instances))
1678 self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
1679 set(self.nodegroups))
1680 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP),
1681 set(self.nodegroups))
1682 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1684 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
# Release the levels again; the cluster level goes last.
1686 self.GL.release(locking.LEVEL_NODE)
1687 self.GL.release(locking.LEVEL_NODEGROUP)
1688 self.GL.release(locking.LEVEL_INSTANCE)
1689 self.GL.release(locking.LEVEL_CLUSTER)
1691 def testAcquireWholeAndPartial(self):
# Mixes a whole-level acquire (instances, names=None) with a partial one
# (just node 'n2', shared).
1692 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1693 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1694 set(self.instances))
1695 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1696 set(self.instances))
1697 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1699 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1701 self.GL.release(locking.LEVEL_NODE)
1702 self.GL.release(locking.LEVEL_INSTANCE)
1703 self.GL.release(locking.LEVEL_CLUSTER)
def testBGLDependency(self):
  def _AssertAcquireFails(level, names):
    # Acquiring at the given level must trip an assertion
    self.assertRaises(AssertionError, self.GL.acquire, level, names)

  def _AssertBglNotReleasable():
    # Neither the named nor the whole-level release of the BGL may succeed
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER, ['BGL'])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER)

  # Nothing below the cluster level can be acquired without the BGL
  _AssertAcquireFails(locking.LEVEL_NODE, ['n1', 'n2'])
  _AssertAcquireFails(locking.LEVEL_INSTANCE, ['i3'])
  _AssertAcquireFails(locking.LEVEL_NODEGROUP, ['g1'])

  self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)

  # While a node lock is held
  self.GL.acquire(locking.LEVEL_NODE, ['n1'])
  _AssertBglNotReleasable()
  self.GL.release(locking.LEVEL_NODE)

  # While instance locks are held
  self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
  _AssertBglNotReleasable()
  self.GL.release(locking.LEVEL_INSTANCE)

  # While part of the node group level is still held
  self.GL.acquire(locking.LEVEL_NODEGROUP, None)
  self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
  _AssertBglNotReleasable()
  self.GL.release(locking.LEVEL_NODEGROUP)

  # With all other levels released the BGL can finally go
  self.GL.release(locking.LEVEL_CLUSTER)
def testWrongOrder(self):
  self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
  self.GL.acquire(locking.LEVEL_NODE, ['n2'])

  # With node 'n2' held, each of these further acquisitions must trip an
  # assertion.
  rejected = [
    (locking.LEVEL_NODE, ['n1']),
    (locking.LEVEL_NODEGROUP, ['g1']),
    (locking.LEVEL_INSTANCE, ['i2']),
    ]
  for (level, names) in rejected:
    self.assertRaises(AssertionError, self.GL.acquire, level, names)
1744 def testModifiableLevels(self):
# Names can be added to and removed from the instance, node and node group
# levels; add() and remove() on the cluster level must trip an assertion.
1745 self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
1747 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
1748 self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
1749 self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
1750 self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
1751 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
1752 self.GL.add(locking.LEVEL_NODE, ['n3'])
1753 self.GL.remove(locking.LEVEL_NODE, ['n1'])
1754 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
1755 self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
1756 self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
1757 self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
1758 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
1759 self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
1762 # Helper function to run as a thread that shares the BGL and then acquires
1763 # some locks at another level.
1764 def _doLock(self, level, names, shared):
# Takes the BGL in shared mode, acquires the given names at the given level
# in the requested mode, posts 'DONE' on self.done and releases both again.
# A LockError is reported as 'ERR' instead.
1766 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1767 self.GL.acquire(level, names, shared=shared)
1768 self.done.put('DONE')
1769 self.GL.release(level)
1770 self.GL.release(locking.LEVEL_CLUSTER)
1771 except errors.LockError:
1772 self.done.put('ERR')
1775 def testConcurrency(self):
# Checks which instance-level requests from helper threads go through while
# this thread holds the BGL (shared) plus various instance locks.
1776 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1777 self._addThread(target=self._doLock,
1778 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1780 self.assertEqual(self.done.get_nowait(), 'DONE')
# Holding 'i3' exclusively does not stop another thread from getting 'i1'...
1781 self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1782 self._addThread(target=self._doLock,
1783 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1785 self.assertEqual(self.done.get_nowait(), 'DONE')
# ...but a request for 'i3' itself must not complete until it is released.
1786 self._addThread(target=self._doLock,
1787 args=(locking.LEVEL_INSTANCE, 'i3', 1))
1788 self.assertRaises(Queue.Empty, self.done.get_nowait)
1789 self.GL.release(locking.LEVEL_INSTANCE)
1791 self.assertEqual(self.done.get_nowait(), 'DONE')
# A shared hold on 'i2' admits another sharer but not an exclusive request.
1792 self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1793 self._addThread(target=self._doLock,
1794 args=(locking.LEVEL_INSTANCE, 'i2', 1))
1796 self.assertEqual(self.done.get_nowait(), 'DONE')
1797 self._addThread(target=self._doLock,
1798 args=(locking.LEVEL_INSTANCE, 'i2', 0))
1799 self.assertRaises(Queue.Empty, self.done.get_nowait)
1800 self.GL.release(locking.LEVEL_INSTANCE)
# Blocking get with a one second timeout instead of get_nowait().
1802 self.assertEqual(self.done.get(True, 1), 'DONE')
1803 self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
# Tests for locking.LockMonitor; every test gets a fresh monitor in self.lm.
1806 class TestLockMonitor(_ThreadedTestCase):
1808 _ThreadedTestCase.setUp(self)
1809 self.lm = locking.LockMonitor()
1811 def testSingleThread(self):
# Registers 100 locks with the monitor from a single thread, checks that
# they are all reported and finally that the monitor ends up empty.
1814 for i in range(100):
1815 name = "TestLock%s" % i
1816 locks.append(locking.SharedLock(name, monitor=self.lm))
1818 self.assertEqual(len(self.lm._locks), len(locks))
1819 result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
1820 self.assertEqual(len(result.fields), 1)
1821 self.assertEqual(len(result.data), 100)
1826 # The garbage collector might need some time
1829 raise utils.RetryAgain()
# Poll _CheckLocks via utils.Retry (arguments 0.1 and 30.0).
1831 utils.Retry(_CheckLocks, 0.1, 30.0)
1833 self.assertFalse(self.lm._locks)
1835 def testMultiThread(self):
# Creates monitored locks from several threads and checks what QueryLocks
# reports for them while they are free, held exclusively and held shared.
1838 def _CreateLock(prev, next, name):
1840 locks.append(locking.SharedLock(name, monitor=self.lm))
1846 first = threading.Event()
1849 # Use a deterministic random generator
1850 for i in random.Random(4263).sample(range(100), 33):
1851 name = "MtTestLock%s" % i
1852 expnames.append(name)
1854 ev = threading.Event()
1855 self._addThread(target=_CreateLock, args=(prev, ev, name))
1862 # Check order in which locks were added
1863 self.assertEqual([i.name for i in locks], expnames)
1865 # Check query result
1866 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1867 self.assert_(isinstance(result, dict))
1868 response = objects.QueryResponse.FromDict(result)
# Unowned locks are expected in NiceSort order with no mode, no owner and an
# empty pending list.
1869 self.assertEqual(response.data,
1870 [[(constants.RS_NORMAL, name),
1871 (constants.RS_NORMAL, None),
1872 (constants.RS_NORMAL, None),
1873 (constants.RS_NORMAL, [])]
1874 for name in utils.NiceSort(expnames)])
1875 self.assertEqual(len(response.fields), 4)
1876 self.assertEqual(["name", "mode", "owner", "pending"],
1877 [fdef.name for fdef in response.fields])
1879 # Test exclusive acquire
1880 for tlock in locks[::4]:
1881 tlock.acquire(shared=0)
# Expected row for a lock name: the lock acquired above is reported as
# "exclusive" and owned by the current thread, every other lock as unowned.
1883 def _GetExpResult(name):
1884 if tlock.name == name:
1885 return [(constants.RS_NORMAL, name),
1886 (constants.RS_NORMAL, "exclusive"),
1887 (constants.RS_NORMAL,
1888 [threading.currentThread().getName()]),
1889 (constants.RS_NORMAL, [])]
1890 return [(constants.RS_NORMAL, name),
1891 (constants.RS_NORMAL, None),
1892 (constants.RS_NORMAL, None),
1893 (constants.RS_NORMAL, [])]
1895 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1896 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1897 [_GetExpResult(name)
1898 for name in utils.NiceSort(expnames)])
1902 # Test shared acquire
1903 def _Acquire(lock, shared, ev, notify):
1904 lock.acquire(shared=shared)
# Pick three different locks: tlock1 and tlock2 get acquired in shared mode
# and tlock3 in exclusive mode by the helper threads below.
1911 for tlock1 in locks[::11]:
1912 for tlock2 in locks[::-15]:
1913 if tlock2 == tlock1:
1917 for tlock3 in locks[::10]:
1918 if tlock3 in (tlock2, tlock1):
1922 releaseev = threading.Event()
1928 ev = threading.Event()
1929 tthreads1.append(self._addThread(target=_Acquire,
1930 args=(tlock1, 1, releaseev, ev)))
1931 acquireev.append(ev)
1933 ev = threading.Event()
1934 tthread2 = self._addThread(target=_Acquire,
1935 args=(tlock2, 1, releaseev, ev))
1936 acquireev.append(ev)
1938 ev = threading.Event()
1939 tthread3 = self._addThread(target=_Acquire,
1940 args=(tlock3, 0, releaseev, ev))
1941 acquireev.append(ev)
1943 # Wait for all locks to be acquired
1947 # Check query result
1948 result = self.lm.QueryLocks(["name", "mode", "owner"])
1949 response = objects.QueryResponse.FromDict(result)
# Every field of a row is a (status, value) pair.
1950 for (name, mode, owner) in response.data:
1951 (name_status, name_value) = name
1952 (owner_status, owner_value) = owner
1954 self.assertEqual(name_status, constants.RS_NORMAL)
1955 self.assertEqual(owner_status, constants.RS_NORMAL)
1957 if name_value == tlock1.name:
1958 self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1959 self.assertEqual(set(owner_value),
1960 set(i.getName() for i in tthreads1))
1963 if name_value == tlock2.name:
1964 self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1965 self.assertEqual(owner_value, [tthread2.getName()])
1968 if name_value == tlock3.name:
1969 self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
1970 self.assertEqual(owner_value, [tthread3.getName()])
1973 self.assert_(name_value in expnames)
1974 self.assertEqual(mode, (constants.RS_NORMAL, None))
1975 self.assert_(owner_value is None)
1977 # Release locks again
# All locks must be reported as unowned again.
1982 result = self.lm.QueryLocks(["name", "mode", "owner"])
1983 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1984 [[(constants.RS_NORMAL, name),
1985 (constants.RS_NORMAL, None),
1986 (constants.RS_NORMAL, None)]
1987 for name in utils.NiceSort(expnames)])
1989 def testDelete(self):
# Compares what the monitor reports for a single lock: first with no mode,
# then with mode "deleted" while the lock is still tracked.
1990 lock = locking.SharedLock("TestLock", monitor=self.lm)
1992 self.assertEqual(len(self.lm._locks), 1)
1993 result = self.lm.QueryLocks(["name", "mode", "owner"])
1994 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1995 [[(constants.RS_NORMAL, lock.name),
1996 (constants.RS_NORMAL, None),
1997 (constants.RS_NORMAL, None)]])
2001 result = self.lm.QueryLocks(["name", "mode", "owner"])
2002 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2003 [[(constants.RS_NORMAL, lock.name),
2004 (constants.RS_NORMAL, "deleted"),
2005 (constants.RS_NORMAL, None)]])
2006 self.assertEqual(len(self.lm._locks), 1)
2008 def testPending(self):
# Checks the "pending" field reported by QueryLocks while helper threads
# wait for a lock the current thread is reported to own exclusively; runs
# once with exclusive and once with shared waiters.
2009 def _Acquire(lock, shared, prev, next):
2012 lock.acquire(shared=shared, test_notify=next.set)
2018 lock = locking.SharedLock("ExcLock", monitor=self.lm)
2020 for shared in [0, 1]:
2023 self.assertEqual(len(self.lm._locks), 1)
2024 result = self.lm.QueryLocks(["name", "mode", "owner"])
2025 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2026 [[(constants.RS_NORMAL, lock.name),
2027 (constants.RS_NORMAL, "exclusive"),
2028 (constants.RS_NORMAL,
2029 [threading.currentThread().getName()])]])
2033 first = threading.Event()
2037 ev = threading.Event()
2038 threads.append(self._addThread(target=_Acquire,
2039 args=(lock, shared, prev, ev)))
2045 # Wait for last acquire to start waiting
2048 # NOTE: This works only because QueryLocks will acquire the
2049 # lock-internal lock again and won't be able to get the information
2050 # until it has the lock. By then the acquire should be registered in
2051 # SharedLock.__pending (otherwise it's a bug).
2053 # All acquires are waiting now
# Two shapes of expected pending data: one "shared" entry listing all
# waiting threads (NiceSort order), or one "exclusive" entry per thread.
2055 pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
2057 pending = [("exclusive", [t.getName()]) for t in threads]
2059 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2060 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2061 [[(constants.RS_NORMAL, lock.name),
2062 (constants.RS_NORMAL, "exclusive"),
2063 (constants.RS_NORMAL,
2064 [threading.currentThread().getName()]),
2065 (constants.RS_NORMAL, pending)]])
2067 self.assertEqual(len(self.lm._locks), 1)
2073 # No pending acquires
2074 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2075 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2076 [[(constants.RS_NORMAL, lock.name),
2077 (constants.RS_NORMAL, None),
2078 (constants.RS_NORMAL, None),
2079 (constants.RS_NORMAL, [])]])
2081 self.assertEqual(len(self.lm._locks), 1)
2083 def testDeleteAndRecreate(self):
# Several locks share one name; checks that the monitor reports each of them
# separately and in a stable order, and forgets them one by one.
2084 lname = "TestLock101923193"
2086 # Create some locks with the same name and keep all references
2087 locks = [locking.SharedLock(lname, monitor=self.lm)
2090 self.assertEqual(len(self.lm._locks), len(locks))
# Five identical, unowned rows are expected at this point.
2092 result = self.lm.QueryLocks(["name", "mode", "owner"])
2093 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2094 [[(constants.RS_NORMAL, lname),
2095 (constants.RS_NORMAL, None),
2096 (constants.RS_NORMAL, None)]] * 5)
2100 # Check information order
# The third row is the one expected to be reported as "deleted".
2101 result = self.lm.QueryLocks(["name", "mode", "owner"])
2102 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2103 [[(constants.RS_NORMAL, lname),
2104 (constants.RS_NORMAL, None),
2105 (constants.RS_NORMAL, None)]] * 2 +
2106 [[(constants.RS_NORMAL, lname),
2107 (constants.RS_NORMAL, "deleted"),
2108 (constants.RS_NORMAL, None)]] +
2109 [[(constants.RS_NORMAL, lname),
2110 (constants.RS_NORMAL, None),
2111 (constants.RS_NORMAL, None)]] * 2)
2113 locks[1].acquire(shared=0)
# Expected rows after the exclusive acquire of locks[1]: the second row is
# now owned by the current thread, the third is still "deleted".
2116 [(constants.RS_NORMAL, lname),
2117 (constants.RS_NORMAL, None),
2118 (constants.RS_NORMAL, None)],
2119 [(constants.RS_NORMAL, lname),
2120 (constants.RS_NORMAL, "exclusive"),
2121 (constants.RS_NORMAL, [threading.currentThread().getName()])],
2122 [(constants.RS_NORMAL, lname),
2123 (constants.RS_NORMAL, "deleted"),
2124 (constants.RS_NORMAL, None)],
2125 [(constants.RS_NORMAL, lname),
2126 (constants.RS_NORMAL, None),
2127 (constants.RS_NORMAL, None)],
2128 [(constants.RS_NORMAL, lname),
2129 (constants.RS_NORMAL, None),
2130 (constants.RS_NORMAL, None)],
2133 # Check information order
2134 result = self.lm.QueryLocks(["name", "mode", "owner"])
2135 self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)
2137 self.assertEqual(len(set(self.lm._locks.values())), len(locks))
2138 self.assertEqual(len(self.lm._locks), len(locks))
2140 # Check lock deletion
2141 for idx in range(len(locks)):
2143 assert gc.isenabled()
# After each step the remaining rows must equal the tail of last_status.
2145 self.assertEqual(len(self.lm._locks), len(locks))
2146 result = self.lm.QueryLocks(["name", "mode", "owner"])
2147 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2148 last_status[idx + 1:])
2150 # All locks should have been deleted
2152 self.assertFalse(self.lm._locks)
2154 result = self.lm.QueryLocks(["name", "mode", "owner"])
2155 self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
# Run the test program when this file is executed as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()