4 # Copyright (C) 2006, 2007, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the locking module"""
34 from ganeti import constants
35 from ganeti import locking
36 from ganeti import errors
37 from ganeti import utils
38 from ganeti import compat
39 from ganeti import objects
40 from ganeti import query
45 # This is used to test the ssynchronize decorator.
46 # Since it's passed as input to a decorator it must be declared as a global.
47 _decoratorlock = locking.SharedLock("decorator lock")
49 #: List for looping tests
54 """Decorator for executing a function many times"""
55 def wrapper(*args, **kwargs):
61 def SafeSleep(duration):
64 delay = start + duration - time.time()
70 class _ThreadedTestCase(unittest.TestCase):
71 """Test class that supports adding/waiting on threads"""
73 unittest.TestCase.setUp(self)
74 self.done = Queue.Queue(0)
77 def _addThread(self, *args, **kwargs):
78 """Create and remember a new thread"""
79 t = threading.Thread(*args, **kwargs)
80 self.threads.append(t)
84 def _waitThreads(self):
85 """Wait for all our threads to finish"""
86 for t in self.threads:
88 self.failIf(t.isAlive())
92 class _ConditionTestCase(_ThreadedTestCase):
93 """Common test case for conditions"""
96 _ThreadedTestCase.setUp(self)
97 self.lock = threading.Lock()
98 self.cond = cls(self.lock)
100 def _testAcquireRelease(self):
101 self.assertFalse(self.cond._is_owned())
102 self.assertRaises(RuntimeError, self.cond.wait, None)
103 self.assertRaises(RuntimeError, self.cond.notifyAll)
106 self.assert_(self.cond._is_owned())
107 self.cond.notifyAll()
108 self.assert_(self.cond._is_owned())
111 self.assertFalse(self.cond._is_owned())
112 self.assertRaises(RuntimeError, self.cond.wait, None)
113 self.assertRaises(RuntimeError, self.cond.notifyAll)
115 def _testNotification(self):
120 self.cond.notifyAll()
125 self._addThread(target=_NotifyAll)
126 self.assertEqual(self.done.get(True, 1), "NE")
127 self.assertRaises(Queue.Empty, self.done.get_nowait)
129 self.assertEqual(self.done.get(True, 1), "NA")
130 self.assertEqual(self.done.get(True, 1), "NN")
131 self.assert_(self.cond._is_owned())
133 self.assertFalse(self.cond._is_owned())
136 class TestSingleNotifyPipeCondition(_ConditionTestCase):
137 """SingleNotifyPipeCondition tests"""
140 _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
142 def testAcquireRelease(self):
143 self._testAcquireRelease()
145 def testNotification(self):
146 self._testNotification()
148 def testWaitReuse(self):
154 def testNoNotifyReuse(self):
156 self.cond.notifyAll()
157 self.assertRaises(RuntimeError, self.cond.wait, None)
158 self.assertRaises(RuntimeError, self.cond.notifyAll)
162 class TestPipeCondition(_ConditionTestCase):
163 """PipeCondition tests"""
166 _ConditionTestCase.setUp(self, locking.PipeCondition)
168 def testAcquireRelease(self):
169 self._testAcquireRelease()
171 def testNotification(self):
172 self._testNotification()
174 def _TestWait(self, fn):
176 self._addThread(target=fn),
177 self._addThread(target=fn),
178 self._addThread(target=fn),
181 # Wait for threads to be waiting
183 self.assertEqual(self.done.get(True, 1), "A")
185 self.assertRaises(Queue.Empty, self.done.get_nowait)
188 self.assertEqual(len(self.cond._waiters), 3)
189 self.assertEqual(self.cond._waiters, set(threads))
190 # This new thread can't acquire the lock, and thus call wait, before we
192 self._addThread(target=fn)
193 self.cond.notifyAll()
194 self.assertRaises(Queue.Empty, self.done.get_nowait)
197 # We should now get 3 W and 1 A (for the new thread) in whatever order
201 got = self.done.get(True, 1)
207 self.fail("Got %s on the done queue" % got)
209 self.assertEqual(w, 3)
210 self.assertEqual(a, 1)
213 self.cond.notifyAll()
216 self.assertEqual(self.done.get_nowait(), "W")
217 self.assertRaises(Queue.Empty, self.done.get_nowait)
219 def testBlockingWait(self):
227 self._TestWait(_BlockingWait)
229 def testLongTimeoutWait(self):
237 self._TestWait(_Helper)
239 def _TimeoutWait(self, timeout, check):
241 self.cond.wait(timeout)
245 def testShortTimeoutWait(self):
246 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
247 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
249 self.assertEqual(self.done.get_nowait(), "T1")
250 self.assertEqual(self.done.get_nowait(), "T1")
251 self.assertRaises(Queue.Empty, self.done.get_nowait)
253 def testZeroTimeoutWait(self):
254 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
255 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
256 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
258 self.assertEqual(self.done.get_nowait(), "T0")
259 self.assertEqual(self.done.get_nowait(), "T0")
260 self.assertEqual(self.done.get_nowait(), "T0")
261 self.assertRaises(Queue.Empty, self.done.get_nowait)
264 class TestSharedLock(_ThreadedTestCase):
265 """SharedLock tests"""
268 _ThreadedTestCase.setUp(self)
269 self.sl = locking.SharedLock("TestSharedLock")
271 def testSequenceAndOwnership(self):
272 self.assertFalse(self.sl.is_owned())
273 self.sl.acquire(shared=1)
274 self.assert_(self.sl.is_owned())
275 self.assert_(self.sl.is_owned(shared=1))
276 self.assertFalse(self.sl.is_owned(shared=0))
278 self.assertFalse(self.sl.is_owned())
280 self.assert_(self.sl.is_owned())
281 self.assertFalse(self.sl.is_owned(shared=1))
282 self.assert_(self.sl.is_owned(shared=0))
284 self.assertFalse(self.sl.is_owned())
285 self.sl.acquire(shared=1)
286 self.assert_(self.sl.is_owned())
287 self.assert_(self.sl.is_owned(shared=1))
288 self.assertFalse(self.sl.is_owned(shared=0))
290 self.assertFalse(self.sl.is_owned())
292 def testBooleanValue(self):
293 # semaphores are supposed to return a true value on a successful acquire
294 self.assert_(self.sl.acquire(shared=1))
296 self.assert_(self.sl.acquire())
299 def testDoubleLockingStoE(self):
300 self.sl.acquire(shared=1)
301 self.assertRaises(AssertionError, self.sl.acquire)
303 def testDoubleLockingEtoS(self):
305 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
307 def testDoubleLockingStoS(self):
308 self.sl.acquire(shared=1)
309 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
311 def testDoubleLockingEtoE(self):
313 self.assertRaises(AssertionError, self.sl.acquire)
315 # helper functions: called in a separate thread they acquire the lock, send
316 # their identifier on the done queue, then release it.
317 def _doItSharer(self):
319 self.sl.acquire(shared=1)
322 except errors.LockError:
325 def _doItExclusive(self):
330 except errors.LockError:
333 def _doItDelete(self):
337 except errors.LockError:
340 def testSharersCanCoexist(self):
341 self.sl.acquire(shared=1)
342 threading.Thread(target=self._doItSharer).start()
343 self.assert_(self.done.get(True, 1))
347 def testExclusiveBlocksExclusive(self):
349 self._addThread(target=self._doItExclusive)
350 self.assertRaises(Queue.Empty, self.done.get_nowait)
353 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
356 def testExclusiveBlocksDelete(self):
358 self._addThread(target=self._doItDelete)
359 self.assertRaises(Queue.Empty, self.done.get_nowait)
362 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
363 self.sl = locking.SharedLock(self.sl.name)
366 def testExclusiveBlocksSharer(self):
368 self._addThread(target=self._doItSharer)
369 self.assertRaises(Queue.Empty, self.done.get_nowait)
372 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
375 def testSharerBlocksExclusive(self):
376 self.sl.acquire(shared=1)
377 self._addThread(target=self._doItExclusive)
378 self.assertRaises(Queue.Empty, self.done.get_nowait)
381 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
384 def testSharerBlocksDelete(self):
385 self.sl.acquire(shared=1)
386 self._addThread(target=self._doItDelete)
387 self.assertRaises(Queue.Empty, self.done.get_nowait)
390 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
391 self.sl = locking.SharedLock(self.sl.name)
394 def testWaitingExclusiveBlocksSharer(self):
395 """SKIPPED testWaitingExclusiveBlockSharer"""
398 self.sl.acquire(shared=1)
399 # the lock is acquired in shared mode...
400 self._addThread(target=self._doItExclusive)
401 # ...but now an exclusive is waiting...
402 self._addThread(target=self._doItSharer)
403 # ...so the sharer should be blocked as well
404 self.assertRaises(Queue.Empty, self.done.get_nowait)
407 # The exclusive passed before
408 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
409 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
412 def testWaitingSharerBlocksExclusive(self):
413 """SKIPPED testWaitingSharerBlocksExclusive"""
417 # the lock is acquired in exclusive mode...
418 self._addThread(target=self._doItSharer)
419 # ...but now a sharer is waiting...
420 self._addThread(target=self._doItExclusive)
421 # ...the exclusive is waiting too...
422 self.assertRaises(Queue.Empty, self.done.get_nowait)
425 # The sharer passed before
426 self.assertEqual(self.done.get_nowait(), 'SHR')
427 self.assertEqual(self.done.get_nowait(), 'EXC')
429 def testDelete(self):
431 self.assertRaises(errors.LockError, self.sl.acquire)
432 self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
433 self.assertRaises(errors.LockError, self.sl.delete)
435 def testDeleteTimeout(self):
436 self.sl.delete(timeout=60)
438 def testNoDeleteIfSharer(self):
439 self.sl.acquire(shared=1)
440 self.assertRaises(AssertionError, self.sl.delete)
443 def testDeletePendingSharersExclusiveDelete(self):
445 self._addThread(target=self._doItSharer)
446 self._addThread(target=self._doItSharer)
447 self._addThread(target=self._doItExclusive)
448 self._addThread(target=self._doItDelete)
451 # The threads who were pending return ERR
453 self.assertEqual(self.done.get_nowait(), 'ERR')
454 self.sl = locking.SharedLock(self.sl.name)
457 def testDeletePendingDeleteExclusiveSharers(self):
459 self._addThread(target=self._doItDelete)
460 self._addThread(target=self._doItExclusive)
461 self._addThread(target=self._doItSharer)
462 self._addThread(target=self._doItSharer)
465 # The two threads who were pending return both ERR
466 self.assertEqual(self.done.get_nowait(), 'ERR')
467 self.assertEqual(self.done.get_nowait(), 'ERR')
468 self.assertEqual(self.done.get_nowait(), 'ERR')
469 self.assertEqual(self.done.get_nowait(), 'ERR')
470 self.sl = locking.SharedLock(self.sl.name)
473 def testExclusiveAcquireTimeout(self):
474 for shared in [0, 1]:
475 on_queue = threading.Event()
476 release_exclusive = threading.Event()
478 def _LockExclusive():
479 self.sl.acquire(shared=0, test_notify=on_queue.set)
480 self.done.put("A: start wait")
481 release_exclusive.wait()
482 self.done.put("A: end wait")
485 # Start thread to hold lock in exclusive mode
486 self._addThread(target=_LockExclusive)
488 # Wait for wait to begin
489 self.assertEqual(self.done.get(timeout=60), "A: start wait")
491 # Wait up to 60s to get lock, but release exclusive lock as soon as we're
493 self.failUnless(self.sl.acquire(shared=shared, timeout=60,
494 test_notify=release_exclusive.set))
496 self.done.put("got 2nd")
501 self.assertEqual(self.done.get_nowait(), "A: end wait")
502 self.assertEqual(self.done.get_nowait(), "got 2nd")
503 self.assertRaises(Queue.Empty, self.done.get_nowait)
506 def testAcquireExpiringTimeout(self):
507 def _AcquireWithTimeout(shared, timeout):
508 if not self.sl.acquire(shared=shared, timeout=timeout):
509 self.done.put("timeout")
511 for shared in [0, 1]:
515 # Start shared acquires with timeout between 0 and 20 ms
517 self._addThread(target=_AcquireWithTimeout,
518 args=(shared, i * 2.0 / 1000.0))
520 # Wait for threads to finish (makes sure the acquire timeout expires
521 # before releasing the lock)
528 self.assertEqual(self.done.get_nowait(), "timeout")
530 self.assertRaises(Queue.Empty, self.done.get_nowait)
533 def testSharedSkipExclusiveAcquires(self):
534 # Tests whether shared acquires jump in front of exclusive acquires in the
537 def _Acquire(shared, name, notify_ev, wait_ev):
539 notify_fn = notify_ev.set
546 if not self.sl.acquire(shared=shared, test_notify=notify_fn):
552 # Get exclusive lock while we fill the queue
560 # Add acquires using threading.Event for synchronization. They'll be
561 # acquired exactly in the order defined in this list.
562 acquires = (shrcnt1 * [(1, "shared 1")] +
563 3 * [(0, "exclusive 1")] +
564 shrcnt2 * [(1, "shared 2")] +
565 shrcnt3 * [(1, "shared 3")] +
566 shrcnt4 * [(1, "shared 4")] +
567 3 * [(0, "exclusive 2")])
572 for args in acquires:
573 ev_cur = threading.Event()
574 self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
577 # Wait for last acquire to start
580 # Expect 6 pending exclusive acquires and 1 for all shared acquires
582 self.assertEqual(self.sl._count_pending(), 7)
584 # Release exclusive lock and wait
590 for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
591 # Shared locks aren't guaranteed to be notified in order, but they'll be
593 tmp = self.done.get_nowait()
594 if tmp == "shared 1":
596 elif tmp == "shared 2":
598 elif tmp == "shared 3":
600 elif tmp == "shared 4":
602 self.assertEqual(shrcnt1, 0)
603 self.assertEqual(shrcnt2, 0)
604 self.assertEqual(shrcnt3, 0)
605 self.assertEqual(shrcnt3, 0)
608 self.assertEqual(self.done.get_nowait(), "exclusive 1")
611 self.assertEqual(self.done.get_nowait(), "exclusive 2")
613 self.assertRaises(Queue.Empty, self.done.get_nowait)
615 def testIllegalDowngrade(self):
617 self.assertRaises(AssertionError, self.sl.downgrade)
619 # Acquire in shared mode, downgrade should be no-op
620 self.assertTrue(self.sl.acquire(shared=1))
621 self.assertTrue(self.sl.is_owned(shared=1))
622 self.assertTrue(self.sl.downgrade())
623 self.assertTrue(self.sl.is_owned(shared=1))
626 def testDowngrade(self):
627 self.assertTrue(self.sl.acquire())
628 self.assertTrue(self.sl.is_owned(shared=0))
629 self.assertTrue(self.sl.downgrade())
630 self.assertTrue(self.sl.is_owned(shared=1))
634 def testDowngradeJumpsAheadOfExclusive(self):
635 def _KeepExclusive(ev_got, ev_downgrade, ev_release):
636 self.assertTrue(self.sl.acquire())
637 self.assertTrue(self.sl.is_owned(shared=0))
640 self.assertTrue(self.sl.is_owned(shared=0))
641 self.assertTrue(self.sl.downgrade())
642 self.assertTrue(self.sl.is_owned(shared=1))
644 self.assertTrue(self.sl.is_owned(shared=1))
647 def _KeepExclusive2(ev_started, ev_release):
648 self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
649 self.assertTrue(self.sl.is_owned(shared=0))
651 self.assertTrue(self.sl.is_owned(shared=0))
654 def _KeepShared(ev_started, ev_got, ev_release):
655 self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
656 self.assertTrue(self.sl.is_owned(shared=1))
659 self.assertTrue(self.sl.is_owned(shared=1))
662 # Acquire lock in exclusive mode
663 ev_got_excl1 = threading.Event()
664 ev_downgrade_excl1 = threading.Event()
665 ev_release_excl1 = threading.Event()
666 th_excl1 = self._addThread(target=_KeepExclusive,
667 args=(ev_got_excl1, ev_downgrade_excl1,
671 # Start a second exclusive acquire
672 ev_started_excl2 = threading.Event()
673 ev_release_excl2 = threading.Event()
674 th_excl2 = self._addThread(target=_KeepExclusive2,
675 args=(ev_started_excl2, ev_release_excl2))
676 ev_started_excl2.wait()
678 # Start shared acquires, will jump ahead of second exclusive acquire when
679 # first exclusive acquire downgrades
680 ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
681 ev_release_shared = threading.Event()
683 th_shared = [self._addThread(target=_KeepShared,
684 args=(ev_started, ev_got, ev_release_shared))
685 for (ev_started, ev_got) in ev_shared]
687 # Wait for all shared acquires to start
688 for (ev, _) in ev_shared:
691 # Check lock information
692 self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
693 [(self.sl.name, "exclusive", [th_excl1.getName()], None)])
694 [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING]))
695 self.assertEqual([(pendmode, sorted(waiting))
696 for (pendmode, waiting) in pending],
697 [("exclusive", [th_excl2.getName()]),
698 ("shared", sorted(th.getName() for th in th_shared))])
700 # Shared acquires won't start until the exclusive lock is downgraded
701 ev_downgrade_excl1.set()
703 # Wait for all shared acquires to be successful
704 for (_, ev) in ev_shared:
707 # Check lock information again
708 self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE,
710 [(self.sl.name, "shared", None,
711 [("exclusive", [th_excl2.getName()])])])
712 [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER]))
713 self.assertEqual(set(owner), set([th_excl1.getName()] +
714 [th.getName() for th in th_shared]))
716 ev_release_excl1.set()
717 ev_release_excl2.set()
718 ev_release_shared.set()
722 self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER,
724 [(self.sl.name, None, None, [])])
727 def testMixedAcquireTimeout(self):
728 sync = threading.Event()
730 def _AcquireShared(ev):
731 if not self.sl.acquire(shared=1, timeout=None):
734 self.done.put("shared")
739 # Wait for notification from main thread
747 ev = threading.Event()
748 self._addThread(target=_AcquireShared, args=(ev, ))
751 # Wait for all acquires to finish
755 self.assertEqual(self.sl._count_pending(), 0)
757 # Try to get exclusive lock
758 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
760 # Acquire exclusive without timeout
761 exclsync = threading.Event()
762 exclev = threading.Event()
764 def _AcquireExclusive():
765 if not self.sl.acquire(shared=0):
768 self.done.put("exclusive")
773 # Wait for notification from main thread
778 self._addThread(target=_AcquireExclusive)
780 # Try to get exclusive lock
781 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
783 # Make all shared holders release their locks
786 # Wait for exclusive acquire to succeed
789 self.assertEqual(self.sl._count_pending(), 0)
791 # Try to get exclusive lock
792 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
794 def _AcquireSharedSimple():
795 if self.sl.acquire(shared=1, timeout=None):
796 self.done.put("shared2")
800 self._addThread(target=_AcquireSharedSimple)
802 # Tell exclusive lock to release
805 # Wait for everything to finish
808 self.assertEqual(self.sl._count_pending(), 0)
812 self.assertEqual(self.done.get_nowait(), "shared")
814 self.assertEqual(self.done.get_nowait(), "exclusive")
817 self.assertEqual(self.done.get_nowait(), "shared2")
819 self.assertRaises(Queue.Empty, self.done.get_nowait)
821 def testPriority(self):
822 # Acquire in exclusive mode
823 self.assert_(self.sl.acquire(shared=0))
826 def _Acquire(prev, next, shared, priority, result):
828 self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
830 self.done.put(result)
834 counter = itertools.count(0)
835 priorities = range(-20, 30)
836 first = threading.Event()
842 # [(shared/exclusive, set(acquire names), set(pending threads)),
843 # (shared/exclusive, ...),
849 # References shared acquire per priority in L{perprio}. Data structure:
851 # priority: (shared=1, set(acquire names), set(pending threads)),
855 for seed in [4979, 9523, 14902, 32440]:
856 # Use a deterministic random generator
857 rnd = random.Random(seed)
858 for priority in [rnd.choice(priorities) for _ in range(30)]:
863 acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
865 ev = threading.Event()
866 thread = self._addThread(target=_Acquire,
867 args=(prev, ev, shared, priority, acqname))
870 # Record expected aqcuire, see above for structure
871 data = (shared, set([acqname]), set([thread]))
872 priolist = perprio.setdefault(priority, [])
874 priosh = prioshared.get(priority, None)
876 # Shared acquires are merged
877 for i, j in zip(priosh[1:], data[1:]):
879 assert data[0] == priosh[0]
881 prioshared[priority] = data
882 priolist.append(data)
884 priolist.append(data)
886 # Start all acquires and wait for them
890 # Check lock information
891 self.assertEqual(self.sl.GetLockInfo(set()),
892 [(self.sl.name, None, None, None)])
893 self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
894 [(self.sl.name, "exclusive",
895 [threading.currentThread().getName()], None)])
897 self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])),
900 # Let threads acquire the lock
903 # Wait for everything to finish
906 self.assert_(self.sl._check_empty())
908 # Check acquires by priority
909 for acquires in [perprio[i] for i in sorted(perprio.keys())]:
910 for (_, names, _) in acquires:
911 # For shared acquires, the set will contain 1..n entries. For exclusive
914 names.remove(self.done.get_nowait())
915 self.assertFalse(compat.any(names for (_, names, _) in acquires))
917 self.assertRaises(Queue.Empty, self.done.get_nowait)
919 def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio):
920 self.assertEqual(name, self.sl.name)
921 self.assert_(mode is None)
922 self.assert_(owner is None)
924 self.assertEqual([(pendmode, sorted(waiting))
925 for (pendmode, waiting) in pending],
926 [(["exclusive", "shared"][int(bool(shared))],
927 sorted(t.getName() for t in threads))
928 for acquires in [perprio[i]
929 for i in sorted(perprio.keys())]
930 for (shared, _, threads) in acquires])
933 class TestSharedLockInCondition(_ThreadedTestCase):
934 """SharedLock as a condition lock tests"""
937 _ThreadedTestCase.setUp(self)
938 self.sl = locking.SharedLock("TestSharedLockInCondition")
941 def setCondition(self):
942 self.cond = threading.Condition(self.sl)
944 def testKeepMode(self):
945 self.cond.acquire(shared=1)
946 self.assert_(self.sl.is_owned(shared=1))
948 self.assert_(self.sl.is_owned(shared=1))
950 self.cond.acquire(shared=0)
951 self.assert_(self.sl.is_owned(shared=0))
953 self.assert_(self.sl.is_owned(shared=0))
957 class TestSharedLockInPipeCondition(TestSharedLockInCondition):
958 """SharedLock as a pipe condition lock tests"""
960 def setCondition(self):
961 self.cond = locking.PipeCondition(self.sl)
964 class TestSSynchronizedDecorator(_ThreadedTestCase):
965 """Shared Lock Synchronized decorator test"""
968 _ThreadedTestCase.setUp(self)
970 @locking.ssynchronized(_decoratorlock)
971 def _doItExclusive(self):
972 self.assert_(_decoratorlock.is_owned())
975 @locking.ssynchronized(_decoratorlock, shared=1)
976 def _doItSharer(self):
977 self.assert_(_decoratorlock.is_owned(shared=1))
980 def testDecoratedFunctions(self):
981 self._doItExclusive()
982 self.assertFalse(_decoratorlock.is_owned())
984 self.assertFalse(_decoratorlock.is_owned())
986 def testSharersCanCoexist(self):
987 _decoratorlock.acquire(shared=1)
988 threading.Thread(target=self._doItSharer).start()
989 self.assert_(self.done.get(True, 1))
990 _decoratorlock.release()
993 def testExclusiveBlocksExclusive(self):
994 _decoratorlock.acquire()
995 self._addThread(target=self._doItExclusive)
996 # give it a bit of time to check that it's not actually doing anything
997 self.assertRaises(Queue.Empty, self.done.get_nowait)
998 _decoratorlock.release()
1000 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
1003 def testExclusiveBlocksSharer(self):
1004 _decoratorlock.acquire()
1005 self._addThread(target=self._doItSharer)
1006 self.assertRaises(Queue.Empty, self.done.get_nowait)
1007 _decoratorlock.release()
1009 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
1012 def testSharerBlocksExclusive(self):
1013 _decoratorlock.acquire(shared=1)
1014 self._addThread(target=self._doItExclusive)
1015 self.assertRaises(Queue.Empty, self.done.get_nowait)
1016 _decoratorlock.release()
1018 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
1021 class TestLockSet(_ThreadedTestCase):
1025 _ThreadedTestCase.setUp(self)
1029 """Helper to (re)initialize the lock set"""
1030 self.resources = ['one', 'two', 'three']
1031 self.ls = locking.LockSet(self.resources, "TestLockSet")
1033 def testResources(self):
1034 self.assertEquals(self.ls._names(), set(self.resources))
1035 newls = locking.LockSet([], "TestLockSet.testResources")
1036 self.assertEquals(newls._names(), set())
1038 def testCheckOwnedUnknown(self):
1039 self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one"))
1040 for shared in [-1, 0, 1, 6378, 24255]:
1041 self.assertFalse(self.ls.check_owned("certainly-not-owning-this-one",
1044 def testCheckOwnedUnknownWhileHolding(self):
1045 self.assertFalse(self.ls.check_owned([]))
1046 self.ls.acquire("one", shared=1)
1047 self.assertRaises(errors.LockError, self.ls.check_owned, "nonexist")
1048 self.assertTrue(self.ls.check_owned("one", shared=1))
1049 self.assertFalse(self.ls.check_owned("one", shared=0))
1050 self.assertFalse(self.ls.check_owned(["one", "two"]))
1051 self.assertRaises(errors.LockError, self.ls.check_owned,
1052 ["one", "nonexist"])
1053 self.assertRaises(errors.LockError, self.ls.check_owned, "")
1055 self.assertFalse(self.ls.check_owned([]))
1056 self.assertFalse(self.ls.check_owned("one"))
1058 def testAcquireRelease(self):
1059 self.assertFalse(self.ls.check_owned(self.ls._names()))
1060 self.assert_(self.ls.acquire('one'))
1061 self.assertEquals(self.ls.list_owned(), set(['one']))
1062 self.assertTrue(self.ls.check_owned("one"))
1063 self.assertTrue(self.ls.check_owned("one", shared=0))
1064 self.assertFalse(self.ls.check_owned("one", shared=1))
1066 self.assertEquals(self.ls.list_owned(), set())
1067 self.assertFalse(self.ls.check_owned(self.ls._names()))
1068 self.assertEquals(self.ls.acquire(['one']), set(['one']))
1069 self.assertEquals(self.ls.list_owned(), set(['one']))
1071 self.assertEquals(self.ls.list_owned(), set())
1072 self.ls.acquire(['one', 'two', 'three'])
1073 self.assertEquals(self.ls.list_owned(), set(['one', 'two', 'three']))
1074 self.assertTrue(self.ls.check_owned(self.ls._names()))
1075 self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
1076 self.assertFalse(self.ls.check_owned(self.ls._names(), shared=1))
1077 self.ls.release('one')
1078 self.assertFalse(self.ls.check_owned(["one"]))
1079 self.assertTrue(self.ls.check_owned(["two", "three"]))
1080 self.assertTrue(self.ls.check_owned(["two", "three"], shared=0))
1081 self.assertFalse(self.ls.check_owned(["two", "three"], shared=1))
1082 self.assertEquals(self.ls.list_owned(), set(['two', 'three']))
1083 self.ls.release(['three'])
1084 self.assertEquals(self.ls.list_owned(), set(['two']))
1086 self.assertEquals(self.ls.list_owned(), set())
1087 self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
1088 self.assertEquals(self.ls.list_owned(), set(['one', 'three']))
1090 self.assertEquals(self.ls.list_owned(), set())
1091 for name in self.ls._names():
1092 self.assertFalse(self.ls.check_owned(name))
1094 def testNoDoubleAcquire(self):
1095 self.ls.acquire('one')
1096 self.assertRaises(AssertionError, self.ls.acquire, 'one')
1097 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
1098 self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
1100 self.ls.acquire(['one', 'three'])
1101 self.ls.release('one')
1102 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
1103 self.ls.release('three')
1105 def testNoWrongRelease(self):
1106 self.assertRaises(AssertionError, self.ls.release)
1107 self.ls.acquire('one')
1108 self.assertRaises(AssertionError, self.ls.release, 'two')
1110 def testAddRemove(self):
1112 self.assertEquals(self.ls.list_owned(), set())
1113 self.assert_('four' in self.ls._names())
1114 self.ls.add(['five', 'six', 'seven'], acquired=1)
1115 self.assert_('five' in self.ls._names())
1116 self.assert_('six' in self.ls._names())
1117 self.assert_('seven' in self.ls._names())
1118 self.assertEquals(self.ls.list_owned(), set(['five', 'six', 'seven']))
1119 self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
1120 self.assert_('five' not in self.ls._names())
1121 self.assert_('six' not in self.ls._names())
1122 self.assertEquals(self.ls.list_owned(), set(['seven']))
1123 self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
1124 self.ls.remove('seven')
1125 self.assert_('seven' not in self.ls._names())
1126 self.assertEquals(self.ls.list_owned(), set([]))
1127 self.ls.acquire(None, shared=1)
1128 self.assertRaises(AssertionError, self.ls.add, 'eight')
1130 self.ls.acquire(None)
1131 self.ls.add('eight', acquired=1)
1132 self.assert_('eight' in self.ls._names())
1133 self.assert_('eight' in self.ls.list_owned())
1135 self.assert_('nine' in self.ls._names())
1136 self.assert_('nine' not in self.ls.list_owned())
1138 self.ls.remove(['two'])
1139 self.assert_('two' not in self.ls._names())
1140 self.ls.acquire('three')
1141 self.assertEquals(self.ls.remove(['three']), ['three'])
1142 self.assert_('three' not in self.ls._names())
1143 self.assertEquals(self.ls.remove('three'), [])
1144 self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
1145 self.assert_('one' not in self.ls._names())
1147 def testRemoveNonBlocking(self):
1148 self.ls.acquire('one')
1149 self.assertEquals(self.ls.remove('one'), ['one'])
1150 self.ls.acquire(['two', 'three'])
1151 self.assertEquals(self.ls.remove(['two', 'three']),
1154 def testNoDoubleAdd(self):
1155 self.assertRaises(errors.LockError, self.ls.add, 'two')
1157 self.assertRaises(errors.LockError, self.ls.add, 'four')
1159 def testNoWrongRemoves(self):
1160 self.ls.acquire(['one', 'three'], shared=1)
1161 # Cannot remove 'two' while holding something which is not a superset
1162 self.assertRaises(AssertionError, self.ls.remove, 'two')
1163 # Cannot remove 'three' as we are sharing it
1164 self.assertRaises(AssertionError, self.ls.remove, 'three')
1166 def testAcquireSetLock(self):
1167 # acquire the set-lock exclusively
1168 self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
1169 self.assertEquals(self.ls.list_owned(), set(['one', 'two', 'three']))
1170 self.assertEquals(self.ls.is_owned(), True)
1171 self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
1172 # I can still add/remove elements...
1173 self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
1174 self.assert_(self.ls.add('six'))
1176 # share the set-lock
1177 self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
1178 # adding new elements is not possible
1179 self.assertRaises(AssertionError, self.ls.add, 'five')
1182 def testAcquireWithRepetitions(self):
1183 self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
1184 set(['two', 'two', 'three']))
1185 self.ls.release(['two', 'two'])
1186 self.assertEquals(self.ls.list_owned(), set(['three']))
1188 def testEmptyAcquire(self):
1189 # Acquire an empty list of locks...
1190 self.assertEquals(self.ls.acquire([]), set())
1191 self.assertEquals(self.ls.list_owned(), set())
1192 # New locks can still be addded
1193 self.assert_(self.ls.add('six'))
1194 # "re-acquiring" is not an issue, since we had really acquired nothing
1195 self.assertEquals(self.ls.acquire([], shared=1), set())
1196 self.assertEquals(self.ls.list_owned(), set())
1197 # We haven't really acquired anything, so we cannot release
1198 self.assertRaises(AssertionError, self.ls.release)
1200 def _doLockSet(self, names, shared):
1202 self.ls.acquire(names, shared=shared)
1203 self.done.put('DONE')
1205 except errors.LockError:
1206 self.done.put('ERR')
1208 def _doAddSet(self, names):
1210 self.ls.add(names, acquired=1)
1211 self.done.put('DONE')
1213 except errors.LockError:
1214 self.done.put('ERR')
1216 def _doRemoveSet(self, names):
1217 self.done.put(self.ls.remove(names))
1220 def testConcurrentSharedAcquire(self):
1221 self.ls.acquire(['one', 'two'], shared=1)
1222 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1224 self.assertEqual(self.done.get_nowait(), 'DONE')
1225 self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
1227 self.assertEqual(self.done.get_nowait(), 'DONE')
1228 self._addThread(target=self._doLockSet, args=('three', 1))
1230 self.assertEqual(self.done.get_nowait(), 'DONE')
1231 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1232 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1233 self.assertRaises(Queue.Empty, self.done.get_nowait)
1236 self.assertEqual(self.done.get_nowait(), 'DONE')
1237 self.assertEqual(self.done.get_nowait(), 'DONE')
1240 def testConcurrentExclusiveAcquire(self):
1241 self.ls.acquire(['one', 'two'])
1242 self._addThread(target=self._doLockSet, args=('three', 1))
1244 self.assertEqual(self.done.get_nowait(), 'DONE')
1245 self._addThread(target=self._doLockSet, args=('three', 0))
1247 self.assertEqual(self.done.get_nowait(), 'DONE')
1248 self.assertRaises(Queue.Empty, self.done.get_nowait)
1249 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1250 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1251 self._addThread(target=self._doLockSet, args=('one', 0))
1252 self._addThread(target=self._doLockSet, args=('one', 1))
1253 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1254 self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
1255 self.assertRaises(Queue.Empty, self.done.get_nowait)
1259 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1262 def testSimpleAcquireTimeoutExpiring(self):
1263 names = sorted(self.ls._names())
1264 self.assert_(len(names) >= 3)
1266 # Get name of first lock
1269 # Get name of last lock
1273 # Block first and try to lock it again
1276 # Block last and try to lock all locks
1279 # Block last and try to lock it again
1283 for (wanted, block) in checks:
1284 # Lock in exclusive mode
1285 self.assert_(self.ls.acquire(block, shared=0))
1288 # Try to get the same lock again with a timeout (should never succeed)
1289 acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1291 self.done.put("acquired")
1294 self.assert_(acquired is None)
1295 self.assertFalse(self.ls.list_owned())
1296 self.assertFalse(self.ls.is_owned())
1297 self.done.put("not acquired")
1299 self._addThread(target=_AcquireOne)
1301 # Wait for timeout in thread to expire
1304 # Release exclusive lock again
1307 self.assertEqual(self.done.get_nowait(), "not acquired")
1308 self.assertRaises(Queue.Empty, self.done.get_nowait)
1311 def testDelayedAndExpiringLockAcquire(self):
1313 self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1315 for expire in (False, True):
1316 names = sorted(self.ls._names())
1317 self.assertEqual(len(names), 8)
1319 lock_ev = dict([(i, threading.Event()) for i in names])
1321 # Lock all in exclusive mode
1322 self.assert_(self.ls.acquire(names, shared=0))
1325 # We'll wait at least 300ms per lock
1326 lockwait = len(names) * [0.3]
1328 # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1329 # this gives us up to 2.4s to fail.
1330 lockall_timeout = 0.4
1332 # This should finish rather quickly
1334 lockall_timeout = len(names) * 5.0
1337 def acquire_notification(name):
1339 self.done.put("getting %s" % name)
1344 if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1345 test_notify=acquire_notification):
1346 self.done.put("got all")
1349 self.done.put("timeout on all")
1352 for ev in lock_ev.values():
1355 t = self._addThread(target=_LockAll)
1357 for idx, name in enumerate(names):
1358 # Wait for actual acquire on this lock to start
1359 lock_ev[name].wait(10.0)
1361 if expire and t.isAlive():
1362 # Wait some time after getting the notification to make sure the lock
1363 # acquire will expire
1364 SafeSleep(lockwait[idx])
1366 self.ls.release(names=name)
1368 self.assertFalse(self.ls.list_owned())
1373 # Not checking which locks were actually acquired. Doing so would be
1374 # too timing-dependant.
1375 self.assertEqual(self.done.get_nowait(), "timeout on all")
1378 self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1379 self.assertEqual(self.done.get_nowait(), "got all")
1380 self.assertRaises(Queue.Empty, self.done.get_nowait)
1383 def testConcurrentRemove(self):
1385 self.ls.acquire(['one', 'two', 'four'])
1386 self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1387 self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1388 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1389 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1390 self.assertRaises(Queue.Empty, self.done.get_nowait)
1391 self.ls.remove('one')
1395 self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1396 self.ls.add(['five', 'six'], acquired=1)
1397 self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1398 self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1399 self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1400 self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1401 self.ls.remove('five')
1405 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1406 self.ls.acquire(['three', 'four'])
1407 self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1408 self.assertRaises(Queue.Empty, self.done.get_nowait)
1409 self.ls.remove('four')
1411 self.assertEqual(self.done.get_nowait(), ['six'])
1412 self._addThread(target=self._doRemoveSet, args=(['two']))
1414 self.assertEqual(self.done.get_nowait(), ['two'])
1420 def testConcurrentSharedSetLock(self):
1421 # share the set-lock...
1422 self.ls.acquire(None, shared=1)
1423 # ...another thread can share it too
1424 self._addThread(target=self._doLockSet, args=(None, 1))
1426 self.assertEqual(self.done.get_nowait(), 'DONE')
1427 # ...or just share some elements
1428 self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1430 self.assertEqual(self.done.get_nowait(), 'DONE')
1431 # ...but not add new ones or remove any
1432 t = self._addThread(target=self._doAddSet, args=(['nine']))
1433 self._addThread(target=self._doRemoveSet, args=(['two'], ))
1434 self.assertRaises(Queue.Empty, self.done.get_nowait)
1435 # this just releases the set-lock
1438 self.assertEqual(self.done.get_nowait(), 'DONE')
1439 # release the lock on the actual elements so remove() can proceed too
1442 self.failUnlessEqual(self.done.get_nowait(), ['two'])
1447 def testConcurrentExclusiveSetLock(self):
1448 # acquire the set-lock...
1449 self.ls.acquire(None, shared=0)
1450 # ...no one can do anything else
1451 self._addThread(target=self._doLockSet, args=(None, 1))
1452 self._addThread(target=self._doLockSet, args=(None, 0))
1453 self._addThread(target=self._doLockSet, args=(['three'], 0))
1454 self._addThread(target=self._doLockSet, args=(['two'], 1))
1455 self._addThread(target=self._doAddSet, args=(['nine']))
1456 self.assertRaises(Queue.Empty, self.done.get_nowait)
1460 self.assertEqual(self.done.get(True, 1), 'DONE')
1465 def testConcurrentSetLockAdd(self):
1466 self.ls.acquire('one')
1467 # Another thread wants the whole SetLock
1468 self._addThread(target=self._doLockSet, args=(None, 0))
1469 self._addThread(target=self._doLockSet, args=(None, 1))
1470 self.assertRaises(Queue.Empty, self.done.get_nowait)
1471 self.assertRaises(AssertionError, self.ls.add, 'four')
1474 self.assertEqual(self.done.get_nowait(), 'DONE')
1475 self.assertEqual(self.done.get_nowait(), 'DONE')
1476 self.ls.acquire(None)
1477 self._addThread(target=self._doLockSet, args=(None, 0))
1478 self._addThread(target=self._doLockSet, args=(None, 1))
1479 self.assertRaises(Queue.Empty, self.done.get_nowait)
1481 self.ls.add('five', acquired=1)
1482 self.ls.add('six', acquired=1, shared=1)
1483 self.assertEquals(self.ls.list_owned(),
1484 set(['one', 'two', 'three', 'five', 'six']))
1485 self.assertEquals(self.ls.is_owned(), True)
1486 self.assertEquals(self.ls._names(),
1487 set(['one', 'two', 'three', 'four', 'five', 'six']))
1490 self.assertEqual(self.done.get_nowait(), 'DONE')
1491 self.assertEqual(self.done.get_nowait(), 'DONE')
1495 def testEmptyLockSet(self):
1497 self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1499 self.ls.remove(['one', 'two', 'three'])
1500 # and adds/locks by another thread still wait
1501 self._addThread(target=self._doAddSet, args=(['nine']))
1502 self._addThread(target=self._doLockSet, args=(None, 1))
1503 self._addThread(target=self._doLockSet, args=(None, 0))
1504 self.assertRaises(Queue.Empty, self.done.get_nowait)
1508 self.assertEqual(self.done.get_nowait(), 'DONE')
1510 self.assertEqual(self.ls.remove(['nine']), ['nine'])
1512 self.assertEqual(self.ls.acquire(None, shared=1), set())
1513 # other sharers can go, adds still wait
1514 self._addThread(target=self._doLockSet, args=(None, 1))
1516 self.assertEqual(self.done.get_nowait(), 'DONE')
1517 self._addThread(target=self._doAddSet, args=(['nine']))
1518 self.assertRaises(Queue.Empty, self.done.get_nowait)
1521 self.assertEqual(self.done.get_nowait(), 'DONE')
1524 def testAcquireWithNamesDowngrade(self):
1525 self.assertEquals(self.ls.acquire("two", shared=0), set(["two"]))
1526 self.assertTrue(self.ls.is_owned())
1527 self.assertFalse(self.ls._get_lock().is_owned())
1529 self.assertFalse(self.ls.is_owned())
1530 self.assertFalse(self.ls._get_lock().is_owned())
1531 # Can't downgrade after releasing
1532 self.assertRaises(AssertionError, self.ls.downgrade, "two")
1534 def testDowngrade(self):
1535 # Not owning anything, must raise an exception
1536 self.assertFalse(self.ls.is_owned())
1537 self.assertRaises(AssertionError, self.ls.downgrade)
1539 self.assertFalse(compat.any(i.is_owned()
1540 for i in self.ls._get_lockdict().values()))
1541 self.assertFalse(self.ls.check_owned(self.ls._names()))
1542 for name in self.ls._names():
1543 self.assertFalse(self.ls.check_owned(name))
1545 self.assertEquals(self.ls.acquire(None, shared=0),
1546 set(["one", "two", "three"]))
1547 self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")
1549 self.assertTrue(self.ls.check_owned(self.ls._names(), shared=0))
1550 for name in self.ls._names():
1551 self.assertTrue(self.ls.check_owned(name))
1552 self.assertTrue(self.ls.check_owned(name, shared=0))
1553 self.assertFalse(self.ls.check_owned(name, shared=1))
1555 self.assertTrue(self.ls._get_lock().is_owned(shared=0))
1556 self.assertTrue(compat.all(i.is_owned(shared=0)
1557 for i in self.ls._get_lockdict().values()))
1559 # Start downgrading locks
1560 self.assertTrue(self.ls.downgrade(names=["one"]))
1561 self.assertTrue(self.ls._get_lock().is_owned(shared=0))
1562 self.assertTrue(compat.all(lock.is_owned(shared=[0, 1][int(name == "one")])
1564 self.ls._get_lockdict().items()))
1566 self.assertFalse(self.ls.check_owned("one", shared=0))
1567 self.assertTrue(self.ls.check_owned("one", shared=1))
1568 self.assertTrue(self.ls.check_owned("two", shared=0))
1569 self.assertTrue(self.ls.check_owned("three", shared=0))
1571 # Downgrade second lock
1572 self.assertTrue(self.ls.downgrade(names="two"))
1573 self.assertTrue(self.ls._get_lock().is_owned(shared=0))
1574 should_share = lambda name: [0, 1][int(name in ("one", "two"))]
1575 self.assertTrue(compat.all(lock.is_owned(shared=should_share(name))
1577 self.ls._get_lockdict().items()))
1579 self.assertFalse(self.ls.check_owned("one", shared=0))
1580 self.assertTrue(self.ls.check_owned("one", shared=1))
1581 self.assertFalse(self.ls.check_owned("two", shared=0))
1582 self.assertTrue(self.ls.check_owned("two", shared=1))
1583 self.assertTrue(self.ls.check_owned("three", shared=0))
1585 # Downgrading the last exclusive lock to shared must downgrade the
1586 # lockset-internal lock too
1587 self.assertTrue(self.ls.downgrade(names="three"))
1588 self.assertTrue(self.ls._get_lock().is_owned(shared=1))
1589 self.assertTrue(compat.all(i.is_owned(shared=1)
1590 for i in self.ls._get_lockdict().values()))
1592 # Verify owned locks
1593 for name in self.ls._names():
1594 self.assertTrue(self.ls.check_owned(name, shared=1))
1596 # Downgrading a shared lock must be a no-op
1597 self.assertTrue(self.ls.downgrade(names=["one", "three"]))
1598 self.assertTrue(self.ls._get_lock().is_owned(shared=1))
1599 self.assertTrue(compat.all(i.is_owned(shared=1)
1600 for i in self.ls._get_lockdict().values()))
1604 def testPriority(self):
1605 def _Acquire(prev, next, name, priority, success_fn):
1607 self.assert_(self.ls.acquire(name, shared=0,
1609 test_notify=lambda _: next.set()))
1615 # Get all in exclusive mode
1616 self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1618 done_two = Queue.Queue(0)
1620 first = threading.Event()
1623 acquires = [("one", prio, self.done) for prio in range(1, 33)]
1624 acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1626 # Use a deterministic random generator
1627 random.Random(741).shuffle(acquires)
1629 for (name, prio, done) in acquires:
1630 ev = threading.Event()
1631 self._addThread(target=_Acquire,
1632 args=(prev, ev, name, prio,
1633 compat.partial(done.put, "Prio%s" % prio)))
1639 # Wait for last acquire to start
1642 # Let threads acquire locks
1645 # Wait for threads to finish
1648 for i in range(1, 33):
1649 self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1650 self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1652 self.assertRaises(Queue.Empty, self.done.get_nowait)
1653 self.assertRaises(Queue.Empty, done_two.get_nowait)
1656 class TestGanetiLockManager(_ThreadedTestCase):
1659 _ThreadedTestCase.setUp(self)
1660 self.nodes=['n1', 'n2']
1661 self.nodegroups=['g1', 'g2']
1662 self.instances=['i1', 'i2', 'i3']
1663 self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
1667 # Don't try this at home...
1668 locking.GanetiLockManager._instance = None
1670 def testLockingConstants(self):
1671 # The locking library internally cheats by assuming its constants have some
1672 # relationships with each other. Check those hold true.
1673 # This relationship is also used in the Processor to recursively acquire
1674 # the right locks. Again, please don't break it.
1675 for i in range(len(locking.LEVELS)):
1676 self.assertEqual(i, locking.LEVELS[i])
1678 def testDoubleGLFails(self):
1679 self.assertRaises(AssertionError, locking.GanetiLockManager, [], [], [])
1681 def testLockNames(self):
1682 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1683 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1684 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1685 set(self.nodegroups))
1686 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1687 set(self.instances))
1689 def testInitAndResources(self):
1690 locking.GanetiLockManager._instance = None
1691 self.GL = locking.GanetiLockManager([], [], [])
1692 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1693 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1694 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1695 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1697 locking.GanetiLockManager._instance = None
1698 self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups, [])
1699 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1700 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(self.nodes))
1701 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP),
1702 set(self.nodegroups))
1703 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set())
1705 locking.GanetiLockManager._instance = None
1706 self.GL = locking.GanetiLockManager([], [], self.instances)
1707 self.assertEqual(self.GL._names(locking.LEVEL_CLUSTER), set(['BGL']))
1708 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set())
1709 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set())
1710 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE),
1711 set(self.instances))
1713 def testAcquireRelease(self):
1714 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1715 self.assertEquals(self.GL.list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
1716 self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
1717 self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
1718 self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)
1719 self.assertTrue(self.GL.check_owned(locking.LEVEL_NODE, ["n1", "n2"],
1721 self.assertFalse(self.GL.check_owned(locking.LEVEL_INSTANCE, ["i1", "i3"]))
1722 self.GL.release(locking.LEVEL_NODE, ['n2'])
1723 self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set(['n1']))
1724 self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1725 self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1726 self.GL.release(locking.LEVEL_NODE)
1727 self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE), set())
1728 self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
1729 self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i1']))
1730 self.GL.release(locking.LEVEL_NODEGROUP)
1731 self.GL.release(locking.LEVEL_INSTANCE)
1732 self.assertRaises(errors.LockError, self.GL.acquire,
1733 locking.LEVEL_INSTANCE, ['i5'])
1734 self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
1735 self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE), set(['i3']))
1737 def testAcquireWholeSets(self):
1738 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1739 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1740 set(self.instances))
1741 self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
1742 set(self.instances))
1743 self.assertEquals(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
1744 set(self.nodegroups))
1745 self.assertEquals(self.GL.list_owned(locking.LEVEL_NODEGROUP),
1746 set(self.nodegroups))
1747 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
1749 self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
1751 self.GL.release(locking.LEVEL_NODE)
1752 self.GL.release(locking.LEVEL_NODEGROUP)
1753 self.GL.release(locking.LEVEL_INSTANCE)
1754 self.GL.release(locking.LEVEL_CLUSTER)
1756 def testAcquireWholeAndPartial(self):
1757 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1758 self.assertEquals(self.GL.acquire(locking.LEVEL_INSTANCE, None),
1759 set(self.instances))
1760 self.assertEquals(self.GL.list_owned(locking.LEVEL_INSTANCE),
1761 set(self.instances))
1762 self.assertEquals(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
1764 self.assertEquals(self.GL.list_owned(locking.LEVEL_NODE),
1766 self.GL.release(locking.LEVEL_NODE)
1767 self.GL.release(locking.LEVEL_INSTANCE)
1768 self.GL.release(locking.LEVEL_CLUSTER)
1770 def testBGLDependency(self):
1771 self.assertRaises(AssertionError, self.GL.acquire,
1772 locking.LEVEL_NODE, ['n1', 'n2'])
1773 self.assertRaises(AssertionError, self.GL.acquire,
1774 locking.LEVEL_INSTANCE, ['i3'])
1775 self.assertRaises(AssertionError, self.GL.acquire,
1776 locking.LEVEL_NODEGROUP, ['g1'])
1777 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1778 self.GL.acquire(locking.LEVEL_NODE, ['n1'])
1779 self.assertRaises(AssertionError, self.GL.release,
1780 locking.LEVEL_CLUSTER, ['BGL'])
1781 self.assertRaises(AssertionError, self.GL.release,
1782 locking.LEVEL_CLUSTER)
1783 self.GL.release(locking.LEVEL_NODE)
1784 self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
1785 self.assertRaises(AssertionError, self.GL.release,
1786 locking.LEVEL_CLUSTER, ['BGL'])
1787 self.assertRaises(AssertionError, self.GL.release,
1788 locking.LEVEL_CLUSTER)
1789 self.GL.release(locking.LEVEL_INSTANCE)
1790 self.GL.acquire(locking.LEVEL_NODEGROUP, None)
1791 self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
1792 self.assertRaises(AssertionError, self.GL.release,
1793 locking.LEVEL_CLUSTER, ['BGL'])
1794 self.assertRaises(AssertionError, self.GL.release,
1795 locking.LEVEL_CLUSTER)
1796 self.GL.release(locking.LEVEL_NODEGROUP)
1797 self.GL.release(locking.LEVEL_CLUSTER)
1799 def testWrongOrder(self):
1800 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1801 self.GL.acquire(locking.LEVEL_NODE, ['n2'])
1802 self.assertRaises(AssertionError, self.GL.acquire,
1803 locking.LEVEL_NODE, ['n1'])
1804 self.assertRaises(AssertionError, self.GL.acquire,
1805 locking.LEVEL_NODEGROUP, ['g1'])
1806 self.assertRaises(AssertionError, self.GL.acquire,
1807 locking.LEVEL_INSTANCE, ['i2'])
1809 def testModifiableLevels(self):
1810 self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
1812 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
1813 self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
1814 self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
1815 self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
1816 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
1817 self.GL.add(locking.LEVEL_NODE, ['n3'])
1818 self.GL.remove(locking.LEVEL_NODE, ['n1'])
1819 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
1820 self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
1821 self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
1822 self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
1823 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
1824 self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
1827 # Helper function to run as a thread that shared the BGL and then acquires
1828 # some locks at another level.
1829 def _doLock(self, level, names, shared):
1831 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1832 self.GL.acquire(level, names, shared=shared)
1833 self.done.put('DONE')
1834 self.GL.release(level)
1835 self.GL.release(locking.LEVEL_CLUSTER)
1836 except errors.LockError:
1837 self.done.put('ERR')
1840 def testConcurrency(self):
1841 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1842 self._addThread(target=self._doLock,
1843 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1845 self.assertEqual(self.done.get_nowait(), 'DONE')
1846 self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1847 self._addThread(target=self._doLock,
1848 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1850 self.assertEqual(self.done.get_nowait(), 'DONE')
1851 self._addThread(target=self._doLock,
1852 args=(locking.LEVEL_INSTANCE, 'i3', 1))
1853 self.assertRaises(Queue.Empty, self.done.get_nowait)
1854 self.GL.release(locking.LEVEL_INSTANCE)
1856 self.assertEqual(self.done.get_nowait(), 'DONE')
1857 self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1858 self._addThread(target=self._doLock,
1859 args=(locking.LEVEL_INSTANCE, 'i2', 1))
1861 self.assertEqual(self.done.get_nowait(), 'DONE')
1862 self._addThread(target=self._doLock,
1863 args=(locking.LEVEL_INSTANCE, 'i2', 0))
1864 self.assertRaises(Queue.Empty, self.done.get_nowait)
1865 self.GL.release(locking.LEVEL_INSTANCE)
1867 self.assertEqual(self.done.get(True, 1), 'DONE')
1868 self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1871 class TestLockMonitor(_ThreadedTestCase):
1873 _ThreadedTestCase.setUp(self)
1874 self.lm = locking.LockMonitor()
1876 def testSingleThread(self):
1879 for i in range(100):
1880 name = "TestLock%s" % i
1881 locks.append(locking.SharedLock(name, monitor=self.lm))
1883 self.assertEqual(len(self.lm._locks), len(locks))
1884 result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
1885 self.assertEqual(len(result.fields), 1)
1886 self.assertEqual(len(result.data), 100)
1891 # The garbage collector might needs some time
1894 raise utils.RetryAgain()
1896 utils.Retry(_CheckLocks, 0.1, 30.0)
1898 self.assertFalse(self.lm._locks)
1900 def testMultiThread(self):
1903 def _CreateLock(prev, next, name):
1905 locks.append(locking.SharedLock(name, monitor=self.lm))
1911 first = threading.Event()
1914 # Use a deterministic random generator
1915 for i in random.Random(4263).sample(range(100), 33):
1916 name = "MtTestLock%s" % i
1917 expnames.append(name)
1919 ev = threading.Event()
1920 self._addThread(target=_CreateLock, args=(prev, ev, name))
1927 # Check order in which locks were added
1928 self.assertEqual([i.name for i in locks], expnames)
1930 # Check query result
1931 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1932 self.assert_(isinstance(result, dict))
1933 response = objects.QueryResponse.FromDict(result)
1934 self.assertEqual(response.data,
1935 [[(constants.RS_NORMAL, name),
1936 (constants.RS_NORMAL, None),
1937 (constants.RS_NORMAL, None),
1938 (constants.RS_NORMAL, [])]
1939 for name in utils.NiceSort(expnames)])
1940 self.assertEqual(len(response.fields), 4)
1941 self.assertEqual(["name", "mode", "owner", "pending"],
1942 [fdef.name for fdef in response.fields])
1944 # Test exclusive acquire
1945 for tlock in locks[::4]:
1946 tlock.acquire(shared=0)
1948 def _GetExpResult(name):
1949 if tlock.name == name:
1950 return [(constants.RS_NORMAL, name),
1951 (constants.RS_NORMAL, "exclusive"),
1952 (constants.RS_NORMAL,
1953 [threading.currentThread().getName()]),
1954 (constants.RS_NORMAL, [])]
1955 return [(constants.RS_NORMAL, name),
1956 (constants.RS_NORMAL, None),
1957 (constants.RS_NORMAL, None),
1958 (constants.RS_NORMAL, [])]
1960 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1961 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1962 [_GetExpResult(name)
1963 for name in utils.NiceSort(expnames)])
1967 # Test shared acquire
1968 def _Acquire(lock, shared, ev, notify):
1969 lock.acquire(shared=shared)
1976 for tlock1 in locks[::11]:
1977 for tlock2 in locks[::-15]:
1978 if tlock2 == tlock1:
1982 for tlock3 in locks[::10]:
1983 if tlock3 in (tlock2, tlock1):
1987 releaseev = threading.Event()
1993 ev = threading.Event()
1994 tthreads1.append(self._addThread(target=_Acquire,
1995 args=(tlock1, 1, releaseev, ev)))
1996 acquireev.append(ev)
1998 ev = threading.Event()
1999 tthread2 = self._addThread(target=_Acquire,
2000 args=(tlock2, 1, releaseev, ev))
2001 acquireev.append(ev)
2003 ev = threading.Event()
2004 tthread3 = self._addThread(target=_Acquire,
2005 args=(tlock3, 0, releaseev, ev))
2006 acquireev.append(ev)
2008 # Wait for all locks to be acquired
2012 # Check query result
2013 result = self.lm.QueryLocks(["name", "mode", "owner"])
2014 response = objects.QueryResponse.FromDict(result)
2015 for (name, mode, owner) in response.data:
2016 (name_status, name_value) = name
2017 (owner_status, owner_value) = owner
2019 self.assertEqual(name_status, constants.RS_NORMAL)
2020 self.assertEqual(owner_status, constants.RS_NORMAL)
2022 if name_value == tlock1.name:
2023 self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
2024 self.assertEqual(set(owner_value),
2025 set(i.getName() for i in tthreads1))
2028 if name_value == tlock2.name:
2029 self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
2030 self.assertEqual(owner_value, [tthread2.getName()])
2033 if name_value == tlock3.name:
2034 self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
2035 self.assertEqual(owner_value, [tthread3.getName()])
2038 self.assert_(name_value in expnames)
2039 self.assertEqual(mode, (constants.RS_NORMAL, None))
2040 self.assert_(owner_value is None)
2042 # Release locks again
2047 result = self.lm.QueryLocks(["name", "mode", "owner"])
2048 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2049 [[(constants.RS_NORMAL, name),
2050 (constants.RS_NORMAL, None),
2051 (constants.RS_NORMAL, None)]
2052 for name in utils.NiceSort(expnames)])
2054 def testDelete(self):
2055 lock = locking.SharedLock("TestLock", monitor=self.lm)
2057 self.assertEqual(len(self.lm._locks), 1)
2058 result = self.lm.QueryLocks(["name", "mode", "owner"])
2059 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2060 [[(constants.RS_NORMAL, lock.name),
2061 (constants.RS_NORMAL, None),
2062 (constants.RS_NORMAL, None)]])
2066 result = self.lm.QueryLocks(["name", "mode", "owner"])
2067 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2068 [[(constants.RS_NORMAL, lock.name),
2069 (constants.RS_NORMAL, "deleted"),
2070 (constants.RS_NORMAL, None)]])
2071 self.assertEqual(len(self.lm._locks), 1)
2073 def testPending(self):
2074 def _Acquire(lock, shared, prev, next):
2077 lock.acquire(shared=shared, test_notify=next.set)
2083 lock = locking.SharedLock("ExcLock", monitor=self.lm)
2085 for shared in [0, 1]:
2088 self.assertEqual(len(self.lm._locks), 1)
2089 result = self.lm.QueryLocks(["name", "mode", "owner"])
2090 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2091 [[(constants.RS_NORMAL, lock.name),
2092 (constants.RS_NORMAL, "exclusive"),
2093 (constants.RS_NORMAL,
2094 [threading.currentThread().getName()])]])
2098 first = threading.Event()
2102 ev = threading.Event()
2103 threads.append(self._addThread(target=_Acquire,
2104 args=(lock, shared, prev, ev)))
2110 # Wait for last acquire to start waiting
2113 # NOTE: This works only because QueryLocks will acquire the
2114 # lock-internal lock again and won't be able to get the information
2115 # until it has the lock. By then the acquire should be registered in
2116 # SharedLock.__pending (otherwise it's a bug).
2118 # All acquires are waiting now
2120 pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
2122 pending = [("exclusive", [t.getName()]) for t in threads]
2124 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2125 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2126 [[(constants.RS_NORMAL, lock.name),
2127 (constants.RS_NORMAL, "exclusive"),
2128 (constants.RS_NORMAL,
2129 [threading.currentThread().getName()]),
2130 (constants.RS_NORMAL, pending)]])
2132 self.assertEqual(len(self.lm._locks), 1)
2138 # No pending acquires
2139 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2140 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2141 [[(constants.RS_NORMAL, lock.name),
2142 (constants.RS_NORMAL, None),
2143 (constants.RS_NORMAL, None),
2144 (constants.RS_NORMAL, [])]])
2146 self.assertEqual(len(self.lm._locks), 1)
2148 def testDeleteAndRecreate(self):
2149 lname = "TestLock101923193"
2151 # Create some locks with the same name and keep all references
2152 locks = [locking.SharedLock(lname, monitor=self.lm)
2155 self.assertEqual(len(self.lm._locks), len(locks))
2157 result = self.lm.QueryLocks(["name", "mode", "owner"])
2158 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2159 [[(constants.RS_NORMAL, lname),
2160 (constants.RS_NORMAL, None),
2161 (constants.RS_NORMAL, None)]] * 5)
2165 # Check information order
2166 result = self.lm.QueryLocks(["name", "mode", "owner"])
2167 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2168 [[(constants.RS_NORMAL, lname),
2169 (constants.RS_NORMAL, None),
2170 (constants.RS_NORMAL, None)]] * 2 +
2171 [[(constants.RS_NORMAL, lname),
2172 (constants.RS_NORMAL, "deleted"),
2173 (constants.RS_NORMAL, None)]] +
2174 [[(constants.RS_NORMAL, lname),
2175 (constants.RS_NORMAL, None),
2176 (constants.RS_NORMAL, None)]] * 2)
2178 locks[1].acquire(shared=0)
2181 [(constants.RS_NORMAL, lname),
2182 (constants.RS_NORMAL, None),
2183 (constants.RS_NORMAL, None)],
2184 [(constants.RS_NORMAL, lname),
2185 (constants.RS_NORMAL, "exclusive"),
2186 (constants.RS_NORMAL, [threading.currentThread().getName()])],
2187 [(constants.RS_NORMAL, lname),
2188 (constants.RS_NORMAL, "deleted"),
2189 (constants.RS_NORMAL, None)],
2190 [(constants.RS_NORMAL, lname),
2191 (constants.RS_NORMAL, None),
2192 (constants.RS_NORMAL, None)],
2193 [(constants.RS_NORMAL, lname),
2194 (constants.RS_NORMAL, None),
2195 (constants.RS_NORMAL, None)],
2198 # Check information order
2199 result = self.lm.QueryLocks(["name", "mode", "owner"])
2200 self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)
2202 self.assertEqual(len(set(self.lm._locks.values())), len(locks))
2203 self.assertEqual(len(self.lm._locks), len(locks))
2205 # Check lock deletion
2206 for idx in range(len(locks)):
2208 assert gc.isenabled()
2210 self.assertEqual(len(self.lm._locks), len(locks))
2211 result = self.lm.QueryLocks(["name", "mode", "owner"])
2212 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2213 last_status[idx + 1:])
2215 # All locks should have been deleted
2217 self.assertFalse(self.lm._locks)
2219 result = self.lm.QueryLocks(["name", "mode", "owner"])
2220 self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
2226 def AddResult(self, *args):
2227 self._info.append(args)
2229 def CountPending(self):
2230 return len(self._info)
2232 def GetLockInfo(self, requested):
2233 (exp_requested, result) = self._info.pop(0)
2235 if exp_requested != requested:
2236 raise Exception("Requested information (%s) does not match"
2237 " expectations (%s)" % (requested, exp_requested))
2241 def testMultipleResults(self):
2242 fl1 = self._FakeLock()
2243 fl2 = self._FakeLock()
2245 self.lm.RegisterLock(fl1)
2246 self.lm.RegisterLock(fl2)
2249 for i in [fl1, fl2]:
2250 i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), [])
2251 result = self.lm.QueryLocks(["name", "mode", "owner"])
2252 self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
2253 for i in [fl1, fl2]:
2254 self.assertEqual(i.CountPending(), 0)
2257 for fn in [lambda x: x, reversed, sorted]:
2258 fl1.AddResult(set(), list(fn([
2259 ("aaa", None, None, None),
2260 ("bbb", None, None, None),
2262 fl2.AddResult(set(), [])
2263 result = self.lm.QueryLocks(["name"])
2264 self.assertEqual(objects.QueryResponse.FromDict(result).data, [
2265 [(constants.RS_NORMAL, "aaa")],
2266 [(constants.RS_NORMAL, "bbb")],
2268 for i in [fl1, fl2]:
2269 self.assertEqual(i.CountPending(), 0)
2271 for fn2 in [lambda x: x, reversed, sorted]:
2272 fl1.AddResult(set([query.LQ_MODE]), list(fn([
2273 # Same name, but different information
2274 ("aaa", "mode0", None, None),
2275 ("aaa", "mode1", None, None),
2276 ("aaa", "mode2", None, None),
2277 ("aaa", "mode3", None, None),
2279 fl2.AddResult(set([query.LQ_MODE]), [
2280 ("zzz", "end", None, None),
2281 ("000", "start", None, None),
2283 ("aaa", "b200", None, None),
2284 ("aaa", "b300", None, None),
2286 result = self.lm.QueryLocks(["name", "mode"])
2287 self.assertEqual(objects.QueryResponse.FromDict(result).data, [
2288 [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")],
2290 # Name is the same, so order must be equal to incoming order
2291 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")],
2292 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")],
2293 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")],
2294 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")],
2296 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")],
2297 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")],
2299 [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")],
2301 for i in [fl1, fl2]:
2302 self.assertEqual(i.CountPending(), 0)
2305 if __name__ == '__main__':
2306 testutils.GanetiTestProgram()