4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the locking module"""
31 from ganeti import locking
32 from ganeti import errors
# This is used to test the ssynchronized decorator.
36 # Since it's passed as input to a decorator it must be declared as a global.
37 _decoratorlock = locking.SharedLock()
39 #: List for looping tests
44 """Decorator for executing a function many times"""
45 def wrapper(*args, **kwargs):
51 class _ThreadedTestCase(unittest.TestCase):
52 """Test class that supports adding/waiting on threads"""
54 unittest.TestCase.setUp(self)
55 self.done = Queue.Queue(0)
58 def _addThread(self, *args, **kwargs):
59 """Create and remember a new thread"""
60 t = threading.Thread(*args, **kwargs)
61 self.threads.append(t)
65 def _waitThreads(self):
66 """Wait for all our threads to finish"""
67 for t in self.threads:
69 self.failIf(t.isAlive())
73 class _ConditionTestCase(_ThreadedTestCase):
74 """Common test case for conditions"""
77 _ThreadedTestCase.setUp(self)
78 self.lock = threading.Lock()
79 self.cond = cls(self.lock)
81 def _testAcquireRelease(self):
82 self.assert_(not self.cond._is_owned())
83 self.assertRaises(RuntimeError, self.cond.wait)
84 self.assertRaises(RuntimeError, self.cond.notifyAll)
87 self.assert_(self.cond._is_owned())
89 self.assert_(self.cond._is_owned())
92 self.assert_(not self.cond._is_owned())
93 self.assertRaises(RuntimeError, self.cond.wait)
94 self.assertRaises(RuntimeError, self.cond.notifyAll)
96 def _testNotification(self):
103 self._addThread(target=_NotifyAll)
105 self.assert_(self.cond._is_owned())
107 self.assert_(not self.cond._is_owned())
110 class TestPipeCondition(_ConditionTestCase):
111 """_PipeCondition tests"""
114 _ConditionTestCase.setUp(self, locking._PipeCondition)
def testAcquireRelease(self):
    # Delegates to the shared acquire/release scenario defined on
    # _ConditionTestCase, run against this test case's condition object.
    self._testAcquireRelease()
def testNotification(self):
    # Delegates to the shared notification scenario defined on
    # _ConditionTestCase, run against this test case's condition object.
    self._testNotification()
122 def _TestWait(self, fn):
123 self._addThread(target=fn)
124 self._addThread(target=fn)
125 self._addThread(target=fn)
127 # Wait for threads to be waiting
128 self.assertEqual(self.done.get(True, 1), "A")
129 self.assertEqual(self.done.get(True, 1), "A")
130 self.assertEqual(self.done.get(True, 1), "A")
132 self.assertRaises(Queue.Empty, self.done.get_nowait)
135 self.assertEqual(self.cond._nwaiters, 3)
# This new thread can't acquire the lock, and thus call wait, before we
138 self._addThread(target=fn)
139 self.cond.notifyAll()
140 self.assertRaises(Queue.Empty, self.done.get_nowait)
143 # We should now get 3 W and 1 A (for the new thread) in whatever order
147 got = self.done.get(True, 1)
153 self.fail("Got %s on the done queue" % got)
155 self.assertEqual(w, 3)
156 self.assertEqual(a, 1)
159 self.cond.notifyAll()
162 self.assertEqual(self.done.get_nowait(), "W")
163 self.assertRaises(Queue.Empty, self.done.get_nowait)
165 def testBlockingWait(self):
173 self._TestWait(_BlockingWait)
175 def testLongTimeoutWait(self):
183 self._TestWait(_Helper)
185 def _TimeoutWait(self, timeout, check):
187 self.cond.wait(timeout)
191 def testShortTimeoutWait(self):
192 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
193 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
195 self.assertEqual(self.done.get_nowait(), "T1")
196 self.assertEqual(self.done.get_nowait(), "T1")
197 self.assertRaises(Queue.Empty, self.done.get_nowait)
199 def testZeroTimeoutWait(self):
200 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
201 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
202 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
204 self.assertEqual(self.done.get_nowait(), "T0")
205 self.assertEqual(self.done.get_nowait(), "T0")
206 self.assertEqual(self.done.get_nowait(), "T0")
207 self.assertRaises(Queue.Empty, self.done.get_nowait)
210 class TestSingleActionPipeCondition(unittest.TestCase):
211 """_SingleActionPipeCondition tests"""
214 self.cond = locking._SingleActionPipeCondition()
def testInitialization(self):
    # Right after construction the condition must have its _read_fd,
    # _write_fd and _poller attributes populated and no registered waiters.
    cond = self.cond
    for attr_name in ("_read_fd", "_write_fd", "_poller"):
        self.assert_(getattr(cond, attr_name) is not None)
    self.assertEqual(cond._nwaiters, 0)
222 def testUsageCount(self):
223 self.cond.StartWaiting()
224 self.assert_(self.cond._read_fd is not None)
225 self.assert_(self.cond._write_fd is not None)
226 self.assert_(self.cond._poller is not None)
227 self.assertEqual(self.cond._nwaiters, 1)
230 self.cond.StartWaiting()
231 self.assertEqual(self.cond._nwaiters, 2)
233 # there is more than one user
234 self.assert_(not self.cond.DoneWaiting())
235 self.assert_(self.cond._read_fd is not None)
236 self.assert_(self.cond._write_fd is not None)
237 self.assert_(self.cond._poller is not None)
238 self.assertEqual(self.cond._nwaiters, 1)
240 self.assert_(self.cond.DoneWaiting())
241 self.assertEqual(self.cond._nwaiters, 0)
242 self.assert_(self.cond._read_fd is None)
243 self.assert_(self.cond._write_fd is None)
244 self.assert_(self.cond._poller is None)
246 def testNotify(self):
247 wait1 = self.cond.StartWaiting()
248 wait2 = self.cond.StartWaiting()
250 self.assert_(self.cond._read_fd is not None)
251 self.assert_(self.cond._write_fd is not None)
252 self.assert_(self.cond._poller is not None)
254 self.cond.notifyAll()
256 self.assert_(self.cond._read_fd is not None)
257 self.assert_(self.cond._write_fd is None)
258 self.assert_(self.cond._poller is not None)
260 self.assert_(not self.cond.DoneWaiting())
262 self.assert_(self.cond._read_fd is not None)
263 self.assert_(self.cond._write_fd is None)
264 self.assert_(self.cond._poller is not None)
266 self.assert_(self.cond.DoneWaiting())
268 self.assert_(self.cond._read_fd is None)
269 self.assert_(self.cond._write_fd is None)
270 self.assert_(self.cond._poller is None)
272 def testReusage(self):
273 self.cond.StartWaiting()
274 self.assert_(self.cond._read_fd is not None)
275 self.assert_(self.cond._write_fd is not None)
276 self.assert_(self.cond._poller is not None)
278 self.assert_(self.cond.DoneWaiting())
280 self.assertRaises(RuntimeError, self.cond.StartWaiting)
281 self.assert_(self.cond._read_fd is None)
282 self.assert_(self.cond._write_fd is None)
283 self.assert_(self.cond._poller is None)
def testNotifyTwice(self):
    # The first notifyAll() must go through; calling it a second time on the
    # same condition object has to raise RuntimeError.
    cond = self.cond
    cond.notifyAll()
    self.assertRaises(RuntimeError, cond.notifyAll)
290 class TestSharedLock(_ThreadedTestCase):
291 """SharedLock tests"""
294 _ThreadedTestCase.setUp(self)
295 self.sl = locking.SharedLock()
297 def testSequenceAndOwnership(self):
298 self.assert_(not self.sl._is_owned())
299 self.sl.acquire(shared=1)
300 self.assert_(self.sl._is_owned())
301 self.assert_(self.sl._is_owned(shared=1))
302 self.assert_(not self.sl._is_owned(shared=0))
304 self.assert_(not self.sl._is_owned())
306 self.assert_(self.sl._is_owned())
307 self.assert_(not self.sl._is_owned(shared=1))
308 self.assert_(self.sl._is_owned(shared=0))
310 self.assert_(not self.sl._is_owned())
311 self.sl.acquire(shared=1)
312 self.assert_(self.sl._is_owned())
313 self.assert_(self.sl._is_owned(shared=1))
314 self.assert_(not self.sl._is_owned(shared=0))
316 self.assert_(not self.sl._is_owned())
318 def testBooleanValue(self):
319 # semaphores are supposed to return a true value on a successful acquire
320 self.assert_(self.sl.acquire(shared=1))
322 self.assert_(self.sl.acquire())
def testDoubleLockingStoE(self):
    # With the lock already held in shared mode by this thread, a further
    # acquire() with default arguments must trip an AssertionError.
    lock = self.sl
    lock.acquire(shared=1)
    self.assertRaises(AssertionError, lock.acquire)
329 def testDoubleLockingEtoS(self):
331 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
def testDoubleLockingStoS(self):
    # With the lock already held in shared mode by this thread, asking for it
    # in shared mode a second time must trip an AssertionError.
    lock = self.sl
    lock.acquire(shared=1)
    self.assertRaises(AssertionError, lock.acquire, shared=1)
337 def testDoubleLockingEtoE(self):
339 self.assertRaises(AssertionError, self.sl.acquire)
341 # helper functions: called in a separate thread they acquire the lock, send
342 # their identifier on the done queue, then release it.
343 def _doItSharer(self):
345 self.sl.acquire(shared=1)
348 except errors.LockError:
351 def _doItExclusive(self):
356 except errors.LockError:
359 def _doItDelete(self):
363 except errors.LockError:
366 def testSharersCanCoexist(self):
367 self.sl.acquire(shared=1)
368 threading.Thread(target=self._doItSharer).start()
369 self.assert_(self.done.get(True, 1))
373 def testExclusiveBlocksExclusive(self):
375 self._addThread(target=self._doItExclusive)
376 self.assertRaises(Queue.Empty, self.done.get_nowait)
379 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
382 def testExclusiveBlocksDelete(self):
384 self._addThread(target=self._doItDelete)
385 self.assertRaises(Queue.Empty, self.done.get_nowait)
388 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
389 self.sl = locking.SharedLock()
392 def testExclusiveBlocksSharer(self):
394 self._addThread(target=self._doItSharer)
395 self.assertRaises(Queue.Empty, self.done.get_nowait)
398 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
401 def testSharerBlocksExclusive(self):
402 self.sl.acquire(shared=1)
403 self._addThread(target=self._doItExclusive)
404 self.assertRaises(Queue.Empty, self.done.get_nowait)
407 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
410 def testSharerBlocksDelete(self):
411 self.sl.acquire(shared=1)
412 self._addThread(target=self._doItDelete)
413 self.assertRaises(Queue.Empty, self.done.get_nowait)
416 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
417 self.sl = locking.SharedLock()
420 def testWaitingExclusiveBlocksSharer(self):
421 """SKIPPED testWaitingExclusiveBlockSharer"""
424 self.sl.acquire(shared=1)
425 # the lock is acquired in shared mode...
426 self._addThread(target=self._doItExclusive)
427 # ...but now an exclusive is waiting...
428 self._addThread(target=self._doItSharer)
429 # ...so the sharer should be blocked as well
430 self.assertRaises(Queue.Empty, self.done.get_nowait)
433 # The exclusive passed before
434 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
435 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
438 def testWaitingSharerBlocksExclusive(self):
439 """SKIPPED testWaitingSharerBlocksExclusive"""
443 # the lock is acquired in exclusive mode...
444 self._addThread(target=self._doItSharer)
445 # ...but now a sharer is waiting...
446 self._addThread(target=self._doItExclusive)
447 # ...the exclusive is waiting too...
448 self.assertRaises(Queue.Empty, self.done.get_nowait)
451 # The sharer passed before
452 self.assertEqual(self.done.get_nowait(), 'SHR')
453 self.assertEqual(self.done.get_nowait(), 'EXC')
455 def testDelete(self):
457 self.assertRaises(errors.LockError, self.sl.acquire)
458 self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
459 self.assertRaises(errors.LockError, self.sl.delete)
def testDeleteTimeout(self):
    # delete() accepts a timeout keyword; the call is expected to return
    # without raising on the lock created for this test.
    lock = self.sl
    lock.delete(timeout=60)
464 def testNoDeleteIfSharer(self):
465 self.sl.acquire(shared=1)
466 self.assertRaises(AssertionError, self.sl.delete)
469 def testDeletePendingSharersExclusiveDelete(self):
471 self._addThread(target=self._doItSharer)
472 self._addThread(target=self._doItSharer)
473 self._addThread(target=self._doItExclusive)
474 self._addThread(target=self._doItDelete)
477 # The threads who were pending return ERR
479 self.assertEqual(self.done.get_nowait(), 'ERR')
480 self.sl = locking.SharedLock()
483 def testDeletePendingDeleteExclusiveSharers(self):
485 self._addThread(target=self._doItDelete)
486 self._addThread(target=self._doItExclusive)
487 self._addThread(target=self._doItSharer)
488 self._addThread(target=self._doItSharer)
# All four pending threads (delete, exclusive and both sharers) return ERR
492 self.assertEqual(self.done.get_nowait(), 'ERR')
493 self.assertEqual(self.done.get_nowait(), 'ERR')
494 self.assertEqual(self.done.get_nowait(), 'ERR')
495 self.assertEqual(self.done.get_nowait(), 'ERR')
496 self.sl = locking.SharedLock()
499 def testExclusiveAcquireTimeout(self):
500 def _LockExclusive(wait):
501 self.sl.acquire(shared=0)
502 self.done.put("A: start sleep")
504 self.done.put("A: end sleep")
507 for shared in [0, 1]:
508 # Start thread to hold lock for 20 ms
509 self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, ))
511 # Wait for sleep to begin
512 self.assertEqual(self.done.get(), "A: start sleep")
514 # Wait up to 100 ms to get lock
515 self.failUnless(self.sl.acquire(shared=shared, timeout=0.1))
516 self.done.put("got 2nd")
521 self.assertEqual(self.done.get_nowait(), "A: end sleep")
522 self.assertEqual(self.done.get_nowait(), "got 2nd")
523 self.assertRaises(Queue.Empty, self.done.get_nowait)
526 def testAcquireExpiringTimeout(self):
527 def _AcquireWithTimeout(shared, timeout):
528 if not self.sl.acquire(shared=shared, timeout=timeout):
529 self.done.put("timeout")
531 for shared in [0, 1]:
# Start acquires (mode given by the outer loop) with timeout between 0 and 20 ms
537 self._addThread(target=_AcquireWithTimeout,
538 args=(shared, i * 2.0 / 1000.0))
540 # Wait for threads to finish (makes sure the acquire timeout expires
541 # before releasing the lock)
548 self.assertEqual(self.done.get_nowait(), "timeout")
550 self.assertRaises(Queue.Empty, self.done.get_nowait)
553 def testSharedSkipExclusiveAcquires(self):
554 # Tests whether shared acquires jump in front of exclusive acquires in the
557 # Get exclusive lock while we fill the queue
560 def _Acquire(shared, name):
561 if not self.sl.acquire(shared=shared):
567 # Start shared acquires
569 self._addThread(target=_Acquire, args=(1, "shared A"))
571 # Start exclusive acquires
573 self._addThread(target=_Acquire, args=(0, "exclusive B"))
575 # More shared acquires
577 self._addThread(target=_Acquire, args=(1, "shared C"))
579 # More exclusive acquires
581 self._addThread(target=_Acquire, args=(0, "exclusive D"))
583 # Expect 6 pending exclusive acquires and 1 for all shared acquires
584 # together. There's no way to wait for SharedLock.acquire to start
585 # its work. Hence the timeout of 2 seconds.
587 end_time = time.time() + 2.0
588 while time.time() < end_time:
589 pending = self.sl._count_pending()
590 self.assert_(pending >= 0 and pending <= 7)
594 self.assertEqual(pending, 7)
596 # Release exclusive lock and wait
605 # Shared locks aren't guaranteed to be notified in order, but they'll be
607 tmp = self.done.get_nowait()
608 if tmp == "shared A":
610 elif tmp == "shared C":
612 self.assertEqual(shr_a, 5)
613 self.assertEqual(shr_c, 5)
616 self.assertEqual(self.done.get_nowait(), "exclusive B")
619 self.assertEqual(self.done.get_nowait(), "exclusive D")
621 self.assertRaises(Queue.Empty, self.done.get_nowait)
624 def testMixedAcquireTimeout(self):
625 sync = threading.Condition()
627 def _AcquireShared(ev):
628 if not self.sl.acquire(shared=1, timeout=None):
631 self.done.put("shared")
636 # Wait for notification
648 ev = threading.Event()
649 self._addThread(target=_AcquireShared, args=(ev, ))
652 # Wait for all acquires to finish
656 self.assertEqual(self.sl._count_pending(), 0)
658 # Try to get exclusive lock
659 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
661 # Acquire exclusive without timeout
662 exclsync = threading.Condition()
663 exclev = threading.Event()
665 def _AcquireExclusive():
666 if not self.sl.acquire(shared=0):
669 self.done.put("exclusive")
682 self._addThread(target=_AcquireExclusive)
684 # Try to get exclusive lock
685 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
687 # Make all shared holders release their locks
694 # Wait for exclusive acquire to succeed
697 self.assertEqual(self.sl._count_pending(), 0)
699 # Try to get exclusive lock
700 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
702 def _AcquireSharedSimple():
703 if self.sl.acquire(shared=1, timeout=None):
704 self.done.put("shared2")
708 self._addThread(target=_AcquireSharedSimple)
710 # Tell exclusive lock to release
717 # Wait for everything to finish
720 self.assertEqual(self.sl._count_pending(), 0)
724 self.assertEqual(self.done.get_nowait(), "shared")
726 self.assertEqual(self.done.get_nowait(), "exclusive")
729 self.assertEqual(self.done.get_nowait(), "shared2")
731 self.assertRaises(Queue.Empty, self.done.get_nowait)
734 class TestSSynchronizedDecorator(_ThreadedTestCase):
735 """Shared Lock Synchronized decorator test"""
738 _ThreadedTestCase.setUp(self)
740 @locking.ssynchronized(_decoratorlock)
741 def _doItExclusive(self):
742 self.assert_(_decoratorlock._is_owned())
745 @locking.ssynchronized(_decoratorlock, shared=1)
746 def _doItSharer(self):
747 self.assert_(_decoratorlock._is_owned(shared=1))
750 def testDecoratedFunctions(self):
751 self._doItExclusive()
752 self.assert_(not _decoratorlock._is_owned())
754 self.assert_(not _decoratorlock._is_owned())
756 def testSharersCanCoexist(self):
757 _decoratorlock.acquire(shared=1)
758 threading.Thread(target=self._doItSharer).start()
759 self.assert_(self.done.get(True, 1))
760 _decoratorlock.release()
763 def testExclusiveBlocksExclusive(self):
764 _decoratorlock.acquire()
765 self._addThread(target=self._doItExclusive)
766 # give it a bit of time to check that it's not actually doing anything
767 self.assertRaises(Queue.Empty, self.done.get_nowait)
768 _decoratorlock.release()
770 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
773 def testExclusiveBlocksSharer(self):
774 _decoratorlock.acquire()
775 self._addThread(target=self._doItSharer)
776 self.assertRaises(Queue.Empty, self.done.get_nowait)
777 _decoratorlock.release()
779 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
782 def testSharerBlocksExclusive(self):
783 _decoratorlock.acquire(shared=1)
784 self._addThread(target=self._doItExclusive)
785 self.assertRaises(Queue.Empty, self.done.get_nowait)
786 _decoratorlock.release()
788 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
791 class TestLockSet(_ThreadedTestCase):
795 _ThreadedTestCase.setUp(self)
799 """Helper to (re)initialize the lock set"""
800 self.resources = ['one', 'two', 'three']
801 self.ls = locking.LockSet(members=self.resources)
def testResources(self):
    # The lock set prepared for the test must report exactly the configured
    # resource names, and a set built without members must report none.
    # assertEqual is used instead of the deprecated assertEquals alias, in
    # line with the other assertions in this file.
    self.assertEqual(self.ls._names(), set(self.resources))
    empty_ls = locking.LockSet()
    self.assertEqual(empty_ls._names(), set())
808 def testAcquireRelease(self):
809 self.assert_(self.ls.acquire('one'))
810 self.assertEquals(self.ls._list_owned(), set(['one']))
812 self.assertEquals(self.ls._list_owned(), set())
813 self.assertEquals(self.ls.acquire(['one']), set(['one']))
814 self.assertEquals(self.ls._list_owned(), set(['one']))
816 self.assertEquals(self.ls._list_owned(), set())
817 self.ls.acquire(['one', 'two', 'three'])
818 self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
819 self.ls.release('one')
820 self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
821 self.ls.release(['three'])
822 self.assertEquals(self.ls._list_owned(), set(['two']))
824 self.assertEquals(self.ls._list_owned(), set())
825 self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
826 self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
828 self.assertEquals(self.ls._list_owned(), set())
830 def testNoDoubleAcquire(self):
831 self.ls.acquire('one')
832 self.assertRaises(AssertionError, self.ls.acquire, 'one')
833 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
834 self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
836 self.ls.acquire(['one', 'three'])
837 self.ls.release('one')
838 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
839 self.ls.release('three')
def testNoWrongRelease(self):
    lockset = self.ls
    # Releasing while nothing is held must trip an assertion...
    self.assertRaises(AssertionError, lockset.release)
    # ...and so must releasing a name other than the one we acquired.
    lockset.acquire('one')
    self.assertRaises(AssertionError, lockset.release, 'two')
846 def testAddRemove(self):
848 self.assertEquals(self.ls._list_owned(), set())
849 self.assert_('four' in self.ls._names())
850 self.ls.add(['five', 'six', 'seven'], acquired=1)
851 self.assert_('five' in self.ls._names())
852 self.assert_('six' in self.ls._names())
853 self.assert_('seven' in self.ls._names())
854 self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
855 self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
856 self.assert_('five' not in self.ls._names())
857 self.assert_('six' not in self.ls._names())
858 self.assertEquals(self.ls._list_owned(), set(['seven']))
859 self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
860 self.ls.remove('seven')
861 self.assert_('seven' not in self.ls._names())
862 self.assertEquals(self.ls._list_owned(), set([]))
863 self.ls.acquire(None, shared=1)
864 self.assertRaises(AssertionError, self.ls.add, 'eight')
866 self.ls.acquire(None)
867 self.ls.add('eight', acquired=1)
868 self.assert_('eight' in self.ls._names())
869 self.assert_('eight' in self.ls._list_owned())
871 self.assert_('nine' in self.ls._names())
872 self.assert_('nine' not in self.ls._list_owned())
874 self.ls.remove(['two'])
875 self.assert_('two' not in self.ls._names())
876 self.ls.acquire('three')
877 self.assertEquals(self.ls.remove(['three']), ['three'])
878 self.assert_('three' not in self.ls._names())
879 self.assertEquals(self.ls.remove('three'), [])
880 self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
881 self.assert_('one' not in self.ls._names())
883 def testRemoveNonBlocking(self):
884 self.ls.acquire('one')
885 self.assertEquals(self.ls.remove('one'), ['one'])
886 self.ls.acquire(['two', 'three'])
887 self.assertEquals(self.ls.remove(['two', 'three']),
890 def testNoDoubleAdd(self):
891 self.assertRaises(errors.LockError, self.ls.add, 'two')
893 self.assertRaises(errors.LockError, self.ls.add, 'four')
def testNoWrongRemoves(self):
    lockset = self.ls
    lockset.acquire(['one', 'three'], shared=1)
    # 'two' cannot be removed: what we hold is not a superset containing it.
    # 'three' cannot be removed either: we only hold it in shared mode.
    for name in ('two', 'three'):
        self.assertRaises(AssertionError, lockset.remove, name)
902 def testAcquireSetLock(self):
903 # acquire the set-lock exclusively
904 self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
905 self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
906 self.assertEquals(self.ls._is_owned(), True)
907 self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
908 # I can still add/remove elements...
909 self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
910 self.assert_(self.ls.add('six'))
913 self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
914 # adding new elements is not possible
915 self.assertRaises(AssertionError, self.ls.add, 'five')
def testAcquireWithRepetitions(self):
    # Repeated names in an acquire or release request must be tolerated.
    # assertEqual is used instead of the deprecated assertEquals alias, in
    # line with the other assertions in this file.
    self.assertEqual(self.ls.acquire(['two', 'two', 'three'], shared=1),
                     set(['two', 'two', 'three']))
    self.ls.release(['two', 'two'])
    self.assertEqual(self.ls._list_owned(), set(['three']))
def testEmptyAcquire(self):
    # assertEqual is used instead of the deprecated assertEquals alias, in
    # line with the other assertions in this file.
    # Acquire an empty list of locks...
    self.assertEqual(self.ls.acquire([]), set())
    self.assertEqual(self.ls._list_owned(), set())
    # New locks can still be added
    self.assert_(self.ls.add('six'))
    # "re-acquiring" is not an issue, since we had really acquired nothing
    self.assertEqual(self.ls.acquire([], shared=1), set())
    self.assertEqual(self.ls._list_owned(), set())
    # We haven't really acquired anything, so we cannot release
    self.assertRaises(AssertionError, self.ls.release)
936 def _doLockSet(self, names, shared):
938 self.ls.acquire(names, shared=shared)
939 self.done.put('DONE')
941 except errors.LockError:
944 def _doAddSet(self, names):
946 self.ls.add(names, acquired=1)
947 self.done.put('DONE')
949 except errors.LockError:
def _doRemoveSet(self, names):
    """Remove names from the lock set and publish the outcome.

    Whatever remove() returns is put on the done queue, where the test
    that started this helper can read it back.
    """
    removed = self.ls.remove(names)
    self.done.put(removed)
956 def testConcurrentSharedAcquire(self):
957 self.ls.acquire(['one', 'two'], shared=1)
958 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
960 self.assertEqual(self.done.get_nowait(), 'DONE')
961 self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
963 self.assertEqual(self.done.get_nowait(), 'DONE')
964 self._addThread(target=self._doLockSet, args=('three', 1))
966 self.assertEqual(self.done.get_nowait(), 'DONE')
967 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
968 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
969 self.assertRaises(Queue.Empty, self.done.get_nowait)
972 self.assertEqual(self.done.get_nowait(), 'DONE')
973 self.assertEqual(self.done.get_nowait(), 'DONE')
976 def testConcurrentExclusiveAcquire(self):
977 self.ls.acquire(['one', 'two'])
978 self._addThread(target=self._doLockSet, args=('three', 1))
980 self.assertEqual(self.done.get_nowait(), 'DONE')
981 self._addThread(target=self._doLockSet, args=('three', 0))
983 self.assertEqual(self.done.get_nowait(), 'DONE')
984 self.assertRaises(Queue.Empty, self.done.get_nowait)
985 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
986 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
987 self._addThread(target=self._doLockSet, args=('one', 0))
988 self._addThread(target=self._doLockSet, args=('one', 1))
989 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
990 self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
991 self.assertRaises(Queue.Empty, self.done.get_nowait)
995 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
998 def testConcurrentRemove(self):
1000 self.ls.acquire(['one', 'two', 'four'])
1001 self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1002 self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1003 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1004 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1005 self.assertRaises(Queue.Empty, self.done.get_nowait)
1006 self.ls.remove('one')
1010 self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1011 self.ls.add(['five', 'six'], acquired=1)
1012 self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1013 self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1014 self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1015 self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1016 self.ls.remove('five')
1020 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1021 self.ls.acquire(['three', 'four'])
1022 self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1023 self.assertRaises(Queue.Empty, self.done.get_nowait)
1024 self.ls.remove('four')
1026 self.assertEqual(self.done.get_nowait(), ['six'])
1027 self._addThread(target=self._doRemoveSet, args=(['two']))
1029 self.assertEqual(self.done.get_nowait(), ['two'])
1035 def testConcurrentSharedSetLock(self):
1036 # share the set-lock...
1037 self.ls.acquire(None, shared=1)
1038 # ...another thread can share it too
1039 self._addThread(target=self._doLockSet, args=(None, 1))
1041 self.assertEqual(self.done.get_nowait(), 'DONE')
1042 # ...or just share some elements
1043 self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1045 self.assertEqual(self.done.get_nowait(), 'DONE')
1046 # ...but not add new ones or remove any
1047 t = self._addThread(target=self._doAddSet, args=(['nine']))
1048 self._addThread(target=self._doRemoveSet, args=(['two'], ))
1049 self.assertRaises(Queue.Empty, self.done.get_nowait)
1050 # this just releases the set-lock
1053 self.assertEqual(self.done.get_nowait(), 'DONE')
1054 # release the lock on the actual elements so remove() can proceed too
1057 self.failUnlessEqual(self.done.get_nowait(), ['two'])
1062 def testConcurrentExclusiveSetLock(self):
1063 # acquire the set-lock...
1064 self.ls.acquire(None, shared=0)
1065 # ...no one can do anything else
1066 self._addThread(target=self._doLockSet, args=(None, 1))
1067 self._addThread(target=self._doLockSet, args=(None, 0))
1068 self._addThread(target=self._doLockSet, args=(['three'], 0))
1069 self._addThread(target=self._doLockSet, args=(['two'], 1))
1070 self._addThread(target=self._doAddSet, args=(['nine']))
1071 self.assertRaises(Queue.Empty, self.done.get_nowait)
1075 self.assertEqual(self.done.get(True, 1), 'DONE')
1080 def testConcurrentSetLockAdd(self):
1081 self.ls.acquire('one')
1082 # Another thread wants the whole SetLock
1083 self._addThread(target=self._doLockSet, args=(None, 0))
1084 self._addThread(target=self._doLockSet, args=(None, 1))
1085 self.assertRaises(Queue.Empty, self.done.get_nowait)
1086 self.assertRaises(AssertionError, self.ls.add, 'four')
1089 self.assertEqual(self.done.get_nowait(), 'DONE')
1090 self.assertEqual(self.done.get_nowait(), 'DONE')
1091 self.ls.acquire(None)
1092 self._addThread(target=self._doLockSet, args=(None, 0))
1093 self._addThread(target=self._doLockSet, args=(None, 1))
1094 self.assertRaises(Queue.Empty, self.done.get_nowait)
1096 self.ls.add('five', acquired=1)
1097 self.ls.add('six', acquired=1, shared=1)
1098 self.assertEquals(self.ls._list_owned(),
1099 set(['one', 'two', 'three', 'five', 'six']))
1100 self.assertEquals(self.ls._is_owned(), True)
1101 self.assertEquals(self.ls._names(),
1102 set(['one', 'two', 'three', 'four', 'five', 'six']))
1105 self.assertEqual(self.done.get_nowait(), 'DONE')
1106 self.assertEqual(self.done.get_nowait(), 'DONE')
1110 def testEmptyLockSet(self):
1112 self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1114 self.ls.remove(['one', 'two', 'three'])
1115 # and adds/locks by another thread still wait
1116 self._addThread(target=self._doAddSet, args=(['nine']))
1117 self._addThread(target=self._doLockSet, args=(None, 1))
1118 self._addThread(target=self._doLockSet, args=(None, 0))
1119 self.assertRaises(Queue.Empty, self.done.get_nowait)
1123 self.assertEqual(self.done.get_nowait(), 'DONE')
1125 self.assertEqual(self.ls.remove(['nine']), ['nine'])
1127 self.assertEqual(self.ls.acquire(None, shared=1), set())
1128 # other sharers can go, adds still wait
1129 self._addThread(target=self._doLockSet, args=(None, 1))
1131 self.assertEqual(self.done.get_nowait(), 'DONE')
1132 self._addThread(target=self._doAddSet, args=(['nine']))
1133 self.assertRaises(Queue.Empty, self.done.get_nowait)
1136 self.assertEqual(self.done.get_nowait(), 'DONE')
1140 class TestGanetiLockManager(_ThreadedTestCase):
1143 _ThreadedTestCase.setUp(self)
1144 self.nodes=['n1', 'n2']
1145 self.instances=['i1', 'i2', 'i3']
1146 self.GL = locking.GanetiLockManager(nodes=self.nodes,
1147 instances=self.instances)
1150 # Don't try this at home...
1151 locking.GanetiLockManager._instance = None
def testLockingConstants(self):
    # The locking library internally cheats by assuming its constants have
    # some relationships with each other. Check those hold true.
    # This relationship is also used in the Processor to recursively acquire
    # the right locks. Again, please don't break it.
    # Concretely: every entry of LEVELS must equal its own position.
    for position, level in enumerate(locking.LEVELS):
        self.assertEqual(position, level)
def testDoubleGLFails(self):
    # A manager was already created for this test, so constructing a second
    # one must trip an AssertionError.
    manager_cls = locking.GanetiLockManager
    self.assertRaises(AssertionError, manager_cls)
def testLockNames(self):
    # Each level must expose exactly the names the manager was built with;
    # the cluster level holds the single 'BGL' entry.
    expected_by_level = [
        (locking.LEVEL_CLUSTER, set(['BGL'])),
        (locking.LEVEL_NODE, set(self.nodes)),
        (locking.LEVEL_INSTANCE, set(self.instances)),
    ]
    for level, expected in expected_by_level:
        self.assertEqual(self.GL._names(level), expected)
def testInitAndResources(self):
    # Each scenario rebuilds the manager with different constructor
    # arguments and lists the node / instance names it must then expose.
    scenarios = [
        ({}, [], []),
        ({"nodes": self.nodes}, self.nodes, []),
        ({"instances": self.instances}, [], self.instances),
    ]
    for ctor_kwargs, node_names, instance_names in scenarios:
        # Clear the remembered instance first so a new manager can be built.
        locking.GanetiLockManager._instance = None
        self.GL = locking.GanetiLockManager(**ctor_kwargs)
        manager = self.GL
        self.assertEqual(manager._names(locking.LEVEL_CLUSTER),
                         set(['BGL']))
        self.assertEqual(manager._names(locking.LEVEL_NODE),
                         set(node_names))
        self.assertEqual(manager._names(locking.LEVEL_INSTANCE),
                         set(instance_names))
1190 def testAcquireRelease(self):
1191 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1192 self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1193 self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1194 self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1195 self.GL.release(locking.LEVEL_NODE, ['n2'])
1196 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1197 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1198 self.GL.release(locking.LEVEL_NODE)
1199 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1200 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1201 self.GL.release(locking.LEVEL_INSTANCE)
1202 self.assertRaises(errors.LockError, self.GL.acquire,
1203 locking.LEVEL_INSTANCE, ['i5'])
1204 self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1205 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1207 def testAcquireWholeSets(self):
1208 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1209 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1210 set(self.instances))
1211 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1212 set(self.instances))
1213 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1215 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1217 self.GL.release(locking.LEVEL_NODE)
1218 self.GL.release(locking.LEVEL_INSTANCE)
1219 self.GL.release(locking.LEVEL_CLUSTER)
1221 def testAcquireWholeAndPartial(self):
1222 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1223 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1224 set(self.instances))
1225 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1226 set(self.instances))
1227 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1229 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1231 self.GL.release(locking.LEVEL_NODE)
1232 self.GL.release(locking.LEVEL_INSTANCE)
1233 self.GL.release(locking.LEVEL_CLUSTER)
1235 def testBGLDependency(self):
1236 self.assertRaises(AssertionError, self.GL.acquire,
1237 locking.LEVEL_NODE, ['n1', 'n2'])
1238 self.assertRaises(AssertionError, self.GL.acquire,
1239 locking.LEVEL_INSTANCE, ['i3'])
1240 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1241 self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1242 self.assertRaises(AssertionError, self.GL.release,
1243 locking.LEVEL_CLUSTER, ['BGL'])
1244 self.assertRaises(AssertionError, self.GL.release,
1245 locking.LEVEL_CLUSTER)
1246 self.GL.release(locking.LEVEL_NODE)
1247 self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1248 self.assertRaises(AssertionError, self.GL.release,
1249 locking.LEVEL_CLUSTER, ['BGL'])
1250 self.assertRaises(AssertionError, self.GL.release,
1251 locking.LEVEL_CLUSTER)
1252 self.GL.release(locking.LEVEL_INSTANCE)
1254 def testWrongOrder(self):
1255 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1256 self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1257 self.assertRaises(AssertionError, self.GL.acquire,
1258 locking.LEVEL_NODE, ['n1'])
1259 self.assertRaises(AssertionError, self.GL.acquire,
1260 locking.LEVEL_INSTANCE, ['i2'])
1262 # Helper function to run as a thread that shared the BGL and then acquires
1263 # some locks at another level.
1264 def _doLock(self, level, names, shared):
1266 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1267 self.GL.acquire(level, names, shared=shared)
1268 self.done.put('DONE')
1269 self.GL.release(level)
1270 self.GL.release(locking.LEVEL_CLUSTER)
1271 except errors.LockError:
1272 self.done.put('ERR')
1275 def testConcurrency(self):
1276 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1277 self._addThread(target=self._doLock,
1278 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1280 self.assertEqual(self.done.get_nowait(), 'DONE')
1281 self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1282 self._addThread(target=self._doLock,
1283 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1285 self.assertEqual(self.done.get_nowait(), 'DONE')
1286 self._addThread(target=self._doLock,
1287 args=(locking.LEVEL_INSTANCE, 'i3', 1))
1288 self.assertRaises(Queue.Empty, self.done.get_nowait)
1289 self.GL.release(locking.LEVEL_INSTANCE)
1291 self.assertEqual(self.done.get_nowait(), 'DONE')
1292 self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1293 self._addThread(target=self._doLock,
1294 args=(locking.LEVEL_INSTANCE, 'i2', 1))
1296 self.assertEqual(self.done.get_nowait(), 'DONE')
1297 self._addThread(target=self._doLock,
1298 args=(locking.LEVEL_INSTANCE, 'i2', 0))
1299 self.assertRaises(Queue.Empty, self.done.get_nowait)
1300 self.GL.release(locking.LEVEL_INSTANCE)
1302 self.assertEqual(self.done.get(True, 1), 'DONE')
1303 self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
if __name__ == '__main__':
    # Run every test case defined in this module.
    unittest.main()
    # To run a single test case verbosely instead, use:
    #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
    #unittest.TextTestRunner(verbosity=2).run(suite)