4 # Copyright (C) 2006, 2007, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the locking module"""
34 from ganeti import constants
35 from ganeti import locking
36 from ganeti import errors
37 from ganeti import utils
38 from ganeti import compat
39 from ganeti import objects
40 from ganeti import query
45 # This is used to test the ssynchronize decorator.
46 # Since it's passed as input to a decorator it must be declared as a global.
47 _decoratorlock = locking.SharedLock("decorator lock")
49 #: List for looping tests
54 """Decorator for executing a function many times"""
55 def wrapper(*args, **kwargs):
61 def SafeSleep(duration):
64 delay = start + duration - time.time()
70 class _ThreadedTestCase(unittest.TestCase):
71 """Test class that supports adding/waiting on threads"""
73 unittest.TestCase.setUp(self)
74 self.done = Queue.Queue(0)
77 def _addThread(self, *args, **kwargs):
78 """Create and remember a new thread"""
79 t = threading.Thread(*args, **kwargs)
80 self.threads.append(t)
84 def _waitThreads(self):
85 """Wait for all our threads to finish"""
86 for t in self.threads:
88 self.failIf(t.isAlive())
92 class _ConditionTestCase(_ThreadedTestCase):
93 """Common test case for conditions"""
96 _ThreadedTestCase.setUp(self)
97 self.lock = threading.Lock()
98 self.cond = cls(self.lock)
100 def _testAcquireRelease(self):
101 self.assertFalse(self.cond._is_owned())
102 self.assertRaises(RuntimeError, self.cond.wait)
103 self.assertRaises(RuntimeError, self.cond.notifyAll)
106 self.assert_(self.cond._is_owned())
107 self.cond.notifyAll()
108 self.assert_(self.cond._is_owned())
111 self.assertFalse(self.cond._is_owned())
112 self.assertRaises(RuntimeError, self.cond.wait)
113 self.assertRaises(RuntimeError, self.cond.notifyAll)
115 def _testNotification(self):
120 self.cond.notifyAll()
125 self._addThread(target=_NotifyAll)
126 self.assertEqual(self.done.get(True, 1), "NE")
127 self.assertRaises(Queue.Empty, self.done.get_nowait)
129 self.assertEqual(self.done.get(True, 1), "NA")
130 self.assertEqual(self.done.get(True, 1), "NN")
131 self.assert_(self.cond._is_owned())
133 self.assertFalse(self.cond._is_owned())
136 class TestSingleNotifyPipeCondition(_ConditionTestCase):
137 """SingleNotifyPipeCondition tests"""
140 _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
142 def testAcquireRelease(self):
143 self._testAcquireRelease()
145 def testNotification(self):
146 self._testNotification()
148 def testWaitReuse(self):
154 def testNoNotifyReuse(self):
156 self.cond.notifyAll()
157 self.assertRaises(RuntimeError, self.cond.wait)
158 self.assertRaises(RuntimeError, self.cond.notifyAll)
162 class TestPipeCondition(_ConditionTestCase):
163 """PipeCondition tests"""
166 _ConditionTestCase.setUp(self, locking.PipeCondition)
168 def testAcquireRelease(self):
169 self._testAcquireRelease()
171 def testNotification(self):
172 self._testNotification()
174 def _TestWait(self, fn):
176 self._addThread(target=fn),
177 self._addThread(target=fn),
178 self._addThread(target=fn),
181 # Wait for threads to be waiting
183 self.assertEqual(self.done.get(True, 1), "A")
185 self.assertRaises(Queue.Empty, self.done.get_nowait)
188 self.assertEqual(len(self.cond._waiters), 3)
189 self.assertEqual(self.cond._waiters, set(threads))
190 # This new thread can't acquire the lock, and thus call wait, before we
192 self._addThread(target=fn)
193 self.cond.notifyAll()
194 self.assertRaises(Queue.Empty, self.done.get_nowait)
197 # We should now get 3 W and 1 A (for the new thread) in whatever order
201 got = self.done.get(True, 1)
207 self.fail("Got %s on the done queue" % got)
209 self.assertEqual(w, 3)
210 self.assertEqual(a, 1)
213 self.cond.notifyAll()
216 self.assertEqual(self.done.get_nowait(), "W")
217 self.assertRaises(Queue.Empty, self.done.get_nowait)
219 def testBlockingWait(self):
227 self._TestWait(_BlockingWait)
229 def testLongTimeoutWait(self):
237 self._TestWait(_Helper)
239 def _TimeoutWait(self, timeout, check):
241 self.cond.wait(timeout)
245 def testShortTimeoutWait(self):
246 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
247 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
249 self.assertEqual(self.done.get_nowait(), "T1")
250 self.assertEqual(self.done.get_nowait(), "T1")
251 self.assertRaises(Queue.Empty, self.done.get_nowait)
253 def testZeroTimeoutWait(self):
254 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
255 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
256 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
258 self.assertEqual(self.done.get_nowait(), "T0")
259 self.assertEqual(self.done.get_nowait(), "T0")
260 self.assertEqual(self.done.get_nowait(), "T0")
261 self.assertRaises(Queue.Empty, self.done.get_nowait)
264 class TestSharedLock(_ThreadedTestCase):
265 """SharedLock tests"""
268 _ThreadedTestCase.setUp(self)
269 self.sl = locking.SharedLock("TestSharedLock")
271 def testSequenceAndOwnership(self):
272 self.assertFalse(self.sl._is_owned())
273 self.sl.acquire(shared=1)
274 self.assert_(self.sl._is_owned())
275 self.assert_(self.sl._is_owned(shared=1))
276 self.assertFalse(self.sl._is_owned(shared=0))
278 self.assertFalse(self.sl._is_owned())
280 self.assert_(self.sl._is_owned())
281 self.assertFalse(self.sl._is_owned(shared=1))
282 self.assert_(self.sl._is_owned(shared=0))
284 self.assertFalse(self.sl._is_owned())
285 self.sl.acquire(shared=1)
286 self.assert_(self.sl._is_owned())
287 self.assert_(self.sl._is_owned(shared=1))
288 self.assertFalse(self.sl._is_owned(shared=0))
290 self.assertFalse(self.sl._is_owned())
292 def testBooleanValue(self):
293 # semaphores are supposed to return a true value on a successful acquire
294 self.assert_(self.sl.acquire(shared=1))
296 self.assert_(self.sl.acquire())
299 def testDoubleLockingStoE(self):
300 self.sl.acquire(shared=1)
301 self.assertRaises(AssertionError, self.sl.acquire)
303 def testDoubleLockingEtoS(self):
305 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
307 def testDoubleLockingStoS(self):
308 self.sl.acquire(shared=1)
309 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
311 def testDoubleLockingEtoE(self):
313 self.assertRaises(AssertionError, self.sl.acquire)
315 # helper functions: called in a separate thread they acquire the lock, send
316 # their identifier on the done queue, then release it.
317 def _doItSharer(self):
319 self.sl.acquire(shared=1)
322 except errors.LockError:
325 def _doItExclusive(self):
330 except errors.LockError:
333 def _doItDelete(self):
337 except errors.LockError:
340 def testSharersCanCoexist(self):
341 self.sl.acquire(shared=1)
342 threading.Thread(target=self._doItSharer).start()
343 self.assert_(self.done.get(True, 1))
347 def testExclusiveBlocksExclusive(self):
349 self._addThread(target=self._doItExclusive)
350 self.assertRaises(Queue.Empty, self.done.get_nowait)
353 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
356 def testExclusiveBlocksDelete(self):
358 self._addThread(target=self._doItDelete)
359 self.assertRaises(Queue.Empty, self.done.get_nowait)
362 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
363 self.sl = locking.SharedLock(self.sl.name)
366 def testExclusiveBlocksSharer(self):
368 self._addThread(target=self._doItSharer)
369 self.assertRaises(Queue.Empty, self.done.get_nowait)
372 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
375 def testSharerBlocksExclusive(self):
376 self.sl.acquire(shared=1)
377 self._addThread(target=self._doItExclusive)
378 self.assertRaises(Queue.Empty, self.done.get_nowait)
381 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
384 def testSharerBlocksDelete(self):
385 self.sl.acquire(shared=1)
386 self._addThread(target=self._doItDelete)
387 self.assertRaises(Queue.Empty, self.done.get_nowait)
390 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
391 self.sl = locking.SharedLock(self.sl.name)
394 def testWaitingExclusiveBlocksSharer(self):
395 """SKIPPED testWaitingExclusiveBlockSharer"""
398 self.sl.acquire(shared=1)
399 # the lock is acquired in shared mode...
400 self._addThread(target=self._doItExclusive)
401 # ...but now an exclusive is waiting...
402 self._addThread(target=self._doItSharer)
403 # ...so the sharer should be blocked as well
404 self.assertRaises(Queue.Empty, self.done.get_nowait)
407 # The exclusive passed before
408 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
409 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
412 def testWaitingSharerBlocksExclusive(self):
413 """SKIPPED testWaitingSharerBlocksExclusive"""
417 # the lock is acquired in exclusive mode...
418 self._addThread(target=self._doItSharer)
419 # ...but now a sharer is waiting...
420 self._addThread(target=self._doItExclusive)
421 # ...the exclusive is waiting too...
422 self.assertRaises(Queue.Empty, self.done.get_nowait)
425 # The sharer passed before
426 self.assertEqual(self.done.get_nowait(), 'SHR')
427 self.assertEqual(self.done.get_nowait(), 'EXC')
429 def testDelete(self):
431 self.assertRaises(errors.LockError, self.sl.acquire)
432 self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
433 self.assertRaises(errors.LockError, self.sl.delete)
435 def testDeleteTimeout(self):
436 self.sl.delete(timeout=60)
438 def testNoDeleteIfSharer(self):
439 self.sl.acquire(shared=1)
440 self.assertRaises(AssertionError, self.sl.delete)
443 def testDeletePendingSharersExclusiveDelete(self):
445 self._addThread(target=self._doItSharer)
446 self._addThread(target=self._doItSharer)
447 self._addThread(target=self._doItExclusive)
448 self._addThread(target=self._doItDelete)
451 # The threads who were pending return ERR
453 self.assertEqual(self.done.get_nowait(), 'ERR')
454 self.sl = locking.SharedLock(self.sl.name)
457 def testDeletePendingDeleteExclusiveSharers(self):
459 self._addThread(target=self._doItDelete)
460 self._addThread(target=self._doItExclusive)
461 self._addThread(target=self._doItSharer)
462 self._addThread(target=self._doItSharer)
465 # The two threads who were pending return both ERR
466 self.assertEqual(self.done.get_nowait(), 'ERR')
467 self.assertEqual(self.done.get_nowait(), 'ERR')
468 self.assertEqual(self.done.get_nowait(), 'ERR')
469 self.assertEqual(self.done.get_nowait(), 'ERR')
470 self.sl = locking.SharedLock(self.sl.name)
473 def testExclusiveAcquireTimeout(self):
474 for shared in [0, 1]:
475 on_queue = threading.Event()
476 release_exclusive = threading.Event()
478 def _LockExclusive():
479 self.sl.acquire(shared=0, test_notify=on_queue.set)
480 self.done.put("A: start wait")
481 release_exclusive.wait()
482 self.done.put("A: end wait")
485 # Start thread to hold lock in exclusive mode
486 self._addThread(target=_LockExclusive)
488 # Wait for wait to begin
489 self.assertEqual(self.done.get(timeout=60), "A: start wait")
491 # Wait up to 60s to get lock, but release exclusive lock as soon as we're
493 self.failUnless(self.sl.acquire(shared=shared, timeout=60,
494 test_notify=release_exclusive.set))
496 self.done.put("got 2nd")
501 self.assertEqual(self.done.get_nowait(), "A: end wait")
502 self.assertEqual(self.done.get_nowait(), "got 2nd")
503 self.assertRaises(Queue.Empty, self.done.get_nowait)
506 def testAcquireExpiringTimeout(self):
507 def _AcquireWithTimeout(shared, timeout):
508 if not self.sl.acquire(shared=shared, timeout=timeout):
509 self.done.put("timeout")
511 for shared in [0, 1]:
515 # Start shared acquires with timeout between 0 and 20 ms
517 self._addThread(target=_AcquireWithTimeout,
518 args=(shared, i * 2.0 / 1000.0))
520 # Wait for threads to finish (makes sure the acquire timeout expires
521 # before releasing the lock)
528 self.assertEqual(self.done.get_nowait(), "timeout")
530 self.assertRaises(Queue.Empty, self.done.get_nowait)
533 def testSharedSkipExclusiveAcquires(self):
534 # Tests whether shared acquires jump in front of exclusive acquires in the
537 def _Acquire(shared, name, notify_ev, wait_ev):
539 notify_fn = notify_ev.set
546 if not self.sl.acquire(shared=shared, test_notify=notify_fn):
552 # Get exclusive lock while we fill the queue
560 # Add acquires using threading.Event for synchronization. They'll be
561 # acquired exactly in the order defined in this list.
562 acquires = (shrcnt1 * [(1, "shared 1")] +
563 3 * [(0, "exclusive 1")] +
564 shrcnt2 * [(1, "shared 2")] +
565 shrcnt3 * [(1, "shared 3")] +
566 shrcnt4 * [(1, "shared 4")] +
567 3 * [(0, "exclusive 2")])
572 for args in acquires:
573 ev_cur = threading.Event()
574 self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
577 # Wait for last acquire to start
580 # Expect 6 pending exclusive acquires and 1 for all shared acquires
582 self.assertEqual(self.sl._count_pending(), 7)
584 # Release exclusive lock and wait
590 for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
591 # Shared locks aren't guaranteed to be notified in order, but they'll be
593 tmp = self.done.get_nowait()
594 if tmp == "shared 1":
596 elif tmp == "shared 2":
598 elif tmp == "shared 3":
600 elif tmp == "shared 4":
602 self.assertEqual(shrcnt1, 0)
603 self.assertEqual(shrcnt2, 0)
604 self.assertEqual(shrcnt3, 0)
605 self.assertEqual(shrcnt3, 0)
608 self.assertEqual(self.done.get_nowait(), "exclusive 1")
611 self.assertEqual(self.done.get_nowait(), "exclusive 2")
613 self.assertRaises(Queue.Empty, self.done.get_nowait)
616 def testMixedAcquireTimeout(self):
617 sync = threading.Event()
619 def _AcquireShared(ev):
620 if not self.sl.acquire(shared=1, timeout=None):
623 self.done.put("shared")
628 # Wait for notification from main thread
636 ev = threading.Event()
637 self._addThread(target=_AcquireShared, args=(ev, ))
640 # Wait for all acquires to finish
644 self.assertEqual(self.sl._count_pending(), 0)
646 # Try to get exclusive lock
647 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
649 # Acquire exclusive without timeout
650 exclsync = threading.Event()
651 exclev = threading.Event()
653 def _AcquireExclusive():
654 if not self.sl.acquire(shared=0):
657 self.done.put("exclusive")
662 # Wait for notification from main thread
667 self._addThread(target=_AcquireExclusive)
669 # Try to get exclusive lock
670 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
672 # Make all shared holders release their locks
675 # Wait for exclusive acquire to succeed
678 self.assertEqual(self.sl._count_pending(), 0)
680 # Try to get exclusive lock
681 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
683 def _AcquireSharedSimple():
684 if self.sl.acquire(shared=1, timeout=None):
685 self.done.put("shared2")
689 self._addThread(target=_AcquireSharedSimple)
691 # Tell exclusive lock to release
694 # Wait for everything to finish
697 self.assertEqual(self.sl._count_pending(), 0)
701 self.assertEqual(self.done.get_nowait(), "shared")
703 self.assertEqual(self.done.get_nowait(), "exclusive")
706 self.assertEqual(self.done.get_nowait(), "shared2")
708 self.assertRaises(Queue.Empty, self.done.get_nowait)
710 def testPriority(self):
711 # Acquire in exclusive mode
712 self.assert_(self.sl.acquire(shared=0))
715 def _Acquire(prev, next, shared, priority, result):
717 self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
719 self.done.put(result)
723 counter = itertools.count(0)
724 priorities = range(-20, 30)
725 first = threading.Event()
731 # [(shared/exclusive, set(acquire names), set(pending threads)),
732 # (shared/exclusive, ...),
738 # References shared acquire per priority in L{perprio}. Data structure:
740 # priority: (shared=1, set(acquire names), set(pending threads)),
744 for seed in [4979, 9523, 14902, 32440]:
745 # Use a deterministic random generator
746 rnd = random.Random(seed)
747 for priority in [rnd.choice(priorities) for _ in range(30)]:
752 acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
754 ev = threading.Event()
755 thread = self._addThread(target=_Acquire,
756 args=(prev, ev, shared, priority, acqname))
759 # Record expected aqcuire, see above for structure
760 data = (shared, set([acqname]), set([thread]))
761 priolist = perprio.setdefault(priority, [])
763 priosh = prioshared.get(priority, None)
765 # Shared acquires are merged
766 for i, j in zip(priosh[1:], data[1:]):
768 assert data[0] == priosh[0]
770 prioshared[priority] = data
771 priolist.append(data)
773 priolist.append(data)
775 # Start all acquires and wait for them
779 # Check lock information
780 self.assertEqual(self.sl.GetInfo(set()), (self.sl.name, None, None, None))
781 self.assertEqual(self.sl.GetInfo(set([query.LQ_MODE, query.LQ_OWNER])),
782 (self.sl.name, "exclusive",
783 [threading.currentThread().getName()], None))
785 self._VerifyPrioPending(self.sl.GetInfo(set([query.LQ_PENDING])), perprio)
787 # Let threads acquire the lock
790 # Wait for everything to finish
793 self.assert_(self.sl._check_empty())
795 # Check acquires by priority
796 for acquires in [perprio[i] for i in sorted(perprio.keys())]:
797 for (_, names, _) in acquires:
798 # For shared acquires, the set will contain 1..n entries. For exclusive
801 names.remove(self.done.get_nowait())
802 self.assertFalse(compat.any(names for (_, names, _) in acquires))
804 self.assertRaises(Queue.Empty, self.done.get_nowait)
806 def _VerifyPrioPending(self, (name, mode, owner, pending), perprio):
807 self.assertEqual(name, self.sl.name)
808 self.assert_(mode is None)
809 self.assert_(owner is None)
811 self.assertEqual([(pendmode, sorted(waiting))
812 for (pendmode, waiting) in pending],
813 [(["exclusive", "shared"][int(bool(shared))],
814 sorted(t.getName() for t in threads))
815 for acquires in [perprio[i]
816 for i in sorted(perprio.keys())]
817 for (shared, _, threads) in acquires])
820 class TestSharedLockInCondition(_ThreadedTestCase):
821 """SharedLock as a condition lock tests"""
824 _ThreadedTestCase.setUp(self)
825 self.sl = locking.SharedLock("TestSharedLockInCondition")
828 def setCondition(self):
829 self.cond = threading.Condition(self.sl)
831 def testKeepMode(self):
832 self.cond.acquire(shared=1)
833 self.assert_(self.sl._is_owned(shared=1))
835 self.assert_(self.sl._is_owned(shared=1))
837 self.cond.acquire(shared=0)
838 self.assert_(self.sl._is_owned(shared=0))
840 self.assert_(self.sl._is_owned(shared=0))
844 class TestSharedLockInPipeCondition(TestSharedLockInCondition):
845 """SharedLock as a pipe condition lock tests"""
847 def setCondition(self):
848 self.cond = locking.PipeCondition(self.sl)
851 class TestSSynchronizedDecorator(_ThreadedTestCase):
852 """Shared Lock Synchronized decorator test"""
855 _ThreadedTestCase.setUp(self)
857 @locking.ssynchronized(_decoratorlock)
858 def _doItExclusive(self):
859 self.assert_(_decoratorlock._is_owned())
862 @locking.ssynchronized(_decoratorlock, shared=1)
863 def _doItSharer(self):
864 self.assert_(_decoratorlock._is_owned(shared=1))
867 def testDecoratedFunctions(self):
868 self._doItExclusive()
869 self.assertFalse(_decoratorlock._is_owned())
871 self.assertFalse(_decoratorlock._is_owned())
873 def testSharersCanCoexist(self):
874 _decoratorlock.acquire(shared=1)
875 threading.Thread(target=self._doItSharer).start()
876 self.assert_(self.done.get(True, 1))
877 _decoratorlock.release()
880 def testExclusiveBlocksExclusive(self):
881 _decoratorlock.acquire()
882 self._addThread(target=self._doItExclusive)
883 # give it a bit of time to check that it's not actually doing anything
884 self.assertRaises(Queue.Empty, self.done.get_nowait)
885 _decoratorlock.release()
887 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
890 def testExclusiveBlocksSharer(self):
891 _decoratorlock.acquire()
892 self._addThread(target=self._doItSharer)
893 self.assertRaises(Queue.Empty, self.done.get_nowait)
894 _decoratorlock.release()
896 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
899 def testSharerBlocksExclusive(self):
900 _decoratorlock.acquire(shared=1)
901 self._addThread(target=self._doItExclusive)
902 self.assertRaises(Queue.Empty, self.done.get_nowait)
903 _decoratorlock.release()
905 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
908 class TestLockSet(_ThreadedTestCase):
912 _ThreadedTestCase.setUp(self)
916 """Helper to (re)initialize the lock set"""
917 self.resources = ['one', 'two', 'three']
918 self.ls = locking.LockSet(self.resources, "TestLockSet")
920 def testResources(self):
921 self.assertEquals(self.ls._names(), set(self.resources))
922 newls = locking.LockSet([], "TestLockSet.testResources")
923 self.assertEquals(newls._names(), set())
925 def testAcquireRelease(self):
926 self.assert_(self.ls.acquire('one'))
927 self.assertEquals(self.ls._list_owned(), set(['one']))
929 self.assertEquals(self.ls._list_owned(), set())
930 self.assertEquals(self.ls.acquire(['one']), set(['one']))
931 self.assertEquals(self.ls._list_owned(), set(['one']))
933 self.assertEquals(self.ls._list_owned(), set())
934 self.ls.acquire(['one', 'two', 'three'])
935 self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
936 self.ls.release('one')
937 self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
938 self.ls.release(['three'])
939 self.assertEquals(self.ls._list_owned(), set(['two']))
941 self.assertEquals(self.ls._list_owned(), set())
942 self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
943 self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
945 self.assertEquals(self.ls._list_owned(), set())
947 def testNoDoubleAcquire(self):
948 self.ls.acquire('one')
949 self.assertRaises(AssertionError, self.ls.acquire, 'one')
950 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
951 self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
953 self.ls.acquire(['one', 'three'])
954 self.ls.release('one')
955 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
956 self.ls.release('three')
958 def testNoWrongRelease(self):
959 self.assertRaises(AssertionError, self.ls.release)
960 self.ls.acquire('one')
961 self.assertRaises(AssertionError, self.ls.release, 'two')
963 def testAddRemove(self):
965 self.assertEquals(self.ls._list_owned(), set())
966 self.assert_('four' in self.ls._names())
967 self.ls.add(['five', 'six', 'seven'], acquired=1)
968 self.assert_('five' in self.ls._names())
969 self.assert_('six' in self.ls._names())
970 self.assert_('seven' in self.ls._names())
971 self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
972 self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
973 self.assert_('five' not in self.ls._names())
974 self.assert_('six' not in self.ls._names())
975 self.assertEquals(self.ls._list_owned(), set(['seven']))
976 self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
977 self.ls.remove('seven')
978 self.assert_('seven' not in self.ls._names())
979 self.assertEquals(self.ls._list_owned(), set([]))
980 self.ls.acquire(None, shared=1)
981 self.assertRaises(AssertionError, self.ls.add, 'eight')
983 self.ls.acquire(None)
984 self.ls.add('eight', acquired=1)
985 self.assert_('eight' in self.ls._names())
986 self.assert_('eight' in self.ls._list_owned())
988 self.assert_('nine' in self.ls._names())
989 self.assert_('nine' not in self.ls._list_owned())
991 self.ls.remove(['two'])
992 self.assert_('two' not in self.ls._names())
993 self.ls.acquire('three')
994 self.assertEquals(self.ls.remove(['three']), ['three'])
995 self.assert_('three' not in self.ls._names())
996 self.assertEquals(self.ls.remove('three'), [])
997 self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
998 self.assert_('one' not in self.ls._names())
1000 def testRemoveNonBlocking(self):
1001 self.ls.acquire('one')
1002 self.assertEquals(self.ls.remove('one'), ['one'])
1003 self.ls.acquire(['two', 'three'])
1004 self.assertEquals(self.ls.remove(['two', 'three']),
1007 def testNoDoubleAdd(self):
1008 self.assertRaises(errors.LockError, self.ls.add, 'two')
1010 self.assertRaises(errors.LockError, self.ls.add, 'four')
1012 def testNoWrongRemoves(self):
1013 self.ls.acquire(['one', 'three'], shared=1)
1014 # Cannot remove 'two' while holding something which is not a superset
1015 self.assertRaises(AssertionError, self.ls.remove, 'two')
1016 # Cannot remove 'three' as we are sharing it
1017 self.assertRaises(AssertionError, self.ls.remove, 'three')
1019 def testAcquireSetLock(self):
1020 # acquire the set-lock exclusively
1021 self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
1022 self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1023 self.assertEquals(self.ls._is_owned(), True)
1024 self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
1025 # I can still add/remove elements...
1026 self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
1027 self.assert_(self.ls.add('six'))
1029 # share the set-lock
1030 self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
1031 # adding new elements is not possible
1032 self.assertRaises(AssertionError, self.ls.add, 'five')
1035 def testAcquireWithRepetitions(self):
1036 self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
1037 set(['two', 'two', 'three']))
1038 self.ls.release(['two', 'two'])
1039 self.assertEquals(self.ls._list_owned(), set(['three']))
1041 def testEmptyAcquire(self):
1042 # Acquire an empty list of locks...
1043 self.assertEquals(self.ls.acquire([]), set())
1044 self.assertEquals(self.ls._list_owned(), set())
1045 # New locks can still be addded
1046 self.assert_(self.ls.add('six'))
1047 # "re-acquiring" is not an issue, since we had really acquired nothing
1048 self.assertEquals(self.ls.acquire([], shared=1), set())
1049 self.assertEquals(self.ls._list_owned(), set())
1050 # We haven't really acquired anything, so we cannot release
1051 self.assertRaises(AssertionError, self.ls.release)
1053 def _doLockSet(self, names, shared):
1055 self.ls.acquire(names, shared=shared)
1056 self.done.put('DONE')
1058 except errors.LockError:
1059 self.done.put('ERR')
1061 def _doAddSet(self, names):
1063 self.ls.add(names, acquired=1)
1064 self.done.put('DONE')
1066 except errors.LockError:
1067 self.done.put('ERR')
1069 def _doRemoveSet(self, names):
1070 self.done.put(self.ls.remove(names))
1073 def testConcurrentSharedAcquire(self):
1074 self.ls.acquire(['one', 'two'], shared=1)
1075 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1077 self.assertEqual(self.done.get_nowait(), 'DONE')
1078 self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
1080 self.assertEqual(self.done.get_nowait(), 'DONE')
1081 self._addThread(target=self._doLockSet, args=('three', 1))
1083 self.assertEqual(self.done.get_nowait(), 'DONE')
1084 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1085 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1086 self.assertRaises(Queue.Empty, self.done.get_nowait)
1089 self.assertEqual(self.done.get_nowait(), 'DONE')
1090 self.assertEqual(self.done.get_nowait(), 'DONE')
1093 def testConcurrentExclusiveAcquire(self):
1094 self.ls.acquire(['one', 'two'])
1095 self._addThread(target=self._doLockSet, args=('three', 1))
1097 self.assertEqual(self.done.get_nowait(), 'DONE')
1098 self._addThread(target=self._doLockSet, args=('three', 0))
1100 self.assertEqual(self.done.get_nowait(), 'DONE')
1101 self.assertRaises(Queue.Empty, self.done.get_nowait)
1102 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1103 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1104 self._addThread(target=self._doLockSet, args=('one', 0))
1105 self._addThread(target=self._doLockSet, args=('one', 1))
1106 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1107 self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
1108 self.assertRaises(Queue.Empty, self.done.get_nowait)
1112 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1115 def testSimpleAcquireTimeoutExpiring(self):
1116 names = sorted(self.ls._names())
1117 self.assert_(len(names) >= 3)
1119 # Get name of first lock
1122 # Get name of last lock
1126 # Block first and try to lock it again
1129 # Block last and try to lock all locks
1132 # Block last and try to lock it again
1136 for (wanted, block) in checks:
1137 # Lock in exclusive mode
1138 self.assert_(self.ls.acquire(block, shared=0))
1141 # Try to get the same lock again with a timeout (should never succeed)
1142 acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1144 self.done.put("acquired")
1147 self.assert_(acquired is None)
1148 self.assertFalse(self.ls._list_owned())
1149 self.assertFalse(self.ls._is_owned())
1150 self.done.put("not acquired")
1152 self._addThread(target=_AcquireOne)
1154 # Wait for timeout in thread to expire
1157 # Release exclusive lock again
1160 self.assertEqual(self.done.get_nowait(), "not acquired")
1161 self.assertRaises(Queue.Empty, self.done.get_nowait)
1164 def testDelayedAndExpiringLockAcquire(self):
1166 self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1168 for expire in (False, True):
1169 names = sorted(self.ls._names())
1170 self.assertEqual(len(names), 8)
1172 lock_ev = dict([(i, threading.Event()) for i in names])
1174 # Lock all in exclusive mode
1175 self.assert_(self.ls.acquire(names, shared=0))
1178 # We'll wait at least 300ms per lock
1179 lockwait = len(names) * [0.3]
1181 # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1182 # this gives us up to 2.4s to fail.
1183 lockall_timeout = 0.4
1185 # This should finish rather quickly
1187 lockall_timeout = len(names) * 5.0
1190 def acquire_notification(name):
1192 self.done.put("getting %s" % name)
1197 if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1198 test_notify=acquire_notification):
1199 self.done.put("got all")
1202 self.done.put("timeout on all")
1205 for ev in lock_ev.values():
1208 t = self._addThread(target=_LockAll)
1210 for idx, name in enumerate(names):
1211 # Wait for actual acquire on this lock to start
1212 lock_ev[name].wait(10.0)
1214 if expire and t.isAlive():
1215 # Wait some time after getting the notification to make sure the lock
1216 # acquire will expire
1217 SafeSleep(lockwait[idx])
1219 self.ls.release(names=name)
1221 self.assertFalse(self.ls._list_owned())
1226 # Not checking which locks were actually acquired. Doing so would be
1227 # too timing-dependant.
1228 self.assertEqual(self.done.get_nowait(), "timeout on all")
1231 self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1232 self.assertEqual(self.done.get_nowait(), "got all")
1233 self.assertRaises(Queue.Empty, self.done.get_nowait)
1236 def testConcurrentRemove(self):
1238 self.ls.acquire(['one', 'two', 'four'])
1239 self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1240 self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1241 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1242 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1243 self.assertRaises(Queue.Empty, self.done.get_nowait)
1244 self.ls.remove('one')
1248 self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1249 self.ls.add(['five', 'six'], acquired=1)
1250 self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1251 self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1252 self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1253 self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1254 self.ls.remove('five')
1258 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1259 self.ls.acquire(['three', 'four'])
1260 self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1261 self.assertRaises(Queue.Empty, self.done.get_nowait)
1262 self.ls.remove('four')
1264 self.assertEqual(self.done.get_nowait(), ['six'])
1265 self._addThread(target=self._doRemoveSet, args=(['two']))
1267 self.assertEqual(self.done.get_nowait(), ['two'])
1273 def testConcurrentSharedSetLock(self):
1274 # share the set-lock...
1275 self.ls.acquire(None, shared=1)
1276 # ...another thread can share it too
1277 self._addThread(target=self._doLockSet, args=(None, 1))
1279 self.assertEqual(self.done.get_nowait(), 'DONE')
1280 # ...or just share some elements
1281 self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1283 self.assertEqual(self.done.get_nowait(), 'DONE')
1284 # ...but not add new ones or remove any
1285 t = self._addThread(target=self._doAddSet, args=(['nine']))
1286 self._addThread(target=self._doRemoveSet, args=(['two'], ))
1287 self.assertRaises(Queue.Empty, self.done.get_nowait)
1288 # this just releases the set-lock
1291 self.assertEqual(self.done.get_nowait(), 'DONE')
1292 # release the lock on the actual elements so remove() can proceed too
1295 self.failUnlessEqual(self.done.get_nowait(), ['two'])
1300 def testConcurrentExclusiveSetLock(self):
1301 # acquire the set-lock...
1302 self.ls.acquire(None, shared=0)
1303 # ...no one can do anything else
1304 self._addThread(target=self._doLockSet, args=(None, 1))
1305 self._addThread(target=self._doLockSet, args=(None, 0))
1306 self._addThread(target=self._doLockSet, args=(['three'], 0))
1307 self._addThread(target=self._doLockSet, args=(['two'], 1))
1308 self._addThread(target=self._doAddSet, args=(['nine']))
1309 self.assertRaises(Queue.Empty, self.done.get_nowait)
1313 self.assertEqual(self.done.get(True, 1), 'DONE')
1318 def testConcurrentSetLockAdd(self):
1319 self.ls.acquire('one')
1320 # Another thread wants the whole SetLock
1321 self._addThread(target=self._doLockSet, args=(None, 0))
1322 self._addThread(target=self._doLockSet, args=(None, 1))
1323 self.assertRaises(Queue.Empty, self.done.get_nowait)
1324 self.assertRaises(AssertionError, self.ls.add, 'four')
1327 self.assertEqual(self.done.get_nowait(), 'DONE')
1328 self.assertEqual(self.done.get_nowait(), 'DONE')
1329 self.ls.acquire(None)
1330 self._addThread(target=self._doLockSet, args=(None, 0))
1331 self._addThread(target=self._doLockSet, args=(None, 1))
1332 self.assertRaises(Queue.Empty, self.done.get_nowait)
1334 self.ls.add('five', acquired=1)
1335 self.ls.add('six', acquired=1, shared=1)
1336 self.assertEquals(self.ls._list_owned(),
1337 set(['one', 'two', 'three', 'five', 'six']))
1338 self.assertEquals(self.ls._is_owned(), True)
1339 self.assertEquals(self.ls._names(),
1340 set(['one', 'two', 'three', 'four', 'five', 'six']))
1343 self.assertEqual(self.done.get_nowait(), 'DONE')
1344 self.assertEqual(self.done.get_nowait(), 'DONE')
1348 def testEmptyLockSet(self):
1350 self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1352 self.ls.remove(['one', 'two', 'three'])
1353 # and adds/locks by another thread still wait
1354 self._addThread(target=self._doAddSet, args=(['nine']))
1355 self._addThread(target=self._doLockSet, args=(None, 1))
1356 self._addThread(target=self._doLockSet, args=(None, 0))
1357 self.assertRaises(Queue.Empty, self.done.get_nowait)
1361 self.assertEqual(self.done.get_nowait(), 'DONE')
1363 self.assertEqual(self.ls.remove(['nine']), ['nine'])
1365 self.assertEqual(self.ls.acquire(None, shared=1), set())
1366 # other sharers can go, adds still wait
1367 self._addThread(target=self._doLockSet, args=(None, 1))
1369 self.assertEqual(self.done.get_nowait(), 'DONE')
1370 self._addThread(target=self._doAddSet, args=(['nine']))
1371 self.assertRaises(Queue.Empty, self.done.get_nowait)
1374 self.assertEqual(self.done.get_nowait(), 'DONE')
1377 def testPriority(self):
1378 def _Acquire(prev, next, name, priority, success_fn):
1380 self.assert_(self.ls.acquire(name, shared=0,
1382 test_notify=lambda _: next.set()))
1388 # Get all in exclusive mode
1389 self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1391 done_two = Queue.Queue(0)
1393 first = threading.Event()
1396 acquires = [("one", prio, self.done) for prio in range(1, 33)]
1397 acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1399 # Use a deterministic random generator
1400 random.Random(741).shuffle(acquires)
1402 for (name, prio, done) in acquires:
1403 ev = threading.Event()
1404 self._addThread(target=_Acquire,
1405 args=(prev, ev, name, prio,
1406 compat.partial(done.put, "Prio%s" % prio)))
1412 # Wait for last acquire to start
1415 # Let threads acquire locks
1418 # Wait for threads to finish
1421 for i in range(1, 33):
1422 self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1423 self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1425 self.assertRaises(Queue.Empty, self.done.get_nowait)
1426 self.assertRaises(Queue.Empty, done_two.get_nowait)
1429 class TestGanetiLockManager(_ThreadedTestCase):
1432 _ThreadedTestCase.setUp(self)
1433 self.nodes=['n1', 'n2']
1434 self.nodegroups=['g1', 'g2']
1435 self.instances=['i1', 'i2', 'i3']
1436 self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
1440 # Don't try this at home...
1441 locking.GanetiLockManager._instance = None
1443 def testLockingConstants(self):
1444 # The locking library internally cheats by assuming its constants have some
1445 # relationships with each other. Check those hold true.
1446 # This relationship is also used in the Processor to recursively acquire
1447 # the right locks. Again, please don't break it.
1448 for i in range(len(locking.LEVELS)):
1449 self.assertEqual(i, locking.LEVELS[i])
1451 def testDoubleGLFails(self):
1452 self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])
1454 def testLockNames(self):
1455 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1456 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1457 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1458 set(self.nodegroups))
1459 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1460 set(self.instances))
1462 def testInitAndResources(self):
1463 locking.GanetiLockManager._instance = None
1464 self.GL = locking.GanetiLockManager([], [], [])
1465 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1466 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1467 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1468 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1470 locking.GanetiLockManager._instance = None
1471 self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [])
1472 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1473 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1474 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1475 set(self.nodegroups))
1476 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1478 locking.GanetiLockManager._instance = None
1479 self.GL = locking.GanetiLockManager([], [], self.instances)
1480 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1481 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1482 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1483 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1484 set(self.instances))
1486 def testAcquireRelease(self):
1487 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1488 self.assertEquals(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1489 self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1490 self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
1491 self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1492 self.GL.release(locking.LEVEL_NODE, ['n2'])
1493 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
1494 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1495 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1496 self.GL.release(locking.LEVEL_NODE)
1497 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE), set())
1498 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1499 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1500 self.GL.release(locking.LEVEL_NODEGROUP)
1501 self.GL.release(locking.LEVEL_INSTANCE)
1502 self.assertRaises(errors.LockError, self.GL.acquire,
1503 locking.LEVEL_INSTANCE, ['i5'])
1504 self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1505 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1507 def testAcquireWholeSets(self):
1508 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1509 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1510 set(self.instances))
1511 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1512 set(self.instances))
1513 self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
1514 set(self.nodegroups))
1515 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODEGROUP),
1516 set(self.nodegroups))
1517 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1519 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1521 self.GL.release(locking.LEVEL_NODE)
1522 self.GL.release(locking.LEVEL_NODEGROUP)
1523 self.GL.release(locking.LEVEL_INSTANCE)
1524 self.GL.release(locking.LEVEL_CLUSTER)
1526 def testAcquireWholeAndPartial(self):
1527 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1528 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1529 set(self.instances))
1530 self.assertEquals(self.GL._list_owned(locking.LEVEL_INSTANCE),
1531 set(self.instances))
1532 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1534 self.assertEquals(self.GL._list_owned(locking.LEVEL_NODE),
1536 self.GL.release(locking.LEVEL_NODE)
1537 self.GL.release(locking.LEVEL_INSTANCE)
1538 self.GL.release(locking.LEVEL_CLUSTER)
1540 def testBGLDependency(self):
1541 self.assertRaises(AssertionError, self.GL.acquire,
1542 locking.LEVEL_NODE, ['n1', 'n2'])
1543 self.assertRaises(AssertionError, self.GL.acquire,
1544 locking.LEVEL_INSTANCE, ['i3'])
1545 self.assertRaises(AssertionError, self.GL.acquire,
1546 locking.LEVEL_NODEGROUP, ['g1'])
1547 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1548 self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1549 self.assertRaises(AssertionError, self.GL.release,
1550 locking.LEVEL_CLUSTER, ['BGL'])
1551 self.assertRaises(AssertionError, self.GL.release,
1552 locking.LEVEL_CLUSTER)
1553 self.GL.release(locking.LEVEL_NODE)
1554 self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1555 self.assertRaises(AssertionError, self.GL.release,
1556 locking.LEVEL_CLUSTER, ['BGL'])
1557 self.assertRaises(AssertionError, self.GL.release,
1558 locking.LEVEL_CLUSTER)
1559 self.GL.release(locking.LEVEL_INSTANCE)
1560 self.GL.acquire(locking.LEVEL_NODEGROUP, None)
1561 self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
1562 self.assertRaises(AssertionError, self.GL.release,
1563 locking.LEVEL_CLUSTER, ['BGL'])
1564 self.assertRaises(AssertionError, self.GL.release,
1565 locking.LEVEL_CLUSTER)
1566 self.GL.release(locking.LEVEL_NODEGROUP)
1567 self.GL.release(locking.LEVEL_CLUSTER)
1569 def testWrongOrder(self):
1570 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1571 self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1572 self.assertRaises(AssertionError, self.GL.acquire,
1573 locking.LEVEL_NODE, ['n1'])
1574 self.assertRaises(AssertionError, self.GL.acquire,
1575 locking.LEVEL_NODEGROUP, ['g1'])
1576 self.assertRaises(AssertionError, self.GL.acquire,
1577 locking.LEVEL_INSTANCE, ['i2'])
1579 def testModifiableLevels(self):
1580 self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
1582 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
1583 self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
1584 self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
1585 self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
1586 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
1587 self.GL.add(locking.LEVEL_NODE, ['n3'])
1588 self.GL.remove(locking.LEVEL_NODE, ['n1'])
1589 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
1590 self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
1591 self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
1592 self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
1593 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
1594 self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
1597 # Helper function to run as a thread that shared the BGL and then acquires
1598 # some locks at another level.
1599 def _doLock(self, level, names, shared):
1601 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1602 self.GL.acquire(level, names, shared=shared)
1603 self.done.put('DONE')
1604 self.GL.release(level)
1605 self.GL.release(locking.LEVEL_CLUSTER)
1606 except errors.LockError:
1607 self.done.put('ERR')
1610 def testConcurrency(self):
1611 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1612 self._addThread(target=self._doLock,
1613 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1615 self.assertEqual(self.done.get_nowait(), 'DONE')
1616 self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1617 self._addThread(target=self._doLock,
1618 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1620 self.assertEqual(self.done.get_nowait(), 'DONE')
1621 self._addThread(target=self._doLock,
1622 args=(locking.LEVEL_INSTANCE, 'i3', 1))
1623 self.assertRaises(Queue.Empty, self.done.get_nowait)
1624 self.GL.release(locking.LEVEL_INSTANCE)
1626 self.assertEqual(self.done.get_nowait(), 'DONE')
1627 self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1628 self._addThread(target=self._doLock,
1629 args=(locking.LEVEL_INSTANCE, 'i2', 1))
1631 self.assertEqual(self.done.get_nowait(), 'DONE')
1632 self._addThread(target=self._doLock,
1633 args=(locking.LEVEL_INSTANCE, 'i2', 0))
1634 self.assertRaises(Queue.Empty, self.done.get_nowait)
1635 self.GL.release(locking.LEVEL_INSTANCE)
1637 self.assertEqual(self.done.get(True, 1), 'DONE')
1638 self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1641 class TestLockMonitor(_ThreadedTestCase):
1643 _ThreadedTestCase.setUp(self)
1644 self.lm = locking.LockMonitor()
1646 def testSingleThread(self):
1649 for i in range(100):
1650 name = "TestLock%s" % i
1651 locks.append(locking.SharedLock(name, monitor=self.lm))
1653 self.assertEqual(len(self.lm._locks), len(locks))
1654 result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
1655 self.assertEqual(len(result.fields), 1)
1656 self.assertEqual(len(result.data), 100)
1661 # The garbage collector might needs some time
1664 raise utils.RetryAgain()
1666 utils.Retry(_CheckLocks, 0.1, 30.0)
1668 self.assertFalse(self.lm._locks)
1670 def testMultiThread(self):
1673 def _CreateLock(prev, next, name):
1675 locks.append(locking.SharedLock(name, monitor=self.lm))
1681 first = threading.Event()
1684 # Use a deterministic random generator
1685 for i in random.Random(4263).sample(range(100), 33):
1686 name = "MtTestLock%s" % i
1687 expnames.append(name)
1689 ev = threading.Event()
1690 self._addThread(target=_CreateLock, args=(prev, ev, name))
1697 # Check order in which locks were added
1698 self.assertEqual([i.name for i in locks], expnames)
1700 # Check query result
1701 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1702 self.assert_(isinstance(result, dict))
1703 response = objects.QueryResponse.FromDict(result)
1704 self.assertEqual(response.data,
1705 [[(constants.RS_NORMAL, name),
1706 (constants.RS_NORMAL, None),
1707 (constants.RS_NORMAL, None),
1708 (constants.RS_NORMAL, [])]
1709 for name in utils.NiceSort(expnames)])
1710 self.assertEqual(len(response.fields), 4)
1711 self.assertEqual(["name", "mode", "owner", "pending"],
1712 [fdef.name for fdef in response.fields])
1714 # Test exclusive acquire
1715 for tlock in locks[::4]:
1716 tlock.acquire(shared=0)
1718 def _GetExpResult(name):
1719 if tlock.name == name:
1720 return [(constants.RS_NORMAL, name),
1721 (constants.RS_NORMAL, "exclusive"),
1722 (constants.RS_NORMAL,
1723 [threading.currentThread().getName()]),
1724 (constants.RS_NORMAL, [])]
1725 return [(constants.RS_NORMAL, name),
1726 (constants.RS_NORMAL, None),
1727 (constants.RS_NORMAL, None),
1728 (constants.RS_NORMAL, [])]
1730 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1731 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1732 [_GetExpResult(name)
1733 for name in utils.NiceSort(expnames)])
1737 # Test shared acquire
1738 def _Acquire(lock, shared, ev, notify):
1739 lock.acquire(shared=shared)
1746 for tlock1 in locks[::11]:
1747 for tlock2 in locks[::-15]:
1748 if tlock2 == tlock1:
1752 for tlock3 in locks[::10]:
1753 if tlock3 in (tlock2, tlock1):
1757 releaseev = threading.Event()
1763 ev = threading.Event()
1764 tthreads1.append(self._addThread(target=_Acquire,
1765 args=(tlock1, 1, releaseev, ev)))
1766 acquireev.append(ev)
1768 ev = threading.Event()
1769 tthread2 = self._addThread(target=_Acquire,
1770 args=(tlock2, 1, releaseev, ev))
1771 acquireev.append(ev)
1773 ev = threading.Event()
1774 tthread3 = self._addThread(target=_Acquire,
1775 args=(tlock3, 0, releaseev, ev))
1776 acquireev.append(ev)
1778 # Wait for all locks to be acquired
1782 # Check query result
1783 result = self.lm.QueryLocks(["name", "mode", "owner"])
1784 response = objects.QueryResponse.FromDict(result)
1785 for (name, mode, owner) in response.data:
1786 (name_status, name_value) = name
1787 (owner_status, owner_value) = owner
1789 self.assertEqual(name_status, constants.RS_NORMAL)
1790 self.assertEqual(owner_status, constants.RS_NORMAL)
1792 if name_value == tlock1.name:
1793 self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1794 self.assertEqual(set(owner_value),
1795 set(i.getName() for i in tthreads1))
1798 if name_value == tlock2.name:
1799 self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1800 self.assertEqual(owner_value, [tthread2.getName()])
1803 if name_value == tlock3.name:
1804 self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
1805 self.assertEqual(owner_value, [tthread3.getName()])
1808 self.assert_(name_value in expnames)
1809 self.assertEqual(mode, (constants.RS_NORMAL, None))
1810 self.assert_(owner_value is None)
1812 # Release locks again
1817 result = self.lm.QueryLocks(["name", "mode", "owner"])
1818 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1819 [[(constants.RS_NORMAL, name),
1820 (constants.RS_NORMAL, None),
1821 (constants.RS_NORMAL, None)]
1822 for name in utils.NiceSort(expnames)])
1824 def testDelete(self):
1825 lock = locking.SharedLock("TestLock", monitor=self.lm)
1827 self.assertEqual(len(self.lm._locks), 1)
1828 result = self.lm.QueryLocks(["name", "mode", "owner"])
1829 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1830 [[(constants.RS_NORMAL, lock.name),
1831 (constants.RS_NORMAL, None),
1832 (constants.RS_NORMAL, None)]])
1836 result = self.lm.QueryLocks(["name", "mode", "owner"])
1837 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1838 [[(constants.RS_NORMAL, lock.name),
1839 (constants.RS_NORMAL, "deleted"),
1840 (constants.RS_NORMAL, None)]])
1841 self.assertEqual(len(self.lm._locks), 1)
1843 def testPending(self):
1844 def _Acquire(lock, shared, prev, next):
1847 lock.acquire(shared=shared, test_notify=next.set)
1853 lock = locking.SharedLock("ExcLock", monitor=self.lm)
1855 for shared in [0, 1]:
1858 self.assertEqual(len(self.lm._locks), 1)
1859 result = self.lm.QueryLocks(["name", "mode", "owner"])
1860 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1861 [[(constants.RS_NORMAL, lock.name),
1862 (constants.RS_NORMAL, "exclusive"),
1863 (constants.RS_NORMAL,
1864 [threading.currentThread().getName()])]])
1868 first = threading.Event()
1872 ev = threading.Event()
1873 threads.append(self._addThread(target=_Acquire,
1874 args=(lock, shared, prev, ev)))
1880 # Wait for last acquire to start waiting
1883 # NOTE: This works only because QueryLocks will acquire the
1884 # lock-internal lock again and won't be able to get the information
1885 # until it has the lock. By then the acquire should be registered in
1886 # SharedLock.__pending (otherwise it's a bug).
1888 # All acquires are waiting now
1890 pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
1892 pending = [("exclusive", [t.getName()]) for t in threads]
1894 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1895 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1896 [[(constants.RS_NORMAL, lock.name),
1897 (constants.RS_NORMAL, "exclusive"),
1898 (constants.RS_NORMAL,
1899 [threading.currentThread().getName()]),
1900 (constants.RS_NORMAL, pending)]])
1902 self.assertEqual(len(self.lm._locks), 1)
1908 # No pending acquires
1909 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1910 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1911 [[(constants.RS_NORMAL, lock.name),
1912 (constants.RS_NORMAL, None),
1913 (constants.RS_NORMAL, None),
1914 (constants.RS_NORMAL, [])]])
1916 self.assertEqual(len(self.lm._locks), 1)
1918 def testDeleteAndRecreate(self):
1919 lname = "TestLock101923193"
1921 # Create some locks with the same name and keep all references
1922 locks = [locking.SharedLock(lname, monitor=self.lm)
1925 self.assertEqual(len(self.lm._locks), len(locks))
1927 result = self.lm.QueryLocks(["name", "mode", "owner"])
1928 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1929 [[(constants.RS_NORMAL, lname),
1930 (constants.RS_NORMAL, None),
1931 (constants.RS_NORMAL, None)]] * 5)
1935 # Check information order
1936 result = self.lm.QueryLocks(["name", "mode", "owner"])
1937 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1938 [[(constants.RS_NORMAL, lname),
1939 (constants.RS_NORMAL, None),
1940 (constants.RS_NORMAL, None)]] * 2 +
1941 [[(constants.RS_NORMAL, lname),
1942 (constants.RS_NORMAL, "deleted"),
1943 (constants.RS_NORMAL, None)]] +
1944 [[(constants.RS_NORMAL, lname),
1945 (constants.RS_NORMAL, None),
1946 (constants.RS_NORMAL, None)]] * 2)
1948 locks[1].acquire(shared=0)
1951 [(constants.RS_NORMAL, lname),
1952 (constants.RS_NORMAL, None),
1953 (constants.RS_NORMAL, None)],
1954 [(constants.RS_NORMAL, lname),
1955 (constants.RS_NORMAL, "exclusive"),
1956 (constants.RS_NORMAL, [threading.currentThread().getName()])],
1957 [(constants.RS_NORMAL, lname),
1958 (constants.RS_NORMAL, "deleted"),
1959 (constants.RS_NORMAL, None)],
1960 [(constants.RS_NORMAL, lname),
1961 (constants.RS_NORMAL, None),
1962 (constants.RS_NORMAL, None)],
1963 [(constants.RS_NORMAL, lname),
1964 (constants.RS_NORMAL, None),
1965 (constants.RS_NORMAL, None)],
1968 # Check information order
1969 result = self.lm.QueryLocks(["name", "mode", "owner"])
1970 self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)
1972 self.assertEqual(len(set(self.lm._locks.values())), len(locks))
1973 self.assertEqual(len(self.lm._locks), len(locks))
1975 # Check lock deletion
1976 for idx in range(len(locks)):
1978 assert gc.isenabled()
1980 self.assertEqual(len(self.lm._locks), len(locks))
1981 result = self.lm.QueryLocks(["name", "mode", "owner"])
1982 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1983 last_status[idx + 1:])
1985 # All locks should have been deleted
1987 self.assertFalse(self.lm._locks)
1989 result = self.lm.QueryLocks(["name", "mode", "owner"])
1990 self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
1993 if __name__ == '__main__':
1994 testutils.GanetiTestProgram()