4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the locking module"""
31 from ganeti import locking
32 from ganeti import errors
37 # This is used to test the ssynchronized decorator.
38 # Since it's passed as input to a decorator it must be declared as a global.
39 _decoratorlock = locking.SharedLock()
41 #: List for looping tests
46 """Decorator for executing a function many times"""
47 def wrapper(*args, **kwargs):
53 def SafeSleep(duration):
56 delay = start + duration - time.time()
62 class _ThreadedTestCase(unittest.TestCase):
63 """Test class that supports adding/waiting on threads"""
65 unittest.TestCase.setUp(self)
66 self.done = Queue.Queue(0)
69 def _addThread(self, *args, **kwargs):
70 """Create and remember a new thread"""
71 t = threading.Thread(*args, **kwargs)
72 self.threads.append(t)
76 def _waitThreads(self):
77 """Wait for all our threads to finish"""
78 for t in self.threads:
80 self.failIf(t.isAlive())
84 class _ConditionTestCase(_ThreadedTestCase):
85 """Common test case for conditions"""
88 _ThreadedTestCase.setUp(self)
89 self.lock = threading.Lock()
90 self.cond = cls(self.lock)
92 def _testAcquireRelease(self):
93 self.assert_(not self.cond._is_owned())
94 self.assertRaises(RuntimeError, self.cond.wait)
95 self.assertRaises(RuntimeError, self.cond.notifyAll)
98 self.assert_(self.cond._is_owned())
100 self.assert_(self.cond._is_owned())
103 self.assert_(not self.cond._is_owned())
104 self.assertRaises(RuntimeError, self.cond.wait)
105 self.assertRaises(RuntimeError, self.cond.notifyAll)
107 def _testNotification(self):
112 self.cond.notifyAll()
117 self._addThread(target=_NotifyAll)
118 self.assertEqual(self.done.get(True, 1), "NE")
119 self.assertRaises(Queue.Empty, self.done.get_nowait)
121 self.assertEqual(self.done.get(True, 1), "NA")
122 self.assertEqual(self.done.get(True, 1), "NN")
123 self.assert_(self.cond._is_owned())
125 self.assert_(not self.cond._is_owned())
128 class TestSingleNotifyPipeCondition(_ConditionTestCase):
129 """SingleNotifyPipeCondition tests"""
132 _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
134 def testAcquireRelease(self):
135 self._testAcquireRelease()
137 def testNotification(self):
138 self._testNotification()
140 def testWaitReuse(self):
146 def testNoNotifyReuse(self):
148 self.cond.notifyAll()
149 self.assertRaises(RuntimeError, self.cond.wait)
150 self.assertRaises(RuntimeError, self.cond.notifyAll)
154 class TestPipeCondition(_ConditionTestCase):
155 """PipeCondition tests"""
158 _ConditionTestCase.setUp(self, locking.PipeCondition)
160 def testAcquireRelease(self):
161 self._testAcquireRelease()
163 def testNotification(self):
164 self._testNotification()
166 def _TestWait(self, fn):
167 self._addThread(target=fn)
168 self._addThread(target=fn)
169 self._addThread(target=fn)
171 # Wait for threads to be waiting
172 self.assertEqual(self.done.get(True, 1), "A")
173 self.assertEqual(self.done.get(True, 1), "A")
174 self.assertEqual(self.done.get(True, 1), "A")
176 self.assertRaises(Queue.Empty, self.done.get_nowait)
179 self.assertEqual(self.cond._nwaiters, 3)
180 # This new thread can't acquire the lock, and thus call wait, before we
182 self._addThread(target=fn)
183 self.cond.notifyAll()
184 self.assertRaises(Queue.Empty, self.done.get_nowait)
187 # We should now get 3 W and 1 A (for the new thread) in whatever order
191 got = self.done.get(True, 1)
197 self.fail("Got %s on the done queue" % got)
199 self.assertEqual(w, 3)
200 self.assertEqual(a, 1)
203 self.cond.notifyAll()
206 self.assertEqual(self.done.get_nowait(), "W")
207 self.assertRaises(Queue.Empty, self.done.get_nowait)
209 def testBlockingWait(self):
217 self._TestWait(_BlockingWait)
219 def testLongTimeoutWait(self):
227 self._TestWait(_Helper)
229 def _TimeoutWait(self, timeout, check):
231 self.cond.wait(timeout)
235 def testShortTimeoutWait(self):
236 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
237 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
239 self.assertEqual(self.done.get_nowait(), "T1")
240 self.assertEqual(self.done.get_nowait(), "T1")
241 self.assertRaises(Queue.Empty, self.done.get_nowait)
243 def testZeroTimeoutWait(self):
244 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
245 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
246 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
248 self.assertEqual(self.done.get_nowait(), "T0")
249 self.assertEqual(self.done.get_nowait(), "T0")
250 self.assertEqual(self.done.get_nowait(), "T0")
251 self.assertRaises(Queue.Empty, self.done.get_nowait)
254 class TestSharedLock(_ThreadedTestCase):
255 """SharedLock tests"""
258 _ThreadedTestCase.setUp(self)
259 self.sl = locking.SharedLock()
261 def testSequenceAndOwnership(self):
262 self.assert_(not self.sl._is_owned())
263 self.sl.acquire(shared=1)
264 self.assert_(self.sl._is_owned())
265 self.assert_(self.sl._is_owned(shared=1))
266 self.assert_(not self.sl._is_owned(shared=0))
268 self.assert_(not self.sl._is_owned())
270 self.assert_(self.sl._is_owned())
271 self.assert_(not self.sl._is_owned(shared=1))
272 self.assert_(self.sl._is_owned(shared=0))
274 self.assert_(not self.sl._is_owned())
275 self.sl.acquire(shared=1)
276 self.assert_(self.sl._is_owned())
277 self.assert_(self.sl._is_owned(shared=1))
278 self.assert_(not self.sl._is_owned(shared=0))
280 self.assert_(not self.sl._is_owned())
282 def testBooleanValue(self):
283 # semaphores are supposed to return a true value on a successful acquire
284 self.assert_(self.sl.acquire(shared=1))
286 self.assert_(self.sl.acquire())
289 def testDoubleLockingStoE(self):
290 self.sl.acquire(shared=1)
291 self.assertRaises(AssertionError, self.sl.acquire)
293 def testDoubleLockingEtoS(self):
295 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
297 def testDoubleLockingStoS(self):
298 self.sl.acquire(shared=1)
299 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
301 def testDoubleLockingEtoE(self):
303 self.assertRaises(AssertionError, self.sl.acquire)
305 # helper functions: called in a separate thread they acquire the lock, send
306 # their identifier on the done queue, then release it.
307 def _doItSharer(self):
309 self.sl.acquire(shared=1)
312 except errors.LockError:
315 def _doItExclusive(self):
320 except errors.LockError:
323 def _doItDelete(self):
327 except errors.LockError:
330 def testSharersCanCoexist(self):
331 self.sl.acquire(shared=1)
332 threading.Thread(target=self._doItSharer).start()
333 self.assert_(self.done.get(True, 1))
337 def testExclusiveBlocksExclusive(self):
339 self._addThread(target=self._doItExclusive)
340 self.assertRaises(Queue.Empty, self.done.get_nowait)
343 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
346 def testExclusiveBlocksDelete(self):
348 self._addThread(target=self._doItDelete)
349 self.assertRaises(Queue.Empty, self.done.get_nowait)
352 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
353 self.sl = locking.SharedLock()
356 def testExclusiveBlocksSharer(self):
358 self._addThread(target=self._doItSharer)
359 self.assertRaises(Queue.Empty, self.done.get_nowait)
362 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
365 def testSharerBlocksExclusive(self):
366 self.sl.acquire(shared=1)
367 self._addThread(target=self._doItExclusive)
368 self.assertRaises(Queue.Empty, self.done.get_nowait)
371 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
374 def testSharerBlocksDelete(self):
375 self.sl.acquire(shared=1)
376 self._addThread(target=self._doItDelete)
377 self.assertRaises(Queue.Empty, self.done.get_nowait)
380 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
381 self.sl = locking.SharedLock()
384 def testWaitingExclusiveBlocksSharer(self):
385 """SKIPPED testWaitingExclusiveBlockSharer"""
388 self.sl.acquire(shared=1)
389 # the lock is acquired in shared mode...
390 self._addThread(target=self._doItExclusive)
391 # ...but now an exclusive is waiting...
392 self._addThread(target=self._doItSharer)
393 # ...so the sharer should be blocked as well
394 self.assertRaises(Queue.Empty, self.done.get_nowait)
397 # The exclusive passed before
398 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
399 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
402 def testWaitingSharerBlocksExclusive(self):
403 """SKIPPED testWaitingSharerBlocksExclusive"""
407 # the lock is acquired in exclusive mode...
408 self._addThread(target=self._doItSharer)
409 # ...but now a sharer is waiting...
410 self._addThread(target=self._doItExclusive)
411 # ...the exclusive is waiting too...
412 self.assertRaises(Queue.Empty, self.done.get_nowait)
415 # The sharer passed before
416 self.assertEqual(self.done.get_nowait(), 'SHR')
417 self.assertEqual(self.done.get_nowait(), 'EXC')
419 def testDelete(self):
421 self.assertRaises(errors.LockError, self.sl.acquire)
422 self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
423 self.assertRaises(errors.LockError, self.sl.delete)
425 def testDeleteTimeout(self):
426 self.sl.delete(timeout=60)
428 def testNoDeleteIfSharer(self):
429 self.sl.acquire(shared=1)
430 self.assertRaises(AssertionError, self.sl.delete)
433 def testDeletePendingSharersExclusiveDelete(self):
435 self._addThread(target=self._doItSharer)
436 self._addThread(target=self._doItSharer)
437 self._addThread(target=self._doItExclusive)
438 self._addThread(target=self._doItDelete)
441 # The threads who were pending return ERR
443 self.assertEqual(self.done.get_nowait(), 'ERR')
444 self.sl = locking.SharedLock()
447 def testDeletePendingDeleteExclusiveSharers(self):
449 self._addThread(target=self._doItDelete)
450 self._addThread(target=self._doItExclusive)
451 self._addThread(target=self._doItSharer)
452 self._addThread(target=self._doItSharer)
455 # The four threads that were pending all return ERR
456 self.assertEqual(self.done.get_nowait(), 'ERR')
457 self.assertEqual(self.done.get_nowait(), 'ERR')
458 self.assertEqual(self.done.get_nowait(), 'ERR')
459 self.assertEqual(self.done.get_nowait(), 'ERR')
460 self.sl = locking.SharedLock()
463 def testExclusiveAcquireTimeout(self):
464 for shared in [0, 1]:
465 on_queue = threading.Event()
466 release_exclusive = threading.Event()
468 def _LockExclusive():
469 self.sl.acquire(shared=0, test_notify=on_queue.set)
470 self.done.put("A: start wait")
471 release_exclusive.wait()
472 self.done.put("A: end wait")
475 # Start thread to hold lock in exclusive mode
476 self._addThread(target=_LockExclusive)
478 # Wait for wait to begin
479 self.assertEqual(self.done.get(timeout=60), "A: start wait")
481 # Wait up to 60s to get lock, but release exclusive lock as soon as we're
483 self.failUnless(self.sl.acquire(shared=shared, timeout=60,
484 test_notify=release_exclusive.set))
486 self.done.put("got 2nd")
491 self.assertEqual(self.done.get_nowait(), "A: end wait")
492 self.assertEqual(self.done.get_nowait(), "got 2nd")
493 self.assertRaises(Queue.Empty, self.done.get_nowait)
496 def testAcquireExpiringTimeout(self):
497 def _AcquireWithTimeout(shared, timeout):
498 if not self.sl.acquire(shared=shared, timeout=timeout):
499 self.done.put("timeout")
501 for shared in [0, 1]:
505 # Start shared acquires with timeout between 0 and 20 ms
507 self._addThread(target=_AcquireWithTimeout,
508 args=(shared, i * 2.0 / 1000.0))
510 # Wait for threads to finish (makes sure the acquire timeout expires
511 # before releasing the lock)
518 self.assertEqual(self.done.get_nowait(), "timeout")
520 self.assertRaises(Queue.Empty, self.done.get_nowait)
523 def testSharedSkipExclusiveAcquires(self):
524 # Tests whether shared acquires jump in front of exclusive acquires in the
527 def _Acquire(shared, name, notify_ev, wait_ev):
529 notify_fn = notify_ev.set
536 if not self.sl.acquire(shared=shared, test_notify=notify_fn):
542 # Get exclusive lock while we fill the queue
550 # Add acquires using threading.Event for synchronization. They'll be
551 # acquired exactly in the order defined in this list.
552 acquires = (shrcnt1 * [(1, "shared 1")] +
553 3 * [(0, "exclusive 1")] +
554 shrcnt2 * [(1, "shared 2")] +
555 shrcnt3 * [(1, "shared 3")] +
556 shrcnt4 * [(1, "shared 4")] +
557 3 * [(0, "exclusive 2")])
562 for args in acquires:
563 ev_cur = threading.Event()
564 self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
567 # Wait for last acquire to start
570 # Expect 6 pending exclusive acquires and 1 for all shared acquires
572 self.assertEqual(self.sl._count_pending(), 7)
574 # Release exclusive lock and wait
580 for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
581 # Shared locks aren't guaranteed to be notified in order, but they'll be
583 tmp = self.done.get_nowait()
584 if tmp == "shared 1":
586 elif tmp == "shared 2":
588 elif tmp == "shared 3":
590 elif tmp == "shared 4":
592 self.assertEqual(shrcnt1, 0)
593 self.assertEqual(shrcnt2, 0)
594 self.assertEqual(shrcnt3, 0)
595 self.assertEqual(shrcnt3, 0)
598 self.assertEqual(self.done.get_nowait(), "exclusive 1")
601 self.assertEqual(self.done.get_nowait(), "exclusive 2")
603 self.assertRaises(Queue.Empty, self.done.get_nowait)
606 def testMixedAcquireTimeout(self):
607 sync = threading.Event()
609 def _AcquireShared(ev):
610 if not self.sl.acquire(shared=1, timeout=None):
613 self.done.put("shared")
618 # Wait for notification from main thread
626 ev = threading.Event()
627 self._addThread(target=_AcquireShared, args=(ev, ))
630 # Wait for all acquires to finish
634 self.assertEqual(self.sl._count_pending(), 0)
636 # Try to get exclusive lock
637 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
639 # Acquire exclusive without timeout
640 exclsync = threading.Event()
641 exclev = threading.Event()
643 def _AcquireExclusive():
644 if not self.sl.acquire(shared=0):
647 self.done.put("exclusive")
652 # Wait for notification from main thread
657 self._addThread(target=_AcquireExclusive)
659 # Try to get exclusive lock
660 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
662 # Make all shared holders release their locks
665 # Wait for exclusive acquire to succeed
668 self.assertEqual(self.sl._count_pending(), 0)
670 # Try to get exclusive lock
671 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
673 def _AcquireSharedSimple():
674 if self.sl.acquire(shared=1, timeout=None):
675 self.done.put("shared2")
679 self._addThread(target=_AcquireSharedSimple)
681 # Tell exclusive lock to release
684 # Wait for everything to finish
687 self.assertEqual(self.sl._count_pending(), 0)
691 self.assertEqual(self.done.get_nowait(), "shared")
693 self.assertEqual(self.done.get_nowait(), "exclusive")
696 self.assertEqual(self.done.get_nowait(), "shared2")
698 self.assertRaises(Queue.Empty, self.done.get_nowait)
701 class TestSSynchronizedDecorator(_ThreadedTestCase):
702 """Shared Lock Synchronized decorator test"""
705 _ThreadedTestCase.setUp(self)
707 @locking.ssynchronized(_decoratorlock)
708 def _doItExclusive(self):
709 self.assert_(_decoratorlock._is_owned())
712 @locking.ssynchronized(_decoratorlock, shared=1)
713 def _doItSharer(self):
714 self.assert_(_decoratorlock._is_owned(shared=1))
717 def testDecoratedFunctions(self):
718 self._doItExclusive()
719 self.assert_(not _decoratorlock._is_owned())
721 self.assert_(not _decoratorlock._is_owned())
723 def testSharersCanCoexist(self):
724 _decoratorlock.acquire(shared=1)
725 threading.Thread(target=self._doItSharer).start()
726 self.assert_(self.done.get(True, 1))
727 _decoratorlock.release()
730 def testExclusiveBlocksExclusive(self):
731 _decoratorlock.acquire()
732 self._addThread(target=self._doItExclusive)
733 # give it a bit of time to check that it's not actually doing anything
734 self.assertRaises(Queue.Empty, self.done.get_nowait)
735 _decoratorlock.release()
737 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
740 def testExclusiveBlocksSharer(self):
741 _decoratorlock.acquire()
742 self._addThread(target=self._doItSharer)
743 self.assertRaises(Queue.Empty, self.done.get_nowait)
744 _decoratorlock.release()
746 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
749 def testSharerBlocksExclusive(self):
750 _decoratorlock.acquire(shared=1)
751 self._addThread(target=self._doItExclusive)
752 self.assertRaises(Queue.Empty, self.done.get_nowait)
753 _decoratorlock.release()
755 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
758 class TestLockSet(_ThreadedTestCase):
762 _ThreadedTestCase.setUp(self)
766 """Helper to (re)initialize the lock set"""
767 self.resources = ['one', 'two', 'three']
768 self.ls = locking.LockSet(members=self.resources)
770 def testResources(self):
771 self.assertEquals(self.ls._names(), set(self.resources))
772 newls = locking.LockSet()
773 self.assertEquals(newls._names(), set())
775 def testAcquireRelease(self):
776 self.assert_(self.ls.acquire('one'))
777 self.assertEquals(self.ls._list_owned(), set(['one']))
779 self.assertEquals(self.ls._list_owned(), set())
780 self.assertEquals(self.ls.acquire(['one']), set(['one']))
781 self.assertEquals(self.ls._list_owned(), set(['one']))
783 self.assertEquals(self.ls._list_owned(), set())
784 self.ls.acquire(['one', 'two', 'three'])
785 self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
786 self.ls.release('one')
787 self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
788 self.ls.release(['three'])
789 self.assertEquals(self.ls._list_owned(), set(['two']))
791 self.assertEquals(self.ls._list_owned(), set())
792 self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
793 self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
795 self.assertEquals(self.ls._list_owned(), set())
797 def testNoDoubleAcquire(self):
798 self.ls.acquire('one')
799 self.assertRaises(AssertionError, self.ls.acquire, 'one')
800 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
801 self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
803 self.ls.acquire(['one', 'three'])
804 self.ls.release('one')
805 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
806 self.ls.release('three')
808 def testNoWrongRelease(self):
809 self.assertRaises(AssertionError, self.ls.release)
810 self.ls.acquire('one')
811 self.assertRaises(AssertionError, self.ls.release, 'two')
813 def testAddRemove(self):
815 self.assertEquals(self.ls._list_owned(), set())
816 self.assert_('four' in self.ls._names())
817 self.ls.add(['five', 'six', 'seven'], acquired=1)
818 self.assert_('five' in self.ls._names())
819 self.assert_('six' in self.ls._names())
820 self.assert_('seven' in self.ls._names())
821 self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
822 self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
823 self.assert_('five' not in self.ls._names())
824 self.assert_('six' not in self.ls._names())
825 self.assertEquals(self.ls._list_owned(), set(['seven']))
826 self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
827 self.ls.remove('seven')
828 self.assert_('seven' not in self.ls._names())
829 self.assertEquals(self.ls._list_owned(), set([]))
830 self.ls.acquire(None, shared=1)
831 self.assertRaises(AssertionError, self.ls.add, 'eight')
833 self.ls.acquire(None)
834 self.ls.add('eight', acquired=1)
835 self.assert_('eight' in self.ls._names())
836 self.assert_('eight' in self.ls._list_owned())
838 self.assert_('nine' in self.ls._names())
839 self.assert_('nine' not in self.ls._list_owned())
841 self.ls.remove(['two'])
842 self.assert_('two' not in self.ls._names())
843 self.ls.acquire('three')
844 self.assertEquals(self.ls.remove(['three']), ['three'])
845 self.assert_('three' not in self.ls._names())
846 self.assertEquals(self.ls.remove('three'), [])
847 self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
848 self.assert_('one' not in self.ls._names())
850 def testRemoveNonBlocking(self):
851 self.ls.acquire('one')
852 self.assertEquals(self.ls.remove('one'), ['one'])
853 self.ls.acquire(['two', 'three'])
854 self.assertEquals(self.ls.remove(['two', 'three']),
857 def testNoDoubleAdd(self):
858 self.assertRaises(errors.LockError, self.ls.add, 'two')
860 self.assertRaises(errors.LockError, self.ls.add, 'four')
862 def testNoWrongRemoves(self):
863 self.ls.acquire(['one', 'three'], shared=1)
864 # Cannot remove 'two' while holding something which is not a superset
865 self.assertRaises(AssertionError, self.ls.remove, 'two')
866 # Cannot remove 'three' as we are sharing it
867 self.assertRaises(AssertionError, self.ls.remove, 'three')
869 def testAcquireSetLock(self):
870 # acquire the set-lock exclusively
871 self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
872 self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
873 self.assertEquals(self.ls._is_owned(), True)
874 self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
875 # I can still add/remove elements...
876 self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
877 self.assert_(self.ls.add('six'))
880 self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
881 # adding new elements is not possible
882 self.assertRaises(AssertionError, self.ls.add, 'five')
885 def testAcquireWithRepetitions(self):
886 self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
887 set(['two', 'two', 'three']))
888 self.ls.release(['two', 'two'])
889 self.assertEquals(self.ls._list_owned(), set(['three']))
891 def testEmptyAcquire(self):
892 # Acquire an empty list of locks...
893 self.assertEquals(self.ls.acquire([]), set())
894 self.assertEquals(self.ls._list_owned(), set())
895 # New locks can still be added
896 self.assert_(self.ls.add('six'))
897 # "re-acquiring" is not an issue, since we had really acquired nothing
898 self.assertEquals(self.ls.acquire([], shared=1), set())
899 self.assertEquals(self.ls._list_owned(), set())
900 # We haven't really acquired anything, so we cannot release
901 self.assertRaises(AssertionError, self.ls.release)
903 def _doLockSet(self, names, shared):
905 self.ls.acquire(names, shared=shared)
906 self.done.put('DONE')
908 except errors.LockError:
911 def _doAddSet(self, names):
913 self.ls.add(names, acquired=1)
914 self.done.put('DONE')
916 except errors.LockError:
919 def _doRemoveSet(self, names):
920 self.done.put(self.ls.remove(names))
923 def testConcurrentSharedAcquire(self):
924 self.ls.acquire(['one', 'two'], shared=1)
925 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
927 self.assertEqual(self.done.get_nowait(), 'DONE')
928 self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
930 self.assertEqual(self.done.get_nowait(), 'DONE')
931 self._addThread(target=self._doLockSet, args=('three', 1))
933 self.assertEqual(self.done.get_nowait(), 'DONE')
934 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
935 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
936 self.assertRaises(Queue.Empty, self.done.get_nowait)
939 self.assertEqual(self.done.get_nowait(), 'DONE')
940 self.assertEqual(self.done.get_nowait(), 'DONE')
943 def testConcurrentExclusiveAcquire(self):
944 self.ls.acquire(['one', 'two'])
945 self._addThread(target=self._doLockSet, args=('three', 1))
947 self.assertEqual(self.done.get_nowait(), 'DONE')
948 self._addThread(target=self._doLockSet, args=('three', 0))
950 self.assertEqual(self.done.get_nowait(), 'DONE')
951 self.assertRaises(Queue.Empty, self.done.get_nowait)
952 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
953 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
954 self._addThread(target=self._doLockSet, args=('one', 0))
955 self._addThread(target=self._doLockSet, args=('one', 1))
956 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
957 self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
958 self.assertRaises(Queue.Empty, self.done.get_nowait)
962 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
965 def testSimpleAcquireTimeoutExpiring(self):
966 names = sorted(self.ls._names())
967 self.assert_(len(names) >= 3)
969 # Get name of first lock
972 # Get name of last lock
976 # Block first and try to lock it again
979 # Block last and try to lock all locks
982 # Block last and try to lock it again
986 for (wanted, block) in checks:
987 # Lock in exclusive mode
988 self.assert_(self.ls.acquire(block, shared=0))
991 # Try to get the same lock again with a timeout (should never succeed)
992 acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
994 self.done.put("acquired")
997 self.assert_(acquired is None)
998 self.assert_(not self.ls._list_owned())
999 self.assert_(not self.ls._is_owned())
1000 self.done.put("not acquired")
1002 self._addThread(target=_AcquireOne)
1004 # Wait for timeout in thread to expire
1007 # Release exclusive lock again
1010 self.assertEqual(self.done.get_nowait(), "not acquired")
1011 self.assertRaises(Queue.Empty, self.done.get_nowait)
1014 def testDelayedAndExpiringLockAcquire(self):
1016 self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1018 for expire in (False, True):
1019 names = sorted(self.ls._names())
1020 self.assertEqual(len(names), 8)
1022 lock_ev = dict([(i, threading.Event()) for i in names])
1024 # Lock all in exclusive mode
1025 self.assert_(self.ls.acquire(names, shared=0))
1028 # We'll wait at least 300ms per lock
1029 lockwait = len(names) * [0.3]
1031 # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1032 # this gives us up to 2.4s to fail.
1033 lockall_timeout = 0.4
1035 # This should finish rather quickly
1037 lockall_timeout = len(names) * 5.0
1040 def acquire_notification(name):
1042 self.done.put("getting %s" % name)
1047 if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1048 test_notify=acquire_notification):
1049 self.done.put("got all")
1052 self.done.put("timeout on all")
1055 for ev in lock_ev.values():
1058 t = self._addThread(target=_LockAll)
1060 for idx, name in enumerate(names):
1061 # Wait for actual acquire on this lock to start
1062 lock_ev[name].wait(10.0)
1064 if expire and t.isAlive():
1065 # Wait some time after getting the notification to make sure the lock
1066 # acquire will expire
1067 SafeSleep(lockwait[idx])
1069 self.ls.release(names=name)
1071 self.assert_(not self.ls._list_owned())
1076 # Not checking which locks were actually acquired. Doing so would be
1077 # too timing-dependent.
1078 self.assertEqual(self.done.get_nowait(), "timeout on all")
1081 self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1082 self.assertEqual(self.done.get_nowait(), "got all")
1083 self.assertRaises(Queue.Empty, self.done.get_nowait)
1086 def testConcurrentRemove(self):
1088 self.ls.acquire(['one', 'two', 'four'])
1089 self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1090 self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1091 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1092 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1093 self.assertRaises(Queue.Empty, self.done.get_nowait)
1094 self.ls.remove('one')
1098 self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1099 self.ls.add(['five', 'six'], acquired=1)
1100 self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1101 self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1102 self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1103 self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1104 self.ls.remove('five')
1108 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1109 self.ls.acquire(['three', 'four'])
1110 self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1111 self.assertRaises(Queue.Empty, self.done.get_nowait)
1112 self.ls.remove('four')
1114 self.assertEqual(self.done.get_nowait(), ['six'])
1115 self._addThread(target=self._doRemoveSet, args=(['two']))
1117 self.assertEqual(self.done.get_nowait(), ['two'])
1123 def testConcurrentSharedSetLock(self):
1124 # share the set-lock...
1125 self.ls.acquire(None, shared=1)
1126 # ...another thread can share it too
1127 self._addThread(target=self._doLockSet, args=(None, 1))
1129 self.assertEqual(self.done.get_nowait(), 'DONE')
1130 # ...or just share some elements
1131 self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1133 self.assertEqual(self.done.get_nowait(), 'DONE')
1134 # ...but not add new ones or remove any
1135 t = self._addThread(target=self._doAddSet, args=(['nine']))
1136 self._addThread(target=self._doRemoveSet, args=(['two'], ))
1137 self.assertRaises(Queue.Empty, self.done.get_nowait)
1138 # this just releases the set-lock
1141 self.assertEqual(self.done.get_nowait(), 'DONE')
1142 # release the lock on the actual elements so remove() can proceed too
1145 self.failUnlessEqual(self.done.get_nowait(), ['two'])
1150 def testConcurrentExclusiveSetLock(self):
1151 # acquire the set-lock...
1152 self.ls.acquire(None, shared=0)
1153 # ...no one can do anything else
1154 self._addThread(target=self._doLockSet, args=(None, 1))
1155 self._addThread(target=self._doLockSet, args=(None, 0))
1156 self._addThread(target=self._doLockSet, args=(['three'], 0))
1157 self._addThread(target=self._doLockSet, args=(['two'], 1))
1158 self._addThread(target=self._doAddSet, args=(['nine']))
1159 self.assertRaises(Queue.Empty, self.done.get_nowait)
1163 self.assertEqual(self.done.get(True, 1), 'DONE')
1168 def testConcurrentSetLockAdd(self):
1169 self.ls.acquire('one')
1170 # Another thread wants the whole SetLock
1171 self._addThread(target=self._doLockSet, args=(None, 0))
1172 self._addThread(target=self._doLockSet, args=(None, 1))
1173 self.assertRaises(Queue.Empty, self.done.get_nowait)
1174 self.assertRaises(AssertionError, self.ls.add, 'four')
1177 self.assertEqual(self.done.get_nowait(), 'DONE')
1178 self.assertEqual(self.done.get_nowait(), 'DONE')
1179 self.ls.acquire(None)
1180 self._addThread(target=self._doLockSet, args=(None, 0))
1181 self._addThread(target=self._doLockSet, args=(None, 1))
1182 self.assertRaises(Queue.Empty, self.done.get_nowait)
1184 self.ls.add('five', acquired=1)
1185 self.ls.add('six', acquired=1, shared=1)
1186 self.assertEquals(self.ls._list_owned(),
1187 set(['one', 'two', 'three', 'five', 'six']))
1188 self.assertEquals(self.ls._is_owned(), True)
1189 self.assertEquals(self.ls._names(),
1190 set(['one', 'two', 'three', 'four', 'five', 'six']))
1193 self.assertEqual(self.done.get_nowait(), 'DONE')
1194 self.assertEqual(self.done.get_nowait(), 'DONE')
def testEmptyLockSet(self):
  """Check that an emptied lock set still blocks adds and set locks."""
  # Get the set-lock by acquiring every name
  self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
  # Now empty the set while still owning it
  self.ls.remove(['one', 'two', 'three'])
  # and adds/locks by another thread still wait
  # NOTE: "(['nine'])" is a plain list, not a one-element tuple, so the
  # thread target is invoked with the string 'nine' as its only argument.
  self._addThread(target=self._doAddSet, args=(['nine']))
  self._addThread(target=self._doLockSet, args=(None, 1))
  self._addThread(target=self._doLockSet, args=(None, 0))
  self.assertRaises(Queue.Empty, self.done.get_nowait)
  # Unblock the waiters and let them finish before reading the results
  self.ls.release()
  self._waitThreads()
  # Three threads were started above; drain one result per thread so that
  # nothing is left over for the Queue.Empty check further down.
  for _ in range(3):
    self.assertEqual(self.done.get_nowait(), 'DONE')
  # Empty the set again
  self.assertEqual(self.ls.remove(['nine']), ['nine'])
  # This time hold the (empty) set in shared mode
  self.assertEqual(self.ls.acquire(None, shared=1), set())
  # other sharers can go, adds still wait
  self._addThread(target=self._doLockSet, args=(None, 1))
  self._waitThreads()
  self.assertEqual(self.done.get_nowait(), 'DONE')
  self._addThread(target=self._doAddSet, args=(['nine']))
  self.assertRaises(Queue.Empty, self.done.get_nowait)
  self.ls.release()
  self._waitThreads()
  self.assertEqual(self.done.get_nowait(), 'DONE')
class TestGanetiLockManager(_ThreadedTestCase):
  """Tests for locking.GanetiLockManager"""

  def setUp(self):
    """Create a lock manager with two nodes and three instances"""
    _ThreadedTestCase.setUp(self)
    self.nodes = ['n1', 'n2']
    self.instances = ['i1', 'i2', 'i3']
    self.GL = locking.GanetiLockManager(nodes=self.nodes,
                                        instances=self.instances)

  def tearDown(self):
    """Forget the manager so that the next test can build a new one"""
    # Don't try this at home...
    locking.GanetiLockManager._instance = None

  def testLockingConstants(self):
    """Check that each level constant equals its index in LEVELS"""
    # The locking library internally cheats by assuming its constants have
    # some relationships with each other. Check those hold true.
    # This relationship is also used in the Processor to recursively acquire
    # the right locks. Again, please don't break it.
    for idx, level in enumerate(locking.LEVELS):
      self.assertEqual(idx, level)

  def testDoubleGLFails(self):
    """Check that a second manager cannot be created next to the first"""
    self.assertRaises(AssertionError, locking.GanetiLockManager)

  def testLockNames(self):
    """Check the lock names known at each level after setUp"""
    self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
    self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
    self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                     set(self.instances))

  def testInitAndResources(self):
    """Check the initial lock names for different constructor arguments"""
    # (constructor keyword arguments, expected node names,
    #  expected instance names)
    cases = [
      ({}, set(), set()),
      ({'nodes': self.nodes}, set(self.nodes), set()),
      ({'instances': self.instances}, set(), set(self.instances)),
      ]
    for kwargs, exp_nodes, exp_instances in cases:
      # Drop the current manager so that a new one can be constructed
      locking.GanetiLockManager._instance = None
      self.GL = locking.GanetiLockManager(**kwargs)
      self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
      self.assertEqual(self.GL._names(locking.LEVEL_NODE), exp_nodes)
      self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
                       exp_instances)

  def testAcquireRelease(self):
    """Check acquiring and releasing individual locks at several levels"""
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.assertEqual(self.GL._list_owned(locking.LEVEL_CLUSTER),
                     set(['BGL']))
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
    # Partial release at the node level leaves the other locks untouched
    self.GL.release(locking.LEVEL_NODE, ['n2'])
    self.assertEqual(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_INSTANCE),
                     set(['i1']))
    # Releasing without names drops everything held at that level
    self.GL.release(locking.LEVEL_NODE)
    self.assertEqual(self.GL._list_owned(locking.LEVEL_NODE), set())
    self.assertEqual(self.GL._list_owned(locking.LEVEL_INSTANCE),
                     set(['i1']))
    self.GL.release(locking.LEVEL_INSTANCE)
    # 'i5' was never registered with the manager
    self.assertRaises(errors.LockError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ['i5'])
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
    self.assertEqual(self.GL._list_owned(locking.LEVEL_INSTANCE),
                     set(['i3']))

  def testAcquireWholeSets(self):
    """Check acquiring all locks of a level by passing None as names"""
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.assertEqual(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                     set(self.instances))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_INSTANCE),
                     set(self.instances))
    self.assertEqual(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
                     set(self.nodes))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_NODE),
                     set(self.nodes))
    self.GL.release(locking.LEVEL_NODE)
    self.GL.release(locking.LEVEL_INSTANCE)
    self.GL.release(locking.LEVEL_CLUSTER)

  def testAcquireWholeAndPartial(self):
    """Check mixing a whole-level acquire with a partial one"""
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.assertEqual(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                     set(self.instances))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_INSTANCE),
                     set(self.instances))
    self.assertEqual(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
                     set(['n2']))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_NODE),
                     set(['n2']))
    self.GL.release(locking.LEVEL_NODE)
    self.GL.release(locking.LEVEL_INSTANCE)
    self.GL.release(locking.LEVEL_CLUSTER)

  def testBGLDependency(self):
    """Check that the other levels depend on holding the BGL"""
    # Without the BGL nothing can be acquired at the other levels
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODE, ['n1', 'n2'])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ['i3'])
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
    # The BGL cannot be given up while a node lock is still held...
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER, ['BGL'])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER)
    self.GL.release(locking.LEVEL_NODE)
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
    # ...nor while instance locks are still held
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER, ['BGL'])
    self.assertRaises(AssertionError, self.GL.release,
                      locking.LEVEL_CLUSTER)
    self.GL.release(locking.LEVEL_INSTANCE)

  def testWrongOrder(self):
    """Check that out-of-order acquisitions are refused"""
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])
    # With a node lock held, neither another node lock nor an instance lock
    # may be requested.
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_NODE, ['n1'])
    self.assertRaises(AssertionError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ['i2'])

  def _doLock(self, level, names, shared):
    """Thread helper sharing the BGL and then locking at another level.

    Puts 'DONE' on the result queue once the requested locks are held (they
    are released right afterwards), or 'ERR' if a LockError is raised.

    @param level: the level at which to acquire locks
    @param names: the lock name(s) to acquire at that level
    @param shared: passed through as the "shared" argument of acquire()

    """
    try:
      self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
      self.GL.acquire(level, names, shared=shared)
      self.done.put('DONE')
      self.GL.release(level)
      self.GL.release(locking.LEVEL_CLUSTER)
    except errors.LockError:
      self.done.put('ERR')

  def testConcurrency(self):
    """Check shared/exclusive interaction between several threads"""
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    # Nothing is held at the instance level: the thread goes straight through
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
    # Wait for the thread before polling the queue without blocking
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')

    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
    # A different instance is not affected by our exclusive lock on 'i3'
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i1', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # The same instance has to wait, even in shared mode
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i3', 1))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.GL.release(locking.LEVEL_INSTANCE)
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')

    self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
    # Another sharer of 'i2' can proceed...
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i2', 1))
    self._waitThreads()
    self.assertEqual(self.done.get_nowait(), 'DONE')
    # ...while an exclusive request waits until we release
    self._addThread(target=self._doLock,
                    args=(locking.LEVEL_INSTANCE, 'i2', 0))
    self.assertRaises(Queue.Empty, self.done.get_nowait)
    self.GL.release(locking.LEVEL_INSTANCE)
    self._waitThreads()
    self.assertEqual(self.done.get(True, 1), 'DONE')
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
# When executed as a script, run the tests via testutils.GanetiTestProgram
if __name__ == "__main__":
  testutils.GanetiTestProgram()