4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the locking module"""
31 from ganeti import locking
32 from ganeti import errors
35 # This is used to test the ssynchronize decorator.
36 # Since it's passed as input to a decorator it must be declared as a global.
37 _decoratorlock = locking.SharedLock()
39 #: List for looping tests
44 """Decorator for executing a function many times"""
45 def wrapper(*args, **kwargs):
51 class _ThreadedTestCase(unittest.TestCase):
52 """Test class that supports adding/waiting on threads"""
54 unittest.TestCase.setUp(self)
55 self.done = Queue.Queue(0)
58 def _addThread(self, *args, **kwargs):
59 """Create and remember a new thread"""
60 t = threading.Thread(*args, **kwargs)
61 self.threads.append(t)
65 def _waitThreads(self):
66 """Wait for all our threads to finish"""
67 for t in self.threads:
69 self.failIf(t.isAlive())
73 class _ConditionTestCase(_ThreadedTestCase):
74 """Common test case for conditions"""
77 _ThreadedTestCase.setUp(self)
78 self.lock = threading.Lock()
79 self.cond = cls(self.lock)
81 def _testAcquireRelease(self):
82 self.assert_(not self.cond._is_owned())
83 self.assertRaises(RuntimeError, self.cond.wait)
84 self.assertRaises(RuntimeError, self.cond.notifyAll)
87 self.assert_(self.cond._is_owned())
89 self.assert_(self.cond._is_owned())
92 self.assert_(not self.cond._is_owned())
93 self.assertRaises(RuntimeError, self.cond.wait)
94 self.assertRaises(RuntimeError, self.cond.notifyAll)
96 def _testNotification(self):
101 self.cond.notifyAll()
106 self._addThread(target=_NotifyAll)
107 self.assertEqual(self.done.get(True, 1), "NE")
108 self.assertRaises(Queue.Empty, self.done.get_nowait)
110 self.assertEqual(self.done.get(True, 1), "NA")
111 self.assertEqual(self.done.get(True, 1), "NN")
112 self.assert_(self.cond._is_owned())
114 self.assert_(not self.cond._is_owned())
117 class TestSingleNotifyPipeCondition(_ConditionTestCase):
118 """SingleNotifyPipeCondition tests"""
121 _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
123 def testAcquireRelease(self):
124 self._testAcquireRelease()
126 def testNotification(self):
127 self._testNotification()
129 def testWaitReuse(self):
135 def testNoNotifyReuse(self):
137 self.cond.notifyAll()
138 self.assertRaises(RuntimeError, self.cond.wait)
139 self.assertRaises(RuntimeError, self.cond.notifyAll)
143 class TestPipeCondition(_ConditionTestCase):
144 """PipeCondition tests"""
147 _ConditionTestCase.setUp(self, locking.PipeCondition)
149 def testAcquireRelease(self):
150 self._testAcquireRelease()
152 def testNotification(self):
153 self._testNotification()
155 def _TestWait(self, fn):
156 self._addThread(target=fn)
157 self._addThread(target=fn)
158 self._addThread(target=fn)
160 # Wait for threads to be waiting
161 self.assertEqual(self.done.get(True, 1), "A")
162 self.assertEqual(self.done.get(True, 1), "A")
163 self.assertEqual(self.done.get(True, 1), "A")
165 self.assertRaises(Queue.Empty, self.done.get_nowait)
168 self.assertEqual(self.cond._nwaiters, 3)
169 # This new thread can"t acquire the lock, and thus call wait, before we
171 self._addThread(target=fn)
172 self.cond.notifyAll()
173 self.assertRaises(Queue.Empty, self.done.get_nowait)
176 # We should now get 3 W and 1 A (for the new thread) in whatever order
180 got = self.done.get(True, 1)
186 self.fail("Got %s on the done queue" % got)
188 self.assertEqual(w, 3)
189 self.assertEqual(a, 1)
192 self.cond.notifyAll()
195 self.assertEqual(self.done.get_nowait(), "W")
196 self.assertRaises(Queue.Empty, self.done.get_nowait)
198 def testBlockingWait(self):
206 self._TestWait(_BlockingWait)
208 def testLongTimeoutWait(self):
216 self._TestWait(_Helper)
218 def _TimeoutWait(self, timeout, check):
220 self.cond.wait(timeout)
224 def testShortTimeoutWait(self):
225 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
226 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
228 self.assertEqual(self.done.get_nowait(), "T1")
229 self.assertEqual(self.done.get_nowait(), "T1")
230 self.assertRaises(Queue.Empty, self.done.get_nowait)
232 def testZeroTimeoutWait(self):
233 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
234 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
235 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
237 self.assertEqual(self.done.get_nowait(), "T0")
238 self.assertEqual(self.done.get_nowait(), "T0")
239 self.assertEqual(self.done.get_nowait(), "T0")
240 self.assertRaises(Queue.Empty, self.done.get_nowait)
243 class TestSharedLock(_ThreadedTestCase):
244 """SharedLock tests"""
247 _ThreadedTestCase.setUp(self)
248 self.sl = locking.SharedLock()
250 def testSequenceAndOwnership(self):
251 self.assert_(not self.sl._is_owned())
252 self.sl.acquire(shared=1)
253 self.assert_(self.sl._is_owned())
254 self.assert_(self.sl._is_owned(shared=1))
255 self.assert_(not self.sl._is_owned(shared=0))
257 self.assert_(not self.sl._is_owned())
259 self.assert_(self.sl._is_owned())
260 self.assert_(not self.sl._is_owned(shared=1))
261 self.assert_(self.sl._is_owned(shared=0))
263 self.assert_(not self.sl._is_owned())
264 self.sl.acquire(shared=1)
265 self.assert_(self.sl._is_owned())
266 self.assert_(self.sl._is_owned(shared=1))
267 self.assert_(not self.sl._is_owned(shared=0))
269 self.assert_(not self.sl._is_owned())
271 def testBooleanValue(self):
272 # semaphores are supposed to return a true value on a successful acquire
273 self.assert_(self.sl.acquire(shared=1))
275 self.assert_(self.sl.acquire())
278 def testDoubleLockingStoE(self):
279 self.sl.acquire(shared=1)
280 self.assertRaises(AssertionError, self.sl.acquire)
282 def testDoubleLockingEtoS(self):
284 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
286 def testDoubleLockingStoS(self):
287 self.sl.acquire(shared=1)
288 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
290 def testDoubleLockingEtoE(self):
292 self.assertRaises(AssertionError, self.sl.acquire)
294 # helper functions: called in a separate thread they acquire the lock, send
295 # their identifier on the done queue, then release it.
296 def _doItSharer(self):
298 self.sl.acquire(shared=1)
301 except errors.LockError:
304 def _doItExclusive(self):
309 except errors.LockError:
312 def _doItDelete(self):
316 except errors.LockError:
319 def testSharersCanCoexist(self):
320 self.sl.acquire(shared=1)
321 threading.Thread(target=self._doItSharer).start()
322 self.assert_(self.done.get(True, 1))
326 def testExclusiveBlocksExclusive(self):
328 self._addThread(target=self._doItExclusive)
329 self.assertRaises(Queue.Empty, self.done.get_nowait)
332 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
335 def testExclusiveBlocksDelete(self):
337 self._addThread(target=self._doItDelete)
338 self.assertRaises(Queue.Empty, self.done.get_nowait)
341 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
342 self.sl = locking.SharedLock()
345 def testExclusiveBlocksSharer(self):
347 self._addThread(target=self._doItSharer)
348 self.assertRaises(Queue.Empty, self.done.get_nowait)
351 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
354 def testSharerBlocksExclusive(self):
355 self.sl.acquire(shared=1)
356 self._addThread(target=self._doItExclusive)
357 self.assertRaises(Queue.Empty, self.done.get_nowait)
360 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
363 def testSharerBlocksDelete(self):
364 self.sl.acquire(shared=1)
365 self._addThread(target=self._doItDelete)
366 self.assertRaises(Queue.Empty, self.done.get_nowait)
369 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
370 self.sl = locking.SharedLock()
373 def testWaitingExclusiveBlocksSharer(self):
374 """SKIPPED testWaitingExclusiveBlockSharer"""
377 self.sl.acquire(shared=1)
378 # the lock is acquired in shared mode...
379 self._addThread(target=self._doItExclusive)
380 # ...but now an exclusive is waiting...
381 self._addThread(target=self._doItSharer)
382 # ...so the sharer should be blocked as well
383 self.assertRaises(Queue.Empty, self.done.get_nowait)
386 # The exclusive passed before
387 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
388 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
391 def testWaitingSharerBlocksExclusive(self):
392 """SKIPPED testWaitingSharerBlocksExclusive"""
396 # the lock is acquired in exclusive mode...
397 self._addThread(target=self._doItSharer)
398 # ...but now a sharer is waiting...
399 self._addThread(target=self._doItExclusive)
400 # ...the exclusive is waiting too...
401 self.assertRaises(Queue.Empty, self.done.get_nowait)
404 # The sharer passed before
405 self.assertEqual(self.done.get_nowait(), 'SHR')
406 self.assertEqual(self.done.get_nowait(), 'EXC')
408 def testDelete(self):
410 self.assertRaises(errors.LockError, self.sl.acquire)
411 self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
412 self.assertRaises(errors.LockError, self.sl.delete)
414 def testDeleteTimeout(self):
415 self.sl.delete(timeout=60)
417 def testNoDeleteIfSharer(self):
418 self.sl.acquire(shared=1)
419 self.assertRaises(AssertionError, self.sl.delete)
422 def testDeletePendingSharersExclusiveDelete(self):
424 self._addThread(target=self._doItSharer)
425 self._addThread(target=self._doItSharer)
426 self._addThread(target=self._doItExclusive)
427 self._addThread(target=self._doItDelete)
430 # The threads who were pending return ERR
432 self.assertEqual(self.done.get_nowait(), 'ERR')
433 self.sl = locking.SharedLock()
436 def testDeletePendingDeleteExclusiveSharers(self):
438 self._addThread(target=self._doItDelete)
439 self._addThread(target=self._doItExclusive)
440 self._addThread(target=self._doItSharer)
441 self._addThread(target=self._doItSharer)
444 # The two threads who were pending return both ERR
445 self.assertEqual(self.done.get_nowait(), 'ERR')
446 self.assertEqual(self.done.get_nowait(), 'ERR')
447 self.assertEqual(self.done.get_nowait(), 'ERR')
448 self.assertEqual(self.done.get_nowait(), 'ERR')
449 self.sl = locking.SharedLock()
452 def testExclusiveAcquireTimeout(self):
453 for shared in [0, 1]:
454 on_queue = threading.Event()
455 release_exclusive = threading.Event()
457 def _LockExclusive():
458 self.sl.acquire(shared=0, test_notify=on_queue.set)
459 self.done.put("A: start wait")
460 release_exclusive.wait()
461 self.done.put("A: end wait")
464 # Start thread to hold lock in exclusive mode
465 self._addThread(target=_LockExclusive)
467 # Wait for wait to begin
468 self.assertEqual(self.done.get(timeout=60), "A: start wait")
470 # Wait up to 60s to get lock, but release exclusive lock as soon as we're
472 self.failUnless(self.sl.acquire(shared=shared, timeout=60,
473 test_notify=release_exclusive.set))
475 self.done.put("got 2nd")
480 self.assertEqual(self.done.get_nowait(), "A: end wait")
481 self.assertEqual(self.done.get_nowait(), "got 2nd")
482 self.assertRaises(Queue.Empty, self.done.get_nowait)
485 def testAcquireExpiringTimeout(self):
486 def _AcquireWithTimeout(shared, timeout):
487 if not self.sl.acquire(shared=shared, timeout=timeout):
488 self.done.put("timeout")
490 for shared in [0, 1]:
494 # Start shared acquires with timeout between 0 and 20 ms
496 self._addThread(target=_AcquireWithTimeout,
497 args=(shared, i * 2.0 / 1000.0))
499 # Wait for threads to finish (makes sure the acquire timeout expires
500 # before releasing the lock)
507 self.assertEqual(self.done.get_nowait(), "timeout")
509 self.assertRaises(Queue.Empty, self.done.get_nowait)
512 def testSharedSkipExclusiveAcquires(self):
513 # Tests whether shared acquires jump in front of exclusive acquires in the
516 def _Acquire(shared, name, notify_ev, wait_ev):
518 notify_fn = notify_ev.set
525 if not self.sl.acquire(shared=shared, test_notify=notify_fn):
531 # Get exclusive lock while we fill the queue
539 # Add acquires using threading.Event for synchronization. They'll be
540 # acquired exactly in the order defined in this list.
541 acquires = (shrcnt1 * [(1, "shared 1")] +
542 3 * [(0, "exclusive 1")] +
543 shrcnt2 * [(1, "shared 2")] +
544 shrcnt3 * [(1, "shared 3")] +
545 shrcnt4 * [(1, "shared 4")] +
546 3 * [(0, "exclusive 2")])
551 for args in acquires:
552 ev_cur = threading.Event()
553 self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
556 # Wait for last acquire to start
559 # Expect 6 pending exclusive acquires and 1 for all shared acquires
561 self.assertEqual(self.sl._count_pending(), 7)
563 # Release exclusive lock and wait
569 for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
570 # Shared locks aren't guaranteed to be notified in order, but they'll be
572 tmp = self.done.get_nowait()
573 if tmp == "shared 1":
575 elif tmp == "shared 2":
577 elif tmp == "shared 3":
579 elif tmp == "shared 4":
581 self.assertEqual(shrcnt1, 0)
582 self.assertEqual(shrcnt2, 0)
583 self.assertEqual(shrcnt3, 0)
584 self.assertEqual(shrcnt3, 0)
587 self.assertEqual(self.done.get_nowait(), "exclusive 1")
590 self.assertEqual(self.done.get_nowait(), "exclusive 2")
592 self.assertRaises(Queue.Empty, self.done.get_nowait)
595 def testMixedAcquireTimeout(self):
596 sync = threading.Condition()
598 def _AcquireShared(ev):
599 if not self.sl.acquire(shared=1, timeout=None):
602 self.done.put("shared")
607 # Wait for notification
619 ev = threading.Event()
620 self._addThread(target=_AcquireShared, args=(ev, ))
623 # Wait for all acquires to finish
627 self.assertEqual(self.sl._count_pending(), 0)
629 # Try to get exclusive lock
630 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
632 # Acquire exclusive without timeout
633 exclsync = threading.Condition()
634 exclev = threading.Event()
636 def _AcquireExclusive():
637 if not self.sl.acquire(shared=0):
640 self.done.put("exclusive")
653 self._addThread(target=_AcquireExclusive)
655 # Try to get exclusive lock
656 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
658 # Make all shared holders release their locks
665 # Wait for exclusive acquire to succeed
668 self.assertEqual(self.sl._count_pending(), 0)
670 # Try to get exclusive lock
671 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
673 def _AcquireSharedSimple():
674 if self.sl.acquire(shared=1, timeout=None):
675 self.done.put("shared2")
679 self._addThread(target=_AcquireSharedSimple)
681 # Tell exclusive lock to release
688 # Wait for everything to finish
691 self.assertEqual(self.sl._count_pending(), 0)
695 self.assertEqual(self.done.get_nowait(), "shared")
697 self.assertEqual(self.done.get_nowait(), "exclusive")
700 self.assertEqual(self.done.get_nowait(), "shared2")
702 self.assertRaises(Queue.Empty, self.done.get_nowait)
705 class TestSSynchronizedDecorator(_ThreadedTestCase):
706 """Shared Lock Synchronized decorator test"""
709 _ThreadedTestCase.setUp(self)
711 @locking.ssynchronized(_decoratorlock)
712 def _doItExclusive(self):
713 self.assert_(_decoratorlock._is_owned())
716 @locking.ssynchronized(_decoratorlock, shared=1)
717 def _doItSharer(self):
718 self.assert_(_decoratorlock._is_owned(shared=1))
721 def testDecoratedFunctions(self):
722 self._doItExclusive()
723 self.assert_(not _decoratorlock._is_owned())
725 self.assert_(not _decoratorlock._is_owned())
727 def testSharersCanCoexist(self):
728 _decoratorlock.acquire(shared=1)
729 threading.Thread(target=self._doItSharer).start()
730 self.assert_(self.done.get(True, 1))
731 _decoratorlock.release()
734 def testExclusiveBlocksExclusive(self):
735 _decoratorlock.acquire()
736 self._addThread(target=self._doItExclusive)
737 # give it a bit of time to check that it's not actually doing anything
738 self.assertRaises(Queue.Empty, self.done.get_nowait)
739 _decoratorlock.release()
741 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
744 def testExclusiveBlocksSharer(self):
745 _decoratorlock.acquire()
746 self._addThread(target=self._doItSharer)
747 self.assertRaises(Queue.Empty, self.done.get_nowait)
748 _decoratorlock.release()
750 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
753 def testSharerBlocksExclusive(self):
754 _decoratorlock.acquire(shared=1)
755 self._addThread(target=self._doItExclusive)
756 self.assertRaises(Queue.Empty, self.done.get_nowait)
757 _decoratorlock.release()
759 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
762 class TestLockSet(_ThreadedTestCase):
766 _ThreadedTestCase.setUp(self)
770 """Helper to (re)initialize the lock set"""
771 self.resources = ['one', 'two', 'three']
772 self.ls = locking.LockSet(members=self.resources)
774 def testResources(self):
775 self.assertEquals(self.ls._names(), set(self.resources))
776 newls = locking.LockSet()
777 self.assertEquals(newls._names(), set())
779 def testAcquireRelease(self):
780 self.assert_(self.ls.acquire('one'))
781 self.assertEquals(self.ls._list_owned(), set(['one']))
783 self.assertEquals(self.ls._list_owned(), set())
784 self.assertEquals(self.ls.acquire(['one']), set(['one']))
785 self.assertEquals(self.ls._list_owned(), set(['one']))
787 self.assertEquals(self.ls._list_owned(), set())
788 self.ls.acquire(['one', 'two', 'three'])
789 self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
790 self.ls.release('one')
791 self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
792 self.ls.release(['three'])
793 self.assertEquals(self.ls._list_owned(), set(['two']))
795 self.assertEquals(self.ls._list_owned(), set())
796 self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
797 self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
799 self.assertEquals(self.ls._list_owned(), set())
801 def testNoDoubleAcquire(self):
802 self.ls.acquire('one')
803 self.assertRaises(AssertionError, self.ls.acquire, 'one')
804 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
805 self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
807 self.ls.acquire(['one', 'three'])
808 self.ls.release('one')
809 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
810 self.ls.release('three')
812 def testNoWrongRelease(self):
813 self.assertRaises(AssertionError, self.ls.release)
814 self.ls.acquire('one')
815 self.assertRaises(AssertionError, self.ls.release, 'two')
817 def testAddRemove(self):
819 self.assertEquals(self.ls._list_owned(), set())
820 self.assert_('four' in self.ls._names())
821 self.ls.add(['five', 'six', 'seven'], acquired=1)
822 self.assert_('five' in self.ls._names())
823 self.assert_('six' in self.ls._names())
824 self.assert_('seven' in self.ls._names())
825 self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
826 self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
827 self.assert_('five' not in self.ls._names())
828 self.assert_('six' not in self.ls._names())
829 self.assertEquals(self.ls._list_owned(), set(['seven']))
830 self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
831 self.ls.remove('seven')
832 self.assert_('seven' not in self.ls._names())
833 self.assertEquals(self.ls._list_owned(), set([]))
834 self.ls.acquire(None, shared=1)
835 self.assertRaises(AssertionError, self.ls.add, 'eight')
837 self.ls.acquire(None)
838 self.ls.add('eight', acquired=1)
839 self.assert_('eight' in self.ls._names())
840 self.assert_('eight' in self.ls._list_owned())
842 self.assert_('nine' in self.ls._names())
843 self.assert_('nine' not in self.ls._list_owned())
845 self.ls.remove(['two'])
846 self.assert_('two' not in self.ls._names())
847 self.ls.acquire('three')
848 self.assertEquals(self.ls.remove(['three']), ['three'])
849 self.assert_('three' not in self.ls._names())
850 self.assertEquals(self.ls.remove('three'), [])
851 self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
852 self.assert_('one' not in self.ls._names())
854 def testRemoveNonBlocking(self):
855 self.ls.acquire('one')
856 self.assertEquals(self.ls.remove('one'), ['one'])
857 self.ls.acquire(['two', 'three'])
858 self.assertEquals(self.ls.remove(['two', 'three']),
861 def testNoDoubleAdd(self):
862 self.assertRaises(errors.LockError, self.ls.add, 'two')
864 self.assertRaises(errors.LockError, self.ls.add, 'four')
866 def testNoWrongRemoves(self):
867 self.ls.acquire(['one', 'three'], shared=1)
868 # Cannot remove 'two' while holding something which is not a superset
869 self.assertRaises(AssertionError, self.ls.remove, 'two')
870 # Cannot remove 'three' as we are sharing it
871 self.assertRaises(AssertionError, self.ls.remove, 'three')
873 def testAcquireSetLock(self):
874 # acquire the set-lock exclusively
875 self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
876 self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
877 self.assertEquals(self.ls._is_owned(), True)
878 self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
879 # I can still add/remove elements...
880 self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
881 self.assert_(self.ls.add('six'))
884 self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
885 # adding new elements is not possible
886 self.assertRaises(AssertionError, self.ls.add, 'five')
889 def testAcquireWithRepetitions(self):
890 self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
891 set(['two', 'two', 'three']))
892 self.ls.release(['two', 'two'])
893 self.assertEquals(self.ls._list_owned(), set(['three']))
895 def testEmptyAcquire(self):
896 # Acquire an empty list of locks...
897 self.assertEquals(self.ls.acquire([]), set())
898 self.assertEquals(self.ls._list_owned(), set())
899 # New locks can still be addded
900 self.assert_(self.ls.add('six'))
901 # "re-acquiring" is not an issue, since we had really acquired nothing
902 self.assertEquals(self.ls.acquire([], shared=1), set())
903 self.assertEquals(self.ls._list_owned(), set())
904 # We haven't really acquired anything, so we cannot release
905 self.assertRaises(AssertionError, self.ls.release)
907 def _doLockSet(self, names, shared):
909 self.ls.acquire(names, shared=shared)
910 self.done.put('DONE')
912 except errors.LockError:
915 def _doAddSet(self, names):
917 self.ls.add(names, acquired=1)
918 self.done.put('DONE')
920 except errors.LockError:
923 def _doRemoveSet(self, names):
924 self.done.put(self.ls.remove(names))
927 def testConcurrentSharedAcquire(self):
928 self.ls.acquire(['one', 'two'], shared=1)
929 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
931 self.assertEqual(self.done.get_nowait(), 'DONE')
932 self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
934 self.assertEqual(self.done.get_nowait(), 'DONE')
935 self._addThread(target=self._doLockSet, args=('three', 1))
937 self.assertEqual(self.done.get_nowait(), 'DONE')
938 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
939 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
940 self.assertRaises(Queue.Empty, self.done.get_nowait)
943 self.assertEqual(self.done.get_nowait(), 'DONE')
944 self.assertEqual(self.done.get_nowait(), 'DONE')
947 def testConcurrentExclusiveAcquire(self):
948 self.ls.acquire(['one', 'two'])
949 self._addThread(target=self._doLockSet, args=('three', 1))
951 self.assertEqual(self.done.get_nowait(), 'DONE')
952 self._addThread(target=self._doLockSet, args=('three', 0))
954 self.assertEqual(self.done.get_nowait(), 'DONE')
955 self.assertRaises(Queue.Empty, self.done.get_nowait)
956 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
957 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
958 self._addThread(target=self._doLockSet, args=('one', 0))
959 self._addThread(target=self._doLockSet, args=('one', 1))
960 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
961 self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
962 self.assertRaises(Queue.Empty, self.done.get_nowait)
966 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
969 def testConcurrentRemove(self):
971 self.ls.acquire(['one', 'two', 'four'])
972 self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
973 self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
974 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
975 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
976 self.assertRaises(Queue.Empty, self.done.get_nowait)
977 self.ls.remove('one')
981 self.failUnlessEqual(self.done.get_nowait(), 'ERR')
982 self.ls.add(['five', 'six'], acquired=1)
983 self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
984 self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
985 self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
986 self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
987 self.ls.remove('five')
991 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
992 self.ls.acquire(['three', 'four'])
993 self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
994 self.assertRaises(Queue.Empty, self.done.get_nowait)
995 self.ls.remove('four')
997 self.assertEqual(self.done.get_nowait(), ['six'])
998 self._addThread(target=self._doRemoveSet, args=(['two']))
1000 self.assertEqual(self.done.get_nowait(), ['two'])
1006 def testConcurrentSharedSetLock(self):
1007 # share the set-lock...
1008 self.ls.acquire(None, shared=1)
1009 # ...another thread can share it too
1010 self._addThread(target=self._doLockSet, args=(None, 1))
1012 self.assertEqual(self.done.get_nowait(), 'DONE')
1013 # ...or just share some elements
1014 self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1016 self.assertEqual(self.done.get_nowait(), 'DONE')
1017 # ...but not add new ones or remove any
1018 t = self._addThread(target=self._doAddSet, args=(['nine']))
1019 self._addThread(target=self._doRemoveSet, args=(['two'], ))
1020 self.assertRaises(Queue.Empty, self.done.get_nowait)
1021 # this just releases the set-lock
1024 self.assertEqual(self.done.get_nowait(), 'DONE')
1025 # release the lock on the actual elements so remove() can proceed too
1028 self.failUnlessEqual(self.done.get_nowait(), ['two'])
1033 def testConcurrentExclusiveSetLock(self):
1034 # acquire the set-lock...
1035 self.ls.acquire(None, shared=0)
1036 # ...no one can do anything else
1037 self._addThread(target=self._doLockSet, args=(None, 1))
1038 self._addThread(target=self._doLockSet, args=(None, 0))
1039 self._addThread(target=self._doLockSet, args=(['three'], 0))
1040 self._addThread(target=self._doLockSet, args=(['two'], 1))
1041 self._addThread(target=self._doAddSet, args=(['nine']))
1042 self.assertRaises(Queue.Empty, self.done.get_nowait)
1046 self.assertEqual(self.done.get(True, 1), 'DONE')
1051 def testConcurrentSetLockAdd(self):
1052 self.ls.acquire('one')
1053 # Another thread wants the whole SetLock
1054 self._addThread(target=self._doLockSet, args=(None, 0))
1055 self._addThread(target=self._doLockSet, args=(None, 1))
1056 self.assertRaises(Queue.Empty, self.done.get_nowait)
1057 self.assertRaises(AssertionError, self.ls.add, 'four')
1060 self.assertEqual(self.done.get_nowait(), 'DONE')
1061 self.assertEqual(self.done.get_nowait(), 'DONE')
1062 self.ls.acquire(None)
1063 self._addThread(target=self._doLockSet, args=(None, 0))
1064 self._addThread(target=self._doLockSet, args=(None, 1))
1065 self.assertRaises(Queue.Empty, self.done.get_nowait)
1067 self.ls.add('five', acquired=1)
1068 self.ls.add('six', acquired=1, shared=1)
1069 self.assertEquals(self.ls._list_owned(),
1070 set(['one', 'two', 'three', 'five', 'six']))
1071 self.assertEquals(self.ls._is_owned(), True)
1072 self.assertEquals(self.ls._names(),
1073 set(['one', 'two', 'three', 'four', 'five', 'six']))
1076 self.assertEqual(self.done.get_nowait(), 'DONE')
1077 self.assertEqual(self.done.get_nowait(), 'DONE')
1081 def testEmptyLockSet(self):
1083 self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1085 self.ls.remove(['one', 'two', 'three'])
1086 # and adds/locks by another thread still wait
1087 self._addThread(target=self._doAddSet, args=(['nine']))
1088 self._addThread(target=self._doLockSet, args=(None, 1))
1089 self._addThread(target=self._doLockSet, args=(None, 0))
1090 self.assertRaises(Queue.Empty, self.done.get_nowait)
1094 self.assertEqual(self.done.get_nowait(), 'DONE')
1096 self.assertEqual(self.ls.remove(['nine']), ['nine'])
1098 self.assertEqual(self.ls.acquire(None, shared=1), set())
1099 # other sharers can go, adds still wait
1100 self._addThread(target=self._doLockSet, args=(None, 1))
1102 self.assertEqual(self.done.get_nowait(), 'DONE')
1103 self._addThread(target=self._doAddSet, args=(['nine']))
1104 self.assertRaises(Queue.Empty, self.done.get_nowait)
1107 self.assertEqual(self.done.get_nowait(), 'DONE')
1111 class TestGanetiLockManager(_ThreadedTestCase):
1114 _ThreadedTestCase.setUp(self)
1115 self.nodes=['n1', 'n2']
1116 self.instances=['i1', 'i2', 'i3']
1117 self.GL = locking.GanetiLockManager(nodes=self.nodes,
1118 instances=self.instances)
1121 # Don't try this at home...
1122 locking.GanetiLockManager._instance = None
1124 def testLockingConstants(self):
1125 # The locking library internally cheats by assuming its constants have some
1126 # relationships with each other. Check those hold true.
1127 # This relationship is also used in the Processor to recursively acquire
1128 # the right locks. Again, please don't break it.
1129 for i in range(len(locking.LEVELS)):
1130 self.assertEqual(i, locking.LEVELS[i])
1132 def testDoubleGLFails(self):
1133 self.assertRaises(AssertionError, locking.GanetiLockManager)
1135 def testLockNames(self):
1136 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1137 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1138 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1139 set(self.instances))
1141 def testInitAndResources(self):
1142 locking.GanetiLockManager._instance = None
1143 self.GL = locking.GanetiLockManager()
1144 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1145 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1146 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1148 locking.GanetiLockManager._instance = None
1149 self.GL = locking.GanetiLockManager(nodes=self.nodes)
1150 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1151 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1152 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1154 locking.GanetiLockManager._instance = None
1155 self.GL = locking.GanetiLockManager(instances=self.instances)
1156 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1157 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1158 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1159 set(self.instances))
1161 def testAcquireRelease(self):
1162 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1163 self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1164 self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1165 self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1166 self.GL.release(locking.LEVEL_NODE, ['n2'])
1167 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1168 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1169 self.GL.release(locking.LEVEL_NODE)
1170 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1171 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1172 self.GL.release(locking.LEVEL_INSTANCE)
1173 self.assertRaises(errors.LockError, self.GL.acquire,
1174 locking.LEVEL_INSTANCE, ['i5'])
1175 self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1176 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1178 def testAcquireWholeSets(self):
1179 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1180 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1181 set(self.instances))
1182 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1183 set(self.instances))
1184 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1186 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1188 self.GL.release(locking.LEVEL_NODE)
1189 self.GL.release(locking.LEVEL_INSTANCE)
1190 self.GL.release(locking.LEVEL_CLUSTER)
1192 def testAcquireWholeAndPartial(self):
1193 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1194 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1195 set(self.instances))
1196 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1197 set(self.instances))
1198 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1200 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1202 self.GL.release(locking.LEVEL_NODE)
1203 self.GL.release(locking.LEVEL_INSTANCE)
1204 self.GL.release(locking.LEVEL_CLUSTER)
1206 def testBGLDependency(self):
1207 self.assertRaises(AssertionError, self.GL.acquire,
1208 locking.LEVEL_NODE, ['n1', 'n2'])
1209 self.assertRaises(AssertionError, self.GL.acquire,
1210 locking.LEVEL_INSTANCE, ['i3'])
1211 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1212 self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1213 self.assertRaises(AssertionError, self.GL.release,
1214 locking.LEVEL_CLUSTER, ['BGL'])
1215 self.assertRaises(AssertionError, self.GL.release,
1216 locking.LEVEL_CLUSTER)
1217 self.GL.release(locking.LEVEL_NODE)
1218 self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1219 self.assertRaises(AssertionError, self.GL.release,
1220 locking.LEVEL_CLUSTER, ['BGL'])
1221 self.assertRaises(AssertionError, self.GL.release,
1222 locking.LEVEL_CLUSTER)
1223 self.GL.release(locking.LEVEL_INSTANCE)
1225 def testWrongOrder(self):
1226 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1227 self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1228 self.assertRaises(AssertionError, self.GL.acquire,
1229 locking.LEVEL_NODE, ['n1'])
1230 self.assertRaises(AssertionError, self.GL.acquire,
1231 locking.LEVEL_INSTANCE, ['i2'])
1233 # Helper function to run as a thread that shared the BGL and then acquires
1234 # some locks at another level.
1235 def _doLock(self, level, names, shared):
1237 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1238 self.GL.acquire(level, names, shared=shared)
1239 self.done.put('DONE')
1240 self.GL.release(level)
1241 self.GL.release(locking.LEVEL_CLUSTER)
1242 except errors.LockError:
1243 self.done.put('ERR')
1246 def testConcurrency(self):
1247 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1248 self._addThread(target=self._doLock,
1249 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1251 self.assertEqual(self.done.get_nowait(), 'DONE')
1252 self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1253 self._addThread(target=self._doLock,
1254 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1256 self.assertEqual(self.done.get_nowait(), 'DONE')
1257 self._addThread(target=self._doLock,
1258 args=(locking.LEVEL_INSTANCE, 'i3', 1))
1259 self.assertRaises(Queue.Empty, self.done.get_nowait)
1260 self.GL.release(locking.LEVEL_INSTANCE)
1262 self.assertEqual(self.done.get_nowait(), 'DONE')
1263 self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1264 self._addThread(target=self._doLock,
1265 args=(locking.LEVEL_INSTANCE, 'i2', 1))
1267 self.assertEqual(self.done.get_nowait(), 'DONE')
1268 self._addThread(target=self._doLock,
1269 args=(locking.LEVEL_INSTANCE, 'i2', 0))
1270 self.assertRaises(Queue.Empty, self.done.get_nowait)
1271 self.GL.release(locking.LEVEL_INSTANCE)
1273 self.assertEqual(self.done.get(True, 1), 'DONE')
1274 self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1277 if __name__ == '__main__':
1279 #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
1280 #unittest.TextTestRunner(verbosity=2).run(suite)