4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the locking module"""
31 from ganeti import locking
32 from ganeti import errors
37 # This is used to test the ssynchronize decorator.
38 # Since it's passed as input to a decorator it must be declared as a global.
39 _decoratorlock = locking.SharedLock()
41 #: List for looping tests
46 """Decorator for executing a function many times"""
47 def wrapper(*args, **kwargs):
53 def SafeSleep(duration):
56 delay = start + duration - time.time()
62 class _ThreadedTestCase(unittest.TestCase):
63 """Test class that supports adding/waiting on threads"""
65 unittest.TestCase.setUp(self)
66 self.done = Queue.Queue(0)
69 def _addThread(self, *args, **kwargs):
70 """Create and remember a new thread"""
71 t = threading.Thread(*args, **kwargs)
72 self.threads.append(t)
76 def _waitThreads(self):
77 """Wait for all our threads to finish"""
78 for t in self.threads:
80 self.failIf(t.isAlive())
84 class _ConditionTestCase(_ThreadedTestCase):
85 """Common test case for conditions"""
88 _ThreadedTestCase.setUp(self)
89 self.lock = threading.Lock()
90 self.cond = cls(self.lock)
92 def _testAcquireRelease(self):
93 self.assert_(not self.cond._is_owned())
94 self.assertRaises(RuntimeError, self.cond.wait)
95 self.assertRaises(RuntimeError, self.cond.notifyAll)
98 self.assert_(self.cond._is_owned())
100 self.assert_(self.cond._is_owned())
103 self.assert_(not self.cond._is_owned())
104 self.assertRaises(RuntimeError, self.cond.wait)
105 self.assertRaises(RuntimeError, self.cond.notifyAll)
107 def _testNotification(self):
112 self.cond.notifyAll()
117 self._addThread(target=_NotifyAll)
118 self.assertEqual(self.done.get(True, 1), "NE")
119 self.assertRaises(Queue.Empty, self.done.get_nowait)
121 self.assertEqual(self.done.get(True, 1), "NA")
122 self.assertEqual(self.done.get(True, 1), "NN")
123 self.assert_(self.cond._is_owned())
125 self.assert_(not self.cond._is_owned())
128 class TestSingleNotifyPipeCondition(_ConditionTestCase):
129 """SingleNotifyPipeCondition tests"""
132 _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
134 def testAcquireRelease(self):
135 self._testAcquireRelease()
137 def testNotification(self):
138 self._testNotification()
140 def testWaitReuse(self):
146 def testNoNotifyReuse(self):
148 self.cond.notifyAll()
149 self.assertRaises(RuntimeError, self.cond.wait)
150 self.assertRaises(RuntimeError, self.cond.notifyAll)
154 class TestPipeCondition(_ConditionTestCase):
155 """PipeCondition tests"""
158 _ConditionTestCase.setUp(self, locking.PipeCondition)
160 def testAcquireRelease(self):
161 self._testAcquireRelease()
163 def testNotification(self):
164 self._testNotification()
166 def _TestWait(self, fn):
167 self._addThread(target=fn)
168 self._addThread(target=fn)
169 self._addThread(target=fn)
171 # Wait for threads to be waiting
172 self.assertEqual(self.done.get(True, 1), "A")
173 self.assertEqual(self.done.get(True, 1), "A")
174 self.assertEqual(self.done.get(True, 1), "A")
176 self.assertRaises(Queue.Empty, self.done.get_nowait)
179 self.assertEqual(self.cond._nwaiters, 3)
180 # This new thread can"t acquire the lock, and thus call wait, before we
182 self._addThread(target=fn)
183 self.cond.notifyAll()
184 self.assertRaises(Queue.Empty, self.done.get_nowait)
187 # We should now get 3 W and 1 A (for the new thread) in whatever order
191 got = self.done.get(True, 1)
197 self.fail("Got %s on the done queue" % got)
199 self.assertEqual(w, 3)
200 self.assertEqual(a, 1)
203 self.cond.notifyAll()
206 self.assertEqual(self.done.get_nowait(), "W")
207 self.assertRaises(Queue.Empty, self.done.get_nowait)
209 def testBlockingWait(self):
217 self._TestWait(_BlockingWait)
219 def testLongTimeoutWait(self):
227 self._TestWait(_Helper)
229 def _TimeoutWait(self, timeout, check):
231 self.cond.wait(timeout)
235 def testShortTimeoutWait(self):
236 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
237 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
239 self.assertEqual(self.done.get_nowait(), "T1")
240 self.assertEqual(self.done.get_nowait(), "T1")
241 self.assertRaises(Queue.Empty, self.done.get_nowait)
243 def testZeroTimeoutWait(self):
244 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
245 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
246 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
248 self.assertEqual(self.done.get_nowait(), "T0")
249 self.assertEqual(self.done.get_nowait(), "T0")
250 self.assertEqual(self.done.get_nowait(), "T0")
251 self.assertRaises(Queue.Empty, self.done.get_nowait)
254 class TestSharedLock(_ThreadedTestCase):
255 """SharedLock tests"""
258 _ThreadedTestCase.setUp(self)
259 self.sl = locking.SharedLock()
261 def testSequenceAndOwnership(self):
262 self.assert_(not self.sl._is_owned())
263 self.sl.acquire(shared=1)
264 self.assert_(self.sl._is_owned())
265 self.assert_(self.sl._is_owned(shared=1))
266 self.assert_(not self.sl._is_owned(shared=0))
268 self.assert_(not self.sl._is_owned())
270 self.assert_(self.sl._is_owned())
271 self.assert_(not self.sl._is_owned(shared=1))
272 self.assert_(self.sl._is_owned(shared=0))
274 self.assert_(not self.sl._is_owned())
275 self.sl.acquire(shared=1)
276 self.assert_(self.sl._is_owned())
277 self.assert_(self.sl._is_owned(shared=1))
278 self.assert_(not self.sl._is_owned(shared=0))
280 self.assert_(not self.sl._is_owned())
282 def testBooleanValue(self):
283 # semaphores are supposed to return a true value on a successful acquire
284 self.assert_(self.sl.acquire(shared=1))
286 self.assert_(self.sl.acquire())
289 def testDoubleLockingStoE(self):
290 self.sl.acquire(shared=1)
291 self.assertRaises(AssertionError, self.sl.acquire)
293 def testDoubleLockingEtoS(self):
295 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
297 def testDoubleLockingStoS(self):
298 self.sl.acquire(shared=1)
299 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
301 def testDoubleLockingEtoE(self):
303 self.assertRaises(AssertionError, self.sl.acquire)
305 # helper functions: called in a separate thread they acquire the lock, send
306 # their identifier on the done queue, then release it.
307 def _doItSharer(self):
309 self.sl.acquire(shared=1)
312 except errors.LockError:
315 def _doItExclusive(self):
320 except errors.LockError:
323 def _doItDelete(self):
327 except errors.LockError:
330 def testSharersCanCoexist(self):
331 self.sl.acquire(shared=1)
332 threading.Thread(target=self._doItSharer).start()
333 self.assert_(self.done.get(True, 1))
337 def testExclusiveBlocksExclusive(self):
339 self._addThread(target=self._doItExclusive)
340 self.assertRaises(Queue.Empty, self.done.get_nowait)
343 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
346 def testExclusiveBlocksDelete(self):
348 self._addThread(target=self._doItDelete)
349 self.assertRaises(Queue.Empty, self.done.get_nowait)
352 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
353 self.sl = locking.SharedLock()
356 def testExclusiveBlocksSharer(self):
358 self._addThread(target=self._doItSharer)
359 self.assertRaises(Queue.Empty, self.done.get_nowait)
362 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
365 def testSharerBlocksExclusive(self):
366 self.sl.acquire(shared=1)
367 self._addThread(target=self._doItExclusive)
368 self.assertRaises(Queue.Empty, self.done.get_nowait)
371 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
374 def testSharerBlocksDelete(self):
375 self.sl.acquire(shared=1)
376 self._addThread(target=self._doItDelete)
377 self.assertRaises(Queue.Empty, self.done.get_nowait)
380 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
381 self.sl = locking.SharedLock()
384 def testWaitingExclusiveBlocksSharer(self):
385 """SKIPPED testWaitingExclusiveBlockSharer"""
388 self.sl.acquire(shared=1)
389 # the lock is acquired in shared mode...
390 self._addThread(target=self._doItExclusive)
391 # ...but now an exclusive is waiting...
392 self._addThread(target=self._doItSharer)
393 # ...so the sharer should be blocked as well
394 self.assertRaises(Queue.Empty, self.done.get_nowait)
397 # The exclusive passed before
398 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
399 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
402 def testWaitingSharerBlocksExclusive(self):
403 """SKIPPED testWaitingSharerBlocksExclusive"""
407 # the lock is acquired in exclusive mode...
408 self._addThread(target=self._doItSharer)
409 # ...but now a sharer is waiting...
410 self._addThread(target=self._doItExclusive)
411 # ...the exclusive is waiting too...
412 self.assertRaises(Queue.Empty, self.done.get_nowait)
415 # The sharer passed before
416 self.assertEqual(self.done.get_nowait(), 'SHR')
417 self.assertEqual(self.done.get_nowait(), 'EXC')
419 def testDelete(self):
421 self.assertRaises(errors.LockError, self.sl.acquire)
422 self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
423 self.assertRaises(errors.LockError, self.sl.delete)
425 def testDeleteTimeout(self):
426 self.sl.delete(timeout=60)
428 def testNoDeleteIfSharer(self):
429 self.sl.acquire(shared=1)
430 self.assertRaises(AssertionError, self.sl.delete)
433 def testDeletePendingSharersExclusiveDelete(self):
435 self._addThread(target=self._doItSharer)
436 self._addThread(target=self._doItSharer)
437 self._addThread(target=self._doItExclusive)
438 self._addThread(target=self._doItDelete)
441 # The threads who were pending return ERR
443 self.assertEqual(self.done.get_nowait(), 'ERR')
444 self.sl = locking.SharedLock()
447 def testDeletePendingDeleteExclusiveSharers(self):
449 self._addThread(target=self._doItDelete)
450 self._addThread(target=self._doItExclusive)
451 self._addThread(target=self._doItSharer)
452 self._addThread(target=self._doItSharer)
455 # The two threads who were pending return both ERR
456 self.assertEqual(self.done.get_nowait(), 'ERR')
457 self.assertEqual(self.done.get_nowait(), 'ERR')
458 self.assertEqual(self.done.get_nowait(), 'ERR')
459 self.assertEqual(self.done.get_nowait(), 'ERR')
460 self.sl = locking.SharedLock()
463 def testExclusiveAcquireTimeout(self):
464 for shared in [0, 1]:
465 on_queue = threading.Event()
466 release_exclusive = threading.Event()
468 def _LockExclusive():
469 self.sl.acquire(shared=0, test_notify=on_queue.set)
470 self.done.put("A: start wait")
471 release_exclusive.wait()
472 self.done.put("A: end wait")
475 # Start thread to hold lock in exclusive mode
476 self._addThread(target=_LockExclusive)
478 # Wait for wait to begin
479 self.assertEqual(self.done.get(timeout=60), "A: start wait")
481 # Wait up to 60s to get lock, but release exclusive lock as soon as we're
483 self.failUnless(self.sl.acquire(shared=shared, timeout=60,
484 test_notify=release_exclusive.set))
486 self.done.put("got 2nd")
491 self.assertEqual(self.done.get_nowait(), "A: end wait")
492 self.assertEqual(self.done.get_nowait(), "got 2nd")
493 self.assertRaises(Queue.Empty, self.done.get_nowait)
496 def testAcquireExpiringTimeout(self):
497 def _AcquireWithTimeout(shared, timeout):
498 if not self.sl.acquire(shared=shared, timeout=timeout):
499 self.done.put("timeout")
501 for shared in [0, 1]:
505 # Start shared acquires with timeout between 0 and 20 ms
507 self._addThread(target=_AcquireWithTimeout,
508 args=(shared, i * 2.0 / 1000.0))
510 # Wait for threads to finish (makes sure the acquire timeout expires
511 # before releasing the lock)
518 self.assertEqual(self.done.get_nowait(), "timeout")
520 self.assertRaises(Queue.Empty, self.done.get_nowait)
523 def testSharedSkipExclusiveAcquires(self):
524 # Tests whether shared acquires jump in front of exclusive acquires in the
527 def _Acquire(shared, name, notify_ev, wait_ev):
529 notify_fn = notify_ev.set
536 if not self.sl.acquire(shared=shared, test_notify=notify_fn):
542 # Get exclusive lock while we fill the queue
550 # Add acquires using threading.Event for synchronization. They'll be
551 # acquired exactly in the order defined in this list.
552 acquires = (shrcnt1 * [(1, "shared 1")] +
553 3 * [(0, "exclusive 1")] +
554 shrcnt2 * [(1, "shared 2")] +
555 shrcnt3 * [(1, "shared 3")] +
556 shrcnt4 * [(1, "shared 4")] +
557 3 * [(0, "exclusive 2")])
562 for args in acquires:
563 ev_cur = threading.Event()
564 self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
567 # Wait for last acquire to start
570 # Expect 6 pending exclusive acquires and 1 for all shared acquires
572 self.assertEqual(self.sl._count_pending(), 7)
574 # Release exclusive lock and wait
580 for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
581 # Shared locks aren't guaranteed to be notified in order, but they'll be
583 tmp = self.done.get_nowait()
584 if tmp == "shared 1":
586 elif tmp == "shared 2":
588 elif tmp == "shared 3":
590 elif tmp == "shared 4":
592 self.assertEqual(shrcnt1, 0)
593 self.assertEqual(shrcnt2, 0)
594 self.assertEqual(shrcnt3, 0)
595 self.assertEqual(shrcnt3, 0)
598 self.assertEqual(self.done.get_nowait(), "exclusive 1")
601 self.assertEqual(self.done.get_nowait(), "exclusive 2")
603 self.assertRaises(Queue.Empty, self.done.get_nowait)
606 def testMixedAcquireTimeout(self):
607 sync = threading.Event()
609 def _AcquireShared(ev):
610 if not self.sl.acquire(shared=1, timeout=None):
613 self.done.put("shared")
618 # Wait for notification from main thread
626 ev = threading.Event()
627 self._addThread(target=_AcquireShared, args=(ev, ))
630 # Wait for all acquires to finish
634 self.assertEqual(self.sl._count_pending(), 0)
636 # Try to get exclusive lock
637 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
639 # Acquire exclusive without timeout
640 exclsync = threading.Event()
641 exclev = threading.Event()
643 def _AcquireExclusive():
644 if not self.sl.acquire(shared=0):
647 self.done.put("exclusive")
652 # Wait for notification from main thread
657 self._addThread(target=_AcquireExclusive)
659 # Try to get exclusive lock
660 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
662 # Make all shared holders release their locks
665 # Wait for exclusive acquire to succeed
668 self.assertEqual(self.sl._count_pending(), 0)
670 # Try to get exclusive lock
671 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
673 def _AcquireSharedSimple():
674 if self.sl.acquire(shared=1, timeout=None):
675 self.done.put("shared2")
679 self._addThread(target=_AcquireSharedSimple)
681 # Tell exclusive lock to release
684 # Wait for everything to finish
687 self.assertEqual(self.sl._count_pending(), 0)
691 self.assertEqual(self.done.get_nowait(), "shared")
693 self.assertEqual(self.done.get_nowait(), "exclusive")
696 self.assertEqual(self.done.get_nowait(), "shared2")
698 self.assertRaises(Queue.Empty, self.done.get_nowait)
701 class TestSharedLockInCondition(_ThreadedTestCase):
702 """SharedLock as a condition lock tests"""
705 _ThreadedTestCase.setUp(self)
706 self.sl = locking.SharedLock()
709 def setCondition(self):
710 self.cond = threading.Condition(self.sl)
712 def testKeepMode(self):
713 self.cond.acquire(shared=1)
714 self.assert_(self.sl._is_owned(shared=1))
716 self.assert_(self.sl._is_owned(shared=1))
718 self.cond.acquire(shared=0)
719 self.assert_(self.sl._is_owned(shared=0))
721 self.assert_(self.sl._is_owned(shared=0))
725 class TestSharedLockInPipeCondition(TestSharedLockInCondition):
726 """SharedLock as a pipe condition lock tests"""
728 def setCondition(self):
729 self.cond = locking.PipeCondition(self.sl)
732 class TestSSynchronizedDecorator(_ThreadedTestCase):
733 """Shared Lock Synchronized decorator test"""
736 _ThreadedTestCase.setUp(self)
738 @locking.ssynchronized(_decoratorlock)
739 def _doItExclusive(self):
740 self.assert_(_decoratorlock._is_owned())
743 @locking.ssynchronized(_decoratorlock, shared=1)
744 def _doItSharer(self):
745 self.assert_(_decoratorlock._is_owned(shared=1))
748 def testDecoratedFunctions(self):
749 self._doItExclusive()
750 self.assert_(not _decoratorlock._is_owned())
752 self.assert_(not _decoratorlock._is_owned())
754 def testSharersCanCoexist(self):
755 _decoratorlock.acquire(shared=1)
756 threading.Thread(target=self._doItSharer).start()
757 self.assert_(self.done.get(True, 1))
758 _decoratorlock.release()
761 def testExclusiveBlocksExclusive(self):
762 _decoratorlock.acquire()
763 self._addThread(target=self._doItExclusive)
764 # give it a bit of time to check that it's not actually doing anything
765 self.assertRaises(Queue.Empty, self.done.get_nowait)
766 _decoratorlock.release()
768 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
771 def testExclusiveBlocksSharer(self):
772 _decoratorlock.acquire()
773 self._addThread(target=self._doItSharer)
774 self.assertRaises(Queue.Empty, self.done.get_nowait)
775 _decoratorlock.release()
777 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
780 def testSharerBlocksExclusive(self):
781 _decoratorlock.acquire(shared=1)
782 self._addThread(target=self._doItExclusive)
783 self.assertRaises(Queue.Empty, self.done.get_nowait)
784 _decoratorlock.release()
786 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
789 class TestLockSet(_ThreadedTestCase):
793 _ThreadedTestCase.setUp(self)
797 """Helper to (re)initialize the lock set"""
798 self.resources = ['one', 'two', 'three']
799 self.ls = locking.LockSet(members=self.resources)
801 def testResources(self):
802 self.assertEquals(self.ls._names(), set(self.resources))
803 newls = locking.LockSet()
804 self.assertEquals(newls._names(), set())
806 def testAcquireRelease(self):
807 self.assert_(self.ls.acquire('one'))
808 self.assertEquals(self.ls._list_owned(), set(['one']))
810 self.assertEquals(self.ls._list_owned(), set())
811 self.assertEquals(self.ls.acquire(['one']), set(['one']))
812 self.assertEquals(self.ls._list_owned(), set(['one']))
814 self.assertEquals(self.ls._list_owned(), set())
815 self.ls.acquire(['one', 'two', 'three'])
816 self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
817 self.ls.release('one')
818 self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
819 self.ls.release(['three'])
820 self.assertEquals(self.ls._list_owned(), set(['two']))
822 self.assertEquals(self.ls._list_owned(), set())
823 self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
824 self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
826 self.assertEquals(self.ls._list_owned(), set())
828 def testNoDoubleAcquire(self):
829 self.ls.acquire('one')
830 self.assertRaises(AssertionError, self.ls.acquire, 'one')
831 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
832 self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
834 self.ls.acquire(['one', 'three'])
835 self.ls.release('one')
836 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
837 self.ls.release('three')
839 def testNoWrongRelease(self):
840 self.assertRaises(AssertionError, self.ls.release)
841 self.ls.acquire('one')
842 self.assertRaises(AssertionError, self.ls.release, 'two')
844 def testAddRemove(self):
846 self.assertEquals(self.ls._list_owned(), set())
847 self.assert_('four' in self.ls._names())
848 self.ls.add(['five', 'six', 'seven'], acquired=1)
849 self.assert_('five' in self.ls._names())
850 self.assert_('six' in self.ls._names())
851 self.assert_('seven' in self.ls._names())
852 self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
853 self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
854 self.assert_('five' not in self.ls._names())
855 self.assert_('six' not in self.ls._names())
856 self.assertEquals(self.ls._list_owned(), set(['seven']))
857 self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
858 self.ls.remove('seven')
859 self.assert_('seven' not in self.ls._names())
860 self.assertEquals(self.ls._list_owned(), set([]))
861 self.ls.acquire(None, shared=1)
862 self.assertRaises(AssertionError, self.ls.add, 'eight')
864 self.ls.acquire(None)
865 self.ls.add('eight', acquired=1)
866 self.assert_('eight' in self.ls._names())
867 self.assert_('eight' in self.ls._list_owned())
869 self.assert_('nine' in self.ls._names())
870 self.assert_('nine' not in self.ls._list_owned())
872 self.ls.remove(['two'])
873 self.assert_('two' not in self.ls._names())
874 self.ls.acquire('three')
875 self.assertEquals(self.ls.remove(['three']), ['three'])
876 self.assert_('three' not in self.ls._names())
877 self.assertEquals(self.ls.remove('three'), [])
878 self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
879 self.assert_('one' not in self.ls._names())
881 def testRemoveNonBlocking(self):
882 self.ls.acquire('one')
883 self.assertEquals(self.ls.remove('one'), ['one'])
884 self.ls.acquire(['two', 'three'])
885 self.assertEquals(self.ls.remove(['two', 'three']),
888 def testNoDoubleAdd(self):
889 self.assertRaises(errors.LockError, self.ls.add, 'two')
891 self.assertRaises(errors.LockError, self.ls.add, 'four')
893 def testNoWrongRemoves(self):
894 self.ls.acquire(['one', 'three'], shared=1)
895 # Cannot remove 'two' while holding something which is not a superset
896 self.assertRaises(AssertionError, self.ls.remove, 'two')
897 # Cannot remove 'three' as we are sharing it
898 self.assertRaises(AssertionError, self.ls.remove, 'three')
900 def testAcquireSetLock(self):
901 # acquire the set-lock exclusively
902 self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
903 self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
904 self.assertEquals(self.ls._is_owned(), True)
905 self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
906 # I can still add/remove elements...
907 self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
908 self.assert_(self.ls.add('six'))
911 self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
912 # adding new elements is not possible
913 self.assertRaises(AssertionError, self.ls.add, 'five')
916 def testAcquireWithRepetitions(self):
917 self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
918 set(['two', 'two', 'three']))
919 self.ls.release(['two', 'two'])
920 self.assertEquals(self.ls._list_owned(), set(['three']))
922 def testEmptyAcquire(self):
923 # Acquire an empty list of locks...
924 self.assertEquals(self.ls.acquire([]), set())
925 self.assertEquals(self.ls._list_owned(), set())
926 # New locks can still be addded
927 self.assert_(self.ls.add('six'))
928 # "re-acquiring" is not an issue, since we had really acquired nothing
929 self.assertEquals(self.ls.acquire([], shared=1), set())
930 self.assertEquals(self.ls._list_owned(), set())
931 # We haven't really acquired anything, so we cannot release
932 self.assertRaises(AssertionError, self.ls.release)
934 def _doLockSet(self, names, shared):
936 self.ls.acquire(names, shared=shared)
937 self.done.put('DONE')
939 except errors.LockError:
942 def _doAddSet(self, names):
944 self.ls.add(names, acquired=1)
945 self.done.put('DONE')
947 except errors.LockError:
950 def _doRemoveSet(self, names):
951 self.done.put(self.ls.remove(names))
954 def testConcurrentSharedAcquire(self):
955 self.ls.acquire(['one', 'two'], shared=1)
956 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
958 self.assertEqual(self.done.get_nowait(), 'DONE')
959 self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
961 self.assertEqual(self.done.get_nowait(), 'DONE')
962 self._addThread(target=self._doLockSet, args=('three', 1))
964 self.assertEqual(self.done.get_nowait(), 'DONE')
965 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
966 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
967 self.assertRaises(Queue.Empty, self.done.get_nowait)
970 self.assertEqual(self.done.get_nowait(), 'DONE')
971 self.assertEqual(self.done.get_nowait(), 'DONE')
974 def testConcurrentExclusiveAcquire(self):
975 self.ls.acquire(['one', 'two'])
976 self._addThread(target=self._doLockSet, args=('three', 1))
978 self.assertEqual(self.done.get_nowait(), 'DONE')
979 self._addThread(target=self._doLockSet, args=('three', 0))
981 self.assertEqual(self.done.get_nowait(), 'DONE')
982 self.assertRaises(Queue.Empty, self.done.get_nowait)
983 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
984 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
985 self._addThread(target=self._doLockSet, args=('one', 0))
986 self._addThread(target=self._doLockSet, args=('one', 1))
987 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
988 self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
989 self.assertRaises(Queue.Empty, self.done.get_nowait)
993 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
996 def testSimpleAcquireTimeoutExpiring(self):
997 names = sorted(self.ls._names())
998 self.assert_(len(names) >= 3)
1000 # Get name of first lock
1003 # Get name of last lock
1007 # Block first and try to lock it again
1010 # Block last and try to lock all locks
1013 # Block last and try to lock it again
1017 for (wanted, block) in checks:
1018 # Lock in exclusive mode
1019 self.assert_(self.ls.acquire(block, shared=0))
1022 # Try to get the same lock again with a timeout (should never succeed)
1023 acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1025 self.done.put("acquired")
1028 self.assert_(acquired is None)
1029 self.assert_(not self.ls._list_owned())
1030 self.assert_(not self.ls._is_owned())
1031 self.done.put("not acquired")
1033 self._addThread(target=_AcquireOne)
1035 # Wait for timeout in thread to expire
1038 # Release exclusive lock again
1041 self.assertEqual(self.done.get_nowait(), "not acquired")
1042 self.assertRaises(Queue.Empty, self.done.get_nowait)
1045 def testDelayedAndExpiringLockAcquire(self):
1047 self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1049 for expire in (False, True):
1050 names = sorted(self.ls._names())
1051 self.assertEqual(len(names), 8)
1053 lock_ev = dict([(i, threading.Event()) for i in names])
1055 # Lock all in exclusive mode
1056 self.assert_(self.ls.acquire(names, shared=0))
1059 # We'll wait at least 300ms per lock
1060 lockwait = len(names) * [0.3]
1062 # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1063 # this gives us up to 2.4s to fail.
1064 lockall_timeout = 0.4
1066 # This should finish rather quickly
1068 lockall_timeout = len(names) * 5.0
1071 def acquire_notification(name):
1073 self.done.put("getting %s" % name)
1078 if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1079 test_notify=acquire_notification):
1080 self.done.put("got all")
1083 self.done.put("timeout on all")
1086 for ev in lock_ev.values():
1089 t = self._addThread(target=_LockAll)
1091 for idx, name in enumerate(names):
1092 # Wait for actual acquire on this lock to start
1093 lock_ev[name].wait(10.0)
1095 if expire and t.isAlive():
1096 # Wait some time after getting the notification to make sure the lock
1097 # acquire will expire
1098 SafeSleep(lockwait[idx])
1100 self.ls.release(names=name)
1102 self.assert_(not self.ls._list_owned())
1107 # Not checking which locks were actually acquired. Doing so would be
1108 # too timing-dependant.
1109 self.assertEqual(self.done.get_nowait(), "timeout on all")
1112 self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1113 self.assertEqual(self.done.get_nowait(), "got all")
1114 self.assertRaises(Queue.Empty, self.done.get_nowait)
1117 def testConcurrentRemove(self):
1119 self.ls.acquire(['one', 'two', 'four'])
1120 self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1121 self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1122 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1123 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1124 self.assertRaises(Queue.Empty, self.done.get_nowait)
1125 self.ls.remove('one')
1129 self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1130 self.ls.add(['five', 'six'], acquired=1)
1131 self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1132 self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1133 self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1134 self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1135 self.ls.remove('five')
1139 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1140 self.ls.acquire(['three', 'four'])
1141 self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1142 self.assertRaises(Queue.Empty, self.done.get_nowait)
1143 self.ls.remove('four')
1145 self.assertEqual(self.done.get_nowait(), ['six'])
1146 self._addThread(target=self._doRemoveSet, args=(['two']))
1148 self.assertEqual(self.done.get_nowait(), ['two'])
1154 def testConcurrentSharedSetLock(self):
1155 # share the set-lock...
1156 self.ls.acquire(None, shared=1)
1157 # ...another thread can share it too
1158 self._addThread(target=self._doLockSet, args=(None, 1))
1160 self.assertEqual(self.done.get_nowait(), 'DONE')
1161 # ...or just share some elements
1162 self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1164 self.assertEqual(self.done.get_nowait(), 'DONE')
1165 # ...but not add new ones or remove any
1166 t = self._addThread(target=self._doAddSet, args=(['nine']))
1167 self._addThread(target=self._doRemoveSet, args=(['two'], ))
1168 self.assertRaises(Queue.Empty, self.done.get_nowait)
1169 # this just releases the set-lock
1172 self.assertEqual(self.done.get_nowait(), 'DONE')
1173 # release the lock on the actual elements so remove() can proceed too
1176 self.failUnlessEqual(self.done.get_nowait(), ['two'])
1181 def testConcurrentExclusiveSetLock(self):
1182 # acquire the set-lock...
1183 self.ls.acquire(None, shared=0)
1184 # ...no one can do anything else
1185 self._addThread(target=self._doLockSet, args=(None, 1))
1186 self._addThread(target=self._doLockSet, args=(None, 0))
1187 self._addThread(target=self._doLockSet, args=(['three'], 0))
1188 self._addThread(target=self._doLockSet, args=(['two'], 1))
1189 self._addThread(target=self._doAddSet, args=(['nine']))
1190 self.assertRaises(Queue.Empty, self.done.get_nowait)
1194 self.assertEqual(self.done.get(True, 1), 'DONE')
1199 def testConcurrentSetLockAdd(self):
1200 self.ls.acquire('one')
1201 # Another thread wants the whole SetLock
1202 self._addThread(target=self._doLockSet, args=(None, 0))
1203 self._addThread(target=self._doLockSet, args=(None, 1))
1204 self.assertRaises(Queue.Empty, self.done.get_nowait)
1205 self.assertRaises(AssertionError, self.ls.add, 'four')
1208 self.assertEqual(self.done.get_nowait(), 'DONE')
1209 self.assertEqual(self.done.get_nowait(), 'DONE')
1210 self.ls.acquire(None)
1211 self._addThread(target=self._doLockSet, args=(None, 0))
1212 self._addThread(target=self._doLockSet, args=(None, 1))
1213 self.assertRaises(Queue.Empty, self.done.get_nowait)
1215 self.ls.add('five', acquired=1)
1216 self.ls.add('six', acquired=1, shared=1)
1217 self.assertEquals(self.ls._list_owned(),
1218 set(['one', 'two', 'three', 'five', 'six']))
1219 self.assertEquals(self.ls._is_owned(), True)
1220 self.assertEquals(self.ls._names(),
1221 set(['one', 'two', 'three', 'four', 'five', 'six']))
1224 self.assertEqual(self.done.get_nowait(), 'DONE')
1225 self.assertEqual(self.done.get_nowait(), 'DONE')
1229 def testEmptyLockSet(self):
1231 self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1233 self.ls.remove(['one', 'two', 'three'])
1234 # and adds/locks by another thread still wait
1235 self._addThread(target=self._doAddSet, args=(['nine']))
1236 self._addThread(target=self._doLockSet, args=(None, 1))
1237 self._addThread(target=self._doLockSet, args=(None, 0))
1238 self.assertRaises(Queue.Empty, self.done.get_nowait)
1242 self.assertEqual(self.done.get_nowait(), 'DONE')
1244 self.assertEqual(self.ls.remove(['nine']), ['nine'])
1246 self.assertEqual(self.ls.acquire(None, shared=1), set())
1247 # other sharers can go, adds still wait
1248 self._addThread(target=self._doLockSet, args=(None, 1))
1250 self.assertEqual(self.done.get_nowait(), 'DONE')
1251 self._addThread(target=self._doAddSet, args=(['nine']))
1252 self.assertRaises(Queue.Empty, self.done.get_nowait)
1255 self.assertEqual(self.done.get_nowait(), 'DONE')
1259 class TestGanetiLockManager(_ThreadedTestCase):
1262 _ThreadedTestCase.setUp(self)
1263 self.nodes=['n1', 'n2']
1264 self.instances=['i1', 'i2', 'i3']
1265 self.GL = locking.GanetiLockManager(nodes=self.nodes,
1266 instances=self.instances)
1269 # Don't try this at home...
1270 locking.GanetiLockManager._instance = None
1272 def testLockingConstants(self):
1273 # The locking library internally cheats by assuming its constants have some
1274 # relationships with each other. Check those hold true.
1275 # This relationship is also used in the Processor to recursively acquire
1276 # the right locks. Again, please don't break it.
1277 for i in range(len(locking.LEVELS)):
1278 self.assertEqual(i, locking.LEVELS[i])
1280 def testDoubleGLFails(self):
1281 self.assertRaises(AssertionError, locking.GanetiLockManager)
1283 def testLockNames(self):
1284 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1285 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1286 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1287 set(self.instances))
1289 def testInitAndResources(self):
1290 locking.GanetiLockManager._instance = None
1291 self.GL = locking.GanetiLockManager()
1292 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1293 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1294 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1296 locking.GanetiLockManager._instance = None
1297 self.GL = locking.GanetiLockManager(nodes=self.nodes)
1298 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1299 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1300 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1302 locking.GanetiLockManager._instance = None
1303 self.GL = locking.GanetiLockManager(instances=self.instances)
1304 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1305 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1306 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1307 set(self.instances))
1309 def testAcquireRelease(self):
1310 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1311 self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1312 self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1313 self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1314 self.GL.release(locking.LEVEL_NODE, ['n2'])
1315 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1316 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1317 self.GL.release(locking.LEVEL_NODE)
1318 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1319 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1320 self.GL.release(locking.LEVEL_INSTANCE)
1321 self.assertRaises(errors.LockError, self.GL.acquire,
1322 locking.LEVEL_INSTANCE, ['i5'])
1323 self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1324 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1326 def testAcquireWholeSets(self):
1327 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1328 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1329 set(self.instances))
1330 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1331 set(self.instances))
1332 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1334 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1336 self.GL.release(locking.LEVEL_NODE)
1337 self.GL.release(locking.LEVEL_INSTANCE)
1338 self.GL.release(locking.LEVEL_CLUSTER)
1340 def testAcquireWholeAndPartial(self):
1341 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1342 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1343 set(self.instances))
1344 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1345 set(self.instances))
1346 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1348 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1350 self.GL.release(locking.LEVEL_NODE)
1351 self.GL.release(locking.LEVEL_INSTANCE)
1352 self.GL.release(locking.LEVEL_CLUSTER)
1354 def testBGLDependency(self):
1355 self.assertRaises(AssertionError, self.GL.acquire,
1356 locking.LEVEL_NODE, ['n1', 'n2'])
1357 self.assertRaises(AssertionError, self.GL.acquire,
1358 locking.LEVEL_INSTANCE, ['i3'])
1359 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1360 self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1361 self.assertRaises(AssertionError, self.GL.release,
1362 locking.LEVEL_CLUSTER, ['BGL'])
1363 self.assertRaises(AssertionError, self.GL.release,
1364 locking.LEVEL_CLUSTER)
1365 self.GL.release(locking.LEVEL_NODE)
1366 self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1367 self.assertRaises(AssertionError, self.GL.release,
1368 locking.LEVEL_CLUSTER, ['BGL'])
1369 self.assertRaises(AssertionError, self.GL.release,
1370 locking.LEVEL_CLUSTER)
1371 self.GL.release(locking.LEVEL_INSTANCE)
1373 def testWrongOrder(self):
1374 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1375 self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1376 self.assertRaises(AssertionError, self.GL.acquire,
1377 locking.LEVEL_NODE, ['n1'])
1378 self.assertRaises(AssertionError, self.GL.acquire,
1379 locking.LEVEL_INSTANCE, ['i2'])
1381 # Helper function to run as a thread that shared the BGL and then acquires
1382 # some locks at another level.
1383 def _doLock(self, level, names, shared):
1385 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1386 self.GL.acquire(level, names, shared=shared)
1387 self.done.put('DONE')
1388 self.GL.release(level)
1389 self.GL.release(locking.LEVEL_CLUSTER)
1390 except errors.LockError:
1391 self.done.put('ERR')
1394 def testConcurrency(self):
1395 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1396 self._addThread(target=self._doLock,
1397 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1399 self.assertEqual(self.done.get_nowait(), 'DONE')
1400 self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1401 self._addThread(target=self._doLock,
1402 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1404 self.assertEqual(self.done.get_nowait(), 'DONE')
1405 self._addThread(target=self._doLock,
1406 args=(locking.LEVEL_INSTANCE, 'i3', 1))
1407 self.assertRaises(Queue.Empty, self.done.get_nowait)
1408 self.GL.release(locking.LEVEL_INSTANCE)
1410 self.assertEqual(self.done.get_nowait(), 'DONE')
1411 self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1412 self._addThread(target=self._doLock,
1413 args=(locking.LEVEL_INSTANCE, 'i2', 1))
1415 self.assertEqual(self.done.get_nowait(), 'DONE')
1416 self._addThread(target=self._doLock,
1417 args=(locking.LEVEL_INSTANCE, 'i2', 0))
1418 self.assertRaises(Queue.Empty, self.done.get_nowait)
1419 self.GL.release(locking.LEVEL_INSTANCE)
1421 self.assertEqual(self.done.get(True, 1), 'DONE')
1422 self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1425 if __name__ == '__main__':
1426 testutils.GanetiTestProgram()