4 # Copyright (C) 2006, 2007, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the locking module"""
32 from ganeti import locking
33 from ganeti import errors
34 from ganeti import utils
39 # This is used to test the ssynchronize decorator.
40 # Since it's passed as input to a decorator it must be declared as a global.
41 _decoratorlock = locking.SharedLock("decorator lock")
43 #: List for looping tests
48 """Decorator for executing a function many times"""
49 def wrapper(*args, **kwargs):
55 def SafeSleep(duration):
58 delay = start + duration - time.time()
64 class _ThreadedTestCase(unittest.TestCase):
65 """Test class that supports adding/waiting on threads"""
67 unittest.TestCase.setUp(self)
68 self.done = Queue.Queue(0)
71 def _addThread(self, *args, **kwargs):
72 """Create and remember a new thread"""
73 t = threading.Thread(*args, **kwargs)
74 self.threads.append(t)
78 def _waitThreads(self):
79 """Wait for all our threads to finish"""
80 for t in self.threads:
82 self.failIf(t.isAlive())
86 class _ConditionTestCase(_ThreadedTestCase):
87 """Common test case for conditions"""
90 _ThreadedTestCase.setUp(self)
91 self.lock = threading.Lock()
92 self.cond = cls(self.lock)
94 def _testAcquireRelease(self):
95 self.assertFalse(self.cond._is_owned())
96 self.assertRaises(RuntimeError, self.cond.wait)
97 self.assertRaises(RuntimeError, self.cond.notifyAll)
100 self.assert_(self.cond._is_owned())
101 self.cond.notifyAll()
102 self.assert_(self.cond._is_owned())
105 self.assertFalse(self.cond._is_owned())
106 self.assertRaises(RuntimeError, self.cond.wait)
107 self.assertRaises(RuntimeError, self.cond.notifyAll)
109 def _testNotification(self):
114 self.cond.notifyAll()
119 self._addThread(target=_NotifyAll)
120 self.assertEqual(self.done.get(True, 1), "NE")
121 self.assertRaises(Queue.Empty, self.done.get_nowait)
123 self.assertEqual(self.done.get(True, 1), "NA")
124 self.assertEqual(self.done.get(True, 1), "NN")
125 self.assert_(self.cond._is_owned())
127 self.assertFalse(self.cond._is_owned())
130 class TestSingleNotifyPipeCondition(_ConditionTestCase):
131 """SingleNotifyPipeCondition tests"""
134 _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
136 def testAcquireRelease(self):
137 self._testAcquireRelease()
139 def testNotification(self):
140 self._testNotification()
142 def testWaitReuse(self):
148 def testNoNotifyReuse(self):
150 self.cond.notifyAll()
151 self.assertRaises(RuntimeError, self.cond.wait)
152 self.assertRaises(RuntimeError, self.cond.notifyAll)
156 class TestPipeCondition(_ConditionTestCase):
157 """PipeCondition tests"""
160 _ConditionTestCase.setUp(self, locking.PipeCondition)
162 def testAcquireRelease(self):
163 self._testAcquireRelease()
165 def testNotification(self):
166 self._testNotification()
168 def _TestWait(self, fn):
169 self._addThread(target=fn)
170 self._addThread(target=fn)
171 self._addThread(target=fn)
173 # Wait for threads to be waiting
174 self.assertEqual(self.done.get(True, 1), "A")
175 self.assertEqual(self.done.get(True, 1), "A")
176 self.assertEqual(self.done.get(True, 1), "A")
178 self.assertRaises(Queue.Empty, self.done.get_nowait)
181 self.assertEqual(self.cond._nwaiters, 3)
182 # This new thread can"t acquire the lock, and thus call wait, before we
184 self._addThread(target=fn)
185 self.cond.notifyAll()
186 self.assertRaises(Queue.Empty, self.done.get_nowait)
189 # We should now get 3 W and 1 A (for the new thread) in whatever order
193 got = self.done.get(True, 1)
199 self.fail("Got %s on the done queue" % got)
201 self.assertEqual(w, 3)
202 self.assertEqual(a, 1)
205 self.cond.notifyAll()
208 self.assertEqual(self.done.get_nowait(), "W")
209 self.assertRaises(Queue.Empty, self.done.get_nowait)
211 def testBlockingWait(self):
219 self._TestWait(_BlockingWait)
221 def testLongTimeoutWait(self):
229 self._TestWait(_Helper)
231 def _TimeoutWait(self, timeout, check):
233 self.cond.wait(timeout)
237 def testShortTimeoutWait(self):
238 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
239 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
241 self.assertEqual(self.done.get_nowait(), "T1")
242 self.assertEqual(self.done.get_nowait(), "T1")
243 self.assertRaises(Queue.Empty, self.done.get_nowait)
245 def testZeroTimeoutWait(self):
246 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
247 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
248 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
250 self.assertEqual(self.done.get_nowait(), "T0")
251 self.assertEqual(self.done.get_nowait(), "T0")
252 self.assertEqual(self.done.get_nowait(), "T0")
253 self.assertRaises(Queue.Empty, self.done.get_nowait)
256 class TestSharedLock(_ThreadedTestCase):
257 """SharedLock tests"""
260 _ThreadedTestCase.setUp(self)
261 self.sl = locking.SharedLock("TestSharedLock")
263 def testSequenceAndOwnership(self):
264 self.assertFalse(self.sl._is_owned())
265 self.sl.acquire(shared=1)
266 self.assert_(self.sl._is_owned())
267 self.assert_(self.sl._is_owned(shared=1))
268 self.assertFalse(self.sl._is_owned(shared=0))
270 self.assertFalse(self.sl._is_owned())
272 self.assert_(self.sl._is_owned())
273 self.assertFalse(self.sl._is_owned(shared=1))
274 self.assert_(self.sl._is_owned(shared=0))
276 self.assertFalse(self.sl._is_owned())
277 self.sl.acquire(shared=1)
278 self.assert_(self.sl._is_owned())
279 self.assert_(self.sl._is_owned(shared=1))
280 self.assertFalse(self.sl._is_owned(shared=0))
282 self.assertFalse(self.sl._is_owned())
284 def testBooleanValue(self):
285 # semaphores are supposed to return a true value on a successful acquire
286 self.assert_(self.sl.acquire(shared=1))
288 self.assert_(self.sl.acquire())
291 def testDoubleLockingStoE(self):
292 self.sl.acquire(shared=1)
293 self.assertRaises(AssertionError, self.sl.acquire)
295 def testDoubleLockingEtoS(self):
297 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
299 def testDoubleLockingStoS(self):
300 self.sl.acquire(shared=1)
301 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
303 def testDoubleLockingEtoE(self):
305 self.assertRaises(AssertionError, self.sl.acquire)
307 # helper functions: called in a separate thread they acquire the lock, send
308 # their identifier on the done queue, then release it.
309 def _doItSharer(self):
311 self.sl.acquire(shared=1)
314 except errors.LockError:
317 def _doItExclusive(self):
322 except errors.LockError:
325 def _doItDelete(self):
329 except errors.LockError:
332 def testSharersCanCoexist(self):
333 self.sl.acquire(shared=1)
334 threading.Thread(target=self._doItSharer).start()
335 self.assert_(self.done.get(True, 1))
339 def testExclusiveBlocksExclusive(self):
341 self._addThread(target=self._doItExclusive)
342 self.assertRaises(Queue.Empty, self.done.get_nowait)
345 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
348 def testExclusiveBlocksDelete(self):
350 self._addThread(target=self._doItDelete)
351 self.assertRaises(Queue.Empty, self.done.get_nowait)
354 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
355 self.sl = locking.SharedLock(self.sl.name)
358 def testExclusiveBlocksSharer(self):
360 self._addThread(target=self._doItSharer)
361 self.assertRaises(Queue.Empty, self.done.get_nowait)
364 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
367 def testSharerBlocksExclusive(self):
368 self.sl.acquire(shared=1)
369 self._addThread(target=self._doItExclusive)
370 self.assertRaises(Queue.Empty, self.done.get_nowait)
373 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
376 def testSharerBlocksDelete(self):
377 self.sl.acquire(shared=1)
378 self._addThread(target=self._doItDelete)
379 self.assertRaises(Queue.Empty, self.done.get_nowait)
382 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
383 self.sl = locking.SharedLock(self.sl.name)
386 def testWaitingExclusiveBlocksSharer(self):
387 """SKIPPED testWaitingExclusiveBlockSharer"""
390 self.sl.acquire(shared=1)
391 # the lock is acquired in shared mode...
392 self._addThread(target=self._doItExclusive)
393 # ...but now an exclusive is waiting...
394 self._addThread(target=self._doItSharer)
395 # ...so the sharer should be blocked as well
396 self.assertRaises(Queue.Empty, self.done.get_nowait)
399 # The exclusive passed before
400 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
401 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
404 def testWaitingSharerBlocksExclusive(self):
405 """SKIPPED testWaitingSharerBlocksExclusive"""
409 # the lock is acquired in exclusive mode...
410 self._addThread(target=self._doItSharer)
411 # ...but now a sharer is waiting...
412 self._addThread(target=self._doItExclusive)
413 # ...the exclusive is waiting too...
414 self.assertRaises(Queue.Empty, self.done.get_nowait)
417 # The sharer passed before
418 self.assertEqual(self.done.get_nowait(), 'SHR')
419 self.assertEqual(self.done.get_nowait(), 'EXC')
421 def testDelete(self):
423 self.assertRaises(errors.LockError, self.sl.acquire)
424 self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
425 self.assertRaises(errors.LockError, self.sl.delete)
427 def testDeleteTimeout(self):
428 self.sl.delete(timeout=60)
430 def testNoDeleteIfSharer(self):
431 self.sl.acquire(shared=1)
432 self.assertRaises(AssertionError, self.sl.delete)
435 def testDeletePendingSharersExclusiveDelete(self):
437 self._addThread(target=self._doItSharer)
438 self._addThread(target=self._doItSharer)
439 self._addThread(target=self._doItExclusive)
440 self._addThread(target=self._doItDelete)
443 # The threads who were pending return ERR
445 self.assertEqual(self.done.get_nowait(), 'ERR')
446 self.sl = locking.SharedLock(self.sl.name)
449 def testDeletePendingDeleteExclusiveSharers(self):
451 self._addThread(target=self._doItDelete)
452 self._addThread(target=self._doItExclusive)
453 self._addThread(target=self._doItSharer)
454 self._addThread(target=self._doItSharer)
457 # The two threads who were pending return both ERR
458 self.assertEqual(self.done.get_nowait(), 'ERR')
459 self.assertEqual(self.done.get_nowait(), 'ERR')
460 self.assertEqual(self.done.get_nowait(), 'ERR')
461 self.assertEqual(self.done.get_nowait(), 'ERR')
462 self.sl = locking.SharedLock(self.sl.name)
465 def testExclusiveAcquireTimeout(self):
466 for shared in [0, 1]:
467 on_queue = threading.Event()
468 release_exclusive = threading.Event()
470 def _LockExclusive():
471 self.sl.acquire(shared=0, test_notify=on_queue.set)
472 self.done.put("A: start wait")
473 release_exclusive.wait()
474 self.done.put("A: end wait")
477 # Start thread to hold lock in exclusive mode
478 self._addThread(target=_LockExclusive)
480 # Wait for wait to begin
481 self.assertEqual(self.done.get(timeout=60), "A: start wait")
483 # Wait up to 60s to get lock, but release exclusive lock as soon as we're
485 self.failUnless(self.sl.acquire(shared=shared, timeout=60,
486 test_notify=release_exclusive.set))
488 self.done.put("got 2nd")
493 self.assertEqual(self.done.get_nowait(), "A: end wait")
494 self.assertEqual(self.done.get_nowait(), "got 2nd")
495 self.assertRaises(Queue.Empty, self.done.get_nowait)
498 def testAcquireExpiringTimeout(self):
499 def _AcquireWithTimeout(shared, timeout):
500 if not self.sl.acquire(shared=shared, timeout=timeout):
501 self.done.put("timeout")
503 for shared in [0, 1]:
507 # Start shared acquires with timeout between 0 and 20 ms
509 self._addThread(target=_AcquireWithTimeout,
510 args=(shared, i * 2.0 / 1000.0))
512 # Wait for threads to finish (makes sure the acquire timeout expires
513 # before releasing the lock)
520 self.assertEqual(self.done.get_nowait(), "timeout")
522 self.assertRaises(Queue.Empty, self.done.get_nowait)
525 def testSharedSkipExclusiveAcquires(self):
526 # Tests whether shared acquires jump in front of exclusive acquires in the
529 def _Acquire(shared, name, notify_ev, wait_ev):
531 notify_fn = notify_ev.set
538 if not self.sl.acquire(shared=shared, test_notify=notify_fn):
544 # Get exclusive lock while we fill the queue
552 # Add acquires using threading.Event for synchronization. They'll be
553 # acquired exactly in the order defined in this list.
554 acquires = (shrcnt1 * [(1, "shared 1")] +
555 3 * [(0, "exclusive 1")] +
556 shrcnt2 * [(1, "shared 2")] +
557 shrcnt3 * [(1, "shared 3")] +
558 shrcnt4 * [(1, "shared 4")] +
559 3 * [(0, "exclusive 2")])
564 for args in acquires:
565 ev_cur = threading.Event()
566 self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
569 # Wait for last acquire to start
572 # Expect 6 pending exclusive acquires and 1 for all shared acquires
574 self.assertEqual(self.sl._count_pending(), 7)
576 # Release exclusive lock and wait
582 for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
583 # Shared locks aren't guaranteed to be notified in order, but they'll be
585 tmp = self.done.get_nowait()
586 if tmp == "shared 1":
588 elif tmp == "shared 2":
590 elif tmp == "shared 3":
592 elif tmp == "shared 4":
594 self.assertEqual(shrcnt1, 0)
595 self.assertEqual(shrcnt2, 0)
596 self.assertEqual(shrcnt3, 0)
597 self.assertEqual(shrcnt3, 0)
600 self.assertEqual(self.done.get_nowait(), "exclusive 1")
603 self.assertEqual(self.done.get_nowait(), "exclusive 2")
605 self.assertRaises(Queue.Empty, self.done.get_nowait)
608 def testMixedAcquireTimeout(self):
609 sync = threading.Event()
611 def _AcquireShared(ev):
612 if not self.sl.acquire(shared=1, timeout=None):
615 self.done.put("shared")
620 # Wait for notification from main thread
628 ev = threading.Event()
629 self._addThread(target=_AcquireShared, args=(ev, ))
632 # Wait for all acquires to finish
636 self.assertEqual(self.sl._count_pending(), 0)
638 # Try to get exclusive lock
639 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
641 # Acquire exclusive without timeout
642 exclsync = threading.Event()
643 exclev = threading.Event()
645 def _AcquireExclusive():
646 if not self.sl.acquire(shared=0):
649 self.done.put("exclusive")
654 # Wait for notification from main thread
659 self._addThread(target=_AcquireExclusive)
661 # Try to get exclusive lock
662 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
664 # Make all shared holders release their locks
667 # Wait for exclusive acquire to succeed
670 self.assertEqual(self.sl._count_pending(), 0)
672 # Try to get exclusive lock
673 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
675 def _AcquireSharedSimple():
676 if self.sl.acquire(shared=1, timeout=None):
677 self.done.put("shared2")
681 self._addThread(target=_AcquireSharedSimple)
683 # Tell exclusive lock to release
686 # Wait for everything to finish
689 self.assertEqual(self.sl._count_pending(), 0)
693 self.assertEqual(self.done.get_nowait(), "shared")
695 self.assertEqual(self.done.get_nowait(), "exclusive")
698 self.assertEqual(self.done.get_nowait(), "shared2")
700 self.assertRaises(Queue.Empty, self.done.get_nowait)
703 class TestSharedLockInCondition(_ThreadedTestCase):
704 """SharedLock as a condition lock tests"""
707 _ThreadedTestCase.setUp(self)
708 self.sl = locking.SharedLock("TestSharedLockInCondition")
711 def setCondition(self):
712 self.cond = threading.Condition(self.sl)
714 def testKeepMode(self):
715 self.cond.acquire(shared=1)
716 self.assert_(self.sl._is_owned(shared=1))
718 self.assert_(self.sl._is_owned(shared=1))
720 self.cond.acquire(shared=0)
721 self.assert_(self.sl._is_owned(shared=0))
723 self.assert_(self.sl._is_owned(shared=0))
727 class TestSharedLockInPipeCondition(TestSharedLockInCondition):
728 """SharedLock as a pipe condition lock tests"""
730 def setCondition(self):
731 self.cond = locking.PipeCondition(self.sl)
734 class TestSSynchronizedDecorator(_ThreadedTestCase):
735 """Shared Lock Synchronized decorator test"""
738 _ThreadedTestCase.setUp(self)
740 @locking.ssynchronized(_decoratorlock)
741 def _doItExclusive(self):
742 self.assert_(_decoratorlock._is_owned())
745 @locking.ssynchronized(_decoratorlock, shared=1)
746 def _doItSharer(self):
747 self.assert_(_decoratorlock._is_owned(shared=1))
750 def testDecoratedFunctions(self):
751 self._doItExclusive()
752 self.assertFalse(_decoratorlock._is_owned())
754 self.assertFalse(_decoratorlock._is_owned())
756 def testSharersCanCoexist(self):
757 _decoratorlock.acquire(shared=1)
758 threading.Thread(target=self._doItSharer).start()
759 self.assert_(self.done.get(True, 1))
760 _decoratorlock.release()
763 def testExclusiveBlocksExclusive(self):
764 _decoratorlock.acquire()
765 self._addThread(target=self._doItExclusive)
766 # give it a bit of time to check that it's not actually doing anything
767 self.assertRaises(Queue.Empty, self.done.get_nowait)
768 _decoratorlock.release()
770 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
773 def testExclusiveBlocksSharer(self):
774 _decoratorlock.acquire()
775 self._addThread(target=self._doItSharer)
776 self.assertRaises(Queue.Empty, self.done.get_nowait)
777 _decoratorlock.release()
779 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
782 def testSharerBlocksExclusive(self):
783 _decoratorlock.acquire(shared=1)
784 self._addThread(target=self._doItExclusive)
785 self.assertRaises(Queue.Empty, self.done.get_nowait)
786 _decoratorlock.release()
788 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
791 class TestLockSet(_ThreadedTestCase):
795 _ThreadedTestCase.setUp(self)
799 """Helper to (re)initialize the lock set"""
800 self.resources = ['one', 'two', 'three']
801 self.ls = locking.LockSet(self.resources, "TestLockSet")
803 def testResources(self):
804 self.assertEquals(self.ls._names(), set(self.resources))
805 newls = locking.LockSet([], "TestLockSet.testResources")
806 self.assertEquals(newls._names(), set())
808 def testAcquireRelease(self):
809 self.assert_(self.ls.acquire('one'))
810 self.assertEquals(self.ls._list_owned(), set(['one']))
812 self.assertEquals(self.ls._list_owned(), set())
813 self.assertEquals(self.ls.acquire(['one']), set(['one']))
814 self.assertEquals(self.ls._list_owned(), set(['one']))
816 self.assertEquals(self.ls._list_owned(), set())
817 self.ls.acquire(['one', 'two', 'three'])
818 self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
819 self.ls.release('one')
820 self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
821 self.ls.release(['three'])
822 self.assertEquals(self.ls._list_owned(), set(['two']))
824 self.assertEquals(self.ls._list_owned(), set())
825 self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
826 self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
828 self.assertEquals(self.ls._list_owned(), set())
830 def testNoDoubleAcquire(self):
831 self.ls.acquire('one')
832 self.assertRaises(AssertionError, self.ls.acquire, 'one')
833 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
834 self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
836 self.ls.acquire(['one', 'three'])
837 self.ls.release('one')
838 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
839 self.ls.release('three')
841 def testNoWrongRelease(self):
842 self.assertRaises(AssertionError, self.ls.release)
843 self.ls.acquire('one')
844 self.assertRaises(AssertionError, self.ls.release, 'two')
846 def testAddRemove(self):
848 self.assertEquals(self.ls._list_owned(), set())
849 self.assert_('four' in self.ls._names())
850 self.ls.add(['five', 'six', 'seven'], acquired=1)
851 self.assert_('five' in self.ls._names())
852 self.assert_('six' in self.ls._names())
853 self.assert_('seven' in self.ls._names())
854 self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
855 self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
856 self.assert_('five' not in self.ls._names())
857 self.assert_('six' not in self.ls._names())
858 self.assertEquals(self.ls._list_owned(), set(['seven']))
859 self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
860 self.ls.remove('seven')
861 self.assert_('seven' not in self.ls._names())
862 self.assertEquals(self.ls._list_owned(), set([]))
863 self.ls.acquire(None, shared=1)
864 self.assertRaises(AssertionError, self.ls.add, 'eight')
866 self.ls.acquire(None)
867 self.ls.add('eight', acquired=1)
868 self.assert_('eight' in self.ls._names())
869 self.assert_('eight' in self.ls._list_owned())
871 self.assert_('nine' in self.ls._names())
872 self.assert_('nine' not in self.ls._list_owned())
874 self.ls.remove(['two'])
875 self.assert_('two' not in self.ls._names())
876 self.ls.acquire('three')
877 self.assertEquals(self.ls.remove(['three']), ['three'])
878 self.assert_('three' not in self.ls._names())
879 self.assertEquals(self.ls.remove('three'), [])
880 self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
881 self.assert_('one' not in self.ls._names())
883 def testRemoveNonBlocking(self):
884 self.ls.acquire('one')
885 self.assertEquals(self.ls.remove('one'), ['one'])
886 self.ls.acquire(['two', 'three'])
887 self.assertEquals(self.ls.remove(['two', 'three']),
890 def testNoDoubleAdd(self):
891 self.assertRaises(errors.LockError, self.ls.add, 'two')
893 self.assertRaises(errors.LockError, self.ls.add, 'four')
895 def testNoWrongRemoves(self):
896 self.ls.acquire(['one', 'three'], shared=1)
897 # Cannot remove 'two' while holding something which is not a superset
898 self.assertRaises(AssertionError, self.ls.remove, 'two')
899 # Cannot remove 'three' as we are sharing it
900 self.assertRaises(AssertionError, self.ls.remove, 'three')
902 def testAcquireSetLock(self):
903 # acquire the set-lock exclusively
904 self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
905 self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
906 self.assertEquals(self.ls._is_owned(), True)
907 self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
908 # I can still add/remove elements...
909 self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
910 self.assert_(self.ls.add('six'))
913 self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
914 # adding new elements is not possible
915 self.assertRaises(AssertionError, self.ls.add, 'five')
918 def testAcquireWithRepetitions(self):
919 self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
920 set(['two', 'two', 'three']))
921 self.ls.release(['two', 'two'])
922 self.assertEquals(self.ls._list_owned(), set(['three']))
924 def testEmptyAcquire(self):
925 # Acquire an empty list of locks...
926 self.assertEquals(self.ls.acquire([]), set())
927 self.assertEquals(self.ls._list_owned(), set())
928 # New locks can still be addded
929 self.assert_(self.ls.add('six'))
930 # "re-acquiring" is not an issue, since we had really acquired nothing
931 self.assertEquals(self.ls.acquire([], shared=1), set())
932 self.assertEquals(self.ls._list_owned(), set())
933 # We haven't really acquired anything, so we cannot release
934 self.assertRaises(AssertionError, self.ls.release)
936 def _doLockSet(self, names, shared):
938 self.ls.acquire(names, shared=shared)
939 self.done.put('DONE')
941 except errors.LockError:
944 def _doAddSet(self, names):
946 self.ls.add(names, acquired=1)
947 self.done.put('DONE')
949 except errors.LockError:
952 def _doRemoveSet(self, names):
953 self.done.put(self.ls.remove(names))
956 def testConcurrentSharedAcquire(self):
957 self.ls.acquire(['one', 'two'], shared=1)
958 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
960 self.assertEqual(self.done.get_nowait(), 'DONE')
961 self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
963 self.assertEqual(self.done.get_nowait(), 'DONE')
964 self._addThread(target=self._doLockSet, args=('three', 1))
966 self.assertEqual(self.done.get_nowait(), 'DONE')
967 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
968 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
969 self.assertRaises(Queue.Empty, self.done.get_nowait)
972 self.assertEqual(self.done.get_nowait(), 'DONE')
973 self.assertEqual(self.done.get_nowait(), 'DONE')
976 def testConcurrentExclusiveAcquire(self):
977 self.ls.acquire(['one', 'two'])
978 self._addThread(target=self._doLockSet, args=('three', 1))
980 self.assertEqual(self.done.get_nowait(), 'DONE')
981 self._addThread(target=self._doLockSet, args=('three', 0))
983 self.assertEqual(self.done.get_nowait(), 'DONE')
984 self.assertRaises(Queue.Empty, self.done.get_nowait)
985 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
986 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
987 self._addThread(target=self._doLockSet, args=('one', 0))
988 self._addThread(target=self._doLockSet, args=('one', 1))
989 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
990 self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
991 self.assertRaises(Queue.Empty, self.done.get_nowait)
995 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
998 def testSimpleAcquireTimeoutExpiring(self):
999 names = sorted(self.ls._names())
1000 self.assert_(len(names) >= 3)
1002 # Get name of first lock
1005 # Get name of last lock
1009 # Block first and try to lock it again
1012 # Block last and try to lock all locks
1015 # Block last and try to lock it again
1019 for (wanted, block) in checks:
1020 # Lock in exclusive mode
1021 self.assert_(self.ls.acquire(block, shared=0))
1024 # Try to get the same lock again with a timeout (should never succeed)
1025 acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1027 self.done.put("acquired")
1030 self.assert_(acquired is None)
1031 self.assertFalse(self.ls._list_owned())
1032 self.assertFalse(self.ls._is_owned())
1033 self.done.put("not acquired")
1035 self._addThread(target=_AcquireOne)
1037 # Wait for timeout in thread to expire
1040 # Release exclusive lock again
1043 self.assertEqual(self.done.get_nowait(), "not acquired")
1044 self.assertRaises(Queue.Empty, self.done.get_nowait)
1047 def testDelayedAndExpiringLockAcquire(self):
1049 self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1051 for expire in (False, True):
1052 names = sorted(self.ls._names())
1053 self.assertEqual(len(names), 8)
1055 lock_ev = dict([(i, threading.Event()) for i in names])
1057 # Lock all in exclusive mode
1058 self.assert_(self.ls.acquire(names, shared=0))
1061 # We'll wait at least 300ms per lock
1062 lockwait = len(names) * [0.3]
1064 # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1065 # this gives us up to 2.4s to fail.
1066 lockall_timeout = 0.4
1068 # This should finish rather quickly
1070 lockall_timeout = len(names) * 5.0
1073 def acquire_notification(name):
1075 self.done.put("getting %s" % name)
1080 if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1081 test_notify=acquire_notification):
1082 self.done.put("got all")
1085 self.done.put("timeout on all")
1088 for ev in lock_ev.values():
1091 t = self._addThread(target=_LockAll)
1093 for idx, name in enumerate(names):
1094 # Wait for actual acquire on this lock to start
1095 lock_ev[name].wait(10.0)
1097 if expire and t.isAlive():
1098 # Wait some time after getting the notification to make sure the lock
1099 # acquire will expire
1100 SafeSleep(lockwait[idx])
1102 self.ls.release(names=name)
1104 self.assertFalse(self.ls._list_owned())
1109 # Not checking which locks were actually acquired. Doing so would be
1110 # too timing-dependant.
1111 self.assertEqual(self.done.get_nowait(), "timeout on all")
1114 self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1115 self.assertEqual(self.done.get_nowait(), "got all")
1116 self.assertRaises(Queue.Empty, self.done.get_nowait)
1119 def testConcurrentRemove(self):
1121 self.ls.acquire(['one', 'two', 'four'])
1122 self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1123 self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1124 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1125 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1126 self.assertRaises(Queue.Empty, self.done.get_nowait)
1127 self.ls.remove('one')
1131 self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1132 self.ls.add(['five', 'six'], acquired=1)
1133 self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1134 self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1135 self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1136 self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1137 self.ls.remove('five')
1141 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1142 self.ls.acquire(['three', 'four'])
1143 self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1144 self.assertRaises(Queue.Empty, self.done.get_nowait)
1145 self.ls.remove('four')
1147 self.assertEqual(self.done.get_nowait(), ['six'])
1148 self._addThread(target=self._doRemoveSet, args=(['two']))
1150 self.assertEqual(self.done.get_nowait(), ['two'])
1156 def testConcurrentSharedSetLock(self):
1157 # share the set-lock...
1158 self.ls.acquire(None, shared=1)
1159 # ...another thread can share it too
1160 self._addThread(target=self._doLockSet, args=(None, 1))
1162 self.assertEqual(self.done.get_nowait(), 'DONE')
1163 # ...or just share some elements
1164 self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1166 self.assertEqual(self.done.get_nowait(), 'DONE')
1167 # ...but not add new ones or remove any
1168 t = self._addThread(target=self._doAddSet, args=(['nine']))
1169 self._addThread(target=self._doRemoveSet, args=(['two'], ))
1170 self.assertRaises(Queue.Empty, self.done.get_nowait)
1171 # this just releases the set-lock
1174 self.assertEqual(self.done.get_nowait(), 'DONE')
1175 # release the lock on the actual elements so remove() can proceed too
1178 self.failUnlessEqual(self.done.get_nowait(), ['two'])
1183 def testConcurrentExclusiveSetLock(self):
1184 # acquire the set-lock...
1185 self.ls.acquire(None, shared=0)
1186 # ...no one can do anything else
1187 self._addThread(target=self._doLockSet, args=(None, 1))
1188 self._addThread(target=self._doLockSet, args=(None, 0))
1189 self._addThread(target=self._doLockSet, args=(['three'], 0))
1190 self._addThread(target=self._doLockSet, args=(['two'], 1))
1191 self._addThread(target=self._doAddSet, args=(['nine']))
1192 self.assertRaises(Queue.Empty, self.done.get_nowait)
1196 self.assertEqual(self.done.get(True, 1), 'DONE')
1201 def testConcurrentSetLockAdd(self):
1202 self.ls.acquire('one')
1203 # Another thread wants the whole SetLock
1204 self._addThread(target=self._doLockSet, args=(None, 0))
1205 self._addThread(target=self._doLockSet, args=(None, 1))
1206 self.assertRaises(Queue.Empty, self.done.get_nowait)
1207 self.assertRaises(AssertionError, self.ls.add, 'four')
1210 self.assertEqual(self.done.get_nowait(), 'DONE')
1211 self.assertEqual(self.done.get_nowait(), 'DONE')
1212 self.ls.acquire(None)
1213 self._addThread(target=self._doLockSet, args=(None, 0))
1214 self._addThread(target=self._doLockSet, args=(None, 1))
1215 self.assertRaises(Queue.Empty, self.done.get_nowait)
1217 self.ls.add('five', acquired=1)
1218 self.ls.add('six', acquired=1, shared=1)
1219 self.assertEquals(self.ls._list_owned(),
1220 set(['one', 'two', 'three', 'five', 'six']))
1221 self.assertEquals(self.ls._is_owned(), True)
1222 self.assertEquals(self.ls._names(),
1223 set(['one', 'two', 'three', 'four', 'five', 'six']))
1226 self.assertEqual(self.done.get_nowait(), 'DONE')
1227 self.assertEqual(self.done.get_nowait(), 'DONE')
1231 def testEmptyLockSet(self):
1233 self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1235 self.ls.remove(['one', 'two', 'three'])
1236 # and adds/locks by another thread still wait
1237 self._addThread(target=self._doAddSet, args=(['nine']))
1238 self._addThread(target=self._doLockSet, args=(None, 1))
1239 self._addThread(target=self._doLockSet, args=(None, 0))
1240 self.assertRaises(Queue.Empty, self.done.get_nowait)
1244 self.assertEqual(self.done.get_nowait(), 'DONE')
1246 self.assertEqual(self.ls.remove(['nine']), ['nine'])
1248 self.assertEqual(self.ls.acquire(None, shared=1), set())
1249 # other sharers can go, adds still wait
1250 self._addThread(target=self._doLockSet, args=(None, 1))
1252 self.assertEqual(self.done.get_nowait(), 'DONE')
1253 self._addThread(target=self._doAddSet, args=(['nine']))
1254 self.assertRaises(Queue.Empty, self.done.get_nowait)
1257 self.assertEqual(self.done.get_nowait(), 'DONE')
1261 class TestGanetiLockManager(_ThreadedTestCase):
1264 _ThreadedTestCase.setUp(self)
1265 self.nodes=['n1', 'n2']
1266 self.instances=['i1', 'i2', 'i3']
1267 self.GL = locking.GanetiLockManager(nodes=self.nodes,
1268 instances=self.instances)
1271 # Don't try this at home...
1272 locking.GanetiLockManager._instance = None
1274 def testLockingConstants(self):
1275 # The locking library internally cheats by assuming its constants have some
1276 # relationships with each other. Check those hold true.
1277 # This relationship is also used in the Processor to recursively acquire
1278 # the right locks. Again, please don't break it.
1279 for i in range(len(locking.LEVELS)):
1280 self.assertEqual(i, locking.LEVELS[i])
1282 def testDoubleGLFails(self):
1283 self.assertRaises(AssertionError, locking.GanetiLockManager)
1285 def testLockNames(self):
1286 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1287 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1288 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1289 set(self.instances))
1291 def testInitAndResources(self):
1292 locking.GanetiLockManager._instance = None
1293 self.GL = locking.GanetiLockManager([], [])
1294 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1295 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1296 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1298 locking.GanetiLockManager._instance = None
1299 self.GL = locking.GanetiLockManager(self.nodes, [])
1300 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1301 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1302 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1304 locking.GanetiLockManager._instance = None
1305 self.GL = locking.GanetiLockManager([], self.instances)
1306 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1307 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1308 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1309 set(self.instances))
1311 def testAcquireRelease(self):
1312 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1313 self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1314 self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1315 self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1316 self.GL.release(locking.LEVEL_NODE, ['n2'])
1317 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1318 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1319 self.GL.release(locking.LEVEL_NODE)
1320 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1321 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1322 self.GL.release(locking.LEVEL_INSTANCE)
1323 self.assertRaises(errors.LockError, self.GL.acquire,
1324 locking.LEVEL_INSTANCE, ['i5'])
1325 self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1326 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1328 def testAcquireWholeSets(self):
1329 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1330 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1331 set(self.instances))
1332 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1333 set(self.instances))
1334 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1336 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1338 self.GL.release(locking.LEVEL_NODE)
1339 self.GL.release(locking.LEVEL_INSTANCE)
1340 self.GL.release(locking.LEVEL_CLUSTER)
1342 def testAcquireWholeAndPartial(self):
1343 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1344 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1345 set(self.instances))
1346 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1347 set(self.instances))
1348 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1350 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1352 self.GL.release(locking.LEVEL_NODE)
1353 self.GL.release(locking.LEVEL_INSTANCE)
1354 self.GL.release(locking.LEVEL_CLUSTER)
1356 def testBGLDependency(self):
1357 self.assertRaises(AssertionError, self.GL.acquire,
1358 locking.LEVEL_NODE, ['n1', 'n2'])
1359 self.assertRaises(AssertionError, self.GL.acquire,
1360 locking.LEVEL_INSTANCE, ['i3'])
1361 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1362 self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1363 self.assertRaises(AssertionError, self.GL.release,
1364 locking.LEVEL_CLUSTER, ['BGL'])
1365 self.assertRaises(AssertionError, self.GL.release,
1366 locking.LEVEL_CLUSTER)
1367 self.GL.release(locking.LEVEL_NODE)
1368 self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1369 self.assertRaises(AssertionError, self.GL.release,
1370 locking.LEVEL_CLUSTER, ['BGL'])
1371 self.assertRaises(AssertionError, self.GL.release,
1372 locking.LEVEL_CLUSTER)
1373 self.GL.release(locking.LEVEL_INSTANCE)
1375 def testWrongOrder(self):
1376 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1377 self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1378 self.assertRaises(AssertionError, self.GL.acquire,
1379 locking.LEVEL_NODE, ['n1'])
1380 self.assertRaises(AssertionError, self.GL.acquire,
1381 locking.LEVEL_INSTANCE, ['i2'])
1383 # Helper function to run as a thread that shared the BGL and then acquires
1384 # some locks at another level.
1385 def _doLock(self, level, names, shared):
1387 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1388 self.GL.acquire(level, names, shared=shared)
1389 self.done.put('DONE')
1390 self.GL.release(level)
1391 self.GL.release(locking.LEVEL_CLUSTER)
1392 except errors.LockError:
1393 self.done.put('ERR')
1396 def testConcurrency(self):
1397 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1398 self._addThread(target=self._doLock,
1399 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1401 self.assertEqual(self.done.get_nowait(), 'DONE')
1402 self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1403 self._addThread(target=self._doLock,
1404 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1406 self.assertEqual(self.done.get_nowait(), 'DONE')
1407 self._addThread(target=self._doLock,
1408 args=(locking.LEVEL_INSTANCE, 'i3', 1))
1409 self.assertRaises(Queue.Empty, self.done.get_nowait)
1410 self.GL.release(locking.LEVEL_INSTANCE)
1412 self.assertEqual(self.done.get_nowait(), 'DONE')
1413 self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1414 self._addThread(target=self._doLock,
1415 args=(locking.LEVEL_INSTANCE, 'i2', 1))
1417 self.assertEqual(self.done.get_nowait(), 'DONE')
1418 self._addThread(target=self._doLock,
1419 args=(locking.LEVEL_INSTANCE, 'i2', 0))
1420 self.assertRaises(Queue.Empty, self.done.get_nowait)
1421 self.GL.release(locking.LEVEL_INSTANCE)
1423 self.assertEqual(self.done.get(True, 1), 'DONE')
1424 self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1427 class TestLockMonitor(_ThreadedTestCase):
1429 _ThreadedTestCase.setUp(self)
1430 self.lm = locking.LockMonitor()
1432 def testSingleThread(self):
1435 for i in range(100):
1436 name = "TestLock%s" % i
1437 locks.append(locking.SharedLock(name, monitor=self.lm))
1439 self.assertEqual(len(self.lm._locks), len(locks))
1444 # The garbage collector might needs some time
1447 raise utils.RetryAgain()
1449 utils.Retry(_CheckLocks, 0.1, 30.0)
1451 self.assertFalse(self.lm._locks)
1453 def testMultiThread(self):
1456 def _CreateLock(prev, next, name):
1458 locks.append(locking.SharedLock(name, monitor=self.lm))
1464 first = threading.Event()
1467 # Use a deterministic random generator
1468 for i in random.Random(4263).sample(range(100), 33):
1469 name = "MtTestLock%s" % i
1470 expnames.append(name)
1472 ev = threading.Event()
1473 self._addThread(target=_CreateLock, args=(prev, ev, name))
1480 # Check order in which locks were added
1481 self.assertEqual([i.name for i in locks], expnames)
1483 # Sync queries are not supported
1484 self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True)
1486 # Check query result
1487 self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1488 [[name, None, None] for name in utils.NiceSort(expnames)])
1490 # Test exclusive acquire
1491 for tlock in locks[::4]:
1492 tlock.acquire(shared=0)
1494 def _GetExpResult(name):
1495 if tlock.name == name:
1496 return [name, "exclusive", [threading.currentThread().getName()]]
1497 return [name, None, None]
1499 self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1500 [_GetExpResult(name)
1501 for name in utils.NiceSort(expnames)])
1505 # Test shared acquire
1506 def _Acquire(lock, shared, ev):
1507 lock.acquire(shared=shared)
1513 for tlock1 in locks[::11]:
1514 for tlock2 in locks[::-15]:
1515 if tlock2 == tlock1:
1518 for tlock3 in locks[::10]:
1519 if tlock3 == tlock2:
1522 ev = threading.Event()
1527 tthreads1.append(self._addThread(target=_Acquire,
1528 args=(tlock1, 1, ev)))
1529 tthread2 = self._addThread(target=_Acquire, args=(tlock2, 1, ev))
1530 tthread3 = self._addThread(target=_Acquire, args=(tlock3, 0, ev))
1532 # Check query result
1533 for (name, mode, owner) in self.lm.QueryLocks(["name", "mode",
1535 if name == tlock1.name:
1536 self.assertEqual(mode, "shared")
1537 self.assertEqual(set(owner), set(i.getName() for i in tthreads1))
1540 if name == tlock2.name:
1541 self.assertEqual(mode, "shared")
1542 self.assertEqual(owner, [tthread2.getName()])
1545 if name == tlock3.name:
1546 self.assertEqual(mode, "exclusive")
1547 self.assertEqual(owner, [tthread3.getName()])
1550 self.assert_(name in expnames)
1551 self.assert_(mode is None)
1552 self.assert_(owner is None)
1554 # Release locks again
1559 self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1561 for name in utils.NiceSort(expnames)])
1563 def testDelete(self):
1564 lock = locking.SharedLock("TestLock", monitor=self.lm)
1566 self.assertEqual(len(self.lm._locks), 1)
1567 self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1568 [[lock.name, None, None]])
1572 self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1573 [[lock.name, "deleted", None]])
1574 self.assertEqual(len(self.lm._locks), 1)
1577 if __name__ == '__main__':
1578 testutils.GanetiTestProgram()