4 # Copyright (C) 2006, 2007, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the locking module"""
34 from ganeti import constants
35 from ganeti import locking
36 from ganeti import errors
37 from ganeti import utils
38 from ganeti import compat
39 from ganeti import objects
40 from ganeti import query
45 # This is used to test the ssynchronized decorator.
46 # Since it's passed as input to a decorator it must be declared as a global.
47 _decoratorlock = locking.SharedLock("decorator lock")
49 #: List for looping tests
54 """Decorator for executing a function many times"""
55 def wrapper(*args, **kwargs):
61 def SafeSleep(duration):
64 delay = start + duration - time.time()
70 class _ThreadedTestCase(unittest.TestCase):
71 """Test class that supports adding/waiting on threads"""
73 unittest.TestCase.setUp(self)
74 self.done = Queue.Queue(0)
77 def _addThread(self, *args, **kwargs):
78 """Create and remember a new thread"""
79 t = threading.Thread(*args, **kwargs)
80 self.threads.append(t)
84 def _waitThreads(self):
85 """Wait for all our threads to finish"""
86 for t in self.threads:
88 self.failIf(t.isAlive())
92 class _ConditionTestCase(_ThreadedTestCase):
93 """Common test case for conditions"""
96 _ThreadedTestCase.setUp(self)
97 self.lock = threading.Lock()
98 self.cond = cls(self.lock)
100 def _testAcquireRelease(self):
101 self.assertFalse(self.cond._is_owned())
102 self.assertRaises(RuntimeError, self.cond.wait, None)
103 self.assertRaises(RuntimeError, self.cond.notifyAll)
106 self.assert_(self.cond._is_owned())
107 self.cond.notifyAll()
108 self.assert_(self.cond._is_owned())
111 self.assertFalse(self.cond._is_owned())
112 self.assertRaises(RuntimeError, self.cond.wait, None)
113 self.assertRaises(RuntimeError, self.cond.notifyAll)
115 def _testNotification(self):
120 self.cond.notifyAll()
125 self._addThread(target=_NotifyAll)
126 self.assertEqual(self.done.get(True, 1), "NE")
127 self.assertRaises(Queue.Empty, self.done.get_nowait)
129 self.assertEqual(self.done.get(True, 1), "NA")
130 self.assertEqual(self.done.get(True, 1), "NN")
131 self.assert_(self.cond._is_owned())
133 self.assertFalse(self.cond._is_owned())
136 class TestSingleNotifyPipeCondition(_ConditionTestCase):
137 """SingleNotifyPipeCondition tests"""
140 _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
142 def testAcquireRelease(self):
# Runs the shared _testAcquireRelease checks against self.cond, which setUp
# builds from locking.SingleNotifyPipeCondition for this class.
143 self._testAcquireRelease()
145 def testNotification(self):
# Runs the shared _testNotification checks against self.cond, which setUp
# builds from locking.SingleNotifyPipeCondition for this class.
146 self._testNotification()
148 def testWaitReuse(self):
154 def testNoNotifyReuse(self):
156 self.cond.notifyAll()
157 self.assertRaises(RuntimeError, self.cond.wait, None)
158 self.assertRaises(RuntimeError, self.cond.notifyAll)
162 class TestPipeCondition(_ConditionTestCase):
163 """PipeCondition tests"""
166 _ConditionTestCase.setUp(self, locking.PipeCondition)
168 def testAcquireRelease(self):
# Runs the shared _testAcquireRelease checks against self.cond, which setUp
# builds from locking.PipeCondition for this class.
169 self._testAcquireRelease()
171 def testNotification(self):
# Runs the shared _testNotification checks against self.cond, which setUp
# builds from locking.PipeCondition for this class.
172 self._testNotification()
174 def _TestWait(self, fn):
176 self._addThread(target=fn),
177 self._addThread(target=fn),
178 self._addThread(target=fn),
181 # Wait for threads to be waiting
183 self.assertEqual(self.done.get(True, 1), "A")
185 self.assertRaises(Queue.Empty, self.done.get_nowait)
188 self.assertEqual(len(self.cond._waiters), 3)
189 self.assertEqual(self.cond._waiters, set(threads))
190 # This new thread can't acquire the lock, and thus call wait, before we
192 self._addThread(target=fn)
193 self.cond.notifyAll()
194 self.assertRaises(Queue.Empty, self.done.get_nowait)
197 # We should now get 3 W and 1 A (for the new thread) in whatever order
201 got = self.done.get(True, 1)
207 self.fail("Got %s on the done queue" % got)
209 self.assertEqual(w, 3)
210 self.assertEqual(a, 1)
213 self.cond.notifyAll()
216 self.assertEqual(self.done.get_nowait(), "W")
217 self.assertRaises(Queue.Empty, self.done.get_nowait)
219 def testBlockingWait(self):
227 self._TestWait(_BlockingWait)
229 def testLongTimeoutWait(self):
237 self._TestWait(_Helper)
239 def _TimeoutWait(self, timeout, check):
241 self.cond.wait(timeout)
245 def testShortTimeoutWait(self):
246 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
247 self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
249 self.assertEqual(self.done.get_nowait(), "T1")
250 self.assertEqual(self.done.get_nowait(), "T1")
251 self.assertRaises(Queue.Empty, self.done.get_nowait)
253 def testZeroTimeoutWait(self):
254 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
255 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
256 self._addThread(target=self._TimeoutWait, args=(0, "T0"))
258 self.assertEqual(self.done.get_nowait(), "T0")
259 self.assertEqual(self.done.get_nowait(), "T0")
260 self.assertEqual(self.done.get_nowait(), "T0")
261 self.assertRaises(Queue.Empty, self.done.get_nowait)
264 class TestSharedLock(_ThreadedTestCase):
265 """SharedLock tests"""
268 _ThreadedTestCase.setUp(self)
269 self.sl = locking.SharedLock("TestSharedLock")
271 def testSequenceAndOwnership(self):
272 self.assertFalse(self.sl._is_owned())
273 self.sl.acquire(shared=1)
274 self.assert_(self.sl._is_owned())
275 self.assert_(self.sl._is_owned(shared=1))
276 self.assertFalse(self.sl._is_owned(shared=0))
278 self.assertFalse(self.sl._is_owned())
280 self.assert_(self.sl._is_owned())
281 self.assertFalse(self.sl._is_owned(shared=1))
282 self.assert_(self.sl._is_owned(shared=0))
284 self.assertFalse(self.sl._is_owned())
285 self.sl.acquire(shared=1)
286 self.assert_(self.sl._is_owned())
287 self.assert_(self.sl._is_owned(shared=1))
288 self.assertFalse(self.sl._is_owned(shared=0))
290 self.assertFalse(self.sl._is_owned())
292 def testBooleanValue(self):
293 # semaphores are supposed to return a true value on a successful acquire
294 self.assert_(self.sl.acquire(shared=1))
296 self.assert_(self.sl.acquire())
299 def testDoubleLockingStoE(self):
# While this thread already holds self.sl in shared mode, a second acquire()
# with no arguments (presumably exclusive mode, going by the test name) must
# raise AssertionError.
300 self.sl.acquire(shared=1)
301 self.assertRaises(AssertionError, self.sl.acquire)
303 def testDoubleLockingEtoS(self):
305 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
307 def testDoubleLockingStoS(self):
# While this thread already holds self.sl in shared mode, asking for it again
# in shared mode must raise AssertionError.
308 self.sl.acquire(shared=1)
309 self.assertRaises(AssertionError, self.sl.acquire, shared=1)
311 def testDoubleLockingEtoE(self):
313 self.assertRaises(AssertionError, self.sl.acquire)
315 # helper functions: called in a separate thread they acquire the lock, send
316 # their identifier on the done queue, then release it.
317 def _doItSharer(self):
319 self.sl.acquire(shared=1)
322 except errors.LockError:
325 def _doItExclusive(self):
330 except errors.LockError:
333 def _doItDelete(self):
337 except errors.LockError:
340 def testSharersCanCoexist(self):
341 self.sl.acquire(shared=1)
342 threading.Thread(target=self._doItSharer).start()
343 self.assert_(self.done.get(True, 1))
347 def testExclusiveBlocksExclusive(self):
349 self._addThread(target=self._doItExclusive)
350 self.assertRaises(Queue.Empty, self.done.get_nowait)
353 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
356 def testExclusiveBlocksDelete(self):
358 self._addThread(target=self._doItDelete)
359 self.assertRaises(Queue.Empty, self.done.get_nowait)
362 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
363 self.sl = locking.SharedLock(self.sl.name)
366 def testExclusiveBlocksSharer(self):
368 self._addThread(target=self._doItSharer)
369 self.assertRaises(Queue.Empty, self.done.get_nowait)
372 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
375 def testSharerBlocksExclusive(self):
376 self.sl.acquire(shared=1)
377 self._addThread(target=self._doItExclusive)
378 self.assertRaises(Queue.Empty, self.done.get_nowait)
381 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
384 def testSharerBlocksDelete(self):
385 self.sl.acquire(shared=1)
386 self._addThread(target=self._doItDelete)
387 self.assertRaises(Queue.Empty, self.done.get_nowait)
390 self.failUnlessEqual(self.done.get_nowait(), 'DEL')
391 self.sl = locking.SharedLock(self.sl.name)
394 def testWaitingExclusiveBlocksSharer(self):
395 """SKIPPED testWaitingExclusiveBlockSharer"""
398 self.sl.acquire(shared=1)
399 # the lock is acquired in shared mode...
400 self._addThread(target=self._doItExclusive)
401 # ...but now an exclusive is waiting...
402 self._addThread(target=self._doItSharer)
403 # ...so the sharer should be blocked as well
404 self.assertRaises(Queue.Empty, self.done.get_nowait)
407 # The exclusive passed before
408 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
409 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
412 def testWaitingSharerBlocksExclusive(self):
413 """SKIPPED testWaitingSharerBlocksExclusive"""
417 # the lock is acquired in exclusive mode...
418 self._addThread(target=self._doItSharer)
419 # ...but now a sharer is waiting...
420 self._addThread(target=self._doItExclusive)
421 # ...the exclusive is waiting too...
422 self.assertRaises(Queue.Empty, self.done.get_nowait)
425 # The sharer passed before
426 self.assertEqual(self.done.get_nowait(), 'SHR')
427 self.assertEqual(self.done.get_nowait(), 'EXC')
429 def testDelete(self):
431 self.assertRaises(errors.LockError, self.sl.acquire)
432 self.assertRaises(errors.LockError, self.sl.acquire, shared=1)
433 self.assertRaises(errors.LockError, self.sl.delete)
435 def testDeleteTimeout(self):
# Calls delete() with timeout=60 (presumably seconds) on self.sl; no acquire
# is made in this method beforehand. Nothing is asserted, so the test only
# fails if delete() raises.
436 self.sl.delete(timeout=60)
438 def testNoDeleteIfSharer(self):
439 self.sl.acquire(shared=1)
440 self.assertRaises(AssertionError, self.sl.delete)
443 def testDeletePendingSharersExclusiveDelete(self):
445 self._addThread(target=self._doItSharer)
446 self._addThread(target=self._doItSharer)
447 self._addThread(target=self._doItExclusive)
448 self._addThread(target=self._doItDelete)
451 # The threads who were pending return ERR
453 self.assertEqual(self.done.get_nowait(), 'ERR')
454 self.sl = locking.SharedLock(self.sl.name)
457 def testDeletePendingDeleteExclusiveSharers(self):
459 self._addThread(target=self._doItDelete)
460 self._addThread(target=self._doItExclusive)
461 self._addThread(target=self._doItSharer)
462 self._addThread(target=self._doItSharer)
465 # All four threads that were pending return ERR
466 self.assertEqual(self.done.get_nowait(), 'ERR')
467 self.assertEqual(self.done.get_nowait(), 'ERR')
468 self.assertEqual(self.done.get_nowait(), 'ERR')
469 self.assertEqual(self.done.get_nowait(), 'ERR')
470 self.sl = locking.SharedLock(self.sl.name)
473 def testExclusiveAcquireTimeout(self):
474 for shared in [0, 1]:
475 on_queue = threading.Event()
476 release_exclusive = threading.Event()
478 def _LockExclusive():
479 self.sl.acquire(shared=0, test_notify=on_queue.set)
480 self.done.put("A: start wait")
481 release_exclusive.wait()
482 self.done.put("A: end wait")
485 # Start thread to hold lock in exclusive mode
486 self._addThread(target=_LockExclusive)
488 # Wait for wait to begin
489 self.assertEqual(self.done.get(timeout=60), "A: start wait")
491 # Wait up to 60s to get lock, but release exclusive lock as soon as we're
493 self.failUnless(self.sl.acquire(shared=shared, timeout=60,
494 test_notify=release_exclusive.set))
496 self.done.put("got 2nd")
501 self.assertEqual(self.done.get_nowait(), "A: end wait")
502 self.assertEqual(self.done.get_nowait(), "got 2nd")
503 self.assertRaises(Queue.Empty, self.done.get_nowait)
506 def testAcquireExpiringTimeout(self):
507 def _AcquireWithTimeout(shared, timeout):
508 if not self.sl.acquire(shared=shared, timeout=timeout):
509 self.done.put("timeout")
511 for shared in [0, 1]:
515 # Start shared acquires with timeout between 0 and 20 ms
517 self._addThread(target=_AcquireWithTimeout,
518 args=(shared, i * 2.0 / 1000.0))
520 # Wait for threads to finish (makes sure the acquire timeout expires
521 # before releasing the lock)
528 self.assertEqual(self.done.get_nowait(), "timeout")
530 self.assertRaises(Queue.Empty, self.done.get_nowait)
533 def testSharedSkipExclusiveAcquires(self):
534 # Tests whether shared acquires jump in front of exclusive acquires in the
537 def _Acquire(shared, name, notify_ev, wait_ev):
539 notify_fn = notify_ev.set
546 if not self.sl.acquire(shared=shared, test_notify=notify_fn):
552 # Get exclusive lock while we fill the queue
560 # Add acquires using threading.Event for synchronization. They'll be
561 # acquired exactly in the order defined in this list.
562 acquires = (shrcnt1 * [(1, "shared 1")] +
563 3 * [(0, "exclusive 1")] +
564 shrcnt2 * [(1, "shared 2")] +
565 shrcnt3 * [(1, "shared 3")] +
566 shrcnt4 * [(1, "shared 4")] +
567 3 * [(0, "exclusive 2")])
572 for args in acquires:
573 ev_cur = threading.Event()
574 self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
577 # Wait for last acquire to start
580 # Expect 6 pending exclusive acquires and 1 for all shared acquires
582 self.assertEqual(self.sl._count_pending(), 7)
584 # Release exclusive lock and wait
590 for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
591 # Shared locks aren't guaranteed to be notified in order, but they'll be
593 tmp = self.done.get_nowait()
594 if tmp == "shared 1":
596 elif tmp == "shared 2":
598 elif tmp == "shared 3":
600 elif tmp == "shared 4":
602 self.assertEqual(shrcnt1, 0)
603 self.assertEqual(shrcnt2, 0)
604 self.assertEqual(shrcnt3, 0)
605 self.assertEqual(shrcnt3, 0)
608 self.assertEqual(self.done.get_nowait(), "exclusive 1")
611 self.assertEqual(self.done.get_nowait(), "exclusive 2")
613 self.assertRaises(Queue.Empty, self.done.get_nowait)
615 def testIllegalDowngrade(self):
617 self.assertRaises(AssertionError, self.sl.downgrade)
619 # Acquire in shared mode, downgrade should be no-op
620 self.assertTrue(self.sl.acquire(shared=1))
621 self.assertTrue(self.sl._is_owned(shared=1))
622 self.assertTrue(self.sl.downgrade())
623 self.assertTrue(self.sl._is_owned(shared=1))
626 def testDowngrade(self):
627 self.assertTrue(self.sl.acquire())
628 self.assertTrue(self.sl._is_owned(shared=0))
629 self.assertTrue(self.sl.downgrade())
630 self.assertTrue(self.sl._is_owned(shared=1))
634 def testDowngradeJumpsAheadOfExclusive(self):
635 def _KeepExclusive(ev_got, ev_downgrade, ev_release):
636 self.assertTrue(self.sl.acquire())
637 self.assertTrue(self.sl._is_owned(shared=0))
640 self.assertTrue(self.sl._is_owned(shared=0))
641 self.assertTrue(self.sl.downgrade())
642 self.assertTrue(self.sl._is_owned(shared=1))
644 self.assertTrue(self.sl._is_owned(shared=1))
647 def _KeepExclusive2(ev_started, ev_release):
648 self.assertTrue(self.sl.acquire(test_notify=ev_started.set))
649 self.assertTrue(self.sl._is_owned(shared=0))
651 self.assertTrue(self.sl._is_owned(shared=0))
654 def _KeepShared(ev_started, ev_got, ev_release):
655 self.assertTrue(self.sl.acquire(shared=1, test_notify=ev_started.set))
656 self.assertTrue(self.sl._is_owned(shared=1))
659 self.assertTrue(self.sl._is_owned(shared=1))
662 # Acquire lock in exclusive mode
663 ev_got_excl1 = threading.Event()
664 ev_downgrade_excl1 = threading.Event()
665 ev_release_excl1 = threading.Event()
666 th_excl1 = self._addThread(target=_KeepExclusive,
667 args=(ev_got_excl1, ev_downgrade_excl1,
671 # Start a second exclusive acquire
672 ev_started_excl2 = threading.Event()
673 ev_release_excl2 = threading.Event()
674 th_excl2 = self._addThread(target=_KeepExclusive2,
675 args=(ev_started_excl2, ev_release_excl2))
676 ev_started_excl2.wait()
678 # Start shared acquires, will jump ahead of second exclusive acquire when
679 # first exclusive acquire downgrades
680 ev_shared = [(threading.Event(), threading.Event()) for _ in range(5)]
681 ev_release_shared = threading.Event()
683 th_shared = [self._addThread(target=_KeepShared,
684 args=(ev_started, ev_got, ev_release_shared))
685 for (ev_started, ev_got) in ev_shared]
687 # Wait for all shared acquires to start
688 for (ev, _) in ev_shared:
691 # Check lock information
692 self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
693 [(self.sl.name, "exclusive", [th_excl1.getName()], None)])
694 [(_, _, _, pending), ] = self.sl.GetLockInfo(set([query.LQ_PENDING]))
695 self.assertEqual([(pendmode, sorted(waiting))
696 for (pendmode, waiting) in pending],
697 [("exclusive", [th_excl2.getName()]),
698 ("shared", sorted(th.getName() for th in th_shared))])
700 # Shared acquires won't start until the exclusive lock is downgraded
701 ev_downgrade_excl1.set()
703 # Wait for all shared acquires to be successful
704 for (_, ev) in ev_shared:
707 # Check lock information again
708 self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE,
710 [(self.sl.name, "shared", None,
711 [("exclusive", [th_excl2.getName()])])])
712 [(_, _, owner, _), ] = self.sl.GetLockInfo(set([query.LQ_OWNER]))
713 self.assertEqual(set(owner), set([th_excl1.getName()] +
714 [th.getName() for th in th_shared]))
716 ev_release_excl1.set()
717 ev_release_excl2.set()
718 ev_release_shared.set()
722 self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER,
724 [(self.sl.name, None, None, [])])
727 def testMixedAcquireTimeout(self):
728 sync = threading.Event()
730 def _AcquireShared(ev):
731 if not self.sl.acquire(shared=1, timeout=None):
734 self.done.put("shared")
739 # Wait for notification from main thread
747 ev = threading.Event()
748 self._addThread(target=_AcquireShared, args=(ev, ))
751 # Wait for all acquires to finish
755 self.assertEqual(self.sl._count_pending(), 0)
757 # Try to get exclusive lock
758 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
760 # Acquire exclusive without timeout
761 exclsync = threading.Event()
762 exclev = threading.Event()
764 def _AcquireExclusive():
765 if not self.sl.acquire(shared=0):
768 self.done.put("exclusive")
773 # Wait for notification from main thread
778 self._addThread(target=_AcquireExclusive)
780 # Try to get exclusive lock
781 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
783 # Make all shared holders release their locks
786 # Wait for exclusive acquire to succeed
789 self.assertEqual(self.sl._count_pending(), 0)
791 # Try to get exclusive lock
792 self.failIf(self.sl.acquire(shared=0, timeout=0.02))
794 def _AcquireSharedSimple():
795 if self.sl.acquire(shared=1, timeout=None):
796 self.done.put("shared2")
800 self._addThread(target=_AcquireSharedSimple)
802 # Tell exclusive lock to release
805 # Wait for everything to finish
808 self.assertEqual(self.sl._count_pending(), 0)
812 self.assertEqual(self.done.get_nowait(), "shared")
814 self.assertEqual(self.done.get_nowait(), "exclusive")
817 self.assertEqual(self.done.get_nowait(), "shared2")
819 self.assertRaises(Queue.Empty, self.done.get_nowait)
821 def testPriority(self):
822 # Acquire in exclusive mode
823 self.assert_(self.sl.acquire(shared=0))
826 def _Acquire(prev, next, shared, priority, result):
828 self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
830 self.done.put(result)
834 counter = itertools.count(0)
835 priorities = range(-20, 30)
836 first = threading.Event()
842 # [(shared/exclusive, set(acquire names), set(pending threads)),
843 # (shared/exclusive, ...),
849 # References shared acquire per priority in L{perprio}. Data structure:
851 # priority: (shared=1, set(acquire names), set(pending threads)),
855 for seed in [4979, 9523, 14902, 32440]:
856 # Use a deterministic random generator
857 rnd = random.Random(seed)
858 for priority in [rnd.choice(priorities) for _ in range(30)]:
863 acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
865 ev = threading.Event()
866 thread = self._addThread(target=_Acquire,
867 args=(prev, ev, shared, priority, acqname))
870 # Record expected acquire, see above for structure
871 data = (shared, set([acqname]), set([thread]))
872 priolist = perprio.setdefault(priority, [])
874 priosh = prioshared.get(priority, None)
876 # Shared acquires are merged
877 for i, j in zip(priosh[1:], data[1:]):
879 assert data[0] == priosh[0]
881 prioshared[priority] = data
882 priolist.append(data)
884 priolist.append(data)
886 # Start all acquires and wait for them
890 # Check lock information
891 self.assertEqual(self.sl.GetLockInfo(set()),
892 [(self.sl.name, None, None, None)])
893 self.assertEqual(self.sl.GetLockInfo(set([query.LQ_MODE, query.LQ_OWNER])),
894 [(self.sl.name, "exclusive",
895 [threading.currentThread().getName()], None)])
897 self._VerifyPrioPending(self.sl.GetLockInfo(set([query.LQ_PENDING])),
900 # Let threads acquire the lock
903 # Wait for everything to finish
906 self.assert_(self.sl._check_empty())
908 # Check acquires by priority
909 for acquires in [perprio[i] for i in sorted(perprio.keys())]:
910 for (_, names, _) in acquires:
911 # For shared acquires, the set will contain 1..n entries. For exclusive
914 names.remove(self.done.get_nowait())
915 self.assertFalse(compat.any(names for (_, names, _) in acquires))
917 self.assertRaises(Queue.Empty, self.done.get_nowait)
919 def _VerifyPrioPending(self, ((name, mode, owner, pending), ), perprio):
# Checks a one-element lock-info result against the expected pending acquires.
# The first argument must be a sequence holding exactly one
# (name, mode, owner, pending) tuple; perprio maps a priority to a list of
# (shared, names, threads) entries.
# NOTE(review): tuple unpacking in the parameter list is Python 2 only syntax
# (removed by PEP 3113).
920 self.assertEqual(name, self.sl.name)
921 self.assert_(mode is None)
922 self.assert_(owner is None)
# Expected side: walk perprio in ascending key order, label each entry
# "shared" when its shared flag is truthy and "exclusive" otherwise, and
# compare sorted thread names so ordering inside one entry does not matter.
924 self.assertEqual([(pendmode, sorted(waiting))
925 for (pendmode, waiting) in pending],
926 [(["exclusive", "shared"][int(bool(shared))],
927 sorted(t.getName() for t in threads))
928 for acquires in [perprio[i]
929 for i in sorted(perprio.keys())]
930 for (shared, _, threads) in acquires])
933 class TestSharedLockInCondition(_ThreadedTestCase):
934 """SharedLock as a condition lock tests"""
937 _ThreadedTestCase.setUp(self)
938 self.sl = locking.SharedLock("TestSharedLockInCondition")
941 def setCondition(self):
# Builds self.cond as a standard threading.Condition wrapped around the
# SharedLock in self.sl. TestSharedLockInPipeCondition overrides this to use
# a different condition class.
942 self.cond = threading.Condition(self.sl)
944 def testKeepMode(self):
945 self.cond.acquire(shared=1)
946 self.assert_(self.sl._is_owned(shared=1))
948 self.assert_(self.sl._is_owned(shared=1))
950 self.cond.acquire(shared=0)
951 self.assert_(self.sl._is_owned(shared=0))
953 self.assert_(self.sl._is_owned(shared=0))
957 class TestSharedLockInPipeCondition(TestSharedLockInCondition):
958 """SharedLock as a pipe condition lock tests"""
960 def setCondition(self):
# Overrides the parent class's setCondition so that self.cond is a
# locking.PipeCondition around self.sl instead of a threading.Condition.
961 self.cond = locking.PipeCondition(self.sl)
964 class TestSSynchronizedDecorator(_ThreadedTestCase):
965 """Shared Lock Synchronized decorator test"""
968 _ThreadedTestCase.setUp(self)
970 @locking.ssynchronized(_decoratorlock)
971 def _doItExclusive(self):
972 self.assert_(_decoratorlock._is_owned())
975 @locking.ssynchronized(_decoratorlock, shared=1)
976 def _doItSharer(self):
977 self.assert_(_decoratorlock._is_owned(shared=1))
980 def testDecoratedFunctions(self):
981 self._doItExclusive()
982 self.assertFalse(_decoratorlock._is_owned())
984 self.assertFalse(_decoratorlock._is_owned())
986 def testSharersCanCoexist(self):
987 _decoratorlock.acquire(shared=1)
988 threading.Thread(target=self._doItSharer).start()
989 self.assert_(self.done.get(True, 1))
990 _decoratorlock.release()
993 def testExclusiveBlocksExclusive(self):
994 _decoratorlock.acquire()
995 self._addThread(target=self._doItExclusive)
996 # give it a bit of time to check that it's not actually doing anything
997 self.assertRaises(Queue.Empty, self.done.get_nowait)
998 _decoratorlock.release()
1000 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
1003 def testExclusiveBlocksSharer(self):
1004 _decoratorlock.acquire()
1005 self._addThread(target=self._doItSharer)
1006 self.assertRaises(Queue.Empty, self.done.get_nowait)
1007 _decoratorlock.release()
1009 self.failUnlessEqual(self.done.get_nowait(), 'SHR')
1012 def testSharerBlocksExclusive(self):
1013 _decoratorlock.acquire(shared=1)
1014 self._addThread(target=self._doItExclusive)
1015 self.assertRaises(Queue.Empty, self.done.get_nowait)
1016 _decoratorlock.release()
1018 self.failUnlessEqual(self.done.get_nowait(), 'EXC')
1021 class TestLockSet(_ThreadedTestCase):
1025 _ThreadedTestCase.setUp(self)
1029 """Helper to (re)initialize the lock set"""
1030 self.resources = ['one', 'two', 'three']
1031 self.ls = locking.LockSet(self.resources, "TestLockSet")
1033 def testResources(self):
# _names() must report exactly the names the LockSet was constructed with,
# and a LockSet built from an empty list must report an empty set.
1034 self.assertEquals(self.ls._names(), set(self.resources))
1035 newls = locking.LockSet([], "TestLockSet.testResources")
1036 self.assertEquals(newls._names(), set())
1038 def testAcquireRelease(self):
1039 self.assert_(self.ls.acquire('one'))
1040 self.assertEquals(self.ls._list_owned(), set(['one']))
1042 self.assertEquals(self.ls._list_owned(), set())
1043 self.assertEquals(self.ls.acquire(['one']), set(['one']))
1044 self.assertEquals(self.ls._list_owned(), set(['one']))
1046 self.assertEquals(self.ls._list_owned(), set())
1047 self.ls.acquire(['one', 'two', 'three'])
1048 self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1049 self.ls.release('one')
1050 self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
1051 self.ls.release(['three'])
1052 self.assertEquals(self.ls._list_owned(), set(['two']))
1054 self.assertEquals(self.ls._list_owned(), set())
1055 self.assertEquals(self.ls.acquire(['one', 'three']), set(['one', 'three']))
1056 self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
1058 self.assertEquals(self.ls._list_owned(), set())
1060 def testNoDoubleAcquire(self):
1061 self.ls.acquire('one')
1062 self.assertRaises(AssertionError, self.ls.acquire, 'one')
1063 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
1064 self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
1066 self.ls.acquire(['one', 'three'])
1067 self.ls.release('one')
1068 self.assertRaises(AssertionError, self.ls.acquire, ['two'])
1069 self.ls.release('three')
1071 def testNoWrongRelease(self):
# release() must raise AssertionError both when called before any acquire in
# this test and when asked to release 'two' while only 'one' was acquired.
1072 self.assertRaises(AssertionError, self.ls.release)
1073 self.ls.acquire('one')
1074 self.assertRaises(AssertionError, self.ls.release, 'two')
1076 def testAddRemove(self):
1078 self.assertEquals(self.ls._list_owned(), set())
1079 self.assert_('four' in self.ls._names())
1080 self.ls.add(['five', 'six', 'seven'], acquired=1)
1081 self.assert_('five' in self.ls._names())
1082 self.assert_('six' in self.ls._names())
1083 self.assert_('seven' in self.ls._names())
1084 self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
1085 self.assertEquals(self.ls.remove(['five', 'six']), ['five', 'six'])
1086 self.assert_('five' not in self.ls._names())
1087 self.assert_('six' not in self.ls._names())
1088 self.assertEquals(self.ls._list_owned(), set(['seven']))
1089 self.assertRaises(AssertionError, self.ls.add, 'eight', acquired=1)
1090 self.ls.remove('seven')
1091 self.assert_('seven' not in self.ls._names())
1092 self.assertEquals(self.ls._list_owned(), set([]))
1093 self.ls.acquire(None, shared=1)
1094 self.assertRaises(AssertionError, self.ls.add, 'eight')
1096 self.ls.acquire(None)
1097 self.ls.add('eight', acquired=1)
1098 self.assert_('eight' in self.ls._names())
1099 self.assert_('eight' in self.ls._list_owned())
1101 self.assert_('nine' in self.ls._names())
1102 self.assert_('nine' not in self.ls._list_owned())
1104 self.ls.remove(['two'])
1105 self.assert_('two' not in self.ls._names())
1106 self.ls.acquire('three')
1107 self.assertEquals(self.ls.remove(['three']), ['three'])
1108 self.assert_('three' not in self.ls._names())
1109 self.assertEquals(self.ls.remove('three'), [])
1110 self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['one'])
1111 self.assert_('one' not in self.ls._names())
1113 def testRemoveNonBlocking(self):
1114 self.ls.acquire('one')
1115 self.assertEquals(self.ls.remove('one'), ['one'])
1116 self.ls.acquire(['two', 'three'])
1117 self.assertEquals(self.ls.remove(['two', 'three']),
1120 def testNoDoubleAdd(self):
1121 self.assertRaises(errors.LockError, self.ls.add, 'two')
1123 self.assertRaises(errors.LockError, self.ls.add, 'four')
1125 def testNoWrongRemoves(self):
# With 'one' and 'three' held in shared mode, remove() must raise
# AssertionError for a name that is not held and for a name held only shared.
1126 self.ls.acquire(['one', 'three'], shared=1)
1127 # Cannot remove 'two' while holding something which is not a superset
1128 self.assertRaises(AssertionError, self.ls.remove, 'two')
1129 # Cannot remove 'three' as we are sharing it
1130 self.assertRaises(AssertionError, self.ls.remove, 'three')
1132 def testAcquireSetLock(self):
1133 # acquire the set-lock exclusively
1134 self.assertEquals(self.ls.acquire(None), set(['one', 'two', 'three']))
1135 self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
1136 self.assertEquals(self.ls._is_owned(), True)
1137 self.assertEquals(self.ls._names(), set(['one', 'two', 'three']))
1138 # I can still add/remove elements...
1139 self.assertEquals(self.ls.remove(['two', 'three']), ['two', 'three'])
1140 self.assert_(self.ls.add('six'))
1142 # share the set-lock
1143 self.assertEquals(self.ls.acquire(None, shared=1), set(['one', 'six']))
1144 # adding new elements is not possible
1145 self.assertRaises(AssertionError, self.ls.add, 'five')
1148 def testAcquireWithRepetitions(self):
# A name listed twice in acquire() or release() is accepted. The expected
# value is written as a set literal with the duplicate, which collapses to
# set(['two', 'three']).
1149 self.assertEquals(self.ls.acquire(['two', 'two', 'three'], shared=1),
1150 set(['two', 'two', 'three']))
1151 self.ls.release(['two', 'two'])
# Only 'three' remains owned after releasing 'two'.
1152 self.assertEquals(self.ls._list_owned(), set(['three']))
1154 def testEmptyAcquire(self):
# Acquiring an empty list returns an empty set and leaves nothing owned, so
# add() and a further empty acquire still work, while release() raises
# AssertionError.
1155 # Acquire an empty list of locks...
1156 self.assertEquals(self.ls.acquire([]), set())
1157 self.assertEquals(self.ls._list_owned(), set())
1158 # New locks can still be added
1159 self.assert_(self.ls.add('six'))
1160 # "re-acquiring" is not an issue, since we had really acquired nothing
1161 self.assertEquals(self.ls.acquire([], shared=1), set())
1162 self.assertEquals(self.ls._list_owned(), set())
1163 # We haven't really acquired anything, so we cannot release
1164 self.assertRaises(AssertionError, self.ls.release)
1166 def _doLockSet(self, names, shared):
1168 self.ls.acquire(names, shared=shared)
1169 self.done.put('DONE')
1171 except errors.LockError:
1172 self.done.put('ERR')
1174 def _doAddSet(self, names):
1176 self.ls.add(names, acquired=1)
1177 self.done.put('DONE')
1179 except errors.LockError:
1180 self.done.put('ERR')
1182 def _doRemoveSet(self, names):
# Puts whatever self.ls.remove(names) returns onto the self.done queue, so
# the caller can inspect it from there.
1183 self.done.put(self.ls.remove(names))
1186 def testConcurrentSharedAcquire(self):
1187 self.ls.acquire(['one', 'two'], shared=1)
1188 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1190 self.assertEqual(self.done.get_nowait(), 'DONE')
1191 self._addThread(target=self._doLockSet, args=(['one', 'two', 'three'], 1))
1193 self.assertEqual(self.done.get_nowait(), 'DONE')
1194 self._addThread(target=self._doLockSet, args=('three', 1))
1196 self.assertEqual(self.done.get_nowait(), 'DONE')
1197 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1198 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1199 self.assertRaises(Queue.Empty, self.done.get_nowait)
1202 self.assertEqual(self.done.get_nowait(), 'DONE')
1203 self.assertEqual(self.done.get_nowait(), 'DONE')
1206 def testConcurrentExclusiveAcquire(self):
1207 self.ls.acquire(['one', 'two'])
1208 self._addThread(target=self._doLockSet, args=('three', 1))
1210 self.assertEqual(self.done.get_nowait(), 'DONE')
1211 self._addThread(target=self._doLockSet, args=('three', 0))
1213 self.assertEqual(self.done.get_nowait(), 'DONE')
1214 self.assertRaises(Queue.Empty, self.done.get_nowait)
1215 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1216 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1217 self._addThread(target=self._doLockSet, args=('one', 0))
1218 self._addThread(target=self._doLockSet, args=('one', 1))
1219 self._addThread(target=self._doLockSet, args=(['two', 'three'], 0))
1220 self._addThread(target=self._doLockSet, args=(['two', 'three'], 1))
1221 self.assertRaises(Queue.Empty, self.done.get_nowait)
1225 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1228 def testSimpleAcquireTimeoutExpiring(self):
1229 names = sorted(self.ls._names())
1230 self.assert_(len(names) >= 3)
1232 # Get name of first lock
1235 # Get name of last lock
1239 # Block first and try to lock it again
1242 # Block last and try to lock all locks
1245 # Block last and try to lock it again
1249 for (wanted, block) in checks:
1250 # Lock in exclusive mode
1251 self.assert_(self.ls.acquire(block, shared=0))
1254 # Try to get the same lock again with a timeout (should never succeed)
1255 acquired = self.ls.acquire(wanted, timeout=0.1, shared=0)
1257 self.done.put("acquired")
1260 self.assert_(acquired is None)
1261 self.assertFalse(self.ls._list_owned())
1262 self.assertFalse(self.ls._is_owned())
1263 self.done.put("not acquired")
1265 self._addThread(target=_AcquireOne)
1267 # Wait for timeout in thread to expire
1270 # Release exclusive lock again
1273 self.assertEqual(self.done.get_nowait(), "not acquired")
1274 self.assertRaises(Queue.Empty, self.done.get_nowait)
1277 def testDelayedAndExpiringLockAcquire(self):
1279 self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1281 for expire in (False, True):
1282 names = sorted(self.ls._names())
1283 self.assertEqual(len(names), 8)
1285 lock_ev = dict([(i, threading.Event()) for i in names])
1287 # Lock all in exclusive mode
1288 self.assert_(self.ls.acquire(names, shared=0))
1291 # We'll wait at least 300ms per lock
1292 lockwait = len(names) * [0.3]
1294 # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1295 # this gives us up to 2.4s to fail.
1296 lockall_timeout = 0.4
1298 # This should finish rather quickly
1300 lockall_timeout = len(names) * 5.0
1303 def acquire_notification(name):
1305 self.done.put("getting %s" % name)
1310 if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1311 test_notify=acquire_notification):
1312 self.done.put("got all")
1315 self.done.put("timeout on all")
1318 for ev in lock_ev.values():
1321 t = self._addThread(target=_LockAll)
1323 for idx, name in enumerate(names):
1324 # Wait for actual acquire on this lock to start
1325 lock_ev[name].wait(10.0)
1327 if expire and t.isAlive():
1328 # Wait some time after getting the notification to make sure the lock
1329 # acquire will expire
1330 SafeSleep(lockwait[idx])
1332 self.ls.release(names=name)
1334 self.assertFalse(self.ls._list_owned())
1339 # Not checking which locks were actually acquired. Doing so would be
1340 # too timing-dependent.
1341 self.assertEqual(self.done.get_nowait(), "timeout on all")
1344 self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1345 self.assertEqual(self.done.get_nowait(), "got all")
1346 self.assertRaises(Queue.Empty, self.done.get_nowait)
1349 def testConcurrentRemove(self):
1351 self.ls.acquire(['one', 'two', 'four'])
1352 self._addThread(target=self._doLockSet, args=(['one', 'four'], 0))
1353 self._addThread(target=self._doLockSet, args=(['one', 'four'], 1))
1354 self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
1355 self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
1356 self.assertRaises(Queue.Empty, self.done.get_nowait)
1357 self.ls.remove('one')
1361 self.failUnlessEqual(self.done.get_nowait(), 'ERR')
1362 self.ls.add(['five', 'six'], acquired=1)
1363 self._addThread(target=self._doLockSet, args=(['three', 'six'], 1))
1364 self._addThread(target=self._doLockSet, args=(['three', 'six'], 0))
1365 self._addThread(target=self._doLockSet, args=(['four', 'six'], 1))
1366 self._addThread(target=self._doLockSet, args=(['four', 'six'], 0))
1367 self.ls.remove('five')
1371 self.failUnlessEqual(self.done.get_nowait(), 'DONE')
1372 self.ls.acquire(['three', 'four'])
1373 self._addThread(target=self._doRemoveSet, args=(['four', 'six'], ))
1374 self.assertRaises(Queue.Empty, self.done.get_nowait)
1375 self.ls.remove('four')
1377 self.assertEqual(self.done.get_nowait(), ['six'])
1378 self._addThread(target=self._doRemoveSet, args=(['two']))
1380 self.assertEqual(self.done.get_nowait(), ['two'])
1386 def testConcurrentSharedSetLock(self):
1387 # share the set-lock...
1388 self.ls.acquire(None, shared=1)
1389 # ...another thread can share it too
1390 self._addThread(target=self._doLockSet, args=(None, 1))
1392 self.assertEqual(self.done.get_nowait(), 'DONE')
1393 # ...or just share some elements
1394 self._addThread(target=self._doLockSet, args=(['one', 'three'], 1))
1396 self.assertEqual(self.done.get_nowait(), 'DONE')
1397 # ...but not add new ones or remove any
1398 t = self._addThread(target=self._doAddSet, args=(['nine']))
1399 self._addThread(target=self._doRemoveSet, args=(['two'], ))
1400 self.assertRaises(Queue.Empty, self.done.get_nowait)
1401 # this just releases the set-lock
1404 self.assertEqual(self.done.get_nowait(), 'DONE')
1405 # release the lock on the actual elements so remove() can proceed too
1408 self.failUnlessEqual(self.done.get_nowait(), ['two'])
1413 def testConcurrentExclusiveSetLock(self):
1414 # acquire the set-lock...
1415 self.ls.acquire(None, shared=0)
1416 # ...no one can do anything else
1417 self._addThread(target=self._doLockSet, args=(None, 1))
1418 self._addThread(target=self._doLockSet, args=(None, 0))
1419 self._addThread(target=self._doLockSet, args=(['three'], 0))
1420 self._addThread(target=self._doLockSet, args=(['two'], 1))
1421 self._addThread(target=self._doAddSet, args=(['nine']))
1422 self.assertRaises(Queue.Empty, self.done.get_nowait)
1426 self.assertEqual(self.done.get(True, 1), 'DONE')
1431 def testConcurrentSetLockAdd(self):
1432 self.ls.acquire('one')
1433 # Another thread wants the whole SetLock
1434 self._addThread(target=self._doLockSet, args=(None, 0))
1435 self._addThread(target=self._doLockSet, args=(None, 1))
1436 self.assertRaises(Queue.Empty, self.done.get_nowait)
1437 self.assertRaises(AssertionError, self.ls.add, 'four')
1440 self.assertEqual(self.done.get_nowait(), 'DONE')
1441 self.assertEqual(self.done.get_nowait(), 'DONE')
1442 self.ls.acquire(None)
1443 self._addThread(target=self._doLockSet, args=(None, 0))
1444 self._addThread(target=self._doLockSet, args=(None, 1))
1445 self.assertRaises(Queue.Empty, self.done.get_nowait)
1447 self.ls.add('five', acquired=1)
1448 self.ls.add('six', acquired=1, shared=1)
1449 self.assertEquals(self.ls._list_owned(),
1450 set(['one', 'two', 'three', 'five', 'six']))
1451 self.assertEquals(self.ls._is_owned(), True)
1452 self.assertEquals(self.ls._names(),
1453 set(['one', 'two', 'three', 'four', 'five', 'six']))
1456 self.assertEqual(self.done.get_nowait(), 'DONE')
1457 self.assertEqual(self.done.get_nowait(), 'DONE')
1461 def testEmptyLockSet(self):
1463 self.assertEqual(self.ls.acquire(None), set(['one', 'two', 'three']))
1465 self.ls.remove(['one', 'two', 'three'])
1466 # and adds/locks by another thread still wait
1467 self._addThread(target=self._doAddSet, args=(['nine']))
1468 self._addThread(target=self._doLockSet, args=(None, 1))
1469 self._addThread(target=self._doLockSet, args=(None, 0))
1470 self.assertRaises(Queue.Empty, self.done.get_nowait)
1474 self.assertEqual(self.done.get_nowait(), 'DONE')
1476 self.assertEqual(self.ls.remove(['nine']), ['nine'])
1478 self.assertEqual(self.ls.acquire(None, shared=1), set())
1479 # other sharers can go, adds still wait
1480 self._addThread(target=self._doLockSet, args=(None, 1))
1482 self.assertEqual(self.done.get_nowait(), 'DONE')
1483 self._addThread(target=self._doAddSet, args=(['nine']))
1484 self.assertRaises(Queue.Empty, self.done.get_nowait)
1487 self.assertEqual(self.done.get_nowait(), 'DONE')
1490 def testAcquireWithNamesDowngrade(self):
1491 self.assertEquals(self.ls.acquire("two", shared=0), set(["two"]))
1492 self.assertTrue(self.ls._is_owned())
1493 self.assertFalse(self.ls._get_lock()._is_owned())
1495 self.assertFalse(self.ls._is_owned())
1496 self.assertFalse(self.ls._get_lock()._is_owned())
1497 # Can't downgrade after releasing
1498 self.assertRaises(AssertionError, self.ls.downgrade, "two")
1500 def testDowngrade(self):
1501 # Not owning anything, must raise an exception
1502 self.assertFalse(self.ls._is_owned())
1503 self.assertRaises(AssertionError, self.ls.downgrade)
1505 self.assertFalse(compat.any(i._is_owned()
1506 for i in self.ls._get_lockdict().values()))
1508 self.assertEquals(self.ls.acquire(None, shared=0),
1509 set(["one", "two", "three"]))
1510 self.assertRaises(AssertionError, self.ls.downgrade, "unknown lock")
1512 self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
1513 self.assertTrue(compat.all(i._is_owned(shared=0)
1514 for i in self.ls._get_lockdict().values()))
1516 # Start downgrading locks
1517 self.assertTrue(self.ls.downgrade(names=["one"]))
1518 self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
1519 self.assertTrue(compat.all(lock._is_owned(shared=[0, 1][int(name == "one")])
1521 self.ls._get_lockdict().items()))
1523 self.assertTrue(self.ls.downgrade(names="two"))
1524 self.assertTrue(self.ls._get_lock()._is_owned(shared=0))
1525 should_share = lambda name: [0, 1][int(name in ("one", "two"))]
1526 self.assertTrue(compat.all(lock._is_owned(shared=should_share(name))
1528 self.ls._get_lockdict().items()))
1530 # Downgrading the last exclusive lock to shared must downgrade the
1531 # lockset-internal lock too
1532 self.assertTrue(self.ls.downgrade(names="three"))
1533 self.assertTrue(self.ls._get_lock()._is_owned(shared=1))
1534 self.assertTrue(compat.all(i._is_owned(shared=1)
1535 for i in self.ls._get_lockdict().values()))
1537 # Downgrading a shared lock must be a no-op
1538 self.assertTrue(self.ls.downgrade(names=["one", "three"]))
1539 self.assertTrue(self.ls._get_lock()._is_owned(shared=1))
1540 self.assertTrue(compat.all(i._is_owned(shared=1)
1541 for i in self.ls._get_lockdict().values()))
1545 def testPriority(self):
1546 def _Acquire(prev, next, name, priority, success_fn):
1548 self.assert_(self.ls.acquire(name, shared=0,
1550 test_notify=lambda _: next.set()))
1556 # Get all in exclusive mode
1557 self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1559 done_two = Queue.Queue(0)
1561 first = threading.Event()
1564 acquires = [("one", prio, self.done) for prio in range(1, 33)]
1565 acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1567 # Use a deterministic random generator
1568 random.Random(741).shuffle(acquires)
1570 for (name, prio, done) in acquires:
1571 ev = threading.Event()
1572 self._addThread(target=_Acquire,
1573 args=(prev, ev, name, prio,
1574 compat.partial(done.put, "Prio%s" % prio)))
1580 # Wait for last acquire to start
1583 # Let threads acquire locks
1586 # Wait for threads to finish
1589 for i in range(1, 33):
1590 self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1591 self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1593 self.assertRaises(Queue.Empty, self.done.get_nowait)
1594 self.assertRaises(Queue.Empty, done_two.get_nowait)
1597 class TestGanetiLockManager(_ThreadedTestCase):
1600 _ThreadedTestCase.setUp(self)
1601 self.nodes=['n1', 'n2']
1602 self.nodegroups=['g1', 'g2']
1603 self.instances=['i1', 'i2', 'i3']
1604 self.GL = locking.GanetiLockManager(self.nodes, self.nodegroups,
1608 # Don't try this at home...
1609 locking.GanetiLockManager._instance = None
def testLockingConstants(self):
    """Check that every entry of locking.LEVELS equals its own index."""
    # The locking library internally cheats by assuming its constants have some
    # relationships with each other. Check those hold true.
    # This relationship is also used in the Processor to recursively acquire
    # the right locks. Again, please don't break it.
    for (idx, level) in enumerate(locking.LEVELS):
        self.assertEqual(idx, level)
def testDoubleGLFails(self):
    """Instantiating the lock manager a second time must trip an assertion."""
    # The fixture already holds one manager in self.GL
    empty_resources = ([], [], [])
    self.assertRaises(AssertionError, locking.GanetiLockManager,
                      *empty_resources)
def testLockNames(self):
    """Each level must report exactly the names the manager was built with."""
    expected_by_level = [
        (locking.LEVEL_CLUSTER, ['BGL']),
        (locking.LEVEL_NODE, self.nodes),
        (locking.LEVEL_NODEGROUP, self.nodegroups),
        (locking.LEVEL_INSTANCE, self.instances),
    ]
    for (level, names) in expected_by_level:
        self.assertEqual(self.GL._names(level), set(names))
def testInitAndResources(self):
    """Build managers from different resource lists and check their names.

    Three combinations are tried: no resources at all, only nodes and node
    groups, and only instances. The cluster level must always contain just
    the 'BGL' lock.

    """
    scenarios = [
        ([], [], []),
        (self.nodes, self.nodegroups, []),
        ([], [], self.instances),
    ]
    for (nodes, nodegroups, instances) in scenarios:
        # Clear the class-level instance marker first; creating another
        # manager is otherwise refused (see testDoubleGLFails)
        locking.GanetiLockManager._instance = None
        self.GL = locking.GanetiLockManager(nodes, nodegroups, instances)

        names_of = self.GL._names
        self.assertEqual(names_of(locking.LEVEL_CLUSTER), set(['BGL']))
        self.assertEqual(names_of(locking.LEVEL_NODE), set(nodes))
        self.assertEqual(names_of(locking.LEVEL_NODEGROUP), set(nodegroups))
        self.assertEqual(names_of(locking.LEVEL_INSTANCE), set(instances))
def testAcquireRelease(self):
    """Acquire and release locks on several levels and check ownership."""
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.assertEqual(self.GL._list_owned(locking.LEVEL_CLUSTER), set(['BGL']))
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1'])
    self.GL.acquire(locking.LEVEL_NODEGROUP, ['g2'])
    self.GL.acquire(locking.LEVEL_NODE, ['n1', 'n2'], shared=1)

    # Dropping a single name leaves the rest of that level owned
    self.GL.release(locking.LEVEL_NODE, ['n2'])
    self.assertEqual(self.GL._list_owned(locking.LEVEL_NODE), set(['n1']))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))

    # Releasing a whole level must not touch the other levels
    self.GL.release(locking.LEVEL_NODE)
    self.assertEqual(self.GL._list_owned(locking.LEVEL_NODE), set())
    self.assertEqual(self.GL._list_owned(locking.LEVEL_NODEGROUP), set(['g2']))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i1']))
    self.GL.release(locking.LEVEL_NODEGROUP)
    self.GL.release(locking.LEVEL_INSTANCE)

    # 'i5' is not one of the fixture's instances, so it cannot be locked
    self.assertRaises(errors.LockError, self.GL.acquire,
                      locking.LEVEL_INSTANCE, ['i5'])
    self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'], shared=1)
    self.assertEqual(self.GL._list_owned(locking.LEVEL_INSTANCE), set(['i3']))
def testAcquireWholeSets(self):
    """Acquire every lock of a level by passing None as the name list."""
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.assertEqual(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                     set(self.instances))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_INSTANCE),
                     set(self.instances))
    self.assertEqual(self.GL.acquire(locking.LEVEL_NODEGROUP, None),
                     set(self.nodegroups))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_NODEGROUP),
                     set(self.nodegroups))
    # NOTE(review): the expected sets of the two node-level checks were
    # restored by analogy with the checks above -- confirm against upstream.
    self.assertEqual(self.GL.acquire(locking.LEVEL_NODE, None, shared=1),
                     set(self.nodes))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_NODE),
                     set(self.nodes))
    # Levels are released again, the cluster level last
    self.GL.release(locking.LEVEL_NODE)
    self.GL.release(locking.LEVEL_NODEGROUP)
    self.GL.release(locking.LEVEL_INSTANCE)
    self.GL.release(locking.LEVEL_CLUSTER)
def testAcquireWholeAndPartial(self):
    """Mix a whole-level acquire (instances) with a named one (node 'n2')."""
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.assertEqual(self.GL.acquire(locking.LEVEL_INSTANCE, None),
                     set(self.instances))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_INSTANCE),
                     set(self.instances))
    # NOTE(review): the expected sets of the two node-level checks were
    # restored from the requested names -- confirm against upstream.
    self.assertEqual(self.GL.acquire(locking.LEVEL_NODE, ['n2'], shared=1),
                     set(['n2']))
    self.assertEqual(self.GL._list_owned(locking.LEVEL_NODE),
                     set(['n2']))
    # Levels are released again, the cluster level last
    self.GL.release(locking.LEVEL_NODE)
    self.GL.release(locking.LEVEL_INSTANCE)
    self.GL.release(locking.LEVEL_CLUSTER)
def testBGLDependency(self):
    """Other levels need the BGL held, and the BGL has to be released last."""
    without_bgl = [
        (locking.LEVEL_NODE, ['n1', 'n2']),
        (locking.LEVEL_INSTANCE, ['i3']),
        (locking.LEVEL_NODEGROUP, ['g1']),
    ]
    # Nothing is owned yet, so each of these acquires must be refused
    for (level, names) in without_bgl:
        self.assertRaises(AssertionError, self.GL.acquire, level, names)

    def _AssertBglNotReleasable():
        # Both the named and the whole-level release of the cluster level
        # must fail while locks on another level are still owned
        self.assertRaises(AssertionError, self.GL.release,
                          locking.LEVEL_CLUSTER, ['BGL'])
        self.assertRaises(AssertionError, self.GL.release,
                          locking.LEVEL_CLUSTER)

    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)

    self.GL.acquire(locking.LEVEL_NODE, ['n1'])
    _AssertBglNotReleasable()
    self.GL.release(locking.LEVEL_NODE)

    self.GL.acquire(locking.LEVEL_INSTANCE, ['i1', 'i2'])
    _AssertBglNotReleasable()
    self.GL.release(locking.LEVEL_INSTANCE)

    # A partially released level still blocks releasing the BGL
    self.GL.acquire(locking.LEVEL_NODEGROUP, None)
    self.GL.release(locking.LEVEL_NODEGROUP, ['g1'])
    _AssertBglNotReleasable()
    self.GL.release(locking.LEVEL_NODEGROUP)

    self.GL.release(locking.LEVEL_CLUSTER)
def testWrongOrder(self):
    """With a node lock held, further acquires listed below must fail."""
    self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
    self.GL.acquire(locking.LEVEL_NODE, ['n2'])

    refused = [
        (locking.LEVEL_NODE, ['n1']),
        (locking.LEVEL_NODEGROUP, ['g1']),
        (locking.LEVEL_INSTANCE, ['i2']),
    ]
    for (level, names) in refused:
        self.assertRaises(AssertionError, self.GL.acquire, level, names)
1747 def testModifiableLevels(self):
1748 self.assertRaises(AssertionError, self.GL.add, locking.LEVEL_CLUSTER,
1750 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'])
1751 self.GL.add(locking.LEVEL_INSTANCE, ['i4'])
1752 self.GL.remove(locking.LEVEL_INSTANCE, ['i3'])
1753 self.GL.remove(locking.LEVEL_INSTANCE, ['i1'])
1754 self.assertEqual(self.GL._names(locking.LEVEL_INSTANCE), set(['i2', 'i4']))
1755 self.GL.add(locking.LEVEL_NODE, ['n3'])
1756 self.GL.remove(locking.LEVEL_NODE, ['n1'])
1757 self.assertEqual(self.GL._names(locking.LEVEL_NODE), set(['n2', 'n3']))
1758 self.GL.add(locking.LEVEL_NODEGROUP, ['g3'])
1759 self.GL.remove(locking.LEVEL_NODEGROUP, ['g2'])
1760 self.GL.remove(locking.LEVEL_NODEGROUP, ['g1'])
1761 self.assertEqual(self.GL._names(locking.LEVEL_NODEGROUP), set(['g3']))
1762 self.assertRaises(AssertionError, self.GL.remove, locking.LEVEL_CLUSTER,
1765 # Helper function to run as a thread that shares the BGL and then acquires
1766 # some locks at another level.
def _doLock(self, level, names, shared):
    """Thread body: share the BGL, then lock the given names at a level.

    'DONE' is put on self.done once both acquires have succeeded; the locks
    are released again afterwards. 'ERR' is put on self.done if any step
    inside the try block raises errors.LockError.

    """
    manager = self.GL
    try:
        manager.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
        manager.acquire(level, names, shared=shared)
        self.done.put('DONE')
        manager.release(level)
        manager.release(locking.LEVEL_CLUSTER)
    except errors.LockError:
        self.done.put('ERR')
1778 def testConcurrency(self):
1779 self.GL.acquire(locking.LEVEL_CLUSTER, ['BGL'], shared=1)
1780 self._addThread(target=self._doLock,
1781 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1783 self.assertEqual(self.done.get_nowait(), 'DONE')
1784 self.GL.acquire(locking.LEVEL_INSTANCE, ['i3'])
1785 self._addThread(target=self._doLock,
1786 args=(locking.LEVEL_INSTANCE, 'i1', 1))
1788 self.assertEqual(self.done.get_nowait(), 'DONE')
1789 self._addThread(target=self._doLock,
1790 args=(locking.LEVEL_INSTANCE, 'i3', 1))
1791 self.assertRaises(Queue.Empty, self.done.get_nowait)
1792 self.GL.release(locking.LEVEL_INSTANCE)
1794 self.assertEqual(self.done.get_nowait(), 'DONE')
1795 self.GL.acquire(locking.LEVEL_INSTANCE, ['i2'], shared=1)
1796 self._addThread(target=self._doLock,
1797 args=(locking.LEVEL_INSTANCE, 'i2', 1))
1799 self.assertEqual(self.done.get_nowait(), 'DONE')
1800 self._addThread(target=self._doLock,
1801 args=(locking.LEVEL_INSTANCE, 'i2', 0))
1802 self.assertRaises(Queue.Empty, self.done.get_nowait)
1803 self.GL.release(locking.LEVEL_INSTANCE)
1805 self.assertEqual(self.done.get(True, 1), 'DONE')
1806 self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1809 class TestLockMonitor(_ThreadedTestCase):
1811 _ThreadedTestCase.setUp(self)
1812 self.lm = locking.LockMonitor()
1814 def testSingleThread(self):
1817 for i in range(100):
1818 name = "TestLock%s" % i
1819 locks.append(locking.SharedLock(name, monitor=self.lm))
1821 self.assertEqual(len(self.lm._locks), len(locks))
1822 result = objects.QueryResponse.FromDict(self.lm.QueryLocks(["name"]))
1823 self.assertEqual(len(result.fields), 1)
1824 self.assertEqual(len(result.data), 100)
1829 # The garbage collector might need some time
1832 raise utils.RetryAgain()
1834 utils.Retry(_CheckLocks, 0.1, 30.0)
1836 self.assertFalse(self.lm._locks)
1838 def testMultiThread(self):
1841 def _CreateLock(prev, next, name):
1843 locks.append(locking.SharedLock(name, monitor=self.lm))
1849 first = threading.Event()
1852 # Use a deterministic random generator
1853 for i in random.Random(4263).sample(range(100), 33):
1854 name = "MtTestLock%s" % i
1855 expnames.append(name)
1857 ev = threading.Event()
1858 self._addThread(target=_CreateLock, args=(prev, ev, name))
1865 # Check order in which locks were added
1866 self.assertEqual([i.name for i in locks], expnames)
1868 # Check query result
1869 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1870 self.assert_(isinstance(result, dict))
1871 response = objects.QueryResponse.FromDict(result)
1872 self.assertEqual(response.data,
1873 [[(constants.RS_NORMAL, name),
1874 (constants.RS_NORMAL, None),
1875 (constants.RS_NORMAL, None),
1876 (constants.RS_NORMAL, [])]
1877 for name in utils.NiceSort(expnames)])
1878 self.assertEqual(len(response.fields), 4)
1879 self.assertEqual(["name", "mode", "owner", "pending"],
1880 [fdef.name for fdef in response.fields])
1882 # Test exclusive acquire
1883 for tlock in locks[::4]:
1884 tlock.acquire(shared=0)
1886 def _GetExpResult(name):
1887 if tlock.name == name:
1888 return [(constants.RS_NORMAL, name),
1889 (constants.RS_NORMAL, "exclusive"),
1890 (constants.RS_NORMAL,
1891 [threading.currentThread().getName()]),
1892 (constants.RS_NORMAL, [])]
1893 return [(constants.RS_NORMAL, name),
1894 (constants.RS_NORMAL, None),
1895 (constants.RS_NORMAL, None),
1896 (constants.RS_NORMAL, [])]
1898 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
1899 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1900 [_GetExpResult(name)
1901 for name in utils.NiceSort(expnames)])
1905 # Test shared acquire
1906 def _Acquire(lock, shared, ev, notify):
1907 lock.acquire(shared=shared)
1914 for tlock1 in locks[::11]:
1915 for tlock2 in locks[::-15]:
1916 if tlock2 == tlock1:
1920 for tlock3 in locks[::10]:
1921 if tlock3 in (tlock2, tlock1):
1925 releaseev = threading.Event()
1931 ev = threading.Event()
1932 tthreads1.append(self._addThread(target=_Acquire,
1933 args=(tlock1, 1, releaseev, ev)))
1934 acquireev.append(ev)
1936 ev = threading.Event()
1937 tthread2 = self._addThread(target=_Acquire,
1938 args=(tlock2, 1, releaseev, ev))
1939 acquireev.append(ev)
1941 ev = threading.Event()
1942 tthread3 = self._addThread(target=_Acquire,
1943 args=(tlock3, 0, releaseev, ev))
1944 acquireev.append(ev)
1946 # Wait for all locks to be acquired
1950 # Check query result
1951 result = self.lm.QueryLocks(["name", "mode", "owner"])
1952 response = objects.QueryResponse.FromDict(result)
1953 for (name, mode, owner) in response.data:
1954 (name_status, name_value) = name
1955 (owner_status, owner_value) = owner
1957 self.assertEqual(name_status, constants.RS_NORMAL)
1958 self.assertEqual(owner_status, constants.RS_NORMAL)
1960 if name_value == tlock1.name:
1961 self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1962 self.assertEqual(set(owner_value),
1963 set(i.getName() for i in tthreads1))
1966 if name_value == tlock2.name:
1967 self.assertEqual(mode, (constants.RS_NORMAL, "shared"))
1968 self.assertEqual(owner_value, [tthread2.getName()])
1971 if name_value == tlock3.name:
1972 self.assertEqual(mode, (constants.RS_NORMAL, "exclusive"))
1973 self.assertEqual(owner_value, [tthread3.getName()])
1976 self.assert_(name_value in expnames)
1977 self.assertEqual(mode, (constants.RS_NORMAL, None))
1978 self.assert_(owner_value is None)
1980 # Release locks again
1985 result = self.lm.QueryLocks(["name", "mode", "owner"])
1986 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1987 [[(constants.RS_NORMAL, name),
1988 (constants.RS_NORMAL, None),
1989 (constants.RS_NORMAL, None)]
1990 for name in utils.NiceSort(expnames)])
1992 def testDelete(self):
1993 lock = locking.SharedLock("TestLock", monitor=self.lm)
1995 self.assertEqual(len(self.lm._locks), 1)
1996 result = self.lm.QueryLocks(["name", "mode", "owner"])
1997 self.assertEqual(objects.QueryResponse.FromDict(result).data,
1998 [[(constants.RS_NORMAL, lock.name),
1999 (constants.RS_NORMAL, None),
2000 (constants.RS_NORMAL, None)]])
2004 result = self.lm.QueryLocks(["name", "mode", "owner"])
2005 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2006 [[(constants.RS_NORMAL, lock.name),
2007 (constants.RS_NORMAL, "deleted"),
2008 (constants.RS_NORMAL, None)]])
2009 self.assertEqual(len(self.lm._locks), 1)
2011 def testPending(self):
2012 def _Acquire(lock, shared, prev, next):
2015 lock.acquire(shared=shared, test_notify=next.set)
2021 lock = locking.SharedLock("ExcLock", monitor=self.lm)
2023 for shared in [0, 1]:
2026 self.assertEqual(len(self.lm._locks), 1)
2027 result = self.lm.QueryLocks(["name", "mode", "owner"])
2028 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2029 [[(constants.RS_NORMAL, lock.name),
2030 (constants.RS_NORMAL, "exclusive"),
2031 (constants.RS_NORMAL,
2032 [threading.currentThread().getName()])]])
2036 first = threading.Event()
2040 ev = threading.Event()
2041 threads.append(self._addThread(target=_Acquire,
2042 args=(lock, shared, prev, ev)))
2048 # Wait for last acquire to start waiting
2051 # NOTE: This works only because QueryLocks will acquire the
2052 # lock-internal lock again and won't be able to get the information
2053 # until it has the lock. By then the acquire should be registered in
2054 # SharedLock.__pending (otherwise it's a bug).
2056 # All acquires are waiting now
2058 pending = [("shared", utils.NiceSort(t.getName() for t in threads))]
2060 pending = [("exclusive", [t.getName()]) for t in threads]
2062 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2063 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2064 [[(constants.RS_NORMAL, lock.name),
2065 (constants.RS_NORMAL, "exclusive"),
2066 (constants.RS_NORMAL,
2067 [threading.currentThread().getName()]),
2068 (constants.RS_NORMAL, pending)]])
2070 self.assertEqual(len(self.lm._locks), 1)
2076 # No pending acquires
2077 result = self.lm.QueryLocks(["name", "mode", "owner", "pending"])
2078 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2079 [[(constants.RS_NORMAL, lock.name),
2080 (constants.RS_NORMAL, None),
2081 (constants.RS_NORMAL, None),
2082 (constants.RS_NORMAL, [])]])
2084 self.assertEqual(len(self.lm._locks), 1)
2086 def testDeleteAndRecreate(self):
2087 lname = "TestLock101923193"
2089 # Create some locks with the same name and keep all references
2090 locks = [locking.SharedLock(lname, monitor=self.lm)
2093 self.assertEqual(len(self.lm._locks), len(locks))
2095 result = self.lm.QueryLocks(["name", "mode", "owner"])
2096 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2097 [[(constants.RS_NORMAL, lname),
2098 (constants.RS_NORMAL, None),
2099 (constants.RS_NORMAL, None)]] * 5)
2103 # Check information order
2104 result = self.lm.QueryLocks(["name", "mode", "owner"])
2105 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2106 [[(constants.RS_NORMAL, lname),
2107 (constants.RS_NORMAL, None),
2108 (constants.RS_NORMAL, None)]] * 2 +
2109 [[(constants.RS_NORMAL, lname),
2110 (constants.RS_NORMAL, "deleted"),
2111 (constants.RS_NORMAL, None)]] +
2112 [[(constants.RS_NORMAL, lname),
2113 (constants.RS_NORMAL, None),
2114 (constants.RS_NORMAL, None)]] * 2)
2116 locks[1].acquire(shared=0)
2119 [(constants.RS_NORMAL, lname),
2120 (constants.RS_NORMAL, None),
2121 (constants.RS_NORMAL, None)],
2122 [(constants.RS_NORMAL, lname),
2123 (constants.RS_NORMAL, "exclusive"),
2124 (constants.RS_NORMAL, [threading.currentThread().getName()])],
2125 [(constants.RS_NORMAL, lname),
2126 (constants.RS_NORMAL, "deleted"),
2127 (constants.RS_NORMAL, None)],
2128 [(constants.RS_NORMAL, lname),
2129 (constants.RS_NORMAL, None),
2130 (constants.RS_NORMAL, None)],
2131 [(constants.RS_NORMAL, lname),
2132 (constants.RS_NORMAL, None),
2133 (constants.RS_NORMAL, None)],
2136 # Check information order
2137 result = self.lm.QueryLocks(["name", "mode", "owner"])
2138 self.assertEqual(objects.QueryResponse.FromDict(result).data, last_status)
2140 self.assertEqual(len(set(self.lm._locks.values())), len(locks))
2141 self.assertEqual(len(self.lm._locks), len(locks))
2143 # Check lock deletion
2144 for idx in range(len(locks)):
2146 assert gc.isenabled()
2148 self.assertEqual(len(self.lm._locks), len(locks))
2149 result = self.lm.QueryLocks(["name", "mode", "owner"])
2150 self.assertEqual(objects.QueryResponse.FromDict(result).data,
2151 last_status[idx + 1:])
2153 # All locks should have been deleted
2155 self.assertFalse(self.lm._locks)
2157 result = self.lm.QueryLocks(["name", "mode", "owner"])
2158 self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
def AddResult(self, *args):
    """Queue one prepared answer; the arguments are stored as one tuple."""
    entry = args
    self._info.append(entry)
def CountPending(self):
    """Return the number of queued answers still stored in self._info."""
    pending = self._info
    return len(pending)
2170 def GetLockInfo(self, requested):
2171 (exp_requested, result) = self._info.pop(0)
2173 if exp_requested != requested:
2174 raise Exception("Requested information (%s) does not match"
2175 " expectations (%s)" % (requested, exp_requested))
2179 def testMultipleResults(self):
2180 fl1 = self._FakeLock()
2181 fl2 = self._FakeLock()
2183 self.lm.RegisterLock(fl1)
2184 self.lm.RegisterLock(fl2)
2187 for i in [fl1, fl2]:
2188 i.AddResult(set([query.LQ_MODE, query.LQ_OWNER]), [])
2189 result = self.lm.QueryLocks(["name", "mode", "owner"])
2190 self.assertEqual(objects.QueryResponse.FromDict(result).data, [])
2191 for i in [fl1, fl2]:
2192 self.assertEqual(i.CountPending(), 0)
2195 for fn in [lambda x: x, reversed, sorted]:
2196 fl1.AddResult(set(), list(fn([
2197 ("aaa", None, None, None),
2198 ("bbb", None, None, None),
2200 fl2.AddResult(set(), [])
2201 result = self.lm.QueryLocks(["name"])
2202 self.assertEqual(objects.QueryResponse.FromDict(result).data, [
2203 [(constants.RS_NORMAL, "aaa")],
2204 [(constants.RS_NORMAL, "bbb")],
2206 for i in [fl1, fl2]:
2207 self.assertEqual(i.CountPending(), 0)
2209 for fn2 in [lambda x: x, reversed, sorted]:
2210 fl1.AddResult(set([query.LQ_MODE]), list(fn([
2211 # Same name, but different information
2212 ("aaa", "mode0", None, None),
2213 ("aaa", "mode1", None, None),
2214 ("aaa", "mode2", None, None),
2215 ("aaa", "mode3", None, None),
2217 fl2.AddResult(set([query.LQ_MODE]), [
2218 ("zzz", "end", None, None),
2219 ("000", "start", None, None),
2221 ("aaa", "b200", None, None),
2222 ("aaa", "b300", None, None),
2224 result = self.lm.QueryLocks(["name", "mode"])
2225 self.assertEqual(objects.QueryResponse.FromDict(result).data, [
2226 [(constants.RS_NORMAL, "000"), (constants.RS_NORMAL, "start")],
2228 # Name is the same, so order must be equal to incoming order
2229 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode0")],
2230 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode1")],
2231 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode2")],
2232 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "mode3")],
2234 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b200")],
2235 [(constants.RS_NORMAL, "aaa"), (constants.RS_NORMAL, "b300")],
2237 [(constants.RS_NORMAL, "zzz"), (constants.RS_NORMAL, "end")],
2239 for i in [fl1, fl2]:
2240 self.assertEqual(i.CountPending(), 0)
# Run the test program only when this file is executed as a script
if __name__ == "__main__":
    testutils.GanetiTestProgram()