4 # Copyright (C) 2006, 2007, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the locking module"""
33 from ganeti import locking
34 from ganeti import errors
35 from ganeti import utils
36 from ganeti import compat
41 # This is used to test the ssynchronize decorator.
42 # Since it's passed as input to a decorator it must be declared as a global.
43 _decoratorlock = locking.SharedLock("decorator lock")
45 #: List for looping tests
50 """Decorator for executing a function many times"""
51 def wrapper(*args, **kwargs):
57 def SafeSleep(duration):
60 delay = start + duration - time.time()
66 class _ThreadedTestCase(unittest.TestCase):
67 """Test class that supports adding/waiting on threads"""
69 unittest.TestCase.setUp(self)
70 self.done = Queue.Queue(0)
73 def _addThread(self, *args, **kwargs):
74 """Create and remember a new thread"""
75 t = threading.Thread(*args, **kwargs)
76 self.threads.append(t)
80 def _waitThreads(self):
81 """Wait for all our threads to finish"""
82 for t in self.threads:
84 self.failIf(t.isAlive())
88 class _ConditionTestCase(_ThreadedTestCase):
89 """Common test case for conditions"""
92 _ThreadedTestCase.setUp(self)
93 self.lock = threading.Lock()
94 self.cond = cls(self.lock)
96 def _testAcquireRelease(self):
97 self.assertFalse(self.cond._is_owned())
98 self.assertRaises(RuntimeError, self.cond.wait)
99 self.assertRaises(RuntimeError, self.cond.notifyAll)
102 self.assert_(self.cond._is_owned())
103 self.cond.notifyAll()
104 self.assert_(self.cond._is_owned())
107 self.assertFalse(self.cond._is_owned())
108 self.assertRaises(RuntimeError, self.cond.wait)
109 self.assertRaises(RuntimeError, self.cond.notifyAll)
111 def _testNotification(self):
116 self.cond.notifyAll()
121 self._addThread(target=_NotifyAll)
122 self.assertEqual(self.done.get(True, 1), "NE")
123 self.assertRaises(Queue.Empty, self.done.get_nowait)
125 self.assertEqual(self.done.get(True, 1), "NA")
126 self.assertEqual(self.done.get(True, 1), "NN")
127 self.assert_(self.cond._is_owned())
129 self.assertFalse(self.cond._is_owned())
132 class TestSingleNotifyPipeCondition(_ConditionTestCase):
133 """SingleNotifyPipeCondition tests"""
136 _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
138 def testAcquireRelease(self):
139 self._testAcquireRelease()
141 def testNotification(self):
142 self._testNotification()
144 def testWaitReuse(self):
150 def testNoNotifyReuse(self):
152 self.cond.notifyAll()
153 self.assertRaises(RuntimeError, self.cond.wait)
154 self.assertRaises(RuntimeError, self.cond.notifyAll)
158 class TestPipeCondition(_ConditionTestCase):
159 """PipeCondition tests"""
162 _ConditionTestCase.setUp(self, locking.PipeCondition)
164 def testAcquireRelease(self):
165 self._testAcquireRelease()
167 def testNotification(self):
168 self._testNotification()
170 def _TestWait(self, fn):
172 self._addThread(target=fn),
173 self._addThread(target=fn),
174 self._addThread(target=fn),
177 # Wait for threads to be waiting
179 self.assertEqual(self.done.get(True, 1), "A")
181 self.assertRaises(Queue.Empty, self.done.get_nowait)
184 self.assertEqual(len(self.cond._waiters), 3)
185 self.assertEqual(self.cond._waiters, set(threads))
186 # This new thread can't acquire the lock, and thus call wait, before we
188 self._addThread(target=fn)
189 self.cond.notifyAll()
190 self.assertRaises(Queue.Empty, self.done.get_nowait)
193 # We should now get 3 W and 1 A (for the new thread) in whatever order
197 got = self.done.get(True, 1)
203 self.fail("Got %s on the done queue" % got)
205 self.assertEqual(w, 3)
206 self.assertEqual(a, 1)
209 self.cond.notifyAll()
212 self.assertEqual(self.done.get_nowait(), "W")
213 self.assertRaises(Queue.Empty, self.done.get_nowait)
215 def testBlockingWait(self):
223 self._TestWait(_BlockingWait)
225 def testLongTimeoutWait(self):
233 self._TestWait(_Helper)
235 def _TimeoutWait(self, timeout, check):
237 self.cond.wait(timeout)
241 def testShortTimeoutWait(self):
242 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
243 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
245 self.assertEqual(self.done.get_nowait(), "T1")
246 self.assertEqual(self.done.get_nowait(), "T1")
247 self.assertRaises(Queue.Empty, self.done.get_nowait)
249 def testZeroTimeoutWait(self):
250 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
251 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
252 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
254 self.assertEqual(self.done.get_nowait(), "T0")
255 self.assertEqual(self.done.get_nowait(), "T0")
256 self.assertEqual(self.done.get_nowait(), "T0")
257 self.assertRaises(Queue.Empty, self.done.get_nowait)
260 class TestSharedLock(_ThreadedTestCase):
261 """SharedLock tests"""
264 _ThreadedTestCase.setUp(self)
265 self.sl = locking.SharedLock("TestSharedLock")
267 def testSequenceAndOwnership(self):
268 self.assertFalse(self.sl._is_owned())
269 self.sl.acquire(shared=1)
270 self.assert_(self.sl._is_owned())
271 self.assert_(self.sl._is_owned(shared=1))
272 self.assertFalse(self.sl._is_owned(shared=0))
274 self.assertFalse(self.sl._is_owned())
276 self.assert_(self.sl._is_owned())
277 self.assertFalse(self.sl._is_owned(shared=1))
278 self.assert_(self.sl._is_owned(shared=0))
280 self.assertFalse(self.sl._is_owned())
281 self.sl.acquire(shared=1)
282 self.assert_(self.sl._is_owned())
283 self.assert_(self.sl._is_owned(shared=1))
284 self.assertFalse(self.sl._is_owned(shared=0))
286 self.assertFalse(self.sl._is_owned())
288 def testBooleanValue(self):
289 # semaphores are supposed to return a true value on a successful acquire
290 self.assert_(self.sl.acquire(shared=1))
292 self.assert_(self.sl.acquire())
295 def testDoubleLockingStoE(self):
296 self.sl.acquire(shared=1)
297 self.assertRaises(AssertionError, self.sl.acquire)
299 def testDoubleLockingEtoS(self):
301 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
303 def testDoubleLockingStoS(self):
304 self.sl.acquire(shared=1)
305 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
307 def testDoubleLockingEtoE(self):
309 self.assertRaises(AssertionError, self.sl.acquire)
311 # helper functions: called in a separate thread they acquire the lock, send
312 # their identifier on the done queue, then release it.
313 def _doItSharer(self):
315 self.sl.acquire(shared=1)
318 except errors.LockError:
321 def _doItExclusive(self):
326 except errors.LockError:
329 def _doItDelete(self):
333 except errors.LockError:
336 def testSharersCanCoexist(self):
337 self.sl.acquire(shared=1)
338 threading.Thread(target=self._doItSharer).start()
339 self.assert_(self.done.get(True, 1))
343 def testExclusiveBlocksExclusive(self):
345 self._addThread(target=self._doItExclusive)
346 self.assertRaises(Queue.Empty, self.done.get_nowait)
349 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
352 def testExclusiveBlocksDelete(self):
354 self._addThread(target=self._doItDelete)
355 self.assertRaises(Queue.Empty, self.done.get_nowait)
358 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
359 self.sl = locking.SharedLock(self.sl.name)
362 def testExclusiveBlocksSharer(self):
364 self._addThread(target=self._doItSharer)
365 self.assertRaises(Queue.Empty, self.done.get_nowait)
368 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
371 def testSharerBlocksExclusive(self):
372 self.sl.acquire(shared=1)
373 self._addThread(target=self._doItExclusive)
374 self.assertRaises(Queue.Empty, self.done.get_nowait)
377 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
380 def testSharerBlocksDelete(self):
381 self.sl.acquire(shared=1)
382 self._addThread(target=self._doItDelete)
383 self.assertRaises(Queue.Empty, self.done.get_nowait)
386 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
387 self.sl = locking.SharedLock(self.sl.name)
390 def testWaitingExclusiveBlocksSharer(self):
391 """SKIPPED testWaitingExclusiveBlockSharer"""
394 self.sl.acquire(shared=1)
395 # the lock is acquired in shared mode...
396 self._addThread(target=self._doItExclusive)
397 # ...but now an exclusive is waiting...
398 self._addThread(target=self._doItSharer)
399 # ...so the sharer should be blocked as well
400 self.assertRaises(Queue.Empty, self.done.get_nowait)
403 # The exclusive passed before
404 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
405 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
408 def testWaitingSharerBlocksExclusive(self):
409 """SKIPPED testWaitingSharerBlocksExclusive"""
413 # the lock is acquired in exclusive mode...
414 self._addThread(target=self._doItSharer)
415 # ...but now a sharer is waiting...
416 self._addThread(target=self._doItExclusive)
417 # ...the exclusive is waiting too...
418 self.assertRaises(Queue.Empty, self.done.get_nowait)
421 # The sharer passed before
422 self.assertEqual(self.done.get_nowait(), 'SHR')
423 self.assertEqual(self.done.get_nowait(), 'EXC')
425 def testDelete(self):
427 self.assertRaises(errors.LockError, self.sl.acquire)
428 self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
429 self.assertRaises(errors.LockError, self.sl.delete)
431 def testDeleteTimeout(self):
432 self.sl.delete(timeout=60)
434 def testNoDeleteIfSharer(self):
435 self.sl.acquire(shared=1)
436 self.assertRaises(AssertionError, self.sl.delete)
439 def testDeletePendingSharersExclusiveDelete(self):
441 self._addThread(target=self._doItSharer)
442 self._addThread(target=self._doItSharer)
443 self._addThread(target=self._doItExclusive)
444 self._addThread(target=self._doItDelete)
447 # The threads who were pending return ERR
449 self.assertEqual(self.done.get_nowait(), 'ERR')
450 self.sl = locking.SharedLock(self.sl.name)
453 def testDeletePendingDeleteExclusiveSharers(self):
455 self._addThread(target=self._doItDelete)
456 self._addThread(target=self._doItExclusive)
457 self._addThread(target=self._doItSharer)
458 self._addThread(target=self._doItSharer)
461 # The two threads who were pending return both ERR
462 self.assertEqual(self.done.get_nowait(), 'ERR')
463 self.assertEqual(self.done.get_nowait(), 'ERR')
464 self.assertEqual(self.done.get_nowait(), 'ERR')
465 self.assertEqual(self.done.get_nowait(), 'ERR')
466 self.sl = locking.SharedLock(self.sl.name)
469 def testExclusiveAcquireTimeout(self):
470 for shared in [0, 1]:
471 on_queue = threading.Event()
472 release_exclusive = threading.Event()
474 def _LockExclusive():
475 self.sl.acquire(shared=0, test_notify=on_queue.set)
476 self.done.put("A: start wait")
477 release_exclusive.wait()
478 self.done.put("A: end wait")
481 # Start thread to hold lock in exclusive mode
482 self._addThread(target=_LockExclusive)
484 # Wait for wait to begin
485 self.assertEqual(self.done.get(timeout=60), "A: start wait")
487 # Wait up to 60s to get lock, but release exclusive lock as soon as we're
489 self.failUnless(self.sl.acquire(shared=shared, timeout=60,
490 test_notify=release_exclusive.set))
492 self.done.put("got 2nd")
497 self.assertEqual(self.done.get_nowait(), "A: end wait")
498 self.assertEqual(self.done.get_nowait(), "got 2nd")
499 self.assertRaises(Queue.Empty, self.done.get_nowait)
502 def testAcquireExpiringTimeout(self):
503 def _AcquireWithTimeout(shared, timeout):
504 if not self.sl.acquire(shared=shared, timeout=timeout):
505 self.done.put("timeout")
507 for shared in [0, 1]:
511 # Start shared acquires with timeout between 0 and 20 ms
513 self._addThread(target=_AcquireWithTimeout,
514 args=(shared, i * 2.0 / 1000.0))
516 # Wait for threads to finish (makes sure the acquire timeout expires
517 # before releasing the lock)
524 self.assertEqual(self.done.get_nowait(), "timeout")
526 self.assertRaises(Queue.Empty, self.done.get_nowait)
529 def testSharedSkipExclusiveAcquires(self):
530 # Tests whether shared acquires jump in front of exclusive acquires in the
533 def _Acquire(shared, name, notify_ev, wait_ev):
535 notify_fn = notify_ev.set
542 if not self.sl.acquire(shared=shared, test_notify=notify_fn):
548 # Get exclusive lock while we fill the queue
556 # Add acquires using threading.Event for synchronization. They'll be
557 # acquired exactly in the order defined in this list.
558 acquires = (shrcnt1 * [(1, "shared 1")] +
559 3 * [(0, "exclusive 1")] +
560 shrcnt2 * [(1, "shared 2")] +
561 shrcnt3 * [(1, "shared 3")] +
562 shrcnt4 * [(1, "shared 4")] +
563 3 * [(0, "exclusive 2")])
568 for args in acquires:
569 ev_cur = threading.Event()
570 self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
573 # Wait for last acquire to start
576 # Expect 6 pending exclusive acquires and 1 for all shared acquires
578 self.assertEqual(self.sl._count_pending(), 7)
580 # Release exclusive lock and wait
586 for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
587 # Shared locks aren't guaranteed to be notified in order, but they'll be
589 tmp = self.done.get_nowait()
590 if tmp == "shared 1":
592 elif tmp == "shared 2":
594 elif tmp == "shared 3":
596 elif tmp == "shared 4":
598 self.assertEqual(shrcnt1, 0)
599 self.assertEqual(shrcnt2, 0)
600 self.assertEqual(shrcnt3, 0)
601 self.assertEqual(shrcnt3, 0)
604 self.assertEqual(self.done.get_nowait(), "exclusive 1")
607 self.assertEqual(self.done.get_nowait(), "exclusive 2")
609 self.assertRaises(Queue.Empty, self.done.get_nowait)
612 def testMixedAcquireTimeout(self):
613 sync = threading.Event()
615 def _AcquireShared(ev):
616 if not self.sl.acquire(shared=1, timeout=None):
619 self.done.put("shared")
624 # Wait for notification from main thread
632 ev = threading.Event()
633 self._addThread(target=_AcquireShared, args=(ev, ))
636 # Wait for all acquires to finish
640 self.assertEqual(self.sl._count_pending(), 0)
642 # Try to get exclusive lock
643 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
645 # Acquire exclusive without timeout
646 exclsync = threading.Event()
647 exclev = threading.Event()
649 def _AcquireExclusive():
650 if not self.sl.acquire(shared=0):
653 self.done.put("exclusive")
658 # Wait for notification from main thread
663 self._addThread(target=_AcquireExclusive)
665 # Try to get exclusive lock
666 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
668 # Make all shared holders release their locks
671 # Wait for exclusive acquire to succeed
674 self.assertEqual(self.sl._count_pending(), 0)
676 # Try to get exclusive lock
677 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
679 def _AcquireSharedSimple():
680 if self.sl.acquire(shared=1, timeout=None):
681 self.done.put("shared2")
685 self._addThread(target=_AcquireSharedSimple)
687 # Tell exclusive lock to release
690 # Wait for everything to finish
693 self.assertEqual(self.sl._count_pending(), 0)
697 self.assertEqual(self.done.get_nowait(), "shared")
699 self.assertEqual(self.done.get_nowait(), "exclusive")
702 self.assertEqual(self.done.get_nowait(), "shared2")
704 self.assertRaises(Queue.Empty, self.done.get_nowait)
706 def testPriority(self):
707 # Acquire in exclusive mode
708 self.assert_(self.sl.acquire(shared=0))
711 def _Acquire(prev, next, shared, priority, result):
713 self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
715 self.done.put(result)
719 counter = itertools.count(0)
720 priorities = range(-20, 30)
721 first = threading.Event()
727 # [(shared/exclusive, set(acquire names), set(pending threads)),
728 # (shared/exclusive, ...),
734 # References shared acquire per priority in L{perprio}. Data structure:
736 # priority: (shared=1, set(acquire names), set(pending threads)),
740 for seed in [4979, 9523, 14902, 32440]:
741 # Use a deterministic random generator
742 rnd = random.Random(seed)
743 for priority in [rnd.choice(priorities) for _ in range(30)]:
748 acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
750 ev = threading.Event()
751 thread = self._addThread(target=_Acquire,
752 args=(prev, ev, shared, priority, acqname))
755 # Record expected aqcuire, see above for structure
756 data = (shared, set([acqname]), set([thread]))
757 priolist = perprio.setdefault(priority, [])
759 priosh = prioshared.get(priority, None)
761 # Shared acquires are merged
762 for i, j in zip(priosh[1:], data[1:]):
764 assert data[0] == priosh[0]
766 prioshared[priority] = data
767 priolist.append(data)
769 priolist.append(data)
771 # Start all acquires and wait for them
775 # Check lock information
776 self.assertEqual(self.sl.GetInfo(["name"]), [self.sl.name])
777 self.assertEqual(self.sl.GetInfo(["mode", "owner"]),
778 ["exclusive", [threading.currentThread().getName()]])
779 self.assertEqual(self.sl.GetInfo(["name", "pending"]),
781 [(["exclusive", "shared"][int(bool(shared))],
782 sorted([t.getName() for t in threads]))
783 for acquires in [perprio[i]
784 for i in sorted(perprio.keys())]
785 for (shared, _, threads) in acquires]])
787 # Let threads acquire the lock
790 # Wait for everything to finish
793 self.assert_(self.sl._check_empty())
795 # Check acquires by priority
796 for acquires in [perprio[i] for i in sorted(perprio.keys())]:
797 for (_, names, _) in acquires:
798 # For shared acquires, the set will contain 1..n entries. For exclusive
801 names.remove(self.done.get_nowait())
802 self.assertFalse(compat.any(names for (_, names, _) in acquires))
804 self.assertRaises(Queue.Empty, self.done.get_nowait)
807 class TestSharedLockInCondition(_ThreadedTestCase):
808 """SharedLock as a condition lock tests"""
811 _ThreadedTestCase.setUp(self)
812 self.sl = locking.SharedLock("TestSharedLockInCondition")
815 def setCondition(self):
816 self.cond = threading.Condition(self.sl)
818 def testKeepMode(self):
819 self.cond.acquire(shared=1)
820 self.assert_(self.sl._is_owned(shared=1))
822 self.assert_(self.sl._is_owned(shared=1))
824 self.cond.acquire(shared=0)
825 self.assert_(self.sl._is_owned(shared=0))
827 self.assert_(self.sl._is_owned(shared=0))
831 class TestSharedLockInPipeCondition(TestSharedLockInCondition):
832 """SharedLock as a pipe condition lock tests"""
834 def setCondition(self):
835 self.cond = locking.PipeCondition(self.sl)
838 class TestSSynchronizedDecorator(_ThreadedTestCase):
839 """Shared Lock Synchronized decorator test"""
842 _ThreadedTestCase.setUp(self)
844 @locking.ssynchronized(_decoratorlock)
845 def _doItExclusive(self):
846 self.assert_(_decoratorlock._is_owned())
849 @locking.ssynchronized(_decoratorlock, shared=1)
850 def _doItSharer(self):
851 self.assert_(_decoratorlock._is_owned(shared=1))
854 def testDecoratedFunctions(self):
855 self._doItExclusive()
856 self.assertFalse(_decoratorlock._is_owned())
858 self.assertFalse(_decoratorlock._is_owned())
860 def testSharersCanCoexist(self):
861 _decoratorlock.acquire(shared=1)
862 threading.Thread(target=self._doItSharer).start()
863 self.assert_(self.done.get(True, 1))
864 _decoratorlock.release()
867 def testExclusiveBlocksExclusive(self):
868 _decoratorlock.acquire()
869 self._addThread(target=self._doItExclusive)
870 # give it a bit of time to check that it's not actually doing anything
871 self.assertRaises(Queue.Empty, self.done.get_nowait)
872 _decoratorlock.release()
874 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
877 def testExclusiveBlocksSharer(self):
878 _decoratorlock.acquire()
879 self._addThread(target=self._doItSharer)
880 self.assertRaises(Queue.Empty, self.done.get_nowait)
881 _decoratorlock.release()
883 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
886 def testSharerBlocksExclusive(self):
887 _decoratorlock.acquire(shared=1)
888 self._addThread(target=self._doItExclusive)
889 self.assertRaises(Queue.Empty, self.done.get_nowait)
890 _decoratorlock.release()
892 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
895 class TestLockSet(_ThreadedTestCase):
899 _ThreadedTestCase.setUp(self)
903 """Helper to (re)initialize the lock set"""
904 self.resources = ['one', 'two', 'three']
905 self.ls = locking.LockSet(self.resources, "TestLockSet")
907 def testResources(self):
908 self.assertEquals(self.ls._names(), set(self.resources))
909 newls = locking.LockSet([], "TestLockSet.testResources")
910 self.assertEquals(newls._names(), set())
912 def testAcquireRelease(self):
913 self.assert_(self.ls.acquire('one'))
914 self.assertEquals(self.ls._list_owned(), set(['one']))
916 self.assertEquals(self.ls._list_owned(), set())
917 self.assertEquals(self.ls.acquire(['one']), set(['one']))
918 self.assertEquals(self.ls._list_owned(), set(['one']))
920 self.assertEquals(self.ls._list_owned(), set())
921 self.ls.acquire(['one', 'two', 'three'])
922 self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
923 self.ls.release('one')
924 self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
925 self.ls.release(['three'])
926 self.assertEquals(self.ls._list_owned(), set(['two']))
928 self.assertEquals(self.ls._list_owned(), set())
929 self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
930 self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
932 self.assertEquals(self.ls._list_owned(), set())
934 def testNoDoubleAcquire(self):
935 self.ls.acquire('one')
936 self.assertRaises(AssertionError, self.ls.acquire, 'one')
937 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
938 self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
940 self.ls.acquire(['one', 'three'])
941 self.ls.release('one')
942 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
943 self.ls.release('three')
945 def testNoWrongRelease(self):
946 self.assertRaises(AssertionError, self.ls.release)
947 self.ls.acquire('one')
948 self.assertRaises(AssertionError, self.ls.release, 'two')
950 def testAddRemove(self):
952 self.assertEquals(self.ls._list_owned(), set())
953 self.assert_('four' in self.ls._names())
954 self.ls.add(['five', 'six', 'seven'], acquired=1)
955 self.assert_('five' in self.ls._names())
956 self.assert_('six' in self.ls._names())
957 self.assert_('seven' in self.ls._names())
958 self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
959 self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
960 self.assert_('five' not in self.ls._names())
961 self.assert_('six' not in self.ls._names())
962 self.assertEquals(self.ls._list_owned(), set(['seven']))
963 self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
964 self.ls.remove('seven')
965 self.assert_('seven' not in self.ls._names())
966 self.assertEquals(self.ls._list_owned(), set([]))
967 self.ls.acquire(None, shared=1)
968 self.assertRaises(AssertionError, self.ls.add, 'eight')
970 self.ls.acquire(None)
971 self.ls.add('eight', acquired=1)
972 self.assert_('eight' in self.ls._names())
973 self.assert_('eight' in self.ls._list_owned())
975 self.assert_('nine' in self.ls._names())
976 self.assert_('nine' not in self.ls._list_owned())
978 self.ls.remove(['two'])
979 self.assert_('two' not in self.ls._names())
980 self.ls.acquire('three')
981 self.assertEquals(self.ls.remove(['three']), ['three'])
982 self.assert_('three' not in self.ls._names())
983 self.assertEquals(self.ls.remove('three'), [])
984 self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
985 self.assert_('one' not in self.ls._names())
987 def testRemoveNonBlocking(self):
988 self.ls.acquire('one')
989 self.assertEquals(self.ls.remove('one'), ['one'])
990 self.ls.acquire(['two', 'three'])
991 self.assertEquals(self.ls.remove(['two', 'three']),
994 def testNoDoubleAdd(self):
995 self.assertRaises(errors.LockError, self.ls.add, 'two')
997 self.assertRaises(errors.LockError, self.ls.add, 'four')
999 def testNoWrongRemoves(self):
1000 self.ls.acquire(['one', 'three'], shared=1)
1001 # Cannot remove 'two' while holding something which is not a superset
1002 self.assertRaises(AssertionError, self.ls.remove, 'two')
1003 # Cannot remove 'three' as we are sharing it
1004 self.assertRaises(AssertionError, self.ls.remove, 'three')
1006 def testAcquireSetLock(self):
1007 # acquire the set-lock exclusively
1008 self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
1009 self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1010 self.assertEquals(self.ls._is_owned(), True)
1011 self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
1012 # I can still add/remove elements...
1013 self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
1014 self.assert_(self.ls.add('six'))
1016 # share the set-lock
1017 self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
1018 # adding new elements is not possible
1019 self.assertRaises(AssertionError, self.ls.add, 'five')
1022 def testAcquireWithRepetitions(self):
1023 self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
1024 set(['two', 'two', 'three']))
1025 self.ls.release(['two', 'two'])
1026 self.assertEquals(self.ls._list_owned(), set(['three']))
1028 def testEmptyAcquire(self):
1029 # Acquire an empty list of locks...
1030 self.assertEquals(self.ls.acquire([]), set())
1031 self.assertEquals(self.ls._list_owned(), set())
1032 # New locks can still be addded
1033 self.assert_(self.ls.add('six'))
1034 # "re-acquiring" is not an issue, since we had really acquired nothing
1035 self.assertEquals(self.ls.acquire([], shared=1), set())
1036 self.assertEquals(self.ls._list_owned(), set())
1037 # We haven't really acquired anything, so we cannot release
1038 self.assertRaises(AssertionError, self.ls.release)
1040 def _doLockSet(self, names, shared):
1042 self.ls.acquire(names, shared=shared)
1043 self.done.put('DONE')
1045 except errors.LockError:
1046 self.done.put('ERR')
1048 def _doAddSet(self, names):
1050 self.ls.add(names, acquired=1)
1051 self.done.put('DONE')
1053 except errors.LockError:
1054 self.done.put('ERR')
1056 def _doRemoveSet(self, names):
1057 self.done.put(self.ls.remove(names))
1060 def testConcurrentSharedAcquire(self):
1061 self.ls.acquire(['one', 'two'], shared=1)
1062 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1064 self.assertEqual(self.done.get_nowait(), 'DONE')
1065 self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
1067 self.assertEqual(self.done.get_nowait(), 'DONE')
1068 self._addThread(target=self._doLockSet, args=('three', 1))
1070 self.assertEqual(self.done.get_nowait(), 'DONE')
1071 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1072 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1073 self.assertRaises(Queue.Empty, self.done.get_nowait)
1076 self.assertEqual(self.done.get_nowait(), 'DONE')
1077 self.assertEqual(self.done.get_nowait(), 'DONE')
1080 def testConcurrentExclusiveAcquire(self):
1081 self.ls.acquire(['one', 'two'])
1082 self._addThread(target=self._doLockSet, args=('three', 1))
1084 self.assertEqual(self.done.get_nowait(), 'DONE')
1085 self._addThread(target=self._doLockSet, args=('three', 0))
1087 self.assertEqual(self.done.get_nowait(), 'DONE')
1088 self.assertRaises(Queue.Empty, self.done.get_nowait)
1089 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1090 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1091 self._addThread(target=self._doLockSet, args=('one', 0))
1092 self._addThread(target=self._doLockSet, args=('one', 1))
1093 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1094 self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
1095 self.assertRaises(Queue.Empty, self.done.get_nowait)
1099 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1102 def testSimpleAcquireTimeoutExpiring(self):
1103 names = sorted(self.ls._names())
1104 self.assert_(len(names) >= 3)
1106 # Get name of first lock
1109 # Get name of last lock
1113 # Block first and try to lock it again
1116 # Block last and try to lock all locks
1119 # Block last and try to lock it again
1123 for (wanted, block) in checks:
1124 # Lock in exclusive mode
1125 self.assert_(self.ls.acquire(block, shared=0))
1128 # Try to get the same lock again with a timeout (should never succeed)
1129 acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1131 self.done.put("acquired")
1134 self.assert_(acquired is None)
1135 self.assertFalse(self.ls._list_owned())
1136 self.assertFalse(self.ls._is_owned())
1137 self.done.put("not acquired")
1139 self._addThread(target=_AcquireOne)
1141 # Wait for timeout in thread to expire
1144 # Release exclusive lock again
1147 self.assertEqual(self.done.get_nowait(), "not acquired")
1148 self.assertRaises(Queue.Empty, self.done.get_nowait)
1151 def testDelayedAndExpiringLockAcquire(self):
1153 self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1155 for expire in (False, True):
1156 names = sorted(self.ls._names())
1157 self.assertEqual(len(names), 8)
1159 lock_ev = dict([(i, threading.Event()) for i in names])
1161 # Lock all in exclusive mode
1162 self.assert_(self.ls.acquire(names, shared=0))
1165 # We'll wait at least 300ms per lock
1166 lockwait = len(names) * [0.3]
1168 # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1169 # this gives us up to 2.4s to fail.
1170 lockall_timeout = 0.4
1172 # This should finish rather quickly
1174 lockall_timeout = len(names) * 5.0
1177 def acquire_notification(name):
1179 self.done.put("getting %s" % name)
1184 if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1185 test_notify=acquire_notification):
1186 self.done.put("got all")
1189 self.done.put("timeout on all")
1192 for ev in lock_ev.values():
1195 t = self._addThread(target=_LockAll)
1197 for idx, name in enumerate(names):
1198 # Wait for actual acquire on this lock to start
1199 lock_ev[name].wait(10.0)
1201 if expire and t.isAlive():
1202 # Wait some time after getting the notification to make sure the lock
1203 # acquire will expire
1204 SafeSleep(lockwait[idx])
1206 self.ls.release(names=name)
1208 self.assertFalse(self.ls._list_owned())
1213 # Not checking which locks were actually acquired. Doing so would be
1214 # too timing-dependant.
1215 self.assertEqual(self.done.get_nowait(), "timeout on all")
1218 self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1219 self.assertEqual(self.done.get_nowait(), "got all")
1220 self.assertRaises(Queue.Empty, self.done.get_nowait)
1223 def testConcurrentRemove(self):
1225 self.ls.acquire(['one', 'two', 'four'])
1226 self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1227 self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1228 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1229 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1230 self.assertRaises(Queue.Empty, self.done.get_nowait)
1231 self.ls.remove('one')
1235 self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1236 self.ls.add(['five', 'six'], acquired=1)
1237 self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1238 self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1239 self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1240 self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1241 self.ls.remove('five')
1245 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1246 self.ls.acquire(['three', 'four'])
1247 self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1248 self.assertRaises(Queue.Empty, self.done.get_nowait)
1249 self.ls.remove('four')
1251 self.assertEqual(self.done.get_nowait(), ['six'])
1252 self._addThread(target=self._doRemoveSet, args=(['two']))
1254 self.assertEqual(self.done.get_nowait(), ['two'])
1260 def testConcurrentSharedSetLock(self):
1261 # share the set-lock...
1262 self.ls.acquire(None, shared=1)
1263 # ...another thread can share it too
1264 self._addThread(target=self._doLockSet, args=(None, 1))
1266 self.assertEqual(self.done.get_nowait(), 'DONE')
1267 # ...or just share some elements
1268 self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1270 self.assertEqual(self.done.get_nowait(), 'DONE')
1271 # ...but not add new ones or remove any
1272 t = self._addThread(target=self._doAddSet, args=(['nine']))
1273 self._addThread(target=self._doRemoveSet, args=(['two'], ))
1274 self.assertRaises(Queue.Empty, self.done.get_nowait)
1275 # this just releases the set-lock
1278 self.assertEqual(self.done.get_nowait(), 'DONE')
1279 # release the lock on the actual elements so remove() can proceed too
1282 self.failUnlessEqual(self.done.get_nowait(), ['two'])
1287 def testConcurrentExclusiveSetLock(self):
1288 # acquire the set-lock...
1289 self.ls.acquire(None, shared=0)
1290 # ...no one can do anything else
1291 self._addThread(target=self._doLockSet, args=(None, 1))
1292 self._addThread(target=self._doLockSet, args=(None, 0))
1293 self._addThread(target=self._doLockSet, args=(['three'], 0))
1294 self._addThread(target=self._doLockSet, args=(['two'], 1))
1295 self._addThread(target=self._doAddSet, args=(['nine']))
1296 self.assertRaises(Queue.Empty, self.done.get_nowait)
1300 self.assertEqual(self.done.get(True, 1), 'DONE')
1305 def testConcurrentSetLockAdd(self):
1306 self.ls.acquire('one')
1307 # Another thread wants the whole SetLock
1308 self._addThread(target=self._doLockSet, args=(None, 0))
1309 self._addThread(target=self._doLockSet, args=(None, 1))
1310 self.assertRaises(Queue.Empty, self.done.get_nowait)
1311 self.assertRaises(AssertionError, self.ls.add, 'four')
1314 self.assertEqual(self.done.get_nowait(), 'DONE')
1315 self.assertEqual(self.done.get_nowait(), 'DONE')
1316 self.ls.acquire(None)
1317 self._addThread(target=self._doLockSet, args=(None, 0))
1318 self._addThread(target=self._doLockSet, args=(None, 1))
1319 self.assertRaises(Queue.Empty, self.done.get_nowait)
1321 self.ls.add('five', acquired=1)
1322 self.ls.add('six', acquired=1, shared=1)
1323 self.assertEquals(self.ls._list_owned(),
1324 set(['one', 'two', 'three', 'five', 'six']))
1325 self.assertEquals(self.ls._is_owned(), True)
1326 self.assertEquals(self.ls._names(),
1327 set(['one', 'two', 'three', 'four', 'five', 'six']))
1330 self.assertEqual(self.done.get_nowait(), 'DONE')
1331 self.assertEqual(self.done.get_nowait(), 'DONE')
1335 def testEmptyLockSet(self):
1337 self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1339 self.ls.remove(['one', 'two', 'three'])
1340 # and adds/locks by another thread still wait
1341 self._addThread(target=self._doAddSet, args=(['nine']))
1342 self._addThread(target=self._doLockSet, args=(None, 1))
1343 self._addThread(target=self._doLockSet, args=(None, 0))
1344 self.assertRaises(Queue.Empty, self.done.get_nowait)
1348 self.assertEqual(self.done.get_nowait(), 'DONE')
1350 self.assertEqual(self.ls.remove(['nine']), ['nine'])
1352 self.assertEqual(self.ls.acquire(None, shared=1), set())
1353 # other sharers can go, adds still wait
1354 self._addThread(target=self._doLockSet, args=(None, 1))
1356 self.assertEqual(self.done.get_nowait(), 'DONE')
1357 self._addThread(target=self._doAddSet, args=(['nine']))
1358 self.assertRaises(Queue.Empty, self.done.get_nowait)
1361 self.assertEqual(self.done.get_nowait(), 'DONE')
1364 def testPriority(self):
1365 def _Acquire(prev, next, name, priority, success_fn):
1367 self.assert_(self.ls.acquire(name, shared=0,
1369 test_notify=lambda _: next.set()))
1375 # Get all in exclusive mode
1376 self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1378 done_two = Queue.Queue(0)
1380 first = threading.Event()
1383 acquires = [("one", prio, self.done) for prio in range(1, 33)]
1384 acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1386 # Use a deterministic random generator
1387 random.Random(741).shuffle(acquires)
1389 for (name, prio, done) in acquires:
1390 ev = threading.Event()
1391 self._addThread(target=_Acquire,
1392 args=(prev, ev, name, prio,
1393 compat.partial(done.put, "Prio%s" % prio)))
1399 # Wait for last acquire to start
1402 # Let threads acquire locks
1405 # Wait for threads to finish
1408 for i in range(1, 33):
1409 self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1410 self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1412 self.assertRaises(Queue.Empty, self.done.get_nowait)
1413 self.assertRaises(Queue.Empty, done_two.get_nowait)
1416 class TestGanetiLockManager(_ThreadedTestCase):
1419 _ThreadedTestCase.setUp(self)
1420 self.nodes=['n1', 'n2']
1421 self.nodegroups=['g1', 'g2']
1422 self.instances=['i1', 'i2', 'i3']
1423 self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
1427 # Don't try this at home...
1428 locking.GanetiLockManager._instance = None
1430 def testLockingConstants(self):
1431 # The locking library internally cheats by assuming its constants have some
1432 # relationships with each other. Check those hold true.
1433 # This relationship is also used in the Processor to recursively acquire
1434 # the right locks. Again, please don't break it.
1435 for i in range(len(locking.LEVELS)):
1436 self.assertEqual(i, locking.LEVELS[i])
1438 def testDoubleGLFails(self):
1439 self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])
1441 def testLockNames(self):
1442 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1443 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1444 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1445 set(self.nodegroups))
1446 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1447 set(self.instances))
1449 def testInitAndResources(self):
1450 locking.GanetiLockManager._instance = None
1451 self.GL = locking.GanetiLockManager([], [], [])
1452 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1453 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1454 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1455 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1457 locking.GanetiLockManager._instance = None
1458 self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [])
1459 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1460 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1461 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1462 set(self.nodegroups))
1463 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1465 locking.GanetiLockManager._instance = None
1466 self.GL = locking.GanetiLockManager([], [], self.instances)
1467 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1468 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1469 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1470 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1471 set(self.instances))
1473 def testAcquireRelease(self):
1474 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1475 self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1476 self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1477 self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
1478 self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1479 self.GL.release(locking.LEVEL_NODE, ['n2'])
1480 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1481 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1482 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1483 self.GL.release(locking.LEVEL_NODE)
1484 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1485 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1486 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1487 self.GL.release(locking.LEVEL_NODEGROUP)
1488 self.GL.release(locking.LEVEL_INSTANCE)
1489 self.assertRaises(errors.LockError, self.GL.acquire,
1490 locking.LEVEL_INSTANCE, ['i5'])
1491 self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1492 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1494 def testAcquireWholeSets(self):
1495 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1496 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1497 set(self.instances))
1498 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1499 set(self.instances))
1500 self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
1501 set(self.nodegroups))
1502 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP),
1503 set(self.nodegroups))
1504 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1506 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1508 self.GL.release(locking.LEVEL_NODE)
1509 self.GL.release(locking.LEVEL_NODEGROUP)
1510 self.GL.release(locking.LEVEL_INSTANCE)
1511 self.GL.release(locking.LEVEL_CLUSTER)
1513 def testAcquireWholeAndPartial(self):
1514 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1515 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1516 set(self.instances))
1517 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1518 set(self.instances))
1519 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1521 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1523 self.GL.release(locking.LEVEL_NODE)
1524 self.GL.release(locking.LEVEL_INSTANCE)
1525 self.GL.release(locking.LEVEL_CLUSTER)
1527 def testBGLDependency(self):
1528 self.assertRaises(AssertionError, self.GL.acquire,
1529 locking.LEVEL_NODE, ['n1', 'n2'])
1530 self.assertRaises(AssertionError, self.GL.acquire,
1531 locking.LEVEL_INSTANCE, ['i3'])
1532 self.assertRaises(AssertionError, self.GL.acquire,
1533 locking.LEVEL_NODEGROUP, ['g1'])
1534 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1535 self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1536 self.assertRaises(AssertionError, self.GL.release,
1537 locking.LEVEL_CLUSTER, ['BGL'])
1538 self.assertRaises(AssertionError, self.GL.release,
1539 locking.LEVEL_CLUSTER)
1540 self.GL.release(locking.LEVEL_NODE)
1541 self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1542 self.assertRaises(AssertionError, self.GL.release,
1543 locking.LEVEL_CLUSTER, ['BGL'])
1544 self.assertRaises(AssertionError, self.GL.release,
1545 locking.LEVEL_CLUSTER)
1546 self.GL.release(locking.LEVEL_INSTANCE)
1547 self.GL.acquire(locking.LEVEL_NODEGROUP, None)
1548 self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
1549 self.assertRaises(AssertionError, self.GL.release,
1550 locking.LEVEL_CLUSTER, ['BGL'])
1551 self.assertRaises(AssertionError, self.GL.release,
1552 locking.LEVEL_CLUSTER)
1553 self.GL.release(locking.LEVEL_NODEGROUP)
1554 self.GL.release(locking.LEVEL_CLUSTER)
1556 def testWrongOrder(self):
1557 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1558 self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1559 self.assertRaises(AssertionError, self.GL.acquire,
1560 locking.LEVEL_NODE, ['n1'])
1561 self.assertRaises(AssertionError, self.GL.acquire,
1562 locking.LEVEL_NODEGROUP, ['g1'])
1563 self.assertRaises(AssertionError, self.GL.acquire,
1564 locking.LEVEL_INSTANCE, ['i2'])
1566 def testModifiableLevels(self):
1567 self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
1569 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
1570 self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
1571 self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
1572 self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
1573 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
1574 self.GL.add(locking.LEVEL_NODE, ['n3'])
1575 self.GL.remove(locking.LEVEL_NODE, ['n1'])
1576 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
1577 self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
1578 self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
1579 self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
1580 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
1581 self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
1584 # Helper function to run as a thread that shared the BGL and then acquires
1585 # some locks at another level.
1586 def _doLock(self, level, names, shared):
1588 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1589 self.GL.acquire(level, names, shared=shared)
1590 self.done.put('DONE')
1591 self.GL.release(level)
1592 self.GL.release(locking.LEVEL_CLUSTER)
1593 except errors.LockError:
1594 self.done.put('ERR')
1597 def testConcurrency(self):
1598 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1599 self._addThread(target=self._doLock,
1600 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1602 self.assertEqual(self.done.get_nowait(), 'DONE')
1603 self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1604 self._addThread(target=self._doLock,
1605 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1607 self.assertEqual(self.done.get_nowait(), 'DONE')
1608 self._addThread(target=self._doLock,
1609 args=(locking.LEVEL_INSTANCE, 'i3', 1))
1610 self.assertRaises(Queue.Empty, self.done.get_nowait)
1611 self.GL.release(locking.LEVEL_INSTANCE)
1613 self.assertEqual(self.done.get_nowait(), 'DONE')
1614 self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1615 self._addThread(target=self._doLock,
1616 args=(locking.LEVEL_INSTANCE, 'i2', 1))
1618 self.assertEqual(self.done.get_nowait(), 'DONE')
1619 self._addThread(target=self._doLock,
1620 args=(locking.LEVEL_INSTANCE, 'i2', 0))
1621 self.assertRaises(Queue.Empty, self.done.get_nowait)
1622 self.GL.release(locking.LEVEL_INSTANCE)
1624 self.assertEqual(self.done.get(True, 1), 'DONE')
1625 self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1628 class TestLockMonitor(_ThreadedTestCase):
1630 _ThreadedTestCase.setUp(self)
1631 self.lm = locking.LockMonitor()
1633 def testSingleThread(self):
1636 for i in range(100):
1637 name = "TestLock%s" % i
1638 locks.append(locking.SharedLock(name, monitor=self.lm))
1640 self.assertEqual(len(self.lm._locks), len(locks))
1642 self.assertEqual(len(self.lm.QueryLocks(["name"], False)),
1648 # The garbage collector might needs some time
1651 raise utils.RetryAgain()
1653 utils.Retry(_CheckLocks, 0.1, 30.0)
1655 self.assertFalse(self.lm._locks)
1657 def testMultiThread(self):
1660 def _CreateLock(prev, next, name):
1662 locks.append(locking.SharedLock(name, monitor=self.lm))
1668 first = threading.Event()
1671 # Use a deterministic random generator
1672 for i in random.Random(4263).sample(range(100), 33):
1673 name = "MtTestLock%s" % i
1674 expnames.append(name)
1676 ev = threading.Event()
1677 self._addThread(target=_CreateLock, args=(prev, ev, name))
1684 # Check order in which locks were added
1685 self.assertEqual([i.name for i in locks], expnames)
1687 # Sync queries are not supported
1688 self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True)
1690 # Check query result
1691 self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
1693 [[name, None, None, []]
1694 for name in utils.NiceSort(expnames)])
1696 # Test exclusive acquire
1697 for tlock in locks[::4]:
1698 tlock.acquire(shared=0)
1700 def _GetExpResult(name):
1701 if tlock.name == name:
1702 return [name, "exclusive", [threading.currentThread().getName()],
1704 return [name, None, None, []]
1706 self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
1708 [_GetExpResult(name)
1709 for name in utils.NiceSort(expnames)])
1713 # Test shared acquire
1714 def _Acquire(lock, shared, ev, notify):
1715 lock.acquire(shared=shared)
1722 for tlock1 in locks[::11]:
1723 for tlock2 in locks[::-15]:
1724 if tlock2 == tlock1:
1728 for tlock3 in locks[::10]:
1729 if tlock3 in (tlock2, tlock1):
1733 releaseev = threading.Event()
1739 ev = threading.Event()
1740 tthreads1.append(self._addThread(target=_Acquire,
1741 args=(tlock1, 1, releaseev, ev)))
1742 acquireev.append(ev)
1744 ev = threading.Event()
1745 tthread2 = self._addThread(target=_Acquire,
1746 args=(tlock2, 1, releaseev, ev))
1747 acquireev.append(ev)
1749 ev = threading.Event()
1750 tthread3 = self._addThread(target=_Acquire,
1751 args=(tlock3, 0, releaseev, ev))
1752 acquireev.append(ev)
1754 # Wait for all locks to be acquired
1758 # Check query result
1759 for (name, mode, owner) in self.lm.QueryLocks(["name", "mode",
1761 if name == tlock1.name:
1762 self.assertEqual(mode, "shared")
1763 self.assertEqual(set(owner), set(i.getName() for i in tthreads1))
1766 if name == tlock2.name:
1767 self.assertEqual(mode, "shared")
1768 self.assertEqual(owner, [tthread2.getName()])
1771 if name == tlock3.name:
1772 self.assertEqual(mode, "exclusive")
1773 self.assertEqual(owner, [tthread3.getName()])
1776 self.assert_(name in expnames)
1777 self.assert_(mode is None)
1778 self.assert_(owner is None)
1780 # Release locks again
1785 self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1787 for name in utils.NiceSort(expnames)])
1789 def testDelete(self):
1790 lock = locking.SharedLock("TestLock", monitor=self.lm)
1792 self.assertEqual(len(self.lm._locks), 1)
1793 self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1794 [[lock.name, None, None]])
1798 self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1799 [[lock.name, "deleted", None]])
1800 self.assertEqual(len(self.lm._locks), 1)
1802 def testPending(self):
1803 def _Acquire(lock, shared, prev, next):
1806 lock.acquire(shared=shared, test_notify=next.set)
1812 lock = locking.SharedLock("ExcLock", monitor=self.lm)
1814 for shared in [0, 1]:
1817 self.assertEqual(len(self.lm._locks), 1)
1818 self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1819 [[lock.name, "exclusive",
1820 [threading.currentThread().getName()]]])
1824 first = threading.Event()
1828 ev = threading.Event()
1829 threads.append(self._addThread(target=_Acquire,
1830 args=(lock, shared, prev, ev)))
1836 # Wait for last acquire to start waiting
1839 # NOTE: This works only because QueryLocks will acquire the
1840 # lock-internal lock again and won't be able to get the information
1841 # until it has the lock. By then the acquire should be registered in
1842 # SharedLock.__pending (otherwise it's a bug).
1844 # All acquires are waiting now
1846 pending = [("shared", sorted([t.getName() for t in threads]))]
1848 pending = [("exclusive", [t.getName()]) for t in threads]
1850 self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner",
1852 [[lock.name, "exclusive",
1853 [threading.currentThread().getName()],
1856 self.assertEqual(len(self.lm._locks), 1)
1862 # No pending acquires
1863 self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner", "pending"],
1865 [[lock.name, None, None, []]])
1867 self.assertEqual(len(self.lm._locks), 1)
1870 if __name__ == '__main__':
1871 testutils.GanetiTestProgram()