4 # Copyright (C) 2006, 2007, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the locking module"""
33 from ganeti import constants
34 from ganeti import locking
35 from ganeti import errors
36 from ganeti import utils
37 from ganeti import compat
38 from ganeti import objects
39 from ganeti import query
44 # This is used to test the ssynchronize decorator.
45 # Since it's passed as input to a decorator it must be declared as a global.
46 _decoratorlock = locking.SharedLock("decorator lock")
48 #: List for looping tests
53 """Decorator for executing a function many times"""
54 def wrapper(*args, **kwargs):
60 def SafeSleep(duration):
63 delay = start + duration - time.time()
69 class _ThreadedTestCase(unittest.TestCase):
70 """Test class that supports adding/waiting on threads"""
72 unittest.TestCase.setUp(self)
73 self.done = Queue.Queue(0)
76 def _addThread(self, *args, **kwargs):
77 """Create and remember a new thread"""
78 t = threading.Thread(*args, **kwargs)
79 self.threads.append(t)
83 def _waitThreads(self):
84 """Wait for all our threads to finish"""
85 for t in self.threads:
87 self.failIf(t.isAlive())
91 class _ConditionTestCase(_ThreadedTestCase):
92 """Common test case for conditions"""
95 _ThreadedTestCase.setUp(self)
96 self.lock = threading.Lock()
97 self.cond = cls(self.lock)
99 def _testAcquireRelease(self):
100 self.assertFalse(self.cond._is_owned())
101 self.assertRaises(RuntimeError, self.cond.wait)
102 self.assertRaises(RuntimeError, self.cond.notifyAll)
105 self.assert_(self.cond._is_owned())
106 self.cond.notifyAll()
107 self.assert_(self.cond._is_owned())
110 self.assertFalse(self.cond._is_owned())
111 self.assertRaises(RuntimeError, self.cond.wait)
112 self.assertRaises(RuntimeError, self.cond.notifyAll)
114 def _testNotification(self):
119 self.cond.notifyAll()
124 self._addThread(target=_NotifyAll)
125 self.assertEqual(self.done.get(True, 1), "NE")
126 self.assertRaises(Queue.Empty, self.done.get_nowait)
128 self.assertEqual(self.done.get(True, 1), "NA")
129 self.assertEqual(self.done.get(True, 1), "NN")
130 self.assert_(self.cond._is_owned())
132 self.assertFalse(self.cond._is_owned())
135 class TestSingleNotifyPipeCondition(_ConditionTestCase):
136 """SingleNotifyPipeCondition tests"""
139 _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
141 def testAcquireRelease(self):
142 self._testAcquireRelease()
144 def testNotification(self):
145 self._testNotification()
147 def testWaitReuse(self):
153 def testNoNotifyReuse(self):
155 self.cond.notifyAll()
156 self.assertRaises(RuntimeError, self.cond.wait)
157 self.assertRaises(RuntimeError, self.cond.notifyAll)
161 class TestPipeCondition(_ConditionTestCase):
162 """PipeCondition tests"""
165 _ConditionTestCase.setUp(self, locking.PipeCondition)
167 def testAcquireRelease(self):
168 self._testAcquireRelease()
170 def testNotification(self):
171 self._testNotification()
173 def _TestWait(self, fn):
175 self._addThread(target=fn),
176 self._addThread(target=fn),
177 self._addThread(target=fn),
180 # Wait for threads to be waiting
182 self.assertEqual(self.done.get(True, 1), "A")
184 self.assertRaises(Queue.Empty, self.done.get_nowait)
187 self.assertEqual(len(self.cond._waiters), 3)
188 self.assertEqual(self.cond._waiters, set(threads))
189 # This new thread can't acquire the lock, and thus call wait, before we
191 self._addThread(target=fn)
192 self.cond.notifyAll()
193 self.assertRaises(Queue.Empty, self.done.get_nowait)
196 # We should now get 3 W and 1 A (for the new thread) in whatever order
200 got = self.done.get(True, 1)
206 self.fail("Got %s on the done queue" % got)
208 self.assertEqual(w, 3)
209 self.assertEqual(a, 1)
212 self.cond.notifyAll()
215 self.assertEqual(self.done.get_nowait(), "W")
216 self.assertRaises(Queue.Empty, self.done.get_nowait)
218 def testBlockingWait(self):
226 self._TestWait(_BlockingWait)
228 def testLongTimeoutWait(self):
236 self._TestWait(_Helper)
238 def _TimeoutWait(self, timeout, check):
240 self.cond.wait(timeout)
244 def testShortTimeoutWait(self):
245 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
246 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
248 self.assertEqual(self.done.get_nowait(), "T1")
249 self.assertEqual(self.done.get_nowait(), "T1")
250 self.assertRaises(Queue.Empty, self.done.get_nowait)
252 def testZeroTimeoutWait(self):
253 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
254 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
255 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
257 self.assertEqual(self.done.get_nowait(), "T0")
258 self.assertEqual(self.done.get_nowait(), "T0")
259 self.assertEqual(self.done.get_nowait(), "T0")
260 self.assertRaises(Queue.Empty, self.done.get_nowait)
263 class TestSharedLock(_ThreadedTestCase):
264 """SharedLock tests"""
267 _ThreadedTestCase.setUp(self)
268 self.sl = locking.SharedLock("TestSharedLock")
270 def testSequenceAndOwnership(self):
271 self.assertFalse(self.sl._is_owned())
272 self.sl.acquire(shared=1)
273 self.assert_(self.sl._is_owned())
274 self.assert_(self.sl._is_owned(shared=1))
275 self.assertFalse(self.sl._is_owned(shared=0))
277 self.assertFalse(self.sl._is_owned())
279 self.assert_(self.sl._is_owned())
280 self.assertFalse(self.sl._is_owned(shared=1))
281 self.assert_(self.sl._is_owned(shared=0))
283 self.assertFalse(self.sl._is_owned())
284 self.sl.acquire(shared=1)
285 self.assert_(self.sl._is_owned())
286 self.assert_(self.sl._is_owned(shared=1))
287 self.assertFalse(self.sl._is_owned(shared=0))
289 self.assertFalse(self.sl._is_owned())
291 def testBooleanValue(self):
292 # semaphores are supposed to return a true value on a successful acquire
293 self.assert_(self.sl.acquire(shared=1))
295 self.assert_(self.sl.acquire())
298 def testDoubleLockingStoE(self):
299 self.sl.acquire(shared=1)
300 self.assertRaises(AssertionError, self.sl.acquire)
302 def testDoubleLockingEtoS(self):
304 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
306 def testDoubleLockingStoS(self):
307 self.sl.acquire(shared=1)
308 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
310 def testDoubleLockingEtoE(self):
312 self.assertRaises(AssertionError, self.sl.acquire)
314 # helper functions: called in a separate thread they acquire the lock, send
315 # their identifier on the done queue, then release it.
316 def _doItSharer(self):
318 self.sl.acquire(shared=1)
321 except errors.LockError:
324 def _doItExclusive(self):
329 except errors.LockError:
332 def _doItDelete(self):
336 except errors.LockError:
339 def testSharersCanCoexist(self):
340 self.sl.acquire(shared=1)
341 threading.Thread(target=self._doItSharer).start()
342 self.assert_(self.done.get(True, 1))
346 def testExclusiveBlocksExclusive(self):
348 self._addThread(target=self._doItExclusive)
349 self.assertRaises(Queue.Empty, self.done.get_nowait)
352 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
355 def testExclusiveBlocksDelete(self):
357 self._addThread(target=self._doItDelete)
358 self.assertRaises(Queue.Empty, self.done.get_nowait)
361 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
362 self.sl = locking.SharedLock(self.sl.name)
365 def testExclusiveBlocksSharer(self):
367 self._addThread(target=self._doItSharer)
368 self.assertRaises(Queue.Empty, self.done.get_nowait)
371 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
374 def testSharerBlocksExclusive(self):
375 self.sl.acquire(shared=1)
376 self._addThread(target=self._doItExclusive)
377 self.assertRaises(Queue.Empty, self.done.get_nowait)
380 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
383 def testSharerBlocksDelete(self):
384 self.sl.acquire(shared=1)
385 self._addThread(target=self._doItDelete)
386 self.assertRaises(Queue.Empty, self.done.get_nowait)
389 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
390 self.sl = locking.SharedLock(self.sl.name)
393 def testWaitingExclusiveBlocksSharer(self):
394 """SKIPPED testWaitingExclusiveBlockSharer"""
397 self.sl.acquire(shared=1)
398 # the lock is acquired in shared mode...
399 self._addThread(target=self._doItExclusive)
400 # ...but now an exclusive is waiting...
401 self._addThread(target=self._doItSharer)
402 # ...so the sharer should be blocked as well
403 self.assertRaises(Queue.Empty, self.done.get_nowait)
406 # The exclusive passed before
407 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
408 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
411 def testWaitingSharerBlocksExclusive(self):
412 """SKIPPED testWaitingSharerBlocksExclusive"""
416 # the lock is acquired in exclusive mode...
417 self._addThread(target=self._doItSharer)
418 # ...but now a sharer is waiting...
419 self._addThread(target=self._doItExclusive)
420 # ...the exclusive is waiting too...
421 self.assertRaises(Queue.Empty, self.done.get_nowait)
424 # The sharer passed before
425 self.assertEqual(self.done.get_nowait(), 'SHR')
426 self.assertEqual(self.done.get_nowait(), 'EXC')
428 def testDelete(self):
430 self.assertRaises(errors.LockError, self.sl.acquire)
431 self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
432 self.assertRaises(errors.LockError, self.sl.delete)
434 def testDeleteTimeout(self):
435 self.sl.delete(timeout=60)
437 def testNoDeleteIfSharer(self):
438 self.sl.acquire(shared=1)
439 self.assertRaises(AssertionError, self.sl.delete)
442 def testDeletePendingSharersExclusiveDelete(self):
444 self._addThread(target=self._doItSharer)
445 self._addThread(target=self._doItSharer)
446 self._addThread(target=self._doItExclusive)
447 self._addThread(target=self._doItDelete)
450 # The threads who were pending return ERR
452 self.assertEqual(self.done.get_nowait(), 'ERR')
453 self.sl = locking.SharedLock(self.sl.name)
456 def testDeletePendingDeleteExclusiveSharers(self):
458 self._addThread(target=self._doItDelete)
459 self._addThread(target=self._doItExclusive)
460 self._addThread(target=self._doItSharer)
461 self._addThread(target=self._doItSharer)
464 # The two threads who were pending return both ERR
465 self.assertEqual(self.done.get_nowait(), 'ERR')
466 self.assertEqual(self.done.get_nowait(), 'ERR')
467 self.assertEqual(self.done.get_nowait(), 'ERR')
468 self.assertEqual(self.done.get_nowait(), 'ERR')
469 self.sl = locking.SharedLock(self.sl.name)
472 def testExclusiveAcquireTimeout(self):
473 for shared in [0, 1]:
474 on_queue = threading.Event()
475 release_exclusive = threading.Event()
477 def _LockExclusive():
478 self.sl.acquire(shared=0, test_notify=on_queue.set)
479 self.done.put("A: start wait")
480 release_exclusive.wait()
481 self.done.put("A: end wait")
484 # Start thread to hold lock in exclusive mode
485 self._addThread(target=_LockExclusive)
487 # Wait for wait to begin
488 self.assertEqual(self.done.get(timeout=60), "A: start wait")
490 # Wait up to 60s to get lock, but release exclusive lock as soon as we're
492 self.failUnless(self.sl.acquire(shared=shared, timeout=60,
493 test_notify=release_exclusive.set))
495 self.done.put("got 2nd")
500 self.assertEqual(self.done.get_nowait(), "A: end wait")
501 self.assertEqual(self.done.get_nowait(), "got 2nd")
502 self.assertRaises(Queue.Empty, self.done.get_nowait)
505 def testAcquireExpiringTimeout(self):
506 def _AcquireWithTimeout(shared, timeout):
507 if not self.sl.acquire(shared=shared, timeout=timeout):
508 self.done.put("timeout")
510 for shared in [0, 1]:
514 # Start shared acquires with timeout between 0 and 20 ms
516 self._addThread(target=_AcquireWithTimeout,
517 args=(shared, i * 2.0 / 1000.0))
519 # Wait for threads to finish (makes sure the acquire timeout expires
520 # before releasing the lock)
527 self.assertEqual(self.done.get_nowait(), "timeout")
529 self.assertRaises(Queue.Empty, self.done.get_nowait)
532 def testSharedSkipExclusiveAcquires(self):
533 # Tests whether shared acquires jump in front of exclusive acquires in the
536 def _Acquire(shared, name, notify_ev, wait_ev):
538 notify_fn = notify_ev.set
545 if not self.sl.acquire(shared=shared, test_notify=notify_fn):
551 # Get exclusive lock while we fill the queue
559 # Add acquires using threading.Event for synchronization. They'll be
560 # acquired exactly in the order defined in this list.
561 acquires = (shrcnt1 * [(1, "shared 1")] +
562 3 * [(0, "exclusive 1")] +
563 shrcnt2 * [(1, "shared 2")] +
564 shrcnt3 * [(1, "shared 3")] +
565 shrcnt4 * [(1, "shared 4")] +
566 3 * [(0, "exclusive 2")])
571 for args in acquires:
572 ev_cur = threading.Event()
573 self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
576 # Wait for last acquire to start
579 # Expect 6 pending exclusive acquires and 1 for all shared acquires
581 self.assertEqual(self.sl._count_pending(), 7)
583 # Release exclusive lock and wait
589 for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
590 # Shared locks aren't guaranteed to be notified in order, but they'll be
592 tmp = self.done.get_nowait()
593 if tmp == "shared 1":
595 elif tmp == "shared 2":
597 elif tmp == "shared 3":
599 elif tmp == "shared 4":
601 self.assertEqual(shrcnt1, 0)
602 self.assertEqual(shrcnt2, 0)
603 self.assertEqual(shrcnt3, 0)
604 self.assertEqual(shrcnt3, 0)
607 self.assertEqual(self.done.get_nowait(), "exclusive 1")
610 self.assertEqual(self.done.get_nowait(), "exclusive 2")
612 self.assertRaises(Queue.Empty, self.done.get_nowait)
615 def testMixedAcquireTimeout(self):
616 sync = threading.Event()
618 def _AcquireShared(ev):
619 if not self.sl.acquire(shared=1, timeout=None):
622 self.done.put("shared")
627 # Wait for notification from main thread
635 ev = threading.Event()
636 self._addThread(target=_AcquireShared, args=(ev, ))
639 # Wait for all acquires to finish
643 self.assertEqual(self.sl._count_pending(), 0)
645 # Try to get exclusive lock
646 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
648 # Acquire exclusive without timeout
649 exclsync = threading.Event()
650 exclev = threading.Event()
652 def _AcquireExclusive():
653 if not self.sl.acquire(shared=0):
656 self.done.put("exclusive")
661 # Wait for notification from main thread
666 self._addThread(target=_AcquireExclusive)
668 # Try to get exclusive lock
669 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
671 # Make all shared holders release their locks
674 # Wait for exclusive acquire to succeed
677 self.assertEqual(self.sl._count_pending(), 0)
679 # Try to get exclusive lock
680 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
682 def _AcquireSharedSimple():
683 if self.sl.acquire(shared=1, timeout=None):
684 self.done.put("shared2")
688 self._addThread(target=_AcquireSharedSimple)
690 # Tell exclusive lock to release
693 # Wait for everything to finish
696 self.assertEqual(self.sl._count_pending(), 0)
700 self.assertEqual(self.done.get_nowait(), "shared")
702 self.assertEqual(self.done.get_nowait(), "exclusive")
705 self.assertEqual(self.done.get_nowait(), "shared2")
707 self.assertRaises(Queue.Empty, self.done.get_nowait)
709 def testPriority(self):
710 # Acquire in exclusive mode
711 self.assert_(self.sl.acquire(shared=0))
714 def _Acquire(prev, next, shared, priority, result):
716 self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
718 self.done.put(result)
722 counter = itertools.count(0)
723 priorities = range(-20, 30)
724 first = threading.Event()
730 # [(shared/exclusive, set(acquire names), set(pending threads)),
731 # (shared/exclusive, ...),
737 # References shared acquire per priority in L{perprio}. Data structure:
739 # priority: (shared=1, set(acquire names), set(pending threads)),
743 for seed in [4979, 9523, 14902, 32440]:
744 # Use a deterministic random generator
745 rnd = random.Random(seed)
746 for priority in [rnd.choice(priorities) for _ in range(30)]:
751 acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
753 ev = threading.Event()
754 thread = self._addThread(target=_Acquire,
755 args=(prev, ev, shared, priority, acqname))
758 # Record expected aqcuire, see above for structure
759 data = (shared, set([acqname]), set([thread]))
760 priolist = perprio.setdefault(priority, [])
762 priosh = prioshared.get(priority, None)
764 # Shared acquires are merged
765 for i, j in zip(priosh[1:], data[1:]):
767 assert data[0] == priosh[0]
769 prioshared[priority] = data
770 priolist.append(data)
772 priolist.append(data)
774 # Start all acquires and wait for them
778 # Check lock information
779 self.assertEqual(self.sl.GetInfo(set()), (self.sl.name, None, None, None))
780 self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
781 (self.sl.name, "exclusive",
782 [threading.currentThread().getName()], None))
784 self._VerifyPrioPending(self.sl.GetInfo(set([query.LQ_PENDING])), perprio)
786 # Let threads acquire the lock
789 # Wait for everything to finish
792 self.assert_(self.sl._check_empty())
794 # Check acquires by priority
795 for acquires in [perprio[i] for i in sorted(perprio.keys())]:
796 for (_, names, _) in acquires:
797 # For shared acquires, the set will contain 1..n entries. For exclusive
800 names.remove(self.done.get_nowait())
801 self.assertFalse(compat.any(names for (_, names, _) in acquires))
803 self.assertRaises(Queue.Empty, self.done.get_nowait)
805 def _VerifyPrioPending(self, (name, mode, owner, pending), perprio):
806 self.assertEqual(name, self.sl.name)
807 self.assert_(mode is None)
808 self.assert_(owner is None)
810 self.assertEqual([(pendmode, sorted(waiting))
811 for (pendmode, waiting) in pending],
812 [(["exclusive", "shared"][int(bool(shared))],
813 sorted(t.getName() for t in threads))
814 for acquires in [perprio[i]
815 for i in sorted(perprio.keys())]
816 for (shared, _, threads) in acquires])
819 class TestSharedLockInCondition(_ThreadedTestCase):
820 """SharedLock as a condition lock tests"""
823 _ThreadedTestCase.setUp(self)
824 self.sl = locking.SharedLock("TestSharedLockInCondition")
827 def setCondition(self):
828 self.cond = threading.Condition(self.sl)
830 def testKeepMode(self):
831 self.cond.acquire(shared=1)
832 self.assert_(self.sl._is_owned(shared=1))
834 self.assert_(self.sl._is_owned(shared=1))
836 self.cond.acquire(shared=0)
837 self.assert_(self.sl._is_owned(shared=0))
839 self.assert_(self.sl._is_owned(shared=0))
843 class TestSharedLockInPipeCondition(TestSharedLockInCondition):
844 """SharedLock as a pipe condition lock tests"""
846 def setCondition(self):
847 self.cond = locking.PipeCondition(self.sl)
850 class TestSSynchronizedDecorator(_ThreadedTestCase):
851 """Shared Lock Synchronized decorator test"""
854 _ThreadedTestCase.setUp(self)
856 @locking.ssynchronized(_decoratorlock)
857 def _doItExclusive(self):
858 self.assert_(_decoratorlock._is_owned())
861 @locking.ssynchronized(_decoratorlock, shared=1)
862 def _doItSharer(self):
863 self.assert_(_decoratorlock._is_owned(shared=1))
866 def testDecoratedFunctions(self):
867 self._doItExclusive()
868 self.assertFalse(_decoratorlock._is_owned())
870 self.assertFalse(_decoratorlock._is_owned())
872 def testSharersCanCoexist(self):
873 _decoratorlock.acquire(shared=1)
874 threading.Thread(target=self._doItSharer).start()
875 self.assert_(self.done.get(True, 1))
876 _decoratorlock.release()
879 def testExclusiveBlocksExclusive(self):
880 _decoratorlock.acquire()
881 self._addThread(target=self._doItExclusive)
882 # give it a bit of time to check that it's not actually doing anything
883 self.assertRaises(Queue.Empty, self.done.get_nowait)
884 _decoratorlock.release()
886 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
889 def testExclusiveBlocksSharer(self):
890 _decoratorlock.acquire()
891 self._addThread(target=self._doItSharer)
892 self.assertRaises(Queue.Empty, self.done.get_nowait)
893 _decoratorlock.release()
895 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
898 def testSharerBlocksExclusive(self):
899 _decoratorlock.acquire(shared=1)
900 self._addThread(target=self._doItExclusive)
901 self.assertRaises(Queue.Empty, self.done.get_nowait)
902 _decoratorlock.release()
904 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
907 class TestLockSet(_ThreadedTestCase):
911 _ThreadedTestCase.setUp(self)
915 """Helper to (re)initialize the lock set"""
916 self.resources = ['one', 'two', 'three']
917 self.ls = locking.LockSet(self.resources, "TestLockSet")
919 def testResources(self):
920 self.assertEquals(self.ls._names(), set(self.resources))
921 newls = locking.LockSet([], "TestLockSet.testResources")
922 self.assertEquals(newls._names(), set())
924 def testAcquireRelease(self):
925 self.assert_(self.ls.acquire('one'))
926 self.assertEquals(self.ls._list_owned(), set(['one']))
928 self.assertEquals(self.ls._list_owned(), set())
929 self.assertEquals(self.ls.acquire(['one']), set(['one']))
930 self.assertEquals(self.ls._list_owned(), set(['one']))
932 self.assertEquals(self.ls._list_owned(), set())
933 self.ls.acquire(['one', 'two', 'three'])
934 self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
935 self.ls.release('one')
936 self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
937 self.ls.release(['three'])
938 self.assertEquals(self.ls._list_owned(), set(['two']))
940 self.assertEquals(self.ls._list_owned(), set())
941 self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
942 self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
944 self.assertEquals(self.ls._list_owned(), set())
946 def testNoDoubleAcquire(self):
947 self.ls.acquire('one')
948 self.assertRaises(AssertionError, self.ls.acquire, 'one')
949 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
950 self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
952 self.ls.acquire(['one', 'three'])
953 self.ls.release('one')
954 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
955 self.ls.release('three')
957 def testNoWrongRelease(self):
958 self.assertRaises(AssertionError, self.ls.release)
959 self.ls.acquire('one')
960 self.assertRaises(AssertionError, self.ls.release, 'two')
962 def testAddRemove(self):
964 self.assertEquals(self.ls._list_owned(), set())
965 self.assert_('four' in self.ls._names())
966 self.ls.add(['five', 'six', 'seven'], acquired=1)
967 self.assert_('five' in self.ls._names())
968 self.assert_('six' in self.ls._names())
969 self.assert_('seven' in self.ls._names())
970 self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
971 self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
972 self.assert_('five' not in self.ls._names())
973 self.assert_('six' not in self.ls._names())
974 self.assertEquals(self.ls._list_owned(), set(['seven']))
975 self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
976 self.ls.remove('seven')
977 self.assert_('seven' not in self.ls._names())
978 self.assertEquals(self.ls._list_owned(), set([]))
979 self.ls.acquire(None, shared=1)
980 self.assertRaises(AssertionError, self.ls.add, 'eight')
982 self.ls.acquire(None)
983 self.ls.add('eight', acquired=1)
984 self.assert_('eight' in self.ls._names())
985 self.assert_('eight' in self.ls._list_owned())
987 self.assert_('nine' in self.ls._names())
988 self.assert_('nine' not in self.ls._list_owned())
990 self.ls.remove(['two'])
991 self.assert_('two' not in self.ls._names())
992 self.ls.acquire('three')
993 self.assertEquals(self.ls.remove(['three']), ['three'])
994 self.assert_('three' not in self.ls._names())
995 self.assertEquals(self.ls.remove('three'), [])
996 self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
997 self.assert_('one' not in self.ls._names())
999 def testRemoveNonBlocking(self):
1000 self.ls.acquire('one')
1001 self.assertEquals(self.ls.remove('one'), ['one'])
1002 self.ls.acquire(['two', 'three'])
1003 self.assertEquals(self.ls.remove(['two', 'three']),
1006 def testNoDoubleAdd(self):
1007 self.assertRaises(errors.LockError, self.ls.add, 'two')
1009 self.assertRaises(errors.LockError, self.ls.add, 'four')
1011 def testNoWrongRemoves(self):
1012 self.ls.acquire(['one', 'three'], shared=1)
1013 # Cannot remove 'two' while holding something which is not a superset
1014 self.assertRaises(AssertionError, self.ls.remove, 'two')
1015 # Cannot remove 'three' as we are sharing it
1016 self.assertRaises(AssertionError, self.ls.remove, 'three')
1018 def testAcquireSetLock(self):
1019 # acquire the set-lock exclusively
1020 self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
1021 self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1022 self.assertEquals(self.ls._is_owned(), True)
1023 self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
1024 # I can still add/remove elements...
1025 self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
1026 self.assert_(self.ls.add('six'))
1028 # share the set-lock
1029 self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
1030 # adding new elements is not possible
1031 self.assertRaises(AssertionError, self.ls.add, 'five')
1034 def testAcquireWithRepetitions(self):
1035 self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
1036 set(['two', 'two', 'three']))
1037 self.ls.release(['two', 'two'])
1038 self.assertEquals(self.ls._list_owned(), set(['three']))
1040 def testEmptyAcquire(self):
1041 # Acquire an empty list of locks...
1042 self.assertEquals(self.ls.acquire([]), set())
1043 self.assertEquals(self.ls._list_owned(), set())
1044 # New locks can still be addded
1045 self.assert_(self.ls.add('six'))
1046 # "re-acquiring" is not an issue, since we had really acquired nothing
1047 self.assertEquals(self.ls.acquire([], shared=1), set())
1048 self.assertEquals(self.ls._list_owned(), set())
1049 # We haven't really acquired anything, so we cannot release
1050 self.assertRaises(AssertionError, self.ls.release)
1052 def _doLockSet(self, names, shared):
1054 self.ls.acquire(names, shared=shared)
1055 self.done.put('DONE')
1057 except errors.LockError:
1058 self.done.put('ERR')
1060 def _doAddSet(self, names):
1062 self.ls.add(names, acquired=1)
1063 self.done.put('DONE')
1065 except errors.LockError:
1066 self.done.put('ERR')
1068 def _doRemoveSet(self, names):
1069 self.done.put(self.ls.remove(names))
1072 def testConcurrentSharedAcquire(self):
1073 self.ls.acquire(['one', 'two'], shared=1)
1074 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1076 self.assertEqual(self.done.get_nowait(), 'DONE')
1077 self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
1079 self.assertEqual(self.done.get_nowait(), 'DONE')
1080 self._addThread(target=self._doLockSet, args=('three', 1))
1082 self.assertEqual(self.done.get_nowait(), 'DONE')
1083 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1084 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1085 self.assertRaises(Queue.Empty, self.done.get_nowait)
1088 self.assertEqual(self.done.get_nowait(), 'DONE')
1089 self.assertEqual(self.done.get_nowait(), 'DONE')
1092 def testConcurrentExclusiveAcquire(self):
1093 self.ls.acquire(['one', 'two'])
1094 self._addThread(target=self._doLockSet, args=('three', 1))
1096 self.assertEqual(self.done.get_nowait(), 'DONE')
1097 self._addThread(target=self._doLockSet, args=('three', 0))
1099 self.assertEqual(self.done.get_nowait(), 'DONE')
1100 self.assertRaises(Queue.Empty, self.done.get_nowait)
1101 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1102 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1103 self._addThread(target=self._doLockSet, args=('one', 0))
1104 self._addThread(target=self._doLockSet, args=('one', 1))
1105 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1106 self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
1107 self.assertRaises(Queue.Empty, self.done.get_nowait)
1111 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1114 def testSimpleAcquireTimeoutExpiring(self):
1115 names = sorted(self.ls._names())
1116 self.assert_(len(names) >= 3)
1118 # Get name of first lock
1121 # Get name of last lock
1125 # Block first and try to lock it again
1128 # Block last and try to lock all locks
1131 # Block last and try to lock it again
1135 for (wanted, block) in checks:
1136 # Lock in exclusive mode
1137 self.assert_(self.ls.acquire(block, shared=0))
1140 # Try to get the same lock again with a timeout (should never succeed)
1141 acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1143 self.done.put("acquired")
1146 self.assert_(acquired is None)
1147 self.assertFalse(self.ls._list_owned())
1148 self.assertFalse(self.ls._is_owned())
1149 self.done.put("not acquired")
1151 self._addThread(target=_AcquireOne)
1153 # Wait for timeout in thread to expire
1156 # Release exclusive lock again
1159 self.assertEqual(self.done.get_nowait(), "not acquired")
1160 self.assertRaises(Queue.Empty, self.done.get_nowait)
1163 def testDelayedAndExpiringLockAcquire(self):
1165 self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1167 for expire in (False, True):
1168 names = sorted(self.ls._names())
1169 self.assertEqual(len(names), 8)
1171 lock_ev = dict([(i, threading.Event()) for i in names])
1173 # Lock all in exclusive mode
1174 self.assert_(self.ls.acquire(names, shared=0))
1177 # We'll wait at least 300ms per lock
1178 lockwait = len(names) * [0.3]
1180 # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1181 # this gives us up to 2.4s to fail.
1182 lockall_timeout = 0.4
1184 # This should finish rather quickly
1186 lockall_timeout = len(names) * 5.0
1189 def acquire_notification(name):
1191 self.done.put("getting %s" % name)
1196 if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1197 test_notify=acquire_notification):
1198 self.done.put("got all")
1201 self.done.put("timeout on all")
1204 for ev in lock_ev.values():
1207 t = self._addThread(target=_LockAll)
1209 for idx, name in enumerate(names):
1210 # Wait for actual acquire on this lock to start
1211 lock_ev[name].wait(10.0)
1213 if expire and t.isAlive():
1214 # Wait some time after getting the notification to make sure the lock
1215 # acquire will expire
1216 SafeSleep(lockwait[idx])
1218 self.ls.release(names=name)
1220 self.assertFalse(self.ls._list_owned())
1225 # Not checking which locks were actually acquired. Doing so would be
1226 # too timing-dependant.
1227 self.assertEqual(self.done.get_nowait(), "timeout on all")
1230 self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1231 self.assertEqual(self.done.get_nowait(), "got all")
1232 self.assertRaises(Queue.Empty, self.done.get_nowait)
1235 def testConcurrentRemove(self):
1237 self.ls.acquire(['one', 'two', 'four'])
1238 self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1239 self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1240 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1241 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1242 self.assertRaises(Queue.Empty, self.done.get_nowait)
1243 self.ls.remove('one')
1247 self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1248 self.ls.add(['five', 'six'], acquired=1)
1249 self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1250 self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1251 self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1252 self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1253 self.ls.remove('five')
1257 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1258 self.ls.acquire(['three', 'four'])
1259 self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1260 self.assertRaises(Queue.Empty, self.done.get_nowait)
1261 self.ls.remove('four')
1263 self.assertEqual(self.done.get_nowait(), ['six'])
1264 self._addThread(target=self._doRemoveSet, args=(['two']))
1266 self.assertEqual(self.done.get_nowait(), ['two'])
1272 def testConcurrentSharedSetLock(self):
1273 # share the set-lock...
1274 self.ls.acquire(None, shared=1)
1275 # ...another thread can share it too
1276 self._addThread(target=self._doLockSet, args=(None, 1))
1278 self.assertEqual(self.done.get_nowait(), 'DONE')
1279 # ...or just share some elements
1280 self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1282 self.assertEqual(self.done.get_nowait(), 'DONE')
1283 # ...but not add new ones or remove any
1284 t = self._addThread(target=self._doAddSet, args=(['nine']))
1285 self._addThread(target=self._doRemoveSet, args=(['two'], ))
1286 self.assertRaises(Queue.Empty, self.done.get_nowait)
1287 # this just releases the set-lock
1290 self.assertEqual(self.done.get_nowait(), 'DONE')
1291 # release the lock on the actual elements so remove() can proceed too
1294 self.failUnlessEqual(self.done.get_nowait(), ['two'])
1299 def testConcurrentExclusiveSetLock(self):
1300 # acquire the set-lock...
1301 self.ls.acquire(None, shared=0)
1302 # ...no one can do anything else
1303 self._addThread(target=self._doLockSet, args=(None, 1))
1304 self._addThread(target=self._doLockSet, args=(None, 0))
1305 self._addThread(target=self._doLockSet, args=(['three'], 0))
1306 self._addThread(target=self._doLockSet, args=(['two'], 1))
1307 self._addThread(target=self._doAddSet, args=(['nine']))
1308 self.assertRaises(Queue.Empty, self.done.get_nowait)
1312 self.assertEqual(self.done.get(True, 1), 'DONE')
1317 def testConcurrentSetLockAdd(self):
1318 self.ls.acquire('one')
1319 # Another thread wants the whole SetLock
1320 self._addThread(target=self._doLockSet, args=(None, 0))
1321 self._addThread(target=self._doLockSet, args=(None, 1))
1322 self.assertRaises(Queue.Empty, self.done.get_nowait)
1323 self.assertRaises(AssertionError, self.ls.add, 'four')
1326 self.assertEqual(self.done.get_nowait(), 'DONE')
1327 self.assertEqual(self.done.get_nowait(), 'DONE')
1328 self.ls.acquire(None)
1329 self._addThread(target=self._doLockSet, args=(None, 0))
1330 self._addThread(target=self._doLockSet, args=(None, 1))
1331 self.assertRaises(Queue.Empty, self.done.get_nowait)
1333 self.ls.add('five', acquired=1)
1334 self.ls.add('six', acquired=1, shared=1)
1335 self.assertEquals(self.ls._list_owned(),
1336 set(['one', 'two', 'three', 'five', 'six']))
1337 self.assertEquals(self.ls._is_owned(), True)
1338 self.assertEquals(self.ls._names(),
1339 set(['one', 'two', 'three', 'four', 'five', 'six']))
1342 self.assertEqual(self.done.get_nowait(), 'DONE')
1343 self.assertEqual(self.done.get_nowait(), 'DONE')
1347 def testEmptyLockSet(self):
1349 self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1351 self.ls.remove(['one', 'two', 'three'])
1352 # and adds/locks by another thread still wait
1353 self._addThread(target=self._doAddSet, args=(['nine']))
1354 self._addThread(target=self._doLockSet, args=(None, 1))
1355 self._addThread(target=self._doLockSet, args=(None, 0))
1356 self.assertRaises(Queue.Empty, self.done.get_nowait)
1360 self.assertEqual(self.done.get_nowait(), 'DONE')
1362 self.assertEqual(self.ls.remove(['nine']), ['nine'])
1364 self.assertEqual(self.ls.acquire(None, shared=1), set())
1365 # other sharers can go, adds still wait
1366 self._addThread(target=self._doLockSet, args=(None, 1))
1368 self.assertEqual(self.done.get_nowait(), 'DONE')
1369 self._addThread(target=self._doAddSet, args=(['nine']))
1370 self.assertRaises(Queue.Empty, self.done.get_nowait)
1373 self.assertEqual(self.done.get_nowait(), 'DONE')
1376 def testPriority(self):
1377 def _Acquire(prev, next, name, priority, success_fn):
1379 self.assert_(self.ls.acquire(name, shared=0,
1381 test_notify=lambda _: next.set()))
1387 # Get all in exclusive mode
1388 self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1390 done_two = Queue.Queue(0)
1392 first = threading.Event()
1395 acquires = [("one", prio, self.done) for prio in range(1, 33)]
1396 acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1398 # Use a deterministic random generator
1399 random.Random(741).shuffle(acquires)
1401 for (name, prio, done) in acquires:
1402 ev = threading.Event()
1403 self._addThread(target=_Acquire,
1404 args=(prev, ev, name, prio,
1405 compat.partial(done.put, "Prio%s" % prio)))
1411 # Wait for last acquire to start
1414 # Let threads acquire locks
1417 # Wait for threads to finish
1420 for i in range(1, 33):
1421 self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1422 self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1424 self.assertRaises(Queue.Empty, self.done.get_nowait)
1425 self.assertRaises(Queue.Empty, done_two.get_nowait)
1428 class TestGanetiLockManager(_ThreadedTestCase):
1431 _ThreadedTestCase.setUp(self)
1432 self.nodes=['n1', 'n2']
1433 self.nodegroups=['g1', 'g2']
1434 self.instances=['i1', 'i2', 'i3']
1435 self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
1439 # Don't try this at home...
1440 locking.GanetiLockManager._instance = None
1442 def testLockingConstants(self):
1443 # The locking library internally cheats by assuming its constants have some
1444 # relationships with each other. Check those hold true.
1445 # This relationship is also used in the Processor to recursively acquire
1446 # the right locks. Again, please don't break it.
1447 for i in range(len(locking.LEVELS)):
1448 self.assertEqual(i, locking.LEVELS[i])
1450 def testDoubleGLFails(self):
1451 self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])
1453 def testLockNames(self):
1454 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1455 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1456 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1457 set(self.nodegroups))
1458 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1459 set(self.instances))
1461 def testInitAndResources(self):
1462 locking.GanetiLockManager._instance = None
1463 self.GL = locking.GanetiLockManager([], [], [])
1464 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1465 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1466 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1467 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1469 locking.GanetiLockManager._instance = None
1470 self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [])
1471 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1472 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1473 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1474 set(self.nodegroups))
1475 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1477 locking.GanetiLockManager._instance = None
1478 self.GL = locking.GanetiLockManager([], [], self.instances)
1479 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1480 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1481 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1482 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1483 set(self.instances))
1485 def testAcquireRelease(self):
1486 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1487 self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1488 self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1489 self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
1490 self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1491 self.GL.release(locking.LEVEL_NODE, ['n2'])
1492 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1493 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1494 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1495 self.GL.release(locking.LEVEL_NODE)
1496 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1497 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1498 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1499 self.GL.release(locking.LEVEL_NODEGROUP)
1500 self.GL.release(locking.LEVEL_INSTANCE)
1501 self.assertRaises(errors.LockError, self.GL.acquire,
1502 locking.LEVEL_INSTANCE, ['i5'])
1503 self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1504 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1506 def testAcquireWholeSets(self):
1507 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1508 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1509 set(self.instances))
1510 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1511 set(self.instances))
1512 self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
1513 set(self.nodegroups))
1514 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP),
1515 set(self.nodegroups))
1516 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1518 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1520 self.GL.release(locking.LEVEL_NODE)
1521 self.GL.release(locking.LEVEL_NODEGROUP)
1522 self.GL.release(locking.LEVEL_INSTANCE)
1523 self.GL.release(locking.LEVEL_CLUSTER)
1525 def testAcquireWholeAndPartial(self):
1526 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1527 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1528 set(self.instances))
1529 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1530 set(self.instances))
1531 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1533 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1535 self.GL.release(locking.LEVEL_NODE)
1536 self.GL.release(locking.LEVEL_INSTANCE)
1537 self.GL.release(locking.LEVEL_CLUSTER)
1539 def testBGLDependency(self):
1540 self.assertRaises(AssertionError, self.GL.acquire,
1541 locking.LEVEL_NODE, ['n1', 'n2'])
1542 self.assertRaises(AssertionError, self.GL.acquire,
1543 locking.LEVEL_INSTANCE, ['i3'])
1544 self.assertRaises(AssertionError, self.GL.acquire,
1545 locking.LEVEL_NODEGROUP, ['g1'])
1546 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1547 self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1548 self.assertRaises(AssertionError, self.GL.release,
1549 locking.LEVEL_CLUSTER, ['BGL'])
1550 self.assertRaises(AssertionError, self.GL.release,
1551 locking.LEVEL_CLUSTER)
1552 self.GL.release(locking.LEVEL_NODE)
1553 self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1554 self.assertRaises(AssertionError, self.GL.release,
1555 locking.LEVEL_CLUSTER, ['BGL'])
1556 self.assertRaises(AssertionError, self.GL.release,
1557 locking.LEVEL_CLUSTER)
1558 self.GL.release(locking.LEVEL_INSTANCE)
1559 self.GL.acquire(locking.LEVEL_NODEGROUP, None)
1560 self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
1561 self.assertRaises(AssertionError, self.GL.release,
1562 locking.LEVEL_CLUSTER, ['BGL'])
1563 self.assertRaises(AssertionError, self.GL.release,
1564 locking.LEVEL_CLUSTER)
1565 self.GL.release(locking.LEVEL_NODEGROUP)
1566 self.GL.release(locking.LEVEL_CLUSTER)
1568 def testWrongOrder(self):
1569 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1570 self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1571 self.assertRaises(AssertionError, self.GL.acquire,
1572 locking.LEVEL_NODE, ['n1'])
1573 self.assertRaises(AssertionError, self.GL.acquire,
1574 locking.LEVEL_NODEGROUP, ['g1'])
1575 self.assertRaises(AssertionError, self.GL.acquire,
1576 locking.LEVEL_INSTANCE, ['i2'])
1578 def testModifiableLevels(self):
1579 self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
1581 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
1582 self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
1583 self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
1584 self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
1585 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
1586 self.GL.add(locking.LEVEL_NODE, ['n3'])
1587 self.GL.remove(locking.LEVEL_NODE, ['n1'])
1588 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
1589 self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
1590 self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
1591 self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
1592 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
1593 self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
1596 # Helper function to run as a thread that shared the BGL and then acquires
1597 # some locks at another level.
1598 def _doLock(self, level, names, shared):
1600 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1601 self.GL.acquire(level, names, shared=shared)
1602 self.done.put('DONE')
1603 self.GL.release(level)
1604 self.GL.release(locking.LEVEL_CLUSTER)
1605 except errors.LockError:
1606 self.done.put('ERR')
1609 def testConcurrency(self):
1610 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1611 self._addThread(target=self._doLock,
1612 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1614 self.assertEqual(self.done.get_nowait(), 'DONE')
1615 self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1616 self._addThread(target=self._doLock,
1617 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1619 self.assertEqual(self.done.get_nowait(), 'DONE')
1620 self._addThread(target=self._doLock,
1621 args=(locking.LEVEL_INSTANCE, 'i3', 1))
1622 self.assertRaises(Queue.Empty, self.done.get_nowait)
1623 self.GL.release(locking.LEVEL_INSTANCE)
1625 self.assertEqual(self.done.get_nowait(), 'DONE')
1626 self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1627 self._addThread(target=self._doLock,
1628 args=(locking.LEVEL_INSTANCE, 'i2', 1))
1630 self.assertEqual(self.done.get_nowait(), 'DONE')
1631 self._addThread(target=self._doLock,
1632 args=(locking.LEVEL_INSTANCE, 'i2', 0))
1633 self.assertRaises(Queue.Empty, self.done.get_nowait)
1634 self.GL.release(locking.LEVEL_INSTANCE)
1636 self.assertEqual(self.done.get(True, 1), 'DONE')
1637 self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1640 class TestLockMonitor(_ThreadedTestCase):
1642 _ThreadedTestCase.setUp(self)
1643 self.lm = locking.LockMonitor()
1645 def testSingleThread(self):
1648 for i in range(100):
1649 name = "TestLock%s" % i
1650 locks.append(locking.SharedLock(name, monitor=self.lm))
1652 self.assertEqual(len(self.lm._locks), len(locks))
1653 result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
1654 self.assertEqual(len(result.fields), 1)
1655 self.assertEqual(len(result.data), 100)
1660 # The garbage collector might needs some time
1663 raise utils.RetryAgain()
1665 utils.Retry(_CheckLocks, 0.1, 30.0)
1667 self.assertFalse(self.lm._locks)
1669 def testMultiThread(self):
1672 def _CreateLock(prev, next, name):
1674 locks.append(locking.SharedLock(name, monitor=self.lm))
1680 first = threading.Event()
1683 # Use a deterministic random generator
1684 for i in random.Random(4263).sample(range(100), 33):
1685 name = "MtTestLock%s" % i
1686 expnames.append(name)
1688 ev = threading.Event()
1689 self._addThread(target=_CreateLock, args=(prev, ev, name))
1696 # Check order in which locks were added
1697 self.assertEqual([i.name for i in locks], expnames)
1699 # Check query result
1700 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1701 self.assert_(isinstance(result, dict))
1702 response = objects.QueryResponse.FromDict(result)
1703 self.assertEqual(response.data,
1704 [[(constants.RS_NORMAL, name),
1705 (constants.RS_NORMAL, None),
1706 (constants.RS_NORMAL, None),
1707 (constants.RS_NORMAL, [])]
1708 for name in utils.NiceSort(expnames)])
1709 self.assertEqual(len(response.fields), 4)
1710 self.assertEqual(["name", "mode", "owner", "pending"],
1711 [fdef.name for fdef in response.fields])
1713 # Test exclusive acquire
1714 for tlock in locks[::4]:
1715 tlock.acquire(shared=0)
1717 def _GetExpResult(name):
1718 if tlock.name == name:
1719 return [(constants.RS_NORMAL, name),
1720 (constants.RS_NORMAL, "exclusive"),
1721 (constants.RS_NORMAL,
1722 [threading.currentThread().getName()]),
1723 (constants.RS_NORMAL, [])]
1724 return [(constants.RS_NORMAL, name),
1725 (constants.RS_NORMAL, None),
1726 (constants.RS_NORMAL, None),
1727 (constants.RS_NORMAL, [])]
1729 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1730 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1731 [_GetExpResult(name)
1732 for name in utils.NiceSort(expnames)])
1736 # Test shared acquire
1737 def _Acquire(lock, shared, ev, notify):
1738 lock.acquire(shared=shared)
1745 for tlock1 in locks[::11]:
1746 for tlock2 in locks[::-15]:
1747 if tlock2 == tlock1:
1751 for tlock3 in locks[::10]:
1752 if tlock3 in (tlock2, tlock1):
1756 releaseev = threading.Event()
1762 ev = threading.Event()
1763 tthreads1.append(self._addThread(target=_Acquire,
1764 args=(tlock1, 1, releaseev, ev)))
1765 acquireev.append(ev)
1767 ev = threading.Event()
1768 tthread2 = self._addThread(target=_Acquire,
1769 args=(tlock2, 1, releaseev, ev))
1770 acquireev.append(ev)
1772 ev = threading.Event()
1773 tthread3 = self._addThread(target=_Acquire,
1774 args=(tlock3, 0, releaseev, ev))
1775 acquireev.append(ev)
1777 # Wait for all locks to be acquired
1781 # Check query result
1782 result = self.lm.QueryLocks(["name", "mode", "owner"])
1783 response = objects.QueryResponse.FromDict(result)
1784 for (name, mode, owner) in response.data:
1785 (name_status, name_value) = name
1786 (owner_status, owner_value) = owner
1788 self.assertEqual(name_status, constants.RS_NORMAL)
1789 self.assertEqual(owner_status, constants.RS_NORMAL)
1791 if name_value == tlock1.name:
1792 self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1793 self.assertEqual(set(owner_value),
1794 set(i.getName() for i in tthreads1))
1797 if name_value == tlock2.name:
1798 self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1799 self.assertEqual(owner_value, [tthread2.getName()])
1802 if name_value == tlock3.name:
1803 self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
1804 self.assertEqual(owner_value, [tthread3.getName()])
1807 self.assert_(name_value in expnames)
1808 self.assertEqual(mode, (constants.RS_NORMAL, None))
1809 self.assert_(owner_value is None)
1811 # Release locks again
1816 result = self.lm.QueryLocks(["name", "mode", "owner"])
1817 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1818 [[(constants.RS_NORMAL, name),
1819 (constants.RS_NORMAL, None),
1820 (constants.RS_NORMAL, None)]
1821 for name in utils.NiceSort(expnames)])
1823 def testDelete(self):
1824 lock = locking.SharedLock("TestLock", monitor=self.lm)
1826 self.assertEqual(len(self.lm._locks), 1)
1827 result = self.lm.QueryLocks(["name", "mode", "owner"])
1828 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1829 [[(constants.RS_NORMAL, lock.name),
1830 (constants.RS_NORMAL, None),
1831 (constants.RS_NORMAL, None)]])
1835 result = self.lm.QueryLocks(["name", "mode", "owner"])
1836 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1837 [[(constants.RS_NORMAL, lock.name),
1838 (constants.RS_NORMAL, "deleted"),
1839 (constants.RS_NORMAL, None)]])
1840 self.assertEqual(len(self.lm._locks), 1)
1842 def testPending(self):
1843 def _Acquire(lock, shared, prev, next):
1846 lock.acquire(shared=shared, test_notify=next.set)
1852 lock = locking.SharedLock("ExcLock", monitor=self.lm)
1854 for shared in [0, 1]:
1857 self.assertEqual(len(self.lm._locks), 1)
1858 result = self.lm.QueryLocks(["name", "mode", "owner"])
1859 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1860 [[(constants.RS_NORMAL, lock.name),
1861 (constants.RS_NORMAL, "exclusive"),
1862 (constants.RS_NORMAL,
1863 [threading.currentThread().getName()])]])
1867 first = threading.Event()
1871 ev = threading.Event()
1872 threads.append(self._addThread(target=_Acquire,
1873 args=(lock, shared, prev, ev)))
1879 # Wait for last acquire to start waiting
1882 # NOTE: This works only because QueryLocks will acquire the
1883 # lock-internal lock again and won't be able to get the information
1884 # until it has the lock. By then the acquire should be registered in
1885 # SharedLock.__pending (otherwise it's a bug).
1887 # All acquires are waiting now
1889 pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
1891 pending = [("exclusive", [t.getName()]) for t in threads]
1893 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1894 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1895 [[(constants.RS_NORMAL, lock.name),
1896 (constants.RS_NORMAL, "exclusive"),
1897 (constants.RS_NORMAL,
1898 [threading.currentThread().getName()]),
1899 (constants.RS_NORMAL, pending)]])
1901 self.assertEqual(len(self.lm._locks), 1)
1907 # No pending acquires
1908 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1909 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1910 [[(constants.RS_NORMAL, lock.name),
1911 (constants.RS_NORMAL, None),
1912 (constants.RS_NORMAL, None),
1913 (constants.RS_NORMAL, [])]])
1915 self.assertEqual(len(self.lm._locks), 1)
1918 if __name__ == '__main__':
1919 testutils.GanetiTestProgram()